
Event system

Alan B. Christie edited this page Aug 13, 2025 · 6 revisions

The workflow engine is distributed via PyPI and imported into the Data Manager, which uses the engine's WorkflowEngine class: -

from workflow.workflow_engine import WorkflowEngine

The Data Manager is responsible for providing two objects: an API adapter, which the engine calls to access the Data Manager database, and an InstanceLauncher, which launches Step Jobs (Instances). The Data Manager passes these objects to the engine when instantiating it, shown here as the variables _DataManagerWorkflowAPIAdapter and _DataManagerInstanceLauncher: -

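_WorkflowEngine: WorkflowEngine = WorkflowEngine(
    # Gives the engine access to the Data Manager database
    wapi_adapter=_DataManagerWorkflowAPIAdapter,
    # Launches Step Jobs (Instances) on the engine's behalf
    instance_launcher=_DataManagerInstanceLauncher,
)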

Once initialised, the engine must respond to Events (messages) that the Data Manager delivers by calling the engine's handle_message() method. This method must not block, and must return with minimal delay.
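One common way to keep a handler like handle_message() non-blocking is to put each message on a queue and do the real work on a background thread. The sketch below assumes that design purely for illustration; the class NonBlockingEngine and its methods (_process(), wait_until_idle()) are hypothetical and are not part of the actual WorkflowEngine.

```python
import queue
import threading
from typing import Any, List


class NonBlockingEngine:
    """Hypothetical sketch: handle_message() only enqueues; a worker thread
    does the (potentially slow) processing in arrival order."""

    def __init__(self) -> None:
        self._queue: "queue.Queue[Any]" = queue.Queue()
        self.processed: List[Any] = []
        # Daemon thread so it never prevents interpreter shutdown.
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def handle_message(self, msg: Any) -> None:
        # Enqueue and return immediately, so the caller is never blocked.
        self._queue.put_nowait(msg)

    def _run(self) -> None:
        # Take one message at a time, process it, then mark it done.
        while True:
            msg = self._queue.get()
            try:
                self._process(msg)
            finally:
                self._queue.task_done()

    def _process(self, msg: Any) -> None:
        # Placeholder for real work (database lookups, launching Instances).
        self.processed.append(msg)

    def wait_until_idle(self) -> None:
        # Block until every queued message has been processed (for tests).
        self._queue.join()
```

A single worker and a FIFO queue preserve arrival order, which matters if, for example, a STOP message closely follows a START message for the same workflow.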

Events

Events sent to the workflow engine are Protocol Buffer (proto3) objects. For details of the Protocol Buffer framework, refer to the ProtoDev documentation. The protocol buffers used by the Data Manager (and Account Server) are defined in our squonk2-protobuf repository, which is distributed as a package on PyPI.

Only two protocol buffer message types are sent to the engine from the Data Manager: -

  • datamanager.WorkflowMessage
  • datamanager.PodMessage
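Because only two message types arrive, the engine can route each one to its own handler. The sketch below shows that routing; the dataclasses are hypothetical stand-ins for the generated protobuf classes, and dispatch() and the handler functions are illustrative names only.

```python
from dataclasses import dataclass
from typing import Union


# Hypothetical stand-ins for the generated protobuf classes
# datamanager.WorkflowMessage and datamanager.PodMessage.
@dataclass
class WorkflowMessage:
    action: str
    running_workflow: str


@dataclass
class PodMessage:
    instance: str
    has_exit_code: bool
    exit_code: int


Message = Union[WorkflowMessage, PodMessage]


def handle_workflow_message(msg: WorkflowMessage) -> str:
    # Illustrative handler: just describes what it received.
    return f"workflow {msg.action} {msg.running_workflow}"


def handle_pod_message(msg: PodMessage) -> str:
    # Illustrative handler: just describes what it received.
    return f"pod {msg.instance}"


def dispatch(msg: Message) -> str:
    """Route a message to the handler for its type."""
    if isinstance(msg, WorkflowMessage):
        return handle_workflow_message(msg)
    if isinstance(msg, PodMessage):
        return handle_pod_message(msg)
    raise TypeError(f"Unexpected message type: {type(msg).__name__}")
```

With the real generated classes the same isinstance checks apply; alternatively, a protobuf message's DESCRIPTOR.full_name gives its fully qualified type name (e.g. datamanager.WorkflowMessage) if string-based routing is preferred.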

WorkflowMessage

This message is sent by the Data Manager in response to an API call to either start or stop a Workflow. The Data Manager is responsible for creating the RunningWorkflow database record before sending the message. The content of a typical start message, which carries the UUID of the running workflow to start, looks like this:

action = "START"
running_workflow = "r-workflow-00000000-0000-0000-0000-000000000000"

A stop message has its action set to "STOP".
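The following is a minimal sketch of how a WorkflowMessage handler might act on the action field. The in-memory records dictionary, the RunningWorkflow stand-in, its status values, and the recorded "step-1" launch are all hypothetical; a real engine would read and update records through the Data Manager API adapter and launch steps through the InstanceLauncher.

```python
from dataclasses import dataclass, field


@dataclass
class WorkflowMessage:
    # Hypothetical stand-in for datamanager.WorkflowMessage
    action: str
    running_workflow: str


@dataclass
class RunningWorkflow:
    # Hypothetical stand-in for the RunningWorkflow database record
    id: str
    status: str = "PENDING"
    launched_steps: list = field(default_factory=list)


def handle_workflow_message(msg: WorkflowMessage, records: dict) -> None:
    # The Data Manager creates the record before sending the message,
    # so a missing record indicates an inconsistency.
    rwf = records.get(msg.running_workflow)
    if rwf is None:
        raise KeyError(f"Unknown running workflow: {msg.running_workflow}")
    if msg.action == "START":
        rwf.status = "RUNNING"
        # Hypothetical: "launch" the first step by recording its name.
        rwf.launched_steps.append("step-1")
    elif msg.action == "STOP":
        rwf.status = "STOPPING"
    else:
        raise ValueError(f"Unsupported action: {msg.action!r}")
```

Rejecting unknown actions explicitly, rather than ignoring them, makes a protocol mismatch between the Data Manager and the engine visible early.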

PodMessage

This message is sent by the Data Manager when an Instance Pod that belongs to a Workflow Step completes, whether successfully or not. The salient fields of the protocol buffer message are:

has_exit_code = (True if there is an exit code, ignore the message when False)
exit_code = (0 on success, any other value indicates failure)
instance = (UUID of the Data Manager Step/Job Instance)

Using the instance field (a UUID string identifying the Step's Instance), the engine can retrieve the Instance database object, which contains a reference to the RunningWorkflowStep record. From that record the engine can advance the workflow, either starting the next step or terminating the running workflow.
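The sketch below illustrates the PodMessage logic just described, assuming a simple linear workflow in which steps run one after another. The stand-in classes, the instances dictionary (standing in for the Instance lookup the engine would do through the API adapter), the step_count parameter, and the returned action strings are all hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class PodMessage:
    # Hypothetical stand-in for datamanager.PodMessage
    instance: str
    has_exit_code: bool
    exit_code: int = 0


@dataclass
class RunningWorkflowStep:
    # Hypothetical stand-in for the RunningWorkflowStep record
    step_index: int
    running_workflow: str


def handle_pod_message(
    msg: PodMessage,
    instances: dict,  # instance UUID -> RunningWorkflowStep (hypothetical)
    step_count: int,  # number of steps in this (linear) workflow
) -> Optional[str]:
    """Return a description of the action taken, or None if ignored."""
    if not msg.has_exit_code:
        # Messages without an exit code are ignored.
        return None
    rwfs = instances[msg.instance]
    if msg.exit_code != 0:
        # Any non-zero exit code means the step failed.
        return f"fail {rwfs.running_workflow}"
    next_index = rwfs.step_index + 1
    if next_index < step_count:
        return f"start step {next_index} of {rwfs.running_workflow}"
    # The last step succeeded, so the running workflow is complete.
    return f"finish {rwfs.running_workflow}"
```

Checking has_exit_code before anything else means incomplete pod messages never trigger a database lookup.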
