# -----------------------------------------------------------------------------.
# MIT License
# Copyright (c) 2024 GPM-API developers
#
# This file is part of GPM-API.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
# -----------------------------------------------------------------------------.
"""This module provides tools to extract information from the granules' filenames."""
import datetime
import os
import re
from collections import defaultdict
import numpy as np
####---------------------------------------------------------------------------
########################
#### filename PATTERNS ####
########################
# General patterns for all GPM products (NASA naming convention).
# - The patterns are passed to ``trollsift.Parser`` by ``_parse_gpm_filename``.
# - Pattern tried first; a successful match is tagged as product_type "RS".
#   It contains a ``granule_id`` field between the time fields and the version.
NASA_RS_filename_PATTERN = "{product_level:s}.{satellite:s}.{sensor:s}.{algorithm:s}.{start_date:%Y%m%d}-S{start_time:%H%M%S}-E{end_time:%H%M%S}.{granule_id}.{version}.{data_format}" # noqa
# - Fallback pattern; a successful match is tagged as product_type "NRT".
#   It is identical to the pattern above except that it has no ``granule_id`` field.
NASA_NRT_filename_PATTERN = "{product_level:s}.{satellite:s}.{sensor:s}.{algorithm:s}.{start_date:%Y%m%d}-S{start_time:%H%M%S}-E{end_time:%H%M%S}.{version}.{data_format}" # noqa
# General pattern for all JAXA products
# - Pattern for 1B-Ku and 1B-Ka
# - The start date and time are encoded in a single field (minute resolution),
#   while the end is encoded only as hour and minute.
# - Passed to ``trollsift.Parser`` by ``_parse_jaxa_filename``.
JAXA_filename_PATTERN = "{mission_id}_{sensor:s}_{start_date_time:%y%m%d%H%M}_{end_time:%H%M}_{granule_id}_{product_level:2s}{product_type}_{algorithm:s}_{version}.{data_format}" # noqa
####---------------------------------------------------------------------------.
##########################
#### Filename parsers ####
##########################
def _parse_gpm_filename(filename):
    """Parse a filename following the NASA naming convention.

    The RS pattern is tried first. If it does not match (``ValueError``),
    the NRT pattern is used instead.

    Parameters
    ----------
    filename : str
        Name of the file (without directory).

    Returns
    -------
    dict
        Parsed fields. ``start_time`` and ``end_time`` are full datetime objects,
        the temporary ``start_date`` field is removed, ``product_type`` is
        ``"RS"`` or ``"NRT"``, and ``granule_id`` is an integer for RS files.

    Raises
    ------
    ValueError
        If the filename matches neither the RS nor the NRT pattern.
    """
    from trollsift import Parser

    # Identify which of the two NASA patterns the filename follows
    try:
        info_dict = Parser(NASA_RS_filename_PATTERN).parse(filename)
        product_type = "RS"
    except ValueError:
        info_dict = Parser(NASA_NRT_filename_PATTERN).parse(filename)
        product_type = "NRT"
    info_dict["product_type"] = product_type

    # The date is only needed to build the full timestamps
    date = info_dict.pop("start_date")
    start_clock = info_dict["start_time"]
    end_clock = info_dict["end_time"]

    def _combine(day, clock):
        # Attach the hour/minute/second of 'clock' to the calendar day 'day'
        return day.replace(hour=clock.hour, minute=clock.minute, second=clock.second)

    start_datetime = _combine(date, start_clock)
    end_datetime = _combine(date, end_clock)
    # The filename carries a single date: an end time earlier than the start time
    # means that the granule ends on the following day
    if end_clock < start_clock:
        end_datetime += datetime.timedelta(days=1)

    info_dict["start_time"] = start_datetime
    info_dict["end_time"] = end_datetime

    # Only RS filenames include a granule identifier
    if product_type == "RS":
        info_dict["granule_id"] = int(info_dict["granule_id"])
    return info_dict
