35
35
import com .google .firestore .v1 .ExecutePipelineRequest ;
36
36
import com .google .firestore .v1 .ExecutePipelineResponse ;
37
37
import com .google .firestore .v1 .StructuredPipeline ;
38
- import com .google .firestore .v1 .Value ;
39
38
import io .opencensus .trace .AttributeValue ;
40
39
import io .opencensus .trace .Tracing ;
41
40
import java .util .ArrayList ;
98
97
public final class Pipeline {
99
98
private static Logger logger = Logger .getLogger (Pipeline .class .getName ());
100
99
private final FluentIterable <Stage > stages ;
101
- private final Firestore db ;
100
+ private final FirestoreRpcContext <?> rpcContext ;
102
101
103
- private Pipeline (Firestore db , FluentIterable <Stage > stages ) {
104
- this .db = db ;
102
+ private Pipeline (FirestoreRpcContext <?> rpcContext , FluentIterable <Stage > stages ) {
103
+ this .rpcContext = rpcContext ;
105
104
this .stages = stages ;
106
105
}
107
106
108
107
@ InternalApi
109
- Pipeline (Firestore db , Stage stage ) {
110
- this (db , FluentIterable .of (stage ));
108
+ Pipeline (FirestoreRpcContext <?> rpcContext , Stage stage ) {
109
+ this (rpcContext , FluentIterable .of (stage ));
111
110
}
112
111
113
112
private Pipeline append (Stage stage ) {
114
- return new Pipeline (this .db , stages .append (stage ));
113
+ return new Pipeline (this .rpcContext , stages .append (stage ));
115
114
}
116
115
117
116
/**
@@ -607,47 +606,29 @@ public Pipeline genericStage(String name, List<Object> params) {
607
606
*/
608
607
@ BetaApi
609
608
public ApiFuture <List <PipelineResult >> execute () {
610
- if (db instanceof FirestoreImpl ) {
611
- FirestoreImpl firestoreImpl = (FirestoreImpl ) db ;
612
- Value pipelineValue = toProto ();
613
- ExecutePipelineRequest request =
614
- ExecutePipelineRequest .newBuilder ()
615
- .setDatabase (firestoreImpl .getResourcePath ().getDatabaseName ().toString ())
616
- .setStructuredPipeline (
617
- StructuredPipeline .newBuilder ()
618
- .setPipeline (pipelineValue .getPipelineValue ())
619
- .build ())
620
- .build ();
621
-
622
- SettableApiFuture <List <PipelineResult >> futureResult = SettableApiFuture .create ();
623
-
624
- pipelineInternalStream ( // Assuming you have this method
625
- firestoreImpl ,
626
- request ,
627
- new PipelineResultObserver () {
628
- final List <PipelineResult > results = new ArrayList <>();
629
-
630
- @ Override
631
- public void onCompleted () {
632
- futureResult .set (results );
633
- }
609
+ SettableApiFuture <List <PipelineResult >> futureResult = SettableApiFuture .create ();
634
610
635
- @ Override
636
- public void onNext (PipelineResult result ) {
637
- results .add (result );
638
- }
611
+ execute ( // Assuming you have this method
612
+ new PipelineResultObserver () {
613
+ final List <PipelineResult > results = new ArrayList <>();
639
614
640
- @ Override
641
- public void onError (Throwable t ) {
642
- futureResult .setException (t );
643
- }
644
- });
615
+ @ Override
616
+ public void onCompleted () {
617
+ futureResult .set (results );
618
+ }
619
+
620
+ @ Override
621
+ public void onNext (PipelineResult result ) {
622
+ results .add (result );
623
+ }
624
+
625
+ @ Override
626
+ public void onError (Throwable t ) {
627
+ futureResult .setException (t );
628
+ }
629
+ });
645
630
646
- return futureResult ;
647
- } else {
648
- // Handle unsupported Firestore types
649
- throw new IllegalArgumentException ("Unsupported Firestore type" );
650
- }
631
+ return futureResult ;
651
632
}
652
633
653
634
/**
@@ -697,60 +678,46 @@ public void onError(Throwable t) {
697
678
*/
698
679
@ BetaApi
699
680
public void execute (ApiStreamObserver <PipelineResult > observer ) {
700
- if (db instanceof FirestoreImpl ) {
701
- FirestoreImpl firestoreImpl = (FirestoreImpl ) db ;
702
- Value pipelineValue = toProto ();
703
- ExecutePipelineRequest request =
704
- ExecutePipelineRequest .newBuilder ()
705
- .setDatabase (firestoreImpl .getResourcePath ().getDatabaseName ().toString ())
706
- .setStructuredPipeline (
707
- StructuredPipeline .newBuilder ()
708
- .setPipeline (pipelineValue .getPipelineValue ())
709
- .build ())
710
- .build ();
711
-
712
- pipelineInternalStream (
713
- firestoreImpl ,
714
- request ,
715
- new PipelineResultObserver () {
716
- @ Override
717
- public void onCompleted () {
718
- observer .onCompleted ();
719
- }
681
+ ExecutePipelineRequest request =
682
+ ExecutePipelineRequest .newBuilder ()
683
+ .setDatabase (rpcContext .getDatabaseName ())
684
+ .setStructuredPipeline (StructuredPipeline .newBuilder ().setPipeline (toProto ()).build ())
685
+ .build ();
686
+
687
+ pipelineInternalStream (
688
+ request ,
689
+ new PipelineResultObserver () {
690
+ @ Override
691
+ public void onCompleted () {
692
+ observer .onCompleted ();
693
+ }
720
694
721
- @ Override
722
- public void onNext (PipelineResult result ) {
723
- observer .onNext (result );
724
- }
695
+ @ Override
696
+ public void onNext (PipelineResult result ) {
697
+ observer .onNext (result );
698
+ }
725
699
726
- @ Override
727
- public void onError (Throwable t ) {
728
- observer .onError (t );
729
- }
730
- });
731
- } else {
732
- // Handle unsupported Firestore types
733
- throw new IllegalArgumentException ("Unsupported Firestore type" );
734
- }
700
+ @ Override
701
+ public void onError (Throwable t ) {
702
+ observer .onError (t );
703
+ }
704
+ });
735
705
}
736
706
737
- private Value toProto () {
738
- return Value .newBuilder ()
739
- .setPipelineValue (
740
- com .google .firestore .v1 .Pipeline .newBuilder ()
741
- .addAllStages (stages .transform (StageUtils ::toStageProto )))
707
+ private com .google .firestore .v1 .Pipeline toProto () {
708
+ return com .google .firestore .v1 .Pipeline .newBuilder ()
709
+ .addAllStages (stages .transform (StageUtils ::toStageProto ))
742
710
.build ();
743
711
}
744
712
745
713
private void pipelineInternalStream (
746
- FirestoreImpl rpcContext ,
747
- ExecutePipelineRequest request ,
748
- PipelineResultObserver resultObserver ) {
714
+ ExecutePipelineRequest request , PipelineResultObserver resultObserver ) {
749
715
ResponseObserver <ExecutePipelineResponse > observer =
750
716
new ResponseObserver <ExecutePipelineResponse >() {
751
717
Timestamp executionTime = null ;
752
718
boolean firstResponse = false ;
753
719
int numDocuments = 0 ;
720
+ int totalNumDocuments = 0 ;
754
721
boolean hasCompleted = false ;
755
722
756
723
@ Override
@@ -769,15 +736,16 @@ public void onResponse(ExecutePipelineResponse response) {
769
736
}
770
737
if (response .getResultsCount () > 0 ) {
771
738
numDocuments += response .getResultsCount ();
772
- if (numDocuments % 100 == 0 ) {
739
+ totalNumDocuments += response .getResultsCount ();
740
+ if (numDocuments > 100 ) {
773
741
Tracing .getTracer ()
774
742
.getCurrentSpan ()
775
- .addAnnotation ("Firestore.Query: Received 100 documents" );
743
+ .addAnnotation ("Firestore.Query: Received " + numDocuments + " documents" );
744
+ numDocuments = 0 ;
776
745
}
746
+ Timestamp executionTime = Timestamp .fromProto (response .getExecutionTime ());
777
747
for (Document doc : response .getResultsList ()) {
778
- resultObserver .onNext (
779
- PipelineResult .fromDocument (
780
- rpcContext , Timestamp .fromProto (response .getExecutionTime ()), doc ));
748
+ resultObserver .onNext (PipelineResult .fromDocument (rpcContext , executionTime , doc ));
781
749
}
782
750
}
783
751
@@ -804,7 +772,7 @@ public void onComplete() {
804
772
.addAnnotation (
805
773
"Firestore.ExecutePipeline: Completed" ,
806
774
ImmutableMap .of (
807
- "numDocuments" , AttributeValue .longAttributeValue ((long ) numDocuments )));
775
+ "numDocuments" , AttributeValue .longAttributeValue ((long ) totalNumDocuments )));
808
776
resultObserver .onCompleted (executionTime );
809
777
}
810
778
};
0 commit comments