From 7ed7d033e1c25d43435d49420b2c2b8bd40a614d Mon Sep 17 00:00:00 2001 From: "Leo Zhang (zhangchiqing)" Date: Fri, 7 Nov 2025 11:21:57 -0800 Subject: [PATCH] optimize batch store same collection --- storage/store/collections.go | 21 ++++++++--- storage/store/collections_test.go | 61 +++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 5 deletions(-) diff --git a/storage/store/collections.go b/storage/store/collections.go index 14f19e1ae0a..5a837a02a2b 100644 --- a/storage/store/collections.go +++ b/storage/store/collections.go @@ -178,11 +178,8 @@ func (c *Collections) BatchStoreAndIndexByTransaction(lctx lockctx.Proof, collec light := collection.Light() collectionID := light.ID() - err := operation.UpsertCollection(rw.Writer(), light) - if err != nil { - return nil, fmt.Errorf("could not insert collection: %w", err) - } - + // First, check if all transactions are already indexed and consistent + someTransactionIndexed := false for _, txID := range light.Transactions { var differentColTxIsIn flow.Identifier // The following is not BFT, because we can't handle the case where a transaction is included @@ -205,6 +202,20 @@ func (c *Collections) BatchStoreAndIndexByTransaction(lctx lockctx.Proof, collec if err != nil { return nil, fmt.Errorf("could not insert transaction ID: %w", err) } + someTransactionIndexed = true + } + + if !someTransactionIndexed { + // All transactions are already indexed and point to this collection. + // Since the index is always added along with the collection and transactions, + // this means the collection and its transactions have already been stored. + // Abort early to avoid redundant database writes. + return light, nil + } + + err := operation.UpsertCollection(rw.Writer(), light) + if err != nil { + return nil, fmt.Errorf("could not insert collection: %w", err) } // Store individual transactions diff --git a/storage/store/collections_test.go b/storage/store/collections_test.go index 8ab091c67b4..45576fe3199 100644 --- a/storage/store/collections_test.go +++ b/storage/store/collections_test.go @@ -187,3 +187,64 @@ func TestCollections_ConcurrentIndexByTx(t *testing.T) { assert.True(t, indexedCollection.ID() == col1.ID() || indexedCollection.ID() == col2.ID(), "Expected one of the collections to be indexed") }) } + +// TestCollections_BatchStoreAndIndexByTransaction_EarlyAbort verifies that +// BatchStoreAndIndexByTransaction aborts early when all transactions are already +// indexed and point to the same collection, avoiding redundant database writes. +func TestCollections_BatchStoreAndIndexByTransaction_EarlyAbort(t *testing.T) { + dbtest.RunWithDB(t, func(t *testing.T, db storage.DB) { + lockManager := storage.NewTestingLockManager() + metrics := metrics.NewNoopCollector() + transactions := store.NewTransactions(metrics, db) + collections := store.NewCollections(db, transactions) + + // Create a collection with multiple transactions + collection := unittest.CollectionFixture(3) + expectedLight := collection.Light() + + // First, store the collection and index it by transaction + err := unittest.WithLock(t, lockManager, storage.LockInsertCollection, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + _, err := collections.BatchStoreAndIndexByTransaction(lctx, &collection, rw) + return err + }) + }) + require.NoError(t, err) + + // Verify the collection was stored + actualLight, err := collections.LightByID(collection.ID()) + require.NoError(t, err) + assert.Equal(t, expectedLight, actualLight) + + // Verify all transactions are indexed + for _, tx := range collection.Transactions { + collLight, err := collections.LightByTransactionID(tx.ID()) + require.NoError(t, err) + assert.Equal(t, collection.ID(), collLight.ID()) + } + + // Try to store the same collection again - should abort early + err = unittest.WithLock(t, lockManager, storage.LockInsertCollection, func(lctx lockctx.Context) error { + return db.WithReaderBatchWriter(func(rw storage.ReaderBatchWriter) error { + light, err := collections.BatchStoreAndIndexByTransaction(lctx, &collection, rw) + require.NoError(t, err) + // Should return the light collection without error + assert.Equal(t, expectedLight, light) + return err + }) + }) + require.NoError(t, err) + + // Verify the collection still exists and is unchanged + actualLight, err = collections.LightByID(collection.ID()) + require.NoError(t, err) + assert.Equal(t, expectedLight, actualLight) + + // Verify all transactions are still indexed correctly + for _, tx := range collection.Transactions { + collLight, err := collections.LightByTransactionID(tx.ID()) + require.NoError(t, err) + assert.Equal(t, collection.ID(), collLight.ID()) + } + }) +}