def _parse_jaxa_filename(filename):
    """Parse a filename following the JAXA naming convention.

    Parameters
    ----------
    filename : str
        Name of the file (without directory).

    Returns
    -------
    dict
        Parsed fields. ``start_time`` and ``end_time`` are full datetime objects,
        the temporary ``start_date_time`` field is removed, ``product_type`` is
        translated to ``"RS"`` or ``"NRT"``, ``granule_id`` is an integer and
        ``satellite`` is added when the mission identifier contains
        ``"GPM"`` or ``"TRMM"``.

    Raises
    ------
    ValueError
        If the filename does not match the JAXA pattern or if the product type
        code is neither ``"S"`` nor ``"R"``.
    """
    from trollsift import Parser

    info_dict = Parser(JAXA_filename_PATTERN).parse(filename)

    # Build the full end timestamp from the start date and the end clock time
    start_datetime = info_dict.pop("start_date_time")
    end_clock = info_dict["end_time"]
    end_datetime = start_datetime.replace(
        hour=end_clock.hour,
        minute=end_clock.minute,
        second=end_clock.second,
    )
    # An end before the start means that the granule ends on the following day
    if end_datetime < start_datetime:
        end_datetime += datetime.timedelta(days=1)
    info_dict["start_time"] = start_datetime
    info_dict["end_time"] = end_datetime

    # Translate the JAXA product type code
    product_type_codes = {"S": "RS", "R": "NRT"}
    code = info_dict["product_type"]
    if code not in product_type_codes:
        raise ValueError("Report the bug.")
    info_dict["product_type"] = product_type_codes[code]

    # Infer the satellite from the mission identifier
    # - If both names are present, the last one checked ("TRMM") is kept
    mission_id = info_dict["mission_id"]
    for satellite in ("GPM", "TRMM"):
        if satellite in mission_id:
            info_dict["satellite"] = satellite

    # Cast granule_id to integer
    info_dict["granule_id"] = int(info_dict["granule_id"])
    return info_dict
def _get_info_from_filename(filename):
    """Retrieve file information dictionary from filename.

    The NASA naming convention is tried first. If it raises a ``ValueError``,
    the JAXA naming convention is tried.

    Parameters
    ----------
    filename : str
        Name of the file (without directory).

    Returns
    -------
    dict
        Parsed file information, including the ``product`` key.

    Raises
    ------
    ValueError
        If the filename can not be parsed with any of the known patterns.
        The exception raised by the JAXA parser is chained as ``__cause__``.
        A ``ValueError`` is also raised by ``get_product_from_filepath``
        if the product is unknown.
    """
    try:
        info_dict = _parse_gpm_filename(filename)
    except ValueError:
        try:
            info_dict = _parse_jaxa_filename(filename)
        except Exception as err:
            # Chain the original error so that the actual parsing failure stays visible
            raise ValueError(f"Impossible to infer file information from '{filename}'") from err

    # Add product information
    # - ATTENTION: can not be inferred for products not defined in etc/products.yaml
    info_dict["product"] = get_product_from_filepath(filename)

    # Return info dictionary
    return info_dict
[docs]
def get_info_from_filepath(filepath):
    """Return the file information dictionary parsed from a file path.

    Only the base name of ``filepath`` is parsed.

    Raises
    ------
    TypeError
        If ``filepath`` is not a string.
    """
    if isinstance(filepath, str):
        # Directory components carry no information: parse the file name only
        return _get_info_from_filename(os.path.basename(filepath))
    raise TypeError("'filepath' must be a string.")
[docs]
def get_key_from_filepath(filepath, key):
    """Extract the value of a specific information key from a single filepath."""
    info_dict = get_info_from_filepath(filepath)
    return info_dict[key]
[docs]
def get_key_from_filepaths(filepaths, key):
    """Extract the value of a specific information key from each filepath.

    A single filepath string is accepted and treated as a one-element list.
    The returned list has one value per filepath, in the input order.
    """
    list_filepaths = [filepaths] if isinstance(filepaths, str) else filepaths
    return [get_key_from_filepath(path, key=key) for path in list_filepaths]
####--------------------------------------------------------------------------.
#########################################
#### Product and version information ####
#########################################
[docs]
def get_product_from_filepath(filepath):
    """Infer granules ``product`` from file path.

    The product returned is the first one, in the iteration order of the
    products pattern dictionary, whose pattern is found in ``filepath``.

    Raises
    ------
    ValueError
        If no product pattern matches ``filepath``.
    """
    from gpm.io.products import get_products_pattern_dict

    patterns_dict = get_products_pattern_dict()
    # Lazily scan the patterns and stop at the first match
    matching_products = (name for name, regex in patterns_dict.items() if re.search(regex, filepath))
    product = next(matching_products, None)
    if product is None:
        raise ValueError(f"GPM Product unknown for {filepath}.")
    return product
[docs]
def get_product_from_filepaths(filepaths):
    """Infer granules ``product`` from file paths.

    A single filepath string is accepted and treated as a one-element list.
    """
    list_filepaths = [filepaths] if isinstance(filepaths, str) else filepaths
    return [get_product_from_filepath(path) for path in list_filepaths]
