Skip to content

Commit 2db9f28

Browse files
committed
[Invoker] Reduce threads in async FaaS Invoker and resolve token bucket issue
1 parent 3035c5c commit 2db9f28

File tree

3 files changed

+33
-16
lines changed

3 files changed

+33
-16
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
-
1010

1111
### Fixed
12-
- [Standalone] Fix issue causing worker to stop prematurely in Consume mode
12+
- [Standalone] Fixed an issue causing workers to stop prematurely in Consume mode
13+
- [Invoker] Reduced the number of threads used in the async FaaS Invoker
14+
- [Monitoring] Fixed a token bucket issue that prevented the correct number of tokens from being generated
1315

1416

1517
## [v3.5.1]

lithops/invokers.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -301,8 +301,8 @@ def __init__(self, config, executor_id, internal_storage, compute_handler, job_m
301301
self.should_run = False
302302
self.sync = is_lithops_worker()
303303

304-
invoke_pool_threads = self.config[self.backend]['invoke_pool_threads']
305-
self.executor = ThreadPoolExecutor(invoke_pool_threads)
304+
self.invoke_pool_threads = self.config[self.backend]['invoke_pool_threads']
305+
self.executor = ThreadPoolExecutor(self.invoke_pool_threads)
306306

307307
logger.debug(f'ExecutorID {self.executor_id} - Serverless invoker created')
308308

@@ -315,7 +315,7 @@ def invoker_process(inv_id):
315315
"""Run process that implements token bucket scheduling approach"""
316316
logger.debug(f'ExecutorID {self.executor_id} - Async invoker {inv_id} started')
317317

318-
with ThreadPoolExecutor(max_workers=250) as executor:
318+
with ThreadPoolExecutor(max_workers=min(64, self.invoke_pool_threads // 4)) as executor:
319319
while self.should_run:
320320
try:
321321
self.job_monitor.token_bucket_q.get()
@@ -330,6 +330,7 @@ def invoker_process(inv_id):
330330
logger.debug(f'ExecutorID {self.executor_id} - Async invoker {inv_id} finished')
331331

332332
for inv_id in range(self.ASYNC_INVOKERS):
333+
self.job_monitor.token_bucket_q.put('#')
333334
p = threading.Thread(target=invoker_process, args=(inv_id,))
334335
self.invokers.append(p)
335336
p.daemon = True
@@ -430,6 +431,16 @@ def _invoke_job(self, job):
430431
self.should_run = True
431432
self._start_async_invokers()
432433

434+
if self.running_workers > 0 and not self.job_monitor.token_bucket_q.empty():
435+
while not self.job_monitor.token_bucket_q.empty():
436+
try:
437+
self.job_monitor.token_bucket_q.get(False)
438+
self.running_workers -= 1
439+
if self.running_workers == 0:
440+
break
441+
except Exception:
442+
pass
443+
433444
if self.running_workers < self.max_workers:
434445
free_workers = self.max_workers - self.running_workers
435446
total_direct = free_workers * job.chunksize

lithops/monitor.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -429,9 +429,22 @@ def run(self):
429429
logger.debug(f'ExecutorID {self.executor_id} - Starting Storage job monitor')
430430

431431
wait_dur_sec = self.monitoring_interval
432-
prevoius_log = None
432+
previous_log = None
433433
log_time = 0
434434

435+
def process_callids():
# Run one monitoring pass: read the job status from internal storage, generate tokens,
# tag futures as running/ready and refresh the status log.
# Returns the set of call ids reported as done that are not yet in
# self.callids_done_processed_status.
436+
# previous_log and log_time are locals of the enclosing run(); they are rebound below
nonlocal previous_log, log_time
437+
callids_running, callids_done = self.internal_storage.get_job_status(self.executor_id)
438+
# call ids newly reported as done; the polling loop in run() shortens its next sleep
# when this result is non-empty
# NOTE(review): computed before _tag_future_as_ready() runs - presumably that call
# updates callids_done_processed_status; confirm
439+
new_callids_done = callids_done - self.callids_done_processed_status
440+
# generate tokens and mark futures as running/done
441+
self._generate_tokens(callids_running, callids_done)
442+
self._tag_future_as_running(callids_running)
443+
self._tag_future_as_ready(callids_done)
444+
# carry the log state across passes through the nonlocal variables
previous_log, log_time = self._print_status_log(previous_log, log_time)
445+
446+
return new_callids_done
447+
435448
while not self._all_ready():
436449
time.sleep(wait_dur_sec)
437450
wait_dur_sec = self.monitoring_interval
@@ -440,19 +453,10 @@ def run(self):
440453
if not self.should_run:
441454
break
442455

443-
callids_running, callids_done = \
444-
self.internal_storage.get_job_status(self.executor_id)
445-
446-
# verify if there are new callids_done and reduce the sleep
447-
new_callids_done = callids_done - self.callids_done_processed_status
448-
if len(new_callids_done) > 0:
456+
if len(process_callids()) > 0:
449457
wait_dur_sec = self.monitoring_interval / 5
450458

451-
# generate tokens and mark futures as running/done
452-
self._generate_tokens(callids_running, callids_done)
453-
self._tag_future_as_running(callids_running)
454-
self._tag_future_as_ready(callids_done)
455-
prevoius_log, log_time = self._print_status_log(prevoius_log, log_time)
459+
process_callids()
456460

457461
logger.debug(f'ExecutorID {self.executor_id} - Storage job monitor finished')
458462

0 commit comments

Comments
 (0)