Skip to content

Commit c1656e6

Browse files
- Further decouple.
1 parent 910202a commit c1656e6

File tree

1 file changed

+184
-132
lines changed

1 file changed

+184
-132
lines changed

internal/stackql/primitivebuilder/single_select_acquire.go

Lines changed: 184 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -109,45 +109,187 @@ func (ss *SingleSelectAcquire) GetTail() primitivegraph.PrimitiveNode {
109109
return ss.root
110110
}
111111

112+
type eliderPayload struct {
113+
currentTcc internaldto.TxnControlCounters
114+
tableName string
115+
reqEncoding string
116+
}
117+
118+
type standardMethodElider struct {
119+
elisionFunc func(...any) bool
120+
}
121+
122+
func (sme *standardMethodElider) IsElide(argz ...any) bool {
123+
return sme.elisionFunc(argz)
124+
}
125+
126+
func newStandardMethodElider(elisionFunc func(...any) bool) methodElider {
127+
return &standardMethodElider{
128+
elisionFunc: elisionFunc,
129+
}
130+
}
131+
112132
//nolint:lll // chaining
113133
func (ss *SingleSelectAcquire) elideActionIfPossible(
114134
currentTcc internaldto.TxnControlCounters,
115135
tableName string,
116136
reqEncoding string,
117-
) bool {
118-
olderTcc, isMatch := ss.handlerCtx.GetNamespaceCollection().GetAnalyticsCacheTableNamespaceConfigurator().Match(
119-
tableName,
120-
reqEncoding,
121-
ss.drmCfg.GetControlAttributes().GetControlLatestUpdateColumnName(), ss.drmCfg.GetControlAttributes().GetControlInsertEncodedIDColumnName())
122-
if isMatch {
123-
nonControlColumns := ss.insertPreparedStatementCtx.GetNonControlColumns()
124-
var nonControlColumnNames []string
125-
for _, c := range nonControlColumns {
126-
nonControlColumnNames = append(nonControlColumnNames, c.GetName())
127-
}
128-
//nolint:errcheck // TODO: fix
129-
ss.handlerCtx.GetGarbageCollector().Update(
137+
) methodElider {
138+
elisionFunc := func(argz ...any) bool {
139+
olderTcc, isMatch := ss.handlerCtx.GetNamespaceCollection().GetAnalyticsCacheTableNamespaceConfigurator().Match(
130140
tableName,
131-
olderTcc.Clone(),
132-
currentTcc,
133-
)
134-
//nolint:errcheck // TODO: fix
135-
ss.insertionContainer.SetTableTxnCounters(tableName, olderTcc)
136-
ss.insertPreparedStatementCtx.SetGCCtrlCtrs(olderTcc)
137-
r, sqlErr := ss.handlerCtx.GetNamespaceCollection().GetAnalyticsCacheTableNamespaceConfigurator().Read(
138-
tableName, reqEncoding,
139-
ss.drmCfg.GetControlAttributes().GetControlInsertEncodedIDColumnName(),
140-
nonControlColumnNames)
141-
if sqlErr != nil {
142-
internaldto.NewErroneousExecutorOutput(sqlErr)
141+
reqEncoding,
142+
ss.drmCfg.GetControlAttributes().GetControlLatestUpdateColumnName(), ss.drmCfg.GetControlAttributes().GetControlInsertEncodedIDColumnName())
143+
if isMatch {
144+
nonControlColumns := ss.insertPreparedStatementCtx.GetNonControlColumns()
145+
var nonControlColumnNames []string
146+
for _, c := range nonControlColumns {
147+
nonControlColumnNames = append(nonControlColumnNames, c.GetName())
148+
}
149+
//nolint:errcheck // TODO: fix
150+
ss.handlerCtx.GetGarbageCollector().Update(
151+
tableName,
152+
olderTcc.Clone(),
153+
currentTcc,
154+
)
155+
//nolint:errcheck // TODO: fix
156+
ss.insertionContainer.SetTableTxnCounters(tableName, olderTcc)
157+
ss.insertPreparedStatementCtx.SetGCCtrlCtrs(olderTcc)
158+
r, sqlErr := ss.handlerCtx.GetNamespaceCollection().GetAnalyticsCacheTableNamespaceConfigurator().Read(
159+
tableName, reqEncoding,
160+
ss.drmCfg.GetControlAttributes().GetControlInsertEncodedIDColumnName(),
161+
nonControlColumnNames)
162+
if sqlErr != nil {
163+
internaldto.NewErroneousExecutorOutput(sqlErr)
164+
}
165+
ss.drmCfg.ExtractObjectFromSQLRows(r, nonControlColumns, ss.stream)
166+
return true
143167
}
144-
ss.drmCfg.ExtractObjectFromSQLRows(r, nonControlColumns, ss.stream)
145-
return true
168+
return false
146169
}
147-
return false
170+
return newStandardMethodElider(elisionFunc)
171+
}
172+
173+
type methodElider interface {
174+
IsElide(...any) bool
148175
}
149176

150-
func (ss *SingleSelectAcquire) actionHTTP() error {
177+
func (ss *SingleSelectAcquire) actionHTTP(
178+
elider methodElider,
179+
) error {
180+
return nil
181+
}
182+
183+
//nolint:nestif,funlen // acceptable for now
184+
func (ss *SingleSelectAcquire) actionInsertPreparation(
185+
target interface{},
186+
resErr error,
187+
housekeepingDone bool,
188+
tableName string,
189+
paramsUsed map[string]interface{},
190+
reqEncoding string,
191+
) error {
192+
var items interface{}
193+
var ok bool
194+
logging.GetLogger().Infoln(fmt.Sprintf("SingleSelectAcquire.Execute() target = %v", target))
195+
switch pl := target.(type) {
196+
// add case for xml object,
197+
case map[string]interface{}:
198+
if ss.tableMeta.GetSelectItemsKey() != "" && ss.tableMeta.GetSelectItemsKey() != "/*" {
199+
items, ok = pl[ss.tableMeta.GetSelectItemsKey()]
200+
if !ok {
201+
if resErr != nil {
202+
items = []interface{}{}
203+
ok = true
204+
} else {
205+
items = []interface{}{
206+
pl,
207+
}
208+
ok = true
209+
}
210+
}
211+
} else {
212+
items = []interface{}{
213+
pl,
214+
}
215+
ok = true
216+
}
217+
case []interface{}:
218+
items = pl
219+
ok = true
220+
case []map[string]interface{}:
221+
items = pl
222+
ok = true
223+
case nil:
224+
return nil
225+
}
226+
keys := make(map[string]map[string]interface{})
227+
228+
//nolint:nestif // TODO: fix
229+
if ok {
230+
iArr, iErr := castItemsArray(items)
231+
if iErr != nil {
232+
return iErr
233+
}
234+
streamErr := ss.stream.Write(iArr)
235+
if streamErr != nil {
236+
return streamErr
237+
}
238+
if ok && len(iArr) > 0 {
239+
if !housekeepingDone && ss.insertPreparedStatementCtx != nil {
240+
_, execErr := ss.handlerCtx.GetSQLEngine().Exec(ss.insertPreparedStatementCtx.GetGCHousekeepingQueries())
241+
tcc := ss.insertPreparedStatementCtx.GetGCCtrlCtrs()
242+
tcc.SetTableName(tableName)
243+
//nolint:errcheck // TODO: fix
244+
ss.insertionContainer.SetTableTxnCounters(tableName, tcc)
245+
housekeepingDone = true
246+
if execErr != nil {
247+
return execErr
248+
}
249+
}
250+
251+
for i, item := range iArr {
252+
if item != nil {
253+
if len(paramsUsed) > 0 {
254+
for k, v := range paramsUsed {
255+
if _, itemOk := item[k]; !itemOk {
256+
item[k] = v
257+
}
258+
}
259+
}
260+
261+
logging.GetLogger().Infoln(
262+
fmt.Sprintf(
263+
"running insert with query = '''%s''', control parameters: %v",
264+
ss.insertPreparedStatementCtx.GetQuery(),
265+
ss.insertPreparedStatementCtx.GetGCCtrlCtrs(),
266+
),
267+
)
268+
r, rErr := ss.drmCfg.ExecuteInsertDML(
269+
ss.handlerCtx.GetSQLEngine(),
270+
ss.insertPreparedStatementCtx,
271+
item,
272+
reqEncoding,
273+
)
274+
logging.GetLogger().Infoln(
275+
fmt.Sprintf(
276+
"insert result = %v, error = %v",
277+
r,
278+
rErr,
279+
),
280+
)
281+
if rErr != nil {
282+
return fmt.Errorf(
283+
"sql insert error: '%w' from query: %s",
284+
rErr,
285+
ss.insertPreparedStatementCtx.GetQuery(),
286+
)
287+
}
288+
keys[strconv.Itoa(i)] = item
289+
}
290+
}
291+
}
292+
}
151293
return nil
152294
}
153295

@@ -214,7 +356,8 @@ func (ss *SingleSelectAcquire) Build() error {
214356
return internaldto.NewErroneousExecutorOutput(paramErr)
215357
}
216358
reqEncoding := reqCtx.Encode()
217-
elideOk := ss.elideActionIfPossible(currentTcc, tableName, reqEncoding)
359+
elider := ss.elideActionIfPossible(currentTcc, tableName, reqEncoding)
360+
elideOk := elider.IsElide(reqEncoding)
218361
if elideOk {
219362
return internaldto.NewEmptyExecutorOutput()
220363
}
@@ -260,110 +403,19 @@ func (ss *SingleSelectAcquire) Build() error {
260403
}
261404
ss.handlerCtx.LogHTTPResponseMap(res.GetProcessedBody())
262405
logging.GetLogger().Infoln(fmt.Sprintf("SingleSelectAcquire.Execute() response = %v", res))
263-
var items interface{}
264-
var ok bool
265-
target := res.GetProcessedBody()
266-
logging.GetLogger().Infoln(fmt.Sprintf("SingleSelectAcquire.Execute() target = %v", target))
267-
switch pl := target.(type) {
268-
// add case for xml object,
269-
case map[string]interface{}:
270-
if ss.tableMeta.GetSelectItemsKey() != "" && ss.tableMeta.GetSelectItemsKey() != "/*" {
271-
items, ok = pl[ss.tableMeta.GetSelectItemsKey()]
272-
if !ok {
273-
if resErr != nil {
274-
items = []interface{}{}
275-
ok = true
276-
} else {
277-
items = []interface{}{
278-
pl,
279-
}
280-
ok = true
281-
}
282-
}
283-
} else {
284-
items = []interface{}{
285-
pl,
286-
}
287-
ok = true
288-
}
289-
case []interface{}:
290-
items = pl
291-
ok = true
292-
case []map[string]interface{}:
293-
items = pl
294-
ok = true
295-
case nil:
296-
return internaldto.NewEmptyExecutorOutput()
297-
}
298-
keys := make(map[string]map[string]interface{})
299406

300-
//nolint:nestif // TODO: fix
301-
if ok {
302-
iArr, iErr := castItemsArray(items)
303-
if iErr != nil {
304-
return internaldto.NewErroneousExecutorOutput(iErr)
305-
}
306-
err = ss.stream.Write(iArr)
307-
if err != nil {
308-
return internaldto.NewErroneousExecutorOutput(err)
309-
}
310-
if ok && len(iArr) > 0 {
311-
if !housekeepingDone && ss.insertPreparedStatementCtx != nil {
312-
_, err = ss.handlerCtx.GetSQLEngine().Exec(ss.insertPreparedStatementCtx.GetGCHousekeepingQueries())
313-
tcc := ss.insertPreparedStatementCtx.GetGCCtrlCtrs()
314-
tcc.SetTableName(tableName)
315-
//nolint:errcheck // TODO: fix
316-
ss.insertionContainer.SetTableTxnCounters(tableName, tcc)
317-
housekeepingDone = true
318-
}
319-
if err != nil {
320-
return internaldto.NewErroneousExecutorOutput(err)
321-
}
322-
323-
for i, item := range iArr {
324-
if item != nil {
325-
if err == nil {
326-
for k, v := range paramsUsed {
327-
if _, itemOk := item[k]; !itemOk {
328-
item[k] = v
329-
}
330-
}
331-
}
332-
333-
logging.GetLogger().Infoln(
334-
fmt.Sprintf(
335-
"running insert with query = '''%s''', control parameters: %v",
336-
ss.insertPreparedStatementCtx.GetQuery(),
337-
ss.insertPreparedStatementCtx.GetGCCtrlCtrs(),
338-
),
339-
)
340-
r, rErr := ss.drmCfg.ExecuteInsertDML(
341-
ss.handlerCtx.GetSQLEngine(),
342-
ss.insertPreparedStatementCtx,
343-
item,
344-
reqEncoding,
345-
)
346-
logging.GetLogger().Infoln(
347-
fmt.Sprintf(
348-
"insert result = %v, error = %v",
349-
r,
350-
rErr,
351-
),
352-
)
353-
if rErr != nil {
354-
return internaldto.NewErroneousExecutorOutput(
355-
fmt.Errorf(
356-
"sql insert error: '%w' from query: %s",
357-
rErr,
358-
ss.insertPreparedStatementCtx.GetQuery(),
359-
),
360-
)
361-
}
362-
keys[strconv.Itoa(i)] = item
363-
}
364-
}
365-
}
407+
insertPrepErr := ss.actionInsertPreparation(
408+
res.GetProcessedBody(),
409+
resErr,
410+
housekeepingDone,
411+
tableName,
412+
paramsUsed,
413+
reqEncoding,
414+
)
415+
if insertPrepErr != nil {
416+
return internaldto.NewErroneousExecutorOutput(insertPrepErr)
366417
}
418+
367419
if npt == nil || nptRequest == nil {
368420
break
369421
}

0 commit comments

Comments
 (0)