-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathaikido_background_process.py
More file actions
124 lines (109 loc) · 4.3 KB
/
aikido_background_process.py
File metadata and controls
124 lines (109 loc) · 4.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""
Simply exports the aikido background process
"""
import multiprocessing.connection as con
import signal
import time
import sched
import traceback
import sys
from threading import Thread
from queue import Queue
from aikido_zen.helpers.logging import logger
from aikido_zen.background_process.cloud_connection_manager import (
CloudConnectionManager,
)
from aikido_zen.helpers.check_env_for_blocking import check_env_for_blocking
from aikido_zen.helpers.token import get_token_from_env
from aikido_zen.background_process.api.http_api_ratelimited import (
ReportingApiHTTPRatelimited,
)
from aikido_zen.helpers.urls.get_api_url import get_api_url
from .commands import process_incoming_command
EMPTY_QUEUE_INTERVAL = 5 # 5 seconds
class AikidoBackgroundProcess:
"""
Aikido's background process consists of 2 threads :
- (main) Listening thread which listens on an IPC socket for incoming data
- (spawned) reporting thread which collects IPC data and send it to a CloudConnectionManager
"""
def __init__(self, address, key):
try:
listener = con.Listener(address, authkey=None)
except OSError:
logger.debug("not listening, another thread is already listening.")
sys.exit(0)
self.queue = Queue()
self.connection_manager = None
# Start reporting thread :
Thread(target=self.reporting_thread, daemon=True).start()
logger.debug(
"Background process started successfully, with UDS File : %s", address
)
add_exit_handlers()
while True:
conn = listener.accept()
while True:
try:
data = conn.recv() # because of this no sleep needed in thread
logger.debug("Incoming data : %s", data)
process_incoming_command(
connection_manager=self.connection_manager,
obj=data,
conn=conn,
queue=self.queue,
)
conn.close() # Sort of EOL for Python IPC
break
except Exception as e:
logger.error("Exception occurred in server thread : %s", e)
logger.debug("Trace \n %s", traceback.format_exc())
break # Return back to listening for new connections
def reporting_thread(self):
"""Reporting thread"""
logger.debug("Started reporting thread")
event_scheduler = sched.scheduler(
time.monotonic, time.sleep
) # Create an event scheduler
self.send_to_connection_manager(event_scheduler)
api = ReportingApiHTTPRatelimited(
reporting_url=get_api_url(),
max_events_per_interval=100,
interval_in_ms=60 * 60 * 1000,
)
# We need to pass along the scheduler so that the heartbeat also gets sent
self.connection_manager = CloudConnectionManager(
block=check_env_for_blocking(),
api=api,
token=get_token_from_env(),
serverless=False,
)
time.sleep(2) # Sleep 2 seconds to make sure modules get reported
self.connection_manager.start(event_scheduler)
event_scheduler.run()
def send_to_connection_manager(self, event_scheduler):
"""
Reports the found data to an Aikido server
"""
# Add back to event scheduler in EMPTY_QUEUE_INTERVAL secs :
event_scheduler.enter(
EMPTY_QUEUE_INTERVAL, 1, self.send_to_connection_manager, (event_scheduler,)
)
while not self.queue.empty():
queue_attack_item = self.queue.get()
self.connection_manager.on_detected_attack(
attack=queue_attack_item[0],
context=queue_attack_item[1],
blocked=queue_attack_item[2],
stack=queue_attack_item[3],
)
def add_exit_handlers():
"""
We add graceful exit handlers here since the process keeps hanging otherwise.
"""
def exit_gracefully(sig, frame):
sys.exit(0)
signal.signal(signal.SIGINT, exit_gracefully)
signal.signal(signal.SIGTERM, exit_gracefully)
signal.signal(signal.SIGQUIT, exit_gracefully)
signal.signal(signal.SIGHUP, exit_gracefully)