1
1
package ldbc .snb .datagen .transformation .transform
2
2
3
- import ldbc .snb .datagen .model .{Batched , BatchedEntity , EntityType , Mode }
3
+ import ldbc .snb .datagen .model .{Batched , BatchedEntity , EntityType , Graph , Mode }
4
4
import ldbc .snb .datagen .util .sql .qcol
5
5
import org .apache .spark .sql .DataFrame
6
6
import org .apache .spark .sql .functions .lit
7
7
import org .apache .spark .sql .types .{DateType , TimestampType }
8
8
import shapeless ._
9
9
10
10
trait ConvertDates [M <: Mode ] extends Transform [M , M ] {
11
+ def convertDatesInEntities (entities : Map [EntityType , M # Layout ]): Map [EntityType , (Option [String ], M # Layout )]
12
+
11
13
def convertDates (tpe : EntityType , df : DataFrame ): DataFrame = {
12
14
tpe match {
13
15
case tpe if ! tpe.isStatic =>
@@ -19,48 +21,51 @@ trait ConvertDates[M <: Mode] extends Transform[M, M] {
19
21
case _ => df
20
22
}
21
23
}
24
+
25
+ override def transform (input : Graph [M ]): Graph [M ] = {
26
+ if (input.definition.useTimestamp) {
27
+ throw new AssertionError (" Already using timestamp for dates" )
28
+ }
29
+
30
+ val updatedEntities = convertDatesInEntities(input.entities)
31
+
32
+ val modifiedEntities = updatedEntities
33
+ .map { case (k, (_, data)) => k -> data }
34
+
35
+ val modifiedEntityDefinitions = updatedEntities
36
+ .map { case (k, (schema, _)) => k -> schema }
37
+
38
+ val l = lens[In ]
39
+ (l.definition.useTimestamp ~ l.definition.entities ~ l.entities).set(input)((true , modifiedEntityDefinitions, modifiedEntities))
40
+ }
22
41
}
23
42
24
43
object ConvertDates {
25
44
def apply [T <: Mode : ConvertDates ] = implicitly[ConvertDates [T ]]
26
45
27
46
object instances {
28
47
implicit def batchedConvertDates [M <: Mode ](implicit ev : BatchedEntity =:= M # Layout ) = new ConvertDates [M ] {
29
- override def transform (input : In ): Out = {
30
- if (input.definition.useTimestamp) {
31
- throw new AssertionError (" Already using timestamp for dates" )
32
- }
33
- val modifiedEntities = input.entities.map { case (tpe, layout) => tpe -> {
48
+ override def convertDatesInEntities (entities : Map [EntityType ,M # Layout ]): Map [EntityType , (Option [String ], M # Layout )] = {
49
+ entities.map { case (tpe, layout) =>
34
50
val be = layout.asInstanceOf [BatchedEntity ]
35
- ev(BatchedEntity (
36
- convertDates(tpe, be.snapshot),
37
- be.insertBatches.map(b => Batched (convertDates(tpe, b.entity), b.batchId, b.ordering)),
38
- be.deleteBatches.map(b => Batched (convertDates(tpe, b.entity), b.batchId, b.ordering))
39
- ))
40
- }}
51
+ val convertedSnapshot = convertDates(tpe, be.snapshot)
41
52
42
- val modifiedEntityDefinitions = modifiedEntities
43
- .map { case (tpe, v) => tpe -> Some (v.asInstanceOf [BatchedEntity ].snapshot.schema.toDDL) }
44
-
45
- val l = lens[In ]
46
- (l.definition.useTimestamp ~ l.definition.entities ~ l.entities).set(input)((true , modifiedEntityDefinitions, modifiedEntities))
53
+ val convertedInserts = be.insertBatches.map(b => Batched (convertDates(tpe, b.entity), b.batchId, b.ordering))
54
+ val convertedDeletes = be.deleteBatches.map(b => Batched (convertDates(tpe, b.entity), b.batchId, b.ordering))
55
+ (tpe, (
56
+ Some (convertedSnapshot.schema.toDDL),
57
+ ev(BatchedEntity (convertedSnapshot, convertedInserts, convertedDeletes))
58
+ ))
59
+ }
47
60
}
48
61
}
49
62
50
63
implicit def simpleConvertDates [M <: Mode ](implicit ev : DataFrame =:= M # Layout ) = new ConvertDates [M ] {
51
- override def transform (input : In ): Out = {
52
- if (input.definition.useTimestamp) {
53
- throw new AssertionError (" Already using timestamp for dates" )
64
+ override def convertDatesInEntities (entities : Map [EntityType , M # Layout ]): Map [EntityType , (Option [String ], M # Layout )] = {
65
+ entities.map { case (tpe, v) =>
66
+ val convertedSnapshot = convertDates(tpe, v.asInstanceOf [DataFrame ])
67
+ (tpe, (Some (convertedSnapshot.schema.toDDL), ev(convertedSnapshot)))
54
68
}
55
-
56
- val modifiedEntities = input.entities
57
- .map { case (tpe, v) => tpe -> ev(convertDates(tpe, v.asInstanceOf [DataFrame ])) }
58
-
59
- val modifiedEntityDefinitions = modifiedEntities
60
- .map { case (tpe, v) => tpe -> Some (v.asInstanceOf [DataFrame ].schema.toDDL) }
61
-
62
- val l = lens[In ]
63
- (l.definition.useTimestamp ~ l.definition.entities ~ l.entities).set(input)((true , modifiedEntityDefinitions, modifiedEntities))
64
69
}
65
70
}
66
71
}
0 commit comments