Source code for imsi.user_interface.ui_manager

import copy
import os
from omegaconf import OmegaConf
from pathlib import Path
from pydantic import BaseModel, field_validator
import subprocess
import shlex
import shutil
import textwrap
from typing import Dict, Iterable

from imsi.config_manager import config_manager as cm
from imsi.shell_interface import shell_interface_manager
from imsi.shell_interface.config_hooks_manager import call_hooks
from imsi.sequencer_interface.sequencers import create_sequencer
from imsi.user_interface.ui_utils import apply_options_overrides, save_setup_configuration, load_run_config, _init_runpath_link
from imsi.user_interface.exceptions import IMSIVersionMismatchError, IMSIVersionRequirementNotFoundError, SubprocessError
from imsi.utils.dict_tools import parse_vars, update, load_json
from imsi.utils.general import change_dir, is_broken_symlink, _return_with_message
from imsi.utils.git_tools import is_repo_clean


from imsi import __version__


def get_required_imsi_version(
    source_config_path: Path = Path("src/imsi-config"),
    version_req_file: str = "version_requirements.yaml",
):
    """Look up the imsi version that the source repo's config files require.

    Parameters:
        source_config_path : directory holding the imsi config files
        version_req_file : name of the requirements file in that directory

    Returns:
        A tuple of (path to the requirements file, the value stored under
        its "imsi_version_requirements" key).

    Raises:
        IMSIVersionRequirementNotFoundError : the requirements file is missing.
    """
    req_path = Path(source_config_path).joinpath(version_req_file)
    if req_path.exists():
        requirements = OmegaConf.load(req_path)
        return req_path, requirements["imsi_version_requirements"]
    raise IMSIVersionRequirementNotFoundError(
        f"Required {req_path} doesn't exist. Try updating your IMSI source repo."
    )
def get_current_imsi_version(no_patch=True):
    """Return the version string of the imsi package in use.

    With no_patch=True (the default) only the first two dot-separated fields
    of ``__version__`` are returned (major.minor); otherwise the complete
    ``__version__`` string is returned.
    """
    if not no_patch:
        return __version__
    major_minor = __version__.split(".")[:2]
    return ".".join(major_minor)
def assert_imsi_version_match(version):
    """Verify that `version` equals the major.minor version of imsi in use.

    The imsi minor version is toggled when there are config breaking
    changes, so only the major and minor fields are compared.

    Inputs:
        version : version to check, usually from a requirements file

    Returns:
        True when the versions match.

    Raises:
        IMSIVersionMismatchError : the versions differ.
    """
    running = get_current_imsi_version(no_patch=True)
    mismatch = version != running
    if mismatch:
        raise IMSIVersionMismatchError(current=running, required=version)
    return True
# validate_version_reqs(source_config_path)
#
# Purpose: check that the imsi version required by the source repo's config
# files matches the imsi version in use.
#
# Steps: the required version is read with get_required_imsi_version() and
# compared with assert_imsi_version_match().
#
# Returns: nothing explicit (None) when the versions match.
#
# Raises:
#   SystemExit - when assert_imsi_version_match() raises
#       IMSIVersionMismatchError. The exit message is a dedented multi-line
#       text reporting e.required, __version__ and the resolved path of the
#       requirements file.
#   Any exception from get_required_imsi_version() propagates unchanged,
#       because that call sits before the try block.
#
# NOTE(review): the line layout of the multi-line message was lost in this
# rendering of the file, so the code below is left exactly as found.
[docs]def validate_version_reqs(source_config_path: Path = Path("src/imsi-config")): """Validates if the imsi-config source repo's version works with the imsi version being used. """ req_file, required_version = get_required_imsi_version(source_config_path) try: assert_imsi_version_match(required_version) except IMSIVersionMismatchError as e: raise SystemExit(textwrap.dedent(f"""\ ERROR: IMSI VERSION MISMATCH: the Major and Minor version must match. The source repo's configuration files are setup to use -> {e.required}.* <- But the IMSI version used is -> {__version__} <- Source repo version requirements: {req_file.resolve()} To resolve this error, try: - Changing to the appropriate IMSI version, then run the command again. - Updating your repo's configuration files; for more information, see: https://imsi.readthedocs.io/en/main/config_breaking_changes.html """) )
def check_if_imsi_compliant_version(path: Path, print_message: bool = False) -> bool:
    """Report whether the config repo at `path` is compatible with this imsi.

    Parameters:
        path : directory holding the imsi config files (passed on to
            validate_version_reqs)
        print_message : when True, print a short explanation on failure

    Returns:
        True when validate_version_reqs() succeeds, otherwise False.
    """
    try:
        validate_version_reqs(path)
    except IMSIVersionRequirementNotFoundError as err:
        # The requirements file is missing, so there is no required version
        # to report. Looking it up again here would only re-raise the same
        # error and break the "return False" contract.
        if print_message:
            print(f"IMSI version requirement not found: {err}")
        return False
    except (IMSIVersionMismatchError, SystemExit):
        if print_message:
            imsi_version = get_current_imsi_version()
            required_imsi_version = get_required_imsi_version(path)[1]
            print(f"IMSI version mismatch: current={imsi_version}, required={required_imsi_version}")
        return False
    return True
