@@ -124,6 +124,55 @@ def __init__(self, intermediate_yaml):
124124 self .db_sleep_time = random .uniform (0.01 , 0.1 )
125125 self .logger .info ("DB Sleep time: " + str (self .db_sleep_time ))
126126
127+ def set_sync (self , flag ):
128+ """
129+ Set this plcs sync flag in the sync table. When this is 1, the physical process
130+ knows this plc finished the requested iteration.
131+ On a :code:`sqlite3.OperationalError` it will retry with a max of :code:`DB_TRIES` tries.
132+ Before it reties, it will sleep for :code:`DB_SLEEP_TIME` seconds.
133+ :param flag: True for sync to 1, False for sync to 0
134+ :type flag: bool
135+ :raise DatabaseError: When a :code:`sqlite3.OperationalError` is still raised after
136+ :code:`DB_TRIES` tries.
137+ """
138+ #"UPDATE sync SET flag=2"
139+ self .db_query ("UPDATE sync SET flag=?" , True , (int (flag ),))
140+
141+ def db_query (self , query , write = False , parameters = None ):
142+ """
143+ Execute a query on the database
144+ On a :code:`sqlite3.OperationalError` it will retry with a max of :code:`DB_TRIES` tries.
145+ Before it reties, it will sleep for :code:`DB_SLEEP_TIME` seconds.
146+ This is necessary because of the limited concurrency in SQLite.
147+ :param query: The SQL query to execute in the db
148+ :type query: str
149+ :param write: Boolean flag to indicate if this query will write into the database
150+ :param parameters: The parameters to put in the query. This must be a tuple.
151+ :raise DatabaseError: When a :code:`sqlite3.OperationalError` is still raised after
152+ :code:`DB_TRIES` tries.
153+ """
154+ for i in range (self .DB_TRIES ):
155+ try :
156+ with sqlite3 .connect (self .data ["db_path" ]) as conn :
157+ cur = conn .cursor ()
158+ if parameters :
159+ cur .execute (query , parameters )
160+ else :
161+ cur .execute (query )
162+ conn .commit ()
163+
164+ if not write :
165+ return cur .fetchone ()[0 ]
166+ else :
167+ return
168+ except sqlite3 .OperationalError as exc :
169+ self .logger .info (
170+ "Failed to connect to db with exception {exc}. Trying {i} more times." .format (
171+ exc = exc , i = self .DB_TRIES - i - 1 ))
172+ time .sleep (self .db_sleep_time )
173+
174+ self .logger .error ("Failed to connect to db. Tried {i} times." .format (i = self .DB_TRIES ))
175+ raise DatabaseError ("Failed to execute db query in database" )
127176
128177 def prepare_wntr_simulator (self ):
129178 self .logger .info ("Preparing wntr simulation" )
@@ -577,7 +626,7 @@ def set_to_db(self, what, value):
577626 time .sleep (self .db_sleep_time )
578627 self .logger .error (
579628 "Failed to connect to db. Tried {i} times." .format (i = self .DB_TRIES ))
580- raise DatabaseError ("Failed to get master clock from database" )
629+ raise DatabaseError ("Failed to set value to database" )
581630
582631 def get_from_db (self , what ):
583632 """Returns the first element of the result tuple."""
@@ -645,10 +694,12 @@ def simulate_with_epynet(self, iteration_limit, p_bar):
645694 time .sleep (self .WAIT_FOR_FLAG )
646695
647696 # Notify the PLCs they can start receiving remote values
648- with sqlite3 .connect (self .data ["db_path" ]) as conn :
649- c = conn .cursor ()
650- c .execute ("UPDATE sync SET flag=2" )
651- conn .commit ()
697+ self .set_sync (2 )
698+
699+ #with sqlite3.connect(self.data["db_path"]) as conn:
700+ # c = conn.cursor()
701+ # c.execute("UPDATE sync SET flag=2")
702+ # conn.commit()
652703
653704 # Wait for the PLCs to apply control logic
654705 while not self .get_plcs_ready (3 ):
@@ -696,10 +747,11 @@ def simulate_with_epynet(self, iteration_limit, p_bar):
696747 self .write_results (self .results_list )
697748
698749 # Set sync flags for nodes
699- with sqlite3 .connect (self .data ["db_path" ]) as conn :
700- c = conn .cursor ()
701- c .execute ("UPDATE sync SET flag=0" )
702- conn .commit ()
750+ self .set_sync (0 )
751+ #with sqlite3.connect(self.data["db_path"]) as conn:
752+ # c = conn.cursor()
753+ # c.execute("UPDATE sync SET flag=0")
754+ # conn.commit()
703755
704756 simulation_time = simulation_time + internal_epynet_step
705757 conn = sqlite3 .connect (self .data ["db_path" ])
@@ -722,10 +774,11 @@ def simulate_with_wntr(self, iteration_limit, p_bar):
722774 time .sleep (self .WAIT_FOR_FLAG )
723775
724776 # Notify the PLCs they can start receiving remote values
725- with sqlite3 .connect (self .data ["db_path" ]) as conn :
726- c = conn .cursor ()
727- c .execute ("UPDATE sync SET flag=2" )
728- conn .commit ()
777+ self .set_sync (2 )
778+ #with sqlite3.connect(self.data["db_path"]) as conn:
779+ # c = conn.cursor()
780+ # c.execute("UPDATE sync SET flag=2")
781+ # conn.commit()
729782
730783 # Wait for the PLCs to apply control logic
731784 while not self .get_plcs_ready (3 ):
@@ -765,10 +818,11 @@ def simulate_with_wntr(self, iteration_limit, p_bar):
765818 self .write_results (self .results_list )
766819
767820 # Set sync flags for nodes
768- with sqlite3 .connect (self .data ["db_path" ]) as conn :
769- c = conn .cursor ()
770- c .execute ("UPDATE sync SET flag=0" )
771- conn .commit ()
821+ self .set_sync (0 )
822+ #with sqlite3.connect(self.data["db_path"]) as conn:
823+ # c = conn.cursor()
824+ # c.execute("UPDATE sync SET flag=0")
825+ # conn.commit()
772826
773827 def update_tanks (self , network_state = None ):
774828 """Update tanks in database."""
0 commit comments