# Source code for qmflows.packages._packages

"""Common functionality to call all the quantum packages."""

from __future__ import annotations

import types
import fnmatch
import importlib
import inspect
import os
import sys
import warnings
from abc import abstractmethod, ABC
from types import ModuleType
from pathlib import Path
from functools import partial
from os.path import join
from warnings import warn
from collections.abc import Callable, Mapping, Iterator
from typing import Any, ClassVar, TypeVar, TYPE_CHECKING, overload

import numpy as np
import pandas as pd
from more_itertools import collapse
from noodles import has_scheduled_methods, schedule, serial
from noodles.run.threading.sqlite3 import run_parallel
from noodles.serial import AsDict, Registry
from noodles.serial.numpy import SerNumpyScalar, arrays_to_hdf5
from noodles.serial.path import SerPath
from noodles.serial.reasonable import SerReasonableObject
from scm import plams

from .. import __file__ as _qmflows_file
from ._serializer import SerMolecule, SerMol, SerSettings, SerNDFrame, SerReduce
from ..type_hints import WarnMap, WarnDict, WarnParser, PromisedObject, MolType, _Settings
from ..utils import InitRestart
from ..fileFunctions import yaml2Settings
from .._settings import _Settings as _SettingsType, Settings
from ..warnings_qmflows import QMFlows_Warning

if TYPE_CHECKING:
    from rdkit import Chem
    from scm.plams import from_rdmol
else:
    # RDKit is an optional dependency: when it is missing, `Chem` is set to
    # None and `from_rdmol` degrades to a no-op passthrough so that PLAMS
    # molecules can still be used everywhere an RDKit molecule is accepted.
    try:
        from rdkit import Chem
        from scm.plams import from_rdmol
    except ImportError:
        Chem = None

        def from_rdmol(mol: plams.Molecule) -> plams.Molecule:
            """Fallback for :func:`scm.plams.from_rdmol`; return *mol* unchanged."""
            return mol

_Self = TypeVar("_Self", bound="Package")

__all__ = ['Package', 'Result', 'run']


def load_properties(name: str, prefix: str = 'properties') -> _Settings:
    """Load the property-parsing .yaml file for the package *name*.

    The file is looked up inside the bundled ``qmflows/data/dictionaries``
    directory under the name ``f"{prefix}{name}.yaml"``.

    :param name: Name of the quantum chemical package (*e.g.* ``"ADF"``).
    :param prefix: Filename prefix of the .yaml file.
    :return: The parsed .yaml content as a settings mapping.
    """
    yaml_path = Path(_qmflows_file).parent / 'data' / 'dictionaries' / f'{prefix}{name}.yaml'
    with open(yaml_path, "r", encoding="utf8") as f:
        content = f.read()
    return yaml2Settings(content, mapping_type=_SettingsType)


