@@ -15,7 +15,7 @@ pub struct QueryExecutor<T: DeserializeOwned> {
1515 items_link : ResourceLink ,
1616 context : Context < ' static > ,
1717 query_engine : QueryEngineRef ,
18- base_request : Request ,
18+ base_request : Option < Request > ,
1919 query : Query ,
2020 pipeline : Option < OwnedQueryPipeline > ,
2121
@@ -37,15 +37,13 @@ impl<T: DeserializeOwned + 'static> QueryExecutor<T> {
3737 ) -> azure_core:: Result < Self > {
3838 let items_link = container_link. feed ( ResourceType :: Items ) ;
3939 let context = options. method_options . context . into_owned ( ) ;
40- let base_request =
41- pipeline:: create_base_query_request ( http_pipeline. url ( & items_link) , & query) ?;
4240 Ok ( Self {
4341 http_pipeline,
4442 container_link,
4543 items_link,
4644 context,
4745 query_engine,
48- base_request,
46+ base_request : None ,
4947 query,
5048 pipeline : None ,
5149 phantom : std:: marker:: PhantomData ,
@@ -69,8 +67,13 @@ impl<T: DeserializeOwned + 'static> QueryExecutor<T> {
6967 /// An item to yield, or None if execution is complete.
7068 #[ tracing:: instrument( skip_all) ]
7169 async fn step ( & mut self ) -> azure_core:: Result < Option < FeedPage < T > > > {
72- let pipeline = match self . pipeline . as_mut ( ) {
73- Some ( pipeline) => pipeline,
70+ let ( pipeline, base_request) = match self . pipeline . as_mut ( ) {
71+ Some ( pipeline) => (
72+ pipeline,
73+ self . base_request
74+ . as_ref ( )
75+ . expect ( "base_request should be set when pipeline is set" ) ,
76+ ) ,
7477 None => {
7578 // Initialize the pipeline.
7679 let query_plan = get_query_plan (
@@ -97,8 +100,16 @@ impl<T: DeserializeOwned + 'static> QueryExecutor<T> {
97100 let pipeline =
98101 self . query_engine
99102 . create_pipeline ( & self . query . text , & query_plan, & pkranges) ?;
103+ self . query . text = pipeline. query ( ) . into ( ) ;
104+ self . base_request = Some ( crate :: pipeline:: create_base_query_request (
105+ self . http_pipeline . url ( & self . items_link ) ,
106+ & self . query ,
107+ ) ?) ;
100108 self . pipeline = Some ( pipeline) ;
101- self . pipeline . as_mut ( ) . expect ( "we just set it" )
109+ (
110+ self . pipeline . as_mut ( ) . unwrap ( ) ,
111+ self . base_request . as_ref ( ) . unwrap ( ) ,
112+ )
102113 }
103114 } ;
104115
@@ -113,7 +124,7 @@ impl<T: DeserializeOwned + 'static> QueryExecutor<T> {
113124 let items = results
114125 . items
115126 . into_iter ( )
116- . map ( |item| serde_json:: from_slice :: < T > ( & item) )
127+ . map ( |item| serde_json:: from_str :: < T > ( item. get ( ) ) )
117128 . collect :: < Result < Vec < _ > , _ > > ( ) ?;
118129
119130 // TODO: Provide a continuation token.
@@ -122,7 +133,7 @@ impl<T: DeserializeOwned + 'static> QueryExecutor<T> {
122133
123134 // No items, so make any requests we need to make and provide them to the pipeline.
124135 for request in results. requests {
125- let mut query_request = self . base_request . clone ( ) ;
136+ let mut query_request = base_request. clone ( ) ;
126137 query_request. insert_header (
127138 constants:: PARTITION_KEY_RANGE_ID ,
128139 request. partition_key_range_id . clone ( ) ,
0 commit comments