# -----------------------------------------------------------------------------.
# MIT License
# Copyright (c) 2024 GPM-API developers
#
# This file is part of GPM-API.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------.
"""This module provides PMW utilities."""
import functools
import os
import re
from functools import total_ordering
import numpy as np
import xarray as xr
import gpm
from gpm.io.checks import check_sensor
from gpm.utils.list import flatten_list
from gpm.utils.xarray import (
get_default_variable,
get_xarray_variable,
)
from gpm.utils.yaml import read_yaml
[docs]
@functools.cache
def get_pmw_frequency_dict():
"""Get PMW info dictionary."""
from gpm import _root_path
filepath = os.path.join(_root_path, "gpm", "etc", "pmw", "frequencies.yaml")
return read_yaml(filepath)
[docs]
@functools.cache
def get_pmw_frequency(sensor, scan_mode):
"""Get product info dictionary."""
pmw_dict = get_pmw_frequency_dict()
return pmw_dict[sensor][scan_mode]
[docs]
def available_pmw_frequencies(sensor):
from gpm.io.products import available_sensors
# Check valid sensor
available_pmw_sensors = available_sensors(product_categories="PMW")
if sensor not in available_pmw_sensors:
raise ValueError(f"{sensor} is not a valid PMW sensor. Valid sensors: {available_pmw_sensors}. ")
# Retrieve dictionary with frequencies for each scan mode
pmw_dict = get_pmw_frequency_dict()
sensor_dict = pmw_dict[sensor]
# Retrieve frequencies list (across all scan modes)
frequencies = list(sensor_dict.values())
frequencies = flatten_list(frequencies)
frequencies = np.unique(frequencies).tolist()
return frequencies
####--------------------------------------------------------------------------------.
[docs]
def strip_trailing_zero_decimals(num: float) -> str:
"""Strip trailing zeros from a float, e.g. 183.0 -> '183'."""
s = f"{num}"
if "." in s:
s = s.rstrip("0").rstrip(".") # remove trailing zeros and '.'
return s
[docs]
@total_ordering
class PMWFrequency:
"""
Class to represent a Passive Microwave frequency channel.
Attributes
----------
center_frequency : float or str
The (nominal) center frequency in GHz.
polarization : str
Polarization code, e.g. 'V', 'H', 'QV', 'QH'.
offset : float or None
Offset from the center frequency in GHz (e.g., 3 for 183±3 GHz). None if not applicable.
"""
# Map for flipping polarization
_OPPOSITE_POLARIZATION = {
"": "",
"V": "H",
"H": "V",
"QV": "QH",
"QH": "QV",
}
def __init__(self, center_frequency: float, polarization: str = "", offset=None):
# Check valid polarization
if polarization not in self._OPPOSITE_POLARIZATION:
valid_polarization = ", ".join(self._OPPOSITE_POLARIZATION.keys())
raise ValueError(f"Invalid polarization '{polarization}'. Must be one of: {valid_polarization}.")
self.polarization = polarization
self.center_frequency = float(center_frequency)
self.offset = float(offset) if offset is not None else None
@property
def center_frequency_str(self):
"""Return center frequency string."""
return strip_trailing_zero_decimals(self.center_frequency)
@property
def offset_str(self):
"""Return center frequency offset string."""
if self.offset is not None and abs(self.offset) > 1e-9:
return strip_trailing_zero_decimals(self.offset)
return ""
[docs]
@classmethod
def from_string(cls, string: str) -> "PMWFrequency":
"""
Create a PMWFrequency object from a string like '10.65V', '18.7H', or '183V3'.
Pattern:
1. Numeric frequency (integer or float)
2. Polarization ('V', 'H', 'QV', 'QH')
3. Optional numeric offset, e.g. '3'
Examples
--------
'10.65V' -> center_frequency=10.65, polarization='V', offset=None
'183V3' -> center_frequency=183, polarization='V', offset=3
'183.31QH7.5' -> center_frequency=183.31, polarization='QH', offset=7.5
"""
if not isinstance(string, (str, np.str_)):
raise TypeError("Expecting a string.")
# Regex breakdown:
# ^(\d+(?:\.\d+)?) -> group 1: one or more digits, optional decimal part
# (V|H|QV|QH) -> group 2: exactly 'V', 'H', 'QV', or 'QH'
# (\d+(?:\.\d+)?)?$ -> group 3: optional one or more digits, optional decimal part
pattern = r"^(\d+(?:\.\d+)?)(V|H|QV|QH)(\d+(?:\.\d+)?)?$"
match = re.match(pattern, string)
if not match:
raise ValueError(f"String '{string}' is not a valid PMW channel string.")
freq_str, pol_str, offset_str = match.groups()
freq = float(freq_str)
offset = float(offset_str) if offset_str is not None else None
return cls(center_frequency=freq, polarization=pol_str, offset=offset)
[docs]
def title(self) -> str:
"""Return a nicely formatted string representation of the frequency channel."""
freq_str = self.center_frequency_str
offset_str = self.offset_str
if offset_str != "":
return f"{freq_str} ± {offset_str} GHz ({self.polarization})"
return f"{freq_str} GHz ({self.polarization})"
def __repr__(self) -> str:
"""Return a concise string representation."""
freq_str = self.center_frequency_str
offset_str = self.offset_str
if offset_str != "":
title = f"{freq_str} ± {offset_str} GHz ({self.polarization})"
else:
title = f"{freq_str} GHz ({self.polarization})"
return f"<PMWFrequency: {title}>"
[docs]
def to_string(self) -> str:
"""
Recreate the original acronym string from the PMWFrequency object.
Examples
--------
- "10.65V"
- "183V3"
- "89QV7.5"
"""
freq_str = self.center_frequency_str
offset_str = self.offset_str
if offset_str != "":
return f"{freq_str}{self.polarization}{offset_str}"
return f"{freq_str}{self.polarization}"
def __hash__(self) -> int:
"""
Return a hash value based on the string returned by `to_string()`.
This ensures that two PMWFrequency objects which produce the same
string representation share the same hash.
"""
return hash(self.to_string())
[docs]
def opposite_polarization(self):
"""Return a new PMWFrequency object with flipped polarization (V <-> H, QV <-> QH).
Examples
--------
10.65V -> 10.65H
183QH3 -> 183QV3
"""
if self.polarization in self._OPPOSITE_POLARIZATION:
new_pol = self._OPPOSITE_POLARIZATION[self.polarization]
return PMWFrequency(self.center_frequency, new_pol, self.offset)
return None
[docs]
def has_same_center_frequency(self, other: "PMWFrequency", tol: float = 1e-6) -> bool:
"""Return True if the center frequency is the same as `other` within a specified tolerance."""
return abs(self.center_frequency - other.center_frequency) < tol
[docs]
def has_same_polarization(self, other: "PMWFrequency") -> bool:
"""Return True if the polarization is the same as `other`."""
return self.polarization == other.polarization
[docs]
def has_same_offset(self, other: "PMWFrequency") -> bool:
"""Return True if offset is the same as `other`."""
if self.offset is None and other.offset is None:
return True
if self.offset is None and other.offset is not None:
return False
if self.offset is not None and other.offset is None:
return False
return self.offset == other.offset
@property
def wavelength(self) -> float:
"""Returns the channel wavelength in meters.
The wavelength is computed as c / (f * 1e9),
where c ~ 3e8 m/s and f is the center_frequency in GHz.
"""
c_m_s = 3.0e8 # Speed of light in m/s
return c_m_s / (self.center_frequency * 1e9)
def __eq__(self, other: object) -> bool:
"""Define equality between two PMWFrequency objects.
It performs:
- approximate comparison (1e-9) for center_frequency
- approximate comparison (1e-9) for offset (treat None as 0)
- exact match for polarization.
"""
if not isinstance(other, PMWFrequency):
return NotImplemented
tol = 1e-9
offset_self = 0.0 if self.offset is None else self.offset
offset_other = 0.0 if other.offset is None else other.offset
same_freq = abs(self.center_frequency - other.center_frequency) < tol
same_off = abs(offset_self - offset_other) < tol
same_pol = self.polarization == other.polarization
return same_freq and same_off and same_pol
def __lt__(self, other: object) -> bool:
"""Return True if self is less than other by comparing center frequency, offset, and polarization.
Polarization V or QV takes precedence over H
"""
if not isinstance(other, PMWFrequency):
return NotImplemented
offset_self = 0.0 if self.offset is None else self.offset
offset_other = 0.0 if other.offset is None else other.offset
# Compare by center frequency first.
if self.center_frequency != other.center_frequency:
return self.center_frequency < other.center_frequency
# If center frequencies are equal, compare offsets (treat None as 0.0).
offset_self = 0.0 if self.offset is None else self.offset
offset_other = 0.0 if other.offset is None else other.offset
if offset_self != offset_other:
return offset_self < offset_other
# If offsets are also equal, compare polarization.
# Custom order: V and QV are considered lower than H and QH
polarization_order = {"": 0, "V": 1, "QV": 2, "H": 3, "QH": 4}
return polarization_order.get(self.polarization) < polarization_order.get(other.polarization)
[docs]
def find_polarization_pairs(pmw_frequencies):
"""Identify polariazion pairs of PMWFrequency objects.
The PMWFrequency objects must share the same center frequency
but differ in polarization (e.g., vertical vs. horizontal).
This function iterates through each PMWFrequency in the input list and
attempts to match it with another PMWFrequency that:
- Has the same center frequency.
- Has the opposite polarization (e.g., V vs. H or QV vs. QH).
Once a valid pair is found, it is stored in a dictionary keyed by the
shared center frequency. For consistent ordering of pairs, any item with
vertical polarization (e.g., "V", "QV") is placed first in the tuple,
followed by the corresponding horizontal polarization ("H", "QH").
Parameters
----------
pmw_frequencies : list of PMWFrequency
A list of PMWFrequency objects to be examined for pairs.
Returns
-------
dict
A dictionary where keys are center frequencies (float or int),
and values are 2-tuples of PMWFrequency objects in (vertical, horizontal) order.
If no match is found for a given frequency, that frequency is not included
in the dictionary.
"""
dict_pairs = {}
for freq in pmw_frequencies:
for other_freq in pmw_frequencies:
if (
freq != other_freq
and freq.has_same_center_frequency(other_freq)
and freq.opposite_polarization() == other_freq
):
center_frequency_str = freq.center_frequency_str
if center_frequency_str not in dict_pairs:
# Enforce V and H order
if freq.polarization in ["V", "QV"]:
dict_pairs[center_frequency_str] = (freq, other_freq)
else:
dict_pairs[center_frequency_str] = (other_freq, freq)
return dict_pairs
PCT_COEFFICIENTS = { # (tolerance, coef)
PMWFrequency(center_frequency=89): (5, 0.7), # 91 of SSMIS, 85 of SSMI
PMWFrequency(center_frequency=36): (3, 1.15), # 36.5, 37
PMWFrequency(center_frequency=19): (1, 1.40), # 18.7 OK . AVOID 21, 22, 23.8 ?
PMWFrequency(center_frequency=10): (3, 1.50),
}
[docs]
def get_pct_coefficient(center_frequency):
for freq, (tol, coef) in PCT_COEFFICIENTS.items():
if freq.has_same_center_frequency(PMWFrequency(center_frequency=center_frequency), tol=tol):
return coef
raise ValueError(f"PCT coefficients are not available for the {center_frequency} GHz frequency.")
[docs]
def normalize_channel(arr, vmin, vmax, vmin_dynamic, vmax_dynamic, invert):
"""Normalize array between 0 and 1 with optional inversion and dynamic min/max adjustment.
Parameters
----------
arr : array-like
Input array to be normalized.
vmin : float
Minimum value for normalization.
vmax : float
Maximum value for normalization.
vmin_dynamic : bool
If True, update vmin to the maximum of the provided vmin and the array minimum.
vmax_dynamic : bool
If True, update vmax to the minimum of the provided vmax and the array maximum.
invert : bool
If True, invert the normalized values (i.e., compute 1 - normalized array).
Returns
-------
norm_arr : array-like
Normalized array with values clipped between 0 and 1.
"""
# Define dynamic vmin and vmax if asked
if vmax_dynamic:
vmax = np.minimum(vmax, arr.max()) # .item() not compatible with dask
if vmin_dynamic:
vmin = np.maximum(vmin, arr.min()) # .item() not compatible with dask
# Define range
vrange = vmax - vmin
# Retrieve normalized arrays
norm_arr = (arr - vmin) / vrange
norm_arr = norm_arr.clip(0, 1)
# Invert if asked
if invert:
norm_arr = 1 - norm_arr
return norm_arr
[docs]
def get_brightness_temperature(xr_obj, variable):
"""Retrieve the brightness temperature data array from an xarray object.
Parameters
----------
xr_obj : xarray.Dataset or xarray.DataArray
Input xarray object containing brightness temperature data.
variable : str or None
Variable name to extract. If None, a default variable is determined based on possible options.
Returns
-------
dataarray : xarray.DataArray
DataArray of the brightness temperature.
"""
if variable is None and isinstance(xr_obj, xr.Dataset):
variable = get_default_variable(xr_obj, possible_variables=["Tb", "Tc"])
return get_xarray_variable(xr_obj, variable=variable)
[docs]
def get_frequencies_polarized_pairs(ds):
"""Get center frequency of polarized pairs."""
# Retrieve available frequencies
pmw_frequencies = [PMWFrequency.from_string(freq) for freq in ds["pmw_frequency"].data]
# Retrieve polarized frequencies couples
dict_polarization_pairs = find_polarization_pairs(pmw_frequencies)
return list(dict_polarization_pairs)
[docs]
def get_available_pct_features(ds):
"""Get list of available PCT features."""
return [f"PCT_{center_freq}" for center_freq in get_frequencies_polarized_pairs(ds) if float(center_freq) < 95]
[docs]
def get_available_pd_features(ds):
"""Get list of available PCT features."""
return [f"PD_{center_freq}" for center_freq in get_frequencies_polarized_pairs(ds)]
[docs]
def get_available_pr_features(ds):
"""Get list of available PR features."""
return [f"PR_{center_freq}" for center_freq in get_frequencies_polarized_pairs(ds)]
[docs]
def get_pmw_channel(xr_obj, name, variable=None):
"""Extract a specific PMW (Passive Microwave) channel from an xarray object.
Parameters
----------
xr_obj : xarray.Dataset or xarray.DataArray
PMW L1B or L1C product containing the brightness temperature variable.
name : str or PMWFrequency
PMW channel name or PMWFrequency object representing the desired frequency.
variable : str, optional
Variable name to extract from the xarray object. If None, the default variable is used.
Returns
-------
dataarray : xarray.DataArray
DataArray corresponding to the selected PMW channel.
"""
# Check name type
if not isinstance(name, (str, PMWFrequency)):
raise TypeError("Parameter 'name' must be either a string or a PMWFrequency object.")
# Ensure to get a brightness temperature dataarray
da = get_brightness_temperature(xr_obj, variable=variable)
# Retrieve matching frequency
pmw_frequencies_str = da["pmw_frequency"].data
pmw_frequency = PMWFrequency.from_string(name) if not isinstance(name, (PMWFrequency)) else name
pmw_frequencies = [PMWFrequency.from_string(s) for s in pmw_frequencies_str]
matched_frequency = find_closely_matching_frequency(pmw_frequency, pmw_frequencies, center_frequency_tol=2)
if matched_frequency is None:
raise ValueError(
f"Requested PMW frequency '{pmw_frequency.to_string()}' is not available. "
f"Available frequencies: {list(pmw_frequencies_str)}.",
)
# freq_str = name.to_string() if isinstance(name, PMWFrequency) else name
# if freq_str not in pmw_frequencies_str:
# raise ValueError(
# f"Requested PMW frequency '{freq_str}' is not available. "
# f"Available frequencies: {list(pmw_frequencies_str)}.",
# )
return da.sel(pmw_frequency=matched_frequency.to_string()).squeeze()
[docs]
def find_closely_matching_frequency(pmw_frequency, pmw_frequencies, center_frequency_tol):
"""Find the closely matching frequency within a set of frequencies."""
for freq in pmw_frequencies:
if (
pmw_frequency.has_same_center_frequency(freq, tol=center_frequency_tol)
and pmw_frequency.has_same_polarization(freq)
and pmw_frequency.has_same_offset(freq)
):
return freq
return None
[docs]
def find_closely_matching_center_frequency(center_frequency, center_frequencies):
"""Find the closely matching center frequency within a set of frequencies."""
center_frequency = PMWFrequency(center_frequency=center_frequency)
center_frequencies = [PMWFrequency(center_frequency=freq) for freq in center_frequencies]
for freq in center_frequencies:
if center_frequency.has_same_center_frequency(freq, tol=2):
return freq.center_frequency_str
return None
def _get_polarized_pairs_datarray(ds, name, feature_prefix):
"""Return the polarized pair of V and H DataArray."""
# Check valid feature name
if not name.startswith(f"{feature_prefix}_"):
raise ValueError(f"Specify {feature_prefix} name as {feature_prefix}_<center_freq>")
# Retrieve center frequency
center_frequency_str = name.replace(f"{feature_prefix}_", "")
# Retrieve available frequencies
pmw_frequencies = [PMWFrequency.from_string(freq) for freq in ds["pmw_frequency"].data]
# Retrieve available polarized frequencies couples
dict_polarization_pairs = find_polarization_pairs(pmw_frequencies)
# If feature not available, raise error
key = find_closely_matching_center_frequency(
center_frequency=center_frequency_str,
center_frequencies=list(dict_polarization_pairs),
)
if key is None:
valid_pct = [f"{feature_prefix}_{c}" for c in list(dict_polarization_pairs)]
raise ValueError(f"{name} is unavailable. Available {feature_prefix} are {valid_pct}.")
# Retrieve polarization pair
freq_v, freq_h = dict_polarization_pairs[key]
da_v = get_pmw_channel(ds, name=freq_v)
da_h = get_pmw_channel(ds, name=freq_h)
return da_v, da_h
[docs]
def get_pct(ds, name):
"""Compute a PCT (Polarization Corrected Temperature) from a PMW L1C product.
Parameters
----------
ds : xarray.Dataset
PMW L1C product containing the brightness temperature variable.
name : str
Name of the PCT feature in the format 'PCT_<center_freq>'.
Returns
-------
dataarray : xarray.DataArray
DataArray representing the computed PCT.
"""
# Retrieve V and H Tb DataArrays
da_v, da_h = _get_polarized_pairs_datarray(ds, name=name, feature_prefix="PCT")
# Retrieve PCTs coefficient
center_frequency_str = name.replace("PCT_", "")
coef = get_pct_coefficient(center_frequency_str)
# Compute PCT
da_pct = (1 + coef) * da_v - coef * da_h
return da_pct.squeeze()
[docs]
def get_pd(ds, name):
"""Compute a PD (Polarization Difference) from a PMW L1C product.
Parameters
----------
ds : xarray.Dataset
PMW L1C product containing the brightness temperature variable.
name : str
Name of the PD feature in the format 'PD_<center_freq>'.
Returns
-------
dataarray : xarray.DataArray
DataArray representing the computed PD.
"""
# Retrieve V and H Tb DataArrays
da_v, da_h = _get_polarized_pairs_datarray(ds, name=name, feature_prefix="PD")
# Compute PD
da_pd = da_v - da_h
return da_pd.squeeze()
[docs]
def get_pr(ds, name):
"""Compute a PR (Polarization Ratio) from a PMW L1C product.
Parameters
----------
ds : xarray.Dataset
PMW L1C product containing the brightness temperature variable.
name : str
Name of the PR feature in the format 'PR_<center_freq>'.
Returns
-------
dataarray : xarray.DataArray
DataArray representing the computed PR.
"""
# Retrieve V and H Tb DataArrays
da_v, da_h = _get_polarized_pairs_datarray(ds, name=name, feature_prefix="PR")
# Compute PD
da_pr = da_v / da_h
return da_pr.squeeze()
[docs]
def get_pmw_feature(ds, name):
"""Retrieve or compute a PMW feature from an L1C PMW product.
This function can handle:
- Simple PMW feature names such as "PCT_37", "PD_37", or direct channel names.
- Complex expressions using PMW feature names combined with basic arithmetic operations.
Parameters
----------
ds : xarray.Dataset
PMW L1C product containing the brightness temperature variable.
name : str
A single PMW feature name (e.g. "PCT_37", "PD_37", "37V", ...)
or a string expression combining multiple PMW feature names via
mathematical operators (+, -, *, /) and parentheses
(e.g. ```"(PCT_37 + PCT_19)/(PCT_37 - PCT_19)" ```).
Returns
-------
xarray.DataArray
DataArray corresponding to the requested or computed PMW feature.
"""
# Check if the input is a simple feature name or a compound expression
if is_simple_feature_name(name):
# Handle direct feature retrieval
return _get_simple_feature(ds, name)
# Handle expression
return _evaluate_feature_expression(ds, name)
[docs]
def is_simple_feature_name(name):
"""Determine if `name` is a single PMW feature or a more complex expression.
A 'simple' feature name is assumed to not contain any arithmetic
symbols (+, -, *, /) nor parentheses.
"""
# If we see +, -, *, /, or parentheses, treat as an expression
if any(symbol in name for symbol in "+-*/()"): # noqa: SIM103
return False
return True
def _get_simple_feature(ds, name):
"""Internal helper to retrieve a simple PMW feature.
Simple PMW features are PCT, PD, or a PMW channel.
"""
# Check known prefixes
if name.startswith("PCT"):
return get_pct(ds, name)
if name.startswith("PD"):
return get_pd(ds, name)
# Fall back to channel retrieval
return get_pmw_channel(ds, name)
def _is_number(string):
"""Check if a string can be interpreted as a float/int."""
try:
float(string)
return True
except ValueError:
return False
ALLOWED_OPERATORS = {"+", "-", "*", "/", "**"}
PARENS = {"(", ")"}
def _evaluate_feature_expression(ds, expression):
"""Safely evaluate an expression containing multiple PMW features.
Parameters
----------
ds : xarray.Dataset
PMW L1C product.
expression : str
A string expression, for example:
- "PCT_37 - PCT_19"
- "(PCT_37 + PCT_19)/(PCT_37 - PCT_19)"
- "PCT_37**2 + 273.15"
Returns
-------
xr.DataArray
DataArray resulting from the expression evaluation.
"""
# A regex that will capture:
# - ** (exponent)
# - single-character operators + - * / ( )
# - sequences of alphanumeric + underscore + optional dot (e.g., "PCT_37", "37V", "273.15")
token_pattern = re.compile(r"(\*\*|[+\-*/()]|[A-Za-z0-9_\.]+)")
# 1. Tokenize the expression
tokens = token_pattern.findall(expression)
if not tokens:
raise ValueError(f"Invalid or empty expression: {expression}")
feature_dict = {}
var_counter = 0
new_tokens = []
for token in tokens:
# Check parentheses
if token in PARENS:
new_tokens.append(token)
continue
# Check operators
if token in ALLOWED_OPERATORS:
new_tokens.append(token)
continue
# Check if token is numeric
if _is_number(token):
new_tokens.append(token)
continue
# Otherwise, assume it's a PMW feature name
# (e.g. "PCT_37", "PD_19", or direct channel "37V")
var_key = f"_var{var_counter}"
feature_dict[var_key] = _get_simple_feature(ds, token)
new_tokens.append(var_key)
var_counter += 1
# 2. Rebuild expression with strictly allowed tokens or variable placeholders
safe_expression = " ".join(new_tokens)
# 3. Evaluate the expression with xarray DataArrays in a restricted namespace
safe_namespace = {
"__builtins__": None, # Remove builtins to reduce security concerns
}
try:
result = eval(safe_expression, safe_namespace, feature_dict)
except Exception as exc:
raise ValueError(
f"Failed to evaluate expression '{expression}'. " f"Check syntax and feature names. Original error: {exc}",
)
# 4. Verify result is an xarray DataArray
if not isinstance(result, xr.DataArray):
raise TypeError(
f"Expression did not result in an xr.DataArray. Actual type: {type(result)}",
)
return result
[docs]
def get_pmw_rgb_receipts(sensor):
"""Return the RGB composite receipts available for a PMW sensor."""
sensor = check_sensor(sensor)
filepath = os.path.join(gpm._root_path, "gpm", "etc", "pmw", "composites", f"{sensor}.yaml")
return read_yaml(filepath)
[docs]
def create_rgb_composite(ds, receipt):
"""Generate an RGB composite from a 1C PMW dataset using the provided receipt.
Parameters
----------
ds : xarray.Dataset
Input dataset containing PMW data.
receipt : dict
Dictionary containing configuration for each channel ('R', 'G', 'B') and optional global normalization.
Each channel configuration should include the following keys:
- 'name': The PMW feature name.
- 'vmin': Minimum value for normalization.
- 'vmax': Maximum value for normalization.
- 'vmin_dynamic': Boolean flag for dynamic minimum adjustment.
- 'vmax_dynamic': Boolean flag for dynamic maximum adjustment.
- 'invert': Boolean flag to invert the channel.
Optionally, the dictionary may include a 'global_normalization' key (bool).
If True, the vmin and vmax of channels with vmin_dynamic and vmax_dynamic equal False are
updated with the minimum and maximum values across all such channels.
The specified vmin and vmax are used to bound the update of vmin and vmax.
Returns
-------
xarray.DataArray
An RGB composite DataArray with a coordinate 'rgb' corresponding to channels ['r', 'g', 'b'].
"""
receipt = receipt.copy()
# Retrieve input channels
dict_channels = {c: get_pmw_feature(ds, receipt[c]["name"]) for c in ["R", "G", "B"]}
# Check if perform global normalization
if receipt.get("global_normalization", False):
# Compute global minimum and maximum across channels to not dynamically normalize
global_min = min(da.min() for key, da in dict_channels.items() if not receipt[key]["vmin_dynamic"])
global_max = max(da.max() for key, da in dict_channels.items() if not receipt[key]["vmax_dynamic"])
# Update each channel's normalization parameters
for key in ["R", "G", "B"]:
if not receipt[key]["vmin_dynamic"]:
if receipt[key]["vmin"] is None:
receipt[key]["vmin"] = global_min
else:
receipt[key]["vmin"] = np.maximum(receipt[key]["vmin"], global_min)
if not receipt[key]["vmax_dynamic"]:
if receipt[key]["vmax"] is None:
receipt[key]["vmax"] = global_max
else:
receipt[key]["vmax"] = np.minimum(receipt[key]["vmax"], global_max)
# Retrieve normalized channels
dict_rgb = {
c: normalize_channel(
da_channel,
vmin=receipt[c]["vmin"],
vmax=receipt[c]["vmax"],
vmin_dynamic=receipt[c]["vmin_dynamic"],
vmax_dynamic=receipt[c]["vmax_dynamic"],
invert=receipt[c]["invert"],
)
for c, da_channel in dict_channels.items()
}
# Concatenate channels
da_rgb = xr.concat(dict_rgb.values(), coords="minimal", dim="rgb", compat="override")
da_rgb = da_rgb.assign_coords({"rgb": ["r", "g", "b"]})
return da_rgb