class Result:
    """Class containing the results associated with a quantum chemistry simulation."""

    #: A :class:`Settings` instance with :class:`Result`-specific properties.
    #: Should be set when creating a subclass.
    prop_mapping: ClassVar[_Settings] = NotImplemented

    def __init__(self, settings: None | Settings,
                 molecule: None | plams.Molecule,
                 job_name: str,
                 dill_path: None | str | os.PathLike[str] = None,
                 plams_dir: None | str | os.PathLike[str] = None,
                 work_dir: None | str | os.PathLike[str] = None,
                 status: str = 'successful',
                 warnings: None | WarnMap = None) -> None:
        """Initialize a :class:`Result` instance.

        :param settings: Job Settings.
        :type settings: :class:`qmflows.Settings`
        :param molecule: molecular Geometry
        :type molecule: :class:`plams.Molecule<scm.plams.mol.molecule.Molecule>`
        :param job_name: Name of the computations
        :type job_name: str
        :param dill_path: The absolute path to the pickled .dill file.
        :type dill_path: str
        :param plams_dir: path to the ``Plams`` folder.
        :type plams_dir: str
        :param work_dir: scratch or another directory different from the ``plams_dir``.
        :type work_dir: str
        :param status: Status label of the job (``"successful"`` by default).
        :type status: str
        :param warnings: Mapping from warning messages to warning types.
        :type warnings: :class:`~collections.abc.Mapping`, optional
        """
        # Normalize the PLAMS directory to a Path object (if provided)
        plams_dir = None if plams_dir is None else Path(plams_dir)

        self.settings = settings
        self._molecule = molecule
        self.archive = {"plams_dir": plams_dir, 'work_dir': work_dir}
        self.job_name = job_name
        self.status = status
        self.warnings = warnings

        # The pickled plams.Results object is unpacked lazily: until then,
        # `_results` holds the path to the .dill file (or None)
        self._results_open = False
        self._results = dill_path

    def __deepcopy__(self, memo: None | dict[int, Any] = None) -> Result:
        """Return a deep copy of this instance."""
        cls = type(self)

        # Construct an empty instance while bypassing __init__()
        copy_instance = cls.__new__(cls)

        # Manually set all instance variables
        copy_instance.__dict__ = self.__dict__.copy()
        return copy_instance

    # Hide `__getattr__` from the type checker and use explicit attribute annotations
    if not TYPE_CHECKING:
        def __getattr__(self, prop: str) -> Any:
            """Return a section of the results.

            For example:

            .. code:: python

                >>> from qmflows.packages import Result
                >>> result = Result(...)  # doctest: +SKIP
                >>> dipole = result.dipole  # doctest: +SKIP

            """
            is_private = prop.startswith('__') and prop.endswith('__')
            has_crashed = self.status in {'failed', 'crashed'}

            # Happy path: the job finished and `prop` is a known generic property
            if not has_crashed and prop in self.prop_mapping:
                return self.get_property(prop)

            elif not (has_crashed or is_private or prop in self.prop_mapping):
                if self._results_open:
                    warn(
                        f"Generic property {prop!r} not defined",
                        category=QMFlows_Warning, stacklevel=2,
                    )

                # Do not issue this warning if the Results object is still pickled
                else:
                    # Unpickle the Results instance and try again
                    self._unpack_results()
                    try:
                        return vars(self)[prop]  # Avoid recursive `getattr` calls
                    except KeyError:
                        warn(
                            f"Generic property {prop!r} not defined",
                            category=QMFlows_Warning, stacklevel=2,
                        )

            elif has_crashed and not is_private:
                # NOTE(review): the exact line layout of this multi-line warning
                # was reconstructed; confirm against the upstream source
                warn(f"""
It is not possible to retrieve property: {prop!r}
Because Job: {self.job_name!r} has {self.status}. Check the output.\n
Are you sure that you have the package installed or
 you have loaded the package in the cluster. For example:
`module load AwesomeQuantumPackage/3.141592`
""", category=QMFlows_Warning, stacklevel=2)
            return None

    def __dir__(self) -> list[str]:
        """Implement ``dir(self)``."""
        # Insert the highly dynamic `get_property`-based attributes
        dir_set = set(super().__dir__()) | self.prop_mapping.keys()
        return sorted(dir_set)

    def get_property(self, prop: str) -> Any:
        """Look for the optional arguments to parse a property, which are stored in the properties dictionary."""  # noqa
        # Return a cached value if the attribute is already set
        try:
            return super().__getattribute__(prop)
        except AttributeError:
            pass

        # Read the .yaml dictionary that contains the parsers names
        ds = self.prop_mapping[prop]

        # extension of the output file containing the property value
        file_ext = ds.get('file_ext')

        # If there is no work_dir this returns None
        work_dir = self.archive.get('work_dir')

        # Plams dir
        plams_dir = self.archive['plams_dir']

        # Search for the specified output file in the folders
        if file_ext != "rkf":
            file_pattern = ds.get(
                'file_pattern', f'{self.job_name}*.{file_ext}')
        else:
            # AMS renames all the DFTB job names
            file_pattern = "dftb.rkf"
        output_files = list(collapse(map(partial(
            find_file_pattern, file_pattern), [plams_dir, work_dir])))
        if output_files:
            file_out = output_files[0]
            fun = getattr(import_parser(ds), ds['function'])

            # Read the keywords arguments from the properties dictionary
            kwargs = ds.get('kwargs', {})
            ret = ignore_unused_kwargs(fun, file_out, plams_dir=plams_dir, **kwargs)

            # Cache the property (only if it is small) and return
            if sys.getsizeof(ret) < 10e5:
                setattr(self, prop, ret)
            return ret
        else:
            raise FileNotFoundError(f"""
Property {prop} not found. No output file called: {file_pattern}.
Folder used:
plams_dir = {plams_dir}\n
work_dir {work_dir}\n
""")

    @property
    def results(self) -> None | plams.Results:
        """Getter for :attr:`Result.results`.

        Get will load the .dill file and add all of its class attributes
        to this instance, barring the following three exceptions:

        * Private attributes/methods.
        * Magic methods.
        * Methods/attributes with names already contained within this instance.

        This attribute's value is set to ``None`` if the unpickling process fails.

        """
        if not self._results_open:
            self._unpack_results()
        return self._results

    def _unpack_results(self) -> None:
        """Unpack the pickled .dill file for :attr:`Results.results`."""
        self._results_open = True

        # Do not bother unpacking if None; i.e. if the job crashed
        if self._results is None:
            return

        # Ignore the Result.__getattr__() warnings for now
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", category=QMFlows_Warning)

            # Unpickle the results
            try:
                results = plams.load(self._results).results
                assert results is not None, f'Failed to unpickle {self._results!r}'
            except (AssertionError, plams.FileError) as ex:
                file_exc = ex
            else:
                file_exc = None
                attr_set = set(dir(self))
                for name in dir(results):
                    if name.startswith('_') or name in attr_set:
                        continue  # Skip methods which are either private, magic or preexisting
                    results_func = getattr(results, name)
                    setattr(self, name, results_func)

        # Failed to find or unpickle the .dill file; issue a warning
        if file_exc is not None:
            self._results = None
            warn(f"{file_exc}, setting value to 'None'",
                 category=QMFlows_Warning)
        else:
            self._results = results
