|
| 1 | +import asyncio |
| 2 | + |
| 3 | +from copy import copy |
| 4 | +from time import sleep |
| 5 | + |
| 6 | +from logbook import Logger |
| 7 | +from queue import Queue |
| 8 | + |
1 | 9 | from riemann_client.client import QueuedClient |
2 | 10 |
|
3 | 11 |
|
| 12 | +logger = Logger("Processor") |
| 13 | + |
| 14 | + |
| 15 | +class StopEvent(object): |
| 16 | + pass |
| 17 | + |
| 18 | + |
4 | 19 | class AugmentFixture(object): |
5 | | - def apply_augment(self, **data): |
6 | | - if "service" not in data: |
| 20 | + |
| 21 | + def apply_augment(self, data): |
| 22 | + if "service" not in data or hasattr(self, "stopped"): |
7 | 23 | return |
8 | 24 |
|
9 | 25 | key = data["service"] |
10 | | - if key in self.augments: |
11 | | - subscribers = self.augments[key] |
| 26 | + if key not in self.augments: |
| 27 | + return |
| 28 | + |
| 29 | + subscribers = self.augments[key] |
| 30 | + for sub in subscribers: |
| 31 | + sub.put_nowait(data) |
| 32 | + |
| 33 | + def on_stop(self): |
| 34 | + if hasattr(self, "stopped"): |
| 35 | + return |
| 36 | + |
| 37 | + logger.info("Stopping all augments") |
| 38 | + |
| 39 | + self.stopped = True |
| 40 | + for _, subscribers in self.augments.items(): |
| 41 | + for sub in subscribers: |
| 42 | + sub.put_nowait(StopEvent()) |
| 43 | + |
| 44 | + for _, subscribers in self.augments.items(): |
12 | 45 | for sub in subscribers: |
13 | | - sub.send(data) |
| 46 | + while not sub.empty(): |
| 47 | + sleep(0.1) |
14 | 48 |
|
15 | 49 |
|
16 | 50 | class QClient(QueuedClient, AugmentFixture): |
17 | 51 | def __init__(self, *args, **kwargs): |
18 | 52 | super(QClient, self).__init__(*args, **kwargs) |
19 | 53 | self.augments = {} |
| 54 | + self.tasks = [] |
20 | 55 |
|
21 | 56 | def event(self, **data): |
22 | | - self.apply_augment(**data) |
| 57 | + self.tasks.append(self.apply_augment(copy(data))) |
23 | 58 | super(QClient, self).event(**data) |
24 | 59 |
|
25 | 60 |
|
26 | | -def flush(client, transport, logger): |
27 | | - try: |
28 | | - transport.connect() |
29 | | - client.flush() |
30 | | - transport.disconnect() |
31 | | - except ConnectionRefusedError as ce: |
32 | | - logger.warn(ce) |
| 61 | +async def flush(client, transport, logger): |
| 62 | + future = asyncio.Future() |
| 63 | + |
| 64 | + async def process_async(future): |
| 65 | + try: |
| 66 | + transport.connect() |
| 67 | + client.flush() |
| 68 | + transport.disconnect() |
| 69 | + |
| 70 | + future.set_result(True) |
| 71 | + except ConnectionRefusedError as ce: |
| 72 | + logger.warn(ce) |
33 | 73 |
|
| 74 | + future.set_result(False) |
34 | 75 |
|
35 | | -def register_augment(client, key, generator, logger): |
| 76 | + asyncio.ensure_future(process_async(future)) |
| 77 | + await future |
| 78 | + return future.result() |
| 79 | + |
| 80 | + |
| 81 | +def register_augment(client, key, augment_fn, logger): |
36 | 82 | if key not in client.augments: |
37 | 83 | client.augments[key] = [] |
38 | 84 |
|
39 | | - next(generator) |
| 85 | + loop = asyncio.get_event_loop() |
| 86 | + |
| 87 | + def generator(q): |
| 88 | + stopped = False |
| 89 | + while not (stopped and q.empty()): |
| 90 | + logger.debug("Waiting for event for {0}".format(augment_fn)) |
| 91 | + event = q.get(block=not stopped) |
| 92 | + if isinstance(event, StopEvent): |
| 93 | + logger.debug("Stopping event generator") |
| 94 | + stopped = True |
| 95 | + continue |
| 96 | + yield event |
| 97 | + raise StopIteration |
| 98 | + |
| 99 | + q = Queue() |
| 100 | + g = generator(q) |
| 101 | + |
| 102 | + def execute_in_thread(fn, client, g): |
| 103 | + fn(client, g) |
| 104 | + |
| 105 | + loop.run_in_executor(None, execute_in_thread, augment_fn, client, g) |
40 | 106 |
|
41 | | - generator.send(client) |
42 | | - client.augments[key].append(generator) |
| 107 | + client.augments[key].append(q) |
0 commit comments