Skip to content

Commit 8c56ad2

Browse files
Thomas StrombergThomas Stromberg
authored andcommitted
Additional option compatibility
1 parent 3671259 commit 8c56ad2

File tree

2 files changed

+131
-6
lines changed

2 files changed

+131
-6
lines changed

datastore.go

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1140,14 +1140,56 @@ type Transaction struct {
11401140
mutations []map[string]any
11411141
}
11421142

1143+
// TransactionOption configures transaction behavior.
1144+
type TransactionOption interface {
1145+
apply(*transactionSettings)
1146+
}
1147+
1148+
type transactionSettings struct {
1149+
readTime time.Time
1150+
maxAttempts int
1151+
}
1152+
1153+
type maxAttemptsOption int
1154+
1155+
func (o maxAttemptsOption) apply(s *transactionSettings) {
1156+
s.maxAttempts = int(o)
1157+
}
1158+
1159+
// MaxAttempts returns a TransactionOption that specifies the maximum number
1160+
// of times a transaction should be attempted before giving up.
1161+
func MaxAttempts(n int) TransactionOption {
1162+
return maxAttemptsOption(n)
1163+
}
1164+
1165+
type readTimeOption struct {
1166+
t time.Time
1167+
}
1168+
1169+
func (o readTimeOption) apply(s *transactionSettings) {
1170+
s.readTime = o.t
1171+
}
1172+
1173+
// WithReadTime returns a TransactionOption that sets a specific timestamp
1174+
// at which to read data, enabling reading from a particular snapshot in time.
1175+
func WithReadTime(t time.Time) TransactionOption {
1176+
return readTimeOption{t: t}
1177+
}
1178+
11431179
// RunInTransaction runs a function in a transaction.
11441180
// The function should use the transaction's Get and Put methods.
11451181
// API compatible with cloud.google.com/go/datastore.
1146-
func (c *Client) RunInTransaction(ctx context.Context, f func(*Transaction) error) (*Commit, error) {
1147-
const maxTxRetries = 3
1182+
func (c *Client) RunInTransaction(ctx context.Context, f func(*Transaction) error, opts ...TransactionOption) (*Commit, error) {
1183+
settings := transactionSettings{
1184+
maxAttempts: 3, // default
1185+
}
1186+
for _, opt := range opts {
1187+
opt.apply(&settings)
1188+
}
1189+
11481190
var lastErr error
11491191

1150-
for attempt := range maxTxRetries {
1192+
for attempt := range settings.maxAttempts {
11511193
token, err := auth.AccessToken(ctx)
11521194
if err != nil {
11531195
return nil, fmt.Errorf("failed to get access token: %w", err)
@@ -1159,6 +1201,19 @@ func (c *Client) RunInTransaction(ctx context.Context, f func(*Transaction) erro
11591201
reqBody["databaseId"] = c.databaseID
11601202
}
11611203

1204+
// Add transaction options if needed
1205+
if !settings.readTime.IsZero() {
1206+
reqBody["transactionOptions"] = map[string]any{
1207+
"readOnly": map[string]any{
1208+
"readTime": settings.readTime.Format(time.RFC3339Nano),
1209+
},
1210+
}
1211+
} else {
1212+
reqBody["transactionOptions"] = map[string]any{
1213+
"readWrite": map[string]any{},
1214+
}
1215+
}
1216+
11621217
jsonData, err := json.Marshal(reqBody)
11631218
if err != nil {
11641219
return nil, err
@@ -1237,13 +1292,13 @@ func (c *Client) RunInTransaction(ctx context.Context, f func(*Transaction) erro
12371292
lastErr = err
12381293
c.logger.Warn("transaction aborted, will retry",
12391294
"attempt", attempt+1,
1240-
"max_attempts", maxTxRetries,
1295+
"max_attempts", settings.maxAttempts,
12411296
"has_409", is409,
12421297
"has_ABORTED", isAborted,
12431298
"error", err)
12441299

12451300
// Exponential backoff: 100ms, 200ms, 400ms
1246-
if attempt < maxTxRetries-1 {
1301+
if attempt < settings.maxAttempts-1 {
12471302
backoffMS := 100 * (1 << attempt)
12481303
c.logger.Debug("sleeping before retry", "backoff_ms", backoffMS)
12491304
time.Sleep(time.Duration(backoffMS) * time.Millisecond)
@@ -1256,7 +1311,7 @@ func (c *Client) RunInTransaction(ctx context.Context, f func(*Transaction) erro
12561311
return nil, err
12571312
}
12581313

1259-
return nil, fmt.Errorf("transaction failed after %d attempts: %w", maxTxRetries, lastErr)
1314+
return nil, fmt.Errorf("transaction failed after %d attempts: %w", settings.maxAttempts, lastErr)
12601315
}
12611316

12621317
// Get retrieves an entity within the transaction.

datastore_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7264,3 +7264,73 @@ func TestRunInTransactionErrorReturnsNilCommit(t *testing.T) {
72647264
t.Errorf("Expected nil Commit on error, got %v", commit)
72657265
}
72667266
}
7267+
7268+
func TestTransactionOptions(t *testing.T) {
7269+
t.Run("MaxAttempts", func(t *testing.T) {
7270+
// Test that MaxAttempts option is accepted and sets the retry limit
7271+
// We can verify this by checking the error message mentions the right attempt count
7272+
client, cleanup := ds9mock.NewClient(t)
7273+
defer cleanup()
7274+
7275+
ctx := context.Background()
7276+
key := ds9.NameKey("TestKind", "test", nil)
7277+
7278+
// This test verifies that the MaxAttempts option is parsed correctly
7279+
// The actual retry behavior is tested in TestTransactionMaxRetriesExceeded
7280+
_, err := client.RunInTransaction(ctx, func(tx *ds9.Transaction) error {
7281+
entity := testEntity{Name: "test", Count: 42}
7282+
_, err := tx.Put(key, &entity)
7283+
return err
7284+
}, ds9.MaxAttempts(5))
7285+
// With mock client, this should succeed
7286+
if err != nil {
7287+
t.Fatalf("Transaction failed: %v", err)
7288+
}
7289+
})
7290+
7291+
t.Run("WithReadTime", func(t *testing.T) {
7292+
client, cleanup := ds9mock.NewClient(t)
7293+
defer cleanup()
7294+
7295+
ctx := context.Background()
7296+
key := ds9.NameKey("TestKind", "test", nil)
7297+
7298+
// First, put an entity
7299+
entity := testEntity{Name: "test", Count: 42}
7300+
_, err := client.Put(ctx, key, &entity)
7301+
if err != nil {
7302+
t.Fatalf("Put failed: %v", err)
7303+
}
7304+
7305+
// Run a read-only transaction with readTime
7306+
readTime := time.Now().UTC()
7307+
_, err = client.RunInTransaction(ctx, func(tx *ds9.Transaction) error {
7308+
var result testEntity
7309+
return tx.Get(key, &result)
7310+
}, ds9.WithReadTime(readTime))
7311+
// Note: ds9mock doesn't actually enforce read-only semantics,
7312+
// but we're testing that the option is accepted and doesn't cause errors
7313+
if err != nil {
7314+
t.Fatalf("Transaction with WithReadTime failed: %v", err)
7315+
}
7316+
})
7317+
7318+
t.Run("CombinedOptions", func(t *testing.T) {
7319+
client, cleanup := ds9mock.NewClient(t)
7320+
defer cleanup()
7321+
7322+
ctx := context.Background()
7323+
key := ds9.NameKey("TestKind", "test", nil)
7324+
7325+
// Test that multiple options can be combined
7326+
_, err := client.RunInTransaction(ctx, func(tx *ds9.Transaction) error {
7327+
entity := testEntity{Name: "test", Count: 42}
7328+
_, err := tx.Put(key, &entity)
7329+
return err
7330+
}, ds9.MaxAttempts(2), ds9.WithReadTime(time.Now().UTC()))
7331+
// With mock client, this should succeed
7332+
if err != nil {
7333+
t.Fatalf("Transaction with combined options failed: %v", err)
7334+
}
7335+
})
7336+
}

0 commit comments

Comments
 (0)