Skip to content

Commit 52523b7

Browse files
Merge branch 'main' of github.com:hyperledger/firefly into dup-batch-handle
2 parents c4e8e7c + 75bb921 commit 52523b7

32 files changed

+92
-88
lines changed

.golangci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ linters-settings:
2121
values:
2222
regexp:
2323
COMPANY: .*
24+
YEAR: '\d\d\d\d(-\d\d\d\d)?'
2425
template: |-
2526
Copyright © {{ YEAR }} {{ COMPANY }}
2627

doc-site/docs/reference/types/subscription.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ title: Subscription
8686
| Field Name | Description | Type |
8787
|------------|-------------|------|
8888
| `firstEvent` | Whether your application would like to receive events from the 'oldest' event emitted by your FireFly node (from the beginning of time), or the 'newest' event (from now), or a specific event sequence. Default is 'newest' | `SubOptsFirstEvent` |
89-
| `readAhead` | The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts | `uint16` |
89+
| `readAhead` | The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts | `uint` |
9090
| `withData` | Whether message events delivered over the subscription, should be packaged with the full data of those messages in-line as part of the event JSON payload. Or if the application should make separate REST calls to download that data. May not be supported on some transports. | `bool` |
9191
| `batch` | Events are delivered in batches in an ordered array. The batch size is capped to the readAhead limit. The event payload is always an array even if there is a single event in the batch, allowing client-side optimizations when processing the events in a group. Available for both Webhooks and WebSockets. | `bool` |
9292
| `batchTimeout` | When batching is enabled, the optional timeout to send events even when the batch hasn't filled. | `string` |

doc-site/docs/reference/types/wsstart.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ title: WSStart
7777
| Field Name | Description | Type |
7878
|------------|-------------|------|
7979
| `firstEvent` | Whether your application would like to receive events from the 'oldest' event emitted by your FireFly node (from the beginning of time), or the 'newest' event (from now), or a specific event sequence. Default is 'newest' | `SubOptsFirstEvent` |
80-
| `readAhead` | The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts | `uint16` |
80+
| `readAhead` | The number of events to stream ahead to your application, while waiting for confirmation of consumption of those events. At least once delivery semantics are used in FireFly, so if your application crashes/reconnects this is the maximum number of events you would expect to be redelivered after it restarts | `uint` |
8181
| `withData` | Whether message events delivered over the subscription, should be packaged with the full data of those messages in-line as part of the event JSON payload. Or if the application should make separate REST calls to download that data. May not be supported on some transports. | `bool` |
8282
| `batch` | Events are delivered in batches in an ordered array. The batch size is capped to the readAhead limit. The event payload is always an array even if there is a single event in the batch, allowing client-side optimizations when processing the events in a group. Available for both Webhooks and WebSockets. | `bool` |
8383
| `batchTimeout` | When batching is enabled, the optional timeout to send events even when the batch hasn't filled. | `string` |

