Skip to content

Commit 79bdc75

Browse files
Prepare for 2.0 release (#168)
1 parent ea20a95 commit 79bdc75

File tree

8 files changed

+73
-33
lines changed

8 files changed

+73
-33
lines changed

.helm/templates/crd-microsoft-synapse.yaml

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ spec:
1313
shortNames:
1414
- mssynapsestream
1515
versions:
16-
- name: v1beta1
16+
- name: v1
1717
served: true
1818
storage: true
1919
additionalPrinterColumns:
@@ -23,7 +23,7 @@ spec:
2323
- name: Entity
2424
type: string
2525
jsonPath: .spec.sourceSettings.name
26-
- name: Refresh Interval
26+
- name: Change Capture Interval
2727
type: string
2828
jsonPath: .spec.sourceSettings.changeCaptureIntervalSeconds
2929
- name: Sink location
@@ -78,10 +78,10 @@ spec:
7878
type: string
7979
baseLocation:
8080
type: string
81-
description: Location root for Synapse Link data (Dataverse container)
81+
description: Location root for Synapse Link data (Dataverse container), in the format abfss://container@account.dfs.core.windows.net/
8282
changeCaptureIntervalSeconds:
8383
type: integer
84-
description: How long to wait before polling for next result set. Can be from 1 to 1 hour.
84+
description: How long to wait before polling for next change set. Accepted range is between 1s and 3600s
8585
minimum: 1
8686
maximum: 3600
8787
connectionStringRef:
@@ -93,7 +93,7 @@ spec:
9393
type: string
9494
jobTemplateRef:
9595
description: |
96-
Name of the job template to be used for the streaming job if stream is running in normal mode.
96+
Name of the job template to be used for the application in streaming mode.
9797
type: object
9898
properties:
9999
name:
@@ -104,7 +104,7 @@ spec:
104104
type: string
105105
backfillJobTemplateRef:
106106
description: |
107-
Name of the job template to be used for the streaming job if stream is running in the backfill mode.
107+
Name of the job template to be used for the application in backfill mode.
108108
type: object
109109
properties:
110110
name:
@@ -121,42 +121,42 @@ spec:
121121
description: Max retry delay on blob reads for the http client.
122122
rowsPerGroup:
123123
type: integer
124-
description: Number of rows per parquet rowgroup.
124+
description: Maximum number of rows to be grouped together for the staging process to consume.
125125
groupingIntervalSeconds:
126126
type: integer
127-
description: Max time to wait for rowsPerGroup to accumulate. Can be from 1 to 60 seconds.
127+
description: Max time to wait for rowsPerGroup to accumulate and then proceed to staging. Can be from 1 to 60 seconds.
128128
minimum: 1
129129
maximum: 60
130130
sinkSettings:
131131
type: object
132132
properties:
133133
optimizeSettings:
134134
type: object
135-
description: Optimization settings for Iceberg tables.
135+
description: Configuration for target table optimize (data file aggregation into bigger files)
136136
properties:
137137
batchThreshold:
138138
type: integer
139139
default: 60
140-
description: Number of batches to accumulate before running the optimization query.
140+
description: Number of batches to accumulate before triggering the OPTIMIZE
141141
fileSizeThreshold:
142142
type: string
143143
default: 100MB
144-
description: File size to target for the optimization query.
144+
description: File size to target when running OPTIMIZE
145145
default:
146146
batchThreshold: 60
147147
fileSizeThreshold: 100MB
148148
snapshotExpirationSettings:
149149
type: object
150-
description: Expiration query configuration for Iceberg tables.
150+
description: Configuration for EXPIRE SNAPSHOTS (table transaction log cutoff)
151151
properties:
152152
batchThreshold:
153153
type: integer
154154
default: 60
155-
description: Number of batches to accumulate before running the snapshot expiration query.
155+
description: Number of batches to accumulate before triggering EXPIRE SNAPSHOTS
156156
retentionThreshold:
157157
type: string
158158
default: 6h
159-
description: File retention period.
159+
description: Maximum age of records in the transaction log to keep
160160
default:
161161
batchThreshold: 60
162162
retentionThreshold: 6h
@@ -167,17 +167,17 @@ spec:
167167
batchThreshold:
168168
type: integer
169169
default: 60
170-
description: Number of batches to accumulate before running the expire orphan files query.
170+
description: Number of batches to accumulate before triggering EXPIRE ORPHAN FILES (cleanup of files no longer referenced by Iceberg snapshots)
171171
retentionThreshold:
172172
type: string
173173
default: 6h
174-
description: File retention period.
174+
description: File retention period applied when running EXPIRE ORPHAN FILES
175175
default:
176176
batchThreshold: 60
177177
retentionThreshold: 6h
178178
analyzeSettings:
179179
type: object
180-
description: Settings for running ANALYZE on target.
180+
description: Configuration for ANALYZE (full refresh of extended statistics on the target)
181181
properties:
182182
batchThreshold:
183183
type: integer
@@ -195,9 +195,21 @@ spec:
195195
targetTableName:
196196
type: string
197197
description: Name for the target Iceberg table.
198+
sinkCatalogSettings:
199+
type: object
200+
description: Connection settings for the Iceberg REST Catalog of the sink (target). This is used by the watermarking process.
201+
properties:
202+
namespace:
203+
type: string
204+
warehouse:
205+
type: string
206+
catalogUri:
207+
type: string
198208
lookBackInterval:
199209
type: integer
200210
description: |
211+
DEPRECATED - TO BE REMOVED IN 2.1 RELEASE. DO NOT USE THIS SETTING!
212+
201213
Number of seconds to look back when determining first set of changes to extract.
202214
Can be set in interval from 1 second to 10 hours. Default is 1 hour.
203215
minimum: 1
@@ -206,15 +218,19 @@ spec:
206218
stagingDataSettings:
207219
type: object
208220
properties:
221+
dataLocation:
222+
type: string
223+
description: Optional data location override. Only use this setting for debugging and never in production environments.
209224
maxRowsPerFile:
210225
type: integer
226+
description: The maximum number of rows per each data file in the staging table.
211227
default: 10000
212-
dataLocation:
213-
type: string
214228
tableNamePrefix:
215229
type: string
230+
description: Prefix for staging tables created by Arcane. Must be UNIQUE in the WAREHOUSE scope.
216231
catalog:
217232
type: object
233+
description: Settings for Iceberg REST Catalog used for staging tables
218234
properties:
219235
catalogName:
220236
type: string
@@ -233,6 +249,7 @@ spec:
233249
overwrite
234250
fieldSelectionRule:
235251
type: object
252+
description: INCLUDE will only use fields provided. You must specify mandatory fields like ARCANE_MERGE_KEY as well. EXCLUDE will exclude provided fields instead. ALL (default) will use all fields without filters.
236253
properties:
237254
ruleType:
238255
type: string

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ lazy val plugin = (project in file("."))
2525
name := "arcane-stream-microsoft-synapse-link",
2626
idePackagePrefix := Some("com.sneaksanddata.arcane.microsoft_synapse_link"),
2727

28-
libraryDependencies += "com.sneaksanddata" % "arcane-framework_3" % "1.2.4-11-g42e238e",
28+
libraryDependencies += "com.sneaksanddata" % "arcane-framework_3" % "1.2.4-19-gdc3dd1f",
2929
libraryDependencies += "io.netty" % "netty-tcnative-boringssl-static" % "2.0.74.Final",
3030

3131
// bugfix for upgrade header

integration-tests.env

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,7 @@ ARCANE_FRAMEWORK__STORAGE_ACCOUNT=devstoreaccount1
1414
ARCANE_FRAMEWORK__STORAGE_CONTAINER=cdm-e2e
1515
ARCANE_FRAMEWORK__STORAGE_ENDPOINT=http://localhost:10001/devstoreaccount1
1616
ARCANE_FRAMEWORK__S3_CATALOG_AUTH_SESSION_TIMEOUT_MILLIS=2000
17-
ARCANE_FRAMEWORK__MERGE_SERVICE_CONNECTION_URI=jdbc:trino://localhost:8080/iceberg/test?user=test
17+
ARCANE_FRAMEWORK__MERGE_SERVICE_CONNECTION_URI=jdbc:trino://localhost:8080/iceberg/test?user=test
18+
ARCANE_FRAMEWORK__ICEBERG_SINK_NAMESPACE=test
19+
ARCANE_FRAMEWORK__ICEBERG_SINK_WAREHOUSE=demo
20+
ARCANE_FRAMEWORK__ICEBERG_SINK_CATALOG_URI=http://localhost:20001/catalog

src/main/scala/main.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import com.sneaksanddata.arcane.framework.services.filters.{
2222
FieldsFilteringService,
2323
FieldsFilteringService as FrameworkFieldsFilteringService
2424
}
25-
import com.sneaksanddata.arcane.framework.services.iceberg.IcebergS3CatalogWriter
25+
import com.sneaksanddata.arcane.framework.services.iceberg.{IcebergS3CatalogWriter, IcebergTablePropertyManager}
2626
import com.sneaksanddata.arcane.framework.services.metrics.{ArcaneDimensionsProvider, DataDog, DeclaredMetrics}
2727
import com.sneaksanddata.arcane.framework.services.storage.services.azure.AzureBlobStorageReader
2828
import com.sneaksanddata.arcane.framework.services.streaming.data_providers.backfill.{
@@ -101,7 +101,8 @@ object main extends ZIOAppDefault {
101101
ArcaneDimensionsProvider.layer,
102102
DataDog.UdsPublisher.layer,
103103
WatermarkProcessor.layer,
104-
BackfillOverwriteWatermarkProcessor.layer
104+
BackfillOverwriteWatermarkProcessor.layer,
105+
IcebergTablePropertyManager.layer
105106
)
106107

107108
@main

src/main/scala/models/app/MicrosoftSynapseLinkStreamContext.scala

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,19 @@ trait ParallelismSettings:
3030
trait GraphExecutionSettings:
3131
val sourceDeleteDryRun: Boolean
3232

33-
/** The context for the SQL Server Change Tracking stream.
33+
/** The context for the Synapse Link stream.
3434
*
3535
* @param spec
3636
* The stream specification
3737
*/
3838
case class MicrosoftSynapseLinkStreamContext(spec: StreamSpec)
3939
extends StreamContext
4040
with GroupingSettings
41-
with IcebergCatalogSettings
41+
with IcebergStagingSettings
4242
with JdbcMergeServiceClientSettings
4343
with VersionedDataGraphBuilderSettings
4444
with AzureConnectionSettings
45-
with TargetTableSettings
45+
with SinkSettings
4646
with ParallelismSettings
4747
with TablePropertiesSettings
4848
with FieldSelectionRuleSettings
@@ -65,8 +65,8 @@ case class MicrosoftSynapseLinkStreamContext(spec: StreamSpec)
6565
override val stagingLocation: Option[String] = spec.stagingDataSettings.dataLocation
6666

6767
override val additionalProperties: Map[String, String] = sys.env.get("ARCANE_FRAMEWORK__CATALOG_NO_AUTH") match
68-
case Some(_) => Map()
69-
case None => IcebergCatalogCredential.oAuth2Properties
68+
case Some(_) => S3CatalogFileIO.properties
69+
case None => S3CatalogFileIO.properties ++ IcebergCatalogCredential.oAuth2Properties
7070

7171
override val s3CatalogFileIO: S3CatalogFileIO = S3CatalogFileIO
7272

@@ -169,6 +169,17 @@ case class MicrosoftSynapseLinkStreamContext(spec: StreamSpec)
169169
val metricsPublisherInterval: Duration = Duration.ofMillis(
170170
sys.env.getOrElse("ARCANE_FRAMEWORK__METRICS_PUBLISHER_INTERVAL_MILLIS", "100").toInt
171171
)
172+
override val icebergSinkSettings: IcebergSinkSettings = new IcebergSinkSettings {
173+
override val namespace: String =
174+
sys.env("ARCANE_FRAMEWORK__ICEBERG_SINK_NAMESPACE") // spec.sinkSettings.sinkCatalogSettings.namespace.getOrElse()
175+
override val warehouse: String =
176+
sys.env("ARCANE_FRAMEWORK__ICEBERG_SINK_WAREHOUSE") // spec.sinkSettings.sinkCatalogSettings.warehouse.getOrElse()
177+
override val catalogUri: String =
178+
sys.env(
179+
"ARCANE_FRAMEWORK__ICEBERG_SINK_CATALOG_URI"
180+
) // spec.sinkSettings.sinkCatalogSettings.catalogUri.getOrElse()
181+
override val additionalProperties: Map[String, String] = IcebergCatalogCredential.oAuth2Properties
182+
}
172183

173184
given Conversion[MicrosoftSynapseLinkStreamContext, DatagramSocketConfig] with
174185
def apply(context: MicrosoftSynapseLinkStreamContext): DatagramSocketConfig =
@@ -180,8 +191,8 @@ given Conversion[MicrosoftSynapseLinkStreamContext, MetricsConfig] with
180191

181192
object MicrosoftSynapseLinkStreamContext {
182193

183-
type Environment = StreamContext & GroupingSettings & VersionedDataGraphBuilderSettings & IcebergCatalogSettings &
184-
JdbcMergeServiceClientSettings & AzureConnectionSettings & TargetTableSettings & MicrosoftSynapseLinkStreamContext &
194+
type Environment = StreamContext & GroupingSettings & VersionedDataGraphBuilderSettings & IcebergStagingSettings &
195+
JdbcMergeServiceClientSettings & AzureConnectionSettings & SinkSettings & MicrosoftSynapseLinkStreamContext &
185196
GraphExecutionSettings & TablePropertiesSettings & FieldSelectionRuleSettings & BackfillSettings &
186197
StagingDataSettings & SynapseSourceSettings & SourceBufferingSettings & MetricsConfig & DatagramSocketConfig &
187198
DatadogPublisherConfig

src/main/scala/models/app/contracts/StreamSpec.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,21 @@ case class OrphanFilesExpirationSettings(batchThreshold: Int, retentionThreshold
3636

3737
case class AnalyzeSettings(batchThreshold: Int, includedColumns: Seq[String]) derives ReadWriter
3838

39+
case class IcebergSinkSettings(
40+
namespace: Option[String] = None,
41+
warehouse: Option[String] = None,
42+
catalogUri: Option[String] = None
43+
) derives ReadWriter
44+
3945
/** The configuration of Iceberg sink.
4046
*/
4147
case class SinkSettings(
4248
targetTableName: String,
4349
optimizeSettings: OptimizeSettingsSpec,
4450
snapshotExpirationSettings: SnapshotExpirationSettingsSpec,
4551
orphanFilesExpirationSettings: OrphanFilesExpirationSettings,
46-
analyzeSettings: AnalyzeSettings
52+
analyzeSettings: AnalyzeSettings,
53+
sinkCatalogSettings: Option[IcebergSinkSettings] = None
4754
) derives ReadWriter
4855

4956
/** The configuration of the stream source.

src/test/scala/common/Common.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import com.sneaksanddata.arcane.framework.services.app.{GenericStreamRunnerServi
1010
import com.sneaksanddata.arcane.framework.services.app.base.{InterruptionToken, StreamLifetimeService}
1111
import com.sneaksanddata.arcane.framework.services.caching.schema_cache.MutableSchemaCache
1212
import com.sneaksanddata.arcane.framework.services.filters.{ColumnSummaryFieldsFilteringService, FieldsFilteringService}
13-
import com.sneaksanddata.arcane.framework.services.iceberg.IcebergS3CatalogWriter
13+
import com.sneaksanddata.arcane.framework.services.iceberg.{IcebergS3CatalogWriter, IcebergTablePropertyManager}
1414
import com.sneaksanddata.arcane.framework.services.merging.JdbcMergeServiceClient
1515
import com.sneaksanddata.arcane.framework.services.metrics.{ArcaneDimensionsProvider, DataDog, DeclaredMetrics}
1616
import com.sneaksanddata.arcane.framework.services.storage.services.azure.AzureBlobStorageReader
@@ -102,7 +102,8 @@ object Common:
102102
ArcaneDimensionsProvider.layer,
103103
DataDog.UdsPublisher.layer,
104104
WatermarkProcessor.layer,
105-
BackfillOverwriteWatermarkProcessor.layer
105+
BackfillOverwriteWatermarkProcessor.layer,
106+
IcebergTablePropertyManager.layer
106107
)
107108

108109
/** Gets the data from the *target* table. Using the connection string provided in the

unit-tests.env

Whitespace-only changes.

0 commit comments

Comments
 (0)