@@ -116,7 +116,7 @@ TEST_F(LiveThreadedTests, TestStop) {
116116 TimeDeltaNanos{},
117117 100 };
118118 std::atomic<std::uint32_t > call_count{};
119- const mock::MockLsgServer mock_server{
119+ std::unique_ptr< mock::MockLsgServer> mock_server{ new mock::MockLsgServer {
120120 dataset::kXnasItch , [&kRec , &call_count](mock::MockLsgServer& self) {
121121 self.Accept ();
122122 self.Authenticate ();
@@ -129,17 +129,14 @@ TEST_F(LiveThreadedTests, TestStop) {
129129 std::this_thread::sleep_for (std::chrono::milliseconds{50 });
130130 const std::string rec_str{reinterpret_cast <const char *>(&kRec ),
131131 sizeof (kRec )};
132- for (size_t i = 0 ; i < 5 ; ++i) {
133- if (self.UncheckedSend (rec_str) <
134- static_cast <::ssize_t >(rec_str.size ())) {
135- return ;
136- }
132+ while (self.UncheckedSend (rec_str) <
133+ static_cast <::ssize_t >(rec_str.size ())) {
137134 }
138- FAIL () << " Connection remained open" ;
139- }};
135+ }}};
140136
141- LiveThreaded target{logger_.get (), kKey , dataset::kXnasItch , " 127.0.0.1" ,
142- mock_server.Port (), false };
137+ LiveThreaded target{logger_.get (), kKey ,
138+ dataset::kXnasItch , " 127.0.0.1" ,
139+ mock_server->Port (), false };
143140 target.Start (
144141 [kSchema ](Metadata&& metadata) { EXPECT_EQ (metadata.schema , kSchema ); },
145142 [&call_count, &kRec ](const Record& rec) {
@@ -149,10 +146,9 @@ TEST_F(LiveThreadedTests, TestStop) {
149146 EXPECT_EQ (rec.Get <MboMsg>(), kRec );
150147 return databento::KeepGoing::Stop;
151148 });
152- while (call_count < 1 ) {
153- std::this_thread::yield ();
154- }
155- std::this_thread::sleep_for (std::chrono::milliseconds{50 });
149+ // kill mock server and join thread before client goes out of scope
150+ // to ensure Stop is killing the connection, not the client's destructor
151+ mock_server.reset ();
156152}
157153} // namespace test
158154} // namespace databento
0 commit comments