Skip to content

Commit 2534f84

Browse files
committed
Address PR comments
Signed-off-by: Liran Funaro <liran.funaro@gmail.com>
1 parent ebed64e commit 2534f84

File tree

5 files changed

+80
-42
lines changed

5 files changed

+80
-42
lines changed

service/vc/database.go

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -174,15 +174,14 @@ func (db *database) queryVersionsIfPresent(ctx context.Context, nsID string, que
174174
start := time.Now()
175175
query := FmtNsID(queryVersionsSQLTempl, nsID)
176176

177-
foundKeysVersions, err := retryQueryAndReadTwoItems[[]byte, int64](ctx, db, query, queryKeys)
177+
foundKeys, foundVersions, err := retryQueryAndReadTwoItems[[]byte, int64](ctx, db, query, queryKeys)
178178
if err != nil {
179179
return nil, fmt.Errorf("failed to get keys' version from namespace [%s]: %w", nsID, err)
180180
}
181181

182182
kToV := make(keyToVersion)
183-
for _, keyVersion := range foundKeysVersions {
184-
//nolint:gosec // DB table is constraint to non-negative value.
185-
kToV[string(keyVersion.item1)] = uint64(keyVersion.item2)
183+
for i, key := range foundKeys {
184+
kToV[string(key)] = uint64(foundVersions[i]) //nolint:gosec // DB table is constraint to non-negative value.
186185
}
187186
promutil.Observe(db.metrics.databaseTxBatchQueryVersionLatencySeconds, time.Since(start))
188187

@@ -483,33 +482,33 @@ func (db *database) readStatusWithHeight(
483482
}
484483

485484
func (db *database) readNamespacePolicies(ctx context.Context) (*applicationpb.NamespacePolicies, error) {
486-
keysValues, err := retryQueryAndReadTwoItems[[]byte, []byte](ctx, db, queryPoliciesSQLStmt)
485+
keys, values, err := retryQueryAndReadTwoItems[[]byte, []byte](ctx, db, queryPoliciesSQLStmt)
487486
if err != nil {
488487
metaTable := TableName(committerpb.MetaNamespaceID)
489488
return nil, fmt.Errorf("failed to read the policies from table [%s]: %w", metaTable, err)
490489
}
491490
policy := &applicationpb.NamespacePolicies{
492-
Policies: make([]*applicationpb.PolicyItem, len(keysValues)),
491+
Policies: make([]*applicationpb.PolicyItem, len(keys)),
493492
}
494493

495-
for i, keyValue := range keysValues {
494+
for i, key := range keys {
496495
policy.Policies[i] = &applicationpb.PolicyItem{
497-
Namespace: string(keyValue.item1),
498-
Policy: keyValue.item2,
496+
Namespace: string(key),
497+
Policy: values[i],
499498
}
500499
}
501500
return policy, nil
502501
}
503502

504503
func (db *database) readConfigTX(ctx context.Context) (*applicationpb.ConfigTransaction, error) {
505-
keysValues, err := retryQueryAndReadTwoItems[[]byte, []byte](ctx, db, queryConfigSQLStmt)
504+
_, values, err := retryQueryAndReadTwoItems[[]byte, []byte](ctx, db, queryConfigSQLStmt)
506505
if err != nil {
507506
return nil, fmt.Errorf("failed to read the config transaction from table [%s]: %w",
508507
TableName(committerpb.ConfigNamespaceID), err)
509508
}
510509
configTX := &applicationpb.ConfigTransaction{}
511-
for _, v := range keysValues {
512-
configTX.Envelope = v.item2
510+
for _, v := range values {
511+
configTX.Envelope = v
513512
}
514513
return configTX, nil
515514
}
@@ -529,28 +528,37 @@ func retryQueryAndReadArrayResult[T any](
529528

530529
func retryQueryAndReadTwoItems[T1, T2 any](
531530
ctx context.Context, db *database, query string, args ...any,
532-
) ([]tuple[T1, T2], error) {
533-
return retry.ExecuteWithResult(ctx, db.retryProfile, func() ([]tuple[T1, T2], error) {
531+
) ([]T1, []T2, error) {
532+
res, err := retry.ExecuteWithResult(ctx, db.retryProfile, func() (*tuple[[]T1, []T2], error) {
534533
rows, queryErr := db.pool.Query(ctx, query, args...)
535534
if queryErr != nil {
536535
return nil, errors.Wrapf(queryErr, "query rows: query [%s]", query)
537536
}
538537
defer rows.Close()
539-
items, readErr := readTwoItems[T1, T2](rows)
540-
return items, errors.Wrapf(readErr, "read rows from the query [%s] results", query)
538+
items1, items2, readErr := readTwoItems[T1, T2](rows)
539+
return &tuple[[]T1, []T2]{
540+
item1: items1,
541+
item2: items2,
542+
}, errors.Wrapf(readErr, "read rows from the query [%s] results", query)
541543
})
544+
if err != nil {
545+
return nil, nil, err
546+
}
547+
return res.item1, res.item2, nil
542548
}
543549

544550
// readTwoItems reads two items from given rows.
545-
func readTwoItems[T1, T2 any](r pgx.Rows) (items []tuple[T1, T2], err error) {
551+
func readTwoItems[T1, T2 any](r pgx.Rows) (items1 []T1, items2 []T2, err error) {
546552
for r.Next() {
547-
var i tuple[T1, T2]
548-
if scanErr := r.Scan(&i.item1, &i.item2); scanErr != nil {
549-
return nil, errors.Wrap(scanErr, "failed while scanning a row")
553+
var i1 T1
554+
var i2 T2
555+
if err := r.Scan(&i1, &i2); err != nil {
556+
return nil, nil, errors.Wrap(err, "failed while scanning a row")
550557
}
551-
items = append(items, i)
558+
items1 = append(items1, i1)
559+
items2 = append(items2, i2)
552560
}
553-
return items, errors.Wrap(r.Err(), "failed while reading from rows")
561+
return items1, items2, errors.Wrap(r.Err(), "failed while reading from rows")
554562
}
555563

556564
func readArrayResult[T any](r pgx.Row) (res []T, err error) {

service/vc/validator_committer_service_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,9 +175,10 @@ func TestCreateConfigAndTables(t *testing.T) {
175175
rows, err := env.dbEnv.DB.pool.Query(ctx, fmt.Sprintf("select key, value from %s", TableName(utNsID)))
176176
require.NoError(t, err)
177177
defer rows.Close()
178-
keysValues, err := readTwoItems[[]byte, []byte](rows)
178+
keys, values, err := readTwoItems[[]byte, []byte](rows)
179179
require.NoError(t, err)
180-
require.Empty(t, keysValues)
180+
require.Empty(t, keys)
181+
require.Empty(t, values)
181182
}
182183

183184
func TestValidatorAndCommitterService(t *testing.T) {

utils/retry/executor.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func ExecuteSQL(ctx context.Context, p *Profile, e executor, sqlStmt string, arg
4545
_, err := executeWithResult(ctx, p, func() (any, error) {
4646
_, err := e.Exec(ctx, sqlStmt, args...)
4747
return nil, errors.Wrapf(err, "failed to execute the SQL statement [%s]", sqlStmt)
48-
})
48+
}, puddle.ErrClosedPool)
4949
return err
5050
}
5151

@@ -67,16 +67,20 @@ func WaitForCondition(ctx context.Context, p *Profile, condition func() bool) bo
6767
// It returns the result of the operation on success, or an error on timeout.
6868
// This is a generic version that can return any type T.
6969
// We skip 4 callers for logging to always report the calling method.
70-
func executeWithResult[T any](ctx context.Context, p *Profile, operation func() (T, error)) (T, error) {
70+
func executeWithResult[T any](
71+
ctx context.Context, p *Profile, operation func() (T, error), terminalErrors ...error,
72+
) (T, error) {
7173
p = p.WithDefaults()
7274
return backoff.Retry(ctx, func() (T, error) {
7375
res, err := operation()
7476
if err != nil {
7577
logger.WithOptions(zap.AddCallerSkip(4)).Warn(err)
7678
}
7779
// We identify common cases where retry isn't useful.
78-
if errors.Is(err, puddle.ErrClosedPool) {
79-
err = backoff.Permanent(err)
80+
for _, isErr := range terminalErrors {
81+
if errors.Is(err, isErr) {
82+
err = backoff.Permanent(err)
83+
}
8084
}
8185
return res, err
8286
}, backoff.WithBackOff(p.NewBackoff()), backoff.WithMaxElapsedTime(p.MaxElapsedTime))

utils/retry/sustain.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,13 @@ func Sustain(ctx context.Context, p *Profile, op func() error) error {
5454
return nil, errors.WithMessage(err, "sustained operation error")
5555
})
5656

57+
// The above call should retry whenever ErrBackOff is returned.
58+
// Thus, if the returned error is ErrBackOff, it means we reached the
59+
// maximal attempts (MaxElapsedTime).
60+
if errors.Is(err, ErrBackOff) {
61+
return err
62+
}
63+
5764
if errors.Is(err, ErrNonRetryable) {
5865
return err
5966
}

utils/retry/sustain_test.go

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ func TestSustain(t *testing.T) {
2020

2121
// Cases where operation runs continuously until context cancellation
2222
for _, tc := range []struct {
23-
name string
24-
operation func(callCount uint64) error
25-
profile *Profile
26-
cancelAfter time.Duration
27-
minCallCount uint64
23+
name string
24+
operation func(callCount uint64) error
25+
profile *Profile
26+
cancelAfter time.Duration
27+
minCallCount uint64
28+
errorContains string
2829
}{
2930
{
3031
name: "operation runs continuously returning nil until context cancelled",
@@ -33,8 +34,9 @@ func TestSustain(t *testing.T) {
3334
InitialInterval: 10 * time.Millisecond,
3435
MaxElapsedTime: 100 * time.Millisecond,
3536
},
36-
cancelAfter: 50 * time.Millisecond,
37-
minCallCount: 2,
37+
cancelAfter: 50 * time.Millisecond,
38+
minCallCount: 2,
39+
errorContains: "context has been cancelled",
3840
},
3941
{
4042
name: "operation recovers from backoff errors and continues running",
@@ -48,8 +50,9 @@ func TestSustain(t *testing.T) {
4850
InitialInterval: 10 * time.Millisecond,
4951
MaxElapsedTime: 500 * time.Millisecond,
5052
},
51-
cancelAfter: 200 * time.Millisecond,
52-
minCallCount: 4,
53+
cancelAfter: 200 * time.Millisecond,
54+
minCallCount: 4,
55+
errorContains: "context has been cancelled",
5356
},
5457
{
5558
name: "operation recovers from backoff errors and continues running and than back offs again",
@@ -63,8 +66,9 @@ func TestSustain(t *testing.T) {
6366
InitialInterval: 10 * time.Millisecond,
6467
MaxElapsedTime: 500 * time.Millisecond,
6568
},
66-
cancelAfter: 200 * time.Millisecond,
67-
minCallCount: 4,
69+
cancelAfter: 200 * time.Millisecond,
70+
minCallCount: 4,
71+
errorContains: "context has been cancelled",
6872
},
6973
{
7074
name: "other errors retry immediately without backoff",
@@ -78,8 +82,22 @@ func TestSustain(t *testing.T) {
7882
InitialInterval: 10 * time.Millisecond,
7983
MaxElapsedTime: 500 * time.Millisecond,
8084
},
81-
cancelAfter: 400 * time.Millisecond,
82-
minCallCount: 4,
85+
cancelAfter: 400 * time.Millisecond,
86+
minCallCount: 4,
87+
errorContains: "context has been cancelled",
88+
},
89+
{
90+
name: "max time elapsed before context cancelled",
91+
operation: func(uint64) error {
92+
return errors.Wrap(ErrBackOff, "transient error")
93+
},
94+
profile: &Profile{
95+
InitialInterval: 10 * time.Millisecond,
96+
MaxElapsedTime: 100 * time.Millisecond,
97+
},
98+
cancelAfter: time.Second,
99+
minCallCount: 2,
100+
errorContains: "transient error",
83101
},
84102
} {
85103
t.Run(tc.name, func(t *testing.T) {
@@ -95,7 +113,7 @@ func TestSustain(t *testing.T) {
95113
})
96114

97115
// Should run continuously until cancelled by context.
98-
require.ErrorContains(t, err, "context")
116+
require.ErrorContains(t, err, tc.errorContains)
99117
require.GreaterOrEqual(t, callCount, tc.minCallCount)
100118
})
101119
}

0 commit comments

Comments
 (0)