PT = TypeVar("PT", bound="Package")
@has_scheduled_methods
class Package(ABC):
    """:class:`Package` is the base class to handle the invocation to different quantum package.

    The only relevant (instance) attribute of this class is :attr:`Package.pkg_name`
    which is a string representing the quantum package name that is going
    to be used to carry out the computation.

    The life-cycle of :class:`Package` consists of 5 general steps:

    1. Initializing an instance: :meth:`Package.__init__`.
    2. Starting the job: :meth:`Package.__call__`.
       This method handles the task distribution between the instance's
       various methods.
    3. Converting all generic into specific settings: :meth:`Package.generic2specific`.
    4. Running the actual :class:`plams.Job<scm.plams.core.basejob.Job>`
       (including pre- and post-processing): :meth:`Package.run_job`.
    5. Returning the final :class:`Result` instance at the end of :meth:`Package.__call__`.

    """

    #: A class variable pointing to the :class:`Package`-specific :class:`Result` class.
    #: Should be set when creating a subclass.
    result_type: ClassVar[type[Result]] = NotImplemented

    #: A class variable with the name of the generic .yaml file.
    #: Should be set when creating a subclass.
    generic_mapping: ClassVar[_Settings] = NotImplemented

    #: An instance variable with the name of the respective quantum chemical package.
    pkg_name: str

    @property
    def __defaults__(self) -> tuple[Any, ...] | None:
        """Get access to :attr:`~__call__.__defaults__`."""
        return self.__call__.__defaults__

    @property
    def __kwdefaults__(self) -> dict[str, Any]:
        """Get access to :attr:`~__call__.__kwdefaults__`."""
        return self.__call__.__kwdefaults__

    def __init__(self, pkg_name: str) -> None:
        """Initialize a :class:`Package` instance.

        Parameters
        ----------
        pkg_name : :class:`str`
            The name of the respective quantum chemical package.
            See :attr:`Package.pkg_name`.

        """
        self.pkg_name = pkg_name

        # Ensure compatibility with the (typing-only) `builtins.function` class
        cls = type(self)
        self.__name__: str = pkg_name
        self.__qualname__: str = pkg_name
        self.__module__: str = cls.__module__
        self.__annotations__: dict[str, Any] = cls.__call__.__annotations__
        self.__signature__: inspect.Signature = inspect.signature(cls.__call__)
        self.__doc__ = self.__call__.__doc__

    @overload
    def __get__(self: _Self, obj: None, type: type) -> _Self: ...
    @overload
    def __get__(self, obj: object, type: None | type = ...) -> types.MethodType: ...

    def __get__(self, obj: object, type: None | type = None) -> Any:
        """Allows binding :class:`Package` instances as methods."""
        if obj is None and type is None:
            raise TypeError("__get__(None, None) is invalid")
        elif obj is None:
            # Accessed through the class: return the descriptor itself
            return self
        else:
            # Accessed through an instance: bind as a method
            return types.MethodType(self, obj)

    def __reduce__(self: PT) -> tuple[type[PT], tuple[str]]:
        """A helper function for :mod:`pickle`."""
        return type(self), (self.pkg_name,)

    @schedule(
        display="Running {self.pkg_name} {job_name}...",
        store=True, confirm=True)
    def __call__(self, settings: Settings,
                 mol: MolType,
                 job_name: str = '',
                 validate_output: bool = True,
                 **kwargs: Any) -> Result:
        r"""Perform a job with the package specified by :attr:`Package.pkg_name`.

        Parameters
        ----------
        settings : :class:`qmflows.Settings`
            The user settings.
        mol : :class:`plams.Molecule<scm.plams.mol.molecule.Molecule>` or :class:`rdkit.Mol<rdkit.Chem.rdchem.Mol>`
            A PLAMS or RDKit molecule to-be passed to the calculation.
        job_name : :class:`str`
            The name of the job.
        validate_output : :class:`bool`
            If :data:`True`, perform a package-specific validation of the
            output files' content.
            Only relevant if the particular :class:`Package` subclass has
            actually implemented output validation.
        \**kwargs : :data:`~typing.Any`
            Further keyword arguments to-be passed to :meth:`Package.prerun`,
            :meth:`Package.run_job` and :meth:`Package.post_run`.

        Returns
        -------
        :class:`Result`
            A new Result instance.

        """  # noqa
        kwargs['validate_output'] = validate_output

        # Ensure that these variables have an actual value;
        # precaution against passing unbound variables to self.postrun()
        output_warnings = plams_mol = job_settings = None

        # There is no data from previous nodes in the dependency tree
        # because of a failure upstream or the user provided None as argument
        if all(x is not None for x in [settings, mol]):
            # Check if plams finishes normally
            try:
                # If molecule is an RDKIT molecule translate it to plams
                plams_mol = from_rdmol(mol)
                if job_name != '':
                    kwargs['job_name'] = job_name

                # Settings transformations
                self.prerun(settings, plams_mol, **kwargs)
                job_settings = self.generic2specific(settings, mol)

                # Run the job
                result = self.run_job(job_settings, plams_mol, **kwargs)

                # Check if there are warnings in the output that render the
                # calculation useless from the point of view of the user
                warnings_tolerance = kwargs.get(
                    "terminate_job_in_case_of_warnings")
                output_warnings = result.warnings

                if warnings_tolerance is not None and output_warnings is not None:
                    issues = [w(msg) for msg, w in output_warnings.items()
                              if w in warnings_tolerance]
                    if issues:
                        # NOTE(review): the exact line layout of this warning
                        # string was reconstructed; confirm against upstream
                        warn(f"""
The Following Warning are rendered unacceptable in the Current Workflow:
{issues}\n
The results from Job: {job_name} are discarded.
""", category=QMFlows_Warning)
                        result = self.result_type(
                            None, None, job_name=job_name,
                            dill_path=None, status='failed')

            # Otherwise pass an empty Result instance downstream
            except plams.core.errors.PlamsError as err:
                warn(f"Job {job_name} has failed.\n{err}",
                     category=QMFlows_Warning)
                result = self.result_type(
                    None, None, job_name=job_name,
                    dill_path=None, status='failed')
        else:
            warn(f"""
Job {job_name} has failed.
Either the Settings or Molecule objects are None,
probably due to a previous calculation failure
""", category=QMFlows_Warning)

            # Send an empty object downstream
            result = self.result_type(
                None, None, job_name=job_name,
                dill_path=None, status='failed')

        # Label this calculation as failed if there are no dependencies
        # coming from upstream
        self.postrun(result, output_warnings, job_settings, plams_mol, **kwargs)
        return result

    def generic2specific(self, settings: Settings,
                         mol: None | plams.Molecule = None) -> Settings:
        """Traverse *settings* and convert generic into package specific keys.

        Traverse all the key, value pairs of the *settings*, translating
        the generic keys into package specific keys as defined in the
        specific dictionary. If one key is not in the specific dictionary
        an error is raised. These new specific settings take preference over
        existing specific settings.

        Parameters
        ----------
        settings : :class:`qmflows.Settings`
            Settings provided by the user.
        mol : :class:`plams.Molecule<scm.plams.mol.molecule.Molecule>`, optional
            A PLAMS molecule to-be passed to the calculation.

        Returns
        -------
        :class:`qmflows.Settings`
            A new settings instance without any generic keys.

        """
        specific_from_generic_settings = Settings()
        for k, v in settings.items():
            if k == "specific":
                continue
            elif k == 'input':
                # Allow for PLAMS-style input; i.e. settings.input.blablabla
                specific_from_generic_settings.specific[self.pkg_name].update(
                    v)
                continue

            # Keys absent from the generic mapping are handled by the
            # package-specific special-keyword dispatcher
            if not self.generic_mapping.get(k):
                self.handle_special_keywords(
                    specific_from_generic_settings, k, v, mol)
        return settings.overlay(specific_from_generic_settings)

    def __repr__(self) -> str:
        """Create a string representation of this instance.

        Returns
        -------
        :class:`str`
            A string representation of this instance.

        """
        # Re-create the signature without annotations for a compact repr
        values = self.__signature__.parameters.values()
        sgn = inspect.Signature([
            inspect.Parameter(name=v.name, default=v.default, kind=v.kind)
            for v in values
        ])
        return f"<function {self.__qualname__}{sgn}>"

    def prerun(self, settings: Settings, mol: plams.Molecule, **kwargs: Any) -> None:
        r"""Run a set of tasks before running the actual job.

        Parameters
        ----------
        settings : :class:`qmflows.Settings`
            Settings provided by the user.
            Note that these settings can still contain generic keywords.
        mol : :class:`plams.Molecule<scm.plams.mol.molecule.Molecule>`, optional
            A PLAMS molecule to-be passed to the calculation.
        \**kwargs : :data:`~typing.Any`
            Further keyword arguments to-be passed to :meth:`Package.run_job`.

        See Also
        --------
        :meth:`Package.run_job`
            A method which handles the running of the actual
            :class:`plams.Job<scm.plams.core.basejob.Job>`.

        """
        pass

    def postrun(self, result: Result,
                output_warnings: None | WarnMap = None,
                settings: None | Settings = None,
                mol: None | plams.Molecule = None,
                **kwargs: Any) -> None:
        r"""Run a set of tasks after running the actual job.

        Parameters
        ----------
        result : :class:`Result`
            A Result instance.
        output_warnings : :class:`~collections.abc.Mapping` [:class:`str`, :class:`type` [:exc:`Warning`]], optional
            A Mapping which maps an error messages to Warning types.
        settings : :class:`qmflows.Settings`, optional
            User-provided Settings as processed by :meth:`Package.generic2specific`.
            Will be ``None`` if an error occurred before this point.
        mol : :class:`plams.Molecule<scm.plams.mol.molecule.Molecule>`, optional
            A PLAMS molecule as passed to the calculation.
            Will be ``None`` if an error occurred before the molecule was
            parsed in :meth:`Package.__call__`.
        \**kwargs : :data:`~typing.Any`
            Further keyword arguments that were passed to :meth:`Package.run_job`.

        See Also
        --------
        :meth:`Package.run_job`
            A method which handles the running of the actual
            :class:`plams.Job<scm.plams.core.basejob.Job>`.

        """  # noqa
        pass

    @staticmethod
    @abstractmethod
    def handle_special_keywords(settings: Settings, key: str,
                                value: Any, mol: plams.Molecule) -> None:
        """`Abstract method <https://docs.python.org/3/library/abc.html#abc.abstractmethod>`_; should be implemented by the child class.

        A method providing additional processing for :class:`Package`
        dependant generic keywords.

        Parameters
        ----------
        settings : :class:`qmflows.Settings`, optional
            User-provided Settings as being processed by
            :meth:`Package.generic2specific`.
        key : :class:`str`
            The key associated with the special keyword
        value : :data:`~typing.Any`
            The value associated with the special *key*.
        mol : :class:`plams.Molecule<scm.plams.mol.molecule.Molecule>`
            A PLAMS molecule to-be passed to the calculation.

        See Also
        --------
        :meth:`Package.generic2specific`
            Traverse *settings* and convert generic into package specific keys.

        """  # noqa
        raise NotImplementedError("trying to call an abstract method")

    @classmethod
    @abstractmethod
    def run_job(cls, settings: Settings, mol: plams.Molecule,
                job_name: str = "job",
                work_dir: None | str | os.PathLike[str] = None,
                validate_output: bool = False,
                **kwargs: Any) -> Result:
        r"""`Abstract method <https://docs.python.org/3/library/abc.html#abc.abstractmethod>`_; should be implemented by the child class.

        A method which handles the running of the actual
        :class:`plams.Job<scm.plams.core.basejob.Job>`.

        Parameters
        ----------
        settings : :class:`qmflows.Settings`, optional
            User-provided Settings as processed by :meth:`Package.generic2specific`.
        mol : :class:`plams.Molecule<scm.plams.mol.molecule.Molecule>`
            A PLAMS molecule to-be passed to the calculation.
        job_name : :class:`str`
            The name of the job.
        work_dir : :class:`str` or :class:`~os.PathLike`, optional
            The path+folder name of the PLAMS working directory.
        validate_output : :class:`bool`
            If :data:`True`, perform a package-specific validation of the
            output files' content.
            Only relevant if the particular :class:`Package` subclass has
            actually implemented output validation.
        \**kwargs : :data:`~typing.Any`
            Further keyword arguments.

        Returns
        -------
        :class:`Result`
            A new Result instance.

        """  # noqa
        raise NotImplementedError("The class representing a given quantum packages "
                                  "should implement this method")
