Skip to content

Commit 12535b8

Browse files
jatorreclaude
and committed
feat: add GeoArrow export support for GEOGRAPHY/GEOMETRY columns
Detect GEOGRAPHY/GEOMETRY columns during query execution and tag them with geoarrow.wkb Arrow extension metadata, enabling DuckDB and other Arrow consumers to receive native geometry types with CRS information. How it works: 1. Set GEOGRAPHY/GEOMETRY_OUTPUT_FORMAT=WKB at connection time so geo columns arrive as binary WKB instead of GeoJSON strings 2. Before executing a query, extract the table name and run DESCRIBE TABLE to identify GEOGRAPHY/GEOMETRY columns (catalog metadata is unaffected by the WKB output format setting) 3. Tag identified columns with geoarrow.wkb extension metadata in the Arrow schema — GEOGRAPHY gets CRS "EPSG:4326", GEOMETRY gets no CRS 4. Data flows as binary WKB with zero conversion overhead Note: Snowflake's REST API reports geo columns as "binary" in rowtype metadata when WKB output format is set, losing the original type info. This is why we need the separate DESCRIBE TABLE query. We've reported this to Snowflake. Limitations (documented as TODOs): - GEOMETRY SRID: requires data inspection to determine, same cross-driver issue as adbc-drivers/redshift#2 and adbc-drivers/databricks#350 - Arbitrary queries: only table scans (SELECT ... FROM table) get geoarrow metadata. Complex queries with joins/subqueries don't trigger geo detection. The data is still correct WKB, just without the metadata. Tested end-to-end: DuckDB reads Snowflake GEOGRAPHY as native GEOMETRY with CRS EPSG:4326, and GeoParquet export preserves the type. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a1a694f commit 12535b8

File tree

4 files changed

+147
-10
lines changed

4 files changed

+147
-10
lines changed

go/connection.go

