Skip to content

Commit cc35eb3

Browse files
Support Analyze and batch duration metrics (#162)
1 parent 77566c3 commit cc35eb3

File tree

4 files changed

+13
-3
lines changed

4 files changed

+13
-3
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ lazy val plugin = (project in file("."))
2525
name := "arcane-stream-microsoft-synapse-link",
2626
idePackagePrefix := Some("com.sneaksanddata.arcane.microsoft_synapse_link"),
2727

28-
libraryDependencies += "com.sneaksanddata" % "arcane-framework_3" % "1.1.9",
28+
libraryDependencies += "com.sneaksanddata" % "arcane-framework_3" % "1.2.0",
2929
libraryDependencies += "io.netty" % "netty-tcnative-boringssl-static" % "2.0.65.Final",
3030

3131
// bugfix for upgrade header

src/main/scala/main.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package com.sneaksanddata.arcane.microsoft_synapse_link
33
import models.app.{AzureConnectionSettings, GraphExecutionSettings, MicrosoftSynapseLinkStreamContext}
44

55
import com.azure.storage.common.StorageSharedKeyCredential
6-
import com.sneaksanddata.arcane.framework.excpetions.StreamFailException
6+
import com.sneaksanddata.arcane.framework.exceptions.StreamFailException
77
import com.sneaksanddata.arcane.framework.logging.ZIOLogAnnotations.zlog
88
import com.sneaksanddata.arcane.framework.models.app.StreamContext
99
import com.sneaksanddata.arcane.framework.models.settings.{GroupingSettings, VersionedDataGraphBuilderSettings}

src/main/scala/models/app/MicrosoftSynapseLinkStreamContext.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,13 @@ case class MicrosoftSynapseLinkStreamContext(spec: StreamSpec)
9595
}
9696
)
9797

98+
override val targetAnalyzeSettings: Option[AnalyzeSettings] = Some(
99+
new AnalyzeSettings {
100+
override val batchThreshold: Int = spec.sinkSettings.analyzeSettings.batchThreshold
101+
override val includedColumns: Seq[String] = spec.sinkSettings.analyzeSettings.includedColumns
102+
}
103+
)
104+
98105
override val endpoint: String = sys.env("ARCANE_FRAMEWORK__STORAGE_ENDPOINT")
99106
override val container: String = sys.env("ARCANE_FRAMEWORK__STORAGE_CONTAINER")
100107
override val account: String = sys.env("ARCANE_FRAMEWORK__STORAGE_ACCOUNT")

src/main/scala/models/app/contracts/StreamSpec.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,16 @@ case class SnapshotExpirationSettingsSpec(batchThreshold: Int, retentionThreshol
3535
*/
3636
case class OrphanFilesExpirationSettings(batchThreshold: Int, retentionThreshold: String) derives ReadWriter
3737

38+
case class AnalyzeSettings(batchThreshold: Int, includedColumns: Seq[String]) derives ReadWriter
39+
3840
/** The configuration of Iceberg sink.
3941
*/
4042
case class SinkSettings(
4143
targetTableName: String,
4244
optimizeSettings: OptimizeSettingsSpec,
4345
snapshotExpirationSettings: SnapshotExpirationSettingsSpec,
44-
orphanFilesExpirationSettings: OrphanFilesExpirationSettings
46+
orphanFilesExpirationSettings: OrphanFilesExpirationSettings,
47+
analyzeSettings: AnalyzeSettings
4548
) derives ReadWriter
4649

4750
/** The configuration of the stream source.

0 commit comments

Comments
 (0)