# -----------------------------------------------------------------------------.
# MIT License
# Copyright (c) 2024 GPM-API developers
#
# This file is part of GPM-API.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------.
"""This module implements Spatial Partitioning classes."""
import os
from functools import reduce, wraps
import numpy as np
import pandas as pd
from gpm.bucket.dataframe import (
check_valid_dataframe,
df_add_column,
df_get_column,
df_is_column_in,
df_select_valid_rows,
df_to_pandas,
)
from gpm.utils.geospatial import (
Extent,
_check_size,
check_extent,
get_continent_extent,
get_country_extent,
get_extent_around_point,
get_geographic_extent_around_point,
)
pd.options.mode.copy_on_write = True
# Future methods:
# to_spherically (geographic)
# to_geopandas [lat_bin, lon_bin, geometry]
def _apply_flatten_arrays(self, func, x, y, **kwargs):
if isinstance(x, np.ndarray) and isinstance(y, np.ndarray) and x.ndim == 2 and y.ndim == 2:
original_shape = x.shape
x_flat = x.flatten()
y_flat = y.flatten()
result = func(self, x_flat, y_flat, **kwargs)
if isinstance(result, tuple):
result = tuple(r.reshape(original_shape) for r in result)
else: # np.array
result = result.reshape(original_shape + result.shape[1:])
return result
return func(self, x, y, **kwargs)
def flatten_xy_arrays(func):
    """Decorator flattening 2D x/y coordinate arrays before calling ``func``.

    The wrapped function always receives 1D arrays (when the inputs are 2D)
    and its output is reshaped back to the original 2D shape.
    """

    @wraps(func)
    def wrapper(self, x, y, **kwargs):
        return _apply_flatten_arrays(self, func=func, x=x, y=y, **kwargs)

    return wrapper
def flatten_indices_arrays(func):
    """Decorator flattening 2D x/y indices arrays before calling ``func``.

    Same machinery as :func:`flatten_xy_arrays`, but for functions taking
    ``x_indices`` and ``y_indices`` as the two positional arguments.
    """

    @wraps(func)
    def wrapper(self, x_indices, y_indices, **kwargs):
        return _apply_flatten_arrays(self, func=func, x=x_indices, y=y_indices, **kwargs)

    return wrapper
def np_broadcast_like(x, shape):
    """Broadcast the 1D array ``x`` along trailing dimensions to the target ``shape``.

    The first axis of ``shape`` must match ``x``'s length; each element of
    ``x`` is replicated across all remaining axes.
    """
    trailing_axes = tuple(range(1, len(shape)))
    out = np.zeros(shape, x.dtype)
    out[:] = np.expand_dims(x, axis=trailing_axes)
    return out
[docs]
def mask_invalid_indices(flag_value=np.nan):
def decorator(func):
@wraps(func)
def wrapper(self, *args, **kwargs):
# Extract arguments
x_indices = kwargs.get("x_indices", args[0] if len(args) > 0 else None)
y_indices = kwargs.get("y_indices", args[1] if len(args) > 1 else None)
# Ensure is a 1D numpy array
x_indices = np.atleast_1d(np.asanyarray(x_indices))
y_indices = np.atleast_1d(np.asanyarray(y_indices))
# Determine invalid indices
invalid_indices = ~np.isfinite(x_indices) | ~np.isfinite(y_indices)
# Set dummy value for invalid indices
x_indices[invalid_indices] = 0 # dummy index
y_indices[invalid_indices] = 0 # dummy index
# Ensure indices are integers !
x_indices = x_indices.astype(int)
y_indices = y_indices.astype(int)
# Call the original function
result = func(self, x_indices, y_indices, **kwargs)
# Apply the mask to the result
if isinstance(result, tuple):
masked_result = tuple(np.where(invalid_indices, flag_value, r) for r in result)
else: # np.array
invalid_indices = np_broadcast_like(invalid_indices, result.shape)
masked_result = np.where(invalid_indices, flag_value, result)
return masked_result
return wrapper
return decorator
def _check_labels_decimals(decimals):
"""Check and normalize the size input.
This function accepts the number of labels decimals defined as an integer, float, tuple, or list.
It normalizes the input into a tuple of two elements, each representing the
desired number of decimals for the x and y partition labels.
Returns
-------
list
A list of two elements (x_decimals, y_decimals)
"""
if isinstance(decimals, (int, np.integer)):
decimals = list([decimals] * 2)
elif isinstance(decimals, (tuple, list)):
if len(decimals) != 2:
raise ValueError("Expecting a decimals (x, y) tuple.")
else:
raise TypeError("Accepted decimals type are int, list or tuple.")
if np.any(np.array(decimals) < 0):
raise ValueError("Expecting positive 'labels_decimals' values.")
return list(decimals)
def check_default_levels(levels, default_levels):
    """Return ``levels`` as a list, falling back to ``default_levels`` when ``None``."""
    levels = default_levels if levels is None else levels
    if isinstance(levels, str):
        return [levels]
    if not isinstance(levels, list):
        raise TypeError("'levels' must be a list specifying the partition names.")
    return levels
def check_partitioning_order(levels, order):
    """Validate that ``order`` contains exactly the partition ``levels``."""
    if set(order) != set(levels):
        raise ValueError(f"Partitions 'order' ({order}) does not match with partition names {levels}.")
    return order
