<pre class='metadata'>
Title: RDF-Connect Specification
Shortname: rdfc
Markup Shorthands: markdown yes
Level: 1
Status: LS
Editor: RDF-Connect Team, https://example.org
Repository: https://github.com/your-org/rdf-connect
Abstract: Some abstract
</pre>
<link rel=stylesheet href="https://cdn.jsdelivr.net/npm/mermaid@11/dist/mermaid.min.css">
# Introduction
RDF-Connect is a modular framework for building and executing multilingual data processing pipelines using RDF as the configuration and orchestration layer.
It enables fine-grained, reusable processor components that exchange streaming data, allowing workflows to be described declaratively across programming languages and environments.
RDF-Connect is especially well suited for data transformation, integration, and linked data publication.
<pre id="overview" class="mermaid">
</pre>
# Usage Paths
<div class=note>
This section is complete in terms of content, but may be reorganized or rewritten for clarity during editorial review.
</div>
Depending on your use case, you may only need a subset of this specification:
* **Pipeline Authors**: Read the [site](https://rdf-connect.github.io/) or [[#pipeline]] for a more in-depth explanation.
* **Processor Developers**: Read [[#processor]] and [[#runner]].
* **Platform Maintainers**: Read all sections, including implementation notes.
# Design Goals
<div class=note>
This section is complete in terms of content, but may be reorganized or rewritten for clarity during editorial review.
</div>
RDF-Connect is designed to be **language-agnostic**, enabling seamless integration of processor components written in diverse programming languages such as JavaScript, Python, and shell scripts.
This flexibility allows developers to leverage existing tools and libraries within their language of choice, avoiding the constraints of monolithic, single-language frameworks.
A key architectural principle of RDF-Connect is that it operates in a **streaming-by-default** mode.
Data flows between processors via a central orchestrator, supporting real-time and large-scale data processing scenarios.
This streaming model ensures efficient memory usage and enables continuous transformation and publication of data as it is ingested.
The configuration of pipelines, processors, inputs, and outputs is expressed semantically using RDF.
This use of **semantic configuration** promotes clarity, extensibility, and interoperability by describing the structure and behavior of the system in a machine-readable, standards-based format.
Processor components in RDF-Connect are designed for **reusability**.
A processor defined once can be reused across multiple pipelines and in varying contexts, reducing duplication and encouraging modular design practices.
This modularity fosters rapid prototyping and easier maintenance of complex workflows.
**Transparency** is a fundamental goal of the framework.
By leveraging RDF vocabularies to describe pipeline components and their interactions, RDF-Connect makes it easy to inspect, document, and reason about the behavior and structure of a pipeline, both during development and after deployment.
Finally, RDF-Connect provides native support for **provenance** tracking using the PROV-O ontology.
Each step in the pipeline can be traced to its source, enabling detailed **lineage** tracking.
This capability is especially critical in applications involving data quality, reproducibility, and compliance.
# Concepts
This section introduces the core concepts underpinning the RDF-Connect framework.
These concepts collectively define the modular, streaming, and language-agnostic architecture that enables RDF-Connect to support sophisticated, provenance-aware data pipelines.
## Pipeline
At the heart of RDF-Connect is the notion of a *pipeline*, which defines a structured sequence of processing steps.
Each step corresponds to a *processor*, configured with parameters and paired with an appropriate runner that governs its execution.
Pipelines specify the flow of data between processors using *readers* and *writers*, forming a streaming architecture in which data is continuously passed along and transformed.
The pipeline configuration itself is expressed in RDF, making it semantically explicit and machine-interpretable.
## Processor
A *processor* is a modular, reusable software component that performs a discrete data processing task.
In a typical scenario, a *processor* receives input data through a *reader*, transforms it according to its logic, and emits the result via a *writer*.
However, processors are also flexible enough to support single-directional tasks, such as those that only produce or only consume data.
Crucially, processors are implementation-agnostic — they can be written in any programming language and integrated into pipelines via language-specific runners.
This makes processors the building blocks of RDF-Connect’s cross-language interoperability.
## Runner
A *runner* is responsible for executing a *processor* on behalf of the *orchestrator*.
Each *runner* targets a specific language or execution environment, enabling processors written in different languages to participate seamlessly in a *pipeline*.
For example, a JavaScript processor would be executed using a `NodeRunner`, which knows how to initialize and manage JavaScript-based components.
In this sense, a *runner* serves as an execution strategy, abstracting away the platform-specific details of launching and interacting with a *processor*.
## Orchestrator
The *orchestrator* is the central component that interprets and runs RDF-Connect pipelines.
It parses the RDF-based configuration, initializes and assigns runners, instantiates processors, and manages the data flow between components.
Acting as the conductor of the system, the *orchestrator* ensures that *processors* execute in the correct order and that data is passed efficiently and correctly through the *pipeline*.
Its role is fundamental to realizing the streaming and semantic integration goals of RDF-Connect.
## Reader / Writer
*Readers* and *writers* provide the streaming interfaces that connect *processors* within a *pipeline*.
A writer streams data out from a processor, while a reader receives data into a processor.
Together, they define how data flows between pipeline steps in an idiomatic and composable way.
This separation of concerns allows for flexible data routing and makes it easy to compose and recombine processors across different pipeline configurations.
# SHACL as Configuration Schema
RDF-Connect uses SHACL [[shacl]] not only as a data validation mechanism but also as a schema language for defining the configuration interface of components such as processors and runners.
These SHACL shapes enable:
* Static validation of component descriptions.
* Programmatic extraction of configuration contracts.
* Type-safe interpretation in environments like JavaScript/TypeScript.
Shapes define required and optional configuration properties, which are transformed into JSON objects at startup, according to the pipeline.
<div class="example" title="From SHACL to JSON configuration">
This SHACL shape definition defines a configuration structure for a processor.
In RDF-Connect, such shapes are used to describe required parameters.
They result in a well-typed JSON object that developers can rely on during implementation.
SHACL shape defining some required configuration for a processor:
```turtle
[] a sh:NodeShape;
  sh:targetClass <FooBar>;
  sh:property [
    sh:name "repeat";          # JSON field name
    sh:datatype xsd:integer;   # specify datatype
    sh:maxCount 1;
    sh:path :repeat;
  ], [
    sh:name "messages";
    sh:datatype xsd:string;
    sh:path :msg;
  ].
```
Processor configuration:
```turtle
<MyProcessor> a <FooBar>;
  :repeat 10;
  :msg "Hello", "World".
```
Results in the following JSON structure:
```typescript
{
  "repeat": 10,
  "messages": ["Hello", "World"]
}
```
</div>
## Mapping SHACL to Configuration Structures
Each `sh:property` statement in a SHACL shape directly maps to a field in a configuration object, such as a JSON structure.
This enables semantically rich, machine-validated configurations for processors and pipelines within RDF-Connect.
### Required Fields
**sh:path**
This property indicates the RDF predicate that must be present on the target resource.
In the context of configuration mapping, `sh:path` is used to extract the corresponding value from the data graph.
It defines the link between the RDF representation and the configuration data being generated or interpreted.
**sh:name**
The `sh:name` property provides the external field name used in the resulting configuration object.
This allows decoupling the internal RDF predicate (as defined in `sh:path`) from how the value appears in the configuration file.
**sh:datatype** or **sh:class**
These properties define the expected type of the configuration field.
Use `sh:datatype` for primitive types such as `xsd:string`, `xsd:boolean`, or `xsd:anyURI`.
When the expected value is a nested resource, i.e., an object with its own structured fields, use `sh:class` instead.
This signals that the value is an embedded configuration object to be interpreted according to its own SHACL shape, enabling deeply nested configuration structures.
### Optional Fields
**sh:minCount**
This property defines the minimum number of values that must be provided.
If the actual number of values is below this threshold, a validation error will be raised.
A `sh:minCount` of 1 or higher indicates that the field is required in the configuration.
**sh:maxCount**
This property sets the maximum number of values allowed for a field.
It also determines how the field should be interpreted structurally.
If `sh:maxCount` is set to 1, the corresponding field is treated as a single value (`T`).
If it is greater than 1 or left unspecified, the field is interpreted as a list or array of values (`T[]`).
This behavior ensures that the structure of the configuration aligns with user expectations and downstream processing logic.
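The mapping rules above can be sketched in TypeScript. This is a minimal illustration, not the orchestrator's actual implementation: the `PropertyShape` interface, the `toConfig` function, and the pre-extracted `values` lookup are all hypothetical names introduced here, and type coercion for `sh:datatype` and `sh:class` is left out.

```typescript
// Hypothetical, simplified view of one sh:property entry.
interface PropertyShape {
  name: string;       // sh:name: field name in the resulting object
  path: string;       // sh:path: predicate IRI to read values from
  minCount?: number;  // sh:minCount
  maxCount?: number;  // sh:maxCount
}

// Assumed input: predicate IRI -> values already read from the data graph.
type Values = Record<string, unknown[]>;

function toConfig(shapes: PropertyShape[], values: Values): Record<string, unknown> {
  const config: Record<string, unknown> = {};
  for (const shape of shapes) {
    const found = values[shape.path] ?? [];
    if (shape.minCount !== undefined && found.length < shape.minCount) {
      throw new Error(`"${shape.name}" needs at least ${shape.minCount} value(s)`);
    }
    if (shape.maxCount !== undefined && found.length > shape.maxCount) {
      throw new Error(`"${shape.name}" allows at most ${shape.maxCount} value(s)`);
    }
    if (shape.maxCount === 1) {
      // Single value (T); the field is omitted when no value is present.
      if (found.length === 1) config[shape.name] = found[0];
    } else {
      // List of values (T[]).
      config[shape.name] = found;
    }
  }
  return config;
}

const config = toConfig(
  [
    { name: "repeat", path: "http://example.org/repeat", maxCount: 1 },
    { name: "messages", path: "http://example.org/msg" },
  ],
  {
    "http://example.org/repeat": [10],
    "http://example.org/msg": ["Hello", "World"],
  },
);
console.log(JSON.stringify(config));
```

Here `repeat` becomes a single value because its `maxCount` is 1, while `messages` stays an array because no `maxCount` is given.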
## Nested Shapes and Component Types
Configuration fields can reference structured values instead of primitive literals.
This is supported via `sh:class`, which indicates that the value must conform to another shape associated with a given RDF class.
For example:
```turtle
sh:property [
  sh:path rdfc:input ;
  sh:name "input" ;
  sh:class rdfc:Reader ;
]
```
This defines a configuration field named `input` whose value is expected to be an instance of the class `rdfc:Reader`.
### Use of `sh:class` and `sh:targetClass`
The `sh:class` predicate is used within a property constraint to indicate that the value of a field must be an RDF resource belonging to a specific class.
This class must, in turn, have a shape associated with it that defines its expected structure.
To make this connection, `sh:targetClass` is used on a `sh:NodeShape`.
This associates the shape with a class so that tools can look up the shape definition when they encounter an RDF resource of that class.
In RDF-Connect, `sh:targetClass` allows reusable schemas to be defined for configuration components.
These can then be referenced in other shapes through `sh:class`, enabling configuration structures that are both modular and type-safe.
### Special Component Types: `rdfc:Reader` and `rdfc:Writer`
`rdfc:Reader` and `rdfc:Writer` are two special classes to represent input and output endpoints in RDF-Connect pipelines.
Fields that are declared with `sh:class rdfc:Reader` or `sh:class rdfc:Writer` are not simply nested configuration objects.
They serve as runtime injection points for streaming data.
At execution time, the runner is responsible for resolving these references into actual language-native abstractions.
Depending on the programming environment, these may be JavaScript streams, async iterators, or callback-based interfaces.
For example:
```turtle
sh:property [
  sh:path rdfc:input ;
  sh:name "input" ;
  sh:class rdfc:Reader ;
  sh:minCount 1 ;
]
```
This declares an `input` field that must be provided and will be automatically connected to an input stream by the runner.
This design decouples the declarative pipeline configuration from the underlying data transport logic, allowing developers to focus on processor behavior rather than infrastructure.
## Example: Putting it all together
The following SHACL definitions and RDF instance demonstrate how a FooBar processor might be configured to append text to each incoming message and send the result to an output channel.
```turtle
[ ] a sh:NodeShape;
  sh:targetClass :Channels;
  sh:property [
    sh:path rdfc:input ;
    sh:name "input" ;
    sh:class rdfc:Reader ;
    sh:minCount 1 ;
    sh:maxCount 1 ;
  ], [
    sh:path rdfc:output ;
    sh:name "output" ;
    sh:class rdfc:Writer ;
    sh:minCount 1 ;
    sh:maxCount 1 ;
  ].

[ ] a sh:NodeShape ;
  sh:targetClass <FooBar2> ;
  sh:property [
    sh:path :channel ;
    sh:name "channels" ;
    sh:minCount 1 ;
    sh:class :Channels ;
  ], [
    sh:path :append ;
    sh:name "append" ;
    sh:minCount 1 ;
    sh:maxCount 1 ;
    sh:datatype xsd:string ;
  ].
```
```turtle
<foobar> a <FooBar2>;
  :channel [
    # `a :Channels` is not required, this is implicit from the definition
    rdfc:input <channel1>;
    rdfc:output <channel2>;
  ], [
    rdfc:input <channel2>;
    rdfc:output <channel1>;
  ];
  :append " World!".
```
This results in the following JSON object:
```typescript
{
  "channels": [
    {
      "input": { /* idiomatic input stream for channel <channel1> */ },
      "output": { /* idiomatic output stream for channel <channel2> */ }
    }, {
      "input": { /* idiomatic input stream for channel <channel2> */ },
      "output": { /* idiomatic output stream for channel <channel1> */ }
    }
  ],
  "append": " World!"
}
```
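To illustrate how a processor might consume such an object, the following sketch appends the configured text to every message of each channel pair. The `Reader` and `Writer` types are hypothetical stand-ins (an async iterable of strings and an object with a `write` method); the abstractions a real runner injects may look different.

```typescript
// Hypothetical stream abstractions; a real runner may expose other interfaces.
type Reader = AsyncIterable<string>;
interface Writer {
  write(msg: string): Promise<void>;
}

interface Channel {
  input: Reader;
  output: Writer;
}

interface FooBarConfig {
  channels: Channel[];
  append: string;
}

// Forward every message of each channel, with the configured suffix appended.
async function runFooBar(config: FooBarConfig): Promise<void> {
  await Promise.all(
    config.channels.map(async ({ input, output }) => {
      for await (const msg of input) {
        await output.write(msg + config.append);
      }
    }),
  );
}

// In-memory reader used only for this illustration.
async function* fromArray(items: string[]): AsyncIterable<string> {
  for (const item of items) yield item;
}
```

The processor code only sees the `input` and `output` fields; which channel backs them is decided entirely by the pipeline configuration.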
# RDF-Connect by Layer
Communication between the orchestrator and the runners happens using a strongly typed protocol defined in Protocol Buffers (protobuf).
This enables language-independent and efficient communication across processes and machines.
The orchestrator hosts the protobuf server and is therefore the central point of communication.
The orchestrator starts all runners and notifies the runners of the different processors they should start.
The orchestrator is also the message post office, allowing messages to be sent to the correct runner which will relay those messages to the correct processor.
## Communication Protocol: Orchestrator ↔ Runner
RDF-Connect uses a bidirectional communication protocol based on Protocol Buffers (protobuf) for interaction between the **Orchestrator** and **Runners**.
The orchestrator manages execution, while runners host and execute individual processors.
### Messages Sent to Runners
The orchestrator can send the following messages to the runner:
* `RPC.pipeline`: Contains the complete pipeline configuration (in Turtle syntax).
* `RPC.proc`: Instructs the runner to register and set up a new processor.
* `RPC.start`: Signals that all added processors should begin execution.
* `RPC.close`: Instructs the runner to gracefully shut down and terminate all processors.
* `RPC.msg`: Delivers a message to a specific processor. Used for normal data transfer.
* `RPC.streamMsg`: Begins a streaming message transmission to a processor, typically for large or chunked payloads.
### Messages Sent from Runners
The runner can send the following messages to the orchestrator:
* `RPC.identify`: Indicates that the runner is ready and provides its unique identifier (IRI).
* `RPC.init`: Confirms that a previously registered **processor** has successfully started.
* `RPC.close`: Notifies that the runner is shutting down because all processors have stopped.
* `RPC.msg`: Sends a message from a processor to another processor via the orchestrator.
* `RPC.streamMsg`: Sends a streaming message from a processor. Used when the payload is too large for a single message.
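The two message lists above can be captured as direction-specific types. The sketch below only encodes the message kinds named in this section; the constant and function names are hypothetical, and the actual protobuf definitions may be structured differently.

```typescript
// Message kinds per direction, taken from the two lists above.
const TO_RUNNER = ["pipeline", "proc", "start", "close", "msg", "streamMsg"] as const;
const FROM_RUNNER = ["identify", "init", "close", "msg", "streamMsg"] as const;

type ToRunnerKind = (typeof TO_RUNNER)[number];
type FromRunnerKind = (typeof FROM_RUNNER)[number];

// Type guards that narrow an arbitrary string to a kind valid in one direction.
function isToRunnerKind(kind: string): kind is ToRunnerKind {
  return (TO_RUNNER as readonly string[]).indexOf(kind) !== -1;
}

function isFromRunnerKind(kind: string): kind is FromRunnerKind {
  return (FROM_RUNNER as readonly string[]).indexOf(kind) !== -1;
}
```

A check like this lets either side reject a message that is not valid in its direction, for example an `identify` message arriving at a runner.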
## Orchestrator
The orchestrator is the central runtime entity in RDF-Connect.
It reads the pipeline configuration, sets up the runners, initiates processors, and routes messages between them.
It ensures the dataflow graph described by the pipeline is brought to life across isolated runtimes.
The orchestrator acts as a coordinator, not an executor. Each runner is responsible for running the actual processor code, but the orchestrator ensures the pipeline as a whole behaves as intended.
Responsibilities:
* Parse the pipeline RDF.
* Load SHACL shapes for processors and runners.
* Validate and coerce configuration to structured JSON.
* Instantiate runners.
* Start processors.
* Mediate messages (data and control).
* Handle retries, streaming, and backpressure.
<div class=note>
The remainder of this section is intended for developers building custom runners or integrating RDF-Connect into infrastructure.
</div>
### Startup Flow
#### Understanding the pipeline file
The orchestrator begins execution from a single pipeline RDF file. This file **MUST** first be expanded by resolving all `owl:imports` statements recursively.
Once the full RDF graph is assembled, the orchestrator extracts the pipeline to execute by locating a `rdfc:Pipeline` instance whose **subject is the pipeline file itself**.
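The recursive expansion of `owl:imports` can be sketched as a traversal with a visited set, so that import cycles terminate. The `importsOf` callback is a hypothetical stand-in for loading a document and reading its `owl:imports` objects; it is not part of any RDF-Connect API.

```typescript
// Hypothetical loader: returns the IRIs that a document imports via owl:imports.
type ImportsOf = (iri: string) => string[];

// Collect the root document and everything it imports, directly or indirectly.
function resolveImports(root: string, importsOf: ImportsOf): string[] {
  const seen = new Set<string>();
  const queue: string[] = [root];
  while (queue.length > 0) {
    const iri = queue.pop();
    if (iri === undefined || seen.has(iri)) continue; // skip documents already visited
    seen.add(iri);
    for (const imported of importsOf(iri)) queue.push(imported);
  }
  return Array.from(seen);
}
```

The triples of all returned documents together would form the full graph in which the `rdfc:Pipeline` instance is then looked up.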
The pipeline is composed of one or more **runner–processor pairs**, defined via the `rdfc:consistsOf` property.
Each pair includes:
- A reference to a runner, using the `rdfc:instantiates` property.
- One or more processors, referenced with the `rdfc:processor` property.
<div class="example" title="Pipeline definition">
This example pipeline contains three processors divided over two runners.
The orchestrator starts this pipeline with two runners and provides them with the correct processor configurations.
```turtle
@prefix rdfc: <https://w3id.org/rdf-connect#>.

<> a rdfc:Pipeline;
  rdfc:consistsOf [
    rdfc:instantiates rdfc:NodeRunner;
    rdfc:processor <sender>, <echo>;
  ], [
    rdfc:instantiates rdfc:RustRunner;
    rdfc:processor <log>;
  ].
```
</div>
#### Starting the runners
Each simple runner should specify two things:
- the language it supports, linked with `rdfc:handlesSubjectsOf`
- how the runner should be started, linked with `rdfc:command`
<aside class="note">
The property `rdfc:handlesSubjectsOf` plays a somewhat unconventional role. It allows RDF-Connect to remain aligned with [PROV-O](https://www.w3.org/TR/prov-o/).
A runner may link to a predicate such as `rdfc:jsImplementationOf`, and each JavaScript processor may then declare itself using:
```turtle
<FooBar> rdfc:jsImplementationOf rdfc:Processor.
```
Since `rdfc:jsImplementationOf` is a subproperty of `rdfs:subClassOf`, this implies that `<FooBar>` is a subclass of `rdfc:Processor`, which itself is a subclass of `prov:Activity`.
Therefore, any instance of `<FooBar>` can be inferred to be a `prov:Activity`.
```turtle
rdfc:jsImplementationOf rdfs:subPropertyOf rdfs:subClassOf.
rdfc:Processor rdfs:subClassOf prov:Activity.
```
This inference enables seamless integration with provenance-aware tooling.
</aside>
<aside class="note">
Currently, only **command-line-based runners** are supported and defined, but the model is intentionally extensible.
In the future, runners might be deployed remotely and communicate over a network. Such runners would not require a `rdfc:command`, but instead might define a connection endpoint (e.g., a URL or service descriptor).
</aside>
For each runner, the orchestrator appends two arguments to the configured command: the URL of the orchestrator's running Protobuf server and the IRI identifying the runner instance.
It then executes the resulting command to spawn the runner process.
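As a rough sketch of this step, the argument list could be assembled as follows. The function name, the example server address, and the naive whitespace splitting are assumptions made for illustration; a real orchestrator may need shell-aware parsing of the command.

```typescript
// Build the argument vector for spawning a runner:
// the configured command, followed by the orchestrator URL and the runner IRI.
function runnerCommand(command: string, orchestratorUrl: string, runnerIri: string): string[] {
  // Naive whitespace split; quoting inside the command is not handled here.
  const parts = command.trim().split(/\s+/);
  return [...parts, orchestratorUrl, runnerIri];
}

const argv = runnerCommand(
  "npx js-runner",
  "localhost:50051", // hypothetical address of the orchestrator's server
  "https://w3id.org/rdf-connect#NodeRunner",
);
console.log(argv.join(" "));
```

The first element of the result is the executable and the rest are its arguments, which is the form most process-spawning APIs expect.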
Each runner is expected to connect back to the orchestrator with the `connect` method, setting up a bidirectional stream of messages, referred to as the *normal stream*.
Unless stated otherwise, messages are sent over this stream.
The orchestrator MUST track all active runners, specifically recording which ones have sent an `RPC.identify` message after startup.
This message confirms that the runner is ready to accept processor assignments.
The orchestrator responds to the runner with a `RPC.pipeline` message, containing the full expanded pipeline as Turtle.
This pipeline configuration is useful when a runner wants to extract processor arguments itself, instead of using the provided JSON-LD.
Once all expected runners have successfully identified themselves, the orchestrator proceeds to the next step in the pipeline initialization sequence.
#### Starting processors
Once all runners have been successfully identified via `RPC.identify`, the orchestrator proceeds to initialize the processors defined in the pipeline.
This involves the extraction and transformation of processor configuration data into a format suitable for consumption by the associated runner.
**Processor Arguments**
Processor arguments are encoded as JSON-LD objects, providing a structured representation of RDF configuration data.
JSON-LD was selected because it allows encoding of typed literals and nested structures, in alignment with SHACL definitions.
It also remains extensible, supporting advanced use cases such as capturing full SHACL paths or preserving provenance metadata.
Support for JSON-LD is optional for runners.
Runners MAY choose to treat the JSON-LD as plain JSON if they do not require the semantic context or graph-aware features.
However, all runners MUST accept the structure produced by the orchestrator.
Processor arguments come from the SHACL shape defined for the processor type.
Each field is mapped following section [Mapping SHACL to Configuration Structures](#mapping-shacl-to-configuration-structures).
A JSON-LD `@context` is generated mapping all `sh:name` values to the corresponding IRIs from `sh:path`.
If the processor instance has a known RDF identifier or `rdf:type`, these are added to the JSON-LD using `@id` and `@type`.
<div class="example" title="Example to extract JSON-LD from data and a SHACL shape">
Consider the following shape and data. Note that the shape also includes a `rdfc:Reader`.
```turtle
@prefix : <http://example.org/>.

[] a sh:NodeShape;
  sh:targetClass <FooBar>;
  sh:property [
    sh:name "reader";
    sh:path :reader;
    sh:class rdfc:Reader;
    sh:maxCount 1;
  ], [
    sh:name "count";
    sh:path :count;
    sh:datatype xsd:integer;
    sh:maxCount 1;
  ].

<SomeId> a <FooBar>;
  :reader <ReaderId>;
  :count 42.
```
The following JSON-LD structure is built, in line with section [Mapping SHACL to Configuration Structures](#mapping-shacl-to-configuration-structures).
```json
{
  "@context": {
    "reader": "http://example.org/reader",
    "count": "http://example.org/count"
  },
  "@id": "SomeId",
  "@type": "FooBar",
  "reader": {
    "@type": "https://w3id.org/rdf-connect#Reader",
    "@id": "ReaderId"
  },
  "count": 42
}
```
</div>
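The construction of the `@context` and of the surrounding JSON-LD object can be sketched as below. The `NamedPath` interface and both function names are hypothetical; the sketch assumes the field values were already produced by the SHACL mapping and only shows how `sh:name` and `sh:path` pairs end up in the context.

```typescript
// Hypothetical pair taken from one sh:property entry.
interface NamedPath {
  name: string; // sh:name
  path: string; // sh:path, as a full IRI
}

// Map every sh:name to the IRI given by its sh:path.
function buildContext(props: NamedPath[]): Record<string, string> {
  const context: Record<string, string> = {};
  for (const { name, path } of props) context[name] = path;
  return context;
}

// Assemble the JSON-LD object; @id and @type are only added when known.
function toJsonLd(
  props: NamedPath[],
  fields: Record<string, unknown>,
  id?: string,
  type?: string,
): Record<string, unknown> {
  const doc: Record<string, unknown> = { "@context": buildContext(props) };
  if (id !== undefined) doc["@id"] = id;
  if (type !== undefined) doc["@type"] = type;
  return { ...doc, ...fields };
}

const doc = toJsonLd(
  [
    { name: "reader", path: "http://example.org/reader" },
    { name: "count", path: "http://example.org/count" },
  ],
  { count: 42 },
  "SomeId",
  "FooBar",
);
console.log(JSON.stringify(doc, null, 2));
```

A runner that treats the result as plain JSON can simply ignore the `@`-prefixed keys.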
**Processor Definition Extraction**
In addition to extracting processor instance arguments, the orchestrator MUST also extract the processor definition configuration. This definition provides implementation-specific parameters, typically required to launch the processor in a specific runtime (e.g., JavaScript entrypoints, file paths, class names, etc.).
Processor definitions are extracted using the same SHACL-based mechanism described previously. The shape used for this extraction is associated with the programming language or runtime type and MUST be processed in the same way to produce a structured JSON-LD object.
**RPC message**
Once both the arguments and definition have been extracted for a processor instance, the orchestrator sends an `RPC.proc` message to the appropriate runner, initiating the processor launch process.
The orchestrator MUST keep an internal record of all processor instances that have been dispatched to a runner, and of the runner’s acknowledgment that a processor was successfully launched, as indicated by an incoming `RPC.init` message.
No processor may be assumed to be operational until its runner has responded with `RPC.init`.
When all processors are successfully initialized, the orchestrator can start the pipeline.
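One possible way to keep this record is a small tracker with two sets, sketched below. The class and method names are hypothetical; the sketch only models the bookkeeping and leaves out error reporting and timeouts.

```typescript
// Tracks which processors were dispatched with RPC.proc and which were
// acknowledged with RPC.init.
class ProcessorTracker {
  private readonly pending = new Set<string>();
  private readonly ready = new Set<string>();

  // Call after sending RPC.proc for a processor.
  dispatched(processorIri: string): void {
    this.pending.add(processorIri);
  }

  // Call when RPC.init arrives for a processor.
  initialized(processorIri: string): void {
    if (!this.pending.delete(processorIri)) {
      throw new Error(`unexpected init for ${processorIri}`);
    }
    this.ready.add(processorIri);
  }

  // A processor counts as operational only after its RPC.init was received.
  isOperational(processorIri: string): boolean {
    return this.ready.has(processorIri);
  }

  // True once every dispatched processor has been acknowledged.
  allInitialized(): boolean {
    return this.pending.size === 0 && this.ready.size > 0;
  }
}
```

The orchestrator would only move on to starting the pipeline once `allInitialized()` returns `true`.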
#### Starting the pipeline
The orchestrator can start the pipeline by sending a `RPC.start` message to each runner.
The full startup flow is shown in this diagram.
<pre class="mermaid" id="startup">
</pre>
### Handling messages
The orchestrator acts as a message broker between processors.
It is responsible for receiving messages from runners and forwarding them to the appropriate destination runners based on channel identifiers defined in the pipeline.
Importantly, channels support **many-to-many** communication: multiple processors may emit to or receive from the same channel.
#### Normal messages
When a runner sends a `RPC.msg` message to the orchestrator, the message includes a channel IRI indicating its logical destination.
The orchestrator MUST:
1. Resolve the set of processors that are declared to consume this channel.
2. Determine which runner is responsible for each of those processors.
3. Forward the message to each relevant runner using `RPC.msg`.
These messages are sent as discrete units and fit within the allowed message size.
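The three routing steps can be sketched as a lookup over two tables. The `Routing` structure, the function name, and the example IRIs are hypothetical; how an orchestrator derives these tables from the pipeline is not shown.

```typescript
// Hypothetical routing tables derived from the pipeline configuration.
interface Routing {
  consumers: Map<string, string[]>; // channel IRI -> processors reading from it
  runnerOf: Map<string, string>;    // processor IRI -> responsible runner IRI
}

// For one channel, group the consuming processors by the runner hosting them.
function resolveTargets(channel: string, routing: Routing): Map<string, string[]> {
  const perRunner = new Map<string, string[]>();
  for (const processor of routing.consumers.get(channel) ?? []) {
    const runner = routing.runnerOf.get(processor);
    if (runner === undefined) throw new Error(`no runner for ${processor}`);
    const list = perRunner.get(runner) ?? [];
    list.push(processor);
    perRunner.set(runner, list);
  }
  return perRunner;
}

const routing: Routing = {
  consumers: new Map([["channel1", ["echo", "log"]]]),
  runnerOf: new Map([
    ["echo", "NodeRunner"],
    ["log", "RustRunner"],
  ]),
};
const targets = resolveTargets("channel1", routing);
```

Each key of the result is a runner that receives an `RPC.msg`, which makes the many-to-many behaviour of channels explicit.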
<pre class="mermaid" id="message">
</pre>
#### Streaming messages
When the payload of a message is large, the streaming message protocol SHOULD be used.
This protocol enables large messages to be sent incrementally over a separate gRPC stream while maintaining channel-based routing.
The process is as follows:
1. **Sender** (runner) initiates a `sendStreamMessage` gRPC stream to the orchestrator.
2. The **orchestrator** generates a unique stream identifier and sends it back on this stream.
3. The **sender** then sends a `RPC.streamMsg` over the normal bidirectional RPC stream, including the stream identifier and the channel IRI.
4. The **orchestrator** resolves which processors receive messages on the given channel and forwards the stream identifier to their corresponding runners with `RPC.streamMsg`.
5. **Receiving runners** connect to the orchestrator using `receiveStreamMessage`, passing the received stream identifier.
6. Once all participants are connected, the orchestrator acts as a relay: all incoming chunks from the sending stream are forwarded to each connected receiving stream.
The orchestrator MUST close all associated receiving streams once the sending stream completes.
This mechanism ensures that high-volume or large data payloads can be distributed across the pipeline efficiently and reliably.
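The relay in the final step can be sketched with async iterables. `ChunkSink` is a hypothetical abstraction over a receiving stream, and the gRPC plumbing and stream identifier handling are left out; the sketch only shows the fan-out and the closing behaviour.

```typescript
// Hypothetical abstraction over one connected receiving stream.
interface ChunkSink {
  write(chunk: Uint8Array): Promise<void>;
  close(): Promise<void>;
}

// Forward every chunk of the sending stream to all receivers, then close
// the receivers once the sending stream has completed (or failed).
async function relay(source: AsyncIterable<Uint8Array>, receivers: ChunkSink[]): Promise<void> {
  try {
    for await (const chunk of source) {
      await Promise.all(receivers.map((receiver) => receiver.write(chunk)));
    }
  } finally {
    await Promise.all(receivers.map((receiver) => receiver.close()));
  }
}
```

Awaiting all writes before reading the next chunk is one simple way to let the slowest receiver set the pace; other backpressure strategies are possible.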
<pre class="mermaid" id="streamMessage">
</pre>
## Runner
A runner in RDF-Connect is responsible for managing and executing processors within a specific execution context—typically a programming language runtime.
Each runner MUST connect with the orchestrator's Protobuf server and follow the RDF-Connect protocol.
While minimal runners can be implemented with little overhead, they often become enriched with quality-of-life features to better support developers and processors operating in that language.
These quality-of-life features include:
* Wrapping readers and writers in idiomatic objects.
* Letting a processor finish its own startup before the runner acknowledges to the orchestrator that the processor is initialized. Runners SHOULD make this possible.
  This is useful for processors that execute longer-running operations during startup, such as reading a file or consulting an external API.
Runners MAY also coalesce or transform message types to simplify processor implementation.
A streaming message may be aggregated into a single message if the underlying platform supports arbitrarily large strings or buffers,
and a single message may be exposed to the processor as a streaming interface, emitting a single chunk.
This flexibility allows generic processors to be implemented more easily without needing to distinguish between streaming and single-message protocols.
<div class=note>
Coalescing messages enables simpler processor logic.
For example, a processor that performs string replacement on incoming messages may otherwise need to implement both streaming and non-streaming handling.
Instead, the runner can convert incoming messages to a streaming form with one chunk, or aggregate streaming chunks into a single message.
Before forwarding a message, such processors are advised to check its length to decide whether it should be sent as a single message or as a streaming message.
</div>
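Both directions of this conversion are small helpers in practice. The sketch below uses hypothetical function names and assumes payloads are exposed as `Uint8Array` chunks; a runner may of course use strings or other buffer types.

```typescript
// Aggregate a streaming message into a single buffer.
async function collect(chunks: AsyncIterable<Uint8Array>): Promise<Uint8Array> {
  const parts: Uint8Array[] = [];
  let total = 0;
  for await (const chunk of chunks) {
    parts.push(chunk);
    total += chunk.length;
  }
  const out = new Uint8Array(total);
  let offset = 0;
  for (const part of parts) {
    out.set(part, offset);
    offset += part.length;
  }
  return out;
}

// Expose a single message through a streaming interface that emits one chunk.
async function* asStream(message: Uint8Array): AsyncIterable<Uint8Array> {
  yield message;
}
```

With these two helpers a processor can be written against only one of the two forms, and the runner converts incoming messages as needed.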
The following sections detail the runner startup flow and describe the expected interactions between runners and the orchestrator during initialization and execution.
### Pipeline Configuration
A runner is defined with an instance of `rdfc:Runner`, which currently is instantiated by a command.
It requires the command to actually start the runner with `rdfc:command`, and a link to the programming context (`rdfc:handlesSubjectsOf`).
The object of `rdfc:handlesSubjectsOf` links runners and processors to a context term. This often refers to the programming language.
Each context term is related to a SHACL shape that specifies the incoming data the runner can use to start the processors.
<div class=example>
The NodeRunner, for example, is a program that starts JavaScript processors with Node. It is defined as follows:
```turtle
rdfc:NodeRunner a rdfc:Runner;
rdfc:handlesSubjectsOf rdfc:jsImplementationOf;
rdfc:command "npx js-runner".
# Note that rdfc:jsImplementationOf is already defined by RDF-Connect as follows
sds:implementationOf rdfs:subPropertyOf rdfs:subClassOf.
rdfc:jsImplementationOf rdfs:subPropertyOf sds:implementationOf.
# Shape that a Js Processor should fulfil;
[ ] a sh:NodeShape;
# We target it with jsImplementationOf
sh:targetSubjectsOf rdfc:jsImplementationOf;
sh:property [
sh:path rdfc:file;
sh:name "file";
sh:minCount 1;
sh:maxCount 1;
sh:datatype xsd:string;
], [
sh:path rdfc:class;
sh:name "clazz";
sh:maxCount 1;
sh:datatype xsd:string;
].
```
This way, `rdfc:jsImplementationOf` is a predicate declared only for JavaScript processors.
Because a shape is linked to that predicate, runners can expect a file location and, optionally, a class name to start JavaScript processors.
</div>
### Connecting Flow
Each runner is started by the orchestrator using a command defined in the pipeline via `rdfc:command`.
The orchestrator appends two arguments to this command: the URL of the orchestrator’s Protobuf server and the IRI that uniquely identifies the runner.
Upon startup, the runner MUST connect to the orchestrator using the provided URL via the `RPC.connect` method.
This establishes a bidirectional message stream referred to as the normal stream.
Once connected, the runner MUST send an `RPC.identify` message, identifying itself with the provided IRI.
The orchestrator then sends an `RPC.pipeline` message containing the complete, expanded pipeline definition.
The runner MAY ignore this message.
Runners MAY initiate a separate log stream using the `RPC.logStream` method to stream log messages back to the orchestrator.
<div class=issue>
🚧 More information about the log stream is coming soon.
</div>
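
The startup sequence above can be sketched roughly as follows. This is a minimal illustration, not the actual wire protocol: it assumes the two appended arguments arrive in the order URL, then IRI, and it uses an in-memory stand-in (`FakeNormalStream`) with plain dictionaries in place of the real `RPC` messages. All class, function and field names are hypothetical.

```python
from typing import List, Optional, Tuple


def parse_runner_args(argv: List[str]) -> Tuple[str, str]:
    """Return (orchestrator_url, runner_iri) from the last two arguments."""
    if len(argv) < 2:
        raise ValueError("expected the orchestrator URL and the runner IRI")
    return argv[-2], argv[-1]


class FakeNormalStream:
    """In-memory stand-in for the bidirectional normal stream."""

    def __init__(self, incoming: List[dict]) -> None:
        self.incoming = list(incoming)
        self.sent: List[dict] = []

    def send(self, message: dict) -> None:
        self.sent.append(message)

    def receive(self) -> dict:
        return self.incoming.pop(0)


def handshake(stream: FakeNormalStream, runner_iri: str) -> Optional[str]:
    """Identify the runner, then read the pipeline message that follows."""
    stream.send({"identify": {"iri": runner_iri}})
    reply = stream.receive()
    # The runner may ignore the pipeline; here it is simply returned.
    return reply.get("pipeline")


url, iri = parse_runner_args(["localhost:50051", "http://example.org/runner"])
stream = FakeNormalStream([{"pipeline": "<a> <b> <c>."}])
pipeline = handshake(stream, iri)
```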
### Starting Processors
After initialization, the orchestrator may send multiple `RPC.proc` messages to instruct the runner to start specific processors.
Each message includes a processor IRI, a configuration object and an argument object.
Both the configuration and arguments are provided as JSON-LD strings.
The configuration object contains the fields defined by the shape of the context term, mapped as described in [Mapping SHACL to Configuration Structures](#mapping-shacl-to-configuration-structures).
The argument object is constructed from the SHACL shape defined for the processor type.
Runners MAY parse these JSON-LD payloads and transform known constructs into idiomatic equivalents.
For example, reader and writer objects are represented as JSON-LD values with an `@id` field containing the channel IRI and an `@type` field indicating either `https://w3id.org/rdf-connect#Reader` or `https://w3id.org/rdf-connect#Writer`.
Runners are RECOMMENDED to replace these values with appropriate typed objects in the target environment.
When a processor has been fully initialized, the runner MUST send an `RPC.init` message, indicating success or failure.
If any runner signals an error during initialization, the orchestrator MUST abort the pipeline execution.
Once all processors are successfully initialized, the orchestrator sends an `RPC.start` message, instructing the runner to start the processors.
After all processors complete their execution, the runner SHOULD gracefully close the normal stream to signal completion.
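
The recommended replacement of reader and writer values could look like the sketch below. It assumes the JSON-LD payload has already been parsed into plain dictionaries and lists, and that `@type` is either a string or a list of strings. The `Reader` and `Writer` classes, the `revive` helper and the example payload are hypothetical; a real runner would return objects bound to its own channel implementation.

```python
import json
from dataclasses import dataclass
from typing import Any, List

RDFC = "https://w3id.org/rdf-connect#"


@dataclass(frozen=True)
class Reader:
    channel: str


@dataclass(frozen=True)
class Writer:
    channel: str


def _types(node: dict) -> List[str]:
    """Return the @type of a node as a list, whether it was a string or a list."""
    value = node.get("@type", [])
    return value if isinstance(value, list) else [value]


def revive(value: Any) -> Any:
    """Recursively replace reader and writer nodes with typed objects."""
    if isinstance(value, list):
        return [revive(item) for item in value]
    if isinstance(value, dict):
        types = _types(value)
        if RDFC + "Reader" in types and "@id" in value:
            return Reader(value["@id"])
        if RDFC + "Writer" in types and "@id" in value:
            return Writer(value["@id"])
        return {key: revive(item) for key, item in value.items()}
    return value


arguments = json.loads("""
{
  "input": {"@id": "http://example.org/in", "@type": "https://w3id.org/rdf-connect#Reader"},
  "outputs": [{"@id": "http://example.org/out", "@type": "https://w3id.org/rdf-connect#Writer"}],
  "label": "demo"
}
""")
revived = revive(arguments)
```

Values that are not readers or writers pass through unchanged, so the processor receives the same structure with only the channel references swapped for typed objects.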
### Handling messages
Apart from starting processors, the runner also acts as a mediator that routes each message to the processor that should receive it.
The orchestrator MAY send any number of `RPC.msg` or `RPC.streamMsg` messages.
#### Receiving normal messages
When an `RPC.msg` is received, the runner MUST deliver the message to the appropriate processor, using the channel IRI to determine the correct target.
Message routing can follow either a push or pull model depending on the language environment.
Runners MAY coerce or transform messages into a different representation, including converting a normal message into a streaming message with a single chunk or converting a short streaming message into a normal message.
These transformations SHOULD respect the preferences of the processor and the runner's internal design constraints.
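
A push-style router keyed on the channel IRI might be sketched as follows. This is one possible design under the assumption that processors register callbacks per channel; the `ChannelRouter` class and its methods are hypothetical. A pull model would place messages in a per-channel queue instead of calling handlers.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

Handler = Callable[[bytes], None]


class ChannelRouter:
    """Delivers incoming messages to the handlers registered for a channel IRI."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, channel_iri: str, handler: Handler) -> None:
        self._handlers[channel_iri].append(handler)

    def dispatch(self, channel_iri: str, data: bytes) -> int:
        """Push the message to every handler; return how many received it."""
        handlers = self._handlers.get(channel_iri, [])
        for handler in handlers:
            handler(data)
        return len(handlers)


router = ChannelRouter()
received: List[bytes] = []
router.subscribe("http://example.org/in", received.append)

delivered = router.dispatch("http://example.org/in", b"hello")
ignored = router.dispatch("http://example.org/other", b"dropped")
```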
#### Receiving streaming messages
When an `RPC.streamMsg` is received, the runner MUST establish a streaming channel by invoking the `RPC.receiveStreamMessage` method with the provided stream ID.
This initiates a stream of chunks from the orchestrator.
The runner MUST forward these chunks to the appropriate processor, using any internal buffering or transformation logic as required.
If the runner determines that the message size is small enough, it MAY convert a streaming message into a single message object, provided that this does not violate processor expectations or exceed system limits.
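
Since the total size of a stream is generally not known up front, one way to decide is to buffer chunks until a limit is exceeded. The sketch below assumes chunks are byte strings and that the limit is chosen by the runner; `coalesce_if_small` is a hypothetical helper, not part of the protocol.

```python
from itertools import chain
from typing import Iterable, Iterator, List, Union


def coalesce_if_small(
    chunks: Iterable[bytes], limit: int
) -> Union[bytes, Iterator[bytes]]:
    """Return one buffer if the stream fits within `limit` bytes, else a stream."""
    iterator = iter(chunks)
    buffered: List[bytes] = []
    size = 0
    for chunk in iterator:
        buffered.append(chunk)
        size += len(chunk)
        if size > limit:
            # Too large: replay what was buffered, then continue with the rest.
            return chain(buffered, iterator)
    return b"".join(buffered)


small = coalesce_if_small([b"ab", b"cd"], limit=10)
large = coalesce_if_small([b"abcd", b"efgh", b"ij"], limit=5)
large_chunks = list(large)
```

When the limit is exceeded, no data is lost: the chunks that were already read are replayed before the remainder of the original stream.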
#### Sending messages
Runners MUST also support outbound communication from processors.
While runners MAY omit support for certain advanced features (such as streaming output), a full implementation is strongly encouraged.
To send a normal message, the runner uses the `RPC.msg` method on the normal stream.
To send a streaming message, the runner first initiates the `RPC.sendStreamMessage` method, which returns a new stream.
The orchestrator responds with a single message, the stream ID.
The runner then sends an `RPC.streamMsg` on the normal stream, including the channel IRI and stream ID.
Finally, the runner streams the message content using the created stream.
The message is considered complete when the runner closes the stream.
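
The order of operations for a streaming send can be sketched with a recording stand-in for the orchestrator. Everything here is hypothetical scaffolding (`FakeOrchestrator`, `FakeStream`, the dictionary message shape and the integer stream ID); only the sequence of steps is taken from the text above.

```python
from typing import Iterable, List, Tuple


class FakeOrchestrator:
    """Records calls so that the order of operations can be inspected."""

    def __init__(self) -> None:
        self.log: List[Tuple[str, object]] = []
        self._next_id = 0

    def send_stream_message(self) -> "FakeStream":
        self._next_id += 1
        self.log.append(("sendStreamMessage", self._next_id))
        return FakeStream(self, self._next_id)

    def send_on_normal_stream(self, message: dict) -> None:
        self.log.append(("normal", message))


class FakeStream:
    def __init__(self, orchestrator: FakeOrchestrator, stream_id: int) -> None:
        self._orchestrator = orchestrator
        self._stream_id = stream_id

    def read_stream_id(self) -> int:
        """The single response message from the orchestrator."""
        return self._stream_id

    def write(self, chunk: bytes) -> None:
        self._orchestrator.log.append(("chunk", chunk))

    def close(self) -> None:
        self._orchestrator.log.append(("close", self._stream_id))


def send_streaming(
    orchestrator: FakeOrchestrator, channel_iri: str, chunks: Iterable[bytes]
) -> int:
    stream = orchestrator.send_stream_message()      # 1. open the stream
    stream_id = stream.read_stream_id()              # 2. receive the stream ID
    orchestrator.send_on_normal_stream(              # 3. announce on the normal stream
        {"streamMsg": {"channel": channel_iri, "id": stream_id}}
    )
    for chunk in chunks:                             # 4. stream the content
        stream.write(chunk)
    stream.close()                                   # 5. closing completes the message
    return stream_id


orchestrator = FakeOrchestrator()
sent_id = send_streaming(orchestrator, "http://example.org/out", [b"a", b"b"])
```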
#### Channel Closure
Processors MAY indicate that a given channel is closed (i.e., no further messages will be sent).
The runner MUST propagate this information to the orchestrator via an `RPC.close` message.
Similarly, when the orchestrator sends an `RPC.close` message for a channel, the runner MAY respond by closing or invalidating the corresponding data stream in the processor.
## Processor
<div class=issue>
🚧 This section is a work in progress and will be expanded soon.
</div>
## Pipeline
<div class=issue>
🚧 This section is a work in progress and will be expanded soon.
</div>
# Ontology Reference
The RDF-Connect ontology provides the terms used in RDF pipeline definitions. See the full [RDF-Connect Ontology](https://w3id.org/rdf-connect/ontology.ttl) for details.
<script src="./scripts.js" type="module"></script>