def run(job: PromisedObject,
        runner: None | str = None,
        path: None | str | os.PathLike[str] = None,
        folder: None | str | os.PathLike[str] = None,
        load_jobs: bool = False,
        **kwargs: Any) -> Result:
    r"""Pick a runner, initialize the PLAMS environment and execute *job*.

    Serves as a wrapper around :func:`noodles.run_parallel`.

    Parameters
    ----------
    job : :class:`noodles.PromisedObject<noodles.interface.PromisedObject>`
        The computation to run as constructed by :meth:`Package.__call__`.
    runner : :class:`str`, optional
        The job runner.
        Note that this value should be left at ``None``;
        any other value raises a :exc:`ValueError`.
    path : :class:`str` or :class:`~os.PathLike`, optional
        The path where the PLAMS working directory will be created.
        Will default to the current working directory if ``None``.
    folder : :class:`str` or :class:`~os.PathLike`, optional
        The name of the new PLAMS working directory.
        Will default to ``"plams_workdir"`` if ``None``.
    load_jobs : :class:`bool`
        Load all pre-existing Jobs (contained within the working directory)
        into memory. Note that this can be quite slow if a large number of
        pre-existing jobs is present.
    \**kwargs : :data:`~typing.Any`
        Further keyword arguments to-be passed to :func:`call_default`;
        recognised keys are ``"n_processes"`` and ``"always_cache"``.

    Returns
    -------
    :class:`Result`
        A new Result instance. The exact type will depend on **job**.

    See Also
    --------
    :func:`noodles.run_parallel`
        Run a workflow in parallel threads, storing results in a
        Sqlite3 database.

    """
    with InitRestart(path=path, folder=folder):
        # Silence the PLAMS stdout logger inside the managed working directory
        plams.config.log.stdout = 0
        if runner is not None:
            raise ValueError(f"Don't know runner: {runner!r}")

        n_processes = kwargs.get('n_processes', 1)
        always_cache = kwargs.get('always_cache', True)
        return call_default(job, n_processes, always_cache)
