Skip to content

Commit 0fdae0c

Browse files
KamilPiechowiakManul from Pathway
authored andcommitted
better error handling in ix (#7704)
GitOrigin-RevId: a689209bd8ad20c929821506f415f24217a142af
1 parent 486de29 commit 0fdae0c

File tree

3 files changed

+81
-43
lines changed

3 files changed

+81
-43
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
77

88
### Changed
99
- values of non-deterministic UDFs are not stored in tables that are `append_only`.
10+
- `pw.Table.ix` has better runtime error message that includes id of the missing row.
1011

1112
### Fixed
1213
- temporal behaviors in temporal operators (`windowby`, `interval_join`) now consume no CPU when no data passes through them.

python/pathway/tests/test_errors.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -617,6 +617,49 @@ def test_with_universe_of():
617617
)
618618

619619

620+
def test_ix():
621+
t1 = pw.debug.table_from_markdown(
622+
"""
623+
| a
624+
1 | 1
625+
2 | 3
626+
3 | 2
627+
4 | 2
628+
"""
629+
).with_columns(ap=pw.this.pointer_from(pw.this.a))
630+
631+
t2 = pw.debug.table_from_markdown(
632+
"""
633+
| c
634+
1 | 10
635+
2 | 13
636+
"""
637+
)
638+
res = t1.select(pw.this.a, c=t2.ix(pw.this.ap).c)
639+
res = res.select(pw.this.a, c=pw.fill_error(res.c, -1))
640+
expected = pw.debug.table_from_markdown(
641+
"""
642+
| a | c
643+
1 | 1 | 10
644+
2 | 3 | -1
645+
3 | 2 | 13
646+
5 | 2 | 13
647+
"""
648+
)
649+
expected_errors = T(
650+
"""
651+
message
652+
key missing in output table: ^Z3QWT294JQSHPSR8KTPG9ECE4W
653+
""",
654+
split_on_whitespace=False,
655+
)
656+
assert_table_equality_wo_index(
657+
(res, pw.global_error_log().select(pw.this.message)),
658+
(expected, expected_errors),
659+
terminate_on_error=False,
660+
)
661+
662+
620663
def test_remove_errors():
621664
t1 = T(
622665
"""

src/engine/dataflow.rs

Lines changed: 37 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1021,24 +1021,6 @@ impl<S: MaybeTotalScope> DataflowGraphInner<S> {
10211021
self.static_column(universe_handle, Vec::new(), column_properties)
10221022
}
10231023

1024-
#[track_caller]
1025-
fn assert_collections_same_size<V1: Data, V2: Data>(
1026-
&self,
1027-
left: &Collection<S, V1>,
1028-
right: &Collection<S, V2>,
1029-
) -> Result<()> {
1030-
let error_logger = self.create_error_logger()?;
1031-
left.map_named("assert_collections_same_size::left", |_| ())
1032-
.negate()
1033-
.concat(&right.map_named("assert_collections_same_size::right", |_| ()))
1034-
.consolidate()
1035-
.inspect(move |(_data, _time, diff)| {
1036-
assert_ne!(diff, &0);
1037-
error_logger.log_error(DataError::ValueMissing);
1038-
});
1039-
Ok(())
1040-
}
1041-
10421024
#[track_caller]
10431025
fn assert_input_keys_match_output_keys(
10441026
&self,
@@ -1064,6 +1046,38 @@ impl<S: MaybeTotalScope> DataflowGraphInner<S> {
10641046
Ok(())
10651047
}
10661048

1049+
fn make_output_keys_match_input_keys(
1050+
&self,
1051+
input_values: &Values<S>,
1052+
output_collection: &Collection<S, (Key, Value)>,
1053+
) -> Result<Collection<S, (Key, Value)>> {
1054+
let leftover_values = input_values.concat(
1055+
&output_collection
1056+
.map_named(
1057+
"restrict_or_override_table_universe::compare",
1058+
|(key, values)| {
1059+
(
1060+
key,
1061+
values.as_tuple().expect("values should be a tuple")[0].clone(),
1062+
)
1063+
},
1064+
)
1065+
.distinct()
1066+
.negate(),
1067+
);
1068+
let error_logger = self.create_error_logger()?;
1069+
1070+
Ok(
1071+
output_collection.concat(&leftover_values.consolidate().map_named(
1072+
"restrict_or_override_table_universe::fill",
1073+
move |(key, new_values)| {
1074+
error_logger.log_error(DataError::KeyMissingInOutputTable(key));
1075+
(key, Value::from([new_values, Value::Error].as_slice()))
1076+
},
1077+
)),
1078+
)
1079+
}
1080+
10671081
fn static_universe(&mut self, keys: Vec<Key>) -> Result<UniverseHandle> {
10681082
let worker_count = self.scope.peers();
10691083
let worker_index = self.scope.index();
@@ -1701,29 +1715,7 @@ impl<S: MaybeTotalScope> DataflowGraphInner<S> {
17011715
))
17021716
},
17031717
);
1704-
let leftover_values = new_table.values().concat(
1705-
&result
1706-
.map_named(
1707-
"restrict_or_override_table_universe::compare",
1708-
|(key, values)| {
1709-
(
1710-
key,
1711-
values.as_tuple().expect("values should be a tuple")[0].clone(),
1712-
)
1713-
},
1714-
)
1715-
.distinct()
1716-
.negate(),
1717-
);
1718-
let error_logger = self.create_error_logger()?;
1719-
1720-
let result = result.concat(&leftover_values.consolidate().map_named(
1721-
"restrict_or_override_table_universe::fill",
1722-
move |(key, new_values)| {
1723-
error_logger.log_error(DataError::KeyMissingInOutputTable(key));
1724-
(key, Value::from([new_values, Value::Error].as_slice()))
1725-
},
1726-
));
1718+
let result = self.make_output_keys_match_input_keys(new_table.values(), &result)?;
17271719

17281720
if !self.ignore_asserts && same_universes {
17291721
self.assert_input_keys_match_output_keys(original_table.keys(), &result)?;
@@ -2249,8 +2241,10 @@ impl<S: MaybeTotalScope> DataflowGraphInner<S> {
22492241
}
22502242
_ => new_table,
22512243
};
2252-
if !self.ignore_asserts && ix_key_policy != IxKeyPolicy::SkipMissing {
2253-
self.assert_collections_same_size(key_table.values(), &new_table)?;
2244+
let new_table = if ix_key_policy == IxKeyPolicy::SkipMissing {
2245+
new_table
2246+
} else {
2247+
self.make_output_keys_match_input_keys(key_table.values(), &new_table)?
22542248
};
22552249

22562250
Ok(self

0 commit comments

Comments
 (0)