Source code for gpm.bucket.routines

# -----------------------------------------------------------------------------.
# MIT License

# Copyright (c) 2024 GPM-API developers
#
# This file is part of GPM-API.

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

# -----------------------------------------------------------------------------.
"""This module provides the routines for the creation of GPM Geographic Buckets."""
import os

import dask
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.dataset
import pyarrow.parquet as pq
from tqdm import tqdm

from gpm.bucket.io import (
    get_bucket_spatial_partitioning,
    get_bucket_temporal_partitioning,
    get_exisiting_partitions_paths,
    get_filepaths_within_paths,
    write_bucket_info,
)
from gpm.bucket.writers import (
    preprocess_writer_kwargs,
    write_dataset_metadata,
    write_partitioned_dataset,
)
from gpm.io.checks import check_start_end_time
from gpm.io.filter import filter_filepaths, is_within_time_period
from gpm.io.info import get_start_end_time_from_filepaths
from gpm.utils.dask import clean_memory, get_client
from gpm.utils.directories import get_first_file, list_and_filter_files
from gpm.utils.parallel import compute_list_delayed
from gpm.utils.timing import print_task_elapsed_time

####--------------------------------------------------------------------------------------------------.
#### Bucket Granules


def split_list_in_blocks(values, block_size):
    return [values[i : i + block_size] for i in range(0, len(values), block_size)]

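# Illustrative doctest-style example: 10 values split into blocks of 4.
# >>> split_list_in_blocks(list(range(10)), block_size=4)
# [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
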
def write_granule_bucket(
    src_filepath,
    bucket_dir,
    spatial_partitioning,
    granule_to_df_func,
    x="lon",
    y="lat",
    # Writer kwargs
    **writer_kwargs,
):
    """Write a geographically partitioned Parquet Dataset of a GPM granule.

    Parameters
    ----------
    src_filepath : str
        File path of the granule to store in the bucket archive.
    bucket_dir : str
        Base directory of the per-granule bucket archive.
    spatial_partitioning : gpm.bucket.SpatialPartitioning
        A spatial partitioning class.
    granule_to_df_func : callable
        Function taking a granule filepath, opening it and returning a pandas or dask dataframe.
    x : str
        The name of the x column. The default is "lon".
    y : str
        The name of the y column. The default is "lat".
    **writer_kwargs : dict
        Optional arguments to be passed to the pyarrow Dataset Writer.
        Common arguments are 'format' and 'use_threads'.
        The default file ``format`` is ``'parquet'``.
        The default ``use_threads`` is ``True``, which enables multithreaded file writing.
        More information available at https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html

    """
    # Define a unique filename prefix for the files added to the bucket archive
    # - This prevents the risk of overwriting existing files
    # - If df is a pandas.DataFrame --> f"{filename_prefix}_" + "{i}.parquet"
    # - If df is a dask.DataFrame   --> f"{filename_prefix}_dask.partition_{part_index}"
    filename_prefix = os.path.splitext(os.path.basename(src_filepath))[0]

    # Retrieve dataframe
    df = granule_to_df_func(src_filepath)

    # Do nothing if the function returns None
    # - For example, when the granule does not intersect the AOI
    if df is None:
        return

    # Add partitioning columns
    df = spatial_partitioning.add_labels(df=df, x=x, y=y)

    # Write partitioned dataframe
    write_partitioned_dataset(
        df=df,
        base_dir=bucket_dir,
        filename_prefix=filename_prefix,
        partitions=spatial_partitioning.order,
        partitioning_flavor=spatial_partitioning.flavor,
        **writer_kwargs,
    )

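# Usage sketch (the paths and the reader function below are hypothetical):
# >>> def granule_to_df(filepath):
# ...     """Open a granule and return a pandas or dask DataFrame with 'lon'/'lat' columns."""
# ...     ...
# >>> write_granule_bucket(
# ...     src_filepath="/data/granules/granule.HDF5",
# ...     bucket_dir="/data/bucket_granules",
# ...     spatial_partitioning=spatial_partitioning,  # a gpm.bucket SpatialPartitioning instance
# ...     granule_to_df_func=granule_to_df,
# ... )
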
def _try_write_granule_bucket(**kwargs):
    try:
        # Write synchronously within each dask task
        with dask.config.set(scheduler="single-threaded"):
            write_granule_bucket(**kwargs)
        # If it works, return None
        info = None
    except Exception as e:
        # Define the tuple to return
        src_filepath = kwargs["src_filepath"]
        info = src_filepath, str(e)
    return info


@print_task_elapsed_time(prefix="Granules Bucketing Operation Terminated.")
def write_granules_bucket(
    filepaths,
    # Bucket configuration
    bucket_dir,
    spatial_partitioning,
    granule_to_df_func,
    # Processing options
    parallel=True,
    max_concurrent_tasks=None,
    max_dask_total_tasks=500,
    # Writer kwargs
    use_threads=False,
    row_group_size="500MB",
    **writer_kwargs,
):
    """Write a geographically partitioned Parquet Dataset of GPM granules.

    Parameters
    ----------
    filepaths : list of str
        File paths of the GPM granules to store in the bucket archive.
    bucket_dir : str
        Base directory of the per-granule bucket archive.
    spatial_partitioning : gpm.bucket.SpatialPartitioning
        A spatial partitioning class.
        Carefully consider the size of the partitions.
        Earth partitioning by:
        - 1° degree corresponds to 64800 directories (360*180)
        - 5° degree corresponds to 2592 directories (72*36)
        - 10° degree corresponds to 648 directories (36*18)
        - 15° degree corresponds to 288 directories (24*12)
    granule_to_df_func : callable
        Function taking a granule filepath, opening it and returning a pandas or dask dataframe.
    parallel : bool
        Whether to bucket several granules in parallel.
        The default is ``True``.
    max_concurrent_tasks : int
        The maximum number of Dask tasks to be concurrently executed.
        If ``None``, let the Dask scheduler choose.
        The default is ``None``.
    max_dask_total_tasks : int
        The maximum number of Dask tasks to be scheduled at once.
        The default is 500.
    use_threads : bool
        Whether to write Parquet files with multiple threads.
        If bucketing granules in a multiprocessing environment, it is better to set it to ``False``.
        The default is ``False``.
    row_group_size : int or str, optional
        Maximum number of rows in each written Parquet row group.
        If specified as a string (e.g. ``"500MB"``), the equivalent number of rows is estimated.
        The default is ``"500MB"``.
    **writer_kwargs : dict
        Optional arguments to be passed to the pyarrow Dataset Writer.
        Common arguments are ``format`` and ``use_threads``.
        The default file ``format`` is ``'parquet'``.
        The default ``use_threads`` is ``True``, which enables multithreaded file writing.
        More information available at https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html

    """
    # Define the writer kwargs
    writer_kwargs["row_group_size"] = row_group_size
    writer_kwargs["use_threads"] = use_threads

    # Write down the information of the bucket
    write_bucket_info(bucket_dir=bucket_dir, spatial_partitioning=spatial_partitioning)

    # Split the long list of files in blocks
    list_blocks = split_list_in_blocks(filepaths, block_size=max_dask_total_tasks)

    # Execute tasks by blocks to avoid dask overhead
    n_blocks = len(list_blocks)

    for i, block_filepaths in enumerate(list_blocks):
        print(f"Executing tasks block {i+1}/{n_blocks}")

        # Loop over granules
        func = dask.delayed(_try_write_granule_bucket) if parallel else _try_write_granule_bucket
        list_results = [
            func(
                src_filepath=src_filepath,
                bucket_dir=bucket_dir,
                spatial_partitioning=spatial_partitioning,
                granule_to_df_func=granule_to_df_func,
                # Writer kwargs
                **writer_kwargs,
            )
            for src_filepath in block_filepaths
        ]

        # If delayed, execute the tasks
        if parallel:
            list_results = compute_list_delayed(
                list_results,
                max_concurrent_tasks=max_concurrent_tasks,
            )

        # Process results to detect errors
        list_errors = [error_info for error_info in list_results if error_info is not None]
        for src_filepath, error_str in list_errors:
            print(f"An error occurred while processing {src_filepath}: {error_str}")

        # If parallel=True, retrieve the client, clean the memory and restart
        if parallel:
            client = get_client()
            clean_memory(client)
            client.restart()


####--------------------------------------------------------------------------------------------------.
#### Bucket DataFrame


@print_task_elapsed_time(prefix="Dataset Bucket Operation Terminated.")
def write_bucket(
    df,
    bucket_dir,
    spatial_partitioning,
    x="lon",
    y="lat",
    # Writer arguments
    filename_prefix="part",
    row_group_size="500MB",
    **writer_kwargs,
):
    """Write a geographically partitioned Parquet Dataset.

    Parameters
    ----------
    df : pandas.DataFrame or dask.dataframe.DataFrame
        Pandas or Dask dataframe to be written into a geographic bucket.
    bucket_dir : str
        Base directory of the geographic bucket archive.
    spatial_partitioning : gpm.bucket.SpatialPartitioning
        A spatial partitioning class.
        Carefully consider the size of the partitions.
        Earth partitioning by:
        - 1° degree corresponds to 64800 directories (360*180)
        - 5° degree corresponds to 2592 directories (72*36)
        - 10° degree corresponds to 648 directories (36*18)
        - 15° degree corresponds to 288 directories (24*12)
    x : str
        The name of the x column. The default is "lon".
    y : str
        The name of the y column. The default is "lat".
    row_group_size : int or str, optional
        Maximum number of rows in each written Parquet row group.
        If specified as a string (e.g. ``"500MB"``), the equivalent number of rows is estimated.
        The default is ``"500MB"``.
    **writer_kwargs : dict
        Optional arguments to be passed to the pyarrow Dataset Writer.
        Common arguments are 'format' and 'use_threads'.
        The default file ``format`` is ``'parquet'``.
        The default ``use_threads`` is ``True``, which enables multithreaded file writing.
        More information available at https://arrow.apache.org/docs/python/generated/pyarrow.dataset.write_dataset.html

    """
    # Write down the information of the bucket
    write_bucket_info(
        bucket_dir=bucket_dir,
        spatial_partitioning=spatial_partitioning,
    )

    # Add partitioning columns
    df = spatial_partitioning.add_labels(df=df, x=x, y=y)

    # Write bucket
    writer_kwargs["row_group_size"] = row_group_size
    write_partitioned_dataset(
        df=df,
        base_dir=bucket_dir,
        partitions=spatial_partitioning.order,
        partitioning_flavor=spatial_partitioning.flavor,
        filename_prefix=filename_prefix,
        **writer_kwargs,
    )

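# Usage sketch for the per-granule bucketing entry point
# (the paths, the file listing and the reader function are hypothetical):
# >>> import glob
# >>> filepaths = sorted(glob.glob("/data/granules/*.HDF5"))
# >>> write_granules_bucket(
# ...     filepaths=filepaths,
# ...     bucket_dir="/data/bucket_granules",
# ...     spatial_partitioning=spatial_partitioning,  # a gpm.bucket SpatialPartitioning instance
# ...     granule_to_df_func=granule_to_df,
# ...     parallel=True,
# ... )
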
####--------------------------------------------------------------------------------------------------.
#### Merge Granules


def check_temporal_partitioning(temporal_partitioning):
    """Check validity of the temporal_partitioning argument."""
    valid_values = ["year", "month", "season", "quarter", "day"]
    if not isinstance(temporal_partitioning, str):
        raise TypeError("'temporal_partitioning' must be a string.")
    if temporal_partitioning not in valid_values:
        raise ValueError(f"Invalid '{temporal_partitioning}'. Valid values are {valid_values}.")
    return temporal_partitioning

def define_dict_partitions(src_bucket_dir, src_spatial_partitioning, dst_spatial_partitioning=None):
    """List the source partition directory paths for each destination partition."""
    if dst_spatial_partitioning is not None:
        raise NotImplementedError("Repartitioning not yet implemented.")
    dst_spatial_partitioning = src_spatial_partitioning

    # Retrieve the paths to the source partitions
    n_levels = dst_spatial_partitioning.n_levels
    dir_trees = dst_spatial_partitioning.directories
    partitions_paths = get_exisiting_partitions_paths(src_bucket_dir, dir_trees)  # on 4096 directories ...

    # Define the list of destination bucket partitions and source bucket directories
    sep = os.path.sep
    dict_partitions = {sep.join(path.strip(sep).split(sep)[-n_levels:]): [path] for path in partitions_paths}
    return dict_partitions

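# The returned mapping associates each destination partition label with the list of source
# partition directories to merge. Illustrative example, assuming a two-level hive-flavored
# lon/lat partitioning and hypothetical paths:
# >>> define_dict_partitions("/data/bucket_granules", src_spatial_partitioning)
# {'lon_bin=-5.0/lat_bin=45.0': ['/data/bucket_granules/lon_bin=-5.0/lat_bin=45.0'],
#  'lon_bin=0.0/lat_bin=45.0': ['/data/bucket_granules/lon_bin=0.0/lat_bin=45.0'],
#  ...}
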
def get_template_table(dict_partitions):
    """Read and return a table template."""
    # Take the first parquet file found in the partitions
    file_found = False
    for list_src_dir in dict_partitions.values():
        for path in list_src_dir:
            filepath = get_first_file(path)
            if filepath is not None:
                file_found = True
                break  # exit the inner loop; the outer loop is exited just below
        if file_found:
            break
    if not file_found:
        raise ValueError("No file found in the source bucket archive.")
    # Read the first parquet file found
    template_table = pq.read_table(filepath)
    return template_table

def get_time_prefix(timestep, temporal_partitioning):
    """Define a time prefix string from a datetime object."""
    if temporal_partitioning == "year":
        return f"{timestep.year}"  # e.g. "2021"
    if temporal_partitioning == "month":
        return f"{timestep.year}_{timestep.month}"  # e.g. "2021_1"
    if temporal_partitioning == "quarter":
        # Q1: Jan-Mar, Q2: Apr-Jun, Q3: Jul-Sep, Q4: Oct-Dec
        quarter = (timestep.month - 1) // 3 + 1
        return f"{timestep.year}_{quarter}"  # e.g. "2021_1" for Q1 2021
    if temporal_partitioning == "day":
        return f"{timestep.year}_{timestep.month}_{timestep.day}"
    raise NotImplementedError(f"Invalid '{temporal_partitioning}' temporal_partitioning")

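# Illustrative doctest-style examples:
# >>> get_time_prefix(pd.Timestamp("2021-07-15"), "year")
# '2021'
# >>> get_time_prefix(pd.Timestamp("2021-07-15"), "quarter")
# '2021_3'
# >>> get_time_prefix(pd.Timestamp("2021-07-15"), "day")
# '2021_7_15'
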
def get_partitioning_boundaries(start_time, end_time, temporal_partitioning):
    """Define the temporal partition boundaries covering the [start_time, end_time] period."""
    # --------
    # YEAR
    if temporal_partitioning == "year":
        if end_time != pd.Timestamp(f"{end_time.year}-01-01 00:00:00"):
            end_time = end_time + pd.DateOffset(years=1)
        boundaries = pd.date_range(
            start=pd.Timestamp(f"{start_time.year}-01-01"),
            end=pd.Timestamp(f"{end_time.year}-01-01"),
            freq="YS",
        )
        return boundaries
    # --------
    # MONTH
    if temporal_partitioning == "month":
        if end_time != pd.Timestamp(f"{end_time.year}-{end_time.month:02d}-01 00:00:00"):
            end_time = end_time + pd.DateOffset(months=1)
        boundaries = pd.date_range(
            start=pd.Timestamp(f"{start_time.year}-{start_time.month}-01 00:00:00"),
            end=pd.Timestamp(f"{end_time.year}-{end_time.month}-01 00:00:00"),
            freq="MS",
        )
        return boundaries
    # --------
    # QUARTER
    # Q1: Jan-Mar, Q2: Apr-Jun, Q3: Jul-Sep, Q4: Oct-Dec
    if temporal_partitioning == "quarter":
        # Define the start time quarter month
        start_quarter_idx = (start_time.month - 1) // 3 + 1
        start_quarter_start_month = 3 * (start_quarter_idx - 1) + 1
        # Define the end time quarter month
        end_quarter_idx = (end_time.month - 1) // 3 + 1
        end_quarter_start_month = 3 * (end_quarter_idx - 1) + 1
        # Update to the next quarter if necessary
        if end_time != pd.Timestamp(f"{end_time.year}-{end_quarter_start_month:02d}-01 00:00:00"):
            end_time = end_time + pd.DateOffset(months=3)
            end_quarter_idx = (end_time.month - 1) // 3 + 1
            end_quarter_start_month = 3 * (end_quarter_idx - 1) + 1
        boundaries = pd.date_range(
            start=pd.Timestamp(f"{start_time.year}-{start_quarter_start_month}-01 00:00:00"),
            end=pd.Timestamp(f"{end_time.year}-{end_quarter_start_month}-01 00:00:00"),
            freq="QS",
        )
        return boundaries
    # --------
    # DAY
    if temporal_partitioning == "day":
        if end_time != end_time.normalize():
            end_time = end_time.normalize() + pd.DateOffset(days=1)
        boundaries = pd.date_range(
            start=pd.Timestamp(f"{start_time.year}-{start_time.month}-{start_time.day} 00:00:00"),
            end=end_time,
            freq="D",
        )
        return boundaries
    raise NotImplementedError(f"Invalid '{temporal_partitioning}' temporal_partitioning")

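# Illustrative example: monthly boundaries for a period that neither starts nor ends
# exactly on a month boundary (the end is rounded up to the next month start).
# >>> get_partitioning_boundaries(pd.Timestamp("2021-11-20"), pd.Timestamp("2022-02-10"), "month")
# DatetimeIndex(['2021-11-01', '2021-12-01', '2022-01-01', '2022-02-01', '2022-03-01'],
#               dtype='datetime64[ns]', freq='MS')
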
def get_list_group_periods(start_time, end_time, temporal_partitioning):
    """List the group time periods."""
    # Retrieve the group time boundaries
    boundaries = get_partitioning_boundaries(start_time, end_time, temporal_partitioning)
    # Define the list with the group time information
    intervals = []
    for i, group_start in enumerate(boundaries):
        # Retrieve the group end time
        group_end = boundaries[i + 1] if i < len(boundaries) - 1 else end_time
        # Clamp the boundaries to the overall [start_time, end_time) period
        group_start = max(group_start, start_time)
        group_end = min(group_end, end_time)
        # Build the time prefix (e.g. "2021", "2021_1", "2021_1_15", ...)
        time_prefix = get_time_prefix(group_start, temporal_partitioning)
        # Avoid zero-length intervals
        if group_start < group_end:
            intervals.append((time_prefix, group_start, group_end))
    return intervals

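# Illustrative example (continuing the monthly case above): the first and last groups are
# clamped to the requested period, while the intermediate groups span full months.
# >>> get_list_group_periods(pd.Timestamp("2021-11-20"), pd.Timestamp("2022-02-10"), "month")
# [('2021_11', Timestamp('2021-11-20 00:00:00'), Timestamp('2021-12-01 00:00:00')),
#  ('2021_12', Timestamp('2021-12-01 00:00:00'), Timestamp('2022-01-01 00:00:00')),
#  ('2022_1', Timestamp('2022-01-01 00:00:00'), Timestamp('2022-02-01 00:00:00')),
#  ('2022_2', Timestamp('2022-02-01 00:00:00'), Timestamp('2022-02-10 00:00:00'))]
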
def group_files_by_time(filepaths, start_time, end_time, temporal_partitioning):
    """Group the granule filepaths by time period."""
    # Convert to numpy array
    filepaths = np.array(filepaths)
    # Retrieve the start time and end time of each file
    l_start_time, l_end_time = get_start_end_time_from_filepaths(filepaths)
    # Initialize start_time and end_time if None
    if start_time is None:
        start_time = pd.Timestamp(l_start_time.min())
    if end_time is None:
        end_time = pd.Timestamp(l_end_time.max())
    # Define the possible group_start_time and group_end_time periods
    list_group_periods = get_list_group_periods(start_time, end_time, temporal_partitioning)
    # List all filepaths of each time group
    groups_dict = {}
    # group_key, group_start_time, group_end_time = list_group_periods[2]
    for group_key, group_start_time, group_end_time in list_group_periods:
        is_within_group = is_within_time_period(
            l_start_time=l_start_time,
            l_end_time=l_end_time,
            start_time=group_start_time,
            end_time=group_end_time,
        )
        list_group_filepaths = filepaths[is_within_group]
        if len(list_group_filepaths) > 0:
            groups_dict[group_key] = (group_start_time, group_end_time, list_group_filepaths)
    return groups_dict

def define_dataset_filter(
    start_time,
    end_time,
    # dst_spatial_partitioning, partition_label or extent
):
    """Define the pyarrow.dataset filter expression used to subset rows by time."""
    time_filter = (pyarrow.dataset.field("time") >= start_time) & (pyarrow.dataset.field("time") < end_time)
    # Create the spatial filter for longitude and latitude
    # lon_min, lon_max, lat_min, lat_max = -180, 180, -90, 90
    # location_filter = (
    #     (pyarrow.dataset.field("lon") >= lon_min) & (pyarrow.dataset.field("lon") <= lon_max) &
    #     (pyarrow.dataset.field("lat") >= lat_min) & (pyarrow.dataset.field("lat") <= lat_max)
    # )
    # Combine both filters with an AND operation
    # dataset_filter = time_filter & location_filter
    dataset_filter = time_filter
    return dataset_filter

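# The returned object is a pyarrow.dataset Expression and can be passed directly to a
# dataset scanner (illustrative sketch; the parquet file paths are hypothetical):
# >>> flt = define_dataset_filter(pd.Timestamp("2021-01-01"), pd.Timestamp("2022-01-01"))
# >>> dataset = pyarrow.dataset.dataset(["file1.parquet", "file2.parquet"], format="parquet")
# >>> scanner = dataset.scanner(filter=flt)
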
@print_task_elapsed_time(prefix="Bucket Merging Terminated.") def merge_granule_buckets( src_bucket_dir, dst_bucket_dir, # Bucket structure dst_spatial_partitioning=None, temporal_partitioning="year", # Update options start_time=None, end_time=None, update=False, # Parquet options row_group_size="200MB", max_file_size="2GB", compression="snappy", compression_level=None, write_metadata=False, write_statistics=False, # Computing options max_open_files=0, use_threads=True, # Scanner options batch_size=131_072, batch_readahead=10, fragment_readahead=20, ): """Merge the per-granule bucket archive in a single optimized archive. Set ulimit -n 999999 before running this routine ! If you use ``write_metadata=True``, the archive can't currently be updated ! Parameters ---------- src_bucket_dir : str Base directory of the per-granule bucket archive. dst_bucket_dir : str Directory path of the final bucket archive. start_time : datetime.datetime Start time of the file to consolidate. The default is ``None``. end_time : datetime.datetime Non-inclusive end time of the file to consolidate. The default is ``None``. temporal_partitioning: Define the temporal partitions over which to groups files together. Only to be defined for a new bucket archive. Valid values are "year", "month", "season", "quarter" or "day". The default value is "year". If ``update=True``, use the temporal partitions of the existing bucket archive. update : bool Whether to update an existing bucket archive with new data. Make sure to specify start_time and end_time covering the time period of groups to avoid loss of data. If grouping by "year", specify start_time to be on January first at 00:00:00. If grouping by year and months, specify the start time to be the first day of the month at 00:00:00. The default is ``False``. write_metadata: bool, The default is ``False``. Collect in a single metadata file all row groups statistics of all files. If ``True``, slow down a lot the routine. If you use ``write_metadata=True``, the archive can't currently be updated ! write_statistics: bool or list Whether to compute and include column-level statistics (such as minimum, maximum, and null counts) in the metadata of each row group for all or some specific columns. Row group statistics allow for some more efficient queries as they might allow skipping irrelevant row groups or reading of entire files. If ``True`` (or some columns are specified), the routine can take much longer to execute ! The default is ``False`` row_group_size : int or str, optional Maximum number of rows to be written in each Parquet row group. If specified as a string (i.e. ``"200 MB"``), the equivalent number of rows is estimated. The default is ``"200MB"``. max_file_size: str, optional Maximum number of rows to be written in a Parquet file. If specified as a string, the equivalent number of rows is estimated. Ideally the value should be a multiple of ``row_group_size``. The default is ``"2GB"``. compression : str, optional Specify the compression codec, either on a general basis or per-column. Valid values: ``{"none", "snappy", "gzip", "brotli", "lz4", "zstd"}``. The default is ``snappy``. compression_level : int or dict, optional Specify the compression level for a codec, either on a general basis or per-column. If ``None`` is passed, arrow selects the compression level for the compression codec in use. 
The compression level has a different meaning for each codec, so you have to read the pyArrow documentation of the codec you are using at https://arrow.apache.org/docs/python/generated/pyarrow.Codec.html The default is ``None``. max_open_files, int, optional If greater than 0 then this will limit the maximum number of files that can be left open. If an attempt is made to open too many files then the least recently used file will be closed. If this setting is set too low you may end up fragmenting your data into many small files. The default is ``0``. Note that Linux has a default limit of ``1024``. Before starting the python session, increase it with ``ulimit -n <new_much_higher_limit>``. use_threads: bool, optional If enabled, then maximum parallelism will be used to read and write files (in multithreading). The number of threads is determined by the number of available CPU cores. The default is ``True``. batch_size : int Maximum number of rows per record batch produced by the dataset scanner. For concatenating small files (each typically a single fragment with one row group), set batch_size larger than the number of rows of the small files so that data from multiple files can be aggregated into a single batch. This helps reduce per-batch overhead. If scanned record batches are overflowing memory then this value can be reduced to reduce the memory usage. The default value is ``131_072``. batch_readahead : int The number of batches to prefetch asynchronously from an open file. Increasing this number will increase RAM usage but could also improve IO utilization. When each file contains a single row group (and thus only one batch), the benefit of batch_readahead is limited. In such cases, a lower value is generally sufficient. The default is ``10``. fragment_readahead : int The number of individual small files to prefetch concurrently. Increasing this number will increase RAM usage but could also improve IO utilization. Prefetching multiple fragments concurrently helps hide the latency of opening and reading each file. The default is ``20``. Recommendations --------------- - For small files with a single row group, ensure that batch_size exceeds the number of rows in any individual file to allow efficient aggregation. - Focus on tuning fragment_readahead to prefetch multiple files simultaneously, as this yields greater performance benefits than batch_readahead in this context. - Adjust these parameters based on system memory and available threads; while they operate asynchronously, excessively high values may oversubscribe system resources without further gains. Returns ------- None. 
""" # Check arguments for update=True if update: # Check destination bucket dir exists if update=True if not os.path.exists(dst_bucket_dir): raise OSError(f"The bucket directory {dst_bucket_dir} does not exists.") # Check write_metadata argument if write_metadata: raise NotImplementedError("If update=True, it is currently not possible to update the metadata.") # Check start_time and end_time if start_time is None or end_time is None: raise ValueError("Define both 'start_time' and 'end_time' if update=True.") start_time, end_time = check_start_end_time(start_time, end_time) # Retrieve source spatial partitioning src_spatial_partitioning = get_bucket_spatial_partitioning(bucket_dir=src_bucket_dir) # Retrieve destination spatial partitioning if update: dst_spatial_partitioning = get_bucket_spatial_partitioning(bucket_dir=dst_bucket_dir) temporal_partitioning = get_bucket_temporal_partitioning(bucket_dir=dst_bucket_dir) elif dst_spatial_partitioning is None: dst_spatial_partitioning = src_spatial_partitioning else: raise NotImplementedError("Repartitioning not implemented yet.") # Check temporal partitioning check_temporal_partitioning(temporal_partitioning) # Identify destination partitions # - Output: {dst_partition_tree: [src_partition_path, src_partition_path]} # - Currently we assume same partitioning between source and destination # TODO: # - Spatial Repartitioning is not yet implemented # - group/split src partition paths for desired dst partitions # - Perform filtering on lon and lat in dataset scanner by dst_spatial_partitioning bounds dict_partitions = define_dict_partitions( src_bucket_dir=src_bucket_dir, src_spatial_partitioning=src_spatial_partitioning, dst_spatial_partitioning=None, ) # TODO: not yet implemented n_partitions = len(dict_partitions) print(f"{n_partitions} geographic partitions to process.") # Write bucket info if not update: write_bucket_info( bucket_dir=dst_bucket_dir, spatial_partitioning=dst_spatial_partitioning, temporal_partitioning=temporal_partitioning, ) # -----------------------------------------------------------------------------------------------. # Retrieve table schema # - spatial_partitioning.levels are read by pq.read_table as dictionaries (depending on pyarrow version) # - spatial_partitioning.levels columns must be dropped by the table if present template_table = get_template_table(dict_partitions) if np.all(np.isin(dst_spatial_partitioning.levels, template_table.column_names)): template_table = template_table.drop_columns(dst_spatial_partitioning.levels) schema = template_table.schema # Define writer_kwargs writer_kwargs = {} writer_kwargs["row_group_size"] = row_group_size writer_kwargs["max_file_size"] = max_file_size writer_kwargs["compression"] = compression writer_kwargs["compression_level"] = compression_level writer_kwargs["max_open_files"] = max_open_files writer_kwargs["use_threads"] = use_threads writer_kwargs["write_metadata"] = write_metadata writer_kwargs["write_statistics"] = write_statistics writer_kwargs, metadata_collector = preprocess_writer_kwargs( writer_kwargs=writer_kwargs, df=template_table, ) # -----------------------------------------------------------------------------------------------. 
# Concatenate data within bins # - Cannot rewrite directly the full pyarrow.dataset because there is no way to specify when # data from each partition have been scanned completely (and can be written to disk) print("Start concatenating the granules bucket archive") n_partitions = len(dict_partitions) # partition_label = list(dict_partitions)[0] # list_src_partition_dir = dict_partitions[partition_label] for partition_label, list_src_partition_dir in tqdm(dict_partitions.items(), total=n_partitions): # Retrieve all available filepaths (sorted) filepaths = get_filepaths_within_paths( paths=list_src_partition_dir, parallel=False, file_extension=".parquet", glob_pattern=None, regex_pattern=None, ) # Filter by time window if start_time is not None and end_time is not None: filepaths = filter_filepaths(filepaths, start_time=start_time, end_time=end_time) # Check file left if len(filepaths) == 0: print(f"No data to consolidate for partition {partition_label}.") continue # Define destination partition directory dst_partition_dir = os.path.join(dst_bucket_dir, partition_label) # If the directory already exists and update=False, raise an error ! if not update and os.path.exists(dst_partition_dir): raise ValueError(f"The partition {partition_label} already exists. Use 'update=True' to update an archive.") # Define groups to create groups_dict = group_files_by_time( filepaths=filepaths, start_time=start_time, end_time=end_time, temporal_partitioning=temporal_partitioning, ) # If update=True, remove archived destination files of groups to be updated if update: # Ensure partition directory exists os.makedirs(dst_partition_dir, exist_ok=True) # Retrieve path of existing files in the destination archive existing_filepaths = list_and_filter_files( path=dst_partition_dir, file_extension=".parquet", glob_pattern=None, regex_pattern=None, sort=False, # small speed up ) # Remove files that starts with the same time_prefix (old file) # - Examples of groups_dict keys: 2021_1 when groups=["year", "month"] # - Example of filename: 2021_1_{i}.parquet where {i} is an id given by pyarrow parquet for time_prefix in list(groups_dict): for filepath in existing_filepaths: if os.path.basename(filepath).startswith(time_prefix): os.remove(filepath) # Save a consolidated parquet by the specified time group for time_prefix, (group_start_time, group_end_time, src_filepaths) in groups_dict.items(): # Define filename pattern basename_template = f"{time_prefix}_" + "{i}.parquet" # Define pyarrow.Expression to filter rows based on time and geolocation/geometry # - TODO: geolocation filter when repartitioning dataset_filter = define_dataset_filter( start_time=group_start_time, end_time=group_end_time, # dst_spatial_partitioning, partition_label or extent ) # Read Dataset dataset = pyarrow.dataset.dataset(list(src_filepaths), format="parquet") # Define scanner scanner = dataset.scanner( batch_size=batch_size, batch_readahead=batch_readahead, fragment_readahead=fragment_readahead, use_threads=use_threads, filter=dataset_filter, ) # Rewrite dataset pa.dataset.write_dataset( scanner, base_dir=dst_partition_dir, basename_template=basename_template, # Directory options create_dir=True, existing_data_behavior="overwrite_or_ignore", # Options **writer_kwargs, ) # Write metadata if asked if write_metadata and metadata_collector: write_dataset_metadata(base_dir=dst_bucket_dir, metadata_collector=metadata_collector, schema=schema)
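# Usage sketch (hypothetical paths; remember to raise the open-file limit first,
# e.g. with ``ulimit -n 999999``):
# >>> merge_granule_buckets(
# ...     src_bucket_dir="/data/bucket_granules",
# ...     dst_bucket_dir="/data/bucket",
# ...     temporal_partitioning="year",
# ...     row_group_size="200MB",
# ...     max_file_size="2GB",
# ... )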