Source code for gpm.dataset.dataset

# -----------------------------------------------------------------------------.
# MIT License

# Copyright (c) 2024 GPM-API developers
#
# This file is part of GPM-API.

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

# -----------------------------------------------------------------------------.
"""This module contains functions to read files into a GPM-API Dataset or DataTree."""
import warnings
from functools import partial

import xarray as xr

from gpm.configs import get_base_dir
from gpm.dataset.conventions import finalize_dataset
from gpm.dataset.granule import _multi_file_closer, get_scan_modes_datasets
from gpm.io.checks import (
    check_groups,
    check_product,
    check_scan_mode,
    check_scan_modes,
    check_start_end_time,
    check_valid_time_request,
    check_variables,
)
from gpm.io.find import find_filepaths
from gpm.utils.checks import has_missing_granules
from gpm.utils.warnings import GPM_Warning

# from gpm.utils.dask import get_scheduler


def _concat_datasets(l_datasets):
    """Concatenate datasets together."""
    dims = list(l_datasets[0].dims)
    is_grid = "time" in dims
    concat_dim = "time" if is_grid else "along_track"

    # Concatenate the datasets
    with xr.set_options(use_new_combine_kwarg_defaults=True):
        ds = xr.concat(
            l_datasets,
            dim=concat_dim,
            coords="minimal",  # "all"
            compat="override",
            join="outer",
            combine_attrs="override",
        )
    return ds


def _try_open_granule(filepath, scan_modes, decode_cf, variables, groups, prefix_group, chunks, **kwargs):
    """Try open a granule."""
    try:
        dict_ds_scan_modes, dt_closer = get_scan_modes_datasets(
            filepath=filepath,
            scan_modes=scan_modes,
            groups=groups,
            variables=variables,
            decode_cf=decode_cf,
            chunks=chunks,
            prefix_group=prefix_group,
            **kwargs,
        )
    except Exception as e:
        msg = f"The following error occurred while opening the {filepath} granule: {e}"
        warnings.warn(msg, GPM_Warning, stacklevel=2)
        dict_ds_scan_modes = None
        dt_closer = None
    return dict_ds_scan_modes, dt_closer


def _get_scan_modes_datasets_and_closers(filepaths, parallel, scan_modes, decode_cf=False, **open_kwargs):
    """Open the granule in parallel with dask delayed."""
    # Define functions to open files
    if parallel:
        import dask

        open_ = dask.delayed(_try_open_granule)
    else:
        open_ = _try_open_granule

    # ----------------------------------------------------.
    # Open files
    list_info = [open_(filepath, scan_modes=scan_modes, decode_cf=decode_cf, **open_kwargs) for filepath in filepaths]

    # If parallel=True, compute the delayed datasets lists here
    # - The underlying data are stored as dask arrays (and are not computed !)
    if parallel:
        list_info = dask.compute(*list_info)

    # Check that at least one file has been opened successfully
    if len(list_info) == 0:
        raise ValueError("No files could be opened with current request.")

    # Retrieve scan modes list
    scan_modes = list(list_info[0][0])

    # ----------------------------------------------------.
    # Retrieve datatree closers
    list_dt_closers = [dt_closer for _, dt_closer in list_info]

    # Retrieve scan modes closers
    list_dict_scan_modes = [dict_scan_modes for dict_scan_modes, _ in list_info]

    # Remove None elements from the list
    list_dt_closers = [closer for closer in list_dt_closers if closer is not None]
    list_dict_scan_modes = [dict_scan_modes for dict_scan_modes in list_dict_scan_modes if dict_scan_modes is not None]

    # Check there are valid granules
    if len(list_dict_scan_modes) == 0:
        raise ValueError("Impossible to open GPM granules with current request.")

    # Define list of dataset for each scan_mode
    dict_scan_modes_datasets = {
        scan_mode: [dict_scan_modes[scan_mode] for dict_scan_modes in list_dict_scan_modes] for scan_mode in scan_modes
    }
    # Define list of closers for each scan_mode
    dict_scan_modes_closers = {
        scan_mode: [dict_scan_modes[scan_mode]._close for dict_scan_modes in list_dict_scan_modes]
        for scan_mode in scan_modes
    }

    # Concat datasets within each scan mode
    dict_scan_modes_dataset = {
        scan_mode: _concat_datasets(list_datasets) for scan_mode, list_datasets in dict_scan_modes_datasets.items()
    }

    # Specify scan modes closers
    for scan_mode, scan_modes_closers in dict_scan_modes_closers.items():
        dict_scan_modes_dataset[scan_mode].set_close(partial(_multi_file_closer, scan_modes_closers))

    return dict_scan_modes_dataset, list_dt_closers


