Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ def rank_error(msg):
state_update_callback(response.currentState)
_LOGGER.info('Job %s is in state %s', job_id, response.currentState)
last_job_state = response.currentState
if str(response.currentState) != 'JOB_STATE_RUNNING':
if str(response.currentState) not in ('JOB_STATE_RUNNING',
'JOB_STATE_PAUSED',
'JOB_STATE_PAUSING'):
# Stop checking for new messages on timeout, explanatory
# message received, success, or a terminal job state caused
# by the user that therefore doesn't require explanation.
Expand Down Expand Up @@ -751,6 +753,8 @@ def api_jobstate_to_pipeline_state(api_jobstate):
values_enum.JOB_STATE_CANCELLING: PipelineState.CANCELLING,
values_enum.JOB_STATE_RESOURCE_CLEANING_UP: PipelineState.
RESOURCE_CLEANING_UP,
values_enum.JOB_STATE_PAUSING: PipelineState.PAUSING,
values_enum.JOB_STATE_PAUSED: PipelineState.PAUSED,
})

return (
Expand Down
42 changes: 42 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,22 @@ def get_job_side_effect(*args, **kwargs):
result = duration_timedout_result.wait_until_finish(5000)
self.assertEqual(result, PipelineState.RUNNING)

with mock.patch('time.time', mock.MagicMock(side_effect=[1, 9, 9, 20, 20])):
duration_timedout_runner = MockDataflowRunner(
[values_enum.JOB_STATE_PAUSING])
duration_timedout_result = DataflowPipelineResult(
duration_timedout_runner.job, duration_timedout_runner, options)
result = duration_timedout_result.wait_until_finish(5000)
self.assertEqual(result, PipelineState.PAUSING)

with mock.patch('time.time', mock.MagicMock(side_effect=[1, 9, 9, 20, 20])):
duration_timedout_runner = MockDataflowRunner(
[values_enum.JOB_STATE_PAUSED])
duration_timedout_result = DataflowPipelineResult(
duration_timedout_runner.job, duration_timedout_runner, options)
result = duration_timedout_result.wait_until_finish(5000)
self.assertEqual(result, PipelineState.PAUSED)

with mock.patch('time.time', mock.MagicMock(side_effect=[1, 1, 2, 2, 3])):
with self.assertRaisesRegex(DataflowRuntimeException,
'Dataflow pipeline failed. State: CANCELLED'):
Expand Down Expand Up @@ -239,6 +255,32 @@ def __init__(self, state, cancel_result):
terminal_runner.job, terminal_runner, options)
terminal_result.cancel()

def test_api_jobstate_to_pipeline_state(self):
  """Checks the Dataflow API job state -> PipelineState translation.

  Every API state listed in the table below is passed through
  DataflowPipelineResult.api_jobstate_to_pipeline_state and the result is
  compared with the PipelineState constant of the same name.
  """
  api_states = dataflow_api.Job.CurrentStateValueValuesEnum

  # Keyed by API state; a dict literal keeps insertion order, so the states
  # are checked in the order they are written here.
  pipeline_state_by_api_state = {
      api_states.JOB_STATE_UNKNOWN: PipelineState.UNKNOWN,
      api_states.JOB_STATE_STOPPED: PipelineState.STOPPED,
      api_states.JOB_STATE_RUNNING: PipelineState.RUNNING,
      api_states.JOB_STATE_DONE: PipelineState.DONE,
      api_states.JOB_STATE_FAILED: PipelineState.FAILED,
      api_states.JOB_STATE_CANCELLED: PipelineState.CANCELLED,
      api_states.JOB_STATE_UPDATED: PipelineState.UPDATED,
      api_states.JOB_STATE_DRAINING: PipelineState.DRAINING,
      api_states.JOB_STATE_DRAINED: PipelineState.DRAINED,
      api_states.JOB_STATE_PENDING: PipelineState.PENDING,
      api_states.JOB_STATE_CANCELLING: PipelineState.CANCELLING,
      api_states.JOB_STATE_RESOURCE_CLEANING_UP: (
          PipelineState.RESOURCE_CLEANING_UP),
      api_states.JOB_STATE_PAUSING: PipelineState.PAUSING,
      api_states.JOB_STATE_PAUSED: PipelineState.PAUSED,
  }

  for api_state, expected_state in pipeline_state_by_api_state.items():
    translated_state = DataflowPipelineResult.api_jobstate_to_pipeline_state(
        api_state)
    self.assertEqual(translated_state, expected_state)

