Skip to content

Commit 1670601

Browse files
authored
adds events docs (#109)
Signed-off-by: Ashraf Fouda <ashraf.m.fouda@gmail.com>
1 parent 50cab0e commit 1670601

File tree

1 file changed

+135
-0
lines changed

1 file changed

+135
-0
lines changed

docs/internals/events/readme.md

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
# Events Package
2+
3+
## Introduction
4+
5+
The events package bridges TFChain (substrate) on-chain events to local node modules via Redis Streams. It subscribes to new blocks on the chain, decodes relevant events, filters them for this node/farm, and publishes them to Redis streams that other modules can consume.
6+
7+
## Architecture
8+
9+
```
10+
TFChain (substrate)
11+
|
12+
| WebSocket subscription (new block heads)
13+
v
14+
Processor (events.go)
15+
| Decodes EventRecords per block
16+
v
17+
RedisStream (redis.go)
18+
| Filters events for this node/farm
19+
| Publishes to Redis streams (GOB-encoded)
20+
v
21+
Redis Streams
22+
|
23+
+-- stream:public-config → gateway, network modules
24+
+-- stream:contract-cancelled → provision engine (noded)
25+
+-- stream:contract-lock → provision engine (noded)
26+
+-- stream:power-target → power module
27+
|
28+
v
29+
RedisConsumer (redis.go)
30+
| XREADGROUP with consumer groups
31+
v
32+
Module-specific event channels
33+
```
34+
35+
## Components
36+
37+
### Processor (`events.go`)
38+
39+
The core block-processing engine. Subscribes to new block headers via substrate WebSocket, then for each new block:
40+
41+
1. Gets the last processed block number from `State`
42+
2. Iterates from `last + 1` to the new block number
43+
3. Queries storage changes for `System.Events` at each block hash
44+
4. Decodes the raw event records into `substrate.EventRecords`
45+
5. Calls the registered `Callback` with the decoded events
46+
6. Persists the new block number to `State`
47+
48+
If the subscription drops (connection lost, substrate manager updated), it waits 10 seconds and reconnects. Blocks that are too old and no longer in the archive (RPC error code -32000) are silently skipped.
49+
50+
### State (`events.go`)
51+
52+
Tracks the last processed block number to avoid reprocessing on restart.
53+
54+
```go
55+
type State interface {
56+
Set(num types.BlockNumber) error
57+
Get(cl *gsrpc.SubstrateAPI) (types.BlockNumber, error)
58+
}
59+
```
60+
61+
`FileState` persists the block number as a 4-byte big-endian uint32 to a file. On first run (no file), it starts from the latest block on chain.
62+
63+
### RedisStream (`redis.go`) — Producer
64+
65+
Wraps `Processor` and publishes filtered events to Redis streams.
66+
67+
The `process` callback filters the following on-chain events:
68+
69+
| On-chain Event | Filter | Redis Stream | Event Type |
70+
|----------------|--------|-------------|------------|
71+
| `NodePublicConfigStored` | `event.Node == this node` | `stream:public-config` | `PublicConfigEvent` |
72+
| `NodeContractCanceled` | `event.Node == this node` | `stream:contract-cancelled` | `ContractCancelledEvent` |
73+
| `ContractGracePeriodStarted` | `event.NodeID == this node` | `stream:contract-lock` | `ContractLockedEvent` (Lock=true) |
74+
| `ContractGracePeriodEnded` | `event.NodeID == this node` | `stream:contract-lock` | `ContractLockedEvent` (Lock=false) |
75+
| `PowerTargetChanged` | `event.Farm == this farm` | `stream:power-target` | `PowerTargetChangeEvent` |
76+
77+
Events are GOB-encoded and pushed via `XADD` with `MAXLEN ~ 1024` (approximate trimming to keep the stream bounded).
78+
79+
The substrate manager can be hot-swapped via `UpdateSubstrateManager()` when the chain connection needs to be re-established with new URLs.
80+
81+
### RedisConsumer (`redis.go`) — Consumer
82+
83+
Provides typed Go channels for each event stream. Each consumer uses Redis consumer groups (`XREADGROUP`) for reliable delivery with acknowledgement.
84+
85+
```go
86+
func (r *RedisConsumer) PublicConfig(ctx context.Context) (<-chan pkg.PublicConfigEvent, error)
87+
func (r *RedisConsumer) ContractCancelled(ctx context.Context) (<-chan pkg.ContractCancelledEvent, error)
88+
func (r *RedisConsumer) ContractLocked(ctx context.Context) (<-chan pkg.ContractLockedEvent, error)
89+
func (r *RedisConsumer) PowerTargetChange(ctx context.Context) (<-chan pkg.PowerTargetChangeEvent, error)
90+
```
91+
92+
Each consumer:
93+
1. Creates a consumer group for the stream (idempotent, ignores `BUSYGROUP` error)
94+
2. First reads any pending (unacknowledged) messages from ID `0`
95+
3. Then blocks waiting for new messages from ID `>`
96+
4. Decodes GOB payload and sends on the typed channel
97+
5. Acknowledges each message after processing
98+
99+
The consumer ID must be unique per module to ensure independent delivery.
100+
101+
## Event Types
102+
103+
Defined in `pkg/events.go`:
104+
105+
```go
106+
type PublicConfigEvent struct {
107+
PublicConfig substrate.OptionPublicConfig
108+
}
109+
110+
type ContractCancelledEvent struct {
111+
Contract uint64
112+
TwinId uint32
113+
}
114+
115+
type ContractLockedEvent struct {
116+
Contract uint64
117+
TwinId uint32
118+
Lock bool // true = grace period started, false = ended
119+
}
120+
121+
type PowerTargetChangeEvent struct {
122+
FarmID pkg.FarmID
123+
NodeID uint32
124+
Target substrate.Power
125+
}
126+
```
127+
128+
## Consumers
129+
130+
| Module | Stream | Purpose |
131+
|--------|--------|---------|
132+
| Gateway / Network | `stream:public-config` | Reconfigure gateway when farmer updates public config |
133+
| Provision engine (noded) | `stream:contract-cancelled` | Deprovision workloads when contract is cancelled |
134+
| Provision engine (noded) | `stream:contract-lock` | Pause/resume workloads during grace period |
135+
| Power module | `stream:power-target` | Handle power on/off commands from the farmer |

0 commit comments

Comments
 (0)