Skip to content

Commit 836b2f8

Browse files
Update MsSqlReader to emit indexed schema (#335)
1 parent 3fff088 commit 836b2f8

File tree

4 files changed

+36
-42
lines changed

4 files changed

+36
-42
lines changed

src/main/scala/services/base/SchemaProvider.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ trait CanAdd[Schema]:
1717
*/
1818
extension (a: Schema) def addField(fieldName: String, fieldType: ArcaneType): Schema
1919

20+
extension (a: Schema) def addIndexedField(fieldName: String, fieldType: ArcaneType, fieldId: Int): Schema
21+
2022
/** Represents a provider of a schema for a data produced by Arcane.
2123
*
2224
* @tparam Schema
Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,28 @@
11
package com.sneaksanddata.arcane.framework
22
package services.mssql
33

4-
import services.base.CanAdd
4+
import models.schemas.{ArcaneSchema, MergeKeyField, given_CanAdd_ArcaneSchema}
55
import utils.SqlUtils.{JdbcFieldInfo, toArcaneType}
66

7-
import scala.annotation.tailrec
8-
import scala.util.{Failure, Success, Try}
9-
107
/** Represents the schema of a table in a Microsoft SQL Server database. The schema is represented as a sequence of
118
* tuples, where each tuple contains the column name, type (java.sql.Types), precision, and scale.
129
*/
1310
type SqlSchema = Seq[(String, Int, Int, Int)]
1411

15-
/** Companion object for [[SqlSchema]].
16-
*/
17-
object SqlSchema:
18-
19-
/** Converts a SQL schema to an Arcane schema and normalizes the field names.
20-
*
21-
* @param sqlSchema
22-
* The SQL schema to convert.
23-
* @param schema
24-
* The Arcane schema to add the fields to.
25-
* @return
26-
* The Arcane schema with the fields added.
27-
*/
28-
@tailrec
29-
def toSchema[Schema: CanAdd](sqlSchema: SqlSchema, schema: Schema): Try[Schema] =
30-
sqlSchema match
31-
case Seq() => Success(schema)
32-
case (name, fieldType, precision, scale) +: xs =>
33-
toArcaneType(new JdbcFieldInfo(name = name, typeId = fieldType, precision = precision, scale = scale)) match
34-
case Success(arcaneType) => toSchema(xs, schema.addField("\\W+".r.replaceAllIn(name, ""), arcaneType))
35-
case Failure(exception) => Failure[Schema](exception)
12+
given Conversion[SqlSchema, ArcaneSchema]:
13+
// assume that sqlSchema contains merge key and it always comes first
14+
// check resources/get_select_delta_query.sql
15+
override def apply(sqlSchema: SqlSchema): ArcaneSchema = sqlSchema
16+
.foldLeft((ArcaneSchema.empty(), 0)) { case ((agg, fieldIndex), (name, fieldType, precision, scale)) =>
17+
(
18+
agg.addIndexedField(
19+
fieldId = fieldIndex,
20+
fieldName = "\\W+".r.replaceAllIn(name, ""),
21+
// propagate failure by resolving Try
22+
fieldType =
23+
toArcaneType(new JdbcFieldInfo(name = name, typeId = fieldType, precision = precision, scale = scale)).get
24+
),
25+
fieldIndex + 1
26+
)
27+
}
28+
._1

src/main/scala/services/mssql/base/MsSqlReader.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import models.schemas.{ArcaneSchema, DataRow, given_CanAdd_ArcaneSchema}
77
import models.settings.mssql.MsSqlServerDatabaseSourceSettings
88
import services.base.SchemaProvider
99
import services.mssql.QueryProvider.{getBackfillQuery, getChangesQuery, getSchemaQuery}
10-
import services.mssql.SqlSchema.toSchema
10+
import services.mssql.given_Conversion_SqlSchema_ArcaneSchema
1111
import services.mssql.base.MsSqlReader.{closeSafe, executeQuerySafe}
1212
import services.mssql.query.LazyQueryResult.toDataRow
1313
import services.mssql.query.{LazyQueryResult, ScalarQueryResult}
@@ -196,10 +196,9 @@ class MsSqlReader(
196196
*/
197197
override lazy val getSchema: Task[this.SchemaType] =
198198
for
199-
query <- this.getSchemaQuery
200-
sqlSchema <- getSqlSchema(query)
201-
arcaneSchema <- ZIO.fromTry(toSchema(sqlSchema, empty))
202-
yield arcaneSchema
199+
query <- this.getSchemaQuery
200+
sqlSchema <- getSqlSchema(query)
201+
yield sqlSchema
203202

204203
private def getSqlSchema(query: String): Task[SqlSchema] =
205204
ZIO.scoped {

src/test/scala/tests/mssql/MsSqlReaderTests.scala

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ package com.sneaksanddata.arcane.framework
22
package tests.mssql
33

44
import models.schemas.ArcaneType.*
5-
import models.schemas.{ArcaneSchemaField, DataCell, Field, MergeKeyField}
5+
import models.schemas.{ArcaneSchemaField, DataCell, IndexedField, IndexedMergeKeyField}
66
import models.settings.*
77
import models.settings.mssql.MsSqlServerDatabaseSourceSettings
88
import services.filters.ColumnSummaryFieldsFilteringService
@@ -287,17 +287,17 @@ object MsSqlReaderTests extends ZIOSpecDefault:
287287
)
288288
expected <- ZIO.succeed(
289289
List(
290-
Field("x", IntType),
291-
Field("SYS_CHANGE_VERSION", LongType),
292-
Field("SYS_CHANGE_OPERATION", StringType),
293-
Field("y", IntType),
294-
Field("z", BigDecimalType(30, 6)),
295-
Field("a", ByteArrayType),
296-
Field("b", TimestampType),
297-
Field("cd", IntType),
298-
Field("e", FloatType),
299-
Field("ChangeTrackingVersion", LongType),
300-
MergeKeyField
290+
IndexedField("x", IntType, 0),
291+
IndexedField("SYS_CHANGE_VERSION", LongType, 1),
292+
IndexedField("SYS_CHANGE_OPERATION", StringType, 2),
293+
IndexedField("y", IntType, 3),
294+
IndexedField("z", BigDecimalType(30, 6), 4),
295+
IndexedField("a", ByteArrayType, 5),
296+
IndexedField("b", TimestampType, 6),
297+
IndexedField("cd", IntType, 7),
298+
IndexedField("e", FloatType, 8),
299+
IndexedField("ChangeTrackingVersion", LongType, 9),
300+
IndexedMergeKeyField(10)
301301
)
302302
)
303303
schema <- connector.getSchema

0 commit comments

Comments
 (0)