def test_create_runner(self):
self.assertTrue(isinstance(create_runner('DataflowRunner'), DataflowRunner))
self.assertTrue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2372,6 +2372,8 @@ class ExecutionStageStateValueValuesEnum(_messages.Enum):
indicates that the batch job's associated resources are currently
being cleaned up after a successful run. Currently, this is an opt-in
feature, please reach out to Cloud support team if you are interested.
JOB_STATE_PAUSING: `JOB_STATE_PAUSING` is not implemented yet.
JOB_STATE_PAUSED: `JOB_STATE_PAUSED` is not implemented yet.
"""
JOB_STATE_UNKNOWN = 0
JOB_STATE_STOPPED = 1
Expand All @@ -2386,6 +2388,8 @@ class ExecutionStageStateValueValuesEnum(_messages.Enum):
JOB_STATE_CANCELLING = 10
JOB_STATE_QUEUED = 11
JOB_STATE_RESOURCE_CLEANING_UP = 12
JOB_STATE_PAUSING = 13
JOB_STATE_PAUSED = 14

currentStateTime = _messages.StringField(1)
executionStageName = _messages.StringField(2)
Expand Down Expand Up @@ -3166,6 +3170,8 @@ class CurrentStateValueValuesEnum(_messages.Enum):
indicates that the batch job's associated resources are currently
being cleaned up after a successful run. Currently, this is an opt-in
feature, please reach out to Cloud support team if you are interested.
JOB_STATE_PAUSING: `JOB_STATE_PAUSING` is not implemented yet.
JOB_STATE_PAUSED: `JOB_STATE_PAUSED` is not implemented yet.
"""
JOB_STATE_UNKNOWN = 0
JOB_STATE_STOPPED = 1
Expand All @@ -3180,6 +3186,8 @@ class CurrentStateValueValuesEnum(_messages.Enum):
JOB_STATE_CANCELLING = 10
JOB_STATE_QUEUED = 11
JOB_STATE_RESOURCE_CLEANING_UP = 12
JOB_STATE_PAUSING = 13
JOB_STATE_PAUSED = 14

class RequestedStateValueValuesEnum(_messages.Enum):
r"""The job's requested state. Applies to `UpdateJob` requests. Set
Expand Down Expand Up @@ -3240,6 +3248,8 @@ class RequestedStateValueValuesEnum(_messages.Enum):
indicates that the batch job's associated resources are currently
being cleaned up after a successful run. Currently, this is an opt-in
feature, please reach out to Cloud support team if you are interested.
JOB_STATE_PAUSING: `JOB_STATE_PAUSING` is not implemented yet.
JOB_STATE_PAUSED: `JOB_STATE_PAUSED` is not implemented yet.
"""
JOB_STATE_UNKNOWN = 0
JOB_STATE_STOPPED = 1
Expand All @@ -3254,6 +3264,8 @@ class RequestedStateValueValuesEnum(_messages.Enum):
JOB_STATE_CANCELLING = 10
JOB_STATE_QUEUED = 11
JOB_STATE_RESOURCE_CLEANING_UP = 12
JOB_STATE_PAUSING = 13
JOB_STATE_PAUSED = 14

class TypeValueValuesEnum(_messages.Enum):
r"""Optional. The type of Dataflow job.
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/runners/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,8 @@ class PipelineState(object):
# in the process of stopping
RESOURCE_CLEANING_UP = 'RESOURCE_CLEANING_UP' # job's resources are being
# cleaned up
PAUSING = 'PAUSING' # job is in the process of pausing
PAUSED = 'PAUSED' # job has been paused
UNRECOGNIZED = 'UNRECOGNIZED' # the job state reported by a runner cannot be
# interpreted by the SDK.

Expand Down
Loading