diff --git a/.claude/settings.local.json b/.claude/settings.local.json index ef958b6..baaa72b 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -8,7 +8,8 @@ "Bash(go doc:*)", "Bash(go get:*)", "Bash(go test:*)", - "Bash(go mod tidy:*)" + "Bash(go mod tidy:*)", + "Bash(mkdir:*)" ] } } diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..1cd25c6 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,25 @@ +name: Go Tests + +on: + push: + branches: [main] + pull_request: + branches: [main] + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: "1.25.4" + + - name: Install dependencies + run: go mod download + + - name: Run tests + run: go test -v ./... diff --git a/bitmap_cache.go b/bitmap_cache.go new file mode 100644 index 0000000..3416ce5 --- /dev/null +++ b/bitmap_cache.go @@ -0,0 +1,160 @@ +package sqlitebitmapstore + +import ( + "context" + "database/sql" + "fmt" + + "github.com/Arkiv-Network/sqlite-bitmap-store/store" +) + +type nameValue[T any] struct { + name string + value T +} + +type bitmapCache struct { + st store.Querier + + stringBitmaps map[nameValue[string]]*store.Bitmap + numericBitmaps map[nameValue[uint64]]*store.Bitmap +} + +func newBitmapCache(st store.Querier) *bitmapCache { + return &bitmapCache{ + st: st, + stringBitmaps: make(map[nameValue[string]]*store.Bitmap), + numericBitmaps: make(map[nameValue[uint64]]*store.Bitmap), + } +} + +func (c *bitmapCache) AddToStringBitmap(ctx context.Context, name string, value string, id uint64) (err error) { + k := nameValue[string]{name: name, value: value} + bitmap, ok := c.stringBitmaps[k] + if !ok { + bitmap, err = c.st.GetStringAttributeValueBitmap(ctx, store.GetStringAttributeValueBitmapParams{Name: name, Value: value}) + + if err != nil && err != sql.ErrNoRows { + return fmt.Errorf("failed to get string attribute %q value %q bitmap: %w", name, value, err) + } + + if bitmap == nil { + bitmap = store.NewBitmap() + } + + c.stringBitmaps[k] = bitmap + } + + bitmap.Add(id) + + return nil + +} + +func (c *bitmapCache) RemoveFromStringBitmap(ctx context.Context, name string, value string, id uint64) (err error) { + k := nameValue[string]{name: name, value: value} + bitmap, ok := c.stringBitmaps[k] + if !ok { + bitmap, err = c.st.GetStringAttributeValueBitmap(ctx, store.GetStringAttributeValueBitmapParams{Name: name, Value: value}) + + if err != nil && err != sql.ErrNoRows { + return fmt.Errorf("failed to get string attribute %q value %q bitmap: %w", name, value, err) + } + + if bitmap == nil { + bitmap = store.NewBitmap() + } + + c.stringBitmaps[k] = bitmap + } + + bitmap.Remove(id) + + return nil + +} + +func (c *bitmapCache) AddToNumericBitmap(ctx context.Context, name string, value uint64, id uint64) (err error) { + k := nameValue[uint64]{name: name, value: value} + bitmap, ok := c.numericBitmaps[k] + if !ok { + bitmap, err = c.st.GetNumericAttributeValueBitmap(ctx, store.GetNumericAttributeValueBitmapParams{Name: name, Value: value}) + + if err != nil && err != sql.ErrNoRows { + return fmt.Errorf("failed to get numeric attribute %q value %q bitmap: %w", name, value, err) + } + + if bitmap == nil { + bitmap = store.NewBitmap() + } + + c.numericBitmaps[k] = bitmap + } + + bitmap.Add(id) + + return nil + +} + +func (c *bitmapCache) RemoveFromNumericBitmap(ctx context.Context, name string, value uint64, id uint64) (err error) { + k := nameValue[uint64]{name: name, value: value} + bitmap, ok := c.numericBitmaps[k] + if !ok { + bitmap, err = c.st.GetNumericAttributeValueBitmap(ctx, store.GetNumericAttributeValueBitmapParams{Name: name, Value: value}) + + if err != nil && err != sql.ErrNoRows { + return fmt.Errorf("failed to get numeric attribute %q value %q bitmap: %w", name, value, err) + } + + if bitmap == nil { + bitmap = store.NewBitmap() + } + + c.numericBitmaps[k] = bitmap + } + + bitmap.Remove(id) + + return nil + +} + +func (c *bitmapCache) Flush(ctx context.Context) (err error) { + for k, bitmap := range c.stringBitmaps { + + if bitmap.IsEmpty() { + err = c.st.DeleteStringAttributeValueBitmap(ctx, store.DeleteStringAttributeValueBitmapParams{Name: k.name, Value: k.value}) + if err != nil { + return fmt.Errorf("failed to delete string attribute %q value %q bitmap: %w", k.name, k.value, err) + } + continue + } + + bitmap.RunOptimize() + + err = c.st.UpsertStringAttributeValueBitmap(ctx, store.UpsertStringAttributeValueBitmapParams{Name: k.name, Value: k.value, Bitmap: bitmap}) + if err != nil { + return fmt.Errorf("failed to upsert string attribute %q value %q bitmap: %w", k.name, k.value, err) + } + } + + for k, bitmap := range c.numericBitmaps { + + if bitmap.IsEmpty() { + err = c.st.DeleteNumericAttributeValueBitmap(ctx, store.DeleteNumericAttributeValueBitmapParams{Name: k.name, Value: k.value}) + if err != nil { + return fmt.Errorf("failed to delete numeric attribute %q value %q bitmap: %w", k.name, k.value, err) + } + continue + } + + bitmap.RunOptimize() + + err = c.st.UpsertNumericAttributeValueBitmap(ctx, store.UpsertNumericAttributeValueBitmapParams{Name: k.name, Value: k.value, Bitmap: bitmap}) + if err != nil { + return fmt.Errorf("failed to upsert numeric attribute %q value %q bitmap: %w", k.name, k.value, err) + } + } + return nil +} diff --git a/numeric_bitmap_ops.go b/numeric_bitmap_ops.go deleted file mode 100644 index 62c561b..0000000 --- a/numeric_bitmap_ops.go +++ /dev/null @@ -1,95 +0,0 @@ -package sqlitebitmapstore - -import ( - "context" - "database/sql" - "fmt" - - "github.com/Arkiv-Network/sqlite-bitmap-store/store" -) - -type numericBitmapOps struct { - st store.Querier -} - -func newNumericBitmapOps(st store.Querier) *numericBitmapOps { - return &numericBitmapOps{st: st} -} - -func (o *numericBitmapOps) Add(ctx context.Context, name string, value uint64, id uint64) error { - bitmap, err := o.st.GetNumericAttributeValueBitmap( - ctx, - store.GetNumericAttributeValueBitmapParams{ - Name: name, - Value: value, - }, - ) - if err != nil && err != sql.ErrNoRows { - return fmt.Errorf("failed to get numeric attribute %q value %q bitmap: %w", name, value, err) - } - - if bitmap == nil { - bitmap = store.NewBitmap() - } - - bitmap.Add(id) - - err = o.st.UpsertNumericAttributeValueBitmap( - ctx, - store.UpsertNumericAttributeValueBitmapParams{ - Name: name, - Value: value, - Bitmap: bitmap, - }, - ) - if err != nil { - return fmt.Errorf("failed to upsert numeric attribute %q value %d bitmap: %w", name, value, err) - } - - return nil -} - -func (o *numericBitmapOps) Remove(ctx context.Context, name string, value uint64, id uint64) error { - bitmap, err := o.st.GetNumericAttributeValueBitmap( - ctx, - store.GetNumericAttributeValueBitmapParams{ - Name: name, - Value: value, - }, - ) - if err != nil && err != sql.ErrNoRows { - return fmt.Errorf("failed to get numeric attribute %q value %d bitmap: %w", name, value, err) - } - - if bitmap == nil { - bitmap = store.NewBitmap() - } - - bitmap.Remove(id) - - if bitmap.IsEmpty() { - err = o.st.DeleteNumericAttributeValueBitmap( - ctx, - store.DeleteNumericAttributeValueBitmapParams{ - Name: name, - Value: value, - }, - ) - if err != nil { - return fmt.Errorf("failed to delete numeric attribute %q value %d bitmap: %w", name, value, err) - } - } else { - err = o.st.UpsertNumericAttributeValueBitmap( - ctx, - store.UpsertNumericAttributeValueBitmapParams{ - Name: name, - Value: value, - Bitmap: bitmap, - }, - ) - if err != nil { - return fmt.Errorf("failed to upsert numeric attribute %q value %d bitmap: %w", name, value, err) - } - } - return nil -} diff --git a/sqlitestore.go b/sqlitestore.go index f071cdb..dfa2caa 100644 --- a/sqlitestore.go +++ b/sqlitestore.go @@ -127,6 +127,8 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat return fmt.Errorf("failed to get last block from database: %w", err) } + cache := newBitmapCache(st) + mainLoop: for _, block := range batch.Batch.Blocks { @@ -152,6 +154,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat } // blockNumber := block.Number + operationLoop: for _, operation := range block.Operations { switch { @@ -192,17 +195,13 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat return fmt.Errorf("failed to insert payload %s at block %d txIndex %d opIndex %d: %w", key.Hex(), block.Number, operation.TxIndex, operation.OpIndex, err) } - sbo := newStringBitmapOps(st) - for k, v := range stringAttributes { - err = sbo.Add(ctx, k, v, id) + err = cache.AddToStringBitmap(ctx, k, v, id) if err != nil { return fmt.Errorf("failed to add string attribute value bitmap: %w", err) } } - nbo := newNumericBitmapOps(st) - for k, v := range numericAttributes { // skip txIndex and opIndex because they are not used for querying @@ -211,7 +210,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat continue } - err = nbo.Add(ctx, k, v, id) + err = cache.AddToNumericBitmap(ctx, k, v, id) if err != nil { return fmt.Errorf("failed to add numeric attribute value bitmap: %w", err) } @@ -223,7 +222,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat lastUpdate := updates[len(updates)-1] if operation.Update != lastUpdate { - continue mainLoop + continue operationLoop } key := operation.Update.Key.Bytes() @@ -267,17 +266,13 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat return fmt.Errorf("failed to insert payload 0x%x at block %d txIndex %d opIndex %d: %w", key, block.Number, operation.TxIndex, operation.OpIndex, err) } - sbo := newStringBitmapOps(st) - for k, v := range oldStringAttributes.Values { - err = sbo.Remove(ctx, k, v, id) + err = cache.RemoveFromStringBitmap(ctx, k, v, id) if err != nil { return fmt.Errorf("failed to remove string attribute value bitmap: %w", err) } } - nbo := newNumericBitmapOps(st) - for k, v := range oldNumericAttributes.Values { // skip txIndex and opIndex because they are not used for querying switch k { @@ -285,7 +280,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat continue } - err = nbo.Remove(ctx, k, v, id) + err = cache.RemoveFromNumericBitmap(ctx, k, v, id) if err != nil { return fmt.Errorf("failed to remove numeric attribute value bitmap: %w", err) } @@ -294,7 +289,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat // TODO: delete entity from the indexes for k, v := range stringAttributes { - err = sbo.Add(ctx, k, v, id) + err = cache.AddToStringBitmap(ctx, k, v, id) if err != nil { return fmt.Errorf("failed to add string attribute value bitmap: %w", err) } @@ -307,7 +302,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat continue } - err = nbo.Add(ctx, k, v, id) + err = cache.AddToNumericBitmap(ctx, k, v, id) if err != nil { return fmt.Errorf("failed to add numeric attribute value bitmap: %w", err) } @@ -332,17 +327,13 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat oldNumericAttributes := latestPayload.NumericAttributes - sbo := newStringBitmapOps(st) - for k, v := range oldStringAttributes.Values { - err = sbo.Remove(ctx, k, v, latestPayload.ID) + err = cache.RemoveFromStringBitmap(ctx, k, v, latestPayload.ID) if err != nil { return fmt.Errorf("failed to remove string attribute value bitmap: %w", err) } } - nbo := newNumericBitmapOps(st) - for k, v := range oldNumericAttributes.Values { // skip txIndex and opIndex because they are not used for querying switch k { @@ -350,7 +341,7 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat continue } - err = nbo.Remove(ctx, k, v, latestPayload.ID) + err = cache.RemoveFromNumericBitmap(ctx, k, v, latestPayload.ID) if err != nil { return fmt.Errorf("failed to remove numeric attribute value bitmap: %w", err) } @@ -392,14 +383,12 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat return fmt.Errorf("failed to insert payload at block %d txIndex %d opIndex %d: %w", block.Number, operation.TxIndex, operation.OpIndex, err) } - nbo := newNumericBitmapOps(st) - - err = nbo.Remove(ctx, "$expiration", oldExpiration, id) + err = cache.RemoveFromNumericBitmap(ctx, "$expiration", oldExpiration, id) if err != nil { return fmt.Errorf("failed to remove numeric attribute value bitmap: %w", err) } - err = nbo.Add(ctx, "$expiration", newToBlock, id) + err = cache.AddToNumericBitmap(ctx, "$expiration", newToBlock, id) if err != nil { return fmt.Errorf("failed to add numeric attribute value bitmap: %w", err) } @@ -435,14 +424,12 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat return fmt.Errorf("failed to insert payload at block %d txIndex %d opIndex %d: %w", block.Number, operation.TxIndex, operation.OpIndex, err) } - sbo := newStringBitmapOps(st) - - err = sbo.Remove(ctx, "$owner", oldOwner, id) + err = cache.RemoveFromStringBitmap(ctx, "$owner", oldOwner, id) if err != nil { return fmt.Errorf("failed to remove string attribute value bitmap for owner: %w", err) } - err = sbo.Add(ctx, "$owner", newOwner, id) + err = cache.AddToStringBitmap(ctx, "$owner", newOwner, id) if err != nil { return fmt.Errorf("failed to add string attribute value bitmap for owner: %w", err) } @@ -462,6 +449,11 @@ func (s *SQLiteStore) FollowEvents(ctx context.Context, iterator arkivevents.Bat return fmt.Errorf("failed to upsert last block: %w", err) } + err = cache.Flush(ctx) + if err != nil { + return fmt.Errorf("failed to flush bitmap cache: %w", err) + } + err = tx.Commit() if err != nil { return fmt.Errorf("failed to commit transaction: %w", err) diff --git a/sqlitestore_suite_test.go b/sqlitestore_suite_test.go new file mode 100644 index 0000000..4bbb601 --- /dev/null +++ b/sqlitestore_suite_test.go @@ -0,0 +1,14 @@ +package sqlitebitmapstore_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + "github.com/onsi/ginkgo/v2/types" + . "github.com/onsi/gomega" +) + +func TestSqlitestore(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Sqlitestore Suite", types.ReporterConfig{NoColor: true}) +} diff --git a/sqlitestore_test.go b/sqlitestore_test.go new file mode 100644 index 0000000..b7e947d --- /dev/null +++ b/sqlitestore_test.go @@ -0,0 +1,1104 @@ +package sqlitebitmapstore_test + +import ( + "context" + "errors" + "log/slog" + "os" + "path/filepath" + "strings" + + "github.com/ethereum/go-ethereum/common" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + arkivevents "github.com/Arkiv-Network/arkiv-events" + "github.com/Arkiv-Network/arkiv-events/events" + sqlitebitmapstore "github.com/Arkiv-Network/sqlite-bitmap-store" + "github.com/Arkiv-Network/sqlite-bitmap-store/pusher" + "github.com/Arkiv-Network/sqlite-bitmap-store/store" +) + +var _ = Describe("SQLiteStore", func() { + var ( + sqlStore *sqlitebitmapstore.SQLiteStore + tmpDir string + ctx context.Context + cancel context.CancelFunc + logger *slog.Logger + ) + + BeforeEach(func() { + var err error + tmpDir, err = os.MkdirTemp("", "sqlitestore_test") + Expect(err).NotTo(HaveOccurred()) + + logger = slog.New(slog.NewTextHandler(GinkgoWriter, &slog.HandlerOptions{Level: slog.LevelDebug})) + dbPath := filepath.Join(tmpDir, "test.db") + + sqlStore, err = sqlitebitmapstore.NewSQLiteStore(logger, dbPath, 4) + Expect(err).NotTo(HaveOccurred()) + + ctx, cancel = context.WithCancel(context.Background()) + }) + + AfterEach(func() { + cancel() + if sqlStore != nil { + sqlStore.Close() + } + os.RemoveAll(tmpDir) + }) + + Describe("FollowEvents with batch of two blocks", func() { + It("should insert data and allow querying by string and numeric attributes", func() { + iterator := pusher.NewPushIterator() + + key1 := common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") + key2 := common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") + owner := common.HexToAddress("0x1234567890123456789012345678901234567890") + + batch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 100, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Create: &events.OPCreate{ + Key: key1, + ContentType: "application/json", + BTL: 1000, + Owner: owner, + Content: []byte(`{"name": "document1"}`), + StringAttributes: map[string]string{ + "type": "document", + "category": "reports", + }, + NumericAttributes: map[string]uint64{ + "version": 1, + "priority": 10, + }, + }, + }, + }, + }, + { + Number: 101, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Create: &events.OPCreate{ + Key: key2, + ContentType: "image/png", + BTL: 2000, + Owner: owner, + Content: []byte(`image data`), + StringAttributes: map[string]string{ + "type": "image", + "category": "media", + }, + NumericAttributes: map[string]uint64{ + "version": 2, + "priority": 20, + }, + }, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + iterator.Push(ctx, batch) + iterator.Close() + }() + + err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(iterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + lastBlock, err := sqlStore.GetLastBlock(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(lastBlock).To(Equal(uint64(101))) + + err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + // Query by string attribute: type = "document" + docBitmap, err := q.EvaluateStringAttributeValueEqual(ctx, store.EvaluateStringAttributeValueEqualParams{ + Name: "type", + Value: "document", + }) + Expect(err).NotTo(HaveOccurred()) + Expect(docBitmap).NotTo(BeNil()) + + docIDs := docBitmap.ToArray() + Expect(docIDs).To(HaveLen(1)) + + docPayloads, err := q.RetrievePayloads(ctx, docIDs) + Expect(err).NotTo(HaveOccurred()) + Expect(docPayloads).To(HaveLen(1)) + Expect(docPayloads[0].Payload).To(Equal([]byte(`{"name": "document1"}`))) + Expect(docPayloads[0].ContentType).To(Equal("application/json")) + Expect(docPayloads[0].StringAttributes.Values["type"]).To(Equal("document")) + + // Query by string attribute: type = "image" + imageBitmap, err := q.EvaluateStringAttributeValueEqual(ctx, store.EvaluateStringAttributeValueEqualParams{ + Name: "type", + Value: "image", + }) + Expect(err).NotTo(HaveOccurred()) + Expect(imageBitmap).NotTo(BeNil()) + + imageIDs := imageBitmap.ToArray() + Expect(imageIDs).To(HaveLen(1)) + + imagePayloads, err := q.RetrievePayloads(ctx, imageIDs) + Expect(err).NotTo(HaveOccurred()) + Expect(imagePayloads).To(HaveLen(1)) + Expect(imagePayloads[0].Payload).To(Equal([]byte(`image data`))) + Expect(imagePayloads[0].ContentType).To(Equal("image/png")) + + // Query by numeric attribute: version = 1 + version1Bitmap, err := q.EvaluateNumericAttributeValueEqual(ctx, store.EvaluateNumericAttributeValueEqualParams{ + Name: "version", + Value: 1, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(version1Bitmap).NotTo(BeNil()) + + version1IDs := version1Bitmap.ToArray() + Expect(version1IDs).To(HaveLen(1)) + + version1Payloads, err := q.RetrievePayloads(ctx, version1IDs) + Expect(err).NotTo(HaveOccurred()) + Expect(version1Payloads).To(HaveLen(1)) + Expect(version1Payloads[0].NumericAttributes.Values["version"]).To(Equal(uint64(1))) + + // Query by numeric attribute: version > 1 + versionGT1Bitmaps, err := q.EvaluateNumericAttributeValueGreaterThan(ctx, store.EvaluateNumericAttributeValueGreaterThanParams{ + Name: "version", + Value: 1, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(versionGT1Bitmaps).To(HaveLen(1)) + + // Combine bitmaps to get all IDs with version > 1 + combinedBitmap := store.NewBitmap() + for _, bm := range versionGT1Bitmaps { + combinedBitmap.Or(bm.Bitmap) + } + + versionGT1IDs := combinedBitmap.ToArray() + Expect(versionGT1IDs).To(HaveLen(1)) + + versionGT1Payloads, err := q.RetrievePayloads(ctx, versionGT1IDs) + Expect(err).NotTo(HaveOccurred()) + Expect(versionGT1Payloads).To(HaveLen(1)) + Expect(versionGT1Payloads[0].NumericAttributes.Values["version"]).To(Equal(uint64(2))) + + // Query by numeric attribute: priority >= 10 + priorityGTE10Bitmaps, err := q.EvaluateNumericAttributeValueGreaterOrEqualThan(ctx, store.EvaluateNumericAttributeValueGreaterOrEqualThanParams{ + Name: "priority", + Value: 10, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(priorityGTE10Bitmaps).To(HaveLen(2)) + + priorityCombined := store.NewBitmap() + for _, bm := range priorityGTE10Bitmaps { + priorityCombined.Or(bm.Bitmap) + } + + priorityIDs := priorityCombined.ToArray() + Expect(priorityIDs).To(HaveLen(2)) + + priorityPayloads, err := q.RetrievePayloads(ctx, priorityIDs) + Expect(err).NotTo(HaveOccurred()) + Expect(priorityPayloads).To(HaveLen(2)) + + return nil + }) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Describe("FollowEvents Update operation", func() { + It("should update payload and bitmap indexes correctly", func() { + key := common.HexToHash("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") + owner := common.HexToAddress("0x1234567890123456789012345678901234567890") + + // First create the entity + createIterator := pusher.NewPushIterator() + createBatch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 100, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Create: &events.OPCreate{ + Key: key, + ContentType: "text/plain", + BTL: 500, + Owner: owner, + Content: []byte("original content"), + StringAttributes: map[string]string{ + "status": "draft", + }, + NumericAttributes: map[string]uint64{ + "version": 1, + }, + }, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + createIterator.Push(ctx, createBatch) + createIterator.Close() + }() + + err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(createIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + // Now update the entity + updateIterator := pusher.NewPushIterator() + updateBatch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 101, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Update: &events.OPUpdate{ + Key: key, + ContentType: "application/json", + BTL: 1000, + Owner: owner, + Content: []byte(`{"updated": true}`), + StringAttributes: map[string]string{ + "status": "published", + }, + NumericAttributes: map[string]uint64{ + "version": 2, + }, + }, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + updateIterator.Push(ctx, updateBatch) + updateIterator.Close() + }() + + err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(updateIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + // Verify the update + err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) + Expect(err).NotTo(HaveOccurred()) + + Expect(row.Payload).To(Equal([]byte(`{"updated": true}`))) + Expect(row.ContentType).To(Equal("application/json")) + Expect(row.StringAttributes.Values["status"]).To(Equal("published")) + Expect(row.NumericAttributes.Values["version"]).To(Equal(uint64(2))) + Expect(row.NumericAttributes.Values["$lastModifiedAtBlock"]).To(Equal(uint64(101))) + // $createdAtBlock should be preserved + Expect(row.NumericAttributes.Values["$createdAtBlock"]).To(Equal(uint64(100))) + + // Verify old bitmap index is removed + oldStatusBitmap, err := q.EvaluateStringAttributeValueEqual(ctx, store.EvaluateStringAttributeValueEqualParams{ + Name: "status", + Value: "draft", + }) + Expect(err).To(HaveOccurred()) // Should not find old value + + // Verify new bitmap index exists + newStatusBitmap, err := q.EvaluateStringAttributeValueEqual(ctx, store.EvaluateStringAttributeValueEqualParams{ + Name: "status", + Value: "published", + }) + Expect(err).NotTo(HaveOccurred()) + Expect(newStatusBitmap.ToArray()).To(HaveLen(1)) + + _ = oldStatusBitmap + return nil + }) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Describe("FollowEvents Delete operation", func() { + It("should delete payload and remove bitmap indexes", func() { + key := common.HexToHash("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") + owner := common.HexToAddress("0x1234567890123456789012345678901234567890") + + // First create the entity + createIterator := pusher.NewPushIterator() + createBatch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 100, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Create: &events.OPCreate{ + Key: key, + ContentType: "text/plain", + BTL: 500, + Owner: owner, + Content: []byte("to be deleted"), + StringAttributes: map[string]string{ + "deletable": "yes", + }, + NumericAttributes: map[string]uint64{ + "importance": 5, + }, + }, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + createIterator.Push(ctx, createBatch) + createIterator.Close() + }() + + err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(createIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + // Verify entity exists + err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + _, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) + Expect(err).NotTo(HaveOccurred()) + return nil + }) + Expect(err).NotTo(HaveOccurred()) + + // Now delete the entity + deleteIterator := pusher.NewPushIterator() + deleteKey := events.OPDelete(key) + deleteBatch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 101, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Delete: &deleteKey, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + deleteIterator.Push(ctx, deleteBatch) + deleteIterator.Close() + }() + + err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(deleteIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + // Verify entity is deleted + err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + _, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) + Expect(err).To(HaveOccurred()) + + // Verify bitmap index is removed + _, err = q.EvaluateStringAttributeValueEqual(ctx, store.EvaluateStringAttributeValueEqualParams{ + Name: "deletable", + Value: "yes", + }) + Expect(err).To(HaveOccurred()) + + return nil + }) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Describe("FollowEvents Expire operation", func() { + It("should expire payload and remove bitmap indexes", func() { + key := common.HexToHash("0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc") + owner := common.HexToAddress("0x1234567890123456789012345678901234567890") + + // First create the entity + createIterator := pusher.NewPushIterator() + createBatch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 100, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Create: &events.OPCreate{ + Key: key, + ContentType: "text/plain", + BTL: 10, // Short BTL + Owner: owner, + Content: []byte("will expire"), + StringAttributes: map[string]string{ + "expirable": "yes", + }, + NumericAttributes: map[string]uint64{}, + }, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + createIterator.Push(ctx, createBatch) + createIterator.Close() + }() + + err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(createIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + // Now expire the entity + expireIterator := pusher.NewPushIterator() + expireKey := events.OPExpire(key) + expireBatch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 111, // After expiration + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Expire: &expireKey, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + expireIterator.Push(ctx, expireBatch) + expireIterator.Close() + }() + + err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(expireIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + // Verify entity is expired (deleted) + err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + _, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) + Expect(err).To(HaveOccurred()) + return nil + }) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Describe("FollowEvents ExtendBTL operation", func() { + It("should extend expiration and update bitmap indexes", func() { + key := common.HexToHash("0xdddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd") + owner := common.HexToAddress("0x1234567890123456789012345678901234567890") + + // First create the entity + createIterator := pusher.NewPushIterator() + createBatch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 100, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Create: &events.OPCreate{ + Key: key, + ContentType: "text/plain", + BTL: 500, + Owner: owner, + Content: []byte("content"), + StringAttributes: map[string]string{}, + NumericAttributes: map[string]uint64{}, + }, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + createIterator.Push(ctx, createBatch) + createIterator.Close() + }() + + err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(createIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + // Verify original expiration + var originalExpiration uint64 + err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) + Expect(err).NotTo(HaveOccurred()) + originalExpiration = row.NumericAttributes.Values["$expiration"] + Expect(originalExpiration).To(Equal(uint64(600))) // 100 + 500 + return nil + }) + Expect(err).NotTo(HaveOccurred()) + + // Now extend BTL + extendIterator := pusher.NewPushIterator() + extendBatch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 200, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + ExtendBTL: &events.OPExtendBTL{ + Key: key, + BTL: 1000, + }, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + extendIterator.Push(ctx, extendBatch) + extendIterator.Close() + }() + + err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(extendIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + // Verify extended expiration + err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) + Expect(err).NotTo(HaveOccurred()) + newExpiration := row.NumericAttributes.Values["$expiration"] + Expect(newExpiration).To(Equal(uint64(1200))) // 200 + 1000 + + // Verify old expiration bitmap is removed + oldExpBitmap, err := q.EvaluateNumericAttributeValueEqual(ctx, store.EvaluateNumericAttributeValueEqualParams{ + Name: "$expiration", + Value: 600, + }) + Expect(err).To(HaveOccurred()) + + // Verify new expiration bitmap exists + newExpBitmap, err := q.EvaluateNumericAttributeValueEqual(ctx, store.EvaluateNumericAttributeValueEqualParams{ + Name: "$expiration", + Value: 1200, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(newExpBitmap.ToArray()).To(HaveLen(1)) + + _ = oldExpBitmap + return nil + }) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Describe("FollowEvents ChangeOwner operation", func() { + It("should change owner and update bitmap indexes", func() { + key := common.HexToHash("0xeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee") + originalOwner := common.HexToAddress("0x1111111111111111111111111111111111111111") + newOwner := common.HexToAddress("0x2222222222222222222222222222222222222222") + + // First create the entity + createIterator := pusher.NewPushIterator() + createBatch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 100, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Create: &events.OPCreate{ + Key: key, + ContentType: "text/plain", + BTL: 500, + Owner: originalOwner, + Content: []byte("content"), + StringAttributes: map[string]string{}, + NumericAttributes: map[string]uint64{}, + }, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + createIterator.Push(ctx, createBatch) + createIterator.Close() + }() + + err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(createIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + // Verify original owner + err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) + Expect(err).NotTo(HaveOccurred()) + Expect(row.StringAttributes.Values["$owner"]).To(Equal(strings.ToLower(originalOwner.Hex()))) + Expect(row.StringAttributes.Values["$creator"]).To(Equal(strings.ToLower(originalOwner.Hex()))) + return nil + }) + Expect(err).NotTo(HaveOccurred()) + + // Now change owner + changeOwnerIterator := pusher.NewPushIterator() + changeOwnerBatch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 101, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + ChangeOwner: &events.OPChangeOwner{ + Key: key, + Owner: newOwner, + }, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + changeOwnerIterator.Push(ctx, changeOwnerBatch) + changeOwnerIterator.Close() + }() + + err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(changeOwnerIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + // Verify new owner and creator preserved + err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) + Expect(err).NotTo(HaveOccurred()) + Expect(row.StringAttributes.Values["$owner"]).To(Equal(strings.ToLower(newOwner.Hex()))) + // $creator should be preserved + Expect(row.StringAttributes.Values["$creator"]).To(Equal(strings.ToLower(originalOwner.Hex()))) + + // Verify old owner bitmap is removed + oldOwnerBitmap, err := q.EvaluateStringAttributeValueEqual(ctx, store.EvaluateStringAttributeValueEqualParams{ + Name: "$owner", + Value: strings.ToLower(originalOwner.Hex()), + }) + Expect(err).To(HaveOccurred()) + + // Verify new owner bitmap exists + newOwnerBitmap, err := q.EvaluateStringAttributeValueEqual(ctx, store.EvaluateStringAttributeValueEqualParams{ + Name: "$owner", + Value: strings.ToLower(newOwner.Hex()), + }) + Expect(err).NotTo(HaveOccurred()) + Expect(newOwnerBitmap.ToArray()).To(HaveLen(1)) + + _ = oldOwnerBitmap + return nil + }) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Describe("FollowEvents multiple updates to same key", func() { + It("should only apply the last update in a block", func() { + key := common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") + owner := common.HexToAddress("0x1234567890123456789012345678901234567890") + + // First create the entity + createIterator := pusher.NewPushIterator() + createBatch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 100, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Create: &events.OPCreate{ + Key: key, + ContentType: "text/plain", + BTL: 500, + Owner: owner, + Content: []byte("original"), + StringAttributes: map[string]string{ + "status": "v0", + }, + NumericAttributes: map[string]uint64{ + "version": 0, + }, + }, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + createIterator.Push(ctx, createBatch) + createIterator.Close() + }() + + err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(createIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + // Send a single update (the last one) - this is the only one that will be applied + // When multiple updates to the same key exist in a block, only the last one is applied + updateIterator := pusher.NewPushIterator() + updateBatch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 101, + Operations: []events.Operation{ + { + TxIndex: 1, + OpIndex: 0, + Update: &events.OPUpdate{ + Key: key, + ContentType: "text/plain", + BTL: 500, + Owner: owner, + Content: []byte("final update"), + StringAttributes: map[string]string{ + "status": "v3", + }, + NumericAttributes: map[string]uint64{ + "version": 3, + }, + }, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + updateIterator.Push(ctx, updateBatch) + updateIterator.Close() + }() + + err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(updateIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + // Verify the update was applied + err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) + Expect(err).NotTo(HaveOccurred()) + Expect(row.Payload).To(Equal([]byte("final update"))) + Expect(row.StringAttributes.Values["status"]).To(Equal("v3")) + Expect(row.NumericAttributes.Values["version"]).To(Equal(uint64(3))) + return nil + }) + Expect(err).NotTo(HaveOccurred()) + }) + + It("should skip non-last updates and only process the last update for a key", func() { + key := common.HexToHash("0xffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff0001") + owner := common.HexToAddress("0x1234567890123456789012345678901234567890") + + // First create the entity + createIterator := pusher.NewPushIterator() + createBatch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 100, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Create: &events.OPCreate{ + Key: key, + ContentType: "text/plain", + BTL: 500, + Owner: owner, + Content: []byte("original"), + StringAttributes: map[string]string{ + "status": "v0", + }, + NumericAttributes: map[string]uint64{ + "version": 0, + }, + }, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + createIterator.Push(ctx, createBatch) + createIterator.Close() + }() + + err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(createIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + // Send multiple updates in the same block - only the LAST one should be processed + // The code uses `continue operationLoop` to skip non-last updates and continue to next operation + updateIterator := pusher.NewPushIterator() + updateBatch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 101, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Update: &events.OPUpdate{ + Key: key, + ContentType: "text/plain", + BTL: 500, + Owner: owner, + Content: []byte("first update - skipped"), + StringAttributes: map[string]string{ + "status": "v1", + }, + NumericAttributes: map[string]uint64{ + "version": 1, + }, + }, + }, + { + TxIndex: 0, + OpIndex: 1, + Update: &events.OPUpdate{ + Key: key, + ContentType: "text/plain", + BTL: 500, + Owner: owner, + Content: []byte("second update - last one"), + StringAttributes: map[string]string{ + "status": "v2", + }, + NumericAttributes: map[string]uint64{ + "version": 2, + }, + }, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + updateIterator.Push(ctx, updateBatch) + updateIterator.Close() + }() + + err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(updateIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + // With `continue operationLoop`, non-last updates are skipped but processing + // continues to the next operation. The last update for the key is applied. + err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) + Expect(err).NotTo(HaveOccurred()) + // The last update (second one) should be applied + Expect(row.Payload).To(Equal([]byte("second update - last one"))) + Expect(row.StringAttributes.Values["status"]).To(Equal("v2")) + Expect(row.NumericAttributes.Values["version"]).To(Equal(uint64(2))) + return nil + }) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Describe("FollowEvents skip already processed blocks", func() { + It("should skip blocks that have already been processed", func() { + key := common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001") + owner := common.HexToAddress("0x1234567890123456789012345678901234567890") + + // First create entity at block 100 + createIterator := pusher.NewPushIterator() + createBatch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 100, + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Create: &events.OPCreate{ + Key: key, + ContentType: "text/plain", + BTL: 500, + Owner: owner, + Content: []byte("original"), + StringAttributes: map[string]string{}, + NumericAttributes: map[string]uint64{}, + }, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + createIterator.Push(ctx, createBatch) + createIterator.Close() + }() + + err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(createIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + // Try to replay the same block - should be skipped + replayIterator := pusher.NewPushIterator() + replayBatch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 100, // Same block number + Operations: []events.Operation{ + { + TxIndex: 0, + OpIndex: 0, + Create: &events.OPCreate{ + Key: key, + ContentType: "text/plain", + BTL: 500, + Owner: owner, + Content: []byte("should be ignored"), + StringAttributes: map[string]string{}, + NumericAttributes: map[string]uint64{}, + }, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + replayIterator.Push(ctx, replayBatch) + replayIterator.Close() + }() + + err = sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(replayIterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + // Verify original content is preserved + err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) + Expect(err).NotTo(HaveOccurred()) + Expect(row.Payload).To(Equal([]byte("original"))) + return nil + }) + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Describe("FollowEvents batch error handling", func() { + It("should return error when batch contains an error", func() { + // Create a custom iterator that yields an error + errorIterator := func(yield func(arkivevents.BatchOrError) bool) { + yield(arkivevents.BatchOrError{ + Batch: events.BlockBatch{}, + Error: errors.New("simulated batch error"), + }) + } + + err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(errorIterator)) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("simulated batch error")) + }) + }) + + Describe("FollowEvents system attributes", func() { + It("should set all system attributes correctly on create", func() { + key := common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000002") + owner := common.HexToAddress("0x1234567890123456789012345678901234567890") + + iterator := pusher.NewPushIterator() + batch := events.BlockBatch{ + Blocks: []events.Block{ + { + Number: 100, + Operations: []events.Operation{ + { + TxIndex: 5, + OpIndex: 3, + Create: &events.OPCreate{ + Key: key, + ContentType: "text/plain", + BTL: 500, + Owner: owner, + Content: []byte("content"), + StringAttributes: map[string]string{}, + NumericAttributes: map[string]uint64{}, + }, + }, + }, + }, + }, + } + + go func() { + defer GinkgoRecover() + iterator.Push(ctx, batch) + iterator.Close() + }() + + err := sqlStore.FollowEvents(ctx, arkivevents.BatchIterator(iterator.Iterator())) + Expect(err).NotTo(HaveOccurred()) + + err = sqlStore.ReadTransaction(ctx, func(q *store.Queries) error { + row, err := q.GetPayloadForEntityKey(ctx, key.Bytes()) + Expect(err).NotTo(HaveOccurred()) + + // String attributes + Expect(row.StringAttributes.Values["$owner"]).To(Equal(strings.ToLower(owner.Hex()))) + Expect(row.StringAttributes.Values["$creator"]).To(Equal(strings.ToLower(owner.Hex()))) + Expect(row.StringAttributes.Values["$key"]).To(Equal(strings.ToLower(key.Hex()))) + + // Numeric attributes + Expect(row.NumericAttributes.Values["$expiration"]).To(Equal(uint64(600))) // 100 + 500 + Expect(row.NumericAttributes.Values["$createdAtBlock"]).To(Equal(uint64(100))) + Expect(row.NumericAttributes.Values["$lastModifiedAtBlock"]).To(Equal(uint64(100))) + Expect(row.NumericAttributes.Values["$txIndex"]).To(Equal(uint64(5))) + Expect(row.NumericAttributes.Values["$opIndex"]).To(Equal(uint64(3))) + + // Verify sequence calculation + expectedSequence := uint64(100)<<32 | uint64(5)<<16 | uint64(3) + Expect(row.NumericAttributes.Values["$sequence"]).To(Equal(expectedSequence)) + + return nil + }) + Expect(err).NotTo(HaveOccurred()) + }) + }) +}) diff --git a/string_bitmap_ops.go b/string_bitmap_ops.go deleted file mode 100644 index d55d052..0000000 --- a/string_bitmap_ops.go +++ /dev/null @@ -1,95 +0,0 @@ -package sqlitebitmapstore - -import ( - "context" - "database/sql" - "fmt" - - "github.com/Arkiv-Network/sqlite-bitmap-store/store" -) - -type stringBitmapOps struct { - st store.Querier -} - -func newStringBitmapOps(st store.Querier) *stringBitmapOps { - return &stringBitmapOps{st: st} -} - -func (o *stringBitmapOps) Add(ctx context.Context, name string, value string, id uint64) error { - bitmap, err := o.st.GetStringAttributeValueBitmap( - ctx, - store.GetStringAttributeValueBitmapParams{ - Name: name, - Value: value, - }, - ) - if err != nil && err != sql.ErrNoRows { - return fmt.Errorf("failed to get string attribute %q value %q bitmap: %w", name, value, err) - } - - if bitmap == nil { - bitmap = store.NewBitmap() - } - - bitmap.Add(id) - - err = o.st.UpsertStringAttributeValueBitmap( - ctx, - store.UpsertStringAttributeValueBitmapParams{ - Name: name, - Value: value, - Bitmap: bitmap, - }, - ) - if err != nil { - return fmt.Errorf("failed to upsert string attribute %q value %q bitmap: %w", name, value, err) - } - - return nil -} - -func (o *stringBitmapOps) Remove(ctx context.Context, name string, value string, id uint64) error { - bitmap, err := o.st.GetStringAttributeValueBitmap( - ctx, - store.GetStringAttributeValueBitmapParams{ - Name: name, - Value: value, - }, - ) - if err != nil && err != sql.ErrNoRows { - return fmt.Errorf("failed to get string attribute %q value %q bitmap: %w", name, value, err) - } - - if bitmap == nil { - bitmap = store.NewBitmap() - } - - bitmap.Remove(id) - - if bitmap.IsEmpty() { - err = o.st.DeleteStringAttributeValueBitmap( - ctx, - store.DeleteStringAttributeValueBitmapParams{ - Name: name, - Value: value, - }, - ) - if err != nil { - return fmt.Errorf("failed to delete string attribute %q value %q bitmap: %w", name, value, err) - } - } else { - err = o.st.UpsertStringAttributeValueBitmap( - ctx, - store.UpsertStringAttributeValueBitmapParams{ - Name: name, - Value: value, - Bitmap: bitmap, - }, - ) - if err != nil { - return fmt.Errorf("failed to upsert string attribute %q value %q bitmap: %w", name, value, err) - } - } - return nil -}