Source code for gpm.retrievals.retrieval_1b_c_pmw

# -----------------------------------------------------------------------------.
# MIT License

# Copyright (c) 2024 GPM-API developers
#
# This file is part of GPM-API.

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

# -----------------------------------------------------------------------------.
"""This module contains GPM PMW 1B and 1C products community-based retrievals."""
import numpy as np
import pandas as pd
import xarray as xr

from gpm.checks import check_is_spatial_2d
from gpm.utils.decorators import check_software_availability
from gpm.utils.pmw import (
    PMWFrequency,
    create_rgb_composite,
    get_available_pct_features,
    get_available_pd_features,
    get_available_pr_features,
    get_pct,
    get_pd,
    get_pmw_rgb_receipts,
    get_pr,
)
from gpm.utils.xarray import (
    get_default_variable,
    get_xarray_variable,
)


[docs] def retrieve_rgb_composites(ds): """Retrieve the available PMW RGB composites.""" # Retrieve sensor if "InstrumentName" in ds.attrs: sensor = ds.attrs["InstrumentName"] elif "instrument" in ds.attrs: # compatibility with TC PRIMED sensor = ds.attrs["instrument"] else: raise ValueError("Impossible to determine instrument name.") # Retrieve defined RGB receipts receipts = get_pmw_rgb_receipts(sensor) # Compute RGB composites list_ds_rgb = [] for name, receipt in receipts.items(): try: da_rgb = create_rgb_composite(ds, receipt=receipt).to_dataset(name=name) list_ds_rgb.append(da_rgb) except Exception as e: print(f"RGB Composite {name} not available: {e!s}") ds_rgb = xr.merge(list_ds_rgb, compat="override", join="outer", combine_attrs="override") return ds_rgb
[docs] def retrieve_polarization_corrected_temperature(xr_obj, variable=None): """Retrieve PMW Polarization-Corrected Temperature (PCT). Coefficients are taken from Cecil et al., 2018. References ---------- Cecil, D. J., and T. Chronis, 2018. Polarization-Corrected Temperatures for 10-, 19-, 37-, and 89-GHz Passive Microwave Frequencies. J. Appl. Meteor. Climatol., 57, 2249-2265, https://doi.org/10.1175/JAMC-D-18-0022.1. """ # Retrieve DataArray with brightness temperatures if variable is None and isinstance(xr_obj, xr.Dataset): variable = get_default_variable(xr_obj, possible_variables=["Tb", "Tc"]) da = get_xarray_variable(xr_obj, variable=variable) # If no combo, raise error pct_features = get_available_pct_features(da) if len(pct_features) == 0: pmw_frequencies = [PMWFrequency.from_string(freq) for freq in da["pmw_frequency"].data] pmw_frequencies = [freq.title() for freq in pmw_frequencies] raise ValueError(f"Impossible to compute polarized corrected temperature with channels: {pmw_frequencies}.") # Create PCTs dataset dict_pct = {name: get_pct(da, name=name).rename(name) for name in pct_features} ds_pct = xr.merge(dict_pct.values(), compat="minimal") return ds_pct
[docs] def retrieve_polarization_difference(xr_obj, variable=None): """Retrieve PMW Channels Polarized Difference (PD).""" # Retrieve DataArray with brightness temperatures if variable is None and isinstance(xr_obj, xr.Dataset): variable = get_default_variable(xr_obj, possible_variables=["Tb", "Tc"]) da = get_xarray_variable(xr_obj, variable=variable) # If no combo, raise error pd_features = get_available_pd_features(da) if len(pd_features) == 0: pmw_frequencies = [PMWFrequency.from_string(freq) for freq in da["pmw_frequency"].data] pmw_frequencies = [freq.title() for freq in pmw_frequencies] raise ValueError(f"Impossible to compute polarized difference with channels: {pmw_frequencies}. No pairs.") # Create PDs dataset dict_pd = {name: get_pd(da, name=name).rename(name) for name in pd_features} ds_pd = xr.merge(dict_pd.values(), compat="minimal", join="outer", combine_attrs="override") return ds_pd
[docs] def retrieve_polarization_ratio(xr_obj, variable=None): """Retrieve PMW Channels Polarization Ratio (PR).""" # Retrieve DataArray with brightness temperatures if variable is None and isinstance(xr_obj, xr.Dataset): variable = get_default_variable(xr_obj, possible_variables=["Tb", "Tc"]) da = get_xarray_variable(xr_obj, variable=variable) # Retrieve polarized frequencies couples pr_features = get_available_pr_features(da) # If no combo, raise error if len(pr_features) == 0: pmw_frequencies = [PMWFrequency.from_string(freq) for freq in da["pmw_frequency"].data] pmw_frequencies = [freq.title() for freq in pmw_frequencies] raise ValueError(f"Impossible to compute polarization ratio with channels: {pmw_frequencies}. No pairs.") # Create PDs dataset dict_pr = {name: get_pr(da, name=name).rename(name) for name in pr_features} ds_pr = xr.merge(dict_pr.values(), compat="minimal", join="outer", combine_attrs="override") return ds_pr
#### ALIAS retrieve_PCT = retrieve_polarization_corrected_temperature retrieve_PD = retrieve_polarization_difference retrieve_PR = retrieve_polarization_ratio #### PESCA def _np_pesca_classification(Tc_23V, Tc_37V, Tc_89V, t2m, theta, sensor): # noqa: PLR0911 """ Classify a single pixel based on PESCA algorithm. Parameters ---------- - Tc_23V: Brightness temperature at 23V - Tc_37V: Brightness temperature at 37V - Tc_89V: Brightness temperature at 89V - t2m: 2-meter temperature - theta: viewing angle Returns ------- - Snow class (int): 0 = No snow 1 = Deep Dry Snow 2 = Polar Winter Snow 3 = Perennial Snow 4 = Thin Snow """ # Define thresholds th1 = 280 # Threshold to launch the PESCA classification th2 = 1.01 # Threshold for RLF th3 = 257 - t2m if sensor == "GMI": th4 = (495 - t2m) / 250 # Threshold for Perennial Snow th5 = 5 # SI threshold for Thin Snow elif sensor == "ATMS": th4 = (465 - t2m) / 225 # Threshold for Perennial Snow th5 = 3 / np.cos(np.deg2rad(theta)) # SI threshold for Thin Snow else: return -1 # Exclude pixels with T2m > 280 K if t2m > th1: return 0 # No snow # Compute derived metrics RLF = Tc_23V / Tc_37V # ratio low frequency SI = Tc_23V - Tc_89V # scattering index # Test 2: RLF > th2 if th2 < RLF: # Test 3: Differentiate Deep Dry Snow and Polar Winter Snow if th3 < SI: return 1 # Deep Dry Snow return 2 # Polar Winter Snow # Test 4: Perennial Snow if Tc_23V / t2m < th4: return 3 # Perennial Snow # Test 5: Thin Snow if th5 < SI: return 4 # Thin Snow # Default: No snow return 0
[docs] def retrieve_PESCA(ds, t2m="t2m"): """Retrieve PESCA snow-classification.""" # Retrieve sensor sensor = ds.attrs["InstrumentName"] # Retrieve viewing angle da_theta = get_xarray_variable(ds, variable="incidenceAngle") # TODO retrieve viewing angle # Retrieve surface temperature da_t2m = get_xarray_variable(ds, variable=t2m) # Unstack Tb ds_tb = ds["Tc"].gpm.unstack_dimension(dim="pmw_frequency", suffix="_") # Retrieve Tb channels and incidence angle if sensor == "GMI": da_t23 = ds_tb["Tc_23V"] da_t37 = ds_tb["Tc_37V"] da_t89 = ds_tb["Tc_89V"] elif sensor == "ATMS": da_t23 = ds_tb["Tc_23.8QV"] da_t37 = ds_tb["Tc_31.4QV"] da_t89 = ds_tb["Tc_88.2QV"] else: raise NotImplementedError("PESCA not yet implemented for {sensor} sensor.") # Apply the function to the dataset using apply_ufunc kwargs = {"sensor": sensor} da_pesca = xr.apply_ufunc( _np_pesca_classification, da_t23, # Tc_23V da_t37, # Tc_37V da_t89, # Tc_89V da_t2m, # T2m da_theta, kwargs=kwargs, vectorize=True, # Allow the function to apply element-wise dask="parallelized", # Enable parallel computation with Dask output_dtypes=[np.float32], # Specify output data type ) # Set attributes for the classification output da_pesca.name = "PESCA" da_pesca.attrs["description"] = "Snow classification based on PESCA algorithm" dict_pesca_classes = { 0: "No snow", 1: "Deep Dry Snow", 2: "Polar Winter Snow", 3: "Perennial Snow", 4: "Thin Snow", } da_pesca.attrs["flag_values"] = list(dict_pesca_classes) da_pesca.attrs["flag_meanings"] = list(dict_pesca_classes.values()) return da_pesca
[docs] def retrieve_mwcc_hail_probability(ds): """Compute MWCC-H hail probability from GMI, SSMIS, ATMS and MHS sensors. This function implements the MWCC-H (Microwave Cloud Classification - Hail) method to estimate the probability of hail from microwave radiometer observations. For GMI, ATMS, and SSMIS, the native channel brightness temperatures are first mapped to an *equivalent* MHS 157 GHz V-polarization brightness temperature (TB_157V,eq) using a linear intercalibration plus a piecewise correction factor. For MHS, the native 157V channel is used directly. The hail probability is then computed as a function of TB_157V,eq following the references below. Returns ------- xarray.DataArray Hail probability as a unitless field in the range [0, 1] Notes ----- The algorithm uses: - alpha = 104 - K = alpha / TB_157V,eq - P_hail = 0.9844 * ln(K) + 0.9072 References ---------- Laviola, S., Levizzani, V., Ferraro, R. R., & Beauchamp, J. 2020. Hailstorm Detection by Satellite Microwave Radiometers. Remote Sensing 12(4), 621. https://doi.org/10.3390/rs12040621 Laviola, S., Monte, G., Levizzani, V., Ferraro, R. R., & Beauchamp, J. 2020. A New Method for Hail Detection from the GPM Constellation: A Prospect for a Global Hailstorm Climatology. Remote Sensing, 12(21), 3553. https://doi.org/10.3390/rs12213553 """ from gpm.utils.pmw import get_pmw_channel # Check input valid_sensors = ["GMI", "SSMIS", "ATMS", "MHS"] instrument = ds.attrs.get("InstrumentName", None) if instrument not in ["GMI", "SSMIS", "ATMS", "MHS"]: raise ValueError(f"MWCC Hail Probability method only available for {valid_sensors}") # ------------------------------------------------------. # Map to equivalent MHS 157V TB (K) # - GMI if instrument == "GMI": tb = get_pmw_channel(ds, name="165V") mask = np.isnan(tb) correction_factor = xr.where( tb > 188, -0.9975, xr.where( (tb >= 163) & (tb <= 188), 5.4268, xr.where((tb >= 149) & (tb < 163), -3.8086, xr.where((tb >= 122) & (tb < 149), 4.3922, -3.9719)), ), ) # tb < 122 tb_157_mhs = (tb + 3.2588) / 0.9612 + correction_factor # - ATMS if instrument == "ATMS": tb = get_pmw_channel(ds, name="165.5QH") mask = np.isnan(tb) correction_factor = xr.where( tb > 234, 1.7449, xr.where( (tb >= 216) & (tb <= 234), -1.0071, xr.where((tb >= 195) & (tb < 216), -0.8656, xr.where((tb >= 174) & (tb < 195), 0.2584, -3.4761)), ), ) # tb < 174 tb_157_mhs = (tb + 3.9855) / 1.0178 + correction_factor # - SSMIS if instrument == "SSMIS": tb = get_pmw_channel(ds, name="150H") mask = np.isnan(tb) correction_factor = xr.where( tb > 240, -5.3656, xr.where( (tb >= 195) & (tb <= 240), 3.6134, xr.where((tb >= 180) & (tb < 195), 1.6910, xr.where((tb >= 154) & (tb < 180), -0.4578, -7.3520)), ), ) # tb < 154 tb_157_mhs = (tb + 49.923) / 1.1530 + correction_factor # - MHS if instrument == "MHS": tb_157_mhs = get_pmw_channel(ds, name="157V") mask = np.isnan(tb_157_mhs) # ------------------------------------------------------. # Compute hail probability alpha = 104 Kx = alpha / tb_157_mhs # x = Tb157 of MHS Hx = 0.9844 * np.log(Kx) + 0.9072 Hx = Hx.where(~mask) # set to NaN where Bt input was NaN Hx = Hx.clip(min=0, max=1) # limit to [0, 1] # ------------------------------------------------------. # Add attributes Hx.name = "MWCC-H" Hx.attrs = { "description": "Probability of hail using MWCC-H algorithm", "standard_name": "probability_of_hail", "long_name": "MWCC-H probability of hail", "units": "1", "valid_range": [0.0, 1.0], "algorithm": "MWCC-H (Microwave Cloud Classification - Hail)", "instrument": instrument, } return Hx
[docs] @check_software_availability(software="sklearn", conda_package="scikit-learn") @check_software_availability(software="umap", conda_package="umap-learn") def retrieve_UMAP_RGB(ds, scaler=None, n_neighbors=10, min_dist=0.01, random_state=None, **kwargs): """Create a UMAP RGB composite.""" import umap from sklearn.preprocessing import MinMaxScaler # Check dataset has only spatial 2D variables check_is_spatial_2d(ds) # Define variables variables = list(ds.data_vars) # Convert to dataframe df = ds.gpm.to_pandas_dataframe(drop_index=False) # Retrieve dataset coordinates which are present in the dataframe coordinates = [column for column in list(ds.coords) if column in df] # Remove rows with non finite values df_valid = df[np.isfinite(df[variables]).all(axis=1) & (~np.isnan(df[variables])).all(axis=1)] # Retrieve dataframe with only variables df_data = df_valid[variables] # Define scaler if scaler is not None: scaler.fit(df_data) scaled_data = scaler.transform(df_data) else: scaled_data = df_data # Compute 3D embedding reducer = umap.UMAP(n_neighbors=n_neighbors, min_dist=min_dist, n_components=3, random_state=random_state, **kwargs) embedding = reducer.fit_transform(scaled_data) # Define RGB scaler rgb_scaler = MinMaxScaler() rgb_scaler = rgb_scaler.fit(embedding) # Scale UMAP embedding between 0 and 1 rgb_data = rgb_scaler.transform(embedding) rgb_data = np.clip(rgb_data, a_min=0, a_max=1) # Create RGB dataframe of valid pixels df_rgb_valid = pd.DataFrame(rgb_data, index=df_data.index, columns=["R", "G", "B"]) # Create original RGB dataframe df_rgb = df[coordinates] df_rgb = df_rgb.merge(df_rgb_valid, how="outer", left_index=True, right_index=True) # Convert back to xarray ds_rgb = df_rgb.to_xarray() ds_rgb = ds_rgb.set_coords(coordinates) # Define RGB DataArray da_rgb = ds_rgb[["R", "G", "B"]].to_array(dim="rgb") # Add missing coordinates missing_coords = {coord: ds[coord] for coord in set(ds.coords) - set(da_rgb.coords)} da_rgb = da_rgb.assign_coords(missing_coords) # Return RGB DataArray return da_rgb
[docs] @check_software_availability(software="sklearn", conda_package="scikit-learn") def retrieve_PCA_RGB(ds, scaler=None): """Create a PCA RGB composite.""" from sklearn.decomposition import PCA from sklearn.preprocessing import MinMaxScaler # Check dataset has only spatial 2D variables check_is_spatial_2d(ds) # Define variables variables = list(ds.data_vars) # Convert to dataframe df = ds.gpm.to_pandas_dataframe(drop_index=False) # Retrieve dataset coordinates which are present in the dataframe coordinates = [column for column in list(ds.coords) if column in df] # Remove rows with non finite values df_valid = df[np.isfinite(df[variables]).all(axis=1) & (~np.isnan(df[variables])).all(axis=1)] # Retrieve dataframe with only variables df_data = df_valid[variables] # Define scaler if scaler is not None: scaler.fit(df_data) scaled_data = scaler.transform(df_data) else: scaled_data = df_data # Compute 3D embedding pca = PCA(n_components=3) pca.fit(scaled_data) embedding = pca.transform(scaled_data) # Define RGB scaler rgb_scaler = MinMaxScaler() rgb_scaler = rgb_scaler.fit(embedding) # Scale UMAP embedding between 0 and 1 rgb_data = rgb_scaler.transform(embedding) rgb_data = np.clip(rgb_data, a_min=0, a_max=1) # Create RGB dataframe of valid pixels df_rgb_valid = pd.DataFrame(rgb_data, index=df_data.index, columns=["R", "G", "B"]) # Create original RGB dataframe df_rgb = df[coordinates] df_rgb = df_rgb.merge(df_rgb_valid, how="outer", left_index=True, right_index=True) # Convert back to xarray ds_rgb = df_rgb.to_xarray() ds_rgb = ds_rgb.set_coords(coordinates) # Define RGB DataArray da_rgb = ds_rgb[["R", "G", "B"]].to_array(dim="rgb") # Add missing coordinates missing_coords = {coord: ds[coord] for coord in set(ds.coords) - set(da_rgb.coords)} da_rgb = da_rgb.assign_coords(missing_coords) # Return RGB DataArray return da_rgb
####----------------------------------------------------------------------------------------.