def create_imsi_configuration(
    imsi_config_path: str, setup_params: Dict
) -> tuple[cm.Configuration, cm.ConfigDatabase]:
    """Build and return a configuration instance and its config database.

    Parameters:
        imsi_config_path : directory containing the imsi configuration files
        setup_params : keyword arguments forwarded to
            ConfigManager.create_configuration

    Returns:
        A tuple of (configuration, db).

    Raises:
        FileNotFoundError : imsi_config_path is not an existing directory.
    """
    if not os.path.isdir(imsi_config_path):
        raise FileNotFoundError(
            f"Could not find imsi config directory at: {imsi_config_path}"
        )

    # create a DB / cm
    db = cm.database_factory(imsi_config_path)
    config_manager = cm.ConfigManager(db)

    # Create a configuration based on user input from setup cli
    configuration = config_manager.create_configuration(**setup_params)

    # Save the configuration for future editing/reference
    save_setup_configuration(configuration, save_config=True)

    return (configuration, db)
def build_run_config_on_disk(
    configuration: cm.Configuration, db: cm.ConfigDatabase, track=True, force=False
):
    """Create the physical config directory on disk and finish run setup.

    In order: builds the config directory, runs the sequencer's setup and
    config steps, initialises the run-path link for the work directory, and
    calls the "post-config" hooks. When `track` is true and the run config
    repository is reported as not clean afterwards, all changes are added
    and committed with git.
    """
    # Config directory contents for this configuration
    shell_interface_manager.build_config_dir(db, configuration, track=track, force=force)

    # Scheduler/sequencer setup
    sequencer = create_sequencer(configuration.setup_params.sequencer_name)
    sequencer.setup(configuration, force=force)
    sequencer.config(configuration, force=force)

    # Remaining steps that finalize work_dir
    _init_runpath_link(
        configuration.get_unique_key_value("work_dir"),
        configuration.machine.run_storage_dir,
    )

    # Config hooks; call_hooks checks whether the constraints defined in the
    # current config are met
    call_hooks(configuration, "post-config", force=force)

    if not track:
        return

    # Hooks may have changed the contents of the config folder - track these
    config_dir = configuration.get_unique_key_value('run_config_path')
    is_clean, _ = is_repo_clean(config_dir)
    if is_clean:
        return
    for git_command in (
        'git add -A',
        'git commit -q -m "IMSI: config_hooks:post-hook"',
    ):
        subprocess.run(shlex.split(git_command), cwd=config_dir)
def reload_config_from_source(force=False):
    """Build a new config directory from the upstream imsi source.

    Everything is re-extracted from the cloned repository to re-create the
    config directory. Use this to apply changes made in the repo after
    setup.
    """
    saved = load_run_config()

    # Re-create the configuration from the saved setup parameters; this
    # rebuilds the config directory completely
    config_path = saved.get_unique_key_value("imsi_config_path")
    setup_kwargs = saved.setup_params.model_dump()
    rebuilt, database = create_imsi_configuration(config_path, setup_kwargs)

    build_run_config_on_disk(rebuilt, database, force=force)
def update_config_from_state(force=False):
    """Apply edits from the "imsi_configuration_${runid}" state file.

    The user-facing configuration is loaded and saved, the run configuration
    is loaded again, and the config directory is rebuilt from it.
    """
    # Load the user-facing configuration and save it back (save_config=False:
    # no need to instantly save the user file again)
    edited = load_run_config(serialized=False)
    save_setup_configuration(edited, save_config=False)

    # Reload, then rebuild the config directory completely from what is in
    # the configuration file. Nominal validity testing would be good here.
    refreshed = load_run_config()
    config_path = refreshed.get_unique_key_value("imsi_config_path")
    database = cm.database_factory(config_path)
    build_run_config_on_disk(refreshed, database, force=force)
