SDK for building indexers by DipDup. Provides a set of packages for constructing modular, flow-based indexing pipelines.
The SDK follows a flow-based programming (FBP) approach: independent asynchronous modules communicate through typed inputs and outputs, forming a processing pipeline (workflow).
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Source │────>│ Process │────>│ Sink │
│ (gRPC, │ │ (custom │ │ (storage,│
│ cron) │ │ logic) │ │ printer)│
└──────────┘ └──────────┘ └──────────┘
| Package | Description |
|---|---|
pkg/modules |
Flow-based module system: interfaces, inputs/outputs, workflow orchestration |
pkg/modules/grpc |
gRPC client and server modules with subscription support |
pkg/modules/cron |
Cron scheduler module |
pkg/modules/zipper |
Aggregates two input streams by key |
pkg/modules/stopper |
Graceful shutdown via context cancellation |
pkg/modules/printer |
Debug module that logs received messages |
pkg/storage |
Abstract storage layer with generic interfaces |
pkg/storage/postgres |
PostgreSQL implementation (pgx + bun) |
pkg/sync |
Thread-safe generic Map[K, V] |
pkg/contract |
Contract ABI to JSON Schema transformer (EVM) |
pkg/rlp |
RLP encoding/decoding for Ethereum logs |
pkg/jsonschema |
JSON Schema types (Draft 2019-09) |
import (
"github.com/dipdup-net/indexer-sdk/pkg/modules"
"github.com/dipdup-net/indexer-sdk/pkg/modules/cron"
)
// Create modules
cronModule, _ := cron.NewModule(cfg.Cron)
customModule := NewCustomModule()
// Connect cron output to custom module input
modules.Connect(cronModule, customModule, "every_second", "input")
// Start workflow
ctx, cancel := context.WithCancel(context.Background())
cronModule.Start(ctx)
customModule.Start(ctx)import (
"github.com/dipdup-io/go-lib/config"
"github.com/dipdup-net/indexer-sdk/pkg/storage/postgres"
)
cfg := config.Database{
Host: "127.0.0.1", Port: 5432,
User: "user", Password: "password", Database: "mydb",
}
storage, _ := postgres.Create(ctx, cfg, func(ctx context.Context, conn *database.Bun) error {
_, err := conn.DB().NewCreateTable().Model((*MyModel)(nil)).IfNotExists().Exec(ctx)
return err
})
defer storage.Close()The cmd/dipdup-gen tool generates boilerplate for new indexer projects from EVM contract ABIs.
go run ./cmd/dipdup-gen abi --input contract.json --output ./generatedWorking examples are available in the examples/ directory:
examples/cron— cron scheduler usageexamples/storage— PostgreSQL storage layerexamples/grpc— gRPC client/server with subscriptionsexamples/zipper— stream aggregation by key
go get github.com/dipdup-net/indexer-sdk