Skip to content

Commit eef52ee

Browse files
authored
Merge pull request #113 from thushan/feature/anthropic-agents
feature: Anthropic agent fixes and improvements
2 parents 7649f80 + 3d00f3b commit eef52ee

File tree

13 files changed

+774
-55
lines changed

13 files changed

+774
-55
lines changed

internal/adapter/proxy/config/unified.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,11 @@ const (
1414
DefaultKeepAlive = 60 * time.Second
1515

1616
// Olla-specific defaults tuned for high performance
17-
OllaDefaultStreamBufferSize = 64 * 1024 // Larger buffer for better streaming performance
18-
OllaDefaultMaxIdleConns = 100
19-
OllaDefaultMaxConnsPerHost = 50
20-
OllaDefaultIdleConnTimeout = 90 * time.Second
17+
OllaDefaultStreamBufferSize = 64 * 1024 // Larger buffer for better streaming performance
18+
OllaDefaultMaxIdleConns = 100
19+
OllaDefaultMaxConnsPerHost = 50
20+
OllaDefaultMaxIdleConnsPerHost = 25 // Half of MaxConnsPerHost; idle slots rarely need to match total capacity
21+
OllaDefaultIdleConnTimeout = 90 * time.Second
2122
// Olla uses 30s timeouts for faster failure detection in AI workloads
2223
OllaDefaultTimeout = 30 * time.Second
2324
OllaDefaultKeepAlive = 30 * time.Second
@@ -118,9 +119,10 @@ type OllaConfig struct {
118119
BaseProxyConfig
119120

120121
// Olla-specific fields for advanced connection pooling
121-
IdleConnTimeout time.Duration
122-
MaxIdleConns int
123-
MaxConnsPerHost int
122+
IdleConnTimeout time.Duration
123+
MaxIdleConns int
124+
MaxConnsPerHost int
125+
MaxIdleConnsPerHost int
124126
}
125127

126128
// GetStreamBufferSize returns the stream buffer size, defaulting to OllaDefaultStreamBufferSize for better performance
@@ -155,6 +157,14 @@ func (c *OllaConfig) GetMaxConnsPerHost() int {
155157
return c.MaxConnsPerHost
156158
}
157159

160+
// GetMaxIdleConnsPerHost returns the maximum idle connections per host, defaulting to OllaDefaultMaxIdleConnsPerHost
161+
func (c *OllaConfig) GetMaxIdleConnsPerHost() int {
162+
if c.MaxIdleConnsPerHost == 0 {
163+
return OllaDefaultMaxIdleConnsPerHost
164+
}
165+
return c.MaxIdleConnsPerHost
166+
}
167+
158168
// GetConnectionTimeout returns the connection timeout, defaulting to OllaDefaultTimeout (30s for faster failure detection)
159169
func (c *OllaConfig) GetConnectionTimeout() time.Duration {
160170
if c.ConnectionTimeout == 0 {

internal/adapter/proxy/olla/service.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ func NewService(
150150
if configuration.MaxConnsPerHost == 0 {
151151
configuration.MaxConnsPerHost = config.OllaDefaultMaxConnsPerHost
152152
}
153+
if configuration.MaxIdleConnsPerHost == 0 {
154+
configuration.MaxIdleConnsPerHost = config.OllaDefaultMaxIdleConnsPerHost
155+
}
153156
if configuration.IdleConnTimeout == 0 {
154157
configuration.IdleConnTimeout = config.OllaDefaultIdleConnTimeout
155158
}
@@ -215,7 +218,8 @@ func NewService(
215218
func createOptimisedTransport(config *Configuration) *http.Transport {
216219
return &http.Transport{
217220
MaxIdleConns: config.MaxIdleConns,
218-
MaxIdleConnsPerHost: config.MaxConnsPerHost,
221+
MaxIdleConnsPerHost: config.MaxIdleConnsPerHost,
222+
MaxConnsPerHost: config.MaxConnsPerHost,
219223
IdleConnTimeout: config.IdleConnTimeout,
220224
TLSHandshakeTimeout: DefaultTLSHandshakeTimeout,
221225
DisableCompression: true,
@@ -693,11 +697,13 @@ func (s *Service) UpdateConfig(config ports.ProxyConfiguration) {
693697
newConfig.MaxIdleConns = ollaConfig.MaxIdleConns
694698
newConfig.IdleConnTimeout = ollaConfig.IdleConnTimeout
695699
newConfig.MaxConnsPerHost = ollaConfig.MaxConnsPerHost
700+
newConfig.MaxIdleConnsPerHost = ollaConfig.MaxIdleConnsPerHost
696701
} else {
697702
// fallback: preserve current Olla-specific settings for non-Olla configs
698703
newConfig.MaxIdleConns = s.configuration.MaxIdleConns
699704
newConfig.IdleConnTimeout = s.configuration.IdleConnTimeout
700705
newConfig.MaxConnsPerHost = s.configuration.MaxConnsPerHost
706+
newConfig.MaxIdleConnsPerHost = s.configuration.MaxIdleConnsPerHost
701707
}
702708

703709
// Update configuration atomically

internal/adapter/proxy/olla/service_retry.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,6 @@ func (s *Service) proxyToSingleEndpoint(ctx context.Context, w http.ResponseWrit
6161
return fmt.Errorf("circuit breaker open for endpoint %s", endpoint.Name)
6262
}
6363

64-
s.Selector.IncrementConnections(endpoint)
65-
defer s.Selector.DecrementConnections(endpoint)
66-
6764
// Build target URL using common function that respects preserve_path
6865
targetURL := common.BuildTargetURL(r, endpoint, s.configuration.GetProxyPrefix())
6966
stats.TargetUrl = targetURL.String()
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package olla
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/thushan/olla/internal/adapter/proxy/config"
8+
)
9+
10+
// TestCreateOptimisedTransport_ConnectionLimits verifies that both MaxConnsPerHost and
11+
// MaxIdleConnsPerHost are mapped to their correct fields on http.Transport.
12+
// Previously the configured MaxConnsPerHost value was mistakenly written to the transport's
13+
// MaxIdleConnsPerHost, and the transport's MaxConnsPerHost was never set (0 = unlimited).
14+
func TestCreateOptimisedTransport_ConnectionLimits(t *testing.T) {
15+
t.Parallel()
16+
17+
cfg := &Configuration{}
18+
cfg.MaxConnsPerHost = 42
19+
cfg.MaxIdleConnsPerHost = 17
20+
cfg.MaxIdleConns = 200
21+
cfg.IdleConnTimeout = 90 * time.Second
22+
23+
transport := createOptimisedTransport(cfg)
24+
25+
if transport.MaxConnsPerHost != 42 {
26+
t.Errorf("MaxConnsPerHost: want 42, got %d", transport.MaxConnsPerHost)
27+
}
28+
if transport.MaxIdleConnsPerHost != 17 {
29+
t.Errorf("MaxIdleConnsPerHost: want 17, got %d", transport.MaxIdleConnsPerHost)
30+
}
31+
if transport.MaxIdleConns != 200 {
32+
t.Errorf("MaxIdleConns: want 200, got %d", transport.MaxIdleConns)
33+
}
34+
}
35+
36+
// TestCreateOptimisedTransport_DefaultsApplied verifies that the package default constants
37+
// pass through createOptimisedTransport unchanged. It does not call NewService; the defaults
38+
// are assigned by hand to mirror the values NewService fills into a zero-value config.
39+
func TestCreateOptimisedTransport_DefaultsApplied(t *testing.T) {
40+
t.Parallel()
41+
42+
// NewService normally fills these defaults into a zero-value config; here they are set by
43+
// hand so the test can check that the transport receives the package constants unchanged.
44+
cfg := &Configuration{}
45+
cfg.MaxConnsPerHost = config.OllaDefaultMaxConnsPerHost
46+
cfg.MaxIdleConnsPerHost = config.OllaDefaultMaxIdleConnsPerHost
47+
cfg.MaxIdleConns = config.OllaDefaultMaxIdleConns
48+
cfg.IdleConnTimeout = config.OllaDefaultIdleConnTimeout
49+
50+
transport := createOptimisedTransport(cfg)
51+
52+
if transport.MaxConnsPerHost != config.OllaDefaultMaxConnsPerHost {
53+
t.Errorf("MaxConnsPerHost: want %d, got %d", config.OllaDefaultMaxConnsPerHost, transport.MaxConnsPerHost)
54+
}
55+
if transport.MaxIdleConnsPerHost != config.OllaDefaultMaxIdleConnsPerHost {
56+
t.Errorf("MaxIdleConnsPerHost: want %d, got %d", config.OllaDefaultMaxIdleConnsPerHost, transport.MaxIdleConnsPerHost)
57+
}
58+
}
59+
60+
// TestCreateOptimisedTransport_FieldsAreDistinct guards against the specific regression
61+
// where MaxConnsPerHost value bled into MaxIdleConnsPerHost. Using distinct values
62+
// makes the mapping error immediately visible.
63+
func TestCreateOptimisedTransport_FieldsAreDistinct(t *testing.T) {
64+
t.Parallel()
65+
66+
cfg := &Configuration{}
67+
cfg.MaxConnsPerHost = 100
68+
cfg.MaxIdleConnsPerHost = 10
69+
cfg.MaxIdleConns = 500
70+
71+
transport := createOptimisedTransport(cfg)
72+
73+
// Regression guard: if the bug is reintroduced both fields get value 100.
74+
if transport.MaxConnsPerHost == transport.MaxIdleConnsPerHost {
75+
t.Errorf("MaxConnsPerHost (%d) and MaxIdleConnsPerHost (%d) are equal — likely a field mapping regression",
76+
transport.MaxConnsPerHost, transport.MaxIdleConnsPerHost)
77+
}
78+
if transport.MaxConnsPerHost != 100 {
79+
t.Errorf("MaxConnsPerHost: want 100, got %d", transport.MaxConnsPerHost)
80+
}
81+
if transport.MaxIdleConnsPerHost != 10 {
82+
t.Errorf("MaxIdleConnsPerHost: want 10, got %d", transport.MaxIdleConnsPerHost)
83+
}
84+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package proxy
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"net/http/httptest"
7+
"sync/atomic"
8+
"testing"
9+
"time"
10+
11+
"github.com/thushan/olla/internal/adapter/proxy/olla"
12+
"github.com/thushan/olla/internal/core/domain"
13+
)
14+
15+
// countingEndpointSelector tracks the number of Increment and Decrement calls
16+
// using atomic counters so the test is safe under concurrent execution.
17+
type countingEndpointSelector struct {
18+
incrementCalls atomic.Int64
19+
decrementCalls atomic.Int64
20+
endpoint *domain.Endpoint
21+
}
22+
23+
func (c *countingEndpointSelector) Select(_ context.Context, endpoints []*domain.Endpoint) (*domain.Endpoint, error) {
24+
if c.endpoint != nil {
25+
return c.endpoint, nil
26+
}
27+
if len(endpoints) > 0 {
28+
return endpoints[0], nil
29+
}
30+
return nil, nil
31+
}
32+
33+
func (c *countingEndpointSelector) Name() string { return "counting" }
34+
35+
func (c *countingEndpointSelector) IncrementConnections(_ *domain.Endpoint) {
36+
c.incrementCalls.Add(1)
37+
}
38+
39+
func (c *countingEndpointSelector) DecrementConnections(_ *domain.Endpoint) {
40+
c.decrementCalls.Add(1)
41+
}
42+
43+
// TestOllaProxy_ConnectionCountingNoDuplication verifies that a single successful proxy
44+
// attempt results in exactly one IncrementConnections call and one DecrementConnections
45+
// call. Before the fix, proxyToSingleEndpoint also incremented/decremented, producing
46+
// counts of two each.
47+
func TestOllaProxy_ConnectionCountingNoDuplication(t *testing.T) {
48+
t.Parallel()
49+
50+
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
51+
w.WriteHeader(http.StatusOK)
52+
w.Write([]byte(`{"ok":true}`))
53+
}))
54+
defer upstream.Close()
55+
56+
endpoint := createTestEndpoint("test-endpoint", upstream.URL, domain.StatusHealthy)
57+
58+
selector := &countingEndpointSelector{endpoint: endpoint}
59+
60+
config := &olla.Configuration{}
61+
config.ResponseTimeout = 5 * time.Second
62+
config.ReadTimeout = 2 * time.Second
63+
config.StreamBufferSize = 8192
64+
config.MaxIdleConns = 10
65+
config.IdleConnTimeout = 30 * time.Second
66+
config.MaxConnsPerHost = 5
67+
68+
proxy, err := olla.NewService(
69+
&mockDiscoveryService{endpoints: []*domain.Endpoint{endpoint}},
70+
selector,
71+
config,
72+
createTestStatsCollector(),
73+
nil,
74+
createTestLogger(),
75+
)
76+
if err != nil {
77+
t.Fatalf("failed to create Olla proxy: %v", err)
78+
}
79+
80+
req, stats, rlog := createTestRequestWithStats("POST", "/v1/chat/completions", `{"model":"test"}`)
81+
w := httptest.NewRecorder()
82+
83+
if err := proxy.ProxyRequestToEndpoints(req.Context(), w, req, []*domain.Endpoint{endpoint}, stats, rlog); err != nil {
84+
t.Fatalf("proxy request failed: %v", err)
85+
}
86+
87+
if got := selector.incrementCalls.Load(); got != 1 {
88+
t.Errorf("IncrementConnections called %d times; want exactly 1", got)
89+
}
90+
if got := selector.decrementCalls.Load(); got != 1 {
91+
t.Errorf("DecrementConnections called %d times; want exactly 1", got)
92+
}
93+
}
94+
95+
// TestOllaProxy_ConnectionCountReturnsToZero verifies that after a series of completed requests
96+
// the net connection delta is zero — i.e. every increment is paired with a decrement.
97+
func TestOllaProxy_ConnectionCountReturnsToZero(t *testing.T) {
98+
t.Parallel()
99+
100+
upstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
101+
w.WriteHeader(http.StatusOK)
102+
w.Write([]byte(`{"ok":true}`))
103+
}))
104+
defer upstream.Close()
105+
106+
endpoint := createTestEndpoint("test-endpoint", upstream.URL, domain.StatusHealthy)
107+
selector := &countingEndpointSelector{endpoint: endpoint}
108+
109+
config := &olla.Configuration{}
110+
config.ResponseTimeout = 5 * time.Second
111+
config.ReadTimeout = 2 * time.Second
112+
config.StreamBufferSize = 8192
113+
config.MaxIdleConns = 10
114+
config.IdleConnTimeout = 30 * time.Second
115+
config.MaxConnsPerHost = 5
116+
117+
proxy, err := olla.NewService(
118+
&mockDiscoveryService{endpoints: []*domain.Endpoint{endpoint}},
119+
selector,
120+
config,
121+
createTestStatsCollector(),
122+
nil,
123+
createTestLogger(),
124+
)
125+
if err != nil {
126+
t.Fatalf("failed to create Olla proxy: %v", err)
127+
}
128+
129+
const requests = 5
130+
for i := 0; i < requests; i++ {
131+
req, stats, rlog := createTestRequestWithStats("POST", "/v1/chat/completions", `{"model":"test"}`)
132+
w := httptest.NewRecorder()
133+
if err := proxy.ProxyRequestToEndpoints(req.Context(), w, req, []*domain.Endpoint{endpoint}, stats, rlog); err != nil {
134+
t.Fatalf("request %d failed: %v", i+1, err)
135+
}
136+
}
137+
138+
inc := selector.incrementCalls.Load()
139+
dec := selector.decrementCalls.Load()
140+
141+
if inc != requests {
142+
t.Errorf("IncrementConnections called %d times; want %d", inc, requests)
143+
}
144+
if dec != requests {
145+
t.Errorf("DecrementConnections called %d times; want %d", dec, requests)
146+
}
147+
if net := inc - dec; net != 0 {
148+
t.Errorf("net connection delta is %d after all requests completed; want 0", net)
149+
}
150+
}

