Skip to content

Commit 0ad0cec

Browse files
joelanfordanik120
authored andcommitted
add singleflight for shared index access for concurrent requests
Signed-off-by: Joe Lanford <[email protected]>
1 parent 04a1984 commit 0ad0cec

File tree

2 files changed

+62
-18
lines changed

2 files changed

+62
-18
lines changed

catalogd/internal/serverutil/serverutil.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package serverutil
22

33
import (
4+
"context"
45
"crypto/tls"
56
"fmt"
67
"io"
@@ -12,7 +13,7 @@ import (
1213
"github.com/gorilla/handlers"
1314
ctrl "sigs.k8s.io/controller-runtime"
1415
"sigs.k8s.io/controller-runtime/pkg/certwatcher"
15-
"sigs.k8s.io/controller-runtime/pkg/manager"
16+
"sigs.k8s.io/controller-runtime/pkg/log"
1617

1718
catalogdmetrics "github.com/operator-framework/operator-controller/catalogd/internal/metrics"
1819
"github.com/operator-framework/operator-controller/catalogd/internal/storage"
@@ -51,8 +52,11 @@ func AddCatalogServerToManager(mgr ctrl.Manager, cfg CatalogServerConfig, tlsFil
5152
Name: "catalogs",
5253
OnlyServeWhenLeader: true,
5354
Server: &http.Server{
54-
Addr: cfg.CatalogAddr,
55-
Handler: handler,
55+
Addr: cfg.CatalogAddr,
56+
Handler: catalogdmetrics.AddMetricsToHandler(cfg.LocalStorage.StorageServerHandler()),
57+
BaseContext: func(_ net.Listener) context.Context {
58+
return log.IntoContext(context.Background(), mgr.GetLogger().WithName("http.catalogs"))
59+
},
5660
ReadTimeout: 5 * time.Second,
5761
// TODO: Revert this to 10 seconds if/when the API
5862
// evolves to have significantly smaller responses

catalogd/internal/storage/localdir.go

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@ import (
1313
"path/filepath"
1414
"strings"
1515
"sync"
16+
"time"
1617

18+
"github.com/go-logr/logr"
1719
"github.com/klauspost/compress/gzhttp"
1820
"golang.org/x/sync/errgroup"
21+
"golang.org/x/sync/singleflight"
1922

2023
"github.com/operator-framework/operator-registry/alpha/declcfg"
2124

@@ -31,7 +34,8 @@ type LocalDirV1 struct {
3134
RootDir string
3235
RootURL *url.URL
3336

34-
m sync.RWMutex
37+
m sync.RWMutex
38+
sf singleflight.Group
3539
}
3640

3741
func (s *LocalDirV1) Store(ctx context.Context, catalog string, fsys fs.FS) error {
@@ -175,7 +179,8 @@ func (s *LocalDirV1) v1AllHandler() http.Handler {
175179
w.Header().Add("Content-Type", "application/jsonl")
176180
gzHandler.ServeHTTP(w, r)
177181
})
178-
return typeHandler
182+
183+
return newLoggingMiddleware(typeHandler)
179184
}
180185

181186
func (s *LocalDirV1) v1QueryHandler() http.Handler {
@@ -215,22 +220,11 @@ func (s *LocalDirV1) v1QueryHandler() http.Handler {
215220
}
216221
defer catalogFile.Close()
217222

218-
indexFile, err := os.Open(filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog)))
223+
idx, err := s.getIndex(catalog)
219224
if err != nil {
220-
if errors.Is(err, fs.ErrNotExist) {
221-
http.Error(w, "No catalog contents found matching query", http.StatusNotFound)
222-
}
223-
http.Error(w, err.Error(), http.StatusInternalServerError)
224-
return
225-
}
226-
defer indexFile.Close()
227-
228-
var idx index
229-
if err := json.NewDecoder(indexFile).Decode(&idx); err != nil {
230225
http.Error(w, err.Error(), http.StatusInternalServerError)
231226
return
232227
}
233-
234228
queryReader, ok := idx.Get(catalogFile, schema, pkg, name)
235229
if !ok {
236230
http.Error(w, fmt.Sprintf("No index found for schema=%q, package=%q, name=%q", schema, pkg, name), http.StatusInternalServerError)
@@ -241,7 +235,7 @@ func (s *LocalDirV1) v1QueryHandler() http.Handler {
241235
_, _ = io.Copy(w, queryReader)
242236
})
243237
gzHandler := gzhttp.GzipHandler(catalogHandler)
244-
return gzHandler
238+
return newLoggingMiddleware(gzHandler)
245239
}
246240

247241
func (s *LocalDirV1) ContentExists(catalog string) bool {
@@ -268,3 +262,49 @@ func (s *LocalDirV1) ContentExists(catalog string) bool {
268262
}
269263
return true
270264
}
265+
266+
func (s *LocalDirV1) getIndex(catalog string) (*index, error) {
267+
idx, err, _ := s.sf.Do(catalog, func() (interface{}, error) {
268+
indexFilePath := filepath.Join(s.RootDir, fmt.Sprintf("%s.index.json", catalog))
269+
fmt.Printf("opening index file %s\n", indexFilePath)
270+
indexFile, err := os.Open(indexFilePath)
271+
if err != nil {
272+
return nil, err
273+
}
274+
defer indexFile.Close()
275+
var idx index
276+
if err := json.NewDecoder(indexFile).Decode(&idx); err != nil {
277+
return nil, err
278+
}
279+
return &idx, nil
280+
})
281+
return idx.(*index), err
282+
}
283+
284+
func newLoggingMiddleware(next http.Handler) http.Handler {
285+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
286+
logger := logr.FromContextOrDiscard(r.Context())
287+
288+
start := time.Now()
289+
lrw := &loggingResponseWriter{ResponseWriter: w, statusCode: http.StatusOK}
290+
next.ServeHTTP(lrw, r)
291+
292+
logger.WithValues(
293+
"method", r.Method,
294+
"url", r.URL.String(),
295+
"status", lrw.statusCode,
296+
"duration", time.Since(start),
297+
"remoteAddr", r.RemoteAddr,
298+
).Info("HTTP request processed")
299+
})
300+
}
301+
302+
type loggingResponseWriter struct {
303+
http.ResponseWriter
304+
statusCode int
305+
}
306+
307+
func (w *loggingResponseWriter) WriteHeader(code int) {
308+
w.statusCode = code
309+
w.ResponseWriter.WriteHeader(code)
310+
}

0 commit comments

Comments
 (0)