Skip to content

Commit 85c98b9

Browse files
ha mode fix for go client
1 parent 3e848b8 commit 85c98b9

File tree

3 files changed

+97
-4
lines changed

3 files changed

+97
-4
lines changed

go/stream.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,15 +123,24 @@ func (c *client) newStream(ctx context.Context, httpClient *http.Client, feedIDs
123123
// and ws ha is enabled
124124
if len(origins) > 0 && c.config.WsHA {
125125
c.config.logDebug("client: attempting to connect websockets in HA mode")
126+
var errs []error
126127
for x := 0; x < len(origins); x++ {
128+
s.stats.configuredConnections.Add(1)
127129
conn, err := s.newWSconn(ctx, origins[x])
128130
if err != nil {
129-
return nil, err
131+
c.config.logInfo("client: failed to connect to origin %s: %s", origins[x], err)
132+
errs = append(errs, fmt.Errorf("origin %s: %w", origins[x], err))
133+
continue
130134
}
131135
go s.monitorConn(conn)
132136
s.conns = append(s.conns, conn)
133-
s.stats.configuredConnections.Add(1)
134137
}
138+
139+
// Only fail if we couldn't connect to ANY origins
140+
if len(s.conns) == 0 {
141+
return nil, fmt.Errorf("failed to connect to any origins in HA mode: %v", errs)
142+
}
143+
c.config.logInfo("client: connected to %d out of %d origins in HA mode", len(s.conns), len(origins))
135144
} else {
136145
conn, err := s.newWSconn(ctx, "")
137146
if err != nil {

go/stream_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -717,3 +717,87 @@ func TestClient_StreamCustomHeader(t *testing.T) {
717717
}
718718

719719
}
720+
721+
// TestClient_StreamHA_OneOriginDown tests that when in HA mode with multiple origins,
722+
// if one origin is down during initial connection, the stream should still be created
723+
func TestClient_StreamHA_OneOriginDown(t *testing.T) {
724+
connectAttempts := &atomic.Uint64{}
725+
726+
ms := newMockServer(func(w http.ResponseWriter, r *http.Request) {
727+
if r.Method == http.MethodHead {
728+
w.Header().Add(cllAvailOriginsHeader, "{001,002}")
729+
w.WriteHeader(200)
730+
return
731+
}
732+
733+
if r.URL.Path != apiV1WS {
734+
t.Errorf("expected path %s, got %s", apiV1WS, r.URL.Path)
735+
}
736+
737+
origin := r.Header.Get(cllOriginHeader)
738+
connectAttempts.Add(1)
739+
740+
// Simulate origin 002 being down by timing out the connection
741+
if origin == "002" {
742+
w.WriteHeader(http.StatusGatewayTimeout)
743+
return
744+
}
745+
746+
// Origin 001 works fine
747+
conn, err := websocket.Accept(
748+
w, r, &websocket.AcceptOptions{CompressionMode: websocket.CompressionContextTakeover},
749+
)
750+
751+
if err != nil {
752+
t.Fatalf("error accepting connection: %s", err)
753+
}
754+
defer func() { _ = conn.CloseNow() }()
755+
756+
// Keep the connection alive for testing
757+
for conn.Ping(context.Background()) == nil {
758+
time.Sleep(100 * time.Millisecond)
759+
}
760+
})
761+
defer ms.Close()
762+
763+
streamsClient, err := ms.Client()
764+
if err != nil {
765+
t.Fatalf("error creating client %s", err)
766+
}
767+
768+
cc := streamsClient.(*client)
769+
cc.config.Logger = LogPrintf
770+
cc.config.LogDebug = true
771+
cc.config.WsHA = true
772+
773+
// Attempt to create a stream - this should succeed with origin 001 even though 002 is down
774+
sub, err := streamsClient.Stream(context.Background(), []feed.ID{feed1, feed2})
775+
776+
// In HA mode, the stream should succeed even if one origin is down
777+
if err != nil {
778+
t.Errorf("Stream creation failed: %v", err)
779+
t.Errorf("BUG DETECTED: In HA mode, stream should succeed with available origins")
780+
t.Errorf("Connect attempts made: %d", connectAttempts.Load())
781+
t.Errorf("Expected: Stream should succeed with 1 active connection from origin 001")
782+
t.Errorf("Actual: Stream creation failed completely when origin 002 was unavailable")
783+
t.Fatalf("Test reveals bug: HA mode is not resilient to individual origin failures during initial connection")
784+
}
785+
defer sub.Close()
786+
787+
// Give connections time to establish
788+
time.Sleep(200 * time.Millisecond)
789+
790+
stats := sub.Stats()
791+
792+
// In HA mode with 2 origins configured but 1 down, we should have:
793+
// - ConfiguredConnections: 2 (we tried to connect to both)
794+
// - ActiveConnections: 1 (only origin 001 is up)
795+
if stats.ConfiguredConnections != 2 {
796+
t.Errorf("expected 2 configured connections, got %d", stats.ConfiguredConnections)
797+
}
798+
799+
if stats.ActiveConnections != 1 {
800+
t.Errorf("expected 1 active connection, got %d", stats.ActiveConnections)
801+
}
802+
803+
}

typescript/examples/stream-reports.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ async function main() {
3333
const client = createClient({
3434
apiKey: process.env.API_KEY || "YOUR_API_KEY",
3535
userSecret: process.env.USER_SECRET || "YOUR_USER_SECRET",
36-
endpoint: "https://api.dataengine.chain.link",
37-
wsEndpoint: "wss://ws.dataengine.chain.link",
36+
endpoint: process.env.REST_URL || "https://api.dataengine.chain.link",
37+
wsEndpoint: process.env.WS_URL || "wss://ws.dataengine.chain.link",
3838
haMode,
3939

4040
// Comment to disable SDK logging:

0 commit comments

Comments
 (0)