# -----------------------------------------------------------------------------.
# MIT License
# Copyright (c) 2024 GPM-API developers
#
# This file is part of GPM-API.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------.
"""This module contains utilities for GPM Data Archiving."""
import warnings
import numpy as np
from gpm.io.checks import check_start_end_time
from gpm.io.find import find_filepaths
from gpm.io.info import (
    get_end_time_from_filepaths,
    get_granule_from_filepaths,
    get_start_time_from_filepaths,
)
from gpm.utils.warnings import GPM_Warning
####--------------------------------------------------------------------------.
###########################
#### Data completeness ####
###########################
# TODO: move to io/archiving.py in future
def check_no_duplicated_files(
    product,
    start_time,
    end_time,
    version=None,
    product_type="RS",
    verbose=True,
):
    """Check that there are no duplicated files based on the granule number."""
    ##--------------------------------------------------------------------.
    # Find filepaths
    filepaths = find_filepaths(
        storage="LOCAL",
        version=version,
        product=product,
        product_type=product_type,
        start_time=start_time,
        end_time=end_time,
        verbose=verbose,
    )
    ##---------------------------------------------------------------------.
    # Check that files have been downloaded on disk
    if len(filepaths) == 0:
        raise ValueError("No files found on disk. Please download the files first.")
    ##---------------------------------------------------------------------.
    # Retrieve granule ids from the filenames
    filepaths = np.array(filepaths)
    granule_ids = get_granule_from_filepaths(filepaths)
    # Count the occurrence of each granule id
    ids, counts = np.unique(granule_ids, return_counts=True)
    # Get the indices of duplicated granule ids
    idx_ids_duplicated = np.where(counts > 1)[0].flatten()
    n_duplicated = len(idx_ids_duplicated)
    if n_duplicated > 0:
        duplicated_ids = ids[idx_ids_duplicated]
        for granule_id in duplicated_ids:
            idx_paths_duplicated = np.where(granule_id == granule_ids)[0].flatten()
            tmp_paths_duplicated = filepaths[idx_paths_duplicated].tolist()
            print(f"Granule {granule_id} has duplicated filepaths:")
            for path in tmp_paths_duplicated:
                print(f"- {path}")
        raise ValueError(f"There are {n_duplicated} duplicated granules.")
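

####--------------------------------------------------------------------------.
# Example usage (a minimal sketch; the product acronym and time period are
# illustrative placeholders, and the files must already be on disk):
#
#   import datetime
#
#   check_no_duplicated_files(
#       product="2A-DPR",
#       start_time=datetime.datetime(2020, 7, 1),
#       end_time=datetime.datetime(2020, 7, 2),
#       version=7,
#       product_type="RS",
#   )
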
def check_time_period_coverage(filepaths, start_time, end_time, raise_error=False):
    """Check that the time period (start_time, end_time) is covered by the files.

    If raise_error=True, raise an error if the time period is not covered.
    If raise_error=False, raise a GPM_Warning instead.
    """
    # Check valid start/end time
    start_time, end_time = check_start_end_time(start_time, end_time)
    # Get the first start_time and last end_time from the filepaths
    filepaths = sorted(filepaths)
    first_start = get_start_time_from_filepaths(filepaths[0])[0]
    last_end = get_end_time_from_filepaths(filepaths[-1])[0]
    # Check the time period is covered
    msg = ""
    if first_start > start_time:
        msg = f"The first file start_time ({first_start}) occurs after the specified start_time ({start_time})"
    if last_end < end_time:
        msg1 = f"The last file end_time ({last_end}) occurs before the specified end_time ({end_time})"
        msg = (msg + "; and t" + msg1[1:]) if msg != "" else msg1
    if msg != "":
        if raise_error:
            raise ValueError(msg)
        warnings.warn(msg, GPM_Warning, stacklevel=1)
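

####--------------------------------------------------------------------------.
# Example usage (a minimal sketch; the filepaths and the time period are
# illustrative placeholders):
#
#   import datetime
#
#   filepaths = [...]  # local GPM file paths, e.g. from find_filepaths
#   check_time_period_coverage(
#       filepaths,
#       start_time=datetime.datetime(2020, 7, 1),
#       end_time=datetime.datetime(2020, 7, 2),
#       raise_error=False,  # warn with GPM_Warning instead of raising
#   )
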
def get_time_period_with_missing_files(filepaths):
    """Return the time periods where granules are missing.

    It assumes the input filepaths refer to a single GPM product.

    Parameters
    ----------
    filepaths : list
        List of GPM file paths.

    Returns
    -------
    list_missing : list
        List of (start_time, end_time) tuples.
    """
    from gpm.utils.checks import _is_contiguous_granule
    from gpm.utils.slices import get_list_slices_from_bool_arr

    # Retrieve granule ids from the filenames
    granule_ids = get_granule_from_filepaths(filepaths)
    # Sort filepaths by granule number
    indices = np.argsort(granule_ids)
    filepaths = np.array(filepaths)[indices]
    granule_ids = np.array(granule_ids)[indices]
    # Check whether the next file granule number is +1
    is_not_missing = _is_contiguous_granule(granule_ids)
    # If there are missing files
    list_missing = []
    if np.any(~is_not_missing):
        # Retrieve the slices of contiguous granules
        # - Do not skip consecutive False values
        # --> is_not_missing = np.array([False, False, True, True, False, False])
        # --> list_slices = [slice(0, 1, None), slice(1, 2, None), slice(2, 5, None), slice(5, 6, None)]
        list_slices = get_list_slices_from_bool_arr(
            is_not_missing,
            include_false=True,
            skip_consecutive_false=False,
        )
        # Retrieve the start_time and end_time of each missing period
        for slc in list_slices[0:-1]:
            missing_start = get_end_time_from_filepaths(filepaths[slc.stop - 1])[0]
            missing_end = get_start_time_from_filepaths(filepaths[slc.stop])[0]
            list_missing.append((missing_start, missing_end))
    return list_missing
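

####--------------------------------------------------------------------------.
# Example (a minimal sketch; the filepaths are illustrative placeholders).
# Given files with granule numbers [1, 2, 5, 6], granules 3 and 4 are missing,
# so the returned period spans from the end_time of granule 2 to the
# start_time of granule 5:
#
#   filepaths = [...]  # local paths of a single GPM product
#   for s_time, e_time in get_time_period_with_missing_files(filepaths):
#       print(f"Missing granules between {s_time} and {e_time}")
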
def check_archive_completeness(
    product,
    start_time,
    end_time,
    version=None,
    product_type="RS",
    download=True,
    transfer_tool="WGET",
    n_threads=4,
    verbose=True,
):
    """Check that the GPM product archive is not missing granules over a given period.

    This function does not require a connection to the PPS to search for missing files.
    However, the start and end of the checked period are based on the first and last
    file found on disk!
    If download=True, it attempts to download the missing granules.

    Parameters
    ----------
    product : str
        GPM product acronym.
    start_time : datetime.datetime
        Start time.
    end_time : datetime.datetime
        End time.
    product_type : str, optional
        GPM product type. Either ``RS`` (Research) or ``NRT`` (Near-Real-Time).
    version : int, optional
        GPM version of the data to retrieve if ``product_type = "RS"``.
        GPM data readers currently support versions 4, 5, 6 and 7.
    download : bool, optional
        Whether to download the missing files.
        The default is ``True``.
    n_threads : int, optional
        Number of parallel downloads. The default is 4.
    transfer_tool : str, optional
        Whether to use ``CURL`` or ``WGET`` for data download. The default is ``WGET``.
    verbose : bool, optional
        Whether to print processing details. The default is ``True``.
    """
    ##--------------------------------------------------------------------.
    from gpm.io.download import download_archive

    # -------------------------------------------------------------------------.
    # Check valid start/end time
    start_time, end_time = check_start_end_time(start_time, end_time)
    ##--------------------------------------------------------------------.
    # Find filepaths
    filepaths = find_filepaths(
        storage="LOCAL",
        version=version,
        product=product,
        product_type=product_type,
        start_time=start_time,
        end_time=end_time,
        verbose=verbose,
    )
    ##---------------------------------------------------------------------.
    # Check that files have been downloaded on disk
    if len(filepaths) == 0:
        raise ValueError("No files found on disk. Please download the files first.")
    ##---------------------------------------------------------------------.
    # Check that the specified time period is covered
    check_time_period_coverage(filepaths, start_time, end_time, raise_error=False)
    ##---------------------------------------------------------------------.
    # Loop over files and retrieve the time periods with missing granules
    list_missing_periods = get_time_period_with_missing_files(filepaths)
    # If there are missing data
    if len(list_missing_periods) > 0:
        if download:
            # Attempt to download the missing data
            for s_time, e_time in list_missing_periods:
                download_archive(
                    version=version,
                    product=product,
                    product_type=product_type,
                    start_time=s_time,
                    end_time=e_time,
                    n_threads=n_threads,
                    transfer_tool=transfer_tool,
                    check_integrity=True,
                    remove_corrupted=True,
                    retry=2,
                    verbose=verbose,
                )
        else:
            # Otherwise print the time periods with missing data and raise an error
            for s_time, e_time in list_missing_periods:
                print(f"- Missing data between {s_time} and {e_time}")
            raise ValueError(
                f"The GPM {product} archive is not complete between {start_time} and {end_time}.",
            )
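

####--------------------------------------------------------------------------.
# Example usage (a minimal sketch; the product acronym, version and time
# period are illustrative placeholders):
#
#   import datetime
#
#   check_archive_completeness(
#       product="2A-DPR",
#       start_time=datetime.datetime(2020, 7, 1),
#       end_time=datetime.datetime(2020, 8, 1),
#       version=7,
#       product_type="RS",
#       download=True,  # attempt to download missing granules from the PPS
#       transfer_tool="WGET",
#       n_threads=4,
#   )
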
####--------------------------------------------------------------------------.