aiida.engine#

Module with all the internals that make up the engine of aiida-core.

Package Contents#

Classes#

AiiDAPersister

Persister that takes saved process instance states and persists them to the database.

Awaitable

An attribute dictionary that represents an action that a Process could be waiting for to finish.

AwaitableAction

Enum that describes the action to be taken for a given awaitable.

AwaitableTarget

Enum that describes the class of the target of a given awaitable.

BaseRestartWorkChain

Base restart work chain.

CalcJob

Implementation of the CalcJob process.

CalcJobImporter

An abstract class, to define an importer for computations completed outside of AiiDA.

CalcJobOutputPort

Sub class of plumpy.OutputPort which adds the _pass_to_parser attribute.

CalcJobProcessSpec

Process spec intended for the CalcJob process class.

DaemonClient

Client to interact with the daemon.

ExitCode

A simple data class to define an exit code for a Process.

ExitCodesNamespace

A namespace of ExitCode instances that can be accessed through getattr as well as getitem.

FunctionProcess

Function process class used for turning functions into a Process.

InputPort

Sub class of plumpy.InputPort which mixes in the WithSerialize and WithNonDb mixins to support automatic value serialization to database storable types and support non database storable input types as well.

InterruptableFuture

A future that can be interrupted by calling interrupt.

JobManager

A manager for CalcJob submitted to Computer instances.

JobsList

Manager of calculation jobs submitted with a specific AuthInfo, i.e. computer configured for a specific user.

ObjectLoader

Custom object loader for aiida-core.

PastException

Raised when an attempt is made to continue a Process that has already excepted before.

PortNamespace

Sub class of plumpy.PortNamespace which implements the serialize method to support automatic recursive serialization of a given mapping onto the ports of the PortNamespace.

Process

This class represents an AiiDA process which can be executed and will have full provenance saved in the database.

ProcessBuilder

A process builder that helps setting up the inputs for creating a new process.

ProcessBuilderNamespace

Input namespace for the ProcessBuilder.

ProcessFuture

Future that waits for a process to complete using both polling and listening for broadcast events if possible.

ProcessHandlerReport

A namedtuple to define a process handler report for an aiida.engine.BaseRestartWorkChain.

ProcessSpec

Default process spec for process classes defined in aiida-core.

Runner

Class that can launch processes by running in the current interpreter or by submitting them to the daemon.

WithNonDb

A mixin that adds support to a port to flag that it should not be stored in the database using the non_db=True flag.

WithSerialize

A mixin that adds support for a serialization function which is automatically applied on inputs that are not AiiDA data types.

WorkChain

The WorkChain class is the principal component to implement workflows in AiiDA.

Functions#

append_

Convenience function that will construct an Awaitable for a given class instance with the context action set to APPEND. When the awaitable target is completed, it will be appended to a list in the context for a key that is to be defined later.

assign_

Convenience function that will construct an Awaitable for a given class instance with the context action set to ASSIGN. When the awaitable target is completed, it will be assigned to the context for a key that is to be defined later.

calcfunction

A decorator to turn a standard Python function into a calcfunction.

construct_awaitable

Construct an instance of the Awaitable class that will contain the information related to the action to be taken with respect to the context once the awaitable object is completed.

get_object_loader

Return the global AiiDA object loader.

interruptable_task

Turn the given coroutine into an interruptable task by turning it into an InterruptableFuture and returning it.

is_process_function

Return whether the given function is a process function.

process_handler

Decorator to register a BaseRestartWorkChain instance method as a process handler.

run

Run the process with the supplied inputs in a local runner that will block until the process is completed.

run_get_node

Run the process with the supplied inputs in a local runner that will block until the process is completed.

run_get_pk

Run the process with the supplied inputs in a local runner that will block until the process is completed.

submit

Submit the process with the supplied inputs to the daemon immediately returning control to the interpreter.

workfunction

A decorator to turn a standard Python function into a workfunction.

Data#

External#

ProcessState

if_

return_

while_

API#

class aiida.engine.AiiDAPersister#

Bases: plumpy.persistence.Persister

Persister that takes saved process instance states and persists them to the database.

save_checkpoint(process: aiida.engine.processes.process.Process, tag: Optional[str] = None)#

Persist a Process instance.

Parameters:
  • process – aiida.engine.Process

  • tag – optional checkpoint identifier to allow distinguishing multiple checkpoints for the same process

Raises:

PersistenceError Raised if there was a problem saving the checkpoint

load_checkpoint(pid: Hashable, tag: Optional[str] = None) plumpy.persistence.Bundle#

Load a process from a persisted checkpoint by its process id.

Parameters:
  • pid – the process id of the plumpy.Process

  • tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint

Returns:

a bundle with the process state

Return type:

plumpy.Bundle

Raises:

PersistenceError Raised if there was a problem loading the checkpoint

get_checkpoints()#

Return a list of all the current persisted process checkpoints

Returns:

list of PersistedCheckpoint tuples, each element containing the process id and optional checkpoint tag.

get_process_checkpoints(pid: Hashable)#

Return a list of all the current persisted process checkpoints for the specified process.

Parameters:

pid – the process pid

Returns:

list of PersistedCheckpoint tuples, each element containing the process id and optional checkpoint tag.

delete_checkpoint(pid: Hashable, tag: Optional[str] = None) None#

Delete a persisted process checkpoint, where no error will be raised if the checkpoint does not exist.

Parameters:
  • pid – the process id of the plumpy.Process

  • tag – optional checkpoint identifier to allow retrieving a specific sub checkpoint

delete_process_checkpoints(pid: Hashable)#

Delete all persisted checkpoints related to the given process id.

Parameters:

pid – the process id of the aiida.engine.processes.process.Process
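The checkpoint operations above can be illustrated with a small in-memory stand-in. This is only a sketch of the documented semantics: `InMemoryCheckpointStore` and its `PersistedCheckpoint` tuple are hypothetical, the bundle is a plain dict, and `save_checkpoint` takes a pid and bundle rather than a Process instance. The real AiiDAPersister stores checkpoints in the database.

```python
from typing import Dict, Hashable, List, NamedTuple, Optional


class PersistedCheckpoint(NamedTuple):
    """Stand-in for the (process id, optional tag) tuple described above."""

    pid: Hashable
    tag: Optional[str]


class InMemoryCheckpointStore:
    """Hypothetical in-memory store; not the aiida-core implementation."""

    def __init__(self) -> None:
        self._bundles: Dict[PersistedCheckpoint, dict] = {}

    def save_checkpoint(self, pid: Hashable, bundle: dict, tag: Optional[str] = None) -> None:
        # A (pid, tag) pair identifies one checkpoint; saving again overwrites it.
        self._bundles[PersistedCheckpoint(pid, tag)] = bundle

    def load_checkpoint(self, pid: Hashable, tag: Optional[str] = None) -> dict:
        return self._bundles[PersistedCheckpoint(pid, tag)]

    def get_process_checkpoints(self, pid: Hashable) -> List[PersistedCheckpoint]:
        return [key for key in self._bundles if key.pid == pid]

    def delete_checkpoint(self, pid: Hashable, tag: Optional[str] = None) -> None:
        # A missing checkpoint is silently ignored, as documented for delete_checkpoint.
        self._bundles.pop(PersistedCheckpoint(pid, tag), None)

    def delete_process_checkpoints(self, pid: Hashable) -> None:
        for key in self.get_process_checkpoints(pid):
            del self._bundles[key]


store = InMemoryCheckpointStore()
store.save_checkpoint(1, {"state": "running"})
store.save_checkpoint(1, {"state": "waiting"}, tag="step-2")
store.delete_checkpoint(1, tag="does-not-exist")  # no error is raised
```

The tag lets several checkpoints coexist for one process, while `delete_process_checkpoints` removes them all at once.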

class aiida.engine.Awaitable#

Bases: plumpy.utils.AttributesDict

An attribute dictionary that represents an action that a Process could be waiting for to finish.

class aiida.engine.AwaitableAction#

Bases: enum.Enum

Enum that describes the action to be taken for a given awaitable.

ASSIGN = 'assign'#
APPEND = 'append'#
class aiida.engine.AwaitableTarget#

Bases: enum.Enum

Enum that describes the class of the target of a given awaitable.

PROCESS = 'process'#
class aiida.engine.BaseRestartWorkChain(*args, **kwargs)#

Bases: aiida.engine.processes.workchains.workchain.WorkChain

Base restart work chain.

This work chain serves as the starting point for more complex work chains that will be designed to run a sub process that might need multiple restarts to come to a successful end. These restarts may be necessary because a single process run is not sufficient to achieve a fully converged result, or because certain recoverable errors may be encountered.

This work chain implements the most basic functionality to achieve this goal. It will launch the sub process, restarting until it is completed successfully or the maximum number of iterations is reached. After completion, the sub process is inspected and a list of process handlers is called successively. These process handlers are defined as instance methods that are decorated with process_handler().

The idea is to sub class this work chain and leverage the generic error handling that is implemented in the few outline methods. The minimally required outline would look something like the following:

cls.setup
while_(cls.should_run_process)(
    cls.run_process,
    cls.inspect_process,
)

Each of these methods can of course be overridden, but they should be general enough to fit most process cycles. The run_process method takes the inputs for the process from the context under the key inputs. The user should therefore make sure that the inputs to be used are stored under self.ctx.inputs before the run_process method is called. One can update the inputs based on the results from a prior process by calling an outline method just before the run_process step, for example:

cls.setup
while_(cls.should_run_process)(
    cls.prepare_inputs,
    cls.run_process,
    cls.inspect_process,
)

In the prepare_inputs method, the inputs dictionary at self.ctx.inputs is updated before the next process is run with those inputs.

The _process_class attribute should be set to the Process class that should be run in the loop. Finally, to define handlers that will be called during inspect_process, define an instance method with the signature (self, node) and decorate it with the process_handler decorator, for example:

@process_handler
def handle_problem(self, node):
    if some_problem:
        self.ctx.inputs = improved_inputs
        return ProcessHandlerReport()

The process_handler and ProcessHandlerReport support various arguments to control the flow of the logic of the inspect_process. Refer to their respective documentation for details.

Initialization

Construct the instance.

_process_class: Optional[Type[aiida.engine.processes.Process]] = None#
_considered_handlers_extra = 'considered_handlers'#
property process_class: Type[aiida.engine.processes.process.Process]#

Return the process class to run in the loop.

classmethod define(spec: aiida.engine.processes.ProcessSpec) None#

Define the process specification.

setup() None#

Initialize context variables that are used during the logical flow of the BaseRestartWorkChain.

should_run_process() bool#

Return whether a new process should be run.

This is the case as long as the last process has not finished successfully and the maximum number of restarts has not yet been exceeded.

run_process() aiida.engine.processes.workchains.context.ToContext#

Run the next process, taking the input dictionary from the context at self.ctx.inputs.

inspect_process() Optional[aiida.engine.processes.ExitCode]#

Analyse the results of the previous process and call the handlers when necessary.

If the process is excepted or killed, the work chain will abort. Otherwise any attached handlers will be called in order of their specified priority. If the process failed and no handler returns a report indicating that the error was handled, it is considered an unhandled process failure and the process is relaunched. If this happens twice in a row, the work chain is aborted. In the case that at least one handler returned a report, the following matrix determines the logic that is followed:

Process result    Handler report?    Handler exit code    Action
--------------    ---------------    -----------------    -------
Success           yes                == 0                 Restart
Success           yes                != 0                 Abort
Failed            yes                == 0                 Restart
Failed            yes                != 0                 Abort

If no handler returned a report and the process finished successfully, the work chain’s work is considered done and it will move on to the next step that directly follows the while conditional, if there is one defined in the outline.
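The decision logic described for inspect_process can be summarized in a small standalone function. This is a sketch of the documented rules only, not the aiida-core code: the name `next_action`, its parameters and the returned strings are hypothetical, and the excepted or killed case (which always aborts) is left out.

```python
from typing import Optional


def next_action(
    process_ok: bool,
    report_exit_status: Optional[int],
    previous_unhandled_failures: int = 0,
) -> str:
    """Return "restart", "abort" or "done" following the matrix above.

    :param process_ok: whether the last process finished successfully.
    :param report_exit_status: exit status of the handler report, or None if no
        handler returned a report.
    :param previous_unhandled_failures: number of consecutive unhandled failures
        that occurred directly before this inspection.
    """
    if report_exit_status is not None:
        # A handler returned a report: its exit status alone decides the action.
        return "restart" if report_exit_status == 0 else "abort"
    if process_ok:
        # No report and a successful process: the loop is finished.
        return "done"
    # Unhandled failure: relaunch once, abort if it happens twice in a row.
    return "abort" if previous_unhandled_failures >= 1 else "restart"


print(next_action(process_ok=False, report_exit_status=0))  # handled failure
print(next_action(process_ok=False, report_exit_status=None, previous_unhandled_failures=1))
```

Note that in the matrix the process result does not change the outcome once a handler has returned a report; only the exit code of the report matters.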

get_outputs(node) Mapping[str, aiida.orm.Node]#

Return a mapping of the outputs that should be attached as outputs to the work chain.

By default this method returns the outputs of the last completed calculation job. This method can be overridden if the implementation wants to update those outputs before attaching them. Make sure that if the content of an output node is modified that this is done through a calcfunction in order to not lose the provenance.

results() Optional[aiida.engine.processes.ExitCode]#

Attach the outputs specified in the output specification from the last completed process.

classmethod is_process_handler(process_handler_name: Union[str, types.FunctionType]) bool#

Return whether the given method name corresponds to a process handler of this class.

Parameters:

process_handler_name – string name of the instance method

Returns:

boolean, True if corresponds to process handler, False otherwise

classmethod get_process_handlers() List[types.FunctionType]#
get_process_handlers_by_priority() List[Tuple[int, types.FunctionType]]#

Return list of process handlers where overrides from inputs.handler_overrides are taken into account.

on_terminated()#

Clean the working directories of all child calculation jobs if clean_workdir=True in the inputs.

_wrap_bare_dict_inputs(port_namespace: aiida.engine.processes.PortNamespace, inputs: Dict[str, Any]) aiida.common.AttributeDict#

Wrap bare dictionaries in inputs in a Dict node if dictated by the corresponding inputs portnamespace.

Parameters:
  • port_namespace – a PortNamespace

  • inputs – a dictionary of inputs intended for submission of the process

Returns:

an attribute dictionary with all bare dictionaries wrapped in Dict if dictated by the port namespace

class aiida.engine.CalcJob(*args, **kwargs)#

Bases: aiida.engine.processes.process.Process

Implementation of the CalcJob process.

Initialization

Construct a CalcJob instance.

Construct the instance only if it is a sub class of CalcJob, otherwise raise InvalidOperation.

See documentation of aiida.engine.Process.

_node_class = None#
_spec_class = None#
classmethod define(spec: aiida.engine.processes.process_spec.CalcJobProcessSpec) None#

Define the process specification, including its inputs, outputs and known exit codes.

Ports are added to the metadata input namespace (inherited from the base Process), and a code input Port, a remote_folder output Port and retrieved folder output Port are added.

Parameters:

spec – the calculation job process spec to define.

spec_options()#

Return the metadata options port namespace of the process specification of this process.

Returns:

options dictionary

Return type:

dict

classmethod get_importer(entry_point_name: str | None = None) aiida.engine.processes.calcjobs.importer.CalcJobImporter#

Load the CalcJobImporter associated with this CalcJob if it exists.

By default an importer with the same entry point as the CalcJob will be loaded, however, this can be overridden using the entry_point_name argument.

Parameters:

entry_point_name – optional entry point name of a CalcJobImporter to override the default.

Returns:

the loaded CalcJobImporter.

Raises:

if no importer class could be loaded.

property options: aiida.common.AttributeDict#

Return the options of the metadata that were specified when this process instance was launched.

Returns:

options dictionary

classmethod get_state_classes() Dict[Hashable, Type[plumpy.process_states.State]]#

A mapping of the State constants to the corresponding state class.

Overrides the waiting state with the CalcJob-specific version.

property node: aiida.orm.CalcJobNode#
on_terminated() None#

Clean up the node by deleting the calculation job state.

Note

This has to be done before calling the super because that will seal the node, after which we cannot change it.

run() Union[plumpy.process_states.Stop, int, plumpy.process_states.Wait]#

Run the calculation job.

This means invoking the presubmit and storing the temporary folder in the node’s repository. Then we move the process into the Wait state, waiting for the UPLOAD transport task to be started.

Returns:

the Stop command if a dry run, int if the process has an exit status, Wait command if the calcjob is to be uploaded

abstract prepare_for_submission(folder: aiida.common.folders.Folder) aiida.common.datastructures.CalcInfo#

Prepare the calculation for submission.

Convert the input nodes into the corresponding input files in the format that the code will expect. In addition, define and return a CalcInfo instance, which is a simple data structure that contains information for the engine, for example, on what files to copy to the remote machine, what files to retrieve once it has completed, specific scheduler settings and more.

Parameters:

folder – a temporary folder on the local file system.

Returns:

the CalcInfo instance

_setup_metadata(metadata: dict) None#

Store the metadata on the ProcessNode.

_setup_inputs() None#

Create the links between the input nodes and the ProcessNode that represents this process.

_perform_dry_run()#

Perform a dry run.

Instead of performing the normal sequence of steps, just the presubmit is called, which will call the method prepare_for_submission of the plugin to generate the input files based on the inputs. Then the upload action is called, but using a normal local transport that will copy the files to a local sandbox folder. The generated input script and the absolute path to the sandbox folder are stored in the dry_run_info attribute of the node of this process.

_perform_import()#

Perform the import of an already completed calculation.

The inputs contained a RemoteData under the key remote_folder signalling that this is not supposed to be run as a normal calculation job, but rather the results are already computed outside of AiiDA and merely need to be imported.

parse(retrieved_temporary_folder: Optional[str] = None, existing_exit_code: aiida.engine.processes.exit_code.ExitCode | None = None) aiida.engine.processes.exit_code.ExitCode#

Parse a retrieved job calculation.

This is called once it’s finished waiting for the calculation to be finished and the data has been retrieved.

Parameters:

retrieved_temporary_folder – The path to the temporary folder

static terminate(exit_code: aiida.engine.processes.exit_code.ExitCode) aiida.engine.processes.exit_code.ExitCode#

Terminate the process immediately and return the given exit code.

This method is called by aiida.engine.processes.calcjobs.tasks.Waiting.execute() if a monitor triggered the job to be terminated and specified the parsing to be skipped. It will construct the running state and tell this method to be run, which returns the given exit code which will cause the process to be terminated.

Parameters:

exit_code – The exit code to return.

Returns:

The provided exit code.

parse_scheduler_output(retrieved: aiida.orm.Node) Optional[aiida.engine.processes.exit_code.ExitCode]#

Parse the output of the scheduler if that functionality has been implemented for the plugin.

parse_retrieved_output(retrieved_temporary_folder: Optional[str] = None) Optional[aiida.engine.processes.exit_code.ExitCode]#

Parse the retrieved data by calling the parser plugin if it was defined in the inputs.

presubmit(folder: aiida.common.folders.Folder) aiida.common.datastructures.CalcInfo#

Prepares the calculation folder with all inputs, ready to be copied to the cluster.

Parameters:

folder – a SandboxFolder that can be used to write calculation input files and the scheduling script.

Return calcinfo:

the CalcInfo object containing the information needed by the daemon to handle operations.

class aiida.engine.CalcJobImporter#

Bases: abc.ABC

An abstract class, to define an importer for computations completed outside of AiiDA.

This class is used to import the results of a calculation that was completed outside of AiiDA. The importer is responsible for parsing the output files of the calculation and creating the corresponding AiiDA nodes.

abstract static parse_remote_data(remote_data: aiida.orm.RemoteData, **kwargs) Dict[str, Union[aiida.orm.Node, Dict]]#

Parse the input nodes from the files in the provided RemoteData.

Parameters:
  • remote_data – the remote data node containing the raw input files.

  • kwargs – additional keyword arguments to control the parsing process.

Returns:

a dictionary with the parsed inputs nodes that match the input spec of the associated CalcJob.

class aiida.engine.CalcJobOutputPort(*args, **kwargs)#

Bases: plumpy.ports.OutputPort

Sub class of plumpy.OutputPort which adds the _pass_to_parser attribute.

Initialization

property pass_to_parser: bool#
class aiida.engine.CalcJobProcessSpec#

Bases: aiida.engine.processes.process_spec.ProcessSpec

Process spec intended for the CalcJob process class.

Initialization

OUTPUT_PORT_TYPE = None#
property default_output_node: Optional[str]#
class aiida.engine.DaemonClient(profile: aiida.manage.configuration.profile.Profile)#

Client to interact with the daemon.

Initialization

Construct an instance for a given profile.

Parameters:

profile – The profile instance.

DAEMON_ERROR_NOT_RUNNING = 'daemon-error-not-running'#
DAEMON_ERROR_TIMEOUT = 'daemon-error-timeout'#
_DAEMON_NAME = 'aiida-{name}'#
_ENDPOINT_PROTOCOL = None#
property profile: aiida.manage.configuration.profile.Profile#
property daemon_name: str#

Get the daemon name which is tied to the profile name.

property _verdi_bin: str#

Return the absolute path to the verdi binary.

Raises:

ConfigurationError – If the path to verdi could not be found

cmd_start_daemon(number_workers: int = 1, foreground: bool = False) list[str]#

Return the command to start the daemon.

Parameters:
  • number_workers – Number of daemon workers to start.

  • foreground – Whether to launch the subprocess in the foreground instead of the background.

property cmd_start_daemon_worker: list[str]#

Return the command to start a daemon worker process.

property loglevel: str#
property virtualenv: str | None#
property circus_log_file: str#
property circus_pid_file: str#
property circus_port_file: str#
property circus_socket_file: str#
property circus_socket_endpoints: dict[str, str]#
property daemon_log_file: str#
property daemon_pid_file: str#
get_circus_port() int#

Retrieve the port for the circus controller, which should be written to the circus port file.

If the daemon is running, the port file should exist and contain the port to which the controller is connected. If it cannot be read, a RuntimeError will be thrown. If the daemon is not running, an available port will be requested from the operating system, written to the port file and returned.

Returns:

The port for the circus controller.

static get_env() dict[str, str]#

Return the environment for this current process.

This method is used to pass variables from the environment of the current process to a subprocess that is spawned when the daemon or a daemon worker is started.

It replicates the PATH, PYTHONPATH and AIIDA_PATH environment variables. The PYTHONPATH variable ensures that all Python modules that can be imported by the parent process are also importable by the subprocess. The AIIDA_PATH variable ensures that the subprocess will use the same AiiDA configuration directory as used by the current process.
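The idea of passing selected variables on to a subprocess can be sketched as follows. This is an illustration under stated assumptions, not the body of get_env: the helper `replicate_env` and the constant `REPLICATED` are hypothetical, and the real method may construct the values differently rather than copying them verbatim.

```python
import os
from typing import Dict, Mapping, Optional, Sequence

# Variable names taken from the description above.
REPLICATED = ("PATH", "PYTHONPATH", "AIIDA_PATH")


def replicate_env(
    source: Optional[Mapping[str, str]] = None,
    names: Sequence[str] = REPLICATED,
) -> Dict[str, str]:
    """Copy the named variables from ``source`` (default: ``os.environ``).

    Variables that are not set in ``source`` are skipped.
    """
    source = os.environ if source is None else source
    return {name: source[name] for name in names if name in source}


child_env = replicate_env(
    {"PATH": "/usr/bin", "AIIDA_PATH": "/home/user/project", "EDITOR": "vim"}
)
print(child_env)
```

A dictionary like `child_env` could then be passed as the `env` argument of `subprocess.Popen` so that the child sees the same module search path and configuration directory.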

get_circus_socket_directory() str#

Retrieve the absolute path of the directory where the circus sockets are stored.

If the daemon is running, the sockets file should exist and contain the absolute path of the directory that contains the sockets of the circus endpoints. If it cannot be read, a RuntimeError will be thrown. If the daemon is not running, a temporary directory will be created and its path will be written to the sockets file and returned.

Note

A temporary folder needs to be used for the sockets because UNIX limits the file path length of a socket to 107 bytes. Placing the socket files in the AiiDA config folder might seem like the more logical choice, but because that folder can be placed in an arbitrarily nested directory, the socket file path could exceed the limit. The solution is therefore to always store them in the temporary directory of the operating system, whose base path is typically short enough not to exceed the limit.

Returns:

The absolute path of the directory to write the sockets to.

get_daemon_pid() int | None#

Get the daemon pid which should be written in the daemon pid file specific to the profile.

Returns:

The pid of the circus daemon process or None if not found.

property is_daemon_running: bool#

Return whether the daemon is running, which is determined by seeing if the daemon pid file is present.

Returns:

True if daemon is running, False otherwise.

delete_circus_socket_directory() None#

Attempt to delete the directory used to store the circus endpoint sockets.

Will not raise if the directory does not exist.

classmethod get_available_port()#

Get an available port from the operating system.

Returns:

A currently available port.
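A common way to ask the operating system for a free port is to bind a socket to port 0 and read back the port that was assigned. The sketch below shows that technique with the standard library; the function name `find_available_port` is hypothetical, and this is not necessarily how get_available_port is written.

```python
import socket


def find_available_port() -> int:
    """Return a TCP port that was free at the time of the call."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Port 0 tells the operating system to pick any unused port.
        sock.bind(("", 0))
        return sock.getsockname()[1]


port = find_available_port()
print(port)
```

The port is released again when the socket closes, so another process could claim it before it is reused; callers should be prepared for that race.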

get_controller_endpoint()#

Get the endpoint string for the circus controller.

For the IPC protocol a profile specific socket will be used, whereas for the TCP protocol an available port will be found and saved in the profile specific port file.

Returns:

The endpoint string.

get_pubsub_endpoint()#

Get the endpoint string for the circus pubsub endpoint.

For the IPC protocol a profile specific socket will be used, whereas for the TCP protocol any available port will be used.

Returns:

The endpoint string.

get_stats_endpoint()#

Get the endpoint string for the circus stats endpoint.

For the IPC protocol a profile specific socket will be used, whereas for the TCP protocol any available port will be used.

Returns:

The endpoint string.

get_ipc_endpoint(endpoint)#

Get the ipc endpoint string for a circus daemon endpoint for a given socket.

Parameters:

endpoint – The circus endpoint for which to return a socket.

Returns:

The ipc endpoint string.

get_tcp_endpoint(port=None)#

Get the tcp endpoint string for a circus daemon endpoint.

If the port is unspecified, the operating system will be asked for a currently available port.

Parameters:

port – A port to use for the endpoint.

Returns:

The tcp endpoint string.

get_client() circus.client.CircusClient#

Return an instance of the CircusClient.

The endpoint is defined by the controller endpoint, which uses the port that was written to the port file when the daemon was started.

Returns:

CircusClient instance

call_client(command: aiida.engine.daemon.client.JsonDictType) aiida.engine.daemon.client.JsonDictType#

Call the client with a specific command.

Will check whether the daemon is running first by checking for the pid file. When the pid is found yet the call still fails with a timeout, this means the daemon was actually not running and it was terminated unexpectedly causing the pid file to not be cleaned up properly.

Parameters:

command – Command to call the circus client with.

Returns:

The result of the circus client call.

get_status() aiida.engine.daemon.client.JsonDictType#

Get the daemon running status.

Returns:

The client call response. If successful, will contain ‘status’ key.

get_numprocesses() aiida.engine.daemon.client.JsonDictType#

Get the number of running daemon processes.

Returns:

The client call response. If successful, will contain ‘numprocesses’ key.

get_worker_info() aiida.engine.daemon.client.JsonDictType#

Get workers statistics for this daemon.

Returns:

The client call response. If successful, will contain ‘info’ key.

get_daemon_info() aiida.engine.daemon.client.JsonDictType#

Get statistics about this daemon itself.

Returns:

The client call response. If successful, will contain ‘info’ key.

increase_workers(number: int) aiida.engine.daemon.client.JsonDictType#

Increase the number of workers.

Parameters:

number – The number of workers to add.

Returns:

The client call response.

decrease_workers(number: int) aiida.engine.daemon.client.JsonDictType#

Decrease the number of workers.

Parameters:

number – The number of workers to remove.

Returns:

The client call response.

stop_daemon(wait: bool = True, timeout: int = 5) aiida.engine.daemon.client.JsonDictType#

Stop the daemon.

Parameters:
  • wait – Boolean to indicate whether to wait for the result of the command.

  • timeout – Wait this number of seconds for the is_daemon_running to return False before raising.

Returns:

The client call response.

Raises:

DaemonException – If is_daemon_running returns True after the timeout has passed.

restart_daemon(wait: bool) aiida.engine.daemon.client.JsonDictType#

Restart the daemon.

Parameters:

wait – Boolean to indicate whether to wait for the result of the command.

Returns:

The client call response.

start_daemon(number_workers: int = 1, foreground: bool = False, timeout: int = 5) None#

Start the daemon in a sub process running in the background.

Parameters:
  • number_workers – Number of daemon workers to start.

  • foreground – Whether to launch the subprocess in the foreground instead of the background.

  • timeout – Wait this number of seconds for the is_daemon_running to return True before raising.

Raises:
  • DaemonException – If the daemon fails to start.

  • DaemonException – If the daemon starts but then is unresponsive or in an unexpected state.

  • DaemonException – If is_daemon_running returns False after the timeout has passed.

static _await_condition(condition: Callable, exception: Exception, timeout: int = 5, interval: float = 0.1)#

Await a condition to evaluate to True or raise the exception if the timeout is reached.

Parameters:
  • condition – A callable that is waited for to return True.

  • exception – Raise this exception if condition does not return True after timeout seconds.

  • timeout – Wait this number of seconds for condition to return True before raising.

  • interval – The time in seconds to wait between invocations of condition.

Raises:

The exception provided by exception if timeout is reached.
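The polling behavior described for _await_condition can be sketched as a plain loop. This is a minimal illustration of the documented parameters, not the aiida-core implementation; the function name `await_condition` and the `ready` helper are hypothetical.

```python
import time
from typing import Callable, List


def await_condition(
    condition: Callable[[], bool],
    exception: Exception,
    timeout: float = 5,
    interval: float = 0.1,
) -> None:
    """Poll ``condition`` until it returns True or raise ``exception`` on timeout."""
    deadline = time.monotonic() + timeout
    while not condition():
        if time.monotonic() >= deadline:
            raise exception
        # Wait before invoking the condition again.
        time.sleep(interval)


calls: List[int] = []


def ready() -> bool:
    """Pretend that the awaited state is reached on the third check."""
    calls.append(1)
    return len(calls) >= 3


await_condition(ready, RuntimeError("not ready in time"), timeout=1, interval=0.01)
print(len(calls))
```

Using `time.monotonic()` keeps the deadline unaffected by changes to the system clock.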

_start_daemon(number_workers: int = 1, foreground: bool = False) None#

Start the daemon.

Warning

This will daemonize the current process and put it in the background. It is most likely not what you want to call if you want to start the daemon from the Python API. You will probably want to use the aiida.engine.daemon.client.DaemonClient.start_daemon() function instead.

Parameters:
  • number_workers – Number of daemon workers to start.

  • foreground – Whether to launch the subprocess in the foreground instead of the background.

class aiida.engine.ExitCode#

Bases: typing.NamedTuple

A simple data class to define an exit code for a Process.

When an instance of this class is returned from a Process._run() call, it will be interpreted that the Process should be terminated and that the exit status and message of the namedtuple should be set to the corresponding attributes of the node.

Parameters:
  • status – positive integer exit status, where a non-zero value indicates that the process failed; the default is 0

  • message – optional message with more details about the failure mode

  • invalidates_cache – optional flag, indicating that a process should not be used in caching

status: int = 0#
message: Optional[str] = None#
invalidates_cache: bool = False#
format(**kwargs: str) aiida.engine.processes.exit_code.ExitCode#

Create a clone of this exit code where the template message is replaced by the keyword arguments.

Parameters:

kwargs – replacement parameters for the template message
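The cloning behavior of format can be illustrated with a stand-in NamedTuple that carries the same three fields. `SketchExitCode` is hypothetical and only mirrors the documented attributes; how the real ExitCode handles a missing message or missing template parameters is not shown here, and the ValueError below is a choice of this sketch.

```python
from typing import NamedTuple, Optional


class SketchExitCode(NamedTuple):
    """Stand-in with the same fields as documented for ExitCode."""

    status: int = 0
    message: Optional[str] = None
    invalidates_cache: bool = False

    def format(self, **kwargs: str) -> "SketchExitCode":
        """Return a copy whose message template is filled with ``kwargs``."""
        if self.message is None:
            # Assumption of this sketch: formatting without a template is an error.
            raise ValueError("exit code has no message template to format")
        # _replace returns a new tuple, so the template instance is left untouched.
        return self._replace(message=self.message.format(**kwargs))


template = SketchExitCode(410, "output file {filename} is missing")
specific = template.format(filename="aiida.out")
print(specific.message)
```

Because a named tuple is immutable, the template can be defined once and formatted many times with different parameters.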

class aiida.engine.ExitCodesNamespace#

Bases: aiida.common.extendeddicts.AttributeDict

A namespace of ExitCode instances that can be accessed through getattr as well as getitem.

Additionally, the collection can be called with an identifier, that can either reference the integer status of the ExitCode that needs to be retrieved or the key in the collection.

__call__(identifier: Union[int, str]) aiida.engine.processes.exit_code.ExitCode#

Return a specific exit code identified by either its exit status or label.

Parameters:

identifier – the identifier of the exit code. If the type is integer, it will be interpreted as the exit code status, otherwise it will be interpreted as the exit code label

Returns:

an ExitCode instance

Raises:

ValueError – if no exit code with the given label is defined for this process
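
The three access styles (getattr, getitem and calling with a status or label) can be sketched with a plain `dict` subclass. All names here (`SketchExitCode`, `SketchExitCodes`, `ERROR_MISSING_OUTPUT`) are hypothetical, and the lookup logic is an assumption based on the description above rather than the aiida-core implementation.

```python
from typing import NamedTuple, Optional, Union


class SketchExitCode(NamedTuple):
    """Hypothetical, reduced exit code with only status and message."""

    status: int = 0
    message: Optional[str] = None


class SketchExitCodes(dict):
    """Hypothetical namespace mapping labels to exit codes."""

    def __getattr__(self, label: str) -> SketchExitCode:
        # Only invoked when normal attribute lookup fails.
        try:
            return self[label]
        except KeyError as exc:
            raise AttributeError(label) from exc

    def __call__(self, identifier: Union[int, str]) -> SketchExitCode:
        # Integers are treated as exit statuses, anything else as a label.
        if isinstance(identifier, int):
            for exit_code in self.values():
                if exit_code.status == identifier:
                    return exit_code
            raise ValueError(f"no exit code with status {identifier}")
        try:
            return self[identifier]
        except KeyError as exc:
            raise ValueError(f"no exit code with label {identifier!r}") from exc


codes = SketchExitCodes(ERROR_MISSING_OUTPUT=SketchExitCode(410, "output missing"))
by_status = codes(410)
by_label = codes("ERROR_MISSING_OUTPUT")
by_attribute = codes.ERROR_MISSING_OUTPUT
```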

class aiida.engine.FunctionProcess(*args, **kwargs)#

Bases: aiida.engine.processes.process.Process

Function process class used for turning functions into a Process

Initialization

_func_args: Sequence[str] = ()#
static _func(*_args, **_kwargs) dict#

This is used internally to store the actual function that is being wrapped and will be replaced by the build method.

static build(func: Callable[..., Any], node_class: Type[aiida.orm.ProcessNode]) Type[aiida.engine.processes.functions.FunctionProcess]#

Build a Process from the given function.

All function arguments will be assigned as process inputs. If keyword arguments are specified then these will also become inputs.

Parameters:
  • func – The function to build a process from

  • node_class – Provide a custom node class to be used, has to be constructable with no arguments. It has to be a sub class of ProcessNode and the mixin FunctionCalculationMixin.

Returns:

A Process class that represents the function

classmethod validate_inputs(*args: Any, **kwargs: Any) None#

Validate the positional and keyword arguments passed in the function call.

Raises:

TypeError – if more positional arguments are passed than the function defines

classmethod create_inputs(*args: Any, **kwargs: Any) Dict[str, Any]#

Create the input args for the FunctionProcess.

classmethod args_to_dict(*args: Any) Dict[str, Any]#

Create an input dictionary (of form label -> value) from supplied args.

Parameters:

args – The values to use for the dictionary

Returns:

A label -> value dictionary
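
A rough sketch of the label -> value mapping, assuming the labels are simply the names of the positional parameters of the wrapped function. The standalone helper `args_to_dict` and the example function `add_multiply` are hypothetical; the sketch derives the names with `inspect.signature`, which is not necessarily how FunctionProcess stores them.

```python
import inspect
from typing import Any, Callable, Dict


def args_to_dict(func: Callable[..., Any], *args: Any) -> Dict[str, Any]:
    """Pair positional values with the positional parameter names of ``func``."""
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    names = [
        name
        for name, parameter in inspect.signature(func).parameters.items()
        if parameter.kind in positional_kinds
    ]
    if len(args) > len(names):
        # Mirrors the documented TypeError for too many positional arguments.
        raise TypeError(
            f"{func.__name__} accepts {len(names)} positional arguments "
            f"but {len(args)} were given"
        )
    return dict(zip(names, args))


def add_multiply(x, y, z=1):
    """Hypothetical function used only for illustration."""
    return (x + y) * z


labelled = args_to_dict(add_multiply, 2, 3)
```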

classmethod get_or_create_db_record() aiida.orm.ProcessNode#
property process_class: Callable[..., Any]#

Return the class that represents this Process, for the FunctionProcess this is the function itself.

For a standard Process or sub class of Process, this is the class itself. However, for legacy reasons, the Process class is a wrapper around another class. This function returns that original class, i.e. the class that really represents what was being executed.

Returns:

A Process class that represents the function

execute() Optional[Dict[str, Any]]#

Execute the process.

_setup_db_record() None#

Set up the database record for the process.

run() Optional[aiida.engine.processes.exit_code.ExitCode]#

Run the process.

class aiida.engine.InputPort(*args, **kwargs)#

Bases: aiida.engine.processes.ports.WithSerialize, aiida.engine.processes.ports.WithNonDb, plumpy.ports.InputPort

Sub class of plumpy.InputPort which mixes in the WithSerialize and WithNonDb mixins to support automatic value serialization to database storable types and support non database storable input types as well.

Initialization

Override the constructor to check the type of the default if set and warn if not immutable.

get_description() Dict[str, str]#

Return a description of the InputPort, which will be a dictionary of its attributes

Returns:

a dictionary of the stringified InputPort attributes

class aiida.engine.InterruptableFuture(*, loop=None)#

Bases: asyncio.Future

A future that can be interrupted by calling interrupt.

Initialization

Initialize the future.

The optional loop argument allows explicitly setting the event loop object used by the future. If it’s not provided, the future uses the default event loop.

interrupt(reason: Exception) None#

This method should be called to interrupt the coroutine represented by this InterruptableFuture.

async with_interrupt(coro: Awaitable[Any]) Any#

Return the result of a coroutine, which will be interrupted if this future is interrupted.

import asyncio
from aiida.engine import InterruptableFuture

loop = asyncio.get_event_loop()

interruptable = InterruptableFuture()
loop.call_soon(interruptable.interrupt, RuntimeError("STOP"))
loop.run_until_complete(interruptable.with_interrupt(asyncio.sleep(2.)))
>>> RuntimeError: STOP

Parameters:

coro – The coroutine that can be interrupted

Returns:

The result of the coroutine
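
To make the mechanism more concrete, the following standard-library sketch shows one way such a future could be built: the wrapped coroutine is raced against the future itself, and whichever finishes first decides the outcome. `SketchInterruptableFuture` is a hypothetical name, and the use of `asyncio.wait` is an assumption of this sketch, not a statement about the aiida-core implementation.

```python
import asyncio
from typing import Any, Awaitable


class SketchInterruptableFuture(asyncio.Future):
    """Hypothetical future that aborts a wrapped coroutine when interrupted."""

    def interrupt(self, reason: Exception) -> None:
        # Storing the reason as the exception marks this future as done.
        self.set_exception(reason)

    async def with_interrupt(self, coro: Awaitable[Any]) -> Any:
        task = asyncio.ensure_future(coro)
        # Wait until either the coroutine or the interruption completes.
        await asyncio.wait({self, task}, return_when=asyncio.FIRST_COMPLETED)
        if self.done():
            task.cancel()
            return self.result()  # re-raises the reason passed to interrupt()
        return task.result()


async def main() -> str:
    future = SketchInterruptableFuture()
    # Schedule the interruption for the next loop iteration.
    asyncio.get_running_loop().call_soon(future.interrupt, RuntimeError("STOP"))
    try:
        await future.with_interrupt(asyncio.sleep(2.0))
    except RuntimeError as exc:
        return str(exc)
    return "completed"


outcome = asyncio.run(main())
```

In this sketch the two-second sleep never finishes, because the interruption is delivered first and the sleeping task is cancelled.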

class aiida.engine.JobManager(transport_queue: aiida.engine.transports.TransportQueue)#

A manager for CalcJob submitted to Computer instances.

When a calculation job is submitted to a Computer, it actually uses a specific AuthInfo, which is a computer configured for a User. The JobManager maintains a mapping of JobsList instances for each authinfo that has active calculation jobs. These JobsList instances are then responsible for bundling scheduler updates for all the jobs they maintain (i.e. that all share the same authinfo) and updating their status.

As long as a Runner will create a single JobManager instance and use that for its lifetime, the guarantees made by the JobsList about respecting the minimum polling interval of the scheduler will be maintained. Note, however, that since each Runner will create its own job manager, these guarantees only hold per runner.

Initialization

get_jobs_list(authinfo: aiida.orm.AuthInfo) aiida.engine.processes.calcjobs.manager.JobsList#

Get or create a new JobsList instance for the given authinfo.

Parameters:

authinfo – the AuthInfo

Returns:

a JobsList instance

request_job_info_update(authinfo: aiida.orm.AuthInfo, job_id: Hashable) Iterator[asyncio.Future[JobInfo]]#

Get a future that will resolve to information about a given job.

This is a context manager so that if the user leaves the context the request is automatically cancelled.

class aiida.engine.JobsList(authinfo: aiida.orm.AuthInfo, transport_queue: aiida.engine.transports.TransportQueue, last_updated: Optional[float] = None)#

Manager of calculation jobs submitted with a specific AuthInfo, i.e. computer configured for a specific user.

This container of active calculation jobs is used to update their status periodically in batches, ensuring that even when a lot of jobs are running, the scheduler update command is not triggered for each job individually.

In addition, the Computer for which the AuthInfo is configured can define a minimum polling interval. This class will guarantee that the time between update calls to the scheduler is greater than or equal to that minimum interval.

Note that since each instance operates on a specific authinfo, the guarantees of batching scheduler update calls and limiting the number of calls per unit time, through the minimum polling interval, are only applicable for jobs launched with that particular authinfo. If multiple authinfo instances for the same computer have active jobs, these limitations are not respected between them, since there is no communication between JobsList instances. See the JobManager for example usage.

Initialization

Construct an instance for the given authinfo and transport queue.

Parameters:
  • authinfo – The authinfo used to check the jobs list

  • transport_queue – A transport queue

  • last_updated – initialize the last updated timestamp

property logger: logging.Logger#

Return the logger configured for this instance.

Returns:

the logger

get_minimum_update_interval() float#

Get the minimum interval that should be respected between updates of the list.

Returns:

the minimum interval

property last_updated: Optional[float]#

Get the timestamp of when the list was last updated as produced by time.time()

Returns:

The last update point

async _get_jobs_from_scheduler() Dict[Hashable, aiida.schedulers.datastructures.JobInfo]#

Get the current jobs list from the scheduler.

Returns:

a mapping of job ids to JobInfo instances

async _update_job_info() None#

Update all of the job information objects.

This will set the futures for all pending update requests where the corresponding job has a new status compared to the last update.

request_job_info_update(job_id: Hashable) Iterator[asyncio.Future[JobInfo]]#

Request job info about a job when the job next changes state.

If the job is not found in the jobs list at the update, the future will resolve to None.

Parameters:

job_id – job identifier

Returns:

future that will resolve to a JobInfo object when the job changes state

_ensure_updating() None#

Ensure that we are updating the job list from the remote resource.

This will automatically stop if there are no outstanding requests.

static _has_job_state_changed(old: Optional[aiida.schedulers.datastructures.JobInfo], new: Optional[aiida.schedulers.datastructures.JobInfo]) bool#

Return whether the states old and new are different.

_get_next_update_delay() float#

Calculate when we are next allowed to poll the scheduler.

This delay is calculated as the minimum polling interval defined by the authentication info for this instance, minus time elapsed since the last update.

Returns:

delay (in seconds) after which the scheduler may be polled again
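
The delay described above amounts to simple arithmetic, sketched here as a free function. The name `next_update_delay`, the `now` parameter (added so the example is deterministic), the clamping at zero and the handling of a missing timestamp are all assumptions of this sketch.

```python
import time
from typing import Optional


def next_update_delay(
    minimum_interval: float,
    last_updated: Optional[float],
    now: Optional[float] = None,
) -> float:
    """Return the seconds to wait before the scheduler may be polled again."""
    if last_updated is None:
        # Assumed: without a previous update there is nothing to wait for.
        return 0.0
    current = time.time() if now is None else now
    elapsed = current - last_updated
    # Minimum interval minus elapsed time, never negative in this sketch.
    return max(minimum_interval - elapsed, 0.0)


# Last update at t=100 s with a 10 s minimum interval.
delay_soon_after = next_update_delay(10.0, last_updated=100.0, now=104.0)
delay_long_after = next_update_delay(10.0, last_updated=100.0, now=130.0)
```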

_update_requests_outstanding() bool#
_get_jobs_with_scheduler() List[str]#

Get all the jobs that are currently with the scheduler.

Returns:

the list of jobs with the scheduler

Return type:

list

class aiida.engine.ObjectLoader#

Bases: plumpy.loaders.DefaultObjectLoader

Custom object loader for aiida-core.

load_object(identifier: str) Any#

Attempt to load the object identified by the given identifier.

Note

We override the plumpy.DefaultObjectLoader so that an ImportError is raised instead of a ValueError, which is less apt in the context of aiida-core, since we are loading classes.

Parameters:

identifier – concatenation of module and resource name

Returns:

loaded object

Raises:

ImportError – if the object cannot be loaded
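
The idea of converting every loading failure into an ImportError can be sketched with `importlib`. The free function `load_object` below is hypothetical, and the `module:name` identifier format is an assumption made for this sketch; the section above only states that the identifier is a concatenation of module and resource name.

```python
import collections
import importlib
from typing import Any


def load_object(identifier: str) -> Any:
    """Load ``module:name`` and raise ImportError on any failure."""
    # Assumed identifier format: "<module path>:<object name>".
    module_name, _, object_name = identifier.partition(":")
    try:
        module = importlib.import_module(module_name)
        return getattr(module, object_name)
    except (ImportError, AttributeError, ValueError) as exc:
        raise ImportError(f"could not load object {identifier!r}") from exc


loaded = load_object("collections:OrderedDict")
```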

aiida.engine.OutputPort = None#
aiida.engine.PORT_NAMESPACE_SEPARATOR = '__'#
class aiida.engine.PastException#

Bases: aiida.common.exceptions.AiidaException

Raised when an attempt is made to continue a Process that has already excepted before.

class aiida.engine.PortNamespace(*args, **kwargs)#

Bases: aiida.engine.processes.ports.WithNonDb, plumpy.ports.PortNamespace

Sub class of plumpy.PortNamespace which implements the serialize method to support automatic recursive serialization of a given mapping onto the ports of the PortNamespace.

Initialization

__setitem__(key: str, port: plumpy.ports.Port) None#

Ensure that a Port being added inherits the non_db attribute if not explicitly defined at construction.

The reasoning is that if a PortNamespace has non_db=True, which is different from the default value, very often all leaves should also be non_db=True. To prevent a user from having to specify it manually every time, we overload the value here, unless it was specifically set during construction.

Note that the non_db attribute is not present for all Port sub classes so we have to check for it first.

static validate_port_name(port_name: str) None#

Validate the given port name.

Valid port names adhere to the following restrictions:

  • Is a valid link label (see below)

  • Does not contain two or more consecutive underscores

Valid link labels adhere to the following restrictions:

  • Has to be a valid python identifier

  • Can only contain alphanumeric characters and underscores

  • Cannot start or end with an underscore

Parameters:

port_name – the proposed name of the port to be added

Raises:
  • TypeError – if the port name is not a string type

  • ValueError – if the port name is invalid
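
The restrictions listed above can be written out as a small validator. This is a sketch of the stated rules only; the free function `validate_port_name` and the helper `is_valid` are hypothetical and may differ in detail from the aiida-core checks.

```python
def validate_port_name(port_name: str) -> None:
    """Raise if ``port_name`` violates the documented restrictions."""
    if not isinstance(port_name, str):
        raise TypeError(f"port name must be a string, got {type(port_name).__name__}")
    if not port_name.isidentifier():
        raise ValueError(f"{port_name!r} is not a valid python identifier")
    if not all(char.isalnum() or char == "_" for char in port_name):
        raise ValueError(f"{port_name!r} contains characters other than alphanumerics and underscores")
    if port_name.startswith("_") or port_name.endswith("_"):
        raise ValueError(f"{port_name!r} starts or ends with an underscore")
    if "__" in port_name:
        raise ValueError(f"{port_name!r} contains consecutive underscores")


def is_valid(port_name: str) -> bool:
    """Hypothetical convenience wrapper returning a boolean."""
    try:
        validate_port_name(port_name)
    except ValueError:
        return False
    return True


results = {name: is_valid(name) for name in ("structure", "_private", "a__b", "1abc", "k_points")}
```

The double-underscore rule matches the documented value of PORT_NAMESPACE_SEPARATOR, which is '__'.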

serialize(mapping: Optional[Dict[str, Any]], breadcrumbs: Sequence[str] = ()) Optional[Dict[str, Any]]#

Serialize the given mapping onto this PortNamespace.

It will recursively call this function on any nested PortNamespace or the serialize function on any Ports.

Parameters:
  • mapping – a mapping of values to be serialized

  • breadcrumbs – a tuple with the namespaces of parent namespaces

Returns:

the serialized mapping

class aiida.engine.Process(inputs: Optional[Dict[str, Any]] = None, logger: Optional[logging.Logger] = None, runner: Optional[aiida.engine.runners.Runner] = None, parent_pid: Optional[int] = None, enable_persistence: bool = True)#

Bases: plumpy.processes.Process

This class represents an AiiDA process which can be executed and will have full provenance saved in the database.

Initialization

Process constructor.

Parameters:
  • inputs – process inputs

  • logger – aiida logger

  • runner – process runner

  • parent_pid – id of parent process

  • enable_persistence – whether to persist this process

_node_class = None#
_spec_class = None#
SINGLE_OUTPUT_LINKNAME: str = 'result'#
class SaveKeys#

Bases: enum.Enum

Keys used to identify things in the saved instance state bundle.

CALC_ID: str = 'calc_id'#
classmethod spec() aiida.engine.processes.process_spec.ProcessSpec#
classmethod define(spec: aiida.engine.processes.process_spec.ProcessSpec) None#

Define the specification of the process, including its inputs, outputs and known exit codes.

A metadata input namespace is defined, with optional ports that are not stored in the database.

classmethod get_builder() aiida.engine.processes.builder.ProcessBuilder#
classmethod get_or_create_db_record() aiida.orm.ProcessNode#

Create a process node that represents what happened in this process.

Returns:

A process node

init() None#
classmethod get_exit_statuses(exit_code_labels: Iterable[str]) List[int]#

Return the exit statuses (integers) for the given exit code labels.

Parameters:

exit_code_labels – a list of strings that reference exit code labels of this process class

Returns:

list of exit status integers that correspond to the given exit code labels

Raises:

AttributeError – if at least one of the labels does not correspond to an existing exit code

exit_codes() aiida.engine.processes.exit_code.ExitCodesNamespace#

Return the namespace of exit codes defined for this process through its ProcessSpec.

The namespace supports getitem and getattr operations with an ExitCode label to retrieve a specific code. Additionally, the namespace can be called with either the integer exit status or the label of an exit code to retrieve it.

Returns:

ExitCodesNamespace of ExitCode named tuples

spec_metadata() aiida.engine.processes.ports.PortNamespace#

Return the metadata port namespace of the process specification of this process.

property node: aiida.orm.ProcessNode#

Return the ProcessNode used by this process to represent itself in the database.

Returns:

instance of sub class of ProcessNode

property uuid: str#

Return the UUID of the process which corresponds to the UUID of its associated ProcessNode.

Returns:

the UUID associated to this process instance

property metadata: aiida.common.extendeddicts.AttributeDict#

Return the metadata that were specified when this process instance was launched.

Returns:

metadata dictionary

_save_checkpoint() None#

Save the current state in a checkpoint if persistence is enabled and the process state is not terminal.

If the persistence call excepts with a PersistenceError, it will be caught and a warning will be logged.

save_instance_state(out_state: MutableMapping[str, Any], save_context: Optional[plumpy.persistence.LoadSaveContext]) None#

Save instance state.

See documentation of plumpy.processes.Process.save_instance_state().

get_provenance_inputs_iterator() Iterator[Tuple[str, Union[aiida.engine.processes.ports.InputPort, aiida.engine.processes.ports.PortNamespace]]]#

Get provenance input iterator.

Return type:

filter

load_instance_state(saved_state: MutableMapping[str, Any], load_context: plumpy.persistence.LoadSaveContext) None#

Load instance state.

Parameters:
  • saved_state – saved instance state

  • load_context –

kill(msg: Union[str, None] = None) Union[bool, plumpy.futures.Future]#

Kill the process and all the children calculations it called

Parameters:

msg – message

out(output_port: str, value: Any = None) None#

Attach output to output port.

The name of the port will be used as the link label.

Parameters:
  • output_port – name of output port

  • value – value to put inside output port

out_many(out_dict: Dict[str, Any]) None#

Attach outputs to multiple output ports.

Keys of the dictionary will be used as output port names, values as outputs.

Parameters:

out_dict (dict) – output dictionary

on_create() None#

Called when a Process is created.

on_entered(from_state: Optional[plumpy.process_states.State]) None#

After entering a new state, save a checkpoint and update the latest process state change timestamp.

on_terminated() None#

Called when a Process enters a terminal state.

on_except(exc_info: Tuple[Any, Exception, types.TracebackType]) None#

Log the exception by calling the report method with formatted stack trace from exception info object and store the exception string as a node attribute

Parameters:

exc_info – the sys.exc_info() object (type, value, traceback)

on_finish(result: Union[int, aiida.engine.processes.exit_code.ExitCode], successful: bool) None#

Set the finish status on the process node.

Parameters:
  • result – result of the process

  • successful – whether execution was successful

on_paused(msg: Optional[str] = None) None#

The Process was paused so set the paused attribute on the process node

Parameters:

msg – message

on_playing() None#

The Process was unpaused so remove the paused attribute on the process node

on_output_emitting(output_port: str, value: Any) None#

The process has emitted a value on the given output port.

Parameters:
  • output_port – The output port name the value was emitted on

  • value – The value emitted

set_status(status: Optional[str]) None#

The status of the Process is about to be changed, so we reflect this in the node’s attribute proxy.

Parameters:

status – the status message

submit(process: Type[aiida.engine.processes.process.Process], **kwargs) aiida.orm.ProcessNode#

Submit process for execution.

Parameters:

process – process

Returns:

the calculation node of the process

property runner: aiida.engine.runners.Runner#

Get process runner.

get_parent_calc() Optional[aiida.orm.ProcessNode]#

Get the parent process node

Returns:

the parent process node if there is one

classmethod build_process_type() str#

The process type.

Returns:

string of the process type

Note: This could be made into a property ‘process_type’ but in order to have it be a property of the class it would need to be defined in the metaclass, see https://bugs.python.org/issue20659

report(msg: str, *args, **kwargs) None#

Log a message to the logger, which should get saved to the database through the attached DbLogHandler.

The pk, class name and function name of the caller are prepended to the given message

Parameters:
  • msg – message to log

  • args – args to pass to the log call

  • kwargs – kwargs to pass to the log call

_create_and_setup_db_record() Union[int, uuid.UUID]#

Create and setup the database record for this process

Returns:

the uuid or pk of the process

encode_input_args(inputs: Dict[str, Any]) str#

Encode input arguments such that they may be saved in a Bundle

Parameters:

inputs – A mapping of the inputs as passed to the process

Returns:

The encoded (serialized) inputs

decode_input_args(encoded: str) Dict[str, Any]#

Decode saved input arguments as they came from the saved instance state Bundle

Parameters:

encoded – encoded (serialized) inputs

Returns:

The decoded input args

update_outputs() None#

Attach new outputs to the node since the last call.

Does nothing, if self.metadata.store_provenance is False.

_build_process_label() str#

Construct the process label that should be set on ProcessNode instances for this process class.

Note

By default this returns the name of the process class itself. It can be overridden by Process subclasses to provide a more specific label.

Returns:

The process label to use for ProcessNode instances.

_setup_db_record() None#

Create the database record for this process and the links with respect to its inputs

This function will set various attributes on the node that serve as a proxy for attributes of the Process. This is essential as otherwise this information could only be introspected through the Process itself, which is only available to the interpreter that has it in memory. To make this data introspectable from any interpreter, for example for the command line interface, certain Process attributes are proxied through the calculation node.

In addition, the parent calculation will be setup with a CALL link if applicable and all inputs will be linked up as well.

_setup_version_info() None#

Store relevant plugin version information.

_setup_metadata(metadata: dict) None#

Store the metadata on the ProcessNode.

_setup_inputs() None#

Create the links between the input nodes and the ProcessNode that represents this process.

_flat_inputs() Dict[str, Any]#

Return a flattened version of the parsed inputs dictionary.

The eventual keys will be a concatenation of the nested keys. Note that the metadata dictionary, if present, is not passed, as those are dealt with separately in _setup_metadata.

Returns:

flat dictionary of parsed inputs

_flat_outputs() Dict[str, Any]#

Return a flattened version of the registered outputs dictionary.

The eventual keys will be a concatenation of the nested keys.

Returns:

flat dictionary of parsed outputs

_flatten_inputs(port: Union[None, aiida.engine.processes.ports.InputPort, aiida.engine.processes.ports.PortNamespace], port_value: Any, parent_name: str = '', separator: str = PORT_NAMESPACE_SEPARATOR) List[Tuple[str, Any]]#

Function that will recursively flatten the inputs dictionary, omitting inputs for ports that are marked as being non database storable

Parameters:
  • port – port against which to map the port value, can be InputPort or PortNamespace

  • port_value – value for the current port, can be a Mapping

  • parent_name – the parent key with which to prefix the keys

  • separator – character to use for the concatenation of keys

Returns:

flat list of inputs
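
The recursive flattening with a key separator can be sketched on plain nested dictionaries. This simplified `flatten` helper is hypothetical: it ignores ports entirely and therefore does not reproduce the omission of non database storable inputs. It only shows how nested keys are joined with the documented '__' separator.

```python
from collections.abc import Mapping
from typing import Any, List, Tuple

SEPARATOR = "__"  # same value as the documented PORT_NAMESPACE_SEPARATOR


def flatten(value: Any, parent_name: str = "", separator: str = SEPARATOR) -> List[Tuple[str, Any]]:
    """Return a flat list of (joined key, leaf value) pairs."""
    if isinstance(value, Mapping):
        items: List[Tuple[str, Any]] = []
        for key, sub_value in value.items():
            # Prefix the key with its parent namespace, if there is one.
            prefixed = f"{parent_name}{separator}{key}" if parent_name else key
            items.extend(flatten(sub_value, prefixed, separator))
        return items
    return [(parent_name, value)]


nested = {
    "structure": "Si",
    "parameters": {"cutoff": 30, "kpoints": {"mesh": [4, 4, 4]}},
}
flat = flatten(nested)
```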

_flatten_outputs(port: Union[None, aiida.engine.processes.ports.OutputPort, aiida.engine.processes.ports.PortNamespace], port_value: Any, parent_name: str = '', separator: str = PORT_NAMESPACE_SEPARATOR) List[Tuple[str, Any]]#

Function that will recursively flatten the outputs dictionary.

Parameters:
  • port – port against which to map the port value, can be OutputPort or PortNamespace

  • port_value – value for the current port, can be a Mapping

  • parent_name – the parent key with which to prefix the keys

  • separator – character to use for the concatenation of keys

Returns:

flat list of outputs

exposed_inputs(process_class: Type[aiida.engine.processes.process.Process], namespace: Optional[str] = None, agglomerate: bool = True) aiida.common.extendeddicts.AttributeDict#

Gather a dictionary of the inputs that were exposed for a given Process class under an optional namespace.

Parameters:
  • process_class – Process class whose inputs to try and retrieve

  • namespace – PortNamespace in which to look for the inputs

  • agglomerate – If set to true, all parent namespaces of the given namespace will also be searched for inputs. Inputs in lower-lying namespaces take precedence.

Returns:

exposed inputs

exposed_outputs(node: aiida.orm.ProcessNode, process_class: Type[aiida.engine.processes.process.Process], namespace: Optional[str] = None, agglomerate: bool = True) aiida.common.extendeddicts.AttributeDict#

Return the outputs which were exposed from the process_class and emitted by the specific node

Parameters:
  • node – process node whose outputs to try and retrieve

  • namespace – Namespace in which to search for exposed outputs.

  • agglomerate – If set to true, all parent namespaces of the given namespace will also be searched for outputs. Outputs in lower-lying namespaces take precedence.

Returns:

exposed outputs

static _get_namespace_list(namespace: Optional[str] = None, agglomerate: bool = True) List[Optional[str]]#

Get the list of namespaces in a given namespace.

Parameters:
  • namespace – name space

  • agglomerate – If set to true, all parent namespaces of the given namespace will also be searched.

Returns:

namespace list
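
One plausible reading of the agglomerate flag is sketched below. The function name `namespace_list`, the use of '.' to separate nested namespaces and the use of None to stand for the top-level namespace are assumptions of this sketch, not confirmed by the section above.

```python
from typing import List, Optional


def namespace_list(
    namespace: Optional[str] = None,
    agglomerate: bool = True,
    separator: str = ".",
) -> List[Optional[str]]:
    """Return the namespaces to search, from outermost to innermost."""
    if not agglomerate:
        return [namespace]
    result: List[Optional[str]] = [None]  # None stands for the top level here
    if namespace is not None:
        parts = namespace.split(separator)
        # Add every parent namespace, ending with the namespace itself.
        result.extend(separator.join(parts[:index]) for index in range(1, len(parts) + 1))
    return result


searched = namespace_list("a.b.c")
only_given = namespace_list("a.b.c", agglomerate=False)
```

Ordering the list from outermost to innermost would let values found in lower-lying namespaces overwrite those from their parents, which is consistent with the precedence described for exposed_inputs and exposed_outputs.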

classmethod is_valid_cache(node: aiida.orm.ProcessNode) bool#

Check if the given node can be cached from.

Overriding this method allows Process sub-classes to modify when corresponding process nodes are considered as a cache.

Warning

When overriding this method, make sure to return False at least in all cases when super()._node.base.caching.is_valid_cache(node) returns False. Otherwise, the invalidates_cache keyword on exit codes may have no effect.

class aiida.engine.ProcessBuilder(process_class: Type[aiida.engine.processes.process.Process])#

Bases: aiida.engine.processes.builder.ProcessBuilderNamespace

A process builder that helps setting up the inputs for creating a new process.

Initialization

Construct a ProcessBuilder instance for the given Process class.

Parameters:

process_class – the Process subclass

property process_class: Type[aiida.engine.processes.process.Process]#

Return the process class for which this builder is constructed.

_repr_pretty_(p, _) str#

Pretty representation for use in the IPython console and notebooks.

class aiida.engine.ProcessBuilderNamespace(port_namespace: aiida.engine.processes.ports.PortNamespace)#

Bases: collections.abc.MutableMapping

Input namespace for the ProcessBuilder.

Dynamically generates the getters and setters for the input ports of a given PortNamespace

Initialization

Dynamically construct the get and set properties for the ports of the given port namespace.

For each port in the given port namespace a get and set property will be constructed dynamically and added to the ProcessBuilderNamespace. The docstring for these properties will be defined by calling str() on the Port, which should return the description of the Port.

Parameters:

port_namespace – the inputs PortNamespace for which to construct the builder

__setattr__(attr: str, value: Any) None#

Assign the given value to the port with key attr.

Note

Any attributes without a leading underscore being set correspond to inputs and should hence be validated with respect to the corresponding input port from the process spec

__repr__()#
__dir__()#
__iter__()#
__len__()#
__getitem__(item)#
__setitem__(item, value)#
__delitem__(item)#
__delattr__(item)#
_recursive_merge(dictionary, key, value)#

Recursively merge the contents of dictionary setting its key to value.

_merge(*args, **kwds)#

Merge the content of a dictionary or keyword arguments into the namespace.

Note

This method differs in behavior from _update in that _merge will recursively update the existing dictionary with the one that is specified in the arguments. The _update method will merge only the keys on the top level, but any lower lying nested namespace will be replaced entirely.

The method is prefixed with an underscore in order to not reserve the name for a potential port.

Parameters:
  • args – a single mapping that should be mapped on the namespace.

  • kwds – keyword value pairs that should be mapped onto the ports.
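
The difference between a recursive merge and a top-level update can be seen on plain dictionaries. The helper `recursive_merge` and the example data are hypothetical; the sketch illustrates the behaviour described in the note above and does not use the builder itself.

```python
import copy
from collections.abc import Mapping
from typing import Any, Dict


def recursive_merge(target: Dict[str, Any], source: Mapping) -> None:
    """Merge ``source`` into ``target`` in place, descending into nested mappings."""
    for key, value in source.items():
        if isinstance(value, Mapping) and isinstance(target.get(key), dict):
            recursive_merge(target[key], value)
        else:
            target[key] = value


defaults = {"metadata": {"options": {"resources": 1, "queue": "debug"}}}
override = {"metadata": {"options": {"queue": "long"}}}

# Recursive merge: only the overridden leaf changes.
merged = copy.deepcopy(defaults)
recursive_merge(merged, override)

# Top-level update: the whole nested namespace is replaced.
updated = copy.deepcopy(defaults)
updated.update(override)
```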

_update(*args, **kwds)#

Update the values of the builder namespace passing a mapping as argument or individual keyword value pairs.

The method functions just as collections.abc.MutableMapping.update and is merely prefixed with an underscore in order to not reserve the name for a potential port.

Parameters:
  • args – a single mapping that should be mapped on the namespace.

  • kwds – keyword value pairs that should be mapped onto the ports.

_inputs(prune: bool = False) dict#

Return the entire mapping of inputs specified for this builder.

Parameters:

prune – boolean, when True, will prune nested namespaces that contain no actual values whatsoever

Returns:

mapping of input ports and their input values.

_prune(value)#

Prune a nested mapping from all mappings that are completely empty.

Note

A nested mapping that is completely empty means it contains at most other empty mappings. Other null values, such as None or empty lists, should not be pruned.

Parameters:

value – a nested mapping of port values

Returns:

the same mapping but without any nested namespace that is completely empty.
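
The pruning rule can be sketched on plain dictionaries: mappings that contain nothing but other empty mappings are dropped, while None and empty lists are kept. The standalone `prune` function is a hypothetical illustration and not the builder method.

```python
from collections.abc import Mapping
from typing import Any


def prune(value: Any) -> Any:
    """Return a copy of ``value`` without completely empty nested mappings."""
    if not isinstance(value, Mapping):
        return value
    result = {}
    for key, sub_value in value.items():
        pruned = prune(sub_value)
        # Drop only mappings that ended up empty; keep None, [] and so on.
        if isinstance(pruned, Mapping) and not pruned:
            continue
        result[key] = pruned
    return result


inputs = {
    "code": None,
    "settings": [],
    "metadata": {"options": {}},
    "parameters": {"cutoff": 30, "extra": {}},
}
pruned_inputs = prune(inputs)
```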

class aiida.engine.ProcessFuture(pk: int, loop: Optional[asyncio.AbstractEventLoop] = None, poll_interval: Union[None, int, float] = None, communicator: Optional[kiwipy.Communicator] = None)#

Bases: asyncio.Future

Future that waits for a process to complete using both polling and listening for broadcast events if possible.

Initialization

Construct a future for a process node being finished.

If a None poll_interval is supplied polling will not be used. If a communicator is supplied it will be used to listen for broadcast messages.

Parameters:
  • pk – process pk

  • loop – An event loop

  • poll_interval – optional polling interval, if None, polling is not activated.

  • communicator – optional communicator, if None, will not subscribe to broadcasts.

_filtered = None#
cleanup() None#

Clean up the future by removing broadcast subscribers from the communicator if it still exists.

async _poll_process(node: aiida.orm.Node, poll_interval: Union[int, float]) None#

Poll whether the process node has reached a terminal state.

class aiida.engine.ProcessHandlerReport#

Bases: typing.NamedTuple

A namedtuple to define a process handler report for an aiida.engine.BaseRestartWorkChain.

This namedtuple should be returned by a process handler of a work chain instance if the condition of the handler was met by the completed process. If no further handling should be performed after this method the do_break field should be set to True. If the handler encountered a fatal error and the work chain needs to be terminated, an ExitCode with non-zero exit status can be set. This exit code is what will be set on the work chain itself. This works because the value of the exit_code field returned by the handler, will in turn be returned by the inspect_process step and returning a non-zero exit code from any work chain step will instruct the engine to abort the work chain.

Parameters:
  • do_break – boolean, set to True if no further process handlers should be called, default is False

  • exit_code – an instance of the ExitCode tuple. If not explicitly set, the default ExitCode will be instantiated, which has status 0 meaning that the work chain step will be considered successful and the work chain will continue to the next step.

do_break: bool = False#
exit_code: aiida.engine.processes.exit_code.ExitCode = 'ExitCode(...)'#
class aiida.engine.ProcessSpec#

Bases: plumpy.process_spec.ProcessSpec

Default process spec for process classes defined in aiida-core.

This sub class defines custom classes for input ports and port namespaces. It also adds support for the definition of exit codes and retrieving them subsequently.

Initialization

METADATA_KEY: str = 'metadata'#
METADATA_OPTIONS_KEY: str = 'options'#
INPUT_PORT_TYPE = None#
PORT_NAMESPACE_TYPE = None#
property metadata_key: str#
property options_key: str#
property exit_codes: aiida.engine.processes.exit_code.ExitCodesNamespace#

Return the namespace of exit codes defined for this ProcessSpec

Returns:

ExitCodesNamespace of ExitCode named tuples

exit_code(status: int, label: str, message: str, invalidates_cache: bool = False) None#

Add an exit code to the ProcessSpec

Parameters:
  • status – the exit status integer

  • label – a label by which the exit code can be addressed

  • message – a more detailed description of the exit code

  • invalidates_cache – when set to True, a process exiting with this exit code will not be considered for caching

property ports: aiida.engine.processes.ports.PortNamespace#
property inputs: aiida.engine.processes.ports.PortNamespace#
property outputs: aiida.engine.processes.ports.PortNamespace#
class aiida.engine.Runner(poll_interval: Union[int, float] = 0, loop: Optional[asyncio.AbstractEventLoop] = None, communicator: Optional[kiwipy.Communicator] = None, rmq_submit: bool = False, persister: Optional[plumpy.persistence.Persister] = None)#

Class that can launch processes by running in the current interpreter or by submitting them to the daemon.

Initialization

Construct a new runner.

Parameters:
  • poll_interval – interval in seconds between polling for status of active sub processes

  • loop – an asyncio event loop; if none is supplied, a new one will be created

  • communicator – the communicator to use

  • rmq_submit – if True, processes will be submitted to RabbitMQ, otherwise they will be scheduled here

  • persister – the persister to use to persist processes

_persister: Optional[plumpy.persistence.Persister] = None#
_communicator: Optional[kiwipy.Communicator] = None#
_controller: Optional[plumpy.process_comms.RemoteProcessThreadController] = None#
_closed: bool = False#
__enter__() aiida.engine.runners.Runner#
__exit__(exc_type, exc_val, exc_tb)#
property loop: asyncio.AbstractEventLoop#

Get the event loop of this runner.

property transport: aiida.engine.transports.TransportQueue#
property persister: Optional[plumpy.persistence.Persister]#

Get the persister used by this runner.

property communicator: Optional[kiwipy.Communicator]#

Get the communicator used by this runner.

property plugin_version_provider: aiida.plugins.utils.PluginVersionProvider#
property job_manager: aiida.engine.processes.calcjobs.manager.JobManager#
property controller: Optional[plumpy.process_comms.RemoteProcessThreadController]#

Get the controller used by this runner.

property is_daemon_runner: bool#

Return whether the runner is a daemon runner, which means it submits processes over RabbitMQ.

Returns:

True if the runner is a daemon runner

is_closed() bool#
start() None#

Start the internal event loop.

stop() None#

Stop the internal event loop.

run_until_complete(future: asyncio.Future) Any#

Run the loop until the future has finished and return the result.

close() None#

Close the runner by stopping the loop.

instantiate_process(process: aiida.engine.runners.TYPE_RUN_PROCESS, **inputs)#
submit(process: aiida.engine.runners.TYPE_SUBMIT_PROCESS, **inputs: Any)#

Submit the process with the supplied inputs to this runner, immediately returning control to the interpreter. The return value will be the calculation node of the submitted process.

Parameters:
  • process – the process class to submit

  • inputs – the inputs to be passed to the process

Returns:

the calculation node of the process

schedule(process: aiida.engine.runners.TYPE_SUBMIT_PROCESS, *args: Any, **inputs: Any) aiida.orm.ProcessNode#

Schedule a process to be executed by this runner

Parameters:
  • process – the process class to submit

  • inputs – the inputs to be passed to the process

Returns:

the calculation node of the process

_run(process: aiida.engine.runners.TYPE_RUN_PROCESS, *args: Any, **inputs: Any) Tuple[Dict[str, Any], aiida.orm.ProcessNode]#

Run the process with the supplied inputs in this runner that will block until the process is completed. The return value will be the results of the completed process

Parameters:
  • process – the process class or process function to run

  • inputs – the inputs to be passed to the process

Returns:

tuple of the outputs of the process and the calculation node

run(process: aiida.engine.runners.TYPE_RUN_PROCESS, *args: Any, **inputs: Any) Dict[str, Any]#

Run the process with the supplied inputs in this runner that will block until the process is completed. The return value will be the results of the completed process

Parameters:
  • process – the process class or process function to run

  • inputs – the inputs to be passed to the process

Returns:

the outputs of the process

run_get_node(process: aiida.engine.runners.TYPE_RUN_PROCESS, *args: Any, **inputs: Any) aiida.engine.runners.ResultAndNode#

Run the process with the supplied inputs in this runner that will block until the process is completed. The return value will be a tuple of the results of the completed process and the process node.

Parameters:
  • process – the process class or process function to run

  • inputs – the inputs to be passed to the process

Returns:

tuple of the outputs of the process and the calculation node

run_get_pk(process: aiida.engine.runners.TYPE_RUN_PROCESS, *args: Any, **inputs: Any) aiida.engine.runners.ResultAndPk#

Run the process with the supplied inputs in this runner that will block until the process is completed. The return value will be a tuple of the results of the completed process and the pk of the process node.

Parameters:
  • process – the process class or process function to run

  • inputs – the inputs to be passed to the process

Returns:

tuple of the outputs of the process and process node pk

call_on_process_finish(pk: int, callback: Callable[[], Any]) None#

Schedule a callback when the process of the given pk is terminated.

This method will add a broadcast subscriber that listens for the state change that marks the target process as terminated. As a fail-safe, a polling mechanism also checks the state of the process, so that the caller does not wait indefinitely should the subscriber miss the broadcast message.

Parameters:
  • pk – pk of the process

  • callback – function to be called upon process termination
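The combination of a broadcast subscriber with a polling fail-safe can be sketched with plain `asyncio`. This is not the Runner's code: `call_when_terminated`, `is_terminated` and the returned notify function are hypothetical names, and the "broadcast" is simply a function that some other component may or may not call.

```python
import asyncio
from typing import Callable, List


def call_when_terminated(
    loop: asyncio.AbstractEventLoop,
    is_terminated: Callable[[], bool],
    callback: Callable[[], None],
    poll_interval: float,
) -> Callable[[], None]:
    """Start a polling fallback and return a notify function (the broadcast path).

    Hypothetical helper: the callback fires once, through whichever path comes first.
    """
    fired = False

    def fire() -> None:
        nonlocal fired
        if not fired:
            fired = True
            callback()

    def poll() -> None:
        if fired:
            return
        if is_terminated():
            fire()
        else:
            # Not terminated yet: check again after another interval.
            loop.call_later(poll_interval, poll)

    loop.call_later(poll_interval, poll)
    return fire


async def demo() -> List[str]:
    calls: List[str] = []
    state = {'terminated': False}
    loop = asyncio.get_running_loop()
    # The returned notify function is never called, simulating a missed broadcast.
    call_when_terminated(loop, lambda: state['terminated'], lambda: calls.append('done'), 0.01)
    state['terminated'] = True
    await asyncio.sleep(0.05)
    return calls


calls = asyncio.run(demo())
```

Because the notify path is never triggered in `demo`, the callback runs only thanks to the polling path, which is the situation the fail-safe is meant to cover.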

get_process_future(pk: int) aiida.engine.processes.futures.ProcessFuture#

Return a future for a process.

The future will have the process node as the result when finished.

Returns:

A future representing the completion of the process node

_poll_process(node, callback)#

Check whether the process state of the node is terminated and call the callback or reschedule it.

Parameters:
  • node – the process node

  • callback – callback to be called when process is terminated

aiida.engine.ToContext = None#
class aiida.engine.WithNonDb(*args, **kwargs)#

A mixin that adds support to a port to flag that it should not be stored in the database using the non_db=True flag.

The mixins have to go before the main port class in the superclass order to make sure the mixin has the chance to strip out the non_db keyword.

Initialization

property non_db_explicitly_set: bool#

Return whether a value for non_db was explicitly passed in the construction of the Port.

Returns:

boolean, True if non_db was explicitly defined during construction, False otherwise

property non_db: bool#

Return whether the value of this Port is flagged as non_db, meaning that it should not be stored as a Node in the database.

Returns:

boolean, True if the value should not be stored as a Node, False otherwise
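Why the mixin has to come before the main port class in the list of bases can be shown with a self-contained sketch. The classes below are hypothetical stand-ins, not the AiiDA or plumpy classes; the only assumption is that the main port class does not accept a `non_db` keyword.

```python
class BasePort:
    """Hypothetical main port class that does not know about `non_db`."""

    def __init__(self, name: str, required: bool = True) -> None:
        self.name = name
        self.required = required


class NonDbMixin:
    """Hypothetical mixin that consumes `non_db` before the main class sees it."""

    def __init__(self, *args, **kwargs) -> None:
        self._non_db_explicitly_set = 'non_db' in kwargs
        self._non_db = kwargs.pop('non_db', False)
        super().__init__(*args, **kwargs)


class SketchInputPort(NonDbMixin, BasePort):
    """Mixin first in the bases, so its __init__ runs before BasePort.__init__."""


class WrongOrderPort(BasePort, NonDbMixin):
    """Main class first: BasePort.__init__ receives the unknown keyword."""


port = SketchInputPort('label', non_db=True)

try:
    WrongOrderPort('label', non_db=True)
    wrong_order_failed = False
except TypeError:
    wrong_order_failed = True
```

With the mixin first, `non_db` is removed from the keyword arguments before they reach `BasePort`; with the order reversed, construction fails with a `TypeError`.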

class aiida.engine.WithSerialize(*args, **kwargs)#

A mixin that adds support for a serialization function which is automatically applied on inputs that are not AiiDA data types.

Initialization

serialize(value: Any) aiida.orm.Data#

Serialize the given value, unless it is None, already a Data type, or no serializer function is defined.

Parameters:

value – the value to be serialized

Returns:

a serialized version of the value or the unchanged value
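The three exemptions listed above can be written out as a short sketch. `SketchData` and the free function `serialize` are hypothetical; in particular, the serializer is passed as an argument here purely for illustration.

```python
from typing import Any, Callable, Optional


class SketchData:
    """Hypothetical stand-in for a database-storable data type."""

    def __init__(self, value: Any) -> None:
        self.value = value


def serialize(value: Any, serializer: Optional[Callable[[Any], SketchData]]) -> Any:
    """Apply the serializer unless one of the three exemptions applies."""
    if value is None or isinstance(value, SketchData) or serializer is None:
        return value
    return serializer(value)


# A plain value is passed through the serializer.
wrapped = serialize(5, SketchData)

# A value that is already a data type is returned unchanged.
already = SketchData(7)
untouched = serialize(already, SketchData)
```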

class aiida.engine.WorkChain(inputs: dict | None = None, logger: logging.Logger | None = None, runner: aiida.engine.runners.Runner | None = None, enable_persistence: bool = True)#

Bases: aiida.engine.processes.process.Process

The WorkChain class is the principal component to implement workflows in AiiDA.

Initialization

Construct a WorkChain instance.

Construct the instance only if it is a sub class of WorkChain, otherwise raise InvalidOperation.

Parameters:
  • inputs – work chain inputs

  • logger – aiida logger

  • runner – work chain runner

  • enable_persistence – whether to persist this work chain

_node_class = None#
_spec_class = None#
_STEPPER_STATE = 'stepper_state'#
_CONTEXT = 'CONTEXT'#
classmethod spec() aiida.engine.processes.workchains.workchain.WorkChainSpec#
property node: aiida.orm.WorkChainNode#
property ctx: aiida.common.extendeddicts.AttributeDict#

Get the context.

save_instance_state(out_state, save_context)#

Save instance state.

Parameters:
  • out_state – state to save in

  • save_context (plumpy.persistence.LoadSaveContext) –

load_instance_state(saved_state, load_context)#
on_run()#
_resolve_nested_context(key: str) tuple[aiida.common.extendeddicts.AttributeDict, str]#

Returns a reference to a sub-dictionary of the context and the last key, after resolving a potentially segmented key where required sub-dictionaries are created as needed.

Parameters:

key – A key into the context, where words before a dot are interpreted as a key for a sub-dictionary
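A minimal sketch of this kind of dotted-key resolution, using plain dictionaries instead of the AttributeDict context. The function name `resolve_nested` and the example key are hypothetical.

```python
from typing import Any, Dict, Tuple


def resolve_nested(ctx: Dict[str, Any], key: str) -> Tuple[Dict[str, Any], str]:
    """Walk all but the last dot-separated segment, creating sub-dictionaries as needed."""
    *parents, last = key.split('.')
    current = ctx
    for name in parents:
        current = current.setdefault(name, {})
    return current, last


ctx: Dict[str, Any] = {}
sub_ctx, last_key = resolve_nested(ctx, 'workchains.relax.first')

# Writing through the returned reference updates the nested context in place.
sub_ctx[last_key] = 'node-1'
```

A key without dots resolves to the top-level dictionary itself, with the key returned unchanged.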

_insert_awaitable(awaitable: aiida.engine.processes.workchains.awaitable.Awaitable) None#

Insert an awaitable that should be terminated before continuing to the next step.

Parameters:

awaitable – the thing to await

_resolve_awaitable(awaitable: aiida.engine.processes.workchains.awaitable.Awaitable, value: Any) None#

Resolve an awaitable.

Precondition: must be an awaitable that was previously inserted.

Parameters:

awaitable – the awaitable to resolve

to_context(**kwargs: aiida.engine.processes.workchains.awaitable.Awaitable | aiida.orm.ProcessNode) None#

Add a dictionary of awaitables to the context.

This is a convenience method that provides syntactic sugar, for a user to add multiple intersteps that will assign a certain value to the corresponding key in the context of the work chain.

_update_process_status() None#

Set the process status with a message listing the sub processes that are currently being waited for.

run() Any#
_do_step() Any#

Execute the next step in the outline and return the result.

If the stepper returns a non-finished status and the return value is of type ToContext, the contents of the ToContext container will be turned into awaitables if necessary. If any awaitables were created, the process will enter the Wait state, otherwise it will go to Continue. When the stepper returns that it is done, the stepper result will be converted to None and returned, unless it is an integer or an instance of ExitCode.

_store_nodes(data: Any) None#

Recurse through a data structure and store any unstored nodes that are found along the way

Parameters:

data – a data structure potentially containing unstored nodes
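The recursion can be sketched as follows. `SketchNode` is a hypothetical stand-in for a node with a stored flag, and the set of container types that are traversed (mappings, lists and tuples) is an assumption of this sketch.

```python
from collections.abc import Mapping
from typing import Any


class SketchNode:
    """Hypothetical stand-in for a node that can be stored."""

    def __init__(self) -> None:
        self.is_stored = False

    def store(self) -> None:
        self.is_stored = True


def store_nodes(data: Any) -> None:
    """Store every unstored SketchNode found in nested mappings, lists and tuples."""
    if isinstance(data, SketchNode):
        if not data.is_stored:
            data.store()
    elif isinstance(data, Mapping):
        for value in data.values():
            store_nodes(value)
    elif isinstance(data, (list, tuple)):
        for value in data:
            store_nodes(value)


a, b = SketchNode(), SketchNode()
context = {'results': [a, {'nested': (b,)}], 'label': 'not a node'}
store_nodes(context)
```

Values that are neither nodes nor containers, such as the string under `label`, are left untouched.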

on_exiting() None#

Ensure that any unstored nodes in the context are stored, before the state is exited

After the state is exited the next state will be entered and if persistence is enabled, a checkpoint will be saved. If the context contains unstored nodes, the serialization necessary for checkpointing will fail.

on_wait(awaitables: Sequence[aiida.engine.processes.workchains.awaitable.Awaitable])#

Entering the WAITING state.

_action_awaitables() None#

Handle the awaitables that are currently registered with the work chain.

Depending on the class type of the awaitable’s target, a different callback function will be bound to the awaitable, and the runner will be asked to call it when the target is completed.

_on_awaitable_finished(awaitable: aiida.engine.processes.workchains.awaitable.Awaitable) None#

Callback function, for when an awaitable process instance is completed.

The awaitable will be effectuated on the context of the work chain and removed from the internal list. If all awaitables have been dealt with, the work chain process is resumed.

Parameters:

awaitable – an Awaitable instance

aiida.engine.append_(target: Union[aiida.engine.processes.workchains.awaitable.Awaitable, aiida.orm.ProcessNode]) aiida.engine.processes.workchains.awaitable.Awaitable#

Convenience function that will construct an Awaitable for a given class instance with the context action set to APPEND. When the awaitable target is completed, it will be appended to a list in the context for a key that is to be defined later.

Parameters:

target – an instance of a Process or Awaitable

Returns:

the awaitable

aiida.engine.assign_(target: Union[aiida.engine.processes.workchains.awaitable.Awaitable, aiida.orm.ProcessNode]) aiida.engine.processes.workchains.awaitable.Awaitable#

Convenience function that will construct an Awaitable for a given class instance with the context action set to ASSIGN. When the awaitable target is completed, it will be assigned to the context for a key that is to be defined later.

Parameters:

target – an instance of a Process or Awaitable

Returns:

the awaitable
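The difference between the APPEND and ASSIGN actions comes down to how the finished target ends up in the context. The sketch below illustrates only that difference, with a plain dictionary as the context and strings in place of nodes; `Action` and `apply_to_context` are hypothetical names.

```python
from enum import Enum
from typing import Any, Dict


class Action(Enum):
    """Hypothetical mirror of the two context actions."""
    ASSIGN = 'assign'
    APPEND = 'append'


def apply_to_context(ctx: Dict[str, Any], key: str, action: Action, value: Any) -> None:
    """Put a finished target's value into the context according to the action."""
    if action is Action.ASSIGN:
        ctx[key] = value
    else:
        # APPEND collects values in a list, creating it on first use.
        ctx.setdefault(key, []).append(value)


ctx: Dict[str, Any] = {}
apply_to_context(ctx, 'relax', Action.ASSIGN, 'node-10')
apply_to_context(ctx, 'scf_runs', Action.APPEND, 'node-11')
apply_to_context(ctx, 'scf_runs', Action.APPEND, 'node-12')
```

ASSIGN suits a single sub process whose result is needed under one name, while APPEND suits a loop that launches several sub processes of the same kind.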

aiida.engine.calcfunction(function: aiida.engine.processes.functions.FunctionType) aiida.engine.processes.functions.FunctionType#

A decorator to turn a standard Python function into a calcfunction. Example usage:

>>> from aiida.engine import calcfunction
>>> from aiida.orm import Int
>>>
>>> # Define the calcfunction
>>> @calcfunction
... def sum(a, b):
...     return a + b
>>> # Run it with some input
>>> r = sum(Int(4), Int(5))
>>> print(r)
9
>>> r.base.links.get_incoming().all() 
[Neighbor(link_type='', link_label='result',
node=<CalcFunctionNode: uuid: ce0c63b3-1c84-4bb8-ba64-7b70a36adf34 (pk: 3567)>)]
>>> r.base.links.get_incoming().get_node_by_label('result').base.links.get_incoming().all_nodes()
[4, 5]
Parameters:

function – The function to decorate.

Returns:

The decorated function.

aiida.engine.construct_awaitable(target: Union[aiida.engine.processes.workchains.awaitable.Awaitable, aiida.orm.ProcessNode]) aiida.engine.processes.workchains.awaitable.Awaitable#

Construct an instance of the Awaitable class that will contain the information related to the action to be taken with respect to the context once the awaitable object is completed.

The awaitable is a simple dictionary with the following keys

  • pk: the pk of the node that is being waited on

  • action: the context action to be performed upon completion

  • outputs: a boolean that toggles whether the node itself

Currently the only awaitable classes are ProcessNode and Workflow. The only awaitable actions are the Assign and Append operators.

aiida.engine.get_object_loader() aiida.engine.persistence.ObjectLoader#

Return the global AiiDA object loader.

Returns:

The global object loader

aiida.engine.interruptable_task(coro: Callable[[aiida.engine.utils.InterruptableFuture], Awaitable[Any]], loop: Optional[asyncio.AbstractEventLoop] = None) aiida.engine.utils.InterruptableFuture#

Turn the given coroutine into an interruptable task by turning it into an InterruptableFuture and returning it.

Parameters:
  • coro – the coroutine that should be made interruptable; it receives an InterruptableFuture object as its last parameter

  • loop – the event loop in which to run the coroutine, by default uses asyncio.get_event_loop()

Returns:

an InterruptableFuture
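The general idea of a future that can be failed from the outside can be sketched with plain `asyncio`. This is an illustration under stated assumptions, not the library's code: `SketchInterruptableFuture` and `sketch_interruptable_task` are hypothetical names, and the sketch always uses the running event loop rather than accepting a `loop` argument.

```python
import asyncio
from typing import Any, Awaitable, Callable


class SketchInterruptableFuture(asyncio.Future):
    """Hypothetical future that can be failed from outside via `interrupt`."""

    def interrupt(self, reason: Exception) -> None:
        if not self.done():
            self.set_exception(reason)


def sketch_interruptable_task(
    coro: Callable[[SketchInterruptableFuture], Awaitable[Any]],
) -> SketchInterruptableFuture:
    """Schedule `coro(future)` on the running loop and mirror its outcome onto the future."""
    future = SketchInterruptableFuture()

    async def execute() -> None:
        try:
            result = await coro(future)
        except Exception as exception:
            if not future.done():
                future.set_exception(exception)
        else:
            if not future.done():
                future.set_result(result)

    asyncio.get_running_loop().create_task(execute())
    return future


async def demo() -> str:
    async def slow_job(future: SketchInterruptableFuture) -> str:
        await asyncio.sleep(10)
        return 'finished'

    future = sketch_interruptable_task(slow_job)
    await asyncio.sleep(0)  # give the scheduled task a chance to start
    future.interrupt(RuntimeError('stop requested'))
    try:
        await future
    except RuntimeError as exception:
        return str(exception)
    return 'not interrupted'


outcome = asyncio.run(demo())
```

In this sketch the interrupt releases whoever is awaiting the future; the wrapped coroutine itself is not cancelled by the interrupt and would have to inspect the future it received in order to stop early.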

aiida.engine.is_process_function(function: Any) bool#

Return whether the given function is a process function

Parameters:

function – a function

Returns:

True if the function is a wrapped process function, False otherwise

aiida.engine.process_handler(wrapped: Optional[types.FunctionType] = None, *, priority: int = 0, exit_codes: Union[None, aiida.engine.processes.exit_code.ExitCode, List[aiida.engine.processes.exit_code.ExitCode]] = None, enabled: bool = True) types.FunctionType#

Decorator to register a BaseRestartWorkChain instance method as a process handler.

The decorator will validate the priority and exit_codes optional keyword arguments and then add itself as an attribute to the wrapped instance method. This is used in the inspect_process to return all instance methods of the class that have been decorated by this function and therefore are considered to be process handlers.

Requirements on the function signature of process handling functions. The function to which the decorator is applied needs to take two arguments:

  • self: This is the instance of the work chain itself

  • node: This is the process node that finished and is to be investigated

The function body typically consists of a conditional that will check for a particular problem that might have occurred for the sub process. If a particular problem is handled, the process handler should return an instance of the aiida.engine.ProcessHandlerReport tuple. If no other process handlers should be considered, the do_break attribute should be set to True. If the work chain is to be aborted entirely, the exit_code of the report can be set to an ExitCode instance with a non-zero status.

Parameters:
  • wrapped – the work chain method to register the process handler with

  • priority – optional integer that defines the order in which registered handlers will be called during the handling of a finished process. Higher priorities will be handled first. Default value is 0. Multiple handlers with the same priority are allowed, but their relative order is not well defined.

  • exit_codes – single or list of ExitCode instances. If defined, the handler will return None if the exit code set on the node does not appear in the exit_codes. This is useful to have a handler called only when the process failed with a specific exit code.

  • enabled – boolean, by default True, which will cause the handler to be called during inspect_process. When set to False, the handler will be skipped. This static value can be overridden on a per work chain instance basis through the input handler_overrides.
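The interplay of priority and do_break can be illustrated with a self-contained sketch. Everything here is hypothetical: `SketchReport`, `sketch_handler` and `inspect_node` only imitate the behaviour described above, the handlers are plain functions rather than work chain methods taking `self` and `node`, and the finished process is represented by a dictionary.

```python
from typing import Callable, List, NamedTuple, Optional


class SketchReport(NamedTuple):
    """Hypothetical stand-in for a handler report."""
    do_break: bool = False
    exit_status: int = 0


Handler = Callable[[dict], Optional[SketchReport]]


def sketch_handler(priority: int = 0) -> Callable[[Handler], Handler]:
    """Hypothetical decorator that tags a function with a priority attribute."""
    def decorator(function: Handler) -> Handler:
        function.priority = priority
        return function
    return decorator


@sketch_handler(priority=100)
def handle_out_of_memory(node: dict) -> Optional[SketchReport]:
    if node['failure'] == 'out_of_memory':
        node['handled_by'].append('memory')
        return SketchReport(do_break=True)
    return None


@sketch_handler(priority=10)
def handle_anything(node: dict) -> Optional[SketchReport]:
    node['handled_by'].append('generic')
    return SketchReport()


def inspect_node(node: dict, handlers: List[Handler]) -> Optional[SketchReport]:
    """Call handlers from highest to lowest priority, stopping at the first do_break."""
    last_report = None
    for handler in sorted(handlers, key=lambda h: h.priority, reverse=True):
        report = handler(node)
        if report is not None:
            last_report = report
            if report.do_break:
                break
    return last_report


node = {'failure': 'out_of_memory', 'handled_by': []}
report = inspect_node(node, [handle_anything, handle_out_of_memory])
```

For the out-of-memory case only the high-priority handler runs, because its report sets do_break; for any other failure it returns None and the generic handler is reached.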

aiida.engine.run(process: aiida.engine.launch.TYPE_RUN_PROCESS, *args: Any, **inputs: Any) Dict[str, Any]#

Run the process with the supplied inputs in a local runner that will block until the process is completed.

Parameters:
  • process – the process class or process function to run

  • inputs – the inputs to be passed to the process

Returns:

the outputs of the process

aiida.engine.run_get_node(process: aiida.engine.launch.TYPE_RUN_PROCESS, *args: Any, **inputs: Any) Tuple[Dict[str, Any], aiida.orm.ProcessNode]#

Run the process with the supplied inputs in a local runner that will block until the process is completed.

Parameters:
  • process – the process class, instance, builder or function to run

  • inputs – the inputs to be passed to the process

Returns:

tuple of the outputs of the process and the process node

aiida.engine.run_get_pk(process: aiida.engine.launch.TYPE_RUN_PROCESS, *args: Any, **inputs: Any) Tuple[Dict[str, Any], int]#

Run the process with the supplied inputs in a local runner that will block until the process is completed.

Parameters:
  • process – the process class, instance, builder or function to run

  • inputs – the inputs to be passed to the process

Returns:

tuple of the outputs of the process and process node pk

aiida.engine.submit(process: aiida.engine.launch.TYPE_SUBMIT_PROCESS, **inputs: Any) aiida.orm.ProcessNode#

Submit the process with the supplied inputs to the daemon, immediately returning control to the interpreter.

Parameters:
  • process – the process class, instance or builder to submit

  • inputs – the inputs to be passed to the process

Returns:

the calculation node of the process

aiida.engine.workfunction(function: aiida.engine.processes.functions.FunctionType) aiida.engine.processes.functions.FunctionType#

A decorator to turn a standard Python function into a workfunction. Example usage:

>>> from aiida.engine import workfunction
>>> from aiida.orm import Int
>>>
>>> # Define the workfunction
>>> @workfunction
... def select(a, b):
...     return a
>>> # Run it with some input
>>> r = select(Int(4), Int(5))
>>> print(r)
4
>>> r.base.links.get_incoming().all() 
[Neighbor(link_type='', link_label='result',
node=<WorkFunctionNode: uuid: ce0c63b3-1c84-4bb8-ba64-7b70a36adf34 (pk: 3567)>)]
>>> r.base.links.get_incoming().get_node_by_label('result').base.links.get_incoming().all_nodes()
[4, 5]
Parameters:

function – The function to decorate.

Returns:

The decorated function.