# -----------------------------------------------------------------------------.
# MIT License
# Copyright (c) 2024 GPM-API developers
#
# This file is part of GPM-API.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------.
"""This module contains utilities to check GPM-API Dataset coordinates."""
import functools
import numpy as np
import pandas as pd
from gpm.checks import check_has_along_track_dim, is_grid, is_orbit
from gpm.utils.decorators import (
check_is_gpm_object,
check_is_orbit,
)
from gpm.utils.slices import (
get_list_slices_from_bool_arr,
list_slices_difference,
list_slices_filter,
list_slices_flatten,
list_slices_intersection,
list_slices_simplify,
list_slices_sort,
list_slices_union,
)
# TODO: maybe infer from time? < 3 s for DPR, up to 5 s for old PMW sensors
ORBIT_TIME_TOLERANCE = np.timedelta64(5, "s")
DEFAULT_CROSS_TRACK_DIM = "cross_track"
DEFAULT_ALONG_TRACK_DIM = "along_track"
DEFAULT_X = "lon"
DEFAULT_Y = "lat"
####--------------------------------------------------------------------------.
##########################
#### Regular granules ####
##########################
def get_missing_granule_numbers(xr_obj):
"""Return ID numbers of missing granules.
It assumes xr_obj is a GPM ORBIT object.
"""
granule_ids = np.unique(xr_obj["gpm_granule_id"].data)
min_id = min(granule_ids)
max_id = max(granule_ids)
possible_ids = np.arange(min_id, max_id + 1)
return possible_ids[~np.isin(possible_ids, granule_ids)]
def _is_contiguous_granule(granule_ids):
"""Return a boolean array indicating if the next scan is not the same/next granule."""
# Retrieve if next scan is in the same or next granule (True) or False.
bool_arr = np.diff(granule_ids) <= 1
# Add True to last position
return np.append(bool_arr, True)
@check_is_gpm_object
def get_slices_contiguous_granules(xr_obj, min_size=2):
"""Return a list of slices ensuring contiguous granules.
The minimum size of the output slices is ``2``.
Note: for GRID (e.g., IMERG) products, it checks for regular timesteps,
since no ``granule_id`` coordinate is provided for GRID products.
Parameters
----------
xr_obj : xarray.DataArray or xarray.Dataset
GPM xarray object.
min_size : int
Minimum size for a slice to be returned.
Returns
-------
list_slices : list
List of slice object to select contiguous granules.
Output format: ``[slice(start,stop), slice(start,stop),...]``
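Examples
--------
A minimal usage sketch (``ds`` is assumed to be a GPM ORBIT ``xarray.Dataset``
opened beforehand; it is not defined in this module):
>>> list_slices = get_slices_contiguous_granules(ds, min_size=2)
>>> subsets = [ds.isel(along_track=slc) for slc in list_slices]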
"""
if is_grid(xr_obj):
return get_slices_regular_time(xr_obj, tolerance=None, min_size=min_size)
if is_orbit(xr_obj):
# Get number of scans/timesteps
n_scans = xr_obj["gpm_granule_id"].shape[0]
# Define behaviour if less than 2 scans/timesteps
# - If n_scans == 0, slice(0, 0) would return an empty array
# - If n_scans == 1, slice(0, 1) would return the single scan
# --> Here we decide to return an empty list!
if n_scans < min_size:
return []
# Get boolean array indicating if the next scan/timesteps is same or next granule
bool_arr = _is_contiguous_granule(xr_obj["gpm_granule_id"].data)
# If missing granules are present, get the slices with non-missing granules
list_slices = get_list_slices_from_bool_arr(
bool_arr,
include_false=True,
skip_consecutive_false=True,
)
# Select only slices with at least 2 scans
# Return list of contiguous scan slices
return list_slices_filter(list_slices, min_size=min_size)
return None
def check_missing_granules(xr_obj):
"""Check no missing granules in the GPM Dataset.
It assumes xr_obj is a GPM ORBIT object.
Parameters
----------
xr_obj : xarray.DataArray or xarray.Dataset
xarray object.
"""
missing_ids = get_missing_granule_numbers(xr_obj)
n_missing = len(missing_ids)
if n_missing > 0:
msg = f"There are {n_missing} missing granules. Their IDs are {missing_ids}."
raise ValueError(msg)
def check_contiguous_granules(xr_obj):
"""Check no missing granules in the GPM Dataset.
It assumes xr_obj is a GPM ORBIT object.
Parameters
----------
xr_obj : xarray.DataArray or xarray.Dataset
xarray object.
"""
return check_missing_granules(xr_obj)
@check_is_gpm_object
def has_contiguous_granules(xr_obj):
"""Checks GPM object is composed of consecutive granules.
For ORBIT objects, it checks the ``gpm_granule_id``.
For GRID objects, it checks timesteps regularity.
"""
# ORBIT
if is_orbit(xr_obj):
return bool(np.all(_is_contiguous_granule(xr_obj["gpm_granule_id"].data)))
# if is_grid(xr_obj):
return has_regular_time(xr_obj)
@check_is_gpm_object
def has_missing_granules(xr_obj):
"""Checks GPM object has missing granules.
For ORBIT objects, it checks the ``gpm_granule_id``.
For GRID objects, it checks timesteps regularity.
"""
# ORBIT
if is_orbit(xr_obj):
return bool(np.any(~_is_contiguous_granule(xr_obj["gpm_granule_id"].data)))
# if is_grid(xr_obj):
return not has_regular_time(xr_obj)
####--------------------------------------------------------------------------.
############################
#### Regular timesteps ####
############################
def _get_timesteps(xr_obj):
"""Get timesteps with second precision from xarray.DataArray or xarray.Dataset."""
timesteps = xr_obj["time"].to_numpy()
return timesteps.astype("M8[s]")
@check_is_gpm_object
def _infer_time_tolerance(xr_obj):
"""Infer time interval tolerance between timesteps."""
# For GPM ORBIT objects, use the ORBIT_TIME_TOLERANCE
if is_orbit(xr_obj):
tolerance = ORBIT_TIME_TOLERANCE
# For GPM GRID objects, infer it from the time difference between first two timesteps
elif is_grid(xr_obj):
timesteps = _get_timesteps(xr_obj)
tolerance = np.diff(timesteps[0:2])[0]
return tolerance
def _is_regular_time(xr_obj, tolerance=None):
"""Return a boolean array indicating if the next regular timestep is present."""
# Retrieve timesteps
timesteps = _get_timesteps(xr_obj)
# Infer tolerance if not specified
tolerance = _infer_time_tolerance(xr_obj) if tolerance is None else tolerance
# Identify if the next regular timestep is present
bool_arr = np.diff(timesteps) <= tolerance
# Add True to last position
return np.append(bool_arr, True)
def get_slices_regular_time(xr_obj, tolerance=None, min_size=1):
"""Return a list of slices ensuring timesteps to be regular.
Output format: ``[slice(start,stop), slice(start,stop),...]``
Consecutive non-regular timesteps lead to slices of size 1.
An xarray object with a single timestep leads to a slice of size 1.
If ``min_size=1`` (the default), such slices are returned.
Parameters
----------
xr_obj : xarray.DataArray or xarray.Dataset
GPM xarray object.
tolerance : numpy.timedelta64, optional
The timedelta tolerance to define regular vs. non-regular timesteps.
The default is ``None``.
If GPM GRID object, it uses the first 2 timesteps to derive the tolerance timedelta.
If GPM ORBIT object, it uses the ``ORBIT_TIME_TOLERANCE``.
min_size : int
Minimum size for a slice to be returned.
Returns
-------
list_slices : list
List of slice object to select regular time intervals.
Output format: ``[slice(start,stop), slice(start,stop),...]``
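Examples
--------
A minimal usage sketch (``ds`` is assumed to be a GPM GRID ``xarray.Dataset``;
the 30-minute tolerance is only illustrative):
>>> list_slices = get_slices_regular_time(ds, tolerance=np.timedelta64(30, "m"))
>>> regular_subsets = [ds.isel(time=slc) for slc in list_slices]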
"""
# Retrieve timestep
timesteps = _get_timesteps(xr_obj)
n_timesteps = len(timesteps)
# Define special behaviour if less than 2 timesteps
# - If n_timesteps == 0, slice(0, 0) returns an empty array
# - If n_timesteps == 1, slice(0, 1) returns the single timestep
# --> If less than 2 timesteps, assume regular timesteps
if n_timesteps < 2:
list_slices = [slice(0, n_timesteps)]
# Otherwise
else:
# Get boolean array indicating if the next regular timestep is present
is_regular = _is_regular_time(xr_obj, tolerance=tolerance)
# If non-regular timesteps are present, get the slices for each regular interval
# - If consecutive non-regular timestep occurs, returns slices of size 1
list_slices = get_list_slices_from_bool_arr(
is_regular,
include_false=True,
skip_consecutive_false=False,
)
# Select only slices with at least min_size timesteps
# Return list of slices with regular timesteps
return list_slices_filter(list_slices, min_size=min_size)
def get_slices_non_regular_time(xr_obj, tolerance=None):
"""Return a list of slices where there are supposedly missing timesteps.
The output slices have size 2.
An input with less than 2 timesteps returns an empty list.
Parameters
----------
xr_obj : xarray.DataArray or xarray.Dataset
GPM xarray object.
tolerance : numpy.timedelta64, optional
The timedelta tolerance to define regular vs. non-regular timesteps.
The default is ``None``.
If GPM GRID object, it uses the first 2 timesteps to derive the tolerance timedelta.
If GPM ORBIT object, it uses the ``ORBIT_TIME_TOLERANCE``.
It is discouraged to use this function for GPM ORBIT objects!
Returns
-------
list_slices : list
List of slice object to select intervals with non-regular timesteps.
Output format: ``[slice(start,stop), slice(start,stop),...]``
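Examples
--------
A minimal usage sketch (``ds`` is assumed to be a GPM GRID ``xarray.Dataset``):
>>> list_slices = get_slices_non_regular_time(ds)
>>> timesteps = ds["time"].to_numpy()
>>> gaps = [(timesteps[slc.start], timesteps[slc.stop - 1]) for slc in list_slices]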
"""
# Retrieve timestep
timesteps = _get_timesteps(xr_obj)
n_timesteps = len(timesteps)
# Define behaviour if less than 2 timesteps
# --> Here we decide to return an empty list !
if n_timesteps < 2:
return []
# Get boolean array indicating if the next regular timestep is present
is_regular = _is_regular_time(xr_obj, tolerance=tolerance)
# If non-regular timesteps are present, get the slices where timesteps non-regularity occur
if not np.all(is_regular):
indices_next_nonregular = np.argwhere(~is_regular).flatten()
list_slices = [slice(i, i + 2) for i in indices_next_nonregular]
else:
list_slices = []
return list_slices
def check_regular_time(xr_obj, tolerance=None, verbose=True):
"""Check no missing timesteps for longer than 'tolerance' seconds.
Note:
- This sometimes occurs between orbit/grid granules
- This sometimes occurs within a orbit granule
Parameters
----------
xr_obj : xarray.DataArray or xarray.Dataset
xarray object.
tolerance : numpy.timedelta64, optional
The timedelta tolerance to define regular vs. non-regular timesteps.
The default is ``None``.
If GPM GRID object, it uses the first 2 timesteps to derive the tolerance timedelta.
If GPM ORBIT object, it uses the ``ORBIT_TIME_TOLERANCE``.
verbose : bool
If ``True``, it prints the time intervals where timesteps are missing.
The default is ``True``.
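Examples
--------
A minimal usage sketch (``ds`` is assumed to be a GPM ``xarray.Dataset``):
>>> check_regular_time(ds)  # raises a ValueError if timesteps are missing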
"""
list_discontinuous_slices = get_slices_non_regular_time(xr_obj, tolerance=tolerance)
n_discontinuous = len(list_discontinuous_slices)
if n_discontinuous > 0:
# Retrieve discontinuous timesteps interval
timesteps = _get_timesteps(xr_obj)
list_discontinuous = [(timesteps[slc.start], timesteps[slc.stop - 1]) for slc in list_discontinuous_slices]
first_problematic_timestep = list_discontinuous[0][0]
# Print non-regular timesteps
if verbose:
for start, stop in list_discontinuous:
print(f"- Missing timesteps between {start} and {stop}")
# Raise error and highlight first non-contiguous scan
raise ValueError(
f"There are {n_discontinuous} non-regular timesteps. The first occur at {first_problematic_timestep}.",
)
def has_regular_time(xr_obj):
"""Return True if all timesteps are regular. False otherwise."""
list_discontinuous_slices = get_slices_non_regular_time(xr_obj)
n_discontinuous = len(list_discontinuous_slices)
return n_discontinuous == 0
####--------------------------------------------------------------------------.
########################
#### Regular scans ####
########################
def _select_lons_lats_centroids(xr_obj, x=DEFAULT_X, y=DEFAULT_Y, cross_track_dim=DEFAULT_CROSS_TRACK_DIM):
if cross_track_dim not in xr_obj.dims:
lons = xr_obj[x].to_numpy()
lats = xr_obj[y].to_numpy()
else:
# Select centroids coordinates in the middle of the cross_track scan
middle_idx = int(xr_obj[cross_track_dim].shape[0] / 2)
lons = xr_obj[x].isel({cross_track_dim: middle_idx}).to_numpy()
lats = xr_obj[y].isel({cross_track_dim: middle_idx}).to_numpy()
return lons, lats
def _get_along_track_scan_distance(xr_obj, x=DEFAULT_X, y=DEFAULT_Y, cross_track_dim=DEFAULT_CROSS_TRACK_DIM):
"""Compute the distance between along_track centroids."""
from pyproj import Geod
# Select centroids coordinates
# - If no cross-track, take the available lat/lon 1D array
# - If cross-track dimension is present, takes the coordinates in the swath middle.
lons, lats = _select_lons_lats_centroids(xr_obj, x=x, y=y, cross_track_dim=cross_track_dim)
# Define between-centroids line coordinates
start_lons = lons[:-1]
start_lats = lats[:-1]
end_lons = lons[1:]
end_lats = lats[1:]
# Compute distance
geod = Geod(ellps="sphere")
_, _, dist = geod.inv(start_lons, start_lats, end_lons, end_lats)
return dist
def _is_contiguous_scans(xr_obj, x=DEFAULT_X, y=DEFAULT_Y, cross_track_dim=DEFAULT_CROSS_TRACK_DIM):
"""Return a boolean array indicating if the next scan is contiguous.
It assumes at least 3 scans are provided.
The last element is set to True since it cannot be verified.
"""
# Compute along track scan distance
dist = _get_along_track_scan_distance(xr_obj, x=x, y=y, cross_track_dim=cross_track_dim)
# Convert to km and round
dist_km = np.round(dist / 1000, 0)
## Option 1
# Identify if the next scan is contiguous
# - Use the smallest distance as reference
# - Assumed to be non contiguous if separated by more than min_dist + half min_dist
# - This fails if duplicated geolocation --> min_dist = 0
min_dist = np.nanmin(dist_km)
bool_arr = dist_km < (min_dist + min_dist / 2)
### Option 2
# # Identify if the next scan is contiguous
# # - Use the smallest distance as reference
# # - Assumed to be non contiguous if exceeds min_dist + min_dist/2
# # - This fails when more discontiguous/repeated than contiguous
# dist_unique, dist_counts = np.unique(dist_km, return_counts=True)
# most_common_dist = dist_unique[np.argmax(dist_counts)]
# # Identify if the next scan is contiguous
# bool_arr = dist_km < (most_common_dist + most_common_dist/2)
# Add True to last position
return np.append(bool_arr, True)
@check_is_orbit
def get_slices_contiguous_scans(
xr_obj,
min_size=2,
min_n_scans=3,
x=DEFAULT_X,
y=DEFAULT_Y,
along_track_dim=DEFAULT_ALONG_TRACK_DIM,
cross_track_dim=DEFAULT_CROSS_TRACK_DIM,
):
"""Return a list of slices ensuring contiguous scans (and granules).
It checks for contiguous scans only in the middle of the cross-track dimension!
If a scan geolocation is NaN, it will be considered non-contiguous.
An input with less than 3 scans (along-track) returns an empty list, since
scan contiguity can't be verified.
Consecutive non-contiguous scans are discarded and not included in the outputs.
The minimum size of the output slices is ``2``.
Parameters
----------
xr_obj : xarray.DataArray or xarray.Dataset
GPM xarray object.
min_size : int
Minimum size for a slice to be returned.
min_n_scans : int
Minimum number of scans required to verify scan contiguity.
The default is ``3``.
Returns
-------
list_slices : list
List of slice object to select contiguous scans.
Output format: ``[slice(start,stop), slice(start,stop),...]``
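Examples
--------
A minimal usage sketch (``ds`` is assumed to be a GPM ORBIT ``xarray.Dataset``):
>>> list_slices = get_slices_contiguous_scans(ds, min_size=2)
>>> contiguous_subsets = [ds.isel(along_track=slc) for slc in list_slices]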
"""
check_has_along_track_dim(xr_obj, dim=along_track_dim)
# Get number of scans
n_scans = xr_obj[along_track_dim].shape[0]
# Define behaviour if less than min_n_scans scans along-track
# --> Contiguity can't be verified without at least 3 scans!
# --> For visualization purposes, min_n_scans can be set to 2 to plot two scans (and consider them contiguous)
# --> Here we decide to return an empty list!
if n_scans < min_n_scans:
return []
# Get boolean array indicating if the next scan is contiguous
is_contiguous = _is_contiguous_scans(xr_obj, x=x, y=y, cross_track_dim=cross_track_dim)
# If non-contiguous scans are present, get the slices with contiguous scans
# - It discards consecutive non-contiguous scans
list_slices = get_list_slices_from_bool_arr(
is_contiguous,
include_false=True,
skip_consecutive_false=True,
)
# Select only slices with at least 2 scans
list_slices = list_slices_filter(list_slices, min_size=min_size)
# Also retrieve the slices with non missing granule
list_slices1 = get_slices_contiguous_granules(xr_obj)
# Perform list_slices intersection
# Return list of contiguous scan slices
return list_slices_intersection(list_slices, list_slices1)
@check_is_orbit
def get_slices_non_contiguous_scans(
xr_obj,
x=DEFAULT_X,
y=DEFAULT_Y,
along_track_dim=DEFAULT_ALONG_TRACK_DIM,
cross_track_dim=DEFAULT_CROSS_TRACK_DIM,
):
"""Return a list of slices where the scans discontinuity occurs.
An input with less than ``2`` scans (along-track) returns an empty list.
Parameters
----------
xr_obj : xarray.DataArray or xarray.Dataset
GPM xarray object.
Returns
-------
list_slices : list
List of slice object to select discontiguous scans.
Output format: ``[slice(start,stop), slice(start,stop),...]``
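Examples
--------
A minimal usage sketch (``ds`` is assumed to be a GPM ORBIT ``xarray.Dataset``):
>>> list_slices = get_slices_non_contiguous_scans(ds)
>>> if len(list_slices) > 0:
...     print(f"Found {len(list_slices)} discontinuous swath portions")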
"""
check_has_along_track_dim(xr_obj, dim=along_track_dim)
# Get number of scans
n_scans = xr_obj[along_track_dim].shape[0]
# Define behaviour if less than 3 scans along-track
# --> Contiguity can't be verified without at least 3 scans!
# --> Here we decide to return an empty list!
if n_scans < 3:
return []
### CODE to output slices with size 2.
# # Get boolean array indicating if the next scan is contiguous
# is_contiguous = _is_contiguous_scans(xr_obj)
# # If non-contiguous scans are present, get the slices when discontinuity occurs
# if not np.all(is_contiguous):
# indices_next_discontinuous = np.argwhere(~is_contiguous).flatten()
# list_slices = [slice(i, i + 2) for i in indices_next_discontinuous]
# else:
# list_slices = []
# return list_slices
list_slices_valid = get_slices_contiguous_scans(
xr_obj,
min_size=2,
x=x,
y=y,
along_track_dim=along_track_dim,
cross_track_dim=cross_track_dim,
)
list_slices_full = [slice(0, len(xr_obj[along_track_dim]))]
return list_slices_difference(list_slices_full, list_slices_valid)
def check_contiguous_scans(
xr_obj,
verbose=True,
x=DEFAULT_X,
y=DEFAULT_Y,
along_track_dim=DEFAULT_ALONG_TRACK_DIM,
cross_track_dim=DEFAULT_CROSS_TRACK_DIM,
):
"""Check no missing scans across the along_track direction.
Note:
- This sometimes occurs between orbit granules.
- This sometimes occurs within an orbit granule.
- This function also works for nadir-looking only orbits (no cross-track).
Parameters
----------
xr_obj : xarray.DataArray or xarray.Dataset
xarray object.
verbose : bool
If ``True``, it prints the time intervals where non-contiguous scans occur.
"""
check_has_along_track_dim(xr_obj, dim=along_track_dim)
list_discontinuous_slices = get_slices_non_contiguous_scans(
xr_obj,
x=x,
y=y,
along_track_dim=along_track_dim,
cross_track_dim=cross_track_dim,
)
n_discontinuous = len(list_discontinuous_slices)
if n_discontinuous > 0:
# Retrieve discontinuous timesteps interval
timesteps = _get_timesteps(xr_obj)
list_discontinuous = [(timesteps[slc.start], timesteps[slc.stop - 1]) for slc in list_discontinuous_slices]
first_problematic_timestep = list_discontinuous[0][0]
# Print non-contiguous scans
if verbose:
for start, stop in list_discontinuous:
print(f"- Missing scans between {start} and {stop}")
# Raise error and highlight first non-contiguous scan
msg = f"There are {n_discontinuous} non-contiguous scans. "
msg += f"The first occur at {first_problematic_timestep}. "
msg += "Use get_slices_contiguous_scans() to retrieve a list of contiguous slices."
raise ValueError(msg)
def has_contiguous_scans(
xr_obj,
x=DEFAULT_X,
y=DEFAULT_Y,
along_track_dim=DEFAULT_ALONG_TRACK_DIM,
cross_track_dim=DEFAULT_CROSS_TRACK_DIM,
):
"""Return ``True`` if all scans are contiguous. ``False`` otherwise.
This function also works with nadir-looking only orbits (no cross-track dimension).
"""
check_has_along_track_dim(xr_obj, dim=along_track_dim)
list_discontinuous_slices = get_slices_non_contiguous_scans(
xr_obj,
x=x,
y=y,
along_track_dim=along_track_dim,
cross_track_dim=cross_track_dim,
)
n_discontinuous = len(list_discontinuous_slices)
return n_discontinuous == 0
####--------------------------------------------------------------------------.
#############################
#### Regular geolocation ####
#############################
def _is_non_valid_geolocation(xr_obj, coord):
"""Return a boolean array indicating if the geolocation is invalid.
`True = Invalid`, `False = Valid`.
"""
return np.isnan(xr_obj[coord])
def _is_valid_geolocation(xr_obj, coord):
"""Return a boolean array indicating if the geolocation is valid.
`True = Valid`, `False = Invalid`.
"""
return ~np.isnan(xr_obj[coord])
@check_is_orbit
def get_slices_valid_geolocation(
xr_obj,
min_size=2,
x=DEFAULT_X,
y=DEFAULT_Y,
along_track_dim=DEFAULT_ALONG_TRACK_DIM,
cross_track_dim=DEFAULT_CROSS_TRACK_DIM,
):
"""Return a list of GPM ORBIT along-track slices with valid geolocation.
The minimum size of the output slices is ``2``.
If a given cross-track index has invalid geolocation at every along-track position,
such cross-track index(es) are discarded before identifying the along-track slices.
Parameters
----------
xr_obj : xarray.DataArray or xarray.Dataset
GPM ORBIT xarray object.
min_size : int
Minimum size for a slice to be returned.
The default is ``2``.
Returns
-------
list_slices : list
List of slice object with valid geolocation.
Output format: ``[slice(start,stop), slice(start,stop),...]``
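Examples
--------
A minimal usage sketch (``ds`` is assumed to be a GPM ORBIT ``xarray.Dataset``
whose ``lon``/``lat`` coordinates may contain NaN values):
>>> list_slices = get_slices_valid_geolocation(ds, min_size=2)
>>> valid_subsets = [ds.isel(along_track=slc) for slc in list_slices]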
"""
# - Get invalid coordinates
invalid_lon_coords = _is_non_valid_geolocation(xr_obj, coord=x)
invalid_lat_coords = _is_non_valid_geolocation(xr_obj, coord=y)
invalid_coords = np.logical_or(invalid_lon_coords, invalid_lat_coords)
# - Identify cross-track indices for which geolocation is invalid at every along-track position
idx_cross_track_not_all_invalid = np.where(~invalid_coords.all(along_track_dim))[0]
# - If all invalid, return empty list
if len(idx_cross_track_not_all_invalid) == 0:
return []
# - Select only cross-track indices that are not invalid at every along-track position
invalid_coords = invalid_coords.isel({cross_track_dim: idx_cross_track_not_all_invalid})
# - Now identify scans across which there are still invalid coordinates
invalid_scans = invalid_coords.any(dim=cross_track_dim)
valid_scans = ~invalid_scans
# - Now identify valid along-track slices
list_slices = get_list_slices_from_bool_arr(
valid_scans,
include_false=False,
skip_consecutive_false=True,
)
# Select only slices with at least 2 scans
return list_slices_filter(list_slices, min_size=min_size)
def get_slices_non_valid_geolocation(
xr_obj,
x=DEFAULT_X,
y=DEFAULT_Y,
along_track_dim=DEFAULT_ALONG_TRACK_DIM,
cross_track_dim=DEFAULT_CROSS_TRACK_DIM,
):
"""Return a list of GPM ORBIT along-track slices with non-valid geolocation.
If a given cross-track index has invalid geolocation at every along-track position,
such cross-track index(es) are discarded before identifying the along-track slices.
Parameters
----------
xr_obj : xarray.DataArray or xarray.Dataset
GPM ORBIT xarray object.
Returns
-------
list_slices : list
List of slice object with non-valid geolocation.
Output format: ``[slice(start,stop), slice(start,stop),...]``
"""
list_slices_valid = get_slices_valid_geolocation(
xr_obj,
min_size=1,
x=x,
y=y,
along_track_dim=along_track_dim,
cross_track_dim=cross_track_dim,
)
list_slices_full = [slice(0, len(xr_obj[along_track_dim]))]
return list_slices_difference(list_slices_full, list_slices_valid)
def check_valid_geolocation(
xr_obj,
verbose=True,
x=DEFAULT_X,
y=DEFAULT_Y,
along_track_dim=DEFAULT_ALONG_TRACK_DIM,
cross_track_dim=DEFAULT_CROSS_TRACK_DIM,
):
"""Check no geolocation errors in the GPM Dataset.
Parameters
----------
xr_obj : xarray.DataArray or xarray.Dataset
xarray object.
verbose : bool
If ``True``, it prints the time intervals with non-valid geolocation.
The default is ``True``.
"""
list_invalid_slices = get_slices_non_valid_geolocation(
xr_obj,
x=x,
y=y,
along_track_dim=along_track_dim,
cross_track_dim=cross_track_dim,
)
n_invalid_scan_slices = len(list_invalid_slices)
if n_invalid_scan_slices > 0:
# Retrieve timesteps interval with non valid geolocation
timesteps = _get_timesteps(xr_obj)
list_invalid = [(timesteps[slc][0], timesteps[slc][-1]) for slc in list_invalid_slices]
first_problematic_timestep = list_invalid[0][0]
# Print swath portions with non-valid geolocation
if verbose:
for start, stop in list_invalid:
print(f"- Non-valid geolocation between {start} and {stop}")
# Raise error and highlight the first swath portion with non-valid geolocation
msg = f"There are {n_invalid_scan_slices} swath portions with non-valid geolocation. "
msg += f"The first occurs at {first_problematic_timestep}."
raise ValueError(msg)
@check_is_gpm_object
def has_valid_geolocation(
xr_obj,
x=DEFAULT_X,
y=DEFAULT_Y,
along_track_dim=DEFAULT_ALONG_TRACK_DIM,
cross_track_dim=DEFAULT_CROSS_TRACK_DIM,
):
"""Checks GPM object has valid geolocation."""
if is_orbit(xr_obj):
list_invalid_slices = get_slices_non_valid_geolocation(
xr_obj,
x=x,
y=y,
along_track_dim=along_track_dim,
cross_track_dim=cross_track_dim,
)
n_invalid_scan_slices = len(list_invalid_slices)
return n_invalid_scan_slices == 0
return bool(is_grid(xr_obj))
def apply_on_valid_geolocation(function):
"""Decorator appliying the input function on valid geolocation GPM ORBIT slices."""
@functools.wraps(function)
def wrapper(*args, **kwargs):
xr_obj = args[0]
# Get slices with valid geolocation
list_slices_valid = get_slices_valid_geolocation(xr_obj)
# Loop over valid slices and retrieve the final slices
list_slices = []
new_args = list(args)
for slc in list_slices_valid:
# Retrieve slice offset
start_offset = slc.start
# Retrieve dataset subset
subset_xr_obj = xr_obj.isel({"along_track": slc})
# Update args
new_args[0] = subset_xr_obj
# Apply function on the subset
subset_slices = function(*new_args, **kwargs)
# Add start offset to subset slices
if len(subset_slices) > 0:
subset_slices = [slice(slc.start + start_offset, slc.stop + start_offset) for slc in subset_slices]
list_slices.append(subset_slices)
# Flatten the list
# Return list of slices
return list_slices_flatten(list_slices)
return wrapper
####--------------------------------------------------------------------------.
############################
#### Non-wobbling orbit ####
############################
def _replace_0_values(x):
"""Replace 0 values with previous left non-zero occurring values.
If the array start with 0, it take the first non-zero occurring values
"""
# Check inputs
x = np.array(x)
dtype = x.dtype
if np.all(x == 0):
raise ValueError("It's a flat swath orbit.")
# Set 0 to NaN
x = x.astype(float)
x[x == 0] = np.nan
# Infill from left values, and then from right (if x start with 0)
x = pd.Series(x).ffill().bfill().to_numpy()
# Reset original dtype
return x.astype(dtype)
def _get_non_wobbling_lats(lats, threshold=100):
# Get direction (1 ascending, -1 descending)
directions = np.sign(np.diff(lats))
directions = np.append(directions[0], directions) # include startpoint
directions = _replace_0_values(directions)
# Identify index where next element change
idxs_change = np.where(np.diff(directions) != 0)[0]
# Retrieve valid slices
indices = np.unique(np.concatenate(([0], idxs_change, [len(lats) - 1])))
orbit_slices = [slice(indices[i], indices[i + 1] + 1) for i in range(len(indices) - 1)]
list_slices = list_slices_filter(orbit_slices, min_size=threshold)
return list_slices_simplify(list_slices)
@apply_on_valid_geolocation
def get_slices_non_wobbling_swath(
xr_obj,
threshold=100,
y=DEFAULT_Y,
along_track_dim=DEFAULT_ALONG_TRACK_DIM,
cross_track_dim=DEFAULT_CROSS_TRACK_DIM,
):
"""Return the GPM ORBIT along-track slices along which the swath is not wobbling.
Wobbling is defined as the occurrence of changes in latitude direction
within less than ``threshold`` scans.
The function extracts the along-track boundaries on both swath sides and
identifies where the changes in orbit direction occur.
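Examples
--------
A minimal usage sketch (``ds`` is assumed to be a GPM ORBIT ``xarray.Dataset``):
>>> list_slices = get_slices_non_wobbling_swath(ds, threshold=100)
>>> stable_subsets = [ds.isel(along_track=slc) for slc in list_slices]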
"""
xr_obj = xr_obj.transpose(cross_track_dim, along_track_dim, ...)
lats = xr_obj[y].to_numpy()
lats_side0 = lats[0, :]
lats_side2 = lats[-1, :]
# Get valid slices
list_slices1 = _get_non_wobbling_lats(lats_side0, threshold=threshold)
list_slices2 = _get_non_wobbling_lats(lats_side2, threshold=threshold)
return list_slices_intersection(list_slices1, list_slices2)
@apply_on_valid_geolocation
def get_slices_wobbling_swath(
xr_obj,
threshold=100,
y=DEFAULT_Y,
cross_track_dim=DEFAULT_CROSS_TRACK_DIM,
along_track_dim=DEFAULT_ALONG_TRACK_DIM,
):
"""Return the GPM ORBIT along-track slices along which the swath is wobbling.
Wobbling is defined as the occurrence of changes in latitude direction
within less than ``threshold`` scans.
The function extracts the along-track boundaries on both swath sides and
identifies where the changes in orbit direction occur.
"""
list_slices1 = get_slices_non_wobbling_swath(
xr_obj,
threshold=threshold,
y=y,
along_track_dim=along_track_dim,
cross_track_dim=cross_track_dim,
)
list_slices_full = [slice(0, len(xr_obj[along_track_dim]))]
return list_slices_difference(list_slices_full, list_slices1)
####---------------------------------------------------------------------------
#####################################
#### Check GPM object regularity ####
#####################################
@check_is_gpm_object
def is_regular(xr_obj):
"""Checks the GPM object is regular.
For GPM ORBITS, it checks that the scans are contiguous.
For GPM GRID, it checks that the timesteps are regularly spaced.
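Examples
--------
A minimal usage sketch (``ds`` is assumed to be a GPM ``xarray.Dataset``):
>>> if not is_regular(ds):
...     list_slices = get_slices_regular(ds)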
"""
if is_orbit(xr_obj):
return has_contiguous_scans(xr_obj)
if is_grid(xr_obj):
return has_regular_time(xr_obj)
return False
@check_is_gpm_object
def get_slices_regular(
xr_obj,
min_size=None,
min_n_scans=3,
x=DEFAULT_X,
y=DEFAULT_Y,
along_track_dim=DEFAULT_ALONG_TRACK_DIM,
cross_track_dim=DEFAULT_CROSS_TRACK_DIM,
):
"""Return a list of slices to select regular GPM objects.
For GPM ORBITS, it returns slices to select contiguous scans with valid geolocation.
For GPM GRID, it returns slices to select periods with regular timesteps.
For more information, read the documentation of:
- ``gpm.utils.checks.get_slices_contiguous_scans``
- ``gpm.utils.checks.get_slices_regular_time``
Parameters
----------
xr_obj : xarray.DataArray or xarray.Dataset
GPM xarray object.
min_size : int
Minimum size for a slice to be returned.
If ``None``, default to ``1`` for GRID objects, ``2`` for ORBIT objects.
min_n_scans : int
Minimum number of scans to be able to check for scan contiguity.
For visualization purposes, this value might be set to ``2``.
This parameter applies only to ORBIT objects.
Returns
-------
list_slices : list
List of slice object to select regular portions.
Output format: ``[slice(start,stop), slice(start,stop),...]``
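Examples
--------
A minimal usage sketch (``ds`` is assumed to be a GPM ``xarray.Dataset``, either
ORBIT or GRID; the ``time`` dimension name used for GRID below is an assumption):
>>> list_slices = get_slices_regular(ds)
>>> dim = "along_track" if is_orbit(ds) else "time"
>>> regular_subsets = [ds.isel({dim: slc}) for slc in list_slices]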
"""
if is_orbit(xr_obj):
min_size = 2 if min_size is None else min_size
# Get swath portions where there are no missing scans (and granules)
list_slices_contiguous = get_slices_contiguous_scans(
xr_obj,
min_size=min_size,
min_n_scans=min_n_scans,
x=x,
y=y,
along_track_dim=along_track_dim,
cross_track_dim=cross_track_dim,
)
# Get swath portions where the geolocation is valid
list_slices_geolocation = get_slices_valid_geolocation(
xr_obj,
min_size=min_size,
x=x,
y=y,
along_track_dim=along_track_dim,
cross_track_dim=cross_track_dim,
)
# Find swath portions meeting all the requirements
return list_slices_intersection(list_slices_geolocation, list_slices_contiguous)
if is_grid(xr_obj):
min_size = 1 if min_size is None else min_size
return get_slices_regular_time(xr_obj, min_size=min_size)
return None
####--------------------------------------------------------------------------.
#### Get slices from GPM object variable values
def _check_criteria(criteria):
if criteria not in ["all", "any"]:
raise ValueError("Invalid value for criteria parameter. Must be 'all' or 'any'.")
return criteria
def _get_slices_variable_equal_value(da, value, dim=None, criteria="all"):
"""Return the slices with contiguous `value` in the `dim` dimension."""
# Check dim
if isinstance(dim, str):
dim = [dim]
dim = set(dim)
# Retrieve dims
dims = set(da.dims)
# Identify regions where the value occurs
da_bool = da == value
# Collapse other dimension than dim
dims_apply_over = list(dims.difference(dim))
bool_arr = da_bool.all(dim=dims_apply_over).data if criteria == "all" else da_bool.any(dim=dims_apply_over).data
# Get list of slices with contiguous value
return get_list_slices_from_bool_arr(bool_arr, include_false=False, skip_consecutive_false=True)
def get_slices_var_equals(da, dim, values, union=True, criteria="all"):
"""Return a list of slices along the `dim` dimension where `values` occurs.
The function is applied recursively to each value in `values`.
If the xarray.DataArray has additional dimensions, the "criteria" parameter is
used to determine whether all values within each slice index must be equal
to value (if set to ``"all"``) or if at least one value within the slice index
must be equal to value (if set to ``"any"``).
If `values` is a list of values:
- if ``union=True``, it returns slices corresponding to sequences of consecutive values.
- if ``union=False``, it returns separate slices for each value in `values`.
For data ``[0, 0, 1, 1]`` with ``values=[0, 1]``, ``union=False`` returns
``[slice(0, 2), slice(2, 4)]``, while ``union=True`` returns ``[slice(0, 4)]``.
`union` matters only when multiple values are specified.
`criteria` matters only when the xarray.DataArray has multiple dimensions.
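Examples
--------
A minimal self-contained sketch (the tiny DataArray below is purely illustrative):
>>> import xarray as xr
>>> da = xr.DataArray([0, 0, 1, 1], dims="along_track")
>>> get_slices_var_equals(da, dim="along_track", values=[0, 1], union=False)  # [slice(0, 2), slice(2, 4)]
>>> get_slices_var_equals(da, dim="along_track", values=[0, 1], union=True)  # [slice(0, 4)]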
"""
criteria = _check_criteria(criteria)
# Get data array and dimensions
da = da.compute()
# Retrieve the slices where the value(s) occur
if isinstance(values, (float, int)):
return _get_slices_variable_equal_value(da=da, value=values, dim=dim, criteria=criteria)
# If multiple values, apply recursively
list_of_list_slices = [
_get_slices_variable_equal_value(da=da, value=value, dim=dim, criteria=criteria) for value in values
]
# Return list_slices
return list_slices_union(*list_of_list_slices) if union else list_slices_sort(*list_of_list_slices)
def get_slices_var_between(da, dim, vmin=-np.inf, vmax=np.inf, criteria="all"):
"""Return a list of slices along the `dim` dimension where values are between the interval.
If the xarray.DataArray has additional dimensions, the ``criteria`` parameter is
used to determine whether all values within each slice index must be between
the interval (if set to ``"all"``) or if at least one value within the slice index
must be between the interval (if set to ``"any"``).
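Examples
--------
A minimal usage sketch (``ds`` and the ``precipRateNearSurface`` variable name
are assumptions for illustration):
>>> da = ds["precipRateNearSurface"]
>>> list_slices = get_slices_var_between(da, dim="along_track", vmin=1, criteria="any")
>>> rainy_subsets = [da.isel(along_track=slc) for slc in list_slices]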
"""
criteria = _check_criteria(criteria)
# Check dim
if isinstance(dim, str):
dim = [dim]
dim = set(dim)
# Get data array and dimensions
da = da.compute()
# Retrieve dims
dims = set(da.dims)
# Identify regions where the values are between the thresholds
da_bool = np.logical_and(da >= vmin, da <= vmax)
# Collapse other dimension than dim
dims_apply_over = list(dims.difference(dim))
bool_arr = da_bool.all(dim=dims_apply_over).data if criteria == "all" else da_bool.any(dim=dims_apply_over).data
# Get list of slices with contiguous value
return get_list_slices_from_bool_arr(bool_arr, include_false=False, skip_consecutive_false=True)