Skip to content

Commit 3f72ac5

Browse files
authored
Use consistent ordering in T2Compute.ingest and StateT2Dependency.link_override (#333)
* Ensure that read and write roles share a client (for mongomock). See: AmpelProject/pytest-ampel-core@5fdeeb9
* Add a failing test to demonstrate sort instability
* Replace confusing variable name
* Present datapoints in the same order as they are stored in the T1 doc
* Bump version
1 parent 33ed01a commit 3f72ac5

File tree

4 files changed

+107
-6
lines changed

4 files changed

+107
-6
lines changed

ampel/ingest/ChainedIngestionHandler.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -622,12 +622,15 @@ def ingest_point_t2s(self,
622622
stock_id: StockId,
623623
channel: ChannelId,
624624
ttl: None | timedelta,
625-
state_t2: list[T2Block],
625+
point_t2: list[T2Block],
626626
add_other_tag: None | MetaActivity = None,
627627
meta_extra: None | dict[str, Any] = None
628628
) -> None:
629+
# Present datapoints in the same order as they are stored in the T1 document
630+
if self.t1_compiler.sort:
631+
dps = sorted(dps, key=lambda x: x["id"])
629632

630-
for t2b in state_t2:
633+
for t2b in point_t2:
631634

632635
# Filter group selection / veto
633636
if t2b.group and isinstance(fres, int) and fres not in t2b.group:

ampel/test/test-data/testing-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ mongo:
7272
indexes:
7373
- field: process
7474
role:
75-
r: logger
75+
r: writer
7676
w: writer
7777
prefix: AmpelTest
7878
mongo_options:

ampel/test/test_IngestionHandler.py

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,12 @@
3434
DummyPointT2Unit,
3535
DummyStateT2Unit,
3636
DummyStockT2Unit,
37+
DummyTiedStateT2Unit,
3738
)
39+
from ampel.config.alter.HashT2Config import HashT2Config
3840
from ampel.util.freeze import recursive_unfreeze
41+
from ampel.t2.T2Worker import T2Worker
42+
from ampel.model.DPSelection import DPSelection
3943

4044

4145
@pytest.fixture
@@ -111,6 +115,16 @@ def get_handler(
111115
# need to disable provenance for dynamically registered unit
112116
_provenance=False,
113117
)
118+
# hash t2 configs
119+
if directives:
120+
directives = [
121+
IngestDirective(**d)
122+
for d in HashT2Config().alter(
123+
context,
124+
{"directives": [d.model_dump(exclude_unset=True) for d in directives]},
125+
logger,
126+
)["directives"]
127+
]
114128
return ChainedIngestionHandler(
115129
context=context,
116130
logger=logger,
@@ -132,7 +146,7 @@ def test_no_directive(dev_context):
132146
@pytest.fixture
133147
def datapoints() -> list[DataPoint]:
134148
return [
135-
{"id": i, "stock": "stockystock", "body": {"thing": i}} # type: ignore[typeddict-item]
149+
{"id": i, "stock": "stockystock", "body": {"thing": i, "constant": 11}} # type: ignore[typeddict-item]
136150
for i in range(3)
137151
]
138152

@@ -271,7 +285,9 @@ def __exit__(self, exc_type, exc_value, traceback) -> None:
271285
handler.ingest(datapoints, [(0, True)], stock_id="stockystock")
272286
assert len(items := handler.ingester._producer.items) == 1
273287
assert acknowledge_callback.call_count == 1, "acknowledge callback called"
274-
assert list(acknowledge_callback.call_args[0][0]) == [sentinel], "callback payload contains sentinel"
288+
assert list(acknowledge_callback.call_args[0][0]) == [sentinel], (
289+
"callback payload contains sentinel"
290+
)
275291
with handler.ingester.group():
276292
handler.ingest(datapoints, [(0, True)], stock_id="stockystock")
277293
assert len(items := handler.ingester._producer.items) == 2, (
@@ -565,3 +581,85 @@ def get_expire_time(dp: DataPoint | T1Document | T2Document):
565581
f"ttl updated for dp {dp['id']}"
566582
)
567583
assert len(dp["meta"]) == 1, "no extra meta entries added"
584+
585+
586+
@pytest.mark.usefixtures("_dummy_units")
587+
def test_link_override_order(
588+
mock_context: DevAmpelContext,
589+
datapoints: list[DataPoint],
590+
mocker: MockFixture,
591+
):
592+
"""
593+
    T2Compute.ingest and StateT2Dependency.link_override select the same
594+
document when sorted on a non-unique field, because a) list.sort is stable,
595+
and b) ChainedIngestionHandler and T2Worker see datapoints in the same
596+
order.
597+
"""
598+
for unit in (DummyStateT2Unit, DummyTiedStateT2Unit):
599+
mock_context.register_unit(unit)
600+
601+
directives = [
602+
IngestDirective(
603+
channel="TEST_CHANNEL",
604+
ingest=IngestBody(
605+
point_t2=[
606+
T2Compute(
607+
unit="DummyPointT2Unit",
608+
# sort on a non-unique field to test stability
609+
ingest=DPSelection(sort="constant", select="last"),
610+
)
611+
],
612+
combine=[
613+
T1Combine(
614+
unit="T1SimpleCombiner",
615+
state_t2=[
616+
T2Compute(
617+
unit="DummyTiedStateT2Unit",
618+
config={
619+
"t2_dependency": [
620+
{
621+
"unit": "DummyPointT2Unit",
622+
"link_override": {
623+
"sort": "constant",
624+
"select": "last",
625+
},
626+
}
627+
]
628+
},
629+
)
630+
],
631+
)
632+
],
633+
),
634+
),
635+
]
636+
637+
handler = get_handler(mock_context, directives)
638+
assert isinstance(handler.ingester, MongoIngester)
639+
datapoints_reversed = list(reversed(datapoints))
640+
handler.ingest(
641+
datapoints_reversed, # type: ignore[arg-type]
642+
[(0, True)],
643+
stock_id="stockystock",
644+
)
645+
handler.ingester._updates_buffer.push_updates()
646+
647+
col_t1 = mock_context.db.get_collection("t1")
648+
t1_doc = col_t1.find_one({"stock": "stockystock"})
649+
assert t1_doc is not None
650+
assert t1_doc["dps"] != [dp["id"] for dp in datapoints_reversed], (
651+
"T1 doc does not preserve input order"
652+
)
653+
col_t2 = mock_context.db.get_collection("t2")
654+
655+
assert col_t2.find_one({"unit": "DummyTiedStateT2Unit", "code": DocumentCode.NEW})
656+
657+
T2Worker(
658+
context=mock_context,
659+
process_name="test",
660+
raise_exc=True,
661+
).run()
662+
663+
assert (
664+
col_t2.find_one({"unit": "DummyTiedStateT2Unit"})["code"] == DocumentCode.OK # type: ignore[index]
665+
), "Dependencies were found and processed"

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "ampel-core"
3-
version = "0.10.6a21"
3+
version = "0.10.6a22"
44
description = "Alice in Modular Provenance-Enabled Land"
55
authors = ["Valery Brinnel"]
66
maintainers = ["Jakob van Santen <jakob.van.santen@desy.de>"]

0 commit comments

Comments
 (0)