def check_partitioning_flavor(flavor):
    """Validate the flavor argument.

    If ``None``, defaults to ``"directory"``.

    Raises
    ------
    ValueError
        If ``flavor`` is not ``"directory"`` or ``"hive"``.
    """
    if flavor is None:
        flavor = "directory"
    valid_flavors = ["directory", "hive"]
    if flavor not in valid_flavors:
        # BUGFIX: the previous message had a misplaced quote: "'flavor '{flavor}'"
        raise ValueError(f"Invalid partitioning flavor '{flavor}'. Valid options are {valid_flavors}.")
    return flavor
def check_valid_x_y(df, x, y):
    """Check if the x and y columns are in the dataframe."""
    # y is checked first to preserve the historical error order
    for axis, column in [("y", y), ("x", x)]:
        if not df_is_column_in(df, column=column):
            raise ValueError(f"{axis}='{column}' is not a column of the dataframe.")
def get_array_combinations(x, y):
    """Return all combinations between the two input arrays."""
    # Build the grid of all (x, y) pairs
    xx, yy = np.meshgrid(x, y)
    # Stack as rows of (x, y) pairs, then split back into two aligned columns
    pairs = np.vstack([xx.ravel(), yy.ravel()]).T
    return pairs[:, 0], pairs[:, 1]
def get_centroids_from_bounds(bounds):
    """Define partitions centroids from bounds (midpoints of consecutive edges)."""
    return (bounds[:-1] + bounds[1:]) / 2
def query_indices(values, bounds):
    """Return the bin index for the specified coordinates.

    Invalid values (NaN, None) or out of bounds values returns NaN.
    """
    arr = np.atleast_1d(np.asanyarray(values)).astype(float)
    # Bins are right-closed; include_lowest makes the first bin left-closed too
    return pd.cut(arr, bins=bounds, labels=False, include_lowest=True, right=True)
def get_partition_dir_name(partition_name, partition_labels, flavor):
    """Return the directories name of a partition.

    With ``flavor="hive"``, names follow ``{partition_name}={label}/``;
    otherwise just ``{label}/``.
    """
    if flavor == "hive":
        prefix = np.char.add(partition_name, "=")
        return np.char.add(np.char.add(prefix, partition_labels), os.sep)
    return np.char.add(partition_labels, os.sep)
def get_directories(dict_labels, order, flavor):
    """Return the directory trees of a partitioned dataset."""
    # One directory-name array per partition level, in the requested order
    dir_names = [
        get_partition_dir_name(
            partition_name=partition,
            partition_labels=dict_labels[partition],
            flavor=flavor,
        )
        for partition in order
    ]
    # Concatenate the per-level names and drop the trailing separator
    trees = reduce(np.char.add, dir_names)
    return np.char.rstrip(trees, os.sep)
####-------------------------------------------------------------------------------------------------------------------.
##################################
#### XYPartitioning Utilities ####
##################################
def get_n_decimals(number):
    """Get the number of decimals of a number."""
    text = str(number)
    if "." not in text:
        return 0  # no decimal point found
    # Count the characters after the decimal point
    return len(text.partition(".")[2])
def get_bounds(size, vmin, vmax):
    """Define partitions edges.

    Edges are regularly spaced by ``size`` starting at ``vmin``; the last
    partition is truncated at ``vmax`` when the range is not a multiple of
    ``size``.
    """
    edges = np.arange(vmin, vmax, size)
    if edges[-1] != vmax:
        edges = np.append(edges, np.array([vmax]))
    return edges
####-----------------------------------------------------------------------------------------------------------------.
#### Tiles Utilities
[docs]
def justify_labels(labels, length):
return np.char.rjust(labels, length, "0")
def get_tile_xy_labels(x_indices, y_indices, origin, n_x, n_y, justify=False):
    """Return the 2D tile labels for the specified x,y indices."""
    x_labels = x_indices.astype(str)
    # With origin="bottom", flip the y indices so label 0 refers to the bottom row
    if origin == "top":
        y_labels = y_indices.astype(str)
    else:
        y_labels = (n_y - 1 - y_indices).astype(str)
    # Optionally zero-pad the labels to a uniform width
    if justify:
        x_labels = justify_labels(x_labels, length=len(str(n_x)))
        y_labels = justify_labels(y_labels, length=len(str(n_y)))
    return x_labels, y_labels
def get_tile_id_labels(x_indices, y_indices, origin, direction, n_x, n_y, justify):
    """Return the 1D tile labels for the specified x,y indices.

    ``direction="x"`` numbers the tiles row by row (C order),
    ``direction="y"`` column by column (Fortran order).
    With ``origin="bottom"`` the y indices are flipped before flattening.
    """
    rows = y_indices if origin == "top" else n_y - 1 - y_indices
    order = "C" if direction == "x" else "F"
    flattened = np.ravel_multi_index((rows, x_indices), (n_y, n_x), order=order)
    # Conversion to string
    labels = flattened.astype(str)
    # Optionally zero-pad the labels to a uniform width
    if justify:
        labels = justify_labels(labels, length=len(str(n_x * n_y)))
    return labels
