Skip to content

Commit 78bf8d9

Browse files
committed
rework exposures, add tests and better flattening
Signed-off-by: Eliott Bouhana <[email protected]>
1 parent 713c1a8 commit 78bf8d9

File tree

13 files changed

+938
-207
lines changed

13 files changed

+938
-207
lines changed

ddtrace/tracer/option.go

Lines changed: 4 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package tracer
77

88
import (
9+
"cmp"
910
"context"
1011
"encoding/json"
1112
"errors"
@@ -551,14 +552,10 @@ func newConfig(opts ...StartOption) (*config, error) {
551552
if c.agentURL.Scheme == "unix" {
552553
// If we're connecting over UDS we can just rely on the agent to provide the hostname
553554
log.Debug("connecting to agent over unix, do not set hostname on any traces")
554-
c.httpClient = udsClient(c.agentURL.Path, c.httpClientTimeout)
555-
// TODO(darccio): use internal.UnixDataSocketURL instead
556-
c.agentURL = &url.URL{
557-
Scheme: "http",
558-
Host: fmt.Sprintf("UDS_%s", strings.NewReplacer(":", "_", "/", "_", `\`, "_").Replace(c.agentURL.Path)),
559-
}
555+
c.httpClient = internal.UDSClient(c.agentURL.Path, cmp.Or(c.httpClientTimeout, defaultHTTPTimeout))
556+
c.agentURL = internal.UnixDataSocketURL(c.agentURL.Path)
560557
} else {
561-
c.httpClient = defaultHTTPClient(c.httpClientTimeout, false)
558+
c.httpClient = internal.DefaultHTTPClient(c.httpClientTimeout, false)
562559
}
563560
}
564561
WithGlobalTag(ext.RuntimeID, globalconfig.RuntimeID())(c)
@@ -742,29 +739,6 @@ func newStatsdClient(c *config) (internal.StatsdClient, error) {
742739
return internal.NewStatsdClient(c.dogstatsdAddr, statsTags(c))
743740
}
744741

745-
// udsClient returns a new http.Client which connects using the given UDS socket path.
746-
func udsClient(socketPath string, timeout time.Duration) *http.Client {
747-
if timeout == 0 {
748-
timeout = defaultHTTPTimeout
749-
}
750-
return &http.Client{
751-
Transport: &http.Transport{
752-
Proxy: http.ProxyFromEnvironment,
753-
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
754-
return defaultDialer(timeout).DialContext(ctx, "unix", (&net.UnixAddr{
755-
Name: socketPath,
756-
Net: "unix",
757-
}).String())
758-
},
759-
MaxIdleConns: 100,
760-
IdleConnTimeout: 90 * time.Second,
761-
TLSHandshakeTimeout: 10 * time.Second,
762-
ExpectContinueTimeout: 1 * time.Second,
763-
},
764-
Timeout: timeout,
765-
}
766-
}
767-
768742
// defaultDogstatsdAddr returns the default connection address for Dogstatsd.
769743
func defaultDogstatsdAddr() string {
770744
envHost, envPort := env.Get("DD_DOGSTATSD_HOST"), env.Get("DD_DOGSTATSD_PORT")

ddtrace/tracer/transport.go

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"bytes"
1010
"fmt"
1111
"io"
12-
"net"
1312
"net/http"
1413
"runtime"
1514
"strconv"
@@ -31,32 +30,6 @@ const (
3130
headerComputedTopLevel = "Datadog-Client-Computed-Top-Level"
3231
)
3332

34-
func defaultDialer(timeout time.Duration) *net.Dialer {
35-
return &net.Dialer{
36-
Timeout: timeout,
37-
KeepAlive: 30 * time.Second,
38-
DualStack: true,
39-
}
40-
}
41-
42-
func defaultHTTPClient(timeout time.Duration, disableKeepAlives bool) *http.Client {
43-
if timeout == 0 {
44-
timeout = defaultHTTPTimeout
45-
}
46-
return &http.Client{
47-
Transport: &http.Transport{
48-
Proxy: http.ProxyFromEnvironment,
49-
DialContext: defaultDialer(timeout).DialContext,
50-
MaxIdleConns: 100,
51-
IdleConnTimeout: 90 * time.Second,
52-
TLSHandshakeTimeout: 10 * time.Second,
53-
ExpectContinueTimeout: 1 * time.Second,
54-
DisableKeepAlives: disableKeepAlives,
55-
},
56-
Timeout: timeout,
57-
}
58-
}
59-
6033
const (
6134
defaultHostname = "localhost"
6235
defaultPort = "8126"

internal/agent.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ package internal
77

88
import (
99
"net"
10+
"net/http"
1011
"net/url"
1112
"os"
13+
"time"
1214

1315
"github.com/DataDog/dd-trace-go/v2/internal/env"
1416
"github.com/DataDog/dd-trace-go/v2/internal/log"
@@ -74,3 +76,26 @@ func AgentURLFromEnv() *url.URL {
7476
}
7577
return httpURL
7678
}
79+
80+
func DefaultDialer(timeout time.Duration) *net.Dialer {
81+
return &net.Dialer{
82+
Timeout: timeout,
83+
KeepAlive: 30 * time.Second,
84+
DualStack: true,
85+
}
86+
}
87+
88+
func DefaultHTTPClient(timeout time.Duration, disableKeepAlives bool) *http.Client {
89+
return &http.Client{
90+
Transport: &http.Transport{
91+
Proxy: http.ProxyFromEnvironment,
92+
DialContext: DefaultDialer(timeout).DialContext,
93+
MaxIdleConns: 100,
94+
IdleConnTimeout: 90 * time.Second,
95+
TLSHandshakeTimeout: 10 * time.Second,
96+
ExpectContinueTimeout: 1 * time.Second,
97+
DisableKeepAlives: disableKeepAlives,
98+
},
99+
Timeout: timeout,
100+
}
101+
}

internal/telemetry/internal/writer.go

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"errors"
1212
"fmt"
1313
"io"
14-
"net"
1514
"net/http"
1615
"os"
1716
"runtime"
@@ -29,25 +28,6 @@ import (
2928
"github.com/DataDog/dd-trace-go/v2/internal/version"
3029
)
3130

32-
// We copy the transport to avoid using the default one, as it might be
33-
// augmented with tracing and we don't want these calls to be recorded.
34-
// See https://golang.org/pkg/net/http/#DefaultTransport .
35-
var defaultHTTPClient = &http.Client{
36-
Transport: &http.Transport{
37-
Proxy: http.ProxyFromEnvironment,
38-
DialContext: (&net.Dialer{
39-
Timeout: 30 * time.Second,
40-
KeepAlive: 30 * time.Second,
41-
}).DialContext,
42-
ForceAttemptHTTP2: true,
43-
MaxIdleConns: 100,
44-
IdleConnTimeout: 90 * time.Second,
45-
TLSHandshakeTimeout: 10 * time.Second,
46-
ExpectContinueTimeout: 1 * time.Second,
47-
},
48-
Timeout: 5 * time.Second,
49-
}
50-
5131
func newBody(config TracerConfig, debugMode bool) *transport.Body {
5232
osHostname, err := os.Hostname()
5333
if err != nil {
@@ -130,7 +110,7 @@ func NewWriter(config WriterConfig) (Writer, error) {
130110
}
131111

132112
if config.HTTPClient == nil {
133-
config.HTTPClient = defaultHTTPClient
113+
config.HTTPClient = internal.DefaultHTTPClient(5*time.Second, true)
134114
}
135115

136116
// Don't allow the client to have a timeout higher than 5 seconds

internal/uds.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,13 @@
66
package internal
77

88
import (
9+
"context"
910
"fmt"
11+
"net"
12+
"net/http"
1013
"net/url"
1114
"strings"
15+
"time"
1216
)
1317

1418
func UnixDataSocketURL(path string) *url.URL {
@@ -17,3 +21,23 @@ func UnixDataSocketURL(path string) *url.URL {
1721
Host: fmt.Sprintf("UDS_%s", strings.NewReplacer(":", "_", "/", "_", `\`, "_").Replace(path)),
1822
}
1923
}
24+
25+
// UDSClient returns a new http.Client which connects using the given UDS socket path.
26+
func UDSClient(socketPath string, timeout time.Duration) *http.Client {
27+
return &http.Client{
28+
Transport: &http.Transport{
29+
Proxy: http.ProxyFromEnvironment,
30+
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
31+
return DefaultDialer(timeout).DialContext(ctx, "unix", (&net.UnixAddr{
32+
Name: socketPath,
33+
Net: "unix",
34+
}).String())
35+
},
36+
MaxIdleConns: 100,
37+
IdleConnTimeout: 90 * time.Second,
38+
TLSHandshakeTimeout: 10 * time.Second,
39+
ExpectContinueTimeout: 1 * time.Second,
40+
},
41+
Timeout: timeout,
42+
}
43+
}

openfeature/evaluator.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func evaluateFlag(flag *flag, defaultValue any, context map[string]any) evaluati
7777
if allocation.DoLog != nil {
7878
doLog = *allocation.DoLog
7979
}
80-
metadata[metadataDoLog] = doLog
80+
metadata[metadataDoLogKey] = doLog
8181

8282
return evaluationResult{
8383
Value: variant.Value,

openfeature/exposure.go

Lines changed: 44 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,21 @@ import (
99
"bytes"
1010
"cmp"
1111
"context"
12-
"encoding/json"
1312
"fmt"
1413
"io"
14+
"log/slog"
1515
"net/http"
1616
"net/url"
17+
"os"
1718
"sync"
1819
"time"
1920

2021
"github.com/DataDog/dd-trace-go/v2/internal"
2122
"github.com/DataDog/dd-trace-go/v2/internal/env"
2223
"github.com/DataDog/dd-trace-go/v2/internal/globalconfig"
2324
"github.com/DataDog/dd-trace-go/v2/internal/log"
25+
telemetrylog "github.com/DataDog/dd-trace-go/v2/internal/telemetry/log"
26+
jsoniter "github.com/json-iterator/go"
2427
)
2528

2629
const (
@@ -75,9 +78,9 @@ type exposureSubject struct {
7578

7679
// exposureContext represents service context metadata for the exposure payload
7780
type exposureContext struct {
78-
ServiceName string `json:"service"`
79-
Version string `json:"version,omitempty"`
80-
Env string `json:"env,omitempty"`
81+
Service string `json:"service"`
82+
Version string `json:"version,omitempty"`
83+
Env string `json:"env,omitempty"`
8184
}
8285

8386
// exposurePayload represents the complete payload sent to the exposure endpoint
@@ -97,31 +100,34 @@ type exposureWriter struct {
97100
ticker *time.Ticker
98101
stopChan chan struct{}
99102
stopped bool
103+
jsonConfig jsoniter.API
100104
}
101105

102106
// newExposureWriter creates a new exposure writer with the given configuration
103107
func newExposureWriter(config ProviderConfig) *exposureWriter {
104-
// Build service context from environment variables
105-
serviceName := cmp.Or(env.Get("DD_SERVICE"), globalconfig.ServiceName())
106-
if serviceName == "" {
107-
serviceName = "unknown"
108+
agentURL := internal.AgentURLFromEnv()
109+
var httpClient *http.Client
110+
if agentURL.Scheme == "unix" {
111+
httpClient = internal.UDSClient(agentURL.Path, defaultHTTPTimeout)
112+
agentURL = internal.UnixDataSocketURL(agentURL.Path)
113+
} else {
114+
httpClient = internal.DefaultHTTPClient(defaultHTTPTimeout, false)
108115
}
109116

110-
context := exposureContext{
111-
ServiceName: serviceName,
112-
Version: env.Get("DD_VERSION"),
113-
Env: env.Get("DD_ENV"),
114-
}
117+
executable, _ := os.Executable()
115118

116119
return &exposureWriter{
117-
buffer: make([]exposureEvent, 0),
120+
buffer: make([]exposureEvent, 1<<8), // Initial capacity of 256
118121
flushInterval: cmp.Or(config.ExposureFlushInterval, defaultExposureFlushInterval),
119-
httpClient: &http.Client{
120-
Timeout: defaultHTTPTimeout,
122+
httpClient: httpClient,
123+
agentURL: agentURL,
124+
stopChan: make(chan struct{}),
125+
jsonConfig: jsoniter.Config{}.Froze(),
126+
context: exposureContext{
127+
Service: cmp.Or(env.Get("DD_SERVICE"), globalconfig.ServiceName(), executable),
128+
Version: env.Get("DD_VERSION"),
129+
Env: env.Get("DD_ENV"),
121130
},
122-
agentURL: internal.AgentURLFromEnv(),
123-
context: context,
124-
stopChan: make(chan struct{}),
125131
}
126132
}
127133

@@ -132,6 +138,13 @@ func (w *exposureWriter) start() {
132138
defer func() {
133139
if r := recover(); r != nil {
134140
log.Error("openfeature: exposure writer recovered panic: %v", r)
141+
var errAttr slog.Attr
142+
if err, ok := r.(error); ok {
143+
errAttr = slog.Any("panic", telemetrylog.NewSafeError(err))
144+
} else {
145+
errAttr = slog.Any("panic", r)
146+
}
147+
telemetrylog.Error("openfeature: exposure writer recovered panic", errAttr)
135148
}
136149
w.stop()
137150
}()
@@ -171,17 +184,14 @@ func (w *exposureWriter) flush() {
171184

172185
// Move buffer to local variable and create new buffer
173186
events := w.buffer
174-
w.buffer = make([]exposureEvent, len(events)/2)
187+
w.buffer = make([]exposureEvent, 0, len(events)/2)
175188
w.mu.Unlock()
176189

177-
// Build payload
178-
payload := exposurePayload{
190+
// Send to agent
191+
if err := w.sendToAgent(exposurePayload{
179192
Context: w.context,
180193
Exposures: events,
181-
}
182-
183-
// Send to agent
184-
if err := w.sendToAgent(payload); err != nil {
194+
}); err != nil {
185195
log.Error("openfeature: failed to send exposure events: %v", err.Error())
186196
} else {
187197
log.Debug("openfeature: successfully sent %d exposure events", len(events))
@@ -191,16 +201,19 @@ func (w *exposureWriter) flush() {
191201
// sendToAgent sends the exposure payload to the Datadog Agent via EVP proxy
192202
func (w *exposureWriter) sendToAgent(payload exposurePayload) error {
193203
// Serialize payload
194-
jsonData, err := json.Marshal(payload)
195-
if err != nil {
196-
return fmt.Errorf("failed to marshal exposure payload: %w", err)
204+
var bytesBuffer bytes.Buffer
205+
encoder := w.jsonConfig.NewEncoder(&bytesBuffer)
206+
if err := encoder.Encode(payload); err != nil {
207+
return fmt.Errorf("failed to encode exposure payload: %w", err)
197208
}
198209

199210
// Build request URL
200-
requestURL := w.buildRequestURL()
211+
u := *w.agentURL
212+
u.Path = exposureEndpoint
213+
requestURL := u.String()
201214

202215
// Create HTTP request
203-
req, err := http.NewRequestWithContext(context.Background(), "POST", requestURL, bytes.NewReader(jsonData))
216+
req, err := http.NewRequestWithContext(context.Background(), "POST", requestURL, &bytesBuffer)
204217
if err != nil {
205218
return fmt.Errorf("failed to create request: %w", err)
206219
}
@@ -227,21 +240,6 @@ func (w *exposureWriter) sendToAgent(payload exposurePayload) error {
227240
return nil
228241
}
229242

230-
// buildRequestURL constructs the full URL for the exposure endpoint
231-
func (w *exposureWriter) buildRequestURL() string {
232-
if w.agentURL.Scheme == "unix" {
233-
// For Unix domain sockets, use the HTTP adapter
234-
u := internal.UnixDataSocketURL(w.agentURL.Path)
235-
u.Path = exposureEndpoint
236-
return u.String()
237-
}
238-
239-
// For HTTP/HTTPS URLs, append the endpoint path
240-
u := *w.agentURL
241-
u.Path = exposureEndpoint
242-
return u.String()
243-
}
244-
245243
// stop stops the exposure writer and flushes any remaining events
246244
func (w *exposureWriter) stop() {
247245
w.mu.Lock()

0 commit comments

Comments
 (0)