Skip to content

Commit 1b01c0a

Browse files
Improve performance when writing large batches (#127)
1 parent 982fb52 commit 1b01c0a

File tree

4 files changed: 10 additions (+10) and 3 deletions (-3).

4 files changed

+10
-3
lines changed

.helm/templates/crd-microsoft-synapse.yaml

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -223,6 +223,9 @@ spec:
223223
stagingDataSettings:
224224
type: object
225225
properties:
226+
maxRowsPerFile:
227+
type: integer
228+
default: 10000
226229
dataLocation:
227230
type: string
228231
tableNamePrefix:

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,7 +25,7 @@ lazy val plugin = (project in file("."))
2525
name := "arcane-stream-microsoft-synapse-link",
2626
idePackagePrefix := Some("com.sneaksanddata.arcane.microsoft_synapse_link"),
2727

28-
libraryDependencies += "com.sneaksanddata" % "arcane-framework_3" % "0.7.3",
28+
libraryDependencies += "com.sneaksanddata" % "arcane-framework_3" % "0.7.5",
2929
libraryDependencies += "io.netty" % "netty-tcnative-boringssl-static" % "2.0.65.Final",
3030

3131
// bugfix for upgrade header

src/main/scala/models/app/MicrosoftSynapseLinkStreamContext.scala

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -129,6 +129,8 @@ case class MicrosoftSynapseLinkStreamContext(spec: StreamSpec) extends StreamCon
129129
override val baseLocation: String = spec.sourceSettings.baseLocation
130130
override val changeCaptureIntervalSeconds: Int = spec.sourceSettings.changeCaptureIntervalSeconds
131131

132+
override val maxRowsPerFile: Option[Int] = Some(spec.maxRowsPerFile)
133+
132134
private def parseBackfillStartDate(str: String): Option[OffsetDateTime] =
133135
val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH.mm.ss'Z'").withZone(ZoneOffset.UTC)
134136
Try(OffsetDateTime.parse(str, formatter)) match

src/main/scala/models/app/contracts/StreamSpec.scala

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -16,7 +16,7 @@ case class CatalogSettings(namespace: String,
1616
/**
1717
* The configuration of staging data.
1818
*/
19-
case class StagingDataSettings(tableNamePrefix: String, catalog: CatalogSettings, dataLocation: Option[String] = None) derives ReadWriter
19+
case class StagingDataSettings(tableNamePrefix: String, catalog: CatalogSettings, maxRowsPerFile: Int, dataLocation: Option[String] = None) derives ReadWriter
2020

2121
/**
2222
* The configuration of Iceberg sink.
@@ -74,11 +74,13 @@ case class FieldSelectionRuleSpec(ruleType: String, fields: Array[String]) deriv
7474
* @param lookBackInterval The look back interval in seconds
7575
*/
7676
case class StreamSpec(sourceSettings: SourceSettings,
77+
78+
// Staging settings
79+
maxRowsPerFile: Int,
7780

7881
// Grouping settings
7982
rowsPerGroup: Int,
8083
groupingIntervalSeconds: Int,
81-
groupsPerFile: Int,
8284
lookBackInterval: Int,
8385

8486
// Iceberg settings

0 commit comments

Comments
 (0)