Skip to content

Commit 40fe22e

Browse files
committed
Split structs into OperationUpdate and OperationUpdateAsync
Signed-off-by: Enrique Lacal <[email protected]>
1 parent b5c4555 commit 40fe22e

File tree

20 files changed

+221
-189
lines changed

20 files changed

+221
-189
lines changed

internal/blockchain/common/common.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type BlockchainCallbacks interface {
4646
// BulkOperationUpdates is a synchronous way to update multiple operations and will return when the updates have been committed to the database or there has been an error
4747
// An insertion ordering guarantee is only provided when this code is called on a single goroutine inside of the connector.
4848
// It is the responsibility of the connector code to allocate that routine, and ensure that there is only one.
49-
// Note: onComplete at each update level is not called, as this is a bulk operation and should be reponsibility of the caller to manage.
49+
// Note: onComplete at each update level is not called, as this is a bulk operation and should be reponsibility of the caller to manage if needed.
5050
BulkOperationUpdates(ctx context.Context, namespace string, updates []*core.OperationUpdate) error
5151

5252
OperationUpdate(ctx context.Context, plugin core.Named, nsOpID string, status core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject)
@@ -170,13 +170,15 @@ func (cb *callbacks) SetOperationalHandler(namespace string, handler core.Operat
170170
func (cb *callbacks) OperationUpdate(ctx context.Context, plugin core.Named, nsOpID string, status core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) {
171171
namespace, _, _ := core.ParseNamespacedOpID(ctx, nsOpID)
172172
if handler, ok := cb.opHandlers[namespace]; ok {
173-
handler.OperationUpdate(&core.OperationUpdate{
174-
Plugin: plugin.Name(),
175-
NamespacedOpID: nsOpID,
176-
Status: status,
177-
BlockchainTXID: blockchainTXID,
178-
ErrorMessage: errorMessage,
179-
Output: opOutput,
173+
handler.OperationUpdate(&core.OperationUpdateAsync{
174+
OperationUpdate: core.OperationUpdate{
175+
Plugin: plugin.Name(),
176+
NamespacedOpID: nsOpID,
177+
Status: status,
178+
BlockchainTXID: blockchainTXID,
179+
ErrorMessage: errorMessage,
180+
Output: opOutput,
181+
},
180182
})
181183
return
182184
}

internal/blockchain/common/common_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func TestCallbackOperationUpdate(t *testing.T) {
4444
cb.SetOperationalHandler("ns1", mcb)
4545

4646
mbi.On("Name").Return("utblockchain")
47-
mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
47+
mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
4848
return update.NamespacedOpID == nsOpID &&
4949
update.Status == core.OpStatusSucceeded &&
5050
update.BlockchainTXID == "tx1" &&

internal/blockchain/ethereum/ethereum_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2024 Kaleido, Inc.
1+
// Copyright © 2025 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -1825,7 +1825,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
18251825
"transactionIndex": "0"
18261826
}`)
18271827

1828-
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
1828+
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
18291829
return update.NamespacedOpID == "ns1:"+operationID.String() &&
18301830
update.Status == core.OpStatusSucceeded &&
18311831
update.BlockchainTXID == "0x71a38acb7a5d4a970854f6d638ceb1fa10a4b59cbf4ed7674273a1a8dc8b36b8" &&
@@ -1911,7 +1911,7 @@ func TestHandleReceiptTXUpdateEVMConnect(t *testing.T) {
19111911
"updated": "2022-08-03T18:55:43.781941Z"
19121912
}`)
19131913

1914-
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
1914+
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
19151915
return update.NamespacedOpID == "ns1:"+operationID.String() &&
19161916
update.Status == core.OpStatusPending &&
19171917
update.BlockchainTXID == "0x929c898a46762d91e9f4b0b8e2800863dcf4a40f694109dc4cd19dbd334fa4cc" &&
@@ -1955,7 +1955,7 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) {
19551955

19561956
em := &coremocks.OperationCallbacks{}
19571957
e.SetOperationHandler("ns1", em)
1958-
txsu := em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
1958+
txsu := em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
19591959
return update.NamespacedOpID == "ns1:"+operationID.String() &&
19601960
update.Status == core.OpStatusFailed &&
19611961
update.ErrorMessage == "Packing arguments for method 'broadcastBatch': abi: cannot use [3]uint8 as type [32]uint8 as argument" &&

internal/blockchain/fabric/fabric_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2024 Kaleido, Inc.
1+
// Copyright © 2025 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -1778,7 +1778,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
17781778
"receivedAt": 1630033474675
17791779
}`)
17801780

1781-
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
1781+
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
17821782
return update.NamespacedOpID == "ns1:"+operationID.String() &&
17831783
update.Status == core.OpStatusSucceeded &&
17841784
update.BlockchainTXID == "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2" &&
@@ -1839,7 +1839,7 @@ func TestHandleReceiptFailedTx(t *testing.T) {
18391839
"transactionHash": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2"
18401840
}`)
18411841

1842-
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
1842+
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
18431843
return update.NamespacedOpID == "ns1:"+operationID.String() &&
18441844
update.Status == core.OpStatusFailed &&
18451845
update.BlockchainTXID == "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2" &&

internal/blockchain/tezos/tezos_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2024 Kaleido, Inc.
1+
// Copyright © 2025 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -685,7 +685,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
685685
}
686686
}`)
687687

688-
tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
688+
tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
689689
return update.NamespacedOpID == "ns1:"+operationID.String() &&
690690
update.Status == core.OpStatusSucceeded &&
691691
update.BlockchainTXID == "ooGcrcazgcGBrY1iym329ovV13MnWrTmV1fttCwWKH5DiYUQsiq" &&
@@ -769,7 +769,7 @@ func TestHandleReceiptTXUpdateTezosConnect(t *testing.T) {
769769
"updated": "2023-09-10T14:49:36.030604Z"
770770
}`)
771771

