Skip to content

Commit 67619f2

Browse files
committed
FIX: Live client stream flush on timeout
1 parent cc3f04d commit 67619f2

File tree

3 files changed

+71
-4
lines changed

3 files changed

+71
-4
lines changed

CHANGELOG.md

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
# Changelog
22

3-
### 0.24.0 - TBD
3+
## 0.24.0 - TBD
44

5-
##### Enhancements
5+
#### Enhancements
66
- Added new publishers for consolidated DBEQ.BASIC and DBEQ.PLUS
77

8+
#### Bug fixes
9+
- Fixed an issue where `Live.block_for_close` and `Live.wait_for_close` would not flush streams if the timeout was reached.
10+
811
## 0.23.0 - 2023-10-26
912

1013
#### Enhancements

databento/live/client.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,7 @@ def terminate(self) -> None:
498498
if self._session is None:
499499
raise ValueError("cannot terminate a live client before it is connected")
500500
self._session.abort()
501+
self._cleanup_client()
501502

502503
def block_for_close(
503504
self,
@@ -539,6 +540,8 @@ def block_for_close(
539540
raise
540541
except Exception:
541542
raise BentoError("connection lost") from None
543+
finally:
544+
self._cleanup_client()
542545

543546
async def wait_for_close(
544547
self,
@@ -581,9 +584,13 @@ async def wait_for_close(
581584
self.terminate()
582585
if isinstance(exc, KeyboardInterrupt):
583586
raise
587+
except BentoError:
588+
raise
584589
except Exception:
585590
logger.exception("exception encountered waiting for close")
586591
raise BentoError("connection lost") from None
592+
finally:
593+
self._cleanup_client()
587594

588595
async def _shutdown(self) -> None:
589596
"""
@@ -597,6 +604,12 @@ async def _shutdown(self) -> None:
597604
return
598605
await self._session.wait_for_close()
599606

607+
def _cleanup_client(self) -> None:
608+
"""
609+
Cleanup any stateful client data.
610+
"""
611+
self._symbology_map.clear()
612+
600613
to_remove = []
601614
for stream in self._user_streams:
602615
stream_name = getattr(stream, "name", str(stream))
@@ -609,8 +622,6 @@ async def _shutdown(self) -> None:
609622
for key in to_remove:
610623
self._user_streams.pop(key)
611624

612-
self._symbology_map.clear()
613-
614625
def _map_symbol(self, record: DBNRecord) -> None:
615626
if isinstance(record, databento_dbn.SymbolMappingMsg):
616627
out_symbol = record.stype_out_symbol

tests/test_live_client.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,31 @@ def test_live_block_for_close_timeout(
525525
live_client.terminate.assert_called_once() # type: ignore
526526

527527

528+
@pytest.mark.usefixtures("mock_live_server")
529+
def test_live_block_for_close_timeout_stream(
530+
live_client: client.Live,
531+
monkeypatch: pytest.MonkeyPatch,
532+
tmp_path: pathlib.Path,
533+
) -> None:
534+
"""
535+
Test that block_for_close flushes user streams on timeout.
536+
"""
537+
live_client.subscribe(
538+
dataset=Dataset.GLBX_MDP3,
539+
schema=Schema.MBO,
540+
stype_in=SType.INSTRUMENT_ID,
541+
symbols="ALL_SYMBOLS",
542+
start=None,
543+
)
544+
path = tmp_path / "test.dbn"
545+
stream = path.open("wb")
546+
monkeypatch.setattr(stream, "flush", MagicMock())
547+
live_client.add_stream(stream)
548+
549+
live_client.block_for_close(timeout=0)
550+
stream.flush.assert_called() # type: ignore [attr-defined]
551+
552+
528553
@pytest.mark.usefixtures("mock_live_server")
529554
async def test_live_wait_for_close(
530555
live_client: client.Live,
@@ -571,6 +596,32 @@ async def test_live_wait_for_close_timeout(
571596
live_client.terminate.assert_called_once() # type: ignore
572597

573598

599+
@pytest.mark.usefixtures("mock_live_server")
600+
async def test_live_wait_for_close_timeout_stream(
601+
live_client: client.Live,
602+
monkeypatch: pytest.MonkeyPatch,
603+
tmp_path: pathlib.Path,
604+
) -> None:
605+
"""
606+
Test that wait_for_close flushes user streams on timeout.
607+
"""
608+
live_client.subscribe(
609+
dataset=Dataset.GLBX_MDP3,
610+
schema=Schema.MBO,
611+
stype_in=SType.INSTRUMENT_ID,
612+
symbols="ALL_SYMBOLS",
613+
start=None,
614+
)
615+
616+
path = tmp_path / "test.dbn"
617+
stream = path.open("wb")
618+
monkeypatch.setattr(stream, "flush", MagicMock())
619+
live_client.add_stream(stream)
620+
621+
await live_client.wait_for_close(timeout=0)
622+
stream.flush.assert_called() # type: ignore [attr-defined]
623+
624+
574625
def test_live_add_callback(
575626
live_client: client.Live,
576627
) -> None:
@@ -615,6 +666,7 @@ def test_live_add_stream_invalid(
615666
with pytest.raises(ValueError):
616667
live_client.add_stream(readable_file.open(mode="rb"))
617668

669+
618670
def test_live_add_stream_path_directory(
619671
tmp_path: pathlib.Path,
620672
live_client: client.Live,
@@ -625,6 +677,7 @@ def test_live_add_stream_path_directory(
625677
with pytest.raises(OSError):
626678
live_client.add_stream(tmp_path)
627679

680+
628681
@pytest.mark.skipif(platform.system() == "Windows", reason="flaky on windows runner")
629682
async def test_live_async_iteration(
630683
live_client: client.Live,

0 commit comments

Comments
 (0)