diff --git a/.gitignore b/.gitignore index 2a09c1c..69866f6 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,7 @@ outTest/ *.iws *.iml tmp/ -**/.DS_Store \ No newline at end of file +**/.DS_Store +*.pyc +*.pyo +*.csv \ No newline at end of file diff --git a/BookStates.ExBook.schema b/BookStates.ExBook.schema new file mode 100644 index 0000000..53d77f5 --- /dev/null +++ b/BookStates.ExBook.schema @@ -0,0 +1,44 @@ + + + + + + + + + + + + + + + + + + + + + + import io.deephaven.enterprise.codec.DoubleArrayCodec; + import io.deephaven.enterprise.codec.LongArrayCodec; + import io.deephaven.enterprise.codec.IntArrayCodec; + + + + + + + + + + + + + + + + + + + +
\ No newline at end of file diff --git a/build.gradle b/build.gradle index 957a8cd..9d3780b 100644 --- a/build.gradle +++ b/build.gradle @@ -27,7 +27,9 @@ java { dependencies { implementation platform("io.deephaven:deephaven-bom:${dhcVersion}"), "io.deephaven:deephaven-engine-table", - "it.unimi.dsi:fastutil:8.5.13" + "it.unimi.dsi:fastutil:8.5.13", + "io.deephaven:deephaven-engine-tuple", + "io.deephaven:deephaven-engine-tuplesource" runtimeOnly "io.deephaven:deephaven-log-to-slf4j", 'ch.qos.logback:logback-classic:1.4.5' } diff --git a/docker/data/Quotes.parquet b/docker/data/Quotes.parquet new file mode 100644 index 0000000..465bbe1 Binary files /dev/null and b/docker/data/Quotes.parquet differ diff --git a/docker/data/storage/notebooks/Example.py b/docker/data/storage/notebooks/Example.py index 6568205..357373a 100644 --- a/docker/data/storage/notebooks/Example.py +++ b/docker/data/storage/notebooks/Example.py @@ -2,10 +2,9 @@ from deephaven import TableReplayer import bookbuilder -static_data = read("/data/Quotes.parquet") -rp = TableReplayer("2017-08-25T09:30:00 ET", "2017-08-25T23:59:59 ET") -ticking_data = rp.add_table(static_data, "Timestamp") -rp.start() +static_data_ = read("/data/Quotes.parquet") +rp_ = TableReplayer("2017-08-25T09:30:00 ET", "2017-08-25T23:59:59 ET") +ticking_data_ = rp_.add_table(static_data_, "Timestamp") +rp_.start() -book = bookbuilder.build_book(ticking_data) \ - .last_by("Key") \ No newline at end of file +book_ = bookbuilder.build_book(ticking_data_).last_by("Sym").rename_columns("SYMB=Sym") \ No newline at end of file diff --git a/docker/data/storage/notebooks/__pycache__/bookbuilder.cpython-310.pyc b/docker/data/storage/notebooks/__pycache__/bookbuilder.cpython-310.pyc deleted file mode 100644 index c599934..0000000 Binary files a/docker/data/storage/notebooks/__pycache__/bookbuilder.cpython-310.pyc and /dev/null differ diff --git a/docker/data/storage/notebooks/arrays.py b/docker/data/storage/notebooks/arrays.py new file mode 100644 index 0000000..690bc93 --- /dev/null +++ b/docker/data/storage/notebooks/arrays.py @@ -0,0 +1,26 @@ +from deephaven.parquet import read +from deephaven import TableReplayer, merge +import bookbuilder + +# OAK: new order ack; CRAK: cancel replace ack; CC: order cancel; +# INF: internal fill; AWF: away market fill. + +EVT_map = {"Order Ack": 1, "Cancel Replace Ack" : 1}#2} +static_data_ = read("/data/Quotes.parquet") + +static_data = order_sample.update("EPOCH_TS = Instant.ofEpochSecond((long) (EPOCH_TS/SECOND), EPOCH_TS % SECOND)")\ +.update_view("EVT_ID = EVT_map.containsKey(EVT_TYP) ? 
(int) EVT_map[EVT_TYP] : null") + + +rp = TableReplayer("2024-10-10T02:30:00 ET", "2024-10-25T02:40:00 ET") +ticking_data = rp.add_table(static_data, "EPOCH_TS") +rp.start() + +book = bookbuilder.build_book(source=ticking_data,\ + book_depth = 1,\ + timestamp_col = "EPOCH_TS",\ + size_col = "QTY",\ + side_col = "SIDE",\ + op_col = "EVT_ID",\ + price_col = "PRC",\ + group_cols = ["SYMB"]) diff --git a/docker/data/storage/notebooks/bookbuilder.py b/docker/data/storage/notebooks/bookbuilder.py index 05e0b57..a1692ba 100644 --- a/docker/data/storage/notebooks/bookbuilder.py +++ b/docker/data/storage/notebooks/bookbuilder.py @@ -1,12 +1,13 @@ from typing import Union, List - import jpy from deephaven.table import Table _J_BookBuilder = jpy.get_type("io.deephaven.book.PriceBook") -def build_book(source: Table, - book_depth: int = 5, + +def build_book_with_snap(source: Table, + snapshot: Table, + book_depth: int = 2, batch_timestamps: bool = False, timestamp_col: str = "Timestamp", size_col: str = "Size", @@ -14,4 +15,11 @@ def build_book(source: Table, op_col: str = "Op", price_col: str = "Price", group_cols: Union[str, List[str]] = ["Sym"]): - return Table(_J_BookBuilder.build(source.j_object, book_depth, batch_timestamps, timestamp_col, size_col, side_col, op_col, price_col, group_cols)) \ No newline at end of file + + if snapshot is not None: + snapshot = snapshot.j_object + + return Table(_J_BookBuilder.build(source.j_object, snapshot, book_depth, batch_timestamps, timestamp_col, size_col, side_col, op_col, price_col, group_cols)) + + + diff --git a/docker/data/storage/notebooks/test_snapshot.py b/docker/data/storage/notebooks/test_snapshot.py new file mode 100644 index 0000000..82dd144 --- /dev/null +++ b/docker/data/storage/notebooks/test_snapshot.py @@ -0,0 +1,72 @@ +from deephaven.parquet import read +from deephaven import TableReplayer, merge, read_csv +from deephaven.time import dh_now, to_j_instant +from datetime import datetime, timezone, timedelta +from deephaven.numpy import to_numpy +from deephaven import parquet as dhpq +import zoneinfo +import bookbuilder + +# OAK: new order ack; +# CRAK: cancel replace ack; +# CC: order cancel; +# INF: internal fill; +# AWF: away market fill. + +EVT_map = {"Order Ack": 1, "Cancel Replace Ack" : 2, "Cancel Order": 3, "Internal Fill": 4, "Away Market Fill": 5} +order_sample = read_csv("/data/order_sample.csv").view(["EVT_TYP", "SYMB", "EPOCH_TS", "ORD_ID=CLIENT_ORD_ID", "ORD_QTY=QTY", "EXEC_QTY=(int) null", "CXL_QTY=(int) null", "PRC", "SIDE"])\ + .sort("EPOCH_TS") + +# Align timestamps to now +now_time = dh_now() +order_sample_start = to_numpy(order_sample, cols=["EPOCH_TS"]).flatten()[0] +order_sample = order_sample.update_view(["NowTime = now_time.getEpochSecond()*SECOND + now_time.getNano()", + "Diff = NowTime - (long) order_sample_start", + f"EPOCH_TS_Nano = EPOCH_TS+Diff", + f"EPOCH_TS = Instant.ofEpochSecond((long) ((EPOCH_TS_Nano)/SECOND), (EPOCH_TS_Nano) % SECOND)", + # We are ignoring the actual book build for now so make everything 1... + "EVT_ID = 1"]) + + +# Get some old book. Choose where to split the data... 
+split_time = to_j_instant(to_numpy(order_sample, cols=["EPOCH_TS"]).flatten()[5]) +old_data = order_sample.where(f"EPOCH_TS < split_time") + +old_book = bookbuilder.build_book_with_snap(old_data,\ + snapshot = None,\ + book_depth = 3,\ + timestamp_col = "EPOCH_TS",\ + size_col = "ORD_QTY",\ + side_col = "SIDE",\ + op_col = "EVT_ID",\ + price_col = "PRC",\ + group_cols = ["SYMB"]).last_by("SYMB") +old_book = old_book.snapshot() + +# Save the book for later +dhpq.write(old_book, "/tmp/old_book.parquet") + + + +# Start up the rest of the data +# And run for 10 minutes +new_data = order_sample.where("EPOCH_TS >= split_time") +rp = TableReplayer(now_time.minusNanos(5*1000000000), now_time.plusNanos(10*60*1000000000)) +ticking_data = rp.add_table(new_data, "EPOCH_TS") +rp.start() + +# Load old book (or just grab the variable if running in one sequence) +old_book_snapshot = dhpq.read("/tmp/old_book.parquet") + +# Make new book starting with old one +book = bookbuilder.build_book_with_snap(source=ticking_data,\ + snapshot = old_book_snapshot,\ + book_depth = 3,\ + timestamp_col = "EPOCH_TS",\ + size_col = "ORD_QTY",\ + side_col = "SIDE",\ + op_col = "EVT_ID",\ + price_col = "PRC",\ + group_cols = ["SYMB"]).last_by("SYMB") + + diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index be590e4..022357d 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -6,8 +6,9 @@ services: dockerfile: docker/Dockerfile ports: - '${DEEPHAVEN_PORT:-20000}:10000' + - '5005:5005' volumes: - ./data:/data environment: - PYTHONPATH=${PYTHONPATH}:/data/storage/notebooks - - START_OPTS=-Xmx40g -DAuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler + - START_OPTS=-Xmx40g -DAuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005 diff --git a/scripts/book_utils.py b/scripts/book_utils.py new file mode 100644 index 0000000..12c9577 --- /dev/null +++ b/scripts/book_utils.py @@ -0,0 +1,58 @@ +from deephaven.parquet import read +from deephaven import TableReplayer, merge, read_csv +from deephaven.time import dh_now, to_j_instant +from datetime import datetime, timezone, timedelta +from deephaven.numpy import to_numpy +from deephaven import parquet as dhpq +import zoneinfo + + +from typing import Union, List +import jpy +from deephaven.table import Table + +_J_BookBuilder = jpy.get_type("io.deephaven.book.PriceBook") + + +def build_book_with_snap(source: Table, + snapshot: Table, + book_depth: int = 2, + batch_timestamps: bool = False, + timestamp_col: str = "EPOCH_TS", + size_col: str = "ORD_QTY", + side_col: str = "SIDE", + op_col: str = "EVT_ID", + price_col: str = "PRC", + ord_id_col: str = "ORD_ID", + group_cols: Union[str, List[str]] = ["SYMB"]): + + if snapshot is not None: + snapshot = snapshot.j_object + + return Table(_J_BookBuilder.build(source.j_object, snapshot, book_depth, batch_timestamps, timestamp_col, size_col, side_col, op_col, price_col, ord_id_col, group_cols)) + + +def prepare_data(start_time): + # OAK: new order ack; + # CRAK: cancel replace ack; + # CC: order cancel; + # INF: internal fill; + # AWF: away market fill. + EVT_map = {"Order Ack": 1, "Cancel Replace Ack" : 2, "Cancel Order": 3, "Internal Fill": 4, "Away Market Fill": 5} + + order_sample = read_csv("/tmp/data/order_sample.csv").view(["EVT_TYP", "FIRM= EPOCH_TS % 2 = 0 ? 
`Traders Inc` : `Big Bank Inc`", "SYMB", "EPOCH_TS", "ORD_ID=ORD_ID_PUB", "ORD_QTY=QTY", "EXEC_QTY=(int) null", "CXL_QTY=(int) null", "PRC", "SIDE"])\ + .sort("EPOCH_TS") + + + # Align timestamps to now + now_time = start_time + order_sample_start = to_numpy(order_sample, cols=["EPOCH_TS"]).flatten()[0] + order_sample = order_sample.update_view(["NowTime = now_time.getEpochSecond()*SECOND + now_time.getNano()", + "Diff = NowTime - (long) order_sample_start", + f"EPOCH_TS_Nano = EPOCH_TS+Diff", + f"EPOCH_TS = Instant.ofEpochSecond((long) ((EPOCH_TS_Nano)/SECOND), (EPOCH_TS_Nano) % SECOND)", + # We are ignoring the actual book build for now so make everything 1... + "EVT_ID = 1"]) + + return order_sample + diff --git a/scripts/snap_resume.py b/scripts/snap_resume.py new file mode 100644 index 0000000..ff38fea --- /dev/null +++ b/scripts/snap_resume.py @@ -0,0 +1,90 @@ +from deephaven.time import dh_today, dh_now, to_j_instant +from deephaven import parquet as dhpq +from deephaven import new_table, time_table, TableReplayer, merge, read_csv +from deephaven.table_listener import listen +from deephaven.column import datetime_col +from deephaven.numpy import to_numpy +from deephaven.execution_context import get_exec_ctx +from deephaven_enterprise.notebook import meta_import +from deephaven_enterprise import system_table_logger as stl + +meta_import(db, "notebook") +from notebook.book_utils import prepare_data, build_book_with_snap + + +#--- Prepare some fake data ---# +trade_date = dh_today() +start_time = dh_now() +split_time = start_time.plusSeconds(30*3) #1.5 minutes +order_sample = prepare_data(start_time).update_view(f"Date = `{trade_date}`") + +old_data = order_sample.where(f"EPOCH_TS < split_time") +rp = TableReplayer(start_time, split_time) +ticking_data = rp.add_table(order_sample, "EPOCH_TS") +rp.start() + + +#--- Run the book while saving snapshots ---# +curr_book = build_book_with_snap(ticking_data,\ + snapshot = None,\ + book_depth = 3).last_by("SYMB") + + +# Save snaps with listener. +# (10 seconds just to get some quick snaps. In practice, should be closer to ~5 minutes) +snap_freq = "PT10S" +ctx = get_exec_ctx() + +trigger = ( + time_table(snap_freq, None) + .rename_columns(["SnapTime = Timestamp"]) +) + +# Listener function triggered by the time_table +def log_table(update, is_replay): + with ctx: + # Get the snap time and last timestamp from the source data + snap_time = dh_now() + last_tick_time = to_j_instant(to_numpy(ticking_data.tail(1), cols=["EPOCH_TS"]).flatten()[0]) + to_append = curr_book.update(["SnapTime = snap_time", "LastTickTime = last_tick_time"]).snapshot() + stl.log_table("BookStates", "ExBook", to_append, columnPartition=dh_today(), \ + codecs={"Bid_Price" : stl.double_array_codec(), \ + "Bid_Timestamp" : stl.long_array_codec(), \ + "Bid_Size" : stl.int_array_codec(), \ + "Ask_Price" : stl.double_array_codec(), \ + "Ask_Timestamp" : stl.long_array_codec(), \ + "Ask_Size" : stl.int_array_codec()}) + +handle = listen(trigger, log_table) + +# Remove handle after a few minutes. +# Optionally view all snapshots. 
+handle.stop() +book_snapshots = db.live_table("BookStates", "ExBook").where("Date = today()") + + +#--- Resume from a certain time (adjust trade_time) ---# +trade_time = "11:23:55" +source = order_sample + +def prepare_resume(trade_date, trade_time, source): + # Get first snapshot right before the given time + resume_time = to_j_instant(trade_date + "T" + trade_time + " ET") + latest_snap = db.live_table("BookStates", "ExBook").where([f"Date = `{trade_date}`", "SnapTime <= resume_time"])\ + .sort("SnapTime")\ + .last_by("SYMB").snapshot() + + # Get the actual last tick time so we can filter the source data from then, on + last_tick_time = to_j_instant(to_numpy(latest_snap.tail(1), cols=["LastTickTime"]).flatten()[0]) + source_since = source.where([f"Date = `{trade_date}`", "EPOCH_TS > last_tick_time"]) + + return latest_snap, source_since + +old_book, new_source = prepare_resume(trade_date, trade_time, source) + +# Make new book starting with old one +book = build_book_with_snap(source=new_source,\ + snapshot = old_book,\ + book_depth = 3).last_by("SYMB") + + diff --git a/scripts/with_ids.py b/scripts/with_ids.py new file mode 100644 index 0000000..b716559 --- /dev/null +++ b/scripts/with_ids.py @@ -0,0 +1,19 @@ +from deephaven.time import dh_today, dh_now, to_j_instant +from deephaven_enterprise.notebook import meta_import + +meta_import(db, "notebook") +from notebook.book_utils import prepare_data, build_book_with_snap + +#--- Prepare some fake data ---# +trade_date = dh_today() +start_time = dh_now() +split_time = start_time.plusSeconds(30*3) #1.5 minutes +order_sample = prepare_data(start_time).update_view(f"Date = `{trade_date}`") + + +#--- Run the book ---# +curr_book = build_book_with_snap(order_sample,\ + snapshot = None,\ + book_depth = 3, + group_cols=["FIRM", "SYMB"]).last_by(["FIRM", "SYMB"]) + diff --git a/src/main/java/io/deephaven/book/PriceBook.java b/src/main/java/io/deephaven/book/PriceBook.java index 1ed7b4a..57f9528 100644 --- a/src/main/java/io/deephaven/book/PriceBook.java +++ b/src/main/java/io/deephaven/book/PriceBook.java @@ -1,3 +1,7 @@ + +// Should we just log the book+lastBy to a system table instead of taking snapshots? + + package io.deephaven.book; @@ -14,7 +18,11 @@ import it.unimi.dsi.fastutil.doubles.Double2IntOpenHashMap; import it.unimi.dsi.fastutil.doubles.Double2LongOpenHashMap; import org.jetbrains.annotations.NotNull; +import io.deephaven.engine.table.impl.sources.ReinterpretUtils; +import io.deephaven.time.DateTimeUtils; +import java.time.Instant; +import java.util.stream.Collectors; import java.util.*; /** @@ -39,25 +47,39 @@ * By default, the PriceBook will group input rows that have identical timestamps into a single emitted output row. * If this is not the desired behavior, or you require more fine grained control of the columns used to build the book * you may use the more specific builder method - * {@link #build(Table, int, boolean, String, String, String, String, String, String...)} + * {@link #build(Table, Table, int, boolean, String, String, String, String, String, String, String...)} *

  * <p>
  * <p>
  * The following example creates a book of depth 5, that does NOT group identical timestamps, and groups input rows by "Symbol" and "Exchange"
  * {@code
- *      book = PriceBook.build(orderStream, 5, false, "Timestamp", "OrderSize", "OrderSize", "BookOp", "Price", "Symbol", "Exchange")
+ *      book = PriceBook.build(orderStream, null, 5, false, "Timestamp", "OrderSize", "OrderSide", "BookOp", "Price", "OrderId", "Symbol", "Exchange")
  *      }
* */ public class PriceBook { private static final int CHUNK_SIZE = 2048; - private static final int SIDE_BUY = 0; - private static final int SIDE_SELL = 1; + private static final int SIDE_BUY = 1; + private static final int SIDE_SELL = 2; + // These will need to be properly implemented private static final int OP_INSERT = 1; private static final int OP_REMOVE = 2; + private static final int OP_CANCEL = 3; + private static final int OP_INTERNAL_FILL = 4; + private static final int OP_AWAY_FILL = 5; + + private static final String BID_PRC_NAME = "Bid_Price"; + private static final String BID_TIME_NAME = "Bid_Timestamp"; + private static final String BID_SIZE_NAME = "Bid_Size"; + private static final String BID_ORDID_NAME = "Bid_OrderId"; - private final Map states = new HashMap<>(); + private static final String ASK_PRC_NAME = "Ask_Price"; + private static final String ASK_TIME_NAME = "Ask_Timestamp"; + private static final String ASK_SIZE_NAME = "Ask_Size"; + private static final String ASK_ORDID_NAME = "Ask_OrderId"; + + private Map states; private final boolean batchTimestamps; private final int depth; @@ -67,6 +89,7 @@ public class PriceBook { private final ColumnSource sizeSource; private final ColumnSource sideSource; private final ColumnSource opSource; + private final ColumnSource ordIdSource; private final TupleSource keySource; // endregion @@ -82,16 +105,19 @@ public class PriceBook { final ObjectArraySource bidPriceResults; final ObjectArraySource bidTimeResults; final ObjectArraySource bidSizeResults; + final ObjectArraySource bidOrdIdResults; final ObjectArraySource askPriceResults; final ObjectArraySource askTimeResults; final ObjectArraySource askSizeResults; + final ObjectArraySource askOrdIdResults; // endregion private final boolean sourceIsBlink; private PriceBook(@NotNull final Table table, + final Table snapshot, final int depth, final boolean batchTimestamps, @NotNull String timestampColumnName, @@ -99,18 +125,24 @@ private PriceBook(@NotNull final Table table, @NotNull String sideColumnName, @NotNull String opColumnName, @NotNull String priceColumnName, + @NotNull String ordIdColumnName, @NotNull String... groupingCols) { final QueryTable source = (QueryTable) table.coalesce(); this.batchTimestamps = batchTimestamps; this.depth = depth; this.sourceIsBlink = BlinkTableTools.isBlink(source); + // Need to set states + this.states = new HashMap<>(); + + // Begin by getting references to the column sources from the input table to process later. - this.timeSource = source.getColumnSource(timestampColumnName).reinterpret(long.class); + this.timeSource = ReinterpretUtils.instantToLongSource(source.getColumnSource(timestampColumnName)); this.priceSource = source.getColumnSource(priceColumnName); this.sizeSource = source.getColumnSource(sizeColumnName); this.sideSource = source.getColumnSource(sideColumnName); this.opSource = source.getColumnSource(opColumnName); + this.ordIdSource = source.getColumnSource(ordIdColumnName); // Since we may group by more than one column (say Symbol, Exchange) we want to create a single key object to look into // the book state map. Packing the key sources into a tuple does this neatly. 
@@ -136,34 +168,77 @@ private PriceBook(@NotNull final Table table, bidPriceResults = new ObjectArraySource<>(double[].class); bidTimeResults = new ObjectArraySource<>(long[].class); bidSizeResults = new ObjectArraySource<>(int[].class); - columnSourceMap.put("Bid_Price", bidPriceResults); - columnSourceMap.put("Bid_Timestamp", bidTimeResults); - columnSourceMap.put("Bid_Size", bidSizeResults); + bidOrdIdResults = new ObjectArraySource<>(long[].class); + columnSourceMap.put(BID_PRC_NAME, bidPriceResults); + columnSourceMap.put(BID_TIME_NAME, bidTimeResults); + columnSourceMap.put(BID_SIZE_NAME, bidSizeResults); + columnSourceMap.put(BID_ORDID_NAME, bidOrdIdResults); askPriceResults = new ObjectArraySource<>(double[].class); askTimeResults = new ObjectArraySource<>(long[].class); askSizeResults = new ObjectArraySource<>(int[].class); - columnSourceMap.put("Ask_Price", askPriceResults); - columnSourceMap.put("Ask_Timestamp", askTimeResults); - columnSourceMap.put("Ask_Size", askSizeResults); + askOrdIdResults = new ObjectArraySource<>(long[].class); + columnSourceMap.put(ASK_PRC_NAME, askPriceResults); + columnSourceMap.put(ASK_TIME_NAME, askTimeResults); + columnSourceMap.put(ASK_SIZE_NAME, askSizeResults); + columnSourceMap.put(ASK_ORDID_NAME, askOrdIdResults); - // Finally, create the result table for the user + // Set result table final OperationSnapshotControl snapshotControl = source.createSnapshotControlIfRefreshing(OperationSnapshotControl::new); final MutableObject result = new MutableObject<>(); final MutableObject listenerHolder = new MutableObject<>(); + final long initialRowCount; + final Map tempStates; + final Context ctx = new Context(depth); + + if (snapshot != null) { + // Process state from snapshot here and just make a copy in the initializeWithSnapshot + // context, so we dont keep re-processing things if that init fails + tempStates = processInitBook(snapshot, groupingCols); + + // Record changes to initialize the output table + final long timeNowNanos = DateTimeUtils.epochNanos(DateTimeUtils.now()); + for (var entry : tempStates.entrySet()) { + recordChange(ctx, entry.getValue(), timeNowNanos, entry.getKey()); + } + + // Keep track of rows + initialRowCount = ctx.rowsAdded; + + } else { + tempStates = null; + initialRowCount = 0; + } + + + // result table will be the snapshot + any new data from the source QueryTable.initializeWithSnapshot("bookBuilder", snapshotControl, (prevRequested, beforeClock) -> { + + // Deep copy states + if (snapshot != null && snapshotControl != null) { + this.states = tempStates.entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + entry -> entry.getValue().deepCopy())); + } + final boolean usePrev = prevRequested && source.isRefreshing(); // Initialize the internal state by processing the entire input table. This will be done asynchronously from // the LTM thread and so it must know if it should use previous values or current values. - final long rowsAdded = processAdded(usePrev ? source.getRowSet().prev() : source.getRowSet(), usePrev); + processAdded(usePrev ? source.getRowSet().prev() : source.getRowSet(), usePrev, ctx); // + + // New book table final QueryTable bookTable = new QueryTable( - (rowsAdded == 0 ? 
RowSetFactory.empty() : RowSetFactory.fromRange(0, rowsAdded - 1)).toTracking() + (RowSetFactory.flat(ctx.rowsAdded)).toTracking() , columnSourceMap); + + if (snapshotControl != null) { columnSourceMap.values().forEach(ColumnSource::startTrackingPrevValues); bookTable.setRefreshing(true); @@ -197,7 +272,8 @@ private PriceBook(@NotNull final Table table, } else { bookListener = null; } - } +} + /** * Process all the added rows, potentially using previous values. @@ -207,9 +283,9 @@ private PriceBook(@NotNull final Table table, * @return the number of new rows emitted to the result. */ @SuppressWarnings("unchecked") - private long processAdded(RowSet added, boolean usePrev) { + private long processAdded(RowSet added, boolean usePrev, Context ctx_) { // First create the context object in a try-with-resources so it gets automatically cleaned up when we're done. - try(final Context ctx = new Context(depth)) { + try(final Context ctx = ctx_ == null ? new Context(depth) : ctx_) { // Next we get an iterator into the added index so that we can process the update in chunks. final RowSequence.Iterator okit = added.getRowSequenceIterator(); @@ -220,6 +296,7 @@ private long processAdded(RowSet added, boolean usePrev) { final ChunkSource.FillContext sizefc = ctx.makeFillContext(sizeSource); final ChunkSource.FillContext sidefc = ctx.makeFillContext(sideSource); final ChunkSource.FillContext opfc = ctx.makeFillContext(opSource); + final ChunkSource.FillContext oifc = ctx.makeFillContext(ordIdSource); final ChunkSource.FillContext keyfc = ctx.makeFillContext(keySource); BookState currentState = null; @@ -241,6 +318,7 @@ private long processAdded(RowSet added, boolean usePrev) { sideSource.fillPrevChunk(sidefc, (WritableChunk) ctx.sideChunk, nextKeys); sizeSource.fillPrevChunk(sizefc, (WritableChunk) ctx.sizeChunk, nextKeys); priceSource.fillPrevChunk(pricefc, (WritableChunk) ctx.priceChunk, nextKeys); + ordIdSource.fillPrevChunk(oifc, (WritableChunk) ctx.ordIdChunk, nextKeys); timeSource.fillPrevChunk(timefc, (WritableChunk) ctx.timeChunk, nextKeys); } else { keySource.fillChunk(keyfc, (WritableChunk) ctx.keyChunk, nextKeys); @@ -248,6 +326,7 @@ private long processAdded(RowSet added, boolean usePrev) { sideSource.fillChunk(sidefc, (WritableChunk) ctx.sideChunk, nextKeys); sizeSource.fillChunk(sizefc, (WritableChunk) ctx.sizeChunk, nextKeys); priceSource.fillChunk(pricefc, (WritableChunk) ctx.priceChunk, nextKeys); + ordIdSource.fillChunk(pricefc, (WritableChunk) ctx.ordIdChunk, nextKeys); timeSource.fillChunk(timefc, (WritableChunk) ctx.timeChunk, nextKeys); } @@ -277,9 +356,10 @@ private long processAdded(RowSet added, boolean usePrev) { final int size = ctx.sizeChunk.get(ii); final int side = ctx.sideChunk.get(ii); final int op = ctx.opChunk.get(ii); + final long ordId = ctx.ordIdChunk.get(ii); final long timestamp = ctx.timeChunk.get(ii); - batchUpdated |= currentState.update(timestamp, price, size, side, op); + batchUpdated |= currentState.update(timestamp, price, size, side, op, ordId); // We should only log a row if we are either not batching, or the timestamp changed final boolean logRowGate = (!batchTimestamps || timestamp != lastTime); @@ -305,6 +385,98 @@ private long processAdded(RowSet added, boolean usePrev) { } } + /** + * Process all rows from a book table snapshot + * + * @param t the snapshot of the book table + * @param groupings the grouping columns of the book table + * @return the Map of grouping keys to BookState + */ + final Map processInitBook(final Table t, String... 
groupings) { + // Must be static + assert !t.isRefreshing(); + + final Map initStates = new HashMap<>(); + final ColumnSource bidTSSource = t.getColumnSource(BID_TIME_NAME); + final ColumnSource askTSSource = t.getColumnSource(ASK_TIME_NAME); + final ColumnSource bidSizesSource = t.getColumnSource(BID_SIZE_NAME); + final ColumnSource askSizesSource = t.getColumnSource(ASK_SIZE_NAME); + final ColumnSource bidsSource = t.getColumnSource(BID_PRC_NAME); + final ColumnSource asksSource = t.getColumnSource(ASK_PRC_NAME); + final ColumnSource bidOrdIdSource = t.getColumnSource(BID_ORDID_NAME); + final ColumnSource askOrdIdSource = t.getColumnSource(ASK_ORDID_NAME); + + + final List> groupingSources = new ArrayList<>(); + for (final String grouping : groupings) { + final ColumnSource colSource = t.getColumnSource(grouping); + groupingSources.add(colSource); + } + + TupleSource snapKeySource = TupleSourceFactory.makeTupleSource(Arrays.stream(groupings).map(t::getColumnSource).toArray(ColumnSource[]::new)); + + try(final InitContext context = new InitContext(depth, groupingSources)) { + // Next we get an iterator into the added index so that we can process the update in chunks. + final RowSequence.Iterator okit = t.getRowSet().getRowSequenceIterator(); + + final ChunkSource.FillContext bidTimeFc = context.makeFillContext(bidTSSource); + final ChunkSource.FillContext askTimeFc = context.makeFillContext(askTSSource); + final ChunkSource.FillContext bidSizeFc = context.makeFillContext(bidSizesSource); + final ChunkSource.FillContext askSizeFc = context.makeFillContext(askSizesSource); + final ChunkSource.FillContext bidPriceFc = context.makeFillContext(bidsSource); + final ChunkSource.FillContext askPriceFc = context.makeFillContext(asksSource); + final ChunkSource.FillContext bidOrdIdFc = context.makeFillContext(bidOrdIdSource); + final ChunkSource.FillContext askOrdIdFc = context.makeFillContext(askOrdIdSource); + final ChunkSource.FillContext[] groupingfc = new ChunkSource.FillContext[groupingSources.size()]; + for (int i = 0; i < groupingSources.size(); i++) { + groupingfc[i] = context.makeFillContext(groupingSources.get(i)); + } + + while (okit.hasMore()) { + context.sc.reset(); + + // Grab up to the next CHUNK_SIZE rows + final RowSequence nextKeys = okit.getNextRowSequenceWithLength(CHUNK_SIZE); + + bidTSSource.fillChunk(bidTimeFc, (WritableChunk) context.bidTimeChunk, nextKeys); + askTSSource.fillChunk(askTimeFc, (WritableChunk) context.askTimeChunk, nextKeys); + bidSizesSource.fillChunk(bidSizeFc, (WritableChunk) context.bidSizeChunk, nextKeys); + askSizesSource.fillChunk(askSizeFc, (WritableChunk) context.askSizeChunk, nextKeys); + bidsSource.fillChunk(bidPriceFc, (WritableChunk) context.bidPriceChunk, nextKeys); + asksSource.fillChunk(askPriceFc, (WritableChunk) context.askPriceChunk, nextKeys); + bidOrdIdSource.fillChunk(bidOrdIdFc, (WritableChunk) context.bidOrdIdChunk, nextKeys); + askOrdIdSource.fillChunk(askOrdIdFc, (WritableChunk) context.askOrdIdChunk, nextKeys); + + for (int i = 0; i < groupingSources.size(); i++) { + groupingSources.get(i).fillChunk(groupingfc[i], (WritableChunk) context.groupingChunks.get(i), nextKeys); + } + + for (int ii = 0; ii < nextKeys.size(); ii++) { + final int finalII = ii; + final Object key = snapKeySource.createTuple(finalII); + + if (initStates.get(key) == null) { + initStates.put(key, new BookState(depth, + context.bidTimeChunk.get(ii), + context.askTimeChunk.get(ii), + context.bidSizeChunk.get(ii), + context.askSizeChunk.get(ii), + 
context.bidPriceChunk.get(ii), + context.askPriceChunk.get(ii), + context.bidOrdIdChunk.get(ii), + context.askOrdIdChunk.get(ii) + )); + + } else { + throw new IllegalArgumentException("Input book must have only one key per row"); + } + } + } + } + + return initStates; + } + /** * Write out a single row of updates to the output table. * @@ -330,11 +502,11 @@ public void recordChange(Context ctx, BookState state, long timestamp, Object ke // Fill out the bid columns fillFrom(nextIndex, newSize, state.bids, ctx, Comparator.reverseOrder(), - bidPriceResults, bidTimeResults, bidSizeResults); + bidPriceResults, bidTimeResults, bidSizeResults, bidOrdIdResults); // Fill out the ask columns fillFrom(nextIndex, newSize, state.asks, ctx, Comparator.naturalOrder(), - askPriceResults, askTimeResults, askSizeResults); + askPriceResults, askTimeResults, askSizeResults, askOrdIdResults); // Increment the number of rows added. ctx.rowsAdded++; @@ -357,11 +529,13 @@ private void fillFrom(long destination, Comparator comparator, WritableColumnSource priceDestination, WritableColumnSource timeDestination, - WritableColumnSource sizeDestination) { + WritableColumnSource sizeDestination, + WritableColumnSource ordIdDestination) { final int count = book.bestPrices.size(); final double[] prices = new double[count]; final long[] times = new long[count]; final int[] sizes = new int[count]; + final long[] ordIds = new long[count]; // First copy the best prices from the book into our temporary array and sort it appropriately Arrays.sort(book.bestPrices.toArray(ctx.priceBuf), 0, book.bestPrices.size(), comparator); @@ -371,6 +545,7 @@ private void fillFrom(long destination, prices[ii] = price; times[ii] = book.timestampMap.get(price); sizes[ii] = book.sizeMap.get(price); + ordIds[ii] = book.ordIdMap.get(price); } priceDestination.ensureCapacity(newSize); @@ -381,6 +556,9 @@ private void fillFrom(long destination, sizeDestination.ensureCapacity(newSize); sizeDestination.set(destination, sizes); + + ordIdDestination.ensureCapacity(newSize); + ordIdDestination.set(destination, ordIds); } /** @@ -395,23 +573,94 @@ private void fillFrom(long destination, private static class Book { private final MinMaxPriorityQueue bestPrices; private final PriorityQueue overflowPrices; - private final Double2IntOpenHashMap sizeMap = new Double2IntOpenHashMap(); - private final Double2LongOpenHashMap timestampMap = new Double2LongOpenHashMap(); + private final Double2IntOpenHashMap sizeMap; + private final Double2LongOpenHashMap timestampMap; + private final Double2LongOpenHashMap ordIdMap; - private final long depth; + private final int depth; private final Comparator comparator; + Book(int depth, + Comparator comparator, + long[] timeArr, + int[] sizeArr, + double[] priceArr, + long[] ordIdArr) { + + this.depth = depth; + this.comparator = comparator; + this.sizeMap = new Double2IntOpenHashMap(priceArr, sizeArr); + this.timestampMap = new Double2LongOpenHashMap(priceArr, timeArr); + this.ordIdMap = new Double2LongOpenHashMap(priceArr, ordIdArr); + this.sizeMap.defaultReturnValue(-1); + this.timestampMap.defaultReturnValue(-1); + this.ordIdMap.defaultReturnValue(-1); + + // Insert prices into minmax q ... 
+ bestPrices = MinMaxPriorityQueue.orderedBy(comparator).maximumSize(depth).create(); + for(int ii = 0; ii < priceArr.length; ii++) { + bestPrices.add(priceArr[ii]); + } + + overflowPrices = new PriorityQueue<>(comparator); + } + + Book(int depth, Comparator comparator) { this.depth = depth; this.comparator = comparator; + this.sizeMap = new Double2IntOpenHashMap(); + this.timestampMap = new Double2LongOpenHashMap(); + this.ordIdMap = new Double2LongOpenHashMap(); this.sizeMap.defaultReturnValue(-1); this.timestampMap.defaultReturnValue(-1); + this.ordIdMap.defaultReturnValue(-1); bestPrices = MinMaxPriorityQueue.orderedBy(comparator).maximumSize(depth).create(); overflowPrices = new PriorityQueue<>(comparator); } + + // Constructor for copy + Book(int depth, + Comparator comparator, + Double2IntOpenHashMap sizeMap, + Double2LongOpenHashMap timestampMap, + Double2LongOpenHashMap ordIdMap, + MinMaxPriorityQueue bestPrices, + PriorityQueue overflowPrices) { + + this.depth = depth; + this.comparator = comparator; + this.sizeMap = sizeMap; + this.timestampMap = timestampMap; + this.ordIdMap = ordIdMap; + this.sizeMap.defaultReturnValue(-1); + this.timestampMap.defaultReturnValue(-1); + this.ordIdMap.defaultReturnValue(-1); + this.bestPrices = bestPrices; + this.overflowPrices = overflowPrices; + } + + private Book deepCopy() { + + MinMaxPriorityQueue bestPricesCopy = MinMaxPriorityQueue.create(); + for (Double element : this.bestPrices) { + bestPricesCopy.add(element.doubleValue()); + } + + PriorityQueue overflowPricesCopy = new PriorityQueue(overflowPrices); + + return new Book(this.depth, this.comparator, + (Double2IntOpenHashMap) this.sizeMap.clone(), + (Double2LongOpenHashMap) this.timestampMap.clone(), + (Double2LongOpenHashMap) this.ordIdMap.clone(), + (MinMaxPriorityQueue) bestPricesCopy, + (PriorityQueue) overflowPricesCopy); + } + + /** - * Update the specified price in the book. If the price was new and larger than the any of the prices in the + * Update the specified price in the book. If the price was new and larger than any of the prices in the * bestPrices queue, the last price in the bestPrices queue is moved to the backlog and the new price is inserted * into the bestPrices, otherwise it is inserted directly into the backlog. * @@ -420,8 +669,9 @@ private static class Book { * @param time the order time * @return true of the price was added */ - private boolean updatePrice(double price, int size, long time) { + private boolean updatePrice(double price, int size, long time, long ordId) { final long prevSize = sizeMap.put(price, size); + final long prevOrdId = ordIdMap.put(price, ordId); final long prevTime = timestampMap.put(price, time); // It's a new price! 
@@ -441,7 +691,7 @@ private boolean updatePrice(double price, int size, long time) { return true; } - return prevSize != size || prevTime != time; + return prevSize != size || prevTime != time || prevOrdId != ordId; } /** @@ -468,6 +718,7 @@ private boolean removeFrom(double price) { sizeMap.remove(price); timestampMap.remove(price); + ordIdMap.remove(price); return wasRemoved; } @@ -491,11 +742,35 @@ private static class BookState { final Book bids; final Book asks; + BookState(int depth, + long[] bidTimeArr, + long[] askTimeArr, + int[] bidSizeArr, + int[] askSizeArr, + double[] bidPriceArr, + double[] askPriceArr, + long[] bidOrdIdArr, + long[] askOrdIdArr) { + + bids = new Book(depth, Comparator.reverseOrder(), bidTimeArr, bidSizeArr, bidPriceArr, bidOrdIdArr); + asks = new Book(depth, Comparator.naturalOrder(), askTimeArr, askSizeArr, askPriceArr, askOrdIdArr); + + } + BookState(int depth) { bids = new Book(depth, Comparator.reverseOrder()); asks = new Book(depth, Comparator.naturalOrder()); } + BookState(Book bidsBook, Book asksBook) { + bids = bidsBook; + asks = asksBook; + } + + private BookState deepCopy() { + return new BookState(this.bids.deepCopy(), this.asks.deepCopy()); + } + /** * Update the book state with the specified price. If the book op was DELETE or REMOVE, or the size was 0 * the price will be removed from the book. @@ -511,7 +786,8 @@ public boolean update(final long time, final double price, final int size, final int side, - final int op) { + final int op, + final long ordId) { final Book book = (side == SIDE_SELL) ? asks: bids; // Remove this price from the book entirely. @@ -519,7 +795,7 @@ public boolean update(final long time, return book.removeFrom(price); } - return book.updatePrice(price, size, time); + return book.updatePrice(price, size, time, ordId); } } @@ -537,6 +813,7 @@ private static class Context implements SafeCloseable { final WritableIntChunk sizeChunk; final WritableIntChunk sideChunk; final WritableIntChunk opChunk; + final WritableLongChunk ordIdChunk; final WritableObjectChunk keyChunk; /* @@ -566,6 +843,7 @@ private static class Context implements SafeCloseable { sizeChunk = WritableIntChunk.makeWritableChunk(CHUNK_SIZE); sideChunk = WritableIntChunk.makeWritableChunk(CHUNK_SIZE); opChunk = WritableIntChunk.makeWritableChunk(CHUNK_SIZE); + ordIdChunk = WritableLongChunk.makeWritableChunk(CHUNK_SIZE); keyChunk = WritableObjectChunk.makeWritableChunk(CHUNK_SIZE); } @@ -582,6 +860,7 @@ public void close() { sizeChunk.close(); sideChunk.close(); opChunk.close(); + ordIdChunk.close(); keyChunk.close(); } @@ -598,6 +877,81 @@ ChunkSource.FillContext makeFillContext(ChunkSource cs) { } } + private static class InitContext implements SafeCloseable { + /* + * Each of these WriteableChunks are used to process the update data more efficiently in linear chunks + * instead of iterating over an index. This avoids virtual method calls and is much more cache-friendly + */ + final WritableObjectChunk bidTimeChunk; + final WritableObjectChunk askTimeChunk; + final WritableObjectChunk bidSizeChunk; + final WritableObjectChunk askSizeChunk; + final WritableObjectChunk bidPriceChunk; + final WritableObjectChunk askPriceChunk; + final WritableObjectChunk bidOrdIdChunk; + final WritableObjectChunk askOrdIdChunk; + final List> groupingChunks; + + /* + * The SharedContext and FillContexts are used by the column sources when they copy data into the chunks + * above in order to share resources within a single update cycle. 
+ */ + final SharedContext sc; + final List fillContexts = new ArrayList<>(10); + + InitContext(int depth, List> groupingSources) { + sc = SharedContext.makeSharedContext(); + bidTimeChunk = WritableObjectChunk.makeWritableChunk(CHUNK_SIZE); + askTimeChunk = WritableObjectChunk.makeWritableChunk(CHUNK_SIZE); + bidSizeChunk = WritableObjectChunk.makeWritableChunk(CHUNK_SIZE); + askSizeChunk = WritableObjectChunk.makeWritableChunk(CHUNK_SIZE); + bidPriceChunk = WritableObjectChunk.makeWritableChunk(CHUNK_SIZE); + askPriceChunk = WritableObjectChunk.makeWritableChunk(CHUNK_SIZE); + bidOrdIdChunk = WritableObjectChunk.makeWritableChunk(CHUNK_SIZE); + askOrdIdChunk = WritableObjectChunk.makeWritableChunk(CHUNK_SIZE); + + groupingChunks = new ArrayList<>(); + for (int i = 0; i < groupingSources.size(); i++) { + groupingChunks.add(WritableObjectChunk.makeWritableChunk(CHUNK_SIZE)); + } + } + + /** + * At the end of an update cycle this must be invoked to close and release any shared resources that were claimed + *
+ * during the update cycle. + */ + @Override + public void close() { + sc.close(); + fillContexts.forEach(ChunkSource.FillContext::close); + bidTimeChunk.close(); + askTimeChunk.close(); + bidSizeChunk.close(); + askSizeChunk.close(); + bidPriceChunk.close(); + askPriceChunk.close(); + bidOrdIdChunk.close(); + askOrdIdChunk.close(); + + for (final WritableChunk chunk : groupingChunks) { + chunk.close(); + } + } + + /** + * Just a helper method to create fill contexts and save them so they can be cleaned up neatly on close. + * + * @param cs the column source + * @return a new fill context for that source. + */ + ChunkSource.FillContext makeFillContext(ChunkSource cs) { + final ChunkSource.FillContext fc = cs.makeFillContext(CHUNK_SIZE, sc); + fillContexts.add(fc); + return fc; + } + } + /** * This class listens for updates from the source table and pushes each row through the BookState for the symbol * indicated for the row, emitting a new row into the output table for each input row which results in a book @@ -659,7 +1013,7 @@ public void process() { } // First process all of the new rows - final long rowsAdded = processAdded(upstream.added(), false); + final long rowsAdded = processAdded(upstream.added(), false, null); // Handle the case where the input rows generate no book state changes, we don't want to accidentally // try to inject a -1 into the row set. @@ -680,11 +1034,13 @@ public void process() { } } + /** * Build a book of bid and ask prices with the specified number of levels from the requested table, grouping input rows by the * specified set of grouping columns. Levels will be represented as a set of columns (Price, Time, Size) for each level. * * @param source the table with the source data + * @param snapshot the table with the book * @param depth the desired book depth * @param batchTimestamps set to true to batch input rows with identical timestamps into the a single output row. * @param timestampColumnName the name of the source timestamp column @@ -692,12 +1048,14 @@ public void process() { * @param sideColumnName the name of the source side column * @param opColumnName the name of the source book-op column * @param priceColumnName the name of the price column + * @param ordIdColumnName the name of the Order ID column * @param groupingCols the columns to group the source table by * * @return a new table representing the current state of the book. This table will update as the source table updates. */ @SuppressWarnings("unused") public static QueryTable build(@NotNull Table source, + Table snapshot, int depth, boolean batchTimestamps, @NotNull String timestampColumnName, @@ -705,8 +1063,10 @@ public static QueryTable build(@NotNull Table source, @NotNull String sideColumnName, @NotNull String opColumnName, @NotNull String priceColumnName, + @NotNull String ordIdColumnName, @NotNull String... groupingCols) { final PriceBook book = new PriceBook(source, + snapshot, depth, batchTimestamps, timestampColumnName, @@ -714,6 +1074,7 @@ public static QueryTable build(@NotNull Table source, sideColumnName, opColumnName, priceColumnName, + ordIdColumnName, groupingCols); return book.resultTable;
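
For reference, the end-to-end flow the new scripts implement (build a book, persist a per-key snapshot, then seed a new book from that snapshot) condenses to the sketch below. It is illustrative only and not part of the patch: it assumes the build_book_with_snap helper from scripts/book_utils.py is importable (snap_resume.py arranges this via meta_import), that old_data and ticking_data are tables split around a cut-over time as in test_snapshot.py, and that /tmp/old_book.parquet is a scratch path.

from deephaven import parquet as dhpq
from book_utils import build_book_with_snap  # illustrative import path; snap_resume.py imports via meta_import(db, "notebook")

# old_data / ticking_data: the source table split around a cut-over time,
# as prepared in test_snapshot.py / snap_resume.py (assumed to exist here).

# 1. Build the book over the older rows with no prior state, collapse to one
#    row per key, and freeze it as a static table that can be written to disk.
old_book = build_book_with_snap(old_data, snapshot=None, book_depth=3) \
    .last_by("SYMB") \
    .snapshot()
dhpq.write(old_book, "/tmp/old_book.parquet")

# 2. Later (possibly in a new worker), reload the saved state and pass it to the
#    builder as the starting book; only rows after the cut-over are replayed.
old_book_snapshot = dhpq.read("/tmp/old_book.parquet")
book = build_book_with_snap(source=ticking_data,
                            snapshot=old_book_snapshot,
                            book_depth=3).last_by("SYMB")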