# **RFC0x for Presto**


## Replacing HTTP Exchange with Binary Exchange Protocol

Proposers
* Daniel Bauer ( [email protected])

* Andrea Giovannini ( [email protected])

## Related Issues

[Redesign Exchange protocol to reduce query latency](https://github.com/prestodb/presto/issues/21926)

The enhancement described in the issue above is integrated into the binary exchange protocol proposed here.

## Summary

The binary exchange protocol (BinX) is an alternative to the existing HTTP-based exchange protocol that
runs between Prestissimo worker nodes. It offers the same functionality and API
but uses a binary encoding that can be parsed more efficiently than HTTP messages.
This translates into a performance benefit for exchange-intensive queries.
BinX does not replace the control protocol that runs between the coordinator and the
worker nodes. The control protocol continues to use HTTP.

## Background

The exchange protocol provides remote-procedure-call semantics for obtaining
data from a remote worker, acknowledging data receipt, and terminating (aborting) the exchange.
The implementation on top of HTTP uses only a small subset of the features that HTTP offers:
transaction and session multiplexing are not needed. Parsing generic HTTP messages
is more complex than decoding binary-encoded messages.

### Goals

The proposal is to use a binary exchange protocol as a light-weight alternative to the existing HTTP exchange protocol.
A prototype implementation shows that such a protocol reduces the run-time of exchange-heavy queries by
20% to 30%.

A further goal is to open the way for network accelerators, i.e., support for smart network interface
cards (SmartNICs) that offload the transport stack onto the NIC.

## Description of the Prototype Implementation

The aim is to minimize changes to the existing code base by adding light-weight
components to the Prestissimo workers without changing the coordinator. BinX is an optional feature,
disabled by default, that can be activated in the Prestissimo worker configuration.
Once activated, the data exchange between all worker nodes uses BinX. Communication
between workers and the coordinator continues to use the existing HTTP exchange protocol.
Mixed-mode operation, where only some worker nodes use BinX, is not supported.

BinX communicates over its own, separate port number. When enabled, the `http://` and `https://`
schemes provided by the coordinator are re-mapped, and a BinX exchange source is instantiated instead
of the default `HttpExchangeSource` (formerly known as `PrestoExchangeSource`).
Communication with the coordinator node and other exchange types, such as broadcast exchange and unsafe row exchange,
remain unaffected.

The semantics of the exchange protocol remain unchanged, with the following properties:
- Exchanges are initiated by the data consumer for a remote buffer. Once the buffer has
been transferred or the owning remote task has been stopped, the exchange is terminated.
- The exchange is implemented as a request/reply protocol, i.e., it is pull-based.
- A data request is answered with a data reply. The reply contains zero, one, or more
  "chunks", depending on the availability of data on the serving side.
- A timeout mechanism with exponential back-off provides robustness in the face of failures.
- Acknowledge requests are used to free memory on the serving side; acknowledge replies
exist but are ignored on the requesting side.
- An explicit delete (or abort) request informs the serving side that the transfer is
stopping and that associated resources can be freed. Delete requests are acknowledged.

Requests and replies are implemented as binary-encoded protocol data units on top of TCP.
This minimizes the protocol overhead.
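
The pull-based flow described above can be sketched with a toy in-memory model. This is illustrative only: the names, the single-buffer setting, and the direct function call standing in for TCP are assumptions, not the actual Prestissimo classes.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Toy model of the pull-based exchange: the consumer requests pages by
// sequence number, acknowledges receipt, and finally deletes the exchange.
enum class Type { DATA, ACK, DELETE_ };

struct Request {
  Type type;
  uint64_t pageToken;  // sequence number of the requested page
};

struct Reply {
  Type type;
  bool bufferComplete;
  uint64_t pageNextToken;
  std::vector<std::string> data;  // zero or more "chunks"
};

// Toy serving side: holds the pages of one remote output buffer.
struct Server {
  std::vector<std::string> pages;
  uint64_t acked = 0;  // pages up to this token may be freed

  Reply handle(const Request& req) {
    switch (req.type) {
      case Type::DATA: {
        Reply r{Type::DATA, false, req.pageToken + 1, {}};
        if (req.pageToken < pages.size()) {
          r.data.push_back(pages[req.pageToken]);
        }
        r.bufferComplete = req.pageToken + 1 >= pages.size();
        return r;
      }
      case Type::ACK:
        acked = req.pageToken;  // frees memory on the serving side
        return {Type::ACK, false, req.pageToken, {}};
      case Type::DELETE_:
      default:
        pages.clear();  // release all resources held for this exchange
        return {Type::DELETE_, true, req.pageToken, {}};
    }
  }
};

// Consumer-driven transfer: request, acknowledge, and finally delete.
std::vector<std::string> fetchAll(Server& server) {
  std::vector<std::string> out;
  uint64_t token = 0;
  while (true) {
    Reply r = server.handle({Type::DATA, token});
    for (auto& chunk : r.data) out.push_back(chunk);
    server.handle({Type::ACK, r.pageNextToken});  // ack reply is ignored
    token = r.pageNextToken;
    if (r.bufferComplete) break;
  }
  server.handle({Type::DELETE_, token});  // delete request is acknowledged
  return out;
}
```

The sketch mirrors the properties listed above: the consumer drives the transfer, acknowledgements free server-side memory, and the explicit delete releases the remaining resources.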

### Configuration Options

BinX has two configuration properties that are part of the Prestissimo (native) worker
configuration:

|Property name | Type| Values| Effect |
|--------|------|------|------|
|binx.enabled|bool| True or false, defaults to false | Enables BinX when set to true|
|binx.server.port|uint64_t|Valid port number > 1000, defaults to 8091| The port number used by BinX|

The configuration must be homogeneous across all worker nodes. BinX is disabled by default
and must be explicitly enabled.
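
As a sketch, enabling BinX in a worker's configuration file would look as follows (only the two BinX properties come from this RFC; any surrounding properties are omitted):

```properties
# Prestissimo worker config.properties (fragment)
binx.enabled=true
binx.server.port=8091
```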

If enabled, the worker's logs will contain the following messages:
```
I20240828 14:57:11.361124 19 PrestoServer.cpp:599] [PRESTO_STARTUP] Binary Server IO executor 'BinSrvIO' has 64 threads.
I20240828 14:57:11.364262 19 BinaryExchangeServer.cpp:203] Starting binary exchange server on port 8091
```

## Implementation Overview

The implementation covers the protocol design, the BinX server implementation, and the BinX exchange
source implementation.

### Binary Exchange Protocol

The protocol consists of requests and replies that are implemented by the `BinRequest` and `BinReply` classes defined
in `BinRequestReply.h`. All three request types, for data, acknowledge, and delete, share the same structure.
A request consists of the following fields:

|Field name| Type | Semantic |
| ---- | ---- | ---- |
|requestType| enum (DATA, ACK, DELETE) | The type of request |
|getDataSize| bool | When true, requests the sizes of the remaining available data pages; when false, requests data |
|maxSizeOctets| uint64_t | The maximum allowed size of the data pages|
|maxWaitMicroSeconds|uint64_t| The maximum wait time in microseconds|
|taskId| std::string | The unique ID of the remote task from which data is requested|
|bufferId| uint64_t | The ID of the buffer within the remote task|
|pageToken| uint64_t | The sequence number of the data page that is requested|

Acknowledge and delete requests don't need all fields; they set `getDataSize` to false and both
`maxSizeOctets` and `maxWaitMicroSeconds` to 0.

A reply has the following fields:

|Field name| Type | Semantic |
| ---- | ---- | ---- |
|replyType| enum (DATA, ACK, DELETE) | The type of reply |
|status| enum (OK, NODATA, SERVER_ERROR, TIMEOUT) | The reply status |
|bufferComplete| bool | True if the entire buffer has been transferred |
|pageToken|uint64_t| The token (sequence number) of the first data page in the reply|
|pageNextToken|uint64_t| The token (sequence number) of the next available page that can be requested|
|remainingBytes| std::vector<uint64_t> | Contains the sizes of available pages when requested|
|data| IOBuf | Contains the data payload |
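
The two message layouts above map naturally onto plain structs. The sketch below is an assumption for illustration (member order, the `makeAck` helper, and the use of `std::string` in place of `IOBuf` are not from the code base; the actual definitions live in `BinRequestReply.h`):

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Field-by-field sketch of the in-memory representations described above.
enum class RequestType { DATA, ACK, DELETE_ };
enum class Status { OK, NODATA, SERVER_ERROR, TIMEOUT };

struct BinRequest {
  RequestType requestType;
  bool getDataSize;             // true: request page sizes; false: request data
  uint64_t maxSizeOctets;       // maximum allowed size of the data pages
  uint64_t maxWaitMicroSeconds; // maximum wait time in microseconds
  std::string taskId;           // unique ID of the remote task
  uint64_t bufferId;            // buffer within the remote task
  uint64_t pageToken;           // sequence number of the requested page
};

struct BinReply {
  RequestType replyType;
  Status status;
  bool bufferComplete;          // entire buffer has been transferred
  uint64_t pageToken;           // token of the first data page in the reply
  uint64_t pageNextToken;       // next page that can be requested
  std::vector<uint64_t> remainingBytes;  // sizes of available pages
  std::string data;             // payload (an IOBuf in the implementation)
};

// Acknowledge (and delete) requests leave the data-specific fields unset:
// getDataSize is false and the size/wait limits are 0.
BinRequest makeAck(std::string taskId, uint64_t bufferId, uint64_t token) {
  return {RequestType::ACK, false, 0, 0, std::move(taskId), bufferId, token};
}
```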

`BinRequest` and `BinReply` are in-memory representations. The serialization and deserialization on top
of TCP are implemented by the `ServerSerializeHandler` and the `ClientSerializeHandler`. On the server side,
requests are read and replies are written, and vice versa on the client side. During serialization, a
length field is prepended; it is needed to correctly de-serialize the protocol data units.

The implementation uses Meta's [Wangle](https://github.com/facebook/wangle) framework.
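
The length-prefixed framing can be illustrated with a minimal sketch. Host byte order and a raw string payload are simplifications here; the real handlers serialize full `BinRequest`/`BinReply` objects inside the Wangle pipeline.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Frames one protocol data unit by prepending its length, so the
// receiver knows where the unit ends in the TCP byte stream.
std::vector<uint8_t> frame(const std::string& pdu) {
  uint32_t len = static_cast<uint32_t>(pdu.size());
  std::vector<uint8_t> out(sizeof(len) + pdu.size());
  std::memcpy(out.data(), &len, sizeof(len));  // prepended length field
  std::memcpy(out.data() + sizeof(len), pdu.data(), pdu.size());
  return out;
}

// Extracts the next complete PDU from a receive buffer, if one is
// available. Returns true and removes the consumed bytes on success;
// returns false when more bytes must be read from the socket first.
bool unframe(std::vector<uint8_t>& buf, std::string& pdu) {
  uint32_t len;
  if (buf.size() < sizeof(len)) return false;        // length not yet complete
  std::memcpy(&len, buf.data(), sizeof(len));
  if (buf.size() < sizeof(len) + len) return false;  // PDU not yet complete
  pdu.assign(reinterpret_cast<const char*>(buf.data()) + sizeof(len), len);
  buf.erase(buf.begin(), buf.begin() + sizeof(len) + len);
  return true;
}
```

The false-return path is what makes the framing robust against TCP's arbitrary segmentation: a partially received unit simply stays in the buffer until more bytes arrive.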

### Binary Exchange Server

The binary exchange server is a self-contained component that is started when `binx.enabled` is configured.
It implements the exchange service using the BinX protocol. Incoming requests are forwarded to the
`TaskManager`, and the result is marshalled into a reply and sent back.

The binary exchange server is started in `PrestoServer::run()`. It uses a dedicated
IO thread pool in order not to interfere with the HTTP IO thread pool. The CPU thread pool is shared
with the HTTP exchange.

#### Implementation Notes

The BinX server uses Wangle. It consists of the following components, implemented in
the file `BinaryExchangeServer.h`:

* The `BinaryExchangeServer` is a controller for starting and stopping the Wangle protocol stack.
It takes the port number, the IO thread pool, and the CPU thread pool as construction parameters.
The `start()` method creates a Wangle factory for the BinX protocol stack and binds this factory
to a listening socket. Connection and protocol handling is then done by the Wangle framework.
The `stop()` method destroys the protocol stack and joins the threads again.
* The `ExchangeServerFactory` defines the BinX protocol stack. The stack consists of an asynchronous TCP
socket handler, a threading component called `EventBaseHandler` that makes sure that all operations are
carried out in the same IO thread, the server-side serialization and deserialization handler, and the
service implementation on top of the stack.
* The `BinaryExchangeService` processes incoming requests and calls the appropriate `TaskManager` methods.
The results from the `TaskManager` are packaged into replies and sent back to the requesting BinX exchange source.
This exchange service follows the design of the existing `TaskResource` service.

The `TaskManagerStub` class is an implementation detail that enables the BinX server to interact with
a mock `TaskManager` implementation. This is used in the unit tests and makes it possible to test the BinX server
implementation together with the BinX exchange source implementation.

### Binary Exchange Source and Binary Exchange Client

The client side of BinX is implemented in two components. The `BinaryExchangeSource` implements the "higher level" parts:
it is called by the Velox exchange client, interfaces with Velox's page memory management, and implements
the exchange protocol logic.
The `BinaryExchangeSource` uses the `BinaryExchangeClient`, which is responsible for the protocol mechanics and
implements connection setup, sending and receiving of requests and replies, and timeouts when replies don't arrive.

The `PrestoServer` registers a factory method for creating exchange sources. This factory method has been extended
such that `BinaryExchangeSource`s are created instead of HTTP exchange sources when enabled by configuration.
One exception is connections to the
Presto coordinator, which always use the HTTP-based exchange protocol. In a Kubernetes environment with its virtual
networking, it is unfortunately not straightforward to detect whether the target host is the Presto coordinator,
since the coordinator's service IP used in the Presto configuration doesn't correspond to the IP address of the
pod running the coordinator. To circumvent this problem, a helper class called `CoordinatorInfoResolver`
uses the node status endpoint of the coordinator to retrieve the coordinator's IP address. Using this address
allows the factory method to create `HttpExchangeSource`s when connecting to the coordinator and `BinaryExchangeSource`s
when connecting to worker nodes.

#### Binary Exchange Client

The `BinaryExchangeClient` is a Wangle client. The `ExchangeClientFactory` is an inner class of the client that
defines the protocol stack. On top of an asynchronous socket, an `EventBaseHandler` controls threading, followed by the
`ClientSerializeHandler`, which is responsible for serializing requests and de-serializing replies. The top of the client
protocol stack is formed by the `ExchangeClientDispatcher`. Its main task is to keep track of outstanding data,
acknowledge, and delete requests. The client dispatcher maintains hash maps that map the requests' sequence numbers
to the outstanding promises. These promises are fulfilled whenever a corresponding reply arrives or when the
associated timeout fires.
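
The sequence-number-to-promise bookkeeping can be sketched as follows. This is a single-threaded simplification with hypothetical names and `std::string` replies, not the actual Wangle-based dispatcher:

```cpp
#include <cassert>
#include <cstdint>
#include <future>
#include <stdexcept>
#include <string>
#include <unordered_map>

// Sketch of request/reply matching: each outstanding request is tracked
// by sequence number, and its promise is fulfilled when the matching
// reply arrives or when the associated timeout fires.
class Dispatcher {
 public:
  // Registers an outstanding request and returns a future for its reply.
  std::future<std::string> request(uint64_t seq) {
    return outstanding_[seq].get_future();
  }

  // Called when a reply with a matching sequence number arrives.
  void onReply(uint64_t seq, std::string reply) {
    auto it = outstanding_.find(seq);
    if (it == outstanding_.end()) return;  // e.g. late reply after timeout
    it->second.set_value(std::move(reply));
    outstanding_.erase(it);
  }

  // Called when the timeout for an outstanding request fires.
  void onTimeout(uint64_t seq) {
    auto it = outstanding_.find(seq);
    if (it == outstanding_.end()) return;
    it->second.set_exception(
        std::make_exception_ptr(std::runtime_error("request timed out")));
    outstanding_.erase(it);
  }

 private:
  std::unordered_map<uint64_t, std::promise<std::string>> outstanding_;
};
```

Erasing the map entry on both paths ensures that each promise is fulfilled exactly once, whichever of reply or timeout wins the race.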

The `BinaryExchangeClient` provides a single `request()` method for sending a request. It returns a future that becomes
valid once the reply arrives or an error or timeout occurs. The connection to the remote node is set up lazily on the first request.
Since connection setup is asynchronous, incoming requests are queued until the connection setup has completed.
While normally only a single data request can be outstanding at any point in time, the queue is nevertheless necessary for the case
where the exchange client is closed immediately after the first request and a delete request is issued before the connection setup could complete.
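
The lazy connection setup with request queueing can be sketched like this (hypothetical names; a vector stands in for the wire, whereas the real client sends through a Wangle pipeline):

```cpp
#include <cassert>
#include <queue>
#include <string>
#include <vector>

// Sketch of lazy connection setup: requests issued before the
// asynchronous connect completes are queued and flushed in order
// once the connection is up.
class LazyClient {
 public:
  // Queues the request until the connection is established.
  void request(std::string req) {
    if (!connected_) {
      pending_.push(std::move(req));
      return;
    }
    send(std::move(req));
  }

  // Invoked asynchronously when connection setup completes.
  void onConnected() {
    connected_ = true;
    while (!pending_.empty()) {  // flush queued requests in order
      send(std::move(pending_.front()));
      pending_.pop();
    }
  }

  std::vector<std::string> sent;  // stand-in for the wire

 private:
  void send(std::string req) { sent.push_back(std::move(req)); }

  bool connected_ = false;
  std::queue<std::string> pending_;
};
```

This covers the corner case described above: a data request followed immediately by a delete request can both sit in the queue before the connection exists, and both still reach the server in order.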

#### Binary Exchange Source

The `BinaryExchangeSource` class provides the same interface as the existing `HttpExchangeSource`. It transfers the contents of a remote
buffer into a series of Velox pages and appends these pages to the provided `ExchangeQueue`. Once the buffer is transferred, the
exchange source is discarded. Its implementation is similar to that of the `HttpExchangeSource`, with the following differences:
- Instead of an `HttpClient`, a `BinaryExchangeClient` is instantiated.
- The `BinaryExchangeSource` doesn't use a session or connection pool.
- `BinRequest` and `BinReply` are used rather than HTTP requests and HTTP replies.
- All buffer transfers are immediate, i.e., there is no support for `exchange.immediate-buffer-transfer=false`.


## Metrics / Benchmarks

The following heatmap shows the results of a micro-benchmark conducted on two servers, each providing 128 hardware cores.
One server runs the exchange client and the other the exchange server.
The machines are connected back-to-back using 200 Gbit/s network interfaces.
The exchanged buffers always consisted of 128 chunks (slices), with chunk sizes ranging from 32 bytes to 16 MBytes.
Only one chunk at a time was sent per data request.
The client spawned between 1 and 128 parallel exchanges. Both client and server used thread pools of 129 threads.

The binary exchange protocol shows a performance advantage over the HTTP protocol of roughly a factor of two.
With more than 48 parallel exchanges, the binary exchange protocol came close to saturating the
available network capacity.

## Other Approaches Considered

Both Thrift and gRPC were considered but abandoned, as the benefits of a generic RPC mechanism weren't worth
the additional complexity and overhead.

## Adoption Plan

- What impact (if any) will there be on existing users? Are there any new session parameters, configurations, SPI updates, client API updates, or SQL grammar?

  - There is one additional configuration option to enable BinX. Otherwise, there is no impact on session parameters, no API changes,
  and no changes to SQL.

- If we are changing behaviour how will we phase out the older behaviour?

  - The HTTP stack is still required for control messages. The cost of keeping the `HttpExchangeSource` is minimal.

- If we need special migration tools, describe them here.

  - No tools required.

- When will we remove the existing behaviour, if applicable.

  - Existing behaviour will remain as the default option.

- How should this feature be taught to new and existing users? Basically mention if documentation changes/new blog are needed?

  - The documentation should mention that an alternative exchange protocol is available.

- What related issues do you consider out of scope for this RFC that could be addressed in the future independently of the solution that comes out of this RFC?

  - Support for SmartNICs should be considered.

## Test Plan

The test plan involves running performance measurements with the TPC-DS and TPC-H benchmarks, comparing the performance of HTTP versus BinX.

The TPC-DS benchmark has been conducted using a dataset with scale factor 1000 on an on-prem cluster with 8 nodes. The results
for this 1 TB dataset show that the overall runtime for the 99 queries was ~56 minutes when using HTTP, compared to ~43 minutes for BinX.