aiida.engine
Module with all the internals that make up the engine of aiida-core.
Package Contents#
Classes#
Persister to take saved process instance states and persisting them to the database. |
|
An attribute dictionary that represents an action that a Process could be waiting for to finish. |
|
Enum that describes the action to be taken for a given awaitable. |
|
Enum that describes the class of the target a given awaitable. |
|
Base restart work chain. |
|
Implementation of the CalcJob process. |
|
An abstract class, to define an importer for computations completed outside of AiiDA. |
|
Sub class of plumpy.OutputPort which adds the _pass_to_parser attribute. |
|
Process spec intended for the CalcJob process class. |
|
Client to interact with the daemon. |
|
A simple data class to define an exit code for a |
|
A namespace of ExitCode instances that can be accessed through getattr as well as getitem. |
|
Function process class used for turning functions into a Process |
|
Sub class of plumpy.InputPort which mixes in the WithSerialize and WithNonDb mixins to support automatic value serialization to database storable types and support non database storable input types as well. |
|
A future that can be interrupted by calling interrupt. |
|
A manager for |
|
Manager of calculation jobs submitted with a specific |
|
Custom object loader for aiida-core. |
|
Raised when an attempt is made to continue a Process that has already excepted before. |
|
Sub class of plumpy.PortNamespace which implements the serialize method to support automatic recursive serialization of a given mapping onto the ports of the PortNamespace. |
|
This class represents an AiiDA process which can be executed and will have full provenance saved in the database. |
|
A process builder that helps setting up the inputs for creating a new process. |
|
Input namespace for the ProcessBuilder. |
|
Future that waits for a process to complete using both polling and listening for broadcast events if possible. |
|
A namedtuple to define a process handler report for a |
|
Default process spec for process classes defined in aiida-core. |
|
Class that can launch processes by running in the current interpreter or by submitting them to the daemon. |
|
A mixin that adds support to a port to flag that it should not be stored in the database using the non_db=True flag. |
|
A mixin that adds support for a serialization function which is automatically applied on inputs that are not AiiDA data types. |
|
The WorkChain class is the principal component to implement workflows in AiiDA. |
Functions#
Convenience function that will construct an Awaitable for a given class instance with the context action set to APPEND. When the awaitable target is completed it will be appended to a list in the context for a key that is to be defined later |
|
Convenience function that will construct an Awaitable for a given class instance with the context action set to ASSIGN. When the awaitable target is completed it will be assigned to the context for a key that is to be defined later |
|
A decorator to turn a standard python function into a calcfunction. Example usage: |
|
Construct an instance of the Awaitable class that will contain the information related to the action to be taken with respect to the context once the awaitable object is completed. |
|
Return the global AiiDA object loader. |
|
Turn the given coroutine into an interruptable task by turning it into an InterruptableFuture and returning it. |
|
Return whether the given function is a process function |
|
Decorator to register a |
|
Run the process with the supplied inputs in a local runner that will block until the process is completed. |
|
Run the process with the supplied inputs in a local runner that will block until the process is completed. |
|
Run the process with the supplied inputs in a local runner that will block until the process is completed. |
|
Submit the process with the supplied inputs to the daemon immediately returning control to the interpreter. |
|
A decorator to turn a standard python function into a workfunction. Example usage: |
Data#
External#
API#
- class aiida.engine.AiiDAPersister#
Bases:
plumpy.persistence.Persister
Persister to take saved process instance states and persisting them to the database.
- save_checkpoint(process: aiida.engine.processes.process.Process, tag: Optional[str] = None)#
Persist a Process instance.
- Parameters:
process – aiida.engine.Process
tag – optional checkpoint identifier to allow distinguishing multiple checkpoints for the same process
- Raises:
PersistenceError
Raised if there was a problem saving the checkpoint
- load_checkpoint(pid: Hashable, tag: Optional[str] = None) plumpy.persistence.Bundle #
Load a process from a persisted checkpoint by its process id.
- Parameters:
pid – the process id of the plumpy.Process
tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint
- Returns:
a bundle with the process state
- Return type:
plumpy.Bundle
- Raises:
PersistenceError
Raised if there was a problem loading the checkpoint
- get_checkpoints()#
Return a list of all the current persisted process checkpoints
- Returns:
list of PersistedCheckpoint tuples, each element containing the process id and optional checkpoint tag.
- get_process_checkpoints(pid: Hashable)#
Return a list of all the current persisted process checkpoints for the specified process.
- Parameters:
pid – the process pid
- Returns:
list of PersistedCheckpoint tuples, each element containing the process id and optional checkpoint tag.
- delete_checkpoint(pid: Hashable, tag: Optional[str] = None) None #
Delete a persisted process checkpoint, where no error will be raised if the checkpoint does not exist.
- Parameters:
pid – the process id of the plumpy.Process
tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint
- delete_process_checkpoints(pid: Hashable)#
Delete all persisted checkpoints related to the given process id.
- Parameters:
pid – the process id of the aiida.engine.processes.process.Process
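The checkpoint contract described above (save, load, list, delete by process id and optional tag) can be illustrated with a small in-memory stand-in. This is only a sketch under stated assumptions: it is not how AiiDAPersister works internally (that class persists to the database), the name InMemoryPersister and the dictionary storage are hypothetical, and the sketch takes a pid and a state object directly instead of a Process instance.

```python
from collections import namedtuple
from typing import Any, Dict, Hashable, List, Optional, Tuple

# Hypothetical stand-in for the (pid, tag) tuples returned by the listing methods.
PersistedCheckpoint = namedtuple("PersistedCheckpoint", ["pid", "tag"])


class InMemoryPersister:
    """Toy persister that keeps checkpoints in a dict keyed by (pid, tag)."""

    def __init__(self) -> None:
        self._store: Dict[Tuple[Hashable, Optional[str]], Any] = {}

    def save_checkpoint(self, pid: Hashable, state: Any, tag: Optional[str] = None) -> None:
        # A later save with the same (pid, tag) overwrites the earlier one.
        self._store[(pid, tag)] = state

    def load_checkpoint(self, pid: Hashable, tag: Optional[str] = None) -> Any:
        try:
            return self._store[(pid, tag)]
        except KeyError as exc:
            raise LookupError(f"no checkpoint for pid={pid!r}, tag={tag!r}") from exc

    def get_checkpoints(self) -> List[PersistedCheckpoint]:
        return [PersistedCheckpoint(pid, tag) for pid, tag in self._store]

    def get_process_checkpoints(self, pid: Hashable) -> List[PersistedCheckpoint]:
        return [cp for cp in self.get_checkpoints() if cp.pid == pid]

    def delete_checkpoint(self, pid: Hashable, tag: Optional[str] = None) -> None:
        # Deleting a checkpoint that does not exist is silently ignored.
        self._store.pop((pid, tag), None)

    def delete_process_checkpoints(self, pid: Hashable) -> None:
        for key in [key for key in self._store if key[0] == pid]:
            del self._store[key]
```

The tag lets one process keep several checkpoints side by side, which is why the listing methods return (pid, tag) pairs and not just process ids.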
- class aiida.engine.Awaitable#
Bases:
plumpy.utils.AttributesDict
An attribute dictionary that represents an action that a Process could be waiting for to finish.
- class aiida.engine.AwaitableAction#
Bases:
enum.Enum
Enum that describes the action to be taken for a given awaitable.
- ASSIGN = 'assign'#
- APPEND = 'append'#
- class aiida.engine.AwaitableTarget#
Bases:
enum.Enum
Enum that describes the class of the target a given awaitable.
- PROCESS = 'process'#
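The difference between the two actions can be sketched with a plain dictionary standing in for the work chain context. This is an illustration only: the names Action and resolve are hypothetical, and the real engine resolves awaitables against process nodes, not strings.

```python
from enum import Enum
from typing import Any, Dict


class Action(Enum):
    """Mirrors the two documented AwaitableAction values."""

    ASSIGN = "assign"
    APPEND = "append"


def resolve(ctx: Dict[str, Any], key: str, action: Action, value: Any) -> None:
    """Store the value of a finished awaitable in the context."""
    if action is Action.ASSIGN:
        # ASSIGN: the value replaces whatever is stored under the key.
        ctx[key] = value
    elif action is Action.APPEND:
        # APPEND: the value is added to a list stored under the key.
        ctx.setdefault(key, []).append(value)
    else:
        raise ValueError(f"unknown action: {action!r}")


ctx: Dict[str, Any] = {}
resolve(ctx, "relax", Action.ASSIGN, "node-1")
resolve(ctx, "scans", Action.APPEND, "node-2")
resolve(ctx, "scans", Action.APPEND, "node-3")
```

After these calls the context holds a single value under "relax" and a two-element list under "scans".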
- class aiida.engine.BaseRestartWorkChain(*args, **kwargs)#
Bases:
aiida.engine.processes.workchains.workchain.WorkChain
Base restart work chain.
This work chain serves as the starting point for more complex work chains that will be designed to run a sub process that might need multiple restarts to come to a successful end. These restarts may be necessary because a single process run is not sufficient to achieve a fully converged result, or certain errors may be encountered which are recoverable.
This work chain implements the most basic functionality to achieve this goal. It will launch the sub process, restarting until it is completed successfully or the maximum number of iterations is reached. After completion of the sub process it will be inspected, and a list of process handlers is called successively. These process handlers are defined as class methods that are decorated with process_handler().
The idea is to sub class this work chain and leverage the generic error handling that is implemented in the few outline methods. The minimally required outline would look something like the following:
cls.setup,
while_(cls.should_run_process)(
    cls.run_process,
    cls.inspect_process,
)
Each of these methods can of course be overridden, but they should be general enough to fit most process cycles. The run_process method will take the inputs for the process from the context under the key inputs. The user should, therefore, make sure that the inputs to be used are stored under self.ctx.inputs before the run_process method is called. One can update the inputs based on the results from a prior process by calling an outline method just before the run_process step, for example:
cls.setup,
while_(cls.should_run_process)(
    cls.prepare_inputs,
    cls.run_process,
    cls.inspect_process,
)
In the prepare_inputs method, the inputs dictionary at self.ctx.inputs is updated before the next process is run with those inputs.
The _process_class attribute should be set to the Process class that should be run in the loop. Finally, to define handlers that will be called during the inspect_process simply define a class method with the signature (self, node) and decorate it with the process_handler decorator, for example:
@process_handler
def handle_problem(self, node):
    if some_problem:
        self.ctx.inputs = improved_inputs
        return ProcessHandlerReport()
The process_handler and ProcessHandlerReport support various arguments to control the flow of the logic of the inspect_process. Refer to their respective documentation for details.
Initialization
Construct the instance.
- _considered_handlers_extra = 'considered_handlers'#
- property process_class: Type[aiida.engine.processes.process.Process]#
Return the process class to run in the loop.
- classmethod define(spec: aiida.engine.processes.ProcessSpec) None #
Define the process specification.
- setup() None #
Initialize context variables that are used during the logical flow of the BaseRestartWorkChain.
- should_run_process() bool #
Return whether a new process should be run.
This is the case as long as the last process has not finished successfully and the maximum number of restarts has not yet been exceeded.
- run_process() aiida.engine.processes.workchains.context.ToContext #
Run the next process, taking the input dictionary from the context at self.ctx.inputs.
- inspect_process() Optional[aiida.engine.processes.ExitCode] #
Analyse the results of the previous process and call the handlers when necessary.
If the process has excepted or was killed, the work chain will abort. Otherwise any attached handlers will be called in order of their specified priority. If the process failed and no handler returns a report indicating that the error was handled, it is considered an unhandled process failure and the process is relaunched. If this happens twice in a row, the work chain is aborted. In the case that at least one handler returned a report the following matrix determines the logic that is followed:
Process result   Handler report?   Handler exit code   Action
--------------   ---------------   -----------------   -------
Success          yes               == 0                Restart
Success          yes               != 0                Abort
Failed           yes               == 0                Restart
Failed           yes               != 0                Abort
If no handler returned a report and the process finished successfully, the work chain’s work is considered done and it will move on to the next step that directly follows the while conditional, if there is one defined in the outline.
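The decision rules of inspect_process can be condensed into one small function. This is a sketch of the documented logic only, not the actual implementation: the function name, its arguments, and the string return values are hypothetical, and the excepted or killed case (which always aborts) is left out.

```python
from typing import Optional


def next_action(process_ok: bool, report_exit_status: Optional[int], unhandled_failures: int = 0) -> str:
    """Return 'restart', 'abort' or 'done' for one inspection step.

    report_exit_status is None when no handler returned a report; otherwise it
    is the status of the exit code carried by the report.
    unhandled_failures counts the directly preceding failures without a report.
    """
    if report_exit_status is not None:
        # A handler reported: the matrix applies, whatever the process result.
        return "restart" if report_exit_status == 0 else "abort"
    if process_ok:
        # No report and a successful process: move on in the outline.
        return "done"
    # Failed without a report: relaunch once, abort the second time in a row.
    return "abort" if unhandled_failures >= 1 else "restart"
```

Note that in the matrix the process result does not change the action once a handler has reported; only the exit code of the report does.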
- get_outputs(node) Mapping[str, aiida.orm.Node] #
Return a mapping of the outputs that should be attached as outputs to the work chain.
By default this method returns the outputs of the last completed calculation job. This method can be overridden if the implementation wants to update those outputs before attaching them. Make sure that if the content of an output node is modified that this is done through a calcfunction in order to not lose the provenance.
- results() Optional[aiida.engine.processes.ExitCode] #
Attach the outputs specified in the output specification from the last completed process.
- classmethod is_process_handler(process_handler_name: Union[str, types.FunctionType]) bool #
Return whether the given method name corresponds to a process handler of this class.
- Parameters:
process_handler_name – string name of the instance method
- Returns:
boolean, True if corresponds to process handler, False otherwise
- get_process_handlers_by_priority() List[Tuple[int, types.FunctionType]] #
Return list of process handlers where overrides from inputs.handler_overrides are taken into account.
- on_terminated()#
Clean the working directories of all child calculation jobs if clean_workdir=True in the inputs.
- _wrap_bare_dict_inputs(port_namespace: aiida.engine.processes.PortNamespace, inputs: Dict[str, Any]) aiida.common.AttributeDict #
Wrap bare dictionaries in inputs in a Dict node if dictated by the corresponding inputs portnamespace.
- Parameters:
port_namespace – a PortNamespace
inputs – a dictionary of inputs intended for submission of the process
- Returns:
an attribute dictionary with all bare dictionaries wrapped in Dict if dictated by the port namespace
- class aiida.engine.CalcJob(*args, **kwargs)#
Bases:
aiida.engine.processes.process.Process
Implementation of the CalcJob process.
Initialization
Construct a CalcJob instance.
Construct the instance only if it is a sub class of CalcJob, otherwise raise InvalidOperation.
See documentation of aiida.engine.Process.
- _node_class = None#
- _spec_class = None#
- classmethod define(spec: aiida.engine.processes.process_spec.CalcJobProcessSpec) None #
Define the process specification, including its inputs, outputs and known exit codes.
Ports are added to the metadata input namespace (inherited from the base Process), and a code input Port, a remote_folder output Port and retrieved folder output Port are added.
- Parameters:
spec – the calculation job process spec to define.
- spec_options()#
Return the metadata options port namespace of the process specification of this process.
- Returns:
options dictionary
- Return type:
- classmethod get_importer(entry_point_name: str | None = None) aiida.engine.processes.calcjobs.importer.CalcJobImporter #
Load the CalcJobImporter associated with this CalcJob if it exists.
By default an importer with the same entry point as the CalcJob will be loaded, however, this can be overridden using the entry_point_name argument.
- Parameters:
entry_point_name – optional entry point name of a CalcJobImporter to override the default.
- Returns:
the loaded CalcJobImporter.
- Raises:
if no importer class could be loaded.
- property options: aiida.common.AttributeDict#
Return the options of the metadata that were specified when this process instance was launched.
- Returns:
options dictionary
- classmethod get_state_classes() Dict[Hashable, Type[plumpy.process_states.State]] #
A mapping of the State constants to the corresponding state class.
Overrides the waiting state with the CalcJob-specific version.
- property node: aiida.orm.CalcJobNode#
- on_terminated() None #
Clean up the node by deleting the calculation job state.
Note
This has to be done before calling the super method, because that will seal the node, after which we cannot change it.
- run() Union[plumpy.process_states.Stop, int, plumpy.process_states.Wait] #
Run the calculation job.
This means invoking the presubmit and storing the temporary folder in the node’s repository. Then we move the process into the Wait state, waiting for the UPLOAD transport task to be started.
- Returns:
the Stop command if a dry run, int if the process has an exit status, Wait command if the calcjob is to be uploaded
- abstract prepare_for_submission(folder: aiida.common.folders.Folder) aiida.common.datastructures.CalcInfo #
Prepare the calculation for submission.
Convert the input nodes into the corresponding input files in the format that the code will expect. In addition, define and return a CalcInfo instance, which is a simple data structure that contains information for the engine, for example, on what files to copy to the remote machine, what files to retrieve once it has completed, specific scheduler settings and more.
- Parameters:
folder – a temporary folder on the local file system.
- Returns:
the CalcInfo instance
- _setup_inputs() None #
Create the links between the input nodes and the ProcessNode that represents this process.
- _perform_dry_run()#
Perform a dry run.
Instead of performing the normal sequence of steps, just the presubmit is called, which will call the method prepare_for_submission of the plugin to generate the input files based on the inputs. Then the upload action is called, but using a normal local transport that will copy the files to a local sandbox folder. The generated input script and the absolute path to the sandbox folder are stored in the dry_run_info attribute of the node of this process.
- _perform_import()#
Perform the import of an already completed calculation.
The inputs contained a RemoteData under the key remote_folder signalling that this is not supposed to be run as a normal calculation job, but rather the results are already computed outside of AiiDA and merely need to be imported.
- parse(retrieved_temporary_folder: Optional[str] = None, existing_exit_code: aiida.engine.processes.exit_code.ExitCode | None = None) aiida.engine.processes.exit_code.ExitCode #
Parse a retrieved job calculation.
This is called once it’s finished waiting for the calculation to be finished and the data has been retrieved.
- Parameters:
retrieved_temporary_folder – The path to the temporary folder
- static terminate(exit_code: aiida.engine.processes.exit_code.ExitCode) aiida.engine.processes.exit_code.ExitCode #
Terminate the process immediately and return the given exit code.
This method is called by aiida.engine.processes.calcjobs.tasks.Waiting.execute() if a monitor triggered the job to be terminated and specified the parsing to be skipped. It will construct the running state and tell this method to be run, which returns the given exit code which will cause the process to be terminated.
- Parameters:
exit_code – The exit code to return.
- Returns:
The provided exit code.
- parse_scheduler_output(retrieved: aiida.orm.Node) Optional[aiida.engine.processes.exit_code.ExitCode] #
Parse the output of the scheduler if that functionality has been implemented for the plugin.
- parse_retrieved_output(retrieved_temporary_folder: Optional[str] = None) Optional[aiida.engine.processes.exit_code.ExitCode] #
Parse the retrieved data by calling the parser plugin if it was defined in the inputs.
- presubmit(folder: aiida.common.folders.Folder) aiida.common.datastructures.CalcInfo #
Prepares the calculation folder with all inputs, ready to be copied to the cluster.
- Parameters:
folder – a SandboxFolder that can be used to write calculation input files and the scheduling script.
- Return calcinfo:
the CalcInfo object containing the information needed by the daemon to handle operations.
- class aiida.engine.CalcJobImporter#
Bases:
abc.ABC
An abstract class, to define an importer for computations completed outside of AiiDA.
This class is used to import the results of a calculation that was completed outside of AiiDA. The importer is responsible for parsing the output files of the calculation and creating the corresponding AiiDA nodes.
- abstract static parse_remote_data(remote_data: aiida.orm.RemoteData, **kwargs) Dict[str, Union[aiida.orm.Node, Dict]] #
Parse the input nodes from the files in the provided RemoteData.
- Parameters:
remote_data – the remote data node containing the raw input files.
kwargs – additional keyword arguments to control the parsing process.
- Returns:
a dictionary with the parsed input nodes that match the input spec of the associated CalcJob.
- class aiida.engine.CalcJobOutputPort(*args, **kwargs)#
Bases:
plumpy.ports.OutputPort
Sub class of plumpy.OutputPort which adds the _pass_to_parser attribute.
Initialization
- class aiida.engine.CalcJobProcessSpec#
Bases:
aiida.engine.processes.process_spec.ProcessSpec
Process spec intended for the CalcJob process class.
Initialization
- OUTPUT_PORT_TYPE = None#
- class aiida.engine.DaemonClient(profile: aiida.manage.configuration.profile.Profile)#
Client to interact with the daemon.
Initialization
Construct an instance for a given profile.
- Parameters:
profile – The profile instance.
- DAEMON_ERROR_NOT_RUNNING = 'daemon-error-not-running'#
- DAEMON_ERROR_TIMEOUT = 'daemon-error-timeout'#
- _DAEMON_NAME = 'aiida-{name}'#
- _ENDPOINT_PROTOCOL = None#
- property profile: aiida.manage.configuration.profile.Profile#
- property _verdi_bin: str#
Return the absolute path to the verdi binary.
- Raises:
ConfigurationError – If the path to verdi could not be found
- cmd_start_daemon(number_workers: int = 1, foreground: bool = False) list[str] #
Return the command to start the daemon.
- Parameters:
number_workers – Number of daemon workers to start.
foreground – Whether to launch the subprocess in the background or not.
- get_circus_port() int #
Retrieve the port for the circus controller, which should be written to the circus port file.
If the daemon is running, the port file should exist and contain the port to which the controller is connected. If it cannot be read, a RuntimeError will be thrown. If the daemon is not running, an available port will be requested from the operating system, written to the port file and returned.
- Returns:
The port for the circus controller.
- static get_env() dict[str, str] #
Return the environment for this current process.
This method is used to pass variables from the environment of the current process to a subprocess that is spawned when the daemon or a daemon worker is started.
It replicates the PATH, PYTHONPATH and AIIDA_PATH environment variables. The PYTHONPATH variable ensures that all Python modules that can be imported by the parent process are also importable by the subprocess. The AIIDA_PATH variable ensures that the subprocess will use the same AiiDA configuration directory as used by the current process.
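The idea of handing the parent's import and configuration paths to a child process can be sketched as follows. This is not the body of get_env: the function name build_subprocess_env, its parameters, and the choice to build PYTHONPATH from sys.path are assumptions made for illustration.

```python
import os
import sys
from typing import Dict, Mapping, Optional


def build_subprocess_env(parent: Optional[Mapping[str, str]] = None, config_dir: Optional[str] = None) -> Dict[str, str]:
    """Return a copy of the parent environment prepared for a child process."""
    # Copy so that the environment of the running process is left untouched.
    env = dict(os.environ if parent is None else parent)
    # Everything importable by the parent should be importable by the child.
    env["PYTHONPATH"] = os.pathsep.join(path for path in sys.path if path)
    # Point the child at the same configuration directory, if one is given.
    if config_dir is not None:
        env["AIIDA_PATH"] = config_dir
    return env


env = build_subprocess_env({"PATH": "/usr/bin"}, config_dir="/tmp/aiida-config")
```

A dictionary like this would typically be passed as the env argument of subprocess.Popen when the child is spawned.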
- get_circus_socket_directory() str #
Retrieve the absolute path of the directory where the circus sockets are stored.
If the daemon is running, the sockets file should exist and contain the absolute path of the directory that contains the sockets of the circus endpoints. If it cannot be read, a RuntimeError will be thrown. If the daemon is not running, a temporary directory will be created and its path will be written to the sockets file and returned.
Note
A temporary folder needs to be used for the sockets because UNIX limits the filepath length to 107 bytes. Placing the socket files in the AiiDA config folder might seem like the more logical choice, but because that folder can be placed in an arbitrarily nested directory, the socket filename could exceed the limit. The solution is therefore to always store them in the temporary directory of the operating system, whose base path is typically short enough not to exceed the limit.
- Returns:
The absolute path of directory to write the sockets to.
- get_daemon_pid() int | None #
Get the daemon pid which should be written in the daemon pid file specific to the profile.
- Returns:
The pid of the circus daemon process or None if not found.
- property is_daemon_running: bool#
Return whether the daemon is running, which is determined by seeing if the daemon pid file is present.
- Returns:
True if daemon is running, False otherwise.
- delete_circus_socket_directory() None #
Attempt to delete the directory used to store the circus endpoint sockets.
Will not raise if the directory does not exist.
- classmethod get_available_port()#
Get an available port from the operating system.
- Returns:
A currently available port.
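A common way to ask the operating system for a free port is to bind a socket to port 0 and read back the port that was assigned. The sketch below shows that technique with the standard library; whether DaemonClient uses exactly this approach is an assumption, and the loopback address is chosen only for the example.

```python
import socket


def get_available_port() -> int:
    """Ask the operating system for a free TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Port 0 tells the OS to pick any port that is currently unused.
        sock.bind(("127.0.0.1", 0))
        return sock.getsockname()[1]


port = get_available_port()
```

The port is only known to be free at the moment of the call: once the socket is closed another process could claim it, so the value should be used promptly.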
- get_controller_endpoint()#
Get the endpoint string for the circus controller.
For the IPC protocol a profile specific socket will be used, whereas for the TCP protocol an available port will be found and saved in the profile specific port file.
- Returns:
The endpoint string.
- get_pubsub_endpoint()#
Get the endpoint string for the circus pubsub endpoint.
For the IPC protocol a profile specific socket will be used, whereas for the TCP protocol any available port will be used.
- Returns:
The endpoint string.
- get_stats_endpoint()#
Get the endpoint string for the circus stats endpoint.
For the IPC protocol a profile specific socket will be used, whereas for the TCP protocol any available port will be used.
- Returns:
The endpoint string.
- get_ipc_endpoint(endpoint)#
Get the ipc endpoint string for a circus daemon endpoint for a given socket.
- Parameters:
endpoint – The circus endpoint for which to return a socket.
- Returns:
The ipc endpoint string.
- get_tcp_endpoint(port=None)#
Get the tcp endpoint string for a circus daemon endpoint.
If the port is unspecified, the operating system will be asked for a currently available port.
- Parameters:
port – A port to use for the endpoint.
- Returns:
The tcp endpoint string.
- get_client() circus.client.CircusClient #
Return an instance of the CircusClient.
The endpoint is defined by the controller endpoint, which used the port that was written to the port file upon starting of the daemon.
- Returns:
CircusClient instance
- call_client(command: aiida.engine.daemon.client.JsonDictType) aiida.engine.daemon.client.JsonDictType #
Call the client with a specific command.
Will check whether the daemon is running first by checking for the pid file. When the pid is found yet the call still fails with a timeout, this means the daemon was actually not running and it was terminated unexpectedly causing the pid file to not be cleaned up properly.
- Parameters:
command – Command to call the circus client with.
- Returns:
The result of the circus client call.
- get_status() aiida.engine.daemon.client.JsonDictType #
Get the daemon running status.
- Returns:
The client call response. If successful, will contain ‘status’ key.
- get_numprocesses() aiida.engine.daemon.client.JsonDictType #
Get the number of running daemon processes.
- Returns:
The client call response. If successful, will contain ‘numprocesses’ key.
- get_worker_info() aiida.engine.daemon.client.JsonDictType #
Get workers statistics for this daemon.
- Returns:
The client call response. If successful, will contain ‘info’ key.
- get_daemon_info() aiida.engine.daemon.client.JsonDictType #
Get statistics about this daemon itself.
- Returns:
The client call response. If successful, will contain ‘info’ key.
- increase_workers(number: int) aiida.engine.daemon.client.JsonDictType #
Increase the number of workers.
- Parameters:
number – The number of workers to add.
- Returns:
The client call response.
- decrease_workers(number: int) aiida.engine.daemon.client.JsonDictType #
Decrease the number of workers.
- Parameters:
number – The number of workers to remove.
- Returns:
The client call response.
- stop_daemon(wait: bool = True, timeout: int = 5) aiida.engine.daemon.client.JsonDictType #
Stop the daemon.
- Parameters:
wait – Boolean to indicate whether to wait for the result of the command.
timeout – Wait this number of seconds for the is_daemon_running to return False before raising.
- Returns:
The client call response.
- Raises:
DaemonException – If is_daemon_running returns True after the timeout has passed.
- restart_daemon(wait: bool) aiida.engine.daemon.client.JsonDictType #
Restart the daemon.
- Parameters:
wait – Boolean to indicate whether to wait for the result of the command.
- Returns:
The client call response.
- start_daemon(number_workers: int = 1, foreground: bool = False, timeout: int = 5) None #
Start the daemon in a sub process running in the background.
- Parameters:
number_workers – Number of daemon workers to start.
foreground – Whether to launch the subprocess in the background or not.
timeout – Wait this number of seconds for the is_daemon_running to return True before raising.
- Raises:
DaemonException – If the daemon fails to start.
DaemonException – If the daemon starts but then is unresponsive or in an unexpected state.
DaemonException – If is_daemon_running returns False after the timeout has passed.
- static _await_condition(condition: Callable, exception: Exception, timeout: int = 5, interval: float = 0.1)#
Await a condition to evaluate to True or raise the exception if the timeout is reached.
- Parameters:
condition – A callable that is waited for to return True.
exception – Raise this exception if condition does not return True after timeout seconds.
timeout – Wait this number of seconds for condition to return True before raising.
interval – The time in seconds to wait between invocations of condition.
- Raises:
The exception provided by exception if timeout is reached.
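The polling behaviour described by these parameters can be sketched in a few lines. This is an illustrative reimplementation under assumptions, not the code of the private method: the function name lacks the leading underscore on purpose, and using a monotonic clock for the deadline is a choice made for the sketch.

```python
import time
from typing import Callable


def await_condition(condition: Callable[[], bool], exception: Exception, timeout: float = 5, interval: float = 0.1) -> None:
    """Poll condition until it returns True, raising exception after timeout seconds."""
    deadline = time.monotonic() + timeout
    while not condition():
        if time.monotonic() >= deadline:
            raise exception
        # Sleep between invocations to avoid a busy loop.
        time.sleep(interval)


calls = {"n": 0}


def ready() -> bool:
    """Hypothetical condition that becomes true on the third invocation."""
    calls["n"] += 1
    return calls["n"] >= 3


await_condition(ready, TimeoutError("not ready"), timeout=1, interval=0.01)
```

In start_daemon and stop_daemon the condition would be a check based on is_daemon_running, with a DaemonException as the exception to raise.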
- _start_daemon(number_workers: int = 1, foreground: bool = False) None #
Start the daemon.
Warning
This will daemonize the current process and put it in the background. It is most likely not what you want to call if you want to start the daemon from the Python API. Instead you will probably want to use the aiida.engine.daemon.client.DaemonClient.start_daemon() function.
- Parameters:
number_workers – Number of daemon workers to start.
foreground – Whether to launch the subprocess in the background or not.
- class aiida.engine.ExitCode#
Bases:
typing.NamedTuple
A simple data class to define an exit code for a Process.
When an instance of this class is returned from a Process._run() call, it will be interpreted that the Process should be terminated and that the exit status and message of the namedtuple should be set to the corresponding attributes of the node.
- Parameters:
status – positive integer exit status, where a non-zero value indicates the process failed, default is 0
message – optional message with more details about the failure mode
invalidates_cache – optional flag, indicating that a process should not be used in caching
- format(**kwargs: str) aiida.engine.processes.exit_code.ExitCode #
Create a clone of this exit code where the template message is replaced by the keyword arguments.
- Parameters:
kwargs – replacement parameters for the template message
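How a template message is turned into a concrete one can be shown with a stand-in named tuple. The class SketchExitCode below is hypothetical and only mirrors the three documented fields; raising ValueError for a missing message is a choice made for this sketch, not documented behaviour.

```python
from typing import NamedTuple, Optional


class SketchExitCode(NamedTuple):
    """Stand-in with the three documented fields of an exit code."""

    status: int = 0
    message: Optional[str] = None
    invalidates_cache: bool = False

    def format(self, **kwargs: str) -> "SketchExitCode":
        """Return a clone whose message has its placeholders filled in."""
        if self.message is None:
            raise ValueError("exit code has no message template to format")
        # Named tuples are immutable, so _replace builds a new instance.
        return self._replace(message=self.message.format(**kwargs))


template = SketchExitCode(410, "output file {filename} is missing")
concrete = template.format(filename="aiida.out")
```

Because a clone is returned, the template stays reusable for other values of the placeholder.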
- class aiida.engine.ExitCodesNamespace#
Bases:
aiida.common.extendeddicts.AttributeDict
A namespace of ExitCode instances that can be accessed through getattr as well as getitem.
Additionally, the collection can be called with an identifier, that can either reference the integer status of the ExitCode that needs to be retrieved or the key in the collection.
- __call__(identifier: Union[int, str]) aiida.engine.processes.exit_code.ExitCode #
Return a specific exit code identified by either its exit status or label.
- Parameters:
identifier – the identifier of the exit code. If the type is integer, it will be interpreted as the exit code status, otherwise it will be interpreted as the exit code label
- Returns:
an ExitCode instance
- Raises:
ValueError – if no exit code with the given label is defined for this process
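The three access styles (attribute, item, and call with a status or label) can be sketched with a small dict subclass. The names SketchExitCode and SketchExitCodes are hypothetical stand-ins, and the sketch does not reproduce the AttributeDict base class.

```python
from typing import NamedTuple, Optional, Union


class SketchExitCode(NamedTuple):
    """Minimal stand-in for an exit code."""

    status: int = 0
    message: Optional[str] = None


class SketchExitCodes(dict):
    """Dict whose items are also attributes and which can be called for lookup."""

    def __getattr__(self, name: str) -> SketchExitCode:
        # Only reached when normal attribute lookup fails.
        try:
            return self[name]
        except KeyError as exc:
            raise AttributeError(name) from exc

    def __call__(self, identifier: Union[int, str]) -> SketchExitCode:
        if isinstance(identifier, int):
            # Integers are matched against the status of each exit code.
            for exit_code in self.values():
                if exit_code.status == identifier:
                    return exit_code
            raise ValueError(f"no exit code with status {identifier}")
        try:
            # Anything else is treated as the label, i.e. the key.
            return self[identifier]
        except KeyError as exc:
            raise ValueError(f"no exit code with label {identifier!r}") from exc


codes = SketchExitCodes(ERROR_NO_OUTPUT=SketchExitCode(300, "no output produced"))
```

With this, codes.ERROR_NO_OUTPUT, codes["ERROR_NO_OUTPUT"], codes(300) and codes("ERROR_NO_OUTPUT") all return the same object.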
- class aiida.engine.FunctionProcess(*args, **kwargs)#
Bases:
aiida.engine.processes.process.Process
Function process class used for turning functions into a Process
Initialization
- static _func(*_args, **_kwargs) dict #
This is used internally to store the actual function that is being wrapped and will be replaced by the build method.
- static build(func: Callable[..., Any], node_class: Type[aiida.orm.ProcessNode]) Type[aiida.engine.processes.functions.FunctionProcess] #
Build a Process from the given function.
All function arguments will be assigned as process inputs. If keyword arguments are specified then these will also become inputs.
- Parameters:
func – The function to build a process from
node_class – Provide a custom node class to be used, has to be constructable with no arguments. It has to be a sub class of ProcessNode and the mixin FunctionCalculationMixin.
- Returns:
A Process class that represents the function
- classmethod validate_inputs(*args: Any, **kwargs: Any) None #
Validate the positional and keyword arguments passed in the function call.
- Raises:
TypeError – if more positional arguments are passed than the function defines
- classmethod create_inputs(*args: Any, **kwargs: Any) Dict[str, Any] #
Create the input args for the FunctionProcess.
- classmethod args_to_dict(*args: Any) Dict[str, Any] #
Create an input dictionary (of form label -> value) from supplied args.
- Parameters:
args – The values to use for the dictionary
- Returns:
A label -> value dictionary
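Mapping positional arguments onto input labels can be sketched with inspect.signature. This is an illustration under assumptions and not the class method itself: the free function below takes the wrapped function explicitly, ignores variable positional parameters, and folds in the documented TypeError for too many positional arguments.

```python
import inspect
from typing import Any, Callable, Dict


def args_to_dict(func: Callable[..., Any], *args: Any) -> Dict[str, Any]:
    """Label positional values with the parameter names of func, in order."""
    names = [
        name
        for name, parameter in inspect.signature(func).parameters.items()
        if parameter.kind in (parameter.POSITIONAL_ONLY, parameter.POSITIONAL_OR_KEYWORD)
    ]
    if len(args) > len(names):
        raise TypeError(
            f"{func.__name__}() takes {len(names)} positional arguments but {len(args)} were given"
        )
    # zip stops at the shorter sequence, so unfilled parameters are left out.
    return dict(zip(names, args))


def add(x, y, z=0):
    """Hypothetical function that would be turned into a process."""
    return x + y + z


labelled = args_to_dict(add, 1, 2)
```

The resulting dictionary can then be merged with the keyword arguments to form the full set of process inputs.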
- classmethod get_or_create_db_record() aiida.orm.ProcessNode #
- property process_class: Callable[..., Any]#
Return the class that represents this Process, for the FunctionProcess this is the function itself.
For a standard Process or sub class of Process, this is the class itself. However, for legacy reasons, the Process class is a wrapper around another class. This function returns that original class, i.e. the class that really represents what was being executed.
- Returns:
A Process class that represents the function
- run() Optional[aiida.engine.processes.exit_code.ExitCode] #
Run the process.
- class aiida.engine.InputPort(*args, **kwargs)#
Bases:
aiida.engine.processes.ports.WithSerialize, aiida.engine.processes.ports.WithNonDb, plumpy.ports.InputPort
Sub class of plumpy.InputPort which mixes in the WithSerialize and WithNonDb mixins to support automatic value serialization to database storable types and support non database storable input types as well.
Initialization
Override the constructor to check the type of the default if set and warn if not immutable.
- class aiida.engine.InterruptableFuture(*, loop=None)#
Bases:
asyncio.Future
A future that can be interrupted by calling interrupt.
Initialization
Initialize the future.
The optional event_loop argument allows explicitly setting the event loop object used by the future. If it’s not provided, the future uses the default event loop.
- interrupt(reason: Exception) None #
This method should be called to interrupt the coroutine represented by this InterruptableFuture.
- async with_interrupt(coro: Awaitable[Any]) Any #
Return the result of a coroutine, which will be interrupted if this future is interrupted. For example:
    import asyncio
    from aiida.engine import InterruptableFuture
    loop = asyncio.get_event_loop()
    interruptable = InterruptableFuture()
    loop.call_soon(interruptable.interrupt, RuntimeError("STOP"))
    loop.run_until_complete(interruptable.with_interrupt(asyncio.sleep(2.)))
    # raises RuntimeError: STOP
- Parameters:
coro – The coroutine that can be interrupted
- Returns:
The result of the coroutine
- class aiida.engine.JobManager(transport_queue: aiida.engine.transports.TransportQueue)#
A manager for CalcJob submitted to Computer instances.
When a calculation job is submitted to a Computer, it actually uses a specific AuthInfo, which is a computer configured for a User. The JobManager maintains a mapping of JobsList instances for each authinfo that has active calculation jobs. These jobslist instances are then responsible for bundling scheduler updates for all the jobs they maintain (i.e. that all share the same authinfo) and update their status.
As long as a Runner will create a single JobManager instance and use that for its lifetime, the guarantees made by the JobsList about respecting the minimum polling interval of the scheduler will be maintained. Note, however, that since each Runner will create its own job manager, these guarantees only hold per runner.
Initialization
- get_jobs_list(authinfo: aiida.orm.AuthInfo) aiida.engine.processes.calcjobs.manager.JobsList #
Get or create a new JobsList instance for the given authinfo.
- Parameters:
authinfo – the AuthInfo
- Returns:
a JobsList instance
- request_job_info_update(authinfo: aiida.orm.AuthInfo, job_id: Hashable) Iterator[asyncio.Future[JobInfo]] #
Get a future that will resolve to information about a given job.
This is a context manager so that if the user leaves the context the request is automatically cancelled.
- class aiida.engine.JobsList(authinfo: aiida.orm.AuthInfo, transport_queue: aiida.engine.transports.TransportQueue, last_updated: Optional[float] = None)#
Manager of calculation jobs submitted with a specific AuthInfo, i.e. computer configured for a specific user.
This container of active calculation jobs is used to update their status periodically in batches, ensuring that even when a lot of jobs are running, the scheduler update command is not triggered for each job individually.
In addition, the Computer for which the AuthInfo is configured can define a minimum polling interval. This class will guarantee that the time between update calls to the scheduler is larger than or equal to that minimum interval.
Note that since each instance operates on a specific authinfo, the guarantees of batching scheduler update calls and of limiting the number of calls per unit time, through the minimum polling interval, are only applicable for jobs launched with that particular authinfo. If multiple authinfo instances with the same computer have active jobs, these limitations are not respected between them, since there is no communication between JobsList instances. See the JobManager for example usage.
Initialization
Construct an instance for the given authinfo and transport queue.
- Parameters:
authinfo – The authinfo used to check the jobs list
transport_queue – A transport queue
last_updated – initialize the last updated timestamp
- property logger: logging.Logger#
Return the logger configured for this instance.
- Returns:
the logger
- get_minimum_update_interval() float #
Get the minimum interval that should be respected between updates of the list.
- Returns:
the minimum interval
- property last_updated: Optional[float]#
Get the timestamp of when the list was last updated as produced by time.time()
- Returns:
The last update point
- async _get_jobs_from_scheduler() Dict[Hashable, aiida.schedulers.datastructures.JobInfo] #
Get the current jobs list from the scheduler.
- Returns:
a mapping of job ids to JobInfo instances
- async _update_job_info() None #
Update all of the job information objects.
This will set the futures for all pending update requests where the corresponding job has a new status compared to the last update.
- request_job_info_update(job_id: Hashable) Iterator[asyncio.Future[JobInfo]] #
Request job info about a job when the job next changes state.
If the job is not found in the jobs list at the update, the future will resolve to None.
- Parameters:
job_id – job identifier
- Returns:
future that will resolve to a JobInfo object when the job changes state
- _ensure_updating() None #
Ensure that we are updating the job list from the remote resource.
This will automatically stop if there are no outstanding requests.
- static _has_job_state_changed(old: Optional[aiida.schedulers.datastructures.JobInfo], new: Optional[aiida.schedulers.datastructures.JobInfo]) bool #
Return whether the states old and new are different.
- _get_next_update_delay() float #
Calculate when we are next allowed to poll the scheduler.
This delay is calculated as the minimum polling interval defined by the authentication info for this instance, minus time elapsed since the last update.
- Returns:
delay (in seconds) after which the scheduler may be polled again
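The delay calculation described above amounts to simple arithmetic, sketched below as a stand-alone function. The name `next_update_delay`, the explicit `now` argument, the clamping at zero, and the zero delay when no update has happened yet are assumptions made for this example rather than documented behaviour.

```python
import time
from typing import Optional


def next_update_delay(
    minimum_interval: float,
    last_updated: Optional[float],
    now: Optional[float] = None,
) -> float:
    """Return how many seconds to wait before the scheduler may be polled again."""
    if last_updated is None:
        # Assumption: if the list was never updated, polling is allowed immediately.
        return 0.0
    if now is None:
        now = time.time()
    elapsed = now - last_updated
    # Minimum interval minus elapsed time; clamped so the delay is never negative.
    return max(0.0, minimum_interval - elapsed)


# With a 10 s minimum interval and 4 s elapsed, 6 s remain.
remaining = next_update_delay(10.0, last_updated=100.0, now=104.0)
```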
- class aiida.engine.ObjectLoader#
Bases:
plumpy.loaders.DefaultObjectLoader
Custom object loader for aiida-core.
- load_object(identifier: str) Any #
Attempt to load the object identified by the given identifier.
Note
We override the plumpy.DefaultObjectLoader to be able to throw an ImportError instead of a ValueError which in the context of aiida-core is not as apt, since we are loading classes.
- Parameters:
identifier – concatenation of module and resource name
- Returns:
loaded object
- Raises:
ImportError – if the object cannot be loaded
- aiida.engine.OutputPort = None#
- aiida.engine.PORT_NAMESPACE_SEPARATOR = '__'#
- class aiida.engine.PastException#
Bases:
aiida.common.exceptions.AiidaException
Raised when an attempt is made to continue a Process that has already excepted before.
- class aiida.engine.PortNamespace(*args, **kwargs)#
Bases:
aiida.engine.processes.ports.WithNonDb, plumpy.ports.PortNamespace
Sub class of plumpy.PortNamespace which implements the serialize method to support automatic recursive serialization of a given mapping onto the ports of the PortNamespace.
Initialization
- __setitem__(key: str, port: plumpy.ports.Port) None #
Ensure that a Port being added inherits the non_db attribute if not explicitly defined at construction.
The reasoning is that if a PortNamespace has non_db=True, which is different from the default value, very often all leaves should also be non_db=True. To prevent a user from having to specify it manually every time, we overload the value here, unless it was specifically set during construction.
Note that the non_db attribute is not present for all Port sub classes so we have to check for it first.
- static validate_port_name(port_name: str) None #
Validate the given port name.
Valid port names adhere to the following restrictions:
Is a valid link label (see below)
Does not contain two or more consecutive underscores
Valid link labels adhere to the following restrictions:
Has to be a valid python identifier
Can only contain alphanumeric characters and underscores
Can not start or end with an underscore
- Parameters:
port_name – the proposed name of the port to be added
- Raises:
TypeError – if the port name is not a string type
ValueError – if the port name is invalid
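The restrictions listed above can be written down as a short stand-alone check. This is a sketch based only on the rules stated in this section; the function below is not the library implementation, and the error messages are invented for the example.

```python
def validate_port_name(port_name) -> None:
    """Raise if ``port_name`` violates the rules listed for port names (illustrative sketch)."""
    if not isinstance(port_name, str):
        raise TypeError(f'port name must be a string, got {type(port_name).__name__}')

    # Link label rules.
    if not port_name.isidentifier():
        raise ValueError(f'{port_name!r} is not a valid python identifier')
    if not all(char.isalnum() or char == '_' for char in port_name):
        raise ValueError(f'{port_name!r} may only contain alphanumeric characters and underscores')
    if port_name.startswith('_') or port_name.endswith('_'):
        raise ValueError(f'{port_name!r} cannot start or end with an underscore')

    # Additional port name rule: no two or more consecutive underscores.
    if '__' in port_name:
        raise ValueError(f'{port_name!r} cannot contain two or more consecutive underscores')


validate_port_name('relax_structure')  # passes silently
```

The double underscore is rejected because it is the value of PORT_NAMESPACE_SEPARATOR, so a name containing it would be ambiguous once nested port names are concatenated.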
- serialize(mapping: Optional[Dict[str, Any]], breadcrumbs: Sequence[str] = ()) Optional[Dict[str, Any]] #
Serialize the given mapping onto this PortNamespace.
It will recursively call this function on any nested PortNamespace or the serialize function on any Ports.
- Parameters:
mapping – a mapping of values to be serialized
breadcrumbs – a tuple with the namespaces of parent namespaces
- Returns:
the serialized mapping
- class aiida.engine.Process(inputs: Optional[Dict[str, Any]] = None, logger: Optional[logging.Logger] = None, runner: Optional[aiida.engine.runners.Runner] = None, parent_pid: Optional[int] = None, enable_persistence: bool = True)#
Bases:
plumpy.processes.Process
This class represents an AiiDA process which can be executed and will have full provenance saved in the database.
Initialization
Process constructor.
- Parameters:
inputs – process inputs
logger – aiida logger
runner – process runner
parent_pid – id of parent process
enable_persistence – whether to persist this process
- _node_class = None#
- _spec_class = None#
- classmethod spec() aiida.engine.processes.process_spec.ProcessSpec #
- classmethod define(spec: aiida.engine.processes.process_spec.ProcessSpec) None #
Define the specification of the process, including its inputs, outputs and known exit codes.
A metadata input namespace is defined, with optional ports that are not stored in the database.
- classmethod get_builder() aiida.engine.processes.builder.ProcessBuilder #
- classmethod get_or_create_db_record() aiida.orm.ProcessNode #
Create a process node that represents what happened in this process.
- Returns:
A process node
- classmethod get_exit_statuses(exit_code_labels: Iterable[str]) List[int] #
Return the exit status (integers) for the given exit code labels.
- Parameters:
exit_code_labels – a list of strings that reference exit code labels of this process class
- Returns:
list of exit status integers that correspond to the given exit code labels
- Raises:
AttributeError – if at least one of the labels does not correspond to an existing exit code
- exit_codes() aiida.engine.processes.exit_code.ExitCodesNamespace #
Return the namespace of exit codes defined for this process through its ProcessSpec.
The namespace supports getitem and getattr operations with an ExitCode label to retrieve a specific code. Additionally, the namespace can also be called with either the exit code integer status or its label to retrieve it.
- Returns:
ExitCodesNamespace of ExitCode named tuples
- spec_metadata() aiida.engine.processes.ports.PortNamespace #
Return the metadata port namespace of the process specification of this process.
- property node: aiida.orm.ProcessNode#
Return the ProcessNode used by this process to represent itself in the database.
- Returns:
instance of sub class of ProcessNode
- property uuid: str#
Return the UUID of the process which corresponds to the UUID of its associated ProcessNode.
- Returns:
the UUID associated to this process instance
- property metadata: aiida.common.extendeddicts.AttributeDict#
Return the metadata that were specified when this process instance was launched.
- Returns:
metadata dictionary
- _save_checkpoint() None #
Save the current state in a checkpoint if persistence is enabled and the process state is not terminal.
If the persistence call excepts with a PersistenceError, it will be caught and a warning will be logged.
- save_instance_state(out_state: MutableMapping[str, Any], save_context: Optional[plumpy.persistence.LoadSaveContext]) None #
Save instance state.
See documentation of plumpy.processes.Process.save_instance_state().
- get_provenance_inputs_iterator() Iterator[Tuple[str, Union[aiida.engine.processes.ports.InputPort, aiida.engine.processes.ports.PortNamespace]]] #
Get provenance input iterator.
- Return type:
filter
- load_instance_state(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) None #
Load instance state.
- Parameters:
saved_state – saved instance state
load_context –
- kill(msg: Union[str, None] = None) Union[bool, plumpy.futures.Future] #
Kill the process and all the children calculations it called
- Parameters:
msg – message
- out(output_port: str, value: Any = None) None #
Attach output to output port.
The name of the port will be used as the link label.
- Parameters:
output_port – name of output port
value – value to put inside output port
- out_many(out_dict: Dict[str, Any]) None #
Attach outputs to multiple output ports.
Keys of the dictionary will be used as output port names, values as outputs.
- Parameters:
out_dict (dict) – output dictionary
- on_entered(from_state: Optional[plumpy.process_states.State]) None #
After entering a new state, save a checkpoint and update the latest process state change timestamp.
- on_except(exc_info: Tuple[Any, Exception, types.TracebackType]) None #
Log the exception by calling the report method with formatted stack trace from exception info object and store the exception string as a node attribute
- Parameters:
exc_info – the sys.exc_info() object (type, value, traceback)
- on_finish(result: Union[int, aiida.engine.processes.exit_code.ExitCode], successful: bool) None #
Set the finish status on the process node.
- Parameters:
result – result of the process
successful – whether execution was successful
- on_paused(msg: Optional[str] = None) None #
The Process was paused so set the paused attribute on the process node
- Parameters:
msg – message
- on_output_emitting(output_port: str, value: Any) None #
The process has emitted a value on the given output port.
- Parameters:
output_port – The output port name the value was emitted on
value – The value emitted
- set_status(status: Optional[str]) None #
The status of the Process is about to be changed, so we reflect this in the node’s attribute proxy.
- Parameters:
status – the status message
- submit(process: Type[aiida.engine.processes.process.Process], **kwargs) aiida.orm.ProcessNode #
Submit process for execution.
- Parameters:
process – process
- Returns:
the calculation node of the process
- property runner: aiida.engine.runners.Runner#
Get process runner.
- get_parent_calc() Optional[aiida.orm.ProcessNode] #
Get the parent process node
- Returns:
the parent process node if there is one
- classmethod build_process_type() str #
The process type.
- Returns:
string of the process type
Note: This could be made into a property ‘process_type’ but in order to have it be a property of the class it would need to be defined in the metaclass, see https://bugs.python.org/issue20659
- report(msg: str, *args, **kwargs) None #
Log a message to the logger, which should get saved to the database through the attached DbLogHandler.
The pk, class name and function name of the caller are prepended to the given message
- Parameters:
msg – message to log
args – args to pass to the log call
kwargs – kwargs to pass to the log call
- _create_and_setup_db_record() Union[int, uuid.UUID] #
Create and setup the database record for this process
- Returns:
the uuid or pk of the process
- encode_input_args(inputs: Dict[str, Any]) str #
Encode input arguments such that they may be saved in a Bundle
- Parameters:
inputs – A mapping of the inputs as passed to the process
- Returns:
The encoded (serialized) inputs
- decode_input_args(encoded: str) Dict[str, Any] #
Decode saved input arguments as they came from the saved instance state Bundle
- Parameters:
encoded – encoded (serialized) inputs
- Returns:
The decoded input args
- update_outputs() None #
Attach new outputs to the node since the last call.
Does nothing, if self.metadata.store_provenance is False.
- _build_process_label() str #
Construct the process label that should be set on ProcessNode instances for this process class.
Note
By default this returns the name of the process class itself. It can be overridden by Process subclasses to provide a more specific label.
- Returns:
The process label to use for ProcessNode instances.
- _setup_db_record() None #
Create the database record for this process and the links with respect to its inputs
This function will set various attributes on the node that serve as a proxy for attributes of the Process. This is essential as otherwise this information could only be introspected through the Process itself, which is only available to the interpreter that has it in memory. To make this data introspectable from any interpreter, for example for the command line interface, certain Process attributes are proxied through the calculation node.
In addition, the parent calculation will be setup with a CALL link if applicable and all inputs will be linked up as well.
- _setup_inputs() None #
Create the links between the input nodes and the ProcessNode that represents this process.
- _flat_inputs() Dict[str, Any] #
Return a flattened version of the parsed inputs dictionary.
The eventual keys will be a concatenation of the nested keys. Note that the metadata dictionary, if present, is not passed, as those are dealt with separately in _setup_metadata.
- Returns:
flat dictionary of parsed inputs
- _flat_outputs() Dict[str, Any] #
Return a flattened version of the registered outputs dictionary.
The eventual keys will be a concatenation of the nested keys.
- Returns:
flat dictionary of parsed outputs
- _flatten_inputs(port: Union[None, aiida.engine.processes.ports.InputPort, aiida.engine.processes.ports.PortNamespace], port_value: Any, parent_name: str = '', separator: str = PORT_NAMESPACE_SEPARATOR) List[Tuple[str, Any]] #
Function that will recursively flatten the inputs dictionary, omitting inputs for ports that are marked as being non database storable
- Parameters:
port – port against which to map the port value, can be InputPort or PortNamespace
port_value – value for the current port, can be a Mapping
parent_name – the parent key with which to prefix the keys
separator – character to use for the concatenation of keys
- Returns:
flat list of inputs
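The recursive flattening can be pictured with plain nested dictionaries. The sketch below is a simplified assumption: it ignores the port definitions entirely (so it cannot skip non database storable ports) and only shows how nested keys are concatenated with the `'__'` separator. The `flatten` helper and the example inputs are hypothetical.

```python
from collections.abc import Mapping
from typing import Any, List, Tuple

SEPARATOR = '__'  # same value as PORT_NAMESPACE_SEPARATOR documented above


def flatten(values: Any, parent_name: str = '', separator: str = SEPARATOR) -> List[Tuple[str, Any]]:
    """Recursively flatten a nested mapping into (concatenated key, value) pairs."""
    if not isinstance(values, Mapping):
        # A leaf value: emit it under the key accumulated so far.
        return [(parent_name, values)]
    items: List[Tuple[str, Any]] = []
    for key, value in values.items():
        name = f'{parent_name}{separator}{key}' if parent_name else key
        items.extend(flatten(value, name, separator))
    return items


# Hypothetical nested inputs, used only for illustration.
nested = {'code': 'pw', 'parameters': {'cutoff': 30, 'kpoints': {'mesh': [2, 2, 2]}}}
flat = dict(flatten(nested))
```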
- _flatten_outputs(port: Union[None, aiida.engine.processes.ports.OutputPort, aiida.engine.processes.ports.PortNamespace], port_value: Any, parent_name: str = '', separator: str = PORT_NAMESPACE_SEPARATOR) List[Tuple[str, Any]] #
Function that will recursively flatten the outputs dictionary.
- Parameters:
port – port against which to map the port value, can be OutputPort or PortNamespace
port_value – value for the current port, can be a Mapping
parent_name – the parent key with which to prefix the keys
separator – character to use for the concatenation of keys
- Returns:
flat list of outputs
- exposed_inputs(process_class: Type[aiida.engine.processes.process.Process], namespace: Optional[str] = None, agglomerate: bool = True) aiida.common.extendeddicts.AttributeDict #
Gather a dictionary of the inputs that were exposed for a given Process class under an optional namespace.
- Parameters:
process_class – Process class whose inputs to try and retrieve
namespace – PortNamespace in which to look for the inputs
agglomerate – If set to true, all parent namespaces of the given namespace will also be searched for inputs. Inputs in lower-lying namespaces take precedence.
- Returns:
exposed inputs
- exposed_outputs(node: aiida.orm.ProcessNode, process_class: Type[aiida.engine.processes.process.Process], namespace: Optional[str] = None, agglomerate: bool = True) aiida.common.extendeddicts.AttributeDict #
Return the outputs which were exposed from the process_class and emitted by the specific node.
- Parameters:
node – process node whose outputs to try and retrieve
namespace – Namespace in which to search for exposed outputs.
agglomerate – If set to true, all parent namespaces of the given namespace will also be searched for outputs. Outputs in lower-lying namespaces take precedence.
- Returns:
exposed outputs
- static _get_namespace_list(namespace: Optional[str] = None, agglomerate: bool = True) List[Optional[str]] #
Get the list of namespaces in a given namespace.
- Parameters:
namespace – name space
agglomerate – If set to true, all parent namespaces of the given namespace will also be searched.
- Returns:
namespace list
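One way to read the agglomerate option is sketched below. It assumes that nested namespaces are written with a dot separator and that the top-level namespace is represented by `None`, ordered from outermost to innermost; these are assumptions for illustration and the stand-alone function is not the library code.

```python
from typing import List, Optional


def get_namespace_list(namespace: Optional[str] = None, agglomerate: bool = True) -> List[Optional[str]]:
    """Return the namespaces to search, from outermost to innermost (illustrative sketch)."""
    if not agglomerate:
        # Only the requested namespace itself is searched.
        return [namespace]
    namespaces: List[Optional[str]] = [None]  # None stands for the top-level namespace
    if namespace is not None:
        parts = namespace.split('.')
        # Build every parent prefix: 'a', 'a.b', 'a.b.c', ...
        namespaces.extend('.'.join(parts[:index]) for index in range(1, len(parts) + 1))
    return namespaces


searched = get_namespace_list('base.relax.scf')
```

Searching from outermost to innermost lets values found in lower-lying namespaces overwrite those found higher up, which matches the stated precedence.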
- classmethod is_valid_cache(node: aiida.orm.ProcessNode) bool #
Check if the given node can be cached from.
Overriding this method allows Process sub-classes to modify when corresponding process nodes are considered as a cache.
Warning
When overriding this method, make sure to return False at least in all cases when super()._node.base.caching.is_valid_cache(node) returns False. Otherwise, the invalidates_cache keyword on exit codes may have no effect.
- class aiida.engine.ProcessBuilder(process_class: Type[aiida.engine.processes.process.Process])#
Bases:
aiida.engine.processes.builder.ProcessBuilderNamespace
A process builder that helps setting up the inputs for creating a new process.
Initialization
Construct a ProcessBuilder instance for the given Process class.
- Parameters:
process_class – the Process subclass
- property process_class: Type[aiida.engine.processes.process.Process]#
Return the process class for which this builder is constructed.
- class aiida.engine.ProcessBuilderNamespace(port_namespace: aiida.engine.processes.ports.PortNamespace)#
Bases:
collections.abc.MutableMapping
Input namespace for the ProcessBuilder.
Dynamically generates the getters and setters for the input ports of a given PortNamespace
Initialization
Dynamically construct the get and set properties for the ports of the given port namespace.
For each port in the given port namespace a get and set property will be constructed dynamically and added to the ProcessBuilderNamespace. The docstring for these properties will be defined by calling str() on the Port, which should return the description of the Port.
- Parameters:
port_namespace – the inputs PortNamespace for which to construct the builder
- __setattr__(attr: str, value: Any) None #
Assign the given value to the port with key attr.
Note
Any attributes without a leading underscore being set correspond to inputs and should hence be validated with respect to the corresponding input port from the process spec
- __repr__()#
- __dir__()#
- __iter__()#
- __len__()#
- __getitem__(item)#
- __setitem__(item, value)#
- __delitem__(item)#
- __delattr__(item)#
- _recursive_merge(dictionary, key, value)#
Recursively merge the contents of dictionary setting its key to value.
- _merge(*args, **kwds)#
Merge the content of a dictionary or keyword arguments into the builder namespace.
Note
This method differs in behavior from _update in that _merge will recursively update the existing dictionary with the one that is specified in the arguments. The _update method will merge only the keys on the top level, but any lower lying nested namespace will be replaced entirely.
The method is prefixed with an underscore in order to not reserve the name for a potential port.
- Parameters:
args – a single mapping that should be mapped on the namespace.
kwds – keyword value pairs that should be mapped onto the ports.
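The difference between a recursive merge and a top-level update is easiest to see on plain dictionaries. The sketch below uses ordinary `dict` objects and a hypothetical `recursive_merge` helper instead of a builder namespace, so it only illustrates the semantics described in the note.

```python
from collections.abc import Mapping
from copy import deepcopy


def recursive_merge(target: dict, source: Mapping) -> dict:
    """Merge ``source`` into ``target`` in place, descending into nested mappings."""
    for key, value in source.items():
        if isinstance(value, Mapping) and isinstance(target.get(key), dict):
            # Both sides are mappings: merge key by key instead of replacing.
            recursive_merge(target[key], value)
        else:
            target[key] = value
    return target


# Hypothetical nested inputs, used only for illustration.
original = {'metadata': {'options': {'resources': 1, 'queue': 'debug'}}}
override = {'metadata': {'options': {'queue': 'long'}}}

# Top-level update: the whole 'metadata' namespace is replaced, 'resources' is lost.
updated = deepcopy(original)
updated.update(override)

# Recursive merge: only 'queue' changes, 'resources' is preserved.
merged = recursive_merge(deepcopy(original), override)
```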
- _update(*args, **kwds)#
Update the values of the builder namespace passing a mapping as argument or individual keyword value pairs.
The method functions just as collections.abc.MutableMapping.update and is merely prefixed with an underscore in order to not reserve the name for a potential port.
- Parameters:
args – a single mapping that should be mapped on the namespace.
kwds – keyword value pairs that should be mapped onto the ports.
- _inputs(prune: bool = False) dict #
Return the entire mapping of inputs specified for this builder.
- Parameters:
prune – boolean, when True, will prune nested namespaces that contain no actual values whatsoever
- Returns:
mapping of inputs ports and their input values.
- _prune(value)#
Prune a nested mapping from all mappings that are completely empty.
Note
a nested mapping that is completely empty means it contains at most other empty mappings. Other null values, such as None or empty lists, should not be pruned.
- Parameters:
value – a nested mapping of port values
- Returns:
the same mapping but without any nested namespace that is completely empty.
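A minimal sketch of the pruning rule, written for plain dictionaries: nested mappings that contain nothing but other empty mappings are dropped, while other null values such as `None` or empty lists are kept. The stand-alone `prune` function and the example data are assumptions for illustration.

```python
from collections.abc import Mapping
from typing import Any


def prune(value: Any) -> Any:
    """Return ``value`` without nested mappings that are completely empty."""
    if isinstance(value, Mapping) and value:
        result = {}
        for key, sub_value in value.items():
            pruned = prune(sub_value)
            # Drop only mappings that ended up empty; keep None, [], 0 and so on.
            if not (isinstance(pruned, Mapping) and not pruned):
                result[key] = pruned
        return result
    return value


# Hypothetical builder inputs, used only for illustration.
inputs = {
    'metadata': {'options': {}},
    'parameters': {'cutoff': 30},
    'settings': None,
    'tags': [],
    'empty': {},
}
pruned_inputs = prune(inputs)
```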
- class aiida.engine.ProcessFuture(pk: int, loop: Optional[asyncio.AbstractEventLoop] = None, poll_interval: Union[None, int, float] = None, communicator: Optional[kiwipy.Communicator] = None)#
Bases:
asyncio.Future
Future that waits for a process to complete using both polling and listening for broadcast events if possible.
Initialization
Construct a future for a process node being finished.
If a None poll_interval is supplied polling will not be used. If a communicator is supplied it will be used to listen for broadcast messages.
- Parameters:
pk – process pk
loop – An event loop
poll_interval – optional polling interval, if None, polling is not activated.
communicator – optional communicator, if None, will not subscribe to broadcasts.
- _filtered = None#
- class aiida.engine.ProcessHandlerReport#
Bases:
typing.NamedTuple
A namedtuple to define a process handler report for an aiida.engine.BaseRestartWorkChain.
This namedtuple should be returned by a process handler of a work chain instance if the condition of the handler was met by the completed process. If no further handling should be performed after this method, the do_break field should be set to True. If the handler encountered a fatal error and the work chain needs to be terminated, an ExitCode with non-zero exit status can be set. This exit code is what will be set on the work chain itself. This works because the value of the exit_code field returned by the handler will in turn be returned by the inspect_process step, and returning a non-zero exit code from any work chain step will instruct the engine to abort the work chain.
- Parameters:
do_break – boolean, set to True if no further process handlers should be called, default is False
exit_code – an instance of the ExitCode tuple. If not explicitly set, the default ExitCode will be instantiated, which has status 0 meaning that the work chain step will be considered successful and the work chain will continue to the next step.
- exit_code: aiida.engine.processes.exit_code.ExitCode = 'ExitCode(...)'#
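The interplay of do_break and exit_code can be sketched with stand-in tuples and a simplified handler loop. Everything below (`ExitCode`, `HandlerReport`, `inspect_process`, the handler functions and the status numbers) is hypothetical and written for this example; the real inspect_process step of a BaseRestartWorkChain does more than this.

```python
from typing import Callable, List, NamedTuple, Optional


class ExitCode(NamedTuple):
    """Simplified stand-in for an exit code."""

    status: int = 0
    message: Optional[str] = None


class HandlerReport(NamedTuple):
    """Simplified stand-in for a process handler report."""

    do_break: bool = False
    exit_code: ExitCode = ExitCode()


Handler = Callable[[int], Optional[HandlerReport]]


def inspect_process(handlers: List[Handler], failed_status: int) -> ExitCode:
    """Call handlers in order and return the exit code of the last report."""
    last_report: Optional[HandlerReport] = None
    for handler in handlers:
        report = handler(failed_status)
        if report is None:
            continue  # the condition of this handler was not met
        last_report = report
        if report.do_break:
            break  # no further handlers should be called
    return last_report.exit_code if last_report is not None else ExitCode()


def handle_out_of_walltime(status: int) -> Optional[HandlerReport]:
    # Recoverable problem: stop handling, default exit code (status 0) lets the work chain continue.
    return HandlerReport(do_break=True) if status == 120 else None


def handle_unrecoverable(status: int) -> Optional[HandlerReport]:
    # Fatal problem: a non-zero exit code tells the engine to abort the work chain.
    return HandlerReport(True, ExitCode(400, 'unrecoverable error')) if status == 400 else None


handlers = [handle_out_of_walltime, handle_unrecoverable]
restart_result = inspect_process(handlers, 120)
abort_result = inspect_process(handlers, 400)
```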
- class aiida.engine.ProcessSpec#
Bases:
plumpy.process_spec.ProcessSpec
Default process spec for process classes defined in aiida-core.
This sub class defines custom classes for input ports and port namespaces. It also adds support for the definition of exit codes and retrieving them subsequently.
Initialization
- INPUT_PORT_TYPE = None#
- PORT_NAMESPACE_TYPE = None#
- property exit_codes: aiida.engine.processes.exit_code.ExitCodesNamespace#
Return the namespace of exit codes defined for this ProcessSpec
- Returns:
ExitCodesNamespace of ExitCode named tuples
- exit_code(status: int, label: str, message: str, invalidates_cache: bool = False) None #
Add an exit code to the ProcessSpec
- Parameters:
status – the exit status integer
label – a label by which the exit code can be addressed
message – a more detailed description of the exit code
invalidates_cache – when set to True, a process exiting with this exit code will not be considered for caching
- property ports: aiida.engine.processes.ports.PortNamespace#
- property inputs: aiida.engine.processes.ports.PortNamespace#
- property outputs: aiida.engine.processes.ports.PortNamespace#
- class aiida.engine.Runner(poll_interval: Union[int, float] = 0, loop: Optional[asyncio.AbstractEventLoop] = None, communicator: Optional[kiwipy.Communicator] = None, rmq_submit: bool = False, persister: Optional[plumpy.persistence.Persister] = None)#
Class that can launch processes by running in the current interpreter or by submitting them to the daemon.
Initialization
Construct a new runner.
- Parameters:
poll_interval – interval in seconds between polling for status of active sub processes
loop – an asyncio event loop, if none is supplied a new one will be created
communicator – the communicator to use
rmq_submit – if True, processes will be submitted to RabbitMQ, otherwise they will be scheduled here
persister – the persister to use to persist processes
- __enter__() aiida.engine.runners.Runner #
- __exit__(exc_type, exc_val, exc_tb)#
- property loop: asyncio.AbstractEventLoop#
Get the event loop of this runner.
- property transport: aiida.engine.transports.TransportQueue#
- property plugin_version_provider: aiida.plugins.utils.PluginVersionProvider#
- property job_manager: aiida.engine.processes.calcjobs.manager.JobManager#
- property controller: Optional[plumpy.process_comms.RemoteProcessThreadController]#
Get the controller used by this runner.
- property is_daemon_runner: bool#
Return whether the runner is a daemon runner, which means it submits processes over RabbitMQ.
- Returns:
True if the runner is a daemon runner
- run_until_complete(future: asyncio.Future) Any #
Run the loop until the future has finished and return the result.
- instantiate_process(process: aiida.engine.runners.TYPE_RUN_PROCESS, **inputs)#
- submit(process: aiida.engine.runners.TYPE_SUBMIT_PROCESS, **inputs: Any)#
Submit the process with the supplied inputs to this runner immediately returning control to the interpreter. The return value will be the calculation node of the submitted process
- Parameters:
process – the process class to submit
inputs – the inputs to be passed to the process
- Returns:
the calculation node of the process
- schedule(process: aiida.engine.runners.TYPE_SUBMIT_PROCESS, *args: Any, **inputs: Any) aiida.orm.ProcessNode #
Schedule a process to be executed by this runner
- Parameters:
process – the process class to submit
inputs – the inputs to be passed to the process
- Returns:
the calculation node of the process
- _run(process: aiida.engine.runners.TYPE_RUN_PROCESS, *args: Any, **inputs: Any) Tuple[Dict[str, Any], aiida.orm.ProcessNode] #
Run the process with the supplied inputs in this runner that will block until the process is completed. The return value will be the results of the completed process
- Parameters:
process – the process class or process function to run
inputs – the inputs to be passed to the process
- Returns:
tuple of the outputs of the process and the calculation node
- run(process: aiida.engine.runners.TYPE_RUN_PROCESS, *args: Any, **inputs: Any) Dict[str, Any] #
Run the process with the supplied inputs in this runner that will block until the process is completed. The return value will be the results of the completed process
- Parameters:
process – the process class or process function to run
inputs – the inputs to be passed to the process
- Returns:
the outputs of the process
- run_get_node(process: aiida.engine.runners.TYPE_RUN_PROCESS, *args: Any, **inputs: Any) aiida.engine.runners.ResultAndNode #
Run the process with the supplied inputs in this runner that will block until the process is completed. The return value will be the results of the completed process
- Parameters:
process – the process class or process function to run
inputs – the inputs to be passed to the process
- Returns:
tuple of the outputs of the process and the calculation node
- run_get_pk(process: aiida.engine.runners.TYPE_RUN_PROCESS, *args: Any, **inputs: Any) aiida.engine.runners.ResultAndPk #
Run the process with the supplied inputs in this runner, blocking until the process is completed. The return value is a tuple of the results of the completed process and the pk of its node.
- Parameters:
process – the process class or process function to run
inputs – the inputs to be passed to the process
- Returns:
tuple of the outputs of the process and process node pk
- call_on_process_finish(pk: int, callback: Callable[[], Any]) None #
Schedule a callback when the process of the given pk is terminated.
This method adds a broadcast subscriber that listens for state changes of the target process until it is terminated. As a fail-safe, a polling mechanism also checks the state of the process, in case the broadcast message is missed by the subscriber, to prevent the caller from waiting indefinitely.
- Parameters:
pk – pk of the process
callback – function to be called upon process termination
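The combination of a broadcast subscriber with a polling fail-safe can be sketched with the standard library alone. This is only an illustration of the pattern, not the runner's implementation: `FakeProcess`, `call_on_finish`, the `subscribers` list, the state labels and the poll interval are all hypothetical names chosen for this example.

```python
import asyncio

# Hypothetical state labels for this sketch only.
TERMINAL_STATES = {"finished", "excepted", "killed"}


class FakeProcess:
    """Hypothetical stand-in for a process node; it only holds a state string."""

    def __init__(self):
        self.state = "running"


def call_on_finish(loop, process, callback, subscribers, poll_interval=0.01):
    """Invoke ``callback`` exactly once when ``process`` reaches a terminal state."""
    done = False

    def fire():
        nonlocal done
        if not done:  # guard so broadcast and polling cannot both trigger it
            done = True
            callback()

    def on_broadcast(new_state):
        # Fast path: a state-change message arrived.
        if new_state in TERMINAL_STATES:
            fire()

    def poll():
        # Fail-safe: inspect the state directly in case a message was missed.
        if done:
            return
        if process.state in TERMINAL_STATES:
            fire()
        else:
            loop.call_later(poll_interval, poll)

    subscribers.append(on_broadcast)
    loop.call_soon(poll)


async def demo():
    loop = asyncio.get_running_loop()
    process, subscribers, calls = FakeProcess(), [], []
    call_on_finish(loop, process, lambda: calls.append("done"), subscribers)
    await asyncio.sleep(0.03)
    # The process terminates, but no broadcast is ever delivered.
    process.state = "finished"
    await asyncio.sleep(0.05)
    return calls


calls = asyncio.run(demo())
```

In the demo no broadcast is sent at all, so the callback runs only because the poller notices the terminal state.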
- get_process_future(pk: int) aiida.engine.processes.futures.ProcessFuture #
Return a future for a process.
The future will have the process node as the result when finished.
- Returns:
A future representing the completion of the process node
- _poll_process(node, callback)#
Check whether the process state of the node is terminated and call the callback or reschedule it.
- Parameters:
node – the process node
callback – callback to be called when process is terminated
- aiida.engine.ToContext = None#
- class aiida.engine.WithNonDb(*args, **kwargs)#
A mixin that adds support to a port to flag that it should not be stored in the database using the non_db=True flag.
The mixins have to go before the main port class in the superclass order to make sure the mixin has the chance to strip out the non_db keyword.
Initialization
- class aiida.engine.WithSerialize(*args, **kwargs)#
A mixin that adds support for a serialization function which is automatically applied on inputs that are not AiiDA data types.
Initialization
- serialize(value: Any) aiida.orm.Data #
Serialize the given value, unless it is None, already a Data type, or no serializer function is defined.
- Parameters:
value – the value to be serialized
- Returns:
a serialized version of the value or the unchanged value
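The three pass-through cases described for serialize can be illustrated with a small stand-alone sketch. The `Data` and `SerializingPort` classes below are hypothetical stand-ins written for this example; they are not the AiiDA classes and only mirror the documented behaviour.

```python
from typing import Any, Callable, Optional


class Data:
    """Hypothetical stand-in for a database-storable data type."""

    def __init__(self, value: Any):
        self.value = value


class SerializingPort:
    """Hypothetical port that applies an optional serializer to its values."""

    def __init__(self, serializer: Optional[Callable[[Any], Data]] = None):
        self._serializer = serializer

    def serialize(self, value: Any) -> Any:
        # Pass the value through unchanged if there is nothing to do.
        if self._serializer is None or value is None or isinstance(value, Data):
            return value
        return self._serializer(value)


port = SerializingPort(serializer=Data)
wrapped = port.serialize(5)        # plain int is wrapped by the serializer
already = Data(7)
untouched = port.serialize(already)  # existing Data instance is returned as-is
```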
- class aiida.engine.WorkChain(inputs: dict | None = None, logger: logging.Logger | None = None, runner: aiida.engine.runners.Runner | None = None, enable_persistence: bool = True)#
Bases:
aiida.engine.processes.process.Process
The WorkChain class is the principal component to implement workflows in AiiDA.
Initialization
Construct a WorkChain instance.
Construct the instance only if it is a sub class of WorkChain, otherwise raise InvalidOperation.
- Parameters:
inputs – work chain inputs
logger – aiida logger
runner – work chain runner
enable_persistence – whether to persist this work chain
- _node_class = None#
- _spec_class = None#
- _STEPPER_STATE = 'stepper_state'#
- _CONTEXT = 'CONTEXT'#
- classmethod spec() aiida.engine.processes.workchains.workchain.WorkChainSpec #
- property node: aiida.orm.WorkChainNode#
- property ctx: aiida.common.extendeddicts.AttributeDict#
Get the context.
- save_instance_state(out_state, save_context)#
Save instance state.
- Parameters:
out_state – state to save in
save_context (plumpy.persistence.LoadSaveContext) –
- load_instance_state(saved_state, load_context)#
- on_run()#
- _resolve_nested_context(key: str) tuple[aiida.common.extendeddicts.AttributeDict, str] #
Returns a reference to a sub-dictionary of the context and the last key, after resolving a potentially segmented key where required sub-dictionaries are created as needed.
- Parameters:
key – A key into the context, where words before a dot are interpreted as a key for a sub-dictionary
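The resolution of a dotted key can be sketched with plain dictionaries. This is a simplified illustration under the assumption that every intermediate value is a dictionary; `resolve_nested_context` is a hypothetical helper, and the actual method works on an AttributeDict and may perform additional validation.

```python
def resolve_nested_context(ctx: dict, key: str):
    """Return the innermost sub-dictionary and the final key segment."""
    *parents, last = key.split(".")
    current = ctx
    for name in parents:
        # Create missing sub-dictionaries on the way down.
        current = current.setdefault(name, {})
    return current, last


ctx = {}
sub, last = resolve_nested_context(ctx, "workchains.relax.first")
sub[last] = 42  # assigning through the returned reference updates ``ctx``
```

Because the returned sub-dictionary is a reference into the context, the assignment in the last line is visible in `ctx` itself.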
- _insert_awaitable(awaitable: aiida.engine.processes.workchains.awaitable.Awaitable) None #
Insert an awaitable that should be terminated before continuing to the next step.
- Parameters:
awaitable – the thing to await
- _resolve_awaitable(awaitable: aiida.engine.processes.workchains.awaitable.Awaitable, value: Any) None #
Resolve an awaitable.
Precondition: must be an awaitable that was previously inserted.
- Parameters:
awaitable – the awaitable to resolve
- to_context(**kwargs: aiida.engine.processes.workchains.awaitable.Awaitable | aiida.orm.ProcessNode) None #
Add a dictionary of awaitables to the context.
This is a convenience method that provides syntactic sugar, for a user to add multiple intersteps that will assign a certain value to the corresponding key in the context of the work chain.
- _update_process_status() None #
Set the process status with a message accounting for the current sub processes that we are waiting for.
- _do_step() Any #
Execute the next step in the outline and return the result.
If the stepper returns a non-finished status and the return value is of type ToContext, the contents of the ToContext container will be turned into awaitables if necessary. If any awaitables were created, the process will enter the Wait state, otherwise it will go to Continue. When the stepper returns that it is done, the stepper result will be converted to None and returned, unless it is an integer or instance of ExitCode.
- _store_nodes(data: Any) None #
Recurse through a data structure and store any unstored nodes that are found along the way
- Parameters:
data – a data structure potentially containing unstored nodes
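A recursive traversal of this kind might look like the following sketch. `FakeNode` and `store_nodes` are hypothetical names for this example; the real method operates on AiiDA nodes and may treat container types differently.

```python
from collections.abc import Mapping, Sequence


class FakeNode:
    """Hypothetical stand-in for a node with an ``is_stored`` flag."""

    def __init__(self):
        self.is_stored = False

    def store(self):
        self.is_stored = True
        return self


def store_nodes(data):
    """Walk ``data`` recursively and store every unstored node found."""
    if isinstance(data, FakeNode):
        if not data.is_stored:
            data.store()
    elif isinstance(data, Mapping):
        for value in data.values():
            store_nodes(value)
    elif isinstance(data, Sequence) and not isinstance(data, (str, bytes)):
        # Strings are sequences too, but they cannot contain nodes.
        for value in data:
            store_nodes(value)


first, second = FakeNode(), FakeNode()
context = {"single": first, "nested": {"items": [second, "label", 3]}}
store_nodes(context)
```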
- on_exiting() None #
Ensure that any unstored nodes in the context are stored, before the state is exited
After the state is exited the next state will be entered and if persistence is enabled, a checkpoint will be saved. If the context contains unstored nodes, the serialization necessary for checkpointing will fail.
- on_wait(awaitables: Sequence[aiida.engine.processes.workchains.awaitable.Awaitable])#
Entering the WAITING state.
- _action_awaitables() None #
Handle the awaitables that are currently registered with the work chain.
Depending on the class type of the awaitable’s target, a different callback function will be bound to the awaitable and the runner will be asked to call it when the target is completed.
- _on_awaitable_finished(awaitable: aiida.engine.processes.workchains.awaitable.Awaitable) None #
Callback function, for when an awaitable process instance is completed.
The awaitable will be effectuated on the context of the work chain and removed from the internal list. If all awaitables have been dealt with, the work chain process is resumed.
- Parameters:
awaitable – an Awaitable instance
- aiida.engine.append_(target: Union[aiida.engine.processes.workchains.awaitable.Awaitable, aiida.orm.ProcessNode]) aiida.engine.processes.workchains.awaitable.Awaitable #
Convenience function that will construct an Awaitable for a given class instance with the context action set to APPEND. When the awaitable target is completed it will be appended to a list in the context for a key that is to be defined later
- Parameters:
target – an instance of a Process or Awaitable
- Returns:
the awaitable
- aiida.engine.assign_(target: Union[aiida.engine.processes.workchains.awaitable.Awaitable, aiida.orm.ProcessNode]) aiida.engine.processes.workchains.awaitable.Awaitable #
Convenience function that will construct an Awaitable for a given class instance with the context action set to ASSIGN. When the awaitable target is completed it will be assigned to the context for a key that is to be defined later
- Parameters:
target – an instance of a Process or Awaitable
- Returns:
the awaitable
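The difference between the APPEND and ASSIGN context actions can be shown with a plain dictionary as the context. The `Action` enum and `apply_action` helper below are hypothetical and exist only for this sketch; they are not the AiiDA classes.

```python
from enum import Enum


class Action(Enum):
    """Hypothetical enum mirroring the two context actions."""

    ASSIGN = "assign"
    APPEND = "append"


def apply_action(ctx: dict, key: str, action: Action, value) -> None:
    """Apply a resolved awaitable's value to the context under ``key``."""
    if action is Action.ASSIGN:
        ctx[key] = value  # overwrite whatever was stored under the key
    elif action is Action.APPEND:
        ctx.setdefault(key, []).append(value)  # collect values in a list
    else:
        raise ValueError(f"unknown action: {action!r}")


ctx = {}
apply_action(ctx, "relax", Action.ASSIGN, "node-1")
apply_action(ctx, "scf_runs", Action.APPEND, "node-2")
apply_action(ctx, "scf_runs", Action.APPEND, "node-3")
```

ASSIGN suits a single sub process per key, whereas APPEND suits a loop that launches several sub processes whose results are collected under one key.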
- aiida.engine.calcfunction(function: aiida.engine.processes.functions.FunctionType) aiida.engine.processes.functions.FunctionType #
A decorator to turn a standard python function into a calcfunction. Example usage:
>>> from aiida.engine import calcfunction
>>> from aiida.orm import Int
>>>
>>> # Define the calcfunction
>>> @calcfunction
... def sum(a, b):
...     return a + b
>>> # Run it with some input
>>> r = sum(Int(4), Int(5))
>>> print(r)
9
>>> r.base.links.get_incoming().all()
[Neighbor(link_type='', link_label='result', node=<CalcFunctionNode: uuid: ce0c63b3-1c84-4bb8-ba64-7b70a36adf34 (pk: 3567)>)]
>>> r.base.links.get_incoming().get_node_by_label('result').base.links.get_incoming().all_nodes()
[4, 5]
- Parameters:
function – The function to decorate.
- Returns:
The decorated function.
- aiida.engine.construct_awaitable(target: Union[aiida.engine.processes.workchains.awaitable.Awaitable, aiida.orm.ProcessNode]) aiida.engine.processes.workchains.awaitable.Awaitable #
Construct an instance of the Awaitable class that will contain the information related to the action to be taken with respect to the context once the awaitable object is completed.
The awaitable is a simple dictionary with the following keys
pk: the pk of the node that is being waited on
action: the context action to be performed upon completion
outputs: a boolean that toggles whether the node itself
Currently the only awaitable classes are ProcessNode and Workflow. The only awaitable actions are the Assign and Append operators.
- aiida.engine.get_object_loader() aiida.engine.persistence.ObjectLoader #
Return the global AiiDA object loader.
- Returns:
The global object loader
- aiida.engine.interruptable_task(coro: Callable[[aiida.engine.utils.InterruptableFuture], Awaitable[Any]], loop: Optional[asyncio.AbstractEventLoop] = None) aiida.engine.utils.InterruptableFuture #
Turn the given coroutine into an interruptable task by turning it into an InterruptableFuture and returning it.
- Parameters:
coro – the coroutine that should be made interruptable, taking an InterruptableFuture object as its last parameter
loop – the event loop in which to run the coroutine, by default uses asyncio.get_event_loop()
- Returns:
an InterruptableFuture
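The idea of a future that can be interrupted from outside while a coroutine is running can be sketched with asyncio alone. Everything below is an assumption made for illustration: `SketchFuture`, `make_interruptable`, `Interrupted` and the `with_interrupt` helper are hypothetical and are not claimed to match the aiida-core implementation.

```python
import asyncio


class Interrupted(Exception):
    """Hypothetical exception used as the interruption reason."""


class SketchFuture(asyncio.Future):
    """Hypothetical future that can be interrupted with an exception."""

    def interrupt(self, reason: Exception) -> None:
        self.set_exception(reason)

    async def with_interrupt(self, awaitable):
        """Wait for ``awaitable`` unless this future is interrupted first."""
        task = asyncio.ensure_future(awaitable)
        done, _ = await asyncio.wait({self, task}, return_when=asyncio.FIRST_COMPLETED)
        if self in done:
            task.cancel()
            self.result()  # re-raises the interruption reason
        return task.result()


def make_interruptable(coro_fn):
    """Run ``coro_fn(future)`` as a task and expose its outcome on the future."""
    loop = asyncio.get_running_loop()
    future = SketchFuture(loop=loop)

    async def runner():
        try:
            result = await coro_fn(future)
        except Exception as exc:
            if not future.done():
                future.set_exception(exc)
        else:
            if not future.done():
                future.set_result(result)

    loop.create_task(runner())
    return future


async def slow_job(future):
    await future.with_interrupt(asyncio.sleep(10))
    return "finished"


async def quick_job(future):
    await future.with_interrupt(asyncio.sleep(0))
    return "finished"


async def demo():
    completed = await make_interruptable(quick_job)
    future = make_interruptable(slow_job)
    await asyncio.sleep(0.01)
    future.interrupt(Interrupted("stop"))
    try:
        await future
    except Interrupted as exc:
        interrupted = str(exc)
    await asyncio.sleep(0.01)  # give the cancelled job a moment to unwind
    return completed, interrupted


completed, interrupted = asyncio.run(demo())
```

The coroutine receives the future as its argument so that it can wrap its own long-running waits in `with_interrupt`, which is what allows an external caller to cut them short.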
- aiida.engine.is_process_function(function: Any) bool #
Return whether the given function is a process function
- Parameters:
function – a function
- Returns:
True if the function is a wrapped process function, False otherwise
- aiida.engine.process_handler(wrapped: Optional[types.FunctionType] = None, *, priority: int = 0, exit_codes: Union[None, aiida.engine.processes.exit_code.ExitCode, List[aiida.engine.processes.exit_code.ExitCode]] = None, enabled: bool = True) types.FunctionType #
Decorator to register a BaseRestartWorkChain instance method as a process handler.
The decorator will validate the priority and exit_codes optional keyword arguments and then add itself as an attribute to the wrapped instance method. This is used in the inspect_process to return all instance methods of the class that have been decorated by this function and therefore are considered to be process handlers.
Requirements on the function signature of process handling functions. The function to which the decorator is applied needs to take two arguments:
self: This is the instance of the work chain itself
node: This is the process node that finished and is to be investigated
The function body typically consists of a conditional that will check for a particular problem that might have occurred for the sub process. If a particular problem is handled, the process handler should return an instance of the aiida.engine.ProcessHandlerReport tuple. If no other process handlers should be considered, the do_break attribute of the report should be set to True. If the work chain is to be aborted entirely, the exit_code of the report can be set to an ExitCode instance with a non-zero status.
- Parameters:
wrapped – the work chain method to register the process handler with
priority – optional integer that defines the order in which registered handlers will be called during the handling of a finished process. Higher priorities will be handled first. Default value is 0. Multiple handlers with the same priority are allowed, but the order in which they are called is not well defined.
exit_codes – single or list of ExitCode instances. If defined, the handler will return None if the exit code set on the node does not appear in the exit_codes. This is useful to have a handler called only when the process failed with a specific exit code.
enabled – boolean, by default True, which will cause the handler to be called during inspect_process. When set to False, the handler will be skipped. This static value can be overridden on a per work chain instance basis through the input handler_overrides.
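How priority, exit_codes, enabled and do_break interact can be illustrated with a stand-alone sketch. It does not use aiida-core: `sketch_handler`, `Report`, `FakeNode` and `SketchChain` are hypothetical, and exit codes are simplified to plain integers compared against a hypothetical `exit_status` attribute.

```python
import functools
from collections import namedtuple

# Hypothetical report type, modelled on the report tuple described above.
Report = namedtuple("Report", ["do_break", "exit_code"])


def sketch_handler(priority=0, exit_codes=None, enabled=True):
    """Mark a method as a handler and attach its settings as attributes."""

    def decorator(method):
        @functools.wraps(method)
        def wrapper(self, node):
            # Skip the handler when the node's exit status is not targeted.
            if exit_codes is not None and node.exit_status not in exit_codes:
                return None
            return method(self, node)

        wrapper.priority = priority
        wrapper.enabled = enabled
        wrapper.is_handler = True
        return wrapper

    return decorator


class FakeNode:
    """Hypothetical finished process node with an integer exit status."""

    def __init__(self, exit_status):
        self.exit_status = exit_status


class SketchChain:
    def __init__(self):
        self.called = []

    @sketch_handler(priority=10, exit_codes=[400])
    def handle_out_of_memory(self, node):
        self.called.append("out_of_memory")
        return Report(do_break=True, exit_code=0)

    @sketch_handler(priority=5)
    def handle_anything(self, node):
        self.called.append("anything")
        return Report(do_break=False, exit_code=0)

    def inspect(self, node):
        """Call enabled handlers from highest to lowest priority."""
        candidates = (getattr(self, name) for name in dir(self))
        handlers = [h for h in candidates if getattr(h, "is_handler", False)]
        for handler in sorted(handlers, key=lambda h: h.priority, reverse=True):
            if not handler.enabled:
                continue
            report = handler(node)
            if report is not None and report.do_break:
                break  # no further handlers are considered
        return self.called
```

For a node with exit status 400 the high-priority handler runs and stops the chain of handlers. For any other status it returns None and the lower-priority handler is consulted instead.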
- aiida.engine.run(process: aiida.engine.launch.TYPE_RUN_PROCESS, *args: Any, **inputs: Any) Dict[str, Any] #
Run the process with the supplied inputs in a local runner that will block until the process is completed.
- Parameters:
process – the process class or process function to run
inputs – the inputs to be passed to the process
- Returns:
the outputs of the process
- aiida.engine.run_get_node(process: aiida.engine.launch.TYPE_RUN_PROCESS, *args: Any, **inputs: Any) Tuple[Dict[str, Any], aiida.orm.ProcessNode] #
Run the process with the supplied inputs in a local runner that will block until the process is completed.
- Parameters:
process – the process class, instance, builder or function to run
inputs – the inputs to be passed to the process
- Returns:
tuple of the outputs of the process and the process node
- aiida.engine.run_get_pk(process: aiida.engine.launch.TYPE_RUN_PROCESS, *args: Any, **inputs: Any) Tuple[Dict[str, Any], int] #
Run the process with the supplied inputs in a local runner that will block until the process is completed.
- Parameters:
process – the process class, instance, builder or function to run
inputs – the inputs to be passed to the process
- Returns:
tuple of the outputs of the process and process node pk
- aiida.engine.submit(process: aiida.engine.launch.TYPE_SUBMIT_PROCESS, **inputs: Any) aiida.orm.ProcessNode #
Submit the process with the supplied inputs to the daemon, immediately returning control to the interpreter.
- Parameters:
process – the process class, instance or builder to submit
inputs – the inputs to be passed to the process
- Returns:
the calculation node of the process
- aiida.engine.workfunction(function: aiida.engine.processes.functions.FunctionType) aiida.engine.processes.functions.FunctionType #
A decorator to turn a standard python function into a workfunction. Example usage:
>>> from aiida.engine import workfunction
>>> from aiida.orm import Int
>>>
>>> # Define the workfunction
>>> @workfunction
... def select(a, b):
...     return a
>>> # Run it with some input
>>> r = select(Int(4), Int(5))
>>> print(r)
4
>>> r.base.links.get_incoming().all()
[Neighbor(link_type='', link_label='result', node=<WorkFunctionNode: uuid: ce0c63b3-1c84-4bb8-ba64-7b70a36adf34 (pk: 3567)>)]
>>> r.base.links.get_incoming().get_node_by_label('result').base.links.get_incoming().all_nodes()
[4, 5]
- Parameters:
function – The function to decorate.
- Returns:
The decorated function.