|
| 1 | +--- |
| 2 | +title: Threads for formula evaluation |
| 3 | +sidebar_label: Formulas and threads |
| 4 | +--- |
| 5 | + |
| 6 | +The Deephaven query engine executes as a Java process and makes use of multiple threads to process requests. When the Deephaven engine executes a query, it does so with an [`ExecutionContext`](https://docs.deephaven.io/core/javadoc/io/deephaven/engine/context/ExecutionContext.html) that controls which variables and methods are available to formulas and provides information about the user and query initialization. The query engine can be used as a library, in which case the application developer is responsible for providing a consistent threading model. This guide describes the standard Deephaven server application, as executed via our Docker containers, the Gradle `server-jetty-app:run` task, or as a Core+ worker in the Deephaven Enterprise system. |
| 7 | + |
| 8 | +Most interaction with the Deephaven engine is via gRPC. A gRPC request is initially handled on a web-server thread that is part of the Java gRPC library. Depending on the request, it is then dispatched to one of two thread pools for request handling. The serial executor handles requests using a single thread; evaluating script commands from a Code Studio and resolving scope tickets both execute on the serial executor. Other requests are handled on the concurrent executor, which has four threads by default; the thread count is configurable via the configuration property `scheduler.poolSize`. The serial executor provides a well-defined order for script code execution and variable resolution. Other operations may execute concurrently, with necessary locking handled by the default [Periodic Update Graph](https://deephaven.io/core/groovy/docs/conceptual/periodic-update-graph-configuration/). |
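
The difference between a single-threaded executor and a multi-threaded one can be sketched in plain Python. This is only an analogy built on the standard library's `ThreadPoolExecutor`, not Deephaven's scheduler; the pool names `Serial` and `Concurrent` and the handler functions are hypothetical names chosen for illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Order in which the single-threaded pool actually processed requests.
serial_order = []


def handle_serial(request_id: int) -> str:
    # Only one worker thread ever runs this, so appends happen in submission order.
    serial_order.append(request_id)
    return threading.current_thread().name


def handle_concurrent(request_id: int) -> str:
    # Any of the pool's workers may run this; no ordering is implied.
    return threading.current_thread().name


# Analogy for the serial executor: one worker thread, FIFO task queue.
with ThreadPoolExecutor(max_workers=1, thread_name_prefix="Serial") as serial_pool:
    serial_names = set(serial_pool.map(handle_serial, range(8)))

# Analogy for the concurrent executor: four worker threads (assumed to mirror the default).
with ThreadPoolExecutor(max_workers=4, thread_name_prefix="Concurrent") as concurrent_pool:
    concurrent_names = set(concurrent_pool.map(handle_concurrent, range(8)))

print(serial_names)              # a single thread name
print(serial_order)              # request ids in submission order
print(sorted(concurrent_names))  # between one and four thread names
```

The single-worker pool always reports one thread and preserves submission order, which is the property that makes script execution and variable resolution well-defined. The four-worker pool may report anywhere from one to four thread names, depending on scheduling.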
| 9 | + |
| 10 | +A query operation begins on one of these thread pools, but evaluation may move to another thread depending on the operation. Consider the following snippet executed from a Code Studio: |
| 11 | + |
| 12 | +```groovy |
| 13 | +thread_name=emptyTable(1).update([Selectable.parse("Thr=java.lang.Thread.currentThread().getName()").withSerial()]) |
| 14 | +``` |
| 15 | + |
| 16 | +```python |
| 17 | +from deephaven.table import Selectable |
| 18 | +from deephaven import empty_table |
| 19 | + |
| 20 | +thread_name = empty_table(1).update( |
| 21 | + [Selectable.parse("Thr=java.lang.Thread.currentThread().getName()").with_serial()] |
| 22 | +) |
| 23 | +``` |
| 24 | + |
| 25 | +The `withSerial` method indicates to the Deephaven engine that the `Thr` column must be evaluated in order, and therefore it is not multi-threaded. The result is that the table contains `DeephavenApiServer-Scheduler-Serial-1`, indicating that it has executed on the serial executor thread. |
| 26 | + |
| 27 | +To see the effect of parallelization, we'll remove the `withSerial` call and execute the following query: |
| 28 | + |
| 29 | +```groovy |
| 30 | +thread_name=emptyTable(1).update("Thr=java.lang.Thread.currentThread().getName()") |
| 31 | +``` |
| 32 | + |
| 33 | +```python |
| 34 | +from deephaven import empty_table |
| 35 | + |
| 36 | +thread_name = empty_table(1).update(["Thr=java.lang.Thread.currentThread().getName()"]) |
| 37 | +``` |
| 38 | + |
| 39 | +Without `withSerial`, the Deephaven engine may parallelize evaluation. In that case, `Thr` takes a value such as `OperationInitializationThreadPool-initializationExecutor-3`, indicating that the formula was evaluated on the operation initialization thread pool. |
| 40 | + |
| 41 | +Similarly, each time a source table updates, the downstream effects are evaluated by an Update Graph. The default Periodic Update Graph uses a thread pool that has the same number of threads as the machine has processors (the number of threads can be configured by the property `PeriodicUpdateGraph.updateThreads`). |
| 42 | + |
| 43 | +```groovy order=null |
| 44 | +thread_name=timeTable("PT1s").head(2).update("Thr=java.lang.Thread.currentThread().getName()") |
| 45 | +``` |
| 46 | + |
| 47 | +```python order=null |
| 48 | +from deephaven import time_table |
| 49 | + |
| 50 | +thread_name = ( |
| 51 | + time_table("PT1s").head(2).update(["Thr=java.lang.Thread.currentThread().getName()"]) |
| 52 | +) |
| 53 | +``` |
| 54 | + |
| 55 | +In this case, the formula is evaluated on one of the update executor threads (e.g., `PeriodicUpdateGraph-updateExecutor-6`). |
| 56 | + |
| 57 | +The `select` and `update` operations behave identically to each other, eagerly computing the result during initialization or in response to a table update. |
| 58 | + |
| 59 | +## `view` and `updateView` |
| 60 | + |
| 61 | +Unlike `select` and `update`, the `view` and `updateView` operations only compute the result when the result is accessed. This can happen on a variety of threads. For example, when performing another query operation, the results are read from the thread executing that operation: |
| 62 | + |
| 63 | +```groovy order=thread_name,distinct_threads |
| 64 | +thread_name=emptyTable(1).view("Thr=java.lang.Thread.currentThread().getName()") |
| 65 | +distinct_threads=thread_name.selectDistinct() |
| 66 | +``` |
| 67 | + |
| 68 | +```python order=thread_name,distinct_threads |
| 69 | +from deephaven import empty_table |
| 70 | + |
| 71 | +thread_name = empty_table(1).view(["Thr=java.lang.Thread.currentThread().getName()"]) |
| 72 | +distinct_threads = thread_name.select_distinct() |
| 73 | +``` |
| 74 | + |
| 75 | +The value of `Thr` in `distinct_threads` is `DeephavenApiServer-Scheduler-Serial-1`, which is the thread that executed the `selectDistinct` operation. However, when viewing the table `thread_name`, the `Thr` column takes on a value like `DeephavenApiServer-Scheduler-Concurrent-4`, because that is the thread on which the Barrage snapshot operation read the value. Each time a cell is accessed (e.g., by reloading or scrolling around a table), the value is recomputed, potentially on a different thread. |
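
The contrast between eager and on-access evaluation can be sketched in plain Python. This is a simplified analogy, not how the engine implements column sources; `EagerColumn`, `LazyColumn`, and the `Reader` pool are hypothetical names introduced only for this illustration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def thread_name(_row: int) -> str:
    # Stand-in for the formula: report the thread that evaluates it.
    return threading.current_thread().name


class EagerColumn:
    """Analogy for `update`/`select`: the formula runs once, at creation time."""

    def __init__(self, formula, size: int):
        self._values = [formula(row) for row in range(size)]

    def get(self, row: int) -> str:
        return self._values[row]


class LazyColumn:
    """Analogy for `view`/`updateView`: the formula runs on every access."""

    def __init__(self, formula):
        self._formula = formula

    def get(self, row: int) -> str:
        return self._formula(row)


creator_name = threading.current_thread().name
eager = EagerColumn(thread_name, 1)
lazy = LazyColumn(thread_name)

# Read both columns from a different thread, as a snapshot reader would.
with ThreadPoolExecutor(max_workers=1, thread_name_prefix="Reader") as reader:
    eager_seen = reader.submit(eager.get, 0).result()
    lazy_seen = reader.submit(lazy.get, 0).result()

print(eager_seen)  # name of the thread that created the column
print(lazy_seen)   # name of the thread that read the cell
```

The eager column reports the creating thread no matter who reads it, while the lazy column reports whichever thread performs the read.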
| 76 | + |
| 77 | +## `where` |
| 78 | + |
| 79 | +The `where` operation behaves similarly to `select` and `update`, evaluating the formula eagerly. In the following snippet, we record the threads used during evaluation and can see that the function was evaluated on the initialization thread pool: |
| 80 | + |
| 81 | +```groovy |
| 82 | +used_threads = new LinkedHashSet<>() |
| 83 | +record_thread = { int x -> |
| 84 | + used_threads.add(java.lang.Thread.currentThread().getName()) |
| 85 | + return true |
| 86 | +} |
| 87 | + |
| 88 | +emptyTable(5).update("Row=i").where("(boolean)record_thread(Row)") |
| 89 | +println(used_threads) |
| 90 | +``` |
| 91 | + |
| 92 | +```python |
| 93 | +import jpy |
| 94 | +from deephaven import empty_table |
| 95 | + |
| 96 | +thr = jpy.get_type("java.lang.Thread") |
| 97 | + |
| 98 | +used_threads = set() |
| 99 | + |
| 100 | + |
| 101 | +def record_thread(x: int) -> bool: |
| 102 | + used_threads.add(thr.currentThread().getName()) |
| 103 | + return True |
| 104 | + |
| 105 | + |
| 106 | +empty_table(5).update("Row=i").where("(boolean)record_thread(Row)") |
| 107 | +print(used_threads) |
| 108 | +``` |
| 109 | + |
| 110 | +Similarly, a refreshing `where` operation is evaluated on the Update Graph thread pool: |
| 111 | + |
| 112 | +```groovy test-set=1 order=null |
| 113 | +used_threads = new LinkedHashSet<>() |
| 114 | +record_thread = { int x -> |
| 115 | + used_threads.add(java.lang.Thread.currentThread().getName()) |
| 116 | + return true |
| 117 | +} |
| 118 | + |
| 119 | +recorded_threads=timeTable("PT1s").head(2).update("Row=i").where("(boolean)record_thread(Row)") |
| 120 | +``` |
| 121 | + |
| 122 | +```python test-set=2 order=null |
| 123 | +import jpy |
| 124 | +from deephaven import time_table |
| 125 | + |
| 126 | +thr = jpy.get_type("java.lang.Thread") |
| 127 | + |
| 128 | +used_threads = set() |
| 129 | + |
| 130 | + |
| 131 | +def record_thread(x: int) -> bool: |
| 132 | + used_threads.add(thr.currentThread().getName()) |
| 133 | + return True |
| 134 | + |
| 135 | + |
| 136 | +recorded_threads = ( |
| 137 | + time_table("PT1s").head(2).update("Row=i").where("(boolean)record_thread(Row)") |
| 138 | +) |
| 139 | +``` |
| 140 | + |
| 141 | +After waiting for the table to tick, we can print the value of `used_threads`: |
| 142 | + |
| 143 | +```groovy test-set=1 |
| 144 | +println(used_threads) |
| 145 | +``` |
| 146 | + |
| 147 | +```python test-set=2 |
| 148 | +print(used_threads) |
| 149 | +``` |
| 150 | + |
| 151 | +## Table operations in formulas |
| 152 | + |
| 153 | +The Deephaven engine can create a new table by evaluating a formula, which is how a [Partitioned Table](../partitioned-tables.md) transform is implemented. A `select` or `update` that has a return type of [LivenessReferent](https://docs.deephaven.io/core/javadoc/io/deephaven/engine/liveness/LivenessReferent.html) (of which a Table is a subtype) maintains the liveness of the resulting object, until it is removed or replaced in the result table. It is incorrect to use `view` or `updateView` to create a column of new Tables, because the `view` result does not have a well-defined [liveness scope](https://deephaven.io/core/groovy/docs/conceptual/liveness-scope-concept/). |
| 154 | + |
| 155 | +Formulas that result in a Table are evaluated on threads chosen in exactly the same manner as for the other `select` and `update` operations described above. This means that your table operations may not be executed on the thread that initiated them. If you have not explicitly defined an [`ExecutionContext`](https://docs.deephaven.io/core/javadoc/io/deephaven/engine/context/ExecutionContext.html) before instantiating your operation, then `select` and `update` use a newly created context that shares the source table's update graph. The newly created context does not have a query library or query scope; therefore, you may not use table operations that include a formula. If you have opened an explicit `ExecutionContext`, that context is used for evaluation, and you may use table operations that include a formula. Partitioned tables automatically use the current context for `transform`. |
| 156 | + |
| 157 | +## Related documentation |
| 158 | + |
| 159 | +- [Parallelizing queries](https://deephaven.io/core/groovy/docs/conceptual/query-engine/parallelization/) |
| 160 | +- [Periodic Update Graph](https://deephaven.io/core/groovy/docs/conceptual/periodic-update-graph-configuration/) |