Skip to content

Commit ed8c356

Browse files
committed
First successfull run via Arrow
1 parent 87bdac7 commit ed8c356

File tree

9 files changed

+50
-22
lines changed

9 files changed

+50
-22
lines changed

app/server/config/config.debug.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ datasources:
8181
ydb:
8282
<<: *data_source_default_var
8383
use_underlay_network_for_dedicated_databases: false
84-
mode: MODE_QUERY_SERVICE_NATIVE
84+
mode: MODE_QUERY_SERVICE_NATIVE_ARROW
8585
pushdown:
8686
enable_timestamp_pushdown: true
8787
splitting:

app/server/config/config.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -544,10 +544,10 @@ func validateYdbConfig(c *config.TYdbConfig) error {
544544
}
545545

546546
switch c.Mode {
547-
case config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE:
548-
if c.ResourcePool == "" {
549-
return fmt.Errorf("you must set `resource_pool` if `mode` is `query_service_native`")
550-
}
547+
case config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE, config.TYdbConfig_MODE_QUERY_SERVICE_NATIVE_ARROW:
548+
// if c.ResourcePool == "" {
549+
// return fmt.Errorf("you must set `resource_pool` if `mode` is `query_service_native` or `query_service_native_arrow`")
550+
// }
551551
case config.TYdbConfig_MODE_TABLE_SERVICE_STDLIB_SCAN_QUERIES:
552552
if c.ResourcePool != "" {
553553
return fmt.Errorf("you must not set `resource_pool` if `mode` is `table_service_stdlib_scan_queries`")

app/server/datasource/rdbms/ydb/connection_manager.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ func (c *connectionManager) Make(
5858
} else if dsi.Credentials.GetBasic() != nil {
5959
logger.Debug("connector will use base auth credentials for authorization")
6060

61-
cred = ydb_sdk.WithStaticCredentials(dsi.Credentials.GetBasic().Username, dsi.Credentials.GetBasic().Password)
61+
//cred = ydb_sdk.WithStaticCredentials(dsi.Credentials.GetBasic().Username, dsi.Credentials.GetBasic().Password)
62+
cred = ydb_sdk.WithAnonymousCredentials()
6263
} else {
6364
logger.Warn("connector will not use any credentials for authorization")
6465

app/server/paging/interface.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ type ColumnarBufferFactory[T Acceptor] interface {
4343
// 4. flag marking this stream as completed
4444
type ReadResult[T Acceptor] struct {
4545
ColumnarBuffer ColumnarBuffer[T]
46+
Data []byte // serialized Arrow Record
4647
Stats *api_service_protos.TReadSplitsResponse_TStats
4748
Error error
4849
IsTerminalMessage bool

app/server/paging/sink.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -215,18 +215,13 @@ func (s *sinkImpl[T]) respondWithArrowRecord(
215215
return
216216
}
217217

218-
// Create a columnar buffer for the response
219-
// We'll create a simple implementation that just returns the response
220-
cb := &columnarBufferArrowIPCStreamingDefault[T]{
221-
arrowAllocator: nil,
222-
builders: nil,
223-
schema: record.Schema(),
224-
logger: s.logger,
225-
}
218+
// Get the serialized data from the buffer
219+
serializedData := buf.Bytes()
226220

227-
// Create a result with the response
221+
// Create a result with the serialized data
228222
result := &ReadResult[T]{
229-
ColumnarBuffer: cb,
223+
ColumnarBuffer: nil,
224+
Data: serializedData,
230225
Stats: stats,
231226
Error: err,
232227
IsTerminalMessage: isTerminalMessage,

app/server/streaming/read_splits_streamer.go

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,30 @@ func (s *ReadSplitsStreamer[T]) writeDataToStream() error {
6060
}
6161

6262
func (s *ReadSplitsStreamer[T]) sendResultToStream(result *paging.ReadResult[T]) error {
63-
// buffer must be explicitly marked as unused,
64-
// otherwise memory will leak
65-
defer result.ColumnarBuffer.Release()
63+
var resp *api_service_protos.TReadSplitsResponse
6664

67-
resp, err := result.ColumnarBuffer.ToResponse()
68-
if err != nil {
69-
return fmt.Errorf("buffer to response: %w", err)
65+
var err error
66+
67+
if result.Data != nil {
68+
fmt.Println(" >>> HERE <<<")
69+
// Handle the case where we have serialized Arrow data
70+
resp = &api_service_protos.TReadSplitsResponse{
71+
Payload: &api_service_protos.TReadSplitsResponse_ArrowIpcStreaming{
72+
ArrowIpcStreaming: result.Data,
73+
},
74+
}
75+
} else if result.ColumnarBuffer != nil {
76+
// Handle the case where we have a columnar buffer
77+
// buffer must be explicitly marked as unused,
78+
// otherwise memory will leak
79+
defer result.ColumnarBuffer.Release()
80+
81+
resp, err = result.ColumnarBuffer.ToResponse()
82+
if err != nil {
83+
return fmt.Errorf("buffer to response: %w", err)
84+
}
85+
} else {
86+
return fmt.Errorf("result contains neither Data nor ColumnarBuffer")
7087
}
7188

7289
resp.Stats = result.Stats
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
CREATE OBJECT ydb_local_password (TYPE SECRET) WITH (value = password);
2+
3+
CREATE EXTERNAL DATA SOURCE external_datasource WITH (
4+
SOURCE_TYPE="Ydb",
5+
LOCATION="localhost:2136",
6+
AUTH_METHOD="BASIC",
7+
LOGIN="admin",
8+
DATABASE_NAME="/Root",
9+
PASSWORD_SECRET_NAME="ydb_local_password"
10+
);
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
PRAGMA generic.UsePredicatePushdown="true";
2+
3+
SELECT * FROM external_datasource.olap_lineitem_s10 LIMIT 10;
4+

0 commit comments

Comments
 (0)