Skip to content

Commit 2030c43

Browse files
committed
order book transformation test
1 parent d1d569b commit 2030c43

File tree

1 file changed

+69
-0
lines changed

1 file changed

+69
-0
lines changed

connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2789,6 +2789,75 @@ public void testOrderBookToArraySMT_targetCollidesWithExisting() {
27892789
);
27902790
}
27912791

2792+
@Test
2793+
public void testMarketData_orderBookToArray_withTimestamp_schemaless() {
2794+
connect.kafka().createTopic(topicName, 1);
2795+
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, true);
2796+
props.put("value.converter.schemas.enable", "false");
2797+
props.put(QuestDBSinkConnectorConfig.DESIGNATED_TIMESTAMP_COLUMN_NAME_CONFIG, "received_at");
2798+
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
2799+
props.put(QuestDBSinkConnectorConfig.SYMBOL_COLUMNS_CONFIG, "market");
2800+
2801+
// The connector parses received_at natively; U+ accepts variable-width fractional seconds (1-6 digits)
2802+
props.put(QuestDBSinkConnectorConfig.TIMESTAMP_STRING_FIELDS, "received_at");
2803+
props.put(QuestDBSinkConnectorConfig.TIMESTAMP_FORMAT, "yyyy-MM-ddTHH:mm:ss.U+Z");
2804+
2805+
// Transform pipeline:
2806+
// 1. ExtractField unwraps the schema/payload envelope
2807+
// 2. OrderBookToArray transposes bids/asks arrays-of-objects into double[][]
2808+
props.put("transforms", "extractPayload,orderbook");
2809+
props.put("transforms.extractPayload.type", "org.apache.kafka.connect.transforms.ExtractField$Value");
2810+
props.put("transforms.extractPayload.field", "payload");
2811+
props.put("transforms.orderbook.type", "io.questdb.kafka.OrderBookToArray$Value");
2812+
props.put("transforms.orderbook.mappings", "bids:bids:price,amount;asks:asks:price,amount");
2813+
2814+
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
2815+
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
2816+
2817+
// Pre-create QuestDB table: market as symbol, venue as int, bids/asks as 2D double arrays,
2818+
// received_at as designated timestamp partitioned by hour
2819+
QuestDBUtils.assertSql(
2820+
"{\"ddl\":\"OK\"}",
2821+
"create table " + topicName
2822+
+ " (market symbol, venue int, bids double[][], asks double[][], received_at timestamp)"
2823+
+ " timestamp(received_at) partition by hour wal",
2824+
httpPort,
2825+
QuestDBUtils.Endpoint.EXEC);
2826+
2827+
// Full JSON with schema/payload envelope, matching the real upstream message structure.
2828+
// price/amount are numeric (not string-encoded) so OrderBookToArray can cast them to double.
2829+
// Extra fields (side, quote_entry_id, time, type, entry_id) inside bids/asks are ignored
2830+
// by OrderBookToArray — only the mapped fields (price, amount) are extracted.
2831+
String json = "{"
2832+
+ "\"schema\":\"market_data\","
2833+
+ "\"payload\":{"
2834+
+ "\"market\":\"BTCUSD\","
2835+
+ "\"venue\":1,"
2836+
+ "\"received_at\":\"2026-01-13T22:00:00.014Z\","
2837+
+ "\"type\":\"SNAPSHOT\","
2838+
+ "\"bids\":["
2839+
+ "{\"side\":\"BUY\",\"amount\":2.45,\"price\":45120.50,\"quote_entry_id\":\"uuid-1\",\"time\":\"2026-01-13T22:00:00.014Z\",\"type\":\"NEW\",\"entry_id\":\"45120.50\"},"
2840+
+ "{\"side\":\"BUY\",\"amount\":5.12,\"price\":45119.00,\"quote_entry_id\":\"uuid-2\",\"time\":\"2026-01-13T22:00:00.014Z\",\"type\":\"NEW\",\"entry_id\":\"45119.00\"}"
2841+
+ "],"
2842+
+ "\"asks\":["
2843+
+ "{\"side\":\"SELL\",\"amount\":1.83,\"price\":45121.00,\"quote_entry_id\":\"uuid-3\",\"time\":\"2026-01-13T22:00:00.014Z\",\"type\":\"NEW\",\"entry_id\":\"45121.00\"},"
2844+
+ "{\"side\":\"SELL\",\"amount\":3.27,\"price\":45122.50,\"quote_entry_id\":\"uuid-4\",\"time\":\"2026-01-13T22:00:00.014Z\",\"type\":\"NEW\",\"entry_id\":\"45122.50\"}"
2845+
+ "]"
2846+
+ "}}";
2847+
connect.kafka().produce(topicName, json);
2848+
2849+
// After OrderBookToArray transpose with mappings "bids:bids:price,amount":
2850+
// bids: [[45120.5, 45119.0], [2.45, 5.12]] (row 0 = prices, row 1 = amounts)
2851+
// asks: [[45121.0, 45122.5], [1.83, 3.27]]
2852+
// The extra top-level "type" field ("SNAPSHOT") becomes an additional string column in QuestDB
2853+
QuestDBUtils.assertSqlEventually(
2854+
"\"market\",\"venue\",\"bids\",\"asks\",\"received_at\"\r\n"
2855+
+ "\"BTCUSD\",1,\"[[45120.5,45119.0],[2.45,5.12]]\",\"[[45121.0,45122.5],[1.83,3.27]]\",\"2026-01-13T22:00:00.014000Z\"\r\n",
2856+
"select market, venue, bids, asks, received_at from " + topicName,
2857+
httpPort
2858+
);
2859+
}
2860+
27922861
@ParameterizedTest
27932862
@ValueSource(booleans = {true, false})
27942863
public void testComposedTimestamp_schemaless(boolean useHttp) {

0 commit comments

Comments
 (0)