1- from pathlib import Path
1+ import yaml
2+ import sqlite3
23import sys
34import signal
5+ import time
6+ import random
47
5- from netfilterqueue import NetfilterQueue
6- import yaml
78from dhalsim .py3_logger import get_logger
8-
9+ from pathlib import Path
10+ from netfilterqueue import NetfilterQueue
911from abc import ABCMeta , abstractmethod
1012
13+ class Error (Exception ):
14+ """Base class for exceptions in this module."""
15+
16+
17+ class DatabaseError (Error ):
18+ """Raised when not being able to connect to the database"""
19+
1120
1221class PacketQueue (metaclass = ABCMeta ):
1322 """
1423 Currently, the Netfilterqueue library in Python3 does not support running in threads, using blocking calls.
1524 We will use this class to launch a subprocess that handles the packets in the queue.
1625 """
1726
27+ DB_TRIES = 10
28+ """Amount of times a db query will retry on a exception"""
29+
1830 def __init__ (self , intermediate_yaml_path : Path , yaml_index : int , queue_number : int ):
1931
2032 signal .signal (signal .SIGINT , self .sigint_handler )
@@ -31,6 +43,7 @@ def __init__(self, intermediate_yaml_path: Path, yaml_index: int, queue_number:
3143
3244 # Get the attack that we are from the intermediate YAML
3345 self .intermediate_attack = self .intermediate_yaml ["network_attacks" ][self .yaml_index ]
46+ self .db_sleep_time = random .uniform (0.01 , 0.1 )
3447
3548 def main_loop (self ):
3649 self .logger .debug ('Parent NF Class launched' )
@@ -44,6 +57,60 @@ def main_loop(self):
4457 self .nfqueue .unbind ()
4558 sys .exit (0 )
4659
60+ def db_query (self , query , write = False , parameters = None ):
61+ """
62+ Execute a query on the database
63+ On a :code:`sqlite3.OperationalError` it will retry with a max of :code:`DB_TRIES` tries.
64+ Before it reties, it will sleep for :code:`DB_SLEEP_TIME` seconds.
65+ This is necessary because of the limited concurrency in SQLite.
66+
67+ :param query: The SQL query to execute in the db
68+ :type query: str
69+
70+ :param write: Boolean flag to indicate if this query will write into the database
71+
72+ :param parameters: The parameters to put in the query. This must be a tuple.
73+
74+ :raise DatabaseError: When a :code:`sqlite3.OperationalError` is still raised after
75+ :code:`DB_TRIES` tries.
76+ """
77+ for i in range (self .DB_TRIES ):
78+ try :
79+ with sqlite3 .connect (self .intermediate_yaml ["db_path" ]) as conn :
80+ cur = conn .cursor ()
81+ if parameters :
82+ cur .execute (query , parameters )
83+ else :
84+ cur .execute (query )
85+ conn .commit ()
86+
87+ if not write :
88+ return cur .fetchone ()[0 ]
89+ else :
90+ return
91+ except sqlite3 .OperationalError as exc :
92+ self .logger .info (
93+ "Failed to connect to db with exception {exc}. Trying {i} more times." .format (
94+ exc = exc , i = self .DB_TRIES - i - 1 ))
95+ time .sleep (self .db_sleep_time )
96+ self .logger .error (
97+ "Failed to connect to db. Tried {i} times." .format (i = self .DB_TRIES ))
98+ raise DatabaseError ("Failed to get master clock from database" )
99+
100+ def get_master_clock (self ):
101+ """
102+ Get the value of the master clock of the physical process through the database.
103+ On a :code:`sqlite3.OperationalError` it will retry with a max of :code:`DB_TRIES` tries.
104+ Before it reties, it will sleep for :code:`DB_SLEEP_TIME` seconds.
105+
106+ :return: Iteration in the physical process.
107+
108+ :raise DatabaseError: When a :code:`sqlite3.OperationalError` is still raised after
109+ :code:`DB_TRIES` tries.
110+ """
111+ master_time = self .db_query ("SELECT time FROM master_time WHERE id IS 1" , False , None )
112+ return master_time
113+
47114 @abstractmethod
48115 def capture (self , pkt ):
49116 """
0 commit comments