55import logging
66import os
77import sys
8- from typing import Dict
98from typing import List
109from typing import Optional
1110from typing import TYPE_CHECKING
4140if TYPE_CHECKING :
4241 from ddtrace import Span
4342
44- from .agent import ConnectionType
45-
4643
4744log = get_logger (__name__ )
4845
5754DEFAULT_BUFFER_SIZE = 8 << 20 # 8 MB
5855DEFAULT_MAX_PAYLOAD_SIZE = 8 << 20 # 8 MB
5956DEFAULT_PROCESSING_INTERVAL = 1.0
60- DEFAULT_REUSE_CONNECTIONS = False
6157
6258
6359def get_writer_buffer_size ():
@@ -79,11 +75,6 @@ def get_writer_interval_seconds():
7975 )
8076
8177
82- def get_writer_reuse_connections ():
83- # type: () -> bool
84- return asbool (os .getenv ("DD_TRACE_WRITER_REUSE_CONNECTIONS" , DEFAULT_REUSE_CONNECTIONS ))
85-
86-
8778def _human_size (nbytes ):
8879 """Return a human-readable size."""
8980 i = 0
@@ -255,7 +246,6 @@ def __init__(
255246 report_metrics = False , # type: bool
256247 sync_mode = False , # type: bool
257248 api_version = None , # type: Optional[str]
258- reuse_connections = None , # type: Optional[bool]
259249 ):
260250 # type: (...) -> None
261251 # Pre-conditions:
@@ -311,7 +301,6 @@ def __init__(
311301 self ._metrics_reset ()
312302 self ._drop_sma = SimpleMovingAverage (DEFAULT_SMA_WINDOW )
313303 self ._sync_mode = sync_mode
314- self ._conn = None # type: Optional[ConnectionType]
315304 self ._retry_upload = tenacity .Retrying (
316305 # Retry RETRY_ATTEMPTS times within the first half of the processing
317306 # interval, using a Fibonacci policy with jitter
@@ -322,7 +311,6 @@ def __init__(
322311 retry = tenacity .retry_if_exception_type ((compat .httplib .HTTPException , OSError , IOError )),
323312 )
324313 self ._log_error_payloads = asbool (os .environ .get ("_DD_TRACE_WRITER_LOG_ERROR_PAYLOADS" , False ))
325- self ._reuse_connections = get_writer_reuse_connections () if reuse_connections is None else reuse_connections
326314
327315 @property
328316 def _agent_endpoint (self ):
@@ -375,34 +363,22 @@ def recreate(self):
375363 api_version = self ._api_version ,
376364 )
377365
378- def _reset_connection (self ):
379- # type: () -> None
380- if self ._conn :
381- self ._conn .close ()
382- self ._conn = None
383-
384366 def _put (self , data , headers ):
385- # type: (bytes, Dict[str, str]) -> Response
367+ conn = get_connection (self .agent_url , self ._timeout )
368+
386369 with StopWatch () as sw :
387- if self ._conn is None :
388- log .debug ("creating new agent connection to %s with timeout %d" , self .agent_url , self ._timeout )
389- self ._conn = get_connection (self .agent_url , self ._timeout )
390370 try :
391- self . _conn .request ("PUT" , self ._endpoint , data , headers )
392- resp = compat .get_connection_response (self . _conn )
371+ conn .request ("PUT" , self ._endpoint , data , headers )
372+ resp = compat .get_connection_response (conn )
393373 t = sw .elapsed ()
394374 if t >= self .interval :
395375 log_level = logging .WARNING
396376 else :
397377 log_level = logging .DEBUG
398378 log .log (log_level , "sent %s in %.5fs to %s" , _human_size (len (data )), t , self ._agent_endpoint )
399379 return Response .from_http_response (resp )
400- except Exception :
401- self ._reset_connection ()
402- raise
403380 finally :
404- if self ._conn and not self ._reuse_connections :
405- self ._reset_connection ()
381+ conn .close ()
406382
407383 def _downgrade (self , payload , response ):
408384 if self ._endpoint == "v0.5/traces" :
@@ -581,8 +557,4 @@ def _stop_service( # type: ignore[override]
581557 super (AgentWriter , self )._stop_service ()
582558 self .join (timeout = timeout )
583559
584- def on_shutdown (self ):
585- try :
586- self .periodic ()
587- finally :
588- self ._reset_connection ()
560+ on_shutdown = periodic
0 commit comments