Skip to content

Commit c6ef02e

Browse files
authored
Merge pull request #1601 from kaleido-io/receipt
Filter receipt events by namespace when possible
2 parents 9a94dc1 + fa27342 commit c6ef02e

File tree

9 files changed

+41
-24
lines changed

9 files changed

+41
-24
lines changed

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ ARG UI_TAG
7575
ARG UI_RELEASE
7676
RUN apk add --update --no-cache \
7777
sqlite=3.44.2-r0 \
78-
postgresql16-client=16.5-r0 \
78+
postgresql16-client=16.6-r0 \
7979
curl=8.9.1-r1 \
8080
jq=1.7.1-r0
8181
WORKDIR /firefly

internal/blockchain/common/common.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2023 Kaleido, Inc.
1+
// Copyright © 2024 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -392,9 +392,17 @@ func (s *subscriptions) GetSubscription(subID string) *SubscriptionInfo {
392392
}
393393

394394
// Common function for handling receipts from blockchain connectors.
395-
func HandleReceipt(ctx context.Context, plugin core.Named, reply *BlockchainReceiptNotification, callbacks BlockchainCallbacks) error {
395+
func HandleReceipt(ctx context.Context, namespace string, plugin core.Named, reply *BlockchainReceiptNotification, callbacks BlockchainCallbacks) error {
396396
l := log.L(ctx)
397397

398+
if namespace != "" {
399+
opNamespace, _, _ := core.ParseNamespacedOpID(ctx, reply.Headers.ReceiptID)
400+
if opNamespace != namespace {
401+
l.Debugf("Ignoring operation update from other namespace: request=%s tx=%s message=%s", reply.Headers.ReceiptID, reply.TxHash, reply.Message)
402+
return nil
403+
}
404+
}
405+
398406
if reply.Headers.ReceiptID == "" || reply.Headers.ReplyType == "" {
399407
return fmt.Errorf("reply cannot be processed - missing fields: %+v", reply)
400408
}
@@ -409,7 +417,7 @@ func HandleReceipt(ctx context.Context, plugin core.Named, reply *BlockchainRece
409417
updateType = core.OpStatusFailed
410418
}
411419

412-
// Slightly upgly conversion from ReceiptFromBlockchain -> JSONObject which the generic OperationUpdate() function requires
420+
// Slightly ugly conversion from ReceiptFromBlockchain -> JSONObject which the generic OperationUpdate() function requires
413421
var output fftypes.JSONObject
414422
obj, err := json.Marshal(reply)
415423
if err != nil {

internal/blockchain/common/common_test.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright © 2022 Kaleido, Inc.
1+
// Copyright © 2024 Kaleido, Inc.
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
@@ -342,15 +342,15 @@ func TestGoodSuccessReceipt(t *testing.T) {
342342
cb.SetHandler("ns1", mcb)
343343
mcb.On("OperationUpdate", "ns1", mock.Anything).Return()
344344

345-
err := HandleReceipt(context.Background(), nil, &reply, cb)
345+
err := HandleReceipt(context.Background(), "", nil, &reply, cb)
346346
assert.NoError(t, err)
347347

348348
reply.Headers.ReplyType = "TransactionUpdate"
349-
err = HandleReceipt(context.Background(), nil, &reply, cb)
349+
err = HandleReceipt(context.Background(), "", nil, &reply, cb)
350350
assert.NoError(t, err)
351351

352352
reply.Headers.ReplyType = "TransactionFailed"
353-
err = HandleReceipt(context.Background(), nil, &reply, cb)
353+
err = HandleReceipt(context.Background(), "", nil, &reply, cb)
354354
assert.NoError(t, err)
355355
}
356356

@@ -365,7 +365,7 @@ func TestReceiptMarshallingError(t *testing.T) {
365365
cb.SetHandler("ns1", mcb)
366366
mcb.On("OperationUpdate", "ns1", mock.Anything).Return()
367367

368-
err := HandleReceipt(context.Background(), nil, &reply, cb)
368+
err := HandleReceipt(context.Background(), "", nil, &reply, cb)
369369
assert.Error(t, err)
370370
assert.Regexp(t, ".*[^n]marshalling error.*", err)
371371
}
@@ -384,10 +384,19 @@ func TestBadReceipt(t *testing.T) {
384384
data := fftypes.JSONAnyPtr(`{}`)
385385
err := json.Unmarshal(data.Bytes(), &reply)
386386
assert.NoError(t, err)
387-
err = HandleReceipt(context.Background(), nil, &reply, nil)
387+
err = HandleReceipt(context.Background(), "", nil, &reply, nil)
388388
assert.Error(t, err)
389389
}
390390

391+
func TestWrongNamespaceReceipt(t *testing.T) {
392+
var reply BlockchainReceiptNotification
393+
data := fftypes.JSONAnyPtr(`{}`)
394+
err := json.Unmarshal(data.Bytes(), &reply)
395+
assert.NoError(t, err)
396+
err = HandleReceipt(context.Background(), "wrong", nil, &reply, nil)
397+
assert.NoError(t, err)
398+
}
399+
391400
func TestErrorWrappingConflict(t *testing.T) {
392401
ctx := context.Background()
393402
res := &resty.Response{

internal/blockchain/ethereum/ethereum.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,7 @@ func (e *Ethereum) eventLoop(namespace string, wsconn wsclient.WSClient, closed
513513
if !isBatch {
514514
var receipt common.BlockchainReceiptNotification
515515
_ = json.Unmarshal(msgBytes, &receipt)
516-
err := common.HandleReceipt(ctx, e, &receipt, e.callbacks)
516+
err := common.HandleReceipt(ctx, namespace, e, &receipt, e.callbacks)
517517
if err != nil {
518518
l.Errorf("Failed to process receipt: %+v", msgTyped)
519519
}
@@ -1223,7 +1223,7 @@ func (e *Ethereum) GetTransactionStatus(ctx context.Context, operation *core.Ope
12231223
TxHash: statusResponse.GetString("transactionHash"),
12241224
Message: statusResponse.GetString("errorMessage"),
12251225
ProtocolID: receiptInfo.GetString("protocolId")}
1226-
err := common.HandleReceipt(ctx, e, receipt, e.callbacks)
1226+
err := common.HandleReceipt(ctx, operation.Namespace, e, receipt, e.callbacks)
12271227
if err != nil {
12281228
log.L(ctx).Warnf("Failed to handle receipt")
12291229
}

internal/blockchain/ethereum/ethereum_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1835,7 +1835,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
18351835
err := json.Unmarshal(data.Bytes(), &reply)
18361836
assert.NoError(t, err)
18371837

1838-
common.HandleReceipt(context.Background(), e, &reply, e.callbacks)
1838+
common.HandleReceipt(context.Background(), "ns1", e, &reply, e.callbacks)
18391839

18401840
em.AssertExpectations(t)
18411841
}
@@ -1922,7 +1922,7 @@ func TestHandleReceiptTXUpdateEVMConnect(t *testing.T) {
19221922
assert.NoError(t, err)
19231923
expectedReceiptId := "ns1:" + operationID.String()
19241924
assert.Equal(t, reply.Headers.ReceiptID, expectedReceiptId)
1925-
common.HandleReceipt(context.Background(), e, &reply, e.callbacks)
1925+
common.HandleReceipt(context.Background(), "", e, &reply, e.callbacks)
19261926

19271927
em.AssertExpectations(t)
19281928
}
@@ -1987,7 +1987,7 @@ func TestHandleMsgBatchBadData(t *testing.T) {
19871987
data := fftypes.JSONAnyPtr(`{}`)
19881988
err := json.Unmarshal(data.Bytes(), &reply)
19891989
assert.NoError(t, err)
1990-
common.HandleReceipt(context.Background(), e, &reply, e.callbacks)
1990+
common.HandleReceipt(context.Background(), "", e, &reply, e.callbacks)
19911991
}
19921992

19931993
func TestFormatNil(t *testing.T) {

internal/blockchain/fabric/fabric.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ func (f *Fabric) eventLoop(namespace string, wsconn wsclient.WSClient, closed ch
540540
var receipt common.BlockchainReceiptNotification
541541
_ = json.Unmarshal(msgBytes, &receipt)
542542

543-
err := common.HandleReceipt(ctx, f, &receipt, f.callbacks)
543+
err := common.HandleReceipt(ctx, namespace, f, &receipt, f.callbacks)
544544
if err != nil {
545545
l.Errorf("Failed to process receipt: %+v", msgTyped)
546546
}

internal/blockchain/fabric/fabric_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1815,7 +1815,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
18151815

18161816
err := json.Unmarshal(data, &reply)
18171817
assert.NoError(t, err)
1818-
common.HandleReceipt(context.Background(), e, &reply, e.callbacks)
1818+
common.HandleReceipt(context.Background(), "ns1", e, &reply, e.callbacks)
18191819

18201820
em.AssertExpectations(t)
18211821
}
@@ -1836,7 +1836,7 @@ func TestHandleReceiptNoRequestID(t *testing.T) {
18361836
data := []byte(`{}`)
18371837
err := json.Unmarshal(data, &reply)
18381838
assert.NoError(t, err)
1839-
common.HandleReceipt(context.Background(), e, &reply, e.callbacks)
1839+
common.HandleReceipt(context.Background(), "", e, &reply, e.callbacks)
18401840
}
18411841

18421842
func TestHandleReceiptFailedTx(t *testing.T) {
@@ -1876,7 +1876,7 @@ func TestHandleReceiptFailedTx(t *testing.T) {
18761876

18771877
err := json.Unmarshal(data, &reply)
18781878
assert.NoError(t, err)
1879-
common.HandleReceipt(context.Background(), e, &reply, e.callbacks)
1879+
common.HandleReceipt(context.Background(), "", e, &reply, e.callbacks)
18801880

18811881
em.AssertExpectations(t)
18821882
}

internal/blockchain/tezos/tezos.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,7 @@ func (t *Tezos) GetTransactionStatus(ctx context.Context, operation *core.Operat
591591
TxHash: statusResponse.GetString("transactionHash"),
592592
Message: statusResponse.GetString("errorMessage"),
593593
ProtocolID: receiptInfo.GetString("protocolId")}
594-
err := common.HandleReceipt(ctx, t, receipt, t.callbacks)
594+
err := common.HandleReceipt(ctx, operation.Namespace, t, receipt, t.callbacks)
595595
if err != nil {
596596
log.L(ctx).Warnf("Failed to handle receipt")
597597
}
@@ -822,7 +822,7 @@ func (t *Tezos) eventLoop() {
822822
var receipt common.BlockchainReceiptNotification
823823
_ = json.Unmarshal(msgBytes, &receipt)
824824

825-
err := common.HandleReceipt(ctx, t, &receipt, t.callbacks)
825+
err := common.HandleReceipt(ctx, "", t, &receipt, t.callbacks) // TODO: should be specific to a namespace
826826
if err != nil {
827827
l.Errorf("Failed to process receipt: %+v", msgTyped)
828828
}

internal/blockchain/tezos/tezos_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,7 @@ func TestHandleReceiptTXSuccess(t *testing.T) {
695695
err := json.Unmarshal(data.Bytes(), &reply)
696696
assert.NoError(t, err)
697697

698-
common.HandleReceipt(context.Background(), tz, &reply, tz.callbacks)
698+
common.HandleReceipt(context.Background(), "", tz, &reply, tz.callbacks)
699699

700700
tm.AssertExpectations(t)
701701
}
@@ -780,7 +780,7 @@ func TestHandleReceiptTXUpdateTezosConnect(t *testing.T) {
780780
assert.NoError(t, err)
781781
expectedReceiptId := "ns1:" + operationID.String()
782782
assert.Equal(t, reply.Headers.ReceiptID, expectedReceiptId)
783-
common.HandleReceipt(context.Background(), tz, &reply, tz.callbacks)
783+
common.HandleReceipt(context.Background(), "", tz, &reply, tz.callbacks)
784784

785785
tm.AssertExpectations(t)
786786
}
@@ -797,7 +797,7 @@ func TestHandleMsgBatchBadData(t *testing.T) {
797797
data := fftypes.JSONAnyPtr(`{}`)
798798
err := json.Unmarshal(data.Bytes(), &reply)
799799
assert.NoError(t, err)
800-
common.HandleReceipt(context.Background(), tz, &reply, tz.callbacks)
800+
common.HandleReceipt(context.Background(), "", tz, &reply, tz.callbacks)
801801
}
802802

803803
func TestAddSubscription(t *testing.T) {

0 commit comments

Comments
 (0)