Skip to content

Commit e497c91

Browse files
modular-magician, c2thorn, geojaz
authored
Add the google_dataflow_flex_template_job resource (#3772) (#2303)
Co-authored-by: Cameron Thornton <[email protected]> Co-authored-by: eric-hole <[email protected]> Signed-off-by: Modular Magician <[email protected]> Co-authored-by: Cameron Thornton <[email protected]> Co-authored-by: eric-hole <[email protected]>
1 parent e74cbe1 commit e497c91

File tree

6 files changed

+408
-0
lines changed

6 files changed

+408
-0
lines changed

.changelog/3772.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:new-resource
2+
`google_dataflow_flex_template_job`
3+
```

google-beta/provider.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -982,6 +982,7 @@ func ResourceMapWithErrors() (map[string]*schema.Resource, error) {
982982
"google_container_node_pool": resourceContainerNodePool(),
983983
"google_container_registry": resourceContainerRegistry(),
984984
"google_dataflow_job": resourceDataflowJob(),
985+
"google_dataflow_flex_template_job": resourceDataflowFlexTemplateJob(),
985986
"google_dataproc_cluster": resourceDataprocCluster(),
986987
"google_dataproc_cluster_iam_binding": ResourceIamBinding(IamDataprocClusterSchema, NewDataprocClusterUpdater, DataprocClusterIdParseFunc),
987988
"google_dataproc_cluster_iam_member": ResourceIamMember(IamDataprocClusterSchema, NewDataprocClusterUpdater, DataprocClusterIdParseFunc),
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
package google
2+
3+
import (
4+
"fmt"
5+
"log"
6+
"strings"
7+
"time"
8+
9+
"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
10+
"google.golang.org/api/googleapi"
11+
12+
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
13+
"github.com/hashicorp/terraform-plugin-sdk/helper/validation"
14+
dataflow "google.golang.org/api/dataflow/v1b3"
15+
)
16+
17+
// NOTE: resource_dataflow_flex_template currently does not support updating existing jobs.
18+
// Changing any non-computed field will result in the job being deleted (according to its
19+
// on_delete policy) and recreated with the updated parameters.
20+
21+
// resourceDataflowFlexTemplateJob defines the schema for Dataflow FlexTemplate jobs.
22+
func resourceDataflowFlexTemplateJob() *schema.Resource {
23+
return &schema.Resource{
24+
Create: resourceDataflowFlexTemplateJobCreate,
25+
Read: resourceDataflowFlexTemplateJobRead,
26+
Update: resourceDataflowFlexTemplateJobUpdate,
27+
Delete: resourceDataflowFlexTemplateJobDelete,
28+
Schema: map[string]*schema.Schema{
29+
30+
"container_spec_gcs_path": {
31+
Type: schema.TypeString,
32+
Required: true,
33+
ForceNew: true,
34+
},
35+
36+
"name": {
37+
Type: schema.TypeString,
38+
Required: true,
39+
ForceNew: true,
40+
},
41+
42+
"on_delete": {
43+
Type: schema.TypeString,
44+
ValidateFunc: validation.StringInSlice([]string{"cancel", "drain"}, false),
45+
Optional: true,
46+
Default: "cancel",
47+
},
48+
49+
"labels": {
50+
Type: schema.TypeMap,
51+
Optional: true,
52+
DiffSuppressFunc: resourceDataflowJobLabelDiffSuppress,
53+
ForceNew: true,
54+
},
55+
56+
"parameters": {
57+
Type: schema.TypeMap,
58+
Optional: true,
59+
ForceNew: true,
60+
},
61+
62+
"project": {
63+
Type: schema.TypeString,
64+
Optional: true,
65+
Computed: true,
66+
ForceNew: true,
67+
},
68+
69+
"job_id": {
70+
Type: schema.TypeString,
71+
Computed: true,
72+
},
73+
74+
"state": {
75+
Type: schema.TypeString,
76+
Computed: true,
77+
},
78+
},
79+
}
80+
}
81+
82+
// resourceDataflowFlexTemplateJobCreate creates a Flex Template Job from TF code.
83+
func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interface{}) error {
84+
config := meta.(*Config)
85+
86+
project, err := getProject(d, config)
87+
if err != nil {
88+
return err
89+
}
90+
91+
region, err := getRegion(d, config)
92+
if err != nil {
93+
return err
94+
}
95+
96+
request := dataflow.LaunchFlexTemplateRequest{
97+
LaunchParameter: &dataflow.LaunchFlexTemplateParameter{
98+
ContainerSpecGcsPath: d.Get("container_spec_gcs_path").(string),
99+
JobName: d.Get("name").(string),
100+
Parameters: expandStringMap(d, "parameters"),
101+
},
102+
}
103+
104+
response, err := config.clientDataflow.Projects.Locations.FlexTemplates.Launch(project, region, &request).Do()
105+
if err != nil {
106+
return err
107+
}
108+
109+
job := response.Job
110+
d.SetId(job.Id)
111+
d.Set("job_id", job.Id)
112+
113+
return resourceDataflowFlexTemplateJobRead(d, meta)
114+
}
115+
116+
// resourceDataflowFlexTemplateJobRead reads a Flex Template Job resource.
117+
func resourceDataflowFlexTemplateJobRead(d *schema.ResourceData, meta interface{}) error {
118+
config := meta.(*Config)
119+
120+
project, err := getProject(d, config)
121+
if err != nil {
122+
return err
123+
}
124+
125+
region, err := getRegion(d, config)
126+
if err != nil {
127+
return err
128+
}
129+
130+
jobId := d.Id()
131+
132+
job, err := resourceDataflowJobGetJob(config, project, region, jobId)
133+
if err != nil {
134+
return handleNotFoundError(err, d, fmt.Sprintf("Dataflow job %s", jobId))
135+
}
136+
137+
d.Set("state", job.CurrentState)
138+
d.Set("name", job.Name)
139+
d.Set("project", project)
140+
d.Set("labels", job.Labels)
141+
142+
if _, ok := dataflowTerminalStatesMap[job.CurrentState]; ok {
143+
log.Printf("[DEBUG] Removing resource '%s' because it is in state %s.\n", job.Name, job.CurrentState)
144+
d.SetId("")
145+
return nil
146+
}
147+
148+
return nil
149+
}
150+
151+
// resourceDataflowFlexTemplateJobUpdate is a blank method to enable updating
152+
// the on_delete virtual field
153+
func resourceDataflowFlexTemplateJobUpdate(d *schema.ResourceData, meta interface{}) error {
154+
return nil
155+
}
156+
157+
func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interface{}) error {
158+
config := meta.(*Config)
159+
160+
project, err := getProject(d, config)
161+
if err != nil {
162+
return err
163+
}
164+
165+
region, err := getRegion(d, config)
166+
if err != nil {
167+
return err
168+
}
169+
170+
id := d.Id()
171+
172+
requestedState, err := resourceDataflowJobMapRequestedState(d.Get("on_delete").(string))
173+
if err != nil {
174+
return err
175+
}
176+
177+
// Retry updating the state while the job is not ready to be canceled/drained.
178+
err = resource.Retry(time.Minute*time.Duration(15), func() *resource.RetryError {
179+
// To terminate a dataflow job, we update the job with a requested
180+
// terminal state.
181+
job := &dataflow.Job{
182+
RequestedState: requestedState,
183+
}
184+
185+
_, updateErr := resourceDataflowJobUpdateJob(config, project, region, id, job)
186+
if updateErr != nil {
187+
gerr, isGoogleErr := updateErr.(*googleapi.Error)
188+
if !isGoogleErr {
189+
// If we have an error and it's not a google-specific error, we should go ahead and return.
190+
return resource.NonRetryableError(updateErr)
191+
}
192+
193+
if strings.Contains(gerr.Message, "not yet ready for canceling") {
194+
// Retry cancelling job if it's not ready.
195+
// Sleep to avoid hitting update quota with repeated attempts.
196+
time.Sleep(5 * time.Second)
197+
return resource.RetryableError(updateErr)
198+
}
199+
200+
if strings.Contains(gerr.Message, "Job has terminated") {
201+
// Job has already been terminated, skip.
202+
return nil
203+
}
204+
}
205+
206+
return nil
207+
})
208+
if err != nil {
209+
return err
210+
}
211+
212+
// Wait for state to reach terminal state (canceled/drained/done)
213+
_, ok := dataflowTerminalStatesMap[d.Get("state").(string)]
214+
for !ok {
215+
log.Printf("[DEBUG] Waiting for job with job state %q to terminate...", d.Get("state").(string))
216+
time.Sleep(5 * time.Second)
217+
218+
err = resourceDataflowFlexTemplateJobRead(d, meta)
219+
if err != nil {
220+
return fmt.Errorf("Error while reading job to see if it was properly terminated: %v", err)
221+
}
222+
_, ok = dataflowTerminalStatesMap[d.Get("state").(string)]
223+
}
224+
225+
// Only remove the job from state if it's actually successfully canceled.
226+
if _, ok := dataflowTerminalStatesMap[d.Get("state").(string)]; ok {
227+
log.Printf("[DEBUG] Removing dataflow job with final state %q", d.Get("state").(string))
228+
d.SetId("")
229+
return nil
230+
}
231+
return fmt.Errorf("Unable to cancel the dataflow job '%s' - final state was %q.", d.Id(), d.Get("state").(string))
232+
}
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package google
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/hashicorp/terraform-plugin-sdk/helper/resource"
8+
)
9+
10+
func TestAccDataflowFlexTemplateJob_basic(t *testing.T) {
11+
t.Parallel()
12+
13+
randStr := randString(t, 10)
14+
bucket := "tf-test-dataflow-gcs-" + randStr
15+
job := "tf-test-dataflow-job-" + randStr
16+
17+
vcrTest(t, resource.TestCase{
18+
PreCheck: func() { testAccPreCheck(t) },
19+
Providers: testAccProviders,
20+
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
21+
Steps: []resource.TestStep{
22+
{
23+
Config: testAccDataflowFlowFlexTemplateJob_basic(bucket, job),
24+
Check: resource.ComposeTestCheckFunc(
25+
testAccDataflowJobExists(t, "google_dataflow_flex_template_job.big_data"),
26+
),
27+
},
28+
},
29+
})
30+
}
31+
32+
// note: this config creates a job that doesn't actually do anything
//
// testAccDataflowFlowFlexTemplateJob_basic renders the Terraform config for
// the basic acceptance test: a temp GCS bucket, a flex-template spec JSON
// object uploaded to it, and a google_dataflow_flex_template_job pointing at
// that spec. %s placeholders are filled with the bucket and job names.
// NOTE(review): the "Flow" in the function name looks like a typo for
// "Flex", but it is kept because the test above calls it by this name.
func testAccDataflowFlowFlexTemplateJob_basic(bucket, job string) string {
	return fmt.Sprintf(`
resource "google_storage_bucket" "temp" {
  name          = "%s"
  force_destroy = true
}

resource "google_storage_bucket_object" "flex_template" {
  name    = "flex_template.json"
  bucket  = google_storage_bucket.temp.name
  content = <<EOF
{
  "image": "my-image",
  "metadata": {
    "description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
    "name": "Streaming Beam SQL",
    "parameters": [
      {
        "helpText": "Pub/Sub subscription to read from.",
        "label": "Pub/Sub input subscription.",
        "name": "inputSubscription",
        "regexes": [
          "[-_.a-zA-Z0-9]+"
        ]
      },
      {
        "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
        "is_optional": true,
        "label": "BigQuery output table",
        "name": "outputTable",
        "regexes": [
          "[^:]+:[^.]+[.].+"
        ]
      }
    ]
  },
  "sdkInfo": {
    "language": "JAVA"
  }
}
EOF
}

resource "google_dataflow_flex_template_job" "big_data" {
  name                    = "%s"
  container_spec_gcs_path = "${google_storage_bucket.temp.url}/${google_storage_bucket_object.flex_template.name}"
  on_delete               = "cancel"
  parameters = {
    inputSubscription = "my-subscription"
    outputTable       = "my-project:my-dataset.my-table"
  }
}
`, bucket, job)
}

0 commit comments

Comments
 (0)