Skip to content

Commit df05d3d

Browse files
authored
Merge pull request #19 from ethpandaops/feat/slot-to-time-req-interceptor
feat: translate slot filters to timestamps
2 parents 7cbf34d + fd0073e commit df05d3d

File tree

9 files changed

+1039
-17
lines changed

9 files changed

+1039
-17
lines changed

cmd/server/main.go

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"os"
99
"os/signal"
1010
"syscall"
11+
"time"
1112

1213
"github.com/sirupsen/logrus"
1314

@@ -18,6 +19,7 @@ import (
1819
"github.com/ethpandaops/lab-backend/internal/redis"
1920
"github.com/ethpandaops/lab-backend/internal/server"
2021
"github.com/ethpandaops/lab-backend/internal/version"
22+
"github.com/ethpandaops/lab-backend/internal/wallclock"
2123
)
2224

2325
// infrastructure holds core infrastructure components.
@@ -32,6 +34,7 @@ type services struct {
3234
cartographoorProvider cartographoor.Provider
3335
upstreamBounds *bounds.Service
3436
boundsProvider bounds.Provider
37+
wallclockSvc *wallclock.Service
3538
}
3639

3740
func main() {
@@ -236,6 +239,66 @@ func setupServices(
236239

237240
logger.Info("Bounds service started")
238241

242+
// Initialize wallclock service
243+
svc.wallclockSvc = wallclock.New(logger)
244+
245+
err = svc.wallclockSvc.Start(ctx)
246+
if err != nil {
247+
return nil, fmt.Errorf("failed to start wallclock service: %w", err)
248+
}
249+
250+
// Populate wallclocks from cartographoor networks
251+
networks := svc.cartographoorProvider.GetActiveNetworks(ctx)
252+
for name, network := range networks {
253+
genesisTimeWithDelay := time.Unix(network.GenesisTime+network.GenesisDelay, 0)
254+
255+
if err := svc.wallclockSvc.AddNetwork(wallclock.NetworkConfig{
256+
Name: name,
257+
GenesisTime: genesisTimeWithDelay,
258+
SecondsPerSlot: 12,
259+
}); err != nil {
260+
logger.WithFields(logrus.Fields{
261+
"network": name,
262+
"error": err.Error(),
263+
}).Warn("Failed to add wallclock for network")
264+
}
265+
}
266+
267+
logger.WithField("networks", len(networks)).Info("Wallclock service started")
268+
269+
// Sync wallclocks when cartographoor updates
270+
go func() {
271+
notifyChan := svc.cartographoorProvider.NotifyChannel()
272+
273+
for {
274+
select {
275+
case <-notifyChan:
276+
logger.Debug("Cartographoor updated, syncing wallclocks")
277+
278+
networks := svc.cartographoorProvider.GetActiveNetworks(ctx)
279+
280+
for name, network := range networks {
281+
genesisTime := time.Unix(network.GenesisTime, 0)
282+
283+
if err := svc.wallclockSvc.AddNetwork(wallclock.NetworkConfig{
284+
Name: name,
285+
GenesisTime: genesisTime,
286+
SecondsPerSlot: 12,
287+
}); err != nil {
288+
logger.WithFields(logrus.Fields{
289+
"network": name,
290+
"error": err.Error(),
291+
}).Warn("Failed to update wallclock for network")
292+
}
293+
}
294+
295+
logger.Debug("Wallclocks synced with cartographoor")
296+
case <-ctx.Done():
297+
return
298+
}
299+
}
300+
}()
301+
239302
return svc, nil
240303
}
241304

@@ -246,7 +309,7 @@ func startServer(
246309
infra *infrastructure,
247310
svc *services,
248311
) (*server.Server, error) {
249-
srv, err := server.New(logger, cfg, infra.redisClient, svc.cartographoorProvider, svc.boundsProvider)
312+
srv, err := server.New(logger, cfg, infra.redisClient, svc.cartographoorProvider, svc.boundsProvider, svc.wallclockSvc)
250313
if err != nil {
251314
return nil, fmt.Errorf("failed to create server: %w", err)
252315
}
@@ -300,6 +363,13 @@ func shutdownGracefully(
300363
}
301364
}
302365

366+
// Stop wallclock service
367+
if svc.wallclockSvc != nil {
368+
if err := svc.wallclockSvc.Stop(); err != nil {
369+
logger.WithError(err).Error("Error stopping wallclock service")
370+
}
371+
}
372+
303373
// Stop leader election (releases lock)
304374
if err := infra.elector.Stop(); err != nil {
305375
logger.WithError(err).Error("Error stopping leader election")

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ module github.com/ethpandaops/lab-backend
33
go 1.25.1
44

55
require (
6+
github.com/alicebob/miniredis/v2 v2.35.0
7+
github.com/ethpandaops/ethwallclock v0.4.0
68
github.com/google/uuid v1.6.0
79
github.com/prometheus/client_golang v1.23.2
810
github.com/redis/go-redis/v9 v9.14.1
@@ -13,7 +15,6 @@ require (
1315
)
1416

1517
require (
16-
github.com/alicebob/miniredis/v2 v2.35.0 // indirect
1718
github.com/beorn7/perks v1.0.1 // indirect
1819
github.com/cespare/xxhash/v2 v2.3.0 // indirect
1920
github.com/davecgh/go-spew v1.1.1 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
1414
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
1515
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
1616
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
17+
github.com/ethpandaops/ethwallclock v0.4.0 h1:+sgnhf4pk6hLPukP076VxkiLloE4L0Yk1yat+ZyHh1g=
18+
github.com/ethpandaops/ethwallclock v0.4.0/go.mod h1:y0Cu+mhGLlem19vnAV2x0hpFS5KZ7oOi2SWYayv9l24=
1719
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
1820
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
1921
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=

internal/proxy/proxy.go

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,18 @@ import (
1414

1515
"github.com/ethpandaops/lab-backend/internal/cartographoor"
1616
"github.com/ethpandaops/lab-backend/internal/config"
17+
"github.com/ethpandaops/lab-backend/internal/wallclock"
1718
)
1819

1920
// Proxy manages network-based reverse proxying.
2021
type Proxy struct {
21-
config *config.Config
22-
proxies map[string]*httputil.ReverseProxy
23-
proxyURLs map[string]string
24-
logger logrus.FieldLogger
25-
mu sync.RWMutex
26-
provider cartographoor.Provider
22+
config *config.Config
23+
proxies map[string]*httputil.ReverseProxy
24+
proxyURLs map[string]string
25+
logger logrus.FieldLogger
26+
mu sync.RWMutex
27+
provider cartographoor.Provider
28+
wallclockSvc *wallclock.Service
2729

2830
// Periodic sync lifecycle
2931
syncTicker *time.Ticker
@@ -36,14 +38,16 @@ func New(
3638
logger logrus.FieldLogger,
3739
cfg *config.Config,
3840
provider cartographoor.Provider,
41+
wallclockSvc *wallclock.Service,
3942
) (*Proxy, error) {
4043
p := &Proxy{
41-
config: cfg,
42-
proxies: make(map[string]*httputil.ReverseProxy),
43-
proxyURLs: make(map[string]string),
44-
logger: logger.WithField("component", "proxy"),
45-
provider: provider,
46-
stopChan: make(chan struct{}),
44+
config: cfg,
45+
proxies: make(map[string]*httputil.ReverseProxy),
46+
proxyURLs: make(map[string]string),
47+
logger: logger.WithField("component", "proxy"),
48+
provider: provider,
49+
wallclockSvc: wallclockSvc,
50+
stopChan: make(chan struct{}),
4751
}
4852

4953
// Initial sync: build merged network list and create proxies
@@ -153,8 +157,24 @@ func (p *Proxy) createReverseProxy(
153157

154158
r.Out.URL.Path = rewrittenPath
155159

156-
// Preserve query parameters (already handled by SetURL)
157-
r.Out.URL.RawQuery = r.In.URL.RawQuery
160+
// Transform query parameters (slot_* to slot_start_time_*)
161+
originalQuery := r.In.URL.RawQuery
162+
transformedQuery := transformQueryParams(
163+
p.logger,
164+
networkName,
165+
p.wallclockSvc,
166+
originalQuery,
167+
)
168+
r.Out.URL.RawQuery = transformedQuery
169+
170+
// Log transformation if query changed
171+
if originalQuery != transformedQuery {
172+
p.logger.WithFields(logrus.Fields{
173+
"network": networkName,
174+
"original": originalQuery,
175+
"transformed": transformedQuery,
176+
}).Debug("Transformed slot filters to slot_start_date_time")
177+
}
158178
},
159179
Transport: transport,
160180
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {

internal/proxy/transform.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package proxy
2+
3+
import (
4+
"net/url"
5+
"strconv"
6+
"strings"
7+
8+
"github.com/ethpandaops/lab-backend/internal/wallclock"
9+
"github.com/sirupsen/logrus"
10+
)
11+
12+
// transformQueryParams transforms slot_* filters to slot_start_date_time_* filters.
13+
// Returns the original query string if transformation fails (fail-open).
14+
func transformQueryParams(
15+
logger logrus.FieldLogger,
16+
networkName string,
17+
wallclockSvc *wallclock.Service,
18+
originalQuery string,
19+
) string {
20+
// If no wallclock service or empty query, return original
21+
if wallclockSvc == nil || originalQuery == "" {
22+
return originalQuery
23+
}
24+
25+
// Parse query string
26+
values, err := url.ParseQuery(originalQuery)
27+
if err != nil {
28+
logger.WithFields(logrus.Fields{
29+
"network": networkName,
30+
"query": originalQuery,
31+
"error": err.Error(),
32+
}).Warn("Failed to parse query string, using original")
33+
34+
return originalQuery
35+
}
36+
37+
// Track if any transformations were made
38+
transformed := false
39+
transformedValues := make(url.Values)
40+
41+
// Iterate over each parameter
42+
for key, valuesSlice := range values {
43+
// Check if this is a slot filter
44+
isSlot, operator, slotValue := detectSlotFilter(key, valuesSlice)
45+
46+
if !isSlot {
47+
// Not a slot filter, copy as-is
48+
transformedValues[key] = valuesSlice
49+
50+
continue
51+
}
52+
53+
// Calculate slot_start_date_time
54+
slotStartTime := wallclockSvc.CalculateSlotStartTime(networkName, slotValue)
55+
56+
if slotStartTime == 0 {
57+
// Wallclock unavailable or calculation failed, keep original slot filter
58+
logger.WithFields(logrus.Fields{
59+
"network": networkName,
60+
"slot": slotValue,
61+
}).Warn("Failed to calculate slot start time, using original slot filter")
62+
63+
transformedValues[key] = valuesSlice
64+
65+
continue
66+
}
67+
68+
// Replace slot_* with slot_start_date_time_*
69+
newKey := "slot_start_date_time_" + operator
70+
transformedValues[newKey] = []string{strconv.FormatUint(uint64(slotStartTime), 10)}
71+
transformed = true
72+
73+
logger.WithFields(logrus.Fields{
74+
"network": networkName,
75+
"slot": slotValue,
76+
"slot_start_date_time": slotStartTime,
77+
"operator": operator,
78+
}).Debug("Transformed slot filter to slot_start_date_time")
79+
}
80+
81+
// If no transformations were made, return original
82+
if !transformed {
83+
return originalQuery
84+
}
85+
86+
// Return transformed query string
87+
return transformedValues.Encode()
88+
}
89+
90+
// detectSlotFilter checks if a query parameter is a slot filter.
91+
// Returns: isSlotFilter, operator (e.g., "eq", "gte"), value.
92+
func detectSlotFilter(key string, values []string) (bool, string, uint64) {
93+
// Check if key starts with "slot_"
94+
if !strings.HasPrefix(key, "slot_") {
95+
return false, "", 0
96+
}
97+
98+
// If no values, not a valid filter
99+
if len(values) == 0 {
100+
return false, "", 0
101+
}
102+
103+
// Extract operator from key
104+
operator := strings.TrimPrefix(key, "slot_")
105+
106+
// Validate operator
107+
switch operator {
108+
case "eq", "gte", "lte", "gt", "lt":
109+
// Valid operator
110+
default:
111+
// Unknown operator
112+
return false, "", 0
113+
}
114+
115+
// Parse slot value
116+
slotValue, err := strconv.ParseUint(values[0], 10, 64)
117+
if err != nil {
118+
// Invalid slot value
119+
return false, "", 0
120+
}
121+
122+
return true, operator, slotValue
123+
}

0 commit comments

Comments
 (0)