Skip to content

Commit 24e8594

Browse files
authored
Merge pull request #2705 from spotify/claire/fix_dataflow_bq_return_type
Fix BigQueryTarget parsing in beam_dataflow module
2 parents baa54c9 + 5d9d38d commit 24e8594

File tree

2 files changed

+22
-3
lines changed

2 files changed

+22
-3
lines changed

luigi/contrib/beam_dataflow.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ class BeamDataflowJobTask(MixinNaiveBulkComplete, luigi.Task):
219219
def __init__(self):
220220
if not isinstance(self.dataflow_params, DataflowParamKeys):
221221
raise ValueError("dataflow_params must be of type DataflowParamKeys")
222+
super(BeamDataflowJobTask, self).__init__()
222223

223224
@abstractmethod
224225
def dataflow_executable(self):
@@ -471,9 +472,13 @@ def _format_output_args(self):
471472

472473
@staticmethod
473474
def get_target_path(target):
475+
"""
476+
Given a luigi Target, determine a stringly typed path to pass as a
477+
Dataflow job argument.
478+
"""
474479
if isinstance(target, luigi.LocalTarget) or isinstance(target, gcs.GCSTarget):
475480
return target.path
476481
elif isinstance(target, bigquery.BigQueryTarget):
477-
"{}:{}.{}".format(target.project_id, target.dataset_id, target.table_id)
482+
return "{}:{}.{}".format(target.table.project_id, target.table.dataset_id, target.table.table_id)
478483
else:
479-
raise ValueError("Target not supported")
484+
raise ValueError("Target %s not supported" % target)

test/contrib/beam_dataflow_test.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
import json
1919
import luigi
20-
from luigi.contrib import beam_dataflow
20+
from luigi.contrib import beam_dataflow, bigquery, gcs
2121
from luigi import local_target
2222
import mock
2323
from mock import MagicMock, patch
@@ -282,6 +282,20 @@ def output(self):
282282

283283
self.assertEqual(TestTaskDictOfMixedCompleteOutput().complete(), False)
284284

285+
def test_get_target_path(self):
286+
bq_target = bigquery.BigQueryTarget("p", "d", "t", client="fake_client")
287+
self.assertEqual(
288+
SimpleTestTask.get_target_path(bq_target),
289+
"p:d.t")
290+
291+
gcs_target = gcs.GCSTarget("gs://foo/bar.txt", client="fake_client")
292+
self.assertEqual(
293+
SimpleTestTask.get_target_path(gcs_target),
294+
"gs://foo/bar.txt")
295+
296+
with self.assertRaises(ValueError):
297+
SimpleTestTask.get_target_path("not_a_target")
298+
285299
def test_dataflow_runner_resolution(self):
286300
task = SimpleTestTask()
287301
# Test that supported runners are passed through

0 commit comments

Comments
 (0)