Skip to content

Commit 907a106

Browse files
committed
feat(catalog): support for multiple catalog pods
Enables horizontal scaling through PostgreSQL-based leader election that coordinates database writes across multiple pods. All pods serve read requests from in-memory data and database queries. The leader alone performs database writes: fetches models, writes updates, and cleans up orphaned data. Leadership transfers automatically when the leader fails. Implementation: - Leader election package using pglock for distributed locking - Loader split into StartReadOnly() and StartLeader() modes - Configuration: CATALOG_LEADER_LOCK_DURATION and CATALOG_LEADER_HEARTBEAT environment variables - Integration tests for multi-pod scenarios Signed-off-by: Paul Boyd <paul@pboyd.io>
1 parent e580f2f commit 907a106

File tree

11 files changed

+1236
-79
lines changed

11 files changed

+1236
-79
lines changed

catalog/cmd/catalog.go

Lines changed: 145 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,25 @@ package cmd
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"net/http"
8+
"os"
9+
"os/signal"
710
"reflect"
11+
"syscall"
812
"time"
913

1014
"github.com/golang/glog"
1115
"github.com/kubeflow/model-registry/catalog/internal/catalog"
1216
"github.com/kubeflow/model-registry/catalog/internal/db/models"
1317
"github.com/kubeflow/model-registry/catalog/internal/db/service"
18+
"github.com/kubeflow/model-registry/catalog/internal/leader"
1419
"github.com/kubeflow/model-registry/catalog/internal/server/openapi"
1520
"github.com/kubeflow/model-registry/internal/datastore"
1621
"github.com/kubeflow/model-registry/internal/datastore/embedmd"
1722
"github.com/spf13/cobra"
23+
"golang.org/x/sync/errgroup"
1824
)
1925

