Source code for gpm.bucket.routines

# -----------------------------------------------------------------------------.
# MIT License

# Copyright (c) 2024 GPM-API developers
#
# This file is part of GPM-API.

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

# -----------------------------------------------------------------------------.
"""This module provides the routines for the creation of GPM Geographic Buckets."""
import os
import time

import dask
import numpy as np
import pyarrow as pa
import pyarrow.dataset
import pyarrow.parquet as pq
from tqdm import tqdm

from gpm.bucket.io import get_bucket_partitioning, get_filepaths_by_partition, write_bucket_info
from gpm.bucket.writers import preprocess_writer_kwargs, write_dataset_metadata, write_partitioned_dataset
from gpm.io.info import group_filepaths
from gpm.utils.dask import clean_memory, get_client
from gpm.utils.parallel import compute_list_delayed
from gpm.utils.timing import print_task_elapsed_time

####--------------------------------------------------------------------------------------------------.
#### Bucket Granules


def split_list_in_blocks(values, block_size):
    return [values[i : i + block_size] for i in range(0, len(values), block_size)]
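
# Example (illustrative): split_list_in_blocks(list(range(5)), block_size=2)
# returns [[0, 1], [2, 3], [4]].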


def write_granule_bucket(
    src_filepath,
    bucket_dir,
    partitioning,
    granule_to_df_func,
    x="lon",
    y="lat",
    # Writer kwargs
    **writer_kwargs,
):
    """Write a geographically partitioned Parquet Dataset of a GPM granule.

    Parameters
    ----------
    src_filepath : str
        File path of the granule to store in the bucket archive.
    bucket_dir : str
        Base directory of the per-granule bucket archive.
    partitioning : gpm.bucket.SpatialPartitioning
        A spatial partitioning class.
    granule_to_df_func : callable
        Function taking a granule filepath, opening it and returning a pandas or dask dataframe.
    x : str
        The name of the x column. The default is "lon".
    y : str
        The name of the y column. The default is "lat".
    **writer_kwargs : dict
        Optional arguments to be passed to the pyarrow Dataset Writer.
        Common arguments are ``format`` and ``use_threads``.
        The default file ``format`` is ``"parquet"``.
        The default ``use_threads`` is ``True``, which enables multithreaded file writing.
        More information available at https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html

    """
    # Define a unique filename prefix for the files added to the bucket archive
    # - This prevents the risk of overwriting existing files
    # - If df is a pandas.DataFrame --> f"{filename_prefix}_" + "{i}.parquet"
    # - If df is a dask.DataFrame   --> f"{filename_prefix}_dask.partition_{part_index}"
    filename_prefix = os.path.splitext(os.path.basename(src_filepath))[0]

    # Retrieve dataframe
    df = granule_to_df_func(src_filepath)

    # Add partitioning columns
    df = partitioning.add_labels(df=df, x=x, y=y)

    # Write partitioned dataframe
    write_partitioned_dataset(
        df=df,
        base_dir=bucket_dir,
        filename_prefix=filename_prefix,
        partitions=partitioning.order,
        partitioning_flavor=partitioning.flavor,
        **writer_kwargs,
    )
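
# Example usage (illustrative sketch, not executed): the paths and the ``granule_to_df``
# converter are hypothetical placeholders, and ``LonLatPartitioning`` is assumed to be
# importable from ``gpm.bucket``.
#
#   from gpm.bucket import LonLatPartitioning
#
#   partitioning = LonLatPartitioning(size=5)  # 5° x 5° global partitions
#   write_granule_bucket(
#       src_filepath="/data/granules/granule_001.HDF5",
#       bucket_dir="/data/bucket_by_granule",
#       partitioning=partitioning,
#       granule_to_df_func=granule_to_df,  # user-defined: filepath -> pandas/dask dataframe
#   )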


def _try_write_granule_bucket(**kwargs):
    try:
        # Run the write synchronously (single-threaded dask scheduler)
        with dask.config.set(scheduler="single-threaded"):
            write_granule_bucket(**kwargs)
        # If it works, return None
        info = None
    except Exception as e:
        # Define tuple to return
        src_filepath = kwargs["src_filepath"]
        info = src_filepath, str(e)
    return info


@print_task_elapsed_time(prefix="Granules Bucketing Operation Terminated.")
def write_granules_bucket(
    filepaths,
    # Bucket configuration
    bucket_dir,
    partitioning,
    granule_to_df_func,
    # Processing options
    parallel=True,
    max_concurrent_tasks=None,
    max_dask_total_tasks=500,
    # Writer kwargs
    use_threads=False,
    row_group_size="500MB",
    **writer_kwargs,
):
    """Write a geographically partitioned Parquet Dataset of GPM granules.

    Parameters
    ----------
    filepaths : list of str
        File paths of the GPM granules to store in the bucket archive.
    bucket_dir : str
        Base directory of the per-granule bucket archive.
    partitioning : gpm.bucket.SpatialPartitioning
        A spatial partitioning class.
        Carefully consider the size of the partitions.
        Earth partitioning by:
        - 1° corresponds to 64800 directories (360*180)
        - 5° corresponds to 2592 directories (72*36)
        - 10° corresponds to 648 directories (36*18)
        - 15° corresponds to 288 directories (24*12)
    granule_to_df_func : callable
        Function taking a granule filepath, opening it and returning a pandas or dask dataframe.
    parallel : bool
        Whether to bucket several granules in parallel.
        The default is ``True``.
    max_concurrent_tasks : int
        The maximum number of Dask tasks to be concurrently executed.
        If ``None``, let the Dask scheduler choose.
        The default is ``None``.
    max_dask_total_tasks : int
        The maximum number of Dask tasks to be scheduled at once.
        The default is 500.
    use_threads : bool
        Whether to write Parquet files with multiple threads.
        If bucketing granules in a multiprocessing environment, it's better to set it to ``False``.
        The default is ``False``.
    row_group_size : int or str, optional
        Maximum number of rows in each written Parquet row group.
        If specified as a string (e.g. ``"500MB"``), the equivalent number of rows is estimated.
        The default is ``"500MB"``.
    **writer_kwargs : dict
        Optional arguments to be passed to the pyarrow Dataset Writer.
        Common arguments are ``format`` and ``use_threads``.
        The default file ``format`` is ``"parquet"``.
        The default ``use_threads`` is ``True``, which enables multithreaded file writing.
        More information available at https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html

    """
    # Define writer kwargs
    writer_kwargs["row_group_size"] = row_group_size
    writer_kwargs["use_threads"] = use_threads

    # Write down the information of the bucket
    write_bucket_info(bucket_dir=bucket_dir, partitioning=partitioning)

    # Split the long list of files in blocks
    list_blocks = split_list_in_blocks(filepaths, block_size=max_dask_total_tasks)

    # Execute tasks by blocks to avoid dask overhead
    n_blocks = len(list_blocks)

    for i, block_filepaths in enumerate(list_blocks):
        print(f"Executing tasks block {i+1}/{n_blocks}")

        # Loop over granules
        func = dask.delayed(_try_write_granule_bucket) if parallel else _try_write_granule_bucket

        list_results = [
            func(
                src_filepath=src_filepath,
                bucket_dir=bucket_dir,
                partitioning=partitioning,
                granule_to_df_func=granule_to_df_func,
                # Writer kwargs
                **writer_kwargs,
            )
            for src_filepath in block_filepaths
        ]

        # If delayed, execute the tasks
        if parallel:
            list_results = compute_list_delayed(
                list_results,
                max_concurrent_tasks=max_concurrent_tasks,
            )

        # Process results to detect errors
        list_errors = [error_info for error_info in list_results if error_info is not None]
        for src_filepath, error_str in list_errors:
            print(f"An error occurred while processing {src_filepath}: {error_str}")

        # If parallel=True, retrieve the client, clean the memory and restart
        if parallel:
            client = get_client()
            clean_memory(client)
            client.restart()
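
# Example usage (illustrative sketch, not executed): ``granule_filepaths``, the bucket
# directory and the ``granule_to_df`` converter are hypothetical placeholders;
# ``partitioning`` is any ``gpm.bucket.SpatialPartitioning`` instance.
#
#   write_granules_bucket(
#       filepaths=granule_filepaths,
#       bucket_dir="/data/bucket_by_granule",
#       partitioning=partitioning,
#       granule_to_df_func=granule_to_df,
#       parallel=True,
#       max_dask_total_tasks=500,
#   )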


####--------------------------------------------------------------------------------------------------.
#### Bucket DataFrame


@print_task_elapsed_time(prefix="Dataset Bucket Operation Terminated.")
def write_bucket(
    df,
    bucket_dir,
    partitioning,
    x="lon",
    y="lat",
    # Writer arguments
    filename_prefix="part",
    row_group_size="500MB",
    **writer_kwargs,
):
    """Write a geographically partitioned Parquet Dataset.

    Parameters
    ----------
    df : pandas.DataFrame or dask.dataframe.DataFrame
        Pandas or Dask dataframe to be written into a geographic bucket.
    bucket_dir : str
        Base directory of the geographic bucket archive.
    partitioning : gpm.bucket.SpatialPartitioning
        A spatial partitioning class.
        Carefully consider the size of the partitions.
        Earth partitioning by:
        - 1° corresponds to 64800 directories (360*180)
        - 5° corresponds to 2592 directories (72*36)
        - 10° corresponds to 648 directories (36*18)
        - 15° corresponds to 288 directories (24*12)
    x : str
        The name of the x column. The default is "lon".
    y : str
        The name of the y column. The default is "lat".
    filename_prefix : str, optional
        Prefix of the written Parquet files. The default is "part".
    row_group_size : int or str, optional
        Maximum number of rows in each written Parquet row group.
        If specified as a string (e.g. ``"500MB"``), the equivalent number of rows is estimated.
        The default is ``"500MB"``.
    **writer_kwargs : dict
        Optional arguments to be passed to the pyarrow Dataset Writer.
        Common arguments are ``format`` and ``use_threads``.
        The default file ``format`` is ``"parquet"``.
        The default ``use_threads`` is ``True``, which enables multithreaded file writing.
        More information available at https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html

    """
    # Write down the information of the bucket
    write_bucket_info(
        bucket_dir=bucket_dir,
        partitioning=partitioning,
    )

    # Add partitioning columns
    df = partitioning.add_labels(df=df, x=x, y=y)

    # Write bucket
    writer_kwargs["row_group_size"] = row_group_size
    write_partitioned_dataset(
        df=df,
        base_dir=bucket_dir,
        partitions=partitioning.order,
        partitioning_flavor=partitioning.flavor,
        filename_prefix=filename_prefix,
        **writer_kwargs,
    )
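
# Example usage (illustrative sketch, not executed): the dataframe and the bucket
# directory are hypothetical, and ``LonLatPartitioning`` is assumed to be importable
# from ``gpm.bucket``.
#
#   import pandas as pd
#   from gpm.bucket import LonLatPartitioning
#
#   df = pd.DataFrame({"lon": [10.2, -45.8], "lat": [46.1, -12.3], "value": [1.0, 2.0]})
#   write_bucket(
#       df=df,
#       bucket_dir="/data/bucket",
#       partitioning=LonLatPartitioning(size=10),  # 10° x 10° partitions (648 directories)
#   )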


####--------------------------------------------------------------------------------------------------.
#### Merge Granules


@print_task_elapsed_time(prefix="Bucket Merging Terminated.")
def merge_granule_buckets(
    src_bucket_dir,
    dst_bucket_dir,
    force=False,
    row_group_size="400MB",
    max_file_size="1GB",
    compression="snappy",
    compression_level=None,
    write_metadata=False,
    write_statistics=False,
    # Computing options
    max_open_files=0,
    use_threads=True,
    # Scanner options
    batch_size=131_072,
    batch_readahead=16,
    fragment_readahead=4,
):
    """Merge the per-granule bucket archive into a single optimized archive.

    Set ``ulimit -n 999999`` before running this routine!

    Parameters
    ----------
    src_bucket_dir : str
        Base directory of the per-granule bucket archive.
    dst_bucket_dir : str
        Directory path of the final bucket archive.
    force : bool, optional
        Whether to reprocess partitions already present in ``dst_bucket_dir``.
        If ``False`` (the default), existing partition directories are skipped.
    row_group_size : int or str, optional
        Maximum number of rows to be written in each Parquet row group.
        If specified as a string (e.g. ``"400MB"``), the equivalent number of rows is estimated.
        The default is ``"400MB"``.
    max_file_size : int or str, optional
        Maximum number of rows to be written in a Parquet file.
        If specified as a string, the equivalent number of rows is estimated.
        Ideally the value should be a multiple of ``row_group_size``.
        The default is ``"1GB"``.
    compression : str, optional
        Specify the compression codec, either on a general basis or per-column.
        Valid values: ``{"none", "snappy", "gzip", "brotli", "lz4", "zstd"}``.
        The default is ``"snappy"``.
    compression_level : int or dict, optional
        Specify the compression level for a codec, either on a general basis or per-column.
        If ``None`` is passed, arrow selects the compression level for the compression codec in use.
        The compression level has a different meaning for each codec, so you have to read the
        pyarrow documentation of the codec you are using at
        https://arrow.apache.org/docs/python/generated/pyarrow.Codec.html
        The default is ``None``.
    max_open_files : int, optional
        If greater than 0, this limits the maximum number of files that can be left open.
        If an attempt is made to open too many files, the least recently used file will be closed.
        If this setting is set too low, you may end up fragmenting your data into many small files.
        The default is ``0``.
        Note that Linux has a default limit of ``1024``. Before starting the python session,
        increase it with ``ulimit -n <new_much_higher_limit>``.
    use_threads : bool, optional
        If enabled, maximum parallelism will be used to read and write files (in multithreading).
        The number of threads is determined by the number of available CPU cores.
        The default is ``True``.
    batch_size : int, optional
        The maximum row count of scanned record batches.
        If scanned record batches are overflowing memory, this value can be reduced to reduce
        the memory usage.
        The default value is ``131_072``.
    batch_readahead : int, optional
        The number of batches to read ahead in a file.
        Increasing this number will increase RAM usage but could also improve IO utilization.
        The default is ``16``.
    fragment_readahead : int, optional
        The number of files to read ahead.
        Increasing this number will increase RAM usage but could also improve IO utilization.
        The default is ``4``.

    Returns
    -------
    None.

    """
    # Retrieve partitioning class
    partitioning = get_bucket_partitioning(bucket_dir=src_bucket_dir)

    # Identify Parquet filepaths for each bin
    print("Searching for Parquet files has started.")
    t_i = time.time()
    dict_partition_files = get_filepaths_by_partition(src_bucket_dir, parallel=True)
    n_geographic_bins = len(dict_partition_files)
    t_f = time.time()
    t_elapsed = round((t_f - t_i) / 60, 1)
    print(f"Searching for Parquet files ended. Elapsed time: {t_elapsed} minutes.")
    print(f"{n_geographic_bins} geographic partitions to process.")

    # Retrieve list of partitions
    list_partitions = list(dict_partition_files.keys())

    # Write the new partitioning class
    # TODO: maybe add an option to provide a new partitioning to this routine!
    # --> Would require loading the data of a partition into memory (instead of using a scanner)!
    # --> Check that the new partitioning is aligned with and a subset of the original partitioning?
    write_bucket_info(bucket_dir=dst_bucket_dir, partitioning=partitioning)

    # -----------------------------------------------------------------------------------------------.
    # Retrieve table schema
    # - partitioning.levels are read by pq.read_table as dictionaries (depending on pyarrow version)
    # - partitioning.levels columns must be dropped from the table if present
    template_filepath = dict_partition_files[list_partitions[0]][0]
    template_table = pq.read_table(template_filepath)
    if np.all(np.isin(partitioning.levels, template_table.column_names)):
        template_table = template_table.drop_columns(partitioning.levels)
    schema = template_table.schema

    # Define writer_kwargs
    writer_kwargs = {}
    writer_kwargs["row_group_size"] = row_group_size
    writer_kwargs["max_file_size"] = max_file_size
    writer_kwargs["compression"] = compression
    writer_kwargs["compression_level"] = compression_level
    writer_kwargs["max_open_files"] = max_open_files
    writer_kwargs["use_threads"] = use_threads
    writer_kwargs["write_metadata"] = write_metadata
    writer_kwargs["write_statistics"] = write_statistics
    writer_kwargs, metadata_collector = preprocess_writer_kwargs(
        writer_kwargs=writer_kwargs,
        df=template_table,
    )

    # -----------------------------------------------------------------------------------------------.
    # Concatenate data within bins
    # - Cannot rewrite the full pyarrow.dataset directly because there is no way to specify when
    #   the data of each partition have been scanned completely (and can be written to disk)
    print("Start concatenating the granules bucket archive")

    # partition_label = "latbin=0/lonbin=10"
    # filepaths = dict_partition_files[partition_label]

    n_partitions = len(dict_partition_files)
    for partition_label, filepaths in tqdm(dict_partition_files.items(), total=n_partitions):
        partition_dir = os.path.join(dst_bucket_dir, partition_label)

        # Decide whether to skip the partition
        # - TODO: search which years are already there and only add the remaining ones
        if not force and os.path.exists(partition_dir):
            continue

        # Group the partition files by year
        year_dict = group_filepaths(filepaths, groups="year")
        for year, year_filepaths in year_dict.items():
            basename_template = f"{year}_" + "{i}.parquet"

            # Read Dataset
            dataset = pyarrow.dataset.dataset(year_filepaths, format="parquet")

            # Define scanner
            scanner = dataset.scanner(
                batch_size=batch_size,
                batch_readahead=batch_readahead,
                fragment_readahead=fragment_readahead,
                use_threads=use_threads,
            )

            # Rewrite dataset
            pa.dataset.write_dataset(
                scanner,
                base_dir=partition_dir,
                basename_template=basename_template,
                # Directory options
                create_dir=True,
                existing_data_behavior="overwrite_or_ignore",
                # Options
                **writer_kwargs,
            )

    # Write the dataset metadata if collected
    if metadata_collector:
        write_dataset_metadata(base_dir=dst_bucket_dir, metadata_collector=metadata_collector, schema=schema)
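
# Example usage (illustrative sketch, not executed; the directories are hypothetical
# placeholders). Remember to raise the open-file limit (e.g. ``ulimit -n 999999``)
# before starting the Python session.
#
#   merge_granule_buckets(
#       src_bucket_dir="/data/bucket_by_granule",
#       dst_bucket_dir="/data/bucket",
#       row_group_size="400MB",
#       max_file_size="1GB",
#       compression="snappy",
#   )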