# -----------------------------------------------------------------------------.
# MIT License
# Copyright (c) 2024 GPM-API developers
#
# This file is part of GPM-API.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------.
"""This module contains functions to analysis bucket archives."""
import datetime
import numpy as np
import pandas as pd
import polars as pl
import pyproj
from gpm.dataset.crs import set_dataset_crs
from gpm.utils.xarray import xr_drop_constant_dimension, xr_first
[docs]
def get_list_overpass_time(timesteps, interval=None):
"""Return a list with (start_time, end_time) of the overpasses.
This function is typically called on a regional subset of a bucket archive.
"""
# Check interval
if interval is None:
interval = np.array(60, dtype="m8[m]")
# Check timesteps
# - Ensure numpy.datetime64
timesteps = np.unique(timesteps)
timesteps = np.sort(timesteps)
# Deal with edge cases
if len(timesteps) == 0:
raise ValueError("No timesteps available.")
if len(timesteps) == 1:
return [(timesteps[0], timesteps[0])]
# Compute time difference
time_diff = np.diff(timesteps)
# Identify indices where the gap exceeds the allowed interval
gap_indices = np.where(time_diff > interval)[0]
# Determine the start indices for each group (the first element of the group)
start_indices = np.concatenate(([0], gap_indices + 1))
# Determine the end indices for each group (the last element of the group)
end_indices = np.concatenate((gap_indices, [timesteps.size - 1]))
# Define list time periods
list_time_periods = [
(timesteps[start], timesteps[end]) for start, end in zip(start_indices, end_indices, strict=False)
]
# # Initialize
# list_time_periods = []
# current_start_time = timesteps[0]
# for i, dt in enumerate(time_diff):
# if i == 0:
# continue
# if dt > interval:
# end_time = timesteps[i]
# time_period = (current_start_time, end_time)
# list_time_periods.append(time_period)
# # Update
# current_start_time = timesteps[i + 1]
# # Add the final group
# list_time_periods.append((current_start_time, timesteps[-1]))
return list_time_periods
def split_by_overpass(df, interval=None, max_overpass=np.inf):
    """Split a dataframe into a list of dataframes, one per overpass.

    Works with both polars and pandas dataframes; the dataframe must
    have a ``time`` column.
    """
    # Identify the (start_time, end_time) period of each overpass
    time_periods = get_list_overpass_time(timesteps=df["time"], interval=interval)
    # Keep at most `max_overpass` periods
    n_periods = min(len(time_periods), max_overpass)
    time_periods = time_periods[:n_periods]
    # Subset the dataframe over each period
    if isinstance(df, pl.DataFrame):
        return [df.filter(pl.col("time").is_between(start, end)) for start, end in time_periods]
    return [df[(df["time"] >= start) & (df["time"] <= end)] for start, end in time_periods]
def get_swath_indices(df):
    """Retrieve orbit (along-track, cross-track) dimension indices.

    Assumes only two granule ids might be present.
    Assumes that if two granule ids are present, the max along_track_id of the
    first granule is the real maximum.

    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe with a ``gpm_id`` column formatted as
        ``"<granule_id>-<along_track_id>"`` and a ``gpm_cross_track_id`` column.

    Returns
    -------
    tuple
        ``((x_index, x_values), (y_index, y_values))`` where ``x_index``/``y_index``
        are the full dimension coordinates and ``x_values``/``y_values`` the
        per-row positions along those dimensions.
    """
    # Split gpm_id into granule_id and along_track_id (make sure they are integers)
    df_along = df["gpm_id"].str.split("-", expand=True).astype(int)
    df_along.columns = ["granule_id", "along_track_id"]
    # Assign new x indices so that each granule's along-track block is contiguous.
    # Allocate an array to hold the new x value for each row in df.
    x_values = np.empty(len(df), dtype=int)
    offset = 0
    # Process granules in sorted order (change this ordering if needed)
    for granule in sorted(df_along["granule_id"].unique()):
        # Create a boolean mask for rows corresponding to this granule
        mask = df_along["granule_id"] == granule
        # Get the along-track ids for this granule
        along_vals = df_along.loc[mask, "along_track_id"]
        # Determine the minimum and maximum along-track id in this granule
        min_track = along_vals.min()
        max_track = along_vals.max()
        # The number of along-track positions in this granule
        n_tracks = max_track - min_track + 1
        # Assign a new x coordinate for every row in this granule:
        # subtract the minimum (to start at 0 for that granule) and add the offset
        x_values[mask] = along_vals - min_track + offset
        # Update the offset for the next granule
        offset += n_tracks
    # The complete set of x indices is just all integers from 0 to offset-1
    x_index = np.arange(offset)
    # Retrieve y_index and y_values from cross-track IDs
    y_min = df["gpm_cross_track_id"].min()
    y_max = df["gpm_cross_track_id"].max()
    y_index = np.arange(y_min, y_max + 1)
    y_values = df["gpm_cross_track_id"]
    return (x_index, x_values), (y_index, y_values)
