Skip to content

Commit fc06564

Browse files
committed
RONDB-933: RDRS1. Change feature store cache to asynchronously update… (#738)
* RONDB-933: RDRS1. Change feature store cache to asynchronously update the cached entries * RONDB-933: Fixes for review
1 parent 8f0bec1 commit fc06564

File tree

14 files changed

+379
-70
lines changed

14 files changed

+379
-70
lines changed

storage/ndb/rest-server/rest-api-server/cmd/server/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
"hopsworks.ai/rdrs/internal/config"
2929
"hopsworks.ai/rdrs/internal/dal/heap"
30+
"hopsworks.ai/rdrs/internal/feature_store"
3031
"hopsworks.ai/rdrs/internal/log"
3132
"hopsworks.ai/rdrs/internal/metrics"
3233
"hopsworks.ai/rdrs/internal/security/apikey/hopsworkscache"
@@ -71,13 +72,18 @@ func main() {
7172
apiKeyCache := hopsworkscache.New()
7273
defer apiKeyCache.Cleanup()
7374

75+
//FS metadata cache
76+
featureViewMetaDataCache := feature_store.NewFeatureViewMetaDataCache()
77+
defer featureViewMetaDataCache.Cleanup()
78+
7479
// Prometheus metrics
7580
rdrsMetrics, rdrsMetricsCleanup := metrics.NewRDRSMetrics()
7681
defer rdrsMetricsCleanup()
7782

7883
cleanupServers, err := servers.CreateAndStartDefaultServers(
7984
newHeap,
8085
apiKeyCache,
86+
featureViewMetaDataCache,
8187
rdrsMetrics,
8288
quit)
8389
if err != nil {

storage/ndb/rest-server/rest-api-server/go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ require (
7171
github.com/bytedance/sonic v1.12.5
7272
github.com/goccy/go-json v0.10.2 // indirect
7373
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
74-
github.com/patrickmn/go-cache v2.1.0+incompatible
7574
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
7675
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
7776
golang.org/x/arch v0.12.0 // indirect

storage/ndb/rest-server/rest-api-server/go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,6 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
7171
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
7272
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
7373
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
74-
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
75-
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
7674
github.com/pelletier/go-toml/v2 v2.1.0 h1:FnwAJ4oYMvbT/34k9zzHuZNrhlz48GB3/s6at6/MHO4=
7775
github.com/pelletier/go-toml/v2 v2.1.0/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdUvyaeZurnPPDc=
7876
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=

storage/ndb/rest-server/rest-api-server/internal/config/init_defaults.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,13 @@ func newWithDefaults() AllConfigs {
106106
CacheRefreshIntervalJitterMS: 1000,
107107
},
108108
},
109+
FeatureStore: FeatureStore{
110+
FeatureStoreMetadataCache: FeatureStoreMetadataCache{
111+
CacheRefreshIntervalMS: 600000, // 10 min
112+
CacheUnusedEntriesEvictionMS: 1800000, // 30 min
113+
CacheRefreshIntervalJitterMS: 1000,
114+
},
115+
},
109116
Log: log.LogConfig{
110117
Level: "warn",
111118
FilePath: "",

storage/ndb/rest-server/rest-api-server/internal/config/structs.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,13 +247,39 @@ func (t *TestParameters) Validate() error {
247247
return nil
248248
}
249249

250+
type FeatureStoreMetadataCache struct {
251+
CacheRefreshIntervalMS uint32
252+
CacheUnusedEntriesEvictionMS uint32
253+
CacheRefreshIntervalJitterMS uint32
254+
}
255+
250256
type APIKey struct {
251257
UseHopsworksAPIKeys bool
252258
CacheRefreshIntervalMS uint32
253259
CacheUnusedEntriesEvictionMS uint32
254260
CacheRefreshIntervalJitterMS uint32
255261
}
256262

263+
func (fsmdc *FeatureStoreMetadataCache) Validate() error {
264+
if fsmdc.CacheRefreshIntervalMS == 0 {
265+
return errors.New("CacheRefreshIntervalMS cannot be 0")
266+
}
267+
268+
if fsmdc.CacheUnusedEntriesEvictionMS == 0 {
269+
return errors.New("CacheUnusedEntriesEvictionMS cannot be 0")
270+
}
271+
272+
if fsmdc.CacheRefreshIntervalMS > fsmdc.CacheUnusedEntriesEvictionMS {
273+
return errors.New("CacheRefreshIntervalMS can not be more that CacheUnusedEntriesEvictionMS")
274+
}
275+
276+
if fsmdc.CacheRefreshIntervalJitterMS >= fsmdc.CacheRefreshIntervalMS {
277+
return errors.New("CacheRefreshIntervalJitterMS must be smaller than CacheRefreshIntervalMS")
278+
}
279+
280+
return nil
281+
}
282+
257283
func (a *APIKey) Validate() error {
258284
if a.CacheRefreshIntervalMS == 0 {
259285
return errors.New("CacheRefreshIntervalMS cannot be 0")
@@ -302,6 +328,10 @@ func (t *TLS) Validate() error {
302328
return t.TestParameters.Validate()
303329
}
304330

331+
type FeatureStore struct {
332+
FeatureStoreMetadataCache FeatureStoreMetadataCache
333+
}
334+
305335
type Security struct {
306336
TLS TLS
307337
APIKey APIKey
@@ -321,6 +351,15 @@ func (c *Security) Validate() error {
321351
return nil
322352
}
323353

354+
func (fs *FeatureStore) Validate() error {
355+
err := fs.FeatureStoreMetadataCache.Validate()
356+
if err != nil {
357+
return err
358+
}
359+
360+
return nil
361+
}
362+
324363
/*
325364
The RDRS is tested on a regular basis by RonDB's MTR tests. These MTR tests
326365
have a config file defined for the RDRS in `mysql-test/suite/rdrs/include/have_rdrs.inc`.
@@ -335,6 +374,7 @@ type AllConfigs struct {
335374
RonDB RonDB
336375
RonDBMetadataCluster RonDB
337376
Security Security
377+
FeatureStore FeatureStore
338378
Log log.LogConfig
339379
Testing Testing
340380
}
@@ -351,6 +391,8 @@ func (c *AllConfigs) Validate() error {
351391
return err
352392
} else if err = c.Security.Validate(); err != nil {
353393
return err
394+
} else if err = c.FeatureStore.Validate(); err != nil {
395+
return err
354396
}
355397

356398
// c.RonDBMetaCluster is optional. Copy the cluster

storage/ndb/rest-server/rest-api-server/internal/feature_store/metadata.go

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,18 @@
1818
package feature_store
1919

2020
import (
21-
"encoding/json"
2221
"errors"
2322
"fmt"
2423
"reflect"
2524
"sort"
2625
"strings"
27-
"time"
2826

2927
"github.com/hamba/avro/v2"
30-
"github.com/patrickmn/go-cache"
3128

3229
"hopsworks.ai/rdrs/internal/dal"
3330
"hopsworks.ai/rdrs/internal/log"
3431
)
3532

36-
var DefaultExpiration time.Duration = 15 * time.Minute
37-
var CleanupInterval time.Duration = 15 * time.Minute
38-
3933
const ERROR_NOT_FOUND = "Not Found"
4034

4135
type ComplexFeature struct {
@@ -298,39 +292,6 @@ func getFeatureIndexKey(joinIndex int, fgId int, f string) *string {
298292
return &featureIndexKey
299293
}
300294

301-
type FeatureViewMetaDataCache struct {
302-
metadataCache cache.Cache
303-
}
304-
305-
func NewFeatureViewMetaDataCache() *FeatureViewMetaDataCache {
306-
var c = cache.New(DefaultExpiration, CleanupInterval)
307-
return &FeatureViewMetaDataCache{*c}
308-
}
309-
310-
func (fvmeta *FeatureViewMetaDataCache) Get(featureStoreName, featureViewName string, featureViewVersion int) (*FeatureViewMetadata, *RestErrorCode) {
311-
var fvCacheKey = getFeatureViewCacheKey(featureStoreName, featureViewName, featureViewVersion)
312-
var metadataInf, exist = fvmeta.metadataCache.Get(fvCacheKey)
313-
if !exist {
314-
var metadata, err = GetFeatureViewMetadata(featureStoreName, featureViewName, featureViewVersion)
315-
if err != nil {
316-
return nil, err
317-
} else {
318-
fvmeta.metadataCache.SetDefault(fvCacheKey, metadata)
319-
if log.IsDebug() {
320-
metadataJson, _ := json.MarshalIndent(metadata, "", " ")
321-
log.Debugf("Feature store metadata is %s", metadataJson)
322-
}
323-
return metadata, nil
324-
}
325-
} else {
326-
var metadata, ok = metadataInf.(*FeatureViewMetadata)
327-
if !ok {
328-
return nil, FETCH_METADATA_FROM_CACHE_FAIL
329-
}
330-
return metadata, nil
331-
}
332-
}
333-
334295
func getFeatureViewCacheKey(featureStoreName, featureViewName string, featureViewVersion int) string {
335296
return fmt.Sprintf("%s|%s|%d", featureStoreName, featureViewName, featureViewVersion)
336297
}

0 commit comments

Comments
 (0)