Source code for gpm.utils.directories

# -----------------------------------------------------------------------------.
# MIT License

# Copyright (c) 2024 GPM-API developers
#
# This file is part of GPM-API.

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

# -----------------------------------------------------------------------------.
"""This module contains functions to search files and directories into the local machine."""
import concurrent.futures
import fnmatch
import glob
import os
import pathlib
import re

from gpm.utils.list import flatten_list


def _recursive_glob(dir_path, glob_pattern):
    dir_path = pathlib.Path(dir_path)
    return [str(path) for path in dir_path.rglob(glob_pattern)]


def list_paths(dir_path, glob_pattern, recursive=False):
    """Return a list of filepaths and directory paths."""
    if not recursive:
        return glob.glob(os.path.join(dir_path, glob_pattern))
    return _recursive_glob(dir_path, glob_pattern)


def list_files(dir_path, glob_pattern, recursive=False):
    """Return a list of filepaths (exclude directory paths)."""
    paths = list_paths(dir_path, glob_pattern, recursive=recursive)
    return [f for f in paths if os.path.isfile(f)]


def list_directories(dir_path, glob_pattern, recursive=False):
    """Return a list of directory paths (exclude filepaths)."""
    paths = list_paths(dir_path, glob_pattern, recursive=recursive)
    return [f for f in paths if os.path.isdir(f)]


###########################
#### Search and filter ####
###########################


def match_extension(filename, extension=None):
    """Return True if the filename ends with the specified extension."""
    if extension is None:
        return True
    return filename.endswith(extension)


def match_regex_pattern(filename, pattern=None):
    """Return True if the filename matches the regex pattern."""
    # Assumes the regex pattern is already compiled with re.compile!
    if pattern is None:
        return True
    return re.match(pattern, filename) is not None


def match_glob_pattern(filename, pattern=None):
    """Return True if the filename matches the glob pattern."""
    # Assumes Unix shell-style wildcards
    if pattern is None:
        return True
    return fnmatch.fnmatch(filename, pattern)


def match_filters(filename, file_extension=None, glob_pattern=None, regex_pattern=None):
    """Return True if the filename matches all the specified filtering criteria."""
    return (
        match_extension(filename=filename, extension=file_extension)
        and match_regex_pattern(filename=filename, pattern=regex_pattern)
        and match_glob_pattern(filename=filename, pattern=glob_pattern)
    )


def list_and_filter_files(path, file_extension=None, glob_pattern=None, regex_pattern=None):
    """Retrieve list of files (filtered by extension and custom patterns)."""
    with os.scandir(path) as file_it:
        filepaths = [
            file_entry.path
            for file_entry in file_it
            if (
                file_entry.is_file()
                and match_filters(
                    filename=file_entry.name,
                    file_extension=file_extension,
                    glob_pattern=glob_pattern,
                    regex_pattern=regex_pattern,
                )
            )
        ]
    return filepaths


def get_parallel_list_results(function, inputs, **kwargs):
    """Apply the function to each input in parallel threads and return the list of results."""
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_dict = {executor.submit(function, i, **kwargs): i for i in inputs}
        results = [future.result() for future in concurrent.futures.as_completed(future_dict)]
    return results


def get_parallel_dict_results(function, inputs, **kwargs):
    """Apply the function to each input in parallel threads and return a dictionary mapping each input to its result."""
    results = {}
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_dict = {executor.submit(function, i, **kwargs): i for i in inputs}
        for future in concurrent.futures.as_completed(future_dict):
            i = future_dict[future]
            try:
                result = future.result()
                results[i] = result
            except Exception as e:
                print(f"Error while processing {i}: {e}")
    return results


def get_filepaths_within_paths(paths, parallel=True, file_extension=None, glob_pattern=None, regex_pattern=None):
    """Return a list with all filepaths within a list of directories matching the filename filtering criteria."""
    if regex_pattern is not None:
        regex_pattern = re.compile(regex_pattern)
    if parallel:
        filepaths = get_parallel_list_results(
            function=list_and_filter_files,
            inputs=paths,
            file_extension=file_extension,
            glob_pattern=glob_pattern,
            regex_pattern=regex_pattern,
        )
    else:
        filepaths = [
            list_and_filter_files(
                path,
                file_extension=file_extension,
                glob_pattern=glob_pattern,
                regex_pattern=regex_pattern,
            )
            for path in paths
        ]
    # Flatten the list of per-directory filepath lists
    return sorted(flatten_list(filepaths))


def get_filepaths_by_path(paths, parallel=True, file_extension=None, glob_pattern=None, regex_pattern=None):
    """Return a dictionary with the files within each directory path matching the filename filtering criteria."""
    if regex_pattern is not None:
        regex_pattern = re.compile(regex_pattern)
    if parallel:
        dict_partitions = get_parallel_dict_results(
            function=list_and_filter_files,
            inputs=paths,
            file_extension=file_extension,
            glob_pattern=glob_pattern,
            regex_pattern=regex_pattern,
        )
    else:
        dict_partitions = {
            path: list_and_filter_files(
                path,
                file_extension=file_extension,
                glob_pattern=glob_pattern,
                regex_pattern=regex_pattern,
            )
            for path in paths
        }
    return dict_partitions


def get_subdirectories(base_dir, path=True):
    """Return the name or path of the directories present in the input directory."""
    with os.scandir(base_dir) as base_it:
        if path:
            list_sub_dirs = [sub_entry.path for sub_entry in base_it if sub_entry.is_dir()]
        else:
            list_sub_dirs = [sub_entry.name for sub_entry in base_it if sub_entry.is_dir()]
    return list_sub_dirs


def _search_leaf_directories(base_dir):
    """Search for leaf directory paths."""
    leaf_directories = []

    # Recursively scan for leaf directories (directories without subdirectories)
    def scan_directory(current_dir):
        is_leaf = True
        with os.scandir(current_dir) as it:
            for entry in it:
                if entry.is_dir():
                    is_leaf = False
                    scan_directory(os.path.join(current_dir, entry.name))
        if is_leaf:
            leaf_directories.append(current_dir)

    scan_directory(base_dir)
    return leaf_directories


def search_leaf_directories(base_dir, parallel=True, remove_base_path=True):
    """Search leaf directories."""
    if not parallel:
        leaf_directories = _search_leaf_directories(base_dir)
    else:
        # Find directories in the base_dir
        list_dirs = get_subdirectories(base_dir, path=True)
        # Search in parallel across subdirectories
        list_leaf_directories = get_parallel_list_results(function=_search_leaf_directories, inputs=list_dirs)
        leaf_directories = flatten_list(list_leaf_directories)
    # Remove base_dir path
    if remove_base_path:
        leaf_directories = [path.removeprefix(str(base_dir)).strip(os.path.sep) for path in leaf_directories]
    return leaf_directories


def search_leaf_files(base_dir, parallel=True, file_extension=None, glob_pattern=None, regex_pattern=None):
    """Search files in leaf directories."""
    paths = search_leaf_directories(base_dir, parallel=parallel, remove_base_path=False)
    filepaths = get_filepaths_within_paths(
        paths,
        parallel=parallel,
        file_extension=file_extension,
        glob_pattern=glob_pattern,
        regex_pattern=regex_pattern,
    )
    return filepaths
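

# ----------------------------------------------------------------------------.
# Illustrative usage sketch (not part of the original module): how the search
# utilities above might be combined. The base directory and filtering patterns
# below are hypothetical placeholders.
if __name__ == "__main__":
    base_dir = "/tmp/gpm_archive"  # hypothetical local archive directory

    # List the relative paths of all leaf directories under base_dir
    leaf_dirs = search_leaf_directories(base_dir, parallel=True, remove_base_path=True)

    # Retrieve all HDF5 filepaths stored in the leaf directories
    hdf5_filepaths = search_leaf_files(base_dir, parallel=True, file_extension=".HDF5")

    # Group matching files by immediate subdirectory (hypothetical glob pattern)
    subdirs = get_subdirectories(base_dir, path=True)
    files_by_dir = get_filepaths_by_path(subdirs, parallel=False, glob_pattern="2A.GPM.DPR*")

    print(len(leaf_dirs), len(hdf5_filepaths), len(files_by_dir))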