Skip to content

Commit 9cadd5f

Browse files
Revert "Prevent Dataflow options in parameters" (#11485) (#8006)
[upstream:8a8e61c4c4de536a721beeb97cca16229504a297] Signed-off-by: Modular Magician <[email protected]>
1 parent 09793e3 commit 9cadd5f

File tree

3 files changed

+55
-56
lines changed

3 files changed

+55
-56
lines changed

.changelog/11485.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
```release-note:enhancement
2+
dataflow: made provider return more descriptive errors when the `parameters` field of `google_dataflow_flex_template_job` contains Dataflow options (revert)
3+
```

google-beta/services/dataflow/resource_dataflow_flex_template_job.go

Lines changed: 42 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -311,23 +311,38 @@ func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interfac
311311
func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_tpg.Config) (dataflow.FlexTemplateRuntimeEnvironment, map[string]string, error) {
312312

313313
updatedParameters := tpgresource.ExpandStringMap(d, "parameters")
314-
if err := hasIllegalParametersErr(d); err != nil {
315-
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
316-
}
317314

318315
additionalExperiments := tpgresource.ConvertStringSet(d.Get("additional_experiments").(*schema.Set))
319316

320317
var autoscalingAlgorithm string
321318
autoscalingAlgorithm, updatedParameters = dataflowFlexJobTypeTransferVar("autoscaling_algorithm", "autoscalingAlgorithm", updatedParameters, d)
322319

323-
numWorkers, err := parseInt64("num_workers", d)
324-
if err != nil {
325-
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
320+
var numWorkers int
321+
if p, ok := d.GetOk("parameters.numWorkers"); ok {
322+
number, err := strconv.Atoi(p.(string))
323+
if err != nil {
324+
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.numWorkers must have a valid integer assigned to it, current value is %s", p.(string))
325+
}
326+
delete(updatedParameters, "numWorkers")
327+
numWorkers = number
328+
} else {
329+
if v, ok := d.GetOk("num_workers"); ok {
330+
numWorkers = v.(int)
331+
}
326332
}
327333

328-
maxNumWorkers, err := parseInt64("max_workers", d)
329-
if err != nil {
330-
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
334+
var maxNumWorkers int
335+
if p, ok := d.GetOk("parameters.maxNumWorkers"); ok {
336+
number, err := strconv.Atoi(p.(string))
337+
if err != nil {
338+
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.maxNumWorkers must have a valid integer assigned to it, current value is %s", p.(string))
339+
}
340+
delete(updatedParameters, "maxNumWorkers")
341+
maxNumWorkers = number
342+
} else {
343+
if v, ok := d.GetOk("max_workers"); ok {
344+
maxNumWorkers = v.(int)
345+
}
331346
}
332347

333348
network, updatedParameters := dataflowFlexJobTypeTransferVar("network", "network", updatedParameters, d)
@@ -346,9 +361,22 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t
346361

347362
ipConfiguration, updatedParameters := dataflowFlexJobTypeTransferVar("ip_configuration", "ipConfiguration", updatedParameters, d)
348363

349-
enableStreamingEngine, err := parseBool("enable_streaming_engine", d)
350-
if err != nil {
351-
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, err
364+
var enableStreamingEngine bool
365+
if p, ok := d.GetOk("parameters.enableStreamingEngine"); ok {
366+
delete(updatedParameters, "enableStreamingEngine")
367+
e := strings.ToLower(p.(string))
368+
switch e {
369+
case "true":
370+
enableStreamingEngine = true
371+
case "false":
372+
enableStreamingEngine = false
373+
default:
374+
return dataflow.FlexTemplateRuntimeEnvironment{}, nil, fmt.Errorf("error when handling parameters.enableStreamingEngine value: expected value to be true or false but got value `%s`", e)
375+
}
376+
} else {
377+
if v, ok := d.GetOk("enable_streaming_engine"); ok {
378+
enableStreamingEngine = v.(bool)
379+
}
352380
}
353381

354382
sdkContainerImage, updatedParameters := dataflowFlexJobTypeTransferVar("sdk_container_image", "sdkContainerImage", updatedParameters, d)
@@ -358,8 +386,8 @@ func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_t
358386
env := dataflow.FlexTemplateRuntimeEnvironment{
359387
AdditionalUserLabels: tpgresource.ExpandStringMap(d, "effective_labels"),
360388
AutoscalingAlgorithm: autoscalingAlgorithm,
361-
NumWorkers: numWorkers,
362-
MaxWorkers: maxNumWorkers,
389+
NumWorkers: int64(numWorkers),
390+
MaxWorkers: int64(maxNumWorkers),
363391
Network: network,
364392
ServiceAccountEmail: serviceAccountEmail,
365393
Subnetwork: subnetwork,
@@ -812,43 +840,3 @@ func dataflowFlexJobTypeParameterOverride(ename, pname string, d *schema.Resourc
812840
}
813841
return nil
814842
}
815-
816-
func hasIllegalParametersErr(d *schema.ResourceData) error {
817-
pKey := "parameters"
818-
errFmt := "%s must not include Dataflow options, found: %s"
819-
for k := range ResourceDataflowFlexTemplateJob().Schema {
820-
if _, notOk := d.GetOk(fmt.Sprintf("%s.%s", pKey, k)); notOk {
821-
return fmt.Errorf(errFmt, pKey, k)
822-
}
823-
kk := tpgresource.SnakeToPascalCase(k)
824-
kk = strings.ToLower(kk)
825-
if _, notOk := d.GetOk(fmt.Sprintf("%s.%s", pKey, kk)); notOk {
826-
return fmt.Errorf(errFmt, pKey, kk)
827-
}
828-
}
829-
return nil
830-
}
831-
832-
func parseInt64(name string, d *schema.ResourceData) (int64, error) {
833-
v, ok := d.GetOk(name)
834-
if !ok {
835-
return 0, nil
836-
}
837-
vv, err := strconv.ParseInt(fmt.Sprint(v), 10, 64)
838-
if err != nil {
839-
return 0, fmt.Errorf("illegal value assigned to %s, got: %s", name, v)
840-
}
841-
return vv, nil
842-
}
843-
844-
func parseBool(name string, d *schema.ResourceData) (bool, error) {
845-
v, ok := d.GetOk(name)
846-
if !ok {
847-
return false, nil
848-
}
849-
vv, err := strconv.ParseBool(fmt.Sprint(v))
850-
if err != nil {
851-
return false, fmt.Errorf("illegal value assigned to %s, got: %s", name, v)
852-
}
853-
return vv, nil
854-
}

google-beta/services/dataflow/resource_dataflow_flex_template_job_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -582,12 +582,20 @@ func TestAccDataflowFlexTemplateJob_enableStreamingEngine(t *testing.T) {
582582
CheckDestroy: testAccCheckDataflowJobDestroyProducer(t),
583583
Steps: []resource.TestStep{
584584
{
585-
Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topic),
586-
ExpectError: regexp.MustCompile("must not include Dataflow options"),
585+
Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_param(job, bucket, topic),
586+
Check: resource.ComposeTestCheckFunc(
587+
// Is set
588+
resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine", "true"),
589+
// Is not set
590+
resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine"),
591+
),
587592
},
588593
{
589594
Config: testAccDataflowFlexTemplateJob_enableStreamingEngine_field(job, bucket, topic),
590595
Check: resource.ComposeTestCheckFunc(
596+
// Now is unset
597+
resource.TestCheckNoResourceAttr("google_dataflow_flex_template_job.flex_job", "parameters.enableStreamingEngine"),
598+
// Now is set
591599
resource.TestCheckResourceAttr("google_dataflow_flex_template_job.flex_job", "enable_streaming_engine", "true"),
592600
),
593601
},

0 commit comments

Comments
 (0)