Skip to content

Commit 8f214a5

Browse files
committed
implements dynamic_tools for clickhouse config section, fix #27
Signed-off-by: Slach <bloodjazman@gmail.com>
1 parent 06cb0fa commit 8f214a5

File tree

9 files changed

+970
-27
lines changed

9 files changed

+970
-27
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ AGENTS.md
99
.idea/
1010
.kilocode/
1111
.crush/
12+
.code/
1213
build/
1314
dist/
1415
coverage.out

cmd/altinity-mcp/main.go

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -540,12 +540,13 @@ func (a *application) startHTTPServer(cfg config.Config, mcpServer *server.MCPSe
540540
// Register custom handlers to ensure token is in the path and inject it into context
541541
mux := http.NewServeMux()
542542
mux.Handle("/{token}/http", serverInjector(tokenInjector(httpServer)))
543-
if cfg.Server.OpenAPI.Enabled {
544-
mux.HandleFunc("/openapi", a.mcpServer.ServeOpenAPISchema)
545-
mux.HandleFunc("/{token}/openapi", serverInjectorOpenAPI)
546-
mux.HandleFunc("/{token}/openapi/list_tables", serverInjectorOpenAPI)
547-
mux.HandleFunc("/{token}/openapi/describe_table", serverInjectorOpenAPI)
548-
mux.HandleFunc("/{token}/openapi/execute_query", serverInjectorOpenAPI)
543+
if cfg.Server.OpenAPI.Enabled {
544+
mux.HandleFunc("/openapi", a.mcpServer.ServeOpenAPISchema)
545+
mux.HandleFunc("/{token}/openapi", serverInjectorOpenAPI)
546+
mux.HandleFunc("/{token}/openapi/", serverInjectorOpenAPI)
547+
mux.HandleFunc("/{token}/openapi/list_tables", serverInjectorOpenAPI)
548+
mux.HandleFunc("/{token}/openapi/describe_table", serverInjectorOpenAPI)
549+
mux.HandleFunc("/{token}/openapi/execute_query", serverInjectorOpenAPI)
549550
log.Info().Str("url", fmt.Sprintf("%s://%s:%d/{token}/openapi", openAPIProtocol, cfg.Server.Address, cfg.Server.Port)).Msg("OpenAPI server listening")
550551
}
551552
mux.HandleFunc("/health", a.healthHandler)
@@ -556,11 +557,12 @@ func (a *application) startHTTPServer(cfg config.Config, mcpServer *server.MCPSe
556557
httpServer := server.NewStreamableHTTPServer(mcpServer)
557558
mux := http.NewServeMux()
558559
mux.Handle("/http", serverInjector(httpServer))
559-
if cfg.Server.OpenAPI.Enabled {
560-
mux.HandleFunc("/openapi", serverInjectorOpenAPI)
561-
mux.HandleFunc("/openapi/list_tables", serverInjectorOpenAPI)
562-
mux.HandleFunc("/openapi/describe_table", serverInjectorOpenAPI)
563-
mux.HandleFunc("/openapi/execute_query", serverInjectorOpenAPI)
560+
if cfg.Server.OpenAPI.Enabled {
561+
mux.HandleFunc("/openapi", serverInjectorOpenAPI)
562+
mux.HandleFunc("/openapi/", serverInjectorOpenAPI)
563+
mux.HandleFunc("/openapi/list_tables", serverInjectorOpenAPI)
564+
mux.HandleFunc("/openapi/describe_table", serverInjectorOpenAPI)
565+
mux.HandleFunc("/openapi/execute_query", serverInjectorOpenAPI)
564566
log.Info().Str("url", fmt.Sprintf("%s://%s:%d/openapi", openAPIProtocol, cfg.Server.Address, cfg.Server.Port)).Msg("OpenAPI server listening")
565567
}
566568
mux.HandleFunc("/health", a.healthHandler)
@@ -645,12 +647,13 @@ func (a *application) startSSEServer(cfg config.Config, mcpServer *server.MCPSer
645647
mux := http.NewServeMux()
646648
mux.Handle("/{token}/sse", serverInjector(tokenInjector(sseServer.SSEHandler())))
647649
mux.Handle("/{token}/message", serverInjector(tokenInjector(sseServer.MessageHandler())))
648-
if cfg.Server.OpenAPI.Enabled {
649-
mux.HandleFunc("/openapi", a.mcpServer.ServeOpenAPISchema)
650-
mux.HandleFunc("/{token}/openapi", serverInjectorOpenAPI)
651-
mux.HandleFunc("/{token}/openapi/list_tables", serverInjectorOpenAPI)
652-
mux.HandleFunc("/{token}/openapi/describe_table", serverInjectorOpenAPI)
653-
mux.HandleFunc("/{token}/openapi/execute_query", serverInjectorOpenAPI)
650+
if cfg.Server.OpenAPI.Enabled {
651+
mux.HandleFunc("/openapi", a.mcpServer.ServeOpenAPISchema)
652+
mux.HandleFunc("/{token}/openapi", serverInjectorOpenAPI)
653+
mux.HandleFunc("/{token}/openapi/", serverInjectorOpenAPI)
654+
mux.HandleFunc("/{token}/openapi/list_tables", serverInjectorOpenAPI)
655+
mux.HandleFunc("/{token}/openapi/describe_table", serverInjectorOpenAPI)
656+
mux.HandleFunc("/{token}/openapi/execute_query", serverInjectorOpenAPI)
654657
log.Info().Str("url", fmt.Sprintf("%s://%s:%d/{token}/openapi", openAPIProtocol, cfg.Server.Address, cfg.Server.Port)).Msg("OpenAPI server listening")
655658
}
656659
mux.HandleFunc("/health", a.healthHandler)
@@ -662,11 +665,12 @@ func (a *application) startSSEServer(cfg config.Config, mcpServer *server.MCPSer
662665
mux := http.NewServeMux()
663666
mux.Handle("/sse", serverInjector(sseServer))
664667
mux.Handle("/message", serverInjector(sseServer.MessageHandler()))
665-
if cfg.Server.OpenAPI.Enabled {
666-
mux.HandleFunc("/openapi", serverInjectorOpenAPI)
667-
mux.HandleFunc("/openapi/list_tables", serverInjectorOpenAPI)
668-
mux.HandleFunc("/openapi/describe_table", serverInjectorOpenAPI)
669-
mux.HandleFunc("/openapi/execute_query", serverInjectorOpenAPI)
668+
if cfg.Server.OpenAPI.Enabled {
669+
mux.HandleFunc("/openapi", serverInjectorOpenAPI)
670+
mux.HandleFunc("/openapi/", serverInjectorOpenAPI)
671+
mux.HandleFunc("/openapi/list_tables", serverInjectorOpenAPI)
672+
mux.HandleFunc("/openapi/describe_table", serverInjectorOpenAPI)
673+
mux.HandleFunc("/openapi/execute_query", serverInjectorOpenAPI)
670674
log.Info().Str("url", fmt.Sprintf("%s://%s:%d/openapi", openAPIProtocol, cfg.Server.Address, cfg.Server.Port)).Msg("OpenAPI server listening")
671675
}
672676
mux.HandleFunc("/health", a.healthHandler)

cmd/altinity-mcp/main_test.go

Lines changed: 171 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ import (
1919
"os"
2020
"strings"
2121
"testing"
22-
"time"
22+
"time"
23+
"path/filepath"
2324

2425
"github.com/altinity/altinity-mcp/pkg/config"
26+
"github.com/altinity/altinity-mcp/pkg/clickhouse"
2527
"github.com/altinity/altinity-mcp/pkg/jwe_auth"
2628
altinitymcp "github.com/altinity/altinity-mcp/pkg/server"
2729
"github.com/stretchr/testify/require"
@@ -358,6 +360,174 @@ MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA7d7Qj8fKjKjKjKjKjKjK
358360
})
359361
}
360362

363+
// setupClickHouseContainerMain is a local helper for this package's tests
364+
func setupClickHouseContainerMain(t *testing.T) *config.ClickHouseConfig {
365+
t.Helper()
366+
ctx := context.Background()
367+
368+
req := testcontainers.ContainerRequest{
369+
Image: "clickhouse/clickhouse-server:latest",
370+
ExposedPorts: []string{"8123/tcp", "9000/tcp"},
371+
Env: map[string]string{
372+
"CLICKHOUSE_SKIP_USER_SETUP": "1",
373+
"CLICKHOUSE_DB": "default",
374+
"CLICKHOUSE_USER": "default",
375+
"CLICKHOUSE_PASSWORD": "",
376+
"CLICKHOUSE_DEFAULT_ACCESS_MANAGEMENT": "1",
377+
},
378+
WaitingFor: wait.ForHTTP("/").WithPort("8123/tcp").WithStartupTimeout(30 * time.Second).WithPollInterval(2 * time.Second),
379+
}
380+
chContainer, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ContainerRequest: req, Started: true})
381+
require.NoError(t, err)
382+
383+
t.Cleanup(func() {
384+
cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
385+
defer cancel()
386+
_ = chContainer.Terminate(cleanupCtx)
387+
})
388+
389+
host, err := chContainer.Host(ctx)
390+
require.NoError(t, err)
391+
port, err := chContainer.MappedPort(ctx, "9000")
392+
require.NoError(t, err)
393+
394+
cfg := &config.ClickHouseConfig{
395+
Host: host,
396+
Port: port.Int(),
397+
Database: "default",
398+
Username: "default",
399+
Password: "",
400+
Protocol: config.TCPProtocol,
401+
ReadOnly: false,
402+
MaxExecutionTime: 60,
403+
Limit: 1000,
404+
}
405+
406+
// create base table
407+
client, err := clickhouse.NewClient(ctx, *cfg)
408+
require.NoError(t, err)
409+
defer func() { _ = client.Close() }()
410+
_, _ = client.ExecuteQuery(ctx, "CREATE TABLE IF NOT EXISTS default.test (id UInt64, value String) ENGINE = Memory")
411+
_, _ = client.ExecuteQuery(ctx, "INSERT INTO default.test VALUES (1, 'one') ON CLUSTER default")
412+
return cfg
413+
}
414+
415+
// Health handler tests
416+
func TestHealthHandler_Additions(t *testing.T) {
417+
// JWE enabled -> should return 200 and auth=jwe_enabled
418+
t.Run("jwe_enabled", func(t *testing.T) {
419+
app := &application{config: config.Config{Server: config.ServerConfig{JWE: config.JWEConfig{Enabled: true}}}}
420+
rr := httptest.NewRecorder()
421+
req := httptest.NewRequest(http.MethodGet, "/health", nil)
422+
app.healthHandler(rr, req)
423+
require.Equal(t, http.StatusOK, rr.Code)
424+
var body map[string]interface{}
425+
require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &body))
426+
require.Equal(t, "jwe_enabled", body["auth"])
427+
})
428+
429+
// JWE disabled with invalid CH -> 503
430+
t.Run("clickhouse_unhealthy", func(t *testing.T) {
431+
app := &application{config: config.Config{Server: config.ServerConfig{JWE: config.JWEConfig{Enabled: false}}, ClickHouse: config.ClickHouseConfig{Host: "127.0.0.1", Port: 9999, Database: "default", Username: "default", Protocol: config.TCPProtocol}}}
432+
rr := httptest.NewRecorder()
433+
req := httptest.NewRequest(http.MethodGet, "/health", nil)
434+
app.healthHandler(rr, req)
435+
require.Equal(t, http.StatusServiceUnavailable, rr.Code)
436+
})
437+
438+
// JWE disabled with real CH -> 200
439+
t.Run("clickhouse_healthy", func(t *testing.T) {
440+
// spin container
441+
ctx := context.Background()
442+
cfg := setupClickHouseContainerMain(t)
443+
app := &application{config: config.Config{Server: config.ServerConfig{JWE: config.JWEConfig{Enabled: false}}, ClickHouse: *cfg}}
444+
rr := httptest.NewRecorder()
445+
req := httptest.NewRequest(http.MethodGet, "/health", nil)
446+
app.healthHandler(rr, req)
447+
require.Equal(t, http.StatusOK, rr.Code)
448+
var body map[string]interface{}
449+
require.NoError(t, json.Unmarshal(rr.Body.Bytes(), &body))
450+
require.Equal(t, "connected", body["clickhouse"])
451+
_ = ctx
452+
})
453+
454+
// Method not allowed
455+
t.Run("method_not_allowed", func(t *testing.T) {
456+
app := &application{config: config.Config{}}
457+
rr := httptest.NewRecorder()
458+
req := httptest.NewRequest(http.MethodPost, "/health", nil)
459+
app.healthHandler(rr, req)
460+
require.Equal(t, http.StatusMethodNotAllowed, rr.Code)
461+
})
462+
}
463+
464+
// testConnection tests
465+
func TestTestConnection_Additions(t *testing.T) {
466+
t.Run("success", func(t *testing.T) {
467+
cfg := setupClickHouseContainerMain(t)
468+
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
469+
defer cancel()
470+
err := testConnection(ctx, *cfg)
471+
require.NoError(t, err)
472+
})
473+
474+
t.Run("failure", func(t *testing.T) {
475+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
476+
defer cancel()
477+
bad := config.ClickHouseConfig{Host: "127.0.0.1", Port: 9999, Database: "default", Username: "default", Protocol: config.TCPProtocol}
478+
err := testConnection(ctx, bad)
479+
require.Error(t, err)
480+
})
481+
}
482+
483+
func TestNewApplication_ErrorPaths(t *testing.T) {
484+
ctx := context.Background()
485+
t.Run("jwe_enabled_missing_key", func(t *testing.T) {
486+
cfg := config.Config{Server: config.ServerConfig{JWE: config.JWEConfig{Enabled: true}}}
487+
_, err := newApplication(ctx, cfg, &mockCommand{flags: map[string]interface{}{"config-reload-time": 0}, setFlags: map[string]bool{"config-reload-time": true}, stringMaps: map[string]map[string]string{}})
488+
require.Error(t, err)
489+
require.Contains(t, err.Error(), "JWE encryption is enabled")
490+
})
491+
492+
t.Run("clickhouse_ping_fail", func(t *testing.T) {
493+
cfg := config.Config{ClickHouse: config.ClickHouseConfig{Host: "127.0.0.1", Port: 65000, Database: "default", Username: "default", Protocol: config.TCPProtocol}}
494+
_, err := newApplication(ctx, cfg, &mockCommand{flags: map[string]interface{}{"config-reload-time": 0}, setFlags: map[string]bool{"config-reload-time": true}, stringMaps: map[string]map[string]string{}})
495+
require.Error(t, err)
496+
})
497+
}
498+
499+
func TestConfigReloadLoop_ErrorAndStop(t *testing.T) {
500+
// Create temp invalid config file to trigger reload error
501+
dir := t.TempDir()
502+
cfgPath := filepath.Join(dir, "config.yaml")
503+
require.NoError(t, os.WriteFile(cfgPath, []byte("invalid: : yaml"), 0o600))
504+
505+
cfg := config.Config{ReloadTime: 1}
506+
app := &application{config: cfg, configFile: cfgPath, stopConfigReload: make(chan struct{}), mcpServer: altinitymcp.NewClickHouseMCPServer(config.Config{}, "test")}
507+
508+
ctx, cancel := context.WithCancel(context.Background())
509+
defer cancel()
510+
done := make(chan struct{})
511+
go func() { app.configReloadLoop(ctx, &mockCommand{flags: map[string]interface{}{}, setFlags: map[string]bool{}, stringMaps: map[string]map[string]string{}}); close(done) }()
512+
time.Sleep(1500 * time.Millisecond)
513+
close(app.stopConfigReload)
514+
<-done
515+
}
516+
517+
// ClickHouse client Ping/DescribeTable extra coverage
518+
func TestClickHouseClient_PingAndDescribeTable(t *testing.T) {
519+
cfg := setupClickHouseContainerMain(t)
520+
ctx := context.Background()
521+
client, err := clickhouse.NewClient(ctx, *cfg)
522+
require.NoError(t, err)
523+
defer func() { require.NoError(t, client.Close()) }()
524+
525+
require.NoError(t, client.Ping(ctx))
526+
cols, err := client.DescribeTable(ctx, cfg.Database, "test")
527+
require.NoError(t, err)
528+
require.NotEmpty(t, cols)
529+
}
530+
361531
// TestHealthHandler tests the health check endpoint
362532
func TestHealthHandler(t *testing.T) {
363533
t.Run("method_not_allowed", func(t *testing.T) {

helm/altinity-mcp/values.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,10 @@ config:
142142
jwt_secret_key: ""
143143
jwe_secret_key: ""
144144
token_param: "token"
145+
# Dynamic tools generated from ClickHouse views
146+
dynamic_tools: []
147+
# - regexp: "db\\..*"
148+
# prefix: "custom_"
145149

146150
logging:
147151
level: "info"

pkg/clickhouse/client_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,37 @@ func TestClientOperations(t *testing.T) {
148148
})
149149
}
150150

151+
func TestClientErrorPaths(t *testing.T) {
152+
t.Run("ping_failure", func(t *testing.T) {
153+
cfg := &config.ClickHouseConfig{Host: "127.0.0.1", Port: 65000, Database: "default", Username: "default", Protocol: config.TCPProtocol}
154+
ctx := context.Background()
155+
client, err := NewClient(ctx, *cfg)
156+
require.Error(t, err)
157+
require.Nil(t, client)
158+
})
159+
160+
t.Run("describe_table_not_exists", func(t *testing.T) {
161+
cfg := setupClickHouseContainer(t)
162+
ctx := context.Background()
163+
client, err := NewClient(ctx, *cfg)
164+
require.NoError(t, err)
165+
defer func() { _ = client.Close() }()
166+
_, err = client.DescribeTable(ctx, cfg.Database, "not_exists")
167+
require.Error(t, err)
168+
require.Contains(t, err.Error(), "columns not found")
169+
})
170+
171+
t.Run("non_select_error", func(t *testing.T) {
172+
cfg := setupClickHouseContainer(t)
173+
ctx := context.Background()
174+
client, err := NewClient(ctx, *cfg)
175+
require.NoError(t, err)
176+
defer func() { _ = client.Close() }()
177+
_, err = client.ExecuteQuery(ctx, "CREATE TABLE broken ENGINE = Memory")
178+
require.Error(t, err)
179+
})
180+
}
181+
151182
// TestUtilityFunctions tests utility functions
152183
func TestUtilityFunctions(t *testing.T) {
153184
t.Run("isSelectQuery", func(t *testing.T) {

pkg/config/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ type ServerConfig struct {
8080
JWE JWEConfig `json:"jwe" yaml:"jwe"`
8181
OpenAPI OpenAPIConfig `json:"openapi" yaml:"openapi" desc:"OpenAPI endpoints configuration"`
8282
CORSOrigin string `json:"cors_origin" yaml:"cors_origin" flag:"cors-origin" desc:"CORS origin for HTTP/SSE transports (default: *)"`
83+
// DynamicTools defines rules for generating tools from ClickHouse views
84+
DynamicTools []DynamicToolRule `json:"dynamic_tools" yaml:"dynamic_tools"`
8385
}
8486

8587
// OpenAPIConfig defines OpenAPI endpoints configuration
@@ -88,6 +90,12 @@ type OpenAPIConfig struct {
8890
TLS bool `json:"tls" yaml:"tls" desc:"Use TLS (https) for OpenAPI endpoints"`
8991
}
9092

93+
// DynamicToolRule describes a rule to create dynamic tools from views
94+
type DynamicToolRule struct {
95+
Regexp string `json:"regexp" yaml:"regexp"`
96+
Prefix string `json:"prefix" yaml:"prefix"`
97+
}
98+
9199
// LogLevel defines the logging level
92100
type LogLevel string
93101

pkg/config/config_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,39 @@ import (
88
"github.com/stretchr/testify/require"
99
)
1010

11+
func TestLoadConfigWithDynamicTools(t *testing.T) {
12+
yaml := []byte(`
13+
clickhouse:
14+
host: localhost
15+
port: 8123
16+
database: default
17+
username: default
18+
protocol: http
19+
server:
20+
transport: http
21+
address: 0.0.0.0
22+
port: 8080
23+
openapi:
24+
enabled: true
25+
dynamic_tools:
26+
- regexp: "db\\..*"
27+
prefix: "custom_"
28+
logging:
29+
level: info
30+
`)
31+
32+
// Write to temp file
33+
f := t.TempDir() + "/config.yaml"
34+
require.NoError(t, os.WriteFile(f, yaml, 0o600))
35+
36+
cfg, err := LoadConfigFromFile(f)
37+
require.NoError(t, err)
38+
require.NotNil(t, cfg)
39+
require.Len(t, cfg.Server.DynamicTools, 1)
40+
require.Equal(t, "db\\..*", cfg.Server.DynamicTools[0].Regexp)
41+
require.Equal(t, "custom_", cfg.Server.DynamicTools[0].Prefix)
42+
}
43+
1144
// TestLoadConfigFromFile tests configuration loading from files
1245
func TestLoadConfigFromFile(t *testing.T) {
1346
t.Run("yaml_config", func(t *testing.T) {

0 commit comments

Comments
 (0)