Skip to content
This repository was archived by the owner on Jul 11, 2022. It is now read-only.

Commit 0597daf

Browse files
authored
Add types to reporter (#333)
* Add types to reporter Signed-off-by: Kai Mueller <[email protected]> * Fix mypy issue Signed-off-by: Kai Mueller <[email protected]> * Delete =5 Signed-off-by: Kai Mueller <[email protected]>
1 parent 605ae04 commit 0597daf

File tree

2 files changed

+43
-30
lines changed

2 files changed

+43
-30
lines changed

jaeger_client/reporter.py

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import logging
1818
import threading
19+
from typing import List, Dict, Optional, Any
1920

2021
import tornado.gen
2122
import tornado.ioloop
@@ -25,8 +26,9 @@
2526
from .constants import DEFAULT_FLUSH_INTERVAL
2627
from . import thrift
2728
from . import ioloop_util
28-
from .metrics import Metrics, LegacyMetricsFactory
29+
from .metrics import Metrics, LegacyMetricsFactory, MetricsFactory
2930
from .utils import ErrorReporter
31+
from .span import Span
3032

3133
from thrift.protocol import TCompactProtocol
3234
from jaeger_client.thrift_gen.agent import Agent
@@ -36,55 +38,63 @@
3638

3739
class BaseReporter(object):
3840
"""Abstract class."""
39-
def report_span(self, span):
41+
def report_span(self, span: Span) -> None:
4042
raise NotImplementedError()
4143

42-
def set_process(self, service_name, tags, max_length):
44+
def set_process(self, service_name: str, tags: Any, max_length: int) -> None:
4345
pass
4446

45-
def close(self):
46-
fut = Future()
47+
def close(self) -> Future:
48+
fut: Future = Future()
4749
fut.set_result(True)
4850
return fut
4951

5052

5153
class NullReporter(BaseReporter):
5254
"""Ignores all spans."""
53-
def report_span(self, span):
55+
def report_span(self, span: Span) -> None:
5456
pass
5557

5658

5759
class InMemoryReporter(BaseReporter):
5860
"""Stores spans in memory and returns them via get_spans()."""
59-
def __init__(self):
61+
def __init__(self) -> None:
6062
super(InMemoryReporter, self).__init__()
61-
self.spans = []
63+
self.spans: List[Span] = []
6264
self.lock = threading.Lock()
6365

64-
def report_span(self, span):
66+
def report_span(self, span: Span) -> None:
6567
with self.lock:
6668
self.spans.append(span)
6769

68-
def get_spans(self):
70+
def get_spans(self) -> List[Span]:
6971
with self.lock:
7072
return self.spans[:]
7173

7274

7375
class LoggingReporter(BaseReporter):
7476
"""Logs all spans."""
75-
def __init__(self, logger=None):
77+
def __init__(self, logger: Optional[logging.Logger] = None) -> None:
7678
self.logger = logger if logger else default_logger
7779

78-
def report_span(self, span):
80+
def report_span(self, span: Span) -> None:
7981
self.logger.info('Reporting span %s', span)
8082

8183

8284
class Reporter(BaseReporter):
8385
"""Receives completed spans from Tracer and submits them out of process."""
84-
def __init__(self, channel, queue_capacity=100, batch_size=10,
85-
flush_interval=DEFAULT_FLUSH_INTERVAL, io_loop=None,
86-
error_reporter=None, metrics=None, metrics_factory=None,
87-
**kwargs):
86+
def __init__(
87+
self,
88+
channel: Any,
89+
queue_capacity: int = 100,
90+
batch_size: int = 10,
91+
flush_interval: Optional[float] = DEFAULT_FLUSH_INTERVAL,
92+
io_loop: Any = None,
93+
error_reporter: Optional[ErrorReporter] = None,
94+
metrics: Optional[Metrics] = None,
95+
metrics_factory: Optional[MetricsFactory] = None,
96+
**kwargs: Any
97+
) -> None:
8898
"""
8999
:param channel: a communication channel to jaeger-agent
90100
:param queue_capacity: how many spans we can hold in memory before
@@ -119,7 +129,7 @@ def __init__(self, channel, queue_capacity=100, batch_size=10,
119129
if self.io_loop is None:
120130
self.logger.error('Jaeger Reporter has no IOLoop')
121131
else:
122-
self.queue = tornado.queues.Queue(maxsize=queue_capacity)
132+
self.queue: tornado.queues.Queue = tornado.queues.Queue(maxsize=queue_capacity)
123133
self.stop = object()
124134
self.stopped = False
125135
self.stop_lock = Lock()
@@ -129,13 +139,13 @@ def __init__(self, channel, queue_capacity=100, batch_size=10,
129139
self._process_lock = Lock()
130140
self._process = None
131141

132-
def set_process(self, service_name, tags, max_length):
142+
def set_process(self, service_name: str, tags: Dict, max_length: int) -> None:
133143
with self._process_lock:
134144
self._process = thrift.make_process(
135145
service_name=service_name, tags=tags, max_length=max_length,
136146
)
137147

138-
def report_span(self, span):
148+
def report_span(self, span: Span) -> None:
139149
self.io_loop.add_callback(self._report_span_from_ioloop, span)
140150

141151
def _report_span_from_ioloop(self, span):
@@ -179,7 +189,7 @@ def _consume_queue(self):
179189
self.logger.info('Span publisher exited')
180190

181191
# method for protocol factory
182-
def getProtocol(self, transport):
192+
def getProtocol(self, transport: Any) -> TCompactProtocol:
183193
"""
184194
Implements Thrift ProtocolFactory interface
185195
:param: transport:
@@ -216,7 +226,7 @@ def _send(self, batch):
216226
"""
217227
return self.agent.emitBatch(batch)
218228

219-
def close(self):
229+
def close(self) -> Future:
220230
"""
221231
Ensure that all spans from the queue are submitted.
222232
Returns Future that will be completed once the queue is empty.
@@ -235,7 +245,7 @@ def _flush(self):
235245
class ReporterMetrics(object):
236246
"""Reporter specific metrics."""
237247

238-
def __init__(self, metrics_factory):
248+
def __init__(self, metrics_factory: MetricsFactory) -> None:
239249
self.reporter_success = \
240250
metrics_factory.create_counter(name='jaeger:reporter_spans', tags={'result': 'ok'})
241251
self.reporter_failure = \
@@ -248,22 +258,22 @@ def __init__(self, metrics_factory):
248258

249259
class CompositeReporter(BaseReporter):
250260
"""Delegates reporting to one or more underlying reporters."""
251-
def __init__(self, *reporters):
261+
def __init__(self, *reporters: BaseReporter) -> None:
252262
self.reporters = reporters
253263

254-
def set_process(self, service_name, tags, max_length):
264+
def set_process(self, service_name: str, tags: Any, max_length: int) -> None:
255265
for reporter in self.reporters:
256266
reporter.set_process(service_name, tags, max_length)
257267

258-
def report_span(self, span):
268+
def report_span(self, span: Span) -> None:
259269
for reporter in self.reporters:
260270
reporter.report_span(span)
261271

262-
def close(self):
272+
def close(self) -> Future:
263273
from threading import Lock
264274
lock = Lock()
265275
count = [0]
266-
future = Future()
276+
future: Future = Future()
267277

268278
def on_close(_):
269279
with lock:

jaeger_client/utils.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@
1414

1515
import socket
1616
import struct
17+
from logging import Logger
1718

1819
import time
19-
from typing import Optional
20+
from typing import Any, Optional
2021

2122

2223
class ErrorReporter(object):
@@ -27,12 +28,14 @@ class ErrorReporter(object):
2728
N.B. metrics will be deprecated in the future
2829
"""
2930

30-
def __init__(self, metrics, logger=None, log_interval_minutes=15):
31+
def __init__(
32+
self, metrics: Any, logger: Optional[Logger] = None, log_interval_minutes: int = 15
33+
):
3134
self.logger = logger
3235
self.log_interval_minutes = log_interval_minutes
3336
self._last_error_reported_at = time.time()
3437

35-
def error(self, *args):
38+
def error(self, *args: Any) -> None:
3639
if self.logger is None:
3740
return
3841

0 commit comments

Comments
 (0)