5 changes: 4 additions & 1 deletion .gitignore
@@ -10,4 +10,7 @@ outTest/
*.iws
*.iml
tmp/
**/.DS_Store
*.pyc
*.pyo
*.csv
44 changes: 44 additions & 0 deletions BookStates.ExBook.schema
@@ -0,0 +1,44 @@
<Table name="ExBook" namespace="BookStates" defaultMergeFormat="DeephavenV1" storageType="NestedPartitionedOnDisk">
<Partitions keyFormula="${autobalance_by_first_grouping_column}" />

<Column name="Date" dataType="String" columnType="Partitioning" />
<Column name="SYMB" dataType="String"/>
<Column name="SnapTime" dataType="DateTime"/>
<Column name="LastTickTime" dataType="DateTime"/>
<Column name="Timestamp" dataType="DateTime"/>

<Column name="Bid_Price" dataType="double[]" objectCodec="io.deephaven.enterprise.codec.DoubleArrayCodec"/>
<Column name="Bid_Timestamp" dataType="long[]" objectCodec="io.deephaven.enterprise.codec.LongArrayCodec"/>
<Column name="Bid_Size" dataType="int[]" objectCodec="io.deephaven.enterprise.codec.IntArrayCodec"/>

<Column name="Ask_Price" dataType="double[]" objectCodec="io.deephaven.enterprise.codec.DoubleArrayCodec"/>
<Column name="Ask_Timestamp" dataType="long[]" objectCodec="io.deephaven.enterprise.codec.LongArrayCodec"/>
<Column name="Ask_Size" dataType="int[]" objectCodec="io.deephaven.enterprise.codec.IntArrayCodec"/>


<Listener logFormat="1" listenerPackage="com.mycompany" listenerClass="ExampleDataFormat1Listener">

<ListenerImports>
import io.deephaven.enterprise.codec.DoubleArrayCodec;
import io.deephaven.enterprise.codec.LongArrayCodec;
import io.deephaven.enterprise.codec.IntArrayCodec;
</ListenerImports>


<Column name="Date" intradayType="none" />
<Column name="SYMB"/>
<Column name="SnapTime" dbSetter="DBTimeUtils.nanosToTime(SnapTime)"/>
<Column name="LastTickTime" dbSetter="DBTimeUtils.nanosToTime(LastTickTime)"/>
<Column name="Timestamp" dbSetter="DBTimeUtils.nanosToTime(Timestamp)"/>

<Column name="Bid_Price" intradayType="Blob" dbSetter="DoubleArrayCodec.decodeStatic(Bid_Price)"/>
<Column name="Bid_Timestamp" intradayType="Blob" dbSetter="LongArrayCodec.decodeStatic(Bid_Timestamp)"/>
<Column name="Bid_Size" intradayType="Blob" dbSetter="IntArrayCodec.decodeStatic(Bid_Size)"/>

<Column name="Ask_Price" intradayType="Blob" dbSetter="DoubleArrayCodec.decodeStatic(Ask_Price)"/>
<Column name="Ask_Timestamp" intradayType="Blob" dbSetter="LongArrayCodec.decodeStatic(Ask_Timestamp)"/>
<Column name="Ask_Size" intradayType="Blob" dbSetter="IntArrayCodec.decodeStatic(Ask_Size)"/>

</Listener>

</Table>
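Note: once snapshots are logged against this schema (see scripts/snap_resume.py below), they can be pulled back with the standard database calls. A minimal sketch, assuming the schema is deployed, at least one snapshot has been written, and `db` is the usual Enterprise database handle:

# Hedged sketch: read back today's logged book states.
# The Blob columns decode back to double[]/long[]/int[] arrays via the codecs above.
book_states = db.live_table("BookStates", "ExBook").where("Date = today()")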
4 changes: 3 additions & 1 deletion build.gradle
@@ -27,7 +27,9 @@ java {
dependencies {
implementation platform("io.deephaven:deephaven-bom:${dhcVersion}"),
"io.deephaven:deephaven-engine-table",
"it.unimi.dsi:fastutil:8.5.13"
"it.unimi.dsi:fastutil:8.5.13",
"io.deephaven:deephaven-engine-tuple",
"io.deephaven:deephaven-engine-tuplesource"
runtimeOnly "io.deephaven:deephaven-log-to-slf4j",
'ch.qos.logback:logback-classic:1.4.5'
}
Binary file added docker/data/Quotes.parquet
11 changes: 5 additions & 6 deletions docker/data/storage/notebooks/Example.py
@@ -2,10 +2,9 @@
from deephaven import TableReplayer
import bookbuilder

static_data_ = read("/data/Quotes.parquet")
rp_ = TableReplayer("2017-08-25T09:30:00 ET", "2017-08-25T23:59:59 ET")
ticking_data_ = rp_.add_table(static_data_, "Timestamp")
rp_.start()

book_ = bookbuilder.build_book(ticking_data_).last_by("Sym").rename_columns("SYMB=Sym")
26 changes: 26 additions & 0 deletions docker/data/storage/notebooks/arrays.py
@@ -0,0 +1,26 @@
from deephaven.parquet import read
from deephaven import TableReplayer, merge
import bookbuilder

# OAK: new order ack; CRAK: cancel replace ack; CC: order cancel;
# INF: internal fill; AWF: away market fill.

EVT_map = {"Order Ack": 1, "Cancel Replace Ack" : 1}#2}
static_data_ = read("/data/Quotes.parquet")

static_data = order_sample.update("EPOCH_TS = Instant.ofEpochSecond((long) (EPOCH_TS/SECOND), EPOCH_TS % SECOND)")\
.update_view("EVT_ID = EVT_map.containsKey(EVT_TYP) ? (int) EVT_map[EVT_TYP] : null")


rp = TableReplayer("2024-10-10T02:30:00 ET", "2024-10-25T02:40:00 ET")
ticking_data = rp.add_table(static_data, "EPOCH_TS")
rp.start()

book = bookbuilder.build_book(source=ticking_data,\
book_depth = 1,\
timestamp_col = "EPOCH_TS",\
size_col = "QTY",\
side_col = "SIDE",\
op_col = "EVT_ID",\
price_col = "PRC",\
group_cols = ["SYMB"])
16 changes: 12 additions & 4 deletions docker/data/storage/notebooks/bookbuilder.py
@@ -1,17 +1,25 @@
from typing import Union, List

import jpy
from deephaven.table import Table

_J_BookBuilder = jpy.get_type("io.deephaven.book.PriceBook")

def build_book_with_snap(source: Table,
                         snapshot: Table,
                         book_depth: int = 2,
                         batch_timestamps: bool = False,
                         timestamp_col: str = "Timestamp",
                         size_col: str = "Size",
                         side_col: str = "Side",
                         op_col: str = "Op",
                         price_col: str = "Price",
                         group_cols: Union[str, List[str]] = ["Sym"]):

    if snapshot is not None:
        snapshot = snapshot.j_object

    return Table(_J_BookBuilder.build(source.j_object, snapshot, book_depth, batch_timestamps, timestamp_col, size_col, side_col, op_col, price_col, group_cols))
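A quick usage sketch of the new entry point, assuming a ticking quotes table with the default column names (`quotes` and `saved_book` are hypothetical):

import bookbuilder

# Cold start: no prior state; the book is built from the stream alone.
book = bookbuilder.build_book_with_snap(quotes, snapshot=None).last_by("Sym")

# Resume: seed the book with a previously saved snapshot table.
book = bookbuilder.build_book_with_snap(quotes, snapshot=saved_book).last_by("Sym")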



72 changes: 72 additions & 0 deletions docker/data/storage/notebooks/test_snapshot.py
@@ -0,0 +1,72 @@
from deephaven.parquet import read
from deephaven import TableReplayer, merge, read_csv
from deephaven.time import dh_now, to_j_instant
from datetime import datetime, timezone, timedelta
from deephaven.numpy import to_numpy
from deephaven import parquet as dhpq
import zoneinfo
import bookbuilder

# OAK: new order ack;
# CRAK: cancel replace ack;
# CC: order cancel;
# INF: internal fill;
# AWF: away market fill.

EVT_map = {"Order Ack": 1, "Cancel Replace Ack" : 2, "Cancel Order": 3, "Internal Fill": 4, "Away Market Fill": 5}
order_sample = read_csv("/data/order_sample.csv").view(["EVT_TYP", "SYMB", "EPOCH_TS", "ORD_ID=CLIENT_ORD_ID", "ORD_QTY=QTY", "EXEC_QTY=(int) null", "CXL_QTY=(int) null", "PRC", "SIDE"])\
    .sort("EPOCH_TS")

# Align timestamps to now
now_time = dh_now()
order_sample_start = to_numpy(order_sample, cols=["EPOCH_TS"]).flatten()[0]
order_sample = order_sample.update_view(["NowTime = now_time.getEpochSecond()*SECOND + now_time.getNano()",
                                         "Diff = NowTime - (long) order_sample_start",
                                         "EPOCH_TS_Nano = EPOCH_TS + Diff",
                                         "EPOCH_TS = Instant.ofEpochSecond((long) (EPOCH_TS_Nano/SECOND), EPOCH_TS_Nano % SECOND)",
                                         # We are ignoring the actual book build for now so make everything 1...
                                         "EVT_ID = 1"])


# Get some old book. Choose where to split the data...
split_time = to_j_instant(to_numpy(order_sample, cols=["EPOCH_TS"]).flatten()[5])
old_data = order_sample.where("EPOCH_TS < split_time")

old_book = bookbuilder.build_book_with_snap(old_data,
                                            snapshot=None,
                                            book_depth=3,
                                            timestamp_col="EPOCH_TS",
                                            size_col="ORD_QTY",
                                            side_col="SIDE",
                                            op_col="EVT_ID",
                                            price_col="PRC",
                                            group_cols=["SYMB"]).last_by("SYMB")
old_book = old_book.snapshot()

# Save the book for later
dhpq.write(old_book, "/tmp/old_book.parquet")



# Start up the rest of the data
# And run for 10 minutes
new_data = order_sample.where("EPOCH_TS >= split_time")
rp = TableReplayer(now_time.minusNanos(5*1000000000), now_time.plusNanos(10*60*1000000000))
ticking_data = rp.add_table(new_data, "EPOCH_TS")
rp.start()

# Load old book (or just grab the variable if running in one sequence)
old_book_snapshot = dhpq.read("/tmp/old_book.parquet")

# Make new book starting with old one
book = bookbuilder.build_book_with_snap(source=ticking_data,
                                        snapshot=old_book_snapshot,
                                        book_depth=3,
                                        timestamp_col="EPOCH_TS",
                                        size_col="ORD_QTY",
                                        side_col="SIDE",
                                        op_col="EVT_ID",
                                        price_col="PRC",
                                        group_cols=["SYMB"]).last_by("SYMB")


3 changes: 2 additions & 1 deletion docker/docker-compose.yml
@@ -6,8 +6,9 @@ services:
      dockerfile: docker/Dockerfile
    ports:
      - '${DEEPHAVEN_PORT:-20000}:10000'
      - '5005:5005'
    volumes:
      - ./data:/data
    environment:
      - PYTHONPATH=${PYTHONPATH}:/data/storage/notebooks
      - START_OPTS=-Xmx40g -DAuthHandlers=io.deephaven.auth.AnonymousAuthenticationHandler -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
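The added `-agentlib:jdwp=...` flag plus the new `5005:5005` port mapping exposes a standard JDWP remote-debug endpoint; `suspend=n` means the server starts without waiting for a debugger. Any JDWP-capable debugger can then attach to localhost:5005, e.g. an IntelliJ "Remote JVM Debug" run configuration, or jdb via `jdb -connect com.sun.jdi.SocketAttach:hostname=localhost,port=5005`.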
58 changes: 58 additions & 0 deletions scripts/book_utils.py
@@ -0,0 +1,58 @@
from deephaven.parquet import read
from deephaven import TableReplayer, merge, read_csv
from deephaven.time import dh_now, to_j_instant
from datetime import datetime, timezone, timedelta
from deephaven.numpy import to_numpy
from deephaven import parquet as dhpq
import zoneinfo


from typing import Union, List
import jpy
from deephaven.table import Table

_J_BookBuilder = jpy.get_type("io.deephaven.book.PriceBook")


def build_book_with_snap(source: Table,
                         snapshot: Table,
                         book_depth: int = 2,
                         batch_timestamps: bool = False,
                         timestamp_col: str = "EPOCH_TS",
                         size_col: str = "ORD_QTY",
                         side_col: str = "SIDE",
                         op_col: str = "EVT_ID",
                         price_col: str = "PRC",
                         ord_id_col: str = "ORD_ID",
                         group_cols: Union[str, List[str]] = ["SYMB"]):

    if snapshot is not None:
        snapshot = snapshot.j_object

    return Table(_J_BookBuilder.build(source.j_object, snapshot, book_depth, batch_timestamps, timestamp_col, size_col, side_col, op_col, price_col, ord_id_col, group_cols))


def prepare_data(start_time):
    # OAK: new order ack;
    # CRAK: cancel replace ack;
    # CC: order cancel;
    # INF: internal fill;
    # AWF: away market fill.
    EVT_map = {"Order Ack": 1, "Cancel Replace Ack": 2, "Cancel Order": 3, "Internal Fill": 4, "Away Market Fill": 5}

    order_sample = read_csv("/tmp/data/order_sample.csv").view(["EVT_TYP", "FIRM = EPOCH_TS % 2 == 0 ? `Traders Inc` : `Big Bank Inc`", "SYMB", "EPOCH_TS", "ORD_ID=ORD_ID_PUB", "ORD_QTY=QTY", "EXEC_QTY=(int) null", "CXL_QTY=(int) null", "PRC", "SIDE"])\
        .sort("EPOCH_TS")

    # Align timestamps to now
    now_time = start_time
    order_sample_start = to_numpy(order_sample, cols=["EPOCH_TS"]).flatten()[0]
    order_sample = order_sample.update_view(["NowTime = now_time.getEpochSecond()*SECOND + now_time.getNano()",
                                             "Diff = NowTime - (long) order_sample_start",
                                             "EPOCH_TS_Nano = EPOCH_TS + Diff",
                                             "EPOCH_TS = Instant.ofEpochSecond((long) (EPOCH_TS_Nano/SECOND), EPOCH_TS_Nano % SECOND)",
                                             # We are ignoring the actual book build for now so make everything 1...
                                             "EVT_ID = 1"])

    return order_sample
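The alignment arithmetic above shifts every historical timestamp forward by one constant offset so the sample replays as if it starts now. The same computation in plain Python, with made-up values:

SECOND = 1_000_000_000                    # nanos per second, mirroring the query-language constant
sample_start = 1_600_000_000 * SECOND     # first EPOCH_TS in the file, in nanos
now_nanos = 1_700_000_000 * SECOND + 42   # NowTime computed from start_time
diff = now_nanos - sample_start           # constant shift applied to every row
assert sample_start + diff == now_nanos   # the first row lands exactly at "now"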

90 changes: 90 additions & 0 deletions scripts/snap_resume.py
@@ -0,0 +1,90 @@
from deephaven.time import dh_today, dh_now, to_j_instant
from deephaven import parquet as dhpq
from deephaven import new_table, time_table, TableReplayer, merge, read_csv
from deephaven.table_listener import listen
from deephaven.column import datetime_col
from deephaven.numpy import to_numpy
from deephaven.execution_context import get_exec_ctx
from deephaven_enterprise.notebook import meta_import
from deephaven_enterprise import system_table_logger as stl

meta_import(db, "notebook")
from notebook.book_utils import prepare_data, build_book_with_snap


#--- Prepare some fake data ---#
trade_date = dh_today()
start_time = dh_now()
split_time = start_time.plusSeconds(30*3) #1.5 minutes
order_sample = prepare_data(start_time).update_view(f"Date = `{trade_date}`")

old_data = order_sample.where("EPOCH_TS < split_time")
rp = TableReplayer(start_time, split_time)
ticking_data = rp.add_table(order_sample, "EPOCH_TS")
rp.start()


#--- Run the book while saving snapshots ---#
curr_book = build_book_with_snap(ticking_data,
                                 snapshot=None,
                                 book_depth=3).last_by("SYMB")


# Save snaps with listener.
# (10 seconds just to get some quick snaps. In practice, should be closer to ~5 minutes)
snap_freq = "PT10S"
ctx = get_exec_ctx()

trigger = (
    time_table(snap_freq, None)
    .rename_columns(["SnapTime = Timestamp"])
)

# Listener function triggered by the time_table
def log_table(update, is_replay):
    with ctx:
        # Get the snap time and last timestamp from the source data
        snap_time = dh_now()
        last_tick_time = to_j_instant(to_numpy(ticking_data.tail(1), cols=["EPOCH_TS"]).flatten()[0])
        to_append = curr_book.update(["SnapTime = snap_time", "LastTickTime = last_tick_time"]).snapshot()
        stl.log_table("BookStates", "ExBook", to_append, columnPartition=dh_today(),
                      codecs={"Bid_Price": stl.double_array_codec(),
                              "Bid_Timestamp": stl.long_array_codec(),
                              "Bid_Size": stl.int_array_codec(),
                              "Ask_Price": stl.double_array_codec(),
                              "Ask_Timestamp": stl.long_array_codec(),
                              "Ask_Size": stl.int_array_codec()})

handle = listen(trigger, log_table)

# Remove handle after a few minutes.
# Optionally view all snapshots.
handle.stop()
book_snapshots = db.live_table("BookStates", "ExBook").where("Date = today()")


#--- Resume from a certain time (adjust trade_time) ---#
trade_time = "11:23:55"
source = order_sample

def prepare_resume(trade_date, trade_time, source):
    # Get first snapshot right before the given time
    resume_time = to_j_instant(trade_date + "T" + trade_time + " ET")
    latest_snap = db.live_table("BookStates", "ExBook").where([f"Date = `{trade_date}`", "SnapTime <= resume_time"])\
        .sort("SnapTime")\
        .last_by("SYMB").snapshot()

    # Get the actual last tick time so we can filter the source data from then on
    last_tick_time = to_j_instant(to_numpy(latest_snap.tail(1), cols=["LastTickTime"]).flatten()[0])
    source_since = source.where([f"Date = `{trade_date}`", "EPOCH_TS > last_tick_time"])

    return latest_snap, source_since

old_book, new_source = prepare_resume(trade_date, trade_time, source)

# Make new book starting with old one
book = build_book_with_snap(source=new_source,
                            snapshot=old_book,
                            book_depth=3).last_by("SYMB")


19 changes: 19 additions & 0 deletions scripts/with_ids.py
@@ -0,0 +1,19 @@
from deephaven.time import dh_today, dh_now, to_j_instant
from deephaven_enterprise.notebook import meta_import

meta_import(db, "notebook")
from notebook.book_utils import prepare_data, build_book_with_snap

#--- Prepare some fake data ---#
trade_date = dh_today()
start_time = dh_now()
split_time = start_time.plusSeconds(30*3) #1.5 minutes
order_sample = prepare_data(start_time).update_view(f"Date = `{trade_date}`")


#--- Run the book ---#
curr_book = build_book_with_snap(order_sample,
                                 snapshot=None,
                                 book_depth=3,
                                 group_cols=["FIRM", "SYMB"]).last_by(["FIRM", "SYMB"])
