Skip to content

Commit fe3774e

Browse files
committed
ipam; Introduce node prefix update damping logic
Implements a mechanism to damp frequent updates of node prefixes in the SQLite storage, reducing unnecessary database write operations, especially for the 'updatedAt' timestamp. Previously, node prefixes would have their 'updatedAt' timestamp refreshed on every `Allocate` or `Release` operation that involved `conduit.GetNode`, despite no prefix parameters had changed. This could lead to a high volume of writes for frequently accessed node prefixes.
1 parent 3242155 commit fe3774e

File tree

4 files changed

+279
-10
lines changed

4 files changed

+279
-10
lines changed

pkg/ipam/server.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/*
22
Copyright (c) 2021 Nordix Foundation
3-
Copyright (c) 2024 OpenInfra Foundation Europe
3+
Copyright (c) 2024-2025 OpenInfra Foundation Europe
44
55
Licensed under the Apache License, Version 2.0 (the "License");
66
you may not use this file except in compliance with the License.
@@ -35,6 +35,8 @@ import (
3535
"google.golang.org/protobuf/types/known/emptypb"
3636
)
3737

38+
const nodeUpdateDampingThreshold = 1 * time.Minute // update node prefix if last update older than threshold // TODO: configuration
39+
3840
type IpamServer struct {
3941
ctx context.Context
4042
logger logr.Logger
@@ -94,9 +96,9 @@ func NewServer(
9496
// allocation, it would make the cluster upgrade problematic due to the possible
9597
// mix of old and new clients. Hence, IMHO the best approach for now is to let the
9698
// server decide which prefixes should have their expirable attribute set. Currently,
97-
// this translates to calls of node.Allocate() (excluding bridges). The information
98-
// regarding the time scope is passed in context to avoid the need for changing all
99-
// the API in between.
99+
// this translates to calls of node.Allocate() (excluding bridges) and conduit.GetNode().
100+
// The information regarding the time scope is passed in context to avoid the need for
101+
// changing all the API in between.
100102
func (is *IpamServer) Allocate(ctx context.Context, child *ipamAPI.Child) (*ipamAPI.Prefix, error) {
101103
ctx = logr.NewContext(ctx, is.logger)
102104
trench, exists := is.Trenches[child.GetSubnet().GetIpFamily()]
@@ -126,7 +128,8 @@ func (is *IpamServer) Allocate(ctx context.Context, child *ipamAPI.Child) (*ipam
126128
break
127129
}
128130
}
129-
node, err := conduit.GetNode(sqlite.WithExpirable(ctx), child.GetSubnet().GetNode()) // Note: refreshes existing node prefix (i.e. the 'updatedAt' timestamp)
131+
getNodeCtx := sqlite.WithUpdateDamping(sqlite.WithExpirable(ctx), nodeUpdateDampingThreshold) // Applies damping to reduce frequent DB writes and marks node prefix as expirable
132+
node, err := conduit.GetNode(getNodeCtx, child.GetSubnet().GetNode()) // Note: refreshes existing node prefix (i.e. the 'updatedAt' timestamp)
130133
if err != nil {
131134
return nil, fmt.Errorf("failed getting node (%s) while allocating (%s): %w", child.GetSubnet().GetNode(), child.GetName(), err)
132135
}
@@ -170,7 +173,8 @@ func (is *IpamServer) Release(ctx context.Context, child *ipamAPI.Child) (*empty
170173
// Note: Currently also refreshes existing node prefix (i.e. the 'updatedAt' timestamp).
171174
// Not sure node refresh is needed in case of Release, but errors must be avoided for
172175
// sure if node was reaped by a Garbage Collector logic.
173-
node, err := conduit.GetNode(sqlite.WithExpirable(ctx), child.GetSubnet().GetNode())
176+
getNodeCtx := sqlite.WithUpdateDamping(sqlite.WithExpirable(ctx), nodeUpdateDampingThreshold)
177+
node, err := conduit.GetNode(getNodeCtx, child.GetSubnet().GetNode())
174178
if err != nil {
175179
return &emptypb.Empty{}, fmt.Errorf("failed getting node (%s) while releasing (%s): %w", child.GetSubnet().GetNode(), child.GetName(), err)
176180
}

pkg/ipam/storage/sqlite/context.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
Copyright (c) 2024 OpenInfra Foundation Europe
2+
Copyright (c) 2024-2025 OpenInfra Foundation Europe
33
44
Licensed under the Apache License, Version 2.0 (the "License");
55
you may not use this file except in compliance with the License.
@@ -16,10 +16,14 @@ limitations under the License.
1616

1717
package sqlite
1818

19-
import "context"
19+
import (
20+
"context"
21+
"time"
22+
)
2023

2124
const (
22-
lifetimeKey contextKeyType = "expirable"
25+
lifetimeKey contextKeyType = "expirable"
26+
updateDampingKey contextKeyType = "updateDamping"
2327
)
2428

2529
type contextKeyType string
@@ -39,3 +43,17 @@ func Expirable(ctx context.Context) bool {
3943
_, ok := ctx.Value(lifetimeKey).(struct{})
4044
return ok
4145
}
46+
47+
// WithUpdateDamping -
48+
// Returns a new context that signals a request for damped updating
49+
func WithUpdateDamping(ctx context.Context, threshold time.Duration) context.Context {
50+
return context.WithValue(ctx, updateDampingKey, threshold)
51+
}
52+
53+
// getUpdateDampingThreshold -
54+
// Retrieves the damping threshold from the context. The boolean indicates
55+
// if the damping logic should be applied at all.
56+
func getUpdateDampingThreshold(ctx context.Context) (time.Duration, bool) {
57+
val, ok := ctx.Value(updateDampingKey).(time.Duration)
58+
return val, ok
59+
}

pkg/ipam/storage/sqlite/sqlite.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ func (sqlis *SQLiteIPAMStorage) Add(ctx context.Context, prefix types.Prefix) er
220220
}
221221

222222
// Update -
223-
// Updates or add the database entry.
223+
// Updates or adds the database entry.
224224
// Currently, the whole purpose of this function is to update the UpdatedAt
225225
// field in the database that is used by garbage collector logic to clean up
226226
// unused entries that haven't been updated for a long time. And to keep the
@@ -239,6 +239,9 @@ func (sqlis *SQLiteIPAMStorage) Update(ctx context.Context, prefix types.Prefix)
239239
exp = true
240240
}
241241
model.Expirable = &exp
242+
if ok := sqlis.shouldUpdate(ctx, model); !ok {
243+
return nil // Success, but no-op.
244+
}
242245
tx := sqlis.DB.Save(model)
243246
if isCIDRUniquenessViolation(tx.Error) {
244247
return fmt.Errorf("%w: %v", ErrCIDRConflict, tx.Error)
@@ -337,6 +340,43 @@ func (sqlis *SQLiteIPAMStorage) delete(prefix types.Prefix) error {
337340
return tx.Error
338341
}
339342

343+
// shouldUpdate determines if update is required based on the damping configuration
344+
// or lack of it. In case damping is enabled, it tries to fetch the prefix from the
345+
// db to retrieve its updatedAt timestamp.
346+
func (sqlis *SQLiteIPAMStorage) shouldUpdate(ctx context.Context, model *Prefix) bool {
347+
updateThreshold, ok := getUpdateDampingThreshold(ctx)
348+
if !ok {
349+
return true // Damping not enabled
350+
}
351+
352+
var result *Prefix
353+
// Use Select to specify exactly which fields to load based on ID (due to performance considerations)
354+
err := sqlis.DB.Model(&Prefix{}).
355+
Select("id", "cidr", "name", "parent_id", "expirable", "updated_at").
356+
Where("id = ?", model.Id).
357+
First(&result).Error
358+
if err != nil {
359+
return true // Record might not exist (keep backward compatibility)
360+
}
361+
362+
// Explicitly check for significant data changes
363+
modelExpirable := model.Expirable != nil && *model.Expirable
364+
resultExpirable := result.Expirable != nil && *result.Expirable
365+
if model.ParentID != result.ParentID ||
366+
modelExpirable != resultExpirable ||
367+
model.Cidr != result.Cidr ||
368+
model.Name != result.Name {
369+
return true // Legitimate update changing fields
370+
}
371+
372+
// Damping logic
373+
if time.Since(result.UpdatedAt) < updateThreshold {
374+
return false // Recently updated within updateThreshold period
375+
}
376+
377+
return true
378+
}
379+
340380
// migrate aims to find specific prefixes within the IPAM hierarchy that should
341381
// be subject to garbage collection but currently are not.
342382
//

pkg/ipam/storage/sqlite/sqlite_test.go

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
"github.com/nordix/meridio/pkg/ipam/prefix"
3030
"github.com/nordix/meridio/pkg/ipam/storage/sqlite"
31+
"github.com/nordix/meridio/pkg/ipam/types"
3132
"github.com/stretchr/testify/assert"
3233
sqliteDrv "gorm.io/driver/sqlite"
3334
"gorm.io/gorm"
@@ -651,3 +652,209 @@ func TestGarbageCollectorLogic(t *testing.T) {
651652
}
652653
}
653654
}
655+
656+
func TestUpdateDampingUpdatedAt(t *testing.T) {
657+
dbConn, cleanup := setupTestDB(t)
658+
defer cleanup()
659+
assert.NoError(t, dbConn.AutoMigrate(&sqlite.Prefix{}), "Failed to auto-migrate schema")
660+
661+
sqlis := newSQLiteIPAMStorageForTest(t, dbConn)
662+
baseCtx := context.Background()
663+
664+
// Define common test prefix properties
665+
prefixID := "testNode"
666+
prefixCidr := "192.168.0.0/24"
667+
668+
// Helper to create and insert a prefix with a specific UpdatedAt
669+
createTestPrefix := func(id string, updatedAt time.Time) types.Prefix {
670+
expirableTrue := false // false to avoid the usage of sqlite.WithExpirable for sqlite.Update
671+
p := &sqlite.Prefix{
672+
ParentID: "", // no parent so that Prefix Id and Prefix Name could be the same values
673+
Id: id, // mimic prefixToModel()
674+
Name: id, // mimic prefixToModel()
675+
Cidr: prefixCidr,
676+
Expirable: &expirableTrue,
677+
UpdatedAt: updatedAt,
678+
}
679+
assert.NoError(t, dbConn.Create(p).Error, fmt.Sprintf("Failed to create initial prefix for test, id: %q, cidr: %q", id, prefixCidr))
680+
return prefix.New(p.Name, p.Cidr, nil) // Convert to types.Prefix for storage's Update method
681+
}
682+
683+
// Helper to fetch UpdatedAt directly from DB
684+
fetchUpdatedAt := func(id string) time.Time {
685+
var p sqlite.Prefix
686+
err := dbConn.Select("updated_at").Where("id = ?", id).First(&p).Error
687+
assert.NoError(t, err, fmt.Sprintf("Failed to fetch UpdatedAt for ID: %q", id))
688+
return p.UpdatedAt
689+
}
690+
691+
deletePrefixById := func(id string) {
692+
_ = dbConn.Where("id = ?", id).Delete(&sqlite.Prefix{}).Error
693+
}
694+
695+
dampingThreshold := 1 * time.Minute
696+
697+
// Scenario 1: Damping enabled, recently updated (within threshold), no other changes -> UpdatedAt should NOT change
698+
t.Run("UpdatedAt_Damped", func(t *testing.T) {
699+
pID := prefixID + "Damped"
700+
defer deletePrefixById(pID)
701+
initialUpdatedAt := time.Now().UTC().Add(-30 * time.Second) // 30 seconds ago
702+
testPrefix := createTestPrefix(pID, initialUpdatedAt)
703+
704+
err := sqlis.Update(sqlite.WithUpdateDamping(baseCtx, dampingThreshold), testPrefix)
705+
assert.NoError(t, err, "Update should not error when damped")
706+
707+
finalUpdatedAt := fetchUpdatedAt(pID)
708+
assert.True(t, finalUpdatedAt.Equal(initialUpdatedAt), "UpdatedAt should remain unchanged (damped)")
709+
})
710+
711+
// Scenario 2: Damping enabled, updated long ago (older than threshold), no other changes -> UpdatedAt should CHANGE
712+
t.Run("UpdatedAt_NotDamped_OldPrefix", func(t *testing.T) {
713+
pID := prefixID + "NotDampedOldPrefix"
714+
defer deletePrefixById(pID)
715+
initialUpdatedAt := time.Now().UTC().Add(-2 * time.Minute) // 2 minutes ago
716+
testPrefix := createTestPrefix(pID, initialUpdatedAt)
717+
718+
err := sqlis.Update(sqlite.WithUpdateDamping(baseCtx, dampingThreshold), testPrefix)
719+
assert.NoError(t, err, "Update should not error when not damped")
720+
721+
finalUpdatedAt := fetchUpdatedAt(pID)
722+
assert.True(t, finalUpdatedAt.After(initialUpdatedAt), "UpdatedAt should be newer than initial time")
723+
})
724+
725+
// Scenario 3: Damping disabled (no context), recently updated -> UpdatedAt should CHANGE
726+
t.Run("UpdatedAt_NotDamped_Disabled", func(t *testing.T) {
727+
pID := prefixID + "NotDampedDisabled"
728+
defer deletePrefixById(pID)
729+
initialUpdatedAt := time.Now().UTC().Add(-30 * time.Second) // 30 seconds ago
730+
testPrefix := createTestPrefix(pID, initialUpdatedAt)
731+
732+
// No damping context applied
733+
err := sqlis.Update(baseCtx, testPrefix)
734+
assert.NoError(t, err, "Update should not error when damping is disabled")
735+
736+
finalUpdatedAt := fetchUpdatedAt(pID)
737+
assert.True(t, finalUpdatedAt.After(initialUpdatedAt), "UpdatedAt should be newer than initial time")
738+
})
739+
740+
// Scenario 4: Update a non-existent record (should create it, damping logic won't prevent initial creation)
741+
t.Run("UpdatedAt_NonExistentRecord", func(t *testing.T) {
742+
pID := prefixID + "NonExistent"
743+
defer deletePrefixById(pID)
744+
prefixName := pID // prefixToModel() in sqlite.Update() sets ID based on prefix Name
745+
746+
// Create a types.Prefix representing the new record
747+
newPrefix := prefix.New(prefixName, prefixCidr, nil)
748+
749+
err := sqlis.Update(sqlite.WithUpdateDamping(baseCtx, dampingThreshold), newPrefix)
750+
assert.NoError(t, err, "Update should succeed for a non-existent record (effectively a create)")
751+
752+
// Verify the record was created and UpdatedAt is set
753+
var fetchedPrefix sqlite.Prefix
754+
err = dbConn.Where("id = ?", pID).First(&fetchedPrefix).Error
755+
assert.NoError(t, err, "Failed to fetch newly created prefix")
756+
assert.False(t, fetchedPrefix.UpdatedAt.IsZero(), "UpdatedAt should be set for a new record")
757+
assert.Equal(t, pID, fetchedPrefix.Id)
758+
assert.Equal(t, prefixCidr, fetchedPrefix.Cidr)
759+
assert.Equal(t, prefixName, fetchedPrefix.Name)
760+
})
761+
762+
// Scenario 5: Update recent record changing its CIDR (should update it, damping logic won't prevent legitimate update)
763+
t.Run("UpdatedAt_NotDamped_RecentPrefixChanged", func(t *testing.T) {
764+
pID := prefixID + "RecentPrefixChanged"
765+
defer deletePrefixById(pID)
766+
prefixName := pID // prefixToModel() in sqlite.Update() sets ID based on prefix Name
767+
768+
initialUpdatedAt := time.Now().UTC().Add(-30 * time.Second) // 30 seconds ago
769+
testPrefix := createTestPrefix(pID, initialUpdatedAt)
770+
771+
changedPrefixCidr := "192.168.1.0/24"
772+
changedTestPrefix := prefix.New(testPrefix.GetName(), changedPrefixCidr, testPrefix.GetParent())
773+
err := sqlis.Update(sqlite.WithUpdateDamping(baseCtx, dampingThreshold), changedTestPrefix)
774+
assert.NoError(t, err, "Update should not error when damped")
775+
776+
var fetchedPrefix sqlite.Prefix
777+
err = dbConn.Where("id = ?", pID).First(&fetchedPrefix).Error
778+
assert.NoError(t, err, "Failed to fetch updated prefix")
779+
assert.True(t, fetchedPrefix.UpdatedAt.After(initialUpdatedAt), "UpdatedAt should be newer than initial time")
780+
assert.NotEqual(t, fetchedPrefix.Cidr, testPrefix.GetCidr(), "Cidr should be updated")
781+
assert.Equal(t, prefixName, fetchedPrefix.Name)
782+
})
783+
}
784+
785+
func TestExpirable(t *testing.T) {
786+
dbConn, cleanup := setupTestDB(t)
787+
defer cleanup()
788+
assert.NoError(t, dbConn.AutoMigrate(&sqlite.Prefix{}), "Failed to auto-migrate schema")
789+
790+
sqlis := newSQLiteIPAMStorageForTest(t, dbConn)
791+
baseCtx := context.Background()
792+
793+
// Define common test prefix properties
794+
commonPrefixName := "testNode"
795+
commonPrefixCidr := "192.168.0.0/24"
796+
commonParentID := ""
797+
798+
assertPrefixData := func(t *testing.T, name, expectedCidr, expectedParentID string, expectedExpirable bool, msg string) *sqlite.Prefix {
799+
var fetched []sqlite.Prefix
800+
err := dbConn.Where("name = ?", name).Find(&fetched).Error
801+
assert.NoError(t, err, fmt.Sprintf("%s: Error fetching prefix with Name %q", msg, name))
802+
assert.Len(t, fetched, 1, fmt.Sprintf("%s: Expected exactly one prefix with Name %q, but found %d", msg, name, len(fetched)))
803+
resultPrefix := fetched[0]
804+
805+
assert.False(t, resultPrefix.UpdatedAt.IsZero(), fmt.Sprintf("%s: UpdatedAt should be set for prefix with Name %q", msg, name))
806+
assert.Equal(t, expectedCidr, resultPrefix.Cidr, fmt.Sprintf("%s: CIDR mismatch for prefix with Name %q", msg, name))
807+
assert.Equal(t, expectedParentID, resultPrefix.ParentID, fmt.Sprintf("%s: ParentID mismatch for prefix with Name %q", msg, name))
808+
if expectedExpirable {
809+
assert.True(t, resultPrefix.Expirable != nil && *resultPrefix.Expirable,
810+
fmt.Sprintf("%s: Expirable should be true for prefix with Name %q", msg, name))
811+
} else {
812+
assert.True(t, resultPrefix.Expirable != nil && !*resultPrefix.Expirable,
813+
fmt.Sprintf("%s: Expirable should be false for prefix with Name %q", msg, name))
814+
}
815+
return &resultPrefix
816+
}
817+
818+
t.Run("InitialCreate_NonExpirable", func(t *testing.T) {
819+
testName := commonPrefixName + "InitialNonExpirable"
820+
testPrefix := prefix.New(testName, commonPrefixCidr, nil)
821+
822+
defer func() {
823+
_ = sqlis.Delete(baseCtx, testPrefix)
824+
}()
825+
826+
err := sqlis.Add(baseCtx, testPrefix)
827+
assert.NoError(t, err, "Add should succeed for initial creation")
828+
assertPrefixData(t, testName, commonPrefixCidr, commonParentID, false, "After initial create")
829+
})
830+
831+
t.Run("InitialCreate_Expirable", func(t *testing.T) {
832+
testName := commonPrefixName + "InitialExpirable"
833+
testPrefix := prefix.New(testName, commonPrefixCidr, nil)
834+
835+
defer func() {
836+
_ = sqlis.Delete(baseCtx, testPrefix)
837+
}()
838+
839+
err := sqlis.Add(sqlite.WithExpirable(baseCtx), testPrefix)
840+
assert.NoError(t, err, "Add should succeed for initial creation")
841+
assertPrefixData(t, testName, commonPrefixCidr, commonParentID, true, "After initial create")
842+
})
843+
844+
t.Run("UpdateExisting_WithExpirable", func(t *testing.T) {
845+
testName := commonPrefixName + "ExpirableUpdate"
846+
testPrefix := prefix.New(testName, commonPrefixCidr, nil)
847+
848+
defer func() {
849+
_ = sqlis.Delete(baseCtx, testPrefix)
850+
}()
851+
852+
err := sqlis.Update(baseCtx, testPrefix)
853+
assert.NoError(t, err, "Update should succeed for a non-existent record (effectively a create)")
854+
assertPrefixData(t, testName, commonPrefixCidr, commonParentID, false, "Before expirable update")
855+
856+
err = sqlis.Update(sqlite.WithExpirable(baseCtx), testPrefix)
857+
assert.NoError(t, err, "Update should succeed with expirable context")
858+
assertPrefixData(t, testName, commonPrefixCidr, commonParentID, true, "After expirable update")
859+
})
860+
}

0 commit comments

Comments
 (0)