ParquetDataCatalog append mode missing in v1.219? #2788
-
Hello, I use ParquetDataCatalog to save real-time market data. Version 1.218 had an append mode, but I cannot find it in version 1.219. Problem: tick1 is gone! Only tick2 is in the file.
catalog = ParquetDataCatalog("/path/to/catalog")
start_ns, end_ns = get_day_range_nanos() # Today 00:00 to 23:59
# 09:30 AM - First trade comes
catalog.write_data(data=[tick1], start=start_ns, end=end_ns)
# 09:31 AM - Second trade comes (1 minute later)
catalog.write_data(data=[tick2], start=start_ns, end=end_ns)
What I need:
Questions:
|
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 6 replies
-
The catalog has changed: each call to write data now creates a new file. There is also a consolidate-by-period function that lets you rewrite (merge) the files. Please look at the documentation; I've updated it.
Beta Was this translation helpful? Give feedback.
-
Here is an example I found, which I'm sharing with the community. If you spot any mistakes, please feel free to point them out.
from os import environ
from dotenv import load_dotenv
from nautilus_trader.config import (
InstrumentProviderConfig,
LiveDataClientConfig,
TradingNodeConfig,
)
from nautilus_trader.live.node import TradingNode
from nautilus_trader.model.data import (
InstrumentStatus,
OrderBookDepth10,
TradeTick,
)
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.persistence.catalog import ParquetDataCatalog
from nautilus_trader.persistence.config import RotationMode, StreamingConfig
from pandas import Timedelta
# my custom adapter
from tw_adapter.live_market_data_client_factory import TwLiveDataClientFactory
from tw_adapter.twse_api import TwseMarket, TwseType
from tw_adapter.utils import tw_name
from tw_persist_actor import TwPersistActor, TwPersistActorConfig
def main() -> None:
    """Stream live market data to disk, then convert it into catalog data.

    Builds a ``TradingNode`` with a single data client (keyed by ``tw_name``)
    and a ``StreamingConfig`` that writes trade ticks, instrument status
    updates and depth-10 order books under ``CATALOG_PATH``. Once the node
    stops, the streamed files of this node instance are converted into
    catalog data with ``ParquetDataCatalog.convert_stream_to_data``.

    Raises
    ------
    RuntimeError
        If the ``CATALOG_PATH`` environment variable is missing or empty.

    """
    load_dotenv()

    # Fail fast: without this check a missing variable would be passed on as
    # ``None`` to both the streaming config and the catalog.
    catalog_path = environ.get("CATALOG_PATH")
    if not catalog_path:
        raise RuntimeError("CATALOG_PATH environment variable must be set")

    # Single source of truth for the data types that are streamed while the
    # node runs and converted once it has stopped.
    streamed_types = [TradeTick, InstrumentStatus, OrderBookDepth10]

    instrument_ids = [
        InstrumentId.from_str("1101.TW"),
        InstrumentId.from_str("2330.TW"),
    ]
    actor = TwPersistActor(TwPersistActorConfig(instrument_ids))

    markets: list[TwseMarket] = ["上市", "上櫃"]
    types: list[TwseType] = ["股票"]
    query = f"market in {markets} and type in {types}"

    node = TradingNode(
        config=TradingNodeConfig(
            data_clients={
                tw_name: LiveDataClientConfig(
                    instrument_provider=InstrumentProviderConfig(
                        load_all=True,
                        filters={"query": query},
                    ),
                ),
            },
            streaming=StreamingConfig(
                catalog_path=catalog_path,
                fs_protocol="file",
                include_types=streamed_types,
                flush_interval_ms=1000,  # Flush every second
                replace_existing=False,
                rotation_mode=RotationMode.INTERVAL,
                rotation_interval=Timedelta(days=1),
                rotation_timezone="Asia/Taipei",
                max_file_size=1024 * 1024 * 100,  # 100MB max file size
            ),
        ),
    )
    node.add_data_client_factory(tw_name, TwLiveDataClientFactory)
    # NOTE(review): if TwPersistActor derives from Actor rather than Strategy,
    # this should presumably be `node.trader.add_actor(actor)` - confirm.
    node.trader.add_strategy(actor)
    node.build()

    # Capture the instance id up front so the conversion below does not need
    # to touch the node after it has been disposed.
    instance_id = node.instance_id.value

    try:
        node.run()
    finally:
        # Release the node's resources before the streamed files are read.
        node.dispose()

    catalog = ParquetDataCatalog(path=catalog_path)
    for data_cls in streamed_types:
        catalog.convert_stream_to_data(
            instance_id,
            data_cls=data_cls,
            subdirectory="live",
        )
# Run the example only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Beta Was this translation helpful? Give feedback.
The catalog has changed: each call to write data now creates a new file. There is also a consolidate-by-period function that lets you rewrite (merge) the files. Please look at the documentation; I've updated it.