@@ -172,6 +172,8 @@ std::unique_ptr<Session> ArrowFlightHandler::createSession(const arrow::flight::
172172
173173void ArrowFlightHandler::start ()
174174{
175+ chassert (!initialized && !stopped);
176+
175177 bool use_tls = server.config ().getBool (" arrowflight.enable_ssl" , false );
176178
177179 arrow::Result<arrow::flight::Location> parse_location_status;
@@ -215,11 +217,15 @@ void ArrowFlightHandler::start()
215217 throw Exception (ErrorCodes::UNKNOWN_EXCEPTION, " Failed init Arrow Flight Server: {}" , init_status.ToString ());
216218 }
217219
220+ initialized = true ;
221+
218222 server_thread.emplace ([this ]
219223 {
220224 try
221225 {
222226 setThreadName (" ArrowFlightSrv" );
227+ if (stopped)
228+ return ;
223229 auto serve_status = Serve ();
224230 if (!serve_status.ok ())
225231 LOG_ERROR (log, " Failed to serve Arrow Flight: {}" , serve_status.ToString ());
@@ -235,13 +241,27 @@ ArrowFlightHandler::~ArrowFlightHandler() = default;
235241
236242void ArrowFlightHandler::stop ()
237243{
238- auto status = Shutdown ();
239- if (!status.ok ())
244+ if (!initialized)
245+ return ;
246+
247+ if (!stopped.exchange (true ))
240248 {
241- throw Exception (ErrorCodes::UNKNOWN_EXCEPTION, " Failed shutdown Arrow Flight: {}" , status.ToString ());
249+ try
250+ {
251+ auto status = Shutdown ();
252+ if (!status.ok ())
253+ LOG_ERROR (log, " Failed to shutdown Arrow Flight: {}" , status.ToString ());
254+ }
255+ catch (...)
256+ {
257+ tryLogCurrentException (log, " Failed to shutdown Arrow Flight" );
258+ }
259+ if (server_thread)
260+ {
261+ server_thread->join ();
262+ server_thread.reset ();
263+ }
242264 }
243- server_thread->join ();
244- server_thread.reset ();
245265}
246266
247267UInt16 ArrowFlightHandler::portNumber () const
0 commit comments