@@ -126,36 +126,42 @@ protected function _returnSearch($params, $source)
126126 }
127127
128128
129- public function processDistinct ($ column , $ wheres ): Results
129+ public function processDistinct ($ wheres , $ options , $ columns , $ includeDocCount = false ): Results
130130 {
131- $ col = $ column ;
132- if (is_array ($ column )) {
133- $ col = $ column [0 ];
134- }
135-
136-
137131 try {
138- $ params = $ this ->buildParams ($ this ->index , $ wheres );
139- $ params ['body ' ]['aggs ' ]['distinct_ ' .$ col ]['terms ' ] = [
140- 'field ' => $ col ,
141- 'size ' => $ this ->maxSize ,
142- ];
143- $ process = $ this ->client ->search ($ params );
132+ if ($ columns && !is_array ($ columns )) {
133+ $ columns = [$ columns ];
134+ }
135+ $ sort = $ options ['sort ' ] ?? [];
136+ $ skip = $ options ['skip ' ] ?? false ;
137+ $ limit = $ options ['limit ' ] ?? false ;
138+ unset($ options ['sort ' ]);
139+ unset($ options ['skip ' ]);
140+ unset($ options ['limit ' ]);
141+
142+ $ params = $ this ->buildParams ($ this ->index , $ wheres , $ options );
143+ $ params ['body ' ]['aggs ' ] = $ this ->createNestedAggs ($ columns , $ sort );
144+
145+ $ response = $ this ->client ->search ($ params );
146+
147+
144148 $ data = [];
145- if (!empty ($ process ['aggregations ' ]['distinct_ ' .$ col ]['buckets ' ])) {
146- foreach ($ process ['aggregations ' ]['distinct_ ' .$ col ]['buckets ' ] as $ bucket ) {
147- $ data [] = $ bucket ['key ' ];
148- }
149+ if (!empty ($ response ['aggregations ' ])) {
150+ $ data = $ this ->_sanitizeDistinctResponse ($ response ['aggregations ' ], $ columns , $ includeDocCount );
151+ }
152+
153+ //process limit and skip from all results
154+ if ($ skip || $ limit ) {
155+ $ data = array_slice ($ data , $ skip , $ limit );
149156 }
150157
151- return $ this ->_return ($ data , $ process , $ params , $ this ->_queryTag (__FUNCTION__ ));
158+ return $ this ->_return ($ data , $ response , $ params , $ this ->_queryTag (__FUNCTION__ ));
152159 } catch (Exception $ e ) {
153160
154161 $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
155162 throw new Exception ($ result ->errorMessage );
156163 }
157164
158-
159165 }
160166
161167 /**
@@ -641,6 +647,147 @@ private function _matrixAggregate($wheres, $options, $columns): Results
641647
642648 }
643649
650+ //----------------------------------------------------------------------
651+ // Distinct Aggregates
652+ //----------------------------------------------------------------------
653+
654+ public function processDistinctAggregate ($ function , $ wheres , $ options , $ columns ): Results
655+ {
656+ return $ this ->{'_ ' .$ function .'DistinctAggregate ' }($ wheres , $ options , $ columns );
657+ }
658+
659+ private function _countDistinctAggregate ($ wheres , $ options , $ columns ): Results
660+ {
661+ try {
662+ $ params = $ this ->buildParams ($ this ->index , $ wheres );
663+ $ process = $ this ->processDistinct ($ wheres , $ options , $ columns );
664+ $ count = count ($ process ->data );
665+
666+ return $ this ->_return ($ count , $ process ->getMetaData (), $ params , $ this ->_queryTag (__FUNCTION__ ));
667+ } catch (Exception $ e ) {
668+
669+ $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
670+ throw new Exception ($ result ->errorMessage );
671+ }
672+
673+ }
674+
675+
676+ private function _minDistinctAggregate ($ wheres , $ options , $ columns ): Results
677+ {
678+ try {
679+ $ params = $ this ->buildParams ($ this ->index , $ wheres );
680+ $ process = $ this ->processDistinct ($ wheres , $ options , $ columns );
681+
682+ $ min = 0 ;
683+ $ hasBeenSet = false ;
684+ if (!empty ($ process ->data )) {
685+ foreach ($ process ->data as $ datum ) {
686+ if (!empty ($ datum [$ columns [0 ]]) && is_numeric ($ datum [$ columns [0 ]])) {
687+ if (!$ hasBeenSet ) {
688+ $ min = $ datum [$ columns [0 ]];
689+ $ hasBeenSet = true ;
690+ } else {
691+ $ min = min ($ min , $ datum [$ columns [0 ]]);
692+ }
693+
694+ }
695+ }
696+ }
697+
698+ return $ this ->_return ($ min , $ process ->getMetaData (), $ params , $ this ->_queryTag (__FUNCTION__ ));
699+ } catch (Exception $ e ) {
700+
701+ $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
702+ throw new Exception ($ result ->errorMessage );
703+ }
704+
705+ }
706+
707+ private function _maxDistinctAggregate ($ wheres , $ options , $ columns ): Results
708+ {
709+ try {
710+ $ params = $ this ->buildParams ($ this ->index , $ wheres );
711+ $ process = $ this ->processDistinct ($ wheres , $ options , $ columns );
712+
713+ $ max = 0 ;
714+ if (!empty ($ process ->data )) {
715+ foreach ($ process ->data as $ datum ) {
716+ if (!empty ($ datum [$ columns [0 ]]) && is_numeric ($ datum [$ columns [0 ]])) {
717+ $ max = max ($ max , $ datum [$ columns [0 ]]);
718+ }
719+ }
720+ }
721+
722+
723+ return $ this ->_return ($ max , $ process ->getMetaData (), $ params , $ this ->_queryTag (__FUNCTION__ ));
724+ } catch (Exception $ e ) {
725+
726+ $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
727+ throw new Exception ($ result ->errorMessage );
728+ }
729+
730+ }
731+
732+
733+ private function _sumDistinctAggregate ($ wheres , $ options , $ columns ): Results
734+ {
735+ try {
736+ $ params = $ this ->buildParams ($ this ->index , $ wheres );
737+ $ process = $ this ->processDistinct ($ wheres , $ options , $ columns );
738+ $ sum = 0 ;
739+ if (!empty ($ process ->data )) {
740+ foreach ($ process ->data as $ datum ) {
741+ if (!empty ($ datum [$ columns [0 ]]) && is_numeric ($ datum [$ columns [0 ]])) {
742+ $ sum += $ datum [$ columns [0 ]];
743+ }
744+ }
745+ }
746+
747+ return $ this ->_return ($ sum , $ process ->getMetaData (), $ params , $ this ->_queryTag (__FUNCTION__ ));
748+ } catch (Exception $ e ) {
749+
750+ $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
751+ throw new Exception ($ result ->errorMessage );
752+ }
753+
754+ }
755+
756+ private function _avgDistinctAggregate ($ wheres , $ options , $ columns )
757+ {
758+ try {
759+ $ params = $ this ->buildParams ($ this ->index , $ wheres );
760+ $ process = $ this ->processDistinct ($ wheres , $ options , $ columns );
761+ $ sum = 0 ;
762+ $ count = 0 ;
763+ $ avg = 0 ;
764+ if (!empty ($ process ->data )) {
765+ foreach ($ process ->data as $ datum ) {
766+ if (!empty ($ datum [$ columns [0 ]]) && is_numeric ($ datum [$ columns [0 ]])) {
767+ $ count ++;
768+ $ sum += $ datum [$ columns [0 ]];
769+ }
770+ }
771+ }
772+ if ($ count > 0 ) {
773+ $ avg = $ sum / $ count ;
774+ }
775+
776+
777+ return $ this ->_return ($ avg , $ process ->getMetaData (), $ params , $ this ->_queryTag (__FUNCTION__ ));
778+ } catch (Exception $ e ) {
779+
780+ $ result = $ this ->_returnError ($ e ->getMessage (), $ e ->getCode (), [], $ this ->_queryTag (__FUNCTION__ ));
781+ throw new Exception ($ result ->errorMessage );
782+ }
783+
784+ }
785+
786+ private function _matrixDistinctAggregate ($ wheres , $ options , $ columns ): Results
787+ {
788+ return $ this ->_returnError ('Matrix distinct aggregate not supported ' , 500 , [], $ this ->_queryTag (__FUNCTION__ ));
789+ }
790+
644791
645792 //======================================================================
646793 // Private & Sanitization methods
@@ -675,10 +822,58 @@ private function _sanitizeSearchResponse($response, $params, $queryTag)
675822 return $ this ->_return ($ data , $ meta , $ params , $ queryTag );
676823 }
677824
825+ private function _sanitizeDistinctResponse ($ response , $ columns , $ includeDocCount )
826+ {
827+ $ keys = [];
828+ foreach ($ columns as $ column ) {
829+ $ keys [] = 'by_ ' .$ column ;
830+ }
831+
832+ return $ this ->processBuckets ($ columns , $ keys , $ response , 0 , $ includeDocCount );
833+
834+ }
835+
836+ private function processBuckets ($ columns , $ keys , $ response , $ index , $ includeDocCount , $ currentData = [])
837+ {
838+ $ data = [];
839+
840+ if (!empty ($ response [$ keys [$ index ]]['buckets ' ])) {
841+ foreach ($ response [$ keys [$ index ]]['buckets ' ] as $ res ) {
842+ $ datum = $ currentData ;
843+ $ datum [$ columns [$ index ]] = $ res ['key ' ];
844+ if ($ includeDocCount ) {
845+ $ datum [$ columns [$ index ].'_count ' ] = $ res ['doc_count ' ];
846+ }
847+
848+ if (isset ($ columns [$ index + 1 ])) {
849+ $ nestedData = $ this ->processBuckets ($ columns , $ keys , $ res , $ index + 1 , $ includeDocCount , $ datum );
850+
851+ if (!empty ($ nestedData )) {
852+ $ data = array_merge ($ data , $ nestedData );
853+ } else {
854+ $ data [] = $ datum ;
855+ }
856+ } else {
857+ $ data [] = $ datum ;
858+ }
859+ }
860+ }
861+
862+ return $ data ;
863+ }
864+
678865 private function _return ($ data , $ meta , $ params , $ queryTag ): Results
679866 {
867+ if (is_object ($ meta )) {
868+ $ metaAsArray = [];
869+ if (method_exists ($ meta , 'asArray ' )) {
870+ $ metaAsArray = $ meta ->asArray ();
871+ }
872+ $ results = new Results ($ data , $ metaAsArray , $ params , $ queryTag );
873+ } else {
874+ $ results = new Results ($ data , $ meta , $ params , $ queryTag );
875+ }
680876
681- $ results = new Results ($ data , $ meta , $ params , $ queryTag );
682877 if ($ this ->queryLogger && !$ this ->queryLoggerOnErrorOnly ) {
683878 $ this ->_logQuery ($ results );
684879 }
0 commit comments