Skip to content

Commit 15b8d64

Browse files
committed
CRE-1325
1 parent 64eac69 commit 15b8d64

File tree

3 files changed

+19
-8
lines changed

3 files changed

+19
-8
lines changed

pkg/workflows/dontime/plugin.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,9 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext,
7575
if req.ExpiryTime().Before(timeoutCheck) {
7676
// Request has been sitting in queue too long
7777
p.store.RemoveRequest(req.WorkflowExecutionID)
78-
req.SendTimeout(nil)
78+
ctx, cancel := context.WithDeadline(context.Background(), req.ExpiryTime())
79+
req.SendTimeout(ctx)
80+
cancel()
7981
continue
8082
}
8183

@@ -89,14 +91,16 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext,
8991

9092
if req.SeqNum > numObservedDonTimes {
9193
p.store.RemoveRequest(req.WorkflowExecutionID)
92-
req.SendResponse(nil,
94+
ctx, cancel := context.WithDeadline(context.Background(), req.ExpiryTime())
95+
req.SendResponse(ctx,
9396
Response{
9497
WorkflowExecutionID: req.WorkflowExecutionID,
9598
SeqNum: req.SeqNum,
9699
Timestamp: 0,
97100
Err: fmt.Errorf("requested seqNum %d for executionID %s is greater than the number of observed don times %d",
98101
req.SeqNum, req.WorkflowExecutionID, numObservedDonTimes),
99102
})
103+
cancel()
100104
continue
101105
}
102106

pkg/workflows/dontime/request.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,21 +25,28 @@ func (r *Request) ExpiryTime() time.Time {
2525
return r.ExpiresAt
2626
}
2727

28-
func (r *Request) SendResponse(_ context.Context, resp Response) {
28+
func (r *Request) SendResponse(ctx context.Context, resp Response) {
2929
select {
3030
case r.CallbackCh <- resp:
3131
close(r.CallbackCh)
32-
default: // Don't block trying to send
32+
default:
33+
// Don't block if receiver not ready, but check if context is actually expired
34+
select {
35+
case <-ctx.Done():
36+
// Context cancelled or deadline exceeded before send
37+
default:
38+
// Try once more without blocking
39+
}
3340
}
3441
}
3542

36-
func (r *Request) SendTimeout(_ context.Context) {
43+
func (r *Request) SendTimeout(ctx context.Context) {
3744
timeoutResponse := Response{
3845
WorkflowExecutionID: r.WorkflowExecutionID,
3946
SeqNum: r.SeqNum,
4047
Err: fmt.Errorf("timeout exceeded: could not process request before expiry, workflowExecutionID %s", r.WorkflowExecutionID),
4148
}
42-
r.SendResponse(nil, timeoutResponse)
49+
r.SendResponse(ctx, timeoutResponse)
4350
}
4451

4552
func (r *Request) Copy() *Request {

pkg/workflows/dontime/transmitter.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ func NewTransmitter(lggr logger.Logger, store *Store, fromAccount types.Account)
2626
return &Transmitter{lggr: lggr, store: store, fromAccount: fromAccount}
2727
}
2828

29-
func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64, r ocr3types.ReportWithInfo[[]byte], _ []types.AttributedOnchainSignature) error {
29+
func (t *Transmitter) Transmit(ctx context.Context, _ types.ConfigDigest, _ uint64, r ocr3types.ReportWithInfo[[]byte], _ []types.AttributedOnchainSignature) error {
3030
outcome := &pb.Outcome{}
3131
if err := proto.Unmarshal(r.Report, outcome); err != nil {
3232
t.lggr.Errorf("failed to unmarshal report")
@@ -51,7 +51,7 @@ func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64
5151
if len(donTimes.Timestamps) > request.SeqNum {
5252
donTime := donTimes.Timestamps[request.SeqNum]
5353
t.store.RemoveRequest(executionID) // Make space for next request before delivering
54-
request.SendResponse(nil, Response{
54+
request.SendResponse(ctx, Response{
5555
WorkflowExecutionID: executionID,
5656
SeqNum: request.SeqNum,
5757
Timestamp: donTime,

0 commit comments

Comments
 (0)