internal/adapter/proxy/sherpa/service_retry.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,11 @@ func (s *Service) ProxyRequestToEndpointsWithRetry(ctx context.Context, w http.R
5050
}
5151

5252
// proxyToSingleEndpoint executes the proxy request to a specific endpoint
53+
// Note: Connection increment/decrement is handled by RetryHandler.executeProxyAttempt
54+
// to avoid double-counting (see proxy_olla_connection_counting_test.go for context).
5355
func (s *Service) proxyToSingleEndpoint(ctx context.Context, w http.ResponseWriter, r *http.Request, endpoint *domain.Endpoint, stats *ports.RequestStats, rlog logger.StyledLogger) error {
5456
stats.EndpointName = endpoint.Name
5557

56-
s.Selector.IncrementConnections(endpoint)
57-
defer s.Selector.DecrementConnections(endpoint)
58-
5958
targetURL := common.BuildTargetURL(r, endpoint, s.configuration.GetProxyPrefix())
6059

6160
stats.TargetUrl = targetURL.String()

internal/adapter/translator/anthropic/types.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ import "fmt"
55
// AnthropicRequest represents an Anthropic API request
66
// Maps to the Anthropic Messages API format
77
type AnthropicRequest struct {
8-
ToolChoice interface{} `json:"tool_choice,omitempty"` // string or object
9-
System interface{} `json:"system,omitempty"` // string or []ContentBlock
10-
Thinking interface{} `json:"thinking,omitempty"` // Extended thinking configuration
8+
ToolChoice interface{} `json:"tool_choice,omitempty"` // string or object
9+
System interface{} `json:"system,omitempty"` // string or []ContentBlock
10+
Thinking interface{} `json:"thinking,omitempty"` // Extended thinking configuration
11+
OutputConfig interface{} `json:"output_config,omitempty"` // Output configuration (effort, structured output format)
1112
Temperature *float64 `json:"temperature,omitempty"`
1213
TopP *float64 `json:"top_p,omitempty"`
1314
TopK *int `json:"top_k,omitempty"`

0 commit comments

Comments
 (0)