import contextlib
from datetime import datetime
import os
from pathlib import Path
import re
import shutil
import stat
import subprocess
import sys
from typing import Generator, List
[docs]def is_path(var : str) -> bool:
"""
Determines if the given string represents a path or a base filename.
Args:
var (str): The string to check, which may be a base filename or a path.
Returns:
bool: True if `var` includes a directory component, indicating it's a path;
False if it's only a base filename.
Example:
is_path("woo") => False
is_path("boo/woo") => True
"""
pth = Path(var)
return pth.parent != Path(".") # '.' is returned if there is no path component
[docs]def remove_folder(path, force=False):
"""Removes a folder. Does not remove files.
Raises OSError if the folder is not empty, unless force=True
(force has no effect if the folder is empty).
For folders, this mimics the difference between OS-level
"rm <dir>" (force=False) vs "rm -rf <dir>" (force=True).
"""
if not os.path.isdir(path):
return
try:
os.rmdir(path)
except OSError as e:
if e.errno == 39:
if force:
shutil.rmtree(path)
else:
raise e
[docs]def is_root_of(base_path: Path, target_path: Path, greedy: bool=False) -> bool:
"""Return True if base_path is a root of target_path.
If the paths are the same and `greedy=True`, return True.
Paths do not need to exist.
"""
b = Path(base_path)
t = Path(target_path)
try:
t.relative_to(b)
except ValueError:
return False
if t.resolve() == b.resolve(): # not p1.samefile(p2)
if greedy:
return True
else:
return False
return True
[docs]def is_broken_symlink(path):
"""Return True if path provided is a broken symlink"""
if os.path.islink(path):
# if it's a symlink, check if its target exists
if not os.path.exists(path):
return True
return False
[docs]def list_dir_contents(directory: Path, extensions: list=None, keep_folders: bool=True) -> list:
"""Returns a sorted list of the directory contents.
extensions:
Include only files with the extensions.
keep_folders:
Include folders in addition to the files from `extensions`.
"""
# explicitly return a list (not generator)
directory = Path(directory)
extensions = [extensions] if isinstance(extensions, str) else extensions
all_contents = list(sorted(directory.rglob('*')))
def include(p):
if p.is_dir():
return keep_folders
if extensions is None:
return True
return p.suffix in extensions
return [p for p in all_contents if include(p)]
[docs]def write_shell_string(file_path: str, script_content: str, mode="w", make_executable=False):
"""Write string to shell script preserving newlines"""
with open(file_path, mode) as f:
f.write(script_content)
if make_executable:
st = os.stat(file_path)
os.chmod(file_path, st.st_mode | stat.S_IEXEC)
[docs]def get_active_venv():
# Check if the VIRTUAL_ENV environment variable is set
venv_path = os.getenv('VIRTUAL_ENV')
if venv_path:
return venv_path
else:
# Fallback: Check if sys.prefix is not the same as sys.base_prefix (which indicates a venv is active)
if sys.prefix != sys.base_prefix:
return sys.prefix
else:
return None
[docs]def get_pip_freeze(exe=None):
"""Return a list of currently installed packages and their exact versions"""
if exe is None:
exe = sys.executable
try:
result = subprocess.run(
[exe, '-m', 'pip', 'freeze'],
capture_output=True, text=True, check=True
)
except subprocess.CalledProcessError as e:
raise e
return(result.stdout)
[docs]def write_shell_script(file_path: str, script_content: List[str], mode='w', make_executable=False):
"""Write shell script content to a file."""
with open(file_path, mode) as f:
f.write('\n'.join(script_content))
if make_executable:
st = os.stat(file_path)
os.chmod(file_path, st.st_mode | stat.S_IEXEC)
[docs]def get_date_string():
"""Return a datestring of now time"""
now = datetime.now() # current date and time
return now.strftime("%Y-%m-%d %H:%M")
[docs]def delete_or_abort(path):
"""Asks a user for input to abort or delete and replace an existing directory.
"""
choice = input(f'{path} already exists: Abort (a) or replace (r)?')
if choice == "a":
print( "Exiting")
exit()
elif choice == "r":
if os.path.islink(path):
print(f"Deleting {path}")
os.unlink(path)
elif os.path.isdir(path):
print(f"Deleting {path}")
shutil.rmtree(path)
# no case for file -- needed?
else:
print("Invalid input! Choices are 'a' or 'r'")
delete_or_abort(path)
[docs]def yes_or_no(question, compact=True):
"""Prompts a user if they want to proceed (y) or not (n) given the prompt
(the question).
"""
_options = {'n': 'no', 'y': 'yes'}
if compact:
option_string = '[{}]'.format('/'.join(_options))
else:
option_string = ' '.join([f'{v} ({k})' for k,v in _options.items()])
choice = input(f'{question}: {option_string} ')
if choice == "n":
print( "Exiting")
exit()
elif choice == "y":
pass
else:
option_keys = ', '.join([f"'{k}'" for k in _options.keys()])
print(f"Invalid input! Choices are: {option_keys}")
yes_or_no(question)
def _return_with_message(message, value=None):
print(message)
return value
def _get_memory_unit_factors_to_bytes(base=2):
# conversion factors for memory unit to bytes
if base == 2:
return {'K': 1 << 10, 'M': 1 << 20, 'G': 1 << 30}
elif base == 10:
return {'K': 1e3, 'M': 1e6, 'G': 1e9}
else:
raise ValueError('base must be 2 or 10')
[docs]def parse_memory_string_to_bytes(memory : str, base=2):
"""Parses a string that specifies memory value and unit and
returns the value in bytes.
Parameters
memory : a string composed of the value and units, where units
are denoted as KB, MB, or GB (case insensitive, with or
without trailing 'b'). Space is permitted between the value
and units.
base : either 2 or 10, denoting the conversion from the input
units to bytes. Use with caution. Default 2.
Returns
value of the information in bytes
Example
>>> parse_memory_string_to_bytes("1 kb")
1024
>>> parse_memory_string_to_bytes("10GB")
10737418240
"""
unit_map = _get_memory_unit_factors_to_bytes(base)
M = memory.upper()
valid = re.search(r"([0-9]+)\s?([KMG]B?\b)", M)
if valid is None:
raise ValueError(f"unsupported memory format for '{memory}'; units must be one of {{K, G, M, KB, GB, MB}} (case insensitive)")
value = int(valid.group(1))
units = valid.group(2).rstrip('B')
nbytes = int(value * unit_map[units])
return nbytes
[docs]@contextlib.contextmanager
def change_dir(path: Path) -> Generator:
"""Temporarily changes the working context to the given path."""
cwd = os.getcwd()
os.chdir(path)
try:
yield
finally:
os.chdir(cwd)