class Override(BaseModel):
    """A single command-line option override, validated on construction."""

    # Raw override string; must contain a '/' (see validate_options)
    options: str

    @field_validator("options")
    @classmethod
    def validate_options(cls, v: str) -> str:
        """Reject option strings that have no '/' separator.

        Raises:
            ValueError : `v` does not contain '/'.
        """
        if "/" not in v:
            raise ValueError("Option must be in the format <group>/<option>")
        return v
def parse_override_options(options: Iterable[str]) -> list[dict]:
    """Validate each override string and return them as plain dicts.

    Each item is passed through the Override model, so an invalid item
    raises that model's validation error. Every returned dict is the
    model_dump() of one Override.
    """
    parsed = []
    for raw_option in options:
        validated = Override(options=raw_option)
        parsed.append(validated.model_dump())
    return parsed
def parse_override(options: Iterable[str], force=False):
    """Apply command-line option overrides to the saved user configuration.

    Parameters:
        - options: An iterable of option identifier strings. Each string
          must contain a '/' separating the group and option name
          (e.g. "group/option" or "group/option=value").
        - force: Accepted, but not used anywhere in this function.

    The overrides are merged into the user-facing configuration, which is
    then saved with save_state=False and save_config=True. This function
    does not call build_run_config_on_disk.
    """
    # Start from the existing user config (not the state configuration)
    user_config = load_run_config(serialized=False)

    # NOTE(review): `db` is never used below; the call is kept so behavior
    # is unchanged. Confirm whether a rebuild step was meant to follow.
    db = cm.database_factory(user_config.get_unique_key_value("imsi_config_path"))

    print(f"Overriding configuration with: {options}")

    overrides = parse_override_options(options)
    merged = apply_options_overrides(
        config_dictionary=user_config.model_dump(), options=overrides
    )
    overridden_config = cm.Configuration(**merged)

    # Update the user configuration file accordingly
    save_setup_configuration(overridden_config, save_state=False, save_config=True)
def set_selections(parm_file=None, selections=None, force=False):
    """Apply new setup selections to the run and rebuild its configuration.

    Parameters:
        parm_file : optional JSON file; its values are merged into the setup
            parameters first
        selections : optional key=value strings from the command line.
            Recognised keys are model, exp, machine, compiler, sequencer,
            flow and postproc. Any other key is reported with a warning and
            ignored.
        force : passed on to build_run_config_on_disk

    Selections take precedence over values from `parm_file`. A setup
    parameter named in neither keeps its current value.
    """
    # get existing simulation config
    configuration = load_run_config()
    db = cm.database_factory(configuration.get_unique_key_value("imsi_config_path"))
    config_manager = cm.ConfigManager(db)

    updated_setup_params = copy.deepcopy(configuration.setup_params)

    if parm_file:
        # A file would be more valuable for options, since selections are
        # few but options possibly many.
        file_values = load_json(parm_file)
        updated_setup_params = update(updated_setup_params, file_values)

    if selections:
        print(f"set selections: {selections}")
        values = parse_vars(selections, none_as_str=False)

        # Selections must match imsi setup cli options to match parts of the
        # config (some but not all are allowed).
        selection_to_param = {
            'model': 'model_name',
            'exp': 'experiment_name',
            'machine': 'machine_name',
            'compiler': 'compiler_name',
            'sequencer': 'sequencer_name',
            'flow': 'flow_name',
            'postproc': 'postproc_profile',
        }
        # Only overwrite what was actually selected. Falling back to the
        # original configuration value for unselected keys would silently
        # discard values just merged in from parm_file.
        for key, param_name in selection_to_param.items():
            if key in values:
                setattr(updated_setup_params, param_name, values.pop(key))

        # warn for bad selections, and don't add them to setup params
        # TODO: would be better to do this via cli (early)
        for k, v in values.items():
            print(f"**WARNING**: selection '{k}={v}' not in setup params; not added to configuration")

    # Create a new simulation that we imbue with these properties
    new_configuration = config_manager.create_configuration(**updated_setup_params.model_dump())

    # Rebuild the config directory (including the /sequencer folder, running
    # hooks, etc)
    build_run_config_on_disk(new_configuration, db, force=force)

    # Update the saved configuration file accordingly
    save_setup_configuration(new_configuration, save_config=True)
