Commit da483fd

Configure publication for the package (#7)
1 parent 93f502d commit da483fd

10 files changed: +28 -27 lines changed

build.sbt

Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
 val scala361 = "3.6.1"
 
-ThisBuild / version := "0.0.1-SNAPSHOT"
+ThisBuild / version := "0.0.3-SNAPSHOT"
 ThisBuild / organization := "com.sneaksanddata"
 ThisBuild / scalaVersion := scala361
 
@@ -71,7 +71,7 @@ lazy val root = (project in file("."))
     // For ZIO
     libraryDependencies += "dev.zio" %% "zio-logging" % "2.3.0",
     libraryDependencies += "dev.zio" %% "zio-logging-slf4j" % "2.3.0",
-
+
     // For DataDog
     libraryDependencies += "org.apache.logging.log4j" % "log4j-to-slf4j" % "2.24.3",
     libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.5.16",
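
Since the commit title mentions configuring publication, a downstream project would consume the published snapshot roughly as sketched below. The module name "arcane-framework" is a hypothetical placeholder; only the organization and version are visible in this diff.

// Hypothetical consumer-side build.sbt entry; "arcane-framework" is an assumed module name.
libraryDependencies += "com.sneaksanddata" %% "arcane-framework" % "0.0.3-SNAPSHOT"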

src/main/scala/services/consumers/JdbcConsumer.scala

Lines changed: 6 additions & 5 deletions
@@ -34,7 +34,7 @@ type BatchApplicationResult = Boolean
 /**
  * The result of applying a batch.
  */
-type BatchArchivationResult = ResultSet
+class BatchArchivationResult
 
 /**
  * A consumer that consumes batches from a JDBC source.
@@ -46,7 +46,7 @@ class JdbcConsumer[Batch <: StagedVersionedBatch](val options: JdbcConsumerOptio
 
   implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global
 
-  private val logger: Logger = LoggerFactory.getLogger(classOf[BackfillDataGraphBuilder])
+  private val logger: Logger = LoggerFactory.getLogger(this.getClass)
   private lazy val sqlConnection: Connection = DriverManager.getConnection(options.connectionUrl)
 
   def getPartitionValues(batchName: String, partitionFields: List[String]): Future[Map[String, List[String]]] =
@@ -65,9 +65,10 @@ class JdbcConsumer[Batch <: StagedVersionedBatch](val options: JdbcConsumerOptio
       sqlConnection.prepareStatement(batch.batchQuery.query).execute()
     }
 
-  def archiveBatch(batch: Batch): Future[BatchArchivationResult] =
-    Future(sqlConnection.prepareStatement(batch.archiveExpr).executeQuery())
-      .flatMap(_ => Future(sqlConnection.prepareStatement(s"DROP TABLE ${batch.name}").executeQuery()))
+  def archiveBatch(batch: Batch, archiveTableName: String): Future[BatchArchivationResult] =
+    Future(sqlConnection.prepareStatement(batch.archiveExpr(archiveTableName)).execute())
+      .flatMap(_ => Future(sqlConnection.prepareStatement(s"DROP TABLE ${batch.name}").execute()))
+      .map(_ => new BatchArchivationResult)
 
   def close(): Unit = sqlConnection.close()
 
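A minimal caller-side sketch of the revised archiveBatch: the archive table is now passed in explicitly and the result is the new BatchArchivationResult marker rather than a ResultSet. The consumer, batch, and table name below are illustrative assumptions, not values taken from this commit.

import scala.concurrent.Future

// Sketch only: archives the batch into an explicitly named table; per the implementation
// above, the staging table is then dropped and the future completes with a
// BatchArchivationResult marker.
def archiveAndForget(consumer: JdbcConsumer[StagedVersionedBatch],
                     batch: StagedVersionedBatch): Future[BatchArchivationResult] =
  consumer.archiveBatch(batch, archiveTableName = "test.table_a_stream_archive")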

src/main/scala/services/consumers/SqlServerChangeTracking.scala

Lines changed: 2 additions & 2 deletions
@@ -43,7 +43,7 @@ class SqlServerChangeTrackingBackfillBatch(batchName: String, batchSchema: Arcan
 
   override val batchQuery: OverwriteQuery = SqlServerChangeTrackingBackfillQuery(targetName, reduceExpr)
 
-  def archiveExpr: String = s"INSERT OVERWRITE ${targetName}_stream_archive $reduceExpr"
+  def archiveExpr(archiveTableName: String): String = s"INSERT OVERWRITE $archiveTableName $reduceExpr"
 
 object SqlServerChangeTrackingBackfillBatch:
   /**
@@ -66,7 +66,7 @@ class SqlServerChangeTrackingMergeBatch(batchName: String, batchSchema: ArcaneSc
   override val batchQuery: MergeQuery =
     SqlServerChangeTrackingMergeQuery(targetName = targetName, sourceQuery = reduceExpr, partitionValues = partitionValues, mergeKey = mergeKey, columns = schema.map(f => f.name))
 
-  def archiveExpr: String = s"INSERT INTO ${targetName}_stream_archive $reduceExpr"
+  def archiveExpr(archiveTableName: String): String = s"INSERT INTO $archiveTableName $reduceExpr"
 
 object SqlServerChangeTrackingMergeBatch:
   def apply(batchName: String, batchSchema: ArcaneSchema, targetName: String, partitionValues: Map[String, List[String]]): StagedVersionedBatch =
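
With archiveExpr now parameterised, the caller chooses the archive destination instead of relying on the previous ${targetName}_stream_archive convention. A hedged sketch of the resulting SQL text, assuming a hypothetical SqlServerChangeTrackingMergeBatch instance named mergeBatch whose reduceExpr reads from its staging table (names are illustrative):

// Illustrative only: `mergeBatch` is an assumed SqlServerChangeTrackingMergeBatch instance.
val archiveSql: String = mergeBatch.archiveExpr("test.table_a_stream_archive")
// Yields something like: INSERT INTO test.table_a_stream_archive <reduceExpr SELECT ...>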

src/main/scala/services/consumers/StagedBatch.scala

Lines changed: 1 addition & 1 deletion
@@ -29,7 +29,7 @@ trait StagedBatch[Query <: StreamingBatchQuery]:
    * Query that should be used to archive this batch data
    * @return SQL query text
    */
-  def archiveExpr: String
+  def archiveExpr(archiveTableName: String): String
 
 
   /**

src/main/scala/services/consumers/SynapseLink.scala

Lines changed: 6 additions & 6 deletions
@@ -6,20 +6,20 @@ import models.querygen.{MergeQuery, MergeQueryCommons, OnSegment, OverwriteQuery
 
 object MatchedAppendOnlyDelete:
   def apply(): WhenMatchedDelete = new WhenMatchedDelete {
-    override val segmentCondition: Option[String] = Some(s"${MergeQueryCommons.SOURCE_ALIAS}.IsDelete = true")
+    override val segmentCondition: Option[String] = Some(s"coalesce(${MergeQueryCommons.SOURCE_ALIAS}.IsDelete, false) = true")
   }
 
 object MatchedAppendOnlyUpdate {
   def apply(cols: Seq[String]): WhenMatchedUpdate = new WhenMatchedUpdate {
-    override val segmentCondition: Option[String] = Some(s"${MergeQueryCommons.SOURCE_ALIAS}.IsDelete = false AND ${MergeQueryCommons.SOURCE_ALIAS}.versionnumber > ${MergeQueryCommons.TARGET_ALIAS}.versionnumber")
+    override val segmentCondition: Option[String] = Some(s"coalesce(${MergeQueryCommons.SOURCE_ALIAS}.IsDelete, false) = false AND ${MergeQueryCommons.SOURCE_ALIAS}.versionnumber > ${MergeQueryCommons.TARGET_ALIAS}.versionnumber")
     override val columns: Seq[String] = cols
   }
 }
 
 object NotMatchedAppendOnlyInsert {
   def apply(cols: Seq[String]): WhenNotMatchedInsert = new WhenNotMatchedInsert {
     override val columns: Seq[String] = cols
-    override val segmentCondition: Option[String] = Some(s"${MergeQueryCommons.SOURCE_ALIAS}.IsDelete = false")
+    override val segmentCondition: Option[String] = Some(s"coalesce(${MergeQueryCommons.SOURCE_ALIAS}.IsDelete, false) = false")
   }
 }
 
@@ -43,11 +43,11 @@ class SynapseLinkBackfillBatch(batchName: String, batchSchema: ArcaneSchema, tar
   // thus, we need identify which of the latest versions were deleted after we have found the latest versions for each `Id` - since for backfill we must exclude deletions
   s"""SELECT * FROM (
      | SELECT * FROM $name ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
-     |) WHERE IsDelete = false""".stripMargin
+     |) WHERE coalesce(IsDelete, false) = false""".stripMargin
 
   override val batchQuery: OverwriteQuery = SynapseLinkBackfillQuery(targetName, reduceExpr)
 
-  def archiveExpr: String = s"INSERT OVERWRITE ${targetName}_stream_archive $reduceExpr"
+  def archiveExpr(archiveTableName: String): String = s"INSERT OVERWRITE $archiveTableName $reduceExpr"
 
 object SynapseLinkBackfillBatch:
   /**
@@ -68,7 +68,7 @@ class SynapseLinkMergeBatch(batchName: String, batchSchema: ArcaneSchema, target
   override val batchQuery: MergeQuery =
     SynapseLinkMergeQuery(targetName = targetName, sourceQuery = reduceExpr, partitionValues = partitionValues, mergeKey = mergeKey, columns = schema.map(f => f.name))
 
-  def archiveExpr: String = s"INSERT INTO ${targetName}_stream_archive $reduceExpr"
+  override def archiveExpr(archiveTableName: String): String = s"INSERT INTO $archiveTableName $reduceExpr"
 
 object SynapseLinkMergeBatch:
   def apply(batchName: String, batchSchema: ArcaneSchema, targetName: String, partitionValues: Map[String, List[String]]): StagedVersionedBatch =
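
Wrapping IsDelete in coalesce changes how NULL values behave: under SQL three-valued logic a bare IsDelete = false never matches a NULL, so rows whose IsDelete flag is absent, presumably the never-deleted rows in an append-only Synapse Link feed, would be silently excluded. A small hedged check of the new delete condition, assuming MergeQueryCommons.SOURCE_ALIAS renders as "t_s" as in the updated test fixtures below:

// Assumes MergeQueryCommons.SOURCE_ALIAS == "t_s"; see the test fixtures in this commit.
val deleteCondition: Option[String] = MatchedAppendOnlyDelete().segmentCondition
assert(deleteCondition.contains("coalesce(t_s.IsDelete, false) = true"))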

src/main/scala/services/streaming/base/VersionedDataProvider.scala

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@ import java.time.Duration
  * @tparam DataVersionType The type of the data version.
  * @tparam DataBatchType The type of the data batch.
  */
-trait VersionedDataProvider[DataVersionType, DataBatchType: HasVersion] {
+trait VersionedDataProvider[DataVersionType, DataBatchType] {
 
   /**
    * Requests the changes from the data source.

src/test/resources/generate_a_valid_merge_query_synapse_link.sql

Lines changed: 3 additions & 3 deletions
@@ -3,13 +3,13 @@ USING (SELECT * FROM (
 SELECT * FROM test.staged_a ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
 )) t_s
 ON t_o.ARCANE_MERGE_KEY = t_s.ARCANE_MERGE_KEY
-WHEN MATCHED AND t_s.IsDelete = true THEN DELETE
-WHEN MATCHED AND t_s.IsDelete = false AND t_s.versionnumber > t_o.versionnumber THEN UPDATE SET
+WHEN MATCHED AND coalesce(t_s.IsDelete, false) = true THEN DELETE
+WHEN MATCHED AND coalesce(t_s.IsDelete, false) = false AND t_s.versionnumber > t_o.versionnumber THEN UPDATE SET
 colA = t_s.colA,
 colB = t_s.colB,
 Id = t_s.Id,
 versionnumber = t_s.versionnumber
-WHEN NOT MATCHED AND t_s.IsDelete = false THEN INSERT (ARCANE_MERGE_KEY,colA,colB,Id,versionnumber) VALUES (t_s.ARCANE_MERGE_KEY,
+WHEN NOT MATCHED AND coalesce(t_s.IsDelete, false) = false THEN INSERT (ARCANE_MERGE_KEY,colA,colB,Id,versionnumber) VALUES (t_s.ARCANE_MERGE_KEY,
 t_s.colA,
 t_s.colB,
 t_s.Id,

src/test/resources/generate_a_valid_merge_query_with_partitions_synapse_link.sql

Lines changed: 3 additions & 3 deletions
@@ -3,13 +3,13 @@ USING (SELECT * FROM (
 SELECT * FROM test.staged_a ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
 )) t_s
 ON t_o.ARCANE_MERGE_KEY = t_s.ARCANE_MERGE_KEY AND t_o.colA IN ('a','b','c')
-WHEN MATCHED AND t_s.IsDelete = true THEN DELETE
-WHEN MATCHED AND t_s.IsDelete = false AND t_s.versionnumber > t_o.versionnumber THEN UPDATE SET
+WHEN MATCHED AND coalesce(t_s.IsDelete, false) = true THEN DELETE
+WHEN MATCHED AND coalesce(t_s.IsDelete, false) = false AND t_s.versionnumber > t_o.versionnumber THEN UPDATE SET
 colA = t_s.colA,
 colB = t_s.colB,
 Id = t_s.Id,
 versionnumber = t_s.versionnumber
-WHEN NOT MATCHED AND t_s.IsDelete = false THEN INSERT (ARCANE_MERGE_KEY,colA,colB,Id,versionnumber) VALUES (t_s.ARCANE_MERGE_KEY,
+WHEN NOT MATCHED AND coalesce(t_s.IsDelete, false) = false THEN INSERT (ARCANE_MERGE_KEY,colA,colB,Id,versionnumber) VALUES (t_s.ARCANE_MERGE_KEY,
 t_s.colA,
 t_s.colB,
 t_s.Id,
Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
 INSERT OVERWRITE test.table_a
 SELECT * FROM (
 SELECT * FROM test.staged_a ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
-) WHERE IsDelete = false
+) WHERE coalesce(IsDelete, false) = false

src/test/resources/generate_a_valid_synapse_link_merge_query_with_partitions.sql

Lines changed: 3 additions & 3 deletions
@@ -3,13 +3,13 @@ USING (SELECT * FROM (
 SELECT * FROM test.staged_a ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
 )) t_s
 ON t_o.ARCANE_MERGE_KEY = t_s.ARCANE_MERGE_KEY AND t_o.colA IN ('a','b','c')
-WHEN MATCHED AND t_s.IsDelete = true THEN DELETE
-WHEN MATCHED AND t_s.IsDelete = false AND t_s.versionnumber > t_o.versionnumber THEN UPDATE SET
+WHEN MATCHED AND coalesce(t_s.IsDelete, false) = true THEN DELETE
+WHEN MATCHED AND coalesce(t_s.IsDelete, false) = false AND t_s.versionnumber > t_o.versionnumber THEN UPDATE SET
 colA = t_s.colA,
 colB = t_s.colB,
 Id = t_s.Id,
 versionnumber = t_s.versionnumber
-WHEN NOT MATCHED AND t_s.IsDelete = false THEN INSERT (ARCANE_MERGE_KEY,colA,colB,Id,versionnumber) VALUES (t_s.ARCANE_MERGE_KEY,
+WHEN NOT MATCHED AND coalesce(t_s.IsDelete, false) = false THEN INSERT (ARCANE_MERGE_KEY,colA,colB,Id,versionnumber) VALUES (t_s.ARCANE_MERGE_KEY,
 t_s.colA,
 t_s.colB,
 t_s.Id,
