Skip to content

Commit 5224c67

Browse files
Add skip_wait_on_job_termination option for dataflow job resources (#5844) (#4196)
Signed-off-by: Modular Magician <[email protected]>
1 parent 6f0582c commit 5224c67

File tree

6 files changed

+179
-14
lines changed

6 files changed

+179
-14
lines changed

.changelog/5844.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:enhancement
2+
dataflow: added `skip_wait_on_job_termination` attribute to `google_dataflow_job` and `google_dataflow_flex_template_job` resources (issue #10559)
3+
```

google-beta/resource_dataflow_flex_template_job.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,13 @@ func resourceDataflowFlexTemplateJob() *schema.Resource {
8282
Type: schema.TypeString,
8383
Computed: true,
8484
},
85+
86+
"skip_wait_on_job_termination": {
87+
Type: schema.TypeBool,
88+
Optional: true,
89+
Default: false,
90+
Description: `If true, treat DRAINING and CANCELLING as terminal job states and do not wait for further changes before removing from terraform state and moving on. WARNING: this will lead to job name conflicts if you do not ensure that the job names are different, e.g. by embedding a release ID or by using a random_id.`,
91+
},
8592
},
8693
UseJSONNumber: true,
8794
}
@@ -165,7 +172,7 @@ func resourceDataflowFlexTemplateJobRead(d *schema.ResourceData, meta interface{
165172
return fmt.Errorf("Error setting labels: %s", err)
166173
}
167174

168-
if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
175+
if ok := shouldStopDataflowJobDeleteQuery(job.CurrentState, d.Get("skip_wait_on_job_termination").(bool)); ok {
169176
log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
170177
d.SetId("")
171178
return nil
@@ -331,8 +338,10 @@ func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interfac
331338
return err
332339
}
333340

334-
// Wait for state to reach terminal state (canceled/drained/done)
335-
_, ok := dataflowTerminalStatesMap[d.Get("state").(string)]
341+
// Wait for state to reach terminal state (canceled/drained/done plus cancelling/draining if skipWait)
342+
skipWait := d.Get("skip_wait_on_job_termination").(bool)
343+
var ok bool
344+
ok = shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait)
336345
for !ok {
337346
log.Printf("[DEBUG] Waiting for job with job state %q to terminate...", d.Get("state").(string))
338347
time.Sleep(5 * time.Second)
@@ -341,11 +350,11 @@ func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interfac
341350
if err != nil {
342351
return fmt.Errorf("Error while reading job to see if it was properly terminated: %v", err)
343352
}
344-
_, ok = dataflowTerminalStatesMap[d.Get("state").(string)]
353+
ok = shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait)
345354
}
346355

347-
// Only remove the job from state if it's actually successfully canceled.
348-
if _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; ok {
356+
// Only remove the job from state if it's actually successfully hit a final state.
357+
if ok = shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait); ok {
349358
log.Printf("[DEBUG] Removing dataflow job with final state %q", d.Get("state").(string))
350359
d.SetId("")
351360
return nil

google-beta/resource_dataflow_job.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,13 @@ func resourceDataflowJob() *schema.Resource {
209209
Optional: true,
210210
Description: `Indicates if the job should use the streaming engine feature.`,
211211
},
212+
213+
"skip_wait_on_job_termination": {
214+
Type: schema.TypeBool,
215+
Optional: true,
216+
Default: false,
217+
Description: `If true, treat DRAINING and CANCELLING as terminal job states and do not wait for further changes before removing from terraform state and moving on. WARNING: this will lead to job name conflicts if you do not ensure that the job names are different, e.g. by embedding a release ID or by using a random_id.`,
218+
},
212219
},
213220
UseJSONNumber: true,
214221
}
@@ -238,6 +245,16 @@ func resourceDataflowJobTypeCustomizeDiff(_ context.Context, d *schema.ResourceD
238245
return nil
239246
}
240247

248+
// return true if a job is in a terminal state, OR if a job is in a
249+
// terminating state and skipWait is true
250+
func shouldStopDataflowJobDeleteQuery(state string, skipWait bool) bool {
251+
_, stopQuery := dataflowTerminalStatesMap[state]
252+
if !stopQuery && skipWait {
253+
_, stopQuery = dataflowTerminatingStatesMap[state]
254+
}
255+
return stopQuery
256+
}
257+
241258
func resourceDataflowJobCreate(d *schema.ResourceData, meta interface{}) error {
242259
config := meta.(*Config)
243260
userAgent, err := generateUserAgentString(d, config.userAgent)
@@ -348,7 +365,7 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
348365
return fmt.Errorf("Error setting additional_experiments: %s", err)
349366
}
350367

351-
if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
368+
if ok := shouldStopDataflowJobDeleteQuery(job.CurrentState, d.Get("skip_wait_on_job_termination").(bool)); ok {
352369
log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
353370
d.SetId("")
354371
return nil
@@ -474,8 +491,9 @@ func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
474491
return err
475492
}
476493

477-
// Wait for state to reach terminal state (canceled/drained/done)
478-
_, ok := dataflowTerminalStatesMap[d.Get("state").(string)]
494+
// Wait for state to reach terminal state (canceled/drained/done plus cancelling/draining if skipWait)
495+
skipWait := d.Get("skip_wait_on_job_termination").(bool)
496+
ok := shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait)
479497
for !ok {
480498
log.Printf("[DEBUG] Waiting for job with job state %q to terminate...", d.Get("state").(string))
481499
time.Sleep(5 * time.Second)
@@ -484,11 +502,11 @@ func resourceDataflowJobDelete(d *schema.ResourceData, meta interface{}) error {
484502
if err != nil {
485503
return fmt.Errorf("Error while reading job to see if it was properly terminated: %v", err)
486504
}
487-
_, ok = dataflowTerminalStatesMap[d.Get("state").(string)]
505+
ok = shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait)
488506
}
489507

490-
// Only remove the job from state if it's actually successfully canceled.
491-
if _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; ok {
508+
// Only remove the job from state if it's actually successfully hit a final state.
509+
if ok = shouldStopDataflowJobDeleteQuery(d.Get("state").(string), skipWait); ok {
492510
log.Printf("[DEBUG] Removing dataflow job with final state %q", d.Get("state").(string))
493511
d.SetId("")
494512
return nil

google-beta/resource_dataflow_job_test.go

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package google
22

33
import (
44
"fmt"
5+
"strconv"
56
"strings"
67
"testing"
78
"time"
@@ -44,6 +45,32 @@ func TestAccDataflowJob_basic(t *testing.T) {
4445
})
4546
}
4647

48+
func TestAccDataflowJobSkipWait_basic(t *testing.T) {
49+
// Dataflow responses include serialized java classes and bash commands
50+
// This makes body comparison infeasible
51+
skipIfVcr(t)
52+
t.Parallel()
53+
54+
randStr := randString(t, 10)
55+
bucket := "tf-test-dataflow-gcs-" + randStr
56+
job := "tf-test-dataflow-job-" + randStr
57+
zone := "us-central1-f"
58+
59+
vcrTest(t, resource.TestCase{
60+
PreCheck: func() { testAccPreCheck(t) },
61+
Providers: testAccProviders,
62+
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
63+
Steps: []resource.TestStep{
64+
{
65+
Config: testAccDataflowJobSkipWait_zone(bucket, job, zone),
66+
Check: resource.ComposeTestCheckFunc(
67+
testAccDataflowJobExists(t, "google_dataflow_job.big_data"),
68+
),
69+
},
70+
},
71+
})
72+
}
73+
4774
func TestAccDataflowJob_withRegion(t *testing.T) {
4875
// Dataflow responses include serialized java classes and bash commands
4976
// This makes body comparison infeasible
@@ -329,7 +356,16 @@ func testAccCheckDataflowJobDestroyProducer(t *testing.T) func(s *terraform.Stat
329356
config := googleProviderConfig(t)
330357
job, err := config.NewDataflowClient(config.userAgent).Projects.Jobs.Get(config.Project, rs.Primary.ID).Do()
331358
if job != nil {
332-
if _, ok := dataflowTerminalStatesMap[job.CurrentState]; !ok {
359+
var ok bool
360+
skipWait, err := strconv.ParseBool(rs.Primary.Attributes["skip_wait_on_job_termination"])
361+
if err != nil {
362+
return fmt.Errorf("could not parse attribute: %v", err)
363+
}
364+
_, ok = dataflowTerminalStatesMap[job.CurrentState]
365+
if !ok && skipWait {
366+
_, ok = dataflowTerminatingStatesMap[job.CurrentState]
367+
}
368+
if !ok {
333369
return fmt.Errorf("Job still present")
334370
}
335371
} else if err != nil {
@@ -351,7 +387,16 @@ func testAccCheckDataflowJobRegionDestroyProducer(t *testing.T) func(s *terrafor
351387
config := googleProviderConfig(t)
352388
job, err := config.NewDataflowClient(config.userAgent).Projects.Locations.Jobs.Get(config.Project, "us-central1", rs.Primary.ID).Do()
353389
if job != nil {
354-
if _, ok := dataflowTerminalStatesMap[job.CurrentState]; !ok {
390+
var ok bool
391+
skipWait, err := strconv.ParseBool(rs.Primary.Attributes["skip_wait_on_job_termination"])
392+
if err != nil {
393+
return fmt.Errorf("could not parse attribute: %v", err)
394+
}
395+
_, ok = dataflowTerminalStatesMap[job.CurrentState]
396+
if !ok && skipWait {
397+
_, ok = dataflowTerminatingStatesMap[job.CurrentState]
398+
}
399+
if !ok {
355400
return fmt.Errorf("Job still present")
356401
}
357402
} else if err != nil {
@@ -635,6 +680,32 @@ resource "google_dataflow_job" "big_data" {
635680
`, bucket, job, zone, testDataflowJobTemplateWordCountUrl, testDataflowJobSampleFileUrl)
636681
}
637682