# compile_model_execs(args, force=False)
#
# Purpose: prepare the run's bin folder(s) and run the compile script
# 'imsi-tmp-compile.sh' found in the run working directory.
#
# Parameters:
#   args  - extra arguments appended to the compile script command line
#   force - when true, existing bin folders are removed before setup
#           (real_bin_dir, and the storage bin folder when one is configured)
#
# Outline, as far as it can be read from this rendering:
#   * loads the run config and reads work_dir and setup_params.runid
#   * bin_dir is <work_dir>/bin; exe_storage_dir comes from
#     configuration.machine (path or None, per the inline comment). When set
#     and not resolving to bin itself, the storage target is
#     <exe_storage_dir>/<runid>/bin
#   * several branches return early through
#     _return_with_message(NTB_MESSAGE) ("Nothing to build")
#   * otherwise the script is started with cwd=work_dir, its stdout is
#     printed line by line, and the return code is checked
#
# Raises:
#   FileNotFoundError - work_dir is not a directory, or the compile script
#                       does not exist in it
#   SubprocessError   - the compile script exits with a non-zero return code
#
# NOTE(review): indentation was lost in this rendering, so the nesting of
# the if/elif/else chain for the bin folders cannot be verified here. The
# code is left exactly as found.
# NOTE(review): `os.path.unlink` is not a function of os.path (os.unlink
# is); that call would raise AttributeError if ever reached - confirm.
# NOTE(review): in the first setup branch, real_exe_storage_dir appears to
# be passed to os.path.exists/os.makedirs even when it may be None - confirm.
# NOTE(review): stderr is piped but only stdout is read while the script
# runs; a script writing a lot to stderr could block - confirm.
[docs]def compile_model_execs(args, force=False): """ Builds all component executables by calling an upstream script from the repository. Should be under /bin but not enforceable. """ configuration = load_run_config() work_dir = configuration.get_unique_key_value("work_dir") runid = configuration.setup_params.runid if not os.path.isdir(work_dir): raise FileNotFoundError( f"Could not find the run working directory at: {work_dir}" ) comp_script_basename = 'imsi-tmp-compile.sh' comp_script = os.path.join(work_dir, comp_script_basename) if not os.path.exists(comp_script): raise FileNotFoundError( f"Could not find compilation file {comp_script_basename} at: {work_dir}" ) # handle bin folder bin_dir = os.path.join(work_dir, 'bin') real_bin_dir = os.path.realpath(bin_dir) exe_storage_dir = configuration.machine.exe_storage_dir # path|None if exe_storage_dir is None: real_exe_storage_dir = None else: exep_exe = os.path.expandvars(os.path.realpath(exe_storage_dir)) if exep_exe == real_bin_dir: # pointing to itself exe_storage_dir = None real_exe_storage_dir = None else: # exe_storage_dir/{runid}/bin <- bin real_exe_storage_dir = os.path.join( os.path.expandvars(os.path.realpath(exe_storage_dir)), runid, 'bin' ) # nothing to build: NTB_MESSAGE = "Nothing to build. 
To clear the executable folder(s), try: imsi -f build" if force: # cleanup all bin folders if os.path.exists(real_bin_dir): if os.path.islink(real_bin_dir): os.path.unlink(real_bin_dir) else: shutil.rmtree(real_bin_dir) if (exe_storage_dir is not None): if os.path.exists(real_exe_storage_dir): shutil.rmtree(real_exe_storage_dir) # setup the bin folders - local (in work_dir) and storage (exe_storage_dir as the base) # and account for: # - changing between inputs of exe_storage_dir and # - re-running imsi build (preventing full builds when not needed) if (exe_storage_dir is None) or (real_bin_dir == real_exe_storage_dir): if is_broken_symlink(bin_dir): # self-pointing case os.unlink(bin_dir) if not os.path.exists(real_exe_storage_dir): os.makedirs(real_exe_storage_dir) os.symlink(real_exe_storage_dir, bin_dir) # build required elif os.path.exists(bin_dir): if os.path.islink(bin_dir) and (real_exe_storage_dir != real_bin_dir): # switch from ln to dir os.unlink(bin_dir) os.makedirs(bin_dir) # build required else: os.makedirs(bin_dir) # build required else: # linking required if is_broken_symlink(bin_dir): os.unlink(bin_dir) else: if os.path.exists(real_exe_storage_dir): if os.path.exists(bin_dir): if os.path.islink(bin_dir): return _return_with_message(NTB_MESSAGE) else: # relink (eg previously wasn't linked to storage) shutil.rmtree(bin_dir) os.symlink(real_exe_storage_dir, real_bin_dir) return _return_with_message(NTB_MESSAGE) else: if os.path.exists(real_exe_storage_dir): os.symlink(real_exe_storage_dir, real_bin_dir) return _return_with_message(NTB_MESSAGE) else: # switching or remake ln os.makedirs(real_exe_storage_dir) os.symlink(real_exe_storage_dir, real_bin_dir) # build required elif real_exe_storage_dir == real_bin_dir: return _return_with_message(NTB_MESSAGE) else: os.makedirs(real_exe_storage_dir, exist_ok=True) if os.path.islink(bin_dir): os.unlink(bin_dir) if os.path.exists(bin_dir) and os.path.isdir(bin_dir): shutil.rmtree(bin_dir) 
os.symlink(real_exe_storage_dir, bin_dir) proc = subprocess.Popen( [os.path.join('.', comp_script_basename)] + list(args), stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, cwd=work_dir, ) # stream the stdout during the process for line in proc.stdout: print(line, end='') # capture/raise if error proc.wait() if proc.returncode != 0: raise SubprocessError(proc, prefix=f"Error: Compiling failed with {comp_script}:")
def submit_run():
    """Load the run configuration and submit it through its sequencer.

    The sequencer is created from setup_params.sequencer_name of the loaded
    configuration, and its submit() method is called with that configuration.
    """
    configuration = load_run_config()
    sequencer_name = configuration.setup_params.sequencer_name
    create_sequencer(sequencer_name).submit(configuration)
