⚠️ Caution: Review failed. The pull request is closed.

📝 Walkthrough

This pull request replaces Kafka-based distributed cache invalidation with a gossip-based cluster membership system using HashiCorp memberlist. Changes include introducing a new cluster package implementing two-tier LAN/WAN gossip with automatic ambassador election, updating cache clustering to use a Broadcaster interface, removing the eventstream infrastructure, and wiring gossip configuration across API, Frontline, and Sentinel services with corresponding CLI flags and Kubernetes manifests.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Node1 as Node 1<br/>(API Instance)
    participant LAN1 as LAN Pool<br/>(memberlist)
    participant Node2 as Node 2<br/>(API Instance)
    participant LAN2 as LAN Pool<br/>(memberlist)
    participant WAN as WAN Pool<br/>(Ambassador)

    rect rgba(100, 150, 200, 0.5)
        Note over Node1,Node2: Same Region (LAN) Invalidation
        Node1->>LAN1: Broadcast(CacheInvalidation)
        LAN1->>Node2: NotifyMsg(ClusterMessage)
        Note over Node2: Deserialize & Apply<br/>Cache Invalidation
    end

    rect rgba(150, 100, 200, 0.5)
        Note over Node1,WAN: Inter-Region (WAN) Invalidation
        Node1->>LAN1: Broadcast(CacheInvalidation)
        LAN1->>WAN: Bridge relays to WAN<br/>(direction=DIRECTION_WAN)
        WAN->>LAN2: Ambassador notifies<br/>remote LAN pool
        LAN2->>Node2: NotifyMsg(ClusterMessage)
        Note over Node2: Deserialize & Apply<br/>Cache Invalidation
    end
```

```mermaid
sequenceDiagram
    participant App as Service Start
    participant Cluster as cluster.New()
    participant LAN as LAN Memberlist
    participant Seeds as LAN Seeds
    participant Bridge as Bridge Eval Loop
    participant WAN as WAN Memberlist
    participant WanSeeds as WAN Seeds

    App->>Cluster: New(cfg Config)
    activate Cluster
    Cluster->>LAN: Create with DefaultLANConfig
    Cluster->>LAN: Add Delegate & EventDelegate
    Cluster->>LAN: Create TransmitLimitedQueue
    Cluster->>Bridge: Start bridgeEvalLoop goroutine
    Cluster->>Seeds: joinSeeds(LANSeeds)
    activate Seeds
    Seeds->>LAN: Join with backoff/retry
    Seeds-->>Cluster: Success callback
    deactivate Seeds

    Note over Bridge: Periodic evaluation
    Bridge->>LAN: Get smallest member by name
    alt Is this node smallest?
        Bridge->>WAN: promoteToBridge
        activate WAN
        WAN->>WAN: Create with DefaultWANConfig
        WAN->>WAN: Add WAN delegate
        WAN->>WanSeeds: joinSeeds(WANSeeds)
        WanSeeds->>WAN: Join with backoff
        WAN-->>Bridge: Success
        deactivate WAN
    else Is not smallest
        Bridge->>WAN: demoteFromBridge (if currently bridge)
    end

    Cluster-->>App: Return Cluster instance
    deactivate Cluster
```
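The walkthrough mentions that cache clustering now goes through a Broadcaster interface, which is what the `Broadcast(CacheInvalidation)` and `NotifyMsg` steps above pass through. As a rough sketch only, here is a plausible shape for that seam; the method names and the `CacheInvalidation` fields are guesses for illustration, not the actual code in this PR:

```go
package clustering

import "context"

// CacheInvalidation is an illustrative message type: which cache and key to evict.
type CacheInvalidation struct {
	Cache string
	Key   string
}

// Broadcaster is a guess at the seam the walkthrough describes: caches publish
// invalidations through it and register a handler for invalidations gossiped in
// from other nodes.
type Broadcaster interface {
	// Broadcast queues an invalidation for the local LAN pool; the bridge/ambassador
	// relays it to remote regions via the WAN pool.
	Broadcast(ctx context.Context, msg CacheInvalidation) error

	// Subscribe registers the handler invoked for invalidations received from peers.
	Subscribe(handler func(ctx context.Context, msg CacheInvalidation))

	// Close releases the underlying gossip resources.
	Close() error
}
```

An interface at this seam is presumably what lets the Kafka-based eventstream path be swapped for gossip without changing the cache call sites.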
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes

🚥 Pre-merge checks: ✅ 1 | ❌ 3
❌ Failed checks (2 warnings, 1 inconclusive)
✅ Passed checks (1 passed)
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@internal/services/caches/caches.go`:
- Around lines 168-249: The dispatcher created in New() must be closed when any subsequent cache creation fails, to avoid leaking resources. After the dispatcher is successfully created (the `dispatcher` variable in New), either call dispatcher.Close() on every early return that follows (i.e., every `return Caches{}, err` after a createCache call, such as when building ratelimitNamespace, verificationKeyByHash, liveApiByID, clickhouseSetting, keyAuthToApiRow, apiToKeyAuthRow, etc.), or preferably add a deferred cleanup like `defer func() { if !initialized { dispatcher.Close() } }()` immediately after creating the dispatcher and set initialized = true only on the final successful return. Either way, update all error paths so dispatcher.Close() runs on failure (see the sketch below).
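A minimal, self-contained sketch of the suggested deferred-cleanup option; the cache names come from the comment above, while the `dispatcher` and `caches` types here are stand-ins rather than the real ones:

```go
package main

import "fmt"

// dispatcher stands in for the real invalidation dispatcher.
type dispatcher struct{ closed bool }

func (d *dispatcher) Close() { d.closed = true }

// caches stands in for the real Caches struct.
type caches struct{ d *dispatcher }

// newCaches demonstrates the deferred-cleanup option: the dispatcher is closed on
// every error path, and ownership passes to the caller only on success.
func newCaches(failAt string) (caches, error) {
	d := &dispatcher{}

	initialized := false
	defer func() {
		if !initialized {
			d.Close() // runs on every early error return below
		}
	}()

	// Stand-in for the sequence of createCache calls (ratelimitNamespace,
	// verificationKeyByHash, liveApiByID, ...); failAt injects a failure.
	for _, name := range []string{"ratelimitNamespace", "verificationKeyByHash", "liveApiByID"} {
		if name == failAt {
			return caches{}, fmt.Errorf("creating %s cache failed", name)
		}
	}

	initialized = true // success: the caller now owns the dispatcher
	return caches{d: d}, nil
}

func main() {
	if _, err := newCaches("liveApiByID"); err != nil {
		fmt.Println(err) // the deferred cleanup has already closed the dispatcher
	}
}
```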
In `@pkg/cache/clustering/broadcaster_gossip.go`:
- Around lines 60-63: GossipBroadcaster.Close currently forwards to b.cluster.Close, but ownership is ambiguous and can result in a double-close. Make Close idempotent by adding a sync.Once (or an equivalent boolean + mutex) to the GossipBroadcaster struct and invoking b.cluster.Close inside that Once, or clearly transfer/document ownership so that only one caller closes the cluster (e.g., remove cluster.Close from GossipBroadcaster.Close if run.go defers closing). Update the Close method to use the Once/guard and ensure subsequent Close calls return nil (or the original error) without calling cluster.Close again (see the sketch below).
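A small sketch of the sync.Once-guarded Close suggested above; `fakeCluster` is a stand-in for the real cluster and the field names are illustrative:

```go
package main

import (
	"fmt"
	"sync"
)

// fakeCluster stands in for the real cluster and errors on a second Close.
type fakeCluster struct{ closed bool }

func (c *fakeCluster) Close() error {
	if c.closed {
		return fmt.Errorf("cluster: already closed")
	}
	c.closed = true
	return nil
}

// GossipBroadcaster sketch: Close is idempotent thanks to sync.Once.
type GossipBroadcaster struct {
	cluster   *fakeCluster
	closeOnce sync.Once
	closeErr  error
}

func (b *GossipBroadcaster) Close() error {
	b.closeOnce.Do(func() {
		b.closeErr = b.cluster.Close()
	})
	// Later calls return the original result without touching the cluster again.
	return b.closeErr
}

func main() {
	b := &GossipBroadcaster{cluster: &fakeCluster{}}
	fmt.Println(b.Close()) // <nil>
	fmt.Println(b.Close()) // still <nil>: no double-close of the cluster
}
```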
In `@svc/frontline/services/caches/caches.go`:
- Around lines 104-160: When clustering.NewInvalidationDispatcher(config.Broadcaster) succeeds but a subsequent createCache call fails, the dispatcher is leaked. Update the New() path to call dispatcher.Close() (or dispatcher.Close(ctx), depending on its API) before each early return after the dispatcher is initialized (i.e., before each fmt.Errorf return after createCache for frontlineRoute, sentinelsByEnvironment, tlsCertificate). Guard the Close call with a nil check on dispatcher and preserve the original returned error; do the same for any other early returns in this function after dispatcher was set.
🧹 Nitpick comments (6)
svc/krane/internal/sentinel/apply.go (2)
392-446: Multiple gossip services with identical selectors per environment.Each sentinel creates its own gossip service (
<k8sName>-gossip-lan) but the selector matches ALL sentinels in the environment viaEnvironmentID + ComponentSentinel. This means multiple headless services will resolve to the same set of pods.While this works (DNS will resolve any of them to the same pod IPs), it creates redundant services. Consider either:
- Use a single environment-scoped gossip service name (idempotent across sentinels)
- Keep per-sentinel services but scope the selector to that sentinel
This isn't blocking since it functions correctly, but adds unnecessary resources.
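A tiny sketch of the second option (per-sentinel selector scoping); the label keys are placeholders, not krane's real constants:

```go
package main

import "fmt"

// gossipSelector sketches the per-sentinel scoping option: adding a label for the
// sentinel itself so each headless gossip service matches only its own pods.
func gossipSelector(environmentID, sentinelName string) map[string]string {
	return map[string]string{
		"environment-id": environmentID,
		"component":      "sentinel",
		"sentinel":       sentinelName, // the extra key that removes the overlap
	}
}

func main() {
	fmt.Println(gossipSelector("env-123", "sentinel-a"))
}
```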
448-524: Same redundancy applies to CiliumNetworkPolicy.
Similar to the gossip service, each sentinel creates its own policy with the same environment-scoped selector. Multiple policies with identical selectors are functionally equivalent but redundant.
pkg/cache/clustering/gossip_e2e_test.go (1)
54-55: Magic sleep may be fragile.
The 50ms sleep before node 2 creation appears to be a timing workaround. Consider documenting why this is needed or using a more deterministic approach (e.g., waiting for node 1 to be ready to accept connections).
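One deterministic alternative, assuming the test can observe membership on node 1 (the `Members` accessor below is a placeholder for whatever the cluster under test actually exposes), using testify's `require.Eventually`:

```go
package clustering_test

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// memberLister is whatever the cluster under test exposes for membership; the
// method name here is a placeholder, not the real API.
type memberLister interface {
	Members() []string
}

// waitForMembers polls until the node reports at least min members, replacing
// the fixed 50ms sleep with a bounded, deterministic wait.
func waitForMembers(t *testing.T, node memberLister, min int) {
	t.Helper()
	require.Eventually(t, func() bool {
		return len(node.Members()) >= min
	}, 5*time.Second, 10*time.Millisecond, "node never reached %d members", min)
}
```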
dev/k8s/manifests/api.yaml (1)
78-84: Consider adding UNKEY_GOSSIP_BIND_ADDR.
Gossip is enabled but the bind address is not specified. If the default (likely 0.0.0.0 or the pod IP) is intentional, this is fine, but explicit config aids clarity.
svc/sentinel/services/router/service.go (1)
45-82: Consider extracting `clusterOpts` and `createCache` to a shared package.
This pattern is duplicated in svc/frontline/services/caches/caches.go. Could be a shared helper in `pkg/cache/clustering`.
pkg/cache/clustering/broadcaster_gossip.go (1)
31-39: Handler invocation uses `context.Background()` instead of propagating context.
The handler signature accepts a context, but HandleCacheInvalidation always passes context.Background(). Consider storing the subscription context or accepting a context parameter if cancellation/deadline propagation is needed.
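A sketch of one way to store and propagate the subscription context; the broadcaster and handler shapes below are placeholders, not the real broadcaster_gossip.go types:

```go
package main

import (
	"context"
	"fmt"
)

type invalidation struct{ Key string }

// gossipBroadcaster is a placeholder shape, not the real broadcaster type.
type gossipBroadcaster struct {
	subCtx  context.Context // stored when the subscriber registers
	handler func(ctx context.Context, msg invalidation)
}

// Subscribe keeps the caller's context so cancellation reaches the handler.
func (b *gossipBroadcaster) Subscribe(ctx context.Context, h func(ctx context.Context, msg invalidation)) {
	b.subCtx = ctx
	b.handler = h
}

// handleCacheInvalidation passes the stored subscription context to the handler
// instead of context.Background(), and drops messages once it is cancelled.
func (b *gossipBroadcaster) handleCacheInvalidation(msg invalidation) {
	if b.handler == nil || b.subCtx == nil || b.subCtx.Err() != nil {
		return
	}
	b.handler(b.subCtx, msg)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	b := &gossipBroadcaster{}
	b.Subscribe(ctx, func(ctx context.Context, msg invalidation) {
		fmt.Println("invalidate", msg.Key)
	})
	b.handleCacheInvalidation(invalidation{Key: "api_123"}) // printed
	cancel()
	b.handleCacheInvalidation(invalidation{Key: "api_456"}) // dropped after cancel
}
```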
chronark left a comment
maybe reorder the proto fields, but it's not super important
What does this PR do?
Adds a specific gossip implementation that would work for us - in theory.
We have 2 separate gossip memberlists, one for intra-cluster messages and one for cross-region messages.
The idea is to:
Have a single node act as the broadcaster that talks to other clusters, meaning:
When we publish a message in us-east-1, one of our 3 nodes sends it to eu-central-1, which then distributes the message to its local members.
That way we don't need everyone to know about everyone, and the latency hit is only a single request across the globe.
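For illustration, a minimal sketch of that two-pool idea using hashicorp/memberlist directly; the "smallest member name joins the WAN pool" rule mirrors the description above, while everything else (node names, seed addresses, ports) is made up and not the actual cluster package:

```go
package main

import (
	"fmt"
	"sort"

	"github.com/hashicorp/memberlist"
)

func main() {
	// Every node joins the LAN pool for intra-cluster (same-region) gossip.
	lanCfg := memberlist.DefaultLANConfig()
	lanCfg.Name = "api-node-1" // placeholder node name
	lan, err := memberlist.Create(lanCfg)
	if err != nil {
		panic(err)
	}
	defer lan.Shutdown()

	// Placeholder seed address; in k8s this would be the headless gossip service.
	if _, err := lan.Join([]string{"api-gossip-lan.default.svc:7946"}); err != nil {
		fmt.Println("no LAN seeds reachable yet:", err)
	}

	// Ambassador election: only the member with the smallest name also joins the
	// WAN pool and relays messages across regions.
	names := make([]string, 0, len(lan.Members()))
	for _, m := range lan.Members() {
		names = append(names, m.Name)
	}
	sort.Strings(names)

	if len(names) > 0 && names[0] == lanCfg.Name {
		wanCfg := memberlist.DefaultWANConfig()
		wanCfg.Name = lanCfg.Name + "-wan"
		wanCfg.BindPort = 7947 // avoid clashing with the LAN pool's port in this sketch
		wanCfg.AdvertisePort = 7947
		wan, err := memberlist.Create(wanCfg)
		if err != nil {
			panic(err)
		}
		defer wan.Shutdown()

		// Placeholder WAN seed, e.g. an ambassador in another region.
		if _, err := wan.Join([]string{"gossip-wan.eu-central-1.example.com:7946"}); err != nil {
			fmt.Println("no WAN seeds reachable yet:", err)
		}
	}
}
```

In the PR itself this election runs periodically in the bridge eval loop (see the second sequence diagram above), so the ambassador role moves automatically as LAN membership changes.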
Type of change
How should this be tested?
Checklist
Required
- pnpm build
- pnpm fmt
- make fmt on /go directory
- No console.logs
- git pull origin main
Appreciated