Skip to content

Commit 068fd48

Browse files
anik120joelanford
andauthored
⚠ (feat) Introduce new feature-gated query endpoint (#1643)
* add a new feature-gated api/v1/query endpoint Signed-off-by: Joe Lanford <[email protected]> * add singleflight for shared index access for concurrent requests Signed-off-by: Joe Lanford <[email protected]> * a few improvements and optimizations Signed-off-by: Joe Lanford <[email protected]> * another round of refactoring improvement Signed-off-by: Joe Lanford <[email protected]> * Tests * use io.MultiReader instead of manual implementation * include checkPrecoditions check * add comments * code cleanup * cleanup index.Get's signature * only allow GET/HEAD methods * refractor serverJSONLines * replace static test variable * fix unit test * refractor getIndex() (and other lint issues) --------- Signed-off-by: Joe Lanford <[email protected]> Co-authored-by: Joe Lanford <[email protected]>
1 parent 1a52e2e commit 068fd48

File tree

10 files changed

+1267
-448
lines changed

10 files changed

+1267
-448
lines changed

catalogd/cmd/catalogd/main.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,9 +303,13 @@ func main() {
303303
os.Exit(1)
304304
}
305305

306-
localStorage = storage.LocalDirV1{RootDir: storeDir, RootURL: baseStorageURL}
306+
localStorage = &storage.LocalDirV1{
307+
RootDir: storeDir,
308+
RootURL: baseStorageURL,
309+
EnableQueryHandler: features.CatalogdFeatureGate.Enabled(features.APIV1QueryHandler),
310+
}
307311

308-
// Config for the the catalogd web server
312+
// Config for the catalogd web server
309313
catalogServerConfig := serverutil.CatalogServerConfig{
310314
ExternalAddr: externalAddr,
311315
CatalogAddr: catalogServerAddr,

catalogd/internal/features/features.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,13 @@ import (
55
"k8s.io/component-base/featuregate"
66
)
77

8-
var catalogdFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{}
8+
const (
9+
APIV1QueryHandler = featuregate.Feature("APIV1QueryHandler")
10+
)
11+
12+
var catalogdFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
13+
APIV1QueryHandler: {Default: false, PreRelease: featuregate.Alpha},
14+
}
915

1016
var CatalogdFeatureGate featuregate.MutableFeatureGate = featuregate.NewFeatureGate()
1117

catalogd/internal/serverutil/serverutil.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/go-logr/logr"
1212
"github.com/gorilla/handlers"
13+
"github.com/klauspost/compress/gzhttp"
1314
ctrl "sigs.k8s.io/controller-runtime"
1415
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
1516
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -42,17 +43,12 @@ func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFil
4243
}
4344