def call_default(wf: PromisedObject, n_processes: int, always_cache: bool) -> Result:
    """Run locally using several threads.

    Caching can be turned off by specifying ``cache=None``.

    :param wf: The promised workflow object to evaluate.
    :param n_processes: Number of worker threads.
    :param always_cache: Whether every result is stored in the Sqlite3 cache.
    """
    # In case 'default_jobmanager' is not set (for some reason)
    try:
        workdir = plams.config.get('default_jobmanager').workdir
    except AttributeError as ex:
        raise plams.PlamsError("Failed to initialize the PLAMS jobmanager") from ex

    db_file = join(workdir, 'cache.db')
    return run_parallel(
        wf, n_threads=n_processes, registry=registry,
        db_file=db_file, always_cache=always_cache, echo_log=False)


#: Mapping from types to the noodles serializer handling each of them.
_REGISTRY_TYPES = {
    Package: SerReduce(Package),
    Path: SerPath(),
    plams.Molecule: SerMolecule(),
    Result: AsDict(Result),
    Settings: SerSettings(),
    plams.KFFile: SerReasonableObject(plams.KFFile),
    plams.KFReader: SerReasonableObject(plams.KFReader),
    np.floating: SerNumpyScalar(),
    np.integer: SerNumpyScalar(),
    pd.DataFrame: SerNDFrame(pd.DataFrame),
    pd.Series: SerNDFrame(pd.Series),
}

