# -----------------------------------------------------------------------------.
# MIT License
# Copyright (c) 2024 GPM-API developers
#
# This file is part of GPM-API.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------.
"""This module contains general utilities to convert xarray objects to dataframes."""
import dask
import numpy as np
import pandas as pd
import polars as pl
from gpm.dataset.granule import remove_unused_var_dims
from gpm.utils.xarray import ensure_unique_chunking
[docs]
def get_df_object_columns(df):
    """Return the names of the dataframe columns with 'object' or 'string' dtype.

    Parameters
    ----------
    df : dataframe
        Dataframe exposing ``select_dtypes`` (e.g. :py:class:`pandas.DataFrame`).

    Returns
    -------
    list
        Names of the columns selected by ``include=["object", "string"]``.
    """
    text_like = df.select_dtypes(include=["object", "string"])
    return text_like.columns.tolist()
[docs]
def ensure_pyarrow_string_columns(df):
    """Cast every 'object'/'string' column to the ``string[pyarrow]`` dtype.

    The converted columns are assigned back onto ``df``, which is then returned.

    Parameters
    ----------
    df : dataframe
        Dataframe whose object-like columns must be converted.

    Returns
    -------
    dataframe
        The same ``df`` object with the converted columns.
    """
    target_dtype = "string[pyarrow]"
    for name in get_df_object_columns(df):
        df[name] = df[name].astype(target_dtype)
    return df
[docs]
def drop_undesired_columns(df):
    """Remove undesired columns (e.g. dataset dimensions without coordinates).

    Only the candidate columns actually present in ``df`` are dropped, so the
    call never fails because one of them is missing.

    Parameters
    ----------
    df : dataframe
        Dataframe to clean.

    Returns
    -------
    dataframe
        Dataframe without the undesired columns.
    """
    candidates = ("cross_track", "along_track", "range", "beam", "pixel", "crsWGS84", "spatial_ref")
    present = [name for name in candidates if name in df.columns]
    return df.drop(columns=present)
[docs]
def to_pandas_dataframe(ds, drop_index=True):
    """Convert an xarray.Dataset to a :py:class:`pandas.DataFrame`.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset to convert.
    drop_index : bool, optional
        If True (the default), the index produced by the conversion is
        discarded and replaced by a default integer index.

    Returns
    -------
    pandas.DataFrame
        Dataframe with pyarrow string columns and without the undesired
        (former dimension) columns.
    """
    # Prune the dataset before the conversion (drops irrelevant coordinates)
    pruned = remove_unused_var_dims(ds)
    # xarray converts strings to 'object' columns: cast them to pyarrow strings
    frame = ensure_pyarrow_string_columns(pruned.to_dataframe(dim_order=None))
    # Get rid of the (Multi)Index if requested
    if drop_index:
        frame = frame.reset_index(drop=True)
    # Discard columns that are not required (previous dataset dimensions)
    return drop_undesired_columns(frame)
[docs]
def to_dask_dataframe(ds):
    """Convert an xarray.Dataset to a :py:class:`dask.dataframe.DataFrame`.

    Parameters
    ----------
    ds : xarray.Dataset
        Dataset to convert.

    Returns
    -------
    dask.dataframe.DataFrame
        Dataframe with pyarrow string columns and without the undesired
        (former dimension) columns.
    """
    # Prune the dataset (drops irrelevant coordinates), then enforce uniform chunking
    ds = ensure_unique_chunking(remove_unused_var_dims(ds))
    # dask.dataframe does not support MultiIndex: reset any that is present
    multiindex_dims = [name for name, index in ds.indexes.items() if isinstance(index, pd.MultiIndex)]
    ds = ds.reset_index(multiindex_dims)
    # Convert to dask dataframe (strings are converted to 'object' columns)
    slicing_options = {"array.slicing.split_large_chunks": True}
    with dask.config.set(**slicing_options):
        ddf = ds.to_dask_dataframe(dim_order=None, set_index=False)
    # Cast the 'object' columns to pyarrow strings
    ddf = ensure_pyarrow_string_columns(ddf)
    # Discard columns that are not required (previous dataset dimensions)
    return drop_undesired_columns(ddf)
