diff --git a/OMPython/ModelicaSystem.py b/OMPython/ModelicaSystem.py index acdd41da..5c31fcd4 100644 --- a/OMPython/ModelicaSystem.py +++ b/OMPython/ModelicaSystem.py @@ -31,6 +31,7 @@ OMPathABC, OMSessionABC, + OMSessionRunner, ) # define logger using the current module name as ID @@ -2232,6 +2233,12 @@ def get_session(self) -> OMSessionABC: """ return self._mod.get_session() + def get_resultpath(self) -> OMPathABC: + """ + Get the path there the result data is saved. + """ + return self._resultpath + def prepare(self) -> int: """ Prepare the DoE by evaluating the parameters. Each structural parameter requires a new instance of @@ -2583,3 +2590,98 @@ class ModelicaSystemDoE(ModelicaDoEOMC): """ Compatibility class. """ + + +class ModelicaSystemRunner(ModelicaSystemABC): + """ + Class to simulate a Modelica model using a pre-compiled model binary. + """ + + def __init__( + self, + work_directory: Optional[str | os.PathLike] = None, + session: Optional[OMSessionABC] = None, + ) -> None: + if session is None: + session = OMSessionRunner() + + if not isinstance(session, OMSessionRunner): + raise ModelicaSystemError("Only working if OMCsessionDummy is used!") + + super().__init__( + work_directory=work_directory, + session=session, + ) + + def setup( + self, + model_name: Optional[str] = None, + variable_filter: Optional[str] = None, + ) -> None: + """ + Needed definitions to set up the runner class. This class expects the model (defined by model_name) to exists + within the working directory. At least two files are needed: + + * model executable (as '' or '.exe'; in case of Windows additional '.bat' + is expected to evaluate the path to needed dlls + * the model initialization file (as '_init.xml') + """ + + if self._model_name is not None: + raise ModelicaSystemError("Can not reuse this instance of ModelicaSystem " + f"defined for {repr(self._model_name)}!") + + if model_name is None or not isinstance(model_name, str): + raise ModelicaSystemError("A model name must be provided!") + + # set variables + self._model_name = model_name # Model class name + self._variable_filter = variable_filter + + # test if the model can be executed + self.check_model_executable() + + # read XML file + xml_file = self._session.omcpath(self.getWorkDirectory()) / f"{self._model_name}_init.xml" + self._xmlparse(xml_file=xml_file) + + +class ModelicaDoERunner(ModelicaDoEABC): + """ + Class to run DoEs based on a (Open)Modelica model using ModelicaSystemRunner + + The example is the same as defined for ModelicaDoEABC + """ + + def __init__( + self, + # ModelicaSystem definition to use + mod: ModelicaSystemABC, + # simulation specific input + # TODO: add more settings (simulation options, input options, ...) + simargs: Optional[dict[str, Optional[str | dict[str, str] | numbers.Number]]] = None, + # DoE specific inputs + resultpath: Optional[str | os.PathLike] = None, + parameters: Optional[dict[str, list[str] | list[int] | list[float]]] = None, + ) -> None: + if not isinstance(mod, ModelicaSystemABC): + raise ModelicaSystemError(f"Invalid definition for ModelicaSystem*: {type(mod)}!") + + super().__init__( + mod=mod, + simargs=simargs, + resultpath=resultpath, + parameters=parameters, + ) + + def _prepare_structure_parameters( + self, + idx_pc_structure: int, + pc_structure: Tuple, + param_structure: dict[str, list[str] | list[int] | list[float]], + ) -> dict[str, str | int | float]: + if len(param_structure.keys()) > 0: + raise ModelicaSystemError(f"{self.__class__.__name__} can not handle structure parameters as it uses a " + "pre-compiled binary of model.") + + return {} diff --git a/OMPython/OMCSession.py b/OMPython/OMCSession.py index 91115061..79f8d16b 100644 --- a/OMPython/OMCSession.py +++ b/OMPython/OMCSession.py @@ -289,6 +289,8 @@ class OMPathCompatibilityWindows(pathlib.WindowsPath, OMPathCompatibility): OMPathABC = OMPathCompatibility OMCPath = OMPathCompatibility + OMPathRunnerABC = OMPathCompatibility + OMPathRunnerLocal = OMPathCompatibility else: class OMPathABC(pathlib.PurePosixPath, metaclass=abc.ABCMeta): """ @@ -305,7 +307,13 @@ def __init__(self, *path, session: OMSessionABC) -> None: super().__init__(*path) self._session = session - def with_segments(self, *pathsegments): + def get_session(self) -> OMSessionABC: + """ + Get session definition used for this instance of OMPath. + """ + return self._session + + def with_segments(self, *pathsegments) -> OMPathABC: """ Create a new OMCPath object with the given path segments. @@ -326,7 +334,7 @@ def is_dir(self) -> bool: """ @abc.abstractmethod - def is_absolute(self): + def is_absolute(self) -> bool: """ Check if the path is an absolute path. """ @@ -338,13 +346,13 @@ def read_text(self) -> str: """ @abc.abstractmethod - def write_text(self, data: str): + def write_text(self, data: str) -> int: """ Write text data to the file represented by this path. """ @abc.abstractmethod - def mkdir(self, parents: bool = True, exist_ok: bool = False): + def mkdir(self, parents: bool = True, exist_ok: bool = False) -> None: """ Create a directory at the path represented by this class. @@ -354,7 +362,7 @@ def mkdir(self, parents: bool = True, exist_ok: bool = False): """ @abc.abstractmethod - def cwd(self): + def cwd(self) -> OMPathABC: """ Returns the current working directory as an OMPathABC object. """ @@ -366,12 +374,12 @@ def unlink(self, missing_ok: bool = False) -> None: """ @abc.abstractmethod - def resolve(self, strict: bool = False): + def resolve(self, strict: bool = False) -> OMPathABC: """ Resolve the path to an absolute path. """ - def absolute(self): + def absolute(self) -> OMPathABC: """ Resolve the path to an absolute path. Just a wrapper for resolve(). """ @@ -399,29 +407,38 @@ def is_file(self) -> bool: """ Check if the path is a regular file. """ - return self._session.sendExpression(expr=f'regularFileExists("{self.as_posix()}")') + retval = self.get_session().sendExpression(expr=f'regularFileExists("{self.as_posix()}")') + if not isinstance(retval, bool): + raise OMCSessionException(f"Invalid return value for is_file(): {retval} - expect bool") + return retval def is_dir(self) -> bool: """ Check if the path is a directory. """ - return self._session.sendExpression(expr=f'directoryExists("{self.as_posix()}")') + retval = self.get_session().sendExpression(expr=f'directoryExists("{self.as_posix()}")') + if not isinstance(retval, bool): + raise OMCSessionException(f"Invalid return value for is_dir(): {retval} - expect bool") + return retval - def is_absolute(self): + def is_absolute(self) -> bool: """ - Check if the path is an absolute path. + Check if the path is an absolute path. Special handling to differentiate Windows and Posix definitions. """ if isinstance(self._session, OMCSessionLocal) and platform.system() == 'Windows': return pathlib.PureWindowsPath(self.as_posix()).is_absolute() - return super().is_absolute() + return pathlib.PurePosixPath(self.as_posix()).is_absolute() def read_text(self) -> str: """ Read the content of the file represented by this path as text. """ - return self._session.sendExpression(expr=f'readFile("{self.as_posix()}")') + retval = self.get_session().sendExpression(expr=f'readFile("{self.as_posix()}")') + if not isinstance(retval, str): + raise OMCSessionException(f"Invalid return value for read_text(): {retval} - expect str") + return retval - def write_text(self, data: str): + def write_text(self, data: str) -> int: """ Write text data to the file represented by this path. """ @@ -433,7 +450,7 @@ def write_text(self, data: str): return len(data) - def mkdir(self, parents: bool = True, exist_ok: bool = False): + def mkdir(self, parents: bool = True, exist_ok: bool = False) -> None: """ Create a directory at the path represented by this class. @@ -444,14 +461,15 @@ def mkdir(self, parents: bool = True, exist_ok: bool = False): if self.is_dir() and not exist_ok: raise FileExistsError(f"Directory {self.as_posix()} already exists!") - return self._session.sendExpression(expr=f'mkdir("{self.as_posix()}")') + if not self._session.sendExpression(expr=f'mkdir("{self.as_posix()}")'): + raise OMCSessionException(f"Error on directory creation for {self.as_posix()}!") - def cwd(self): + def cwd(self) -> OMPathABC: """ Returns the current working directory as an OMPathABC object. """ cwd_str = self._session.sendExpression(expr='cd()') - return OMCPath(cwd_str, session=self._session) + return type(self)(cwd_str, session=self._session) def unlink(self, missing_ok: bool = False) -> None: """ @@ -461,7 +479,7 @@ def unlink(self, missing_ok: bool = False) -> None: if not res and not missing_ok: raise FileNotFoundError(f"Cannot delete file {self.as_posix()} - it does not exists!") - def resolve(self, strict: bool = False): + def resolve(self, strict: bool = False) -> OMPathABC: """ Resolve the path to an absolute path. This is done based on available OMC functions. """ @@ -492,8 +510,10 @@ def _omc_resolve(self, pathstr: str) -> str: 'cd(omcpath_cwd)') try: - result = self._session.sendExpression(expr=expr, parsed=False) - result_parts = result.split('\n') + retval = self.get_session().sendExpression(expr=expr, parsed=False) + if not isinstance(retval, str): + raise OMCSessionException(f"Invalid return value for _omc_resolve(): {retval} - expect str") + result_parts = retval.split('\n') pathstr_resolved = result_parts[1] pathstr_resolved = pathstr_resolved[1:-1] # remove quotes except OMCSessionException as ex: @@ -514,7 +534,95 @@ def size(self) -> int: raise OMCSessionException(f"Error reading file size for path {self.as_posix()}!") + class OMPathRunnerABC(OMPathABC, metaclass=abc.ABCMeta): + """ + Base function for OMPath definitions *without* OMC server + """ + + def _path(self) -> pathlib.Path: + return pathlib.Path(self.as_posix()) + + class _OMPathRunnerLocal(OMPathRunnerABC): + """ + Implementation of OMPathBase which does not use the session data at all. Thus, this implementation can run + locally without any usage of OMC. + + This class is based on OMPathBase and, therefore, on pathlib.PurePosixPath. This is working well, but it is not + the correct implementation on Windows systems. To get a valid Windows representation of the path, use the + conversion via pathlib.Path(.as_posix()). + """ + + def is_file(self) -> bool: + """ + Check if the path is a regular file. + """ + return self._path().is_file() + + def is_dir(self) -> bool: + """ + Check if the path is a directory. + """ + return self._path().is_dir() + + def is_absolute(self): + """ + Check if the path is an absolute path. + """ + return self._path().is_absolute() + + def read_text(self) -> str: + """ + Read the content of the file represented by this path as text. + """ + return self._path().read_text(encoding='utf-8') + + def write_text(self, data: str): + """ + Write text data to the file represented by this path. + """ + return self._path().write_text(data=data, encoding='utf-8') + + def mkdir(self, parents: bool = True, exist_ok: bool = False): + """ + Create a directory at the path represented by this class. + + The argument parents with default value True exists to ensure compatibility with the fallback solution for + Python < 3.12. In this case, pathlib.Path is used directly and this option ensures, that missing parent + directories are also created. + """ + return self._path().mkdir(parents=parents, exist_ok=exist_ok) + + def cwd(self): + """ + Returns the current working directory as an OMPathBase object. + """ + return self._path().cwd() + + def unlink(self, missing_ok: bool = False) -> None: + """ + Unlink (delete) the file or directory represented by this path. + """ + return self._path().unlink(missing_ok=missing_ok) + + def resolve(self, strict: bool = False): + """ + Resolve the path to an absolute path. This is done based on available OMC functions. + """ + path_resolved = self._path().resolve(strict=strict) + return type(self)(path_resolved, session=self._session) + + def size(self) -> int: + """ + Get the size of the file in bytes - implementation baseon on pathlib.Path. + """ + if not self.is_file(): + raise OMCSessionException(f"Path {self.as_posix()} is not a file!") + + path = self._path() + return path.stat().st_size + OMCPath = _OMCPath + OMPathRunnerLocal = _OMPathRunnerLocal class ModelExecutionException(Exception): @@ -601,67 +709,6 @@ def run(self) -> int: return returncode -class OMCSessionZMQ: - """ - This class is a compatibility layer for the new schema using OMCSession* classes. - """ - - def __init__( - self, - timeout: float = 10.00, - omhome: Optional[str] = None, - omc_process: Optional[OMCSessionABC] = None, - ) -> None: - """ - Initialisation for OMCSessionZMQ - """ - warnings.warn(message="The class OMCSessionZMQ is depreciated and will be removed in future versions; " - "please use OMCProcess* classes instead!", - category=DeprecationWarning, - stacklevel=2) - - if omc_process is None: - omc_process = OMCSessionLocal(omhome=omhome, timeout=timeout) - elif not isinstance(omc_process, OMCSessionABC): - raise OMCSessionException("Invalid definition of the OMC process!") - self.omc_process = omc_process - - def __del__(self): - del self.omc_process - - @staticmethod - def escape_str(value: str) -> str: - """ - Escape a string such that it can be used as string within OMC expressions, i.e. escape all double quotes. - """ - return OMCSessionABC.escape_str(value=value) - - def omcpath(self, *path) -> OMPathABC: - """ - Create an OMCPath object based on the given path segments and the current OMC process definition. - """ - return self.omc_process.omcpath(*path) - - def omcpath_tempdir(self, tempdir_base: Optional[OMPathABC] = None) -> OMPathABC: - """ - Get a temporary directory using OMC. It is our own implementation as non-local usage relies on OMC to run all - filesystem related access. - """ - return self.omc_process.omcpath_tempdir(tempdir_base=tempdir_base) - - def execute(self, command: str): - return self.omc_process.execute(command=command) - - def sendExpression(self, command: str, parsed: bool = True) -> Any: - """ - Send an expression to the OMC server and return the result. - - The complete error handling of the OMC result is done within this method using '"getMessagesStringInternal()'. - Caller should only check for OMCSessionException. - """ - return self.omc_process.sendExpression(expr=command, parsed=parsed) - - class PostInitCaller(type): """ Metaclass definition to define a new function __post_init__() which is called after all __init__() functions where @@ -719,12 +766,20 @@ def __init__( # store variables self._timeout = timeout + # command prefix (to be used for docker or WSL) + self._cmd_prefix: list[str] = [] def __post_init__(self) -> None: """ Post initialisation method. """ + def get_cmd_prefix(self) -> list[str]: + """ + Get session definition used for this instance of OMPath. + """ + return self._cmd_prefix.copy() + @staticmethod def escape_str(value: str) -> str: """ @@ -753,7 +808,7 @@ def set_workdir(self, workdir: OMPathABC) -> None: @abc.abstractmethod def omcpath(self, *path) -> OMPathABC: """ - Create an OMPathBase object based on the given path segments and the current class. + Create an OMPathABC object based on the given path segments and the current class. """ @abc.abstractmethod @@ -817,13 +872,12 @@ def __init__( """ Initialisation for OMCSession """ + super().__init__(timeout=timeout) # some helper data self.model_execution_windows = platform.system() == "Windows" self.model_execution_local = False - # store variables - self._timeout = timeout # generate a random string for this instance of OMC self._random_string = uuid.uuid4().hex # get a temporary directory @@ -900,6 +954,7 @@ def __del__(self): self._omc_process.kill() self._omc_process.wait() finally: + self._omc_process = None def _timeout_loop( @@ -1289,6 +1344,77 @@ def _omc_port_get(self) -> str: return port +class OMCSessionZMQ(OMSessionABC): + """ + This class is a compatibility layer for the new schema using OMCSession* classes. + """ + + def __init__( + self, + timeout: float = 10.00, + omhome: Optional[str] = None, + omc_process: Optional[OMCSessionABC] = None, + ) -> None: + """ + Initialisation for OMCSessionZMQ + """ + warnings.warn(message="The class OMCSessionZMQ is depreciated and will be removed in future versions; " + "please use OMCProcess* classes instead!", + category=DeprecationWarning, + stacklevel=2) + + if omc_process is None: + omc_process = OMCSessionLocal(omhome=omhome, timeout=timeout) + elif not isinstance(omc_process, OMCSessionABC): + raise OMCSessionException("Invalid definition of the OMC process!") + self.omc_process = omc_process + + def __del__(self): + if hasattr(self, 'omc_process'): + del self.omc_process + + @staticmethod + def escape_str(value: str) -> str: + """ + Escape a string such that it can be used as string within OMC expressions, i.e. escape all double quotes. + """ + return OMCSessionABC.escape_str(value=value) + + def omcpath(self, *path) -> OMPathABC: + """ + Create an OMCPath object based on the given path segments and the current OMC process definition. + """ + return self.omc_process.omcpath(*path) + + def omcpath_tempdir(self, tempdir_base: Optional[OMPathABC] = None) -> OMPathABC: + """ + Get a temporary directory using OMC. It is our own implementation as non-local usage relies on OMC to run all + filesystem related access. + """ + return self.omc_process.omcpath_tempdir(tempdir_base=tempdir_base) + + def execute(self, command: str): + return self.omc_process.execute(command=command) + + def sendExpression(self, command: str, parsed: bool = True) -> Any: + """ + Send an expression to the OMC server and return the result. + + The complete error handling of the OMC result is done within this method using '"getMessagesStringInternal()'. + Caller should only check for OMCSessionException. + """ + return self.omc_process.sendExpression(expr=command, parsed=parsed) + + def get_version(self) -> str: + return self.omc_process.get_version() + + def model_execution_prefix(self, cwd: Optional[OMPathABC] = None) -> list[str]: + return self.omc_process.model_execution_prefix(cwd=cwd) + + def set_workdir(self, workdir: OMPathABC) -> None: + return self.omc_process.set_workdir(workdir=workdir) + + class OMCSessionDockerABC(OMCSessionABC, metaclass=abc.ABCMeta): """ Base class for OMCSession implementations which run the OMC server in a Docker container. @@ -1735,3 +1861,61 @@ def _omc_port_get(self) -> str: f"pid={self._omc_process.pid if isinstance(self._omc_process, subprocess.Popen) else '?'}") return port + + +class OMSessionRunner(OMSessionABC): + """ + Implementation based on OMSessionABC without any use of an OMC server. + """ + + def __init__( + self, + timeout: float = 10.00, + version: str = "1.27.0" + ) -> None: + super().__init__(timeout=timeout) + self.model_execution_local = True + self._version = version + + def __post_init__(self) -> None: + """ + No connection to an OMC server is created by this class! + """ + + def model_execution_prefix(self, cwd: Optional[OMPathABC] = None) -> list[str]: + """ + Helper function which returns a command prefix. + """ + return [] + + def get_version(self) -> str: + """ + We can not provide an OM version as we are not link to an OMC server. Thus, the provided version string is used + directly. + """ + return self._version + + def set_workdir(self, workdir: OMPathABC) -> None: + """ + Set the workdir for this session. + """ + os.chdir(workdir.as_posix()) + + def omcpath(self, *path) -> OMPathABC: + """ + Create an OMCPath object based on the given path segments and the current OMCSession* class. + """ + return OMPathRunnerLocal(*path, session=self) + + def omcpath_tempdir(self, tempdir_base: Optional[OMPathABC] = None) -> OMPathABC: + """ + Get a temporary directory without using OMC. + """ + if tempdir_base is None: + tempdir_str = tempfile.gettempdir() + tempdir_base = self.omcpath(tempdir_str) + + return self._tempdir(tempdir_base=tempdir_base) + + def sendExpression(self, expr: str, parsed: bool = True) -> Any: + raise OMCSessionException(f"{self.__class__.__name__} does not uses an OMC server!") diff --git a/OMPython/__init__.py b/OMPython/__init__.py index b04db846..d6016e53 100644 --- a/OMPython/__init__.py +++ b/OMPython/__init__.py @@ -19,6 +19,8 @@ ModelicaSystemDoE, ModelicaDoEOMC, ModelicaSystemError, + ModelicaSystemRunner, + ModelicaDoERunner, doe_get_solutions, ) @@ -26,6 +28,8 @@ OMPathABC, OMCPath, + OMSessionRunner, + OMCSessionABC, ModelExecutionData, @@ -55,9 +59,14 @@ 'ModelicaDoEOMC', 'ModelicaSystemError', + 'ModelicaSystemRunner', + 'ModelicaDoERunner', + 'OMPathABC', 'OMCPath', + 'OMSessionRunner', + 'OMCSessionABC', 'doe_get_solutions', diff --git a/tests/test_ModelicaDoERunner.py b/tests/test_ModelicaDoERunner.py new file mode 100644 index 00000000..2d41315f --- /dev/null +++ b/tests/test_ModelicaDoERunner.py @@ -0,0 +1,158 @@ +import pathlib +import sys + +import numpy as np +import pytest + +import OMPython + +skip_python_older_312 = pytest.mark.skipif( + sys.version_info < (3, 12), + reason="OMCPath(non-local) only working for Python >= 3.12.", +) + + +@pytest.fixture +def model_doe(tmp_path: pathlib.Path) -> pathlib.Path: + # see: https://trac.openmodelica.org/OpenModelica/ticket/4052 + mod = tmp_path / "M.mo" + # TODO: update for bool and string parameters; check if these can be used in DoE + mod.write_text(""" +model M + parameter Integer p=1; + parameter Integer q=1; + parameter Real a = -1; + parameter Real b = -1; + Real x[p]; + Real y[q]; +equation + der(x) = a * fill(1.0, p); + der(y) = b * fill(1.0, q); +end M; +""") + return mod + + +@pytest.fixture +def param_doe() -> dict[str, list]: + param = { + # simple + 'a': [5, 6], + 'b': [7, 8], + } + return param + + +def test_ModelicaDoERunner_ModelicaSystemOMC(tmp_path, model_doe, param_doe): + tmpdir = tmp_path / 'DoE' + tmpdir.mkdir(exist_ok=True) + + mod = OMPython.ModelicaSystemOMC() + mod.model( + model_file=model_doe, + model_name="M", + ) + + resultfile_mod = mod.getWorkDirectory() / f"{mod.get_model_name()}_res_mod.mat" + _run_simulation(mod=mod, resultfile=resultfile_mod, param=param_doe) + + doe_mod = OMPython.ModelicaDoERunner( + mod=mod, + parameters=param_doe, + resultpath=tmpdir, + ) + + _run_ModelicaDoERunner(doe_mod=doe_mod) + + _check_runner_result(mod=mod, doe_mod=doe_mod) + + +def test_ModelicaDoERunner_ModelicaSystemRunner(tmp_path, model_doe, param_doe): + tmpdir = tmp_path / 'DoE' + tmpdir.mkdir(exist_ok=True) + + mod = OMPython.ModelicaSystemOMC() + mod.model( + model_file=model_doe, + model_name="M", + ) + + resultfile_mod = mod.getWorkDirectory() / f"{mod.get_model_name()}_res_mod.mat" + _run_simulation(mod=mod, resultfile=resultfile_mod, param=param_doe) + + # run the model using only the runner class + omcs = OMPython.OMSessionRunner( + version=mod.get_session().get_version(), + ) + modr = OMPython.ModelicaSystemRunner( + session=omcs, + work_directory=mod.getWorkDirectory(), + ) + modr.setup( + model_name="M", + ) + doe_mod = OMPython.ModelicaDoERunner( + mod=modr, + parameters=param_doe, + resultpath=tmpdir, + ) + + _run_ModelicaDoERunner(doe_mod=doe_mod) + + _check_runner_result(mod=mod, doe_mod=doe_mod) + + +def _run_simulation(mod, resultfile, param): + simOptions = {"stopTime": 1.0, "stepSize": 0.1, "tolerance": 1e-8} + mod.setSimulationOptions(**simOptions) + mod.simulate(resultfile=resultfile) + + assert resultfile.exists() + + +def _run_ModelicaDoERunner(doe_mod): + doe_count = doe_mod.prepare() + assert doe_count == 4 + + doe_def = doe_mod.get_doe_definition() + assert isinstance(doe_def, dict) + assert len(doe_def.keys()) == doe_count + + doe_cmd = doe_mod.get_doe_command() + assert isinstance(doe_cmd, dict) + assert len(doe_cmd.keys()) == doe_count + + doe_status = doe_mod.simulate() + assert doe_status is True + + +def _check_runner_result(mod, doe_mod): + doe_cmd = doe_mod.get_doe_command() + doe_def = doe_mod.get_doe_definition() + + doe_sol = OMPython.doe_get_solutions( + msomc=mod, + resultpath=doe_mod.get_resultpath(), + doe_def=doe_def, + ) + assert isinstance(doe_sol, dict) + assert len(doe_sol.keys()) == len(doe_cmd.keys()) + + assert sorted(doe_def.keys()) == sorted(doe_cmd.keys()) + assert sorted(doe_cmd.keys()) == sorted(doe_sol.keys()) + + for resultfilename in doe_def: + row = doe_def[resultfilename] + + assert resultfilename in doe_sol + sol = doe_sol[resultfilename] + + var_dict = { + # simple / non-structural parameters + 'a': float(row['a']), + 'b': float(row['b']), + } + + for var in var_dict: + assert var in sol['data'] + assert np.isclose(sol['data'][var][-1], var_dict[var]) diff --git a/tests/test_ModelicaSystemRunner.py b/tests/test_ModelicaSystemRunner.py new file mode 100644 index 00000000..35541c99 --- /dev/null +++ b/tests/test_ModelicaSystemRunner.py @@ -0,0 +1,96 @@ +import numpy as np +import pytest + +import OMPython + + +@pytest.fixture +def model_firstorder_content(): + return """ +model M + Real x(start = 1, fixed = true); + parameter Real a = -1; +equation + der(x) = x*a; +end M; +""" + + +@pytest.fixture +def model_firstorder(tmp_path, model_firstorder_content): + mod = tmp_path / "M.mo" + mod.write_text(model_firstorder_content) + return mod + + +@pytest.fixture +def param(): + x0 = 1 + a = -1 + tau = -1 / a + stopTime = 5*tau + + return { + 'x0': x0, + 'a': a, + 'stopTime': stopTime, + } + + +def test_runner(model_firstorder, param): + # create a model using ModelicaSystem + mod = OMPython.ModelicaSystem() + mod.model( + model_file=model_firstorder, + model_name="M", + ) + + resultfile_mod = mod.getWorkDirectory() / f"{mod.get_model_name()}_res_mod.mat" + _run_simulation(mod=mod, resultfile=resultfile_mod, param=param) + + # run the model using only the runner class + omcs = OMPython.OMSessionRunner( + version=mod.get_session().get_version(), + ) + modr = OMPython.ModelicaSystemRunner( + session=omcs, + work_directory=mod.getWorkDirectory(), + ) + modr.setup( + model_name="M", + ) + + resultfile_modr = mod.getWorkDirectory() / f"{mod.get_model_name()}_res_modr.mat" + _run_simulation(mod=modr, resultfile=resultfile_modr, param=param) + + # cannot check the content as runner does not have the capability to open a result file + assert resultfile_mod.size() == resultfile_modr.size() + + # check results + _check_result(mod=mod, resultfile=resultfile_mod, param=param) + _check_result(mod=mod, resultfile=resultfile_modr, param=param) + + +def _run_simulation(mod, resultfile, param): + simOptions = {"stopTime": param['stopTime'], "stepSize": 0.1, "tolerance": 1e-8} + mod.setSimulationOptions(**simOptions) + mod.simulate(resultfile=resultfile) + + assert resultfile.exists() + + +def _check_result(mod, resultfile, param): + x = mod.getSolutions(resultfile=resultfile, varList="x") + t, x2 = mod.getSolutions(resultfile=resultfile, varList=["time", "x"]) + assert (x2 == x).all() + sol_names = mod.getSolutions(resultfile=resultfile) + assert isinstance(sol_names, tuple) + assert "time" in sol_names + assert "x" in sol_names + assert "der(x)" in sol_names + with pytest.raises(OMPython.ModelicaSystemError): + mod.getSolutions(resultfile=resultfile, varList="thisVariableDoesNotExist") + assert np.isclose(t[0], 0), "time does not start at 0" + assert np.isclose(t[-1], param['stopTime']), "time does not end at stopTime" + x_analytical = param['x0'] * np.exp(param['a']*t) + assert np.isclose(x, x_analytical, rtol=1e-4).all()