Skip to content

Commit 8a0d5c4

Browse files
authored
Merge pull request #1637 from hyperledger/reliable_receipts
Operation handling for reliable receipts
2 parents d484497 + 456279a commit 8a0d5c4

File tree

65 files changed

+602
-223
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+602
-223
lines changed

internal/blockchain/common/common.go

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2024 Kaleido, Inc.
1+
// Copyright © 2025 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -43,6 +43,12 @@ type BlockchainCallbacks interface {
4343
SetHandler(namespace string, handler blockchain.Callbacks)
4444
SetOperationalHandler(namespace string, handler core.OperationCallbacks)
4545

46+
// BulkOperationUpdates is a synchronous way to update multiple operations and will return when the updates have been committed to the database or there has been an error
47+
// An insertion ordering guarantee is only provided when this code is called on a single goroutine inside of the connector.
48+
// It is the responsibility of the connector code to allocate that routine, and ensure that there is only one.
49+
// Note: onComplete at each update level is not called, as this is a bulk operation and should be reponsibility of the caller to manage if needed.
50+
BulkOperationUpdates(ctx context.Context, namespace string, updates []*core.OperationUpdate) error
51+
4652
OperationUpdate(ctx context.Context, plugin core.Named, nsOpID string, status core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject)
4753
// Common logic for parsing a BatchPinOrNetworkAction event, and if not discarded to add it to the by-namespace map
4854
PrepareBatchPinOrNetworkAction(ctx context.Context, events EventsToDispatch, subInfo *SubscriptionInfo, location *fftypes.JSONAny, event *blockchain.Event, signingKey *core.VerifierRef, params *BatchPinParams)
@@ -64,6 +70,17 @@ type callbacks struct {
6470
opHandlers map[string]core.OperationCallbacks
6571
}
6672