683+
func testAccDataflowJobSkipWait_zone(bucket, job, zone string) string {
684+
return fmt.Sprintf(`
685+
resource "google_storage_bucket" "temp" {
686+
name = "%s"
687+
location = "US"
688+
force_destroy = true
689+
}
690+
691+
resource "google_dataflow_job" "big_data" {
692+
name = "%s"
693+
694+
zone = "%s"
695+
696+
machine_type = "e2-standard-2"
697+
template_gcs_path = "%s"
698+
temp_gcs_location = google_storage_bucket.temp.url
699+
parameters = {
700+
inputFile = "%s"
701+
output = "${google_storage_bucket.temp.url}/output"
702+
}
703+
on_delete = "cancel"
704+
skip_wait_on_job_termination = true
705+
}
706+
`, bucket, job, zone, testDataflowJobTemplateWordCountUrl, testDataflowJobSampleFileUrl)
707+
}
708+
638709
func testAccDataflowJob_region(bucket, job string) string {
639710
return fmt.Sprintf(`
640711
resource "google_storage_bucket" "temp" {

website/docs/r/dataflow_flex_template_job.html.markdown

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,38 @@ is "cancelled", but if a user sets `on_delete` to `"drain"` in the
4848
configuration, you may experience a long wait for your `terraform destroy` to
4949
complete.
5050

51+
You can potentially short-circuit the wait by setting `skip_wait_on_job_termination`
52+
to `true`, but beware that unless you take active steps to ensure that the job
53+
`name` parameter changes between instances, the name will conflict and the launch
54+
of the new job will fail. One way to do this is with a
55+
[random_id](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/id)
56+
resource, for example:
57+
58+
```hcl
59+
variable "big_data_job_subscription_id" {
60+
type = string
61+
default = "projects/myproject/subscriptions/messages"
62+
}
63+
64+
resource "random_id" "big_data_job_name_suffix" {
65+
byte_length = 4
66+
keepers = {
67+
region = var.region
68+
subscription_id = var.big_data_job_subscription_id
69+
}
70+
}
71+
resource "google_dataflow_flex_template_job" "big_data_job" {
72+
provider = google-beta
73+
name = "dataflow-flextemplates-job-${random_id.big_data_job_name_suffix.dec}"
74+
region = var.region
75+
container_spec_gcs_path = "gs://my-bucket/templates/template.json"
76+
skip_wait_on_job_termination = true
77+
parameters = {
78+
inputSubscription = var.big_data_job_subscription_id
79+
}
80+
}
81+
```
82+
5183
## Argument Reference
5284

5385
The following arguments are supported:
@@ -74,6 +106,10 @@ labels will be ignored to prevent diffs on re-apply.
74106
* `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of
75107
deletion during `terraform destroy`. See above note.
76108

109+
* `skip_wait_on_job_termination` - (Optional) If set to `true`, terraform will
110+
treat `DRAINING` and `CANCELLING` as terminal states when deleting the resource,
111+
and will remove the resource from terraform state and move on. See above note.
112+
77113
* `project` - (Optional) The project in which the resource belongs. If it is not
78114
provided, the provider project is used.
79115

website/docs/r/dataflow_job.html.markdown

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,33 @@ The Dataflow resource is considered 'existing' while it is in a nonterminal stat
6565

6666
A Dataflow job which is 'destroyed' may be "cancelled" or "drained". If "cancelled", the job terminates - any data written remains where it is, but no new data will be processed. If "drained", no new data will enter the pipeline, but any data currently in the pipeline will finish being processed. The default is "drain". When `on_delete` is set to `"drain"` in the configuration, you may experience a long wait for your `terraform destroy` to complete.
6767

68+
You can potentially short-circuit the wait by setting `skip_wait_on_job_termination` to `true`, but beware that unless you take active steps to ensure that the job `name` parameter changes between instances, the name will conflict and the launch of the new job will fail. One way to do this is with a [random_id](https://registry.terraform.io/providers/hashicorp/random/latest/docs/resources/id) resource, for example:
69+
70+
```hcl
71+
variable "big_data_job_subscription_id" {
72+
type = string
73+
default = "projects/myproject/subscriptions/messages"
74+
}
75+
76+
resource "random_id" "big_data_job_name_suffix" {
77+
byte_length = 4
78+
keepers = {
79+
region = var.region
80+
subscription_id = var.big_data_job_subscription_id
81+
}
82+
}
83+
resource "google_dataflow_flex_template_job" "big_data_job" {
84+
provider = google-beta
85+
name = "dataflow-flextemplates-job-${random_id.big_data_job_name_suffix.dec}"
86+
region = var.region
87+
container_spec_gcs_path = "gs://my-bucket/templates/template.json"
88+
skip_wait_on_job_termination = true
89+
parameters = {
90+
inputSubscription = var.big_data_job_subscription_id
91+
}
92+
}
93+
```
94+
6895
## Argument Reference
6996

7097
The following arguments are supported:
@@ -83,6 +110,7 @@ The following arguments are supported:
83110
* `transform_name_mapping` - (Optional) Only applicable when updating a pipeline. Map of transform name prefixes of the job to be replaced with the corresponding name prefixes of the new job. This field is not used outside of update.
84111
* `max_workers` - (Optional) The number of workers permitted to work on the job. More workers may improve processing speed at additional cost.
85112
* `on_delete` - (Optional) One of "drain" or "cancel". Specifies behavior of deletion during `terraform destroy`. See above note.
113+
* `skip_wait_on_job_termination` - (Optional) If set to `true`, terraform will treat `DRAINING` and `CANCELLING` as terminal states when deleting the resource, and will remove the resource from terraform state and move on. See above note.
86114
* `project` - (Optional) The project in which the resource belongs. If it is not provided, the provider project is used.
87115
* `zone` - (Optional) The zone in which the created job should run. If it is not provided, the provider zone is used.
88116
* `region` - (Optional) The region in which the created job should run.

0 commit comments

Comments
 (0)