@@ -7,8 +7,8 @@ namespace ExtProc {
77
88NetworkExtProcFilter::NetworkExtProcFilter (ConfigConstSharedPtr config,
99 ExternalProcessorClientPtr&& client)
10- : config_(config), client_(std::move(client )), config_with_hash_key_(config_-> grpcService ( )),
11- downstream_callbacks_(*this ) {}
10+ : config_(config), stats_(config-> stats ( )), client_(std::move(client )),
11+ config_with_hash_key_(config_-> grpcService ()), downstream_callbacks_(*this ) {}
1212
1313NetworkExtProcFilter::~NetworkExtProcFilter () { closeStream (); }
1414
@@ -81,6 +81,7 @@ Network::FilterStatus NetworkExtProcFilter::handleStreamError() {
8181
8282 if (config_->failureModeAllow ()) {
8383 // In failure allow mode, continue processing despite stream errors
84+ stats_.failure_mode_allowed_ .inc ();
8485 return Network::FilterStatus::Continue;
8586 } else {
8687 // In strict mode, close the connection and stop processing
@@ -140,10 +141,12 @@ NetworkExtProcFilter::StreamOpenState NetworkExtProcFilter::openStream() {
140141 if (stream_object == nullptr ) {
141142 ENVOY_CONN_LOG (error, " Failed to create gRPC stream to external processor" ,
142143 read_callbacks_->connection ());
144+ stats_.stream_open_failures_ .inc ();
143145 return StreamOpenState::Error;
144146 }
145147
146148 stream_ = std::move (stream_object);
149+ stats_.streams_started_ .inc ();
147150 return StreamOpenState::Ok;
148151}
149152
@@ -166,14 +169,17 @@ void NetworkExtProcFilter::sendRequest(Buffer::Instance& data, bool end_stream,
166169 auto * read_data = request.mutable_read_data ();
167170 read_data->set_data (data.toString ());
168171 read_data->set_end_of_stream (end_stream);
172+ stats_.read_data_sent_ .inc ();
169173 } else {
170174 auto * write_data = request.mutable_write_data ();
171175 write_data->set_data (data.toString ());
172176 write_data->set_end_of_stream (end_stream);
177+ stats_.write_data_sent_ .inc ();
173178 }
174179
175180 // Send to external processor
176181 stream_->send (std::move (request), false );
182+ stats_.stream_msgs_sent_ .inc ();
177183
178184 // Clear data buffer after sending
179185 data.drain (data.length ());
@@ -183,11 +189,13 @@ void NetworkExtProcFilter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&
183189 if (processing_complete_) {
184190 ENVOY_CONN_LOG (debug, " Ignoring response message: processing already completed" ,
185191 read_callbacks_->connection ());
192+ stats_.spurious_msgs_received_ .inc ();
186193 return ;
187194 }
188195
189196 auto response = std::move (res);
190197 ENVOY_CONN_LOG (debug, " Received response from external processor" , read_callbacks_->connection ());
198+ stats_.stream_msgs_received_ .inc ();
191199
192200 if (response->has_read_data ()) {
193201 const auto & data = response->read_data ();
@@ -197,15 +205,18 @@ void NetworkExtProcFilter::onReceiveMessage(std::unique_ptr<ProcessingResponse>&
197205 Buffer::OwnedImpl buffer (data.data ());
198206 read_callbacks_->injectReadDataToFilterChain (buffer, data.end_of_stream ());
199207 updateCloseCallbackStatus (false , true );
208+ stats_.read_data_injected_ .inc ();
200209 } else if (response->has_write_data ()) {
201210 const auto & data = response->write_data ();
202211 ENVOY_CONN_LOG (trace, " Processing WRITE data response: {} bytes, end_stream={}" ,
203212 read_callbacks_->connection (), data.data ().size (), data.end_of_stream ());
204213 Buffer::OwnedImpl buffer (data.data ());
205214 write_callbacks_->injectWriteDataToFilterChain (buffer, data.end_of_stream ());
206215 updateCloseCallbackStatus (false , false );
216+ stats_.write_data_injected_ .inc ();
207217 } else {
208218 ENVOY_CONN_LOG (debug, " Response contained no data, continuing" , read_callbacks_->connection ());
219+ stats_.empty_response_received_ .inc ();
209220 }
210221}
211222
@@ -216,6 +227,7 @@ void NetworkExtProcFilter::onGrpcError(Grpc::Status::GrpcStatus status,
216227 // Mark processing as complete to avoid further gRPC calls
217228 processing_complete_ = true ;
218229 closeStream ();
230+ stats_.streams_grpc_error_ .inc ();
219231
220232 // If failure mode is not to allow, close the connection
221233 if (!config_->failureModeAllow ()) {
@@ -224,11 +236,14 @@ void NetworkExtProcFilter::onGrpcError(Grpc::Status::GrpcStatus status,
224236 closeConnection (" ext_proc_grpc_error" );
225237 return ;
226238 }
239+
240+ stats_.failure_mode_allowed_ .inc ();
227241}
228242
229243void NetworkExtProcFilter::onGrpcClose () {
230244 ENVOY_CONN_LOG (debug, " gRPC stream closed by peer" , read_callbacks_->connection ());
231245 processing_complete_ = true ;
246+ stats_.streams_grpc_close_ .inc ();
232247 closeStream ();
233248}
234249
@@ -238,6 +253,9 @@ void NetworkExtProcFilter::closeStream() {
238253 }
239254
240255 bool closed = stream_->close ();
256+ if (closed) {
257+ stats_.streams_closed_ .inc ();
258+ }
241259 ENVOY_CONN_LOG (debug, " Stream closed: {}" , read_callbacks_->connection (), closed);
242260 stream_ = nullptr ;
243261}
@@ -249,6 +267,7 @@ void NetworkExtProcFilter::closeConnection(const std::string& reason) {
249267 read_callbacks_->disableClose (false );
250268 write_callbacks_->disableClose (false );
251269 read_callbacks_->connection ().close (Network::ConnectionCloseType::FlushWrite, reason);
270+ stats_.connections_closed_ .inc ();
252271}
253272
254273} // namespace ExtProc
0 commit comments