A fast, concurrent, and extensible distributed stream processing system in Go, engineered for high-throughput, low-latency workloads. This project is a blueprint for building scalable, real-time data pipelines.
Benchmark results for the core operations, as reported by go test -bench:
goos: linux
goarch: amd64
cpu: Intel(R) Core(TM) i5-8265U CPU @ 1.60GHz
BenchmarkBroker_AppendRecord-8 14,883,109 68.15 ns/op
BenchmarkBroker_GetRecords-8 1,000,000 1024 ns/op
BenchmarkBroker_ChannelIngestion-8 5,393,268 217.3 ns/op
BenchmarkLoadConfig_Default-8 32,622,258 36.99 ns/op
BenchmarkLoadConfig_EnvOverride-8 25,494,633 46.57 ns/op
BenchmarkProcessor_Process-8 22,775,946 50.00 ns/op
BenchmarkProcessor_ProcessWindow-8 913 1,803,179 ns/op
BenchmarkRecord_Metadata-8 36,200,748 33.23 ns/op
Highlights:
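- Appends run at about 14.7 million records per second in the benchmark loop (68.15 ns/op), and single-record processing at about 20 million per second (50.00 ns/op).
- Sub-microsecond latency for appends, channel ingestion, config loading, and single-record processing; GetRecords takes about 1 µs (1024 ns/op) and ProcessWindow about 1.8 ms per call.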
- Efficient channel-based ingestion and transformation.
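The throughput figures follow directly from the ns/op column: operations per second is 1e9 divided by ns/op. The sketch below redoes that arithmetic for a few rows of the table. The helper name opsPerSecond is made up for this example, and the results are benchmark-loop rates, not measured end-to-end pipeline throughput.

```go
package main

import "fmt"

// opsPerSecond converts a Go benchmark ns/op figure into operations per second.
// (Hypothetical helper for illustration; it is not part of the repository.)
func opsPerSecond(nsPerOp float64) float64 {
	return 1e9 / nsPerOp
}

func main() {
	// ns/op values copied from the benchmark table above.
	results := []struct {
		name    string
		nsPerOp float64
	}{
		{"Broker_AppendRecord", 68.15},
		{"Broker_GetRecords", 1024},
		{"Broker_ChannelIngestion", 217.3},
		{"Processor_Process", 50.00},
		{"Processor_ProcessWindow", 1803179},
	}
	for _, r := range results {
		fmt.Printf("%-26s %14.0f ops/s\n", r.name, opsPerSecond(r.nsPerOp))
	}
}
```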
.
├── benchmark/ # Benchmarks for all core components
│ ├── broker_benchmark_test.go
│ ├── config_benchmark_test.go
│ ├── processor_benchmark_test.go
│ ├── types_benchmark_test.go
│ └── README.md
├── config/ # Configuration loader
│ └── config.go
├── internal/
│ ├── broker/ # In-memory broker (shards, retention, channel ingestion)
│ │ └── broker.go
│ ├── processor/ # Stream processor (transformations, windowing)
│ │ └── processor.go
│ └── types/ # Shared data types (Record, etc.)
│ └── types.go
├── tests/
│ ├── unit/ # Unit tests for all components
│ └── integration/ # Integration tests for end-to-end flows
├── cmd/
│ ├── producer/ # Producer simulation
│ │ └── main.go
│ └── consumer/ # Consumer simulation
│ └── main.go
├── Makefile # Helpful run commands
├── go.mod # Go module definition
└── README.md # This file
- Clone the repository:
git clone https://github.com/khushmanvar/litestreams
cd litestreams
- Ensure Go 1.20+ is installed:
go version
- Install dependencies:
go mod tidy
Simulates sending records to the stream:
make run-producer
Simulates consuming and processing records:
make run-consumer
Run the unit and integration tests:
go test ./tests/unit/... ./tests/integration/...
Run the benchmarks:
go test -bench=. ./benchmark/...
- Broker: In-memory, sharded, append-only log with retention and channel-based ingestion.
- Processor: Pluggable transformation and windowed processing logic.
- Types: Extensible record structure with metadata.
- Config: Simple, environment-driven configuration.
- Producer/Consumer: Simulated entry points for data flow and processing.
- Tests: 100% coverage with clear separation of unit and integration tests.
- Benchmarks: Comprehensive performance metrics for all core operations.
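To make the component list concrete, here is a minimal sketch of how a sharded append-only broker with channel ingestion could feed a pluggable, windowed processor. It is not the repository's code: every name in it (Record's fields, NewBroker, Append, Get, Ingest, Transform, ProcessWindow) is an assumption made for illustration, retention is left out, and windows are count-based only because that is the simplest variant to show.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strings"
	"sync"
)

// Record is a hypothetical stand-in for the shared record type.
type Record struct {
	Key      string
	Value    string
	Metadata map[string]string
}

// Broker is a sharded, append-only, in-memory log (retention omitted here).
type Broker struct {
	mu     []sync.RWMutex // one lock per shard
	shards [][]Record
}

func NewBroker(n int) *Broker {
	return &Broker{mu: make([]sync.RWMutex, n), shards: make([][]Record, n)}
}

// shardFor maps a key to a shard index with an FNV-1a hash.
func (b *Broker) shardFor(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(len(b.shards)))
}

// Append adds r to the end of its shard and returns the record's offset.
func (b *Broker) Append(r Record) int {
	i := b.shardFor(r.Key)
	b.mu[i].Lock()
	defer b.mu[i].Unlock()
	b.shards[i] = append(b.shards[i], r)
	return len(b.shards[i]) - 1
}

// Get returns a copy of the records in the shard that key maps to,
// starting at offset. Other keys hashing to the same shard are included.
func (b *Broker) Get(key string, offset int) []Record {
	i := b.shardFor(key)
	b.mu[i].RLock()
	defer b.mu[i].RUnlock()
	if offset < 0 || offset >= len(b.shards[i]) {
		return nil
	}
	return append([]Record(nil), b.shards[i][offset:]...)
}

// Ingest appends every record received on ch until ch is closed.
func (b *Broker) Ingest(ch <-chan Record) {
	for r := range ch {
		b.Append(r)
	}
}

// Transform is the pluggable per-record step.
type Transform func(Record) Record

// ProcessWindow applies t to recs in fixed-size, count-based windows.
func ProcessWindow(recs []Record, size int, t Transform) [][]Record {
	if size <= 0 {
		return nil
	}
	var windows [][]Record
	for start := 0; start < len(recs); start += size {
		end := start + size
		if end > len(recs) {
			end = len(recs)
		}
		w := make([]Record, 0, end-start)
		for _, r := range recs[start:end] {
			w = append(w, t(r))
		}
		windows = append(windows, w)
	}
	return windows
}

func main() {
	b := NewBroker(4)
	ch := make(chan Record)
	done := make(chan struct{})
	go func() {
		b.Ingest(ch)
		close(done)
	}()
	for i := 0; i < 5; i++ {
		ch <- Record{Key: "sensor-1", Value: fmt.Sprintf("reading-%d", i)}
	}
	close(ch)
	<-done // wait until the ingestion goroutine has drained the channel

	upper := func(r Record) Record { r.Value = strings.ToUpper(r.Value); return r }
	for i, w := range ProcessWindow(b.Get("sensor-1", 0), 2, upper) {
		fmt.Printf("window %d: %d records\n", i, len(w))
	}
}
```

Hashing the key to pick a shard keeps records with the same key in order while letting writers for different shards proceed without contending on one lock. A real implementation would also need the retention policy mentioned above and, most likely, time-based windows.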
Contributions are welcome! Please open issues or pull requests for improvements, new features, or bug fixes.
MIT License. See LICENSE for details.