Skip to content

Commit b58245c

Browse files
barino86barino-carvanaclaude
authored
feat(go): support target catalog/schema in ingest (adbc-drivers#95)
## What's Changed Adds support for the standard ADBC `adbc.ingest.target_catalog` and `adbc.ingest.target_db_schema` options in the Go Snowflake driver. When set, the driver builds a fully-qualified table identifier (e.g. `"catalog"."schema"."table"`) for all ingestion operations including `CREATE TABLE`, `DROP TABLE IF EXISTS`, `COPY INTO`, and row count queries. Previously, ingestion only used the unqualified table name, requiring users to connect directly to the target database and schema. With this change, users can ingest into any `database.schema.table` from any connection context. Closes adbc-drivers#94. --------- Co-authored-by: Brandon <brandon.arino@carvana.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a116c5d commit b58245c

File tree

4 files changed

+167
-9
lines changed

4 files changed

+167
-9
lines changed

go/bulk_ingestion.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func (st *statement) ingestRecord(ctx context.Context) (nrows int64, err error)
149149

150150
var (
151151
initialRows int64
152-
target = quoteTblName(st.targetTable)
152+
target = st.qualifiedTableName()
153153
)
154154

155155
// Check final row count of target table to get definitive rows affected
@@ -233,7 +233,7 @@ func (st *statement) ingestStream(ctx context.Context) (nrows int64, err error)
233233

234234
var (
235235
initialRows int64
236-
target = quoteTblName(st.targetTable)
236+
target = st.qualifiedTableName()
237237
)
238238

239239
// Check final row count of target table to get definitive rows affected

go/bulk_ingestion_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,59 @@ func TestIngestBatchedParquetWithFileLimit(t *testing.T) {
8888
require.ErrorIs(t, writeParquet(rdr.Schema(), &buf, records, -1, parquetProps, arrowProps), io.EOF)
8989
}
9090

91+
func TestQualifiedTableName(t *testing.T) {
92+
tests := []struct {
93+
name string
94+
targetCatalog string
95+
targetDbSchema string
96+
targetTable string
97+
expected string
98+
}{
99+
{
100+
name: "table only",
101+
targetTable: "my_table",
102+
expected: `"my_table"`,
103+
},
104+
{
105+
name: "schema and table (2-part)",
106+
targetDbSchema: "my_schema",
107+
targetTable: "my_table",
108+
expected: `"my_schema"."my_table"`,
109+
},
110+
{
111+
name: "catalog, schema, and table (3-part)",
112+
targetCatalog: "my_catalog",
113+
targetDbSchema: "my_schema",
114+
targetTable: "my_table",
115+
expected: `"my_catalog"."my_schema"."my_table"`,
116+
},
117+
{
118+
name: "catalog and table (no schema)",
119+
targetCatalog: "my_catalog",
120+
targetTable: "my_table",
121+
expected: `"my_catalog"."my_table"`,
122+
},
123+
{
124+
name: "identifiers with special characters",
125+
targetCatalog: `my"catalog`,
126+
targetDbSchema: `my"schema`,
127+
targetTable: `my"table`,
128+
expected: `"my""catalog"."my""schema"."my""table"`,
129+
},
130+
}
131+
132+
for _, tt := range tests {
133+
t.Run(tt.name, func(t *testing.T) {
134+
st := &statement{
135+
targetCatalog: tt.targetCatalog,
136+
targetDbSchema: tt.targetDbSchema,
137+
targetTable: tt.targetTable,
138+
}
139+
assert.Equal(t, tt.expected, st.qualifiedTableName())
140+
})
141+
}
142+
}
143+
91144
func makeRec(mem memory.Allocator, nCols, nRows int) arrow.RecordBatch {
92145
vals := make([]int8, nRows)
93146
for val := range nRows {

go/driver_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2538,6 +2538,91 @@ func TestIngestCancelContext(t *testing.T) {
25382538
})
25392539
}
25402540

2541+
func TestIngestWithQualifiedTableName(t *testing.T) {
2542+
withQuirks(t, func(q *SnowflakeQuirks) {
2543+
ctx := context.Background()
2544+
2545+
drv := q.SetupDriver(t)
2546+
defer q.TearDownDriver(t, drv)
2547+
2548+
db, err := drv.NewDatabase(q.DatabaseOptions())
2549+
require.NoError(t, err)
2550+
2551+
cnxn, err := db.Open(ctx)
2552+
require.NoError(t, err)
2553+
2554+
sc := arrow.NewSchema([]arrow.Field{
2555+
{Name: "col_int64", Type: arrow.PrimitiveTypes.Int64, Nullable: true},
2556+
}, nil)
2557+
2558+
bldr := array.NewRecordBuilder(q.Alloc(), sc)
2559+
defer bldr.Release()
2560+
2561+
bldr.Field(0).(*array.Int64Builder).AppendValues([]int64{1, 2, 3}, nil)
2562+
2563+
rec := bldr.NewRecordBatch()
2564+
defer rec.Release()
2565+
2566+
// Test 2-part name: schema.table
2567+
t.Run("schema_qualified", func(t *testing.T) {
2568+
tblName := "bulk_ingest_schema_qualified"
2569+
require.NoError(t, q.DropTable(cnxn, tblName))
2570+
2571+
stmt, err := cnxn.NewStatement()
2572+
require.NoError(t, err)
2573+
2574+
require.NoError(t, stmt.Bind(ctx, rec))
2575+
require.NoError(t, stmt.SetOption(adbc.OptionKeyIngestTargetTable, tblName))
2576+
require.NoError(t, stmt.SetOption(adbc.OptionValueIngestTargetDBSchema, q.DBSchema()))
2577+
2578+
n, err := stmt.ExecuteUpdate(ctx)
2579+
require.NoError(t, err)
2580+
require.EqualValues(t, 3, n)
2581+
2582+
require.NoError(t, stmt.SetSqlQuery(fmt.Sprintf(`SELECT * FROM "%s"."%s" ORDER BY "col_int64" ASC`, q.DBSchema(), tblName)))
2583+
rdr, _, err := stmt.ExecuteQuery(ctx)
2584+
require.NoError(t, err)
2585+
defer rdr.Release()
2586+
require.True(t, rdr.Next())
2587+
require.EqualValues(t, 3, rdr.RecordBatch().NumRows())
2588+
2589+
require.NoError(t, stmt.Close())
2590+
require.NoError(t, q.DropTable(cnxn, tblName))
2591+
})
2592+
2593+
// Test 3-part name: catalog.schema.table
2594+
t.Run("catalog_and_schema_qualified", func(t *testing.T) {
2595+
tblName := "bulk_ingest_fully_qualified"
2596+
require.NoError(t, q.DropTable(cnxn, tblName))
2597+
2598+
stmt, err := cnxn.NewStatement()
2599+
require.NoError(t, err)
2600+
2601+
require.NoError(t, stmt.Bind(ctx, rec))
2602+
require.NoError(t, stmt.SetOption(adbc.OptionKeyIngestTargetTable, tblName))
2603+
require.NoError(t, stmt.SetOption(adbc.OptionValueIngestTargetCatalog, q.Catalog()))
2604+
require.NoError(t, stmt.SetOption(adbc.OptionValueIngestTargetDBSchema, q.DBSchema()))
2605+
2606+
n, err := stmt.ExecuteUpdate(ctx)
2607+
require.NoError(t, err)
2608+
require.EqualValues(t, 3, n)
2609+
2610+
require.NoError(t, stmt.SetSqlQuery(fmt.Sprintf(`SELECT * FROM "%s"."%s"."%s" ORDER BY "col_int64" ASC`, q.Catalog(), q.DBSchema(), tblName)))
2611+
rdr, _, err := stmt.ExecuteQuery(ctx)
2612+
require.NoError(t, err)
2613+
defer rdr.Release()
2614+
require.True(t, rdr.Next())
2615+
require.EqualValues(t, 3, rdr.RecordBatch().NumRows())
2616+
2617+
require.NoError(t, stmt.Close())
2618+
require.NoError(t, q.DropTable(cnxn, tblName))
2619+
})
2620+
2621+
require.NoError(t, cnxn.Close())
2622+
require.NoError(t, db.Close())
2623+
})
2624+
}
2625+
25412626
func (suite *SnowflakeTests) TestChangeDatabaseAndGetObjects() {
25422627
// this test demonstrates:
25432628
// 1. changing the database context

go/statement.go

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,13 @@ type statement struct {
6262
useHighPrecision bool
6363
maxTimestampPrecision MaxTimestampPrecision
6464

65-
query string
66-
targetTable string
67-
ingestMode string
68-
ingestOptions *ingestOptions
69-
queryTag string
65+
query string
66+
targetTable string
67+
targetCatalog string
68+
targetDbSchema string
69+
ingestMode string
70+
ingestOptions *ingestOptions
71+
queryTag string
7072

7173
bound arrow.RecordBatch
7274
streamBind array.RecordReader
@@ -76,6 +78,20 @@ func (st *statement) Base() *driverbase.StatementImplBase {
7678
return &st.StatementImplBase
7779
}
7880

81+
// qualifiedTableName builds a fully-qualified table identifier from the
82+
// configured catalog, schema, and table name.
83+
func (st *statement) qualifiedTableName() string {
84+
parts := make([]string, 0, 3)
85+
if st.targetCatalog != "" {
86+
parts = append(parts, quoteTblName(st.targetCatalog))
87+
}
88+
if st.targetDbSchema != "" {
89+
parts = append(parts, quoteTblName(st.targetDbSchema))
90+
}
91+
parts = append(parts, quoteTblName(st.targetTable))
92+
return strings.Join(parts, ".")
93+
}
94+
7995
// setQueryContext applies the query tag if present.
8096
func (st *statement) setQueryContext(ctx context.Context) context.Context {
8197
if st.queryTag != "" {
@@ -148,6 +164,10 @@ func (st *statement) SetOption(key string, val string) error {
148164
case adbc.OptionKeyIngestTargetTable:
149165
st.query = ""
150166
st.targetTable = val
167+
case adbc.OptionValueIngestTargetCatalog:
168+
st.targetCatalog = val
169+
case adbc.OptionValueIngestTargetDBSchema:
170+
st.targetDbSchema = val
151171
case adbc.OptionKeyIngestMode:
152172
switch val {
153173
case adbc.OptionValueIngestModeAppend:
@@ -405,7 +425,7 @@ func (st *statement) initIngest(ctx context.Context) error {
405425
if st.ingestMode == adbc.OptionValueIngestModeCreateAppend {
406426
createBldr.WriteString(" IF NOT EXISTS ")
407427
}
408-
createBldr.WriteString(quoteTblName(st.targetTable))
428+
createBldr.WriteString(st.qualifiedTableName())
409429
createBldr.WriteString(" (")
410430

411431
var schema *arrow.Schema
@@ -442,7 +462,7 @@ func (st *statement) initIngest(ctx context.Context) error {
442462
case adbc.OptionValueIngestModeAppend:
443463
// Do nothing
444464
case adbc.OptionValueIngestModeReplace:
445-
replaceQuery := "DROP TABLE IF EXISTS " + quoteTblName(st.targetTable)
465+
replaceQuery := "DROP TABLE IF EXISTS " + st.qualifiedTableName()
446466
_, err := st.cnxn.cn.ExecContext(ctx, replaceQuery, nil)
447467
if err != nil {
448468
return errToAdbcErr(adbc.StatusInternal, err)

0 commit comments

Comments
 (0)