@@ -116,7 +116,8 @@ func (s ClickHouseSuite) Test_MySQL_Time() {
116116
117117 EnvWaitForEqualTablesWithNames (env , s , "waiting on cdc" , srcTableName , dstTableName , "id,\" key\" ,d,dt,tm,t" )
118118
119- // Verify that TIME column uses Time64(3) when ClickHouse version >= 25.6 and internal version >= InternalVersion_ClickHouseTime64
119+ // Verify that TIME column uses Time64(3) when ClickHouse version >= 25.6 (assumes latest internal version)
120+ // Backward compatibility with older internal versions is tested in Test_MySQL_Time_BackwardCompatibility
120121 ch , err := connclickhouse .Connect (s .t .Context (), nil , s .Peer ().GetClickhouseConfig ())
121122 require .NoError (s .t , err )
122123 defer ch .Close ()
@@ -139,14 +140,8 @@ func (s ClickHouseSuite) Test_MySQL_Time() {
139140
140141 // If ClickHouse >= 25.6 and using latest internal version, should use Time64(3)
141142 if major > 25 || (major == 25 && minor >= 6 ) {
142- if flowConnConfig .Version >= shared .InternalVersion_ClickHouseTime64 {
143- require .Contains (s .t , columnType , "Time64(3)" ,
144- "Expected Time64(3) for TIME column when ClickHouse >= 25.6 and internal version >= InternalVersion_ClickHouseTime64, got %s" , columnType )
145- } else {
146- // Old version should use DateTime64(6)
147- require .Contains (s .t , columnType , "DateTime64(6)" ,
148- "Expected DateTime64(6) for TIME column with old internal version, got %s" , columnType )
149- }
143+ require .Contains (s .t , columnType , "Time64(3)" ,
144+ "Expected Time64(3) for TIME column when ClickHouse >= 25.6 and using latest internal version, got %s" , columnType )
150145 } else {
151146 // Older ClickHouse versions should use DateTime64(6)
152147 require .Contains (s .t , columnType , "DateTime64(6)" ,
@@ -158,6 +153,64 @@ func (s ClickHouseSuite) Test_MySQL_Time() {
158153 RequireEnvCanceled (s .t , env )
159154}
160155
156+ func (s ClickHouseSuite ) Test_MySQL_Time_BackwardCompatibility () {
157+ if _ , ok := s .source .(* MySqlSource ); ! ok {
158+ s .t .Skip ("only applies to mysql" )
159+ }
160+
161+ srcTableName := "test_datetime_backward_compat"
162+ srcFullName := s .attachSchemaSuffix (srcTableName )
163+ quotedSrcFullName := "\" " + strings .ReplaceAll (srcFullName , "." , "\" .\" " ) + "\" "
164+ dstTableName := "test_datetime_backward_compat_dst"
165+
166+ require .NoError (s .t , s .source .Exec (s .t .Context (), fmt .Sprintf (`
167+ CREATE TABLE IF NOT EXISTS %s (
168+ id SERIAL PRIMARY KEY,
169+ "key" TEXT NOT NULL,
170+ t TIME NOT NULL
171+ )
172+ ` , quotedSrcFullName )))
173+
174+ require .NoError (s .t , s .source .Exec (s .t .Context (), fmt .Sprintf (`INSERT INTO %s ("key",t) VALUES
175+ ('init','14:21.654321')` ,
176+ quotedSrcFullName )))
177+
178+ connectionGen := FlowConnectionGenerationConfig {
179+ FlowJobName : s .attachSuffix (srcTableName ),
180+ TableNameMapping : map [string ]string {srcFullName : dstTableName },
181+ Destination : s .Peer ().Name ,
182+ }
183+ flowConnConfig := connectionGen .GenerateFlowConnectionConfigs (s )
184+ flowConnConfig .DoInitialSnapshot = true
185+ // Explicitly set to old internal version to test backward compatibility
186+ flowConnConfig .Version = shared .InternalVersion_First
187+
188+ tc := NewTemporalClient (s .t )
189+ env := ExecutePeerflow (s .t , tc , flowConnConfig )
190+ SetupCDCFlowStatusQuery (s .t , env , flowConnConfig )
191+
192+ EnvWaitForEqualTablesWithNames (env , s , "waiting on initial" , srcTableName , dstTableName , "id,\" key\" ,t" )
193+
194+ // Verify that TIME column uses DateTime64(6) even with ClickHouse >= 25.6 when using old internal version
195+ ch , err := connclickhouse .Connect (s .t .Context (), nil , s .Peer ().GetClickhouseConfig ())
196+ require .NoError (s .t , err )
197+ defer ch .Close ()
198+
199+ var columnType string
200+ err = ch .QueryRow (s .t .Context (), fmt .Sprintf (
201+ "SELECT type FROM system.columns WHERE database = currentDatabase() AND table = %s AND name = 't'" ,
202+ clickhouse .QuoteLiteral (dstTableName ),
203+ )).Scan (& columnType )
204+ require .NoError (s .t , err )
205+
206+ // With old internal version, should always use DateTime64(6) regardless of ClickHouse version
207+ require .Contains (s .t , columnType , "DateTime64(6)" ,
208+ "Expected DateTime64(6) for TIME column with old internal version (InternalVersion_First), got %s" , columnType )
209+
210+ env .Cancel (s .t .Context ())
211+ RequireEnvCanceled (s .t , env )
212+ }
213+
161214func (s ClickHouseSuite ) Test_MySQL_Bit () {
162215 if _ , ok := s .source .(* MySqlSource ); ! ok {
163216 s .t .Skip ("only applies to mysql" )
0 commit comments