|
5 | 5 | import unittest |
6 | 6 |
|
7 | 7 | import asynqp |
| 8 | +import aiohttp |
| 9 | +import opentracing |
8 | 10 |
|
9 | 11 | from instana.singletons import async_tracer |
10 | 12 |
|
| 13 | +from .helpers import testenv |
| 14 | + |
| 15 | + |
11 | 16 | rabbitmq_host = "" |
12 | 17 | if "RABBITMQ_HOST" in os.environ: |
13 | 18 | rabbitmq_host = os.environ["RABBITMQ_HOST"] |
@@ -49,6 +54,14 @@ def tearDown(self): |
49 | 54 | """ Purge the queue """ |
50 | 55 | self.loop.run_until_complete(self.reset()) |
51 | 56 |
|
| 57 | + async def fetch(self, session, url, headers=None): |
| 58 | + try: |
| 59 | + async with session.get(url, headers=headers) as response: |
| 60 | + return response |
| 61 | + except aiohttp.web_exceptions.HTTPException: |
| 62 | + pass |
| 63 | + |
| 64 | + |
52 | 65 | def test_publish(self): |
53 | 66 | @asyncio.coroutine |
54 | 67 | def test(): |
@@ -298,3 +311,88 @@ def test(): |
298 | 311 | self.assertIsNone(publish1_span.ec) |
299 | 312 | self.assertFalse(publish2_span.error) |
300 | 313 | self.assertIsNone(publish2_span.ec) |
| 314 | + |
| 315 | + def test_consume_with_ensure_future(self): |
| 316 | + async def run_later(msg): |
| 317 | + # Extract the context from the message (if there is any) |
| 318 | + ctx = async_tracer.extract(opentracing.Format.HTTP_HEADERS, dict(msg.headers)) |
| 319 | + |
| 320 | + # Start a new span to track work that is done processing this message |
| 321 | + with async_tracer.start_active_span("run_later", child_of=ctx) as scope: |
| 322 | + scope.span.set_tag("exchange", msg.exchange_name) |
| 323 | + # print("") |
| 324 | + # print("run_later active scope: %s" % async_tracer.scope_manager.active) |
| 325 | + # print("") |
| 326 | + async with aiohttp.ClientSession() as session: |
| 327 | + return await self.fetch(session, testenv["wsgi_server"] + "/") |
| 328 | + |
| 329 | + def handle_message(msg): |
| 330 | + # print("") |
| 331 | + # print("handle_message active scope: %s" % async_tracer.scope_manager.active) |
| 332 | + # print("") |
| 333 | + async_tracer.inject(async_tracer.active_span.context, opentracing.Format.HTTP_HEADERS, msg.headers) |
| 334 | + asyncio.ensure_future(run_later(msg)) |
| 335 | + msg.ack() |
| 336 | + |
| 337 | + @asyncio.coroutine |
| 338 | + def test(): |
| 339 | + with async_tracer.start_active_span('test'): |
| 340 | + msg1 = asynqp.Message({'consume': 'this'}) |
| 341 | + self.exchange.publish(msg1, 'routing.key') |
| 342 | + |
| 343 | + self.consumer = yield from self.queue.consume(handle_message) |
| 344 | + yield from asyncio.sleep(0.5) |
| 345 | + self.consumer.cancel() |
| 346 | + |
| 347 | + self.loop.run_until_complete(test()) |
| 348 | + |
| 349 | + spans = self.recorder.queued_spans() |
| 350 | + self.assertEqual(6, len(spans)) |
| 351 | + |
| 352 | + publish_span = spans[0] |
| 353 | + test_span = spans[1] |
| 354 | + consume_span = spans[2] |
| 355 | + wsgi_span = spans[3] |
| 356 | + aioclient_span = spans[4] |
| 357 | + run_later_span = spans[5] |
| 358 | + |
| 359 | + self.assertIsNone(async_tracer.active_span) |
| 360 | + |
| 361 | + # Same traceId |
| 362 | + self.assertEqual(test_span.t, publish_span.t) |
| 363 | + self.assertEqual(test_span.t, consume_span.t) |
| 364 | + self.assertEqual(test_span.t, aioclient_span.t) |
| 365 | + self.assertEqual(test_span.t, wsgi_span.t) |
| 366 | + |
| 367 | + # Parent relationships |
| 368 | + self.assertEqual(publish_span.p, test_span.s) |
| 369 | + self.assertEqual(consume_span.p, publish_span.s) |
| 370 | + self.assertEqual(aioclient_span.p, run_later_span.s) |
| 371 | + self.assertEqual(run_later_span.p, consume_span.s) |
| 372 | + self.assertEqual(wsgi_span.p, aioclient_span.s) |
| 373 | + |
| 374 | + # publish |
| 375 | + self.assertEqual('test.exchange', publish_span.data.rabbitmq.exchange) |
| 376 | + self.assertEqual('publish', publish_span.data.rabbitmq.sort) |
| 377 | + self.assertIsNotNone(publish_span.data.rabbitmq.address) |
| 378 | + self.assertEqual('routing.key', publish_span.data.rabbitmq.key) |
| 379 | + self.assertIsNotNone(publish_span.stack) |
| 380 | + self.assertTrue(type(publish_span.stack) is list) |
| 381 | + self.assertGreater(len(publish_span.stack), 0) |
| 382 | + |
| 383 | + # consume |
| 384 | + self.assertEqual('test.exchange', consume_span.data.rabbitmq.exchange) |
| 385 | + self.assertEqual('consume', consume_span.data.rabbitmq.sort) |
| 386 | + self.assertIsNotNone(consume_span.data.rabbitmq.address) |
| 387 | + self.assertEqual('routing.key', consume_span.data.rabbitmq.key) |
| 388 | + self.assertIsNotNone(consume_span.stack) |
| 389 | + self.assertTrue(type(consume_span.stack) is list) |
| 390 | + self.assertGreater(len(consume_span.stack), 0) |
| 391 | + |
| 392 | + # Error logging |
| 393 | + self.assertFalse(test_span.error) |
| 394 | + self.assertIsNone(test_span.ec) |
| 395 | + self.assertFalse(consume_span.error) |
| 396 | + self.assertIsNone(consume_span.ec) |
| 397 | + self.assertFalse(publish_span.error) |
| 398 | + self.assertIsNone(publish_span.ec) |
0 commit comments