[docs]
def compute_2d_histogram(df, x, y, var=None, x_bins=10, y_bins=10, x_labels=None, y_labels=None, prefix_name=True):
    """Compute bivariate (2D binned) statistics.

    Parameters
    ----------
    df : pandas.DataFrame or polars.DataFrame
        Input dataframe. A polars dataframe is converted to pandas.
        The input dataframe is not modified (a copy is used internally).
    x : str
        Column name for x-axis binning.
    y : str
        Column name for y-axis binning.
    var : str, optional
        Column name for which statistics will be computed.
        If None, only counts are computed.
    x_bins : int or array-like
        Number of equal-width bins spanning the range of ``x``, or bin edges for x.
    y_bins : int or array-like
        Number of equal-width bins spanning the range of ``y``, or bin edges for y.
    x_labels : array-like, optional
        Labels for x bins (one label per bin). If None, uses bin centers.
    y_labels : array-like, optional
        Labels for y bins (one label per bin). If None, uses bin centers.
    prefix_name : bool, optional
        If True (the default) and ``var`` is specified, the name of each
        statistic other than ``count`` is prefixed with ``"<var>_"``.

    Returns
    -------
    xarray.Dataset
        Dataset with dimensions ``(y, x)`` corresponding to the binned variables
        and one data variable per statistic (``count`` and, if ``var`` is
        specified, ``median``, ``std``, ``min`` and ``max``).
        Bins without data have a count of 0 and NaN for the other statistics.

    Raises
    ------
    ValueError
        If no rows remain after removing NaN values, or if the number of
        provided labels does not match the number of bins.
    """
    # If polars, cast to pandas
    if isinstance(df, pl.DataFrame):
        df = df.to_pandas()

    # Copy data to avoid altering the caller's dataframe
    df = df.copy()

    # If no var specified, create dummy variable (used only for counting)
    var_specified = var is not None
    if not var_specified:
        var = "dummy"
        df["dummy"] = np.ones(df[x].shape)

    # Handle binning: an integer (python or numpy) defines the number of equal-width bins
    if isinstance(x_bins, (int, np.integer)):
        x_bins = np.linspace(df[x].min(), df[x].max(), x_bins + 1)
    if isinstance(y_bins, (int, np.integer)):
        y_bins = np.linspace(df[y].min(), df[y].max(), y_bins + 1)

    # Drop rows where any of the key columns have NaN
    df = df.dropna(subset=[x, y, var])
    if df.empty:
        raise ValueError("No valid data points after removing NaN values")

    # Create binned columns (values outside the bin edges become NaN and are ignored by groupby)
    x_binned = f"{x}_binned"
    y_binned = f"{y}_binned"
    df[x_binned] = pd.cut(df[x], bins=x_bins, include_lowest=True)
    df[y_binned] = pd.cut(df[y], bins=y_bins, include_lowest=True)

    # Retrieve the complete IntervalIndex for both dimensions
    x_intervals = df[x_binned].cat.categories
    y_intervals = df[y_binned].cat.categories

    # Determine coordinates: provided labels if available, bin centers otherwise
    x_coords = x_labels if x_labels is not None else x_intervals.mid
    y_coords = y_labels if y_labels is not None else y_intervals.mid

    # Fail early on label/bin mismatch instead of silently mislabelling the bins
    for axis, coords, intervals in (("x", x_coords, x_intervals), ("y", y_coords, y_intervals)):
        if len(coords) != len(intervals):
            raise ValueError(
                f"'{axis}_labels' has {len(coords)} elements but {len(intervals)} bins are defined.",
            )

    # Prepare prefix
    prefix = f"{var}_" if prefix_name and var_specified else ""

    # Define statistics to compute
    if var_specified:
        list_stats = [
            ("count", "count"),
            (f"{prefix}median", "median"),
            (f"{prefix}std", "std"),
            (f"{prefix}min", "min"),
            (f"{prefix}max", "max"),
        ]
    else:
        list_stats = [("count", "count")]

    # Compute statistics
    # - observed=False is set explicitly: the keys are categorical and the implicit
    #   default is deprecated. It keeps empty bins in the output with a count of 0.
    df_stats = df.groupby([x_binned, y_binned], observed=False)[var].agg(list_stats)

    # Reindex to guarantee that all interval combinations are present
    full_index = pd.MultiIndex.from_product([x_intervals, y_intervals], names=[x_binned, y_binned])
    df_stats = df_stats.reindex(full_index)

    # Reset index and map each interval to its coordinate
    df_stats = df_stats.reset_index()
    df_stats[f"{x}"] = pd.Categorical(df_stats[x_binned].map(dict(zip(x_intervals, x_coords, strict=True))))
    df_stats[f"{y}"] = pd.Categorical(df_stats[y_binned].map(dict(zip(y_intervals, y_coords, strict=True))))

    # Set new MultiIndex with coordinates
    df_stats = df_stats.set_index([f"{x}", f"{y}"])
    df_stats = df_stats.drop(columns=[x_binned, y_binned])

    # Convert to dataset and order the dimensions as (y, x)
    ds = df_stats.to_xarray()
    return ds.transpose(y, x)