2026
var catalogCfg = struct {
@@ -27,6 +33,44 @@ var catalogCfg = struct {
2733
PerformanceMetricsPath: []string{},
2834
}
2935

36+
const (
37+
leaderLockName = "catalog-leader"
38+
39+
defaultLeaderLockDuration = 60 * time.Second
40+
defaultLeaderHeartbeat = 15 * time.Second
41+
42+
envLeaderLockDuration = "CATALOG_LEADER_LOCK_DURATION"
43+
envLeaderHeartbeat = "CATALOG_LEADER_HEARTBEAT"
44+
)
45+
46+
// parseDurationEnv parses a duration from an environment variable,
47+
// falling back to a default value if unset or invalid.
48+
func parseDurationEnv(envName string, defaultVal time.Duration) time.Duration {
49+
if envVal := os.Getenv(envName); envVal != "" {
50+
if parsed, err := time.ParseDuration(envVal); err == nil {
51+
glog.Infof("Using %s: %v", envName, parsed)
52+
return parsed
53+
}
54+
glog.Warningf("Invalid %s value %q, using default %v", envName, envVal, defaultVal)
55+
}
56+
return defaultVal
57+
}
58+
59+
// getLeaderElectionConfig reads leader election configuration from environment
60+
// variables, falling back to defaults when unset or invalid.
61+
func getLeaderElectionConfig() (lockDuration, heartbeat time.Duration) {
62+
lockDuration = parseDurationEnv(envLeaderLockDuration, defaultLeaderLockDuration)
63+
heartbeat = parseDurationEnv(envLeaderHeartbeat, defaultLeaderHeartbeat)
64+
65+
// Validate pglock requirement: heartbeat <= lockDuration/2
66+
if heartbeat > lockDuration/2 {
67+
glog.Warningf("Heartbeat (%v) exceeds half of lock duration (%v), required by pglock. Using defaults.", heartbeat, lockDuration)
68+
return defaultLeaderLockDuration, defaultLeaderHeartbeat
69+
}
70+
71+
return lockDuration, heartbeat
72+
}
73+
3074
var CatalogCmd = &cobra.Command{
3175
Use: "catalog",
3276
Short: "Catalog API server",
@@ -81,11 +125,23 @@ func runCatalogServer(cmd *cobra.Command, args []string) error {
81125
return nil
82126
})
83127

84-
err = loader.Start(context.Background())
85-
if err != nil {
86-
return fmt.Errorf("error loading catalog sources: %v", err)
128+
ctx, cancel := context.WithCancel(context.Background())
129+
defer cancel()
130+
131+
sigCh := make(chan os.Signal, 1)
132+
signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
133+
go func() {
134+
sig := <-sigCh
135+
glog.Infof("Received signal %v, initiating graceful shutdown", sig)
136+
cancel()
137+
}()
138+
139+
glog.Info("Starting loader in read-only mode (standby)")
140+
if err := loader.StartReadOnly(ctx); err != nil {
141+
return fmt.Errorf("error starting loader in read-only mode: %v", err)
87142
}
88143

144+
// Set up HTTP server (runs continuously regardless of leadership)
89145
svc := openapi.NewModelCatalogServiceAPIService(
90146
catalog.NewDBCatalog(services, loader.Sources),
91147
loader.Sources,
@@ -94,8 +150,92 @@ func runCatalogServer(cmd *cobra.Command, args []string) error {
94150
)
95151
ctrl := openapi.NewModelCatalogServiceAPIController(svc)
96152

97-
glog.Infof("Catalog API server listening on %s", catalogCfg.ListenAddress)
98-
return http.ListenAndServe(catalogCfg.ListenAddress, openapi.NewRouter(ctrl))
153+
server := &http.Server{
154+
Addr: catalogCfg.ListenAddress,
155+
Handler: openapi.NewRouter(ctrl),
156+
}
157+
158+
g, gctx := errgroup.WithContext(ctx)
159+
160+
// HTTP server goroutine
161+
g.Go(func() error {
162+
glog.Infof("Catalog API server listening on %s", catalogCfg.ListenAddress)
163+
if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
164+
return fmt.Errorf("HTTP server failed: %w", err)
165+
}
166+
return nil
167+
})
168+
169+
// HTTP server shutdown goroutine
170+
g.Go(func() error {
171+
<-gctx.Done()
172+
glog.Info("Shutting down HTTP server...")
173+
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
174+
defer shutdownCancel()
175+
if err := server.Shutdown(shutdownCtx); err != nil {
176+
glog.Errorf("HTTP server shutdown error: %v", err)
177+
}
178+
return nil
179+
})
180+
181+
gormDB, err := ds.DB()
182+
if err != nil {
183+
return fmt.Errorf("error getting database connection: %w", err)
184+
}
185+
186+
lockDuration, heartbeat := getLeaderElectionConfig()
187+
glog.Infof("Leader election configured: lock duration=%v, heartbeat=%v", lockDuration, heartbeat)
188+
189+
elector, err := leader.NewLeaderElector(
190+
gormDB,
191+
ctx,
192+
leaderLockName,
193+
lockDuration,
194+
heartbeat,
195+
func(leaderCtx context.Context) {
196+
glog.Info("Became leader - starting leader-only operations")
197+
198+
// Monitor leaderCtx in separate goroutine and call StopLeader when lost
199+
go func() {
200+
<-leaderCtx.Done()
201+
glog.Info("Lost leadership, stopping leader operations...")
202+
if err := loader.StopLeader(); err != nil {
203+
glog.Errorf("Error stopping leader: %v", err)
204+
}
205+
}()
206+
207+
// StartLeader blocks until StopLeader is called or ctx cancels
208+
// Pass program context (ctx), NOT leaderCtx
209+
if err := loader.StartLeader(ctx); err != nil && !errors.Is(err, context.Canceled) {
210+
glog.Errorf("StartLeader exited with error: %v", err)
211+
}
212+
213+
glog.Info("Leader callback complete")
214+
},
215+
)
216+
if err != nil {
217+
return fmt.Errorf("error creating leader elector: %w", err)
218+
}
219+
220+
// Leader elector goroutine
221+
g.Go(func() error {
222+
if err := elector.Run(ctx); err != nil {
223+
return fmt.Errorf("leader elector failed: %w", err)
224+
}
225+
return nil
226+
})
227+
228+
// Wait for all goroutines and collect errors
229+
errs := []error{}
230+
if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) {
231+
errs = append(errs, err)
232+
}
233+
234+
if err := loader.Shutdown(); err != nil {
235+
errs = append(errs, fmt.Errorf("loader shutdown error: %w", err))
236+
}
237+
238+
return errors.Join(errs...)
99239
}
100240

