Julien Loudet edited this page Jan 22, 2025 · 1 revision

In Zenoh-Flow, a Sink receives data from upstream Sources or Operators. A Sink typically connects a data flow to the "outside world", for example by writing the result of previous computations to a database. In the object detection pipeline scenario, the Sink would be the node that outputs what was detected.

For Zenoh-Flow to be able to load our Sink, it must be accompanied by a descriptor.

Descriptor

The content of the descriptor for a Sink is as follows:

  1. (optional) a description,
  2. (optional) some configuration,
  3. (optional) some vars,
  4. its inputs --- i.e. the data it requires,
  5. a library --- i.e. where to find its actual implementation.

Below is a valid descriptor that matches the code we are going to write next:

description: my implementation of a Sink

# This configuration is not used and serves as an example.
configuration:
  value: not-used

# This vars section is not used and serves as an example.
vars:
  FOO: not-used
  
inputs:
  - input

# Linux:
library: file:///absolute/path/to/the/implementation/libmy_sink.so
# macOS:
# library: file:///absolute/path/to/the/implementation/libmy_sink.dylib

Shared library

Assuming you want to create a Sink called my-sink, enter the following in a terminal:

cargo new --lib my-sink

Modify the Cargo.toml to add these dependencies and tell rustc that you want a library that can be dynamically loaded:

[dependencies]
async-trait = "0.1.50"  # Zenoh-Flow’s nodes traits are asynchronous
zenoh-flow-nodes = { git = "https://github.com/eclipse-zenoh-flow/zenoh-flow.git" }

[lib]
crate-type=["cdylib"]

Now modify lib.rs to (i) implement the Zenoh-Flow traits and (ii) include your logic.

Below you can find commented boilerplate code to do (i).

use async_trait::async_trait;
use zenoh_flow_nodes::prelude::*;

// MySink is where you implement your business logic.
//
// `Input` is a structure provided by Zenoh-Flow through which you receive data from upstream nodes.
// The way to pass an `Input` is through its Constructor --- see below.
//
// That structure is also where a state can be saved. For concurrency reasons, the state must
// implement `Send` and `Sync` (`Arc` and `Mutex` structures can be helpful, in particular their
// `async_std` variant).
//
// The `export_sink` macro is required to properly expose the symbol and information about the
// version of the Rust compiler and Zenoh-Flow, to Zenoh-Flow.
//
// It allows Zenoh-Flow to detect, at runtime, a version mismatch between the Zenoh-Flow daemon and
// the shared library (be it on the version of the Rust compiler or of Zenoh-Flow itself).
#[export_sink]
struct MySink {
    input: Input<String>,
}

#[async_trait]
impl Node for MySink {
    async fn iteration(&self) -> Result<()> {
        // Add your business logic here.
        let data_to_process = self.input.recv().await?;
        println!("This is what was received: {:?}", data_to_process);
        // Signal that this iteration completed successfully.
        Ok(())
    }
}

#[async_trait]
impl Sink for MySink {
    async fn new(
        // The `context` provides information about the Zenoh-Flow daemon on which the generated
        // node MySink will be executed.
        context: Context,
        // The `configuration`(1) is a re-export of `serde_json::Value`(2). It behaves as a
        // dictionary and allows accessing configuration variables defined in the descriptor.
        configuration: Option<Configuration>,
        // The `Inputs` are encapsulated `flume::Receivers` created by Zenoh-Flow. It is a HashMap
        // whose keys match what was defined in the descriptor file.
        mut inputs: Inputs,
    ) -> Result<Self> {
        let input = inputs
                .take("input")
                .expect("No input named 'input' found")
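                // The method `typed` allows automatically deserialising data if it comes from a
                // node located in another process. It will call the provided closure.
                // `todo!()` is a placeholder: it panics if called, so replace it with your own
                // deserialisation logic.
                .typed(|bytes| todo!());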
        Ok(MySink { input })
    }
}
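
The boilerplate above leaves the deserialisation closure as `todo!()`. As a minimal sketch of what could go there for an `Input<String>`, the helper below decodes UTF-8 bytes into a `String`. The function name `deserialize_string` and its error type are hypothetical and not part of Zenoh-Flow. It assumes the closure receives the payload as a byte slice and returns a `Result`, so check the signature of `typed` in the version of Zenoh-Flow you depend on and adapt the error type accordingly.

```rust
use std::string::FromUtf8Error;

// Hypothetical helper (not part of Zenoh-Flow): turns the raw bytes of a message
// into a `String`, failing if the bytes are not valid UTF-8.
fn deserialize_string(bytes: &[u8]) -> Result<String, FromUtf8Error> {
    // `String::from_utf8` takes ownership of a `Vec<u8>`, hence the copy.
    String::from_utf8(bytes.to_vec())
}
```

Under the assumption stated above, the closure passed to `typed` would then delegate to this helper, converting the error into whatever type Zenoh-Flow expects.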

(1): Configuration (2): serde_json::Value
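
The comments in the boilerplate mention that the structure can hold a state, provided it implements `Send` and `Sync`. Because `iteration` only receives `&self`, that state has to be mutated through a shared reference. The sketch below illustrates the idea with a hypothetical `Counter` type built on the standard library's `Arc` and `Mutex`. It is an illustration only: the type and method names are assumptions, and if a lock has to be held across an `.await` the asynchronous variants mentioned in the comments are likely a better fit.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical state: counts how many messages the Sink has processed.
#[derive(Clone, Default)]
struct Counter {
    processed: Arc<Mutex<u64>>,
}

impl Counter {
    // Increments the counter through a shared reference and returns the new value.
    fn record(&self) -> u64 {
        let mut guard = self.processed.lock().expect("counter mutex poisoned");
        *guard += 1;
        *guard
    }
}

// Compile-time check: this only compiles for types that are `Send` and `Sync`.
fn assert_send_sync<T: Send + Sync>() {}
```

A field such as `counter: Counter` could then sit next to `input` in `MySink`, with `self.counter.record()` called from `iteration`.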

Python script

TODO: Add a reference to the auto-generated Python docs.
