Skip to content

Commit 845b513

Browse files
Matt Lukasjotruon
authored andcommitted
Added - Support for structured streaming for Data Flow
1 parent 8e8d3dc commit 845b513

15 files changed

+56
-0
lines changed

examples/dataflow/main.tf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ resource "oci_dataflow_application" "tf_application" {
9191
#description = var.application_description
9292
#freeform_tags = var.application_freeform_tags
9393
#logs_bucket_uri = var.application_logs_bucket_uri}"
94+
type = "BATCH"
9495
archive_uri = var.application_archive_uri
9596
#parameters {
9697
#Required

internal/integrationtest/dataflow_application_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ var (
6464
"metastore_id": acctest.Representation{RepType: acctest.Optional, Create: `${var.metastore_id}`},
6565
"parameters": acctest.RepresentationGroup{RepType: acctest.Optional, Group: applicationParametersRepresentation},
6666
"private_endpoint_id": acctest.Representation{RepType: acctest.Optional, Create: `${oci_dataflow_private_endpoint.test_private_endpoint.id}`},
67+
"type": acctest.Representation{RepType: acctest.Optional, Create: `BATCH`},
6768
"warehouse_bucket_uri": acctest.Representation{RepType: acctest.Optional, Create: `${var.dataflow_warehouse_bucket_uri}`},
6869
}
6970
applicationParametersRepresentation = map[string]interface{}{
@@ -169,6 +170,7 @@ func TestDataflowApplicationResource_basic(t *testing.T) {
169170
resource.TestCheckResourceAttrSet(resourceName, "state"),
170171
resource.TestCheckResourceAttrSet(resourceName, "time_created"),
171172
resource.TestCheckResourceAttrSet(resourceName, "time_updated"),
173+
resource.TestCheckResourceAttr(resourceName, "type", "BATCH"),
172174
resource.TestCheckResourceAttr(resourceName, "warehouse_bucket_uri", warehouseBucketUri),
173175

174176
func(s *terraform.State) (err error) {
@@ -215,6 +217,7 @@ func TestDataflowApplicationResource_basic(t *testing.T) {
215217
resource.TestCheckResourceAttrSet(resourceName, "state"),
216218
resource.TestCheckResourceAttrSet(resourceName, "time_created"),
217219
resource.TestCheckResourceAttrSet(resourceName, "time_updated"),
220+
resource.TestCheckResourceAttr(resourceName, "type", "BATCH"),
218221
resource.TestCheckResourceAttrSet(resourceName, "warehouse_bucket_uri"),
219222

220223
func(s *terraform.State) (err error) {
@@ -259,6 +262,7 @@ func TestDataflowApplicationResource_basic(t *testing.T) {
259262
resource.TestCheckResourceAttrSet(resourceName, "state"),
260263
resource.TestCheckResourceAttrSet(resourceName, "time_created"),
261264
resource.TestCheckResourceAttrSet(resourceName, "time_updated"),
265+
resource.TestCheckResourceAttr(resourceName, "type", "BATCH"),
262266
resource.TestCheckResourceAttr(resourceName, "warehouse_bucket_uri", warehouseBucketUri),
263267
resource.TestCheckResourceAttr(resourceName, "class_name", classNameUpdated),
264268

@@ -294,6 +298,7 @@ func TestDataflowApplicationResource_basic(t *testing.T) {
294298
resource.TestCheckResourceAttrSet(datasourceName, "applications.0.state"),
295299
resource.TestCheckResourceAttrSet(datasourceName, "applications.0.time_created"),
296300
resource.TestCheckResourceAttrSet(datasourceName, "applications.0.time_updated"),
301+
resource.TestCheckResourceAttr(datasourceName, "applications.0.type", "BATCH"),
297302
),
298303
},
299304
// verify singular datasource
@@ -330,6 +335,7 @@ func TestDataflowApplicationResource_basic(t *testing.T) {
330335
resource.TestCheckResourceAttrSet(singularDatasourceName, "state"),
331336
resource.TestCheckResourceAttrSet(singularDatasourceName, "time_created"),
332337
resource.TestCheckResourceAttrSet(singularDatasourceName, "time_updated"),
338+
resource.TestCheckResourceAttr(singularDatasourceName, "type", "BATCH"),
333339
resource.TestCheckResourceAttr(singularDatasourceName, "warehouse_bucket_uri", warehouseBucketUri),
334340
),
335341
},

internal/integrationtest/dataflow_invoke_run_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ var (
5959
"metastore_id": acctest.Representation{RepType: acctest.Optional, Create: `${var.metastore_id}`},
6060
"num_executors": acctest.Representation{RepType: acctest.Optional, Create: `1`},
6161
"parameters": acctest.RepresentationGroup{RepType: acctest.Optional, Group: invokeRunParametersRepresentation},
62+
"spark_version": acctest.Representation{RepType: acctest.Optional, Create: `sparkVersion`},
63+
"type": acctest.Representation{RepType: acctest.Optional, Create: `BATCH`},
6264
"warehouse_bucket_uri": acctest.Representation{RepType: acctest.Optional, Create: `${var.dataflow_warehouse_bucket_uri}`},
6365
}
6466
invokeRunParametersRepresentation = map[string]interface{}{
@@ -153,6 +155,7 @@ func TestDataflowInvokeRunResource_basic(t *testing.T) {
153155
resource.TestCheckResourceAttrSet(resourceName, "state"),
154156
resource.TestCheckResourceAttrSet(resourceName, "time_created"),
155157
resource.TestCheckResourceAttrSet(resourceName, "time_updated"),
158+
resource.TestCheckResourceAttr(resourceName, "type", "BATCH"),
156159
resource.TestCheckResourceAttr(resourceName, "warehouse_bucket_uri", warehouseBucketUri),
157160
resource.TestCheckResourceAttr(resourceName, "metastore_id", metastoreId),
158161

@@ -199,6 +202,7 @@ func TestDataflowInvokeRunResource_basic(t *testing.T) {
199202
resource.TestCheckResourceAttrSet(resourceName, "state"),
200203
resource.TestCheckResourceAttrSet(resourceName, "time_created"),
201204
resource.TestCheckResourceAttrSet(resourceName, "time_updated"),
205+
resource.TestCheckResourceAttr(resourceName, "type", "BATCH"),
202206
resource.TestCheckResourceAttrSet(resourceName, "warehouse_bucket_uri"),
203207

204208
func(s *terraform.State) (err error) {
@@ -237,6 +241,7 @@ func TestDataflowInvokeRunResource_basic(t *testing.T) {
237241
resource.TestCheckResourceAttrSet(resourceName, "state"),
238242
resource.TestCheckResourceAttrSet(resourceName, "time_created"),
239243
resource.TestCheckResourceAttrSet(resourceName, "time_updated"),
244+
resource.TestCheckResourceAttr(resourceName, "type", "BATCH"),
240245
resource.TestCheckResourceAttr(resourceName, "warehouse_bucket_uri", warehouseBucketUri),
241246

242247
func(s *terraform.State) (err error) {
@@ -275,6 +280,7 @@ func TestDataflowInvokeRunResource_basic(t *testing.T) {
275280
resource.TestCheckResourceAttrSet(datasourceName, "runs.0.time_created"),
276281
resource.TestCheckResourceAttrSet(datasourceName, "runs.0.time_updated"),
277282
resource.TestCheckResourceAttrSet(datasourceName, "runs.0.total_ocpu"),
283+
resource.TestCheckResourceAttr(datasourceName, "runs.0.type", "BATCH"),
278284
),
279285
},
280286
// verify singular datasource
@@ -316,6 +322,7 @@ func TestDataflowInvokeRunResource_basic(t *testing.T) {
316322
resource.TestCheckResourceAttrSet(singularDatasourceName, "time_created"),
317323
resource.TestCheckResourceAttrSet(singularDatasourceName, "time_updated"),
318324
resource.TestCheckResourceAttrSet(singularDatasourceName, "total_ocpu"),
325+
resource.TestCheckResourceAttr(singularDatasourceName, "type", "BATCH"),
319326
resource.TestCheckResourceAttr(singularDatasourceName, "warehouse_bucket_uri", warehouseBucketUri),
320327
resource.TestCheckResourceAttr(singularDatasourceName, "metastore_id", metastoreId),
321328
),

internal/service/dataflow/dataflow_application_data_source.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ func (s *DataflowApplicationDataSourceCrud) SetData() error {
158158
s.D.Set("time_updated", s.Res.TimeUpdated.String())
159159
}
160160

161+
s.D.Set("type", s.Res.Type)
162+
161163
if s.Res.WarehouseBucketUri != nil {
162164
s.D.Set("warehouse_bucket_uri", *s.Res.WarehouseBucketUri)
163165
}

internal/service/dataflow/dataflow_application_resource.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,12 @@ func DataflowApplicationResource() *schema.Resource {
144144
Optional: true,
145145
Computed: true,
146146
},
147+
"type": {
148+
Type: schema.TypeString,
149+
Optional: true,
150+
Computed: true,
151+
ForceNew: true,
152+
},
147153
"warehouse_bucket_uri": {
148154
Type: schema.TypeString,
149155
Optional: true,
@@ -362,6 +368,10 @@ func (s *DataflowApplicationResourceCrud) Create() error {
362368
request.SparkVersion = &tmp
363369
}
364370

371+
if type_, ok := s.D.GetOkExists("type"); ok {
372+
request.Type = oci_dataflow.ApplicationTypeEnum(type_.(string))
373+
}
374+
365375
if warehouseBucketUri, ok := s.D.GetOkExists("warehouse_bucket_uri"); ok {
366376
tmp := warehouseBucketUri.(string)
367377
request.WarehouseBucketUri = &tmp
@@ -646,6 +656,8 @@ func (s *DataflowApplicationResourceCrud) SetData() error {
646656
s.D.Set("time_updated", s.Res.TimeUpdated.String())
647657
}
648658

659+
s.D.Set("type", s.Res.Type)
660+
649661
if s.Res.WarehouseBucketUri != nil {
650662
s.D.Set("warehouse_bucket_uri", *s.Res.WarehouseBucketUri)
651663
}

internal/service/dataflow/dataflow_applications_data_source.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ func (s *DataflowApplicationsDataSourceCrud) SetData() error {
167167
application["time_updated"] = r.TimeUpdated.String()
168168
}
169169

170+
application["type"] = r.Type
171+
170172
resources = append(resources, application)
171173
}
172174

internal/service/dataflow/dataflow_invoke_run_data_source.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ func (s *DataflowInvokeRunDataSourceCrud) SetData() error {
195195
s.D.Set("total_ocpu", *s.Res.TotalOCpu)
196196
}
197197

198+
s.D.Set("type", s.Res.Type)
199+
198200
if s.Res.WarehouseBucketUri != nil {
199201
s.D.Set("warehouse_bucket_uri", *s.Res.WarehouseBucketUri)
200202
}

internal/service/dataflow/dataflow_invoke_run_resource.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,12 @@ func DataflowInvokeRunResource() *schema.Resource {
149149
Computed: true,
150150
ForceNew: true,
151151
},
152+
"type": {
153+
Type: schema.TypeString,
154+
Optional: true,
155+
Computed: true,
156+
ForceNew: true,
157+
},
152158
"warehouse_bucket_uri": {
153159
Type: schema.TypeString,
154160
Optional: true,
@@ -446,6 +452,10 @@ func (s *DataflowInvokeRunResourceCrud) Create() error {
446452
request.SparkVersion = &tmp
447453
}
448454

455+
if type_, ok := s.D.GetOkExists("type"); ok {
456+
request.Type = oci_dataflow.ApplicationTypeEnum(type_.(string))
457+
}
458+
449459
if warehouseBucketUri, ok := s.D.GetOkExists("warehouse_bucket_uri"); ok {
450460
tmp := warehouseBucketUri.(string)
451461
request.WarehouseBucketUri = &tmp
@@ -673,6 +683,8 @@ func (s *DataflowInvokeRunResourceCrud) SetData() error {
673683
s.D.Set("total_ocpu", *s.Res.TotalOCpu)
674684
}
675685

686+
s.D.Set("type", s.Res.Type)
687+
676688
if s.Res.WarehouseBucketUri != nil {
677689
s.D.Set("warehouse_bucket_uri", *s.Res.WarehouseBucketUri)
678690
}

internal/service/dataflow/dataflow_invoke_runs_data_source.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,8 @@ func (s *DataflowInvokeRunsDataSourceCrud) SetData() error {
214214
invokeRun["total_ocpu"] = *r.TotalOCpu
215215
}
216216

217+
invokeRun["type"] = r.Type
218+
217219
resources = append(resources, invokeRun)
218220
}
219221

website/docs/d/dataflow_application.html.markdown

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,6 @@ The following attributes are exported:
6161
* `state` - The current state of this application.
6262
* `time_created` - The date and time a application was created, expressed in [RFC 3339](https://tools.ietf.org/html/rfc3339) timestamp format. Example: `2018-04-03T21:10:29.600Z`
6363
* `time_updated` - The date and time a application was updated, expressed in [RFC 3339](https://tools.ietf.org/html/rfc3339) timestamp format. Example: `2018-04-03T21:10:29.600Z`
64+
* `type` - The Spark application processing type.
6465
* `warehouse_bucket_uri` - An Oracle Cloud Infrastructure URI of the bucket to be used as default warehouse directory for BATCH SQL runs. See https://docs.cloud.oracle.com/iaas/Content/API/SDKDocs/hdfsconnector.htm#uriformat.
6566

0 commit comments

Comments
 (0)