# -----------------------------------------------------------------------------.
# MIT License
# Copyright (c) 2024 GPM-API developers
#
# This file is part of GPM-API.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------.
"""This module contains tools for zonal statistics computations."""
import numpy as np
import shapely
from shapely.strtree import STRtree
def split_dict(dictionary, npartitions):
"""
    Split a dictionary into ``npartitions`` parts of approximately equal size.

    Parameters
    ----------
    dictionary : dict
        The dictionary to split.
    npartitions : int
        The number of parts to split the dictionary into.

Returns
-------
list_dicts : list
A list of dictionaries.
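
    Examples
    --------
    A minimal sketch with hypothetical inputs:

    >>> split_dict({"a": 1, "b": 2, "c": 3}, npartitions=2)
    [{'a': 1, 'b': 2}, {'c': 3}]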
"""
    # Retrieve the dictionary keys (preserving native key types)
    keys = list(dictionary)
    # Compute the indices of the keys belonging to each partition
    split_indices = np.array_split(np.arange(len(keys)), npartitions)
    # Use the indices to create the sub-dictionaries
    list_dicts = [{keys[i]: dictionary[keys[i]] for i in indices} for indices in split_indices]
return list_dicts
def create_target_dictionary(target_indices, values):
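    """Group ``values`` by target index into a dictionary.

    ``target_indices`` is assumed to be sorted, as returned by STRtree queries.
    For example (hypothetical values), target indices ``[0, 0, 2]`` with values
    ``[10., 11., 12.]`` yield ``{0: [10., 11.], 2: [12.]}`` (values as numpy arrays).
    """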
list_values = np.split(values, np.unique(target_indices, return_index=True)[1])[1:]
target_indices_ids = np.unique(target_indices)
# Create dict_indices {target: values}
dict_indices = dict(zip(target_indices_ids, list_values))
return dict_indices
def create_list_indices(source_polygons, target_polygons):
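    """Return a 2 x n array of target-source intersection indices.

    The first row contains the target polygon indices; the second row contains
    the indices of the source polygons intersecting each target.
    """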
    # Retrieve target-source intersection indices
    # - Indices are not returned for empty or non-intersecting polygons.
    # - The "intersects" predicate also includes touching geometries, whose
    #   intersection degenerates to a LineString (or Point) instead of a Polygon.
str_tree = STRtree(source_polygons)
target_indices, source_indices = str_tree.query(target_polygons, predicate="intersects")
# Check there are some geometries intersecting
if target_indices.size == 0:
raise ValueError("No intersecting geometries.")
# Create list indices
# - first row: target indices
# - second row: source indices
list_indices = np.vstack((target_indices, source_indices))
return list_indices
class InverseDistanceWeights:
"""
Class for calculating weights for Inverse Distance Weighting.
Parameters
----------
order : int, float or numpy.ndarray
The order(s) of the inverse distance weighting.
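
    The weight assigned to a source polygon at centroid distance ``d`` is
    ``1 / d**order``.

    Examples
    --------
    A minimal sketch with hypothetical distances:

    >>> idw = InverseDistanceWeights(order=2)
    >>> idw.set_size(1)
    >>> idw.get_weights(idx=0, distances=np.array([1.0, 2.0, 4.0]))
    array([1.    , 0.25  , 0.0625])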
"""
def __init__(self, order=1):
        if np.any(order < 1) or not np.all(np.isfinite(order)):
            raise ValueError("'order' must contain finite values greater than or equal to 1.")
self.order = np.array(order)
def set_size(self, n):
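        """Expand a scalar ``order`` into an array of size ``n`` (one value per target polygon)."""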
if self.order.ndim == 0:
self.order = np.full(n, self.order)
if np.size(self.order) != n:
raise ValueError(f"Size of 'order' array must be {n}.")
def get_weights(self, idx, distances):
return 1 / np.power(distances, self.order[idx])
class BarnesGaussianWeights:
"""
Class for calculating weights for Barnes Gaussian Weighting.
Parameters
----------
kappa : float or numpy.ndarray
The smoothing parameter(s).
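
    The weight assigned to a source polygon at centroid distance ``d`` is
    ``exp(-d**2 / kappa**2)``.

    Examples
    --------
    A minimal sketch with hypothetical distances:

    >>> barnes = BarnesGaussianWeights(kappa=2.0)
    >>> barnes.set_size(1)
    >>> barnes.get_weights(idx=0, distances=np.array([0.0, 2.0]))
    array([1.        , 0.36787944])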
"""
def __init__(self, kappa):
if np.any(kappa <= 0) or not np.all(np.isfinite(kappa)):
raise ValueError("'kappa' must have finite positive values.")
self.kappa = np.array(kappa)
def set_size(self, n):
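        """Expand a scalar ``kappa`` into an array of size ``n`` (one value per target polygon)."""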
if self.kappa.ndim == 0:
self.kappa = np.full(n, self.kappa)
if len(self.kappa) != n:
raise ValueError(f"Size of 'kappa' array must be {n}.")
def get_weights(self, idx, distances):
return np.exp(-np.power(distances, 2) / np.power(self.kappa[idx], 2))
class CressmanWeights:
"""
Class for calculating weights using Cressman Weighting.
Parameters
----------
max_distance : float or array-like
The maximum allowable distance(s). If scalar, it will be replicated.
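
    The weight assigned to a source polygon at centroid distance ``d`` is
    ``(R**2 - d**2) / (R**2 + d**2)`` with ``R = max_distance``, and 0 beyond
    ``max_distance``.

    Examples
    --------
    A minimal sketch with hypothetical distances:

    >>> cressman = CressmanWeights(max_distance=2.0)
    >>> cressman.set_size(1)
    >>> cressman.get_weights(idx=0, distances=np.array([0.0, 1.0, 3.0]))
    array([1. , 0.6, 0. ])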
"""
def __init__(self, max_distance):
if np.any(max_distance <= 0) or not np.all(np.isfinite(max_distance)):
raise ValueError("'max_distance' must have finite positive values.")
self.max_distance = np.array(max_distance)
def set_size(self, n):
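        """Expand a scalar ``max_distance`` into an array of size ``n`` (one value per target polygon)."""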
if self.max_distance.ndim == 0:
self.max_distance = np.full(n, self.max_distance)
if len(self.max_distance) != n:
raise ValueError(f"Size of 'max_distance' array must be {n}.")
def get_weights(self, idx, distances):
max_distance = self.max_distance[idx]
max_dist_sq = max_distance**2
dist_sq = distances**2
weights = (max_dist_sq - dist_sq) / (max_dist_sq + dist_sq)
weights[distances > max_distance] = 0
return weights
DISTANCE_WEIGHTING_CLASSES = [InverseDistanceWeights, BarnesGaussianWeights, CressmanWeights]
DISTANCE_WEIGHTING_CLASSES_IMPLEMENTED = [cls.__name__ for cls in DISTANCE_WEIGHTING_CLASSES]
class PolyAggregator:
def __init__(self, source_polygons, target_polygons, parallel=False):
"""
Initialize the PolyAggregator.
Parameters
----------
source_polygons : list of shapely.Polygon
List of source polygons.
target_polygons : list of shapely.Polygon
List of target polygons.
        parallel : bool, optional
            Whether to run the aggregation in parallel. Default is False.
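
        Examples
        --------
        A minimal sketch with two hypothetical source boxes and one target:

        >>> import shapely
        >>> sources = [shapely.box(0, 0, 1, 1), shapely.box(1, 0, 2, 1)]
        >>> targets = [shapely.box(0.5, 0, 1.5, 1)]
        >>> aggregator = PolyAggregator(source_polygons=sources, target_polygons=targets)
        >>> aggregator.counts()
        array([2.])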
"""
self.source_polygons = np.array(source_polygons)
self.target_polygons = np.array(target_polygons)
self.parallel = parallel
self.list_indices = create_list_indices(
source_polygons=self.source_polygons,
target_polygons=self.target_polygons,
)
self.dict_indices = create_target_dictionary(target_indices=self.list_indices[0], values=self.list_indices[1])
# Fields to be optionally populated
self._source_polygons_areas = None
self._target_polygons_areas = None
self._target_centroids = None
self._source_centroids = None
self._list_intersection_areas = None
self._list_distances = None
self._dict_intersection_areas = None
self._dict_distances = None
def _update_aggregator(self, valid_indices):
# Update list indices
self.list_indices = self.list_indices[:, valid_indices]
# Update dict indices
self.dict_indices = create_target_dictionary(target_indices=self.list_indices[0], values=self.list_indices[1])
if self._list_intersection_areas is not None:
self._list_intersection_areas = self._list_intersection_areas[valid_indices]
self._dict_intersection_areas = create_target_dictionary(
target_indices=self.list_indices[0],
values=self._list_intersection_areas,
)
if self._list_distances is not None:
self._list_distances = self._list_distances[valid_indices]
self._dict_distances = create_target_dictionary(
target_indices=self.list_indices[0],
values=self._list_distances,
)
@property
def target_polygons_areas(self):
if self._target_polygons_areas is None:
self._target_polygons_areas = shapely.area(self.target_polygons)
return self._target_polygons_areas
@property
def source_polygons_areas(self):
if self._source_polygons_areas is None:
self._source_polygons_areas = shapely.area(self.source_polygons)
return self._source_polygons_areas
@property
def target_centroids(self):
if self._target_centroids is None:
self._target_centroids = shapely.centroid(self.target_polygons)
return self._target_centroids
@property
def source_centroids(self):
if self._source_centroids is None:
self._source_centroids = shapely.centroid(self.source_polygons)
return self._source_centroids
@property
def list_distances(self):
if self._list_distances is None:
# Compute distance between polygon centroids
distance = shapely.distance(
self.target_centroids[self.list_indices[0]],
self.source_centroids[self.list_indices[1]],
)
self._list_distances = distance
return self._list_distances
@property
def list_intersection_areas(self):
if self._list_intersection_areas is None:
self._list_intersection_areas = self._compute_list_intersection_areas()
return self._list_intersection_areas
@property
def dict_intersection_areas(self):
if self._dict_intersection_areas is None:
self._dict_intersection_areas = create_target_dictionary(
target_indices=self.list_indices[0],
values=self.list_intersection_areas,
)
return self._dict_intersection_areas
@property
def dict_distances(self):
if self._dict_distances is None:
self._dict_distances = create_target_dictionary(
target_indices=self.list_indices[0],
values=self.list_distances,
)
return self._dict_distances
def _compute_list_intersection_areas(self):
# Compute intersection polygons
intersection_geom = shapely.intersection(
self.target_polygons[self.list_indices[0]],
self.source_polygons[self.list_indices[1]],
)
        # Keep only areal intersections (Polygons); touching polygons produce LineString intersections
valid_intersection = np.array([geom.geom_type == "Polygon" for geom in intersection_geom])
# Update aggregator lists and dictionaries
self._update_aggregator(valid_indices=valid_intersection)
# Compute intersection areas
valid_intersection_geom = intersection_geom[valid_intersection]
areas = shapely.area(valid_intersection_geom)
return areas
@property
def n_target_polygons(self):
return self.target_polygons.size
@property
def n_source_polygons(self):
return self.source_polygons.size
@property
def target_intersecting_indices(self):
return np.array(list(self.dict_indices.keys()))
@property
def target_non_intersecting_indices(self):
intersecting_indices = self.target_intersecting_indices
target_indices = np.arange(0, self.n_target_polygons)
return target_indices[np.isin(target_indices, intersecting_indices, invert=True)]
def _normalize_weights(self, weights):
"""
Normalize the weights.
Parameters
----------
weights : array-like
Array of weights.
Returns
-------
array
Normalized weights.
"""
total = np.sum(weights)
if total == 0:
return np.zeros_like(weights)
return weights / total
def _agg(self, func, target_idx, values, weights, area_weighting=True, distance_weighting=False, skip_na=True):
source_shape = values.shape
# Retrieve area_weights
if area_weighting:
area_weights = self.dict_intersection_areas[target_idx]
else:
area_weights = np.ones_like(values[:, 0], dtype="float")
# Retrieve distance_weights
if distance_weighting:
distance_weights = distance_weighting.get_weights(idx=target_idx, distances=self.dict_distances[target_idx])
else:
distance_weights = np.ones_like(values[:, 0], dtype="float")
        # Discard NaN and Inf values if requested
if skip_na:
is_valid = np.all(np.isfinite(values), axis=1)
values = values[is_valid]
weights = weights[is_valid]
distance_weights = distance_weights[is_valid]
area_weights = area_weights[is_valid]
# Normalize weights
area_weights = self._normalize_weights(area_weights)
distance_weights = self._normalize_weights(distance_weights)
weights = self._normalize_weights(weights)
        # Combine the weights multiplicatively and renormalize
        # --> Weighting parameters for the custom weights could potentially be provided here
list_weights = [weights, area_weights, distance_weights]
combined_weights = self._normalize_weights(np.prod(list_weights, axis=0))
        # Handle the case of no valid values or all-zero weights
if values.size == 0 or np.sum(combined_weights) == 0:
values = np.ones((1, source_shape[1]), dtype="float") * np.nan
combined_weights = np.array([np.nan])
# Squeeze to 1D if possible
values = np.atleast_1d(np.squeeze(values)) # or pass axis to func !
# Aggregate
result = func(values, weights=combined_weights)
return result
def apply(self, func, values, weights=None, area_weighting=True, distance_weighting=False, skip_na=True):
"""
Apply a custom aggregation function to the data.
Parameters
----------
func : callable
Aggregation function to apply.
        values : list or array-like
Array of source values to aggregate.
weights : list or array-like, optional
Array of source weights. Default is None.
area_weighting : bool, optional
Whether to weight by the intersection area. Default is True.
        distance_weighting : bool or distance weighting class instance, optional
            Whether to weight by the distance between polygon centroids. Default is False.
            Accepted classes are InverseDistanceWeights, BarnesGaussianWeights and CressmanWeights.
skip_na : bool, optional
Whether to discard NaN values before applying the aggregation function. Default is True.
Returns
-------
list
List of aggregated values for each target polygon.
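
        Examples
        --------
        A minimal sketch aggregating two hypothetical source values onto two targets:

        >>> import shapely
        >>> sources = [shapely.box(0, 0, 1, 1), shapely.box(1, 0, 2, 1)]
        >>> targets = [shapely.box(0, 0, 0.5, 1), shapely.box(0.5, 0, 2, 1)]
        >>> aggregator = PolyAggregator(source_polygons=sources, target_polygons=targets)
        >>> aggregator.apply(np.average, values=[1.0, 3.0])
        array([1.        , 2.33333333])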
"""
# Check inputs
values = np.asanyarray(values)
values = np.atleast_2d(values).T
expected_nvalues = len(self.source_polygons)
if values.shape[0] != expected_nvalues:
raise ValueError(f"Expecting {expected_nvalues} values. Got {values.shape[0]}.")
# Check custom weights
if weights is None:
weights = np.ones_like(values[:, 0], dtype="float")
weights = np.asanyarray(weights)
if len(weights) != expected_nvalues:
raise ValueError(f"Expecting {expected_nvalues} weights values. Got {len(weights)}.")
# Check distance_weighting
if distance_weighting:
if not isinstance(distance_weighting, tuple(DISTANCE_WEIGHTING_CLASSES)):
raise ValueError(
f"The accepted distance_weighting classes are {DISTANCE_WEIGHTING_CLASSES_IMPLEMENTED}",
)
distance_weighting.set_size(len(self.target_polygons))
# Aggregate intersecting geometries
results = [
self._agg(
func,
target_idx=target_idx,
values=values[source_indices],
weights=weights[source_indices],
area_weighting=area_weighting,
distance_weighting=distance_weighting,
skip_na=skip_na,
)
for target_idx, source_indices in self.dict_indices.items()
]
        out = np.atleast_1d(np.vstack(results).squeeze())  # atleast_1d guards the single-target case
# Add missing values for non intersecting geometries
if out.ndim == 1:
arr = np.zeros(self.n_target_polygons) * np.nan
arr = arr.astype(out.dtype) # ensure datetime becomes NaT
arr[self.target_intersecting_indices] = out
elif out.ndim == 2:
shape = (self.n_target_polygons, out.shape[1])
arr = np.zeros(shape) * np.nan
arr = arr.astype(out.dtype) # ensure datetime becomes NaT
arr[self.target_intersecting_indices, :] = out
else:
raise NotImplementedError
return arr
def fraction_covered_area(self):
"""Compute the fraction of covered area of each target polygon by the source polygons."""
# Compute fraction covered area
target_areas = shapely.area(self.target_polygons.take(list(self.dict_intersection_areas)))
out = [
np.round(np.sum(intersection_areas) / target_areas[i], 6)
for i, intersection_areas in enumerate(self.dict_intersection_areas.values())
]
# Infill for no intersection with target polygons
arr = np.zeros(self.n_target_polygons) * np.nan
arr[self.target_intersecting_indices] = out
return arr
def counts(self):
"""Compute the number of source polygons intersecting each target polygon."""
out = [len(indices) for indices in self.dict_indices.values()]
# Infill for no intersection with target polygons
arr = np.zeros(self.n_target_polygons) * np.nan
arr[self.target_intersecting_indices] = out
return arr
def first(self, values, skip_na=True):
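        """Return the first source value intersecting each target polygon."""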
def func(values, weights="dummy"): # noqa
return values[0]
return self.apply(func=func, values=values, weights=None, area_weighting=False, skip_na=skip_na)
def sum(self, values, weights=None, area_weighting=True, distance_weighting=False, skip_na=True):
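        """Compute the weighted sum of the source values for each target polygon."""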
def func(values, weights):
return np.sum(values * weights)
return self.apply(
func=func,
values=values,
weights=weights,
area_weighting=area_weighting,
distance_weighting=distance_weighting,
skip_na=skip_na,
)
def average(self, values, weights=None, area_weighting=True, distance_weighting=False, skip_na=True):
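        """Compute the weighted average of the source values for each target polygon."""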
def func(values, weights):
return np.average(values, weights=weights)
return self.apply(
func=func,
values=values,
weights=weights,
area_weighting=area_weighting,
distance_weighting=distance_weighting,
skip_na=skip_na,
)
def mean(self, values, weights=None, area_weighting=True, distance_weighting=False, skip_na=True):
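        """Alias for :py:meth:`average`."""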
return self.average(
values=values,
weights=weights,
area_weighting=area_weighting,
distance_weighting=distance_weighting,
skip_na=skip_na,
)
def var(self, values, weights=None, area_weighting=True, distance_weighting=False, skip_na=True):
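        """Compute the weighted variance of the source values for each target polygon.

        The (biased) weighted variance is ``average((values - mean)**2, weights=weights)``,
        with ``mean`` the weighted average of the values.
        """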
def func(values, weights):
mean = np.average(values, weights=weights)
return np.average((values - mean) ** 2, weights=weights)
return self.apply(
func=func,
values=values,
weights=weights,
area_weighting=area_weighting,
distance_weighting=distance_weighting,
skip_na=skip_na,
)
def std(self, values, weights=None, area_weighting=True, distance_weighting=False, skip_na=True):
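        """Compute the weighted standard deviation of the source values for each target polygon."""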
return np.sqrt(
self.var(
values=values,
weights=weights,
area_weighting=area_weighting,
distance_weighting=distance_weighting,
skip_na=skip_na,
),
)
def max(self, values, skip_na=True):
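        """Compute the maximum of the source values for each target polygon."""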
def func(values, weights="dummy"): # noqa
return np.max(values)
return self.apply(func=func, values=values, weights=None, area_weighting=False, skip_na=skip_na)
def min(self, values, skip_na=True):
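        """Compute the minimum of the source values for each target polygon."""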
def func(values, weights="dummy"): # noqa
return np.min(values)
return self.apply(func=func, values=values, weights=None, area_weighting=False, skip_na=skip_na)