####-----------------------------------------------------------------------------------------------------------------.
#### Xarray reformatting utility
def _ensure_indices_list(indices):
if indices is None:
indices = []
indices = [indices] if isinstance(indices, str) else list(indices)
if indices == [None]: # what is returned by df.index.names if no index !
indices = []
return indices
####------------------------------------------------------------------------------------------------------------------.
#### 2D Partitioning Classes
class Base2DPartitioning:
    """
    Handles partitioning of 2D data into rectangular tiles.

    The size of the partitions can vary between and across the x and y directions.

    Parameters
    ----------
    levels : str or list
        Name or names of the partitions.
        If partitioning by 1 level (i.e. by a unique partition id), specify a single partition name.
        If partitioning by 2 or more levels (i.e. by x and y), specify the x, y (z, ...) partition levels names.
    x_bounds : numpy.ndarray
        The partition bounds across the x (horizontal) dimension.
    y_bounds : numpy.ndarray
        The partition bounds across the y (vertical) dimension.
        Please provide the bounds with increasing values order.
        The origin of the partition class indices is the top, left corner.
    order : list
        The order of the partitions when writing multi-level partitions (i.e. x, y) to disk.
        The default, ``None``, corresponds to ``levels``.
    flavor : str
        This argument governs the directories names of partitioned datasets.
        The default, ``None``, name the directories with the partitions labels (DirectoryPartitioning).
        The option ``"hive"``, name the directories with the format ``{partition_name}={partition_label}``.
    """

    def __init__(self, x_bounds, y_bounds, levels, flavor=None, order=None):
        self.x_bounds = np.asanyarray(x_bounds)
        self.y_bounds = np.asanyarray(y_bounds)
        self.x_centroids = get_centroids_from_bounds(self.x_bounds)
        self.y_centroids = get_centroids_from_bounds(self.y_bounds)
        # Define partitions names, order and flavor
        self.levels = check_default_levels(levels=levels, default_levels=None)
        if order is None:
            self.order = self.levels
        else:
            self.order = check_partitioning_order(
                levels=self.levels,
                order=order,
            )
        self.flavor = check_partitioning_flavor(flavor)
        # Define info
        self.shape = (len(self.y_centroids), len(self.x_centroids))
        self.n_partitions = self.shape[0] * self.shape[1]
        self.n_levels = len(self.levels)
        self.n_x = self.shape[1]
        self.n_y = self.shape[0]
        # Private caches for the labels and centroids arrays (computed lazily)
        self._labels = None
        self._centroids = None
        # Default column names used by add_centroids
        self._x_coord = "x_c"  # default name for x centroid column
        self._y_coord = "y_c"  # default name for y centroid column

    @flatten_xy_arrays
    def query_indices(self, x, y):
        """Return the 2D partition indices for the specified x,y coordinates."""
        x_indices = query_indices(x, bounds=self.x_bounds)
        y_indices = query_indices(y, bounds=self.y_bounds)
        return x_indices, y_indices

    @flatten_indices_arrays
    @mask_invalid_indices(flag_value="nan")
    def query_labels_by_indices(self, x_indices, y_indices):
        """Return the partition labels as function of the specified 2D partitions indices."""
        return self._custom_labels_function(x_indices=x_indices, y_indices=y_indices)

    def _custom_labels_function(self, x_indices, y_indices):  # noqa
        """Return the partition labels for the specified x,y indices.

        Must be implemented by subclasses.
        """
        # BUGFIX: was self.__class__.name (AttributeError instead of NotImplementedError)
        class_name = self.__class__.__name__
        raise NotImplementedError(f"'_custom_labels_function' has yet be implemented for subclass {class_name}!")

    @flatten_xy_arrays
    def query_labels(self, x, y):
        """Return the partition labels for the specified x,y coordinates."""
        x_indices, y_indices = self.query_indices(x=x, y=y)
        return self.query_labels_by_indices(x_indices, y_indices)

    @flatten_indices_arrays
    @mask_invalid_indices(flag_value=np.nan)
    def query_centroids_by_indices(self, x_indices, y_indices):
        """Return the partition centroids for the specified x,y indices."""
        x_centroids = self.x_centroids[x_indices]
        y_centroids = self.y_centroids[y_indices]
        return x_centroids, y_centroids

    @flatten_xy_arrays
    def query_centroids(self, x, y):
        """Return the partition centroids for the specified x,y coordinates."""
        x_indices, y_indices = self.query_indices(x=x, y=y)
        return self.query_centroids_by_indices(x_indices, y_indices)

    @property
    def labels(self):
        """Return the labels array of shape (n_y, n_x, n_levels)."""
        if self._labels is None:
            # Retrieve labels combination of all (x,y) indices
            x_indices, y_indices = np.meshgrid(np.arange(self.n_x), np.arange(self.n_y))
            # Retrieve labels
            # - If n_levels >= 2 --> query_labels_by_indices return a tuple !
            labels = self.query_labels_by_indices(x_indices=x_indices, y_indices=y_indices)
            if self.n_levels >= 2:
                labels = np.stack(labels, axis=-1)
            self._labels = labels
        return self._labels

    @property
    def centroids(self):
        """Return the centroids array of shape (n_y, n_x, 2)."""
        if self._centroids is None:
            # Retrieve centroids of all (x,y) indices
            x_indices, y_indices = np.meshgrid(np.arange(self.n_x), np.arange(self.n_y))
            centroids = self.query_centroids_by_indices(x_indices, y_indices)
            centroids = np.stack(centroids, axis=-1)
            self._centroids = centroids
        return self._centroids

    @property
    def bounds(self):
        """Return the partitions bounds."""
        return self.x_bounds, self.y_bounds

    def quadmesh_corners(self, origin="bottom"):
        """Return the quadrilateral mesh corners.

        A quadrilateral mesh is a grid of M by N adjacent quadrilaterals that are defined via a (M+1, N+1)
        grid of vertices.

        The quadrilateral mesh is accepted by :py:class:`matplotlib.pyplot.pcolormesh`,
        :py:class:`matplotlib.collections.QuadMesh` and :py:class:`matplotlib.collections.PolyQuadMesh`.

        Parameters
        ----------
        origin : str
            Origin of the y axis.
            The default is ``bottom``.

        Returns
        -------
        (x_corners, y_corners) : tuple
            Numpy array of shape (M+1, N+1)
        """
        x_corners, y_corners = np.meshgrid(self.x_bounds, self.y_bounds)
        if origin == "bottom":
            # Flip rows so the smallest y bound is at the bottom of the mesh
            y_corners = y_corners[::-1, :]
        return x_corners, y_corners

    def vertices(self, ccw=True, origin="bottom"):
        """Return the partitions vertices in an array of shape (N, M, 4, 2).

        The output vertices, once the first 2 dimensions are flattened,
        can be passed directly to a :py:class:`matplotlib.collections.PolyCollection`.
        For plotting with cartopy, the polygon order must be counterclockwise ordered.

        Parameters
        ----------
        ccw : bool, optional
            If ``True``, vertices are ordered counterclockwise.
            If ``False``, vertices are ordered clockwise.
            The default is ``True``.
        origin : str
            Origin of the y axis.
            The default is ``bottom``.
        """
        from gpm.utils.area import get_quadmesh_from_corners

        x_corners, y_corners = self.quadmesh_corners(origin=origin)
        vertices = get_quadmesh_from_corners(x_corners, y_corners, ccw=ccw, origin=origin)
        return vertices

    def to_shapely(self):
        """Return an array with shapely polygons."""
        import shapely

        return shapely.polygons(self.vertices(ccw=True))

    @flatten_indices_arrays
    @mask_invalid_indices(flag_value=np.nan)
    def query_vertices_by_indices(self, x_indices, y_indices, ccw=True):
        """Return the partitions vertices in an array of shape (indices, 4, 2)."""
        x_indices = np.atleast_1d(np.asanyarray(x_indices))
        y_indices = np.atleast_1d(np.asanyarray(y_indices))
        x_bnds = (self.x_bounds[x_indices], self.x_bounds[x_indices + 1])
        y_bnds = (self.y_bounds[y_indices], self.y_bounds[y_indices + 1])
        top_left = np.stack((x_bnds[0], y_bnds[1]), axis=1)
        top_right = np.stack((x_bnds[1], y_bnds[1]), axis=1)
        bottom_right = np.stack((x_bnds[1], y_bnds[0]), axis=1)
        bottom_left = np.stack((x_bnds[0], y_bnds[0]), axis=1)
        if ccw:
            list_vertices = [top_left, bottom_left, bottom_right, top_right]
        else:
            list_vertices = [top_left, top_right, bottom_right, bottom_left]
        vertices = np.stack(list_vertices, axis=1)
        return vertices

    @flatten_xy_arrays
    def query_vertices(self, x, y, ccw=True):
        """Return the partitions vertices for the specified x,y coordinates."""
        x_indices, y_indices = self.query_indices(x, y)
        return self.query_vertices_by_indices(x_indices, y_indices, ccw=ccw)

    def _get_dict_labels_combo(self, x_indices, y_indices):
        """Return a {level: labels} dictionary for all (x, y) index combinations."""
        # Retrieve labels combination of all (x,y) indices
        indices = get_array_combinations(x_indices, y_indices)
        # Retrieve corresponding labels
        # If n_levels >= 2 --> query_labels_by_indices returns a tuple
        # If n_levels == 1 --> query_labels_by_indices returns a 1D array
        labels = self.query_labels_by_indices(x_indices=indices[0], y_indices=indices[1])
        if self.n_levels > 1:
            dict_labels = {self.levels[i]: labels[i] for i in range(0, self.n_levels)}
        else:  # (tile_id)
            dict_labels = {self.levels[0]: labels}
        return dict_labels

    def _directories(self, dict_labels):
        """Build the directory trees for the specified labels dictionary."""
        return get_directories(
            dict_labels=dict_labels,
            order=self.order,
            flavor=self.flavor,
        )

    @property
    def directories(self):
        """Return the directory trees."""
        dict_labels = self._get_dict_labels_combo(x_indices=np.arange(0, self.n_x), y_indices=np.arange(0, self.n_y))
        return self._directories(dict_labels=dict_labels)

    def get_partitions_by_extent(self, extent):
        """Return the partitions labels containing data within the extent."""
        extent = check_extent(extent)
        # Define valid query extent (to be aligned with partitioning extent)
        query_extent = [
            max(extent.xmin, self.extent.xmin),
            min(extent.xmax, self.extent.xmax),
            max(extent.ymin, self.extent.ymin),
            min(extent.ymax, self.extent.ymax),
        ]
        query_extent = Extent(*query_extent)
        # Retrieve centroids of the partitions containing the extent corners
        (xmin, xmax), (ymin, ymax) = self.query_centroids(
            x=[query_extent.xmin, query_extent.xmax],
            y=[query_extent.ymin, query_extent.ymax],
        )
        # Retrieve univariate x and y indices within the extent
        x_indices = np.where(np.logical_and(self.x_centroids >= xmin, self.x_centroids <= xmax))[0]
        y_indices = np.where(np.logical_and(self.y_centroids >= ymin, self.y_centroids <= ymax))[0]
        # Retrieve labels corresponding to the combination of all (x,y) indices
        return self._get_dict_labels_combo(x_indices, y_indices)

    def get_partitions_around_point(self, x, y, distance=None, size=None):
        """Return the partition labels with data within the distance/size from a point."""
        extent = get_extent_around_point(x, y, distance=distance, size=size)
        return self.get_partitions_by_extent(extent=extent)

    def directories_by_extent(self, extent):
        """Return the directory trees with data within the specified extent."""
        dict_labels = self.get_partitions_by_extent(extent=extent)
        return self._directories(dict_labels=dict_labels)

    def directories_around_point(self, x, y, distance=None, size=None):
        """Return the directory trees with data within the specified distance from a point."""
        dict_labels = self.get_partitions_around_point(x=x, y=y, distance=distance, size=size)
        return self._directories(dict_labels=dict_labels)

    def add_labels(self, df, x, y, remove_invalid_rows=True):
        """Add partitions labels to the dataframe.

        Parameters
        ----------
        df : pandas.DataFrame, dask.dataframe.DataFrame, polars.DataFrame, pyarrow.Table or polars.LazyFrame
            Dataframe to which add partitions labels.
        x : str
            Column name with the x coordinate.
        y : str
            Column name with the y coordinate.
        remove_invalid_rows : bool, optional
            Whether to remove dataframe rows for which coordinates are invalid or out of the partitioning extent.
            The default is ``True``.

        Returns
        -------
        df : pandas.DataFrame, dask.dataframe.DataFrame, polars.DataFrame, pyarrow.Table or polars.LazyFrame
            Dataframe with the partitions label(s) column(s).
        """
        check_valid_dataframe(df)
        check_valid_x_y(df, x=x, y=y)
        x_arr = df_get_column(df, column=x)
        y_arr = df_get_column(df, column=y)
        # Retrieve labels
        # - If n_levels = 1: array
        # - If n_levels = 2: tuple
        labels = self.query_labels(x_arr, y_arr)
        if self.n_levels == 1:
            labels = [labels]
        # Add labels to dataframe
        for partition, values in zip(self.levels, labels):
            df = df_add_column(df=df, column=partition, values=values)
        # Check if invalid labels ("nan" is the flag set by mask_invalid_indices)
        invalid_rows = labels[0] == "nan"
        invalid_rows_indices = np.where(invalid_rows)[0]
        if invalid_rows_indices.size > 0:
            if not remove_invalid_rows:
                raise ValueError(f"Invalid labels at rows: {invalid_rows_indices.tolist()}")
            # Remove invalid labels if remove_invalid_rows=True
            df = df_select_valid_rows(df, valid_rows=~invalid_rows)
        return df

    def add_centroids(self, df, x, y, x_coord=None, y_coord=None, remove_invalid_rows=True):
        """Add partitions centroids to the dataframe.

        Parameters
        ----------
        df : pandas.DataFrame, dask.dataframe.DataFrame, polars.DataFrame, pyarrow.Table or polars.LazyFrame
            Dataframe to which add partitions centroids.
        x : str
            Column name with the x coordinate.
        y : str
            Column name with the y coordinate.
        x_coord : str, optional
            Name of the new column with the centroids x coordinates.
            The default is "x_c".
        y_coord : str, optional
            Name of the new column with the centroids y coordinates.
            The default is "y_c".
        remove_invalid_rows : bool, optional
            Whether to remove dataframe rows for which coordinates are invalid or out of the partitioning extent.
            The default is ``True``.

        Returns
        -------
        df : pandas.DataFrame, dask.dataframe.DataFrame, polars.DataFrame, pyarrow.Table or polars.LazyFrame
            Dataframe with the partitions centroids x and y coordinates columns.
        """
        # Check inputs and retrieve default values
        check_valid_dataframe(df)
        check_valid_x_y(df, x=x, y=y)
        if x_coord is None:
            x_coord = self._x_coord
        if y_coord is None:
            y_coord = self._y_coord
        # Retrieve x and y coordinates arrays
        x_arr = df_get_column(df, column=x)
        y_arr = df_get_column(df, column=y)
        # Retrieve centroids tuple (x, y)
        x_centroids, y_centroids = self.query_centroids(x_arr, y_arr)
        # Add centroids to dataframe
        df = df_add_column(df=df, column=x_coord, values=x_centroids)
        df = df_add_column(df=df, column=y_coord, values=y_centroids)
        # Check if invalid centroids (NaN is the flag set by mask_invalid_indices)
        invalid_rows = np.isnan(x_centroids)
        invalid_rows_indices = np.where(invalid_rows)[0]
        if invalid_rows_indices.size > 0:
            if not remove_invalid_rows:
                raise ValueError(f"Invalid centroids at rows: {invalid_rows_indices.tolist()}")
            # Remove invalid centroids if remove_invalid_rows=True
            df = df_select_valid_rows(df, valid_rows=~invalid_rows)
        return df

    def to_xarray(self, df, spatial_coords=None, aux_coords=None):
        """Convert dataframe to spatial xarray Dataset based on partitions centroids.

        This routine assumes that you have grouped and aggregated the dataframe over
        the partition labels or the partition centroids!

        Please add the partition centroids to the dataframe with ``add_centroids`` before calling this method.
        Please specify the partition centroids x and y columns in the ``spatial_coords`` argument.
        Please also specify the presence of auxiliary coordinates (indices) with ``aux_coords``.

        The array cells with coordinates not included in the dataframe will have NaN values.
        """
        # Check inputs
        check_valid_dataframe(df)
        spatial_coords = _ensure_indices_list(spatial_coords)  # [] if None
        aux_coords = _ensure_indices_list(aux_coords)  # [] if None
        # Ensure dataframe is pandas
        df = df_to_pandas(df)
        # Reset dataframe indices if present
        src_indices = df.index.names  # no index returns None
        src_indices = _ensure_indices_list(src_indices)
        if src_indices:
            df = df.reset_index()
        # Check aux_coords are in df (index or column)
        # - If aux_coords were already in the DataFrame index, no need to specify it.
        if aux_coords:
            for coord in aux_coords:
                if coord not in df.columns:
                    raise ValueError(f"Auxiliary coordinate '{coord}' not found in DataFrame columns or index.")
        # Check spatial coords are in df (if specified)
        if spatial_coords:
            for coord in spatial_coords:
                if coord not in df.columns and coord not in src_indices:
                    raise ValueError(f"Spatial coordinate '{coord}' not found in DataFrame columns or index.")
        else:  # tentative guess and raise error if not present
            spatial_coords = [self._x_coord, self._y_coord]
            if self._x_coord not in df.columns or self._y_coord not in df.columns:
                raise ValueError(
                    "Partitiong centroids not found in the dataframe. Please add partitions centroids "
                    "using the 'add_centroids' method and specify the columns in the 'spatial_coords' "
                    "argument of 'to_xarray'.",
                )
        # Finalize auxiliary coords (everything that is not spatial nor a partition name)
        possible_coords = np.unique([*spatial_coords, *aux_coords, *src_indices]).tolist()
        possible_aux_coords = set(possible_coords).symmetric_difference(set(spatial_coords))
        aux_coords = possible_aux_coords.difference(set(self.levels))  # exclude also partition names
        coords = list(spatial_coords) + list(aux_coords)
        # Ensure valid coordinates types
        # - Ensure indices are int, float or str (no categorical)
        # - Ensure spatial indices are float
        df = _ensure_valid_coordinates_dtype(df, spatial_coords=spatial_coords, aux_coords=aux_coords)
        # Set coordinates as MultiIndex
        df = df.set_index(coords)
        # Define dictionary of current indices
        dict_indices = {coord: df.index.get_level_values(coord).unique().to_numpy() for coord in coords}
        # Update dictionary with the full x and y centroids
        dict_indices[spatial_coords[0]] = self.x_centroids
        dict_indices[spatial_coords[1]] = self.y_centroids
        # Create an empty DataFrame with the MultiIndex
        multi_index = pd.MultiIndex.from_product(
            dict_indices.values(),
            names=dict_indices.keys(),
        )
        empty_df = pd.DataFrame(index=multi_index)
        # Create final dataframe (cells without data remain NaN)
        df_full = empty_df.join(df, how="left")
        # Reshape to xarray
        ds = df_full.to_xarray()
        return ds
