@@ -37,6 +37,7 @@ use datafusion::datasource::DefaultTableSource;
3737use futures:: Stream ;
3838use futures:: stream:: StreamExt ;
3939use query:: parser:: PromQuery ;
40+ use servers:: http:: prom_store:: PHYSICAL_TABLE_PARAM ;
4041use servers:: interceptor:: { GrpcQueryInterceptor , GrpcQueryInterceptorRef } ;
4142use servers:: query_handler:: grpc:: GrpcQueryHandler ;
4243use servers:: query_handler:: sql:: SqlQueryHandler ;
@@ -73,10 +74,20 @@ impl GrpcQueryHandler for Instance {
7374
7475 let output = match request {
7576 Request :: Inserts ( requests) => self . handle_inserts ( requests, ctx. clone ( ) ) . await ?,
76- Request :: RowInserts ( requests) => {
77- self . handle_row_inserts ( requests, ctx. clone ( ) , false , false )
77+ Request :: RowInserts ( requests) => match ctx. extension ( PHYSICAL_TABLE_PARAM ) {
78+ Some ( physical_table) => {
79+ self . handle_metric_row_inserts (
80+ requests,
81+ ctx. clone ( ) ,
82+ physical_table. to_string ( ) ,
83+ )
7884 . await ?
79- }
85+ }
86+ None => {
87+ self . handle_row_inserts ( requests, ctx. clone ( ) , false , false )
88+ . await ?
89+ }
90+ } ,
8091 Request :: Deletes ( requests) => self . handle_deletes ( requests, ctx. clone ( ) ) . await ?,
8192 Request :: RowDeletes ( requests) => self . handle_row_deletes ( requests, ctx. clone ( ) ) . await ?,
8293 Request :: Query ( query_request) => {
0 commit comments