Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
val scala361 = "3.6.1"

ThisBuild / version := "0.0.1-SNAPSHOT"
ThisBuild / version := "0.0.3-SNAPSHOT"
ThisBuild / organization := "com.sneaksanddata"
ThisBuild / scalaVersion := scala361

Expand Down Expand Up @@ -71,7 +71,7 @@ lazy val root = (project in file("."))
// For ZIO
libraryDependencies += "dev.zio" %% "zio-logging" % "2.3.0",
libraryDependencies += "dev.zio" %% "zio-logging-slf4j" % "2.3.0",

// For DataDog
libraryDependencies += "org.apache.logging.log4j" % "log4j-to-slf4j" % "2.24.3",
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.5.16",
Expand Down
11 changes: 6 additions & 5 deletions src/main/scala/services/consumers/JdbcConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type BatchApplicationResult = Boolean
/**
* The result of archiving a batch.
*/
type BatchArchivationResult = ResultSet
class BatchArchivationResult

/**
* A consumer that consumes batches from a JDBC source.
Expand All @@ -46,7 +46,7 @@ class JdbcConsumer[Batch <: StagedVersionedBatch](val options: JdbcConsumerOptio

implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global

private val logger: Logger = LoggerFactory.getLogger(classOf[BackfillDataGraphBuilder])
private val logger: Logger = LoggerFactory.getLogger(this.getClass)
private lazy val sqlConnection: Connection = DriverManager.getConnection(options.connectionUrl)

def getPartitionValues(batchName: String, partitionFields: List[String]): Future[Map[String, List[String]]] =
Expand All @@ -65,9 +65,10 @@ class JdbcConsumer[Batch <: StagedVersionedBatch](val options: JdbcConsumerOptio
sqlConnection.prepareStatement(batch.batchQuery.query).execute()
}

def archiveBatch(batch: Batch): Future[BatchArchivationResult] =
Future(sqlConnection.prepareStatement(batch.archiveExpr).executeQuery())
.flatMap(_ => Future(sqlConnection.prepareStatement(s"DROP TABLE ${batch.name}").executeQuery()))
def archiveBatch(batch: Batch, archiveTableName: String): Future[BatchArchivationResult] =
Future(sqlConnection.prepareStatement(batch.archiveExpr(archiveTableName)).execute())
.flatMap(_ => Future(sqlConnection.prepareStatement(s"DROP TABLE ${batch.name}").execute()))
.map(_ => new BatchArchivationResult)

def close(): Unit = sqlConnection.close()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class SqlServerChangeTrackingBackfillBatch(batchName: String, batchSchema: Arcan

override val batchQuery: OverwriteQuery = SqlServerChangeTrackingBackfillQuery(targetName, reduceExpr)

def archiveExpr: String = s"INSERT OVERWRITE ${targetName}_stream_archive $reduceExpr"
def archiveExpr(archiveTableName: String): String = s"INSERT OVERWRITE $archiveTableName $reduceExpr"

object SqlServerChangeTrackingBackfillBatch:
/**
Expand All @@ -66,7 +66,7 @@ class SqlServerChangeTrackingMergeBatch(batchName: String, batchSchema: ArcaneSc
override val batchQuery: MergeQuery =
SqlServerChangeTrackingMergeQuery(targetName = targetName, sourceQuery = reduceExpr, partitionValues = partitionValues, mergeKey = mergeKey, columns = schema.map(f => f.name))

def archiveExpr: String = s"INSERT INTO ${targetName}_stream_archive $reduceExpr"
def archiveExpr(archiveTableName: String): String = s"INSERT INTO $archiveTableName $reduceExpr"

object SqlServerChangeTrackingMergeBatch:
def apply(batchName: String, batchSchema: ArcaneSchema, targetName: String, partitionValues: Map[String, List[String]]): StagedVersionedBatch =
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/services/consumers/StagedBatch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ trait StagedBatch[Query <: StreamingBatchQuery]:
* Query that should be used to archive this batch data
* @return SQL query text
*/
def archiveExpr: String
def archiveExpr(archiveTableName: String): String


