@@ -38,7 +38,7 @@ type PlanStore struct {
3838
3939func (s * PlanStore ) InitSqlPlanTable () error {
4040 // Create table if not exists
41- _ , err := s .db .Exec ( sqlconst .CreateSqlPlanTable )
41+ _ , err := s .db .ExecContext ( s . ctx , sqlconst .CreateSqlPlanTable )
4242 return err
4343}
4444
@@ -48,7 +48,14 @@ func NewPlanStore(c context.Context, path string, maxOpenConns int, threads int,
4848 return nil , fmt .Errorf ("failed to create data directory %s: %w" , path , err )
4949 }
5050
51- dsn := filepath .Join (path , "sql_plan.duckdb" )
51+ memLimit := os .Getenv ("DUCKDB_MEMORY_LIMIT" )
52+ if memLimit == "" {
53+ memLimit = "512MB"
54+ }
55+
56+ dsn := fmt .Sprintf ("%s?memory_limit=%s&allocator_background_threads=true&preserve_insertion_order=false&threads=%d" ,
57+ filepath .Join (path , "sql_plan.duckdb" ), memLimit , threads )
58+
5259 var db * sql.DB
5360 var err error
5461 var conn * sql.Conn
@@ -65,7 +72,8 @@ func NewPlanStore(c context.Context, path string, maxOpenConns int, threads int,
6572 // sql.Open doesn't actually connect. We need to try to get a connection.
6673 conn , err = db .Conn (c )
6774 if err == nil {
68- break // Success
75+ conn .Close () // Close check connection
76+ break // Success
6977 }
7078
7179 db .Close () // Close the db handle on failure
@@ -79,27 +87,6 @@ func NewPlanStore(c context.Context, path string, maxOpenConns int, threads int,
7987
8088 db .SetMaxOpenConns (maxOpenConns )
8189
82- // Set memory limit
83- memLimit := os .Getenv ("DUCKDB_MEMORY_LIMIT" )
84- if memLimit == "" {
85- memLimit = "512MB"
86- }
87- if _ , err := conn .ExecContext (c , fmt .Sprintf ("PRAGMA memory_limit='%s'" , memLimit )); err != nil {
88- l .Warnf ("Failed to set duckdb memory limit: %v" , err )
89- }
90-
91- if _ , err := conn .ExecContext (c , "SET allocator_background_threads=true" ); err != nil {
92- l .Warnf ("Failed to set allocator_background_threads: %v" , err )
93- }
94- if _ , err := conn .ExecContext (c , "SET preserve_insertion_order=false" ); err != nil {
95- l .Warnf ("Failed to set preserve_insertion_order=false: %v" , err )
96- }
97- if _ , err := conn .ExecContext (c , fmt .Sprintf ("SET threads=%d" , threads )); err != nil {
98- l .Warnf ("Failed to set threads=%d: %v" , threads , err )
99- }
100-
101- conn .Close () // Close the temporary connection, the pool will manage connections from here.
102-
10390 s := & PlanStore {db : db , ctx : c , Logger : l }
10491 return s , nil
10592}
@@ -108,7 +95,7 @@ func (s *PlanStore) LoadExistingPlans() ([]model.SqlPlanIdentifier, error) {
10895 s .mu .RLock ()
10996 defer s .mu .RUnlock ()
11097
111- rows , err := s .db .Query ( sqlconst .ListSqlPlanIdentifier )
98+ rows , err := s .db .QueryContext ( s . ctx , sqlconst .ListSqlPlanIdentifier )
11299 if err != nil {
113100 return nil , errors .Wrap (err , "failed to query existing plans" )
114101 }
@@ -130,6 +117,9 @@ func (s *PlanStore) LoadExistingPlans() ([]model.SqlPlanIdentifier, error) {
130117 PlanID : planID ,
131118 })
132119 }
120+ if err := rows .Err (); err != nil {
121+ return nil , errors .Wrap (err , "error during rows iteration" )
122+ }
133123 return existingPlans , nil
134124}
135125
@@ -144,7 +134,7 @@ func (s *PlanStore) Store(plan model.SqlPlan) error {
144134 plan .PartitionStart , plan .Other , plan .AccessPredicates , plan .FilterPredicates , plan .StartupPredicates ,
145135 plan .Projection , plan .SpecialPredicates , plan .QblockName , plan .Remarks , plan .OtherXML }
146136
147- if _ , err := s .db .Exec ( sqlconst .StoreSqlPlanStatement , valueArgs ... ); err != nil {
137+ if _ , err := s .db .ExecContext ( s . ctx , sqlconst .StoreSqlPlanStatement , valueArgs ... ); err != nil {
148138 return err
149139 }
150140
@@ -156,7 +146,7 @@ func (s *PlanStore) PlanExists(ident model.SqlPlanIdentifier) (bool, error) {
156146 defer s .mu .RUnlock ()
157147
158148 var count int
159- err := s .db .QueryRow ( sqlconst .CheckPlanExistence , ident .TenantID , ident .SvrIP , ident .SvrPort , ident .PlanID ).Scan (& count )
149+ err := s .db .QueryRowContext ( s . ctx , sqlconst .CheckPlanExistence , ident .TenantID , ident .SvrIP , ident .SvrPort , ident .PlanID ).Scan (& count )
160150 if err != nil {
161151 return false , errors .Wrap (err , "failed to query plan existence" )
162152 }
@@ -167,7 +157,7 @@ func (s *PlanStore) GetPlanDetail(ident model.SqlPlanIdentifier) ([]model.SqlPla
167157 s .mu .RLock ()
168158 defer s .mu .RUnlock ()
169159
170- rows , err := s .db .Query ( sqlconst .SelectSqlPlanFromDuckdb , ident .TenantID , ident .SvrIP , ident .SvrPort , ident .PlanID )
160+ rows , err := s .db .QueryContext ( s . ctx , sqlconst .SelectSqlPlanFromDuckdb , ident .TenantID , ident .SvrIP , ident .SvrPort , ident .PlanID )
171161 if err != nil {
172162 return nil , errors .Wrap (err , "failed to query plans by sqlId and planHash" )
173163 }
@@ -191,6 +181,9 @@ func (s *PlanStore) GetPlanDetail(ident model.SqlPlanIdentifier) ([]model.SqlPla
191181 }
192182 plans = append (plans , plan )
193183 }
184+ if err := rows .Err (); err != nil {
185+ return nil , errors .Wrap (err , "error during rows iteration" )
186+ }
194187 return plans , nil
195188}
196189
@@ -205,7 +198,7 @@ func (s *PlanStore) GetPlanStatsBySqlId(sqlId string) ([]model.PlanStatistic, er
205198 s .mu .RLock ()
206199 defer s .mu .RUnlock ()
207200
208- rows , err := s .db .Query ( sqlconst .GetPlanStats , sqlId )
201+ rows , err := s .db .QueryContext ( s . ctx , sqlconst .GetPlanStats , sqlId )
209202 if err != nil {
210203 return nil , errors .Wrap (err , "failed to query plan statistics by sqlId" )
211204 }
@@ -235,6 +228,9 @@ func (s *PlanStore) GetPlanStatsBySqlId(sqlId string) ([]model.PlanStatistic, er
235228 }
236229 stats = append (stats , stat )
237230 }
231+ if err := rows .Err (); err != nil {
232+ return nil , errors .Wrap (err , "error during rows iteration" )
233+ }
238234 return stats , nil
239235}
240236
@@ -243,7 +239,7 @@ func (s *PlanStore) GetTableInfoBySqlId(sqlId string) ([]model.TableInfo, error)
243239 defer s .mu .RUnlock ()
244240
245241 // keep the max object id(table id), tables may dropped and recreated
246- rows , err := s .db .Query ( sqlconst .GetTableInfo , sqlId )
242+ rows , err := s .db .QueryContext ( s . ctx , sqlconst .GetTableInfo , sqlId )
247243 if err != nil {
248244 return nil , errors .Wrap (err , "failed to query table info by sqlId" )
249245 }
@@ -261,14 +257,17 @@ func (s *PlanStore) GetTableInfoBySqlId(sqlId string) ([]model.TableInfo, error)
261257 }
262258 tables = append (tables , table )
263259 }
260+ if err := rows .Err (); err != nil {
261+ return nil , errors .Wrap (err , "error during rows iteration" )
262+ }
264263 return tables , nil
265264}
266265
267266func (s * PlanStore ) DebugQuery (query string , args ... interface {}) ([]map [string ]interface {}, error ) {
268267 s .mu .RLock ()
269268 defer s .mu .RUnlock ()
270269
271- rows , err := s .db .Query ( query , args ... )
270+ rows , err := s .db .QueryContext ( s . ctx , query , args ... )
272271 if err != nil {
273272 return nil , err
274273 }
@@ -307,6 +306,9 @@ func (s *PlanStore) DebugQuery(query string, args ...interface{}) ([]map[string]
307306 }
308307 results = append (results , m )
309308 }
309+ if err := rows .Err (); err != nil {
310+ return nil , err
311+ }
310312 return results , nil
311313}
312314
0 commit comments