Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 61 additions & 6 deletions datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1140,14 +1140,56 @@ type Transaction struct {
mutations []map[string]any
}

// TransactionOption configures transaction behavior.
type TransactionOption interface {
apply(*transactionSettings)
}

type transactionSettings struct {
readTime time.Time
maxAttempts int
}

type maxAttemptsOption int

func (o maxAttemptsOption) apply(s *transactionSettings) {
s.maxAttempts = int(o)
}

// MaxAttempts returns a TransactionOption that specifies the maximum number
// of times a transaction should be attempted before giving up.
func MaxAttempts(n int) TransactionOption {
return maxAttemptsOption(n)
}

type readTimeOption struct {
t time.Time
}

func (o readTimeOption) apply(s *transactionSettings) {
s.readTime = o.t
}

// WithReadTime returns a TransactionOption that sets a specific timestamp
// at which to read data, enabling reading from a particular snapshot in time.
func WithReadTime(t time.Time) TransactionOption {
return readTimeOption{t: t}
}

// RunInTransaction runs a function in a transaction.
// The function should use the transaction's Get and Put methods.
// API compatible with cloud.google.com/go/datastore.
func (c *Client) RunInTransaction(ctx context.Context, f func(*Transaction) error) (*Commit, error) {
const maxTxRetries = 3
func (c *Client) RunInTransaction(ctx context.Context, f func(*Transaction) error, opts ...TransactionOption) (*Commit, error) {
settings := transactionSettings{
maxAttempts: 3, // default
}
for _, opt := range opts {
opt.apply(&settings)
}

var lastErr error

for attempt := range maxTxRetries {
for attempt := range settings.maxAttempts {
token, err := auth.AccessToken(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get access token: %w", err)
Expand All @@ -1159,6 +1201,19 @@ func (c *Client) RunInTransaction(ctx context.Context, f func(*Transaction) erro
reqBody["databaseId"] = c.databaseID
}

// Add transaction options if needed
if !settings.readTime.IsZero() {
reqBody["transactionOptions"] = map[string]any{
"readOnly": map[string]any{
"readTime": settings.readTime.Format(time.RFC3339Nano),
},
}
} else {
reqBody["transactionOptions"] = map[string]any{
"readWrite": map[string]any{},
}
}

jsonData, err := json.Marshal(reqBody)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1237,13 +1292,13 @@ func (c *Client) RunInTransaction(ctx context.Context, f func(*Transaction) erro
lastErr = err
c.logger.Warn("transaction aborted, will retry",
"attempt", attempt+1,
"max_attempts", maxTxRetries,
"max_attempts", settings.maxAttempts,
"has_409", is409,
"has_ABORTED", isAborted,
"error", err)

// Exponential backoff: 100ms, 200ms, 400ms
if attempt < maxTxRetries-1 {
if attempt < settings.maxAttempts-1 {
backoffMS := 100 * (1 << attempt)
c.logger.Debug("sleeping before retry", "backoff_ms", backoffMS)
time.Sleep(time.Duration(backoffMS) * time.Millisecond)
Expand All @@ -1256,7 +1311,7 @@ func (c *Client) RunInTransaction(ctx context.Context, f func(*Transaction) erro
return nil, err
}

return nil, fmt.Errorf("transaction failed after %d attempts: %w", maxTxRetries, lastErr)
return nil, fmt.Errorf("transaction failed after %d attempts: %w", settings.maxAttempts, lastErr)
}

// Get retrieves an entity within the transaction.
Expand Down
70 changes: 70 additions & 0 deletions datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7264,3 +7264,73 @@ func TestRunInTransactionErrorReturnsNilCommit(t *testing.T) {
t.Errorf("Expected nil Commit on error, got %v", commit)
}
}

func TestTransactionOptions(t *testing.T) {
t.Run("MaxAttempts", func(t *testing.T) {
// Test that MaxAttempts option is accepted and sets the retry limit
// We can verify this by checking the error message mentions the right attempt count
client, cleanup := ds9mock.NewClient(t)
defer cleanup()

ctx := context.Background()
key := ds9.NameKey("TestKind", "test", nil)

// This test verifies that the MaxAttempts option is parsed correctly
// The actual retry behavior is tested in TestTransactionMaxRetriesExceeded
_, err := client.RunInTransaction(ctx, func(tx *ds9.Transaction) error {
entity := testEntity{Name: "test", Count: 42}
_, err := tx.Put(key, &entity)
return err
}, ds9.MaxAttempts(5))
// With mock client, this should succeed
if err != nil {
t.Fatalf("Transaction failed: %v", err)
}
})

t.Run("WithReadTime", func(t *testing.T) {
client, cleanup := ds9mock.NewClient(t)
defer cleanup()

ctx := context.Background()
key := ds9.NameKey("TestKind", "test", nil)

// First, put an entity
entity := testEntity{Name: "test", Count: 42}
_, err := client.Put(ctx, key, &entity)
if err != nil {
t.Fatalf("Put failed: %v", err)
}

// Run a read-only transaction with readTime
readTime := time.Now().UTC()
_, err = client.RunInTransaction(ctx, func(tx *ds9.Transaction) error {
var result testEntity
return tx.Get(key, &result)
}, ds9.WithReadTime(readTime))
// Note: ds9mock doesn't actually enforce read-only semantics,
// but we're testing that the option is accepted and doesn't cause errors
if err != nil {
t.Fatalf("Transaction with WithReadTime failed: %v", err)
}
})

t.Run("CombinedOptions", func(t *testing.T) {
client, cleanup := ds9mock.NewClient(t)
defer cleanup()

ctx := context.Background()
key := ds9.NameKey("TestKind", "test", nil)

// Test that multiple options can be combined
_, err := client.RunInTransaction(ctx, func(tx *ds9.Transaction) error {
entity := testEntity{Name: "test", Count: 42}
_, err := tx.Put(key, &entity)
return err
}, ds9.MaxAttempts(2), ds9.WithReadTime(time.Now().UTC()))
// With mock client, this should succeed
if err != nil {
t.Fatalf("Transaction with combined options failed: %v", err)
}
})
}
Loading