From 8c56ad2166d7f53de7b36ac9170ebfa9245350b9 Mon Sep 17 00:00:00 2001 From: Thomas Stromberg Date: Tue, 28 Oct 2025 19:47:01 -0400 Subject: [PATCH] Additional option compatibility --- datastore.go | 67 +++++++++++++++++++++++++++++++++++++++++---- datastore_test.go | 70 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+), 6 deletions(-) diff --git a/datastore.go b/datastore.go index f68476c..1b56079 100644 --- a/datastore.go +++ b/datastore.go @@ -1140,14 +1140,56 @@ type Transaction struct { mutations []map[string]any } +// TransactionOption configures transaction behavior. +type TransactionOption interface { + apply(*transactionSettings) +} + +type transactionSettings struct { + readTime time.Time + maxAttempts int +} + +type maxAttemptsOption int + +func (o maxAttemptsOption) apply(s *transactionSettings) { + s.maxAttempts = int(o) +} + +// MaxAttempts returns a TransactionOption that specifies the maximum number +// of times a transaction should be attempted before giving up. +func MaxAttempts(n int) TransactionOption { + return maxAttemptsOption(n) +} + +type readTimeOption struct { + t time.Time +} + +func (o readTimeOption) apply(s *transactionSettings) { + s.readTime = o.t +} + +// WithReadTime returns a TransactionOption that sets a specific timestamp +// at which to read data, enabling reading from a particular snapshot in time. +func WithReadTime(t time.Time) TransactionOption { + return readTimeOption{t: t} +} + // RunInTransaction runs a function in a transaction. // The function should use the transaction's Get and Put methods. // API compatible with cloud.google.com/go/datastore. -func (c *Client) RunInTransaction(ctx context.Context, f func(*Transaction) error) (*Commit, error) { - const maxTxRetries = 3 +func (c *Client) RunInTransaction(ctx context.Context, f func(*Transaction) error, opts ...TransactionOption) (*Commit, error) { + settings := transactionSettings{ + maxAttempts: 3, // default + } + for _, opt := range opts { + opt.apply(&settings) + } + var lastErr error - for attempt := range maxTxRetries { + for attempt := range settings.maxAttempts { token, err := auth.AccessToken(ctx) if err != nil { return nil, fmt.Errorf("failed to get access token: %w", err) @@ -1159,6 +1201,19 @@ func (c *Client) RunInTransaction(ctx context.Context, f func(*Transaction) erro reqBody["databaseId"] = c.databaseID } + // Add transaction options if needed + if !settings.readTime.IsZero() { + reqBody["transactionOptions"] = map[string]any{ + "readOnly": map[string]any{ + "readTime": settings.readTime.Format(time.RFC3339Nano), + }, + } + } else { + reqBody["transactionOptions"] = map[string]any{ + "readWrite": map[string]any{}, + } + } + jsonData, err := json.Marshal(reqBody) if err != nil { return nil, err @@ -1237,13 +1292,13 @@ func (c *Client) RunInTransaction(ctx context.Context, f func(*Transaction) erro lastErr = err c.logger.Warn("transaction aborted, will retry", "attempt", attempt+1, - "max_attempts", maxTxRetries, + "max_attempts", settings.maxAttempts, "has_409", is409, "has_ABORTED", isAborted, "error", err) // Exponential backoff: 100ms, 200ms, 400ms - if attempt < maxTxRetries-1 { + if attempt < settings.maxAttempts-1 { backoffMS := 100 * (1 << attempt) c.logger.Debug("sleeping before retry", "backoff_ms", backoffMS) time.Sleep(time.Duration(backoffMS) * time.Millisecond) @@ -1256,7 +1311,7 @@ func (c *Client) RunInTransaction(ctx context.Context, f func(*Transaction) erro return nil, err } - return nil, fmt.Errorf("transaction failed after %d attempts: %w", maxTxRetries, lastErr) + return nil, fmt.Errorf("transaction failed after %d attempts: %w", settings.maxAttempts, lastErr) } // Get retrieves an entity within the transaction. diff --git a/datastore_test.go b/datastore_test.go index eeb4eb0..806475f 100644 --- a/datastore_test.go +++ b/datastore_test.go @@ -7264,3 +7264,73 @@ func TestRunInTransactionErrorReturnsNilCommit(t *testing.T) { t.Errorf("Expected nil Commit on error, got %v", commit) } } + +func TestTransactionOptions(t *testing.T) { + t.Run("MaxAttempts", func(t *testing.T) { + // Test that MaxAttempts option is accepted and sets the retry limit + // We can verify this by checking the error message mentions the right attempt count + client, cleanup := ds9mock.NewClient(t) + defer cleanup() + + ctx := context.Background() + key := ds9.NameKey("TestKind", "test", nil) + + // This test verifies that the MaxAttempts option is parsed correctly + // The actual retry behavior is tested in TestTransactionMaxRetriesExceeded + _, err := client.RunInTransaction(ctx, func(tx *ds9.Transaction) error { + entity := testEntity{Name: "test", Count: 42} + _, err := tx.Put(key, &entity) + return err + }, ds9.MaxAttempts(5)) + // With mock client, this should succeed + if err != nil { + t.Fatalf("Transaction failed: %v", err) + } + }) + + t.Run("WithReadTime", func(t *testing.T) { + client, cleanup := ds9mock.NewClient(t) + defer cleanup() + + ctx := context.Background() + key := ds9.NameKey("TestKind", "test", nil) + + // First, put an entity + entity := testEntity{Name: "test", Count: 42} + _, err := client.Put(ctx, key, &entity) + if err != nil { + t.Fatalf("Put failed: %v", err) + } + + // Run a read-only transaction with readTime + readTime := time.Now().UTC() + _, err = client.RunInTransaction(ctx, func(tx *ds9.Transaction) error { + var result testEntity + return tx.Get(key, &result) + }, ds9.WithReadTime(readTime)) + // Note: ds9mock doesn't actually enforce read-only semantics, + // but we're testing that the option is accepted and doesn't cause errors + if err != nil { + t.Fatalf("Transaction with WithReadTime failed: %v", err) + } + }) + + t.Run("CombinedOptions", func(t *testing.T) { + client, cleanup := ds9mock.NewClient(t) + defer cleanup() + + ctx := context.Background() + key := ds9.NameKey("TestKind", "test", nil) + + // Test that multiple options can be combined + _, err := client.RunInTransaction(ctx, func(tx *ds9.Transaction) error { + entity := testEntity{Name: "test", Count: 42} + _, err := tx.Put(key, &entity) + return err + }, ds9.MaxAttempts(2), ds9.WithReadTime(time.Now().UTC())) + // With mock client, this should succeed + if err != nil { + t.Fatalf("Transaction with combined options failed: %v", err) + } + }) +}