@@ -6,8 +6,11 @@ import (
66 "database/sql"
77 "database/sql/driver"
88 "fmt"
9+ "math/rand"
10+ "os"
911 "strconv"
1012 "strings"
13+ "time"
1114
1215 "github.com/chdb-io/chdb-go/chdb"
1316 "github.com/chdb-io/chdb-go/chdbstable"
@@ -34,6 +37,10 @@ const (
3437 defaultBufferSize = 512
3538)
3639
40+ const charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
41+
42+ var seededRand * rand.Rand = rand .New (rand .NewSource (time .Now ().UnixNano ()))
43+
3744func (d DriverType ) String () string {
3845 switch d {
3946 case ARROW :
@@ -46,20 +53,51 @@ func (d DriverType) String() string {
4653 return ""
4754}
4855
49- func (d DriverType ) PrepareRows (result * chdbstable.LocalResult , buf []byte , bufSize int , useUnsafe bool ) (driver.Rows , error ) {
56+ func (d DriverType ) PrepareRows (result * chdbstable.LocalResult , buf []byte , bufSize int , useUnsafe bool , filePath string ) (driver.Rows , error ) {
5057 switch d {
5158 case ARROW :
52- reader , err := ipc .NewFileReader (bytes .NewReader (buf ))
53- if err != nil {
54- return nil , err
59+ var reader * ipc.FileReader
60+ var err error
61+ var fd * os.File
62+ if filePath != "" {
63+ fd , err = os .Open (filePath )
64+ if err != nil {
65+ return nil , err
66+ }
67+
68+ reader , err = ipc .NewFileReader (fd )
69+ if err != nil {
70+ return nil , err
71+ }
72+
73+ } else {
74+ reader , err = ipc .NewFileReader (bytes .NewReader (buf ))
75+ if err != nil {
76+ return nil , err
77+ }
5578 }
56- return & arrowRows {localResult : result , reader : reader }, nil
79+
80+ return & arrowRows {localResult : result , reader : reader , fd : fd }, nil
5781 case PARQUET :
58- reader := parquet.NewGenericReader [any ](bytes .NewReader (buf ))
82+ var reader * parquet.GenericReader [any ]
83+ var fd * os.File
84+ if filePath != "" {
85+ fl , err := os .Open (filePath )
86+ if err != nil {
87+ return nil , err
88+ }
89+ fd = fl
90+
91+ reader = parquet.NewGenericReader [any ](fl )
92+ } else {
93+ reader = parquet.NewGenericReader [any ](bytes .NewReader (buf ))
94+ }
95+
5996 return & parquetRows {
6097 localResult : result , reader : reader ,
6198 bufferSize : bufSize , needNewBuffer : true ,
6299 useUnsafeStringReader : useUnsafe ,
100+ fd : fd ,
63101 }, nil
64102 }
65103 return nil , fmt .Errorf ("Unsupported driver type" )
@@ -97,7 +135,8 @@ func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
97135 cc := & conn {
98136 udfPath : c .udfPath , session : c .session ,
99137 driverType : c .driverType , bufferSize : c .bufferSize ,
100- useUnsafe : c .useUnsafe ,
138+ useUnsafe : c .useUnsafe ,
139+ useFileInsteadOfMemory : true ,
101140 }
102141 cc .SetupQueryFun ()
103142 return cc , nil
@@ -184,12 +223,13 @@ func (d Driver) OpenConnector(name string) (driver.Connector, error) {
184223}
185224
186225type conn struct {
187- udfPath string
188- driverType DriverType
189- bufferSize int
190- useUnsafe bool
191- session * chdb.Session
192- QueryFun queryHandle
226+ udfPath string
227+ driverType DriverType
228+ bufferSize int
229+ useUnsafe bool
230+ useFileInsteadOfMemory bool
231+ session * chdb.Session
232+ QueryFun queryHandle
193233}
194234
195235func (c * conn ) Close () error {
@@ -230,14 +270,31 @@ func (c *conn) compileArguments(query string, args []driver.NamedValue) (string,
230270 } else {
231271 compiledQuery = query
232272 }
273+
233274 return compiledQuery , nil
234275}
235276
277+ func (c * conn ) createRandomFilePath (size int ) string {
278+ b := make ([]byte , size )
279+ for i := range b {
280+ b [i ] = charset [seededRand .Intn (len (charset ))]
281+ }
282+ return string (b )
283+
284+ }
285+
236286func (c * conn ) QueryContext (ctx context.Context , query string , args []driver.NamedValue ) (driver.Rows , error ) {
237287 compiledQuery , err := c .compileArguments (query , args )
238288 if err != nil {
239289 return nil , err
240290 }
291+ var filePath string
292+ if c .useFileInsteadOfMemory {
293+ compiledQuery = strings .TrimSuffix (compiledQuery , ";" )
294+ compiledQuery += " INTO OUTFILE "
295+ filePath = fmt .Sprintf ("/tmp/%s.%s" , c .createRandomFilePath (16 ), strings .ToLower (c .driverType .String ()))
296+ compiledQuery += fmt .Sprintf ("'%s'" , filePath )
297+ }
241298 result , err := c .QueryFun (compiledQuery , c .driverType .String (), c .udfPath )
242299 if err != nil {
243300 return nil , err
@@ -247,7 +304,7 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
247304 if buf == nil {
248305 return nil , fmt .Errorf ("result is nil" )
249306 }
250- return c .driverType .PrepareRows (result , buf , c .bufferSize , c .useUnsafe )
307+ return c .driverType .PrepareRows (result , buf , c .bufferSize , c .useUnsafe , filePath )
251308
252309}
253310
0 commit comments