Parallel historic and live streams lead to live data loss #202

@martinschlueter91

What happened?

The basis for our trading algorithm is an in-memory record of the public order book. Before trading starts, the order book must be initialized with historic order updates. However, if we download the historic orders first and only start the live stream once the download has finished, we miss all live order updates that arrive in the meantime. Thus, we implemented this setup:

  1. Start the live stream.
  2. Extract the timestamp from the first live order and start the historic stream.
  3. The historic stream then covers the range from 15:00 German time yesterday up to the timestamp from step 2.
  4. Historic and live stream now run in parallel (sketched abstractly just below).
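
Abstractly, the crux of this setup is that step 2 pins the historic end_time to the first live update, so the two data ranges meet without a hole. Here is a toy model with a plain asyncio.Queue standing in for the client's streams (none of these names come from the client library; the full, runnable repro follows further below):

import asyncio
from datetime import datetime, timedelta, timezone


async def demo_handoff() -> None:
    """Toy model of the setup: live stream first, historic range pinned to its first update."""
    live: asyncio.Queue[datetime] = asyncio.Queue()

    async def produce_live() -> None:
        # Stand-in for the live order stream.
        now = datetime.now(timezone.utc)
        for i in range(5):
            await live.put(now + timedelta(seconds=i))
            await asyncio.sleep(0)

    async def load_historic(end_time: datetime) -> None:
        # Stand-in for the historic download covering yesterday 15:00 up to end_time.
        print(f"historic: loading everything up to {end_time}")

    async with asyncio.TaskGroup() as tg:
        tg.create_task(produce_live())                  # step 1
        first = await live.get()                        # step 2
        tg.create_task(load_historic(end_time=first))   # step 3
        while not live.empty():                         # step 4 (toy drain)
            print("live:", await live.get())


asyncio.run(demo_handoff())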

However, the live stream does not work correctly when running in parallel with the historic stream. I have replicated our initialization setup in the standalone script below, and I also attached two log outputs from the script. The logs show that when a big chunk of historic orders interrupts the live stream, there is a jump in the timestamps of the live orders, i.e., live data is getting lost somewhere. In log 1 there is a big jump from 11:14:04.692 to 11:16:33.427, and in log 2 a smaller jump from 11:31:56.560 to 11:32:06.159. Scroll down in the logs to the big chunks of historic data, starting at line 3496 in log 1 and line 5214 in log 2, to see the described jumps in the live data timestamps.
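
Such jumps can also be found mechanically instead of by scrolling. A small, self-contained check (not part of the client; it only assumes each order exposes an update_time, as in the script below, and the 30-second threshold is an arbitrary choice):

from datetime import datetime, timedelta


def find_gaps(
    update_times: list[datetime],
    threshold: timedelta = timedelta(seconds=30),
) -> list[tuple[datetime, datetime]]:
    """Return consecutive update-time pairs that are further apart than threshold."""
    return [
        (previous, current)
        for previous, current in zip(update_times, update_times[1:])
        if current - previous > threshold
    ]

Run over the live update times of log 1, this would flag the 11:14:04.692 to 11:16:33.427 pair, along with any other gaps above the threshold.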

Also, while studying the logs I found that the live timestamps are not completely monotonic. I am not sure whether this is related to the other problem or a separate issue, but I wanted to report it here anyway, since it is visible in the attached logs. Here is an example from the beginning of log 1:

2025-10-16 11:13:16.814 [INFO] Live order with update time 2025-10-16 11:13:16.789000+02:00: Order #10 - Batch #10, Batch Position 0 - ID: 969810265
2025-10-16 11:13:16.814 [INFO] Live order with update time 2025-10-16 11:13:12.410000+02:00: Order #11 - Batch #11, Batch Position 0 - ID: 1104236509
2025-10-16 11:13:16.814 [INFO] Live order with update time 2025-10-16 11:13:16.790000+02:00: Order #12 - Batch #12, Batch Position 0 - ID: 969862280
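
These regressions can be flagged with a similarly small check that tracks the highest update time seen so far (again a hypothetical helper, assuming only an update_time per order):

from datetime import datetime, timedelta


def find_regressions(update_times: list[datetime]) -> list[tuple[int, timedelta]]:
    """Return (index, distance into the past) for each update time below the running maximum."""
    regressions: list[tuple[int, timedelta]] = []
    latest: datetime | None = None
    for index, current in enumerate(update_times):
        if latest is not None and current < latest:
            regressions.append((index, latest - current))
        else:
            latest = current
    return regressions

On the excerpt above it reports index 1 (order #11), which is about 4.4 seconds behind order #10.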

order_book_init_test_1.log

order_book_init_test_2.log

import asyncio
import logging
import os
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

import frequenz.client.electricity_trading as fcet
from dotenv import load_dotenv


def setup_logger() -> logging.Logger:
    """Set up a logger with timestamp formatting that writes to a file.

    Returns:
        Configured logger instance.
    """
    logger = logging.getLogger("order_book_init_test")
    logger.setLevel(logging.INFO)

    handler = logging.FileHandler("order_book_init_test.log", mode="w")
    formatter = logging.Formatter("%(asctime)s.%(msecs)03d [%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    return logger


async def init_historic_order_book(
    client: fcet.Client,
    delivery_area: fcet.DeliveryArea,
    end_time: datetime,
    logger: logging.Logger,
    historic_complete_event: asyncio.Event,
) -> None:
    """Load historic orders from yesterday 15:00 until end_time.

    Args:
        client: Frequenz trading client instance.
        delivery_area: Delivery area to filter orders.
        end_time: End time for historic data (timestamp of first live order).
        logger: Logger instance for output.
        historic_complete_event: Event to signal when historic stream is complete.
    """
    # Calculate start time (yesterday 15:00 German time)
    now = datetime.now(ZoneInfo("Europe/Berlin"))
    yesterday_market_open = datetime(
        year=now.year,
        month=now.month,
        day=now.day,
        hour=15,
        minute=0,
        tzinfo=ZoneInfo("Europe/Berlin"),
    ) - timedelta(days=1)

    logger.info(
        "Historic stream: Starting from %s to %s",
        yesterday_market_open,
        end_time.astimezone(ZoneInfo("Europe/Berlin")),
    )

    historic_stream = client.receive_public_order_book(
        delivery_area=delivery_area,
        start_time=yesterday_market_open.astimezone(ZoneInfo("UTC")),
        end_time=end_time.astimezone(ZoneInfo("UTC")),
    )

    batch_count = 0
    order_count = 0

    async for batch in historic_stream.new_receiver(maxsize=100_000):
        batch_count += 1
        batch_size = len(batch)
        order_count += batch_size
        logger.info(
            "Historic stream: Received batch #%d with %d orders (total: %d)",
            batch_count,
            batch_size,
            order_count,
        )

    logger.info("Historic stream: Completed - %d batches, %d total orders", batch_count, order_count)
    historic_complete_event.set()


async def stream_live_orders(
    client: fcet.Client,
    delivery_area: fcet.DeliveryArea,
    task_group: asyncio.TaskGroup,
    logger: logging.Logger,
    historic_complete_event: asyncio.Event,
) -> None:
    """Stream live orders and launch historic stream after first order.

    Args:
        client: Frequenz trading client instance.
        delivery_area: Delivery area to filter orders.
        task_group: Parent TaskGroup to which historic stream task will be attached.
        logger: Logger instance for output.
        historic_complete_event: Event to wait for historic stream completion.
    """
    logger.info("Live stream: Starting...")

    live_stream = client.receive_public_order_book(delivery_area=delivery_area)
    # Large receiver buffer so that pending live batches should queue
    # client-side while this task is busy, rather than being dropped.
    stream = live_stream.new_receiver(maxsize=100_000)

    # Get first batch
    first_batch = await anext(aiter(stream))
    first_order = first_batch[0]
    first_order_timestamp = first_order.update_time

    # Start historic stream as background task in the same task group
    task_group.create_task(
        init_historic_order_book(
            client=client,
            delivery_area=delivery_area,
            end_time=first_order_timestamp,
            logger=logger,
            historic_complete_event=historic_complete_event,
        )
    )

    # Process first batch
    batch_count = 1
    order_count = 0
    for batch_position, order in enumerate(first_batch):
        order_count += 1
        logger.info(
            "Live order with update time %s: Order #%d - Batch #%d, Batch Position %d - ID: %s",
            order.update_time.astimezone(ZoneInfo("Europe/Berlin")),
            order_count,
            batch_count,
            batch_position,
            order.public_order_id,
        )

    # Continue live stream until historic stream is complete
    async for batch in stream:
        batch_count += 1
        for batch_position, order in enumerate(batch):
            order_count += 1
            logger.info(
                "Live order with update time %s: Order #%d - Batch #%d, Batch Position %d - ID: %s",
                order.update_time.astimezone(ZoneInfo("Europe/Berlin")),
                order_count,
                batch_count,
                batch_position,
                order.public_order_id,
            )

        # Note: this check runs only after another live batch arrives, so the
        # loop ends with the first batch received after historic completion.
        if historic_complete_event.is_set():
            logger.info("Live stream: Historic stream completed, stopping live stream...")
            break

    logger.info("Live stream: Completed - %d batches, %d total orders", batch_count, order_count)


async def main() -> None:
    """Main entry point for the order book initialization test.

    1. TaskGroup is created
    2. Live stream task is started with the task_group passed to it
    3. Live stream gets first order and starts historic stream in the same task_group
    4. Both streams run in parallel
    5. Live stream continues until historic stream completes
    """
    load_dotenv()

    api_key = os.getenv("FREQUENZ_API_KEY")
    if api_key is None:
        raise ValueError("FREQUENZ_API_KEY environment variable is not set")

    trading_url = os.getenv("FREQUENZ_TRADING_TEST_URL")
    if trading_url is None:
        raise ValueError("FREQUENZ_TRADING_TEST_URL environment variable is not set")

    print("Logs will be written to: order_book_init_test.log")

    logger = setup_logger()
    logger.info("Initializing Frequenz trading client...")

    client = fcet.Client(server_url=trading_url, auth_key=api_key)
    delivery_area = fcet.DeliveryArea(
        code="10YDE-RWENET---I",
        code_type=fcet.EnergyMarketCodeType.EUROPE_EIC,
    )

    try:
        logger.info("=" * 80)
        logger.info("Starting parallel order book initialization test")
        logger.info("=" * 80)

        historic_complete_event = asyncio.Event()

        async with asyncio.TaskGroup() as task_group:
            task_group.create_task(
                stream_live_orders(
                    client=client,
                    delivery_area=delivery_area,
                    task_group=task_group,
                    logger=logger,
                    historic_complete_event=historic_complete_event,
                )
            )

        logger.info("=" * 80)
        logger.info("Parallel streaming test completed")
        logger.info("=" * 80)

        print("Test completed. Check order_book_init_test.log for results.")
    except Exception as e:
        logger.error("Error during streaming: %s", e, exc_info=True)
        print("Test failed with error. Check order_book_init_test.log for details.")
        raise


if __name__ == "__main__":
    asyncio.run(main())
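
A variant that might help narrow down where the data is lost: buffer the live batches in a plain list, with no per-order logging, and only count them once the historic stream has finished. This is only a sketch, reusing the same client API and helpers as the script above; if the timestamp jumps disappear with it, the loss presumably happens while the live receiver is not drained fast enough, and if they remain, it is further upstream.

async def stream_live_orders_buffered(
    client: fcet.Client,
    delivery_area: fcet.DeliveryArea,
    task_group: asyncio.TaskGroup,
    logger: logging.Logger,
    historic_complete_event: asyncio.Event,
) -> None:
    """Variant of stream_live_orders that only buffers live batches while the historic stream runs."""
    stream = client.receive_public_order_book(delivery_area=delivery_area).new_receiver(
        maxsize=100_000
    )

    first_batch = await anext(aiter(stream))

    # Kick off the historic stream exactly as before.
    task_group.create_task(
        init_historic_order_book(
            client=client,
            delivery_area=delivery_area,
            end_time=first_batch[0].update_time,
            logger=logger,
            historic_complete_event=historic_complete_event,
        )
    )

    # Buffer batches with no per-order work, so the receiver is drained as
    # fast as possible while the historic stream is running.
    buffered = [first_batch]
    async for batch in stream:
        buffered.append(batch)
        if historic_complete_event.is_set():
            break

    total_orders = sum(len(batch) for batch in buffered)
    logger.info(
        "Live stream (buffered): %d batches, %d total orders", len(buffered), total_orders
    )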

What did you expect instead?

No data loss in the live stream

Affected version(s)

0.12.2

Affected part(s)

I don't know (part:❓)

Extra information

No response

Labels

priority:❓ (We need to figure out how soon this should be addressed)
type:bug (Something isn't working)