772-
tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
772+
tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
773773
return update.NamespacedOpID == "ns1:"+operationID.String() &&
774774
update.Status == core.OpStatusPending &&
775775
update.BlockchainTXID == "onhZJDmz5JihnW1RaZ96f17FgUBv3GoERkRECK3XVFt1kL5E6Yy" &&

internal/dataexchange/ffdx/dxevent.go

Lines changed: 54 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -90,71 +90,83 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) {
9090

9191
switch msg.Type {
9292
case messageFailed:
93-
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
94-
Plugin: h.Name(),
95-
NamespacedOpID: msg.RequestID,
96-
Status: core.OpStatusFailed,
97-
ErrorMessage: msg.Error,
98-
Output: msg.Info,
99-
OnComplete: e.Ack,
93+
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
94+
OperationUpdate: core.OperationUpdate{
95+
Plugin: h.Name(),
96+
NamespacedOpID: msg.RequestID,
97+
Status: core.OpStatusFailed,
98+
ErrorMessage: msg.Error,
99+
Output: msg.Info,
100+
},
101+
OnComplete: e.Ack,
100102
})
101103
return
102104
case messageDelivered:
103105
status := core.OpStatusSucceeded
104106
if h.capabilities.Manifest {
105107
status = core.OpStatusPending
106108
}
107-
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
108-
Plugin: h.Name(),
109-
NamespacedOpID: msg.RequestID,
110-
Status: status,
111-
Output: msg.Info,
112-
OnComplete: e.Ack,
109+
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
110+
OperationUpdate: core.OperationUpdate{
111+
Plugin: h.Name(),
112+
NamespacedOpID: msg.RequestID,
113+
Status: status,
114+
Output: msg.Info,
115+
},
116+
OnComplete: e.Ack,
113117
})
114118
return
115119
case messageAcknowledged:
116-
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
117-
Plugin: h.Name(),
118-
NamespacedOpID: msg.RequestID,
119-
Status: core.OpStatusSucceeded,
120-
VerifyManifest: h.capabilities.Manifest,
121-
DXManifest: msg.Manifest,
122-
Output: msg.Info,
123-
OnComplete: e.Ack,
120+
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
121+
OperationUpdate: core.OperationUpdate{
122+
Plugin: h.Name(),
123+
NamespacedOpID: msg.RequestID,
124+
Status: core.OpStatusSucceeded,
125+
VerifyManifest: h.capabilities.Manifest,
126+
DXManifest: msg.Manifest,
127+
Output: msg.Info,
128+
},
129+
OnComplete: e.Ack,
124130
})
125131
return
126132
case blobFailed:
127-
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
128-
Plugin: h.Name(),
129-
NamespacedOpID: msg.RequestID,
130-
Status: core.OpStatusFailed,
131-
ErrorMessage: msg.Error,
132-
Output: msg.Info,
133-
OnComplete: e.Ack,
133+
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
134+
OperationUpdate: core.OperationUpdate{
135+
Plugin: h.Name(),
136+
NamespacedOpID: msg.RequestID,
137+
Status: core.OpStatusFailed,
138+
ErrorMessage: msg.Error,
139+
Output: msg.Info,
140+
},
141+
OnComplete: e.Ack,
134142
})
135143
return
136144
case blobDelivered:
137145
status := core.OpStatusSucceeded
138146
if h.capabilities.Manifest {
139147
status = core.OpStatusPending
140148
}
141-
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
142-
Plugin: h.Name(),
143-
NamespacedOpID: msg.RequestID,
144-
Status: status,
145-
Output: msg.Info,
146-
OnComplete: e.Ack,
149+
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
150+
OperationUpdate: core.OperationUpdate{
151+
Plugin: h.Name(),
152+
NamespacedOpID: msg.RequestID,
153+
Status: status,
154+
Output: msg.Info,
155+
},
156+
OnComplete: e.Ack,
147157
})
148158
return
149159
case blobAcknowledged:
150-
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
151-
Plugin: h.Name(),
152-
NamespacedOpID: msg.RequestID,
153-
Status: core.OpStatusSucceeded,
154-
Output: msg.Info,
155-
VerifyManifest: h.capabilities.Manifest,
156-
DXHash: msg.Hash,
157-
OnComplete: e.Ack,
160+
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
161+
OperationUpdate: core.OperationUpdate{
162+
Plugin: h.Name(),
163+
NamespacedOpID: msg.RequestID,
164+
Status: core.OpStatusSucceeded,
165+
Output: msg.Info,
166+
VerifyManifest: h.capabilities.Manifest,
167+
DXHash: msg.Hash,
168+
},
169+
OnComplete: e.Ack,
158170
})
159171
return
160172

