Skip to content

Commit 2cd41ef

Browse files
committed
Implemented connection native
1 parent b46a177 commit 2cd41ef

File tree

2 files changed

+96
-0
lines changed

2 files changed

+96
-0
lines changed

app/server/datasource/rdbms/ydb/connection_manager.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,21 @@ func (c *connectionManager) Make(
110110
ydbDriver,
111111
formatter,
112112
c.cfg.ResourcePool,
113+
c.cfg.Mode,
114+
)
115+
case config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE_ARROW:
116+
logger.Debug("connector will use Native SDK over Query Service with Arrow format")
117+
118+
formatter := NewSQLFormatter(config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE, c.cfg.Pushdown)
119+
ydbConn = newConnectionNative(
120+
ctx,
121+
c.QueryLoggerFactory.Make(logger, zap.String("resource_pool", c.cfg.ResourcePool)),
122+
dsi,
123+
params.TableName,
124+
ydbDriver,
125+
formatter,
126+
c.cfg.ResourcePool,
127+
c.cfg.Mode,
113128
)
114129
case config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES:
115130
logger.Debug("connector will use database/sql SDK with scan queries over Table Service")

app/server/datasource/rdbms/ydb/connection_native.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@ import (
1414
ydb_sdk "github.com/ydb-platform/ydb-go-sdk/v3"
1515
ydb_sdk_query "github.com/ydb-platform/ydb-go-sdk/v3/query"
1616

17+
"github.com/apache/arrow/go/v13/arrow"
18+
"github.com/apache/arrow/go/v13/arrow/ipc"
19+
1720
api_common "github.com/ydb-platform/fq-connector-go/api/common"
21+
"github.com/ydb-platform/fq-connector-go/app/config"
1822
"github.com/ydb-platform/fq-connector-go/app/server/conversion"
1923
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
2024
"github.com/ydb-platform/fq-connector-go/app/server/paging"
@@ -107,6 +111,82 @@ func (r *rowsNative) Close() error {
107111
return nil
108112
}
109113

114+
var _ rdbms_utils.Columns = (*columnsNative)(nil)
115+
116+
type columnsNative struct {
117+
ctx context.Context
118+
err error
119+
arrowResult ydb_sdk_query.ArrowResult
120+
currentPart io.Reader
121+
reader *ipc.Reader
122+
record arrow.Record
123+
}
124+
125+
func (c *columnsNative) Close() error {
126+
if err := c.arrowResult.Close(c.ctx); err != nil {
127+
return fmt.Errorf("arrow result close: %w", err)
128+
}
129+
return nil
130+
}
131+
132+
func (c *columnsNative) Err() error {
133+
return c.err
134+
}
135+
136+
func (c *columnsNative) Next() bool {
137+
// If we have a reader and it has more records, get the next one
138+
if c.reader != nil && c.reader.Next() {
139+
c.record = c.reader.Record()
140+
return true
141+
}
142+
143+
// Try to get the next part
144+
var part io.Reader
145+
var err error
146+
147+
for p, e := range c.arrowResult.Parts(c.ctx) {
148+
if e != nil {
149+
if errors.Is(e, io.EOF) {
150+
c.err = nil
151+
} else {
152+
c.err = fmt.Errorf("next part: %w", e)
153+
}
154+
155+
return false
156+
}
157+
158+
part = p
159+
160+
break
161+
}
162+
163+
if part == nil {
164+
return false
165+
}
166+
167+
// Create a new reader for this part
168+
c.currentPart = part
169+
reader, err := ipc.NewReader(part)
170+
if err != nil {
171+
c.err = fmt.Errorf("create arrow reader: %w", err)
172+
return false
173+
}
174+
c.reader = reader
175+
176+
// Get the first record from this part
177+
if !c.reader.Next() {
178+
c.err = fmt.Errorf("no records in arrow part")
179+
return false
180+
}
181+
182+
c.record = c.reader.Record()
183+
return true
184+
}
185+
186+
func (c *columnsNative) Record() arrow.Record {
187+
return c.record
188+
}
189+
110190
var _ rdbms_utils.Connection = (*connectionNative)(nil)
111191

112192
type connectionNative struct {
@@ -370,6 +450,7 @@ func newConnectionNative(
370450
driver *ydb_sdk.Driver,
371451
formatter rdbms_utils.SQLFormatter,
372452
resourcePool string,
453+
mode config.TYdbConfig_Mode,
373454
) Connection {
374455
return &connectionNative{
375456
driver: driver,

0 commit comments

Comments
 (0)