Skip to content

Commit 7f80686

Browse files
feat(proxy): introduce datastore abstraction (kubeflow#425)
* feat(proxy): introduce datastore abstraction Signed-off-by: Alessio Pragliola <[email protected]> * chore(proxy): apply code suggestions Co-authored-by: Ramesh Reddy <[email protected]> Signed-off-by: Alessio Pragliola <[email protected]> --------- Signed-off-by: Alessio Pragliola <[email protected]> Co-authored-by: Ramesh Reddy <[email protected]>
1 parent fff97f5 commit 7f80686

File tree

11 files changed

+216
-80
lines changed

11 files changed

+216
-80
lines changed

cmd/proxy.go

Lines changed: 51 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -4,110 +4,98 @@ import (
44
"fmt"
55
"net/http"
66
"sync"
7-
"time"
87

98
"github.com/golang/glog"
10-
"github.com/kubeflow/model-registry/internal/mlmdtypes"
9+
10+
"github.com/kubeflow/model-registry/internal/datastore"
1111
"github.com/kubeflow/model-registry/internal/proxy"
1212
"github.com/kubeflow/model-registry/internal/server/openapi"
13-
"github.com/kubeflow/model-registry/pkg/core"
1413
"github.com/spf13/cobra"
15-
"google.golang.org/grpc"
16-
"google.golang.org/grpc/codes"
17-
"google.golang.org/grpc/credentials/insecure"
18-
"google.golang.org/grpc/status"
1914
)
2015

16+
type ProxyConfig struct {
17+
Datastore datastore.Datastore
18+
}
19+
2120
const (
22-
// mlmdUnavailableMessage is the message returned when the MLMD server is down or unavailable.
23-
mlmdUnavailableMessage = "MLMD server is down or unavailable. Please check that the database is reachable and try again later."
24-
// maxGRPCRetryAttempts is the maximum number of attempts to retry GRPC requests to the MLMD server.
25-
maxGRPCRetryAttempts = 25 // 25 attempts with incremental backoff (1s, 2s, 3s, ..., 25s) it's ~5 minutes
21+
// datastoreUnavailableMessage is the message returned when the datastore service is down or unavailable.
22+
datastoreUnavailableMessage = "Datastore service is down or unavailable. Please check that the database is reachable and try again later."
2623
)
2724

28-
// proxyCmd represents the proxy command
29-
var proxyCmd = &cobra.Command{
30-
Use: "proxy",
31-
Short: "Starts the ml-metadata go OpenAPI proxy",
32-
Long: `This command launches the ml-metadata go OpenAPI proxy server.
33-
34-
The server connects to a mlmd CPP server. It supports options to customize the
35-
hostname and port where it listens.'`,
36-
RunE: runProxyServer,
37-
}
25+
var (
26+
proxyCfg = ProxyConfig{
27+
Datastore: datastore.Datastore{
28+
Type: "mlmd",
29+
},
30+
}
31+
32+
// proxyCmd represents the proxy command
33+
proxyCmd = &cobra.Command{
34+
Use: "proxy",
35+
Short: "Starts the go OpenAPI proxy server to connect to a metadata store",
36+
Long: `This command launches the go OpenAPI proxy server.
37+
38+
The server connects to a metadata store, currently only MLMD is supported. It supports options to customize the
39+
hostname and port where it listens.`,
40+
RunE: runProxyServer,
41+
}
42+
)
3843

3944
func runProxyServer(cmd *cobra.Command, args []string) error {
40-
var conn *grpc.ClientConn
45+
var (
46+
ds datastore.Connector
47+
wg sync.WaitGroup
48+
)
4149

4250
router := proxy.NewDynamicRouter()
4351

4452
router.SetRouter(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
45-
http.Error(w, mlmdUnavailableMessage, http.StatusServiceUnavailable)
53+
http.Error(w, datastoreUnavailableMessage, http.StatusServiceUnavailable)
4654
}))
4755

4856
errChan := make(chan error, 1)
49-
var wg sync.WaitGroup
57+
5058
wg.Add(2)
59+
5160
go func() {
5261
defer close(errChan)
5362
wg.Wait()
5463
}()
5564

