Skip to content

Commit ff581d4

Browse files
committed
formatting
1 parent 0230241 commit ff581d4

File tree

2 files changed

+217
-49
lines changed

2 files changed

+217
-49
lines changed

sygra/utils/constants.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,10 +105,14 @@
105105
DATASET_JOIN_TYPE = "join_type"
106106
PRIMARY_KEY = "primary_key"
107107
JOIN_KEY = "join_key"
108-
JOIN_TYPE_VSTACK = "vstack" # verticle stacking with common columns, variables will not have alias prefix and sink should be single
108+
JOIN_TYPE_VSTACK = "vstack" # vertical stacking with common columns; variables will not have an alias prefix and there should be a single sink
109109
# below all are for horizontal concat
110-
JOIN_TYPE_PRIMARY = "primary" # when joining horizontally, this dataset will be primary
111-
JOIN_TYPE_SEQUENTIAL = "sequential" # merge column sequentially from secondary, if less rotate to index 0
112-
JOIN_TYPE_RANDOM = "random" # pick random and join at each primary dataset record in horizontal way(add column)
113-
JOIN_TYPE_CROSS = "cross" # Each primary will join the secondary record(MxN)
114-
JOIN_TYPE_COLUMN = "column" # join like RDBMS column based inner join
110+
JOIN_TYPE_PRIMARY = "primary" # when joining horizontally, this dataset will be primary
111+
JOIN_TYPE_SEQUENTIAL = (
112+
"sequential" # merge columns sequentially from secondary; if it has fewer rows, rotate to index 0
113+
)
114+
JOIN_TYPE_RANDOM = (
115+
"random" # pick a random secondary record and join it to each primary dataset record horizontally (add columns)
116+
)
117+
JOIN_TYPE_CROSS = "cross" # each primary record joins every secondary record (MxN)
118+
JOIN_TYPE_COLUMN = "column" # join like RDBMS column based inner join

tests/core/test_base_task_executor.py

Lines changed: 207 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
import json
99
from unittest.mock import MagicMock, Mock, mock_open, patch
1010

11-
import pytest
1211
import pandas as pd
12+
import pytest
1313

