Skip to content

Commit 387af52

Browse files
committed
Update to return an error in the channel
Signed-off-by: Enrique Lacal <enrique.lacal@kaleido.io>
1 parent 77d478f commit 387af52

File tree

8 files changed

+19
-14
lines changed

8 files changed

+19
-14
lines changed

internal/blockchain/common/common.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ type BlockchainCallbacks interface {
4444
SetOperationalHandler(namespace string, handler core.OperationCallbacks)
4545

4646
// BulkOperationUpdates is a way to update multiple operations and get notified when the updates have been committed to the database
47-
BulkOperationUpdates(ctx context.Context, namespace string, updates []*core.OperationUpdate, onCommit chan<- bool)
47+
BulkOperationUpdates(ctx context.Context, namespace string, updates []*core.OperationUpdate, onCommit chan<- error)
4848

4949
OperationUpdate(ctx context.Context, plugin core.Named, nsOpID string, status core.OpStatus, blockchainTXID, errorMessage string, opOutput fftypes.JSONObject)
5050
// Common logic for parsing a BatchPinOrNetworkAction event, and if not discarded to add it to the by-namespace map
@@ -68,7 +68,7 @@ type callbacks struct {
6868
}
6969

7070
// BulkOperationUpdates implements BlockchainCallbacks.
71-
func (cb *callbacks) BulkOperationUpdates(ctx context.Context, namespace string, updates []*core.OperationUpdate, onCommit chan<- bool) {
71+
func (cb *callbacks) BulkOperationUpdates(ctx context.Context, namespace string, updates []*core.OperationUpdate, onCommit chan<- error) {
7272
if handler, ok := cb.opHandlers[namespace]; ok {
7373
handler.BulkOperationUpdates(ctx, updates, onCommit)
7474
return

internal/operations/manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type Manager interface {
4747
ResubmitOperations(ctx context.Context, txID *fftypes.UUID) (total int, resubmit []*core.Operation, err error)
4848
AddOrReuseOperation(ctx context.Context, op *core.Operation, hooks ...database.PostCompletionHook) error
4949
BulkInsertOperations(ctx context.Context, ops ...*core.Operation) error
50-
SubmitBulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate, onCommit chan<- bool)
50+
SubmitBulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate, onCommit chan<- error)
5151
SubmitOperationUpdate(update *core.OperationUpdate)
5252
GetOperationByIDCached(ctx context.Context, opID *fftypes.UUID) (*core.Operation, error)
5353
ResolveOperationByID(ctx context.Context, opID *fftypes.UUID, op *core.OperationUpdateDTO) error
@@ -79,7 +79,7 @@ type operationsManager struct {
7979
}
8080

8181
// SubmitBulkOperationUpdate implements Manager.
82-
func (om *operationsManager) SubmitBulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate, onCommit chan<- bool) {
82+
func (om *operationsManager) SubmitBulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate, onCommit chan<- error) {
8383
for _, update := range updates {
8484
errString := ""
8585
if update.ErrorMessage != "" {

internal/operations/manager_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -765,7 +765,7 @@ func TestSubmitBulkOperationUpdates(t *testing.T) {
765765
}
766766

767767
// Create a channel to receive the onCommit signal
768-
onCommit := make(chan bool, 1)
768+
onCommit := make(chan error, 1)
769769
go om.SubmitBulkOperationUpdates(ctx, []*core.OperationUpdate{submittedUpdate, submittedUpdate2}, onCommit)
770770

771771
update := <-om.updater.workQueues[0]
@@ -779,5 +779,6 @@ func TestSubmitBulkOperationUpdates(t *testing.T) {
779779
update2.OnComplete()
780780

781781
// Wait for oncommit signal
782-
<-onCommit
782+
err := <-onCommit
783+
assert.NoError(t, err)
783784
}

internal/operations/operation_updater.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (ou *operationUpdater) pickWorker(ctx context.Context, id *fftypes.UUID, up
9494
}
9595

9696
// SubmitBulkOperationUpdates will wait for the commit to DB before calling the onCommit
97-
func (ou *operationUpdater) SubmitBulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate, onCommit chan<- bool) {
97+
func (ou *operationUpdater) SubmitBulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate, onCommit chan<- error) {
9898
validUpdates := []*core.OperationUpdate{}
9999
for _, update := range updates {
100100
ns, _, err := core.ParseNamespacedOpID(ctx, update.NamespacedOpID)
@@ -113,19 +113,24 @@ func (ou *operationUpdater) SubmitBulkOperationUpdates(ctx context.Context, upda
113113

114114
// Notice how this is not using the workers
115115
// The reason is because we want for all updates to be stored at once in this order
116-
// If offloaded into worker the updates would be processed in parallel, in different DB TX and in a different order
116+
// If offloaded into workers the updates would be processed in parallel, in different DB TX and in a different order
117117
go func() {
118118
// Copy of the array
119+
// TODO make this a deep copy
119120
updates := validUpdates
120121
// This retries forever until there is no error
121122
// but returns on cancelled context
122123
err := ou.doBatchUpdateWithRetry(ctx, updates)
123124
if err != nil {
124125
log.L(ctx).Warnf("Exiting while updating operation: %s", err)
126+
if onCommit != nil {
127+
onCommit <- err
128+
}
129+
return
125130
}
126131
// Batch has been updated correctly
127132
if onCommit != nil {
128-
onCommit <- true
133+
onCommit <- nil
129134
}
130135
}()
131136
}
@@ -236,7 +241,6 @@ func (ou *operationUpdater) doBatchUpdateWithRetry(ctx context.Context, updates
236241
}
237242

238243
func (ou *operationUpdater) doBatchUpdate(ctx context.Context, updates []*core.OperationUpdate) error {
239-
240244
// Get all the operations that match
241245
opIDs := make([]*fftypes.UUID, 0, len(updates))
242246
for _, update := range updates {

internal/orchestrator/bound_callbacks.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (bc *boundCallbacks) checkStopped() error {
4141
return nil
4242
}
4343

44-
func (bc *boundCallbacks) BulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate, onCommit chan<- bool) {
44+
func (bc *boundCallbacks) BulkOperationUpdates(ctx context.Context, updates []*core.OperationUpdate, onCommit chan<- error) {
4545
bc.o.operations.SubmitBulkOperationUpdates(ctx, updates, onCommit)
4646
}
4747

mocks/coremocks/operation_callbacks.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mocks/operationmocks/manager.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/core/operation.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ func ParseNamespacedOpID(ctx context.Context, nsIDStr string) (string, *fftypes.
245245

246246
type OperationCallbacks interface {
247247
OperationUpdate(update *OperationUpdate)
248-
BulkOperationUpdates(ctx context.Context, updates []*OperationUpdate, onCommit chan<- bool)
248+
BulkOperationUpdates(ctx context.Context, updates []*OperationUpdate, onCommit chan<- error)
249249
}
250250

251251
// OperationUpdate notifies FireFly of an update to an operation.

0 commit comments

Comments
 (0)