def _ensure_valid_coordinates_dtype(df, spatial_coords, aux_coords):
for column in spatial_coords:
df[column] = df[column].astype(float)
for column in aux_coords:
if df.dtypes[column].name == "category":
df[column] = df[column].astype(str)
return df
class XYPartitioning(Base2DPartitioning):
    """
    Handles partitioning of data into x and y regularly spaced bins.

    Parameters
    ----------
    size : int, float, tuple, list
        The size value(s) of the bins.
        The function interprets the input as follows:
        - int or float: The same size is enforced in both x and y directions.
        - tuple or list: The bin size for the x and y directions.
    extent : list
        The extent for the partitioning specified as ``[xmin, xmax, ymin, ymax]``.
    levels : list, optional
        Names of the x and y partitions.
        The default is ``["xbin", "ybin"]``.
    order : list, optional
        The order of the x and y partitions when writing partitioned datasets.
        The default, ``None``, corresponds to ``levels``.
    flavor : str, optional
        This argument governs the directories names of partitioned datasets.
        The default, ``None``, name the directories with the partitions labels (DirectoryPartitioning).
        The option ``"hive"``, name the directories with the format ``{partition_name}={partition_label}``.
    """

    def __init__(
        self,
        size,
        extent,
        levels=None,
        order=None,
        flavor=None,
        labels_decimals=None,
    ):
        # Validate extent and bin sizes (the last partition may be truncated)
        self.extent = check_extent(extent)
        self.size = _check_size(size)
        # Resolve partition names
        self.levels = check_default_levels(levels=levels, default_levels=["xbin", "ybin"])
        # Compute the partition bounds along each direction
        x_bounds = get_bounds(size=self.size[0], vmin=self.extent.xmin, vmax=self.extent.xmax)
        y_bounds = get_bounds(size=self.size[1], vmin=self.extent.ymin, vmax=self.extent.ymax)
        # Define the number of decimals used for the partition labels
        if labels_decimals is None:
            labels_decimals = (get_n_decimals(self.size[0]) + 1, get_n_decimals(self.size[1]) + 1)
        self._labels_decimals = _check_labels_decimals(labels_decimals)
        # Lazy caches for the per-direction labels
        self._xlabels = None
        self._ylabels = None
        # Initialize the base partitioning
        super().__init__(
            levels=self.levels,
            x_bounds=x_bounds,
            y_bounds=y_bounds,
            order=order,
            flavor=flavor,
        )

    # -----------------------------------------------------------------------------------.
    def _custom_labels_function(self, x_indices, y_indices):
        """Return the partition labels as function of the specified 2D partitions indices."""
        x_dec, y_dec = self._labels_decimals
        x_values = self.x_centroids[x_indices].round(x_dec)
        y_values = self.y_centroids[y_indices].round(y_dec)
        # With 0 decimals, drop the trailing ".0" by casting to int
        if x_dec == 0:
            x_values = x_values.astype(int)
        if y_dec == 0:
            y_values = y_values.astype(int)
        return x_values.astype(str), y_values.astype(str)

    def to_dict(self):
        """Return the partitioning settings."""
        return {
            "partitioning_class": self.__class__.__name__,
            "extent": list(self.extent),
            "size": list(self.size),
            "levels": self.levels,
            "order": self.order,
            "flavor": self.flavor,
            "labels_decimals": list(self._labels_decimals),
        }

    @property
    def x_labels(self):
        """Return the partition labels across the horizontal dimension."""
        if self._xlabels is None:
            labels, _ = self.query_labels_by_indices(
                x_indices=np.arange(0, self.n_x),
                y_indices=np.zeros(self.n_x),
            )
            self._xlabels = labels
        return self._xlabels

    @property
    def y_labels(self):
        """Return the partition labels across the vertical dimension."""
        if self._ylabels is None:
            _, labels = self.query_labels_by_indices(
                x_indices=np.zeros(self.n_y),
                y_indices=np.arange(0, self.n_y),
            )
            self._ylabels = labels
        return self._ylabels