73+
// BulkOperationUpdates implements BlockchainCallbacks.
74+
func (cb *callbacks) BulkOperationUpdates(ctx context.Context, namespace string, updates []*core.OperationUpdate) error {
75+
if handler, ok := cb.opHandlers[namespace]; ok {
76+
return handler.BulkOperationUpdates(ctx, updates)
77+
}
78+
// We don't want to error as it just means this update was not for this namespace
79+
// This is unlikely to happen in practice, as the namespace is always passed in the operation handler
80+
log.L(ctx).Errorf("No operation handler found for namespace '%s'", namespace)
81+
return nil
82+
}
83+
6784
type subscriptions struct {
6885
subs map[string]*SubscriptionInfo
6986
}
@@ -153,13 +170,15 @@ func (cb *callbacks) SetOperationalHandler(namespace string, handler core.Operat
153170
func (cb *callbacks) OperationUpdate(ctx context.Context, plugin core.Named, nsOpID string, status core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject) {
154171
namespace, _, _ := core.ParseNamespacedOpID(ctx, nsOpID)
155172
if handler, ok := cb.opHandlers[namespace]; ok {
156-
handler.OperationUpdate(&core.OperationUpdate{
157-
Plugin: plugin.Name(),
158-
NamespacedOpID: nsOpID,
159-
Status: status,
160-
BlockchainTXID: blockchainTXID,
161-
ErrorMessage: errorMessage,
162-
Output: opOutput,
173+
handler.OperationUpdate(&core.OperationUpdateAsync{
174+
OperationUpdate: core.OperationUpdate{
175+
Plugin: plugin.Name(),
176+
NamespacedOpID: nsOpID,
177+
Status: status,
178+
BlockchainTXID: blockchainTXID,
179+
ErrorMessage: errorMessage,
180+
Output: opOutput,
181+
},
163182
})
164183
return
165184
}

internal/blockchain/common/common_test.go

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func TestCallbackOperationUpdate(t *testing.T) {
4444
cb.SetOperationalHandler("ns1", mcb)
4545

4646
mbi.On("Name").Return("utblockchain")
47-
mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
47+
mcb.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
4848
return update.NamespacedOpID == nsOpID &&
4949
update.Status == core.OpStatusSucceeded &&
5050
update.BlockchainTXID == "tx1" &&
@@ -455,3 +455,54 @@ func TestErrorWrappingNonConflict(t *testing.T) {
455455
_, conforms := err.(operations.ConflictError)
456456
assert.False(t, conforms)
457457
}
458+
459+
func TestCallbackBulkOperationUpdate(t *testing.T) {
460+
nsOpID := "ns1:" + fftypes.NewUUID().String()
461+
nsOpID2 := "ns1:" + fftypes.NewUUID().String()
462+
463+
mbi := &blockchainmocks.Plugin{}
464+
mcb := &coremocks.OperationCallbacks{}
465+
cb := NewBlockchainCallbacks()
466+
cb.SetOperationalHandler("ns1", mcb)
467+
468+
mbi.On("Name").Return("utblockchain")
469+
mcb.On("BulkOperationUpdates", mock.Anything, mock.MatchedBy(func(updates []*core.OperationUpdate) bool {
470+
assert.True(t, updates[0].NamespacedOpID == nsOpID &&
471+
updates[0].Status == core.OpStatusSucceeded &&
472+
updates[0].BlockchainTXID == "tx1" &&
473+
updates[0].ErrorMessage == "err" &&
474+
updates[0].Plugin == "utblockchain")
475+
476+
assert.True(t, updates[1].NamespacedOpID == nsOpID2 &&
477+
updates[1].Status == core.OpStatusSucceeded &&
478+
updates[1].BlockchainTXID == "tx2" &&
479+
updates[1].ErrorMessage == "err" &&
480+
updates[1].Plugin == "utblockchain")
481+
482+
return true
483+
})).Return(nil).Once()
484+
485+
cb.BulkOperationUpdates(context.Background(), "ns1", []*core.OperationUpdate{
486+
{
487+
NamespacedOpID: nsOpID,
488+
Status: core.OpStatusSucceeded,
489+
BlockchainTXID: "tx1",
490+
ErrorMessage: "err",
491+
Output: fftypes.JSONObject{},
492+
Plugin: "utblockchain",
493+
},
494+
{
495+
NamespacedOpID: nsOpID2,
496+
Status: core.OpStatusSucceeded,
497+
BlockchainTXID: "tx2",
498+
ErrorMessage: "err",
499+
Output: fftypes.JSONObject{},
500+
Plugin: "utblockchain",
501+
},
502+
})
503+
504+
// No Handler
505+
cb.BulkOperationUpdates(context.Background(), "ns2", []*core.OperationUpdate{})
506+
507+
mcb.AssertExpectations(t)
508+
}

internal/blockchain/ethereum/ethereum_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2024 Kaleido, Inc.
1+
// Copyright © 2025 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -1825,7 +1825,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
18251825
"transactionIndex": "0"
18261826
}`)
18271827

1828-
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
1828+
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
18291829
return update.NamespacedOpID == "ns1:"+operationID.String() &&
18301830
update.Status == core.OpStatusSucceeded &&
18311831
update.BlockchainTXID == "0x71a38acb7a5d4a970854f6d638ceb1fa10a4b59cbf4ed7674273a1a8dc8b36b8" &&
@@ -1911,7 +1911,7 @@ func TestHandleReceiptTXUpdateEVMConnect(t *testing.T) {
19111911
"updated": "2022-08-03T18:55:43.781941Z"
19121912
}`)
19131913

1914-
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
1914+
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
19151915
return update.NamespacedOpID == "ns1:"+operationID.String() &&
19161916
update.Status == core.OpStatusPending &&
19171917
update.BlockchainTXID == "0x929c898a46762d91e9f4b0b8e2800863dcf4a40f694109dc4cd19dbd334fa4cc" &&
@@ -1955,7 +1955,7 @@ func TestHandleBadPayloadsAndThenReceiptFailure(t *testing.T) {
19551955

19561956
em := &coremocks.OperationCallbacks{}
19571957
e.SetOperationHandler("ns1", em)
1958-
txsu := em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
1958+
txsu := em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
19591959
return update.NamespacedOpID == "ns1:"+operationID.String() &&
19601960
update.Status == core.OpStatusFailed &&
19611961
update.ErrorMessage == "Packing arguments for method 'broadcastBatch': abi: cannot use [3]uint8 as type [32]uint8 as argument" &&

internal/blockchain/fabric/fabric_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2024 Kaleido, Inc.
1+
// Copyright © 2025 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -1778,7 +1778,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
17781778
"receivedAt": 1630033474675
17791779
}`)
17801780

1781-
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
1781+
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
17821782
return update.NamespacedOpID == "ns1:"+operationID.String() &&
17831783
update.Status == core.OpStatusSucceeded &&
17841784
update.BlockchainTXID == "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2" &&
@@ -1839,7 +1839,7 @@ func TestHandleReceiptFailedTx(t *testing.T) {
18391839
"transactionHash": "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2"
18401840
}`)
18411841

1842-
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
1842+
em.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
18431843
return update.NamespacedOpID == "ns1:"+operationID.String() &&
18441844
update.Status == core.OpStatusFailed &&
18451845
update.BlockchainTXID == "ce79343000e851a0c742f63a733ce19a5f8b9ce1c719b6cecd14f01bcf81fff2" &&

internal/blockchain/tezos/tezos_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2024 Kaleido, Inc.
1+
// Copyright © 2025 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -685,7 +685,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
685685
}
686686
}`)
687687

688-
tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
688+
tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
689689
return update.NamespacedOpID == "ns1:"+operationID.String() &&
690690
update.Status == core.OpStatusSucceeded &&
691691
update.BlockchainTXID == "ooGcrcazgcGBrY1iym329ovV13MnWrTmV1fttCwWKH5DiYUQsiq" &&
@@ -769,7 +769,7 @@ func TestHandleReceiptTXUpdateTezosConnect(t *testing.T) {
769769
"updated": "2023-09-10T14:49:36.030604Z"
770770
}`)
771771

