Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dev/config/api.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
http_port = 7070
region = "local"
redis_url = "redis://redis:6379"
dashboard_token = "dashboard-test-token-123"

[database]
primary = "unkey:password@tcp(mysql:3306)/unkey?parseTime=true"
Expand Down
1 change: 0 additions & 1 deletion gen/proto/ctrl/v1/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ go_library(
"custom_domain.pb.go",
"deployment.pb.go",
"environment.pb.go",
"oneof_interfaces.go",
"openapi.pb.go",
"secrets.pb.go",
"service.pb.go",
Expand Down
15 changes: 0 additions & 15 deletions gen/proto/ctrl/v1/oneof_interfaces.go

This file was deleted.

1 change: 0 additions & 1 deletion gen/proto/hydra/v1/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ go_library(
"deployment_restate.pb.go",
"key_refill.pb.go",
"key_refill_restate.pb.go",
"oneof_interfaces.go",
"quota_check.pb.go",
"quota_check_restate.pb.go",
"routing.pb.go",
Expand Down
6 changes: 0 additions & 6 deletions gen/proto/hydra/v1/oneof_interfaces.go

This file was deleted.

25 changes: 19 additions & 6 deletions internal/services/caches/caches.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package caches

import (
"context"
"fmt"
"os"
"time"
Expand Down Expand Up @@ -43,6 +44,15 @@ type Caches struct {
// dispatcher handles routing of invalidation events to all caches in this process.
// This is not exported as it's an internal implementation detail.
dispatcher *clustering.InvalidationDispatcher

// registry maps cache resource names to invalidation handlers, used by the
// internal cache invalidation endpoint.
registry *middleware.InvalidationRegistry
}

// Invalidate removes the given keys from the named cache.
func (c *Caches) Invalidate(ctx context.Context, cacheName string, keys []string) error {
return c.registry.Invalidate(ctx, cacheName, keys)
}

// Close shuts down the caches and cleans up resources.
Expand Down Expand Up @@ -175,6 +185,8 @@ func New(config Config) (Caches, error) {
}()
}

registry := middleware.NewInvalidationRegistry()

ratelimitNamespace, err := createCache(
cache.Config[cache.ScopedKey, db.FindRatelimitNamespace]{
Fresh: time.Minute,
Expand Down Expand Up @@ -261,12 +273,13 @@ func New(config Config) (Caches, error) {

initialized = true
return Caches{
RatelimitNamespace: middleware.WithTracing(ratelimitNamespace),
LiveApiByID: middleware.WithTracing(liveApiByID),
VerificationKeyByHash: middleware.WithTracing(verificationKeyByHash),
ClickhouseSetting: middleware.WithTracing(clickhouseSetting),
KeyAuthToApiRow: middleware.WithTracing(keyAuthToApiRow),
ApiToKeyAuthRow: middleware.WithTracing(apiToKeyAuthRow),
RatelimitNamespace: middleware.WithInvalidation(middleware.WithTracing(ratelimitNamespace), "ratelimit_namespace", registry, cache.ScopedKeyFromString),
LiveApiByID: middleware.WithInvalidation(middleware.WithTracing(liveApiByID), "live_api_by_id", registry, cache.ScopedKeyFromString),
VerificationKeyByHash: middleware.WithInvalidation(middleware.WithTracing(verificationKeyByHash), "verification_key_by_hash", registry, middleware.StringKeyParser),
ClickhouseSetting: middleware.WithInvalidation(middleware.WithTracing(clickhouseSetting), "clickhouse_setting", registry, middleware.StringKeyParser),
KeyAuthToApiRow: middleware.WithInvalidation(middleware.WithTracing(keyAuthToApiRow), "key_auth_to_api_row", registry, cache.ScopedKeyFromString),
ApiToKeyAuthRow: middleware.WithInvalidation(middleware.WithTracing(apiToKeyAuthRow), "api_to_key_auth_row", registry, cache.ScopedKeyFromString),
dispatcher: dispatcher,
registry: registry,
}, nil
}
33 changes: 8 additions & 25 deletions pkg/cache/clustering/broadcaster_gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,19 @@ package clustering

import (
"context"
"sync"
"sync/atomic"

cachev1 "github.com/unkeyed/unkey/gen/proto/cache/v1"
clusterv1 "github.com/unkeyed/unkey/gen/proto/cluster/v1"
"github.com/unkeyed/unkey/pkg/cluster"
"github.com/unkeyed/unkey/pkg/logger"
)

// invalidationHandler wraps the handler func so we can use atomic.Pointer
// (atomic.Pointer requires a named type, not a bare func signature).
type invalidationHandler struct {
fn func(context.Context, *cachev1.CacheInvalidationEvent) error
}

// GossipBroadcaster implements Broadcaster using the gossip cluster for
// cache invalidation. It builds ClusterMessage envelopes with the oneof
// variant directly, avoiding double serialization.
type GossipBroadcaster struct {
cluster cluster.Cluster
handler atomic.Pointer[invalidationHandler]

closeOnce sync.Once
closeErr error
handler func(context.Context, *cachev1.CacheInvalidationEvent) error
}

var _ Broadcaster = (*GossipBroadcaster)(nil)
Expand All @@ -34,18 +23,16 @@ var _ Broadcaster = (*GossipBroadcaster)(nil)
// given cluster instance.
func NewGossipBroadcaster(c cluster.Cluster) *GossipBroadcaster {
return &GossipBroadcaster{
cluster: c,
handler: atomic.Pointer[invalidationHandler]{},
closeOnce: sync.Once{},
closeErr: nil,
cluster: c,
handler: nil,
}
}

// HandleCacheInvalidation is the typed handler for cache invalidation messages.
// Register it with cluster.Subscribe(mux, broadcaster.HandleCacheInvalidation).
func (b *GossipBroadcaster) HandleCacheInvalidation(ci *clusterv1.ClusterMessage_CacheInvalidation) {
if h := b.handler.Load(); h != nil {
if err := h.fn(context.Background(), ci.CacheInvalidation); err != nil {
if b.handler != nil {
if err := b.handler(context.Background(), ci.CacheInvalidation); err != nil {
logger.Error("Failed to handle gossip cache event", "error", err)
}
}
Expand All @@ -67,14 +54,10 @@ func (b *GossipBroadcaster) Broadcast(_ context.Context, events ...*cachev1.Cach
// Subscribe sets the single handler for incoming invalidation events.
// Calling Subscribe again replaces the previous handler.
func (b *GossipBroadcaster) Subscribe(_ context.Context, handler func(context.Context, *cachev1.CacheInvalidationEvent) error) {
b.handler.Store(&invalidationHandler{fn: handler})
b.handler = handler
}

// Close shuts down the underlying cluster. It is safe to call multiple times;
// only the first call closes the cluster, subsequent calls return the original result.
// Close shuts down the underlying cluster.
func (b *GossipBroadcaster) Close() error {
b.closeOnce.Do(func() {
b.closeErr = b.cluster.Close()
})
return b.closeErr
return b.cluster.Close()
}
5 changes: 4 additions & 1 deletion pkg/cache/middleware/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@rules_go//go:def.bzl", "go_library")

go_library(
name = "middleware",
srcs = ["tracing.go"],
srcs = [
"invalidation.go",
"tracing.go",
],
importpath = "github.com/unkeyed/unkey/pkg/cache/middleware",
visibility = ["//visibility:public"],
deps = [
Expand Down
79 changes: 79 additions & 0 deletions pkg/cache/middleware/invalidation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package middleware

import (
"context"
"fmt"
"sync"

"github.com/unkeyed/unkey/pkg/cache"
)

// InvalidationRegistry maps cache resource names to functions that can remove
// entries by their string-encoded keys. Built once during cache construction;
// used by the API's cache invalidation endpoint.
type InvalidationRegistry struct {
mu sync.RWMutex
handlers map[string]func(ctx context.Context, keys []string) error
}

// NewInvalidationRegistry creates an empty registry.
func NewInvalidationRegistry() *InvalidationRegistry {
return &InvalidationRegistry{
mu: sync.RWMutex{},
handlers: make(map[string]func(ctx context.Context, keys []string) error),
}
}

// register adds an invalidation handler for the given cache name.
func (r *InvalidationRegistry) register(name string, fn func(ctx context.Context, keys []string) error) {
r.mu.Lock()
defer r.mu.Unlock()
r.handlers[name] = fn
}

// Invalidate removes the given keys from the named cache.
// Returns an error if the cache name is not registered.
func (r *InvalidationRegistry) Invalidate(ctx context.Context, cacheName string, keys []string) error {
r.mu.RLock()
fn, ok := r.handlers[cacheName]
r.mu.RUnlock()

if !ok {
return fmt.Errorf("unknown cache name: %s", cacheName)
}
return fn(ctx, keys)
}

// WithInvalidation registers a cache for string-key-based invalidation
// and returns the cache unchanged. This is not a wrapping middleware —
// it only registers a side-channel so the cache can be invalidated by
// name through the registry.
//
// parseKey converts the raw string key (from the HTTP request) into the
// cache's typed key K. For string-keyed caches, use StringKeyParser.
// For ScopedKey caches, use cache.ParseScopedKey.
func WithInvalidation[K comparable, V any](
c cache.Cache[K, V],
name string,
registry *InvalidationRegistry,
parseKey func(string) (K, error),
) cache.Cache[K, V] {
registry.register(name, func(ctx context.Context, keys []string) error {
parsed := make([]K, len(keys))
for i, s := range keys {
k, err := parseKey(s)
if err != nil {
return err
}
parsed[i] = k
}
c.Remove(ctx, parsed...)
return nil
})
return c
}

// StringKeyParser is a key parser for caches keyed by plain strings.
func StringKeyParser(s string) (string, error) {
return s, nil
}
4 changes: 2 additions & 2 deletions pkg/cluster/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "cluster",
srcs = [
"bridge.go",
"ambassador.go",
"cluster.go",
"config.go",
"delegate_lan.go",
Expand All @@ -27,7 +27,7 @@ go_library(
go_test(
name = "cluster_test",
srcs = [
"bridge_test.go",
"ambassador_test.go",
"cluster_test.go",
"mux_test.go",
],
Expand Down
Loading
Loading