Skip to content

Commit cb1c2bc

Browse files
committed
ensure cache is consistent before reading data
Signed-off-by: zhangzujian <zhangzujian.7@gmail.com>
1 parent 5ea385c commit cb1c2bc

File tree

2 files changed

+53
-15
lines changed

2 files changed

+53
-15
lines changed

client/api.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ type API interface {
2222
// If it has a capacity != 0, only 'capacity' elements will be filled in
2323
List(ctx context.Context, result any) error
2424

25+
// Create a Conditional API from a Function that is used to filter cached data
26+
// The function must accept a Model implementation and return a boolean. E.g:
27+
// ConditionFromFunc(func(l *LogicalSwitch) bool { return l.Enabled })
28+
WherePredict(ctx context.Context, predicate any) (ConditionalAPI, error)
29+
2530
// Create a Conditional API from a Function that is used to filter cached data
2631
// The function must accept a Model implementation and return a boolean. E.g:
2732
// ConditionFromFunc(func(l *LogicalSwitch) bool { return l.Enabled })
@@ -151,8 +156,10 @@ func (a api) List(_ context.Context, result any) error {
151156
}
152157

153158
if a.cond != nil && a.cond.Table() != table {
154-
return &ErrWrongType{resultPtr.Type(),
155-
fmt.Sprintf("Table derived from input type (%s) does not match Table from Condition (%s)", table, a.cond.Table())}
159+
return &ErrWrongType{
160+
resultPtr.Type(),
161+
fmt.Sprintf("Table derived from input type (%s) does not match Table from Condition (%s)", table, a.cond.Table()),
162+
}
156163
}
157164

158165
tableCache := a.cache.Table(table)
@@ -206,6 +213,11 @@ func (a api) WhereAll(m model.Model, cond ...model.Condition) ConditionalAPI {
206213
return newConditionalAPI(a.cache, a.conditionFromExplicitConditions(true, m, cond...), a.logger, a.validateModel)
207214
}
208215

216+
// WherePredict returns a conditionalAPI based a Predicate
217+
func (a api) WherePredict(ctx context.Context, predicate interface{}) (ConditionalAPI, error) {
218+
return newConditionalAPI(a.cache, a.conditionFromFunc(predicate), a.logger, a.validateModel), nil
219+
}
220+
209221
// WhereCache returns a conditionalAPI based a Predicate
210222
func (a api) WhereCache(predicate any) ConditionalAPI {
211223
return newConditionalAPI(a.cache, a.conditionFromFunc(predicate), a.logger, a.validateModel)
@@ -335,7 +347,6 @@ func (a api) Create(models ...model.Model) ([]ovsdb.Operation, error) {
335347
namedUUID = tmpUUID
336348
} else if ovsdb.IsValidUUID(tmpUUID) {
337349
realUUID = tmpUUID
338-
339350
}
340351
} else {
341352
return nil, fmt.Errorf("error accessing _uuid field: %w", err)
@@ -621,14 +632,18 @@ func (a api) getTableFromFunc(predicate any) (string, error) {
621632
modelInterface := reflect.TypeOf((*model.Model)(nil)).Elem()
622633
modelType := predType.In(0)
623634
if !modelType.Implements(modelInterface) {
624-
return "", &ErrWrongType{predType,
625-
fmt.Sprintf("Type %s does not implement Model interface", modelType.String())}
635+
return "", &ErrWrongType{
636+
predType,
637+
fmt.Sprintf("Type %s does not implement Model interface", modelType.String()),
638+
}
626639
}
627640

628641
table := a.cache.DatabaseModel().FindTable(modelType)
629642
if table == "" {
630-
return "", &ErrWrongType{predType,
631-
fmt.Sprintf("Model %s not found in Database Model", modelType.String())}
643+
return "", &ErrWrongType{
644+
predType,
645+
fmt.Sprintf("Model %s not found in Database Model", modelType.String()),
646+
}
632647
}
633648
return table, nil
634649
}

client/client.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1091,7 +1091,7 @@ func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconne
10911091
return err
10921092
}
10931093

1094-
// Echo tests the liveness of the OVSDB connetion
1094+
// Echo tests the liveness of the OVSDB connection
10951095
func (o *ovsdbClient) Echo(ctx context.Context) error {
10961096
args := ovsdb.NewEchoArgs()
10971097
var reply []any
@@ -1376,15 +1376,21 @@ func isCacheConsistent(db *database) bool {
13761376

13771377
// best effort to ensure cache is in a good state for reading. RLocks the
13781378
// database's cache before returning; caller must always unlock.
1379-
func waitForCacheConsistent(ctx context.Context, db *database, logger *logr.Logger, dbName string) {
1379+
func (o *ovsdbClient) waitForCacheConsistent(ctx context.Context, db *database, logger *logr.Logger, dbName string) error {
13801380
if !hasMonitors(db) {
13811381
db.cacheMutex.RLock()
1382-
return
1382+
return nil
1383+
}
1384+
1385+
if err := o.Echo(ctx); err != nil {
1386+
db.cacheMutex.RLock()
1387+
return err
13831388
}
1389+
13841390
// Check immediately as a fastpath
13851391
db.cacheMutex.RLock()
13861392
if isCacheConsistent(db) {
1387-
return
1393+
return nil
13881394
}
13891395
db.cacheMutex.RUnlock()
13901396

@@ -1396,11 +1402,11 @@ func waitForCacheConsistent(ctx context.Context, db *database, logger *logr.Logg
13961402
logger.V(3).Info("warning: unable to ensure cache consistency for reading",
13971403
"database", dbName)
13981404
db.cacheMutex.RLock()
1399-
return
1405+
return nil
14001406
case <-ticker.C:
14011407
db.cacheMutex.RLock()
14021408
if isCacheConsistent(db) {
1403-
return
1409+
return nil
14041410
}
14051411
db.cacheMutex.RUnlock()
14061412
}
@@ -1420,8 +1426,11 @@ func hasMonitors(db *database) bool {
14201426
// Get implements the API interface's Get function
14211427
func (o *ovsdbClient) Get(ctx context.Context, model model.Model) error {
14221428
primaryDB := o.primaryDB()
1423-
waitForCacheConsistent(ctx, primaryDB, o.logger, o.primaryDBName)
1429+
err := o.waitForCacheConsistent(ctx, primaryDB, o.logger, o.primaryDBName)
14241430
defer primaryDB.cacheMutex.RUnlock()
1431+
if err != nil {
1432+
return err
1433+
}
14251434
return primaryDB.api.Get(ctx, model)
14261435
}
14271436

@@ -1433,8 +1442,11 @@ func (o *ovsdbClient) Create(models ...model.Model) ([]ovsdb.Operation, error) {
14331442
// List implements the API interface's List function
14341443
func (o *ovsdbClient) List(ctx context.Context, result any) error {
14351444
primaryDB := o.primaryDB()
1436-
waitForCacheConsistent(ctx, primaryDB, o.logger, o.primaryDBName)
1445+
err := o.waitForCacheConsistent(ctx, primaryDB, o.logger, o.primaryDBName)
14371446
defer primaryDB.cacheMutex.RUnlock()
1447+
if err != nil {
1448+
return err
1449+
}
14381450
return primaryDB.api.List(ctx, result)
14391451
}
14401452

@@ -1453,6 +1465,17 @@ func (o *ovsdbClient) WhereAll(m model.Model, conditions ...model.Condition) Con
14531465
return o.primaryDB().api.WhereAll(m, conditions...)
14541466
}
14551467

1468+
// WherePredict implements the API interface's WherePredict function
1469+
func (o *ovsdbClient) WherePredict(ctx context.Context, predicate interface{}) (ConditionalAPI, error) {
1470+
primaryDB := o.primaryDB()
1471+
err := o.waitForCacheConsistent(ctx, primaryDB, o.logger, o.primaryDBName)
1472+
defer primaryDB.cacheMutex.RUnlock()
1473+
if err != nil {
1474+
return nil, err
1475+
}
1476+
return o.primaryDB().api.WhereCache(predicate), nil
1477+
}
1478+
14561479
// WhereCache implements the API interface's WhereCache function
14571480
func (o *ovsdbClient) WhereCache(predicate any) ConditionalAPI {
14581481
return o.primaryDB().api.WhereCache(predicate)

0 commit comments

Comments
 (0)