Skip to content

Commit 7c4257a

Browse files
adhami3310masenf
andauthored
give option to only use main thread (#4809)
* give option to only use main thread * change default to main thread * fix comment * default to None, as 0 would raise a ValueError Co-authored-by: Masen Furer <[email protected]> * add warning about passing 0 * move executor to config --------- Co-authored-by: Masen Furer <[email protected]>
1 parent 8e579ef commit 7c4257a

File tree

3 files changed

+127
-32
lines changed

3 files changed

+127
-32
lines changed

reflex/app.py

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,11 @@
1111
import inspect
1212
import io
1313
import json
14-
import multiprocessing
15-
import platform
1614
import sys
1715
import traceback
1816
from datetime import datetime
1917
from pathlib import Path
18+
from timeit import default_timer as timer
2019
from types import SimpleNamespace
2120
from typing import (
2221
TYPE_CHECKING,
@@ -76,7 +75,7 @@
7675
from reflex.components.core.sticky import sticky
7776
from reflex.components.core.upload import Upload, get_upload_dir
7877
from reflex.components.radix import themes
79-
from reflex.config import environment, get_config
78+
from reflex.config import ExecutorType, environment, get_config
8079
from reflex.event import (
8180
_EVENT_FIELDS,
8281
Event,
@@ -1114,10 +1113,23 @@ def memoized_toast_provider():
11141113
app_wrappers[(1, "ToasterProvider")] = toast_provider
11151114

11161115
with console.timing("Evaluate Pages (Frontend)"):
1116+
performance_metrics: list[tuple[str, float]] = []
11171117
for route in self._unevaluated_pages:
11181118
console.debug(f"Evaluating page: {route}")
1119+
start = timer()
11191120
self._compile_page(route, save_page=should_compile)
1121+
end = timer()
1122+
performance_metrics.append((route, end - start))
11201123
progress.advance(task)
1124+
console.debug(
1125+
"Slowest pages:\n"
1126+
+ "\n".join(
1127+
f"{route}: {time * 1000:.1f}ms"
1128+
for route, time in sorted(
1129+
performance_metrics, key=lambda x: x[1], reverse=True
1130+
)[:10]
1131+
)
1132+
)
11211133

11221134
# Add the optional endpoints (_upload)
11231135
self._add_optional_endpoints()
@@ -1130,7 +1142,7 @@ def memoized_toast_provider():
11301142
progress.advance(task)
11311143

11321144
# Store the compile results.
1133-
compile_results = []
1145+
compile_results: list[tuple[str, str]] = []
11341146

11351147
progress.advance(task)
11361148

@@ -1209,33 +1221,19 @@ def memoized_toast_provider():
12091221
),
12101222
)
12111223

1212-
# Use a forking process pool, if possible. Much faster, especially for large sites.
1213-
# Fallback to ThreadPoolExecutor as something that will always work.
1214-
executor = None
1215-
if (
1216-
platform.system() in ("Linux", "Darwin")
1217-
and (number_of_processes := environment.REFLEX_COMPILE_PROCESSES.get())
1218-
is not None
1219-
):
1220-
executor = concurrent.futures.ProcessPoolExecutor(
1221-
max_workers=number_of_processes or None,
1222-
mp_context=multiprocessing.get_context("fork"),
1223-
)
1224-
else:
1225-
executor = concurrent.futures.ThreadPoolExecutor(
1226-
max_workers=environment.REFLEX_COMPILE_THREADS.get() or None
1227-
)
1224+
executor = ExecutorType.get_executor_from_environment()
12281225

12291226
for route, component in zip(self._pages, page_components, strict=True):
12301227
ExecutorSafeFunctions.COMPONENTS[route] = component
12311228

12321229
ExecutorSafeFunctions.STATE = self._state
12331230

1234-
with executor:
1235-
result_futures = []
1231+
with console.timing("Compile to Javascript"), executor as executor:
1232+
result_futures: list[concurrent.futures.Future[tuple[str, str]]] = []
12361233

1237-
def _submit_work(fn: Callable, *args, **kwargs):
1234+
def _submit_work(fn: Callable[..., tuple[str, str]], *args, **kwargs):
12381235
f = executor.submit(fn, *args, **kwargs)
1236+
f.add_done_callback(lambda _: progress.advance(task))
12391237
result_futures.append(f)
12401238

12411239
# Compile the pre-compiled pages.
@@ -1261,10 +1259,10 @@ def _submit_work(fn: Callable, *args, **kwargs):
12611259
_submit_work(compiler.remove_tailwind_from_postcss)
12621260

12631261
# Wait for all compilation tasks to complete.
1264-
with console.timing("Compile to Javascript"):
1265-
for future in concurrent.futures.as_completed(result_futures):
1266-
compile_results.append(future.result())
1267-
progress.advance(task)
1262+
compile_results.extend(
1263+
future.result()
1264+
for future in concurrent.futures.as_completed(result_futures)
1265+
)
12681266

12691267
app_root = self._app_root(app_wrappers=app_wrappers)
12701268

@@ -1289,10 +1287,12 @@ def _submit_work(fn: Callable, *args, **kwargs):
12891287
progress.advance(task)
12901288

12911289
# Compile custom components.
1292-
*custom_components_result, custom_components_imports = (
1293-
compiler.compile_components(custom_components)
1294-
)
1295-
compile_results.append(custom_components_result)
1290+
(
1291+
custom_components_output,
1292+
custom_components_result,
1293+
custom_components_imports,
1294+
) = compiler.compile_components(custom_components)
1295+
compile_results.append((custom_components_output, custom_components_result))
12961296
all_imports.update(custom_components_imports)
12971297

12981298
progress.advance(task)

reflex/compiler/compiler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ def compile_tailwind(
508508
The compiled Tailwind config.
509509
"""
510510
# Get the path for the output file.
511-
output_path = get_web_dir() / constants.Tailwind.CONFIG
511+
output_path = str((get_web_dir() / constants.Tailwind.CONFIG).absolute())
512512

513513
# Compile the config.
514514
code = _compile_tailwind(config)

reflex/config.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@
22

33
from __future__ import annotations
44

5+
import concurrent.futures
56
import dataclasses
67
import enum
78
import importlib
89
import inspect
10+
import multiprocessing
911
import os
12+
import platform
1013
import sys
1114
import threading
1215
import urllib.parse
@@ -17,6 +20,7 @@
1720
from typing import (
1821
TYPE_CHECKING,
1922
Any,
23+
Callable,
2024
Dict,
2125
Generic,
2226
List,
@@ -497,6 +501,95 @@ class PerformanceMode(enum.Enum):
497501
OFF = "off"
498502

499503

504+
class ExecutorType(enum.Enum):
505+
"""Executor for compiling the frontend."""
506+
507+
THREAD = "thread"
508+
PROCESS = "process"
509+
MAIN_THREAD = "main_thread"
510+
511+
@classmethod
512+
def get_executor_from_environment(cls):
513+
"""Get the executor based on the environment variables.
514+
515+
Returns:
516+
The executor.
517+
"""
518+
executor_type = environment.REFLEX_COMPILE_EXECUTOR.get()
519+
520+
reflex_compile_processes = environment.REFLEX_COMPILE_PROCESSES.get()
521+
reflex_compile_threads = environment.REFLEX_COMPILE_THREADS.get()
522+
# By default, use the main thread. Unless the user has specified a different executor.
523+
# Using a process pool is much faster, but not supported on all platforms. It's gated behind a flag.
524+
if executor_type is None:
525+
if (
526+
platform.system() not in ("Linux", "Darwin")
527+
and reflex_compile_processes is not None
528+
):
529+
console.warn("Multiprocessing is only supported on Linux and MacOS.")
530+
531+
if (
532+
platform.system() in ("Linux", "Darwin")
533+
and reflex_compile_processes is not None
534+
):
535+
if reflex_compile_processes == 0:
536+
console.warn(
537+
"Number of processes must be greater than 0. If you want to use the default number of processes, set REFLEX_COMPILE_EXECUTOR to 'process'. Defaulting to None."
538+
)
539+
reflex_compile_processes = None
540+
elif reflex_compile_processes < 0:
541+
console.warn(
542+
"Number of processes must be greater than 0. Defaulting to None."
543+
)
544+
reflex_compile_processes = None
545+
executor_type = ExecutorType.PROCESS
546+
elif reflex_compile_threads is not None:
547+
if reflex_compile_threads == 0:
548+
console.warn(
549+
"Number of threads must be greater than 0. If you want to use the default number of threads, set REFLEX_COMPILE_EXECUTOR to 'thread'. Defaulting to None."
550+
)
551+
reflex_compile_threads = None
552+
elif reflex_compile_threads < 0:
553+
console.warn(
554+
"Number of threads must be greater than 0. Defaulting to None."
555+
)
556+
reflex_compile_threads = None
557+
executor_type = ExecutorType.THREAD
558+
else:
559+
executor_type = ExecutorType.MAIN_THREAD
560+
561+
match executor_type:
562+
case ExecutorType.PROCESS:
563+
executor = concurrent.futures.ProcessPoolExecutor(
564+
max_workers=reflex_compile_processes,
565+
mp_context=multiprocessing.get_context("fork"),
566+
)
567+
case ExecutorType.THREAD:
568+
executor = concurrent.futures.ThreadPoolExecutor(
569+
max_workers=reflex_compile_threads
570+
)
571+
case ExecutorType.MAIN_THREAD:
572+
FUTURE_RESULT_TYPE = TypeVar("FUTURE_RESULT_TYPE")
573+
574+
class MainThreadExecutor:
575+
def __enter__(self):
576+
return self
577+
578+
def __exit__(self, *args):
579+
pass
580+
581+
def submit(
582+
self, fn: Callable[..., FUTURE_RESULT_TYPE], *args, **kwargs
583+
) -> concurrent.futures.Future[FUTURE_RESULT_TYPE]:
584+
future_job = concurrent.futures.Future()
585+
future_job.set_result(fn(*args, **kwargs))
586+
return future_job
587+
588+
executor = MainThreadExecutor()
589+
590+
return executor
591+
592+
500593
class EnvironmentVariables:
501594
"""Environment variables class to instantiate environment variables."""
502595

@@ -538,6 +631,8 @@ class EnvironmentVariables:
538631
Path(constants.Dirs.UPLOADED_FILES)
539632
)
540633

634+
REFLEX_COMPILE_EXECUTOR: EnvVar[Optional[ExecutorType]] = env_var(None)
635+
541636
# Whether to use separate processes to compile the frontend and how many. If not set, defaults to thread executor.
542637
REFLEX_COMPILE_PROCESSES: EnvVar[Optional[int]] = env_var(None)
543638

0 commit comments

Comments
 (0)