Skip to content

Commit 83c44a0

Browse files
authored
Retry SyncExec (#496)
* Retry SyncExec * Update to use httputil backoff * Remove attempts * Redundant params
1 parent 50925c1 commit 83c44a0

File tree

3 files changed

+66
-26
lines changed

3 files changed

+66
-26
lines changed

build-index/tagstore/store.go

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ type tagStore struct {
5959
fs FileStore
6060
backends *backend.Manager
6161
writeBackManager persistedretry.Manager
62+
63+
// writeBackStrategy determines how tags are written to backend storage.
64+
// Set at initialization based on WriteThrough config.
65+
writeBackStrategy func(task persistedretry.Task) error
6266
}
6367

6468
// New creates a new Store.
@@ -73,42 +77,33 @@ func New(
7377
"module": "tagstore",
7478
})
7579

76-
return &tagStore{
80+
s := &tagStore{
7781
config: config,
7882
fs: fs,
7983
backends: backends,
8084
writeBackManager: writeBackManager,
8185
}
86+
87+
// Set write-back strategy based on configuration
88+
if config.WriteThrough {
89+
s.writeBackStrategy = s.writeThroughStrategy
90+
} else {
91+
s.writeBackStrategy = s.asyncWriteBackStrategy
92+
}
93+
94+
return s
8295
}
8396

8497
func (s *tagStore) Put(tag string, d core.Digest, writeBackDelay time.Duration) error {
85-
log.With("tag", tag, "digest", d.String(), "writeback_delay", writeBackDelay).Debug("Storing tag to disk")
86-
8798
if err := s.writeTagToDisk(tag, d); err != nil {
88-
log.With("tag", tag, "digest", d.String(), "error", err).Error("Failed to write tag to disk")
8999
return fmt.Errorf("write tag to disk: %s", err)
90100
}
91101
if _, err := s.fs.SetCacheFileMetadata(tag, metadata.NewPersist(true)); err != nil {
92-
log.With("tag", tag, "digest", d.String(), "error", err).Error("Failed to set persist metadata")
93102
return fmt.Errorf("set persist metadata: %s", err)
94103
}
95104

96105
task := writeback.NewTask(tag, tag, writeBackDelay)
97-
if s.config.WriteThrough {
98-
log.With("tag", tag, "digest", d.String()).Debug("Using write-through mode for tag")
99-
if err := s.writeBackManager.SyncExec(task); err != nil {
100-
log.With("tag", tag, "digest", d.String(), "error", err).Error("Failed to sync write-back tag to backend")
101-
return fmt.Errorf("sync exec write-back task: %s", err)
102-
}
103-
log.With("tag", tag, "digest", d.String()).Info("Tag written to backend synchronously")
104-
} else {
105-
log.With("tag", tag, "digest", d.String(), "writeback_delay", writeBackDelay).Debug("Adding async write-back task for tag")
106-
if err := s.writeBackManager.Add(task); err != nil {
107-
log.With("tag", tag, "digest", d.String(), "error", err).Error("Failed to add write-back task")
108-
return fmt.Errorf("add write-back task: %s", err)
109-
}
110-
}
111-
return nil
106+
return s.writeBackStrategy(task)
112107
}
113108

114109
func (s *tagStore) Get(tag string) (d core.Digest, err error) {
@@ -125,6 +120,22 @@ func (s *tagStore) Get(tag string) (d core.Digest, err error) {
125120
return d, err
126121
}
127122

123+
// writeThroughStrategy writes tags synchronously to backend storage.
124+
func (s *tagStore) writeThroughStrategy(task persistedretry.Task) error {
125+
if err := s.writeBackManager.SyncExec(task); err != nil {
126+
return fmt.Errorf("sync exec write-back task: %s", err)
127+
}
128+
return nil
129+
}
130+
131+
// asyncWriteBackStrategy queues tags for asynchronous write-back to backend storage.
132+
func (s *tagStore) asyncWriteBackStrategy(task persistedretry.Task) error {
133+
if err := s.writeBackManager.Add(task); err != nil {
134+
return fmt.Errorf("add write-back task: %s", err)
135+
}
136+
return nil
137+
}
138+
128139
func (s *tagStore) writeTagToDisk(tag string, d core.Digest) error {
129140
buf := bytes.NewBufferString(d.String())
130141
if err := s.fs.CreateCacheFile(tag, buf); err != nil && !os.IsExist(err) {

lib/persistedretry/config.go

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@
1313
// limitations under the License.
1414
package persistedretry
1515

16-
import "time"
16+
import (
17+
"time"
18+
19+
"github.com/uber/kraken/utils/httputil"
20+
)
1721

1822
// Config defines Manager configuration.
1923
type Config struct {
@@ -32,6 +36,9 @@ type Config struct {
3236
// Interval at which retries should be polled from storage.
3337
PollRetriesInterval time.Duration `yaml:"poll_retries_interval"`
3438

39+
// Exponential backoff configuration for synchronous task execution.
40+
SyncRetryBackoff httputil.ExponentialBackOffConfig `yaml:"sync_retry_backoff"`
41+
3542
// Flags that zero-value channel sizes should not have defaults applied.
3643
Testing bool
3744
}
@@ -52,6 +59,17 @@ func (c Config) applyDefaults() Config {
5259
if c.RetryInterval == 0 {
5360
c.RetryInterval = 30 * time.Second
5461
}
62+
if !c.SyncRetryBackoff.Enabled {
63+
// Apply defaults for sync retry backoff to maintain backward compatibility
64+
c.SyncRetryBackoff = httputil.ExponentialBackOffConfig{
65+
Enabled: true,
66+
InitialInterval: 500 * time.Millisecond,
67+
RandomizationFactor: 0.05,
68+
Multiplier: 2,
69+
MaxInterval: 30 * time.Second,
70+
MaxRetries: 2, // 2 retries = 3 total attempts (1 initial + 2 retries)
71+
}
72+
}
5573
if !c.Testing {
5674
if c.IncomingBuffer == 0 {
5775
c.IncomingBuffer = 1000

lib/persistedretry/manager.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"sync"
2020
"time"
2121

22+
"github.com/cenkalti/backoff"
2223
"github.com/uber-go/tally"
2324
"go.uber.org/atomic"
2425

@@ -54,8 +55,8 @@ type manager struct {
5455

5556
// NewManager creates a new Manager.
5657
func NewManager(
57-
config Config, stats tally.Scope, store Store, executor Executor) (Manager, error) {
58-
58+
config Config, stats tally.Scope, store Store, executor Executor,
59+
) (Manager, error) {
5960
stats = stats.Tagged(map[string]string{
6061
"module": "persistedretry",
6162
"executor": executor.Name(),
@@ -143,10 +144,20 @@ func (m *manager) Add(t Task) error {
143144
return nil
144145
}
145146

146-
// SyncExec executes the task synchronously.
147-
// Tasks will NOT be added to the retry queue if fail.
147+
// SyncExec executes the task synchronously with retry logic.
148+
// Tasks will NOT be added to the retry queue if fail, but will be retried
149+
// in-place according to the configured SyncRetryBackoff.
148150
func (m *manager) SyncExec(t Task) error {
149-
return m.executor.Exec(t)
151+
bo := m.config.SyncRetryBackoff.Build()
152+
153+
operation := func() error {
154+
return m.executor.Exec(t)
155+
}
156+
157+
if err := backoff.Retry(operation, bo); err != nil {
158+
return fmt.Errorf("sync task failed: %w", err)
159+
}
160+
return nil
150161
}
151162

152163
// Close waits for all workers to exit current task.

0 commit comments

Comments
 (0)