Skip to content

Commit 0b68491

Browse files
jkawanJenita
andauthored
Added timeout for kupo client (#430)
Signed-off-by: Jenita <[email protected]> Co-authored-by: Jenita <[email protected]>
1 parent e96f4b9 commit 0b68491

File tree

2 files changed

+133
-14
lines changed

2 files changed

+133
-14
lines changed

input/chainsync/chainsync.go

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"log/slog"
2323
"math"
2424
"net/http"
25+
"net/url"
2526
"slices"
2627
"strings"
2728
"time"
@@ -43,6 +44,7 @@ const (
4344
cursorCacheSize = 20
4445

4546
maxAutoReconnectDelay = 60 * time.Second
47+
defaultKupoTimeout = 30 * time.Second
4648
)
4749

4850
type ChainSync struct {
@@ -507,41 +509,55 @@ func getKupoClient(c *ChainSync) (*kugo.Client, error) {
507509
return c.kupoClient, nil
508510
}
509511

512+
// Validate URL first
513+
_, err := url.ParseRequestURI(c.kupoUrl)
514+
if err != nil {
515+
return nil, fmt.Errorf("invalid kupo URL: %w", err)
516+
}
517+
510518
KugoCustomLogger := logging.NewKugoCustomLogger(logging.LevelInfo)
511519

520+
// Create client with timeout
512521
k := kugo.New(
513522
kugo.WithEndpoint(c.kupoUrl),
514523
kugo.WithLogger(KugoCustomLogger),
524+
kugo.WithTimeout(defaultKupoTimeout),
515525
)
516526

517-
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
518-
defer cancel()
527+
httpClient := &http.Client{
528+
Timeout: 2 * time.Second,
529+
}
519530

520531
healthUrl := strings.TrimRight(c.kupoUrl, "/") + "/health"
532+
533+
// Create context with timeout
534+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
535+
defer cancel()
536+
521537
req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthUrl, nil)
522538
if err != nil {
523539
return nil, fmt.Errorf("failed to create health check request: %w", err)
524540
}
525541

526-
client := &http.Client{}
527-
resp, err := client.Do(req)
542+
resp, err := httpClient.Do(req)
528543
if err != nil {
529-
return nil, fmt.Errorf("failed to perform health check: %w", err)
530-
}
531-
if resp == nil {
532-
return nil, errors.New("health check response empty, aborting")
544+
// Handle different error types
545+
switch {
546+
case errors.Is(err, context.DeadlineExceeded):
547+
return nil, errors.New("kupo health check timed out after 3 seconds")
548+
case strings.Contains(err.Error(), "no such host"):
549+
return nil, fmt.Errorf("failed to resolve kupo host: %w", err)
550+
default:
551+
return nil, fmt.Errorf("failed to perform health check: %w", err)
552+
}
533553
}
534554
defer resp.Body.Close()
535555

536556
if resp.StatusCode != http.StatusOK {
537-
return nil, fmt.Errorf(
538-
"health check failed with status code: %d",
539-
resp.StatusCode,
540-
)
557+
return nil, fmt.Errorf("health check failed with status code: %d", resp.StatusCode)
541558
}
542559

543560
c.kupoClient = k
544-
545561
return k, nil
546562
}
547563