Lines changed: 85 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -514,9 +514,21 @@ func (c *connectionImpl) toArrowField(columnInfo driverbase.ColumnInfo) arrow.Fi
514514
field.Type = arrow.FixedWidthTypes.Timestamp_ns
515515
}
516516
case "GEOGRAPHY":
517-
fallthrough
517+
// With GEOGRAPHY_OUTPUT_FORMAT=WKB, data arrives as binary WKB.
518+
// GEOGRAPHY is always WGS84 (SRID 4326).
519+
field.Type = arrow.BinaryTypes.Binary
520+
field.Metadata = arrow.MetadataFrom(map[string]string{
521+
"ARROW:extension:name": "geoarrow.wkb",
522+
"ARROW:extension:metadata": `{"crs":"EPSG:4326"}`,
523+
})
518524
case "GEOMETRY":
519-
field.Type = arrow.BinaryTypes.String
525+
// With GEOMETRY_OUTPUT_FORMAT=WKB, data arrives as binary WKB.
526+
// TODO: SRID for GEOMETRY requires inspecting data or a separate query.
527+
// Same cross-driver issue as adbc-drivers/redshift#2 and adbc-drivers/databricks#350.
528+
field.Type = arrow.BinaryTypes.Binary
529+
field.Metadata = arrow.MetadataFrom(map[string]string{
530+
"ARROW:extension:name": "geoarrow.wkb",
531+
})
520532
case "VECTOR":
521533
// despite the fact that Snowflake *does* support returning data
522534
// for VECTOR typed columns as Arrow FixedSizeLists, there's no way
@@ -559,9 +571,16 @@ func descToField(name, typ, isnull, primary string, comment sql.NullString, maxT
559571
case "VARIANT":
560572
field.Type = arrow.BinaryTypes.String
561573
case "GEOGRAPHY":
562-
fallthrough
574+
field.Type = arrow.BinaryTypes.Binary
575+
field.Metadata = arrow.MetadataFrom(map[string]string{
576+
"ARROW:extension:name": "geoarrow.wkb",
577+
"ARROW:extension:metadata": `{"crs":"EPSG:4326"}`,
578+
})
563579
case "GEOMETRY":
564-
field.Type = arrow.BinaryTypes.String
580+
field.Type = arrow.BinaryTypes.Binary
581+
field.Metadata = arrow.MetadataFrom(map[string]string{
582+
"ARROW:extension:name": "geoarrow.wkb",
583+
})
565584
case "BOOLEAN":
566585
field.Type = arrow.FixedWidthTypes.Boolean
567586
default:
@@ -623,6 +642,68 @@ func descToField(name, typ, isnull, primary string, comment sql.NullString, maxT
623642
return
624643
}
625644

645+
// detectGeoColumnsFromQuery attempts to extract a table name from a SQL query
646+
// and runs DESCRIBE TABLE to identify GEOGRAPHY/GEOMETRY columns.
647+
// Returns nil if the table name can't be determined or no geo columns exist.
648+
// This works for table scans (SELECT ... FROM schema.table) which is the common
649+
// case for adbc_scan. Arbitrary queries return nil — data is correct but without
650+
// geoarrow metadata. TODO: Support arbitrary queries.
651+
func (c *connectionImpl) detectGeoColumnsFromQuery(ctx context.Context, query string) map[string]geoColumnType {
652+
// Simple extraction: find "FROM <table>" in the query.
653+
// Handles: SELECT ... FROM schema.table, SELECT ... FROM "schema"."table", etc.
654+
upper := strings.ToUpper(strings.TrimSpace(query))
655+
fromIdx := strings.Index(upper, "FROM ")
656+
if fromIdx == -1 {
657+
return nil
658+
}
659+
660+
// Extract table reference after FROM
661+
rest := strings.TrimSpace(query[fromIdx+5:])
662+
// Take until next SQL keyword or end
663+
endIdx := len(rest)
664+
for _, kw := range []string{" WHERE ", " ORDER ", " GROUP ", " HAVING ", " LIMIT ", " UNION ", " JOIN ", " LEFT ", " RIGHT ", " INNER ", " OUTER ", " CROSS "} {
665+
if idx := strings.Index(strings.ToUpper(rest), kw); idx != -1 && idx < endIdx {
666+
endIdx = idx
667+
}
668+
}
669+
tableName := strings.TrimSpace(rest[:endIdx])
670+
if tableName == "" {
671+
return nil
672+
}
673+
674+
// Run DESCRIBE TABLE to get original column types
675+
rows, err := c.cn.QueryContext(ctx, "DESC TABLE "+tableName, nil)
676+
if err != nil {
677+
return nil
678+
}
679+
defer rows.Close()
680+
681+
geoCols := make(map[string]geoColumnType)
682+
dest := make([]driver.Value, len(rows.Columns()))
683+
for {
684+
if err := rows.Next(dest); err != nil {
685+
break
686+
}
687+
if len(dest) < 2 {
688+
continue
689+
}
690+
name, _ := dest[0].(string)
691+
typ, _ := dest[1].(string)
692+
typ = strings.ToUpper(typ)
693+
694+
if strings.HasPrefix(typ, "GEOGRAPHY") {
695+
geoCols[name] = geoColumnGeography
696+
} else if strings.HasPrefix(typ, "GEOMETRY") {
697+
geoCols[name] = geoColumnGeometry
698+
}
699+
}
700+
701+
if len(geoCols) == 0 {
702+
return nil
703+
}
704+
return geoCols
705+
}
706+
626707
func (c *connectionImpl) getStringQuery(query string) (value string, err error) {
627708
result, err := c.cn.QueryContext(context.Background(), query, nil)
628709
if err != nil {

go/database.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,22 @@ func (d *databaseImpl) Open(ctx context.Context) (adbcConnection adbc.Connection
532532
ctx, span := driverbase.StartSpan(ctx, "databaseImpl.Open", d)
533533
defer driverbase.EndSpan(span, err)
534534

535+
// Set WKB output for geospatial columns so they arrive as binary WKB
536+
// instead of GeoJSON strings. Geo column detection is done separately
537+
// via DESCRIBE TABLE (catalog metadata is unaffected by output format).
538+
// Note: Snowflake's REST API rowtype metadata reports "binary" instead of
539+
// "geography"/"geometry" when WKB format is set — we've reported this to Snowflake.
540+
if d.cfg.Params == nil {
541+
d.cfg.Params = make(map[string]*string)
542+
}
543+
wkb := "WKB"
544+
if _, ok := d.cfg.Params["GEOGRAPHY_OUTPUT_FORMAT"]; !ok {
545+
d.cfg.Params["GEOGRAPHY_OUTPUT_FORMAT"] = &wkb
546+
}
547+
if _, ok := d.cfg.Params["GEOMETRY_OUTPUT_FORMAT"]; !ok {
548+
d.cfg.Params["GEOMETRY_OUTPUT_FORMAT"] = &wkb
549+
}
550+
535551
connector := gosnowflake.NewConnector(drv, *d.cfg)
536552

537553
ctx = gosnowflake.WithArrowAllocator(

go/record_reader.go

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,15 @@ import (
4848

4949
const MetadataKeySnowflakeType = "SNOWFLAKE_TYPE"
5050

51+
// geoColumnType identifies the Snowflake geospatial type of a column.
52+
type geoColumnType int
53+
54+
const (
55+
geoColumnNone geoColumnType = iota
56+
geoColumnGeography // GEOGRAPHY — always WGS84/SRID 4326
57+
geoColumnGeometry // GEOMETRY — SRID unknown without data inspection
58+
)
59+
5160
func identCol(_ context.Context, a arrow.Array) (arrow.Array, error) {
5261
a.Retain()
5362
return a, nil
@@ -80,14 +89,39 @@ func getRecTransformer(sc *arrow.Schema, tr []colTransformer) recordTransformer
8089
}
8190
}
8291

83-
func getTransformer(sc *arrow.Schema, ld gosnowflake.ArrowStreamLoader, useHighPrecision bool, maxTimestampPrecision MaxTimestampPrecision) (*arrow.Schema, recordTransformer) {
92+
func getTransformer(sc *arrow.Schema, ld gosnowflake.ArrowStreamLoader, useHighPrecision bool, maxTimestampPrecision MaxTimestampPrecision, geoCols map[string]geoColumnType) (*arrow.Schema, recordTransformer) {
8493
loc, types := ld.Location(), ld.RowTypes()
8594

8695
fields := make([]arrow.Field, len(sc.Fields()))
8796
transformers := make([]func(context.Context, arrow.Array) (arrow.Array, error), len(sc.Fields()))
8897
for i, f := range sc.Fields() {
8998
srcMeta := types[i]
9099
originalArrowUnit := arrow.TimeUnit(srcMeta.Scale / 3)
100+
101+
// With GEOGRAPHY/GEOMETRY_OUTPUT_FORMAT=WKB, geo columns arrive as binary WKB
102+
// but srcMeta.Type is "binary" (Snowflake REST API limitation). Use the geoCols
103+
// map (from DESCRIBE TABLE) to identify them and tag with geoarrow.wkb metadata.
104+
// Data is already WKB binary — no conversion needed, just pass through.
105+
if geoType, ok := geoCols[f.Name]; ok && geoType != geoColumnNone {
106+
f.Type = arrow.BinaryTypes.Binary
107+
if geoType == geoColumnGeography {
108+
f.Metadata = arrow.MetadataFrom(map[string]string{
109+
"ARROW:extension:name": "geoarrow.wkb",
110+
"ARROW:extension:metadata": `{"crs":"EPSG:4326"}`,
111+
})
112+
} else {
113+
// TODO: GEOMETRY SRID requires inspecting data or a separate query.
114+
// Same cross-driver issue as adbc-drivers/redshift#2 and
115+
// adbc-drivers/databricks#350.
116+
f.Metadata = arrow.MetadataFrom(map[string]string{
117+
"ARROW:extension:name": "geoarrow.wkb",
118+
})
119+
}
120+
transformers[i] = identCol
121+
fields[i] = f
122+
continue
123+
}
124+
91125
switch strings.ToUpper(srcMeta.Type) {
92126
case "FIXED":
93127
switch f.Type.ID() {
@@ -551,7 +585,7 @@ type reader struct {
551585
done chan struct{} // signals all producer goroutines have finished
552586
}
553587

554-
func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake.ArrowStreamLoader, bufferSize, prefetchConcurrency int, useHighPrecision bool, maxTimestampPrecision MaxTimestampPrecision) (array.RecordReader, error) {
588+
func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake.ArrowStreamLoader, bufferSize, prefetchConcurrency int, useHighPrecision bool, maxTimestampPrecision MaxTimestampPrecision, geoCols map[string]geoColumnType) (array.RecordReader, error) {
555589
batches, err := ld.GetBatches()
556590
if err != nil {
557591
return nil, errToAdbcErr(adbc.StatusInternal, err)
@@ -671,7 +705,7 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
671705
done: make(chan struct{}),
672706
}
673707
close(rdr.done) // No goroutines to wait for
674-
rdr.schema, _ = getTransformer(schema, ld, useHighPrecision, maxTimestampPrecision)
708+
rdr.schema, _ = getTransformer(schema, ld, useHighPrecision, maxTimestampPrecision, nil)
675709
return rdr, nil
676710
}
677711

@@ -710,7 +744,7 @@ func newRecordReader(ctx context.Context, alloc memory.Allocator, ld gosnowflake
710744
}
711745

712746
var recTransform recordTransformer
713-
rdr.schema, recTransform = getTransformer(rr.Schema(), ld, useHighPrecision, maxTimestampPrecision)
747+
rdr.schema, recTransform = getTransformer(rr.Schema(), ld, useHighPrecision, maxTimestampPrecision, geoCols)
714748

715749
group.Go(func() (err error) {
716750
defer rr.Release()

go/statement.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ func (st *statement) ExecuteQuery(ctx context.Context) (reader array.RecordReade
548548
return nil, err
549549
}
550550

551-
reader, err = newRecordReader(ctx, st.alloc, loader, st.queueSize, st.prefetchConcurrency, st.useHighPrecision, st.maxTimestampPrecision)
551+
reader, err = newRecordReader(ctx, st.alloc, loader, st.queueSize, st.prefetchConcurrency, st.useHighPrecision, st.maxTimestampPrecision, nil)
552552
return reader, err
553553
},
554554
currentBatch: st.bound,
@@ -566,14 +566,20 @@ func (st *statement) ExecuteQuery(ctx context.Context) (reader array.RecordReade
566566
return
567567
}
568568

569+
// Detect geo columns before executing the query. For table scans,
570+
// try to extract the table name and run DESCRIBE TABLE to identify
571+
// GEOGRAPHY/GEOMETRY columns (catalog metadata is unaffected by WKB output format).
572+
// TODO: Support arbitrary queries — currently only table scans get geoarrow metadata.
573+
geoCols := st.cnxn.detectGeoColumnsFromQuery(ctx, st.query)
574+
569575
var loader gosnowflake.ArrowStreamLoader
570576
loader, err = st.cnxn.cn.QueryArrowStream(ctx, st.query)
571577
if err != nil {
572578
err = errToAdbcErr(adbc.StatusInternal, err)
573579
return
574580
}
575581

576-
reader, err = newRecordReader(ctx, st.alloc, loader, st.queueSize, st.prefetchConcurrency, st.useHighPrecision, st.maxTimestampPrecision)
582+
reader, err = newRecordReader(ctx, st.alloc, loader, st.queueSize, st.prefetchConcurrency, st.useHighPrecision, st.maxTimestampPrecision, geoCols)
577583
nRows = loader.TotalRows()
578584
return
579585
}

0 commit comments

Comments
 (0)