@@ -25,11 +25,13 @@ import scala.collection.JavaConverters._
25
25
import org .apache .spark .sql .delta .catalog .DeltaTableV2
26
26
import org .apache .spark .sql .delta .commands .cdc .CDCReader
27
27
import org .apache .spark .sql .delta .sources .DeltaDataSource
28
+ import org .apache .spark .sql .delta .util .AnalysisHelper
28
29
29
30
import org .apache .spark .sql .SparkSession
30
31
import org .apache .spark .sql .catalyst .FunctionIdentifier
31
32
import org .apache .spark .sql .catalyst .analysis .{FunctionRegistryBase , NamedRelation , TableFunctionRegistry , UnresolvedLeafNode , UnresolvedRelation }
32
33
import org .apache .spark .sql .catalyst .expressions .{Attribute , Expression , ExpressionInfo , StringLiteral }
34
+ import org .apache .spark .sql .catalyst .optimizer .ComputeCurrentTime
33
35
import org .apache .spark .sql .catalyst .plans .logical .{LeafNode , LogicalPlan , UnaryNode }
34
36
import org .apache .spark .sql .connector .catalog .V1Table
35
37
import org .apache .spark .sql .execution .datasources .LogicalRelation
@@ -115,7 +117,10 @@ trait CDCStatementBase extends DeltaTableValueFunction {
115
117
case _ : IntegerType | LongType => (keyPrefix + " Version" ) -> value.eval().toString
116
118
case _ : StringType => (keyPrefix + " Timestamp" ) -> value.eval().toString
117
119
case _ : TimestampType => (keyPrefix + " Timestamp" ) -> {
118
- val time = value.eval().toString
120
+ // Resolve current time.
121
+ val fakePlan = AnalysisHelper .FakeLogicalPlan (Seq (value), Nil )
122
+ val timestampExpression = ComputeCurrentTime (fakePlan).expressions.head
123
+ val time = timestampExpression.eval().toString
119
124
val fmt = new SimpleDateFormat (" yyyy-MM-dd HH:mm:ss.SSS" )
120
125
// when evaluated the time is represented with microseconds, which needs to be trimmed.
121
126
fmt.format(new Date (time.toLong / 1000 ))
0 commit comments