Skip to content

Commit f9f5e06

Browse files
authored
feat(cli): store remote digest in local state for concurrency (#1136)
Signed-off-by: Miguel <[email protected]>
1 parent e57234c commit f9f5e06

File tree

11 files changed

+108
-58
lines changed

11 files changed

+108
-58
lines changed

app/cli/cmd/attestation_add.go

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,15 @@ import (
1919
"errors"
2020
"fmt"
2121
"os"
22+
"time"
2223

2324
"github.com/spf13/cobra"
2425
"github.com/spf13/viper"
2526
"google.golang.org/grpc"
2627

28+
"github.com/cenkalti/backoff/v4"
2729
"github.com/chainloop-dev/chainloop/app/cli/internal/action"
30+
v1 "github.com/chainloop-dev/chainloop/app/controlplane/api/controlplane/v1"
2831
schemaapi "github.com/chainloop-dev/chainloop/app/controlplane/api/workflowcontract/v1"
2932
)
3033

@@ -85,18 +88,34 @@ func newAttestationAddCmd() *cobra.Command {
8588
return err
8689
}
8790

88-
if err := a.Run(cmd.Context(), attestationID, name, value, kind, annotations); err != nil {
89-
if errors.Is(err, action.ErrAttestationNotInitialized) {
90-
return err
91-
}
91+
// In some cases, the attestation state is stored remotely. To control concurrency we use
92+
// optimistic locking. We retry the operation if the state has changed since we last read it.
93+
return backoff.RetryNotify(
94+
func() error {
95+
if err := a.Run(cmd.Context(), attestationID, name, value, kind, annotations); err != nil {
96+
if errors.Is(err, action.ErrAttestationNotInitialized) {
97+
return err
98+
}
9299

93-
return newGracefulError(err)
94-
}
100+
// We want to retry if the error is a conflict
101+
if v1.IsAttestationStateErrorConflict(err) {
102+
return err
103+
}
95104

96-
logger.Info().Msg("material added to attestation")
105+
// if it's another kind of error we want to stop retrying
106+
return backoff.Permanent(newGracefulError(err))
107+
}
97108

98-
return nil
109+
logger.Info().Msg("material added to attestation")
110+
111+
return nil
112+
},
113+
backoff.NewExponentialBackOff(backoff.WithMaxElapsedTime(10*time.Second)),
114+
func(err error, delay time.Duration) {
115+
logger.Err(err).Msgf("retrying in %s", delay)
116+
})
99117
},
118+
100119
PostRunE: func(cmd *cobra.Command, args []string) error {
101120
if artifactCASConn != nil {
102121
return artifactCASConn.Close()

app/cli/internal/action/action.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,15 @@ func newCrafter(enableRemoteState bool, conn *grpc.ClientConn, opts ...crafter.N
4343
var stateManager crafter.StateManager
4444
var err error
4545

46+
// run opts to extract logger
47+
c := &crafter.Crafter{}
48+
for _, opt := range opts {
49+
_ = opt(c)
50+
}
51+
4652
switch enableRemoteState {
4753
case true:
48-
stateManager, err = remote.New(pb.NewAttestationStateServiceClient(conn))
54+
stateManager, err = remote.New(pb.NewAttestationStateServiceClient(conn), c.Logger)
4955
case false:
5056
stateManager, err = filesystem.New(filepath.Join(os.TempDir(), "chainloop-attestation.tmp.json"))
5157
}

app/cli/internal/action/attestation_status.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func (action *AttestationStatus) Run(ctx context.Context, attestationID string)
107107
// 1. Populate the materials that are defined in the contract schema
108108
// 2. Populate the materials that are not defined in the contract schema, added inline in the attestation
109109
// In order to avoid duplicates, we keep track of the visited materials
110-
if err := populateMaterials(c.CraftingState, res); err != nil {
110+
if err := populateMaterials(c.CraftingState.CraftingState, res); err != nil {
111111
return nil, fmt.Errorf("populating materials: %w", err)
112112
}
113113

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ require (
1414
github.com/aws/aws-sdk-go-v2/service/secretsmanager v1.28.6
1515
github.com/aws/aws-sdk-go-v2/service/sso v1.20.4
1616
github.com/aws/smithy-go v1.20.2
17-
github.com/cenkalti/backoff/v4 v4.2.1
17+
github.com/cenkalti/backoff/v4 v4.3.0
1818
github.com/coreos/go-oidc/v3 v3.10.0
1919
github.com/docker/distribution v2.8.3+incompatible
2020
github.com/docker/go-connections v0.4.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -293,8 +293,8 @@ github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QH
293293
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
294294
github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
295295
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
296-
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
297-
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
296+
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
297+
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
298298
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
299299
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
300300
github.com/census-instrumentation/opencensus-proto v0.4.1 h1:iKLQ0xPNFxR/2hzXZMrBo8f1j86j5WHzznCCQxV/b8g=

internal/attestation/crafter/crafter.go

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -47,18 +47,18 @@ type StateManager interface {
4747
// Check if the state is already initialized
4848
Initialized(ctx context.Context, key string) (bool, error)
4949
// Write the state to the manager backend
50-
Write(ctx context.Context, key string, state *api.CraftingState) error
50+
Write(ctx context.Context, key string, state *VersionedCraftingState) error
5151
// Read the state from the manager backend
52-
Read(ctx context.Context, key string, state *api.CraftingState) error
52+
Read(ctx context.Context, key string, state *VersionedCraftingState) error
5353
// Reset/Delete the state
5454
Reset(ctx context.Context, key string) error
5555
// String returns a string representation of the state manager
5656
Info(ctx context.Context, key string) string
5757
}
5858

5959
type Crafter struct {
60-
logger *zerolog.Logger
61-
CraftingState *api.CraftingState
60+
Logger *zerolog.Logger
61+
CraftingState *VersionedCraftingState
6262
Runner SupportedRunner
6363
workingDir string
6464
stateManager StateManager
@@ -67,13 +67,19 @@ type Crafter struct {
6767
validator *protovalidate.Validator
6868
}
6969

70+
type VersionedCraftingState struct {
71+
*api.CraftingState
72+
// This digest is used to verify the integrity of the state during updates
73+
UpdateCheckSum string
74+
}
75+
7076
var ErrAttestationStateNotLoaded = errors.New("crafting state not loaded")
7177

7278
type NewOpt func(c *Crafter) error
7379

7480
func WithLogger(l *zerolog.Logger) NewOpt {
7581
return func(c *Crafter) error {
76-
c.logger = l
82+
c.Logger = l
7783
return nil
7884
}
7985
}
@@ -108,7 +114,7 @@ func NewCrafter(stateManager StateManager, opts ...NewOpt) (*Crafter, error) {
108114

109115
cw, _ := os.Getwd()
110116
c := &Crafter{
111-
logger: &noopLogger,
117+
Logger: &noopLogger,
112118
workingDir: cw,
113119
stateManager: stateManager,
114120
// By default we authenticate with the current user's keychain (i.e ~/.docker/config.json)
@@ -225,11 +231,13 @@ func (c *Crafter) initCraftingStateFile(
225231
return fmt.Errorf("initializing crafting state: %w", err)
226232
}
227233

228-
if err := c.stateManager.Write(ctx, attestationID, state); err != nil {
234+
// newState doesn't have a digest to check against
235+
newState := &VersionedCraftingState{CraftingState: state}
236+
if err := c.stateManager.Write(ctx, attestationID, newState); err != nil {
229237
return fmt.Errorf("failed to persist crafting state: %w", err)
230238
}
231239

232-
c.logger.Debug().Str("state", c.stateManager.Info(ctx, attestationID)).Msg("created state file")
240+
c.Logger.Debug().Str("state", c.stateManager.Info(ctx, attestationID)).Msg("created state file")
233241

234242
return c.LoadCraftingState(ctx, attestationID)
235243
}
@@ -240,9 +248,9 @@ func (c *Crafter) Reset(ctx context.Context, stateID string) error {
240248
}
241249

242250
func (c *Crafter) LoadCraftingState(ctx context.Context, attestationID string) error {
243-
c.logger.Debug().Str("state", c.stateManager.Info(ctx, attestationID)).Msg("loading state")
251+
c.Logger.Debug().Str("state", c.stateManager.Info(ctx, attestationID)).Msg("loading state")
244252

245-
c.CraftingState = &api.CraftingState{}
253+
c.CraftingState = &VersionedCraftingState{CraftingState: &api.CraftingState{}}
246254

247255
if err := c.stateManager.Read(ctx, attestationID, c.CraftingState); err != nil {
248256
return fmt.Errorf("failed to load crafting state: %w", err)
@@ -255,7 +263,7 @@ func (c *Crafter) LoadCraftingState(ctx context.Context, attestationID string) e
255263
}
256264

257265
c.Runner = NewRunner(runnerType)
258-
c.logger.Debug().Str("state", c.stateManager.Info(ctx, attestationID)).Msg("loaded state")
266+
c.Logger.Debug().Str("state", c.stateManager.Info(ctx, attestationID)).Msg("loaded state")
259267

260268
return nil
261269
}
@@ -413,7 +421,7 @@ func (c *Crafter) ResolveEnvVars(ctx context.Context, attestationID string) erro
413421
}
414422

415423
// Runner specific environment variables
416-
c.logger.Debug().Str("runnerType", c.Runner.ID().String()).Msg("loading runner specific env variables")
424+
c.Logger.Debug().Str("runnerType", c.Runner.ID().String()).Msg("loading runner specific env variables")
417425
if !c.Runner.CheckEnv() {
418426
errorStr := fmt.Sprintf("couldn't detect the environment %q. Is the crafting process happening in the target env?", c.Runner.ID().String())
419427
return fmt.Errorf("%s - %w", errorStr, ErrRunnerContextNotFound)
@@ -424,7 +432,7 @@ func (c *Crafter) ResolveEnvVars(ctx context.Context, attestationID string) erro
424432
for index, envVarDef := range c.Runner.ListEnvVars() {
425433
varNames[index] = envVarDef.Name
426434
}
427-
c.logger.Debug().Str("runnerType", c.Runner.ID().String()).Strs("variables", varNames).Msg("list of env variables to automatically extract")
435+
c.Logger.Debug().Str("runnerType", c.Runner.ID().String()).Strs("variables", varNames).Msg("list of env variables to automatically extract")
428436

429437
outputEnvVars, errors := c.Runner.ResolveEnvVars()
430438
if len(errors) > 0 {
@@ -437,7 +445,7 @@ func (c *Crafter) ResolveEnvVars(ctx context.Context, attestationID string) erro
437445

438446
// User-defined environment vars
439447
if len(c.CraftingState.InputSchema.EnvAllowList) > 0 {
440-
c.logger.Debug().Strs("allowList", c.CraftingState.InputSchema.EnvAllowList).Msg("loading env variables")
448+
c.Logger.Debug().Strs("allowList", c.CraftingState.InputSchema.EnvAllowList).Msg("loading env variables")
441449
}
442450
for _, want := range c.CraftingState.InputSchema.EnvAllowList {
443451
val := os.Getenv(want)
@@ -501,7 +509,7 @@ func (c *Crafter) AddMaterialFromContract(ctx context.Context, attestationID, ke
501509

502510
// 2 - Check that it has not been set yet and warn of override
503511
if _, found := c.CraftingState.Attestation.Materials[key]; found {
504-
c.logger.Info().Str("key", key).Str("value", value).Msg("material already set, overriding it")
512+
c.Logger.Info().Str("key", key).Str("value", value).Msg("material already set, overriding it")
505513
}
506514

507515
// 3 - Craft resulting material
@@ -518,7 +526,7 @@ func (c *Crafter) AddMaterialContactFreeAutomatic(ctx context.Context, attestati
518526
return kind, nil
519527
}
520528

521-
c.logger.Debug().Err(err).Str("kind", kind.String()).Msg("failed to add material")
529+
c.Logger.Debug().Err(err).Str("kind", kind.String()).Msg("failed to add material")
522530

523531
// Handle base error for upload and craft errors except the opening file error
524532
var policyError *policies.PolicyError
@@ -534,7 +542,7 @@ func (c *Crafter) AddMaterialContactFreeAutomatic(ctx context.Context, attestati
534542
// addMaterials adds the incoming material m to the crafting state
535543
func (c *Crafter) addMaterial(ctx context.Context, m *schemaapi.CraftingSchema_Material, attestationID, value string, casBackend *casclient.CASBackend, runtimeAnnotations map[string]string) error {
536544
// 3- Craft resulting material
537-
mt, err := materials.Craft(context.Background(), m, value, casBackend, c.ociRegistryAuth, c.logger)
545+
mt, err := materials.Craft(context.Background(), m, value, casBackend, c.ociRegistryAuth, c.Logger)
538546
if err != nil {
539547
return err
540548
}
@@ -551,7 +559,7 @@ func (c *Crafter) addMaterial(ctx context.Context, m *schemaapi.CraftingSchema_M
551559
mt.Annotations[kr] = vr
552560
} else {
553561
// NOTE: we do not allow overriding values that come from the contract
554-
c.logger.Info().Str("key", m.Name).Str("annotation", kr).Msg("annotation can't be changed, skipping")
562+
c.Logger.Info().Str("key", m.Name).Str("annotation", kr).Msg("annotation can't be changed, skipping")
555563
}
556564
}
557565

@@ -573,13 +581,13 @@ func (c *Crafter) addMaterial(ctx context.Context, m *schemaapi.CraftingSchema_M
573581
}
574582

575583
// Validate policies
576-
pv := policies.NewPolicyVerifier(c.CraftingState.InputSchema, c.logger)
584+
pv := policies.NewPolicyVerifier(c.CraftingState.InputSchema, c.Logger)
577585
policyResults, err := pv.VerifyMaterial(ctx, mt, value)
578586
if err != nil {
579587
return fmt.Errorf("error applying policies to material: %w", err)
580588
}
581589
// log policy violations
582-
policies.LogPolicyViolations(policyResults, c.logger)
590+
policies.LogPolicyViolations(policyResults, c.Logger)
583591
// store policy results
584592
c.CraftingState.Attestation.PolicyEvaluations = append(c.CraftingState.Attestation.PolicyEvaluations, policyResults...)
585593

@@ -594,7 +602,7 @@ func (c *Crafter) addMaterial(ctx context.Context, m *schemaapi.CraftingSchema_M
594602
return fmt.Errorf("failed to persist crafting state: %w", err)
595603
}
596604

597-
c.logger.Debug().Str("key", m.Name).Msg("added to state")
605+
c.Logger.Debug().Str("key", m.Name).Msg("added to state")
598606
return nil
599607
}
600608

internal/attestation/crafter/statemanager/filesystem/filesystem.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
"io"
2222
"os"
2323

24-
v1 "github.com/chainloop-dev/chainloop/internal/attestation/crafter/api/attestation/v1"
24+
"github.com/chainloop-dev/chainloop/internal/attestation/crafter"
2525
"github.com/chainloop-dev/chainloop/internal/attestation/crafter/statemanager"
2626
"google.golang.org/protobuf/encoding/protojson"
2727
)
@@ -55,7 +55,7 @@ func (l *Filesystem) Initialized(_ context.Context, _ string) (bool, error) {
5555
return false, nil
5656
}
5757

58-
func (l *Filesystem) Write(_ context.Context, _ string, state *v1.CraftingState) error {
58+
func (l *Filesystem) Write(_ context.Context, _ string, state *crafter.VersionedCraftingState) error {
5959
if state == nil {
6060
return fmt.Errorf("state cannot be nil")
6161
}
@@ -81,7 +81,7 @@ func (l *Filesystem) Write(_ context.Context, _ string, state *v1.CraftingState)
8181
return nil
8282
}
8383

84-
func (l *Filesystem) Read(_ context.Context, _ string, state *v1.CraftingState) error {
84+
func (l *Filesystem) Read(_ context.Context, _ string, state *crafter.VersionedCraftingState) error {
8585
if state == nil {
8686
return fmt.Errorf("state cannot be nil")
8787
}

internal/attestation/crafter/statemanager/filesystem/filesystem_test.go

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"os"
2222
"testing"
2323

24+
"github.com/chainloop-dev/chainloop/internal/attestation/crafter"
2425
v1 "github.com/chainloop-dev/chainloop/internal/attestation/crafter/api/attestation/v1"
2526
"github.com/chainloop-dev/chainloop/internal/attestation/crafter/statemanager"
2627
"github.com/stretchr/testify/require"
@@ -61,7 +62,7 @@ func (s *testSuite) TestNew() {
6162
func (s *testSuite) TestWrite() {
6263
testCases := []struct {
6364
name string
64-
state *v1.CraftingState
65+
state *crafter.VersionedCraftingState
6566
wantErr bool
6667
}{
6768
{
@@ -87,7 +88,7 @@ func (s *testSuite) TestWrite() {
8788
}
8889

8990
s.NoError(err)
90-
got := &v1.CraftingState{}
91+
got := &crafter.VersionedCraftingState{CraftingState: &v1.CraftingState{}}
9192
err = sm.Read(context.Background(), "", got)
9293
s.NoError(err)
9394
s.Equal(tc.state, got)
@@ -107,7 +108,7 @@ func (s *testSuite) TestRead() {
107108
s.T().Run("no state found in path return NotFound error", func(t *testing.T) {
108109
sm, err := New(s.statePath)
109110
require.NoError(t, err)
110-
err = sm.Read(context.Background(), "", &v1.CraftingState{})
111+
err = sm.Read(context.Background(), "", &crafter.VersionedCraftingState{})
111112
s.Error(err)
112113
want := &statemanager.ErrNotFound{}
113114
s.ErrorAs(err, &want)
@@ -116,7 +117,7 @@ func (s *testSuite) TestRead() {
116117
s.T().Run("we can read the state", func(t *testing.T) {
117118
sm, err := New("testdata/state.json")
118119
require.NoError(t, err)
119-
got := &v1.CraftingState{}
120+
got := &crafter.VersionedCraftingState{CraftingState: &v1.CraftingState{}}
120121
err = sm.Read(context.Background(), "", got)
121122
require.NoError(s.T(), err)
122123

@@ -173,14 +174,14 @@ func (s *testSuite) TestInitialized() {
173174
type testSuite struct {
174175
suite.Suite
175176
statePath string
176-
exampleState *v1.CraftingState
177+
exampleState *crafter.VersionedCraftingState
177178
}
178179

179180
func (s *testSuite) SetupTest() {
180181
s.statePath = fmt.Sprintf("%s/attestation.json", s.T().TempDir())
181-
s.exampleState = &v1.CraftingState{DryRun: true, Attestation: &v1.Attestation{
182+
s.exampleState = &crafter.VersionedCraftingState{CraftingState: &v1.CraftingState{DryRun: true, Attestation: &v1.Attestation{
182183
Annotations: map[string]string{"foo": "bar"},
183-
}}
184+
}}}
184185
}
185186

186187
func TestSuite(t *testing.T) {

0 commit comments

Comments
 (0)