Skip to content

Commit 428a460

Browse files
committed
Implemented connection native
1 parent f1c3436 commit 428a460

File tree

2 files changed

+123
-2
lines changed

2 files changed

+123
-2
lines changed

app/server/datasource/rdbms/ydb/connection_manager.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,21 @@ func (c *connectionManager) Make(
110110
ydbDriver,
111111
formatter,
112112
c.cfg.ResourcePool,
113+
c.cfg.Mode,
114+
)
115+
case config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE_ARROW:
116+
logger.Debug("connector will use Native SDK over Query Service with Arrow format")
117+
118+
formatter := NewSQLFormatter(config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE, c.cfg.Pushdown)
119+
ydbConn = newConnectionNative(
120+
ctx,
121+
c.QueryLoggerFactory.Make(logger, zap.String("resource_pool", c.cfg.ResourcePool)),
122+
dsi,
123+
params.TableName,
124+
ydbDriver,
125+
formatter,
126+
c.cfg.ResourcePool,
127+
c.cfg.Mode,
113128
)
114129
case config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES:
115130
logger.Debug("connector will use database/sql SDK with scan queries over Table Service")

app/server/datasource/rdbms/ydb/connection_native.go

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@ import (
1414
ydb_sdk "github.com/ydb-platform/ydb-go-sdk/v3"
1515
ydb_sdk_query "github.com/ydb-platform/ydb-go-sdk/v3/query"
1616

17+
"github.com/apache/arrow/go/v13/arrow"
18+
"github.com/apache/arrow/go/v13/arrow/ipc"
19+
1720
api_common "github.com/ydb-platform/fq-connector-go/api/common"
21+
"github.com/ydb-platform/fq-connector-go/app/config"
1822
"github.com/ydb-platform/fq-connector-go/app/server/conversion"
1923
rdbms_utils "github.com/ydb-platform/fq-connector-go/app/server/datasource/rdbms/utils"
2024
"github.com/ydb-platform/fq-connector-go/app/server/paging"
@@ -107,6 +111,82 @@ func (r *rowsNative) Close() error {
107111
return nil
108112
}
109113

114+
var _ rdbms_utils.Columns = (*columnsNative)(nil)
115+
116+
type columnsNative struct {
117+
ctx context.Context
118+
err error
119+
arrowResult ydb_sdk_query.ArrowResult
120+
currentPart io.Reader
121+
reader *ipc.Reader
122+
record arrow.Record
123+
}
124+
125+
func (c *columnsNative) Close() error {
126+
if err := c.arrowResult.Close(c.ctx); err != nil {
127+
return fmt.Errorf("arrow result close: %w", err)
128+
}
129+
return nil
130+
}
131+
132+
func (c *columnsNative) Err() error {
133+
return c.err
134+
}
135+
136+
func (c *columnsNative) Next() bool {
137+
// If we have a reader and it has more records, get the next one
138+
if c.reader != nil && c.reader.Next() {
139+
c.record = c.reader.Record()
140+
return true
141+
}
142+
143+
// Try to get the next part
144+
var part io.Reader
145+
var err error
146+
147+
for p, e := range c.arrowResult.Parts(c.ctx) {
148+
if e != nil {
149+
if errors.Is(e, io.EOF) {
150+
c.err = nil
151+
} else {
152+
c.err = fmt.Errorf("next part: %w", e)
153+
}
154+
155+
return false
156+
}
157+
158+
part = p
159+
160+
break
161+
}
162+
163+
if part == nil {
164+
return false
165+
}
166+
167+
// Create a new reader for this part
168+
c.currentPart = part
169+
reader, err := ipc.NewReader(part)
170+
if err != nil {
171+
c.err = fmt.Errorf("create arrow reader: %w", err)
172+
return false
173+
}
174+
c.reader = reader
175+
176+
// Get the first record from this part
177+
if !c.reader.Next() {
178+
c.err = fmt.Errorf("no records in arrow part")
179+
return false
180+
}
181+
182+
c.record = c.reader.Record()
183+
return true
184+
}
185+
186+
func (c *columnsNative) Record() arrow.Record {
187+
return c.record
188+
}
189+
110190
var _ rdbms_utils.Connection = (*connectionNative)(nil)
111191

112192
type connectionNative struct {
@@ -117,12 +197,11 @@ type connectionNative struct {
117197
tableName string
118198
formatter rdbms_utils.SQLFormatter
119199
resourcePool string
200+
mode config.TYdbConfig_Mode
120201
}
121202

122203
// nolint: gocyclo
123204
func (c *connectionNative) Query(params *rdbms_utils.QueryParams) (*rdbms_utils.QueryResult, error) {
124-
resultChan := make(chan *rdbms_utils.QueryResult, 1)
125-
126205
// modify query with args
127206
queryRewritten, err := c.rewriteQuery(params)
128207
if err != nil {
@@ -209,6 +288,31 @@ func (c *connectionNative) Query(params *rdbms_utils.QueryParams) (*rdbms_utils.
209288

210289
c.queryLogger.Dump(queryRewritten, params.QueryArgs.Values()...)
211290

291+
// Create a channel to receive the query result
292+
resultChan := make(chan *rdbms_utils.QueryResult, 1)
293+
294+
// Arrow-based approach
295+
if c.mode == config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE_ARROW {
296+
arrowResult, err := c.driver.Query().QueryArrow(
297+
params.Ctx,
298+
queryRewritten,
299+
ydb_sdk_query.WithParameters(paramsBuilder.Build()),
300+
ydb_sdk_query.WithResourcePool(c.resourcePool),
301+
ydb_sdk_query.WithIdempotent(),
302+
)
303+
if err != nil {
304+
return nil, fmt.Errorf("query arrow: %w", err)
305+
}
306+
307+
columns := &columnsNative{
308+
ctx: c.ctx,
309+
arrowResult: arrowResult,
310+
}
311+
312+
return &rdbms_utils.QueryResult{Columns: columns}, nil
313+
}
314+
315+
// Traditional row-based approach
212316
finalErr := c.driver.Query().Do(
213317
params.Ctx,
214318
func(ctx context.Context, session ydb_sdk_query.Session) (err error) {
@@ -340,6 +444,7 @@ func newConnectionNative(
340444
driver *ydb_sdk.Driver,
341445
formatter rdbms_utils.SQLFormatter,
342446
resourcePool string,
447+
mode config.TYdbConfig_Mode,
343448
) Connection {
344449
return &connectionNative{
345450
ctx: ctx,
@@ -349,5 +454,6 @@ func newConnectionNative(
349454
tableName: tableName,
350455
formatter: formatter,
351456
resourcePool: resourcePool,
457+
mode: mode,
352458
}
353459
}

0 commit comments

Comments
 (0)