# -----------------------------------------------------------------------------.
# MIT License
# Copyright (c) 2024 GPM-API developers
#
# This file is part of GPM-API.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------.
"""This module contains functions to sanitize GPM-API Dataset coordinates."""
import functools
import os
import numpy as np
from gpm.utils.yaml import read_yaml
[docs]
def ensure_valid_coords(ds, raise_error=False):
"""Ensure geographic coordinates are within expected range."""
invalid_coords = np.logical_or(
np.logical_or(ds["lon"].data < -180, ds["lon"].data > 180),
np.logical_or(ds["lat"].data < -90, ds["lat"].data > 90),
)
if np.any(invalid_coords):
if raise_error:
raise ValueError("Invalid geographic coordinate in the granule.")
da_invalid_coords = ds["lon"].copy()
da_invalid_coords.data = invalid_coords
# For each variable, set NaN value where invalid coordinates
ds = ds.where(~da_invalid_coords)
# Add NaN to longitude and latitude
ds["lon"] = ds["lon"].where(~da_invalid_coords)
ds["lat"] = ds["lat"].where(~da_invalid_coords)
return ds
[docs]
def add_lh_height(ds):
"""Add 'height' coordinate to latent heat CSH and SLH products."""
# Fixed heights for 2HSLH and 2HCSH
# - FileSpec v7: p.2395, 2463
# NOTE: In SLH/CSH, the first row of the array correspond to the surface
# Instead, for the other GPM RADAR products, is the last row that correspond to the surface !!!
height = np.linspace(0.25 / 2, 20 - 0.25 / 2, 80) * 1000 # in meters
ds = ds.assign_coords({"height": ("range", height)})
ds["height"].attrs["units"] = "m a.s.l"
return ds
[docs]
def add_cmb_height(ds):
"""Add the 'height' coordinate to CMB products."""
from gpm.utils.manipulations import get_vertical_datarray_prototype
if "ellipsoidBinOffset" in ds and "localZenithAngle" in ds and "range" in ds.dims:
# Retrieve required DataArrays
range_bin = get_vertical_datarray_prototype(ds, fill_value=1) * ds["range"] # start at 1 !
ellipsoidBinOffset = ds["ellipsoidBinOffset"]
localZenithAngle = ds["localZenithAngle"]
rangeBinSize = 250 # approximation
# Compute height
height = ((1 - range_bin) * rangeBinSize + ellipsoidBinOffset) * np.cos(np.deg2rad(localZenithAngle))
ds = ds.assign_coords({"height": height})
return ds
# def _add_cmb_range_coordinate(ds, scan_mode):
# """Add range coordinate to 2B-<CMB> products."""
# if "range" in list(ds.dims) and scan_mode in ["NS", "KuKaGMI", "KuGMI", "KuTMI"]:
# range_values = np.arange(0, 88 * 250, step=250)
# ds = ds.assign_coords({"range_interval": ("range", range_values)})
# ds["range_interval"].attrs["units"] = "m"
# return ds
# def _add_radar_range_coordinate(ds, scan_mode):
# """Add range coordinate to 2A-<RADAR> products."""
# # - V6 and V7: 1BKu 260 bins NS and MS, 130 at HS
# if "range" in list(ds.dims):
# if scan_mode in ["HS"]:
# range_values = np.arange(0, 88 * 250, step=250)
# ds = ds.assign_coords({"range_interval": ("range", range_values)})
# if scan_mode in ["FS", "MS", "NS"]:
# range_values = np.arange(0, 176 * 125, step=125)
# ds = ds.assign_coords({"range_interval": ("range", range_values)})
# ds["range_interval"].attrs["units"] = "m"
# return ds
def _add_cmb_coordinates(ds, product, scan_mode):
"""Set coordinates of 2B-GPM-CORRA product."""
if "pmw_frequency" in list(ds.dims):
pmw_frequency = get_pmw_frequency_corra(product)
ds = ds.assign_coords({"pmw_frequency": pmw_frequency})
if (scan_mode in ("KuKaGMI", "NS")) and "radar_frequency" in list(ds.dims):
ds = ds.assign_coords({"radar_frequency": ["Ku", "Ka"]})
# Add height coordinate
ds = add_cmb_height(ds)
# # Add range_interval coordinate
# ds = _add_cmb_range_coordinate(ds, scan_mode)
return ds
def _add_wished_coordinates(ds):
"""Add wished coordinates to the dataset."""
from gpm.dataset.groups_variables import WISHED_COORDS
for var in WISHED_COORDS:
if var in list(ds):
ds = ds.set_coords(var)
return ds
def _add_radar_coordinates(ds, product, scan_mode): # noqa ARG001
"""Add range, height, radar_frequency, paramDSD coordinates to <RADAR> products."""
if product == "2A-DPR" and "radar_frequency" in list(ds.dims):
ds = ds.assign_coords({"radar_frequency": ["Ku", "Ka"]})
if product in ["2A-DPR", "2A-Ku", "2A-Ka", "2A-PR"] and "paramDSD" in list(ds):
ds = ds.assign_coords({"DSD_params": ["Nw", "Dm"]})
# # Add range_interval coordinate
# ds = _add_radar_range_coordinate(ds, scan_mode)
return ds
[docs]
@functools.cache
def get_pmw_frequency_dict():
"""Get PMW info dictionary."""
from gpm import _root_path
filepath = os.path.join(_root_path, "gpm", "etc", "pmw", "frequencies.yaml")
return read_yaml(filepath)
[docs]
@functools.cache
def get_pmw_frequency(sensor, scan_mode):
"""Get product info dictionary."""
pmw_dict = get_pmw_frequency_dict()
return pmw_dict[sensor][scan_mode]
[docs]
def get_pmw_frequency_corra(product):
if product == "2B-GPM-CORRA":
return get_pmw_frequency("GMI", scan_mode="S1") + get_pmw_frequency("GMI", scan_mode="S2")
if product == "2B-TRMM-CORRA":
return (
get_pmw_frequency("TMI", scan_mode="S1")
+ get_pmw_frequency("TMI", scan_mode="S2")
+ get_pmw_frequency("TMI", scan_mode="S3")
)
raise ValueError("Invalid (CORRA) product {product}.")
def _parse_sun_local_time(ds):
"""Ensure sunLocalTime to be in float type."""
dtype = ds["sunLocalTime"].data.dtype
if dtype == "timedelta64[ns]":
ds["sunLocalTime"] = ds["sunLocalTime"].astype(float) / 10**9 / 60 / 60
elif np.issubdtype(dtype, np.floating):
pass
else:
raise ValueError("Expecting sunLocalTime as float or timedelta64[ns]")
ds["sunLocalTime"].attrs["units"] = "decimal hours" # to avoid open_dataset netCDF convert to timedelta64[ns]
return ds
def _add_1c_pmw_frequency(ds, product, scan_mode):
"""Add the 'pmw_frequency' coordinates to 1C-<PMW> products."""
if "pmw_frequency" in list(ds.dims):
pmw_frequency = get_pmw_frequency(sensor=product.split("-")[1], scan_mode=scan_mode)
ds = ds.assign_coords({"pmw_frequency": pmw_frequency})
return ds
def _add_pmw_coordinates(ds, product, scan_mode):
"""Add coordinates to PMW products."""
if product.startswith("1C"):
ds = _add_1c_pmw_frequency(ds, product, scan_mode)
return ds
[docs]
def set_coordinates(ds, product, scan_mode):
# Ensure valid coordinates
if "cross_track" in list(ds.dims):
ds = ensure_valid_coords(ds, raise_error=False)
# Add range and gpm_range_id coordinates
# - range starts at 1 (for value-based selection with bin variables)
# - gpm_range_id starts at 0 (following python based indexing)
if "range" in list(ds.dims):
range_size = ds.sizes["range"]
range_coords = {
"range": ("range", np.arange(1, range_size + 1)),
"gpm_range_id": ("range", np.arange(0, range_size)),
}
ds = ds.assign_coords(range_coords)
# Add wished coordinates
ds = _add_wished_coordinates(ds)
# Convert sunLocalTime to float
if "sunLocalTime" in ds:
ds = _parse_sun_local_time(ds)
ds = ds.set_coords("sunLocalTime")
#### PMW
# - 1C products
if product.startswith("1C"):
ds = _add_pmw_coordinates(ds, product, scan_mode)
#### RADAR
if product in ["2A-DPR", "2A-Ku", "2A-Ka", "2A-PR", "2A-ENV-DPR", "2A-ENV-PR", "2A-ENV-Ka", "2A-ENV-Ku"]:
ds = _add_radar_coordinates(ds, product, scan_mode)
#### CMB
if product in ["2B-GPM-CORRA", "2B-TRMM-CORRA"]:
ds = _add_cmb_coordinates(ds, product, scan_mode)
#### SLH and CSH products
if product in ["2A-GPM-SLH", "2B-GPM-CSH"] and "range" in list(ds.dims):
ds = add_lh_height(ds)
#### IMERG
if "HQobservationTime" in ds:
da = ds["HQobservationTime"]
da = da.where(da >= 0) # -9999.9 --> NaN
da.attrs["units"] = "minutes from the start of the current half hour"
da.encoding["dtype"] = "uint8"
da.encoding["_FillValue"] = 255
da.attrs.pop("_FillValue", None)
da.attrs.pop("CodeMissingValue", None)
ds["HQobservationTime"] = da
if "MWobservationTime" in ds:
da = ds["MWobservationTime"]
da = da.where(da >= 0) # -9999.9 --> NaN
da.attrs["units"] = "minutes from the start of the current half hour"
da.attrs.pop("_FillValue", None)
da.attrs.pop("CodeMissingValue", None)
da.encoding["dtype"] = "uint8"
da.encoding["_FillValue"] = 255
ds["MWobservationTime"] = da
return ds