55 "database/sql"
66 "database/sql/driver"
77 "encoding/json"
8+ "errors"
89 "fmt"
910 "math/big"
1011 "math/rand"
@@ -1103,6 +1104,7 @@ type (
11031104 appStructTableUDF struct {
11041105 n int64
11051106 count int64
1107+ err error // return this when done. Could be nil
11061108 }
11071109
11081110 appParStructTableUDF struct {
@@ -1113,19 +1115,17 @@ type (
11131115)
11141116
11151117func (udf * appStructTableUDF ) ColumnInfos () []ColumnInfo {
1116- t , _ := NewTypeInfo (TYPE_BIGINT )
1117- t2 , _ := NewTypeInfo (TYPE_UTINYINT )
11181118 return []ColumnInfo {
1119- {Name : "id" , T : t },
1120- {Name : "uint8" , T : t2 },
1119+ {Name : "id" , T : typeBigintTableUDF },
1120+ {Name : "uint8" , T : typeUTinyintTableUDF },
11211121 }
11221122}
11231123
11241124func (udf * appStructTableUDF ) Init () {}
11251125
11261126func (udf * appStructTableUDF ) FillRow (row Row ) (bool , error ) {
11271127 if udf .count >= udf .n {
1128- return false , nil
1128+ return false , udf . err
11291129 }
11301130 udf .count ++
11311131 err := SetRowValue (row , 0 , udf .count )
@@ -1153,7 +1153,10 @@ func (udf *appStructTableUDF) Cardinality() *CardinalityInfo {
11531153}
11541154
11551155func (udf * appParStructTableUDF ) ColumnInfos () []ColumnInfo {
1156- return []ColumnInfo {{Name : "result" , T : typeBigintTableUDF }}
1156+ return []ColumnInfo {
1157+ {Name : "result" , T : typeBigintTableUDF },
1158+ {Name : "result2" , T : typeBigintTableUDF },
1159+ }
11571160}
11581161
11591162func (udf * appParStructTableUDF ) Init () ParallelTableSourceInfo {
@@ -1196,7 +1199,6 @@ func (udf *appParStructTableUDF) FillRow(localState any, row Row) (bool, error)
11961199 }
11971200 err = SetRowValue (row , 1 , state .start )
11981201 return true , err
1199-
12001202}
12011203
12021204func (udf * appParStructTableUDF ) GetValue (r , c int ) any {
@@ -1247,16 +1249,17 @@ func TestAppendParallelRowSource(t *testing.T) {
12471249 args [i ] = & values [i ]
12481250 }
12491251
1250- count := 0
1252+ count := int64 ( 0 )
12511253 for r := 0 ; res .Next (); r ++ {
12521254 require .NoError (t , res .Scan (args ... ))
12531255 for i , value := range values {
12541256 expected := f .GetValue (r , i )
1255- require .Equal (t , expected , value , "incorrect value" , r , i )
1257+ require .Equal (t , expected , value , "incorrect value at row %v column %v " , r , i )
12561258 }
12571259 count ++
12581260 }
12591261 cleanupAppender (t , c , db , con , a )
1262+ require .Equal (t , f .n , count , "number of rows should be equal" )
12601263}
12611264
12621265func TestAppendParallelRowSourceSingle (t * testing.T ) {
@@ -1289,7 +1292,7 @@ func TestAppendParallelRowSourceSingle(t *testing.T) {
12891292 args [i ] = & values [i ]
12901293 }
12911294
1292- count := 0
1295+ count := int64 ( 0 )
12931296 for r := 0 ; res .Next (); r ++ {
12941297 require .NoError (t , res .Scan (args ... ))
12951298 for i , value := range values {
@@ -1301,6 +1304,7 @@ func TestAppendParallelRowSourceSingle(t *testing.T) {
13011304 count ++
13021305 }
13031306 cleanupAppender (t , c , db , con , a )
1307+ require .Equal (t , f .n , count , "number of rows should be correct" )
13041308}
13051309
13061310func TestAppendRowSource (t * testing.T ) {
@@ -1332,7 +1336,95 @@ func TestAppendRowSource(t *testing.T) {
13321336 args [i ] = & values [i ]
13331337 }
13341338
1335- count := 0
1339+ count := int64 (0 )
1340+ for r := 0 ; res .Next (); r ++ {
1341+ require .NoError (t , res .Scan (args ... ))
1342+ for i , value := range values {
1343+ expected := f .GetValue (r , i )
1344+ require .Equal (t , expected , value , "incorrect value" , r , i )
1345+ }
1346+ count ++
1347+ }
1348+ cleanupAppender (t , c , db , con , a )
1349+ require .Equal (t , f .n , count , "number of rows should be equal" )
1350+ }
1351+
1352+ func TestAppendRowSourceError (t * testing.T ) {
1353+ t .Parallel ()
1354+ sc := `
1355+ CREATE TABLE test (
1356+ id BIGINT,
1357+ uint8 UTINYINT
1358+ )`
1359+ c , db , con , a := prepareAppender (t , sc )
1360+
1361+ f := appStructTableUDF {
1362+ n : 3000 ,
1363+ err : errors .New ("Test test" ),
1364+ }
1365+
1366+ err := a .AppendTableSource (NewAppenderRowSource (& f ))
1367+ require .Equal (t , err , f .err )
1368+
1369+ err = a .Flush ()
1370+ require .NoError (t , err )
1371+
1372+ // Verify results.
1373+ res , err := sql .OpenDB (c ).QueryContext (context .Background (), `SELECT * FROM test ORDER BY id` )
1374+ require .NoError (t , err )
1375+
1376+ values := f .GetTypes ()
1377+ args := make ([]any , len (values ))
1378+ for i := range values {
1379+ args [i ] = & values [i ]
1380+ }
1381+
1382+ count := int64 (0 )
1383+ for r := 0 ; res .Next (); r ++ {
1384+ require .NoError (t , res .Scan (args ... ))
1385+ for i , value := range values {
1386+ expected := f .GetValue (r , i )
1387+ require .Equal (t , expected , value , "incorrect value" , r , i )
1388+ }
1389+ count ++
1390+ }
1391+ cleanupAppender (t , c , db , con , a )
1392+ require .Equal (t , f .n , count , "number of rows should be equal" )
1393+ }
1394+
1395+ func roundDownChunk [T int64 | int ](x T ) T {
1396+ return (x / 2048 ) * 2048
1397+ }
1398+
1399+ func TestAppendChunkSourceError (t * testing.T ) {
1400+ t .Parallel ()
1401+ sc := `
1402+ CREATE TABLE test (
1403+ id BIGINT,
1404+ )`
1405+ c , db , con , a := prepareAppender (t , sc )
1406+
1407+ f := chunkIncTableUDF {
1408+ n : 3000 ,
1409+ err : errors .New ("Test test" ),
1410+ }
1411+ err := a .AppendTableSource (NewAppenderChunkSource (& f ))
1412+ require .Equal (t , err , f .err )
1413+
1414+ err = a .Flush ()
1415+ require .NoError (t , err )
1416+
1417+ // Verify results.
1418+ res , err := sql .OpenDB (c ).QueryContext (context .Background (), `SELECT * FROM test ORDER BY id` )
1419+ require .NoError (t , err )
1420+
1421+ values := f .GetTypes ()
1422+ args := make ([]any , len (values ))
1423+ for i := range values {
1424+ args [i ] = & values [i ]
1425+ }
1426+
1427+ count := int64 (0 )
13361428 for r := 0 ; res .Next (); r ++ {
13371429 require .NoError (t , res .Scan (args ... ))
13381430 for i , value := range values {
@@ -1342,6 +1434,7 @@ func TestAppendRowSource(t *testing.T) {
13421434 count ++
13431435 }
13441436 cleanupAppender (t , c , db , con , a )
1437+ require .Equal (t , roundDownChunk (f .n ), count , "number of rows should be equal" )
13451438}
13461439
13471440func BenchmarkAppenderNested (b * testing.B ) {
@@ -1467,7 +1560,7 @@ func benchmarkAppenderSingle[T any](v T) func(*testing.B) {
14671560 tableSQL := fmt .Sprintf (createSingleTableSQL , types [reflect .TypeFor [T ]()])
14681561 c , db , con , a := prepareAppender (b , tableSQL )
14691562
1470- var vec [ benchmarkRowsToAppend ] T = [benchmarkRowsToAppend ]T {}
1563+ vec : = [benchmarkRowsToAppend ]T {}
14711564 for i := range benchmarkRowsToAppend {
14721565 vec [i ] = v
14731566 }
0 commit comments