"""This module provides classes and utilities for managing configurations,
including models, experiments, machines, compilers, and their interactions.
The module includes classes for defining configuration elements such as
Model, Experiment, Machine, and Compiler. Additionally, it provides a
Configuration class for composing these elements into a complete configuration.
ConfigManager is a class used to establish configuration objects and
facilitate saving and loading configurations for later use. The module also
includes abstract classes such as ConfigDatabase for defining an abstract
interface for a configuration database and its concrete implementation JsonConfigDatabase.
Lastly, the module contains Factory classes (eg. CompilerFactory),
which provide methods for creating instances of analogous classes (eg. Compiler).
The module is designed to support flexibility in configuration management,
allowing configurations to be composed, serialized, and deserialized.
This module is a WIP. It is next required to create infrastructure
modules that use the configurations created here.
Neil Swart, April 2024
"""
# TODO:
# - Add a setup / user config (selections incoming from setup / subsequent "config" calls)
# - Perhaps represent/process shell_parameters more explicitly
from typing import Dict
import copy
import json
from omegaconf import OmegaConf
import pickle
from pathlib import Path
from pydantic import BaseModel
import yaml
from imsi.utils.dict_tools import (
parse_config_inheritance,
update,
recursive_lookup,
replace_curlies_in_dict,
load_config_file,
)
from imsi.config_manager.databases import ConfigDatabase, JsonConfigDatabase, YAMLConfigDatabase
from imsi.config_manager.schema.machine import Machine
from imsi.config_manager.schema.model import Model
from imsi.config_manager.schema.experiment import Experiment
from imsi.config_manager.schema.components import Components
from imsi.config_manager.schema.compiler import Compiler
from imsi.config_manager.schema.sequencing import Sequencing
from imsi.config_manager.schema.post_processing import PostProcessing
from imsi.config_manager.schema.setup_params import SetupParams
from imsi.config_manager.schema.utilities import Utilities
[docs]def database_factory(
imsi_config_path: str, config_type: str = "yaml"
) -> ConfigDatabase:
"""Factory function to create a ConfigDatabase instance based on the config_type"""
config_types = {
"json": JsonConfigDatabase(imsi_config_path),
"yaml": YAMLConfigDatabase(imsi_config_path),
}
try:
return config_types[config_type]
except KeyError:
raise ValueError(f"Invalid config_type: {config_type}")
[docs]class CompilerFactory:
"""Class containing methods to instantiate an instance of Compiler.
Like for machines, several parsing functions are required and encapsulated here.
"""
[docs] @staticmethod
def create_from_database(
db: ConfigDatabase, machine: Machine, compiler_name: str = None
):
compiler_name = compiler_name or machine.default_compiler
machine.validate_compiler(compiler_name=compiler_name)
compiler_config = db.get_parsed_config("compilers", compiler_name)
return Compiler(name=compiler_name, **compiler_config)
[docs]class SequencingFactory:
"""Class containing methods to instantiate an instance of Sequencing.
Like for machines, several parsing functions are required and encapsulated here.
"""
[docs] @staticmethod
def create_from_database(
db: ConfigDatabase,
machine: Machine,
sequencer_name: str,
flow_name: str = None,
model_type: str = None,
):
"""Create a sequencing instance given the config db, the machine, sequencer and flow names"""
sequencing_config = {} # To build up here
sequencing_configs = db.get_config("sequencing") # Everything from the DB
# Validity checks
SequencingFactory.verify_sequencing_structure(sequencing_configs)
# Run dates
sequencing_config["run_dates"] = sequencing_configs["run_dates"]
# This is the sequencer specific content (encapsulate in a function)
sequencing_config["sequencer"] = SequencingFactory.set_sequencer_config(
sequencer_name=sequencer_name,
sequencers=sequencing_configs["sequencers"],
machine_name=machine.name,
)
# Set default flow if it is not defined
flow_name = flow_name or SequencingFactory.determine_default_flow(
model_type,
sequencer_name,
sequencing_config["sequencer"].get("baseflows"),
flows=sequencing_configs["sequencing_flow"],
machine=machine,
)
sequencing_config["sequencing_flow"] = SequencingFactory.set_flow_config(
flow_name=flow_name,
flows=sequencing_configs["sequencing_flow"],
sequencer_name=sequencer_name,
sequencer_config=sequencing_config["sequencer"],
machine=machine,
model_type=model_type,
)
# Resolve sequencer config. It is a bit messy at there is some iterative dependency resolving between sequencer and flow
# flow_name.split('-')[0] is extracting the baseflow using the name convention for flows.
try:
sequencing_config["sequencer"]["baseflows"] = sequencing_config[
"sequencer"
]["baseflows"][model_type][flow_name.split("-")[0]]
except:
raise KeyError(
"Error is setting sequencer_config, using the flow name: {flow_name}. Is the flow_name valid?"
)
return Sequencing(**sequencing_config)
[docs] @staticmethod
def verify_sequencing_structure(sequencing_config: dict):
"""
Checks what we got from the db includes mandatory sections
(replaceable by schema validation?)
"""
required_keys = ["run_dates", "sequencing_flow", "sequencers"]
for key in required_keys:
if key not in sequencing_config.keys():
raise KeyError(
f"The key {key} is not in the 'sequencing' configuration "
"provided, but is a required element"
)
[docs] @staticmethod
def determine_default_flow(
model_type: str,
sequencer_name: str,
sequencer_baseflows: dict,
flows,
machine: Machine,
):
"""Get the default sequencing flow, which handles the selected model_type, and
also has configuration support for the selected sequencer and machine.
Parameters:
model_type (str): The model configuration, e.g. ESM, AMIP or OMIP
"""
if model_type not in sequencer_baseflows:
supported_model_type = ", ".join(sequencer_baseflows.keys())
raise KeyError(
f"The selected model configuration: {model_type} is not supported by "
f"available workflows of the selected sequencer: {sequencer_name}. "
f"Supported model configurations are: {supported_model_type}"
)
# All baseflows for this sequencer, and this model_type
base_flows = sequencer_baseflows[model_type].keys()
# The platform/machine specific implementation of baseflows is denoted with a suffix on baseflow
# This will return the first baseflow/machine specific flow in the list (which is constrain to be
# only those suppoting the specific model_type)
for baseflow in base_flows:
machine_specific_flow = f"{baseflow}-{machine.default_sequencing_suffix}" # convention for the naming
if machine_specific_flow in flows:
return machine_specific_flow
supported_base_flows = ", ".join(base_flows)
raise KeyError(
f"Could not determine a default sequencing flow for sequencer: {sequencer_name} "
f"on machine: {machine.name}/{machine.default_sequencing_suffix} that supports model_type: {model_type}. "
f"Support baseflows found for {model_type} are: {supported_base_flows}. "
f"Machine specific version(s) for {machine.default_sequencing_suffix} are "
"not implemented but required."
)
[docs] @staticmethod
def set_sequencer_config(sequencer_name: str, sequencers: dict, machine_name: str):
# This is the sequencer specific content (encapsulate in a function)
if sequencer_name not in sequencers:
supported_sequencers = ", ".join(sequencers.keys())
raise KeyError(
f"Selected sequencer {sequencer_name} not in list of configured sequencers {supported_sequencers}"
)
sequencer_config = parse_config_inheritance(sequencers, sequencer_name)
# Validation
if "supported_machines" not in sequencer_config:
raise KeyError(
f"No 'supported_machines' field in sequencer definition for {sequencer_name}"
)
if machine_name not in sequencer_config.get("supported_machines"):
supported_machines = ", ".join(sequencer_config.get("supported_machines"))
raise KeyError(
f"Machine {machine_name} not listed as supported by sequencer {sequencer_name}. "
f"Supported machines for {sequencer_name} are {supported_machines}. "
"Either change machine or sequencer."
)
if not sequencer_config or "baseflows" not in sequencer_config:
raise ValueError(
f"The sequencer config for {sequencer_name} does not contain a 'baseflows' definition"
)
return sequencer_config
[docs] @staticmethod
def set_flow_config(
flow_name: str,
flows: dict,
sequencer_name: str,
sequencer_config: dict,
machine: Machine,
model_type: str,
):
if flow_name not in flows:
supported_flows = ", ".join(flows.keys())
raise KeyError(
f"Selected sequencing flow {flow_name} not in list of configured flows:"
f"{supported_flows}"
)
flow_config = parse_config_inheritance(flows, flow_name)
# Validation
if not flow_config and "base_flow" in flow_config:
raise ValueError(
"The flow configuration for flow {flow_name} is not valid "
"or does not contain the required 'base_flow'"
)
base_flow = flow_config.get("base_flow")
if base_flow not in sequencer_config["baseflows"][model_type]:
supported_base_flows = ", ".join(
sequencer_config["baseflows"][model_type].keys()
)
raise ValueError(
f"The sequencer config for {sequencer_name} does not contain a "
f"flow definition for flow: {flow_name}. Supported (base) flows are: {supported_base_flows}"
)
return flow_config
[docs]class PostprocFactory:
"""Class containing methods to instantiate an instance of Postprocessing.
Fetch default from Experiment object (preferred) or Model object (backup)
if postproc is not set by the user.
"""
[docs] @staticmethod
def create_from_database(
db: ConfigDatabase,
model: Model,
experiment: Experiment,
postproc_name: str = None,
):
if (
postproc_name is None or postproc_name == ""
): # FIXME temp check '', refactor create_configuration required
postproc_name = PostprocFactory.get_default_postproc(model, experiment)
postproc_config = db.get_parsed_config("post-processing", postproc_name)
return PostProcessing(**postproc_config)
[docs] @staticmethod
def get_default_postproc(model, experiment) -> str:
"""Retrieve the default postproc_profile"""
if "postproc_profile" in experiment.model_dump():
return experiment.postproc_profile
elif "postproc_profile" in model.model_dump():
return model.postproc_profile
else:
raise ValueError(
f"No default postproc_profile defined for {experiment.name} or {model.name}"
)
[docs]class ExperimentFactory:
"""Class containing methods to instantiate an instance of Experiment.
On particular check that the model and experiment are consistent
"""
[docs] @staticmethod
def create_from_database(db: ConfigDatabase, experiment_name: str, model_name: str):
"""Create a sequencing instance given the config db, the machine, sequencer and flow names"""
# An issue with this check here is that if a users changes the model and does
# imsi config, this will not be triggered, but it could be invalid (should only be done
# via imsi set)
experiment_data = db.get_parsed_config("experiments", experiment_name)
experiment = Experiment(name=experiment_name, **experiment_data)
experiment.validate_model_name(model_name)
return experiment
[docs]class Configuration(BaseModel):
"""Container class that combines sub-configurations and serves as the goto reference
defining the configuration of a simulation.
"""
# maybe better to include model (header), experiment (header) and a new component object (merged)??
model: Model
experiment: Experiment
components: Components
machine: Machine
compiler: Compiler
postproc: PostProcessing
setup_params: SetupParams
utilities: Utilities
sequencing: Sequencing
# Add "output templates" of which "shell_params" could be one?
# This would avoid needing the DB downstream in shell_interface, and make the
# Configuration complete.
[docs] def model_post_init(self, __context):
"""
Used to specifically update defaults only for specific configs
"""
# For sequencing, we know we want to fill default time parameters
# with those from experiment.
# We are handling similar replacements in shell_parameters in the shell
# config. However, generalizing templates might be important (see #22)
sequencing_dict = self.sequencing.model_dump()
sequencing_dict = replace_curlies_in_dict(sequencing_dict, self.model_dump())
self.sequencing = Sequencing(**sequencing_dict)
# after model is validated, instantiate without components
self.model = Model(
**self.model.model_dump(exclude={"components"})
)
# after experiment is validated, instantiate without components
self.experiment = Experiment(
**self.experiment.model_dump(exclude={"components"})
)
[docs] def get_unique_key_value(self, key: str):
"""Search recursively through the nested dicts of the configuration to try and find a specified key
and return its value if the key is unique. If mulitple instances of they key exist, return an error.
"""
# This is searching the whole imsi config for a match for key (variable)
result = set(list(recursive_lookup(key, self.model_dump())))
if len(result) != 1:
# No results or no unique results
# No unique results is a major challenge. But resolving this would require specifying more information
# than just {{variable}}. For example, something like "input_files" which appears in the configs for
# each model would not be unique. So far, we only need to search for uniquely defined values.
raise ValueError(f"Could not find a unique imsi definition of {key}")
else:
return result.pop()
[docs]class ConfigManager:
"""This class is used to establish configuration objects, as well as save/load them for later use"""
# Injecting the DB is good, but it might be useful to initialize it inline.
def __init__(self, db: ConfigDatabase = None):
self.db = db
[docs] def create_configuration(
self,
model_name: str,
experiment_name: str,
machine_name: str = "",
compiler_name: str = "",
sequencer_name: str = "",
flow_name: str = "",
postproc_profile: str = "",
**kwargs,
):
"""Create the individual instances of config elements and return a configuration composed of these"""
# Capture all key=value pairs passed, filtering out "self" and kwargs (else it would be nested below a kwargs key)
setup_params = {
key: value
for key, value in locals().items()
if key != "self" and key != "kwargs"
}
# Merge kwargs into setup_params, without being nested
setup_params.update(kwargs)
if self.db is None:
raise RuntimeError("imsi ConfigManager has no database")
experiment = self.create_experiment(experiment_name, model_name)
model = self.create_model(model_name)
machine = self.create_machine(machine_name)
components = self.create_components(model, experiment)
compiler = self.create_compiler(machine, compiler_name)
setup_params = self.create_SetupParams(setup_params, machine) # improvable
utilities = self.create_utilities()
sequencing = self.create_sequencing(
machine, setup_params.sequencer_name, flow_name, experiment
)
postproc = self.create_postproc(model, experiment, postproc_profile)
return Configuration(
model=model,
experiment=experiment,
components=components,
machine=machine,
compiler=compiler,
postproc=postproc,
setup_params=setup_params,
utilities=utilities,
sequencing=sequencing,
)
[docs] def create_experiment(self, experiment_name: str, model_name: str) -> Experiment:
return ExperimentFactory.create_from_database(
self.db, experiment_name, model_name
)
[docs] def create_model(self, model_name: str) -> Model:
model_data = self.db.get_parsed_config("models", model_name)
return Model(name=model_name, **model_data)
[docs] def create_components(
self, model: Model, experiment: Experiment
) -> Components:
# deep merge the components of the experiment and model
# experiment overrides model when params are shared
components = update(model.components, experiment.components)
# Create the merged components object
return Components(**components)
[docs] def create_machine(self, machine_name: str = None) -> Machine:
return Machine.create_from_database(self.db, machine_name)
[docs] def create_compiler(self, machine: Machine, compiler_name: str = None) -> Compiler:
return CompilerFactory.create_from_database(self.db, machine, compiler_name)
[docs] def create_postproc(
self, model: Model, experiment: Experiment, postproc_profile: str
) -> PostProcessing:
return PostprocFactory.create_from_database(
self.db, model, experiment, postproc_profile
)
[docs] def create_SetupParams(self, setup_params: Dict, machine: Machine) -> SetupParams:
if "sequencer_name" in setup_params:
if not setup_params["sequencer_name"]: # It is empty
setup_params["sequencer_name"] = machine.get_default_sequencer()
return SetupParams(**setup_params)
[docs] def create_utilities(self) -> Utilities:
utilities_data = copy.deepcopy(self.db.get_config("utility_config"))
return Utilities(**utilities_data)
[docs] def create_sequencing(
self,
machine: Machine,
sequencer_name: str,
flow_name: str,
experiment: Experiment,
) -> Sequencing:
return SequencingFactory.create_from_database(
self.db,
machine=machine,
sequencer_name=sequencer_name,
flow_name=flow_name,
model_type=experiment.model_type,
)
[docs] @classmethod
def save_configuration(self, configuration: Configuration, filepath: str):
"""Save the configuration to a file"""
dumped = configuration.model_dump()
if not dumped:
raise ValueError("Configuration is empty, nothing to save")
with open(filepath, 'w') as f:
# preserve order with sort_keys=False
yaml.dump(dumped, f, default_flow_style=False, sort_keys=False)
[docs] def load_state(self, filepath) -> Configuration:
"""Load the configuration state from a file"""
with open(filepath, "rb") as file:
return pickle.load(file)
[docs] @classmethod
def save_state(self, configuration: Configuration, filepath: str):
"""Pickle the configuration object"""
with open(filepath, 'wb') as f:
pickle.dump(configuration, f)
# Simple de-serialization
[docs] def load_configuration(self, filepath) -> Configuration:
"""Load the configuration from a file"""
with open(filepath, 'rb') as f:
cfg = yaml.safe_load(f)
return Configuration(**cfg)