
Commit 341b55e

Bug fix for query that doesn't produce rows (#136)

When a statement that doesn't produce rows (e.g. CREATE TABLE ...) was executed using DB.Query() instead of DB.Exec(), the driver would panic. When DB.Query() is used we try to create a rows object, and in this case there was a mismatch between the schema metadata returned by thrift and the arrow schema metadata: the thrift schema indicated a single column named 'Result' while the arrow schema indicated no columns.

Moved the code that creates the arrow schema and the column info into separate functions, and updated creation of the column info to handle a mismatch between the arrow and thrift schemas.

Signed-off-by: Raymond Cypher <[email protected]>

2 parents c8bfc90 + 1e10053 commit 341b55e
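For context, a minimal sketch of the usage this commit fixes, assuming a connection opened through this driver (the DSN and table name below are hypothetical placeholders):

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/databricks/databricks-sql-go" // registers the "databricks" driver
)

func main() {
	// Hypothetical DSN; substitute real credentials and HTTP path.
	db, err := sql.Open("databricks", "token:<token>@<hostname>:443/<http-path>")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// DB.Exec() is the natural call for a statement that produces no rows,
	// but DB.Query() must not panic either. Before this fix the driver
	// panicked here; now the row scanner tolerates the schema mismatch.
	rows, err := db.Query("CREATE TABLE t (id INT)")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
}
```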

3 files changed: +201 additions, −33 deletions

internal/rows/arrowbased/arrowRows.go

Lines changed: 60 additions & 33 deletions
```diff
@@ -103,41 +103,13 @@ func NewArrowRowScanner(resultSetMetadata *cli_service.TGetResultSetMetadataResp
 		arrowConfig = cfg.ArrowConfig
 	}
 
-	var arrowSchema *arrow.Schema
-	schemaBytes := resultSetMetadata.ArrowSchema
-	if schemaBytes == nil {
-		var err error
-		// convert the TTableSchema to an arrow Schema
-		arrowSchema, err = tTableSchemaToArrowSchema(resultSetMetadata.Schema, &arrowConfig)
-		if err != nil {
-			logger.Err(err).Msg(errArrowRowsConvertSchema)
-			return nil, dbsqlerrint.NewDriverError(ctx, errArrowRowsConvertSchema, err)
-		}
-
-		// serialize the arrow schema
-		schemaBytes, err = getArrowSchemaBytes(arrowSchema, ctx)
-		if err != nil {
-			logger.Err(err).Msg(errArrowRowsSerializeSchema)
-			return nil, dbsqlerrint.NewDriverError(ctx, errArrowRowsSerializeSchema, err)
-		}
-	} else {
-		br := &chunkedByteReader{chunks: [][]byte{schemaBytes}}
-		rdr, err := ipc.NewReader(br)
-		if err != nil {
-			return nil, dbsqlerrint.NewDriverError(ctx, errArrowRowsUnableToReadBatch, err)
-		}
-		defer rdr.Release()
-
-		arrowSchema = rdr.Schema()
+	schemaBytes, arrowSchema, metadataErr := tGetResultSetMetadataRespToArrowSchema(resultSetMetadata, arrowConfig, ctx, logger)
+	if metadataErr != nil {
+		return nil, metadataErr
 	}
 
-	// get the database type names for each column
-	colInfos := make([]colInfo, len(resultSetMetadata.Schema.Columns))
-	for i := range resultSetMetadata.Schema.Columns {
-		col := resultSetMetadata.Schema.Columns[i]
-		field := arrowSchema.Field(i)
-		colInfos[i] = colInfo{name: field.Name, arrowType: field.Type, dbType: rowscanner.GetDBType(col)}
-	}
+	// Create column info
+	colInfos := getColumnInfo(arrowSchema, resultSetMetadata.Schema)
 
 	// get the function for converting arrow timestamps to a time.Time
 	// time values from the server are returned as UTC with microsecond precision
```
```diff
@@ -553,6 +525,61 @@ func tColumnDescToArrowField(columnDesc *cli_service.TColumnDesc, arrowConfig *c
 	return arrowField, nil
 }
 
+// Build a slice of columnInfo using the arrow schema and the thrift schema
+func getColumnInfo(arrowSchema *arrow.Schema, schema *cli_service.TTableSchema) []colInfo {
+	if arrowSchema == nil || schema == nil {
+		return []colInfo{}
+	}
+
+	nFields := len(arrowSchema.Fields())
+	if len(schema.Columns) < nFields {
+		nFields = len(schema.Columns)
+	}
+
+	colInfos := make([]colInfo, nFields)
+	for i := 0; i < nFields; i++ {
+		col := schema.Columns[i]
+		field := arrowSchema.Field(i)
+		colInfos[i] = colInfo{name: field.Name, arrowType: field.Type, dbType: rowscanner.GetDBType(col)}
+	}
+
+	return colInfos
+}
+
+// Derive an arrow.Schema object and the corresponding serialized bytes from TGetResultSetMetadataResp
+func tGetResultSetMetadataRespToArrowSchema(resultSetMetadata *cli_service.TGetResultSetMetadataResp, arrowConfig config.ArrowConfig, ctx context.Context, logger *dbsqllog.DBSQLLogger) ([]byte, *arrow.Schema, dbsqlerr.DBError) {
+
+	var arrowSchema *arrow.Schema
+	schemaBytes := resultSetMetadata.ArrowSchema
+	if schemaBytes == nil {
+		var err error
+		// convert the TTableSchema to an arrow Schema
+		arrowSchema, err = tTableSchemaToArrowSchema(resultSetMetadata.Schema, &arrowConfig)
+		if err != nil {
+			logger.Err(err).Msg(errArrowRowsConvertSchema)
+			return nil, nil, dbsqlerrint.NewDriverError(ctx, errArrowRowsConvertSchema, err)
+		}
+
+		// serialize the arrow schema
+		schemaBytes, err = getArrowSchemaBytes(arrowSchema, ctx)
+		if err != nil {
+			logger.Err(err).Msg(errArrowRowsSerializeSchema)
+			return nil, nil, dbsqlerrint.NewDriverError(ctx, errArrowRowsSerializeSchema, err)
+		}
+	} else {
+		br := &chunkedByteReader{chunks: [][]byte{schemaBytes}}
+		rdr, err := ipc.NewReader(br)
+		if err != nil {
+			return nil, nil, dbsqlerrint.NewDriverError(ctx, errArrowRowsUnableToReadBatch, err)
+		}
+		defer rdr.Release()
+
+		arrowSchema = rdr.Schema()
+	}
+
+	return schemaBytes, arrowSchema, nil
+}
+
 type sparkRecordReader struct{}
 
 // Make sure sparkRecordReader fulfills the recordReader interface
```
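The heart of the fix is that getColumnInfo sizes its result by the shorter of the two schemas instead of assuming they agree; the pre-fix loop called arrowSchema.Field(i) for every thrift column, which panics when the arrow schema has fewer fields. A standalone sketch of that clamping pattern, using hypothetical stand-in types rather than the driver's:

```go
package main

import "fmt"

// Hypothetical stand-ins for arrow.Field and cli_service.TColumnDesc.
type arrowField struct{ name string }
type thriftColumn struct{ dbType string }

// pairColumns pairs columns positionally but never indexes past the
// shorter schema, so a thrift-only "Result" column is simply dropped.
func pairColumns(fields []arrowField, cols []thriftColumn) []string {
	n := len(fields)
	if len(cols) < n {
		n = len(cols) // clamp to the shorter schema
	}
	paired := make([]string, n)
	for i := 0; i < n; i++ {
		paired[i] = fields[i].name + " (" + cols[i].dbType + ")"
	}
	return paired
}

func main() {
	// The #136 scenario: thrift reports one column, arrow reports none.
	fmt.Println(pairColumns(nil, []thriftColumn{{dbType: "STRING"}})) // []
}
```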

internal/rows/arrowbased/arrowRows_test.go

Lines changed: 69 additions & 0 deletions
```diff
@@ -1349,6 +1349,75 @@ func TestArrowRowScanner(t *testing.T) {
 			assert.Equal(t, expected[i], dest[i])
 		}
 	})
+
+	t.Run("Mismatched schemas", func(t *testing.T) {
+		// Test for mismatched arrow and thrift schemas
+		var arrowSchema *arrow.Schema
+		var schema *cli_service.TTableSchema
+		colInfos := getColumnInfo(arrowSchema, schema)
+		assert.NotNil(t, colInfos)
+		assert.Zero(t, len(colInfos))
+
+		arrowSchema = &arrow.Schema{}
+		colInfos = getColumnInfo(arrowSchema, schema)
+		assert.NotNil(t, colInfos)
+		assert.Zero(t, len(colInfos))
+
+		arrowSchema = nil
+		schema = &cli_service.TTableSchema{}
+		colInfos = getColumnInfo(arrowSchema, schema)
+		assert.NotNil(t, colInfos)
+		assert.Zero(t, len(colInfos))
+
+		arrowSchema = &arrow.Schema{}
+		schema.Columns = []*cli_service.TColumnDesc{{ColumnName: "Result"}}
+		colInfos = getColumnInfo(arrowSchema, schema)
+		assert.NotNil(t, colInfos)
+		assert.Zero(t, len(colInfos))
+
+		schema.Columns = nil
+		arrowSchema = arrow.NewSchema([]arrow.Field{{Name: "Result", Type: arrow.PrimitiveTypes.Int16}}, nil)
+		colInfos = getColumnInfo(arrowSchema, schema)
+		assert.NotNil(t, colInfos)
+		assert.Zero(t, len(colInfos))
+
+		schema.Columns = []*cli_service.TColumnDesc{
+			{
+				ColumnName: "Result",
+				TypeDesc: &cli_service.TTypeDesc{
+					Types: []*cli_service.TTypeEntry{
+						{
+							PrimitiveEntry: &cli_service.TPrimitiveTypeEntry{
+								Type: cli_service.TTypeId_BOOLEAN_TYPE,
+							},
+						},
+					},
+				},
+			},
+			{
+				ColumnName: "Result2",
+			},
+		}
+		colInfos = getColumnInfo(arrowSchema, schema)
+		assert.NotNil(t, colInfos)
+		assert.Equal(t, 1, len(colInfos))
+		assert.Equal(t, "Result", colInfos[0].name)
+		assert.Equal(t, cli_service.TTypeId_BOOLEAN_TYPE, colInfos[0].dbType)
+		assert.Equal(t, arrow.PrimitiveTypes.Int16, colInfos[0].arrowType)
+
+		// results of executing query:
+		// "create or replace view hive_metastore.databricks_sql_go.test as select 1"
+		// using DB.Query() instead of DB.Exec()
+		executeStatementResp := cli_service.TExecuteStatementResp{}
+		loadTestData(t, "queryVExec.json", &executeStatementResp)
+		config := config.WithDefaults()
+		config.UseArrowNativeTimestamp = true
+		config.UseArrowNativeComplexTypes = true
+		config.UseArrowNativeDecimal = false
+		config.UseArrowNativeIntervalTypes = false
+		_, err := NewArrowRowScanner(executeStatementResp.DirectResults.ResultSetMetadata, executeStatementResp.DirectResults.ResultSet.Results, config, nil, context.Background())
+		assert.Nil(t, err)
+	})
 }
 
 type fakeColumnValues struct {
```
queryVExec.json

Lines changed: 72 additions & 0 deletions
New file (test data referenced by the test above via loadTestData):

```json
{
  "status": {
    "statusCode": "SUCCESS_STATUS"
  },
  "operationHandle": {
    "operationId": {
      "guid": "Ae4RVsJdEj6Egpavp/LAJA==",
      "secret": "M41SnYJyRuuEgstBlGaDnQ=="
    },
    "operationType": "EXECUTE_STATEMENT",
    "hasResultSet": true
  },
  "directResults": {
    "operationStatus": {
      "status": {
        "statusCode": "SUCCESS_STATUS"
      },
      "operationState": "FINISHED_STATE",
      "operationStarted": 1687477482139,
      "operationCompleted": 1687477484937
    },
    "resultSetMetadata": {
      "status": {
        "statusCode": "SUCCESS_STATUS"
      },
      "schema": {
        "columns": [
          {
            "columnName": "Result",
            "typeDesc": {
              "types": [
                {
                  "primitiveEntry": {
                    "type": "STRING_TYPE"
                  }
                }
              ]
            },
            "position": 1,
            "comment": ""
          }
        ]
      },
      "resultFormat": "ARROW_BASED_SET",
      "lz4Compressed": false,
      "arrowSchema": "/////0AAAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAAAAAAAAAAA",
      "cacheLookupResult": "CACHE_INELIGIBLE",
      "uncompressedBytes": 0,
      "compressedBytes": 0
    },
    "resultSet": {
      "status": {
        "statusCode": "SUCCESS_STATUS"
      },
      "hasMoreRows": false,
      "results": {
        "startRowOffset": 0,
        "rows": []
      }
    },
    "closeOperation": {
      "status": {
        "statusCode": "SUCCESS_STATUS"
      }
    }
  },
  "executionRejected": false,
  "maxClusterCapacity": 10,
  "queryCost": 0,
  "currentClusterLoad": 0,
  "idempotencyType": "NON_IDEMPOTENT"
}
```
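Note the mismatch this fixture captures: the thrift schema lists one STRING column named "Result", while the base64 "arrowSchema" payload is an IPC-serialized arrow schema containing zero fields. A sketch of decoding that payload the way the driver does, assuming the arrow Go module (v12 is an assumption about the driver's dependency version):

```go
package main

import (
	"bytes"
	"encoding/base64"
	"fmt"
	"log"

	"github.com/apache/arrow/go/v12/arrow/ipc"
)

func main() {
	// The "arrowSchema" value from queryVExec.json above.
	const b64 = "/////0AAAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAAAAAAAAAAA"

	raw, err := base64.StdEncoding.DecodeString(b64)
	if err != nil {
		log.Fatal(err)
	}

	// ipc.NewReader reads the schema message eagerly, so a schema-only
	// stream (no record batches) is enough to recover the arrow.Schema.
	rdr, err := ipc.NewReader(bytes.NewReader(raw))
	if err != nil {
		log.Fatal(err)
	}
	defer rdr.Release()

	fmt.Println(len(rdr.Schema().Fields())) // prints 0: no columns on the arrow side
}
```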
