Skip to content

Commit 26c62ab

Browse files
hannahkmdarccio
andauthored
feat: v1 trace protocol implementation (#3947)
Co-authored-by: darccio <[email protected]> Co-authored-by: hannahs.kim <[email protected]>
1 parent 882e8a8 commit 26c62ab

File tree

12 files changed

+1843
-215
lines changed

12 files changed

+1843
-215
lines changed

.github/workflows/system-tests.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,8 @@ jobs:
136136
scenario: AGENT_SUPPORTING_SPAN_EVENTS
137137
- weblog-variant: net-http
138138
scenario: TELEMETRY_METRIC_GENERATION_DISABLED
139+
- weblog-variant: net-http
140+
scenario: APM_TRACING_EFFICIENT_PAYLOAD
139141
fail-fast: false
140142
env:
141143
TEST_LIBRARY: golang

ddtrace/tracer/api.txt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,16 @@ type UserMonitoringConfig struct {
130130

131131
type UserMonitoringOption func(*UserMonitoringConfig)()
132132

133+
// File: payload_v1.go
134+
135+
// Package Functions
136+
func DecodeAttributes([]byte, *stringTable) (map[string]anyValue, []byte, error)
137+
func DecodeKeyValueList([]byte, *stringTable) (map[string]anyValue, []byte, error)
138+
func DecodeSpanEvents([]byte, *stringTable) ([]spanEvent, []byte, error)
139+
func DecodeSpanLinks([]byte, *stringTable) ([]SpanLink, []byte, error)
140+
func DecodeSpans([]byte, *stringTable) (spanList, []byte, error)
141+
func DecodeTraceChunks([]byte, *stringTable) ([]traceChunk, []byte, error)
142+
133143
// File: propagator.go
134144

135145
// Types

ddtrace/tracer/log.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,12 @@ type startupInfo struct {
6969
// checkEndpoint tries to connect to the URL specified by endpoint.
7070
// If the endpoint is not reachable, checkEndpoint returns an error
7171
// explaining why.
72-
func checkEndpoint(c *http.Client, endpoint string) error {
73-
req, err := http.NewRequest("POST", endpoint, bytes.NewReader([]byte{0x90}))
72+
func checkEndpoint(c *http.Client, endpoint string, protocol float64) error {
73+
b := []byte{0x90} // empty array
74+
if protocol == traceProtocolV1 {
75+
b = []byte{0x80} // empty map
76+
}
77+
req, err := http.NewRequest("POST", endpoint, bytes.NewReader(b))
7478
if err != nil {
7579
return fmt.Errorf("cannot create http request: %s", err)
7680
}
@@ -162,8 +166,8 @@ func logStartup(t *tracer) {
162166
info.SampleRateLimit = fmt.Sprintf("%v", limit)
163167
}
164168
if !t.config.logToStdout {
165-
if err := checkEndpoint(t.config.httpClient, t.config.transport.endpoint()); err != nil {
166-
info.AgentError = err.Error()
169+
if err := checkEndpoint(t.config.httpClient, t.config.transport.endpoint(), t.config.traceProtocol); err != nil {
170+
info.AgentError = fmt.Sprintf("%s", err.Error())
167171
log.Warn("DIAGNOSTICS Unable to reach agent intake: %s", err.Error())
168172
}
169173
}

ddtrace/tracer/option.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -414,9 +414,6 @@ func newConfig(opts ...StartOption) (*config, error) {
414414

415415
reportTelemetryOnAppStarted(telemetry.Configuration{Name: "trace_rate_limit", Value: c.traceRateLimitPerSecond, Origin: origin})
416416

417-
// Set the trace protocol to use.
418-
c.traceProtocol = internal.FloatEnv("DD_TRACE_AGENT_PROTOCOL_VERSION", traceProtocolV04)
419-
420417
if v := env.Get("OTEL_LOGS_EXPORTER"); v != "" {
421418
log.Warn("OTEL_LOGS_EXPORTER is not supported")
422419
}
@@ -595,6 +592,15 @@ func newConfig(opts ...StartOption) (*config, error) {
595592
if c.transport == nil {
596593
c.transport = newHTTPTransport(c.agentURL.String(), c.httpClient)
597594
}
595+
// Set the trace protocol to use.
596+
if internal.BoolEnv("DD_TRACE_V1_PAYLOAD_FORMAT_ENABLED", false) {
597+
c.traceProtocol = traceProtocolV1
598+
if t, ok := c.transport.(*httpTransport); ok {
599+
t.traceURL = fmt.Sprintf("%s%s", c.agentURL.String(), tracesAPIPathV1)
600+
}
601+
} else {
602+
c.traceProtocol = traceProtocolV04
603+
}
598604
if c.propagator == nil {
599605
envKey := "DD_TRACE_X_DATADOG_TAGS_MAX_LENGTH"
600606
maxLen := internal.IntEnv(envKey, defaultMaxTagsHeaderLen)

ddtrace/tracer/payload.go

Lines changed: 18 additions & 195 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,8 @@
66
package tracer
77

88
import (
9-
"bytes"
10-
"encoding/binary"
119
"io"
1210
"sync"
13-
"sync/atomic"
14-
15-
"github.com/DataDog/dd-trace-go/v2/internal/processtags"
16-
"github.com/tinylib/msgp/msgp"
1711
)
1812

1913
// payloadStats contains the statistics of a payload.
@@ -52,206 +46,35 @@ type payload interface {
5246
payloadReader
5347
}
5448

55-
// unsafePayload is a wrapper on top of the msgpack encoder which allows constructing an
56-
// encoded array by pushing its entries sequentially, one at a time. It basically
57-
// allows us to encode as we would with a stream, except that the contents of the stream
58-
// can be read as a slice by the msgpack decoder at any time. It follows the guidelines
59-
// from the msgpack array spec:
60-
// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
61-
//
62-
// unsafePayload implements io.Reader and can be used with the decoder directly.
63-
//
64-
// unsafePayload is not safe for concurrent use.
65-
//
66-
// unsafePayload is meant to be used only once and eventually dismissed with the
67-
// single exception of retrying failed flush attempts.
68-
//
69-
// ⚠️ Warning!
70-
//
71-
// The payload should not be reused for multiple sets of traces. Resetting the
72-
// payload for re-use requires the transport to wait for the HTTP package to
73-
// Close the request body before attempting to re-use it again! This requires
74-
// additional logic to be in place. See:
75-
//
76-
// • https://github.com/golang/go/blob/go1.16/src/net/http/client.go#L136-L138
77-
// • https://github.com/DataDog/dd-trace-go/pull/475
78-
// • https://github.com/DataDog/dd-trace-go/pull/549
79-
// • https://github.com/DataDog/dd-trace-go/pull/976
80-
type unsafePayload struct {
81-
// header specifies the first few bytes in the msgpack stream
82-
// indicating the type of array (fixarray, array16 or array32)
83-
// and the number of items contained in the stream.
84-
header []byte
85-
86-
// off specifies the current read position on the header.
87-
off int
88-
89-
// count specifies the number of items in the stream.
90-
count uint32
91-
92-
// buf holds the sequence of msgpack-encoded items.
93-
buf bytes.Buffer
94-
95-
// reader is used for reading the contents of buf.
96-
reader *bytes.Reader
97-
98-
// protocolVersion specifies the trace protocolVersion to use.
99-
protocolVersion float64
100-
}
101-
102-
var _ io.Reader = (*unsafePayload)(nil)
103-
104-
// newUnsafePayload returns a ready to use unsafe payload.
105-
func newUnsafePayload(protocol float64) *unsafePayload {
106-
p := &unsafePayload{
107-
header: make([]byte, 8),
108-
off: 8,
109-
protocolVersion: protocol,
110-
}
111-
return p
112-
}
113-
114-
// push pushes a new item into the stream.
115-
func (p *unsafePayload) push(t []*Span) (stats payloadStats, err error) {
116-
p.setTracerTags(t)
117-
sl := spanList(t)
118-
p.buf.Grow(sl.Msgsize())
119-
if err := msgp.Encode(&p.buf, sl); err != nil {
120-
return payloadStats{}, err
121-
}
122-
p.recordItem()
123-
return p.stats(), nil
124-
}
125-
126-
func (p *unsafePayload) setTracerTags(t []*Span) {
127-
// set on first chunk
128-
if atomic.LoadUint32(&p.count) != 0 {
129-
return
130-
}
131-
if len(t) == 0 {
132-
return
133-
}
134-
pTags := processtags.GlobalTags().String()
135-
if pTags == "" {
136-
return
49+
// newPayload returns a ready to use payload.
50+
func newPayload(protocol float64) payload {
51+
if protocol == traceProtocolV1 {
52+
return &safePayload{
53+
p: newPayloadV1(),
54+
}
13755
}
138-
t[0].setProcessTags(pTags)
139-
}
140-
141-
// itemCount returns the number of items available in the stream.
142-
func (p *unsafePayload) itemCount() int {
143-
return int(atomic.LoadUint32(&p.count))
144-
}
145-
146-
// size returns the payload size in bytes. After the first read the value becomes
147-
// inaccurate by up to 8 bytes.
148-
func (p *unsafePayload) size() int {
149-
return p.buf.Len() + len(p.header) - p.off
150-
}
151-
152-
// reset sets up the payload to be read a second time. It maintains the
153-
// underlying byte contents of the buffer. reset should not be used in order to
154-
// reuse the payload for another set of traces.
155-
func (p *unsafePayload) reset() {
156-
p.updateHeader()
157-
if p.reader != nil {
158-
p.reader.Seek(0, 0)
56+
return &safePayload{
57+
p: newPayloadV04(),
15958
}
16059
}
16160

162-
// clear empties the payload buffers.
163-
func (p *unsafePayload) clear() {
164-
p.buf = bytes.Buffer{}
165-
p.reader = nil
166-
}
167-
16861
// https://github.com/msgpack/msgpack/blob/master/spec.md#array-format-family
16962
const (
63+
// arrays
17064
msgpackArrayFix byte = 144 // up to 15 items
17165
msgpackArray16 byte = 0xdc // up to 2^16-1 items, followed by size in 2 bytes
17266
msgpackArray32 byte = 0xdd // up to 2^32-1 items, followed by size in 4 bytes
173-
)
174-
175-
// updateHeader updates the payload header based on the number of items currently
176-
// present in the stream.
177-
func (p *unsafePayload) updateHeader() {
178-
n := uint64(atomic.LoadUint32(&p.count))
179-
switch {
180-
case n <= 15:
181-
p.header[7] = msgpackArrayFix + byte(n)
182-
p.off = 7
183-
case n <= 1<<16-1:
184-
binary.BigEndian.PutUint64(p.header, n) // writes 2 bytes
185-
p.header[5] = msgpackArray16
186-
p.off = 5
187-
default: // n <= 1<<32-1
188-
binary.BigEndian.PutUint64(p.header, n) // writes 4 bytes
189-
p.header[3] = msgpackArray32
190-
p.off = 3
191-
}
192-
}
193-
194-
// Close implements io.Closer
195-
func (p *unsafePayload) Close() error {
196-
return nil
197-
}
198-
199-
// Read implements io.Reader. It reads from the msgpack-encoded stream.
200-
func (p *unsafePayload) Read(b []byte) (n int, err error) {
201-
if p.off < len(p.header) {
202-
// reading header
203-
n = copy(b, p.header[p.off:])
204-
p.off += n
205-
return n, nil
206-
}
207-
if p.reader == nil {
208-
p.reader = bytes.NewReader(p.buf.Bytes())
209-
}
210-
return p.reader.Read(b)
211-
}
212-
213-
// Write implements io.Writer. It writes data directly to the buffer.
214-
func (p *unsafePayload) Write(data []byte) (n int, err error) {
215-
return p.buf.Write(data)
216-
}
217-
218-
// grow grows the buffer to ensure it can accommodate n more bytes.
219-
func (p *unsafePayload) grow(n int) {
220-
p.buf.Grow(n)
221-
}
22267

223-
// recordItem records that an item was added and updates the header.
224-
func (p *unsafePayload) recordItem() {
225-
atomic.AddUint32(&p.count, 1)
226-
p.updateHeader()
227-
}
228-
229-
// stats returns the current stats of the payload.
230-
func (p *unsafePayload) stats() payloadStats {
231-
return payloadStats{
232-
size: p.size(),
233-
itemCount: int(atomic.LoadUint32(&p.count)),
234-
}
235-
}
236-
237-
// protocol returns the protocol version of the payload.
238-
func (p *unsafePayload) protocol() float64 {
239-
return p.protocolVersion
240-
}
241-
242-
var _ io.Reader = (*safePayload)(nil)
243-
244-
// newPayload returns a ready to use thread-safe payload.
245-
func newPayload(protocol float64) payload {
246-
return &safePayload{
247-
p: newUnsafePayload(protocol),
248-
}
249-
}
68+
// maps
69+
msgpackMapFix byte = 0x80 // up to 15 items
70+
msgpackMap16 byte = 0xde // up to 2^16-1 items, followed by size in 2 bytes
71+
msgpackMap32 byte = 0xdf // up to 2^32-1 items, followed by size in 4 bytes
72+
)
25073

251-
// safePayload provides a thread-safe wrapper around unsafePayload.
74+
// safePayload provides a thread-safe wrapper around payload.
25275
type safePayload struct {
25376
mu sync.RWMutex
254-
p *unsafePayload
77+
p payload
25578
}
25679

25780
// push pushes a new item into the stream in a thread-safe manner.
@@ -262,9 +85,9 @@ func (sp *safePayload) push(t spanList) (stats payloadStats, err error) {
26285
}
26386

26487
// itemCount returns the number of items available in the stream in a thread-safe manner.
88+
// This method is not thread-safe, but the underlying payload.itemCount() must be.
26589
func (sp *safePayload) itemCount() int {
266-
// Use direct atomic access for better performance - no mutex needed
267-
return int(atomic.LoadUint32(&sp.p.count))
90+
return sp.p.itemCount()
26891
}
26992

27093
// size returns the payload size in bytes in a thread-safe manner.

0 commit comments

Comments
 (0)