Skip to content

Commit 346bb50

Browse files
Cleanup YAML from beta testing (#174)
1 parent 04c2387 commit 346bb50

File tree

4 files changed

+14
-19
lines changed

4 files changed

+14
-19
lines changed

integration-tests.env

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
STREAMCONTEXT__SPEC='{ "backfillJobTemplateRef": { "apiGroup": "streaming.sneaksanddata.com", "kind": "StreamingJobTemplate", "name": "arcane-stream-microsoft-synapse-link-large-job" }, "groupingIntervalSeconds": 1, "httpClientMaxRetries": 3, "httpClientRetryDelaySeconds": 1, "jobTemplateRef": { "apiGroup": "streaming.sneaksanddata.com", "kind": "StreamingJobTemplate", "name": "arcane-stream-microsoft-synapse-link-standard-job" }, "lookBackInterval": 21000, "tableProperties": { "partitionExpressions": [], "format": "PARQUET", "sortedBy": [], "parquetBloomFilterColumns": [] }, "rowsPerGroup": 10000, "sinkSettings": { "archiveTableName": "iceberg.test.archive_test", "optimizeSettings": { "batchThreshold": 60, "fileSizeThreshold": "512MB" }, "orphanFilesExpirationSettings": { "batchThreshold": 60, "retentionThreshold": "6h" }, "snapshotExpirationSettings": { "batchThreshold": 60, "retentionThreshold": "6h" }, "analyzeSettings" : { "batchThreshold": 60, "includedColumns": [] }, "targetTableName": "iceberg.test.test" }, "sourceSettings": { "baseLocation": "abfss://cdm-e2e@devstoreaccount1.dfs.core.windows.net/", "name": "dimensionattributelevelvalue", "changeCaptureIntervalSeconds": 5 }, "stagingDataSettings": { "maxRowsPerFile": 10000, "catalog": { "catalogName": "iceberg", "catalogUri": "http://localhost:20001/catalog", "schemaName": "test", "warehouse": "demo" }, "tableNamePrefix": "staging_dimensionattributelevelvalue" }, "fieldSelectionRule": { "ruleType": "all", "fields": [] }, "backfillBehavior": "overwrite", "backfillStartDate": "2025-03-04T07.00.00Z" }'
1+
STREAMCONTEXT__SPEC='{ "backfillJobTemplateRef": { "apiGroup": "streaming.sneaksanddata.com", "kind": "StreamingJobTemplate", "name": "arcane-stream-microsoft-synapse-link-large-job" }, "groupingIntervalSeconds": 1, "httpClientMaxRetries": 3, "httpClientRetryDelaySeconds": 1, "jobTemplateRef": { "apiGroup": "streaming.sneaksanddata.com", "kind": "StreamingJobTemplate", "name": "arcane-stream-microsoft-synapse-link-standard-job" }, "lookBackInterval": 21000, "tableProperties": { "partitionExpressions": [], "format": "PARQUET", "sortedBy": [], "parquetBloomFilterColumns": [] }, "rowsPerGroup": 10000, "sinkSettings": { "archiveTableName": "iceberg.test.archive_test", "optimizeSettings": { "batchThreshold": 60, "fileSizeThreshold": "512MB" }, "orphanFilesExpirationSettings": { "batchThreshold": 60, "retentionThreshold": "6h" }, "snapshotExpirationSettings": { "batchThreshold": 60, "retentionThreshold": "6h" }, "analyzeSettings" : { "batchThreshold": 60, "includedColumns": [] }, "targetTableName": "iceberg.test.test", "sinkCatalogSettings": {"namespace": "test", "warehouse": "demo", "catalogUri": "http://localhost:20001/catalog" } }, "sourceSettings": { "baseLocation": "abfss://cdm-e2e@devstoreaccount1.dfs.core.windows.net/", "name": "dimensionattributelevelvalue", "changeCaptureIntervalSeconds": 5 }, "stagingDataSettings": { "maxRowsPerFile": 10000, "catalog": { "catalogName": "iceberg", "catalogUri": "http://localhost:20001/catalog", "schemaName": "test", "warehouse": "demo" }, "tableNamePrefix": "staging_dimensionattributelevelvalue" }, "fieldSelectionRule": { "ruleType": "all", "fields": [] }, "backfillBehavior": "overwrite", "backfillStartDate": "2025-03-04T07.00.00Z" }'
22
STREAMCONTEXT__STREAM_ID=test
33
STREAMCONTEXT__STREAM_KIND=MicrosoftSynapseStream
44
ARCANE_FRAMEWORK__S3_CATALOG_ACCESS_KEY_ID=minioadmin
@@ -15,6 +15,3 @@ ARCANE_FRAMEWORK__STORAGE_CONTAINER=cdm-e2e
1515
ARCANE_FRAMEWORK__STORAGE_ENDPOINT=http://localhost:10001/devstoreaccount1
1616
ARCANE_FRAMEWORK__S3_CATALOG_AUTH_SESSION_TIMEOUT_MILLIS=2000
1717
ARCANE_FRAMEWORK__MERGE_SERVICE_CONNECTION_URI=jdbc:trino://localhost:8080/iceberg/test?user=test
18-
ARCANE_FRAMEWORK__ICEBERG_SINK_NAMESPACE=test
19-
ARCANE_FRAMEWORK__ICEBERG_SINK_WAREHOUSE=demo
20-
ARCANE_FRAMEWORK__ICEBERG_SINK_CATALOG_URI=http://localhost:20001/catalog

src/main/scala/models/app/MicrosoftSynapseLinkStreamContext.scala

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -170,14 +170,9 @@ case class MicrosoftSynapseLinkStreamContext(spec: StreamSpec)
170170
sys.env.getOrElse("ARCANE_FRAMEWORK__METRICS_PUBLISHER_INTERVAL_MILLIS", "100").toInt
171171
)
172172
override val icebergSinkSettings: IcebergSinkSettings = new IcebergSinkSettings {
173-
override val namespace: String =
174-
sys.env("ARCANE_FRAMEWORK__ICEBERG_SINK_NAMESPACE") // spec.sinkSettings.sinkCatalogSettings.namespace.getOrElse()
175-
override val warehouse: String =
176-
sys.env("ARCANE_FRAMEWORK__ICEBERG_SINK_WAREHOUSE") // spec.sinkSettings.sinkCatalogSettings.warehouse.getOrElse()
177-
override val catalogUri: String =
178-
sys.env(
179-
"ARCANE_FRAMEWORK__ICEBERG_SINK_CATALOG_URI"
180-
) // spec.sinkSettings.sinkCatalogSettings.catalogUri.getOrElse()
173+
override val namespace: String = spec.sinkSettings.sinkCatalogSettings.namespace
174+
override val warehouse: String = spec.sinkSettings.sinkCatalogSettings.warehouse
175+
override val catalogUri: String = spec.sinkSettings.sinkCatalogSettings.catalogUri
181176
override val additionalProperties: Map[String, String] = IcebergCatalogCredential.oAuth2Properties
182177
}
183178

