-
Notifications
You must be signed in to change notification settings - Fork 0
Event system
The workflow engine is distributed via PyPI and imported into the DM, which relies on importing the WorkflowEngine
class: -
from workflow.workflow_engine import WorkflowEngine
The DM is responsible for providing access to an API adapter the engine can call to access the DM database and an InstanceLauncher that will launch Step Jobs (Instances). The DM provides these objects when instantiating the engine, exposed here as the variables _DataManagerWorkflowAPIAdapter
, and _DataManagerInstanceLauncher
: -
_WorkflowEngine: WorkflowEngine = WorkflowEngine(
wapi_adapter=_DataManagerWorkflowAPIAdapter,
instance_launcher=_DataManagerInstanceLauncher,
)
Once initialised the engine is required to respond to Events (messages) provided by the DM as it calls the engine's handle_message()
. This method must be designed not to block, and is required to complete with minimal delay.
Events sent to the workflow engine are Protocol Buffer objects (proto 3). For details of the protocol buffer framework refer to the ProtoDev documentation. The protocol buffers used by the DM (and AS) can be found in our squonk2-protobuf repository, which is distributed as a package to PyPI.
Only two protocol buffer message types are sent to the engine from the DM: -
datamanager.WorkflowMessage
datamanager.PodMessage
This message is sent by the Data Manager in response to an API call to either start or stop a Workflow. The DM is responsible for creating the RunningWorkflow database record before sending the message. The content of a typical start message, carrying the UUID of the running workflow to start will look like this:
action = "START" running_workflow = "r-workflow-00000000-0000-0000-0000-000000000000"
A stop message's action will be "STOP"
This message is sent by the DM when an Instance Pod, one that is part of a Workflow Step, completes, successfully or otherwise. Salient parts of the protocol buffer message are:
has_exit_code = (True if there is an exit code, ignore the message when False) exit_code = (0 on success, any other value indicates failure) instance = (UUID of the Data Manager Step/Job Instance)
Using the instance
property (a UUID string identifying the Step's Instance) the engine can retrieve the database Instance object, which will contain a reference to the RunningWorkflowStep record. Using this, the engine should be able to advance the workflow, starting the next step or terminating the running workflow.
The following simplified message sequence diagram (discussed below) illustrates the engine's basic operation. For clarity some components are not shown. These include the RabbitMQ messaging service, Job operator, and Kubernetes event system. Don't worry if you do not understand the role of these services - they're not needed in order to describe the engine's basic operation.

A user uses the DM UI or its public REST API to "run" a workflow. After checking the legitimacy of the request the API creates a new RunningWorkflow
record and then initiates the workflow by dispatching a WorkflowMessage START
message containing the record's identity. The API then returns a CREATED
API response to the caller, giving the user the running workflow record ID.
The message is received by the DM Protocol Buffer Consumer (PBC) - a Pod that handles all messages and then passes the message to an instance of the WorkflowEngine class via its handle_message()
method.
All messages sent to the workflow engine come from the PBC.
The Workflow engine responds to this START
message by using the Workflow API object to obtain the running workflow and workflow records. The engine's role during a START
is to locate the first step, satisfy its variables and call the LaunchInstance's launch()
method.
The instance launcher is responsible for creating a RunningWorkflowStep
record, used to track the state of the Step about to run. Additionally Task
and Instance
records are created, and used by any service that needs to, to track the job instance that represents the Step.
The launcher finishes by creating a CultivateMessage that eventually results in the instance Pod being created from the context of the DM Celery Task Worker (CTW) Pod.
The CTW is used to run things in the background or run long-running operations.
After the step's instance has finished an internal Kubernetes event is picked-up by the DM Kubernetes Event Watcher (KEW) Pod and translated into a PodMessage, which is then processed by the PBC.
The PBC responds to this (which is a message known to represent a Step's conclusion) and, as before, passes it to the workflow engine's handle_msg()
function.
The engine responds to the PodMessage by loading the corresponding step, running workflow, and workflow records to determine the next course of action. This may be the launch of another step or the end of the workflow. In either case the engine updates the step and running workflow records so that the state of the running workflow can be inspected in real-time via the DM public API.