@@ -1091,7 +1091,7 @@ func (o *ovsdbClient) monitor(ctx context.Context, cookie MonitorCookie, reconne
10911091 return err
10921092}
10931093
1094- // Echo tests the liveness of the OVSDB connetion
1094+ // Echo tests the liveness of the OVSDB connection
10951095func (o * ovsdbClient ) Echo (ctx context.Context ) error {
10961096 args := ovsdb .NewEchoArgs ()
10971097 var reply []any
@@ -1376,15 +1376,21 @@ func isCacheConsistent(db *database) bool {
13761376
13771377// best effort to ensure cache is in a good state for reading. RLocks the
13781378// database's cache before returning; caller must always unlock.
1379- func waitForCacheConsistent (ctx context.Context , db * database , logger * logr.Logger , dbName string ) {
1379+ func ( o * ovsdbClient ) waitForCacheConsistent (ctx context.Context , db * database , logger * logr.Logger , dbName string ) error {
13801380 if ! hasMonitors (db ) {
13811381 db .cacheMutex .RLock ()
1382- return
1382+ return nil
1383+ }
1384+
1385+ if err := o .Echo (ctx ); err != nil {
1386+ db .cacheMutex .RLock ()
1387+ return err
13831388 }
1389+
13841390 // Check immediately as a fastpath
13851391 db .cacheMutex .RLock ()
13861392 if isCacheConsistent (db ) {
1387- return
1393+ return nil
13881394 }
13891395 db .cacheMutex .RUnlock ()
13901396
@@ -1396,11 +1402,11 @@ func waitForCacheConsistent(ctx context.Context, db *database, logger *logr.Logg
13961402 logger .V (3 ).Info ("warning: unable to ensure cache consistency for reading" ,
13971403 "database" , dbName )
13981404 db .cacheMutex .RLock ()
1399- return
1405+ return nil
14001406 case <- ticker .C :
14011407 db .cacheMutex .RLock ()
14021408 if isCacheConsistent (db ) {
1403- return
1409+ return nil
14041410 }
14051411 db .cacheMutex .RUnlock ()
14061412 }
@@ -1420,8 +1426,11 @@ func hasMonitors(db *database) bool {
14201426// Get implements the API interface's Get function
14211427func (o * ovsdbClient ) Get (ctx context.Context , model model.Model ) error {
14221428 primaryDB := o .primaryDB ()
1423- waitForCacheConsistent (ctx , primaryDB , o .logger , o .primaryDBName )
1429+ err := o . waitForCacheConsistent (ctx , primaryDB , o .logger , o .primaryDBName )
14241430 defer primaryDB .cacheMutex .RUnlock ()
1431+ if err != nil {
1432+ return err
1433+ }
14251434 return primaryDB .api .Get (ctx , model )
14261435}
14271436
@@ -1433,8 +1442,11 @@ func (o *ovsdbClient) Create(models ...model.Model) ([]ovsdb.Operation, error) {
14331442// List implements the API interface's List function
14341443func (o * ovsdbClient ) List (ctx context.Context , result any ) error {
14351444 primaryDB := o .primaryDB ()
1436- waitForCacheConsistent (ctx , primaryDB , o .logger , o .primaryDBName )
1445+ err := o . waitForCacheConsistent (ctx , primaryDB , o .logger , o .primaryDBName )
14371446 defer primaryDB .cacheMutex .RUnlock ()
1447+ if err != nil {
1448+ return err
1449+ }
14381450 return primaryDB .api .List (ctx , result )
14391451}
14401452
@@ -1453,6 +1465,17 @@ func (o *ovsdbClient) WhereAll(m model.Model, conditions ...model.Condition) Con
14531465 return o .primaryDB ().api .WhereAll (m , conditions ... )
14541466}
14551467
1468+ // WherePredict implements the API interface's WherePredict function
1469+ func (o * ovsdbClient ) WherePredict (ctx context.Context , predicate interface {}) (ConditionalAPI , error ) {
1470+ primaryDB := o .primaryDB ()
1471+ err := o .waitForCacheConsistent (ctx , primaryDB , o .logger , o .primaryDBName )
1472+ defer primaryDB .cacheMutex .RUnlock ()
1473+ if err != nil {
1474+ return nil , err
1475+ }
1476+ return o .primaryDB ().api .WhereCache (predicate ), nil
1477+ }
1478+
14561479// WhereCache implements the API interface's WhereCache function
14571480func (o * ovsdbClient ) WhereCache (predicate any ) ConditionalAPI {
14581481 return o .primaryDB ().api .WhereCache (predicate )
0 commit comments