For each runner, the orchestrator appends two arguments to the configured command: the URL of the orchestrator's running Protobuf server and the IRI identifying the runner instance.
It then executes the resulting command to spawn the runner process.
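As an illustration, the command construction could be sketched as follows (hypothetical helper names; the orchestrator's actual API is not specified here):

```python
import subprocess

def build_runner_command(configured: list[str], server_url: str, runner_iri: str) -> list[str]:
    # The orchestrator appends exactly two arguments to the configured
    # command: its Protobuf server URL and the runner instance's IRI.
    return configured + [server_url, runner_iri]

def spawn_runner(configured: list[str], server_url: str, runner_iri: str) -> subprocess.Popen:
    # Execute the resulting command to spawn the runner process.
    return subprocess.Popen(build_runner_command(configured, server_url, runner_iri))
```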
Each runner is expected to connect back to the orchestrator using the `connect` method, establishing a bidirectional stream of messages, dubbed the `normal stream`.
Whenever this document states that a message is sent without specifying how, the message is sent over this stream.
The orchestrator MUST track all active runners, specifically recording which ones have sent an `RPC.identify` message after startup. This message confirms that the runner is ready to accept processor assignments.
Once all expected runners have successfully identified themselves, the orchestrator proceeds to the next step in the pipeline initialization sequence.
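This readiness bookkeeping can be sketched as follows (a minimal illustration, assuming the set of expected runners is known from the pipeline configuration):

```python
class RunnerRegistry:
    """Tracks which runners have identified themselves (illustrative sketch)."""

    def __init__(self, expected: set[str]):
        self.expected = expected           # runner IRIs declared in the pipeline
        self.identified: set[str] = set()  # runners that sent RPC.identify

    def on_identify(self, runner_iri: str) -> None:
        self.identified.add(runner_iri)

    def all_ready(self) -> bool:
        # Proceed to processor initialization only once every
        # expected runner has identified itself.
        return self.expected <= self.identified
```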
#### Starting processors
Once all runners have been successfully identified via `RPC.identify`, the orchestrator proceeds to initialize the processors defined in the pipeline.
This involves the extraction and transformation of processor configuration data into a format suitable for consumption by the associated runner.
**Processor Arguments**
467
+
468
+
Processor arguments are encoded as JSON-LD objects, providing a structured representation of RDF configuration data.
469
+
JSON-LD fits the requirements as it is selected for the following reasons, and it allows encoding of typed literals and nested structures, in alignment with SHACL definitions.
470
+
This while still enabling extensibility, supporting advanced use cases such as capturing full SHACL paths or preserving provenance metadata.
471
+
472
+
Support for JSON-LD is optional for runners.
473
+
Runners MAY choose to treat the JSON-LD as plain JSON if they do not require the semantic context or graph-aware features.
474
+
However, all runners MUST accept the structure produced by the orchestrator.
475
+
476
+
Processor arguments come from the SHACL shape defined for the processor type.
477
+
Each field is mapped following section [Mapping SHACL to Configuration Structures](#mapping-shacl-to-configuration-structures).
478
+
A JSON-LD `@context` is generated mapping all `sh:name` values to the corresponding IRIs from `sh:path`.
479
+
If the processor instance has a known RDF identifier or `rdf:type`, these are added to the JSON-LD using `@id` and `@type`.
480
+
481
+
<div class="example" title="Example to extract JSON-LD from data and a SHACL shape">
Consider the following shape; note that it also includes an `rdfc:Reader`.

```turtle
@prefix : <http://example.org/>.
@prefix sh: <http://www.w3.org/ns/shacl#>.
@prefix xsd: <http://www.w3.org/2001/XMLSchema#>.

[] a sh:NodeShape;
  sh:targetClass <FooBar>;
  sh:property [
    sh:name "reader";
    sh:path :reader;
    sh:class rdfc:Reader;
    sh:maxCount 1;
  ], [
    sh:name "count";
    sh:path :count;
    sh:datatype xsd:integer;
    sh:maxCount 1;
  ].

<SomeId> a <FooBar>;
  :reader <ReaderId>;
  :count 42.
```
The following JSON-LD structure is built, which aligns with section [Mapping SHACL to Configuration Structures](#mapping-shacl-to-configuration-structures).
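A sketch of the resulting object, applying the mapping rules above (the relative IRIs `SomeId`, `FooBar`, and `ReaderId` would be resolved against the document's base IRI; the exact serialization may vary between implementations):

```json
{
  "@context": {
    "reader": "http://example.org/reader",
    "count": "http://example.org/count"
  },
  "@id": "SomeId",
  "@type": "FooBar",
  "reader": { "@id": "ReaderId" },
  "count": 42
}
```

</div>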
In addition to extracting processor instance arguments, the orchestrator MUST also extract the processor definition configuration. This definition provides implementation-specific parameters, typically required to launch the processor in a specific runtime (e.g., JavaScript entrypoints, file paths, class names, etc.).
Processor definitions are extracted using the same SHACL-based mechanism described previously. The shape used for this extraction is associated with the programming language or runtime type and MUST be processed in the same way to produce a structured JSON-LD object.
**RPC message**
Once both the arguments and definition have been extracted for a processor instance, the orchestrator sends an `Rpc.proc` message to the appropriate runner, initiating the processor launch process.
The orchestrator MUST keep an internal record of all processor instances that have been dispatched to a runner, and of the runner's acknowledgment that a processor was successfully launched, as indicated by an incoming `Rpc.init` message.
No processor may be assumed to be operational until its runner has responded with `Rpc.init`.
When all processors are successfully initialized, the orchestrator can start the pipeline.
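This bookkeeping can be sketched similarly to runner tracking (an illustrative outline, not a normative data model):

```python
class ProcessorTracker:
    """Tracks dispatched processors and their Rpc.init acknowledgments (sketch)."""

    def __init__(self):
        self.dispatched: dict[str, str] = {}  # processor IRI -> runner IRI
        self.initialized: set[str] = set()    # processors acknowledged via Rpc.init

    def on_proc_sent(self, proc_iri: str, runner_iri: str) -> None:
        # Record that an Rpc.proc message was dispatched to this runner.
        self.dispatched[proc_iri] = runner_iri

    def on_init(self, proc_iri: str) -> None:
        # The runner confirmed this processor launched successfully.
        self.initialized.add(proc_iri)

    def pipeline_ready(self) -> bool:
        # No processor is assumed operational until its Rpc.init arrived.
        return set(self.dispatched) <= self.initialized
```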
#### Starting the pipeline
The orchestrator starts the pipeline by sending an `RPC.start` message to each runner.
The full startup flow is shown in the following diagram.
<pre class=include>
path: ./startup.mdd
</pre>
### Handling messages
The orchestrator acts as a message broker between processors.
It is responsible for receiving messages from runners and forwarding them to the appropriate destination runners based on channel identifiers defined in the pipeline.
Importantly, channels support **many-to-many** communication: multiple processors may emit to or receive from the same channel.
#### Normal messages
When a runner sends an `Rpc.msg` message to the orchestrator, the message includes a channel IRI indicating its logical destination.
The orchestrator MUST:
1. Resolve the set of processors that are declared to consume this channel.
2. Determine which runner is responsible for each of those processors.
3. Forward the message to each relevant runner using `Rpc.msg`.
These messages are sent as discrete units and fit within the allowed message size.
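The routing steps above can be sketched as follows (hypothetical lookup tables; a real orchestrator derives these from the pipeline definition, and the deduplication per runner is an assumption of this sketch):

```python
def route_message(channel_iri, payload, consumers, proc_to_runner, send):
    # consumers: channel IRI -> set of processor IRIs reading that channel
    # proc_to_runner: processor IRI -> IRI of the runner hosting it
    # send: callback that emits an Rpc.msg to a runner

    # 1. Resolve the processors declared to consume this channel.
    procs = consumers.get(channel_iri, set())
    # 2. Determine the runner responsible for each of those processors
    #    (deduplicated, so each runner receives the message once).
    runners = {proc_to_runner[p] for p in procs}
    # 3. Forward the message to each relevant runner.
    for runner_iri in sorted(runners):
        send(runner_iri, channel_iri, payload)
```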
<pre class=include>
path: ./message.mdd
</pre>
#### Streaming messages
When the payload of a message is large, the streaming message protocol SHOULD be used.
This protocol enables large messages to be sent incrementally over a separate gRPC stream while maintaining channel-based routing.
576
+
577
+
The process is as follows:
1. **Sender** (runner) initiates a `sendStreamMessage` gRPC stream to the orchestrator.
2. The **orchestrator** generates a unique stream identifier and sends it back on this stream.
3. The **sender** then sends an `Rpc.streamMsg` over the normal bidirectional RPC stream, including the stream identifier and the channel IRI.
4. The **orchestrator** resolves which processors receive messages on the given channel and forwards the stream identifier to their corresponding runners with `Rpc.streamMsg`.
5. **Receiving runners** connect to the orchestrator using `receiveStreamMessage`, passing the received stream identifier.
6. Once all participants are connected, the orchestrator acts as a relay: all incoming chunks from the sending stream are forwarded to each connected receiving stream.
The orchestrator MUST close all associated receiving streams once the sending stream completes.
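The relay behaviour can be illustrated with an in-memory sketch (queues stand in for the gRPC streams; the `None` sentinel marking completion is an assumption of this sketch, not part of the protocol):

```python
import itertools
import queue

class StreamRelay:
    """In-memory sketch of the orchestrator's streaming relay."""

    _ids = itertools.count(1)

    def __init__(self):
        # stream identifier -> queues of the connected receiving runners
        self.receivers: dict[str, list] = {}

    def open_stream(self) -> str:
        # Step 2: generate a unique stream identifier for the sender.
        stream_id = f"stream-{next(StreamRelay._ids)}"
        self.receivers[stream_id] = []
        return stream_id

    def subscribe(self, stream_id: str) -> queue.Queue:
        # Step 5: a receiving runner connects with the stream identifier.
        q = queue.Queue()
        self.receivers[stream_id].append(q)
        return q

    def forward_chunk(self, stream_id: str, chunk: bytes) -> None:
        # Step 6: relay each incoming chunk to every connected receiver.
        for q in self.receivers[stream_id]:
            q.put(chunk)

    def close(self, stream_id: str) -> None:
        # Once the sending stream completes, close all receiving streams.
        for q in self.receivers.pop(stream_id):
            q.put(None)
```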
This mechanism ensures that high-volume or large data payloads can be distributed across the pipeline efficiently and reliably.