Source code for imsi.utils.general

import contextlib
from datetime import datetime
import os
from pathlib import Path
import re
import shutil
import stat
import subprocess
import sys
from typing import Generator, List

[docs]def is_path(var : str) -> bool: """ Determines if the given string represents a path or a base filename. Args: var (str): The string to check, which may be a base filename or a path. Returns: bool: True if `var` includes a directory component, indicating it's a path; False if it's only a base filename. Example: is_path("woo") => False is_path("boo/woo") => True """ pth = Path(var) return pth.parent != Path(".") # '.' is returned if there is no path component
[docs]def remove_folder(path, force=False): """Removes a folder. Does not remove files. Raises OSError if the folder is not empty, unless force=True (force has no effect if the folder is empty). For folders, this mimics the difference between OS-level "rm <dir>" (force=False) vs "rm -rf <dir>" (force=True). """ if not os.path.isdir(path): return try: os.rmdir(path) except OSError as e: if e.errno == 39: if force: shutil.rmtree(path) else: raise e
[docs]def is_root_of(base_path: Path, target_path: Path, greedy: bool=False) -> bool: """Return True if base_path is a root of target_path. If the paths are the same and `greedy=True`, return True. Paths do not need to exist. """ b = Path(base_path) t = Path(target_path) try: t.relative_to(b) except ValueError: return False if t.resolve() == b.resolve(): # not p1.samefile(p2) if greedy: return True else: return False return True
[docs]def list_dir_contents(directory: Path, extensions: list=None, keep_folders: bool=True) -> list: """Returns a sorted list of the directory contents. extensions: Include only files with the extensions. keep_folders: Include folders in addition to the files from `extensions`. """ # explicitly return a list (not generator) directory = Path(directory) extensions = [extensions] if isinstance(extensions, str) else extensions all_contents = list(sorted(directory.rglob('*'))) def include(p): if p.is_dir(): return keep_folders if extensions is None: return True return p.suffix in extensions return [p for p in all_contents if include(p)]
[docs]def write_shell_string(file_path: str, script_content: str, mode="w", make_executable=False): """Write string to shell script preserving newlines""" with open(file_path, mode) as f: f.write(script_content) if make_executable: st = os.stat(file_path) os.chmod(file_path, st.st_mode | stat.S_IEXEC)
[docs]def get_active_venv(): # Check if the VIRTUAL_ENV environment variable is set venv_path = os.getenv('VIRTUAL_ENV') if venv_path: return venv_path else: # Fallback: Check if sys.prefix is not the same as sys.base_prefix (which indicates a venv is active) if sys.prefix != sys.base_prefix: return sys.prefix else: return None
[docs]def get_pip_freeze(exe=None): """Return a list of currently installed packages and their exact versions""" if exe is None: exe = sys.executable try: result = subprocess.run( [exe, '-m', 'pip', 'freeze'], capture_output=True, text=True, check=True ) except subprocess.CalledProcessError as e: raise e return(result.stdout)
[docs]def write_shell_script(file_path: str, script_content: List[str], mode='w', make_executable=False): """Write shell script content to a file.""" with open(file_path, mode) as f: f.write('\n'.join(script_content)) if make_executable: st = os.stat(file_path) os.chmod(file_path, st.st_mode | stat.S_IEXEC)
[docs]def get_date_string(): """Return a datestring of now time""" now = datetime.now() # current date and time return now.strftime("%Y-%m-%d %H:%M")
[docs]def delete_or_abort(path): """Asks a user for input to abort or delete and replace an existing directory. """ choice = input(f'{path} already exists: Abort (a) or replace (r)?') if choice == "a": print( "Exiting") exit() elif choice == "r": if os.path.islink(path): print(f"Deleting {path}") os.unlink(path) elif os.path.isdir(path): print(f"Deleting {path}") shutil.rmtree(path) # no case for file -- needed? else: print("Invalid input! Choices are 'a' or 'r'") delete_or_abort(path)
[docs]def yes_or_no(question, compact=True): """Prompts a user if they want to proceed (y) or not (n) given the prompt (the question). """ _options = {'n': 'no', 'y': 'yes'} if compact: option_string = '[{}]'.format('/'.join(_options)) else: option_string = ' '.join([f'{v} ({k})' for k,v in _options.items()]) choice = input(f'{question}: {option_string} ') if choice == "n": print( "Exiting") exit() elif choice == "y": pass else: option_keys = ', '.join([f"'{k}'" for k in _options.keys()]) print(f"Invalid input! Choices are: {option_keys}") yes_or_no(question)
def _return_with_message(message, value=None): print(message) return value def _get_memory_unit_factors_to_bytes(base=2): # conversion factors for memory unit to bytes if base == 2: return {'K': 1 << 10, 'M': 1 << 20, 'G': 1 << 30} elif base == 10: return {'K': 1e3, 'M': 1e6, 'G': 1e9} else: raise ValueError('base must be 2 or 10')
[docs]def parse_memory_string_to_bytes(memory : str, base=2): """Parses a string that specifies memory value and unit and returns the value in bytes. Parameters memory : a string composed of the value and units, where units are denoted as KB, MB, or GB (case insensitive, with or without trailing 'b'). Space is permitted between the value and units. base : either 2 or 10, denoting the conversion from the input units to bytes. Use with caution. Default 2. Returns value of the information in bytes Example >>> parse_memory_string_to_bytes("1 kb") 1024 >>> parse_memory_string_to_bytes("10GB") 10737418240 """ unit_map = _get_memory_unit_factors_to_bytes(base) M = memory.upper() valid = re.search(r"([0-9]+)\s?([KMG]B?\b)", M) if valid is None: raise ValueError(f"unsupported memory format for '{memory}'; units must be one of {{K, G, M, KB, GB, MB}} (case insensitive)") value = int(valid.group(1)) units = valid.group(2).rstrip('B') nbytes = int(value * unit_map[units]) return nbytes
[docs]@contextlib.contextmanager def change_dir(path: Path) -> Generator: """Temporarily changes the working context to the given path.""" cwd = os.getcwd() os.chdir(path) try: yield finally: os.chdir(cwd)