Skip to content

Commit bdc03f4

Browse files
committed
--wip-- [skip ci]
# Conflicts: # openfeature/provider.go
1 parent 1557bbd commit bdc03f4

File tree

6 files changed

+623
-11
lines changed

6 files changed

+623
-11
lines changed

openfeature/evaluator.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ type evaluationResult struct {
2929
Reason of.Reason
3030
// Error contains any error that occurred during evaluation
3131
Error error
32+
// Metadata contains additional evaluation metadata for hooks
33+
Metadata map[string]any
3234
}
3335

3436
// evaluateFlag evaluates a feature flag with the given context.
@@ -66,10 +68,22 @@ func evaluateFlag(flag *flag, defaultValue any, context map[string]any) evaluati
6668
}
6769
}
6870

71+
// Build metadata for exposure tracking
72+
metadata := make(map[string]any)
73+
metadata[metadataAllocationKey] = allocation.Key
74+
75+
// Get doLog value (defaults to true if not specified)
76+
doLog := true
77+
if allocation.DoLog != nil {
78+
doLog = *allocation.DoLog
79+
}
80+
metadata[metadataDoLog] = doLog
81+
6982
return evaluationResult{
7083
Value: variant.Value,
7184
VariantKey: variant.Key,
7285
Reason: of.TargetingMatchReason,
86+
Metadata: metadata,
7387
}
7488
}
7589
}

