23 | 23 | from datetime import timedelta, timezone
24 | 24 |
25 | 25 | from arcticdb.exceptions import ArcticNativeException, SortingException
| 26 | +from arcticdb.version_store.processing import QueryBuilder |
26 | 27 | from arcticdb_ext.version_store import StreamDescriptorMismatch, NoSuchVersionException
27 | 28 |
28 | 29 | from arcticdb_ext.exceptions import (
29 | 30 | UnsortedDataException,
30 | 31 | InternalException,
31 | 32 | NormalizationException,
| 33 | + UserInputException, |
| 34 | + MissingDataException, |
| 35 | + SchemaException, |
32 | 36 | )
33 | 37 |
34 | 38 | from benchmarks.bi_benchmarks import assert_frame_equal
35 | | -from tests.util.mark import LINUX, SLOW_TESTS_MARK |
| 39 | +from tests.util.mark import LINUX, SLOW_TESTS_MARK, WINDOWS |
36 | 40 |
37 | 41 |
38 | 42 | def add_index(df: pd.DataFrame, start_time: pd.Timestamp):
@@ -584,6 +588,198 @@ def check_incomplete_staged(sym: str, remove_staged: bool = True) -> None:
584 | 588 | check_incomplete_staged(symbol)
585 | 589 |
586 | 590 |
| 591 | +@pytest.mark.parametrize("dynamic_strings", [True, False]) |
| 592 | +@pytest.mark.storage |
| 593 | +def test_batch_read_and_join_scenarios(basic_store_factory, dynamic_strings): |
| 594 | + """Cover batch_read_and_join with various parameter combinations and error conditions.""" |
| 595 | + lib: NativeVersionStore = basic_store_factory(dynamic_strings=dynamic_strings) |
| 596 | + |
| 597 | + q = QueryBuilder() |
| 598 | + q.concat("outer") |
| 599 | + df0 = ( |
| 600 | + DFGenerator(size=20) |
| 601 | + .add_bool_col("bool") |
| 602 | + .add_float_col("A", np.float32) |
| 603 | + .add_int_col("B", np.int32) |
| 604 | + .add_int_col("C", np.uint16) |
| 605 | + .generate_dataframe() |
| 606 | + ) |
| 607 | + |
| 608 | + df0_1 = ( |
| 609 | + DFGenerator(size=20) |
| 610 | + .add_bool_col("bool") |
| 611 | + .add_float_col("A", np.float32) |
| 612 | + .add_int_col("B", np.int16) |
| 613 | + .add_int_col("C", np.uint16) |
| 614 | + .add_string_col("str", 10, include_unicode=True) |
| 615 | + .add_timestamp_col("ts") |
| 616 | + .generate_dataframe() |
| 617 | + ) |
| 618 | + |
| 619 | + df1_len = 13 |
| 620 | + df1 = ( |
| 621 | + DFGenerator(size=df1_len) |
| 622 | + .add_bool_col("bool") |
| 623 | + .add_float_col("A", np.float64) |
| 624 | + .add_float_col("B", np.float32) |
| 625 | + .add_int_col("C", np.int64) |
| 626 | + .generate_dataframe() |
| 627 | + ) |
| 628 | + |
| 629 | + df1_1 = ( |
| 630 | + DFGenerator(size=df1_len) |
| 631 | + .add_bool_col("bool") |
| 632 | + .add_string_col("A", 10) |
| 633 | + .add_int_col("B", np.int64) |
| 634 | + .generate_dataframe() |
| 635 | + ) |
| 636 | + |
| 637 | + lib.write("symbol0", df0) |
| 638 | + lib.write("symbol1", df1) |
| 639 | + |
| 640 | + # Concatenate multiple times |
| 641 | + data: pd.DataFrame = lib.batch_read_and_join(["symbol0", "symbol1", "symbol0", "symbol1"], query_builder=q).data |
| 642 | + expected = pd.concat([df0, df1, df0, df1], ignore_index=True) |
| 643 | + assert_frame_equal(expected, data) |
| 644 | + |
| 645 | + # Concatenate with an invalid query builder (no concat clause) - expect UserInputException |
| 646 | + with pytest.raises(UserInputException): |
| 647 | + data: pd.DataFrame = lib.batch_read_and_join( |
| 648 | + ["symbol0", "symbol1"], query_builder=QueryBuilder(), as_ofs=[0, 0] |
| 649 | + ).data |
| 650 | + |
| 651 | + # Concatenate with column filter |
| 652 | + data: pd.DataFrame = lib.batch_read_and_join( |
| 653 | + ["symbol0", "symbol1"], query_builder=q, columns=[["A", "C", "none"], None] |
| 654 | + ).data |
| 655 | + df0_subset = df0[["A", "C"]] |
| 656 | + expected = pd.concat([df0_subset, df1], ignore_index=True) |
| 657 | + # pandas concat fills missing bools with NaN; ArcticDB uses False |
| 658 | + expected["bool"] = expected["bool"].fillna(False) |
| 659 | + assert_frame_equal(expected, data) |
| 660 | + |
| 661 | + # Concatenate symbols with column filters + row ranges; |
| 662 | + # the second row range's upper bound is beyond the length of the data |
| 663 | + data: pd.DataFrame = lib.batch_read_and_join( |
| 664 | + ["symbol0", "symbol1"], query_builder=q, columns=[["A"], ["B"]], row_ranges=[(2, 3), (10, df1_len + 2)] |
| 665 | + ).data |
| 666 | + df0_subset = df0.loc[2:2, ["A"]] |
| 667 | + df1_subset = df1.loc[10:df1_len, ["B"]] |
| 668 | + expected = pd.concat([df0_subset, df1_subset], ignore_index=True) |
| 669 | + assert_frame_equal(expected, data) |
| 670 | + |
| 671 | + lib.write("symbol0", df0_1) |
| 672 | + lib.write("symbol1", df1_1) |
| 673 | + |
| 674 | + # Concatenate versions with mismatched schemas - expect SchemaException |
| 675 | + with pytest.raises(SchemaException): |
| 676 | + data: pd.DataFrame = lib.batch_read_and_join(["symbol0", "symbol1"], as_ofs=[1, 1], query_builder=q) |
| 677 | + |
| 678 | + # Concatenate specific versions (as_ofs) with per-symbol column filters |
| 679 | + # and a row range applied only to the first symbol |
| 680 | + data: pd.DataFrame = lib.batch_read_and_join( |
| 681 | + ["symbol0", "symbol1", "symbol0"], |
| 682 | + as_ofs=[0, 0, 1], |
| 683 | + query_builder=q, |
| 684 | + columns=[None, ["B", "C"], None], |
| 685 | + row_ranges=[(2, 3), None, None], |
| 686 | + ).data |
| 687 | + df0_subset = df0.loc[2:2] |
| 688 | + df1_subset = df1[["B", "C"]] |
| 689 | + expected = pd.concat([df0_subset, df1_subset, df0_1], ignore_index=True) |
| 690 | + # pandas concat fills missing bools with NaN; ArcticDB uses False |
| 691 | + expected["bool"] = expected["bool"].fillna(False) |
| 692 | + # pandas concat fills missing strings with NaN; ArcticDB uses None |
| 693 | + if not dynamic_strings: |
| 694 | + # align expected with the actual result: static strings fill missing values with "" |
| 695 | + if not WINDOWS: |
| 696 | + # Windows does not support static strings |
| 697 | + expected["str"] = expected["str"].fillna("") |
| 698 | + assert_frame_equal(expected, data) |
| 699 | + |
| 700 | + # Cover per-symbol query builders |
| 701 | + q0 = QueryBuilder() |
| 702 | + q0 = q0[q0["A"] == 123.58743343] |
| 703 | + q1 = QueryBuilder() |
| 704 | + q1 = q1[q1["B"] == -3483.123434343] |
| 705 | + data: pd.DataFrame = lib.batch_read_and_join( |
| 706 | + ["symbol0", "symbol1"], as_ofs=[0, 0], query_builder=q, per_symbol_query_builders=[q0, q1] |
| 707 | + ).data |
| 708 | + assert len(data) == 0 # Nothing is selected |
| 709 | + data: pd.DataFrame = lib.batch_read_and_join( |
| 710 | + ["symbol0", "symbol1"], as_ofs=[0, 0], query_builder=q, per_symbol_query_builders=[q0, None] |
| 711 | + ).data |
| 712 | + assert_frame_equal(df1, data) |
| 713 | + |
| 714 | + |
| 715 | +@pytest.mark.xfail(True, reason="When a non-existent symbol is requested, MissingDataException is not raised - 18023146743") |
| 716 | +def test_batch_read_and_join_scenarios_errors(basic_store): |
| 717 | + lib: NativeVersionStore = basic_store |
| 718 | + |
| 719 | + q = QueryBuilder() |
| 720 | + q.concat("outer") |
| 721 | + df0 = DFGenerator(size=20).add_bool_col("bool").generate_dataframe() |
| 722 | + |
| 723 | + lib.write("symbol0", df0) |
| 724 | + |
| 725 | + # Concatenating with a non-existent symbol should raise MissingDataException |
| 726 | + with pytest.raises(MissingDataException): |
| 727 | + data: pd.DataFrame = lib.batch_read_and_join(["symbol0", "symbol2"], query_builder=q).data |
| 728 | + |
| 729 | + |
| 730 | +@pytest.mark.storage |
| 731 | +@pytest.mark.xfail(True, reason="Column filtering does not work with dynamic schema - 18023047637") |
| 732 | +def test_batch_read_and_join_scenarios_dynamic_schema_filtering_error(lmdb_version_store_dynamic_schema_v1): |
| 733 | + lib: NativeVersionStore = lmdb_version_store_dynamic_schema_v1 |
| 734 | + |
| 735 | + q = QueryBuilder() |
| 736 | + q.concat("outer") |
| 737 | + df0 = ( |
| 738 | + DFGenerator(size=20) |
| 739 | + .add_bool_col("bool") |
| 740 | + .add_float_col("A", np.float32) |
| 741 | + .add_int_col("B", np.int32) |
| 742 | + .add_int_col("C", np.uint16) |
| 743 | + .generate_dataframe() |
| 744 | + ) |
| 745 | + |
| 746 | + df1_len = 13 |
| 747 | + df1 = ( |
| 748 | + DFGenerator(size=df1_len) |
| 749 | + .add_bool_col("bool") |
| 750 | + .add_float_col("A", np.float64) |
| 751 | + .add_float_col("B", np.float32) |
| 752 | + .add_int_col("C", np.int64) |
| 753 | + .generate_dataframe() |
| 754 | + ) |
| 755 | + |
| 756 | + lib.write("symbol0", df0) |
| 757 | + lib.write("symbol1", df1) |
| 758 | + |
| 759 | + # Concatenate with column filter |
| 760 | + data: pd.DataFrame = lib.batch_read_and_join( |
| 761 | + ["symbol0", "symbol1"], query_builder=q, columns=[["A", "C", "none"], None] |
| 762 | + ).data |
| 763 | + df0_subset = df0[["A", "C"]] |
| 764 | + expected = pd.concat([df0_subset, df1], ignore_index=True) |
| 765 | + # pandas concat fills missing bools with NaN; ArcticDB uses False |
| 766 | + expected["bool"] = expected["bool"].fillna(False) |
| 767 | + # ERROR: with dynamic schema, filtering of the columns fails: in the 'data' df, |
| 768 | + # the rows coming from symbol0 contain values in the 'bool' and 'B' columns |
| 769 | + # instead of the expected None/NaN values that were filtered out |
| 770 | + # (with a static schema, e.g. the 'basic_store' fixture, everything would be fine) |
| 771 | + assert_frame_equal(expected, data) |
| 772 | + |
| 773 | + data: pd.DataFrame = lib.batch_read_and_join( |
| 774 | + ["symbol0", "symbol1"], query_builder=q, columns=[["A"], ["B"]], row_ranges=[(2, 3), (10, df1_len + 2)] |
| 775 | + ).data |
| 776 | + df0_subset = df0.loc[2:2, ["A"]] |
| 777 | + df1_subset = df1.loc[10:df1_len, ["B"]] |
| 778 | + expected = pd.concat([df0_subset, df1_subset], ignore_index=True) |
| 779 | + # ERROR: here we observe that -/+ inf values are added in column "A" |
| 780 | + assert_frame_equal(expected, data) |
| 781 | + |
| 782 | + |
587 | 783 | def test_add_to_snapshot_and_remove_from_snapshots_scenarios(basic_store):
588 | 784 | lib: NativeVersionStore = basic_store
589 | 785 | lib.write("s1", 100)