Skip to content

Commit ddd538b

Browse files
committed
feat: add consul resolver and related tests for service discovery #1241
1 parent 7ef888a commit ddd538b

File tree

4 files changed

+857
-32
lines changed

4 files changed

+857
-32
lines changed
Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
package upstream
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"strings"
8+
"time"
9+
)
10+
11+
// ConsulResolver handles DNS resolution through consul
12+
type ConsulResolver struct {
13+
resolver string // e.g., "127.0.0.1:8600"
14+
}
15+
16+
// NewConsulResolver creates a new consul resolver
17+
func NewConsulResolver(resolver string) *ConsulResolver {
18+
return &ConsulResolver{
19+
resolver: resolver,
20+
}
21+
}
22+
23+
// ResolveService resolves a consul service to actual IP addresses and ports
24+
func (cr *ConsulResolver) ResolveService(serviceURL string) ([]string, error) {
25+
// Parse consul service URL (e.g., "service.consul service=redacted-net resolve")
26+
serviceName := cr.extractServiceName(serviceURL)
27+
if serviceName == "" {
28+
return nil, fmt.Errorf("could not extract service name from: %s", serviceURL)
29+
}
30+
31+
// Create a custom resolver that uses the consul DNS server
32+
dialer := &net.Dialer{
33+
Timeout: 5 * time.Second,
34+
}
35+
36+
resolver := &net.Resolver{
37+
PreferGo: true,
38+
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
39+
return dialer.DialContext(ctx, network, cr.resolver)
40+
},
41+
}
42+
43+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
44+
defer cancel()
45+
46+
// Query consul for service SRV records
47+
_, srvRecords, err := resolver.LookupSRV(ctx, "", "", serviceName+".service.consul")
48+
if err != nil {
49+
// Fallback to A record lookup if SRV fails
50+
ips, err := resolver.LookupIPAddr(ctx, serviceName+".service.consul")
51+
if err != nil {
52+
return nil, fmt.Errorf("failed to resolve service %s: %v", serviceName, err)
53+
}
54+
55+
// Return IP addresses with default port (80)
56+
var addresses []string
57+
for _, ip := range ips {
58+
addresses = append(addresses, fmt.Sprintf("%s:80", ip.IP.String()))
59+
}
60+
return addresses, nil
61+
}
62+
63+
// Convert SRV records to address:port format
64+
var addresses []string
65+
for _, srv := range srvRecords {
66+
// Resolve the target hostname to IP
67+
ips, err := resolver.LookupIPAddr(ctx, srv.Target)
68+
if err != nil {
69+
continue // Skip this record if resolution fails
70+
}
71+
72+
for _, ip := range ips {
73+
addresses = append(addresses, fmt.Sprintf("%s:%d", ip.IP.String(), srv.Port))
74+
}
75+
}
76+
77+
if len(addresses) == 0 {
78+
return nil, fmt.Errorf("no addresses found for service %s", serviceName)
79+
}
80+
81+
return addresses, nil
82+
}
83+
84+
// extractServiceName extracts the service name from consul service URL
85+
func (cr *ConsulResolver) extractServiceName(serviceURL string) string {
86+
serviceURL = strings.TrimSpace(serviceURL)
87+
88+
// Handle empty input
89+
if serviceURL == "" {
90+
return ""
91+
}
92+
93+
// Parse "service.consul service=redacted-net resolve" format
94+
if strings.Contains(serviceURL, "service=") {
95+
parts := strings.Fields(serviceURL)
96+
for _, part := range parts {
97+
if strings.HasPrefix(part, "service=") {
98+
serviceName := strings.TrimPrefix(part, "service=")
99+
// Handle edge cases like "service=" or "service= "
100+
serviceName = strings.TrimSpace(serviceName)
101+
if serviceName == "" {
102+
return ""
103+
}
104+
return serviceName
105+
}
106+
}
107+
}
108+
109+
// Fallback: try to extract from hostname format like "my-service.service.consul"
110+
if strings.Contains(serviceURL, ".service.consul") {
111+
parts := strings.Split(serviceURL, ".")
112+
if len(parts) > 0 {
113+
serviceName := strings.TrimSpace(parts[0])
114+
if serviceName == "" {
115+
return ""
116+
}
117+
return serviceName
118+
}
119+
}
120+
121+
return ""
122+
}
123+
124+
// TestConsulTargets performs availability test specifically for consul targets
125+
func TestConsulTargets(consulTargets []ProxyTarget) map[string]*Status {
126+
result := make(map[string]*Status)
127+
128+
// Group consul targets by resolver
129+
consulTargetsByResolver := make(map[string][]ProxyTarget)
130+
for _, target := range consulTargets {
131+
if target.Resolver != "" {
132+
consulTargetsByResolver[target.Resolver] = append(consulTargetsByResolver[target.Resolver], target)
133+
} else {
134+
// No resolver specified, mark as offline
135+
key := target.Host + ":" + target.Port
136+
result[key] = &Status{
137+
Online: false,
138+
Latency: 0,
139+
}
140+
}
141+
}
142+
143+
// Test each resolver group
144+
for resolver, targets := range consulTargetsByResolver {
145+
consulResolver := NewConsulResolver(resolver)
146+
147+
for _, target := range targets {
148+
key := target.Host + ":" + target.Port
149+
150+
// Try to resolve the consul service
151+
addresses, err := consulResolver.ResolveService(target.ServiceURL)
152+
if err != nil {
153+
// If resolution fails, mark as offline
154+
result[key] = &Status{
155+
Online: false,
156+
Latency: 0,
157+
}
158+
continue
159+
}
160+
161+
// Test the first resolved address as representative
162+
if len(addresses) > 0 {
163+
addressResults := AvailabilityTest(addresses[:1])
164+
165+
if status, exists := addressResults[addresses[0]]; exists {
166+
result[key] = status
167+
} else {
168+
result[key] = &Status{
169+
Online: false,
170+
Latency: 0,
171+
}
172+
}
173+
} else {
174+
result[key] = &Status{
175+
Online: false,
176+
Latency: 0,
177+
}
178+
}
179+
}
180+
}
181+
182+
return result
183+
}
184+
185+
// EnhancedAvailabilityTest performs availability test with consul resolution support
186+
// Deprecated: Use TestConsulTargets for consul targets and AvailabilityTest for regular targets
187+
func EnhancedAvailabilityTest(targets []ProxyTarget) map[string]*Status {
188+
result := make(map[string]*Status)
189+
190+
// Group targets by type
191+
consulTargets := make([]ProxyTarget, 0)
192+
regularTargets := make([]string, 0)
193+
194+
for _, target := range targets {
195+
if target.IsConsul && target.Resolver != "" {
196+
consulTargets = append(consulTargets, target)
197+
} else {
198+
// Regular target - use existing format for traditional AvailabilityTest
199+
key := target.Host + ":" + target.Port
200+
regularTargets = append(regularTargets, key)
201+
}
202+
}
203+
204+
// Use traditional AvailabilityTest for regular targets (more efficient)
205+
if len(regularTargets) > 0 {
206+
regularResults := AvailabilityTest(regularTargets)
207+
// Merge results
208+
for k, v := range regularResults {
209+
result[k] = v
210+
}
211+
}
212+
213+
// Test consul targets with DNS resolution
214+
if len(consulTargets) > 0 {
215+
consulResults := TestConsulTargets(consulTargets)
216+
for k, v := range consulResults {
217+
result[k] = v
218+
}
219+
}
220+
221+
return result
222+
}

0 commit comments

Comments
 (0)