# RDKit is optional; only register its serializer when it was importable
if Chem is not None:
    _REGISTRY_TYPES[Chem.Mol] = SerMol()

#: A :class:`Registry` instance to-be returned by :func:`registry`.
REGISTRY = Registry(
    parent=serial.base() + arrays_to_hdf5(),
    types=_REGISTRY_TYPES,
)


def registry() -> Registry:
    """Pass to the noodles infrastructure all the information related to the
    structure of the :class:`Package` object that is scheduled.

    This *Registry* class contains hints that help Noodles to encode
    and decode this Package object.

    Returns
    -------
    :class:`Registry`
        A noodles ``Registry`` instance mapping types to their respective
        ``Serializer`` instances.

    """  # noqa
    return REGISTRY


def import_parser(
    ds: Mapping[str, None | str],
    module_root: str = "qmflows.parsers",
) -> ModuleType:
    """Import parser for the corresponding property.

    ``ds['parser']`` names the submodule of *module_root* to import;
    ``None`` imports *module_root* itself.
    """
    module_sufix = ds['parser']
    if module_sufix is None:
        return importlib.import_module(module_root)
    else:
        return importlib.import_module(f"{module_root}.{module_sufix}")


def find_file_pattern(
    path: str | os.PathLike[str],
    folder: None | str | os.PathLike[str] = None,
) -> Iterator[str]:
    # Lazily yield the joined paths of all files in `folder` whose name
    # matches the glob-style pattern `path`; a missing or None folder
    # yields nothing
    if folder is not None and os.path.exists(folder):
        return (join(folder, x)
                for x in fnmatch.filter(os.listdir(folder), str(path)))
    else:
        return iter([])


def ignore_unused_kwargs(fun: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Inspect the signature of function `fun` and filter the keyword arguments.

    Searches for the keyword arguments that are present in both `**kwargs`
    and the supplied `fun`; all others are discarded.
    """
    # Find the intersection between `kwargs` and
    # the (potential) parameters of `fun`
    ps = inspect.signature(fun).parameters
    valid_keys = kwargs.keys() & ps.keys()
    kwargs2 = {k: kwargs[k] for k in valid_keys}
    return fun(*args, **kwargs2)


def parse_output_warnings(job_name: str,
                          plams_dir: None | str | os.PathLike[str],
                          parser: WarnParser,
                          package_warnings: WarnMap) -> None | WarnDict:
    """Look out for warnings in the output file.

    Returns ``None`` (with a warning) when no ``*out`` file is found
    in *plams_dir*.
    """
    output_files = find_file_pattern('*out', plams_dir)
    try:
        # Only the first matching output file is parsed
        return parser(next(output_files), package_warnings)
    except StopIteration:
        warn(f"job: {job_name} has failed. check folder: {plams_dir}",
             category=QMFlows_Warning)
        return None