Skip to content

Commit 0360fea

Browse files
Andre Marianiello (andremarianiello) authored
Allow Dataflow flex template jobs to be updated (#3069)
* Allow Dataflow flex template jobs to be updated
* Update Flex Template tests to run a real job
* Add test for updating flex template streaming jobs
* Use resourceSchema parameter in resourceDataflowJobIsVirtualUpdate

Co-authored-by: Andre Marianiello <[email protected]>
1 parent 02fd1fd commit 0360fea

File tree

3 files changed

+179
-118
lines changed

3 files changed

+179
-118
lines changed

google-beta/resource_dataflow_flex_template_job.go

Lines changed: 89 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ func resourceDataflowFlexTemplateJob() *schema.Resource {
3030
"container_spec_gcs_path": {
3131
Type: schema.TypeString,
3232
Required: true,
33-
ForceNew: true,
3433
},
3534

3635
"name": {
@@ -58,15 +57,13 @@ func resourceDataflowFlexTemplateJob() *schema.Resource {
5857
Type: schema.TypeMap,
5958
Optional: true,
6059
DiffSuppressFunc: resourceDataflowJobLabelDiffSuppress,
61-
ForceNew: true,
6260
// TODO add support for labels when the API supports it
6361
Deprecated: "Deprecated until the API supports this field",
6462
},
6563

6664
"parameters": {
6765
Type: schema.TypeMap,
6866
Optional: true,
69-
ForceNew: true,
7067
},
7168

7269
"project": {
@@ -177,10 +174,96 @@ func resourceDataflowFlexTemplateJobRead(d *schema.ResourceData, meta interface{
177174
return nil
178175
}
179176

180-
// resourceDataflowFlexTemplateJobUpdate is a blank method to enable updating
181-
// the on_delete virtual field
177+
func waitForDataflowJobState(d *schema.ResourceData, config *Config, jobID, userAgent string, timeout time.Duration, targetState string) error {
178+
return resource.Retry(timeout, func() *resource.RetryError {
179+
project, err := getProject(d, config)
180+
if err != nil {
181+
return resource.NonRetryableError(err)
182+
}
183+
184+
region, err := getRegion(d, config)
185+
if err != nil {
186+
return resource.NonRetryableError(err)
187+
}
188+
189+
job, err := resourceDataflowJobGetJob(config, project, region, userAgent, jobID)
190+
if err != nil {
191+
if isRetryableError(err) {
192+
return resource.RetryableError(err)
193+
}
194+
return resource.NonRetryableError(err)
195+
}
196+
197+
state := job.CurrentState
198+
if state == targetState {
199+
log.Printf("[DEBUG] the job with ID %q has state %q.", jobID, state)
200+
return nil
201+
}
202+
if _, terminated := dataflowTerminalStatesMap[state]; terminated {
203+
return resource.NonRetryableError(fmt.Errorf("the job with ID %q has terminated with state %q instead of expected state %q", jobID, state, targetState))
204+
} else {
205+
log.Printf("[DEBUG] the job with ID %q has state %q.", jobID, state)
206+
return resource.RetryableError(fmt.Errorf("the job with ID %q has state %q, waiting for %q", jobID, state, targetState))
207+
}
208+
})
209+
}
210+
211+
// resourceDataflowFlexTemplateJobUpdate updates a Flex Template Job resource
// by re-launching the template with Update set, which asks Dataflow to
// replace the currently running job. The resource ID is switched to the
// newly launched job only after the old job reports JOB_STATE_UPDATED.
func resourceDataflowFlexTemplateJobUpdate(d *schema.ResourceData, meta interface{}) error {
	// Don't send an update request if only virtual fields have changes
	if resourceDataflowJobIsVirtualUpdate(d, resourceDataflowFlexTemplateJob().Schema) {
		return nil
	}

	config := meta.(*Config)
	userAgent, err := generateUserAgentString(d, config.userAgent)
	if err != nil {
		return err
	}

	project, err := getProject(d, config)
	if err != nil {
		return err
	}

	region, err := getRegion(d, config)
	if err != nil {
		return err
	}

	// wait until current job is running or terminated
	err = waitForDataflowJobState(d, config, d.Id(), userAgent, d.Timeout(schema.TimeoutUpdate), "JOB_STATE_RUNNING")
	if err != nil {
		return fmt.Errorf("Error waiting for job with job ID %q to be running: %v", d.Id(), err)
	}

	// Re-launch the flex template with Update: true so Dataflow performs an
	// in-place update of the running job rather than starting a second one.
	request := dataflow.LaunchFlexTemplateRequest{
		LaunchParameter: &dataflow.LaunchFlexTemplateParameter{
			ContainerSpecGcsPath: d.Get("container_spec_gcs_path").(string),
			JobName:              d.Get("name").(string),
			Parameters:           expandStringMap(d, "parameters"),
			Update:               true,
		},
	}

	response, err := config.NewDataflowClient(userAgent).Projects.Locations.FlexTemplates.Launch(project, region, &request).Do()
	if err != nil {
		return err
	}

	// don't set id until previous job is successfully updated
	err = waitForDataflowJobState(d, config, d.Id(), userAgent, d.Timeout(schema.TimeoutUpdate), "JOB_STATE_UPDATED")
	if err != nil {
		return fmt.Errorf("Error waiting for Job with job ID %q to be updated: %v", d.Id(), err)
	}

	// The replacement job has a new ID; adopt it as the resource identity.
	job := response.Job
	d.SetId(job.Id)
	if err := d.Set("job_id", job.Id); err != nil {
		return fmt.Errorf("Error setting job_id: %s", err)
	}

	return resourceDataflowFlexTemplateJobRead(d, meta)
}
185268

186269
func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interface{}) error {

google-beta/resource_dataflow_flex_template_job_test.go

Lines changed: 88 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ func TestAccDataflowFlexTemplateJob_basic(t *testing.T) {
1818
t.Parallel()
1919

2020
randStr := randString(t, 10)
21-
bucket := "tf-test-dataflow-gcs-" + randStr
2221
job := "tf-test-dataflow-job-" + randStr
2322

2423
vcrTest(t, resource.TestCase{
@@ -27,9 +26,39 @@ func TestAccDataflowFlexTemplateJob_basic(t *testing.T) {
2726
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
2827
Steps: []resource.TestStep{
2928
{
30-
Config: testAccDataflowFlexTemplateJob_basic(bucket, job),
29+
Config: testAccDataflowFlexTemplateJob_basic(job, "mytopic"),
3130
Check: resource.ComposeTestCheckFunc(
32-
testAccDataflowJobExists(t, "google_dataflow_flex_template_job.big_data"),
31+
testAccDataflowJobExists(t, "google_dataflow_flex_template_job.job"),
32+
),
33+
},
34+
},
35+
})
36+
}
37+
38+
func TestAccDataflowFlexTemplateJob_streamUpdate(t *testing.T) {
39+
// This resource uses custom retry logic that cannot be sped up without
40+
// modifying the actual resource
41+
skipIfVcr(t)
42+
t.Parallel()
43+
44+
randStr := randString(t, 10)
45+
job := "tf-test-dataflow-job-" + randStr
46+
47+
vcrTest(t, resource.TestCase{
48+
PreCheck: func() { testAccPreCheck(t) },
49+
Providers: testAccProviders,
50+
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
51+
Steps: []resource.TestStep{
52+
{
53+
Config: testAccDataflowFlexTemplateJob_basic(job, "mytopic"),
54+
Check: resource.ComposeTestCheckFunc(
55+
testAccDataflowJobExists(t, "google_dataflow_flex_template_job.job"),
56+
),
57+
},
58+
{
59+
Config: testAccDataflowFlexTemplateJob_basic(job, "mytopic2"),
60+
Check: resource.ComposeTestCheckFunc(
61+
testAccDataflowJobHasOption(t, "google_dataflow_flex_template_job.job", "topic", "projects/myproject/topics/mytopic2"),
3362
),
3463
},
3564
},
@@ -43,7 +72,6 @@ func TestAccDataflowFlexTemplateJob_withServiceAccount(t *testing.T) {
4372
t.Parallel()
4473

4574
randStr := randString(t, 10)
46-
bucket := "tf-test-dataflow-gcs-" + randStr
4775
job := "tf-test-dataflow-job-" + randStr
4876
accountId := "tf-test-dataflow-sa" + randStr
4977
zone := "us-central1-b"
@@ -54,10 +82,10 @@ func TestAccDataflowFlexTemplateJob_withServiceAccount(t *testing.T) {
5482
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
5583
Steps: []resource.TestStep{
5684
{
57-
Config: testAccDataflowFlexTemplateJob_serviceAccount(bucket, job, accountId, zone),
85+
Config: testAccDataflowFlexTemplateJob_serviceAccount(job, accountId, zone),
5886
Check: resource.ComposeTestCheckFunc(
59-
testAccDataflowJobExists(t, "google_dataflow_flex_template_job.big_data"),
60-
testAccDataflowFlexTemplateJobHasServiceAccount(t, "google_dataflow_flex_template_job.big_data", accountId, zone),
87+
testAccDataflowJobExists(t, "google_dataflow_flex_template_job.job"),
88+
testAccDataflowFlexTemplateJobHasServiceAccount(t, "google_dataflow_flex_template_job.job", accountId, zone),
6189
),
6290
},
6391
},
@@ -123,136 +151,87 @@ func testAccDataflowFlexTemplateJobGetGeneratedInstance(t *testing.T, s *terrafo
123151
return instance, nil
124152
}
125153

126-
// note: this config creates a job that doesn't actually do anything
127-
func testAccDataflowFlexTemplateJob_basic(bucket, job string) string {
154+
// note: this config creates a job that doesn't actually do anything, but still runs
// It launches Google's public Streaming_Data_Generator flex template from the
// shared "dataflow-templates" bucket; topicName lets update tests change the
// job's "topic" parameter between steps to exercise an in-place update.
func testAccDataflowFlexTemplateJob_basic(job, topicName string) string {
	return fmt.Sprintf(`
data "google_storage_bucket_object" "flex_template" {
  name = "latest/flex/Streaming_Data_Generator"
  bucket = "dataflow-templates"
}

resource "google_dataflow_flex_template_job" "job" {
  name = "%s"
  container_spec_gcs_path = "gs://${data.google_storage_bucket_object.flex_template.bucket}/${data.google_storage_bucket_object.flex_template.name}"
  on_delete = "cancel"
  parameters = {
    schemaLocation = "gs://mybucket/schema.json"
    qps = "1"
    topic = "projects/myproject/topics/%s"
  }
}
`, job, topicName)
}
181174

182-
// note: this config creates a job that doesn't actually do anything
183-
func testAccDataflowFlexTemplateJob_serviceAccount(bucket, job, accountId, zone string) string {
175+
// note: this config creates a job that doesn't actually do anything, but still runs
// Same Streaming_Data_Generator template as the basic config, but the job
// runs as a dedicated service account (with the dataflow.worker role) so the
// test can assert the generated worker instance uses that account.
func testAccDataflowFlexTemplateJob_serviceAccount(job, accountId, zone string) string {
	return fmt.Sprintf(`
resource "google_service_account" "dataflow-sa" {
  account_id   = "%s"
  display_name = "DataFlow Service Account"
}

resource "google_project_iam_member" "dataflow-worker" {
  role   = "roles/dataflow.worker"
  member = "serviceAccount:${google_service_account.dataflow-sa.email}"
}

data "google_storage_bucket_object" "flex_template" {
  name = "latest/flex/Streaming_Data_Generator"
  bucket = "dataflow-templates"
}

resource "google_dataflow_flex_template_job" "job" {
  name = "%s"
  container_spec_gcs_path = "gs://${data.google_storage_bucket_object.flex_template.bucket}/${data.google_storage_bucket_object.flex_template.name}"
  on_delete = "cancel"
  parameters = {
    schemaLocation = "gs://mybucket/schema.json"
    qps = "1"
    topic = "projects/myproject/topics/mytopic"
    serviceAccount = google_service_account.dataflow-sa.email
    zone = "%s"
  }
}
`, accountId, job, zone)
}
207+
208+
func testAccDataflowJobHasOption(t *testing.T, res, option, expectedValue string) resource.TestCheckFunc {
209+
return func(s *terraform.State) error {
210+
rs, ok := s.RootModule().Resources[res]
211+
if !ok {
212+
return fmt.Errorf("resource %q not found in state", res)
213+
}
214+
215+
if rs.Primary.ID == "" {
216+
return fmt.Errorf("No ID is set")
217+
}
218+
config := googleProviderConfig(t)
219+
220+
job, err := config.NewDataflowClient(config.userAgent).Projects.Jobs.Get(config.Project, rs.Primary.ID).View("JOB_VIEW_ALL").Do()
221+
if err != nil {
222+
return fmt.Errorf("dataflow job does not exist")
223+
}
224+
225+
sdkPipelineOptions, err := ConvertToMap(job.Environment.SdkPipelineOptions)
226+
if err != nil {
227+
return fmt.Errorf("error from ConvertToMap: %s", err)
228+
}
229+
optionsMap := sdkPipelineOptions["options"].(map[string]interface{})
230+
231+
if optionsMap[option] != expectedValue {
232+
return fmt.Errorf("Option %s do not match. Got %s while expecting %s", option, optionsMap[option], expectedValue)
233+
}
234+
235+
return nil
236+
}
258237
}

0 commit comments

Comments
 (0)