Commit d848525

Add Runner layer
1 parent 22f7ce5 commit d848525

File tree

2 files changed

+155
-2
lines changed

index.bs

Lines changed: 154 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -454,7 +454,11 @@ It then executes the resulting command to spawn the runner process.
454454
Each runner is expected to connect back to the orchestrator with the `connect` method, setting up a bidirectional stream of messages, dubbed the `normal stream`.
455455
When this document says that a message is sent without specifying how, the message is sent over this stream.
456456

457-
The orchestrator MUST track all active runners, specifically recording which ones have sent an `RPC.identify` message after startup. This message confirms that the runner is ready to accept processor assignments.
457+
The orchestrator MUST track all active runners, specifically recording which ones have sent an `RPC.identify` message after startup.
458+
This message confirms that the runner is ready to accept processor assignments.
459+
The orchestrator responds to the runner with an `RPC.pipeline` message, containing the full expanded pipeline as Turtle.
460+
This pipeline configuration is useful when a runner wants to extract processor arguments itself, instead of using the provided JSON-LD.
461+
458462

459463
Once all expected runners have successfully identified themselves, the orchestrator proceeds to the next step in the pipeline initialization sequence.
460464

@@ -593,10 +597,158 @@ path: ./streamMessage.mdd
593597

594598
## Runner
595599

600+
A runner in RDF-Connect is responsible for managing and executing processors within a specific execution context—typically a programming language runtime.
601+
Each runner MUST connect with the orchestrator's Protobuf server and follow the RDF-Connect protocol.
602+
603+
While minimal runners can be implemented with little overhead, they often become enriched with quality-of-life features to better support developers and processors operating in that language.
604+
605+
These quality-of-life features include:
606+
607+
* Wrapping readers and writers in idiomatic objects
608+
* Allowing a processor to complete its startup work before the runner acknowledges to the orchestrator that the processor is initialized. Runners SHOULD support this.
609+
610+
This is useful for processors that execute longer-running operations during startup, such as reading a file or consulting an external API.
611+
612+
Runners MAY also coalesce or transform message types to simplify processor implementation.
613+
A streaming message may be aggregated into a single message if the underlying platform supports arbitrarily large strings or buffers,
614+
and a single message may be exposed to the processor as a streaming interface, emitting a single chunk.
615+
This flexibility allows generic processors to be implemented more easily without needing to distinguish between streaming and single-message protocols.
616+
617+
<div class=example>
618+
Coalescing messages enables simpler processor logic.
619+
For example, a processor that performs string replacement on incoming messages may otherwise need to implement both streaming and non-streaming handling.
620+
621+
Instead, the runner can convert incoming messages to a streaming form with one chunk, or aggregate streaming chunks into a single message.
622+
623+
Such processors are advised to check the length of each message before forwarding it, and to decide on that basis whether to send it as a single message or as a streaming message.
624+
</div>
625+
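The conversions described above can be sketched as follows. This is a minimal illustration in Python and not part of the specification: the function names and the `STREAM_THRESHOLD` value are assumptions chosen for the example.

```python
from typing import Iterable, Iterator, Union

# Hypothetical cut-off (in bytes) above which a message is forwarded as a stream.
STREAM_THRESHOLD = 1024


def as_stream(message: bytes) -> Iterator[bytes]:
    """Expose a single message as a stream that emits exactly one chunk."""
    yield message


def as_single(chunks: Iterable[bytes]) -> bytes:
    """Aggregate all chunks of a streaming message into one message."""
    return b"".join(chunks)


def replace_all(message: bytes, old: bytes, new: bytes) -> bytes:
    """Processor logic written once, against the single-message form only."""
    return message.replace(old, new)


def choose_form(message: bytes) -> Union[bytes, Iterator[bytes]]:
    """Pick the outgoing form by looking at the message length."""
    if len(message) > STREAM_THRESHOLD:
        return as_stream(message)
    return message


# A streaming input is aggregated, processed, and forwarded as a single message.
incoming = [b"hello ", b"world"]
result = replace_all(as_single(incoming), b"world", b"runner")
outgoing = choose_form(result)
```

Aggregating before replacing also handles a match that spans a chunk boundary, which a naive per-chunk replacement would miss.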
626+
The following sections detail the runner startup flow and describe the expected interactions between runners and the orchestrator during initialization and execution.
627+
628+
629+
### Pipeline Configuration
630+
631+
A runner is defined as an instance of `rdfc:Runner`, which is currently instantiated by a command.
632+
The definition requires the command that starts the runner (`rdfc:command`) and a link to the programming context (`rdfc:handlesSubjectsOf`).
633+
The object of `rdfc:handlesSubjectsOf` links runners and processors to a context term, which often refers to the programming language.
634+
635+
Each context term is related to a SHACL shape that specifies the incoming data the runner can use to start the processors.
636+
637+
<div class=example>
638+
For example, the NodeRunner, a runner that starts JavaScript processors with node, is defined as follows.
639+
640+
```turtle
641+
rdfc:NodeRunner a rdfc:Runner;
642+
rdfc:handlesSubjectsOf rdfc:jsImplementationOf;
643+
rdfc:command "npx js-runner".
644+
645+
# Note that rdfc:jsImplementationOf is already defined by RDF-Connect as follows
646+
sds:implementationOf rdfs:subPropertyOf rdfs:subClassOf.
647+
rdfc:jsImplementationOf rdfs:subPropertyOf sds:implementationOf.
648+
649+
# Shape that a Js Processor should fulfil;
650+
[ ] a sh:NodeShape;
651+
# We target it with jsImplementationOf
652+
sh:targetSubjectsOf rdfc:jsImplementationOf;
653+
sh:property [
654+
sh:path rdfc:file;
655+
sh:name "file";
656+
sh:minCount 1;
657+
sh:maxCount 1;
658+
sh:datatype xsd:string;
659+
], [
660+
sh:path rdfc:class;
661+
sh:name "clazz";
662+
sh:maxCount 1;
663+
sh:datatype xsd:string;
664+
].
665+
```
666+
667+
This way, `rdfc:jsImplementationOf` is a predicate declared only for JavaScript processors.
668+
Because a shape is linked to that predicate, runners can expect a file location and a class name with which to start the JavaScript processors.
669+
</div>
670+
671+
672+
### Connecting Flow
673+
674+
Each runner is started by the orchestrator using a command defined in the pipeline via `rdfc:command`.
675+
The orchestrator appends two arguments to this command: the URL of the orchestrator’s Protobuf server and the IRI that uniquely identifies the runner.
676+
677+
Upon startup, the runner MUST connect to the orchestrator using the provided URL via the `RPC.connect` method.
678+
This establishes a bidirectional message stream referred to as the normal stream.
679+
680+
Once connected, the runner MUST send an `RPC.identify` message, identifying itself with the provided IRI.
681+
The orchestrator then sends an `RPC.pipeline` message containing the complete, expanded pipeline definition.
682+
The runner MAY ignore this message.
683+
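The order of these steps can be sketched as follows. This is a simplified illustration in Python: `FakeOrchestrator` is a hypothetical in-memory stand-in for the Protobuf client, the URL and IRI are made-up values, and the `RPC.pipeline` reply is modelled as a plain return value rather than a message on the normal stream.

```python
from typing import List, Tuple


def parse_runner_args(argv: List[str]) -> Tuple[str, str]:
    """Return (orchestrator URL, runner IRI), the two arguments appended last."""
    if len(argv) < 2:
        raise ValueError("expected the orchestrator URL and the runner IRI")
    return argv[-2], argv[-1]


class FakeOrchestrator:
    """In-memory stand-in for the orchestrator side of the normal stream."""

    def __init__(self) -> None:
        self.log: List[Tuple[str, str]] = []

    def connect(self, url: str) -> None:
        self.log.append(("connect", url))

    def identify(self, iri: str) -> str:
        self.log.append(("identify", iri))
        # The orchestrator answers with the expanded pipeline as Turtle.
        return "# expanded pipeline (Turtle) would be here"


def start_runner(argv: List[str], orchestrator: FakeOrchestrator) -> str:
    url, iri = parse_runner_args(argv)
    orchestrator.connect(url)              # RPC.connect: opens the normal stream
    pipeline = orchestrator.identify(iri)  # RPC.identify, answered by RPC.pipeline
    return pipeline                        # a runner MAY ignore this value


orchestrator = FakeOrchestrator()
start_runner(["localhost:50051", "http://example.org/runner"], orchestrator)
```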
684+
Runners MAY initiate a separate log stream using the `RPC.logStream` method to stream log messages back to the orchestrator.
685+
596686
<div class=issue>
597-
🚧 This section is a work in progress and will be expanded soon.
687+
🚧 More information about the log stream is coming soon.
598688
</div>
599689

690+
### Starting Processors
691+
692+
After initialization, the orchestrator may send multiple `RPC.proc` messages to instruct the runner to start specific processors.
693+
Each message includes a processor IRI, a configuration object and an argument object.
694+
Both the configuration and arguments are provided as JSON-LD strings.
695+
The configuration object contains the arguments defined by the context term, as described in the section [Mapping SHACL to Configuration Structures](#mapping-shacl-to-configuration-structures).
696+
The arguments are constructed based on a SHACL shape defined for the processor type.
697+
698+
Runners MAY parse these JSON-LD payloads and transform known constructs into idiomatic equivalents.
699+
For example, reader and writer objects are represented as JSON-LD values with an `@id` field (containing the channel IRI) and an `@type` field indicating either `https://w3id.org/rdf-connect/ontology#Reader` or `https://w3id.org/rdf-connect/ontology#Writer`.
700+
Runners are RECOMMENDED to replace these values with appropriate typed objects in the target environment.
701+
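Such a replacement could look like the sketch below. It is illustrative only: the `Reader` and `Writer` classes, the `hydrate` function, and the `input`, `output` and `label` keys of the sample payload are assumptions made for this example, not names defined by RDF-Connect.

```python
import json
from dataclasses import dataclass
from typing import Any

READER = "https://w3id.org/rdf-connect/ontology#Reader"
WRITER = "https://w3id.org/rdf-connect/ontology#Writer"


@dataclass
class Reader:
    """Hypothetical typed reader bound to a channel IRI."""
    channel: str


@dataclass
class Writer:
    """Hypothetical typed writer bound to a channel IRI."""
    channel: str


def has_type(node: dict, iri: str) -> bool:
    """JSON-LD allows @type to be a single string or a list of strings."""
    types = node.get("@type", [])
    return iri in ([types] if isinstance(types, str) else types)


def hydrate(value: Any) -> Any:
    """Recursively replace reader and writer nodes with typed objects."""
    if isinstance(value, list):
        return [hydrate(item) for item in value]
    if isinstance(value, dict):
        if "@id" in value and has_type(value, READER):
            return Reader(value["@id"])
        if "@id" in value and has_type(value, WRITER):
            return Writer(value["@id"])
        return {key: hydrate(item) for key, item in value.items()}
    return value


# Illustrative payload; the keys are made up for this example.
payload = json.dumps({
    "input": {"@id": "http://example.org/in", "@type": READER},
    "output": {"@id": "http://example.org/out", "@type": [WRITER]},
    "label": "demo",
})
arguments = hydrate(json.loads(payload))
```

The processor then receives `arguments["input"]` as a `Reader` instance instead of a raw JSON-LD node.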
702+
When a processor has been fully initialized, the runner MUST send an `RPC.init` message, indicating success or failure.
703+
If any runner signals an error during initialization, the orchestrator MUST abort the pipeline execution.
704+
705+
Once all processors are successfully initialized, the orchestrator sends an `RPC.start` message, instructing the runner to start the processors.
706+
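The initialization handshake can be sketched as follows. The `InitResult` structure and the function names are hypothetical; the sketch only illustrates that a runner waits for a processor's startup work to finish before reporting success or failure, and that a single failure prevents the start.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class InitResult:
    """Hypothetical shape of the outcome reported through RPC.init."""
    processor: str
    error: Optional[str] = None


def initialize(processor_iri: str, init: Callable[[], None]) -> InitResult:
    """Run a processor's startup work to completion, then report the outcome."""
    try:
        init()  # may take a while, e.g. reading a file or calling an API
    except Exception as exc:
        return InitResult(processor_iri, error=str(exc))
    return InitResult(processor_iri)


def may_start(results: List[InitResult]) -> bool:
    """Orchestrator-side rule: start only if every initialization succeeded."""
    return all(result.error is None for result in results)


def failing_init() -> None:
    raise RuntimeError("config file not found")


results = [
    initialize("http://example.org/ok", lambda: None),
    initialize("http://example.org/broken", failing_init),
]
```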
707+
After all processors complete their execution, the runner SHOULD gracefully close the normal stream to signal completion.
708+
709+
710+
### Handling messages
711+
712+
Apart from starting processors, the runner also acts as a mediator that makes sure the correct messages are sent to the correct processors.
713+
The orchestrator MAY send any number of `RPC.msg` or `RPC.streamMsg` messages.
714+
715+
#### Receiving normal messages
716+
717+
When an `RPC.msg` is received, the runner MUST deliver the message to the appropriate processor, using the channel IRI to determine the correct target.
718+
Message routing can follow either a push or pull model depending on the language environment.
719+
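Both routing models can be sketched with a small in-memory router. The `Router` class and its method names are assumptions for illustration; a real runner would call `deliver` for each incoming `RPC.msg`.

```python
from collections import defaultdict, deque
from typing import Callable, Deque, Dict, List


class Router:
    """Delivers each message to the processors attached to its channel IRI."""

    def __init__(self) -> None:
        self.handlers: Dict[str, List[Callable[[bytes], None]]] = defaultdict(list)
        self.queues: Dict[str, Deque[bytes]] = defaultdict(deque)

    def on_message(self, channel: str, handler: Callable[[bytes], None]) -> None:
        """Push model: the processor registers a callback for a channel."""
        self.handlers[channel].append(handler)

    def deliver(self, channel: str, data: bytes) -> None:
        """Called by the runner for every incoming message."""
        handlers = self.handlers.get(channel)
        if handlers:
            for handler in handlers:
                handler(data)
        else:
            # Pull model: keep the message until a processor asks for it.
            self.queues[channel].append(data)

    def pull(self, channel: str) -> bytes:
        """Pull model: the processor fetches the oldest pending message."""
        return self.queues[channel].popleft()


router = Router()
received: List[bytes] = []
router.on_message("http://example.org/a", received.append)
router.deliver("http://example.org/a", b"pushed")
router.deliver("http://example.org/b", b"queued")
```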
720+
Runners MAY coerce or transform messages into a different representation, including converting a normal message into a streaming message with a single chunk or converting a short streaming message into a normal message.
721+
These transformations SHOULD respect the preferences of the processor and the runner's internal design constraints.
722+
723+
#### Receiving streaming messages
724+
725+
When an `RPC.streamMsg` is received, the runner MUST establish a streaming channel by invoking the `RPC.receiveStreamMessage` method with the provided stream ID.
726+
This initiates a stream of chunks from the orchestrator.
727+
728+
The runner MUST forward these chunks to the appropriate processor, using any internal buffering or transformation logic as required.
729+
730+
If the runner determines that the message size is small enough, it MAY convert a streaming message into a single message object, provided that this does not violate processor expectations or exceed system limits.
731+
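One way to make this decision without losing data is to buffer chunks up to a limit and fall back to streaming once the limit is exceeded. The sketch below assumes a hypothetical `collect_if_small` helper and a caller-chosen `limit`; neither is defined by the specification.

```python
from itertools import chain
from typing import Iterable, Iterator, List, Union


def collect_if_small(
    chunks: Iterable[bytes], limit: int
) -> Union[bytes, Iterator[bytes]]:
    """Return one bytes object if the stream fits in `limit` bytes.

    Otherwise return an iterator that replays the buffered chunks and then
    continues with the rest of the stream, so no data is lost.
    """
    iterator = iter(chunks)
    buffered: List[bytes] = []
    size = 0
    for chunk in iterator:
        buffered.append(chunk)
        size += len(chunk)
        if size > limit:
            # Too large: hand the processor a stream instead.
            return chain(buffered, iterator)
    return b"".join(buffered)


small = collect_if_small([b"ab", b"cd"], limit=16)
large = collect_if_small([b"abcd", b"efgh", b"ijkl"], limit=6)
```

Here `small` is a single `bytes` value, while `large` remains an iterator over all three chunks.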
732+
#### Sending messages
733+
734+
Runners must also support outbound communication from processors.
735+
While runners MAY omit support for certain advanced features (such as streaming output), a full implementation is strongly encouraged.
736+
737+
To send a normal message, the runner uses the `RPC.msg` method on the normal stream.
738+
739+
To send a streaming message, the runner first initiates the `RPC.sendStreamMessage` method, which returns a new stream.
740+
The orchestrator responds with a single message, the stream ID.
741+
742+
The runner then sends an `RPC.streamMsg` on the normal stream, including the channel IRI and stream ID.
743+
Finally, the runner streams the message content using the created stream.
744+
The message is considered complete when the runner closes the stream.
745+
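The order of operations for a streaming send can be sketched as follows. `FakeOrchestrator` is a hypothetical in-memory stand-in that only records events, and the integer stream ID is an assumption; the sketch shows the sequence, not the actual Protobuf API.

```python
from typing import Iterable, List, Tuple


class FakeOrchestrator:
    """In-memory stand-in that records what a runner sends."""

    def __init__(self) -> None:
        self.events: List[Tuple] = []
        self._next_id = 0

    def send_stream_message(self) -> int:
        """Models RPC.sendStreamMessage: opens a stream, replies with its ID."""
        self._next_id += 1
        self.events.append(("open", self._next_id))
        return self._next_id

    def stream_msg(self, channel: str, stream_id: int) -> None:
        """Models RPC.streamMsg on the normal stream."""
        self.events.append(("streamMsg", channel, stream_id))

    def chunk(self, stream_id: int, data: bytes) -> None:
        self.events.append(("chunk", stream_id, data))

    def close(self, stream_id: int) -> None:
        self.events.append(("close", stream_id))


def send_streaming(
    orch: FakeOrchestrator, channel: str, chunks: Iterable[bytes]
) -> None:
    stream_id = orch.send_stream_message()  # 1. open the stream, receive its ID
    orch.stream_msg(channel, stream_id)     # 2. announce it on the normal stream
    for data in chunks:                     # 3. stream the content
        orch.chunk(stream_id, data)
    orch.close(stream_id)                   # 4. closing marks the message complete


orchestrator = FakeOrchestrator()
send_streaming(orchestrator, "http://example.org/out", [b"part-1", b"part-2"])
```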
746+
#### Channel Closure
747+
748+
Processors may indicate that a given channel is closed (i.e., no further messages will be sent).
749+
The runner MUST propagate this information to the orchestrator via an `RPC.close` message.
750+
751+
Similarly, when the orchestrator sends an `RPC.close` message for a channel, the runner MAY respond by closing or invalidating the corresponding data stream in the processor.
600752

601753
## Processor
602754

startup.mdd

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@ sequenceDiagram
1010
Note over R: Runner is started with cli
1111
O-->>R: Startup with address and uri
1212
R-->>O: RPC.Identify: with uri
13+
O-->>R: RPC.Pipeline: with expanded pipeline
1314
end
1415
loop For every processor
1516
O-->>R: RPC.Processor: Start processor
