@@ -114,34 +114,22 @@ impl local::Exporter<OtapPdata> for OTAPExporter {
114114 ) )
115115 . await ;
116116
117+ let exporter_id = effect_handler. exporter_id ( ) ;
118+ let channel = Channel :: from_shared ( self . config . grpc_endpoint . clone ( ) )
119+ . map_err ( |e| Error :: ExporterError {
120+ exporter : exporter_id,
121+ error : format ! ( "grpc channel error {e}" ) ,
122+ } ) ?
123+ . connect_lazy ( ) ;
124+
117125 let timer_cancel_handle = effect_handler
118126 . start_periodic_telemetry ( Duration :: from_secs ( 1 ) )
119127 . await ?;
120128
121129 // start a grpc client and connect to the server
122- let mut arrow_metrics_client =
123- ArrowMetricsServiceClient :: connect ( self . config . grpc_endpoint . clone ( ) )
124- . await
125- . map_err ( |error| Error :: ExporterError {
126- exporter : effect_handler. exporter_id ( ) ,
127- error : error. to_string ( ) ,
128- } ) ?;
129-
130- let mut arrow_logs_client =
131- ArrowLogsServiceClient :: connect ( self . config . grpc_endpoint . clone ( ) )
132- . await
133- . map_err ( |error| Error :: ExporterError {
134- exporter : effect_handler. exporter_id ( ) ,
135- error : error. to_string ( ) ,
136- } ) ?;
137-
138- let mut arrow_traces_client =
139- ArrowTracesServiceClient :: connect ( self . config . grpc_endpoint . clone ( ) )
140- . await
141- . map_err ( |error| Error :: ExporterError {
142- exporter : effect_handler. exporter_id ( ) ,
143- error : error. to_string ( ) ,
144- } ) ?;
130+ let mut arrow_metrics_client = ArrowMetricsServiceClient :: new ( channel. clone ( ) ) ;
131+ let mut arrow_logs_client = ArrowLogsServiceClient :: new ( channel. clone ( ) ) ;
132+ let mut arrow_traces_client = ArrowTracesServiceClient :: new ( channel. clone ( ) ) ;
145133
146134 if let Some ( ref compression) = self . config . compression_method {
147135 let encoding = compression. map_to_compression_encoding ( ) ;
@@ -453,13 +441,24 @@ mod tests {
453441 use crate :: compression:: CompressionMethod ;
454442 use otap_df_config:: node:: NodeUserConfig ;
455443 use otap_df_engine:: context:: ControllerContext ;
444+ use otap_df_engine:: control:: Controllable ;
445+ use otap_df_engine:: control:: NodeControlMsg ;
446+ use otap_df_engine:: control:: PipelineCtrlMsgSender ;
447+ use otap_df_engine:: control:: pipeline_ctrl_msg_channel;
456448 use otap_df_engine:: error:: Error ;
457449 use otap_df_engine:: exporter:: ExporterWrapper ;
450+ use otap_df_engine:: local:: message:: LocalReceiver ;
451+ use otap_df_engine:: local:: message:: LocalSender ;
452+ use otap_df_engine:: message:: Receiver ;
453+ use otap_df_engine:: message:: Sender ;
454+ use otap_df_engine:: node:: NodeWithPDataReceiver ;
455+ use otap_df_engine:: testing:: create_not_send_channel;
458456 use otap_df_engine:: testing:: {
459457 exporter:: { TestContext , TestRuntime } ,
460458 test_node,
461459 } ;
462460 use otap_df_telemetry:: registry:: MetricsRegistryHandle ;
461+ use otap_df_telemetry:: reporter:: MetricsReporter ;
463462 use otel_arrow_rust:: otap:: OtapArrowRecords ;
464463 use otel_arrow_rust:: proto:: opentelemetry:: arrow:: v1:: {
465464 ArrowPayloadType , arrow_logs_service_server:: ArrowLogsServiceServer ,
@@ -736,4 +735,173 @@ mod tests {
736735 exporter. config. arrow. payload_compression
737736 ) ;
738737 }
738+
739+ #[ test]
740+ fn test_receiver_not_ready_on_start ( ) {
741+ let grpc_addr = "127.0.0.1" ;
742+ let grpc_port = portpicker:: pick_unused_port ( ) . expect ( "No free ports" ) ;
743+ let grpc_endpoint = format ! ( "http://{grpc_addr}:{grpc_port}" ) ;
744+ let tokio_rt = Runtime :: new ( ) . unwrap ( ) ;
745+
746+ let test_runtime = TestRuntime :: < OtapPdata > :: new ( ) ;
747+ let node_config = Arc :: new ( NodeUserConfig :: new_exporter_config ( OTAP_EXPORTER_URN ) ) ;
748+ let metrics_registry_handle = MetricsRegistryHandle :: new ( ) ;
749+ let controller_ctx = ControllerContext :: new ( metrics_registry_handle) ;
750+ let node_id = test_node ( test_runtime. config ( ) . name . clone ( ) ) ;
751+ let pipeline_ctx =
752+ controller_ctx. pipeline_context_with ( "grp" . into ( ) , "pipeline" . into ( ) , 0 , 0 ) ;
753+
754+ let mut exporter = ExporterWrapper :: local (
755+ OTAPExporter :: from_config (
756+ pipeline_ctx,
757+ & serde_json:: json!( {
758+ "grpc_endpoint" : grpc_endpoint,
759+ "compression_method" : "none"
760+ } ) ,
761+ )
762+ . unwrap ( ) ,
763+ node_id. clone ( ) ,
764+ node_config,
765+ test_runtime. config ( ) ,
766+ ) ;
767+
768+ let control_sender = exporter. control_sender ( ) ;
769+ let ( pdata_tx, pdata_rx) = create_not_send_channel :: < OtapPdata > ( 1 ) ;
770+ let pdata_tx = Sender :: Local ( LocalSender :: MpscSender ( pdata_tx) ) ;
771+ let pdata_rx = Receiver :: Local ( LocalReceiver :: MpscReceiver ( pdata_rx) ) ;
772+ let ( pipeline_ctrl_msg_tx, _pipeline_ctrl_msg_rx) = pipeline_ctrl_msg_channel ( 2 ) ;
773+ exporter
774+ . set_pdata_receiver ( node_id. clone ( ) , pdata_rx)
775+ . expect ( "Failed to set PData Receiver" ) ;
776+
777+ let ( req_sender, req_receiver) = tokio:: sync:: mpsc:: channel ( 1 ) ;
778+ let ( server_startup_sender, mut server_startup_receiver) = tokio:: sync:: mpsc:: channel ( 1 ) ;
779+ let ( server_start_ack_sender, server_start_ack_receiver) = tokio:: sync:: mpsc:: channel ( 1 ) ;
780+ let ( server_shutdown_sender, server_shutdown_signal) = tokio:: sync:: oneshot:: channel ( ) ;
781+
782+ async fn start_exporter (
783+ exporter : ExporterWrapper < OtapPdata > ,
784+ pipeline_ctrl_msg_tx : PipelineCtrlMsgSender < OtapPdata > ,
785+ ) -> Result < ( ) , Error > {
786+ _ = exporter. start ( pipeline_ctrl_msg_tx) . await ;
787+ Ok ( ( ) )
788+ }
789+
790+ async fn drive_test (
791+ server_startup_sender : tokio:: sync:: mpsc:: Sender < bool > ,
792+ mut server_startup_ack_receiver : tokio:: sync:: mpsc:: Receiver < bool > ,
793+ server_shutdown_sender1 : tokio:: sync:: oneshot:: Sender < bool > ,
794+ pdata_tx : Sender < OtapPdata > ,
795+ control_sender : Sender < NodeControlMsg < OtapPdata > > ,
796+ mut req_receiver : tokio:: sync:: mpsc:: Receiver < OtapPdata > ,
797+ ) {
798+ // send a request before while the server isn't running and check how we handle it
799+ let log_message = create_otap_batch ( LOG_BATCH_ID , ArrowPayloadType :: Logs ) ;
800+ pdata_tx
801+ . send ( OtapPdata :: new_default ( log_message. into ( ) ) )
802+ . await
803+ . expect ( "Failed to send log message" ) ;
804+ // TODO instead of sleeping here, once we handle ACK/NACK we should wait to get a NACK
805+ // from the control channel
806+ tokio:: time:: sleep ( Duration :: from_millis ( 5 ) ) . await ;
807+
808+ // wait a bit before starting the server. This will ensure the exporter no-long exits
809+ // when start is called if the endpoint can't be reached
810+ tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
811+ server_startup_sender. send ( true ) . await . unwrap ( ) ;
812+ _ = server_startup_ack_receiver. recv ( ) . await . unwrap ( ) ;
813+
814+ // send another pdata now that the server has started
815+ let log_message = create_otap_batch ( LOG_BATCH_ID + 1 , ArrowPayloadType :: Logs ) ;
816+ pdata_tx
817+ . send ( OtapPdata :: new_default ( log_message. into ( ) ) )
818+ . await
819+ . expect ( "Failed to send log message" ) ;
820+ _ = req_receiver. recv ( ) . await . unwrap ( ) ; // ensure we got response
821+ // TODO instead of sleeping here, once we handle ACK/NACK we should wait to get a ACK
822+ // from the control channel
823+ tokio:: time:: sleep ( Duration :: from_millis ( 50 ) ) . await ;
824+
825+ // check the metrics:
826+ let ( metrics_rx, metrics_reporter) = MetricsReporter :: create_new_and_receiver ( 32 ) ;
827+ control_sender
828+ . send ( NodeControlMsg :: CollectTelemetry {
829+ metrics_reporter : metrics_reporter. clone ( ) ,
830+ } )
831+ . await
832+ . unwrap ( ) ;
833+ let metrics = metrics_rx. recv_async ( ) . await . unwrap ( ) ;
834+ let logs_exported_count = metrics. get_metrics ( ) [ 4 ] ; // logs exported
835+ assert_eq ! ( logs_exported_count, 1 ) ;
836+ let logs_failed_count = metrics. get_metrics ( ) [ 5 ] ; // logs failed
837+ assert_eq ! ( logs_failed_count, 1 ) ;
838+
839+ control_sender
840+ . send ( NodeControlMsg :: Shutdown {
841+ deadline : Duration :: from_millis ( 10 ) ,
842+ reason : "shutting down" . into ( ) ,
843+ } )
844+ . await
845+ . unwrap ( ) ;
846+
847+ server_shutdown_sender1. send ( true ) . unwrap ( ) ;
848+ }
849+
850+ async fn run_server (
851+ listening_addr : String ,
852+ startup_ack_sender : tokio:: sync:: mpsc:: Sender < bool > ,
853+ shutdown_signal : tokio:: sync:: oneshot:: Receiver < bool > ,
854+ req_sender : tokio:: sync:: mpsc:: Sender < OtapPdata > ,
855+ ) {
856+ let listening_addr: SocketAddr = listening_addr. to_string ( ) . parse ( ) . unwrap ( ) ;
857+ let tcp_listener = TcpListener :: bind ( listening_addr) . await . unwrap ( ) ;
858+ let tcp_stream = TcpListenerStream :: new ( tcp_listener) ;
859+
860+ let logs_service = ArrowLogsServiceServer :: new ( ArrowLogsServiceMock :: new ( req_sender) ) ;
861+
862+ Server :: builder ( )
863+ . add_service ( logs_service)
864+ . serve_with_incoming_shutdown ( tcp_stream, async {
865+ startup_ack_sender. send ( true ) . await . unwrap ( ) ;
866+ let _ = shutdown_signal. await ;
867+ } )
868+ . await
869+ . expect ( "uh oh server failed" ) ;
870+ }
871+
872+ let server_handle = tokio_rt. spawn ( async move {
873+ let listening_addr = format ! ( "{grpc_addr}:{grpc_port}" ) ;
874+
875+ // wait for signal to start the server
876+ _ = server_startup_receiver. recv ( ) . await . unwrap ( ) ;
877+ run_server (
878+ listening_addr. clone ( ) ,
879+ server_start_ack_sender. clone ( ) ,
880+ server_shutdown_signal,
881+ req_sender. clone ( ) ,
882+ )
883+ . await ;
884+ } ) ;
885+
886+ let _ = tokio_rt. block_on ( async move {
887+ let local_set = tokio:: task:: LocalSet :: new ( ) ;
888+ let _fut = local_set
889+ . spawn_local ( async move { start_exporter ( exporter, pipeline_ctrl_msg_tx) . await } ) ;
890+ tokio:: join!(
891+ local_set,
892+ drive_test(
893+ server_startup_sender,
894+ server_start_ack_receiver,
895+ server_shutdown_sender,
896+ pdata_tx,
897+ control_sender,
898+ req_receiver
899+ )
900+ )
901+ } ) ;
902+
903+ tokio_rt
904+ . block_on ( server_handle)
905+ . expect ( "server shutdown success" ) ;
906+ }
739907}
0 commit comments