-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathjob.go
More file actions
118 lines (98 loc) · 3.1 KB
/
job.go
File metadata and controls
118 lines (98 loc) · 3.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package dasmon
import (
"context"
"fmt"
"log/slog"
"sync/atomic"
"time"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
eth "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/libp2p/go-libp2p/core/peer"
)
var (
	// startTimestamp is the wall-clock time, in Unix nanoseconds, captured
	// once when the package is initialized. It is embedded in every job ID.
	startTimestamp = time.Now().UnixNano()

	// counter is an atomically incremented sequence number; each job ID
	// takes the next value.
	counter atomic.Uint64
)
// Status is the lifecycle state of a job, stored as a short lowercase string.
type Status string

// The set of states a job can be in.
const (
	// StatusPending is the state a job is given by Prepare.
	StatusPending Status = "pending"
	// StatusRunning marks a job as running.
	StatusRunning Status = "running"
	// StatusSuccess marks a job as having succeeded.
	StatusSuccess Status = "success"
	// StatusFailed marks a job as having failed.
	StatusFailed Status = "failed"
	// StatusCancelled marks a job as having been cancelled.
	StatusCancelled Status = "cancelled"
)
// JobResult captures what came out of running a job.
type JobResult struct {
	// Sidecars are the data column sidecars returned by the peer.
	Sidecars []*eth.DataColumnSidecar

	// Duration is the elapsed time measured from the start of Execute until
	// the RPC call returned.
	Duration time.Duration

	// Error is set when the RPC call failed.
	Error error
}
// Job is one data column fetch request addressed to a single peer.
// It is created with only the request parameters (peer, slot range, columns);
// Prepare fills in the tracking and execution state before Execute is run.
type Job struct {
	// Request parameters (set when the job is created).
	PeerID peer.ID

	// StartSlot and EndSlot bound the requested range; both ends are inclusive.
	StartSlot, EndSlot uint64

	// Columns lists the column indices to request.
	Columns []uint64

	// Tracking fields. ID and Status are assigned by Prepare.
	ID     string
	Status Status

	// Lifecycle timestamps. CreatedAt is stamped by Prepare and EndedAt when
	// Execute returns.
	CreatedAt, StartedAt, EndedAt time.Time

	// Execution state. Context and Cancel are created by Prepare; Context is
	// the one passed to the RPC client.
	Context context.Context
	Cancel  context.CancelFunc

	// Request is the wire request built by Execute.
	Request *eth.DataColumnSidecarsByRangeRequest

	// Result is allocated by Prepare and populated by Execute.
	Result *JobResult

	// log is a logger tagged with the job ID and peer, created by Prepare.
	log *slog.Logger
}
// Prepare readies the job for dispatch to a worker and must run before
// Execute. It assigns an ID built from the peer ID, the package start
// timestamp and the next counter value, marks the job pending, stamps
// CreatedAt, derives a cancellable context from ctx, allocates an empty
// Result, and attaches a logger tagged with the job ID and peer.
func (j *Job) Prepare(ctx context.Context) {
	peerStr := j.PeerID.String()
	seq := counter.Add(1)

	j.ID = fmt.Sprintf("%s-%d-%d", peerStr, startTimestamp, seq)
	j.Status = StatusPending
	j.CreatedAt = time.Now()

	jobCtx, cancel := context.WithCancel(ctx)
	j.Context = jobCtx
	j.Cancel = cancel

	j.Result = &JobResult{}
	j.log = log.With("job_id", j.ID, "peer", peerStr)
}
// Execute sends a DataColumnSidecarsByRange request for the job's slot range
// and columns to the job's peer and records the outcome on j.Result.
//
// Prepare must have been called first; an unprepared job yields an error
// instead of a nil-pointer panic. The returned sidecars are not validated
// here: they are stored on j.Result.Sidecars for validation by the engine.
//
// The returned error only reports problems with the job itself (an inverted
// slot range, or a job that was never prepared). An RPC failure is recorded
// in j.Result.Error and Execute still returns nil. j.EndedAt is stamped on
// every return path.
func (j *Job) Execute(client *RpcClient) error {
	startTime := time.Now()
	defer func() {
		j.EndedAt = time.Now()
	}()

	// Build the request
	if j.EndSlot < j.StartSlot {
		return fmt.Errorf("invalid slot range: [%d, %d]", j.StartSlot, j.EndSlot)
	}

	// Result and log are populated by Prepare; both are dereferenced below.
	if j.Result == nil || j.log == nil {
		return fmt.Errorf("job not prepared: Prepare must be called before Execute")
	}

	// Both ends of the slot range are inclusive, hence the +1.
	count := j.EndSlot - j.StartSlot + 1
	j.Request = &eth.DataColumnSidecarsByRangeRequest{
		StartSlot: primitives.Slot(j.StartSlot),
		Count:     count,
		Columns:   j.Columns,
	}

	j.log.Debug("starting DataColumnSidecarsByRange request")

	sidecars, err := client.DataColumnByRange(j.Context, j.PeerID, j.Request)
	j.Result.Duration = time.Since(startTime)
	if err != nil {
		// The failure is reported through the result, not the return value.
		j.Result.Error = WrapError(ErrRPCFailure, err, "start: %d, end: %d, cols: %v", j.StartSlot, j.EndSlot, j.Columns)
		return nil
	}

	j.log.Debug("completed DataColumnSidecarsByRange request",
		"duration", j.Result.Duration,
		"chunks_rcvd", len(sidecars),
	)

	// Store result for validation by engine
	j.Result.Sidecars = sidecars

	j.log.Info("successfully fetched data column sidecars",
		"sidecars", len(sidecars),
	)

	return nil
}