772-
tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdate) bool {
772+
tm.On("OperationUpdate", mock.MatchedBy(func(update *core.OperationUpdateAsync) bool {
773773
return update.NamespacedOpID == "ns1:"+operationID.String() &&
774774
update.Status == core.OpStatusPending &&
775775
update.BlockchainTXID == "onhZJDmz5JihnW1RaZ96f17FgUBv3GoERkRECK3XVFt1kL5E6Yy" &&

internal/coremsgs/en_error_messages.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2024 Kaleido, Inc.
1+
// Copyright © 2025 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -316,4 +316,6 @@ var (
316316
MsgFiltersEmpty = ffe("FF10475", "No filters specified in contract listener: %s.", 500)
317317
MsgContractListenerBlockchainFilterLimit = ffe("FF10476", "Blockchain plugin only supports one filter for contract listener: %s.", 500)
318318
MsgDuplicateContractListenerFilterLocation = ffe("FF10477", "Duplicate filter provided for contract listener for location", 400)
319+
MsgInvalidNamespaceForOperationUpdate = ffe("FF10478", "Received different namespace for operation update '%s' than expected for manager '%s'")
320+
MsgEmptyPluginForOperationUpdate = ffe("FF10479", "Received empty plugin for operation update '%s'")
319321
)

internal/dataexchange/ffdx/dxevent.go

Lines changed: 55 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2023 Kaleido, Inc.
1+
// Copyright © 2025 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -90,71 +90,83 @@ func (h *FFDX) dispatchEvent(msg *wsEvent) {
9090

9191
switch msg.Type {
9292
case messageFailed:
93-
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
94-
Plugin: h.Name(),
95-
NamespacedOpID: msg.RequestID,
96-
Status: core.OpStatusFailed,
97-
ErrorMessage: msg.Error,
98-
Output: msg.Info,
99-
OnComplete: e.Ack,
93+
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
94+
OperationUpdate: core.OperationUpdate{
95+
Plugin: h.Name(),
96+
NamespacedOpID: msg.RequestID,
97+
Status: core.OpStatusFailed,
98+
ErrorMessage: msg.Error,
99+
Output: msg.Info,
100+
},
101+
OnComplete: e.Ack,
100102
})
101103
return
102104
case messageDelivered:
103105
status := core.OpStatusSucceeded
104106
if h.capabilities.Manifest {
105107
status = core.OpStatusPending
106108
}
107-
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
108-
Plugin: h.Name(),
109-
NamespacedOpID: msg.RequestID,
110-
Status: status,
111-
Output: msg.Info,
112-
OnComplete: e.Ack,
109+
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
110+
OperationUpdate: core.OperationUpdate{
111+
Plugin: h.Name(),
112+
NamespacedOpID: msg.RequestID,
113+
Status: status,
114+
Output: msg.Info,
115+
},
116+
OnComplete: e.Ack,
113117
})
114118
return
115119
case messageAcknowledged:
116-
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
117-
Plugin: h.Name(),
118-
NamespacedOpID: msg.RequestID,
119-
Status: core.OpStatusSucceeded,
120-
VerifyManifest: h.capabilities.Manifest,
121-
DXManifest: msg.Manifest,
122-
Output: msg.Info,
123-
OnComplete: e.Ack,
120+
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
121+
OperationUpdate: core.OperationUpdate{
122+
Plugin: h.Name(),
123+
NamespacedOpID: msg.RequestID,
124+
Status: core.OpStatusSucceeded,
125+
VerifyManifest: h.capabilities.Manifest,
126+
DXManifest: msg.Manifest,
127+
Output: msg.Info,
128+
},
129+
OnComplete: e.Ack,
124130
})
125131
return
126132
case blobFailed:
127-
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
128-
Plugin: h.Name(),
129-
NamespacedOpID: msg.RequestID,
130-
Status: core.OpStatusFailed,
131-
ErrorMessage: msg.Error,
132-
Output: msg.Info,
133-
OnComplete: e.Ack,
133+
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
134+
OperationUpdate: core.OperationUpdate{
135+
Plugin: h.Name(),
136+
NamespacedOpID: msg.RequestID,
137+
Status: core.OpStatusFailed,
138+
ErrorMessage: msg.Error,
139+
Output: msg.Info,
140+
},
141+
OnComplete: e.Ack,
134142
})
135143
return
136144
case blobDelivered:
137145
status := core.OpStatusSucceeded
138146
if h.capabilities.Manifest {
139147
status = core.OpStatusPending
140148
}
141-
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
142-
Plugin: h.Name(),
143-
NamespacedOpID: msg.RequestID,
144-
Status: status,
145-
Output: msg.Info,
146-
OnComplete: e.Ack,
149+
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
150+
OperationUpdate: core.OperationUpdate{
151+
Plugin: h.Name(),
152+
NamespacedOpID: msg.RequestID,
153+
Status: status,
154+
Output: msg.Info,
155+
},
156+
OnComplete: e.Ack,
147157
})
148158
return
149159
case blobAcknowledged:
150-
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdate{
151-
Plugin: h.Name(),
152-
NamespacedOpID: msg.RequestID,
153-
Status: core.OpStatusSucceeded,
154-
Output: msg.Info,
155-
VerifyManifest: h.capabilities.Manifest,
156-
DXHash: msg.Hash,
157-
OnComplete: e.Ack,
160+
h.callbacks.OperationUpdate(h.ctx, &core.OperationUpdateAsync{
161+
OperationUpdate: core.OperationUpdate{
162+
Plugin: h.Name(),
163+
NamespacedOpID: msg.RequestID,
164+
Status: core.OpStatusSucceeded,
165+
Output: msg.Info,
166+
VerifyManifest: h.capabilities.Manifest,
167+
DXHash: msg.Hash,
168+
},
169+
OnComplete: e.Ack,
158170
})
159171
return
160172

internal/dataexchange/ffdx/ffdx.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2024 Kaleido, Inc.
1+
// Copyright © 2025 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -68,7 +68,7 @@ type callbacks struct {
6868
opHandlers map[string]core.OperationCallbacks
6969
}
7070

71-
func (cb *callbacks) OperationUpdate(ctx context.Context, update *core.OperationUpdate) {
71+
func (cb *callbacks) OperationUpdate(ctx context.Context, update *core.OperationUpdateAsync) {
7272
namespace, _, _ := core.ParseNamespacedOpID(ctx, update.NamespacedOpID)
7373
if handler, ok := cb.opHandlers[namespace]; ok {
7474
handler.OperationUpdate(update)

0 commit comments

Comments
 (0)