openfeature/exposure.go

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2025 Datadog, Inc.
5+
6+
package openfeature
7+
8+
import (
9+
"bytes"
10+
"context"
11+
"encoding/json"
12+
"fmt"
13+
"io"
14+
"net/http"
15+
"net/url"
16+
"sync"
17+
"time"
18+
19+
"github.com/DataDog/dd-trace-go/v2/internal"
20+
"github.com/DataDog/dd-trace-go/v2/internal/env"
21+
"github.com/DataDog/dd-trace-go/v2/internal/globalconfig"
22+
"github.com/DataDog/dd-trace-go/v2/internal/log"
23+
)
24+
25+
const (
26+
// defaultExposureFlushInterval is the default interval for flushing exposure events
27+
// Matches the dd-trace-js implementation (1 second)
28+
defaultExposureFlushInterval = 1 * time.Second
29+
30+
// exposureEndpoint is the EVP proxy endpoint for exposure events
31+
exposureEndpoint = "/evp_proxy/v2/api/v2/exposures"
32+
33+
// evpSubdomainHeader is the HTTP header name for EVP subdomain routing
34+
evpSubdomainHeader = "X-Datadog-EVP-Subdomain"
35+
36+
// evpSubdomainValue is the subdomain value for event platform intake
37+
evpSubdomainValue = "event-platform-intake"
38+
39+
// defaultHTTPTimeout is the timeout for HTTP requests to the agent
40+
defaultHTTPTimeout = 5 * time.Second
41+
)
42+
43+
// exposureEvent represents a single feature flag evaluation exposure event.
44+
// It matches the schema defined in exposure.json.
45+
type exposureEvent struct {
46+
Timestamp int64 `json:"timestamp"`
47+
Allocation exposureAllocation `json:"allocation"`
48+
Flag exposureFlag `json:"flag"`
49+
Variant exposureVariant `json:"variant"`
50+
Subject exposureSubject `json:"subject"`
51+
}
52+
53+
// exposureAllocation represents allocation information in an exposure event
54+
type exposureAllocation struct {
55+
Key string `json:"key"`
56+
}
57+
58+
// exposureFlag represents flag information in an exposure event
59+
type exposureFlag struct {
60+
Key string `json:"key"`
61+
}
62+
63+
// exposureVariant represents variant information in an exposure event
64+
type exposureVariant struct {
65+
Key string `json:"key"`
66+
}
67+
68+
// exposureSubject represents subject (user/entity) information in an exposure event
69+
type exposureSubject struct {
70+
ID string `json:"id"`
71+
Type string `json:"type,omitempty"`
72+
Attributes map[string]any `json:"attributes,omitempty"`
73+
}
74+
75+
// exposureContext represents service context metadata for the exposure payload
76+
type exposureContext struct {
77+
ServiceName string `json:"service_name"`
78+
Version string `json:"version,omitempty"`
79+
Env string `json:"env,omitempty"`
80+
}
81+
82+
// exposurePayload represents the complete payload sent to the exposure endpoint
83+
type exposurePayload struct {
84+
Context exposureContext `json:"context"`
85+
Exposures []exposureEvent `json:"exposures"`
86+
}
87+
88+
// exposureWriter manages buffering and flushing of exposure events to the Datadog Agent
89+
type exposureWriter struct {
90+
mu sync.Mutex
91+
buffer map[string]exposureEvent // Deduplicate by composite key
92+
flushInterval time.Duration
93+
httpClient *http.Client
94+
agentURL *url.URL
95+
context exposureContext
96+
ticker *time.Ticker
97+
stopChan chan struct{}
98+
stopped bool
99+
}
100+
101+
// newExposureWriter creates a new exposure writer with the given configuration
102+
func newExposureWriter(config *ProviderConfig) *exposureWriter {
103+
flushInterval := config.ExposureFlushInterval
104+
if flushInterval == 0 {
105+
flushInterval = defaultExposureFlushInterval
106+
}
107+
108+
// Get agent URL from environment or default
109+
agentURL := internal.AgentURLFromEnv()
110+
111+
// Build service context from environment variables
112+
serviceName := globalconfig.ServiceName()
113+
if serviceName == "" {
114+
serviceName = env.Get("DD_SERVICE")
115+
}
116+
if serviceName == "" {
117+
serviceName = "unknown"
118+
}
119+
120+
context := exposureContext{
121+
ServiceName: serviceName,
122+
}
123+
124+
// Only include version and env if they are defined
125+
if version := env.Get("DD_VERSION"); version != "" {
126+
context.Version = version
127+
}
128+
129+
if envName := env.Get("DD_ENV"); envName != "" {
130+
context.Env = envName
131+
}
132+
133+
// Create HTTP client with timeout
134+
httpClient := &http.Client{
135+
Timeout: defaultHTTPTimeout,
136+
}
137+
138+
w := &exposureWriter{
139+
buffer: make(map[string]exposureEvent),
140+
flushInterval: flushInterval,
141+
httpClient: httpClient,
142+
agentURL: agentURL,
143+
context: context,
144+
stopChan: make(chan struct{}),
145+
}
146+
147+
// Start periodic flushing
148+
w.start()
149+
150+
return w
151+
}
152+
153+
// start begins the periodic flushing of exposure events
154+
func (w *exposureWriter) start() {
155+
w.ticker = time.NewTicker(w.flushInterval)
156+
go func() {
157+
for {
158+
select {
159+
case <-w.ticker.C:
160+
w.flush()
161+
case <-w.stopChan:
162+
return
163+
}
164+
}
165+
}()
166+
}
167+
168+
// append adds an exposure event to the buffer with deduplication
169+
func (w *exposureWriter) append(event exposureEvent) {
170+
w.mu.Lock()
171+
defer w.mu.Unlock()
172+
173+
if w.stopped {
174+
return
175+
}
176+
177+
// Create composite key for deduplication
178+
// Deduplicate by flag, allocation, variant, and subject.id
179+
key := fmt.Sprintf("%s|%s|%s|%s", event.Flag.Key, event.Allocation.Key, event.Variant.Key, event.Subject.ID)
180+
181+
// Store event (will overwrite if duplicate)
182+
w.buffer[key] = event
183+
}
184+
185+
// flush sends all buffered exposure events to the agent
186+
func (w *exposureWriter) flush() {
187+
w.mu.Lock()
188+
if len(w.buffer) == 0 || w.stopped {
189+
w.mu.Unlock()
190+
return
191+
}
192+
193+
// Move buffer to local variable and create new buffer
194+
events := make([]exposureEvent, 0, len(w.buffer))
195+
for _, event := range w.buffer {
196+
events = append(events, event)
197+
}
198+
w.buffer = make(map[string]exposureEvent)
199+
w.mu.Unlock()
200+
201+
// Build payload
202+
payload := exposurePayload{
203+
Context: w.context,
204+
Exposures: events,
205+
}
206+
207+
// Send to agent
208+
if err := w.sendToAgent(payload); err != nil {
209+
log.Error("openfeature: failed to send exposure events: %v", err)
210+
} else {
211+
log.Debug("openfeature: successfully sent %d exposure events", len(events))
212+
}
213+
}
214+
215+
// sendToAgent sends the exposure payload to the Datadog Agent via EVP proxy
216+
func (w *exposureWriter) sendToAgent(payload exposurePayload) error {
217+
// Serialize payload
218+
jsonData, err := json.Marshal(payload)
219+
if err != nil {
220+
return fmt.Errorf("failed to marshal exposure payload: %w", err)
221+
}
222+
223+
// Build request URL
224+
requestURL := w.buildRequestURL()
225+
226+
// Create HTTP request
227+
req, err := http.NewRequestWithContext(context.Background(), "POST", requestURL, bytes.NewReader(jsonData))
228+
if err != nil {
229+
return fmt.Errorf("failed to create request: %w", err)
230+
}
231+
232+
// Set headers
233+
req.Header.Set("Content-Type", "application/json")
234+
req.Header.Set(evpSubdomainHeader, evpSubdomainValue)
235+
236+
log.Debug("openfeature: sending exposure events to %s", requestURL)
237+
238+
// Send request
239+
resp, err := w.httpClient.Do(req)
240+
if err != nil {
241+
return fmt.Errorf("request failed: %w", err)
242+
}
243+
defer resp.Body.Close()
244+
245+
// Check response status
246+
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
247+
body, _ := io.ReadAll(io.LimitReader(resp.Body, 256))
248+
return fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(body))
249+
}
250+
251+
return nil
252+
}
253+
254+
// buildRequestURL constructs the full URL for the exposure endpoint
255+
func (w *exposureWriter) buildRequestURL() string {
256+
if w.agentURL.Scheme == "unix" {
257+
// For Unix domain sockets, use the HTTP adapter
258+
u := internal.UnixDataSocketURL(w.agentURL.Path)
259+
u.Path = exposureEndpoint
260+
return u.String()
261+
}
262+
263+
// For HTTP/HTTPS URLs, append the endpoint path
264+
u := *w.agentURL
265+
u.Path = exposureEndpoint
266+
return u.String()
267+
}
268+
269+
// stop stops the exposure writer and flushes any remaining events
270+
func (w *exposureWriter) stop() {
271+
w.mu.Lock()
272+
if w.stopped {
273+
w.mu.Unlock()
274+
return
275+
}
276+
w.stopped = true
277+
w.mu.Unlock()
278+
279+
// Stop the ticker
280+
if w.ticker != nil {
281+
w.ticker.Stop()
282+
}
283+
284+
// Signal the goroutine to stop
285+
close(w.stopChan)
286+
287+
// Final flush
288+
w.flush()
289+
290+
log.Debug("openfeature: exposure writer stopped")
291+
}

0 commit comments

Comments
 (0)