1+ from __future__ import annotations
2+
13import json
24import os
35import threading
46from typing import TYPE_CHECKING # noqa:F401
7+ from typing import Any # noqa:F401
8+ from typing import Dict # noqa:F401
9+ from typing import List # noqa:F401
10+ from typing import Optional # noqa:F401
11+ from typing import Tuple # noqa:F401
512from uuid import uuid4
613
714from ddtrace .ext import SpanTypes
2835log = get_logger (__name__ )
2936
3037if TYPE_CHECKING : # pragma: no cover
31- from typing import Any # noqa:F401
32- from typing import Dict # noqa:F401
33- from typing import List # noqa:F401
34- from typing import Optional # noqa:F401
35- from typing import Tuple # noqa:F401
36-
3738 from ddtrace ._trace .span import Span # noqa:F401
3839
3940
@@ -43,79 +44,153 @@ class CIVisibilityEncoderV01(BufferedEncoder):
4344 TEST_SUITE_EVENT_VERSION = 1
4445 TEST_EVENT_VERSION = 2
4546 ENDPOINT_TYPE = ENDPOINT .TEST_CYCLE
47+ _MAX_PAYLOAD_SIZE = 5 * 1024 * 1024 # 5MB
4648
4749 def __init__ (self , * args ):
4850 # DEV: args are not used here, but are used by BufferedEncoder's __cinit__() method,
4951 # which is called implicitly by Cython.
5052 super (CIVisibilityEncoderV01 , self ).__init__ ()
53+ self ._metadata : Dict [str , Dict [str , str ]] = {}
5154 self ._lock = threading .RLock ()
52- self ._metadata = {}
55+ self ._is_xdist_worker = os . getenv ( "PYTEST_XDIST_WORKER" ) is not None
5356 self ._init_buffer ()
5457
5558 def __len__ (self ):
5659 with self ._lock :
5760 return len (self .buffer )
5861
59- def set_metadata (self , event_type , metadata ):
60- # type: (str, Dict[str, str]) -> None
62+ def set_metadata (self , event_type : str , metadata : Dict [str , str ]):
6163 self ._metadata .setdefault (event_type , {}).update (metadata )
6264
6365 def _init_buffer (self ):
6466 with self ._lock :
6567 self .buffer = []
6668
67- def put (self , spans ):
69+ def put (self , item ):
6870 with self ._lock :
69- self .buffer .append (spans )
71+ self .buffer .append (item )
7072
7173 def encode_traces (self , traces ):
72- return self ._build_payload (traces = traces )
74+ """
75+ Only used for LogWriter, not called for CI Visibility currently
76+ """
77+ raise NotImplementedError ()
7378
74- def encode (self ):
79+ def encode (self ) -> List [ Tuple [ Optional [ bytes ], int ]] :
7580 with self ._lock :
81+ if not self .buffer :
82+ return []
83+ payloads = []
7684 with StopWatch () as sw :
77- result_payloads = self ._build_payload (self .buffer )
85+ payloads = self ._build_payload (self .buffer )
7886 record_endpoint_payload_events_serialization_time (endpoint = self .ENDPOINT_TYPE , seconds = sw .elapsed ())
7987 self ._init_buffer ()
80- return result_payloads
88+ return payloads
8189
82- def _get_parent_session (self , traces ) :
90+ def _get_parent_session (self , traces : List [ List [ Span ]]) -> int :
8391 for trace in traces :
8492 for span in trace :
8593 if span .get_tag (EVENT_TYPE ) == SESSION_TYPE and span .parent_id is not None and span .parent_id != 0 :
8694 return span .parent_id
8795 return 0
8896
89- def _build_payload (self , traces ):
90- # type: (List[List[Span]]) -> List[Tuple[Optional[bytes], int]]
97+ def _build_payload (self , traces : List [List [Span ]]) -> List [Tuple [Optional [bytes ], int ]]:
98+ """
99+ Build multiple payloads from traces, splitting when necessary to stay under size limits.
100+ Uses index-based recursive approach to avoid copying slices.
101+
102+ Returns a list of (payload_bytes, trace_count) tuples, where each payload contains
103+ as many traces as possible without exceeding _MAX_PAYLOAD_SIZE.
104+ """
105+ if not traces :
106+ return []
107+
91108 new_parent_session_span_id = self ._get_parent_session (traces )
92- is_not_xdist_worker = os .getenv ("PYTEST_XDIST_WORKER" ) is None
93- normalized_spans = [
94- self ._convert_span (span , trace [0 ].context .dd_origin , new_parent_session_span_id )
95- for trace in traces
96- for span in trace
97- if (is_not_xdist_worker or span .get_tag (EVENT_TYPE ) != SESSION_TYPE )
98- ]
99- if not normalized_spans :
109+ return self ._build_payloads_recursive (traces , 0 , len (traces ), new_parent_session_span_id )
110+
111+ def _build_payloads_recursive (
112+ self , traces : List [List [Span ]], start_idx : int , end_idx : int , new_parent_session_span_id : int
113+ ) -> List [Tuple [Optional [bytes ], int ]]:
114+ """
115+ Recursively build payloads using start/end indexes to avoid slice copying.
116+
117+ Args:
118+ traces: Full list of traces
119+ start_idx: Start index (inclusive)
120+ end_idx: End index (exclusive)
121+ new_parent_session_span_id: Parent session span ID
122+
123+ Returns:
124+ List of (payload_bytes, trace_count) tuples
125+ """
126+ if start_idx >= end_idx :
100127 return []
101- record_endpoint_payload_events_count (endpoint = ENDPOINT .TEST_CYCLE , count = len (normalized_spans ))
102128
103- # TODO: Split the events in several payloads as needed to avoid hitting the intake's maximum payload size.
104- return [
105- (
106- CIVisibilityEncoderV01 ._pack_payload (
107- {"version" : self .PAYLOAD_FORMAT_VERSION , "metadata" : self ._metadata , "events" : normalized_spans }
108- ),
109- len (traces ),
110- )
111- ]
129+ trace_count = end_idx - start_idx
130+
131+ # Convert traces to spans with filtering (using indexes)
132+ all_spans_with_trace_info = self ._convert_traces_to_spans_indexed (
133+ traces , start_idx , end_idx , new_parent_session_span_id
134+ )
135+
136+ # Get all spans (flattened)
137+ all_spans = [span for _ , trace_spans in all_spans_with_trace_info for span in trace_spans ]
138+
139+ if not all_spans :
140+ log .debug ("No spans to encode after filtering, skipping chunk" )
141+ return []
142+
143+ # Try to create payload from all spans
144+ payload = self ._create_payload_from_spans (all_spans )
145+
146+ if len (payload ) <= self ._MAX_PAYLOAD_SIZE or trace_count == 1 :
147+ # Payload fits or we can't split further (single trace)
148+ record_endpoint_payload_events_count (endpoint = self .ENDPOINT_TYPE , count = len (all_spans ))
149+ return [(payload , trace_count )]
150+ else :
151+ # Payload is too large, split in half recursively
152+ mid_idx = start_idx + (trace_count + 1 ) // 2
153+
154+ # Process both halves recursively
155+ left_payloads = self ._build_payloads_recursive (traces , start_idx , mid_idx , new_parent_session_span_id )
156+ right_payloads = self ._build_payloads_recursive (traces , mid_idx , end_idx , new_parent_session_span_id )
157+
158+ # Combine results
159+ return left_payloads + right_payloads
160+
161+ def _convert_traces_to_spans_indexed (
162+ self , traces : List [List [Span ]], start_idx : int , end_idx : int , new_parent_session_span_id : int
163+ ) -> List [Tuple [int , List [Dict [str , Any ]]]]:
164+ """Convert traces to spans with xdist filtering applied, using indexes to avoid slicing."""
165+ all_spans_with_trace_info = []
166+ for trace_idx in range (start_idx , end_idx ):
167+ trace = traces [trace_idx ]
168+ trace_spans = [
169+ self ._convert_span (span , trace [0 ].context .dd_origin , new_parent_session_span_id )
170+ for span in trace
171+ if (not self ._is_xdist_worker ) or (span .get_tag (EVENT_TYPE ) != SESSION_TYPE )
172+ ]
173+ all_spans_with_trace_info .append ((trace_idx , trace_spans ))
174+
175+ return all_spans_with_trace_info
176+
177+ def _create_payload_from_spans (self , spans : List [Dict [str , Any ]]) -> bytes :
178+ """Create a payload from the given spans."""
179+ return CIVisibilityEncoderV01 ._pack_payload (
180+ {
181+ "version" : self .PAYLOAD_FORMAT_VERSION ,
182+ "metadata" : self ._metadata ,
183+ "events" : spans ,
184+ }
185+ )
112186
113187 @staticmethod
114188 def _pack_payload (payload ):
115189 return msgpack_packb (payload )
116190
117- def _convert_span (self , span , dd_origin , new_parent_session_span_id = 0 ):
118- # type: (Span, Optional[str], Optional[int]) -> Dict[str, Any]
191+ def _convert_span (
192+ self , span : Span , dd_origin : Optional [str ] = None , new_parent_session_span_id : int = 0
193+ ) -> Dict [str , Any ]:
119194 sp = JSONEncoderV2 ._span_to_dict (span )
120195 sp = JSONEncoderV2 ._normalize_span (sp )
121196 sp ["type" ] = span .get_tag (EVENT_TYPE ) or span .span_type
@@ -183,18 +258,17 @@ class CIVisibilityCoverageEncoderV02(CIVisibilityEncoderV01):
183258 def _set_itr_suite_skipping_mode (self , new_value ):
184259 self .itr_suite_skipping_mode = new_value
185260
186- def put (self , spans ):
261+ def put (self , item ):
187262 spans_with_coverage = [
188263 span
189- for span in spans
264+ for span in item
190265 if COVERAGE_TAG_NAME in span .get_tags () or span .get_struct_tag (COVERAGE_TAG_NAME ) is not None
191266 ]
192267 if not spans_with_coverage :
193268 raise NoEncodableSpansError ()
194269 return super (CIVisibilityCoverageEncoderV02 , self ).put (spans_with_coverage )
195270
196- def _build_coverage_attachment (self , data ):
197- # type: (bytes) -> List[bytes]
271+ def _build_coverage_attachment (self , data : bytes ) -> List [bytes ]:
198272 return [
199273 b"--%s" % self .boundary .encode ("utf-8" ),
200274 b'Content-Disposition: form-data; name="coverage1"; filename="coverage1.msgpack"' ,
@@ -203,8 +277,7 @@ def _build_coverage_attachment(self, data):
203277 data ,
204278 ]
205279
206- def _build_event_json_attachment (self ):
207- # type: () -> List[bytes]
280+ def _build_event_json_attachment (self ) -> List [bytes ]:
208281 return [
209282 b"--%s" % self .boundary .encode ("utf-8" ),
210283 b'Content-Disposition: form-data; name="event"; filename="event.json"' ,
@@ -213,18 +286,16 @@ def _build_event_json_attachment(self):
213286 b'{"dummy":true}' ,
214287 ]
215288
216- def _build_body (self , data ):
217- # type: (bytes) -> List[bytes]
289+ def _build_body (self , data : bytes ) -> List [bytes ]:
218290 return (
219291 self ._build_coverage_attachment (data )
220292 + self ._build_event_json_attachment ()
221293 + [b"--%s--" % self .boundary .encode ("utf-8" )]
222294 )
223295
224- def _build_data (self , traces ):
225- # type: (List[List[Span]]) -> Optional[bytes]
296+ def _build_data (self , traces : List [List [Span ]]) -> Optional [bytes ]:
226297 normalized_covs = [
227- self ._convert_span (span , "" )
298+ self ._convert_span (span )
228299 for trace in traces
229300 for span in trace
230301 if (COVERAGE_TAG_NAME in span .get_tags () or span .get_struct_tag (COVERAGE_TAG_NAME ) is not None )
@@ -235,17 +306,17 @@ def _build_data(self, traces):
235306 # TODO: Split the events in several payloads as needed to avoid hitting the intake's maximum payload size.
236307 return msgpack_packb ({"version" : self .PAYLOAD_FORMAT_VERSION , "coverages" : normalized_covs })
237308
238- def _build_payload (self , traces ):
239- # type: (List[List[Span]]) -> List[Tuple[Optional[bytes], int]]
309+ def _build_payload (self , traces : List [List [Span ]]) -> List [Tuple [Optional [bytes ], int ]]:
240310 data = self ._build_data (traces )
241311 if not data :
242312 return []
243- return [(b"\r \n " .join (self ._build_body (data )), len (traces ))]
313+ return [(b"\r \n " .join (self ._build_body (data )), len (data ))]
244314
245- def _convert_span (self , span , dd_origin , new_parent_session_span_id = 0 ):
246- # type: (Span, Optional[str], Optional[int]) -> Dict[str, Any]
315+ def _convert_span (
316+ self , span : Span , dd_origin : Optional [str ] = None , new_parent_session_span_id : int = 0
317+ ) -> Dict [str , Any ]:
247318 # DEV: new_parent_session_span_id is unused here, but it is used in super class
248- files : Dict [str , Any ] = {}
319+ files : dict [str , Any ] = {}
249320
250321 files_struct_tag_value = span .get_struct_tag (COVERAGE_TAG_NAME )
251322 if files_struct_tag_value is not None and "files" in files_struct_tag_value :
0 commit comments