[docs]
def get_version_from_filepath(filepath, integer=True):
    """Infer granule ``version`` from file path.

    Parameters
    ----------
    filepath : str
        Path of the file.
    integer : bool, optional
        If ``True`` (the default), return the first group of digits found in
        the version string as an integer.
        Otherwise return the version string as parsed from the filename.
    """
    version = get_key_from_filepath(filepath, key="version")
    if not integer:
        return version
    # Keep only the first run of digits of the version string
    digit_groups = re.findall(r"\d+", version)
    return int(digit_groups[0])
[docs]
def get_version_from_filepaths(filepaths, integer=True):
    """Infer granules ``version`` from file paths.

    A single filepath string is accepted and treated as a one-element list.
    See ``get_version_from_filepath`` for the meaning of ``integer``.
    """
    list_filepaths = [filepaths] if isinstance(filepaths, str) else filepaths
    return [get_version_from_filepath(path, integer=integer) for path in list_filepaths]
[docs]
def get_granule_from_filepaths(filepaths):
    """Infer the granule identifier (``granule_id``) of each file path."""
    list_granule_id = get_key_from_filepaths(filepaths, key="granule_id")
    return list_granule_id
[docs]
def get_start_time_from_filepaths(filepaths):
    """Infer the ``start_time`` of each granule from the file paths."""
    list_start_time = get_key_from_filepaths(filepaths, key="start_time")
    return list_start_time
[docs]
def get_end_time_from_filepaths(filepaths):
    """Infer the ``end_time`` of each granule from the file paths."""
    list_end_time = get_key_from_filepaths(filepaths, key="end_time")
    return list_end_time
[docs]
def get_start_end_time_from_filepaths(filepaths):
    """Infer granules ``start_time`` and ``end_time`` from file paths.

    Each filename is parsed only once, and both timestamps are taken from the
    same information dictionary.

    Parameters
    ----------
    filepaths : list or str
        File paths. A single filepath string is treated as a one-element list.

    Returns
    -------
    tuple of numpy.ndarray
        The array of start times and the array of end times, in input order.
    """
    if isinstance(filepaths, str):
        filepaths = [filepaths]
    # Parse every file a single time: this avoids doing the parsing work twice
    # and also works when 'filepaths' is a one-shot iterable
    list_info = [get_info_from_filepath(filepath) for filepath in filepaths]
    list_start_time = [info_dict["start_time"] for info_dict in list_info]
    list_end_time = [info_dict["end_time"] for info_dict in list_info]
    return np.array(list_start_time), np.array(list_end_time)
####--------------------------------------------------------------------------.
#######################
#### Group utility ####
#######################
# Group keys whose value is looked up in the file information dictionary
# returned by ``get_info_from_filepath`` (see ``_get_groups_value``).
FILE_KEYS = [
    "product_level",
    "satellite",
    "sensor",
    "algorithm",
    "start_time",
    "end_time",
    "granule_id",
    "version",
    "product_type",
    "product",
    "data_format",
]
# Group keys whose value is a time component computed from the granule
# ``start_time`` with ``get_time_component`` (see ``_get_groups_value``).
TIME_KEYS = [
    "year",
    "month",
    "month_name",
    "quarter",
    "season",
    "day",
    "doy",
    "dow",
    "hour",
    "minute",
    "second",
]
[docs]
def check_groups(groups):
    """Validate the group keys and return them as a list.

    Parameters
    ----------
    groups : list or str
        Group keys. A single string is treated as a one-element list.

    Returns
    -------
    list
        The validated group keys.

    Raises
    ------
    TypeError
        If ``groups`` is neither a string nor a list.
    ValueError
        If any key is not listed in ``FILE_KEYS`` or ``TIME_KEYS``.
    """
    if isinstance(groups, str):
        groups = [groups]
    elif not isinstance(groups, list):
        raise TypeError("'groups' must be a list (or a string if a single group is specified.")

    valid_keys = FILE_KEYS + TIME_KEYS
    arr_groups = np.array(groups)
    # Boolean mask flagging the keys that are not among the valid ones
    is_invalid = np.isin(arr_groups, valid_keys, invert=True)
    invalid_keys = arr_groups[is_invalid]
    if len(invalid_keys) > 0:
        raise ValueError(f"The following group keys are invalid: {invalid_keys}. Valid values are {valid_keys}.")
    return arr_groups.tolist()
