Skip to content

Commit 4cbf392

Browse files
authored
Merge pull request #618 from pitabwire/fix/critical-robustness-improvements
fix: critical robustness improvements and zombie trace suppression
2 parents f45f0eb + 7c9ee55 commit 4cbf392

22 files changed

+304
-193
lines changed

cache/jetstreamkv/jetstream.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func New(opts ...cache.Option) (cache.RawCache, error) {
4242

4343
js, err := natsConn.JetStream()
4444
if err != nil {
45+
natsConn.Close()
4546
return nil, err
4647
}
4748
// Create the client
@@ -58,15 +59,18 @@ func New(opts ...cache.Option) (cache.RawCache, error) {
5859
// If the bucket already exists, just get a handle to it.
5960
client, err = js.KeyValue(cacheOpts.Name)
6061
if err != nil {
62+
natsConn.Close()
6163
return nil, err
6264
}
6365
} else {
6466
// Another error occurred during creation.
67+
natsConn.Close()
6568
return nil, err
6669
}
6770
}
6871

6972
if _, err = client.Status(); err != nil {
73+
natsConn.Close()
7074
return nil, err
7175
}
7276

cache/redis/redis.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func New(opts ...cache.Option) (cache.RawCache, error) {
4141

4242
err = client.Ping(ctx).Err()
4343
if err != nil {
44+
_ = client.Close()
4445
return nil, err
4546
}
4647

events/events_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package events_test
33
import (
44
"context"
55
"fmt"
6+
"sync/atomic"
67
"testing"
78
"time"
89

@@ -23,7 +24,7 @@ type EventsTestSuite struct {
2324

2425
type MessageToTest struct {
2526
Service *frame.Service
26-
Count int
27+
Count atomic.Int64
2728
}
2829

2930
func (event *MessageToTest) Name() string {
@@ -46,7 +47,7 @@ func (event *MessageToTest) Execute(ctx context.Context, payload any) error {
4647
message := *m
4748
logger := event.Service.Log(ctx).WithField("payload", message).WithField("type", event.Name())
4849
logger.Info("handling event")
49-
event.Count++
50+
event.Count.Add(1)
5051
return nil
5152
}
5253

@@ -200,7 +201,8 @@ func (s *EventsTestSuite) TestServiceEventsPublishingWorks() {
200201
frametests.WithNoopDriver(),
201202
)
202203

203-
testEvent := MessageToTest{Service: svc, Count: tc.initialCount}
204+
testEvent := MessageToTest{Service: svc}
205+
testEvent.Count.Store(int64(tc.initialCount))
204206
events := frame.WithRegisterEvents(&testEvent)
205207

206208
svc.Init(ctx, events)
@@ -215,8 +217,8 @@ func (s *EventsTestSuite) TestServiceEventsPublishingWorks() {
215217
time.Sleep(2 * time.Second)
216218
require.Equal(
217219
t,
218-
tc.expectedCount,
219-
testEvent.Count,
220+
int64(tc.expectedCount),
221+
testEvent.Count.Load(),
220222
"event should be processed and count incremented",
221223
)
222224
}

events/manager.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package events
33
import (
44
"context"
55
"errors"
6+
"sync"
67

78
"github.com/pitabwire/util"
89

@@ -14,14 +15,19 @@ type manager struct {
1415
qm queue.Manager
1516
cfg config.ConfigurationEvents
1617

18+
mu sync.RWMutex
1719
eventRegistry map[string]EventI
1820
}
1921

2022
func (m *manager) Add(evt EventI) {
23+
m.mu.Lock()
24+
defer m.mu.Unlock()
2125
m.eventRegistry[evt.Name()] = evt
2226
}
2327

2428
func (m *manager) Get(eventName string) (EventI, error) {
29+
m.mu.RLock()
30+
defer m.mu.RUnlock()
2531
evt, ok := m.eventRegistry[eventName]
2632
if !ok {
2733
return nil, errors.New("event not found in registry: " + eventName)

frametests/driver.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"net"
66
"net/http"
77
"net/http/httptest"
8+
"sync"
89

910
"github.com/pitabwire/util"
1011

@@ -28,25 +29,34 @@ func GetFreePort(ctx context.Context) (int, error) {
2829
}
2930

3031
type testDriver struct {
32+
mu sync.RWMutex
3133
srv *httptest.Server
3234
}
3335

3436
func (t *testDriver) ListenAndServe(_ string, h http.Handler) error {
37+
t.mu.Lock()
38+
defer t.mu.Unlock()
3539
t.srv = httptest.NewServer(h)
3640

3741
return nil
3842
}
3943
func (t *testDriver) ListenAndServeTLS(_, _, _ string, h http.Handler) error {
44+
t.mu.Lock()
45+
defer t.mu.Unlock()
4046
t.srv = httptest.NewTLSServer(h)
4147
return nil
4248
}
4349

4450
func (t *testDriver) Shutdown(_ context.Context) error {
51+
t.mu.RLock()
52+
defer t.mu.RUnlock()
4553
t.srv.Close()
4654
return nil
4755
}
4856

4957
func (t *testDriver) GetTestServer() *httptest.Server {
58+
t.mu.RLock()
59+
defer t.mu.RUnlock()
5060
return t.srv
5161
}
5262

go.mod

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ require (
1313
github.com/exaring/otelpgx v0.10.0
1414
github.com/golang-jwt/jwt/v5 v5.3.1
1515
github.com/jackc/pgx/v5 v5.8.0
16-
github.com/lmittmann/tint v1.1.2
16+
github.com/lmittmann/tint v1.1.3
1717
github.com/nats-io/nats.go v1.48.0
1818
github.com/nicksnyder/go-i18n/v2 v2.6.1
1919
github.com/panjf2000/ants/v2 v2.11.4
20-
github.com/pitabwire/natspubsub v0.7.11
20+
github.com/pitabwire/natspubsub v0.7.12
2121
github.com/pitabwire/util v0.4.0
2222
github.com/redis/go-redis/v9 v9.17.3
2323
github.com/rs/xid v1.6.0
@@ -28,17 +28,17 @@ require (
2828
github.com/testcontainers/testcontainers-go/modules/postgres v0.40.0
2929
github.com/testcontainers/testcontainers-go/modules/valkey v0.40.0
3030
github.com/valkey-io/valkey-go v1.0.71
31-
go.opentelemetry.io/contrib/bridges/otelslog v0.14.0
32-
go.opentelemetry.io/contrib/exporters/autoexport v0.64.0
33-
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0
34-
go.opentelemetry.io/contrib/propagators/autoprop v0.64.0
35-
go.opentelemetry.io/otel v1.39.0
36-
go.opentelemetry.io/otel/log v0.15.0
37-
go.opentelemetry.io/otel/metric v1.39.0
38-
go.opentelemetry.io/otel/sdk v1.39.0
39-
go.opentelemetry.io/otel/sdk/log v0.15.0
40-
go.opentelemetry.io/otel/sdk/metric v1.39.0
41-
go.opentelemetry.io/otel/trace v1.39.0
31+
go.opentelemetry.io/contrib/bridges/otelslog v0.15.0
32+
go.opentelemetry.io/contrib/exporters/autoexport v0.65.0
33+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.65.0
34+
go.opentelemetry.io/contrib/propagators/autoprop v0.65.0
35+
go.opentelemetry.io/otel v1.40.0
36+
go.opentelemetry.io/otel/log v0.16.0
37+
go.opentelemetry.io/otel/metric v1.40.0
38+
go.opentelemetry.io/otel/sdk v1.40.0
39+
go.opentelemetry.io/otel/sdk/log v0.16.0
40+
go.opentelemetry.io/otel/sdk/metric v1.40.0
41+
go.opentelemetry.io/otel/trace v1.40.0
4242
gocloud.dev v0.44.0
4343
golang.org/x/net v0.49.0
4444
golang.org/x/oauth2 v0.34.0
@@ -76,7 +76,7 @@ require (
7676
github.com/go-ole/go-ole v1.3.0 // indirect
7777
github.com/google/cel-go v0.27.0 // indirect
7878
github.com/google/uuid v1.6.0 // indirect
79-
github.com/googleapis/gax-go/v2 v2.16.0 // indirect
79+
github.com/googleapis/gax-go/v2 v2.17.0 // indirect
8080
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.7 // indirect
8181
github.com/jackc/pgpassfile v1.0.0 // indirect
8282
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
@@ -114,32 +114,32 @@ require (
114114
github.com/tklauser/numcpus v0.11.0 // indirect
115115
github.com/yusufpapurcu/wmi v1.2.4 // indirect
116116
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
117-
go.opentelemetry.io/contrib/bridges/prometheus v0.64.0 // indirect
118-
go.opentelemetry.io/contrib/propagators/aws v1.39.0 // indirect
119-
go.opentelemetry.io/contrib/propagators/b3 v1.39.0 // indirect
120-
go.opentelemetry.io/contrib/propagators/jaeger v1.39.0 // indirect
121-
go.opentelemetry.io/contrib/propagators/ot v1.39.0 // indirect
122-
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.15.0 // indirect
123-
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.15.0 // indirect
124-
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.39.0 // indirect
125-
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.39.0 // indirect
126-
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect
127-
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 // indirect
128-
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0 // indirect
129-
go.opentelemetry.io/otel/exporters/prometheus v0.61.0 // indirect
130-
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.15.0 // indirect
131-
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.39.0 // indirect
132-
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.39.0 // indirect
117+
go.opentelemetry.io/contrib/bridges/prometheus v0.65.0 // indirect
118+
go.opentelemetry.io/contrib/propagators/aws v1.40.0 // indirect
119+
go.opentelemetry.io/contrib/propagators/b3 v1.40.0 // indirect
120+
go.opentelemetry.io/contrib/propagators/jaeger v1.40.0 // indirect
121+
go.opentelemetry.io/contrib/propagators/ot v1.40.0 // indirect
122+
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.16.0 // indirect
123+
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.16.0 // indirect
124+
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.40.0 // indirect
125+
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.40.0 // indirect
126+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect
127+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 // indirect
128+
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0 // indirect
129+
go.opentelemetry.io/otel/exporters/prometheus v0.62.0 // indirect
130+
go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.16.0 // indirect
131+
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.40.0 // indirect
132+
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.40.0 // indirect
133133
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
134134
go.uber.org/multierr v1.11.0 // indirect
135135
go.yaml.in/yaml/v2 v2.4.3 // indirect
136136
golang.org/x/crypto v0.47.0 // indirect
137-
golang.org/x/exp v0.0.0-20251219203646-944ab1f22d93 // indirect
137+
golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect
138138
golang.org/x/sync v0.19.0 // indirect
139139
golang.org/x/sys v0.40.0 // indirect
140140
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
141141
google.golang.org/api v0.264.0 // indirect
142-
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect
143-
google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect
142+
google.golang.org/genproto/googleapis/api v0.0.0-20260203192932-546029d2fa20 // indirect
143+
google.golang.org/genproto/googleapis/rpc v0.0.0-20260203192932-546029d2fa20 // indirect
144144
gopkg.in/yaml.v3 v3.0.1 // indirect
145145
)

0 commit comments

Comments
 (0)