Pulse is a tiny, modular, event-time streaming framework (Flink/Beam-like) written in Rust. It focuses on clarity, testability, and a local-first workflow. It supports watermarks, windowing, pluggable state, Prometheus metrics, and a single-binary CLI that runs pipelines from a TOML file.
Goals:
- Local-first: zero external services required for the common path
- Stream processing with event-time and watermarks
- Configurable windowing (tumbling/sliding/session), available from both the Rust API and the CLI
- Pluggable state backends (in-memory and optional RocksDB)
- File and Parquet I/O, plus optional Kafka
- First-class observability (tracing/Prometheus)
- Single binary you can run on your laptop or inside a container without standing up a cluster or control plane.
- Files-in/files-out by default; opt into Kafka when needed.
- Deterministic replay support (EOF watermark) so you can iterate quickly and write golden tests.
- Ergonomics first: small, readable codebase; simple config; sensible defaults.
- `pulse-core`: core types/traits, `Executor`, `Record`, event-time `Watermark`, timers, metrics, and config loader
- `pulse-ops`: operators (`Map`, `Filter`, `KeyBy`, `Aggregate`, `WindowedAggregate`), event-time window helpers
- `pulse-state`: state backends: `InMemoryState` (default) and `RocksDbState` (feature `rocksdb`)
- `pulse-io`: sources/sinks: `FileSource` (JSONL/CSV) with EOF watermark, `FileSink` (JSONL), `ParquetSink` (feature `parquet`) with date partitioning and rotation, `KafkaSource`/`KafkaSink` (feature `kafka`) with resume from persisted offsets
- `pulse-examples`: runnable examples and sample data
- `pulse-bin`: CLI (`pulse`) and `/metrics` HTTP server
- `Record { event_time: chrono::DateTime<Utc>, value: serde_json::Value }`
- `Watermark(EventTime)` is propagated through the pipeline
- The `Executor` drives `on_watermark` for operators and informs sinks
- Lag metric (`pulse_watermark_lag_ms`) = now - watermark (see the sketch below)
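A minimal sketch of these core types (the `Record` fields are quoted from above; representing `EventTime` as a `DateTime<Utc>` and the helper function are illustrative assumptions, not the exact `pulse-core` API):

```rust
use chrono::{DateTime, Utc};
use serde_json::Value;

/// Event-time record: the timestamp comes from the data itself.
pub struct Record {
    pub event_time: DateTime<Utc>,
    pub value: Value,
}

/// Low watermark: "no more records with event_time <= this are expected".
/// EventTime is shown as DateTime<Utc> here for illustration.
pub struct Watermark(pub DateTime<Utc>);

/// Roughly what `pulse_watermark_lag_ms` reports: processing-time "now"
/// minus the current event-time watermark, in milliseconds.
pub fn watermark_lag_ms(wm: &Watermark) -> i64 {
    (Utc::now() - wm.0).num_milliseconds()
}
```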
Semantics overview:
- Event-time is derived from your data (RFC3339 string or epoch ms).
- Sources advance a low watermark to signal “no more records ≤ t expected”.
- Operators emit window results when `watermark >= window.end`.
- `FileSource` emits a final EOF watermark far in the future to flush all windows in batch-like runs.
- Operators: `WindowedAggregate` supports tumbling/sliding/session windows.
- The CLI supports tumbling, sliding, and session windows with aggregations: `count`, `sum`, `avg`, `distinct`.
- The EOF watermark emitted by `FileSource` flushes open windows (a sketch of the firing rule follows this list).
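A hedged sketch of that firing rule for tumbling windows; the struct and function names below are illustrative, not the `pulse-ops` types:

```rust
/// Illustrative window state: one entry per open window (names are assumptions).
struct WindowState {
    window_end_ms: i64,
    count: u64,
}

/// Emit every window whose end the watermark has passed; keep the rest open.
fn on_watermark(open_windows: &mut Vec<WindowState>, watermark_ms: i64) -> Vec<(i64, u64)> {
    let (ready, still_open): (Vec<_>, Vec<_>) = open_windows
        .drain(..)
        .partition(|w| watermark_ms >= w.window_end_ms);
    *open_windows = still_open;
    ready.into_iter().map(|w| (w.window_end_ms, w.count)).collect()
}
```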
- `KvState` trait: `get`/`put`/`delete`/`iter_prefix`/`snapshot`/`restore` (a trait sketch follows this list)
- `pulse-state::InMemoryState` implements the full API (snapshots kept in memory)
- `pulse-state::RocksDbState` (feature `rocksdb`): prefix iteration and checkpoint directory creation via RocksDB Checkpoint
- `WindowOperator` can persist per-window state via a backend and restore it after a restart (optional hook)
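A minimal sketch of a key-value state trait with those methods; the signatures are assumptions, not the exact `pulse-state` API:

```rust
/// Illustrative key-value state backend trait modeled on the method list above.
pub trait KvState {
    fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
    fn put(&mut self, key: &[u8], value: &[u8]);
    fn delete(&mut self, key: &[u8]);
    /// Iterate all entries whose key starts with `prefix`.
    fn iter_prefix(&self, prefix: &[u8]) -> Vec<(Vec<u8>, Vec<u8>)>;
    /// Capture current state (in-memory map or a RocksDB checkpoint).
    fn snapshot(&self) -> anyhow::Result<Vec<u8>>;
    /// Restore from a previously taken snapshot.
    fn restore(&mut self, snapshot: &[u8]) -> anyhow::Result<()>;
}
```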
- Single-node at-least-once: each record is processed at least once; exactly-once is not guaranteed.
- `FileSource` is deterministic (EOF watermark). `KafkaSource` commits offsets periodically (configurable); on restart, offsets are available in `KvState` for recovery logic (see the offset sketch after this list).
- Operator state updates and sink writes are not transactional as a unit (no two-phase commit in MVP).
- No cluster/distributed runtime yet (single process, single binary).
- No SQL/DSL planner; define pipelines in Rust or via TOML.
- Checkpoint/resume orchestration is minimal: offsets/snapshots exist, but full CLI-driven recovery is a follow-up.
- Kafka is optional and depends on native `librdkafka`.
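As a hedged illustration of the offset-recovery hook mentioned above, reusing the illustrative `KvState` trait sketched earlier; the key layout and helper names are assumptions:

```rust
/// Persist a Kafka offset under an assumed "kafka/<topic>/<partition>" key.
fn persist_offset(state: &mut impl KvState, topic: &str, partition: i32, offset: i64) {
    let key = format!("kafka/{topic}/{partition}");
    state.put(key.as_bytes(), &offset.to_be_bytes());
}

/// Read the offset back on restart, if one was persisted.
fn restore_offset(state: &impl KvState, topic: &str, partition: i32) -> Option<i64> {
    let key = format!("kafka/{topic}/{partition}");
    state
        .get(key.as_bytes())
        .map(|bytes| i64::from_be_bytes(bytes.try_into().expect("8-byte offset")))
}
```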
- `FileSource` (JSONL/CSV): parses `event_time` from RFC3339 or epoch ms; final watermark at EOF (a parsing sketch follows this list)
- `FileSink`: writes JSON lines to stdout/file
- `ParquetSink` (feature `parquet`):
  - Schema: `event_time: timestamp(ms)`, `payload: utf8` (full JSON)
  - Partitioning:
    - By date (default): `out_dir/dt=YYYY-MM-DD/part-*.parquet` (configurable format via `partition_format`)
    - By field: `out_dir/<field>=<value>/part-*.parquet` (set `partition_field`)
  - Rotation: by row count, time, and optional bytes (`max_bytes`)
  - Compression: `snappy` (default), `zstd`, or `none`
  - Tested: writes files, then reads them back via the Arrow reader, asserting row counts
- `KafkaSource`/`KafkaSink` (feature `kafka`): integration with `rdkafka`, resuming offsets from persisted state
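For orientation, a hedged sketch of the event-time parsing described above (the function name is hypothetical; `pulse-io` may structure this differently):

```rust
use chrono::{DateTime, TimeZone, Utc};
use serde_json::Value;

/// Illustrative only: parse an event-time field that is either an RFC3339
/// string or an epoch-milliseconds number.
fn parse_event_time(record: &Value, field: &str) -> Option<DateTime<Utc>> {
    match record.get(field)? {
        Value::String(s) => DateTime::parse_from_rfc3339(s)
            .ok()
            .map(|dt| dt.with_timezone(&Utc)),
        Value::Number(n) => {
            let ms = n.as_i64()?;
            Utc.timestamp_millis_opt(ms).single()
        }
        _ => None,
    }
}
```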
- Tracing spans on operators (receive/emit) using `tracing`
- Prometheus metrics (via `pulse-core::metrics`; a registration sketch follows this list):
  - `pulse_operator_records_total{operator,stage=receive|emit}`
  - `pulse_watermark_lag_ms` (gauge)
  - `pulse_bytes_written_total{sink}`
  - `pulse_state_size{operator}`
  - `pulse_operator_process_latency_ms` (histogram)
  - `pulse_sink_process_latency_ms` (histogram)
  - `pulse_queue_depth` (gauge)
  - `pulse_dropped_records_total{reason}` (counter)
- `/metrics` HTTP endpoint served by `pulse-bin` (axum 0.7)
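For orientation, a hedged sketch of how the records counter could be registered and incremented with the `prometheus` crate (this assumes the `prometheus` and `once_cell` crates; it is not the exact `pulse-core::metrics` code):

```rust
use once_cell::sync::Lazy;
use prometheus::{opts, register_int_counter_vec, IntCounterVec};

/// Illustrative registration of the per-operator records counter.
static RECORDS_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
    register_int_counter_vec!(
        opts!("pulse_operator_records_total", "Records seen per operator and stage"),
        &["operator", "stage"]
    )
    .expect("metric registration")
});

/// Bump the counter for the receive stage of one operator.
fn record_received(operator: &str) {
    RECORDS_TOTAL.with_label_values(&[operator, "receive"]).inc();
}
```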
Binary crate: `pulse-bin`. Subcommands:
- `pulse serve --port 9898`
  - Serves `/metrics` in Prometheus format.
- `pulse run --config pipeline.toml [--http-port 9898]`
  - Loads a TOML config, validates it, builds the pipeline, and runs until EOF (or Ctrl-C if you wire a streaming source).
  - If `--http-port` is provided, starts `/metrics` on that port.
  - Optional backpressure (soft bound) via environment: set `PULSE_CHANNEL_BOUND` (e.g., `PULSE_CHANNEL_BOUND=10000`) to drop new records when the in-flight depth reaches the bound. Watermarks are never dropped. A sketch of this drop policy follows below.
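A hedged sketch of the soft-bound drop policy, under the assumption that it sits on a bounded `tokio::sync::mpsc` channel; the `Event` enum and function names are illustrative, not the `pulse-bin` internals:

```rust
use tokio::sync::mpsc;

/// Illustrative pipeline event (names are assumptions).
enum Event {
    Record(serde_json::Value),
    Watermark(i64),
}

/// Build a bounded channel sized from PULSE_CHANNEL_BOUND (default 10_000 here).
fn channel_from_env() -> (mpsc::Sender<Event>, mpsc::Receiver<Event>) {
    let bound = std::env::var("PULSE_CHANNEL_BOUND")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(10_000);
    mpsc::channel(bound)
}

async fn enqueue(tx: &mpsc::Sender<Event>, event: Event) {
    match event {
        // Watermarks must always make progress, so wait for capacity.
        Event::Watermark(_) => {
            let _ = tx.send(event).await;
        }
        // Records are dropped when the bounded queue is full; the real executor
        // would also bump pulse_dropped_records_total{reason}.
        Event::Record(_) => {
            let _ = tx.try_send(event);
        }
    }
}
```

The key property is that records may be shed under load, while watermarks always advance so downstream windows can still close.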
Example `pipeline.toml`:
[source]
kind = "file"
path = "pulse-examples/examples/sliding_avg.jsonl"
time_field = "event_time"
[time]
allowed_lateness = "10s"
[window]
# supported: tumbling|sliding|session
type = "sliding"
size = "60s"
slide = "15s" # for sliding; for session, use: gap = "30s"
[ops]
# aggregation over a key; supported: count (default), sum, avg, distinct
count_by = "word"
# agg = "count" # default
# agg_field = "value"   # required for sum|avg|distinct
[sink]
kind = "parquet"
out_dir = "outputs"
## Optional Parquet settings
# compression = "snappy" # one of: snappy (default) | zstd | none
# max_bytes = 104857600 # rotate file when ~bytes reached (e.g. 100MB)
# partition_field = "user_id" # partition by a payload field value
# partition_format = "%Y-%m" # date partition format when partitioning by event_time
Validation rules (a sketch follows below):
- `source.kind` must be `file` (or `kafka`)
- `sink.kind` must be `parquet`/`file` (or `kafka`)
- `ops.count_by` must be present
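A hedged sketch of what that validation amounts to; the struct and field names mirror the TOML keys above but are not the exact `pulse-core` config loader:

```rust
use serde::Deserialize;

/// Illustrative config structs mirroring the TOML keys above (other sections
/// such as [time] and [window] are omitted; serde ignores unknown fields).
#[derive(Deserialize)]
struct PipelineConfig {
    source: SourceCfg,
    ops: OpsCfg,
    sink: SinkCfg,
}

#[derive(Deserialize)]
struct SourceCfg { kind: String }

#[derive(Deserialize)]
struct OpsCfg { count_by: Option<String> }

#[derive(Deserialize)]
struct SinkCfg { kind: String }

fn validate(cfg: &PipelineConfig) -> anyhow::Result<()> {
    anyhow::ensure!(
        matches!(cfg.source.kind.as_str(), "file" | "kafka"),
        "source.kind must be file (or kafka)"
    );
    anyhow::ensure!(
        matches!(cfg.sink.kind.as_str(), "parquet" | "file" | "kafka"),
        "sink.kind must be parquet/file (or kafka)"
    );
    anyhow::ensure!(cfg.ops.count_by.is_some(), "ops.count_by must be present");
    Ok(())
}
```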
# Build
cargo build
# Run the pipeline and export metrics
cargo run -p pulse-bin -- run --config examples/pipeline.toml --http-port 9898
# Scrape metrics
curl http://127.0.0.1:9898/metrics
Expected output:
- Parquet files created under `outputs/dt=YYYY-MM-DD/part-*.parquet`.
The CLI supports JSONL and CSV via `source.format`.
- Direct CSV in the CLI:
[source]
kind = "file"
format = "csv" # jsonl | csv
path = "input.csv"
time_field = "event_time" # epoch ms in CSV
[time]
allowed_lateness = "10s"
[window]
type = "tumbling"
size = "60s"
[ops]
count_by = "word"
[sink]
kind = "parquet"
out_dir = "outputs"- Use the Rust API directly:
use pulse_core::Executor;
use pulse_io::{FileSource, ParquetSink, ParquetSinkConfig, PartitionSpec};
use pulse_ops::{KeyBy, WindowedAggregate};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // CSV source: header must include event_time (epoch ms)
    let src = FileSource {
        path: "input.csv".into(),
        format: pulse_io::FileFormat::Csv,
        event_time_field: "event_time".into(),
        text_field: None,
    };
    let mut exec = Executor::new();
    exec.source(src)
        .operator(KeyBy::new("word"))
        // 60_000 ms = 60s tumbling windows, counting records per key
        .operator(WindowedAggregate::tumbling_count("key", 60_000))
        .sink(ParquetSink::new(ParquetSinkConfig {
            out_dir: "outputs".into(),
            partition_by: PartitionSpec::ByDate { field: "event_time".into(), fmt: "%Y-%m-%d".into() },
            max_rows: 1_000_000,
            max_age: std::time::Duration::from_secs(300),
        }));
    exec.run().await?;
    Ok(())
}
- Windowed aggregation (avg over a field) via the CLI:
[source]
kind = "file"
path = "pulse-examples/examples/sliding_avg.jsonl"
time_field = "event_time"
[time]
allowed_lateness = "10s"
[window]
type = "tumbling"
size = "60s"
[ops]
count_by = "word"
agg = "avg" # or: sum | distinct | count
agg_field = "score" # required for avg/sum/distinct
[sink]
kind = "parquet"
out_dir = "outputs"- Data examples under
pulse-examples/examples/ - Sample dataset:
sliding_avg.jsonl(generated earlier) works with the file source - You can also wire other examples in
pulse-examplesusing the operators frompulse-ops
Enable per-crate features:
# Kafka
cargo build -p pulse-io --features kafka
# Arrow/Parquet
cargo build -p pulse-io --features parquet
Windows + Kafka notes: enabling kafka builds the native librdkafka by default and requires CMake and MSVC Build Tools.
- Install CMake and Visual Studio Build Tools (C++), then rerun the build
- Or link against a preinstalled librdkafka and remove the `cmake-build` feature
From the workspace root, you can run tests per crate. Some features are crate-specific:
# Operators crate
cargo test -p pulse-ops -- --nocapture
# I/O crate with Parquet
cargo test -p pulse-io --features parquet -- --nocapture
# CLI crate (includes end-to-end goldens)
cargo test -p pulse-bin -- --nocapture
# All workspace tests (no extra features)
cargo test -- --nocapture
Notes:
- Only the `pulse-io` crate defines the `parquet` feature. Do not pass `--features parquet` to other crates.
- The `kafka` feature is also only on `pulse-io` and requires native dependencies on Windows.
Pulse takes a pragmatic, local-first approach for single-node pipelines. A comparison at a glance:
| System | Install/runtime footprint | Local-first UX | Event-time/windowing | SQL | Cluster | Primary niche |
|---|---|---|---|---|---|---|
| Flink | Heavy (cluster/services) | No (dev spins cluster) | Yes (rich) | Yes | Yes | Large-scale distributed streaming |
| Arroyo | Moderate (service + workers) | Partial | Yes | Yes | Yes | Cloud-native streaming w/ SQL |
| Fluvio | Broker + clients | Partial | Limited | No | Yes | Distributed streaming data plane |
| Materialize | Service + storage | Partial | Incremental views | Yes | Yes | Streaming SQL materialized views |
| Pulse | Single binary | Yes | Yes (MVP focused) | No | No | Local-first pipelines & testing |
If you need distributed scale, multi-tenant scheduling, or a SQL-first experience, those systems are a better fit. Pulse aims to be the simplest way to iterate on event-time pipelines locally and ship small, self-contained jobs.
