@@ -126,36 +126,42 @@ protected function _returnSearch($params, $source)
126126 }
127127
128128
129- public function processDistinct ($ column , $ wheres ): Results
129+ public function processDistinct ($ wheres , $ options , $ columns , $ includeDocCount = false ): Results
130130 {
131- $ col = $ column ;
132- if (is_array ($ column )) {
133- $ col = $ column [0 ];
134- }
135-
136-
137131 try {
138- $ params = $ this ->buildParams ($ this ->index , $ wheres );
139- $ params ['body ' ]['aggs ' ]['distinct_ ' .$ col ]['terms ' ] = [
140- 'field ' => $ col ,
141- 'size ' => $ this ->maxSize ,
142- ];
143- $ process = $ this ->client ->search ($ params );
132+ if ($ columns && !is_array ($ columns )) {
133+ $ columns = [$ columns ];
134+ }
135+ $ sort = $ options ['sort ' ] ?? [];
136+ $ skip = $ options ['skip ' ] ?? false ;
137+ $ limit = $ options ['limit ' ] ?? false ;
138+ unset($ options ['sort ' ]);
139+ unset($ options ['skip ' ]);
140+ unset($ options ['limit ' ]);
141+
142+ $ params = $ this ->buildParams ($ this ->index , $ wheres , $ options );
143+ $ params ['body ' ]['aggs ' ] = $ this ->createNestedAggs ($ columns , $ sort );
144+
145+ $ response = $ this ->client ->search ($ params );
146+
147+
144148 $ data = [];
145- if (!empty ($ process ['aggregations ' ]['distinct_ ' .$ col ]['buckets ' ])) {
146- foreach ($ process ['aggregations ' ]['distinct_ ' .$ col ]['buckets ' ] as $ bucket ) {
147- $ data [] = $ bucket ['key ' ];
148- }
149+ if (!empty ($ response ['aggregations ' ])) {
150+ $ data = $ this ->_sanitizeDistinctResponse ($ response ['aggregations ' ], $ columns , $ includeDocCount );
151+ }
152+
153+ //process limit and skip from all results
154+ if ($ skip || $ limit ) {
155+ $ data = array_slice ($ data , $ skip , $ limit );
149156 }
150157
151- return $ this ->_return ($ data , $ process , $ params , $ this ->_queryTag (__FUNCTION__ ));
158+ return $ this ->_return ($ data , $ response , $ params , $ this ->_queryTag (__FUNCTION__ ));
152159 } catch (Exception $ e ) {
153160
154161 $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
155162 throw new Exception ($ result ->errorMessage );
156163 }
157164
158-
159165 }
160166
161167 /**
@@ -640,6 +646,146 @@ private function _matrixAggregate($wheres, $options, $columns): Results
640646 }
641647
642648 }
649+ //----------------------------------------------------------------------
650+ // Distinct Aggregates
651+ //----------------------------------------------------------------------
652+
653+ public function processDistinctAggregate ($ function , $ wheres , $ options , $ columns ): Results
654+ {
655+ return $ this ->{'_ ' .$ function .'DistinctAggregate ' }($ wheres , $ options , $ columns );
656+ }
657+
658+ private function _countDistinctAggregate ($ wheres , $ options , $ columns ): Results
659+ {
660+ try {
661+ $ params = $ this ->buildParams ($ this ->index , $ wheres );
662+ $ process = $ this ->processDistinct ($ wheres , $ options , $ columns );
663+ $ count = count ($ process ->data );
664+
665+ return $ this ->_return ($ count , $ process ->getMetaData (), $ params , $ this ->_queryTag (__FUNCTION__ ));
666+ } catch (Exception $ e ) {
667+
668+ $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
669+ throw new Exception ($ result ->errorMessage );
670+ }
671+
672+ }
673+
674+
675+ private function _minDistinctAggregate ($ wheres , $ options , $ columns ): Results
676+ {
677+ try {
678+ $ params = $ this ->buildParams ($ this ->index , $ wheres );
679+ $ process = $ this ->processDistinct ($ wheres , $ options , $ columns );
680+
681+ $ min = 0 ;
682+ $ hasBeenSet = false ;
683+ if (!empty ($ process ->data )) {
684+ foreach ($ process ->data as $ datum ) {
685+ if (!empty ($ datum [$ columns [0 ]]) && is_numeric ($ datum [$ columns [0 ]])) {
686+ if (!$ hasBeenSet ) {
687+ $ min = $ datum [$ columns [0 ]];
688+ $ hasBeenSet = true ;
689+ } else {
690+ $ min = min ($ min , $ datum [$ columns [0 ]]);
691+ }
692+
693+ }
694+ }
695+ }
696+
697+ return $ this ->_return ($ min , $ process ->getMetaData (), $ params , $ this ->_queryTag (__FUNCTION__ ));
698+ } catch (Exception $ e ) {
699+
700+ $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
701+ throw new Exception ($ result ->errorMessage );
702+ }
703+
704+ }
705+
706+ private function _maxDistinctAggregate ($ wheres , $ options , $ columns ): Results
707+ {
708+ try {
709+ $ params = $ this ->buildParams ($ this ->index , $ wheres );
710+ $ process = $ this ->processDistinct ($ wheres , $ options , $ columns );
711+
712+ $ max = 0 ;
713+ if (!empty ($ process ->data )) {
714+ foreach ($ process ->data as $ datum ) {
715+ if (!empty ($ datum [$ columns [0 ]]) && is_numeric ($ datum [$ columns [0 ]])) {
716+ $ max = max ($ max , $ datum [$ columns [0 ]]);
717+ }
718+ }
719+ }
720+
721+
722+ return $ this ->_return ($ max , $ process ->getMetaData (), $ params , $ this ->_queryTag (__FUNCTION__ ));
723+ } catch (Exception $ e ) {
724+
725+ $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
726+ throw new Exception ($ result ->errorMessage );
727+ }
728+
729+ }
730+
731+
732+ private function _sumDistinctAggregate ($ wheres , $ options , $ columns ): Results
733+ {
734+ try {
735+ $ params = $ this ->buildParams ($ this ->index , $ wheres );
736+ $ process = $ this ->processDistinct ($ wheres , $ options , $ columns );
737+ $ sum = 0 ;
738+ if (!empty ($ process ->data )) {
739+ foreach ($ process ->data as $ datum ) {
740+ if (!empty ($ datum [$ columns [0 ]]) && is_numeric ($ datum [$ columns [0 ]])) {
741+ $ sum += $ datum [$ columns [0 ]];
742+ }
743+ }
744+ }
745+
746+ return $ this ->_return ($ sum , $ process ->getMetaData (), $ params , $ this ->_queryTag (__FUNCTION__ ));
747+ } catch (Exception $ e ) {
748+
749+ $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
750+ throw new Exception ($ result ->errorMessage );
751+ }
752+
753+ }
754+
755+ private function _avgDistinctAggregate ($ wheres , $ options , $ columns )
756+ {
757+ try {
758+ $ params = $ this ->buildParams ($ this ->index , $ wheres );
759+ $ process = $ this ->processDistinct ($ wheres , $ options , $ columns );
760+ $ sum = 0 ;
761+ $ count = 0 ;
762+ $ avg = 0 ;
763+ if (!empty ($ process ->data )) {
764+ foreach ($ process ->data as $ datum ) {
765+ if (!empty ($ datum [$ columns [0 ]]) && is_numeric ($ datum [$ columns [0 ]])) {
766+ $ count ++;
767+ $ sum += $ datum [$ columns [0 ]];
768+ }
769+ }
770+ }
771+ if ($ count > 0 ) {
772+ $ avg = $ sum / $ count ;
773+ }
774+
775+
776+ return $ this ->_return ($ avg , $ process ->getMetaData (), $ params , $ this ->_queryTag (__FUNCTION__ ));
777+ } catch (Exception $ e ) {
778+
779+ $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
780+ throw new Exception ($ result ->errorMessage );
781+ }
782+
783+ }
784+
785+ private function _matrixDistinctAggregate ($ wheres , $ options , $ columns ): Results
786+ {
787+ return $ this ->_returnError ('Matrix distinct aggregate not supported ' , 500 , [], $ this ->_queryTag (__FUNCTION__ ));
788+ }
643789
644790
645791 //======================================================================
@@ -675,10 +821,58 @@ private function _sanitizeSearchResponse($response, $params, $queryTag)
675821 return $ this ->_return ($ data , $ meta , $ params , $ queryTag );
676822 }
677823
824+ private function _sanitizeDistinctResponse ($ response , $ columns , $ includeDocCount )
825+ {
826+ $ keys = [];
827+ foreach ($ columns as $ column ) {
828+ $ keys [] = 'by_ ' .$ column ;
829+ }
830+
831+ return $ this ->processBuckets ($ columns , $ keys , $ response , 0 , $ includeDocCount );
832+
833+ }
834+
835+ private function processBuckets ($ columns , $ keys , $ response , $ index , $ includeDocCount , $ currentData = [])
836+ {
837+ $ data = [];
838+
839+ if (!empty ($ response [$ keys [$ index ]]['buckets ' ])) {
840+ foreach ($ response [$ keys [$ index ]]['buckets ' ] as $ res ) {
841+ $ datum = $ currentData ;
842+ $ datum [$ columns [$ index ]] = $ res ['key ' ];
843+ if ($ includeDocCount ) {
844+ $ datum [$ columns [$ index ].'_count ' ] = $ res ['doc_count ' ];
845+ }
846+
847+ if (isset ($ columns [$ index + 1 ])) {
848+ $ nestedData = $ this ->processBuckets ($ columns , $ keys , $ res , $ index + 1 , $ includeDocCount , $ datum );
849+
850+ if (!empty ($ nestedData )) {
851+ $ data = array_merge ($ data , $ nestedData );
852+ } else {
853+ $ data [] = $ datum ;
854+ }
855+ } else {
856+ $ data [] = $ datum ;
857+ }
858+ }
859+ }
860+
861+ return $ data ;
862+ }
863+
678864 private function _return ($ data , $ meta , $ params , $ queryTag ): Results
679865 {
866+ if (is_object ($ meta )) {
867+ $ metaAsArray = [];
868+ if (method_exists ($ meta , 'asArray ' )) {
869+ $ metaAsArray = $ meta ->asArray ();
870+ }
871+ $ results = new Results ($ data , $ metaAsArray , $ params , $ queryTag );
872+ } else {
873+ $ results = new Results ($ data , $ meta , $ params , $ queryTag );
874+ }
680875
681- $ results = new Results ($ data , $ meta , $ params , $ queryTag );
682876 if ($ this ->queryLogger && !$ this ->queryLoggerOnErrorOnly ) {
683877 $ this ->_logQuery ($ results );
684878 }
0 commit comments