Fix ZADD-based cache key resolution and parallelize subscription startup#4790
Fix ZADD-based cache key resolution and parallelize subscription startup#4790denis-chernov-smartcontract wants to merge 9 commits intomainfrom
Conversation
|
ro-tex
left a comment
There was a problem hiding this comment.
LGTM.
I have some non-blocking comments but those can be satisfied in subsequent PRs.
| epLock := km.getEndpointLock(endpointTransport) | ||
| epLock.Lock() | ||
| defer epLock.Unlock() |
There was a problem hiding this comment.
nit: A minor performance optimisation would be to move this locking operation after the check whether we already got the key.
| // endpoint alias can be found. | ||
| func findEndpointInKey(key string) (string, error) { | ||
| if activeAliasIndex == nil { | ||
| if activeIndex == nil { |
There was a problem hiding this comment.
I don't see a unit test for this func and it would be great to have one.
| km.mu.Unlock() | ||
| km.logger.Debug("Learned key mapping", "rawKey", rawCacheKey, "transformedKey", transformedKey) | ||
| } | ||
| case <-time.After(2 * time.Second): |
There was a problem hiding this comment.
All hard-coded numbers should be named constants. This will allow us to easily clean them up later and turn them into configuration options.
| if _, ok := km.Get(rawCacheKey); ok { | ||
| subscribeFn() | ||
| return | ||
| } |
There was a problem hiding this comment.
We return if we already have a mapping for this raw key. We don't return if we don't have a mapping but we're already waiting for one, thus allowing us to amass a number of parallel waiting goroutines.
I am not sure how plausible this situation is, but if it is plausible, I would suggest refreshing the timeout period on the wait each time we get a hit on a raw key that we're waiting on and then returning.
Feel free to ignore if your assessment is that this cannot cause a considerable draw on resources - there's no need to do extra work if the max number of parallel waiting threads will be under, say, 50 or 100.
There was a problem hiding this comment.
This looks like it's already being handled by the alreadySubscribing := s.subscriptionTracker.LoadOrStore() part. Please mark as resolved, if so.
| // Phase 2: Learn the raw→transformed cache key mapping. | ||
| // Serialized per endpoint via KeyMapper's existing mutex. | ||
| // The re-subscribe is fast because the JS adapter already has | ||
| // the subscription active from phase 1. | ||
| s.keyMapper.SubscribeAndLearn(params["endpoint"], key, func() { | ||
| s.subscribeToAsset(originalData) | ||
| }) |
There was a problem hiding this comment.
Are we starting to listen for the subscription event after we've issued it? I might be misreading this.
|
|
||
| // Remove from tracker after subscription attempt completes. | ||
| // Allow retries after 10 seconds if data still not available. | ||
| time.Sleep(10 * time.Second) |
There was a problem hiding this comment.
Same comment about magic numbers which should be named constants.
Closes #DS-2819
Description
Summary
Replaces fragile Go-side replication of JS adapter parameter transforms with dynamic cache key learning via ZADD interception
Parallelizes subscription startup to reduce cold-start time from ~25 minutes to seconds for ~300 feeds
Fixes overrides handling so payloads with overrides are forwarded intact to the JS adapter
Background
The Go streams-adapter acts as a caching proxy in front of a JS adapter. When a client requests data, the Go layer looks up the result in cache using a deterministic key derived from request parameters. When the JS adapter stores data, it writes to the Redcon server using keys that reflect its own internal parameter transformations (param aliases like from→base, per-adapter overrides like IDR→indodaxidr, custom transforms like CFBenchmarks index construction).
The problem: The Go layer was trying to replicate all of the JS adapter's transformation logic to compute matching cache keys. This was incomplete, fragile, and produced mismatches whenever the JS transforms diverged from the Go reimplementation.
The second problem: Subscriptions were processed serially — one asset at a time with a ~5 second round-trip each. With ~300 feeds at startup, this took ~25 minutes before data was available.
How the new approach works
KeyMapper: dynamic cache key learning
The new KeyMapper component (helpers/keymapper.go) learns the mapping between Go-computed "raw" cache keys and JS-computed "transformed" cache keys at runtime:
Raw cache key — computed by the Go layer with only endpoint alias resolution (e.g., "crypto"→"price"). No param aliases, no overrides, no custom transforms.
Transformed cache key — derived from the ZADD member the JS adapter writes to its *-subscriptionSet sorted set during subscription processing.
Learning flow — On cache miss, SubscribeAndLearn sends a subscription request to the JS adapter. The JS adapter processes it and writes a ZADD. The Redcon handleZAdd handler intercepts this and calls KeyMapper.NotifyZAdd(), delivering the transformed params via a channel. The KeyMapper computes the transformed cache key and stores the raw→transformed mapping.
Lookup flow — On subsequent requests, the handler computes the raw key, looks up the transformed key via KeyMapper.Get(), and retrieves the cached observation. A direct raw-key fallback handles the common case where no transformation occurs.
Two-phase subscription (concurrency)
On cache miss, the adapter runs two phases in a background goroutine:
Phase 1 — Immediately sends a subscription request with the original client data (including overrides). Fully concurrent — all feeds fire at once. The JS adapter queues them internally.
Phase 2 — Calls SubscribeAndLearn, which is serialized per endpoint via a per-endpoint mutex. It re-sends the subscription (a no-op since the feed is already active from Phase 1) and waits for the ZADD notification to learn the key mapping. Serialization ensures reliable attribution of ZADD notifications.
Overrides handling
subscribeToAsset now accepts interface{} instead of RequestParams, forwarding the original client request data (including overrides) intact to the JS adapter. The Go layer strips overrides only for its own cache key computation.
Changes
New files
helpers/keymapper.goKeyMapperstruct: learns and stores raw→transformed cache key mappings via ZADD interceptionhelpers/keymapper_test.goDeleted files
helpers/transforms.gocfbenchmarksTransform). No longer needed — JS transforms are learned dynamically via ZADD.Modified files
server/server.goadapterHandler: KeyMapper-based cache lookups, two-phase subscription goroutine,subscribeToAssetacceptsinterface{}for overrides passthrough. AddedGET /cachedebug endpoint.helpers/aliases.goparamAlias,requiredParams, override handling,adapterTransformsintegration. Structs reduced from 6 to 2.cache/cache.goGet()now accepts a string key and returns*CacheItem(wasRequestParams→*Observation). Removed unusedKeys(),Size()methods andCacheinterface assertion.common/types.goCache,Config,Serverinterfaces andRawParamsfield fromCacheItem.redcon/redcon.goKeyMapperfield.handleZAddnow callsKeyMapper.NotifyZAdd()for subscription set writes. Removed unusedStop()method andserverfield. UnexportedSortedSetMember.main.goKeyMapperand wires it to bothServerandRedconServer.metrics/metrics.gohelpers/helpers.gonormalizeString(only used within package).generate-endpoint-aliases/index.tsTest changes
cache/cache_test.go— Updated for newGet(string)signature; replacedSize()/Keys()calls withItems().aliases_test.go— Removed param alias, override, and transform tests (logic removed). SimplifiedsampleConfig.helpers_test.go— RemovedTestRequestParamsFromKey_ParamAliasesResolved.server/server_test.go— UpdatedNew()call signature (now takesKeyMapper).redcon/redcon_test.go— Removed unusedlastWritetest helper.