@@ -26,8 +26,8 @@ def register_activity_type(self, *args, **kwargs):
2626 Workflow ,
2727 activity ,
2828 futures ,
29- constants ,
3029)
30+ from simpleflow .swf import constants
3131from simpleflow .swf .executor import Executor
3232
3333
@@ -1007,3 +1007,142 @@ def test_activity_not_found_schedule_failed_already_exists():
10071007 decisions , _ = executor .replay (history )
10081008
10091009 check_task_scheduled_decision (decisions [0 ], increment )
1010+
1011+
1012+ class TestDefinitionMoreThanMaxOpenActivities (TestWorkflow ):
1013+ """
1014+ This workflow executes more tasks than the maximum number of decisions a
1015+ decider can take once.
1016+
1017+ """
1018+ def run (self ):
1019+ results = self .map (
1020+ increment ,
1021+ xrange (constants .MAX_OPEN_ACTIVITY_COUNT + 20 ))
1022+ futures .wait (* results )
1023+
1024+
1025+ def test_more_than_1000_open_activities_scheduled ():
1026+ workflow = TestDefinitionMoreThanMaxOpenActivities
1027+ executor = Executor (DOMAIN , workflow )
1028+ history = builder .History (workflow )
1029+
1030+ # The first time, the executor should schedule
1031+ # ``constants.MAX_OPEN_ACTIVITY_COUNT`` decisions.
1032+ # No timer because we wait for at least an activity to complete.
1033+ for i in xrange (constants .MAX_OPEN_ACTIVITY_COUNT / constants .MAX_DECISIONS ):
1034+ decisions , _ = executor .replay (history )
1035+ assert len (decisions ) == constants .MAX_DECISIONS
1036+
1037+ decision_id = history .last_id
1038+ for i in xrange (constants .MAX_OPEN_ACTIVITY_COUNT ):
1039+ history .add_activity_task (
1040+ increment ,
1041+ decision_id = decision_id ,
1042+ activity_id = 'activity-tests.test_dataflow.increment-{}' .format (
1043+ i + 1 ),
1044+ last_state = 'scheduled' ,
1045+ result = i + 1 )
1046+ (history
1047+ .add_decision_task_scheduled ()
1048+ .add_decision_task_started ())
1049+
1050+ decisions , _ = executor .replay (history )
1051+ assert executor ._open_activity_count == constants .MAX_OPEN_ACTIVITY_COUNT
1052+ assert len (decisions ) == 0
1053+
1054+
1055+ def test_more_than_1000_open_activities_scheduled_and_running ():
1056+ def get_random_state ():
1057+ import random
1058+ return random .choice (['scheduled' , 'started' ])
1059+
1060+ workflow = TestDefinitionMoreThanMaxOpenActivities
1061+ executor = Executor (DOMAIN , workflow )
1062+ history = builder .History (workflow )
1063+
1064+ # The first time, the executor should schedule
1065+ # ``constants.MAX_OPEN_ACTIVITY_COUNT`` decisions.
1066+ # No timer because we wait for at least an activity to complete.
1067+ for i in xrange (constants .MAX_OPEN_ACTIVITY_COUNT / constants .MAX_DECISIONS ):
1068+ decisions , _ = executor .replay (history )
1069+ assert len (decisions ) == constants .MAX_DECISIONS
1070+
1071+ decision_id = history .last_id
1072+ for i in xrange (constants .MAX_OPEN_ACTIVITY_COUNT ):
1073+ history .add_activity_task (
1074+ increment ,
1075+ decision_id = decision_id ,
1076+ activity_id = 'activity-tests.test_dataflow.increment-{}' .format (
1077+ i + 1 ),
1078+ last_state = get_random_state (),
1079+ result = i + 1 )
1080+ (history
1081+ .add_decision_task_scheduled ()
1082+ .add_decision_task_started ())
1083+
1084+ decisions , _ = executor .replay (history )
1085+ assert len (decisions ) == 0
1086+
1087+
1088+ def test_more_than_1000_open_activities_partial_max ():
1089+ workflow = TestDefinitionMoreThanMaxOpenActivities
1090+ executor = Executor (DOMAIN , workflow )
1091+ history = builder .History (workflow )
1092+ decisions , _ = executor .replay (history )
1093+
1094+ first_decision_id = history .last_id
1095+ for i in xrange (constants .MAX_OPEN_ACTIVITY_COUNT - 2 ):
1096+ history .add_activity_task (
1097+ increment ,
1098+ decision_id = first_decision_id ,
1099+ activity_id = 'activity-tests.test_dataflow.increment-{}' .format (
1100+ i + 1 ),
1101+ last_state = 'scheduled' ,
1102+ result = i + 1 )
1103+ (history
1104+ .add_decision_task_scheduled ()
1105+ .add_decision_task_started ())
1106+
1107+ decisions , _ = executor .replay (history )
1108+ assert executor ._open_activity_count == constants .MAX_OPEN_ACTIVITY_COUNT
1109+ assert len (decisions ) == 2
1110+
1111+ history .add_decision_task_completed ()
1112+ for i in xrange (2 ):
1113+ id_ = constants .MAX_OPEN_ACTIVITY_COUNT - 2 + i + 1
1114+ history .add_activity_task (
1115+ increment ,
1116+ decision_id = history .last_id ,
1117+ activity_id = 'activity-tests.test_dataflow.increment-{}' .format (
1118+ id_ ),
1119+ last_state = 'scheduled' ,
1120+ result = id_ ,
1121+ )
1122+
1123+ (history
1124+ .add_decision_task_scheduled ()
1125+ .add_decision_task_started ())
1126+
1127+ decisions , _ = executor .replay (history )
1128+ assert executor ._open_activity_count == constants .MAX_OPEN_ACTIVITY_COUNT
1129+ assert len (decisions ) == 0
1130+
1131+ history .add_decision_task_completed ()
1132+
1133+ for i in xrange (constants .MAX_OPEN_ACTIVITY_COUNT - 2 ):
1134+ scheduled_id = first_decision_id + i + 1
1135+ history .add_activity_task_started (scheduled_id )
1136+ history .add_activity_task_completed (
1137+ scheduled_id ,
1138+ started = history .last_id ,
1139+ )
1140+
1141+ (history
1142+ .add_decision_task_scheduled ()
1143+ .add_decision_task_started ())
1144+
1145+ decisions , _ = executor .replay (history )
1146+ # 2 already scheduled + 20 to schedule now
1147+ assert executor ._open_activity_count == 22
1148+ assert len (decisions ) == 20
0 commit comments