Skip to content

Commit 7c65e19

Browse files
olivielpeau authored and gmmeyer committed
[forwarder] Fix error on removal of transactions (#3287)
Fixes the `ValueError: list.remove(x): x not in list` error. The fix has 2 parts:

1. Recompute the next flush of transactions on flush timeout. This is needed so that, when a flush timeout occurs, the transactions of that flush do not stay at the head of the queue (if they did stay at the head of the queue then, when the max queue size is hit, newer transactions would be deleted first, which is not desirable). Recomputing the next flush time might delay these transactions slightly (up to MAX_WAIT_FOR_REPLAY seconds, i.e. 90 seconds), but that is acceptable here since at this point these transactions have probably been waiting for some time already.

2. Safely remove transactions from the list, so that the forwarder doesn't crash when a removal attempt is made on a transaction that was already removed.
1 parent e995bb9 commit 7c65e19

File tree

1 file changed

+18
-9
lines changed

1 file changed

+18
-9
lines changed

transaction.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,7 @@ def append(self,tr):
134134
new_trs = sorted(self._transactions,key=attrgetter('_next_flush'), reverse = True)
135135
for tr2 in new_trs:
136136
if (self._total_size + tr_size) > self._MAX_QUEUE_SIZE:
137-
self._transactions.remove(tr2)
138-
self._total_count = self._total_count - 1
139-
self._total_size = self._total_size - tr2.get_size()
137+
self._remove(tr2)
140138
log.warn("Removed transaction %s from queue" % tr2.get_id())
141139

142140
# Done
@@ -148,6 +146,17 @@ def append(self,tr):
148146
log.debug("Transaction %s added" % (tr.get_id()))
149147
self.print_queue_stats()
150148

149+
def _remove(self, tr):
150+
'''Safely remove transaction from list'''
151+
try:
152+
self._transactions.remove(tr)
153+
except ValueError:
154+
# Should not happen if we order the queue consistently, but we should catch the error anyway
155+
log.warn("Tried to remove transaction %s from queue but it was not in the queue anymore.", tr.get_id())
156+
else:
157+
self._total_count -= 1
158+
self._total_size -= tr.get_size()
159+
151160
def flush(self):
152161

153162
if self._trs_to_flush is not None:
@@ -201,6 +210,10 @@ def flush_next(self):
201210
# Running for too long?
202211
if datetime.utcnow() - self._flush_time >= self._MAX_FLUSH_DURATION:
203212
log.warn('Flush %s is taking more than 10s, stopping it', self._flush_count)
213+
for tr in self._trs_to_flush:
214+
# Recompute these transactions' next flush so that if we hit the max queue size
215+
# newer transactions are preserved
216+
tr.compute_next_flush(self._MAX_WAIT_FOR_REPLAY)
204217
self._trs_to_flush = []
205218
return self.flush_next()
206219

@@ -277,9 +290,7 @@ def tr_error_reject_request(self, tr):
277290
"It will not be replayed.",
278291
tr.get_id(),
279292
tr.get_size() / 1024)
280-
self._transactions.remove(tr)
281-
self._total_count -= 1
282-
self._total_size -= tr.get_size()
293+
self._remove(tr)
283294
self._transactions_flushed += 1
284295
self.print_queue_stats()
285296
self._transactions_rejected += 1
@@ -295,8 +306,6 @@ def tr_success(self, tr):
295306
self._running_flushes -= 1
296307
self._finished_flushes += 1
297308
log.debug("Transaction %d completed", tr.get_id())
298-
self._transactions.remove(tr)
299-
self._total_count -= 1
300-
self._total_size -= tr.get_size()
309+
self._remove(tr)
301310
self._transactions_flushed += 1
302311
self.print_queue_stats()

0 commit comments

Comments
 (0)