@@ -2,15 +2,18 @@ package primitivebuilder
22
33import (
44 "fmt"
5+ "io"
6+ "net/http"
57 "strconv"
68
9+ "github.com/stackql/any-sdk/anysdk"
10+ "github.com/stackql/any-sdk/pkg/dto"
711 "github.com/stackql/any-sdk/pkg/httpelement"
812 "github.com/stackql/any-sdk/pkg/logging"
913 "github.com/stackql/any-sdk/pkg/response"
1014 "github.com/stackql/any-sdk/pkg/streaming"
1115 "github.com/stackql/stackql/internal/stackql/drm"
1216 "github.com/stackql/stackql/internal/stackql/handler"
13- "github.com/stackql/stackql/internal/stackql/httpmiddleware"
1417 "github.com/stackql/stackql/internal/stackql/internal_data_transfer/internaldto"
1518 "github.com/stackql/stackql/internal/stackql/internal_data_transfer/primitive_context"
1619 "github.com/stackql/stackql/internal/stackql/primitive"
@@ -285,6 +288,143 @@ func itemise(
285288 return newItemisationResult (items , ok , false )
286289}
287290
291+ func inferNextPageResponseElement (provider anysdk.Provider , method anysdk.OperationStore ) sdk_internal_dto.HTTPElement {
292+ st , ok := method .GetPaginationResponseTokenSemantic ()
293+ if ok {
294+ if tp , err := sdk_internal_dto .ExtractHTTPElement (st .GetLocation ()); err == nil {
295+ rv := sdk_internal_dto .NewHTTPElement (
296+ tp ,
297+ st .GetKey (),
298+ )
299+ transformer , tErr := st .GetTransformer ()
300+ if tErr == nil && transformer != nil {
301+ rv .SetTransformer (transformer )
302+ }
303+ return rv
304+ }
305+ }
306+ providerStr := provider .GetName ()
307+ switch providerStr {
308+ case "github" , "okta" :
309+ rv := sdk_internal_dto .NewHTTPElement (
310+ sdk_internal_dto .Header ,
311+ "Link" ,
312+ )
313+ rv .SetTransformer (anysdk .DefaultLinkHeaderTransformer )
314+ return rv
315+ default :
316+ return sdk_internal_dto .NewHTTPElement (
317+ sdk_internal_dto .BodyAttribute ,
318+ "nextPageToken" ,
319+ )
320+ }
321+ }
322+
323+ func inferNextPageRequestElement (provider anysdk.Provider , method anysdk.OperationStore ) sdk_internal_dto.HTTPElement {
324+ st , ok := method .GetPaginationRequestTokenSemantic ()
325+ if ok {
326+ if tp , err := sdk_internal_dto .ExtractHTTPElement (st .GetLocation ()); err == nil {
327+ rv := sdk_internal_dto .NewHTTPElement (
328+ tp ,
329+ st .GetKey (),
330+ )
331+ transformer , tErr := st .GetTransformer ()
332+ if tErr == nil && transformer != nil {
333+ rv .SetTransformer (transformer )
334+ }
335+ return rv
336+ }
337+ }
338+ providerStr := provider .GetName ()
339+ switch providerStr {
340+ case "github" , "okta" :
341+ return sdk_internal_dto .NewHTTPElement (
342+ sdk_internal_dto .RequestString ,
343+ "" ,
344+ )
345+ default :
346+ return sdk_internal_dto .NewHTTPElement (
347+ sdk_internal_dto .QueryParam ,
348+ "pageToken" ,
349+ )
350+ }
351+ }
352+
353+ type PagingState interface {
354+ GetPageCount () int
355+ IsFinished () bool
356+ GetHTTPResponse () * http.Response
357+ GetAPIError () error
358+ }
359+
360+ type httpPagingState struct {
361+ pageCount int
362+ isFinished bool
363+ httpResponse * http.Response
364+ apiErr error
365+ }
366+
367+ func (hps * httpPagingState ) GetPageCount () int {
368+ return hps .pageCount
369+ }
370+
371+ func (hps * httpPagingState ) IsFinished () bool {
372+ return hps .isFinished
373+ }
374+
375+ func (hps * httpPagingState ) GetHTTPResponse () * http.Response {
376+ return hps .httpResponse
377+ }
378+
379+ func (hps * httpPagingState ) GetAPIError () error {
380+ return hps .apiErr
381+ }
382+
383+ func newPagingState (
384+ pageCount int ,
385+ isFinished bool ,
386+ httpResponse * http.Response ,
387+ apiErr error ,
388+ ) PagingState {
389+ return & httpPagingState {
390+ pageCount : pageCount ,
391+ isFinished : isFinished ,
392+ httpResponse : httpResponse ,
393+ apiErr : apiErr ,
394+ }
395+ }
396+
397+ func page (
398+ res response.Response ,
399+ method anysdk.OperationStore ,
400+ provider anysdk.Provider ,
401+ reqCtx anysdk.HTTPArmouryParameters ,
402+ pageCount int ,
403+ rtCtx dto.RuntimeCtx ,
404+ authCtx * dto.AuthCtx ,
405+ outErrFile io.Writer ,
406+ ) PagingState {
407+ npt := inferNextPageResponseElement (provider , method )
408+ nptRequest := inferNextPageRequestElement (provider , method )
409+ if npt == nil || nptRequest == nil {
410+ return newPagingState (pageCount , true , nil , nil )
411+ }
412+ tk := extractNextPageToken (res , npt )
413+ //nolint:lll // long conditional
414+ if tk == "" || tk == "<nil>" || tk == "[]" || (rtCtx .HTTPPageLimit > 0 && pageCount >= rtCtx .HTTPPageLimit ) {
415+ return newPagingState (pageCount , true , nil , nil )
416+ }
417+ pageCount ++
418+ req , reqErr := reqCtx .SetNextPage (method , tk , nptRequest )
419+ if reqErr != nil {
420+ return newPagingState (pageCount , true , nil , reqErr )
421+ }
422+ cc := anysdk .NewAnySdkClientConfigurator (rtCtx , provider .GetName ())
423+ response , apiErr := anysdk .HTTPApiCallFromRequest (
424+ cc , rtCtx , authCtx , authCtx .Type , false , outErrFile , provider , method , req )
425+ return newPagingState (pageCount , false , response , apiErr )
426+ }
427+
288428//nolint:nestif,gocognit // acceptable for now
289429func (ss * SingleSelectAcquire ) actionInsertPreparation (
290430 itemisationResult ItemisationResult ,
@@ -368,6 +508,10 @@ func (ss *SingleSelectAcquire) Build() error {
368508 if err != nil {
369509 return err
370510 }
511+ provider , providerErr := prov .GetProvider ()
512+ if providerErr != nil {
513+ return providerErr
514+ }
371515 m , err := ss .tableMeta .GetMethod ()
372516 if err != nil {
373517 return err
@@ -376,6 +520,10 @@ func (ss *SingleSelectAcquire) Build() error {
376520 if err != nil {
377521 return err
378522 }
523+ authCtx , authCtxErr := ss .handlerCtx .GetAuthContext (prov .GetProviderString ())
524+ if authCtxErr != nil {
525+ return authCtxErr
526+ }
379527 ex := func (pc primitive.IPrimitiveCtx ) internaldto.ExecutorOutput {
380528 logging .GetLogger ().Infof ("SingleSelectAcquire.Execute() beginning execution for table %s" , tableName )
381529 currentTcc := ss .insertPreparedStatementCtx .GetGCCtrlCtrs ().Clone ()
@@ -431,9 +579,15 @@ func (ss *SingleSelectAcquire) Build() error {
431579 return internaldto .NewEmptyExecutorOutput ()
432580 }
433581 // TODO: fix cloning ops
434- response , apiErr := httpmiddleware .HTTPApiCallFromRequest (
435- ss .handlerCtx .Clone (),
436- prov ,
582+ cc := anysdk .NewAnySdkClientConfigurator (ss .handlerCtx .GetRuntimeContext (), provider .GetName ())
583+ response , apiErr := anysdk .HTTPApiCallFromRequest (
584+ cc ,
585+ ss .handlerCtx .GetRuntimeContext (),
586+ authCtx ,
587+ authCtx .Type ,
588+ false ,
589+ ss .handlerCtx .GetOutErrFile (),
590+ provider ,
437591 m ,
438592 reqCtx .GetRequest ().Clone (
439593 reqCtx .GetRequest ().Context (),
@@ -444,8 +598,7 @@ func (ss *SingleSelectAcquire) Build() error {
444598 continue
445599 }
446600 housekeepingDone := false
447- npt := prov .InferNextPageResponseElement (ss .tableMeta .GetHeirarchyObjects ())
448- nptRequest := prov .InferNextPageRequestElement (ss .tableMeta .GetHeirarchyObjects ())
601+ nptRequest := inferNextPageRequestElement (provider , m )
449602 pageCount := 1
450603 for {
451604 if apiErr != nil {
@@ -492,20 +645,25 @@ func (ss *SingleSelectAcquire) Build() error {
492645 return internaldto .NewErroneousExecutorOutput (insertPrepErr )
493646 }
494647
495- if npt == nil || nptRequest == nil {
496- break
497- }
498- tk := extractNextPageToken (res , npt )
499- //nolint:lll // long conditional
500- if tk == "" || tk == "<nil>" || tk == "[]" || (ss .handlerCtx .GetRuntimeContext ().HTTPPageLimit > 0 && pageCount >= ss .handlerCtx .GetRuntimeContext ().HTTPPageLimit ) {
648+ pageResult := page (
649+ res ,
650+ m ,
651+ provider ,
652+ reqCtx ,
653+ pageCount ,
654+ ss .handlerCtx .GetRuntimeContext (),
655+ authCtx ,
656+ ss .handlerCtx .GetOutErrFile (),
657+ )
658+
659+ if pageResult .IsFinished () {
501660 break
502661 }
503- pageCount ++
504- req , reqErr := reqCtx .SetNextPage (m , tk , nptRequest )
505- if reqErr != nil {
506- return internaldto .NewErroneousExecutorOutput (reqErr )
507- }
508- response , apiErr = httpmiddleware .HTTPApiCallFromRequest (ss .handlerCtx .Clone (), prov , m , req )
662+
663+ pageCount = pageResult .GetPageCount ()
664+
665+ response = pageResult .GetHTTPResponse ()
666+ apiErr = pageResult .GetAPIError ()
509667 }
510668 if reqCtx .GetRequest () != nil {
511669 q := reqCtx .GetRequest ().URL .Query ()
0 commit comments