Commit 99a3a88

fix(websocket): improve write and read deadlines (#1693)
1 parent d356689 commit 99a3a88

25 files changed: +217, -418 lines

.github/workflows/router-ci.yaml

Lines changed: 6 additions & 1 deletion
@@ -136,8 +136,13 @@ jobs:
     runs-on: ubuntu-latest
     timeout-minutes: 30
     services:
+      nats:
+        image: ghcr.io/wundergraph/cosmo/nats:2.11.0
+        ports:
+          - 4222:4222
+          - 8222:8222
+          - 6222:6222
       redis:
-        # Docker Hub image
         image: redis:7
         # Set health checks to wait until redis has started
         options: >-
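The new `nats` service container is published to the runner on the host ports listed above. As an illustration only (not part of this commit), a workflow step could wait for the broker before the test job starts; the retry loop below is a hypothetical sketch:

```bash
# Hypothetical readiness check: block until the NATS service container
# accepts TCP connections on the mapped client port 4222.
for i in $(seq 1 30); do
  nc -z localhost 4222 && break
  sleep 1
done
```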

docker-compose.yml

Lines changed: 4 additions & 3 deletions
@@ -202,13 +202,14 @@ services:
       - dev
 
   nats:
-    image: nats:${DC_NATS_VERSION:-2.10.6}
+    image: ghcr.io/wundergraph/cosmo/nats:${DC_NATS_VERSION:-2.11.0}
+    build:
+      context: .
+      dockerfile: docker/nats/Dockerfile
     ports:
       - '4222:4222'
       - '8222:8222'
       - '6222:6222'
-    command:
-      - '--js'
     networks:
       - primary
     profiles:

docker/nats/Dockerfile

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
+FROM nats:2.11.0
+
+# Copy custom config
+COPY nats-server.conf /etc/nats/nats-server.conf
+
+# Expose the default ports
+EXPOSE 4222 8222 6222
+
+# Run NATS with JetStream using custom config
+CMD ["-c", "/etc/nats/nats-server.conf"]

docker/nats/README.md

Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
+# Nats
+
+This image is only used for development and testing purposes. It is not intended for production use.
+
+__Reason:__ We haven't found a Docker image that runs NATS with JetStream enabled, which we need for GitHub Actions. We tried [Bitnami's image](https://hub.docker.com/r/bitnami/nats) and the official NATS image, but neither worked.
+
+## Build & Release
+
+Run the following command to build the image for `linux/amd64` and `linux/arm64` and push it to the registry:
+
+```bash
+./build-push.sh
+```

docker/nats/build-push.sh

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+# Exit on any error
+set -e
+
+# --- Config ---
+REPOSITORY="wundergraph/cosmo"
+IMAGE_NAME="nats"
+IMAGE_TAG="2.11.0"
+GHCR_IMAGE="ghcr.io/${REPOSITORY}/${IMAGE_NAME}:${IMAGE_TAG}"
+
+# Create and use a buildx builder (if not exists)
+docker buildx create --name multi-builder --use --bootstrap || true
+
+# Build the Docker image
+# Build and push multi-arch image
+echo "🔨 Building multi-arch image for linux/amd64 and linux/arm64..."
+docker buildx build \
+  --platform linux/amd64,linux/arm64 \
+  -t "$GHCR_IMAGE" \
+  --push .
+
+echo "✅ Done! Multi-arch image pushed to $GHCR_IMAGE"

docker/nats/nats-server.conf

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+jetstream {
+    store_dir: /data/jetstream
+    max_mem_store: 1Gb
+    max_file_store: 10Gb
+}
+
+server_name: "mynats"
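The block enables JetStream with a file store under `/data/jetstream`, capped at 1 GB of memory and 10 GB on disk. As a purely illustrative check (the container name and grep pattern are assumptions, not part of the commit), the server's startup log shows whether JetStream actually came up with this config:

```bash
# Run the image briefly and look for the JetStream startup lines in the log.
docker run -d --name nats-js-check ghcr.io/wundergraph/cosmo/nats:2.11.0
sleep 2
docker logs nats-js-check 2>&1 | grep -i jetstream
docker rm -f nats-js-check
```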

router-tests/Makefile

Lines changed: 3 additions & 0 deletions
@@ -11,6 +11,9 @@ test-fresh: clean-testcache test
 test-no-race:
 	go test ./...
 
+test-no-race-fresh: clean-testcache
+	go test ./...
+
 lint:
 	go vet ./...
 	staticcheck ./...
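The new target mirrors `test-fresh` but skips the race detector: the `clean-testcache` prerequisite (already defined in this Makefile, per the hunk header) busts Go's test cache before `go test ./...` runs. Usage from the `router-tests` directory:

```bash
# Run the full suite without -race, forcing all tests to re-execute.
cd router-tests
make test-no-race-fresh
```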

router-tests/cache_warmup_test.go

Lines changed: 25 additions & 9 deletions
@@ -6,13 +6,13 @@ import (
 	"testing"
 	"time"
 
+	nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/sdk/metric"
-	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
 
-	nodev1 "github.com/wundergraph/cosmo/router/gen/proto/wg/cosmo/node/v1"
 	"github.com/wundergraph/cosmo/router/pkg/otel"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
@@ -790,10 +790,7 @@ func TestCacheWarmup(t *testing.T) {
 			})
 		})
 	})
-}
 
-// Is set as Flaky so that when running the tests it will be run separately and retried if it fails
-func TestFlakyCacheWarmupMetrics(t *testing.T) {
 	t.Run("should emit planning times metrics during warmup", func(t *testing.T) {
 		t.Parallel()
 
@@ -896,17 +893,36 @@ func TestFlakyCacheWarmupMetrics(t *testing.T) {
 			// One when warming up the operation and one when executing the operation
 			require.Len(t, m.Data.(metricdata.Histogram[float64]).DataPoints, 2)
 
+			// Find data-points by matching their cache hit attribute
+			warmupDataPoint := findDataPoint(t, m.Data.(metricdata.Histogram[float64]).DataPoints, false)
+			executionDataPoint := findDataPoint(t, m.Data.(metricdata.Histogram[float64]).DataPoints, true)
+
 			// Warming up the operation
-			require.Equal(t, m.Data.(metricdata.Histogram[float64]).DataPoints[0].Count, uint64(1))
+			require.Equal(t, uint64(1), warmupDataPoint.Count)
 
 			// Executing the operation. Is exported as an aggregated value
-			require.Equal(t, m.Data.(metricdata.Histogram[float64]).DataPoints[1].Count, uint64(2))
+			require.Equal(t, uint64(2), executionDataPoint.Count)
 
 			// Ensure we collected non-zero planning times
-			require.Greater(t, m.Data.(metricdata.Histogram[float64]).DataPoints[0].Sum, float64(0))
-			require.Greater(t, m.Data.(metricdata.Histogram[float64]).DataPoints[1].Sum, float64(0))
+			require.Greater(t, warmupDataPoint.Sum, float64(0))
+			require.Greater(t, executionDataPoint.Sum, float64(0))
 
 			metricdatatest.AssertEqual(t, operationPlanningTimeMetric, m, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
 		})
 	})
 }
+
+// findDataPoint finds a data point in a slice of histogram data points by matching
+// the value of WgEnginePlanCacheHit attribute
+func findDataPoint(t *testing.T, dataPoints []metricdata.HistogramDataPoint[float64], cacheHit bool) metricdata.HistogramDataPoint[float64] {
+	t.Helper()
+	for _, dp := range dataPoints {
+		// Get the value of the WgEnginePlanCacheHit attribute
+		val, found := dp.Attributes.Value(otel.WgEnginePlanCacheHit)
+		if found && val.AsBool() == cacheHit {
+			return dp
+		}
+	}
+	t.Fatalf("Could not find data point with WgEnginePlanCacheHit=%v", cacheHit)
+	return metricdata.HistogramDataPoint[float64]{}
+}

router-tests/events/kafka_events_test.go

Lines changed: 0 additions & 14 deletions
@@ -24,20 +24,6 @@ import (
 
 const KafkaWaitTimeout = time.Second * 30
 
-func TestLocalKafka(t *testing.T) {
-	t.Skip("skip only for local testing")
-
-	t.Run("subscribe async", func(t *testing.T) {
-		testenv.Run(t, &testenv.Config{
-			RouterConfigJSONTemplate: testenv.ConfigWithEdfsJSONTemplate,
-			EnableKafka:              true,
-		}, func(t *testing.T, xEnv *testenv.Environment) {
-			// ensureTopicExists(t, xEnv, "employeeUpdated", "employeeUpdatedTwo")
-			produceKafkaMessage(t, xEnv, "employeeUpdatedTwo", `{"__typename":"Employee","id": 2,"update":{"name":"foo"}}`)
-		})
-	})
-}
-
 func TestKafkaEvents(t *testing.T) {
 	t.Parallel()
 	// All tests are running in sequence because they are using the same kafka topic

router-tests/events/nats_events_test.go

Lines changed: 3 additions & 5 deletions
@@ -386,7 +386,7 @@ func TestNatsEvents(t *testing.T) {
 		})
 	})
 
-	t.Run("subscribe with multipart responses http", func(t *testing.T) {
+	t.Run("subscribe with multipart responses http and consume healthcheck only", func(t *testing.T) {
 		t.Parallel()
 
 		testenv.Run(t, &testenv.Config{
@@ -402,12 +402,11 @@
 
 			var counter atomic.Uint32
 
-			var client *http.Client
+			client := &http.Client{}
+
 			go func() {
 				defer counter.Add(1)
 
-				client = &http.Client{}
-
 				req := xEnv.MakeGraphQLMultipartRequest(http.MethodPost, bytes.NewReader(subscribePayload))
 				resp, err := client.Do(req)
 				require.NoError(t, err)
@@ -432,7 +431,6 @@
 	})
 
 	t.Run("subscribe with closing channel", func(t *testing.T) {
-		t.Parallel()
 
 		testenv.Run(t, &testenv.Config{
 			RouterConfigJSONTemplate: testenv.ConfigWithEdfsNatsJSONTemplate,