101241
func getRepo[T any](repoSet datastore.RepoSet) T {

catalog/cmd/catalog_config_test.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package cmd
2+
3+
import (
4+
"os"
5+
"testing"
6+
"time"
7+
)
8+
9+
func TestGetLeaderElectionConfig(t *testing.T) {
10+
tests := []struct {
11+
name string
12+
lockDurationEnv string
13+
heartbeatEnv string
14+
expectedLockDuration time.Duration
15+
expectedHeartbeat time.Duration
16+
}{
17+
{
18+
name: "defaults when no env vars set",
19+
lockDurationEnv: "",
20+
heartbeatEnv: "",
21+
expectedLockDuration: defaultLeaderLockDuration,
22+
expectedHeartbeat: defaultLeaderHeartbeat,
23+
},
24+
{
25+
name: "custom valid values",
26+
lockDurationEnv: "30s",
27+
heartbeatEnv: "10s",
28+
expectedLockDuration: 30 * time.Second,
29+
expectedHeartbeat: 10 * time.Second,
30+
},
31+
{
32+
name: "fast failover for local dev",
33+
lockDurationEnv: "10s",
34+
heartbeatEnv: "3s",
35+
expectedLockDuration: 10 * time.Second,
36+
expectedHeartbeat: 3 * time.Second,
37+
},
38+
{
39+
name: "long lease for production",
40+
lockDurationEnv: "120s",
41+
heartbeatEnv: "30s",
42+
expectedLockDuration: 120 * time.Second,
43+
expectedHeartbeat: 30 * time.Second,
44+
},
45+
{
46+
name: "invalid lock duration uses default lock, keeps valid heartbeat",
47+
lockDurationEnv: "invalid",
48+
heartbeatEnv: "10s",
49+
expectedLockDuration: defaultLeaderLockDuration, // falls back to default
50+
expectedHeartbeat: 10 * time.Second, // uses parsed value
51+
},
52+
{
53+
name: "invalid heartbeat uses default heartbeat, keeps valid lock",
54+
lockDurationEnv: "30s",
55+
heartbeatEnv: "invalid",
56+
expectedLockDuration: 30 * time.Second, // uses parsed value
57+
expectedHeartbeat: defaultLeaderHeartbeat, // falls back to default
58+
},
59+
{
60+
name: "heartbeat exceeds lock duration/2 uses defaults",
61+
lockDurationEnv: "30s",
62+
heartbeatEnv: "20s", // 20s > 15s (half of 30s)
63+
expectedLockDuration: defaultLeaderLockDuration,
64+
expectedHeartbeat: defaultLeaderHeartbeat,
65+
},
66+
{
67+
name: "heartbeat exactly at lock duration/2 is valid",
68+
lockDurationEnv: "30s",
69+
heartbeatEnv: "15s",
70+
expectedLockDuration: 30 * time.Second,
71+
expectedHeartbeat: 15 * time.Second,
72+
},
73+
}
74+
75+
for _, tt := range tests {
76+
t.Run(tt.name, func(t *testing.T) {
77+
// Clear environment
78+
os.Unsetenv(envLeaderLockDuration)
79+
os.Unsetenv(envLeaderHeartbeat)
80+
81+
// Set test environment
82+
if tt.lockDurationEnv != "" {
83+
os.Setenv(envLeaderLockDuration, tt.lockDurationEnv)
84+
defer os.Unsetenv(envLeaderLockDuration)
85+
}
86+
if tt.heartbeatEnv != "" {
87+
os.Setenv(envLeaderHeartbeat, tt.heartbeatEnv)
88+
defer os.Unsetenv(envLeaderHeartbeat)
89+
}
90+
91+
// Get configuration
92+
lockDuration, heartbeat := getLeaderElectionConfig()
93+
94+
// Verify
95+
if lockDuration != tt.expectedLockDuration {
96+
t.Errorf("lock duration = %v, want %v", lockDuration, tt.expectedLockDuration)
97+
}
98+
if heartbeat != tt.expectedHeartbeat {
99+
t.Errorf("heartbeat = %v, want %v", heartbeat, tt.expectedHeartbeat)
100+
}
101+
102+
// Verify pglock requirement: heartbeat <= lockDuration/2
103+
if heartbeat > lockDuration/2 {
104+
t.Errorf("heartbeat (%v) exceeds half of lock duration (%v), violates pglock requirement", heartbeat, lockDuration)
105+
}
106+
})
107+
}
108+
}

0 commit comments

Comments
 (0)