@@ -564,10 +580,18 @@ func resolveTransactionInputs(
564580
// Extract transaction ID and index from the input
565581
txId := input.Id().String()
566582
txIndex := int(input.Index())
567-
matches, err := k.Matches(context.Background(),
583+
584+
// Add timeout for matches query
585+
ctx, cancel := context.WithTimeout(context.Background(), defaultKupoTimeout)
586+
defer cancel()
587+
588+
matches, err := k.Matches(ctx,
568589
kugo.TxOut(chainsync.NewTxID(txId, txIndex)),
569590
)
570591
if err != nil {
592+
if errors.Is(err, context.DeadlineExceeded) {
593+
return nil, fmt.Errorf("kupo matches query timed out after %v", defaultKupoTimeout)
594+
}
571595
return nil, fmt.Errorf(
572596
"error fetching matches for input TxId: %s, Index: %d. Error: %w",
573597
txId,

input/chainsync/chainsync_test.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,19 @@ package chainsync
22

33
import (
44
"encoding/hex"
5+
//"fmt"
6+
"net/http"
7+
"net/http/httptest"
8+
"strings"
59
"testing"
610
"time"
711

12+
"github.com/SundaeSwap-finance/kugo"
813
"github.com/blinklabs-io/adder/event"
914
"github.com/blinklabs-io/gouroboros/protocol/chainsync"
1015
ocommon "github.com/blinklabs-io/gouroboros/protocol/common"
1116
"github.com/stretchr/testify/assert"
17+
"github.com/stretchr/testify/require"
1218
)
1319

1420
func TestHandleRollBackward(t *testing.T) {
@@ -68,3 +74,92 @@ func TestHandleRollBackward(t *testing.T) {
6874
assert.Equal(t, uint64(67890), c.status.TipSlotNumber)
6975
assert.Equal(t, "060708090a", c.status.TipBlockHash)
7076
}
77+
78+
func TestGetKupoClient(t *testing.T) {
79+
// Setup test server
80+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
81+
if r.URL.Path == "/health" {
82+
w.WriteHeader(http.StatusOK)
83+
return
84+
}
85+
w.WriteHeader(http.StatusNotFound)
86+
}))
87+
defer ts.Close()
88+
89+
t.Run("successful client creation", func(t *testing.T) {
90+
c := &ChainSync{
91+
kupoUrl: ts.URL,
92+
}
93+
94+
client, err := getKupoClient(c)
95+
require.NoError(t, err)
96+
assert.NotNil(t, client)
97+
assert.NotNil(t, c.kupoClient)
98+
})
99+
100+
t.Run("returns cached client", func(t *testing.T) {
101+
mockClient := &kugo.Client{}
102+
c := &ChainSync{
103+
kupoUrl: ts.URL,
104+
kupoClient: mockClient,
105+
}
106+
107+
client, err := getKupoClient(c)
108+
require.NoError(t, err)
109+
assert.Same(t, mockClient, client)
110+
})
111+
112+
t.Run("health check timeout", func(t *testing.T) {
113+
slowTS := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
114+
time.Sleep(4 * time.Second) // Longer than the 3s context timeout
115+
w.WriteHeader(http.StatusOK)
116+
}))
117+
defer slowTS.Close()
118+
119+
c := &ChainSync{
120+
kupoUrl: slowTS.URL,
121+
}
122+
123+
_, err := getKupoClient(c)
124+
require.Error(t, err)
125+
assert.Contains(t, err.Error(), "kupo health check timed out after 3 seconds")
126+
})
127+
128+
t.Run("failed health check status", func(t *testing.T) {
129+
failTS := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
130+
w.WriteHeader(http.StatusInternalServerError)
131+
}))
132+
defer failTS.Close()
133+
134+
c := &ChainSync{
135+
kupoUrl: failTS.URL,
136+
}
137+
138+
_, err := getKupoClient(c)
139+
require.Error(t, err)
140+
assert.Contains(t, err.Error(), "health check failed with status code: 500")
141+
})
142+
143+
t.Run("malformed URL", func(t *testing.T) {
144+
c := &ChainSync{
145+
kupoUrl: "http://invalid url",
146+
}
147+
148+
_, err := getKupoClient(c)
149+
require.Error(t, err)
150+
assert.Contains(t, err.Error(), "invalid kupo URL")
151+
})
152+
153+
t.Run("unreachable host", func(t *testing.T) {
154+
c := &ChainSync{
155+
kupoUrl: "http://unreachable-host.invalid",
156+
}
157+
158+
_, err := getKupoClient(c)
159+
require.Error(t, err)
160+
assert.True(t,
161+
strings.Contains(err.Error(), "failed to resolve kupo host") ||
162+
strings.Contains(err.Error(), "failed to perform health check"),
163+
"unexpected error: %v", err)
164+
})
165+
}

0 commit comments

Comments
 (0)