Source code for gpm.bucket.io

# -----------------------------------------------------------------------------.
# MIT License

# Copyright (c) 2024 GPM-API developers
#
# This file is part of GPM-API.

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

# -----------------------------------------------------------------------------.
"""This module provide utilities to search GPM Geographic Buckets files."""
import importlib
import os

from gpm.utils.directories import get_filepaths_by_path, get_filepaths_within_paths
from gpm.utils.yaml import read_yaml, write_yaml


def read_bucket_info(bucket_dir):
    """Read the ``bucket_info.yaml`` file located in the bucket directory.

    Parameters
    ----------
    bucket_dir : str
        Path of the bucket base directory. The directory is created if it
        does not exist yet.

    Returns
    -------
    bucket_info
        The content of ``bucket_info.yaml`` as returned by ``read_yaml``.
    """
    # NOTE(review): creating the directory inside a read function looks
    # unintended (the subsequent read cannot succeed in a fresh directory);
    # kept as is because callers may rely on it - confirm.
    os.makedirs(bucket_dir, exist_ok=True)
    info_filepath = os.path.join(bucket_dir, "bucket_info.yaml")
    return read_yaml(filepath=info_filepath)
def get_bucket_partitioning(bucket_dir):
    """Instantiate the partitioning object described in ``bucket_info.yaml``.

    Parameters
    ----------
    bucket_dir : str
        Path of the bucket base directory.

    Returns
    -------
    partitioning
        Instance of the class of ``gpm.bucket.partitioning`` named by the
        ``partitioning_class`` entry of the bucket info.
    """
    class_kwargs = read_bucket_info(bucket_dir)
    # The "partitioning_class" entry names the class to instantiate.
    # Every remaining entry is passed to its constructor as a keyword argument.
    class_name = class_kwargs.pop("partitioning_class")
    partitioning_module = importlib.import_module("gpm.bucket.partitioning")
    partitioning_class = getattr(partitioning_module, class_name)
    return partitioning_class(**class_kwargs)
def write_bucket_info(bucket_dir, partitioning):
    """Write the partitioning description to ``bucket_info.yaml``.

    Parameters
    ----------
    bucket_dir : str
        Path of the bucket base directory. It is created if it does not exist.
    partitioning
        Object exposing a ``to_dict()`` method whose result is written to disk.
    """
    os.makedirs(bucket_dir, exist_ok=True)
    info_filepath = os.path.join(bucket_dir, "bucket_info.yaml")
    # sort_keys=False is forwarded to write_yaml together with the dictionary
    write_yaml(partitioning.to_dict(), filepath=info_filepath, sort_keys=False)
####------------------------------------------------------------------------------------------------------------------.
#### Bucket partitions utilities
def get_exisiting_partitions_paths(bucket_dir, dir_trees):
    """Get the paths of the bucket partition directories existing on disk.

    Parameters
    ----------
    bucket_dir : str
        Path of the bucket base directory.
    dir_trees : iterable of str
        Partition directory trees, relative to ``bucket_dir``.

    Returns
    -------
    list of str
        ``bucket_dir`` joined with each directory tree, restricted to the
        paths that are existing directories. The order of ``dir_trees`` is
        preserved.
    """
    candidate_paths = (os.path.join(bucket_dir, dir_tree) for dir_tree in dir_trees)
    # Only directories are valid partitions: a regular file that happens to
    # sit at a partition path must not be reported as a partition.
    return [path for path in candidate_paths if os.path.isdir(path)]
def get_partitions_paths(bucket_dir):
    """Get the paths of the bucket partitions existing on disk.

    Parameters
    ----------
    bucket_dir : str
        Path of the bucket base directory.

    Returns
    -------
    list of str
        The existing partition paths, as selected by
        ``get_exisiting_partitions_paths`` among the directories of the
        bucket partitioning.
    """
    bucket_partitioning = get_bucket_partitioning(bucket_dir=bucket_dir)
    return get_exisiting_partitions_paths(bucket_dir, bucket_partitioning.directories)
def get_filepaths(bucket_dir, parallel=True, file_extension=None, glob_pattern=None, regex_pattern=None):
    """Return the filepaths matching the specified filename filtering criteria.

    Parameters
    ----------
    bucket_dir : str
        Path of the bucket base directory.
    parallel : bool, optional
        Forwarded unchanged to ``get_filepaths_within_paths``. The default is ``True``.
    file_extension, glob_pattern, regex_pattern : optional
        Filename filtering criteria, forwarded unchanged to
        ``get_filepaths_within_paths``. The default is ``None``.

    Returns
    -------
    filepaths
        The result of ``get_filepaths_within_paths`` applied to the bucket
        partitions existing on disk.
    """
    # Search only within the partitions that currently exist on disk
    existing_partitions = get_partitions_paths(bucket_dir)
    filtering_criteria = {
        "file_extension": file_extension,
        "glob_pattern": glob_pattern,
        "regex_pattern": regex_pattern,
    }
    return get_filepaths_within_paths(paths=existing_partitions, parallel=parallel, **filtering_criteria)
def get_filepaths_by_partition(bucket_dir, parallel=True, file_extension=None, glob_pattern=None, regex_pattern=None):
    """Return a dictionary with the filepaths of each bucket partition.

    Parameters
    ----------
    bucket_dir : str
        Path of the bucket base directory.
    parallel : bool, optional
        Forwarded unchanged to ``get_filepaths_by_path``. The default is ``True``.
    file_extension, glob_pattern, regex_pattern : optional
        Filename filtering criteria, forwarded unchanged to
        ``get_filepaths_by_path``. The default is ``None``.

    Returns
    -------
    dict
        Keys are built from the last ``n_levels`` components of each partition
        path (joined with ``os.path.sep``); values are the corresponding
        entries returned by ``get_filepaths_by_path``.
    """
    bucket_partitioning = get_bucket_partitioning(bucket_dir=bucket_dir)
    n_levels = bucket_partitioning.n_levels
    existing_partitions = get_exisiting_partitions_paths(bucket_dir, bucket_partitioning.directories)
    filepaths_by_path = get_filepaths_by_path(
        paths=existing_partitions,
        parallel=parallel,
        file_extension=file_extension,
        glob_pattern=glob_pattern,
        regex_pattern=regex_pattern,
    )
    # Re-key the dictionary: drop the bucket base directory from each path and
    # keep only the trailing partition levels.
    sep = os.path.sep
    dict_partition_files = {}
    for partition_path, partition_filepaths in filepaths_by_path.items():
        path_components = partition_path.strip(sep).split(sep)
        partition_key = sep.join(path_components[-n_levels:])
        dict_partition_files[partition_key] = partition_filepaths
    return dict_partition_files