[docs] def open_dataset( product, start_time, end_time, variables=None, groups=None, scan_mode=None, version=None, product_type="RS", chunks=-1, decode_cf=True, parallel=False, prefix_group=False, verbose=False, base_dir=None, **kwargs, ): """Lazily map HDF5 data into xarray.Dataset with relevant GPM data and attributes. Note: - ``gpm.open_dataset`` does not load GPM granules with the FileHeader flag ``'EmptyGranule' != 'NOT_EMPTY'``. - The coordinates ``Quality`` or ``dataQuality`` provide an overall quality flag status. - The coordinate ``SCorientation`` provides the orientation of the sensor from the forward track of the satellite. Parameters ---------- product : str GPM product acronym. start_time : datetime.datetime, datetime.date, numpy.datetime64 or str Start time. Accepted types: ``datetime.datetime``, ``datetime.date``, ``numpy.datetime64`` or ``str``. If string type, it expects the isoformat ``YYYY-MM-DD hh:mm:ss``. end_time : datetime.datetime, datetime.date, numpy.datetime64 or str End time. Accepted types: ``datetime.datetime``, ``datetime.date``, ``numpy.datetime64`` or ``str``. If string type, it expects the isoformat ``YYYY-MM-DD hh:mm:ss``. variables : list, str, optional Variables to read from the HDF5 file. The default is ``None`` (all variables). groups : list, str, optional HDF5 Groups from which to read all variables. The default is ``None`` (all groups). scan_mode : str, optional Scan mode of the GPM product. The default is ``None``. Use ``gpm.available_scan_modes(product, version)`` to get the available scan modes for a specific product. The radar products have the following scan modes: - ``'FS'``: Full Scan. For Ku, Ka and DPR (since version 7 products). - ``'NS'``: Normal Scan. For Ku band and DPR (till version 6 products). - ``'MS'``: Matched Scan. For Ka band and DPR (till version 6 products). - ``'HS'``: High-sensitivity Scan. For Ka band and DPR. product_type : str, optional GPM product type. Either ``'RS'`` (Research) or ``'NRT'`` (Near-Real-Time). The default is ``'RS'``. version : int, optional GPM version of the data to retrieve if ``product_type = "RS"``. GPM data readers currently support version 4, 5, 6 and 7. chunks : int, dict, str or None, optional Chunk size for dask array: - ``chunks=-1`` loads the dataset with dask using a single chunk for each granule arrays. - ``chunks={}`` loads the dataset with dask using the file chunks. - ``chunks='auto'`` will use dask ``auto`` chunking taking into account the file chunks. If you want to load data in memory directly, specify ``chunks=None``. The default is ``auto``. Hint: xarray's lazy loading of remote or on-disk datasets is often but not always desirable. Before performing computationally intense operations, load the dataset entirely into memory by invoking ``ds.compute()``. decode_cf: bool, optional Whether to decode the dataset. The default is ``False``. prefix_group: bool, optional Whether to add the group as a prefix to the variable names. If you aim to save the Dataset to disk as netCDF or Zarr, you need to set ``prefix_group=False`` or later remove the prefix before writing the dataset. The default is ``False``. parallel : bool If ``True``, the dataset are opened in parallel using :py:class:`dask.delayed.delayed`. If ``parallel=True``, ``'chunks'`` can not be ``None``. The underlying data must be :py:class:`dask.array.Array`. The default is ``False``. **kwargs : dict Additional keyword arguments passed to :py:func:`~xarray.open_dataset` for each group. Returns ------- xarray.Dataset """ ## Check valid product and variables base_dir = get_base_dir(base_dir=base_dir) product = check_product(product, product_type=product_type) variables = check_variables(variables) groups = check_groups(groups) ## Check scan_mode scan_mode = check_scan_mode(scan_mode, product, version=version) # Check valid start/end time start_time, end_time = check_start_end_time(start_time, end_time) start_time, end_time = check_valid_time_request(start_time, end_time, product) # Check parallel and chunks arguments if parallel and chunks is None: raise ValueError("If parallel=True, 'chunks' can not be None.") ##------------------------------------------------------------------------. # Find filepaths filepaths = find_filepaths( storage="LOCAL", version=version, product=product, product_type=product_type, start_time=start_time, end_time=end_time, verbose=verbose, base_dir=base_dir, ) ##------------------------------------------------------------------------. # Check that files have been downloaded on disk if len(filepaths) == 0: raise ValueError("No files found on disk. Please download them before.") ##------------------------------------------------------------------------. # Open and concatenate the scan mode of each granule dict_scan_modes, list_dt_closers = _get_scan_modes_datasets_and_closers( filepaths=filepaths, parallel=parallel, scan_modes=[scan_mode], decode_cf=False, # Custom options variables=variables, groups=groups, prefix_group=prefix_group, chunks=chunks, **kwargs, ) ds = dict_scan_modes[scan_mode] ##-------------------------------------------------------------------------. # TODO - Extract attributes and add as coordinate ? # - From each granule, select relevant (discard/sum values/copy) # - Sum of MissingData, NumberOfRainPixels # - MissingData in FileHeaderGroup: The number of missing scans. ##-------------------------------------------------------------------------. # Finalize dataset ds = finalize_dataset( ds=ds, product=product, scan_mode=scan_mode, decode_cf=decode_cf, start_time=start_time, end_time=end_time, ) ##------------------------------------------------------------------------. # Warns about missing granules if has_missing_granules(ds): msg = "The GPM Dataset has missing granules !" warnings.warn(msg, GPM_Warning, stacklevel=1) ##-------------------------------------------------------------------------. # Specify files closers ds.set_close(partial(_multi_file_closer, list_dt_closers)) ##------------------------------------------------------------------------. # Return Dataset return ds
[docs] def open_datatree( product, start_time, end_time, variables=None, groups=None, scan_modes=None, version=None, product_type="RS", chunks=-1, decode_cf=True, parallel=False, prefix_group=False, verbose=False, base_dir=None, **kwargs, ): """Lazily map HDF5 data into xarray.DataTree objects with relevant GPM data and attributes. Note: - ``gpm.open_datatree`` does not load GPM granules with the FileHeader flag ``'EmptyGranule' != 'NOT_EMPTY'``. - The coordinates ``Quality`` or ``dataQuality`` provide an overall quality flag status. - The coordinate ``SCorientation`` provides the orientation of the sensor from the forward track of the satellite. Parameters ---------- product : str GPM product acronym. start_time : datetime.datetime, datetime.date, numpy.datetime64 or str Start time. Accepted types: ``datetime.datetime``, ``datetime.date``, ``numpy.datetime64`` or ``str``. If string type, it expects the isoformat ``YYYY-MM-DD hh:mm:ss``. end_time : datetime.datetime, datetime.date, numpy.datetime64 or str End time. Accepted types: ``datetime.datetime``, ``datetime.date``, ``numpy.datetime64`` or ``str``. If string type, it expects the isoformat ``YYYY-MM-DD hh:mm:ss``. variables : list, str, optional Variables to read from the HDF5 file. The default is ``None`` (all variables). groups : list, str, optional HDF5 Groups from which to read all variables. The default is ``None`` (all groups). scan_modes : str, optional Scan mode of the GPM product. If ``None`` (the default), loads all scan modes. Use ``gpm.available_scan_modes(product, version)`` to see the available scan modes for a specific product. The radar products have the following scan modes: - ``'FS'``: Full Scan. For Ku, Ka and DPR (since version 7 products). - ``'NS'``: Normal Scan. For Ku band and DPR (till version 6 products). - ``'MS'``: Matched Scan. For Ka band and DPR (till version 6 products). - ``'HS'``: High-sensitivity Scan. For Ka band and DPR. product_type : str, optional GPM product type. Either ``'RS'`` (Research) or ``'NRT'`` (Near-Real-Time). The default is ``'RS'``. version : int, optional GPM version of the data to retrieve if ``product_type = "RS"``. GPM data readers currently support version 4, 5, 6 and 7. chunks : int, dict, str or None, optional Chunk size for dask array: - ``chunks=-1`` loads the dataset with dask using a single chunk for each granule arrays. - ``chunks={}`` loads the dataset with dask using the file chunks. - ``chunks='auto'`` will use dask ``auto`` chunking taking into account the file chunks. If you want to load data in memory directly, specify ``chunks=None``. The default is ``auto``. Hint: xarray's lazy loading of remote or on-disk datasets is often but not always desirable. Before performing computationally intense operations, load the dataset entirely into memory by invoking ``ds.compute()``. decode_cf: bool, optional Whether to decode the dataset. The default is ``False``. prefix_group: bool, optional Whether to add the group as a prefix to the variable names. If you aim to save the Dataset to disk as netCDF or Zarr, you need to set ``prefix_group=False`` or later remove the prefix before writing the dataset. The default is ``False``. parallel : bool If ``True``, the dataset are opened in parallel using :py:class:`dask.delayed.delayed`. If ``parallel=True``, ``'chunks'`` can not be ``None``. The underlying data must be :py:class:`dask.array.Array`. The default is ``False``. **kwargs : dict Additional keyword arguments passed to :py:func:`~xarray.open_datatree` for each group. Returns ------- xarray.DataTree """ ## Check valid product and variables product = check_product(product, product_type=product_type) variables = check_variables(variables) groups = check_groups(groups) base_dir = get_base_dir(base_dir=base_dir) # Check scan_modes scan_modes = check_scan_modes(scan_modes=scan_modes, product=product, version=version) # Check valid start/end time start_time, end_time = check_start_end_time(start_time, end_time) start_time, end_time = check_valid_time_request(start_time, end_time, product) # Check parallel and chunks arguments if parallel and chunks is None: raise ValueError("If parallel=True, 'chunks' can not be None.") ##------------------------------------------------------------------------. # Find filepaths filepaths = find_filepaths( storage="LOCAL", version=version, product=product, product_type=product_type, start_time=start_time, end_time=end_time, verbose=verbose, base_dir=base_dir, ) ##------------------------------------------------------------------------. # Check that files have been downloaded on disk if len(filepaths) == 0: raise ValueError("No files found on disk. Please download them before.") ##------------------------------------------------------------------------. dict_scan_modes, list_dt_closers = _get_scan_modes_datasets_and_closers( filepaths=filepaths, parallel=parallel, scan_modes=scan_modes, decode_cf=False, # Custom options variables=variables, groups=groups, prefix_group=prefix_group, chunks=chunks, **kwargs, ) # Finalize datatree dict_scan_modes = { scan_mode: finalize_dataset( ds=ds, product=product, scan_mode=scan_mode, decode_cf=decode_cf, start_time=start_time, end_time=end_time, ) for scan_mode, ds in dict_scan_modes.items() } # Create datatree dt = xr.DataTree.from_dict(dict_scan_modes) # Specify scan modes closers for scan_mode, ds in dict_scan_modes.items(): dt[scan_mode].set_close(ds._close) # Specify files closers dt.set_close(partial(_multi_file_closer, list_dt_closers)) ##------------------------------------------------------------------------. # Warns about missing granules if has_missing_granules(dt[scan_mode]): msg = "The GPM DataTree has missing granules !" warnings.warn(msg, GPM_Warning, stacklevel=1) ##------------------------------------------------------------------------. return dt
def _infer_product_name(ds) -> str | None: """Infer product name from GPM Dataset attributes.""" from gpm.io.products import get_products_attributes_dict # Safely retrieve required attributes; return None if any are missing. algorithm_id = ds.attrs.get("AlgorithmID") satellite_name = ds.attrs.get("SatelliteName") instrument_name = ds.attrs.get("InstrumentName") if algorithm_id is None or satellite_name is None or instrument_name is None: return None # Try to identify the product based on the file attributes products_dict = get_products_attributes_dict() for product, attrs in products_dict.items(): if ( attrs["AlgorithmID"] == algorithm_id and attrs["SatelliteName"] == satellite_name and attrs["InstrumentName"] == instrument_name ): return product return None
[docs] def open_files( filepaths, parallel=False, scan_modes=None, groups=None, variables=None, prefix_group=False, start_time=None, end_time=None, chunks=-1, decode_cf=True, **kwargs, ): ##------------------------------------------------------------------------. # Ensure filepaths is a list if isinstance(filepaths, str): filepaths = [filepaths] if len(filepaths) == 0: raise ValueError("No filepaths provided.") ##------------------------------------------------------------------------. dict_scan_modes, list_dt_closers = _get_scan_modes_datasets_and_closers( filepaths=filepaths, parallel=parallel, scan_modes=scan_modes, decode_cf=False, # Custom options variables=variables, groups=groups, prefix_group=prefix_group, chunks=chunks, **kwargs, ) if len(dict_scan_modes) == 0: raise ValueError("GPM-API couldn't open the file. Open a GitHub issue explaining the problem.") # Retrieve scan_modes from dictionary scan_modes = sorted(dict_scan_modes) # Infer product from file product = _infer_product_name(dict_scan_modes[scan_modes[0]]) # Warn if product is unknown if product is None: msg = "GPM-API didn't apply specialized variables decoding because product is unknown !" warnings.warn(msg, GPM_Warning, stacklevel=2) # Finalize datatree dict_scan_modes = { scan_mode: finalize_dataset( ds=ds, product=product, scan_mode=scan_mode, decode_cf=decode_cf, start_time=start_time, end_time=end_time, ) for scan_mode, ds in dict_scan_modes.items() } # Create datatree dt = xr.DataTree.from_dict(dict_scan_modes) # Specify scan modes closers for scan_mode, ds in dict_scan_modes.items(): dt[scan_mode].set_close(ds._close) # Specify files closers dt.set_close(partial(_multi_file_closer, list_dt_closers)) return dt