|
1 | 1 | from basictracer import Sampler, SpanRecorder |
2 | 2 | import instana.agent_const as a |
3 | | -import instana.http as http |
4 | | -import instana.custom as c |
5 | 3 | import threading as t |
6 | 4 | import opentracing.ext.tags as ext |
7 | 5 | import socket |
8 | | -import instana.data as d |
| 6 | +import instana.span_data as sd |
| 7 | +import Queue |
| 8 | +import time |
9 | 9 |
|
10 | | -class InstanaSpan(object): |
11 | | - t = 0 |
12 | | - p = None |
13 | | - s = 0 |
14 | | - ts = 0 |
15 | | - d = 0 |
16 | | - n = None |
17 | | - f = None |
18 | | - data = None |
19 | | - |
20 | | - def __init__(self, **kwds): |
21 | | - self.__dict__.update(kwds) |
22 | 10 |
|
23 | 11 | class InstanaRecorder(SpanRecorder): |
24 | 12 | sensor = None |
| 13 | + registered_spans = ("memcache", "rpc-client", "rpc-server") |
| 14 | + queue = Queue.Queue() |
25 | 15 |
|
26 | 16 | def __init__(self, sensor): |
27 | 17 | super(InstanaRecorder, self).__init__() |
28 | 18 | self.sensor = sensor |
| 19 | + self.run() |
| 20 | + |
| 21 | + def run(self): |
| 22 | + """ Span a background thread to periodically report queued spans """ |
| 23 | + self.timer = t.Thread(target=self.report_spans) |
| 24 | + self.timer.daemon = True |
| 25 | + self.timer.name = "Instana Span Reporting" |
| 26 | + self.timer.start() |
| 27 | + |
| 28 | + def report_spans(self): |
| 29 | + """ Periodically report the queued spans """ |
| 30 | + while 1: |
| 31 | + if self.sensor.agent.can_send() and self.queue.qsize: |
| 32 | + url = self.sensor.agent.make_url(a.AGENT_TRACES_URL) |
| 33 | + self.sensor.agent.request(url, "POST", self.queued_spans()) |
| 34 | + time.sleep(2) |
| 35 | + |
| 36 | + def queue_size(self): |
| 37 | + """ Return the size of the queue; how may spans are queued, """ |
| 38 | + return self.queue.qsize() |
| 39 | + |
| 40 | + def queued_spans(self): |
| 41 | + """ Get all of the spans in the queue """ |
| 42 | + spans = [] |
| 43 | + while True: |
| 44 | + try: |
| 45 | + s = self.queue.get(False) |
| 46 | + except Queue.Empty: |
| 47 | + break |
| 48 | + else: |
| 49 | + spans.append(s) |
| 50 | + return spans |
| 51 | + |
| 52 | + def clear_spans(self): |
| 53 | + """ Clear the queue of spans """ |
| 54 | + self.queued_spans() |
29 | 55 |
|
30 | 56 | def record_span(self, span): |
| 57 | + """ |
| 58 | + Convert the passed BasicSpan into an InstanaSpan and |
| 59 | + add it to the span queue |
| 60 | + """ |
31 | 61 | if self.sensor.agent.can_send(): |
32 | | - data = d.Data(service=self.get_service_name(span), |
33 | | - http=http.HttpData(host=self.get_host_name(span), |
34 | | - url=self.get_string_tag(span, ext.HTTP_URL), |
35 | | - method=self.get_string_tag(span, ext.HTTP_METHOD), |
36 | | - status=self.get_tag(span, ext.HTTP_STATUS_CODE)), |
37 | | - baggage=span.context.baggage, |
38 | | - custom=c.CustomData(tags=span.tags, |
39 | | - logs=self.collect_logs(span))) |
40 | | - |
41 | | - t.Thread(target=self.sensor.agent.request, |
42 | | - args=(self.sensor.agent.make_url(a.AGENT_TRACES_URL), "POST", |
43 | | - [InstanaSpan(t=span.context.trace_id, |
44 | | - p=span.parent_id, |
45 | | - s=span.context.span_id, |
46 | | - ts=int(round(span.start_time * 1000)), |
47 | | - d=int(round(span.duration * 1000)), |
48 | | - n=self.get_http_type(span), |
49 | | - f=self.sensor.agent.from_, |
50 | | - data=data)])).start() |
| 62 | + instana_span = None |
| 63 | + |
| 64 | + if span.operation_name in self.registered_spans: |
| 65 | + instana_span = self.build_registered_span(span) |
| 66 | + else: |
| 67 | + instana_span = self.build_sdk_span(span) |
| 68 | + |
| 69 | + self.queue.put(instana_span) |
| 70 | + |
| 71 | + def build_registered_span(self, span): |
| 72 | + """ Takes a BasicSpan and converts it into a registered InstanaSpan """ |
| 73 | + data = sd.Data(service=self.get_service_name(span), |
| 74 | + http=sd.HttpData(host=self.get_host_name(span), |
| 75 | + url=self.get_string_tag(span, ext.HTTP_URL), |
| 76 | + method=self.get_string_tag(span, ext.HTTP_METHOD), |
| 77 | + status=self.get_tag(span, ext.HTTP_STATUS_CODE)), |
| 78 | + baggage=span.context.baggage, |
| 79 | + custom=sd.CustomData(tags=span.tags, |
| 80 | + logs=self.collect_logs(span))) |
| 81 | + return sd.InstanaSpan( |
| 82 | + t=span.context.trace_id, |
| 83 | + p=span.parent_id, |
| 84 | + s=span.context.span_id, |
| 85 | + ts=int(round(span.start_time * 1000)), |
| 86 | + d=int(round(span.duration * 1000)), |
| 87 | + n=self.get_http_type(span), |
| 88 | + f=self.sensor.agent.from_, |
| 89 | + data=data) |
| 90 | + |
| 91 | + def build_sdk_span(self, span): |
| 92 | + """ Takes a BasicSpan and converts into an SDK type InstanaSpan """ |
| 93 | + |
| 94 | + custom_data = sd.CustomData( |
| 95 | + tags=span.tags, |
| 96 | + logs=self.collect_logs(span)) |
| 97 | + |
| 98 | + sdk_data = sd.SDKData( |
| 99 | + Name=span.operation_name, |
| 100 | + Custom=custom_data |
| 101 | + ) |
| 102 | + |
| 103 | + if "span.kind" in span.tags: |
| 104 | + sdk_data.Type = span.tags["span.kind"] |
| 105 | + |
| 106 | + data = sd.Data(service=self.get_service_name(span), |
| 107 | + sdk=sdk_data) |
| 108 | + |
| 109 | + return sd.InstanaSpan( |
| 110 | + t=span.context.trace_id, |
| 111 | + p=span.parent_id, |
| 112 | + s=span.context.span_id, |
| 113 | + ts=int(round(span.start_time * 1000)), |
| 114 | + d=int(round(span.duration * 1000)), |
| 115 | + n="sdk", |
| 116 | + f=self.sensor.agent.from_, |
| 117 | + data=data) |
51 | 118 |
|
52 | 119 | def get_tag(self, span, tag): |
53 | 120 | if tag in span.tags: |
|
0 commit comments