class TilePartitioning(Base2DPartitioning):
    """
    Handles partitioning of data into tiles.

    Parameters
    ----------
    size : int, float, tuple, list
        The size value(s) of the bins.
        The function interprets the input as follows:

        - int or float: The same size is enforced in both x and y directions.
        - tuple or list: The bin size for the x and y directions.

    extent : list
        The extent for the partitioning specified as ``[xmin, xmax, ymin, ymax]``.
    n_levels : int
        The number of tile partitioning levels.
        If ``n_levels=2``, a (x, y) label is assigned to each tile.
        If ``n_levels=1``, a unique id label is assigned to each tile combining the x and y tile indices.
        The ``origin`` and ``direction`` parameters governs its value.
    levels : list, optional
        Names of the partitioning levels.
        If ``n_levels>=2``, the first two names must correspond to the x and y partitions.
        The default with ``n_levels=1`` is ``["tile"]``.
        The default with ``n_levels=2`` is ``["x", "y"]``.
    origin : str, optional
        The origin of the Y axis. Either ``"bottom"`` or ``"top"``.
        TMS tiles assumes ``origin="top"``.
        Google Maps tiles assumes ``origin="bottom"``.
        The default is ``"bottom"``.
    direction : str, optional
        The direction to follow to define tile ids if ``n_levels=1`` is specified.
        Valid direction values are "x" and "y".
        ``direction=x`` numbers the tiles rows by rows.
        ``direction=y`` numbers the tiles columns by columns.
    justify : bool, optional
        Whether to justify the labels to ensure having all same number of characters.
        0 is added on the left side of the labels to justify the length.
        The default is ``False``.
    order : list, optional
        The order of the partitions when writing partitioned datasets.
        The default, ``None``, corresponds to ``levels``.
    flavor : str, optional
        This argument governs the directories names of partitioned datasets.
        The default, ``None``, name the directories with the partitions labels (DirectoryPartitioning).
        The option ``"hive"``, name the directories with the format ``{partition_name}={partition_label}``.
    """

    def __init__(
        self,
        size,
        extent,
        n_levels,
        levels=None,
        origin="bottom",
        direction="x",
        justify=False,
        flavor=None,
        order=None,
    ):
        # Check number of tile levels (only a single id level or an (x, y) pair is supported)
        if n_levels not in [1, 2]:
            raise ValueError("Invalid value for 'n_levels'. Must be 1 or 2.")
        default_levels_dict = {1: "tile", 2: ["x", "y"]}
        levels = check_default_levels(levels=levels, default_levels=default_levels_dict[n_levels])
        if len(levels) != n_levels:
            raise ValueError(f"{n_levels} n_levels specified, but {len(levels)} partition names specified.")
        # Check and set extent
        self.extent = check_extent(extent)
        # Check and set partitions size (except maybe last one)
        self.size = _check_size(size)
        # Calculate partitions bounds
        x_bounds = get_bounds(size=self.size[0], vmin=self.extent.xmin, vmax=self.extent.xmax)
        y_bounds = get_bounds(size=self.size[1], vmin=self.extent.ymin, vmax=self.extent.ymax)
        # Define tiling options
        if direction not in ["x", "y"]:
            raise ValueError("Invalid value for 'direction'. Must be 'x' or 'y'.")
        if origin not in ["top", "bottom"]:
            raise ValueError("Invalid value for 'origin'. Must be 'top' or 'bottom'.")
        self.direction = direction
        self.origin = origin
        self.justify = justify
        # Initialize base 2D partitioning
        super().__init__(
            levels=levels,
            x_bounds=x_bounds,
            y_bounds=y_bounds,
            order=order,
            flavor=flavor,
        )

    # -----------------------------------------------------------------------------------.
    def _custom_labels_function(self, x_indices, y_indices):
        """Return the partition labels for the specified x,y indices based on the direction, origin, and levels."""
        if self.n_levels == 2:
            # Two levels: one (x, y) label pair per tile
            return get_tile_xy_labels(
                x_indices,
                y_indices,
                origin=self.origin,
                n_x=self.n_x,
                n_y=self.n_y,
                justify=self.justify,
            )
        # n_levels == 1: a single id label combining the x and y tile indices
        return get_tile_id_labels(
            x_indices,
            y_indices,
            origin=self.origin,
            direction=self.direction,
            n_x=self.n_x,
            n_y=self.n_y,
            justify=self.justify,
        )

    def to_dict(self):
        """Return the partitioning settings."""
        dictionary = {
            "partitioning_class": self.__class__.__name__,
            "extent": list(self.extent),
            "size": list(self.size),
            "n_levels": self.n_levels,
            "levels": self.levels,
            "origin": self.origin,
            "direction": self.direction,
            "justify": self.justify,
            "order": self.order,
            "flavor": self.flavor,
        }
        return dictionary
