forked from Mapepire-IBMi/mapepire-go
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquery.go
More file actions
199 lines (166 loc) · 4.92 KB
/
query.go
File metadata and controls
199 lines (166 loc) · 4.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package mapepire
import (
"fmt"
"sync"
"sync/atomic"
)
// Represents options for query execution
type QueryOptions struct {
Rows int // The amount of rows to fetch
Parameters [][]any // Parameters, if any
TerseResult bool // Whether the result returns in terse format
IsCLcommand bool // Whether the command is a CL command
}
// Represents a SQL Query that can be executed and managed within a SQL job
type Query struct {
ID string // The unique identifier
clCommand string // CL command
sqlQuery string // SQL query
parameters string // Parameters, if any
terse bool // Whether the result returns in terse format
rowsToFetch string // The amount of rows to fetch
prepared bool // Whether the query has been prepared
job *SQLJob // Pointer to the SQL Job
state atomic.Int32 // The current state of the query
}
// Represents a query list managed by the job
type queryList struct {
list []*Query // List of all open queries
lock sync.Mutex // Mutex
}
const (
STATE_RUN_DONE = iota
STATE_RUN_MORE_DATA
STATE_NOT_YET_RUN
)
// Receive a new query list
func newQueryList() *queryList {
return &queryList{list: []*Query{}}
}
// Add a new Query to the list
func (ql *queryList) addQuery(query *Query) {
ql.lock.Lock()
ql.list = append(ql.list, query)
ql.lock.Unlock()
}
// validates the cont_id
func (ql *queryList) validateID(ID string) bool {
ql.lock.Lock()
defer ql.lock.Unlock()
for _, query := range ql.list {
if query.ID == ID && query.state.Load() != STATE_RUN_DONE {
return true
}
}
return false
}
// cleans up the query list
func (ql *queryList) cleanup() {
ql.lock.Lock()
defer ql.lock.Unlock()
newList := make([]*Query, 0, len(ql.list))
for _, query := range ql.list {
if query.state.Load() != STATE_RUN_DONE {
newList = append(newList, query)
} else {
query.sqlCloseUnsafe(query.ID)
}
}
ql.list = newList
}
// Executes the query/command and returns the results
func (q *Query) Execute() (*ServerResponse, error) {
q.job.setJobStatus(JOBSTATUS_BUSY)
if q.state.Load() != STATE_NOT_YET_RUN {
q.job.setJobStatus(JOBSTATUS_ERROR)
err := fmt.Errorf("statement has already been run")
return &ServerResponse{ID: q.ID}, err
}
jsonreq := func() string {
if q.clCommand != "" {
return fmt.Sprintf(`{"id":"%s","type":"cl","cmd":"%s","terse":%t}`, q.ID, q.clCommand, q.terse)
}
if q.prepared {
return fmt.Sprintf(`{"id":"%s","type":"prepare_sql_execute","sql":"%s","parameters":%s,"rows":"%s","terse":%t}`, q.ID, q.sqlQuery, q.parameters, q.rowsToFetch, q.terse)
}
return fmt.Sprintf(`{"id":"%s","type":"sql","sql":"%s","rows":"%s","terse":%t}`, q.ID, q.sqlQuery, q.rowsToFetch, q.terse)
}()
request := &serverRequest{
id: q.ID,
jsonreq: jsonreq,
}
return q.sendRequest(request)
}
// Fetch more rows from a previous request with the ID
func (q *Query) FetchMore(contID string, rows string) (*ServerResponse, error) {
q.job.setJobStatus(JOBSTATUS_BUSY)
resp := &ServerResponse{
ID: q.ID,
}
valid := q.job.queryList.validateID(contID)
if !valid {
q.job.setJobStatus(JOBSTATUS_ERROR)
err := fmt.Errorf("need ID from previous SQL")
return resp, err
} else if q.state.Load() == STATE_NOT_YET_RUN {
q.job.setJobStatus(JOBSTATUS_ERROR)
err := fmt.Errorf("statement has not yet been run")
return resp, err
}
jsonreq :=
fmt.Sprintf(`{"id":"%s","type":"sqlmore","cont_id":"%s","rows":"%s"}`, q.ID, contID, rows)
request := &serverRequest{
id: q.ID,
jsonreq: jsonreq,
}
response, err := q.sendRequest(request)
if err != nil {
return resp, err
}
if response.Success {
response.HasResults = true
}
return response, nil
}
// Close cursor from a previous request.
// Select querys are automatically closed after fetching all data.
func (q *Query) SQLClose(contID string) error {
valid := q.job.queryList.validateID(contID)
if !valid {
q.job.setJobStatus(JOBSTATUS_ERROR)
return fmt.Errorf("need ID from previous query")
}
return q.sqlCloseUnsafe(contID)
}
// Close cursor from a previous request without validating existence of contID
func (q *Query) sqlCloseUnsafe(contID string) error {
q.job.setJobStatus(JOBSTATUS_BUSY)
jsonreq :=
fmt.Sprintf(`{"id":"%v","type":"sqlclose","cont_id":"%v"}`, q.ID, contID)
request := &serverRequest{
id: q.ID,
jsonreq: jsonreq,
}
_, err := q.job.send(*request)
if err != nil {
q.job.setJobStatus(JOBSTATUS_ERROR)
return err
}
return nil
}
// sends the request and sets the query state
func (q *Query) sendRequest(request *serverRequest) (*ServerResponse, error) {
resp, err := q.job.send(*request)
if err != nil {
q.job.setJobStatus(JOBSTATUS_ERROR)
return resp, err
}
if resp.IsDone && resp.Success {
q.state.Store(STATE_RUN_DONE)
} else if resp.Success && !resp.IsDone {
q.state.Store(STATE_RUN_MORE_DATA)
}
q.job.queryList.cleanup()
q.job.setJobStatus(JOBSTATUS_READY)
return resp, nil
}