internal/dataexchange/ffdx/ffdx.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ type callbacks struct {
6868
opHandlers map[string]core.OperationCallbacks
6969
}
7070

71-
func (cb *callbacks) OperationUpdate(ctx context.Context, update *core.OperationUpdate) {
71+
func (cb *callbacks) OperationUpdate(ctx context.Context, update *core.OperationUpdateAsync) {
7272
namespace, _, _ := core.ParseNamespacedOpID(ctx, update.NamespacedOpID)
7373
if handler, ok := cb.opHandlers[namespace]; ok {
7474
handler.OperationUpdate(update)

internal/dataexchange/ffdx/ffdx_test.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2021 Kaleido, Inc.
1+
// Copyright © 2025 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -142,7 +142,7 @@ func TestInitMissingURL(t *testing.T) {
142142

143143
func opAcker() func(args mock.Arguments) {
144144
return func(args mock.Arguments) {
145-
args[0].(*core.OperationUpdate).OnComplete()
145+
args[0].(*core.OperationUpdateAsync).OnComplete()
146146
}
147147
}
148148

@@ -492,7 +492,7 @@ func TestMessageEventsBackgroundStart(t *testing.T) {
492492
assert.NoError(t, err)
493493

494494
namespacedID1 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
495-
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
495+
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
496496
return ev.NamespacedOpID == namespacedID1 &&
497497
ev.Status == core.OpStatusFailed &&
498498
ev.ErrorMessage == "pop" &&
@@ -503,7 +503,7 @@ func TestMessageEventsBackgroundStart(t *testing.T) {
503503
assert.Equal(t, `{"action":"ack","id":"1"}`, string(msg))
504504

505505
namespacedID2 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
506-
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
506+
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
507507
return ev.NamespacedOpID == namespacedID2 &&
508508
ev.Status == core.OpStatusSucceeded &&
509509
ev.Plugin == "ffdx"
@@ -513,7 +513,7 @@ func TestMessageEventsBackgroundStart(t *testing.T) {
513513
assert.Equal(t, `{"action":"ack","id":"2"}`, string(msg))
514514

515515
namespacedID3 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
516-
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
516+
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
517517
return ev.NamespacedOpID == namespacedID3 &&
518518
ev.Status == core.OpStatusSucceeded &&
519519
ev.DXManifest == `{"manifest":true}` &&
@@ -552,7 +552,7 @@ func TestMessageEvents(t *testing.T) {
552552
assert.NoError(t, err)
553553

554554
namespacedID1 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
555-
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
555+
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
556556
return ev.NamespacedOpID == namespacedID1 &&
557557
ev.Status == core.OpStatusFailed &&
558558
ev.ErrorMessage == "pop" &&
@@ -563,7 +563,7 @@ func TestMessageEvents(t *testing.T) {
563563
assert.Equal(t, `{"action":"ack","id":"1"}`, string(msg))
564564

565565
namespacedID2 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
566-
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
566+
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
567567
return ev.NamespacedOpID == namespacedID2 &&
568568
ev.Status == core.OpStatusSucceeded &&
569569
ev.Plugin == "ffdx"
@@ -573,7 +573,7 @@ func TestMessageEvents(t *testing.T) {
573573
assert.Equal(t, `{"action":"ack","id":"2"}`, string(msg))
574574

575575
namespacedID3 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
576-
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
576+
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
577577
return ev.NamespacedOpID == namespacedID3 &&
578578
ev.Status == core.OpStatusSucceeded &&
579579
ev.DXManifest == `{"manifest":true}` &&
@@ -617,7 +617,7 @@ func TestBlobEvents(t *testing.T) {
617617
assert.NoError(t, err)
618618

619619
namespacedID5 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
620-
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
620+
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
621621
return ev.NamespacedOpID == namespacedID5 &&
622622
ev.Status == core.OpStatusFailed &&
623623
ev.ErrorMessage == "pop" &&
@@ -628,7 +628,7 @@ func TestBlobEvents(t *testing.T) {
628628
assert.Equal(t, `{"action":"ack","id":"5"}`, string(msg))
629629

630630
namespacedID6 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
631-
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
631+
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
632632
return ev.NamespacedOpID == namespacedID6 &&
633633
ev.Status == core.OpStatusSucceeded &&
634634
ev.Output.String() == `{"some":"details"}` &&
@@ -650,7 +650,7 @@ func TestBlobEvents(t *testing.T) {
650650
assert.Equal(t, `{"action":"ack","id":"9"}`, string(msg))
651651

652652
namespacedID10 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
653-
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
653+
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
654654
return ev.NamespacedOpID == namespacedID10 &&
655655
ev.Status == core.OpStatusSucceeded &&
656656
ev.Output.String() == `{"signatures":"and stuff"}` &&
@@ -683,7 +683,7 @@ func TestEventsWithManifest(t *testing.T) {
683683
h.SetOperationHandler("ns1", ocb)
684684

685685
namespacedID1 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
686-
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
686+
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
687687
return ev.NamespacedOpID == namespacedID1 &&
688688
ev.Status == core.OpStatusPending &&
689689
ev.Plugin == "ffdx"
@@ -693,7 +693,7 @@ func TestEventsWithManifest(t *testing.T) {
693693
assert.Equal(t, `{"action":"ack","id":"1"}`, string(msg))
694694

695695
namespacedID2 := fmt.Sprintf("ns1:%s", fftypes.NewUUID())
696-
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdate) bool {
696+
ocb.On("OperationUpdate", mock.MatchedBy(func(ev *core.OperationUpdateAsync) bool {
697697
return ev.NamespacedOpID == namespacedID2 &&
698698
ev.Status == core.OpStatusPending &&
699699
ev.Plugin == "ffdx"

0 commit comments

Comments
 (0)