def save_restarts(args):
    """Run ./save_restart_files.sh from the current directory and wait for it.

    Parameters:
        args : extra command-line arguments appended to the script call

    Raises:
        FileNotFoundError : the script is not in the current directory.
    """
    script = "./save_restart_files.sh"
    if not Path(script).exists():
        raise FileNotFoundError(
            "Could not find the save_restart_files.sh script. Are you in the correct directory?"
        )
    command = [script, *args]
    process = subprocess.Popen(command)
    process.wait()
def tapeload_rs(args):
    """Run ./tapeload_rs.sh from the current directory and wait for it.

    Parameters:
        args : extra command-line arguments appended to the script call

    Raises:
        FileNotFoundError : the script is not in the current directory.
    """
    script = "./tapeload_rs.sh"
    if not Path(script).exists():
        raise FileNotFoundError(
            "Could not find the tapeload_rs.sh script. Are you in the correct directory?"
        )
    command = [script, *args]
    process = subprocess.Popen(command)
    process.wait()
def get_setup_param(setting: str, work_dir: str | Path = None):
    """Return one attribute of setup_params from the run configuration.

    Parameters:
        setting : name of the attribute to read from setup_params
        work_dir : directory to load the run config from; when None the
            config is loaded relative to the current working directory
    """
    if work_dir is None:
        # relative to cwd/pwd
        run_config = load_run_config(serialized=True)
    else:
        # TODO: load_run_config should take explicit path
        with change_dir(work_dir):
            run_config = load_run_config(serialized=True)
    return getattr(run_config.setup_params, setting)
[docs]def get_init_state_folder(work_dir: str | Path, realpath: bool = True) -> str: """Return the path to the initial imsi state hash folder. Default is the true path (realpath=True, ie readlink). Otherwise the return is {work_dir}/.imsi/init/state. """ init_state_folder = Path(work_dir) / '.imsi/init/state' if not init_state_folder.exists(): raise FileNotFoundError(f"Can't find .imsi/init/state file in {work_dir}") if realpath: return init_state_folder.resolve(strict=False) return init_state_folder
def get_init_state_hash(work_dir: str | Path):
    """Return the initial imsi state hash.

    The hash is the final path component of the resolved
    {work_dir}/.imsi/init/state folder.
    """
    resolved_folder = get_init_state_folder(work_dir, realpath=True)
    state_hash = resolved_folder.name
    return state_hash