56-
// Start the connection to the MLMD server in a separate goroutine, so that
65+
// Start the connection to the Datastore server in a separate goroutine, so that
5766
// we can start the proxy server and start serving requests while we wait
5867
// for the connection to be established.
5968
go func() {
69+
var (
70+
err error
71+
)
72+
6073
defer wg.Done()
6174

62-
mlmdAddr := fmt.Sprintf("%s:%d", proxyCfg.MLMDHostname, proxyCfg.MLMDPort)
63-
glog.Infof("connecting to MLMD server %s..", mlmdAddr)
64-
var err error
65-
conn, err = grpc.NewClient(mlmdAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
75+
ds, err = datastore.NewConnector(proxyCfg.Datastore)
6676
if err != nil {
67-
errChan <- fmt.Errorf("error dialing connection to mlmd server %s: %w", mlmdAddr, err)
77+
errChan <- fmt.Errorf("error creating datastore: %w", err)
6878
return
6979
}
7080

71-
mlmdTypeNamesConfig := mlmdtypes.NewMLMDTypeNamesConfigFromDefaults()
72-
73-
// Backoff and retry GRPC requests to the MLMD server, until the server
74-
// becomes available or the maximum number of attempts is reached.
75-
for i := 0; i < maxGRPCRetryAttempts; i++ {
76-
_, err = mlmdtypes.CreateMLMDTypes(conn, mlmdTypeNamesConfig)
77-
if err == nil {
78-
break
79-
}
80-
81-
st, ok := status.FromError(err)
82-
if !ok || st.Code() != codes.Unavailable {
83-
errChan <- fmt.Errorf("error creating MLMD types: %w", err)
84-
return
85-
}
86-
87-
time.Sleep(time.Duration(i+1) * time.Second)
88-
}
89-
90-
service, err := core.NewModelRegistryService(conn, mlmdTypeNamesConfig)
81+
conn, err := ds.Connect()
9182
if err != nil {
92-
errChan <- fmt.Errorf("error creating core service: %w", err)
93-
83+
errChan <- fmt.Errorf("error connecting to datastore: %w", err)
9484
return
9585
}
9686

97-
ModelRegistryServiceAPIService := openapi.NewModelRegistryServiceAPIService(service)
87+
ModelRegistryServiceAPIService := openapi.NewModelRegistryServiceAPIService(conn)
9888
ModelRegistryServiceAPIController := openapi.NewModelRegistryServiceAPIController(ModelRegistryServiceAPIService)
9989

10090
router.SetRouter(openapi.NewRouter(ModelRegistryServiceAPIController))
101-
102-
glog.Infof("connected to MLMD server")
10391
}()
10492

10593
// Start the proxy server in a separate goroutine so that we can handle
106-
// errors from both the proxy server and the connection to the MLMD server.
94+
// errors from both the proxy server and the connection to the Datastore server.
10795
go func() {
10896
defer wg.Done()
10997

110-
glog.Infof("proxy server started at %s:%v", cfg.Hostname, cfg.Port)
98+
glog.Infof("Proxy server started at %s:%v", cfg.Hostname, cfg.Port)
11199

112100
err := http.ListenAndServe(fmt.Sprintf("%s:%d", cfg.Hostname, cfg.Port), router)
113101
if err != nil {
@@ -116,35 +104,24 @@ func runProxyServer(cmd *cobra.Command, args []string) error {
116104
}()
117105

118106
defer func() {
119-
if conn != nil {
120-
glog.Info("closing connection to MLMD server")
121-
107+
if ds != nil {
122108
//nolint:errcheck
123-
conn.Close()
109+
ds.Teardown()
124110
}
125111
}()
126112

127-
// Wait for either the MLMD server connection or the proxy server to return an error
113+
// Wait for either the Datastore server connection or the proxy server to return an error
128114
// or for both to finish successfully.
129115
return <-errChan
130116
}
131-
132117
func init() {
133118
rootCmd.AddCommand(proxyCmd)
134119

135120
proxyCmd.Flags().StringVarP(&cfg.Hostname, "hostname", "n", cfg.Hostname, "Proxy server listen hostname")
136121
proxyCmd.Flags().IntVarP(&cfg.Port, "port", "p", cfg.Port, "Proxy server listen port")
137122

138-
proxyCmd.Flags().StringVar(&proxyCfg.MLMDHostname, "mlmd-hostname", proxyCfg.MLMDHostname, "MLMD hostname")
139-
proxyCmd.Flags().IntVar(&proxyCfg.MLMDPort, "mlmd-port", proxyCfg.MLMDPort, "MLMD port")
140-
}
141-
142-
type ProxyConfig struct {
143-
MLMDHostname string
144-
MLMDPort int
145-
}
123+
proxyCmd.Flags().StringVar(&proxyCfg.Datastore.MLMD.Hostname, "mlmd-hostname", proxyCfg.Datastore.MLMD.Hostname, "MLMD hostname")
124+
proxyCmd.Flags().IntVar(&proxyCfg.Datastore.MLMD.Port, "mlmd-port", proxyCfg.Datastore.MLMD.Port, "MLMD port")
146125

147-
var proxyCfg = ProxyConfig{
148-
MLMDHostname: "localhost",
149-
MLMDPort: 9090,
126+
proxyCmd.Flags().StringVar(&proxyCfg.Datastore.Type, "datastore-type", proxyCfg.Datastore.Type, "Datastore type")
150127
}

cmd/root.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func initConfig(cmd *cobra.Command) error {
9090

9191
// If a config file is found, read it in.
9292
if err := viper.ReadInConfig(); err == nil {
93-
glog.Info("using config file: ", viper.ConfigFileUsed())
93+
glog.Info("Using config file: ", viper.ConfigFileUsed())
9494
} else {
9595
var configFileNotFoundError viper.ConfigFileNotFoundError
9696
ok := errors.As(err, &configFileNotFoundError)

docker-compose-local.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ services:
1313
build:
1414
context: .
1515
dockerfile: Dockerfile
16-
command: ["proxy", "--hostname", "0.0.0.0", "--mlmd-hostname", "mlmd-server", "--mlmd-port", "8080"]
16+
command: ["proxy", "--hostname", "0.0.0.0", "--mlmd-hostname", "mlmd-server", "--mlmd-port", "8080", "--datastore-type", "mlmd"]
1717
container_name: model-registry
1818
ports:
1919
- "8080:8080"

docker-compose.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ services:
1111
- ./test/config/ml-metadata:/tmp/shared
1212
model-registry:
1313
image: docker.io/kubeflow/model-registry:latest
14-
command: ["proxy", "--hostname", "0.0.0.0", "--mlmd-hostname", "mlmd-server", "--mlmd-port", "8080"]
14+
command: ["proxy", "--hostname", "0.0.0.0", "--mlmd-hostname", "mlmd-server", "--mlmd-port", "8080", "--datastore-type", "mlmd"]
1515
container_name: model-registry
1616
ports:
1717
- "8080:8080"

internal/datastore/connector.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package datastore
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
7+
"github.com/kubeflow/model-registry/pkg/api"
8+
)
9+
10+
// Errors exported by the datastore package. Callers can match them with
// errors.Is, since they are wrapped with %w where they are returned.
var (
	// ErrCreatingDatastore carries the message "error creating datastore".
	// NOTE(review): not referenced by the code visible here; confirm its callers.
	ErrCreatingDatastore = errors.New("error creating datastore")

	// ErrUnsupportedDatastore is wrapped by NewConnector when the requested
	// datastore type has no connector implementation.
	ErrUnsupportedDatastore = errors.New("unsupported datastore type")
)
14+
15+
// TeardownFunc is a function that takes no arguments and returns an error.
// NOTE(review): presumably meant for datastore cleanup hooks; it is not
// referenced by the code visible here — confirm usage.
type TeardownFunc func() error
16+
17+
type Datastore struct {
18+
MLMD MLMDConfig
19+
Type string
20+
}
21+
22+
type Connector interface {
23+
Connect() (api.ModelRegistryApi, error)
24+
Teardown() error
25+
}
26+
27+
func NewConnector(ds Datastore) (Connector, error) {
28+
switch ds.Type {
29+
case "mlmd":
30+
if err := ds.MLMD.Validate(); err != nil {
31+
return nil, fmt.Errorf("invalid MLMD config: %w", err)
32+
}
33+
34+
mlmd := NewMLMDService(&ds.MLMD)
35+
36+
return mlmd, nil
37+
default:
38+
return nil, fmt.Errorf("%w: %s. Supported types: mlmd", ErrUnsupportedDatastore, ds.Type)
39+
}
40+
}

internal/datastore/mlmd.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package datastore
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"time"
7+
8+
"github.com/golang/glog"
9+
"github.com/kubeflow/model-registry/internal/mlmdtypes"
10+
"github.com/kubeflow/model-registry/pkg/api"
11+
"github.com/kubeflow/model-registry/pkg/core"
12+
"google.golang.org/grpc"
13+
"google.golang.org/grpc/codes"
14+
"google.golang.org/grpc/credentials/insecure"
15+
"google.golang.org/grpc/status"
16+
)
17+
18+
// maxGRPCRetryAttempts caps how many times a gRPC request to the MLMD service
// is attempted. The wait grows by one second after every failed attempt
// (1s, 2s, 3s, ...), which adds up to roughly five minutes overall.
const maxGRPCRetryAttempts = 25
22+
23+
// Errors wrapped by MLMDService, one per step that can fail. Match them with
// errors.Is.
var (
	// ErrMLMDConnectionStart: the gRPC client for the MLMD service could not be created.
	ErrMLMDConnectionStart = errors.New("error dialing connection to mlmd service")

	// ErrMLMDTypeCreation: creating the MLMD types failed.
	ErrMLMDTypeCreation = errors.New("error creating MLMD types")

	// ErrMLMDCoreCreation: the core model registry service could not be built.
	ErrMLMDCoreCreation = errors.New("error creating core service")

	// ErrMLMDConnectionClose: closing the gRPC connection failed.
	ErrMLMDConnectionClose = errors.New("error closing connection to mlmd service")
)
29+
30+
// MLMDConfig holds the address of the MLMD service.
type MLMDConfig struct {
	// Hostname is the host part of the address; Validate rejects an empty value.
	Hostname string
	// Port is the port part of the address; Validate requires 1-65535.
	Port int
}

// Validate reports the first problem found in the configuration, checking the
// hostname before the port. It returns nil when the hostname is non-empty and
// the port lies within 1-65535.
func (c *MLMDConfig) Validate() error {
	switch {
	case c.Hostname == "":
		return fmt.Errorf("hostname is required")
	case c.Port < 1 || c.Port > 65535:
		return fmt.Errorf("port must be in the range 1-65535")
	default:
		return nil
	}
}
46+
47+
type MLMDService struct {
48+
*MLMDConfig
49+
gRPCConnection *grpc.ClientConn
50+
}
51+
52+
func NewMLMDService(cfg *MLMDConfig) *MLMDService {
53+
return &MLMDService{
54+
MLMDConfig: cfg,
55+
}
56+
}
57+
58+
func (s *MLMDService) Connect() (api.ModelRegistryApi, error) {
59+
uri := fmt.Sprintf("%s:%d", s.Hostname, s.Port)
60+
61+
glog.Infof("Connecting to MLMD service at %s..", uri)
62+
63+
conn, err := grpc.NewClient(uri, grpc.WithTransportCredentials(insecure.NewCredentials()))
64+
if err != nil {
65+
return nil, fmt.Errorf("%w %s: %w", ErrMLMDConnectionStart, uri, err)
66+
}
67+
68+
s.gRPCConnection = conn
69+
70+
mlmdTypeNamesConfig := mlmdtypes.NewMLMDTypeNamesConfigFromDefaults()
71+
72+
// Backoff and retry GRPC requests to the MLMD service, until the service
73+
// becomes available or the maximum number of attempts is reached.
74+
for i := range maxGRPCRetryAttempts {
75+
_, err = mlmdtypes.CreateMLMDTypes(conn, mlmdTypeNamesConfig)
76+
if err == nil {
77+
break
78+
}
79+
80+
st, ok := status.FromError(err)
81+
if !ok || st.Code() != codes.Unavailable {
82+
return nil, fmt.Errorf("%w: %w", ErrMLMDTypeCreation, err)
83+
}
84+
85+
glog.Warningf("Retrying connection to MLMD service (attempt %d/%d): %v", i+1, maxGRPCRetryAttempts, err)
86+
87+
time.Sleep(time.Duration(i+1) * time.Second)
88+
}
89+
90+
if err != nil {
91+
return nil, fmt.Errorf("%w: %w", ErrMLMDTypeCreation, err)
92+
}
93+
94+
service, err := core.NewModelRegistryService(conn, mlmdTypeNamesConfig)
95+
if err != nil {
96+
return nil, fmt.Errorf("%w: %w", ErrMLMDCoreCreation, err)
97+
}
98+
99+
glog.Infof("Successfully connected to MLMD service")
100+
101+
return service, nil
102+
}
103+
104+
func (s *MLMDService) Teardown() error {
105+
glog.Info("Closing connection to MLMD service")
106+
107+
if s.gRPCConnection == nil {
108+
return nil
109+
}
110+
111+
if err := s.gRPCConnection.Close(); err != nil {
112+
return fmt.Errorf("%w: %w", ErrMLMDConnectionClose, err)
113+
}
114+
115+
return nil
116+
}

manifests/kustomize/base/model-registry-configmap.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ data:
99
MODEL_REGISTRY_REST_SERVICE_PORT: "8080"
1010
MODEL_REGISTRY_GRPC_SERVICE_HOST: "model-registry-service"
1111
MODEL_REGISTRY_GRPC_SERVICE_PORT: "9090"
12+
MODEL_REGISTRY_DATA_STORE_TYPE: "mlmd"

manifests/kustomize/base/model-registry-deployment.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ spec:
2626
- --port=8080
2727
- --mlmd-hostname=localhost
2828
- --mlmd-port=9090
29+
- --datastore-type=mlmd
2930
command:
3031
- /model-registry
3132
- proxy

0 commit comments

Comments
 (0)