Skip to content

Commit 73748e0

Browse files
Make dataflow on_delete a virtual field (#3567) (#2110)
* Make dataflow on_delete a virtual field * Clarify test * Add attribute check for good measure * remove comments * only iterate on "labels" map Signed-off-by: Modular Magician <[email protected]>
1 parent 8a752f4 commit 73748e0

File tree

3 files changed

+134
-4
lines changed

3 files changed

+134
-4
lines changed

.changelog/3567.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
```release-note:bug
2+
`dataflow`: fixed an issue where `google_dataflow_job` would try to update `max_workers`
3+
```
4+
```release-note:bug
5+
`dataflow`: fixed an issue where updating `on_delete` in `google_dataflow_job` would cause the job to be replaced
6+
```

google-beta/resource_dataflow_job.go

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ func resourceDataflowJob() *schema.Resource {
8585
"max_workers": {
8686
Type: schema.TypeInt,
8787
Optional: true,
88+
// ForceNew applies to both stream and batch jobs
89+
ForceNew: true,
8890
},
8991

9092
"parameters": {
@@ -166,12 +168,15 @@ func resourceDataflowJob() *schema.Resource {
166168
}
167169

168170
func resourceDataflowJobTypeCustomizeDiff(d *schema.ResourceDiff, meta interface{}) error {
169-
// All changes are ForceNew for batch jobs
171+
// All non-virtual fields are ForceNew for batch jobs
170172
if d.Get("type") == "JOB_TYPE_BATCH" {
171173
resourceSchema := resourceDataflowJob().Schema
172-
for field, fieldSchema := range resourceSchema {
173-
// Each key within a map must be checked for a change
174-
if fieldSchema.Type == schema.TypeMap {
174+
for field := range resourceSchema {
175+
if field == "on_delete" {
176+
continue
177+
}
178+
// Labels map will likely have suppressed changes, so we check each key instead of the parent field
179+
if field == "labels" {
175180
resourceDataflowJobIterateMapForceNew(field, d)
176181
} else if d.HasChange(field) {
177182
d.ForceNew(field)
@@ -265,6 +270,11 @@ func resourceDataflowJobRead(d *schema.ResourceData, meta interface{}) error {
265270

266271
// Stream update method. Batch job changes should have been set to ForceNew via custom diff
267272
func resourceDataflowJobUpdateByReplacement(d *schema.ResourceData, meta interface{}) error {
273+
// Don't send an update request if only virtual fields have changes
274+
if resourceDataflowJobIsVirtualUpdate(d) {
275+
return nil
276+
}
277+
268278
config := meta.(*Config)
269279

270280
project, err := getProject(d, config)
@@ -456,3 +466,36 @@ func resourceDataflowJobIterateMapForceNew(mapKey string, d *schema.ResourceDiff
456466
}
457467
}
458468
}
469+
470+
func resourceDataflowJobIterateMapHasChange(mapKey string, d *schema.ResourceData) bool {
471+
obj := d.Get(mapKey).(map[string]interface{})
472+
for k := range obj {
473+
entrySchemaKey := mapKey + "." + k
474+
if d.HasChange(entrySchemaKey) {
475+
return true
476+
}
477+
}
478+
return false
479+
}
480+
481+
func resourceDataflowJobIsVirtualUpdate(d *schema.ResourceData) bool {
482+
// on_delete is the only virtual field
483+
if d.HasChange("on_delete") {
484+
// Check if other fields have changes, which would require an actual update request
485+
resourceSchema := resourceDataflowJob().Schema
486+
for field := range resourceSchema {
487+
if field == "on_delete" {
488+
continue
489+
}
490+
// Labels map will likely have suppressed changes, so we check each key instead of the parent field
491+
if (field == "labels" && resourceDataflowJobIterateMapHasChange(field, d)) ||
492+
(field != "labels" && d.HasChange(field)) {
493+
return false
494+
}
495+
}
496+
// on_delete is changing, but nothing else
497+
return true
498+
}
499+
500+
return false
501+
}

google-beta/resource_dataflow_job_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,39 @@ func TestAccDataflowJob_streamUpdate(t *testing.T) {
259259
})
260260
}
261261

262+
func TestAccDataflowJob_virtualUpdate(t *testing.T) {
263+
// Dataflow responses include serialized java classes and bash commands
264+
// This makes body comparison infeasible
265+
skipIfVcr(t)
266+
t.Parallel()
267+
268+
suffix := randString(t, 10)
269+
270+
// If the update is virtual-only, the ID should remain the same after updating.
271+
var id string
272+
vcrTest(t, resource.TestCase{
273+
PreCheck: func() { testAccPreCheck(t) },
274+
Providers: testAccProviders,
275+
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
276+
Steps: []resource.TestStep{
277+
{
278+
Config: testAccDataflowJob_virtualUpdate(suffix, "drain"),
279+
Check: resource.ComposeTestCheckFunc(
280+
testAccDataflowJobExists(t, "google_dataflow_job.pubsub_stream"),
281+
testAccDataflowSetId(t, "google_dataflow_job.pubsub_stream", &id),
282+
),
283+
},
284+
{
285+
Config: testAccDataflowJob_virtualUpdate(suffix, "cancel"),
286+
Check: resource.ComposeTestCheckFunc(
287+
testAccDataflowCheckId(t, "google_dataflow_job.pubsub_stream", &id),
288+
resource.TestCheckResourceAttr("google_dataflow_job.pubsub_stream", "on_delete", "cancel"),
289+
),
290+
},
291+
},
292+
})
293+
}
294+
262295
func testAccCheckDataflowJobDestroyProducer(t *testing.T) func(s *terraform.State) error {
263296
return func(s *terraform.State) error {
264297
for _, rs := range s.RootModule().Resources {
@@ -323,6 +356,32 @@ func testAccDataflowJobExists(t *testing.T, resource string) resource.TestCheckF
323356
}
324357
}
325358

359+
func testAccDataflowSetId(t *testing.T, resource string, id *string) resource.TestCheckFunc {
360+
return func(s *terraform.State) error {
361+
rs, ok := s.RootModule().Resources[resource]
362+
if !ok {
363+
return fmt.Errorf("resource %q not in state", resource)
364+
}
365+
366+
*id = rs.Primary.ID
367+
return nil
368+
}
369+
}
370+
371+
func testAccDataflowCheckId(t *testing.T, resource string, id *string) resource.TestCheckFunc {
372+
return func(s *terraform.State) error {
373+
rs, ok := s.RootModule().Resources[resource]
374+
if !ok {
375+
return fmt.Errorf("resource %q not in state", resource)
376+
}
377+
378+
if rs.Primary.ID != *id {
379+
return fmt.Errorf("ID did not match. Expected %s, received %s", *id, rs.Primary.ID)
380+
}
381+
return nil
382+
}
383+
}
384+
326385
func testAccDataflowJobHasNetwork(t *testing.T, res, expected string) resource.TestCheckFunc {
327386
return func(s *terraform.State) error {
328387
instanceTmpl, err := testAccDataflowJobGetGeneratedInstanceTemplate(t, s, res)
@@ -772,3 +831,25 @@ resource "google_dataflow_job" "pubsub_stream" {
772831
}
773832
`, suffix, suffix, suffix, suffix, testDataflowJobTemplateTextToPubsub, tempLocation)
774833
}
834+
835+
func testAccDataflowJob_virtualUpdate(suffix, onDelete string) string {
836+
return fmt.Sprintf(`
837+
resource "google_pubsub_topic" "topic" {
838+
name = "tf-test-dataflow-job-%s"
839+
}
840+
resource "google_storage_bucket" "bucket" {
841+
name = "tf-test-bucket-%s"
842+
force_destroy = true
843+
}
844+
resource "google_dataflow_job" "pubsub_stream" {
845+
name = "tf-test-dataflow-job-%s"
846+
template_gcs_path = "%s"
847+
temp_gcs_location = google_storage_bucket.bucket.url
848+
parameters = {
849+
inputFilePattern = "${google_storage_bucket.bucket.url}/*.json"
850+
outputTopic = google_pubsub_topic.topic.id
851+
}
852+
on_delete = "%s"
853+
}
854+
`, suffix, suffix, suffix, testDataflowJobTemplateTextToPubsub, onDelete)
855+
}

0 commit comments

Comments
 (0)