def get_all_state_files(state_path):
    """Return all the entries in the state folder.

    Parameters:
        state_path : state folder, as a str or a pathlib.Path

    Returns:
        A list of pathlib.Path objects, one per directory entry.

    Raises:
        FileNotFoundError : the state folder does not exist.
        IndexError : the state folder is empty.
    """
    # Coerce first so that plain string paths work for the existence check too
    state_dir = Path(state_path)
    if not state_dir.exists():
        raise FileNotFoundError(f'state folder does not exist: {state_path}')
    all_state_files = list(state_dir.iterdir())
    if not all_state_files:
        raise IndexError('no files found')
    return all_state_files
def get_stateful_folder_files(state_path, stateful_folder):
    """Map artefact type to state file for one stateful folder.

    Only state files whose name starts with `stateful_folder` are used. The
    key for each file is the part of its name after the last '_' and before
    the first following '.' (the artefact types are rev, diff and status).

    Raises:
        FileNotFoundError : no state file name starts with `stateful_folder`.
    """
    matching = [
        entry
        for entry in get_all_state_files(state_path)
        if entry.name.startswith(stateful_folder)
    ]
    if not matching:
        raise FileNotFoundError(f"no state files found related to '{stateful_folder}' in {state_path}")

    files_by_type = {}
    for entry in matching:
        last_segment = entry.name.split('_')[-1]
        artefact_type = last_segment.split('.')[0]
        files_by_type[artefact_type] = entry
    return files_by_type
def get_init_state_files(work_dir, stateful_folder: str = 'src'):
    """Return the init-state files of `stateful_folder`.

    The result is the mapping produced by get_stateful_folder_files for the
    initial state folder of `work_dir`.
    """
    folder = get_init_state_folder(work_dir)
    files_by_type = get_stateful_folder_files(folder, stateful_folder)
    return files_by_type
[docs]def get_init_src_descr(work_dir: str | Path): """Returns the contents of the initial src repo state from setup time, contained in `{work_dir}/.imsi/init/state_descriptor_src_setup.txt`. """ filename = (Path(work_dir) / '.imsi/init/state_descriptor_src_setup.txt') if not filename.exists(): raise FileNotFoundError(f'could not find {filename}') text = filename.read_text(encoding='utf-8').strip() if len(text.splitlines()) > 1: raise ValueError(f'malformed descriptor file under {filename}, expected exactly 1 line.') return text
[docs]def parse_state_components(work_dir, state, stateful_folder: str = 'src', exclude: str | list[str] = 'diff', header=True, ): """Parse the contents of the stateful_folder contents from the state folder, returned as a single string. exclude will exclude the type of state artefact (rev, status, diff) """ exclude = [] if exclude is None else exclude exclusions = [exclude] if isinstance(exclude, str) else exclude controlled_state_artefacts = ['diff', 'rev', 'status'] if set(exclusions) == set(controlled_state_artefacts): raise ValueError('no components to show') if not all([e in controlled_state_artefacts for e in exclusions]): raise ValueError(f'exclusions can only be {', '.join(controlled_state_artefacts)}') if state == 'init': statefiles = get_init_state_files(work_dir, stateful_folder=stateful_folder) else: state_path = Path(work_dir).joinpath('.imsi', 'states', state) statefiles = get_stateful_folder_files(state_path, stateful_folder) out = '' for ftype, fname in statefiles.items(): if ftype in exclusions: continue with open(fname, 'r') as f: if header: out += f'{ftype}\n' out += ''.join(f.readlines()) + '\n' return out.strip() + '\n'
def get_init_src_state(work_dir, verbose=False):
    """Return the initial state description of the src folder, or None.

    With verbose=True the rev and status state artefacts of the initial
    state are returned (the diff artefact is excluded). Otherwise the
    single-line descriptor from setup time is returned.

    A ValueError or FileNotFoundError raised while reading the state is
    swallowed and None is returned instead.
    """
    try:
        if not verbose:
            return get_init_src_descr(work_dir)
        return parse_state_components(work_dir, 'init', exclude=['diff'])
    except (ValueError, FileNotFoundError):
        return None
def get_sequencer_status():
    """Load the run configuration and call its sequencer's status().

    The sequencer is created from setup_params.sequencer_name. status() is
    called with the configuration and its setup_params; its result is not
    returned.
    """
    configuration = load_run_config()
    params = configuration.setup_params
    create_sequencer(params.sequencer_name).status(configuration, params)
# WIP
[docs]def query_time(): """Instantiate the configuration / SimulationTime instances and enable querying timers""" configuration = load_run_config() sequencing_config = configuration.sequencing