doc-site/docs/swagger/swagger.yaml

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29287,7 +29287,6 @@ paths:
2928729287
used in FireFly, so if your application crashes/reconnects
2928829288
this is the maximum number of events you would expect
2928929289
to be redelivered after it restarts
29290-
maximum: 65535
2929129290
minimum: 0
2929229291
type: integer
2929329292
reply:
@@ -29572,7 +29571,6 @@ paths:
2957229571
At least once delivery semantics are used in FireFly, so if
2957329572
your application crashes/reconnects this is the maximum number
2957429573
of events you would expect to be redelivered after it restarts
29575-
maximum: 65535
2957629574
minimum: 0
2957729575
type: integer
2957829576
reply:
@@ -29846,7 +29844,6 @@ paths:
2984629844
in FireFly, so if your application crashes/reconnects this
2984729845
is the maximum number of events you would expect to be redelivered
2984829846
after it restarts
29849-
maximum: 65535
2985029847
minimum: 0
2985129848
type: integer
2985229849
reply:
@@ -30128,7 +30125,6 @@ paths:
3012830125
At least once delivery semantics are used in FireFly, so if
3012930126
your application crashes/reconnects this is the maximum number
3013030127
of events you would expect to be redelivered after it restarts
30131-
maximum: 65535
3013230128
minimum: 0
3013330129
type: integer
3013430130
reply:
@@ -30402,7 +30398,6 @@ paths:
3040230398
in FireFly, so if your application crashes/reconnects this
3040330399
is the maximum number of events you would expect to be redelivered
3040430400
after it restarts
30405-
maximum: 65535
3040630401
minimum: 0
3040730402
type: integer
3040830403
reply:
@@ -30750,7 +30745,6 @@ paths:
3075030745
in FireFly, so if your application crashes/reconnects this
3075130746
is the maximum number of events you would expect to be redelivered
3075230747
after it restarts
30753-
maximum: 65535
3075430748
minimum: 0
3075530749
type: integer
3075630750
reply:
@@ -38574,7 +38568,6 @@ paths:
3857438568
used in FireFly, so if your application crashes/reconnects
3857538569
this is the maximum number of events you would expect
3857638570
to be redelivered after it restarts
38577-
maximum: 65535
3857838571
minimum: 0
3857938572
type: integer
3858038573
reply:
@@ -38852,7 +38845,6 @@ paths:
3885238845
At least once delivery semantics are used in FireFly, so if
3885338846
your application crashes/reconnects this is the maximum number
3885438847
of events you would expect to be redelivered after it restarts
38855-
maximum: 65535
3885638848
minimum: 0
3885738849
type: integer
3885838850
reply:
@@ -39126,7 +39118,6 @@ paths:
3912639118
in FireFly, so if your application crashes/reconnects this
3912739119
is the maximum number of events you would expect to be redelivered
3912839120
after it restarts
39129-
maximum: 65535
3913039121
minimum: 0
3913139122
type: integer
3913239123
reply:
@@ -39401,7 +39392,6 @@ paths:
3940139392
At least once delivery semantics are used in FireFly, so if
3940239393
your application crashes/reconnects this is the maximum number
3940339394
of events you would expect to be redelivered after it restarts
39404-
maximum: 65535
3940539395
minimum: 0
3940639396
type: integer
3940739397
reply:
@@ -39675,7 +39665,6 @@ paths:
3967539665
in FireFly, so if your application crashes/reconnects this
3967639666
is the maximum number of events you would expect to be redelivered
3967739667
after it restarts
39678-
maximum: 65535
3967939668
minimum: 0
3968039669
type: integer
3968139670
reply:
@@ -40009,7 +39998,6 @@ paths:
4000939998
in FireFly, so if your application crashes/reconnects this
4001039999
is the maximum number of events you would expect to be redelivered
4001140000
after it restarts
40012-
maximum: 65535
4001340001
minimum: 0
4001440002
type: integer
4001540003
reply:

internal/apiserver/ffi2swagger.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (swg *ffiSwaggerGen) Build(ctx context.Context, api *core.ContractAPI, ffi
8181

8282
func addFFIMethod(ctx context.Context, routes []*ffapi.Route, method *fftypes.FFIMethod, hasLocation bool) []*ffapi.Route {
8383
description := method.Description
84-
if method.Details != nil && len(method.Details) > 0 {
84+
if len(method.Details) > 0 {
8585
additionalDetailsHeader := i18n.Expand(ctx, coremsgs.APISmartContractDetails)
8686
description = fmt.Sprintf("%s\n\n%s:\n\n%s", description, additionalDetailsHeader, buildDetailsTable(ctx, method.Details))
8787
}
@@ -117,7 +117,7 @@ func addFFIMethod(ctx context.Context, routes []*ffapi.Route, method *fftypes.FF
117117

118118
func addFFIEvent(ctx context.Context, routes []*ffapi.Route, event *fftypes.FFIEvent, hasLocation bool) []*ffapi.Route {
119119
description := event.Description
120-
if event.Details != nil && len(event.Details) > 0 {
120+
if len(event.Details) > 0 {
121121
additionalDetailsHeader := i18n.Expand(ctx, coremsgs.APISmartContractDetails)
122122
description = fmt.Sprintf("%s\n\n%s:\n\n%s", description, additionalDetailsHeader, buildDetailsTable(ctx, event.Details))
123123
}

internal/batch/batch_manager.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,11 @@ func NewBatchManager(ctx context.Context, ns string, di database.Plugin, dm data
4444
return nil, i18n.NewError(ctx, coremsgs.MsgInitializationNilDepError, "BatchManager")
4545
}
4646
pCtx, cancelCtx := context.WithCancel(log.WithLogField(ctx, "role", "batchmgr"))
47-
readPageSize := config.GetUint(coreconfig.BatchManagerReadPageSize)
47+
readPageSize := uint16(1)
48+
confReadPageSize := config.GetUint64(coreconfig.BatchManagerReadPageSize)
49+
if confReadPageSize > 0 && confReadPageSize <= 65535 {
50+
readPageSize = uint16(confReadPageSize)
51+
}
4852
bm := &batchManager{
4953
ctx: pCtx,
5054
cancelCtx: cancelCtx,
@@ -54,7 +58,7 @@ func NewBatchManager(ctx context.Context, ns string, di database.Plugin, dm data
5458
data: dm,
5559
txHelper: txHelper,
5660
readOffset: -1, // On restart we trawl for all ready messages
57-
readPageSize: uint64(readPageSize),
61+
readPageSize: readPageSize,
5862
minimumPollDelay: config.GetDuration(coreconfig.BatchManagerMinimumPollDelay),
5963
messagePollTimeout: config.GetDuration(coreconfig.BatchManagerReadPollTimeout),
6064
startupOffsetRetryAttempts: config.GetInt(coreconfig.OrchestratorStartupAttempts),
@@ -116,7 +120,7 @@ type batchManager struct {
116120
inflightSequences map[int64]*batchProcessor
117121
inflightFlushed []int64
118122
shoulderTap chan bool
119-
readPageSize uint64
123+
readPageSize uint16
120124
minimumPollDelay time.Duration
121125
messagePollTimeout time.Duration
122126
startupOffsetRetryAttempts int
@@ -126,7 +130,7 @@ type DispatchHandler func(context.Context, *DispatchPayload) error
126130

127131
type DispatcherOptions struct {
128132
BatchType core.BatchType
129-
BatchMaxSize uint
133+
BatchMaxSize int
130134
BatchMaxBytes int64
131135
BatchTimeout time.Duration
132136
DisposeTimeout time.Duration
@@ -279,11 +283,11 @@ func (bm *batchManager) readPage(lastPageFull bool) ([]*core.IDAndSequence, bool
279283
// Read a page from the DB
280284
var ids []*core.IDAndSequence
281285
err := bm.retry.Do(bm.ctx, "retrieve messages", func(attempt int) (retry bool, err error) {
282-
fb := database.MessageQueryFactory.NewFilterLimit(bm.ctx, bm.readPageSize)
286+
fb := database.MessageQueryFactory.NewFilterLimit(bm.ctx, uint64(bm.readPageSize))
283287
ids, err = bm.database.GetMessageIDs(bm.ctx, bm.namespace, fb.And(
284288
fb.Gt("sequence", bm.readOffset),
285289
fb.Eq("state", core.MessageStateReady),
286-
).Sort("sequence").Limit(bm.readPageSize))
290+
).Sort("sequence").Limit(uint64(bm.readPageSize)))
287291
return true, err
288292
})
289293

@@ -549,6 +553,7 @@ func (bm *batchManager) maskContext(ctx context.Context, state *dispatchState, m
549553

550554
// Now we have the nonce, add that at the end of the hash to make it unqiue to this message
551555
nonceBytes := make([]byte, 8)
556+
//nolint:gosec
552557
binary.BigEndian.PutUint64(nonceBytes, uint64(nonce))
553558
hashBuilder.Write(nonceBytes)
554559

internal/batch/batch_manager_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ func newTestBatchManager(t *testing.T) (*batchManager, func()) {
5353
cmi := &cachemocks.Manager{}
5454
cmi.On("GetCache", mock.Anything).Return(cache.NewUmanagedCache(ctx, 100, 5*time.Minute), nil)
5555
txHelper, _ := txcommon.NewTransactionHelper(ctx, "ns1", mdi, mdm, cmi)
56+
config.Set(coreconfig.BatchManagerReadPageSize, 0) // will get min value of 1
5657
bm, err := NewBatchManager(context.Background(), "ns1", mdi, mdm, mim, txHelper)
5758
assert.NoError(t, err)
5859
return bm.(*batchManager), bm.(*batchManager).cancelCtx

internal/batch/batch_processor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ func (bp *batchProcessor) addWork(newWork *batchWork) (full, overflow bool) {
228228
bp.assemblyQueueBytes += newWork.estimateSize()
229229
bp.assemblyQueue = newQueue
230230

231-
full = len(bp.assemblyQueue) >= int(bp.conf.BatchMaxSize) || bp.assemblyQueueBytes >= bp.conf.BatchMaxBytes
231+
full = len(bp.assemblyQueue) >= bp.conf.BatchMaxSize || bp.assemblyQueueBytes >= bp.conf.BatchMaxBytes
232232
overflow = len(bp.assemblyQueue) > 1 && (batchOfOne || bp.assemblyQueueBytes > bp.conf.BatchMaxBytes)
233233
}
234234

internal/blockchain/ethereum/ethereum.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func (e *Ethereum) Init(ctx context.Context, cancelCtx context.CancelFunc, conf
196196
e.streamID = make(map[string]string)
197197
e.closed = make(map[string]chan struct{})
198198
e.wsconn = make(map[string]wsclient.WSClient)
199-
e.streams = newStreamManager(e.client, e.cache, e.ethconnectConf.GetUint(EthconnectConfigBatchSize), uint(e.ethconnectConf.GetDuration(EthconnectConfigBatchTimeout).Milliseconds()))
199+
e.streams = newStreamManager(e.client, e.cache, e.ethconnectConf.GetUint(EthconnectConfigBatchSize), e.ethconnectConf.GetDuration(EthconnectConfigBatchTimeout).Milliseconds())
200200

201201
return nil
202202
}

internal/blockchain/ethereum/eventstream.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,15 @@ type streamManager struct {
3838
client *resty.Client
3939
cache cache.CInterface
4040
batchSize uint
41-
batchTimeout uint
41+
batchTimeout int64
4242
}
4343

4444
type eventStream struct {
4545
ID string `json:"id"`
4646
Name string `json:"name"`
4747
ErrorHandling string `json:"errorHandling"`
4848
BatchSize uint `json:"batchSize"`
49-
BatchTimeoutMS uint `json:"batchTimeoutMS"`
49+
BatchTimeoutMS int64 `json:"batchTimeoutMS"`
5050
Type string `json:"type"`
5151
WebSocket eventStreamWebsocket `json:"websocket"`
5252
Timestamps bool `json:"timestamps"`
@@ -73,7 +73,7 @@ type subscriptionCheckpoint struct {
7373
Catchup bool `json:"catchup,omitempty"`
7474
}
7575

76-
func newStreamManager(client *resty.Client, cache cache.CInterface, batchSize, batchTimeout uint) *streamManager {
76+
func newStreamManager(client *resty.Client, cache cache.CInterface, batchSize uint, batchTimeout int64) *streamManager {
7777
return &streamManager{
7878
client: client,
7979
cache: cache,
@@ -93,7 +93,7 @@ func (s *streamManager) getEventStreams(ctx context.Context) (streams []*eventSt
9393
return streams, nil
9494
}
9595

96-
func buildEventStream(topic string, batchSize, batchTimeout uint) *eventStream {
96+
func buildEventStream(topic string, batchSize uint, batchTimeout int64) *eventStream {
9797
return &eventStream{
9898
Name: topic,
9999
ErrorHandling: "block",
@@ -120,7 +120,7 @@ func (s *streamManager) createEventStream(ctx context.Context, topic string) (*e
120120
return stream, nil
121121
}
122122

123-
func (s *streamManager) updateEventStream(ctx context.Context, topic string, batchSize, batchTimeout uint, eventStreamID string) (*eventStream, error) {
123+
func (s *streamManager) updateEventStream(ctx context.Context, topic string, batchSize uint, batchTimeout int64, eventStreamID string) (*eventStream, error) {
124124
stream := buildEventStream(topic, batchSize, batchTimeout)
125125
res, err := s.client.R().
126126
SetContext(ctx).

0 commit comments

Comments
 (0)