class LonLatPartitioning(XYPartitioning):
    """Handles geographic partitioning of data based on longitude and latitude bin sizes within a defined extent.

    The last bin size (in lon and lat direction) might not be of size ``size`` !

    Parameters
    ----------
    size : float
        The uniform size for longitude and latitude binning.
        Carefully consider the size of the partitions.
        Earth partitioning by:

        - 1° degree corresponds to 64800 directories (360*180)
        - 5° degree corresponds to 2592 directories (72*36)
        - 10° degree corresponds to 648 directories (36*18)
        - 15° degree corresponds to 288 directories (24*12)

    levels : list, optional
        Names of the longitude and latitude partitions.
        The default is ``["lon_bin", "lat_bin"]``.
    extent : list, optional
        The geographical extent for the partitioning specified as ``[xmin, xmax, ymin, ymax]``.
        Default is the whole Earth: ``[-180, 180, -90, 90]``.
    order : list, optional
        The order of the partitions when writing partitioned datasets.
        The default, ``None``, corresponds to ``levels``.
    flavor : str, optional
        This argument governs the directories names of partitioned datasets.
        The default, ``"hive"``, names the directories with the format ``{partition_name}={partition_label}``.
        If ``None``, names the directories with the partitions labels (DirectoryPartitioning).

    See Also
    --------
    XYPartitioning
    """

    def __init__(
        self,
        size,
        extent=None,
        levels=None,
        flavor="hive",
        order=None,
        labels_decimals=None,
    ):
        # Use a None sentinel instead of a mutable list default (shared across calls)
        if extent is None:
            extent = [-180, 180, -90, 90]
        levels = check_default_levels(levels=levels, default_levels=["lon_bin", "lat_bin"])
        super().__init__(
            size=size,
            extent=extent,
            levels=levels,
            order=order,
            flavor=flavor,
            labels_decimals=labels_decimals,
        )
        self._x_coord = "lon_c"  # default name for x centroid column for add_centroids
        self._y_coord = "lat_c"  # default name for y centroid column for add_centroids

    def get_partitions_around_point(self, lon, lat, distance=None, size=None):
        """Return the partition labels with data within the distance/size from a point."""
        extent = get_geographic_extent_around_point(
            lon=lon,
            lat=lat,
            distance=distance,
            size=size,
        )
        return self.get_partitions_by_extent(extent=extent)

    def get_partitions_by_country(self, name, padding=None):
        """Return the partition labels enclosing the specified country."""
        extent = get_country_extent(name=name, padding=padding)
        return self.get_partitions_by_extent(extent=extent)

    def get_partitions_by_continent(self, name, padding=None):
        """Return the partition labels enclosing the specified continent."""
        extent = get_continent_extent(name=name, padding=padding)
        return self.get_partitions_by_extent(extent=extent)

    def directories_by_country(self, name, padding=None):
        """Return the directory trees with data within a country."""
        dict_labels = self.get_partitions_by_country(name=name, padding=padding)
        return self._directories(dict_labels=dict_labels)

    def directories_by_continent(self, name, padding=None):
        """Return the directory trees with data within a continent."""
        dict_labels = self.get_partitions_by_continent(name=name, padding=padding)
        return self._directories(dict_labels=dict_labels)

    def directories_around_point(self, lon, lat, distance=None, size=None):
        """Return the directory trees with data within the distance/size from a point."""
        dict_labels = self.get_partitions_around_point(lon=lon, lat=lat, distance=distance, size=size)
        return self._directories(dict_labels=dict_labels)