6161from .sampler import RateByServiceSampler
6262from .sampler import RateSampler
6363from .span import Span
64+ from .vendor .debtcollector import removals
6465
6566
6667log = get_logger (__name__ )
@@ -147,7 +148,7 @@ def __init__(
147148 report_metrics = config .health_metrics_enabled ,
148149 sync_mode = self ._use_sync_mode (),
149150 )
150- self .writer = writer # type: TraceWriter
151+ self ._writer = writer # type: TraceWriter
151152
152153 # DD_TRACER_... should be deprecated after version 1.0.0 is released
153154 pfe_default_value = False
@@ -351,8 +352,8 @@ def configure(
351352 if any (x is not None for x in [hostname , port , uds_path , https ]):
352353 # If any of the parts of the URL have updated, merge them with
353354 # the previous writer values.
354- if isinstance (self .writer , AgentWriter ):
355- prev_url_parsed = compat .parse .urlparse (self .writer .agent_url )
355+ if isinstance (self ._writer , AgentWriter ):
356+ prev_url_parsed = compat .parse .urlparse (self ._writer .agent_url )
356357 else :
357358 prev_url_parsed = compat .parse .urlparse ("" )
358359
@@ -369,26 +370,26 @@ def configure(
369370 port = prev_url_parsed .port
370371 scheme = "https" if https else "http"
371372 url = "%s://%s:%s" % (scheme , hostname , port )
372- elif isinstance (self .writer , AgentWriter ):
373+ elif isinstance (self ._writer , AgentWriter ):
373374 # Reuse the URL from the previous writer if there was one.
374- url = self .writer .agent_url
375+ url = self ._writer .agent_url
375376 else :
376377 # No URL parts have updated and there's no previous writer to
377378 # get the URL from.
378379 url = None
379380
380381 try :
381- self .writer .stop ()
382+ self ._writer .stop ()
382383 except service .ServiceStatusError :
383384 # It's possible the writer never got started in the first place :(
384385 pass
385386
386387 if writer is not None :
387- self .writer = writer
388+ self ._writer = writer
388389 elif url :
389390 # Verify the URL and create a new AgentWriter with it.
390391 agent .verify_url (url )
391- self .writer = AgentWriter (
392+ self ._writer = AgentWriter (
392393 url ,
393394 sampler = self .sampler ,
394395 priority_sampler = self .priority_sampler ,
@@ -397,11 +398,11 @@ def configure(
397398 sync_mode = self ._use_sync_mode (),
398399 api_version = api_version ,
399400 )
400- elif writer is None and isinstance (self .writer , LogWriter ):
401+ elif writer is None and isinstance (self ._writer , LogWriter ):
401402 # No need to do anything for the LogWriter.
402403 pass
403- if isinstance (self .writer , AgentWriter ):
404- self .writer .dogstatsd = get_dogstatsd_client (self ._dogstatsd_url ) # type: ignore[has-type]
404+ if isinstance (self ._writer , AgentWriter ):
405+ self ._writer .dogstatsd = get_dogstatsd_client (self ._dogstatsd_url ) # type: ignore[has-type]
405406 self ._initialize_span_processors ()
406407
407408 if context_provider is not None :
@@ -436,7 +437,7 @@ def _child_after_fork(self):
436437 self ._services = set ()
437438
438439 # Re-create the background writer thread
439- self .writer = self .writer .recreate ()
440+ self ._writer = self ._writer .recreate ()
440441 self ._initialize_span_processors ()
441442
442443 self ._new_process = True
@@ -671,8 +672,8 @@ def _initialize_span_processors(self, appsec_enabled=asbool(get_env("appsec", "e
671672 partial_flush_enabled = self ._partial_flush_enabled ,
672673 partial_flush_min_spans = self ._partial_flush_min_spans ,
673674 trace_processors = trace_processors ,
674- writer = self .writer ,
675- )
675+ writer = self ._writer ,
676+ ),
676677 ] # type: List[SpanProcessor]
677678
678679 if appsec_enabled :
@@ -811,7 +812,24 @@ def write(self, spans):
811812 return
812813
813814 if spans is not None :
814- self .writer .write (spans = spans )
815+ self ._writer .write (spans = spans )
816+
817+ @removals .removed_property (message = "Use Tracer.flush instead to flush buffered traces to agent" , version = "1.0.0" )
818+ def writer (self ):
819+ return self ._writer
820+
821+ @property
822+ def agent_trace_url (self ):
823+ # type: () -> Optional[str]
824+ """Trace agent url"""
825+ if isinstance (self ._writer , AgentWriter ):
826+ return self ._writer .agent_url
827+
828+ return None
829+
830+ def flush (self ):
831+ """Flush the buffer of the trace writer. This does nothing if an unbuffered trace writer is used."""
832+ self ._writer .flush_queue ()
815833
816834 @deprecated (message = "Manually setting service info is no longer necessary" , version = "1.0.0" )
817835 def set_service_info (self , * args , ** kwargs ):
@@ -974,7 +992,7 @@ def shutdown(self, timeout=None):
974992 :type timeout: :obj:`int` | :obj:`float` | :obj:`None`
975993 """
976994 try :
977- self .writer .stop (timeout = timeout )
995+ self ._writer .stop (timeout = timeout )
978996 except service .ServiceStatusError :
979997 # It's possible the writer never got started in the first place :(
980998 pass
0 commit comments