Blockchain data streaming in Apache Arrow format. Web3 data for data engineers, ready for large-scale analysis
This is an implementation of Apache Arrow Flight RPC server for blockchain data. Apache Arrow format allows zero-copy streaming data processing on top of standard eth-web3 RPC.
There are a few key points why to consider flight rpc:
-
Plenty of clients implementations including JavaScript, Go, Python, R and Rust (see: https://arrow.apache.org/docs/implementations.html)
-
Rich libraries support. Process eth json-rpc data with your favorite data framework:
- pandas
- polars
- apache datafusion
- and many more natively supporting apache arrow format, all with zero copy and without time loss on serialization and deserialization between the formats
-
ADBC Support Ready-made connectors to most popular databases and formats allow you to write data effortlessly
All datasets requests supports following params:
- start_block block number to start streaming from (default LATEST)
- end_block block number to stop streaming (default NULL)
- batch_size
See schema for stream return type
private static final Schema BLOCK_SCHEMA = new Schema(List.of(
Field.nullable(BLOCK_NUMBER, new ArrowType.Int(64, true)),
Field.nullable(BLOCK_HASH_FIELD, new ArrowType.Utf8()),
Field.nullable(BLOCK_PARENT_HASH, new ArrowType.Utf8()),
Field.nullable(BLOCK_TIMESTAMP, new ArrowType.Int(64, true)),
Field.nullable(BLOCK_AUTHOR, new ArrowType.Utf8()),
Field.nullable(BLOCK_MINER, new ArrowType.Utf8()),
Field.nullable(BLOCK_TRANSACTIONS_ROOT, new ArrowType.Utf8()),
Field.nullable(BLOCK_STATE_ROOT, new ArrowType.Utf8()),
Field.nullable(BLOCK_RECEIPTS_ROOT, new ArrowType.Utf8()),
Field.nullable(BLOCK_LOGS_BLOOM, new ArrowType.Utf8()),
Field.nullable(BLOCK_GAS_LIMIT, new ArrowType.Int(64, true)),
Field.nullable(BLOCK_GAS_USED, new ArrowType.Int(64, true)),
Field.nullable(BLOCK_SIZE, new ArrowType.Int(64, true)),
Field.nullable(BLOCK_EXTRA_DATA, new ArrowType.Utf8()),
Field.nullable(BLOCK_NONCE, new ArrowType.Utf8()),
Field.nullable(BLOCK_DIFFICULTY, new ArrowType.Utf8()),
Field.nullable(BLOCK_TOTAL_DIFFICULTY, new ArrowType.Utf8()),
Field.nullable(BLOCK_MIX_HASH, new ArrowType.Utf8()),
Field.nullable(BLOCK_SHA3_UNCLES, new ArrowType.Utf8()),
new Field(BLOCK_TRANSACTIONS, new FieldType(true, new ArrowType.List(), null),
Lists.newArrayList(Field.nullable(BLOCK_TX_CHILD_NAME, new ArrowType.Utf8()))),
new Field(BLOCK_UNCLES, new FieldType(true, new ArrowType.List(), null),
Lists.newArrayList(Field.nullable(BLOCK_UNCLE_CHILD_NAME, new ArrowType.Utf8()))),
new Field(BLOCK_SEAL_FIELDS, new FieldType(true, new ArrowType.List(), null),
Lists.newArrayList(Field.nullable(BLOCK_SEAL_FIELD_CHILD_NAME, new ArrowType.Utf8())))
));
Additional parameters:
- topics - list of topics to pass to web3 rpc filter
- addresses - list of addresses to pass to web3 rpc filter
private static final Schema LOG_SCHEMA = new Schema(List.of(
Field.nullable(LOG_ADDRESS, new ArrowType.Utf8()),
Field.nullable(LOG_DATA, new ArrowType.Utf8()),
new Field(LOG_TOPICS, new FieldType(true, new ArrowType.List(), null),
Lists.newArrayList(Field.nullable(LOG_TOPIC_CHILD_NAME, new ArrowType.Utf8()))),
Field.nullable(LOG_BLOCK_NUMBER, new ArrowType.Int(64, true)),
Field.nullable(LOG_TRANSACTION_HASH, new ArrowType.Utf8()),
Field.nullable(LOG_TRANSACTION_INDEX, new ArrowType.Int(32, true)),
Field.nullable(LOG_BLOCK_HASH, new ArrowType.Utf8()),
Field.nullable(LOG_INDEX, new ArrowType.Int(32, true)),
Field.nullable(LOG_REMOVED, new ArrowType.Bool())
));
- Java 21
- Access to an Ethereum node (for the server). Infura was used for development.
To build the project, run:
mvn clean package
This will create JAR file:
server/target/server.jar
- The server component
Application requires the following VM options:
--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED
The server requires an Ethereum node URL and can optionally specify a port for the Flight server.
Using environment variables:
java --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED -jar server/target/server.jar
With environment variables:
ETHEREUM_NODE_URL
- URL of the Ethereum node (required)FLIGHT_PORT
- Port for the Flight server (default: 8815)
Using command line arguments:
java --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED -jar server/target/server.jar <ethereum_node_url> <flight_port>
Example:
java --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED -jar server/target/server.jar https://mainnet.infura.io/v3/your-api-key 8815
The client can connect to a Flight server at a specified host and port.
java --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED -jar client/target/client.jar [host] [port]
By default, it connects to 127.0.0.1:8815
if no arguments are provided.
Example:
java --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED -jar client/target/client.jar 127.0.0.1 8815
-
Start the server with an Ethereum node URL:
java --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED -jar server/target/server.jar https://mainnet.infura.io/v3/your-api-key 8815
-
In another terminal, start the client:
java --add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED -jar client/target/client.jar 127.0.0.1 8815
-
The client will connect to the server and start receiving Ethereum logs data and displaying it in TSV format.
- The server requires a valid Ethereum node URL to function properly
Stay tuned for Python and Datafusion examples :)