From 0670e2fbf9013197514265a574b72bc67e539bbe Mon Sep 17 00:00:00 2001 From: Zeynel Koca Date: Mon, 6 Oct 2025 15:03:18 +0200 Subject: [PATCH 1/7] Global ttlInSeconds metadata configuration for DynamoDb Signed-off-by: Zeynel Koca --- state/aws/dynamodb/dynamodb.go | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/state/aws/dynamodb/dynamodb.go b/state/aws/dynamodb/dynamodb.go index c89ded68ea..918c85b68f 100644 --- a/state/aws/dynamodb/dynamodb.go +++ b/state/aws/dynamodb/dynamodb.go @@ -48,6 +48,7 @@ type StateStore struct { table string ttlAttributeName string partitionKey string + ttlInSeconds *int dynamodbClient *dynamodb.Client } @@ -64,6 +65,7 @@ type dynamoDBMetadata struct { Table string `json:"table"` TTLAttributeName string `json:"ttlAttributeName"` PartitionKey string `json:"partitionKey"` + TTLInSeconds *int `json:"ttlInSeconds" mapstructure:"ttlInSeconds"` } type putData struct { @@ -117,6 +119,7 @@ func (d *StateStore) Init(ctx context.Context, metadata state.Metadata) error { d.table = meta.Table d.ttlAttributeName = meta.TTLAttributeName d.partitionKey = meta.PartitionKey + d.ttlInSeconds = meta.TTLInSeconds if err := d.validateTableAccess(ctx); err != nil { return fmt.Errorf("error validating DynamoDB table '%s' access: %w", d.table, err) @@ -425,6 +428,11 @@ func (d *StateStore) parseTTL(req *state.SetRequest) (*int64, error) { return &expirationTime, nil } + // apply global TTL if no explicit TTL in request metadata + if d.ttlInSeconds != nil { + expirationTime := time.Now().Unix() + int64(*d.ttlInSeconds) + return &expirationTime, nil + } } return nil, nil @@ -471,7 +479,26 @@ func (d *StateStore) Multi(ctx context.Context, request *state.TransactionalStat if err != nil { return fmt.Errorf("dynamodb error: failed to marshal value for key %s: %w", req.Key, err) } - twi.Put = pd.ToPut() + ttl, err := d.parseTTL(&req) + if err != nil { + return fmt.Errorf("dynamodb error: failed to parse ttlInSeconds: %w", err) + } + twi.Put = &types.Put{ + TableName: ptr.Of(d.table), + Item: map[string]types.AttributeValue{ + d.partitionKey: &types.AttributeValueMemberS{ + Value: req.Key, + }, + "value": &types.AttributeValueMemberS{ + Value: value, + }, + }, + } + if ttl != nil { + twi.Put.Item[d.ttlAttributeName] = &types.AttributeValueMemberN{ + Value: strconv.FormatInt(*ttl, 10), + } + } case state.DeleteRequest: twi.Delete = &types.Delete{ From f8b66477ff5358477284e1847bfb4357e406b85a Mon Sep 17 00:00:00 2001 From: Zeynel Koca Date: Mon, 6 Oct 2025 15:03:33 +0200 Subject: [PATCH 2/7] Document new property Signed-off-by: Zeynel Koca --- state/aws/dynamodb/metadata.yaml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/state/aws/dynamodb/metadata.yaml b/state/aws/dynamodb/metadata.yaml index dfc7d0ab36..c5400e0e12 100644 --- a/state/aws/dynamodb/metadata.yaml +++ b/state/aws/dynamodb/metadata.yaml @@ -36,6 +36,12 @@ metadata: The table attribute name which should be used for TTL. example: '"expiresAt"' type: string + - name: ttlInSeconds + required: false + description: | + Allows specifying a default Time-to-live (TTL) in seconds that will be applied to every state store request unless TTL is explicitly defined via the request metadata. + example: '"600"' + type: number - name: partitionKey required: false description: | From f300b2b6ffcd7c2911950c795c9653467d18f7aa Mon Sep 17 00:00:00 2001 From: Zeynel Koca Date: Mon, 6 Oct 2025 15:03:45 +0200 Subject: [PATCH 3/7] Unit tests Signed-off-by: Zeynel Koca --- state/aws/dynamodb/dynamodb_test.go | 167 ++++++++++++++++++++++++++++ 1 file changed, 167 insertions(+) diff --git a/state/aws/dynamodb/dynamodb_test.go b/state/aws/dynamodb/dynamodb_test.go index e93b41caea..eb33fb32ff 100644 --- a/state/aws/dynamodb/dynamodb_test.go +++ b/state/aws/dynamodb/dynamodb_test.go @@ -1206,3 +1206,170 @@ func TestMultiTx(t *testing.T) { require.NoError(t, err) }) } + +func TestParseTTLWithDefault(t *testing.T) { + t.Run("Use explicit TTL from request metadata", func(t *testing.T) { + defaultTTL := 600 + s := StateStore{ + ttlAttributeName: "expiresAt", + ttlInSeconds: &defaultTTL, + } + + req := &state.SetRequest{ + Key: "test-key", + Metadata: map[string]string{ + "ttlInSeconds": "300", + }, + } + + ttl, err := s.parseTTL(req) + require.NoError(t, err) + require.NotNil(t, ttl) + + // Should use explicit value (300), not default (600) + expectedTime := time.Now().Unix() + 300 + assert.InDelta(t, expectedTime, *ttl, 2) // Allow 2 second tolerance + }) + + t.Run("Use default TTL when no explicit TTL in request", func(t *testing.T) { + defaultTTL := 600 + s := StateStore{ + ttlAttributeName: "expiresAt", + ttlInSeconds: &defaultTTL, + } + + req := &state.SetRequest{ + Key: "test-key", + Metadata: map[string]string{}, + } + + ttl, err := s.parseTTL(req) + require.NoError(t, err) + require.NotNil(t, ttl) + + // Should use default value (600) + expectedTime := time.Now().Unix() + 600 + assert.InDelta(t, expectedTime, *ttl, 2) // Allow 2 second tolerance + }) + + t.Run("No TTL when no default and no explicit TTL", func(t *testing.T) { + s := StateStore{ + ttlAttributeName: "expiresAt", + ttlInSeconds: nil, // No default configured + } + + req := &state.SetRequest{ + Key: "test-key", + Metadata: map[string]string{}, + } + + ttl, err := s.parseTTL(req) + require.NoError(t, err) + assert.Nil(t, ttl) + }) + + t.Run("No TTL when ttlAttributeName is not set", func(t *testing.T) { + defaultTTL := 600 + s := StateStore{ + ttlAttributeName: "", // TTL not enabled in component + ttlInSeconds: &defaultTTL, + } + + req := &state.SetRequest{ + Key: "test-key", + Metadata: map[string]string{ + "ttlInSeconds": "300", + }, + } + + ttl, err := s.parseTTL(req) + require.NoError(t, err) + assert.Nil(t, ttl) // Should return nil when TTL not enabled + }) + + t.Run("Explicit TTL with value -1", func(t *testing.T) { + defaultTTL := 600 + s := StateStore{ + ttlAttributeName: "expiresAt", + ttlInSeconds: &defaultTTL, + } + + req := &state.SetRequest{ + Key: "test-key", + Metadata: map[string]string{ + "ttlInSeconds": "-1", + }, + } + + ttl, err := s.parseTTL(req) + require.NoError(t, err) + require.NotNil(t, ttl) + + // -1 should result in immediate expiration (now + -1) + expectedTime := time.Now().Unix() - 1 + assert.InDelta(t, expectedTime, *ttl, 2) + }) + + t.Run("Default TTL with large value", func(t *testing.T) { + defaultTTL := 86400 // 24 hours + s := StateStore{ + ttlAttributeName: "expiresAt", + ttlInSeconds: &defaultTTL, + } + + req := &state.SetRequest{ + Key: "test-key", + Metadata: map[string]string{}, + } + + ttl, err := s.parseTTL(req) + require.NoError(t, err) + require.NotNil(t, ttl) + + expectedTime := time.Now().Unix() + 86400 + assert.InDelta(t, expectedTime, *ttl, 2) + }) + + t.Run("Error on invalid TTL value", func(t *testing.T) { + defaultTTL := 600 + s := StateStore{ + ttlAttributeName: "expiresAt", + ttlInSeconds: &defaultTTL, + } + + req := &state.SetRequest{ + Key: "test-key", + Metadata: map[string]string{ + "ttlInSeconds": "invalid", + }, + } + + ttl, err := s.parseTTL(req) + require.Error(t, err) + assert.Nil(t, ttl) + assert.Contains(t, err.Error(), "invalid syntax") + }) + + t.Run("Explicit TTL overrides default in request with empty metadata", func(t *testing.T) { + defaultTTL := 1200 + s := StateStore{ + ttlAttributeName: "expiresAt", + ttlInSeconds: &defaultTTL, + } + + req := &state.SetRequest{ + Key: "test-key", + Metadata: map[string]string{ + "ttlInSeconds": "0", + }, + } + + ttl, err := s.parseTTL(req) + require.NoError(t, err) + require.NotNil(t, ttl) + + // Should use explicit value 0, not default + expectedTime := time.Now().Unix() + assert.InDelta(t, expectedTime, *ttl, 2) + }) +} \ No newline at end of file From d34576b41e348c26709e7a4ece8e5173a695ee7c Mon Sep 17 00:00:00 2001 From: Zeynel <48792248+ZeynelKoca@users.noreply.github.com> Date: Mon, 27 Oct 2025 10:41:37 +0100 Subject: [PATCH 4/7] Update state/aws/dynamodb/metadata.yaml Co-authored-by: Sam Signed-off-by: Zeynel <48792248+ZeynelKoca@users.noreply.github.com> --- state/aws/dynamodb/metadata.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/state/aws/dynamodb/metadata.yaml b/state/aws/dynamodb/metadata.yaml index c5400e0e12..4c8dadd3a5 100644 --- a/state/aws/dynamodb/metadata.yaml +++ b/state/aws/dynamodb/metadata.yaml @@ -39,7 +39,7 @@ metadata: - name: ttlInSeconds required: false description: | - Allows specifying a default Time-to-live (TTL) in seconds that will be applied to every state store request unless TTL is explicitly defined via the request metadata. + Allows specifying a Time-to-live (TTL) in seconds that will be applied to every state store request unless TTL is explicitly defined via the request metadata. example: '"600"' type: number - name: partitionKey From 0c79d4d6b5d006ae5bfc3e28a304dc2cf593f680 Mon Sep 17 00:00:00 2001 From: Zeynel Koca Date: Mon, 27 Oct 2025 10:58:23 +0100 Subject: [PATCH 5/7] Global TTL of 0 or lower should never expire Signed-off-by: Zeynel Koca --- state/aws/dynamodb/dynamodb.go | 8 +++++ state/aws/dynamodb/dynamodb_test.go | 54 ++++++++++++++++++++++------- 2 files changed, 50 insertions(+), 12 deletions(-) diff --git a/state/aws/dynamodb/dynamodb.go b/state/aws/dynamodb/dynamodb.go index 918c85b68f..c97d32b391 100644 --- a/state/aws/dynamodb/dynamodb.go +++ b/state/aws/dynamodb/dynamodb.go @@ -423,6 +423,10 @@ func (d *StateStore) parseTTL(req *state.SetRequest) (*int64, error) { if err != nil { return nil, err } + // Values <= 0 mean no TTL (never expires) + if parsedVal <= 0 { + return nil, nil + } // DynamoDB expects an epoch timestamp in seconds. expirationTime := time.Now().Unix() + parsedVal @@ -430,6 +434,10 @@ func (d *StateStore) parseTTL(req *state.SetRequest) (*int64, error) { } // apply global TTL if no explicit TTL in request metadata if d.ttlInSeconds != nil { + // Values <= 0 mean no TTL (never expires) + if *d.ttlInSeconds <= 0 { + return nil, nil + } expirationTime := time.Now().Unix() + int64(*d.ttlInSeconds) return &expirationTime, nil } diff --git a/state/aws/dynamodb/dynamodb_test.go b/state/aws/dynamodb/dynamodb_test.go index eb33fb32ff..ca1e47af7b 100644 --- a/state/aws/dynamodb/dynamodb_test.go +++ b/state/aws/dynamodb/dynamodb_test.go @@ -1287,7 +1287,7 @@ func TestParseTTLWithDefault(t *testing.T) { assert.Nil(t, ttl) // Should return nil when TTL not enabled }) - t.Run("Explicit TTL with value -1", func(t *testing.T) { + t.Run("Explicit TTL with value -1 means no expiration", func(t *testing.T) { defaultTTL := 600 s := StateStore{ ttlAttributeName: "expiresAt", @@ -1303,11 +1303,8 @@ func TestParseTTLWithDefault(t *testing.T) { ttl, err := s.parseTTL(req) require.NoError(t, err) - require.NotNil(t, ttl) - - // -1 should result in immediate expiration (now + -1) - expectedTime := time.Now().Unix() - 1 - assert.InDelta(t, expectedTime, *ttl, 2) + // -1 means never expire + assert.Nil(t, ttl) }) t.Run("Default TTL with large value", func(t *testing.T) { @@ -1350,7 +1347,7 @@ func TestParseTTLWithDefault(t *testing.T) { assert.Contains(t, err.Error(), "invalid syntax") }) - t.Run("Explicit TTL overrides default in request with empty metadata", func(t *testing.T) { + t.Run("Explicit TTL with value 0 means no expiration", func(t *testing.T) { defaultTTL := 1200 s := StateStore{ ttlAttributeName: "expiresAt", @@ -1366,10 +1363,43 @@ func TestParseTTLWithDefault(t *testing.T) { ttl, err := s.parseTTL(req) require.NoError(t, err) - require.NotNil(t, ttl) - - // Should use explicit value 0, not default - expectedTime := time.Now().Unix() - assert.InDelta(t, expectedTime, *ttl, 2) + // 0 means never expire, overriding default + assert.Nil(t, ttl) + }) + + t.Run("Default TTL with value 0 means no expiration", func(t *testing.T) { + defaultTTL := 0 + s := StateStore{ + ttlAttributeName: "expiresAt", + ttlInSeconds: &defaultTTL, + } + + req := &state.SetRequest{ + Key: "test-key", + Metadata: map[string]string{}, + } + + ttl, err := s.parseTTL(req) + require.NoError(t, err) + // Default of 0 means never expire + assert.Nil(t, ttl) + }) + + t.Run("Default TTL with negative value means no expiration", func(t *testing.T) { + defaultTTL := -1 + s := StateStore{ + ttlAttributeName: "expiresAt", + ttlInSeconds: &defaultTTL, + } + + req := &state.SetRequest{ + Key: "test-key", + Metadata: map[string]string{}, + } + + ttl, err := s.parseTTL(req) + require.NoError(t, err) + // Default of -1 means never expire + assert.Nil(t, ttl) }) } \ No newline at end of file From e01dcdf4868104204c627ab37c02b1a07753a9c9 Mon Sep 17 00:00:00 2001 From: Zeynel <48792248+ZeynelKoca@users.noreply.github.com> Date: Tue, 28 Oct 2025 09:56:48 +0100 Subject: [PATCH 6/7] Update state/aws/dynamodb/metadata.yaml Co-authored-by: Sam Signed-off-by: Zeynel <48792248+ZeynelKoca@users.noreply.github.com> --- state/aws/dynamodb/metadata.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/state/aws/dynamodb/metadata.yaml b/state/aws/dynamodb/metadata.yaml index 4c8dadd3a5..188c3b350f 100644 --- a/state/aws/dynamodb/metadata.yaml +++ b/state/aws/dynamodb/metadata.yaml @@ -39,8 +39,9 @@ metadata: - name: ttlInSeconds required: false description: | - Allows specifying a Time-to-live (TTL) in seconds that will be applied to every state store request unless TTL is explicitly defined via the request metadata. + Allows specifying a Time-to-live (TTL) in seconds that will be applied to every state store request unless TTL is explicitly defined via the request metadata. If set to zero or less, no default TTL is applied, and items will only expire if a TTL is explicitly provided in the request metadata with if ttlAttributeName is set. example: '"600"' + default: "0" type: number - name: partitionKey required: false From e8b1d9d2aa91703c3378627c1823f76be16baa70 Mon Sep 17 00:00:00 2001 From: Zeynel Koca Date: Thu, 30 Oct 2025 15:56:26 +0100 Subject: [PATCH 7/7] Fix unused pd variable and undefined value in Multi method Signed-off-by: Zeynel Koca --- state/aws/dynamodb/dynamodb.go | 21 +-------------------- 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/state/aws/dynamodb/dynamodb.go b/state/aws/dynamodb/dynamodb.go index c97d32b391..ba6f547c79 100644 --- a/state/aws/dynamodb/dynamodb.go +++ b/state/aws/dynamodb/dynamodb.go @@ -487,26 +487,7 @@ func (d *StateStore) Multi(ctx context.Context, request *state.TransactionalStat if err != nil { return fmt.Errorf("dynamodb error: failed to marshal value for key %s: %w", req.Key, err) } - ttl, err := d.parseTTL(&req) - if err != nil { - return fmt.Errorf("dynamodb error: failed to parse ttlInSeconds: %w", err) - } - twi.Put = &types.Put{ - TableName: ptr.Of(d.table), - Item: map[string]types.AttributeValue{ - d.partitionKey: &types.AttributeValueMemberS{ - Value: req.Key, - }, - "value": &types.AttributeValueMemberS{ - Value: value, - }, - }, - } - if ttl != nil { - twi.Put.Item[d.ttlAttributeName] = &types.AttributeValueMemberN{ - Value: strconv.FormatInt(*ttl, 10), - } - } + twi.Put = pd.ToPut() case state.DeleteRequest: twi.Delete = &types.Delete{