4445
shutdownTimeout := 30 * time.Second
45-
46-
l := mgr.GetLogger().WithName("catalogd-http-server")
47-
handler := catalogdmetrics.AddMetricsToHandler(cfg.LocalStorage.StorageServerHandler())
48-
handler = logrLoggingHandler(l, handler)
49-
5046
catalogServer := manager.Server{
5147
Name: "catalogs",
5248
OnlyServeWhenLeader: true,
5349
Server: &http.Server{
5450
Addr: cfg.CatalogAddr,
55-
Handler: handler,
51+
Handler: storageServerHandlerWrapped(mgr.GetLogger().WithName("catalogd-http-server"), cfg),
5652
ReadTimeout: 5 * time.Second,
5753
// TODO: Revert this to 10 seconds if/when the API
5854
// evolves to have significantly smaller responses
@@ -97,3 +93,12 @@ func logrLoggingHandler(l logr.Logger, handler http.Handler) http.Handler {
9793
l.Info("handled request", "host", host, "username", username, "method", params.Request.Method, "uri", uri, "protocol", params.Request.Proto, "status", params.StatusCode, "size", params.Size)
9894
})
9995
}
96+
97+
func storageServerHandlerWrapped(l logr.Logger, cfg CatalogServerConfig) http.Handler {
98+
handler := cfg.LocalStorage.StorageServerHandler()
99+
handler = gzhttp.GzipHandler(handler)
100+
handler = catalogdmetrics.AddMetricsToHandler(handler)
101+
102+
handler = logrLoggingHandler(l, handler)
103+
return handler
104+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package serverutil
2+
3+
import (
4+
"compress/gzip"
5+
"context"
6+
"io"
7+
"io/fs"
8+
"net/http"
9+
"net/http/httptest"
10+
"strings"
11+
"testing"
12+
13+
"github.com/go-logr/logr"
14+
"github.com/stretchr/testify/require"
15+
)
16+
17+
func TestStorageServerHandlerWrapped_Gzip(t *testing.T) {
18+
var generatedJSON = func(size int) string {
19+
return "{\"data\":\"" + strings.Repeat("test data ", size) + "\"}"
20+
}
21+
tests := []struct {
22+
name string
23+
acceptEncoding string
24+
responseContent string
25+
expectCompressed bool
26+
expectedStatus int
27+
}{
28+
{
29+
name: "compresses large response when client accepts gzip",
30+
acceptEncoding: "gzip",
31+
responseContent: generatedJSON(1000),
32+
expectCompressed: true,
33+
expectedStatus: http.StatusOK,
34+
},
35+
{
36+
name: "does not compress small response even when client accepts gzip",
37+
acceptEncoding: "gzip",
38+
responseContent: `{"foo":"bar"}`,
39+
expectCompressed: false,
40+
expectedStatus: http.StatusOK,
41+
},
42+
{
43+
name: "does not compress when client doesn't accept gzip",
44+
acceptEncoding: "",
45+
responseContent: generatedJSON(1000),
46+
expectCompressed: false,
47+
expectedStatus: http.StatusOK,
48+
},
49+
}
50+
51+
for _, tt := range tests {
52+
t.Run(tt.name, func(t *testing.T) {
53+
// Create a mock storage instance that returns our test content
54+
mockStorage := &mockStorageInstance{
55+
content: tt.responseContent,
56+
}
57+
58+
cfg := CatalogServerConfig{
59+
LocalStorage: mockStorage,
60+
}
61+
handler := storageServerHandlerWrapped(logr.Logger{}, cfg)
62+
63+
// Create test request
64+
req := httptest.NewRequest("GET", "/test", nil)
65+
if tt.acceptEncoding != "" {
66+
req.Header.Set("Accept-Encoding", tt.acceptEncoding)
67+
}
68+
69+
// Create response recorder
70+
rec := httptest.NewRecorder()
71+
72+
// Handle the request
73+
handler.ServeHTTP(rec, req)
74+
75+
// Check status code
76+
require.Equal(t, tt.expectedStatus, rec.Code)
77+
78+
// Check if response was compressed
79+
wasCompressed := rec.Header().Get("Content-Encoding") == "gzip"
80+
require.Equal(t, tt.expectCompressed, wasCompressed)
81+
82+
// Get the response body
83+
var responseBody []byte
84+
if wasCompressed {
85+
// Decompress the response
86+
gzipReader, err := gzip.NewReader(rec.Body)
87+
require.NoError(t, err)
88+
responseBody, err = io.ReadAll(gzipReader)
89+
require.NoError(t, err)
90+
require.NoError(t, gzipReader.Close())
91+
} else {
92+
responseBody = rec.Body.Bytes()
93+
}
94+
95+
// Verify the response content
96+
require.Equal(t, tt.responseContent, string(responseBody))
97+
})
98+
}
99+
}
100+
101+
// mockStorageInstance implements storage.Instance interface for testing
102+
type mockStorageInstance struct {
103+
content string
104+
}
105+
106+
func (m *mockStorageInstance) StorageServerHandler() http.Handler {
107+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
108+
_, err := w.Write([]byte(m.content))
109+
if err != nil {
110+
http.Error(w, err.Error(), http.StatusInternalServerError)
111+
}
112+
})
113+
}
114+
115+
func (m *mockStorageInstance) Store(ctx context.Context, catalogName string, fs fs.FS) error {
116+
return nil
117+
}
118+
119+
func (m *mockStorageInstance) Delete(catalogName string) error {
120+
return nil
121+
}
122+
123+
func (m *mockStorageInstance) ContentExists(catalog string) bool {
124+
return true
125+
}
126+
func (m *mockStorageInstance) BaseURL(catalog string) string {
127+
return ""
128+
}

0 commit comments

Comments
 (0)