src/main/scala/models/app/contracts/StreamSpec.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ case class OrphanFilesExpirationSettings(batchThreshold: Int, retentionThreshold
3737
case class AnalyzeSettings(batchThreshold: Int, includedColumns: Seq[String]) derives ReadWriter
3838

3939
case class IcebergSinkSettings(
40-
namespace: Option[String] = None,
41-
warehouse: Option[String] = None,
42-
catalogUri: Option[String] = None
40+
namespace: String,
41+
warehouse: String,
42+
catalogUri: String
4343
) derives ReadWriter
4444

4545
/** The configuration of Iceberg sink.
@@ -50,7 +50,7 @@ case class SinkSettings(
5050
snapshotExpirationSettings: SnapshotExpirationSettingsSpec,
5151
orphanFilesExpirationSettings: OrphanFilesExpirationSettings,
5252
analyzeSettings: AnalyzeSettings,
53-
sinkCatalogSettings: Option[IcebergSinkSettings] = None
53+
sinkCatalogSettings: IcebergSinkSettings
5454
) derives ReadWriter
5555

5656
/** The configuration of the stream source.

src/test/scala/integration/StreamRunner.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,12 @@ object StreamRunner extends ZIOSpecDefault:
5353
| "batchThreshold": 60,
5454
| "includedColumns": []
5555
| },
56-
| "targetTableName": "$targetTableName"
56+
| "targetTableName": "$targetTableName",
57+
| "sinkCatalogSettings": {
58+
| "namespace": "test",
59+
| "warehouse": "demo",
60+
| "catalogUri": "http://localhost:20001/catalog"
61+
| }
5762
| },
5863
| "sourceSettings": {
5964
| "baseLocation": "abfss://cdm-e2e@devstoreaccount1.dfs.core.windows.net/",
@@ -138,8 +143,6 @@ object StreamRunner extends ZIOSpecDefault:
138143
)
139144

140145
streamedWatermark <- Common.getWatermark(streamingStreamContext.targetTableFullName.split('.').last)
141-
142-
// TODO: verify watermarks
143146
yield assertTrue(result.isSuccess) implies assertTrue(backfilledCount == 5) implies assertTrue(
144147
backfilledWatermark.version == s"${formatter.format(startTime.minusHours(1))}Z"
145148
) implies assertTrue(currentRows.size == 5 - 1 + 2) implies assertTrue(

0 commit comments

Comments
 (0)