1414
from sygra.core.base_task_executor import BaseTaskExecutor
1515
from sygra.core.dataset.dataset_config import OutputType
@@ -623,89 +623,253 @@ def test_output_sink_jsonl_reading(
623623
dummy_instance.execute()
624624
mock_write.assert_called_once()
625625

626+
626627
def test_validate_data_config_rule1_success_flow(dummy_instance):
627628
# success flow in source and sink
628-
src_config_list = [{"alias": "ds1", "join_type": "primary", "type": "servicenow", "table": "incident", "limit": 10},
629-
{"alias": "ds2", "join_type": "sequential", "type": "servicenow", "table": "request", "limit": 10},
630-
{"alias": "ds3", "join_type": "random", "type": "servicenow", "table": "problem", "limit": 10}]
631-
sink_config_list = [{"alias": "ds1", "type": "servicenow", "table": "incident", "operation": "insert"}]
629+
src_config_list = [
630+
{
631+
"alias": "ds1",
632+
"join_type": "primary",
633+
"type": "servicenow",
634+
"table": "incident",
635+
"limit": 10,
636+
},
637+
{
638+
"alias": "ds2",
639+
"join_type": "sequential",
640+
"type": "servicenow",
641+
"table": "request",
642+
"limit": 10,
643+
},
644+
{
645+
"alias": "ds3",
646+
"join_type": "random",
647+
"type": "servicenow",
648+
"table": "problem",
649+
"limit": 10,
650+
},
651+
]
652+
sink_config_list = [
653+
{"alias": "ds1", "type": "servicenow", "table": "incident", "operation": "insert"}
654+
]
632655
validated = dummy_instance.validate_data_config(src_config_list, sink_config_list)
633-
assert validated == True
656+
assert validated
657+
634658

635659
def test_validate_data_config_rule1_missing_join_type(dummy_instance):
636660
# missing join type in source
637-
src_config_list = [{"alias": "ds1", "join_type": "primary", "type": "servicenow", "table": "incident", "limit": 10},
638-
{"alias": "ds2", "type": "servicenow", "table": "request", "limit": 10},
639-
{"alias": "ds3", "join_type": "random", "type": "servicenow", "table": "problem", "limit": 10}]
640-
sink_config_list = [{"alias": "ds1", "type": "servicenow", "table": "incident", "operation": "insert"}]
661+
src_config_list = [
662+
{
663+
"alias": "ds1",
664+
"join_type": "primary",
665+
"type": "servicenow",
666+
"table": "incident",
667+
"limit": 10,
668+
},
669+
{"alias": "ds2", "type": "servicenow", "table": "request", "limit": 10},
670+
{
671+
"alias": "ds3",
672+
"join_type": "random",
673+
"type": "servicenow",
674+
"table": "problem",
675+
"limit": 10,
676+
},
677+
]
678+
sink_config_list = [
679+
{"alias": "ds1", "type": "servicenow", "table": "incident", "operation": "insert"}
680+
]
641681
validated = dummy_instance.validate_data_config(src_config_list, sink_config_list)
642-
assert validated == False
682+
assert not validated
683+
643684

644685
def test_validate_data_config_rule1_missing_alias(dummy_instance):
645686
# missing alias in source
646-
src_config_list = [{"alias": "ds1", "join_type": "primary", "type": "servicenow", "table": "incident", "limit": 10},
647-
{"join_type": "sequential", "type": "servicenow", "table": "request", "limit": 10},
648-
{"alias": "ds3", "join_type": "random", "type": "servicenow", "table": "problem", "limit": 10}]
649-
sink_config_list = [{"alias": "ds1", "type": "servicenow", "table": "incident", "operation": "insert"}]
687+
src_config_list = [
688+
{
689+
"alias": "ds1",
690+
"join_type": "primary",
691+
"type": "servicenow",
692+
"table": "incident",
693+
"limit": 10,
694+
},
695+
{"join_type": "sequential", "type": "servicenow", "table": "request", "limit": 10},
696+
{
697+
"alias": "ds3",
698+
"join_type": "random",
699+
"type": "servicenow",
700+
"table": "problem",
701+
"limit": 10,
702+
},
703+
]
704+
sink_config_list = [
705+
{"alias": "ds1", "type": "servicenow", "table": "incident", "operation": "insert"}
706+
]
650707
validated = dummy_instance.validate_data_config(src_config_list, sink_config_list)
651-
assert validated == False
708+
assert not validated
652709

653710
# missing alias in sink
654-
src_config_list = [{"alias": "ds1", "join_type": "primary", "type": "servicenow", "table": "incident", "limit": 10},
655-
{"alias": "ds2", "join_type": "sequential", "type": "servicenow", "table": "request", "limit": 10},
656-
{"alias": "ds3", "join_type": "random", "type": "servicenow", "table": "problem", "limit": 10}]
711+
src_config_list = [
712+
{
713+
"alias": "ds1",
714+
"join_type": "primary",
715+
"type": "servicenow",
716+
"table": "incident",
717+
"limit": 10,
718+
},
719+
{
720+
"alias": "ds2",
721+
"join_type": "sequential",
722+
"type": "servicenow",
723+
"table": "request",
724+
"limit": 10,
725+
},
726+
{
727+
"alias": "ds3",
728+
"join_type": "random",
729+
"type": "servicenow",
730+
"table": "problem",
731+
"limit": 10,
732+
},
733+
]
657734
sink_config_list = [{"type": "servicenow", "table": "incident", "operation": "insert"}]
658735
validated = dummy_instance.validate_data_config(src_config_list, sink_config_list)
659-
assert validated == False
736+
assert not validated
737+
660738

661739
def test_validate_data_config_rule2_vstack_success(dummy_instance):
662740
# all source should be vstack
663-
src_config_list = [{"alias": "ds1", "join_type": "vstack", "type": "servicenow", "table": "incident", "limit": 10},
664-
{"alias": "ds2", "join_type": "vstack", "type": "servicenow", "table": "request", "limit": 10},
665-
{"alias": "ds3", "join_type": "vstack", "type": "servicenow", "table": "problem", "limit": 10}]
666-
sink_config_list = [{"alias": "ds1", "type": "servicenow", "table": "incident", "operation": "insert"}]
741+
src_config_list = [
742+
{
743+
"alias": "ds1",
744+
"join_type": "vstack",
745+
"type": "servicenow",
746+
"table": "incident",
747+
"limit": 10,
748+
},
749+
{
750+
"alias": "ds2",
751+
"join_type": "vstack",
752+
"type": "servicenow",
753+
"table": "request",
754+
"limit": 10,
755+
},
756+
{
757+
"alias": "ds3",
758+
"join_type": "vstack",
759+
"type": "servicenow",
760+
"table": "problem",
761+
"limit": 10,
762+
},
763+
]
764+
sink_config_list = [
765+
{"alias": "ds1", "type": "servicenow", "table": "incident", "operation": "insert"}
766+
]
667767
validated = dummy_instance.validate_data_config(src_config_list, sink_config_list)
668-
assert validated == True
768+
assert validated
769+
669770

670771
def test_validate_data_config_rule2_vstack_failure(dummy_instance):
671772
# some sources are not vstack
672-
src_config_list = [{"alias": "ds1", "join_type": "vstack", "type": "servicenow", "table": "incident", "limit": 10},
673-
{"alias": "ds2", "join_type": "primary", "type": "servicenow", "table": "request", "limit": 10},
674-
{"alias": "ds3", "join_type": "random", "type": "servicenow", "table": "problem", "limit": 10}]
675-
sink_config_list = [{"alias": "ds1", "type": "servicenow", "table": "incident", "operation": "insert"}]
773+
src_config_list = [
774+
{
775+
"alias": "ds1",
776+
"join_type": "vstack",
777+
"type": "servicenow",
778+
"table": "incident",
779+
"limit": 10,
780+
},
781+
{
782+
"alias": "ds2",
783+
"join_type": "primary",
784+
"type": "servicenow",
785+
"table": "request",
786+
"limit": 10,
787+
},
788+
{
789+
"alias": "ds3",
790+
"join_type": "random",
791+
"type": "servicenow",
792+
"table": "problem",
793+
"limit": 10,
794+
},
795+
]
796+
sink_config_list = [
797+
{"alias": "ds1", "type": "servicenow", "table": "incident", "operation": "insert"}
798+
]
676799
validated = dummy_instance.validate_data_config(src_config_list, sink_config_list)
677-
assert validated == False
800+
assert not validated
801+
678802

679803
def test_rename_dataframe(dummy_instance):
680-
test_df = pd.DataFrame([{"roll":1, "name": "John", "marks": 123.5}, {"roll":2, "name": "Johny", "marks": 152.5}])
804+
test_df = pd.DataFrame(
805+
[{"roll": 1, "name": "John", "marks": 123.5}, {"roll": 2, "name": "Johny", "marks": 152.5}]
806+
)
681807
final_df = dummy_instance._rename_dataframe(test_df, "student")
682808
new_columns = list(final_df.columns)
683-
assert "student->roll" in new_columns and "student->name" in new_columns and "student->marks" in new_columns
809+
assert (
810+
"student->roll" in new_columns
811+
and "student->name" in new_columns
812+
and "student->marks" in new_columns
813+
)
814+
684815

685816
def test_repeat_to_merge_sequentially(dummy_instance):
686817
# horizontal merge with different columns
687818
# test 1: both dataframes have the same number of rows
688-
primary_df = pd.DataFrame([{"roll": 1, "name": "John", "marks": 123.5}, {"roll": 2, "name": "Johny", "marks": 152.5}])
689-
secondary_df = pd.DataFrame([{"class": 5, "sports": "cricket"}, {"class": 6, "sports": "football"}])
819+
primary_df = pd.DataFrame(
820+
[{"roll": 1, "name": "John", "marks": 123.5}, {"roll": 2, "name": "Johny", "marks": 152.5}]
821+
)
822+
secondary_df = pd.DataFrame(
823+
[{"class": 5, "sports": "cricket"}, {"class": 6, "sports": "football"}]
824+
)
690825
merged_df = dummy_instance._repeat_to_merge_sequentially(primary_df, secondary_df)
691-
assert len(merged_df) == 2 and merged_df.iloc[0]["class"] == 5 and merged_df.iloc[1]["class"] == 6
826+
assert (
827+
len(merged_df) == 2 and merged_df.iloc[0]["class"] == 5 and merged_df.iloc[1]["class"] == 6
828+
)
692829

693830
# test 2 : secondary has less rows (need rotation with same data)
694-
primary_df = pd.DataFrame([{"roll": 1, "name": "John", "marks": 123.5}, {"roll": 2, "name": "Johny", "marks": 152.5}])
831+
primary_df = pd.DataFrame(
832+
[{"roll": 1, "name": "John", "marks": 123.5}, {"roll": 2, "name": "Johny", "marks": 152.5}]
833+
)
695834
secondary_df = pd.DataFrame([{"class": 5, "sports": "cricket"}])
696835
merged_df = dummy_instance._repeat_to_merge_sequentially(primary_df, secondary_df)
697-
assert len(merged_df) == 2 and merged_df.iloc[0]["class"] == 5 and merged_df.iloc[1]["class"] == 5
836+
assert (
837+
len(merged_df) == 2 and merged_df.iloc[0]["class"] == 5 and merged_df.iloc[1]["class"] == 5
838+
)
698839

699840
# test 3 : secondary has more rows (truncation needed)
700-
primary_df = pd.DataFrame([{"roll": 1, "name": "John", "marks": 123.5}, {"roll": 2, "name": "Johny", "marks": 152.5}])
701-
secondary_df = pd.DataFrame([{"class": 5, "sports": "cricket"}, {"class": 6, "sports": "football"}, {"class": 7, "sports": "tennis"}])
841+
primary_df = pd.DataFrame(
842+
[{"roll": 1, "name": "John", "marks": 123.5}, {"roll": 2, "name": "Johny", "marks": 152.5}]
843+
)
844+
secondary_df = pd.DataFrame(
845+
[
846+
{"class": 5, "sports": "cricket"},
847+
{"class": 6, "sports": "football"},
848+
{"class": 7, "sports": "tennis"},
849+
]
850+
)
702851
merged_df = dummy_instance._repeat_to_merge_sequentially(primary_df, secondary_df)
703-
assert len(merged_df) == 2 and merged_df.iloc[0]["class"] == 5 and merged_df.iloc[1]["class"] == 6
852+
assert (
853+
len(merged_df) == 2 and merged_df.iloc[0]["class"] == 5 and merged_df.iloc[1]["class"] == 6
854+
)
855+
704856

705857
def test_shuffle_and_extend(dummy_instance):
706858
# random merge from secondary by keeping primary rows same
707-
primary_df = pd.DataFrame([{"roll": 1, "name": "John", "marks": 123.5}, {"roll": 2, "name": "Johny", "marks": 152.5}])
708-
secondary_df = pd.DataFrame([{"class": 5, "sports": "cricket"}, {"class": 6, "sports": "football"}, {"class": 7, "sports": "tennis"}])
859+
primary_df = pd.DataFrame(
860+
[{"roll": 1, "name": "John", "marks": 123.5}, {"roll": 2, "name": "Johny", "marks": 152.5}]
861+
)
862+
secondary_df = pd.DataFrame(
863+
[
864+
{"class": 5, "sports": "cricket"},
865+
{"class": 6, "sports": "football"},
866+
{"class": 7, "sports": "tennis"},
867+
]
868+
)
709869
merged_df = dummy_instance._shuffle_and_extend(primary_df, secondary_df)
710870
# 2 records but new column can have value from any record(secondary)
711-
assert len(merged_df) == 2 and (merged_df.iloc[0]["class"] == 5 or merged_df.iloc[0]["class"] == 6 or merged_df.iloc[0]["class"] == 7)
871+
assert len(merged_df) == 2 and (
872+
merged_df.iloc[0]["class"] == 5
873+
or merged_df.iloc[0]["class"] == 6
874+
or merged_df.iloc[0]["class"] == 7
875+
)

0 commit comments

Comments
 (0)