Skip to content

Commit c3a445d

Browse files
committed
Ensure getSlotsForAddress are not called for every batch in the block range
1 parent 855479b commit c3a445d

File tree

1 file changed

+63
-0
lines changed

1 file changed

+63
-0
lines changed

pkg/solana/logpoller/log_poller_test.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,69 @@ func TestLogPoller_run(t *testing.T) {
284284
})
285285
}
286286

287+
// TestLogPoller_run_EncodedLogCollector_uniqueGetSignaturesRequests asserts that when LogPollerSlotsBatchSize is
288+
// smaller than the number of slots in the backfill range, block fetching runs in multiple batches but
289+
// GetSignaturesForAddress (getSlotsForAddressJob) is not invoked twice for the same RPC cursor (MinContextSlot + Before).
290+
func TestLogPoller_run_EncodedLogCollector_uniqueGetSignaturesRequests(t *testing.T) {
291+
t.Parallel()
292+
293+
const slotsBatchSize = 2
294+
cfg := config.NewDefault()
295+
cfg.Chain.LogPollerSlotsBatchSize = ptr[int64](slotsBatchSize)
296+
297+
lp := newMockedLPwithConfig(t, cfg, chainID)
298+
loader := NewEncodedLogCollector(lp.Client, logger.Test(t), t.Name(), nil, nil, slotsBatchSize)
299+
require.NoError(t, loader.Start(t.Context()))
300+
t.Cleanup(func() { require.NoError(t, loader.Close()) })
301+
lp.LogPoller.loader = loader
302+
lp.LogPoller.lastProcessedSlot = 100
303+
304+
ctx := t.Context()
305+
306+
address, err := solana.PublicKeyFromBase58("J1zQwrBNBngz26jRPNWsUSZMHJwBwpkoDitXRV95LdK4")
307+
require.NoError(t, err)
308+
addr := types.PublicKey(address)
309+
310+
// Five slots 101..105 — strictly larger than slotsBatchSize so scheduleBlocksFetching uses multiple batches.
311+
slots := []uint64{105, 104, 103, 102, 101}
312+
313+
lp.Filters.EXPECT().LoadFilters(mock.Anything).Return(nil).Once()
314+
lp.Filters.EXPECT().GetFiltersToBackfill().Return(nil).Once()
315+
lp.Filters.EXPECT().GetDistinctAddresses(mock.Anything).Return([]types.PublicKey{addr}, nil).Once()
316+
317+
lp.Client.EXPECT().SlotHeightWithCommitment(mock.Anything, rpc.CommitmentFinalized).Return(uint64(105), nil).Once()
318+
319+
lp.Client.EXPECT().GetSignaturesForAddressWithOpts(mock.Anything, mock.Anything, mock.Anything).
320+
RunAndReturn(func(_ context.Context, _ solana.PublicKey, opts *rpc.GetSignaturesForAddressOpts) ([]*rpc.TransactionSignature, error) {
321+
txSigsResponse := make([]*rpc.TransactionSignature, 0, len(slots))
322+
for _, slot := range slots {
323+
tx := &rpc.TransactionSignature{Slot: slot}
324+
txSigsResponse = append(txSigsResponse, tx)
325+
326+
}
327+
// add an extra signature with lower slot to signal that there the block range is fully processed
328+
txSigsResponse = append(txSigsResponse, &rpc.TransactionSignature{Slot: slots[len(slots)-1] - 1})
329+
return txSigsResponse, nil
330+
}).Once() // GetSignaturesForAddress should be called only once for the whole range, not once per batch.
331+
332+
blockTime := solana.UnixTimeSeconds(128)
333+
lp.Client.EXPECT().
334+
GetBlockWithOpts(mock.Anything, mock.Anything, mock.Anything).
335+
RunAndReturn(func(_ context.Context, slot uint64, _ *rpc.GetBlockOpts) (*rpc.GetBlockResult, error) {
336+
println(slot)
337+
require.Contains(t, slots, slot)
338+
return &rpc.GetBlockResult{
339+
Blockhash: solana.Hash{1, 2, 3},
340+
BlockHeight: &slot,
341+
BlockTime: &blockTime,
342+
}, nil
343+
}).Times(len(slots))
344+
345+
err = lp.LogPoller.run(ctx)
346+
require.NoError(t, err)
347+
require.Equal(t, int64(105), lp.LogPoller.lastProcessedSlot)
348+
}
349+
287350
func Test_GetLastProcessedSlot(t *testing.T) {
288351
ctx := t.Context()
289352

0 commit comments

Comments
 (0)