[docs]
def get_season(time):
    """Return the meteorological season of a `datetime.datetime` or `datetime.date` object.

    The season is identified by the initials of its months:
    ``"DJF"`` (December, January, February), ``"MAM"`` (March, April, May),
    ``"JJA"`` (June, July, August) and ``"SON"`` (September, October, November).
    """
    season_of_month = {
        12: "DJF", 1: "DJF", 2: "DJF",  # Winter
        3: "MAM", 4: "MAM", 5: "MAM",  # Spring
        6: "JJA", 7: "JJA", 8: "JJA",  # Summer
    }
    # Every month not listed above falls in autumn
    return season_of_month.get(time.month, "SON")
[docs]
def get_time_component(time, component):
    """Return a time component of a `datetime.datetime` object as a string.

    Parameters
    ----------
    time : datetime.datetime
        Time from which to extract the component.
    component : str
        One of ``year``, ``month``, ``day``, ``hour``, ``minute``, ``second``,
        ``doy`` (day of year), ``dow`` (day of week, 0=Monday, 6=Sunday),
        ``month_name`` (full month name), ``quarter`` (1-4) and
        ``season`` (DJF, MAM, JJA, SON).

    Returns
    -------
    str
        The component value cast to string.

    Raises
    ------
    KeyError
        If ``component`` is not one of the supported names.
    """
    # Components available directly as attributes of the time object
    if component in ("year", "month", "day", "hour", "minute", "second"):
        return str(getattr(time, component))

    # Components that must be computed (evaluated lazily, only when requested)
    derived_components = {
        "doy": lambda: time.timetuple().tm_yday,  # Day of year
        "dow": lambda: time.weekday(),  # Day of week (0=Monday, 6=Sunday)
        "month_name": lambda: time.strftime("%B"),  # Full month name
        "quarter": lambda: (time.month - 1) // 3 + 1,  # Quarter (1-4)
        "season": lambda: get_season(time),  # Season (DJF, MAM, JJA, SON)
    }
    return str(derived_components[component]())
def _get_groups_value(groups, filepath):
    """Return the group value of a filepath for the specified group keys.

    Time component keys (see ``TIME_KEYS``) are computed from the granule
    ``start_time`` and are always strings. The other keys are looked up in the
    file information dictionary; a key missing from it yields ``"<key>=None"``.

    If multiple keys are specified, the value returned is a string of format: ``<group_value_1>/<group_value_2>/...``
    If a single key is specified, its value is returned as is, without casting
    it to string. For ``start_time`` or ``end_time`` the function therefore
    returns a :py:class:`datetime.datetime` object.
    """
    is_single = len(groups) == 1
    info_dict = get_info_from_filepath(filepath)
    start_time = info_dict["start_time"]

    def _value_of(key):
        # Time components are derived from the granule start time
        if key in TIME_KEYS:
            return get_time_component(start_time, component=key)
        raw_value = info_dict.get(key, f"{key}=None")
        # Values must be strings only when they are joined together
        return raw_value if is_single else str(raw_value)

    values = [_value_of(key) for key in groups]
    return values[0] if is_single else "/".join(values)
[docs]
def group_filepaths(filepaths, groups=None):
    """
    Group filepaths into a dictionary according to the specified group keys.

    Parameters
    ----------
    filepaths : list
        List of filepaths.
    groups: list or str
        The group keys by which to group the filepaths.
        Valid group keys are ``product_level``, ``satellite``, ``sensor``, ``algorithm``,
        ``start_time``, ``end_time``,
        ``granule_id``, ``version``, ``product_type``, ``product``, ``data_format``,
        ``year``, ``month``, ``day``, ``doy``, ``dow``, ``hour``, ``minute``, ``second``,
        ``month_name``, ``quarter``, ``season``.
        The time components are extracted from ``start_time`` !
        If groups is ``None``, the input filepaths list is returned unchanged.
        The default is ``None``.

    Returns
    -------
    dict or list
        Either a dictionary of format ``{<group_value>: <list_filepaths>}``
        or the original input filepaths (if ``groups=None``).

    """
    # Nothing to group by: hand back the input as it is
    if groups is None:
        return filepaths

    groups = check_groups(groups)
    filepaths_dict = defaultdict(list)
    for filepath in filepaths:
        group_value = _get_groups_value(groups, filepath)
        filepaths_dict[group_value].append(filepath)
    # Return a plain dictionary so that missing keys raise KeyError
    return dict(filepaths_dict)