Skip to content

Commit bb16cb3

Browse files
author
Dimy Jeannot
committed
feat: updated spec with more network context; updated edge routing logic; added additional supported headers; retry for nats
1 parent 7aa4fed commit bb16cb3

File tree

10 files changed

+655
-404
lines changed

10 files changed

+655
-404
lines changed

go/oeco-sdk/v2beta/CHANGELOG.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
### 🩹 Fixes
44

5-
- added additional error handling; preparing for batch handling ([0fa903f](https://github.com/openecosystems/ecosystem/commit/0fa903f))
5+
- added additional error handling; preparing for batch handling ([0fa903f](https://github.com/openecosystems/ecosystem/commit/0fa903f))
66

77
### ❤️ Thank You
88

9-
- Dimy Jeannot
9+
- Dimy Jeannot
1010

1111
## 0.20.4 (2025-09-21)
1212

go/oeco-sdk/v2beta/bindings/nats/binding.go

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -140,19 +140,27 @@ func (b *Binding) Bind(_ context.Context, bindings *sdkv2betalib.Bindings) *sdkv
140140
apexlog.Info("Connection closed.")
141141
}))
142142

143-
_nats, err := nats.Connect(servers, natsOptions...)
143+
maxRetries := 180 // -1 for infinite retries
144+
retryDelay := 10 * time.Second // wait 3 seconds between attempts
145+
146+
_nats, js, err := connectWithRetry(servers, maxRetries, retryDelay, natsOptions...)
144147
if err != nil {
145-
fmt.Println(err.Error())
146-
panic("Cannot connect to NATS")
148+
panic(err)
147149
}
150+
151+
//_nats, err := nats.Connect(servers, natsOptions...)
152+
//if err != nil {
153+
// fmt.Println(err.Error())
154+
// panic("Cannot connect to NATS")
155+
//}
148156
b.Nats = _nats
149157

150158
// Create a JetStream management interface
151-
js, err := jetstream.New(_nats)
152-
if err != nil {
153-
fmt.Println(err.Error())
154-
panic("Cannot configure Jetstream")
155-
}
159+
//js, err := jetstream.New(_nats)
160+
//if err != nil {
161+
// fmt.Println(err.Error())
162+
// panic("Cannot configure Jetstream")
163+
//}
156164
b.JetStream = &js
157165

158166
Bound = &Binding{
@@ -261,3 +269,33 @@ func (b *Binding) RegisterSpecBatchListeners(bindings *sdkv2betalib.Bindings) *s
261269

262270
return bindings
263271
}
272+
273+
func connectWithRetry(url string, maxRetries int, retryDelay time.Duration, options ...nats.Option) (*nats.Conn, jetstream.JetStream, error) {
274+
var nc *nats.Conn
275+
var js jetstream.JetStream
276+
var err error
277+
278+
for attempt := 1; maxRetries == -1 || attempt <= maxRetries; attempt++ {
279+
fmt.Printf("Connecting to NATS (attempt %d)...\n", attempt)
280+
281+
nc, err = nats.Connect(url)
282+
if err != nil {
283+
fmt.Printf("NATS connection failed: %v\n", err)
284+
time.Sleep(retryDelay)
285+
continue
286+
}
287+
288+
js, err = jetstream.New(nc)
289+
if err != nil {
290+
fmt.Printf("JetStream setup failed: %v\n", err)
291+
nc.Close()
292+
time.Sleep(retryDelay)
293+
continue
294+
}
295+
296+
fmt.Println("Connected to NATS and JetStream is ready.")
297+
return nc, js, nil
298+
}
299+
300+
return nil, nil, fmt.Errorf("failed to connect to NATS after %d attempts", maxRetries)
301+
}

go/oeco-sdk/v2beta/edge.go

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -123,36 +123,65 @@ var allowedRequestHeaders = map[string]struct{}{
123123
AccessControlMaxAge: {},
124124
AccessControlRequestHeaders: {},
125125
AccessControlRequestMethod: {},
126-
XCorrelationId: {},
127-
XClientTraceId: {},
128-
XB3Traceid: {},
129-
XB3Spanid: {},
130-
XB3Parentspanid: {},
131-
XB3Sampled: {},
132-
XB3Flags: {},
133-
ApiKey: {},
134-
XSpecRouterKey: {},
135-
XSpecEnvironment: {},
136-
XSpecPlatformHost: {},
137-
FieldMask: {},
138-
SentAtKey: {},
139-
LocaleKey: {},
140-
TimezoneKey: {},
141-
ValidateOnlyKey: {},
142-
DeviceIdKey: {},
143-
DeviceAdvertisingIdKey: {},
144-
DeviceManufacturerKey: {},
145-
DeviceModelKey: {},
146-
DeviceNameKey: {},
147-
DeviceTypeKey: {},
148-
DeviceTokenKey: {},
149-
BluetoothKey: {},
150-
CellularKey: {},
151-
WifiKey: {},
152-
CarrierKey: {},
153-
OsNameKey: {},
154-
OsVersionKey: {},
155-
EcosystemID: {},
126+
XForwardedFor: {},
127+
128+
XSpecRouterKey: {},
129+
XSpecEnvironment: {},
130+
XSpecPlatformHost: {},
131+
132+
RequestIdKey: {},
133+
XCorrelationId: {},
134+
XClientTraceId: {},
135+
XB3Traceid: {},
136+
XB3Spanid: {},
137+
XB3Parentspanid: {},
138+
XB3Sampled: {},
139+
XB3Flags: {},
140+
141+
ApiKey: {},
142+
FieldMask: {},
143+
ValidateOnlyKey: {},
144+
145+
SentAtKey: {},
146+
LocaleKey: {},
147+
TimezoneKey: {},
148+
IpKey: {},
149+
150+
OrganizationID: {},
151+
OrganizationSlug: {},
152+
EcosystemID: {},
153+
EcosystemSlug: {},
154+
JurisdictionAreaNetworkKey: {},
155+
156+
ContinentKey: {},
157+
CountryKey: {},
158+
IsEUCountryKey: {},
159+
CityKey: {},
160+
RegionKey: {},
161+
RegionCodeKey: {},
162+
LatitudeKey: {},
163+
LongitudeKey: {},
164+
PostalCodeKey: {},
165+
MetroCodeKey: {},
166+
SpeedKey: {},
167+
168+
DeviceIdKey: {},
169+
DeviceAdvertisingIdKey: {},
170+
DeviceManufacturerKey: {},
171+
DeviceModelKey: {},
172+
DeviceNameKey: {},
173+
DeviceTypeKey: {},
174+
DeviceTokenKey: {},
175+
176+
BluetoothKey: {},
177+
CellularKey: {},
178+
WifiKey: {},
179+
CarrierKey: {},
180+
AsnKey: {},
181+
AsnOrganizationKey: {},
182+
183+
OsNameKey: {},
184+
OsVersionKey: {},
156185
}
157186

158187
func sanitizeRequestHeaders(req *http.Request) {

go/oeco-sdk/v2beta/error.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -267,8 +267,11 @@ func (se *SpecError) WithSpecDetail(spec *specv2pb.Spec) SpecErrorable {
267267
return se
268268
}
269269

270-
se.ConnectErr.AddDetail(d)
271-
return se
270+
newErr := *se // shallow copy
271+
newErr.ConnectErr = *connect.NewError(se.ConnectErr.Code(), se.ConnectErr.Unwrap())
272+
newErr.ConnectErr.AddDetail(d)
273+
// se.ConnectErr.AddDetail(d)
274+
return &newErr
272275
}
273276

274277
// WithInternalErrorDetail sets internal error details for the SpecError instance and returns the updated SpecError object.

0 commit comments

Comments
 (0)