@@ -1091,7 +1091,7 @@ func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconne
10911091 return err
10921092}
10931093
1094- // Echo tests the liveness of the OVSDB connetion
1094+ // Echo tests the liveness of the OVSDB connection
10951095func (o *ovsdbClient) Echo(ctx context.Context) error {
10961096 args := ovsdb.NewEchoArgs()
10971097 var reply []any
@@ -1376,15 +1376,21 @@ func isCacheConsistent(db *database) bool {
13761376
13771377// best effort to ensure cache is in a good state for reading. RLocks the
13781378// database's cache before returning; caller must always unlock.
1379- func waitForCacheConsistent(ctx context.Context, db *database, logger *logr.Logger, dbName string) {
1379+ func (o *ovsdbClient) waitForCacheConsistent(ctx context.Context, db *database, logger *logr.Logger, dbName string) error {
13801380 if !hasMonitors(db) {
13811381 db.cacheMutex.RLock()
1382- return
1382+ return nil
1383+ }
1384+
1385+ if err := o.Echo(ctx); err != nil {
1386+ db.cacheMutex.RLock()
1387+ return err
13831388 }
1389+
13841390 // Check immediately as a fastpath
13851391 db.cacheMutex.RLock()
13861392 if isCacheConsistent(db) {
1387- return
1393+ return nil
13881394 }
13891395 db.cacheMutex.RUnlock()
13901396
@@ -1396,11 +1402,11 @@ func waitForCacheConsistent(ctx context.Context, db *database, logger *logr.Logg
13961402 logger.V(3).Info("warning: unable to ensure cache consistency for reading",
13971403 "database", dbName)
13981404 db.cacheMutex.RLock()
1399- return
1405+ return nil
14001406 case <-ticker.C:
14011407 db.cacheMutex.RLock()
14021408 if isCacheConsistent(db) {
1403- return
1409+ return nil
14041410 }
14051411 db.cacheMutex.RUnlock()
14061412 }
@@ -1420,8 +1426,11 @@ func hasMonitors(db *database) bool {
14201426// Get implements the API interface's Get function
14211427func (o *ovsdbClient) Get(ctx context.Context, model model.Model) error {
14221428 primaryDB := o.primaryDB()
1423- waitForCacheConsistent(ctx, primaryDB, o.logger, o.primaryDBName)
1429+ err := o. waitForCacheConsistent(ctx, primaryDB, o.logger, o.primaryDBName)
14241430 defer primaryDB.cacheMutex.RUnlock()
1431+ if err != nil {
1432+ return err
1433+ }
14251434 return primaryDB.api.Get(ctx, model)
14261435}
14271436
@@ -1433,8 +1442,11 @@ func (o *ovsdbClient) Create(models ...model.Model) ([]ovsdb.Operation, error) {
14331442// List implements the API interface's List function
14341443func (o *ovsdbClient) List(ctx context.Context, result any) error {
14351444 primaryDB := o.primaryDB()
1436- waitForCacheConsistent(ctx, primaryDB, o.logger, o.primaryDBName)
1445+ err := o. waitForCacheConsistent(ctx, primaryDB, o.logger, o.primaryDBName)
14371446 defer primaryDB.cacheMutex.RUnlock()
1447+ if err != nil {
1448+ return err
1449+ }
14381450 return primaryDB.api.List(ctx, result)
14391451}
14401452
@@ -1453,6 +1465,17 @@ func (o *ovsdbClient) WhereAll(m model.Model, conditions ...model.Condition) Con
14531465 return o.primaryDB().api.WhereAll(m, conditions...)
14541466}
14551467
1468+ // WherePredict implements the API interface's WherePredict function
1469+ func (o *ovsdbClient) WherePredict(ctx context.Context, predicate interface{}) (ConditionalAPI, error) {
1470+ primaryDB := o.primaryDB()
1471+ err := o.waitForCacheConsistent(ctx, primaryDB, o.logger, o.primaryDBName)
1472+ defer primaryDB.cacheMutex.RUnlock()
1473+ if err != nil {
1474+ return nil, err
1475+ }
1476+ return o.primaryDB().api.WhereCache(predicate), nil
1477+ }
1478+
14561479// WhereCache implements the API interface's WhereCache function
14571480func (o *ovsdbClient) WhereCache(predicate any) ConditionalAPI {
14581481 return o.primaryDB().api.WhereCache(predicate)
0 commit comments