# -----------------------------------------------------------------------------.
# MIT License
# Copyright (c) 2024 GPM-API developers
#
# This file is part of GPM-API.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------.
"""This module contains utilities for time processing."""
from typing import Optional
import numpy as np
import pandas as pd
import xarray as xr
from xarray.core import dtypes
from gpm.io.checks import (
check_start_end_time,
check_time,
)
####--------------------------------------------------------------------------.
############################
#### Subsetting by time ####
############################
[docs]
def subset_by_time(xr_obj, start_time=None, end_time=None):
"""Filter a GPM xarray object by start_time and end_time.
Parameters
----------
xr_obj :
A xarray object.
start_time : datetime.datetime
Start time.
By default is ``None``
end_time : datetime.datetime
End time.
By default is ``None``
Returns
-------
xr_obj : xarray.DataArray or xarray.Dataset
GPM xarray object
"""
# Check inputs
if start_time is not None:
start_time = check_time(start_time)
if end_time is not None:
end_time = check_time(end_time)
if start_time is not None and end_time is not None:
start_time, end_time = check_start_end_time(start_time, end_time)
# Get 1D dimension of the coordinate
dim_coords = list(xr_obj["time"].dims)
# If no dimension, it means that nothing to be subsetted
# - Likely the time dimension has been dropped ...
if len(dim_coords) == 0:
raise ValueError("Impossible to subset a non-dimensional time coordinate.")
# If dimension > 1, not clear how to collapse to 1D the boolean array
if len(dim_coords) != 1:
raise ValueError(
"Impossible to subset a non-dimensional time coordinate with dimension >=2.",
)
dim_coord = dim_coords[0]
# Subset by start_time
if start_time is not None:
isel_bool = xr_obj["time"] >= pd.Timestamp(start_time)
if not np.any(isel_bool):
raise ValueError(f"No timesteps to return with start_time {start_time}.")
isel_dict = {dim_coord: isel_bool}
xr_obj = xr_obj.isel(isel_dict)
# Subset by end_time
if end_time is not None:
isel_bool = xr_obj["time"] <= pd.Timestamp(end_time)
if not np.any(isel_bool):
raise ValueError(f"No timesteps to return with end_time {end_time}.")
isel_dict = {dim_coord: isel_bool}
xr_obj = xr_obj.isel(isel_dict)
return xr_obj
[docs]
def subset_by_time_slice(xr_obj, slice):
start_time = slice.start
end_time = slice.stop
return subset_by_time(xr_obj, start_time=start_time, end_time=end_time)
####--------------------------------------------------------------------------.
##################################
#### Time Dimension Sanitizer ####
##################################
[docs]
def is_nat(timesteps):
"""Return a boolean array indicating timesteps which are NaT."""
# pd.isnull(np.datetime64('NaT'))
# pd.isna(np.datetime64('NaT'))
return pd.isna(timesteps)
[docs]
def has_nat(timesteps):
"""Return True if any of the timesteps is NaT."""
return np.any(is_nat(timesteps))
[docs]
def interpolate_nat(timesteps, method="linear", limit=5, limit_direction=None, limit_area=None):
"""Fill NaT values using an interpolation method.
For further information refers to :py:class:`pandas.DataFrame.interpolate`.
Parameters
----------
method : str
Interpolation technique to use.
``'linear'``, the default, treat the timesteps as equally spaced.
limit : int, optional
Maximum number of consecutive NaTs to fill. Must be greater than 0.
limit_direction : str, optional
Valid values are ``'forward'``, ``'backward'`` and ``'both'``.
Consecutive NaTs will be filled in this direction.
limit_area : str, None
Valid values are ``None``, ``'inside'`` and ``'outside'``,
If limit is specified, consecutive NaTs will be filled with this restriction.
* ``None``: No fill restriction.
* 'inside': Only fill NaTs surrounded by valid values (interpolate).
* 'outside': Only fill NaTs outside valid values (extrapolate).
Notes
-----
Depending on the interpolation method (i.e. linear) the infilled values could have ns resolution.
For further information refers to :py:class:`pandas.DataFrame.interpolate`.
Returns
-------
timesteps: numpy.ndarray
Timesteps array of type datetime64[ns].
"""
if len(timesteps) == 0:
return timesteps
# Retrieve input dtype
timesteps_dtype = timesteps.dtype
# Convert timesteps to float
timesteps_num = timesteps.astype(float)
# Convert NaT to np.nan
timesteps_num[is_nat(timesteps)] = np.nan
# Create pd.Series
series = pd.Series(timesteps_num)
# Estimate NaT value
series = series.interpolate(
method=method,
limit=limit,
limit_direction=limit_direction,
limit_area=limit_area,
)
# Convert back to numpy array input dtype
return series.to_numpy().astype(timesteps_dtype)
[docs]
def infill_timesteps(timesteps, limit):
"""Infill missing timesteps if less than <limit> consecutive."""
# Interpolate if maximum <limit> timesteps are missing
timesteps = interpolate_nat(timesteps, method="linear", limit=limit, limit_area="inside")
# Check if there are still residual NaT
if np.any(is_nat(timesteps)):
if len(timesteps) <= 2:
error_message = "Not enough timesteps available to infill NaTs."
elif is_nat(timesteps[0]) or is_nat(timesteps[-1]):
error_message = "NaTs present at the beginning or at the end of the timesteps cannot be inferred."
else:
error_message = f"More than {limit} consecutive timesteps are NaT."
raise ValueError(error_message)
return timesteps
[docs]
def ensure_time_validity(xr_obj, limit=10):
"""Attempt to correct the time coordinate if less than 'limit' consecutive NaT values are present.
It raise a ValueError if more than consecutive NaT occurs.
Parameters
----------
xr_obj : xarray.DataArray or xarray.Dataset
GPM xarray object.
Returns
-------
xr_obj : xarray.DataArray or xarray.Dataset
GPM xarray object.
"""
attrs = xr_obj["time"].attrs
timesteps = xr_obj["time"].to_numpy()
timesteps = infill_timesteps(timesteps, limit=limit)
if "time" not in list(xr_obj.dims):
xr_obj["time"].data = timesteps
else:
xr_obj = xr_obj.assign_coords({"time": timesteps})
xr_obj["time"].attrs = attrs
return xr_obj
####--------------------------------------------------------------------------.
[docs]
def get_dataset_start_end_time(ds: xr.Dataset, time_dim="time"):
"""Retrieves dataset starting and ending time.
Parameters
----------
ds : xarray.Dataset
Input dataset
time_dim: str
Name of the time dimension.
The default is "time".
Returns
-------
tuple
(``starting_time``, ``ending_time``)
"""
starting_time = ds[time_dim].to_numpy()[0]
ending_time = ds[time_dim].to_numpy()[-1]
return (starting_time, ending_time)
def _define_fill_value(ds, fill_value):
fill_value = {}
for var in ds.data_vars:
if np.issubdtype(ds[var].dtype, np.floating):
fill_value[var] = dtypes.NA
elif np.issubdtype(ds[var].dtype, np.integer):
if "_FillValue" in ds[var].attrs:
fill_value[var] = ds[var].attrs["_FillValue"]
else:
fill_value[var] = np.iinfo(ds[var].dtype).max
return fill_value
def _check_time_sorted(ds, time_dim):
time_diff = np.diff(ds[time_dim].data.astype(int))
if np.any(time_diff == 0):
raise ValueError(f"In the {time_dim} dimension there are duplicated timesteps !")
if not np.all(time_diff > 0):
print(f"The {time_dim} dimension was not sorted. Sorting it now !")
ds = ds.sortby(time_dim)
return ds
[docs]
def regularize_dataset(
ds: xr.Dataset,
freq: str,
time_dim: str = "time",
method: Optional[str] = None,
fill_value=None,
):
"""Regularize a dataset across time dimension with uniform resolution.
Parameters
----------
ds : xarray.Dataset
xarray Dataset.
time_dim : str, optional
The time dimension in the xarray.Dataset. The default is ``"time"``.
freq : str
The ``freq`` string to pass to `pd.date_range()` to define the new time coordinates.
Examples: ``freq="2min"``.
method : str, optional
Method to use for filling missing timesteps.
If ``None``, fill with ``fill_value``. The default is ``None``.
For other possible methods, see xarray.Dataset.reindex()`.
fill_value : (float, dict), optional
Fill value to fill missing timesteps.
If not specified, for float variables it uses ``dtypes.NA`` while for
for integers variables it uses the maximum allowed integer value or,
in case of undecoded variables, the ``_FillValue`` DataArray attribute..
Returns
-------
ds_reindexed : xarray.Dataset
Regularized dataset.
"""
ds = _check_time_sorted(ds, time_dim=time_dim)
start_time, end_time = get_dataset_start_end_time(ds, time_dim=time_dim)
new_time_index = pd.date_range(
start=pd.to_datetime(start_time),
end=pd.to_datetime(end_time),
freq=freq,
)
# Define fill_value dictionary
if fill_value is None:
fill_value = _define_fill_value(ds, fill_value)
# Regularize dataset and fill with NA values
ds = ds.reindex(
{time_dim: new_time_index},
method=method, # do not fill gaps
# tolerance=tolerance, # mismatch in seconds
fill_value=fill_value,
)
return ds