def overpass_to_dataset(df_overpass):
    """Reshape an overpass dataframe to a xarray.Dataset.

    The resulting dataset will have missing geolocation for footprints
    that are not included in the df_overpass.
    """
    # Work on a pandas DataFrame (the MultiIndex reshaping below is pandas-specific)
    if isinstance(df_overpass, pl.DataFrame):
        df_overpass = df_overpass.to_pandas()
    # Retrieve dimension indices (along-track, cross-track) for each row
    (x_index, x_values), (y_index, y_values) = get_swath_indices(df_overpass)
    df_overpass["x_index"] = x_values
    df_overpass["y_index"] = y_values
    # Set index
    df_overpass = df_overpass.set_index(["x_index", "y_index"])
    # Create MultiIndex with all possible combinations
    full_index = pd.MultiIndex.from_product([x_index, y_index], names=["x_index", "y_index"])
    # Reindex to include all interval combinations
    # --> Add nan to rows with inexisitng index combo
    df_swath = df_overpass.reindex(full_index)
    # Convert to dataset
    ds_swath = df_swath.to_xarray()
    ds_swath = ds_swath.rename_dims({"x_index": "along_track", "y_index": "cross_track"})
    # Collapse coordinates to 1D: take the first valid value along the orthogonal dimension
    ds_swath["time"] = xr_first(ds_swath["time"], dim="cross_track")
    ds_swath["gpm_id"] = xr_first(ds_swath["gpm_id"], dim="cross_track")
    ds_swath["gpm_cross_track_id"] = xr_first(ds_swath["gpm_cross_track_id"], dim="along_track")
    # Define set coordinates (only those actually present in the dataset)
    candidate_coords = ["lon", "lat", "time", "gpm_id", "gpm_along_track_id", "gpm_cross_track_id"]
    available_coords = [coord for coord in candidate_coords if coord in ds_swath]
    # Drop dimension for coordinates that are all equals along a dimension
    # - If all values are nan, drop dimension
    ds_swath[available_coords] = xr_drop_constant_dimension(ds_swath[available_coords])
    # Set coordinates
    ds_swath = ds_swath.set_coords(available_coords)
    # Drop dummy index
    ds_swath = ds_swath.drop_vars(["x_index", "y_index"])
    # Reorder dimensions
    ds_swath = ds_swath.transpose("cross_track", "along_track", ...)
    # Add CRS (WGS84 geographic coordinates)
    ds_swath = set_dataset_crs(ds_swath, crs=pyproj.CRS.from_epsg(4326))
    return ds_swath
def add_overpass_id(df, interval=None, time="time"):
    """Add an ``overpass_id`` column grouping rows separated by at most ``interval``.

    Parameters
    ----------
    df : pandas.DataFrame
        Dataframe with a time column.
    interval : pandas.Timedelta, optional
        Maximum gap between consecutive timesteps of the same overpass.
        Defaults to 2 minutes.
    time : str, optional
        Name of the time column. Defaults to ``"time"``.

    Returns
    -------
    pandas.DataFrame
        Time-sorted copy of the dataframe with an ``overpass_id`` column.
    """
    if interval is None:
        interval = pd.Timedelta(minutes=2)
    # Sort by the specified time column
    # - BUGFIX: previously hard-coded "time", breaking custom `time` arguments
    df = df.sort_values(by=time)
    # TODO: drop rows with missing time
    # A new overpass starts whenever the gap to the previous timestep exceeds `interval`.
    # The first row has a NaT diff, which compares False and thus starts group 0.
    df["overpass_id"] = (df[time].diff() > interval).cumsum()
    return df
def count_overpass_occurence(df, interval=None, time="time"):
    """Add a ``count_overpass_occurence`` column with the number of rows in each overpass."""
    # Label each row with its overpass id
    df = add_overpass_id(df, interval=interval, time=time)
    # Count non-null entries of the first column within each overpass
    counts = df.groupby("overpass_id")[df.columns[0]].count()
    counts.name = "count_overpass_occurence"
    # Broadcast the per-overpass count back onto each row
    return df.join(counts, on="overpass_id")
def ensure_start_end_time_interval(start_time, end_time, interval=None):
    """Ensure the (start_time, end_time) period spans at least ``interval``.

    If the period is shorter than ``interval``, both bounds are padded
    outward by ``interval / 2``.
    """
    from gpm.io.checks import check_time

    # Convert np.datetime64 to datetime if needed
    start_time = check_time(start_time)
    end_time = check_time(end_time)
    # Without a requested interval, return the bounds unchanged
    if interval is None:
        return start_time, end_time
    # Ensure interval is of type datetime.timedelta
    if not isinstance(interval, datetime.timedelta):
        raise ValueError("Interval must be of type datetime.timedelta")
    # Enlarge the period symmetrically when shorter than the requested interval
    if end_time - start_time < interval:
        half_interval = interval / 2
        start_time, end_time = start_time - half_interval, end_time + half_interval
    return start_time, end_time