/**
Expand Down
12 changes: 6 additions & 6 deletions src/main/scala/services/consumers/SynapseLink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@ import models.querygen.{MergeQuery, MergeQueryCommons, OnSegment, OverwriteQuery

object MatchedAppendOnlyDelete:
def apply(): WhenMatchedDelete = new WhenMatchedDelete {
override val segmentCondition: Option[String] = Some(s"${MergeQueryCommons.SOURCE_ALIAS}.IsDelete = true")
override val segmentCondition: Option[String] = Some(s"coalesce(${MergeQueryCommons.SOURCE_ALIAS}.IsDelete, false) = true")
}

object MatchedAppendOnlyUpdate {
def apply(cols: Seq[String]): WhenMatchedUpdate = new WhenMatchedUpdate {
override val segmentCondition: Option[String] = Some(s"${MergeQueryCommons.SOURCE_ALIAS}.IsDelete = false AND ${MergeQueryCommons.SOURCE_ALIAS}.versionnumber > ${MergeQueryCommons.TARGET_ALIAS}.versionnumber")
override val segmentCondition: Option[String] = Some(s"coalesce(${MergeQueryCommons.SOURCE_ALIAS}.IsDelete, false) = false AND ${MergeQueryCommons.SOURCE_ALIAS}.versionnumber > ${MergeQueryCommons.TARGET_ALIAS}.versionnumber")
override val columns: Seq[String] = cols
}
}

object NotMatchedAppendOnlyInsert {
def apply(cols: Seq[String]): WhenNotMatchedInsert = new WhenNotMatchedInsert {
override val columns: Seq[String] = cols
override val segmentCondition: Option[String] = Some(s"${MergeQueryCommons.SOURCE_ALIAS}.IsDelete = false")
override val segmentCondition: Option[String] = Some(s"coalesce(${MergeQueryCommons.SOURCE_ALIAS}.IsDelete, false) = false")
}
}

Expand All @@ -43,11 +43,11 @@ class SynapseLinkBackfillBatch(batchName: String, batchSchema: ArcaneSchema, tar
// thus, we need to identify which of the latest versions were deleted after we have found the latest versions for each `Id` - since for backfill we must exclude deletions
s"""SELECT * FROM (
| SELECT * FROM $name ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
|) WHERE IsDelete = false""".stripMargin
|) WHERE coalesce(IsDelete, false) = false""".stripMargin

override val batchQuery: OverwriteQuery = SynapseLinkBackfillQuery(targetName, reduceExpr)

def archiveExpr: String = s"INSERT OVERWRITE ${targetName}_stream_archive $reduceExpr"
def archiveExpr(archiveTableName: String): String = s"INSERT OVERWRITE $archiveTableName $reduceExpr"

object SynapseLinkBackfillBatch:
/**
Expand All @@ -68,7 +68,7 @@ class SynapseLinkMergeBatch(batchName: String, batchSchema: ArcaneSchema, target
override val batchQuery: MergeQuery =
SynapseLinkMergeQuery(targetName = targetName, sourceQuery = reduceExpr, partitionValues = partitionValues, mergeKey = mergeKey, columns = schema.map(f => f.name))

def archiveExpr: String = s"INSERT INTO ${targetName}_stream_archive $reduceExpr"
override def archiveExpr(archiveTableName: String): String = s"INSERT INTO $archiveTableName $reduceExpr"

object SynapseLinkMergeBatch:
def apply(batchName: String, batchSchema: ArcaneSchema, targetName: String, partitionValues: Map[String, List[String]]): StagedVersionedBatch =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import java.time.Duration
* @tparam DataVersionType The type of the data version.
* @tparam DataBatchType The type of the data batch.
*/
trait VersionedDataProvider[DataVersionType, DataBatchType: HasVersion] {
trait VersionedDataProvider[DataVersionType, DataBatchType] {

/**
* Requests the changes from the data source.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ USING (SELECT * FROM (
SELECT * FROM test.staged_a ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
)) t_s
ON t_o.ARCANE_MERGE_KEY = t_s.ARCANE_MERGE_KEY
WHEN MATCHED AND t_s.IsDelete = true THEN DELETE
WHEN MATCHED AND t_s.IsDelete = false AND t_s.versionnumber > t_o.versionnumber THEN UPDATE SET
WHEN MATCHED AND coalesce(t_s.IsDelete, false) = true THEN DELETE
WHEN MATCHED AND coalesce(t_s.IsDelete, false) = false AND t_s.versionnumber > t_o.versionnumber THEN UPDATE SET
colA = t_s.colA,
colB = t_s.colB,
Id = t_s.Id,
versionnumber = t_s.versionnumber
WHEN NOT MATCHED AND t_s.IsDelete = false THEN INSERT (ARCANE_MERGE_KEY,colA,colB,Id,versionnumber) VALUES (t_s.ARCANE_MERGE_KEY,
WHEN NOT MATCHED AND coalesce(t_s.IsDelete, false) = false THEN INSERT (ARCANE_MERGE_KEY,colA,colB,Id,versionnumber) VALUES (t_s.ARCANE_MERGE_KEY,
t_s.colA,
t_s.colB,
t_s.Id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ USING (SELECT * FROM (
SELECT * FROM test.staged_a ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
)) t_s
ON t_o.ARCANE_MERGE_KEY = t_s.ARCANE_MERGE_KEY AND t_o.colA IN ('a','b','c')
WHEN MATCHED AND t_s.IsDelete = true THEN DELETE
WHEN MATCHED AND t_s.IsDelete = false AND t_s.versionnumber > t_o.versionnumber THEN UPDATE SET
WHEN MATCHED AND coalesce(t_s.IsDelete, false) = true THEN DELETE
WHEN MATCHED AND coalesce(t_s.IsDelete, false) = false AND t_s.versionnumber > t_o.versionnumber THEN UPDATE SET
colA = t_s.colA,
colB = t_s.colB,
Id = t_s.Id,
versionnumber = t_s.versionnumber
WHEN NOT MATCHED AND t_s.IsDelete = false THEN INSERT (ARCANE_MERGE_KEY,colA,colB,Id,versionnumber) VALUES (t_s.ARCANE_MERGE_KEY,
WHEN NOT MATCHED AND coalesce(t_s.IsDelete, false) = false THEN INSERT (ARCANE_MERGE_KEY,colA,colB,Id,versionnumber) VALUES (t_s.ARCANE_MERGE_KEY,
t_s.colA,
t_s.colB,
t_s.Id,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
INSERT OVERWRITE test.table_a
SELECT * FROM (
SELECT * FROM test.staged_a ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
) WHERE IsDelete = false
) WHERE coalesce(IsDelete, false) = false
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ USING (SELECT * FROM (
SELECT * FROM test.staged_a ORDER BY ROW_NUMBER() OVER (PARTITION BY Id ORDER BY versionnumber DESC) FETCH FIRST 1 ROWS WITH TIES
)) t_s
ON t_o.ARCANE_MERGE_KEY = t_s.ARCANE_MERGE_KEY AND t_o.colA IN ('a','b','c')
WHEN MATCHED AND t_s.IsDelete = true THEN DELETE
WHEN MATCHED AND t_s.IsDelete = false AND t_s.versionnumber > t_o.versionnumber THEN UPDATE SET
WHEN MATCHED AND coalesce(t_s.IsDelete, false) = true THEN DELETE
WHEN MATCHED AND coalesce(t_s.IsDelete, false) = false AND t_s.versionnumber > t_o.versionnumber THEN UPDATE SET
colA = t_s.colA,
colB = t_s.colB,
Id = t_s.Id,
versionnumber = t_s.versionnumber
WHEN NOT MATCHED AND t_s.IsDelete = false THEN INSERT (ARCANE_MERGE_KEY,colA,colB,Id,versionnumber) VALUES (t_s.ARCANE_MERGE_KEY,
WHEN NOT MATCHED AND coalesce(t_s.IsDelete, false) = false THEN INSERT (ARCANE_MERGE_KEY,colA,colB,Id,versionnumber) VALUES (t_s.ARCANE_MERGE_KEY,
t_s.colA,
t_s.colB,
t_s.Id,
Expand Down
Loading