Skip to content

Commit 2372319

Browse files
committed
hived: ignore the invalid connection from forked child processes
1 parent 913432c commit 2372319

File tree

10 files changed

+167
-26
lines changed

10 files changed

+167
-26
lines changed

src/client/hive_server.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,10 @@ void TrHiveServer::onCreateClient(hive_comm::TrCreateClientRequest &req, hive_co
209209
}
210210
isChild = true;
211211
running = false;
212+
213+
auto client = sender.getClient();
214+
assert(client != nullptr);
215+
client->disconnect(); // Disconnect the client at the child process.
212216
}
213217
}
214218

src/common/hive/sender.hpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ namespace hive_comm
3535

3636
if (TR_UNLIKELY(message == nullptr))
3737
{
38-
DEBUG(LOG_TAG_CONTENT, "Failed to serialize HiveCommand(%d)", command.type);
38+
DEBUG(LOG_TAG_ERROR, "Failed to create message for hive command: type=%d", command.type);
3939
return false;
4040
}
4141

@@ -44,11 +44,23 @@ namespace hive_comm
4444
auto success = message->serialize(&data, &size);
4545
delete message;
4646
if (!success)
47+
{
48+
DEBUG(LOG_TAG_ERROR, "Failed to serialize HiveCommand(%d) from message", command.type);
4749
return false;
50+
}
4851

4952
auto r = sendRaw(data, size);
5053
free(data);
51-
return r;
54+
55+
if (!r)
56+
{
57+
DEBUG(LOG_TAG_ERROR, "Failed to send HiveCommand(%d) to the peer", command.type);
58+
return false;
59+
}
60+
else
61+
{
62+
return true;
63+
}
5264
}
5365
};
5466
}

src/common/ipc.hpp

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,29 @@ namespace ipc
148148
}
149149

150150
public:
151+
inline int getFd() const
152+
{
153+
return fd;
154+
}
155+
156+
inline TrOneShotClient<T> *getClient() const
157+
{
158+
return client;
159+
}
160+
151161
bool send(T data)
152162
{
153163
return sendRaw(&data, sizeof(data));
154164
}
155165
bool sendRaw(const void *data, size_t size)
156166
{
157-
if (fd == -1 || client == nullptr || client->invalid())
167+
if (fd == -1 ||
168+
client == nullptr ||
169+
client->invalid())
170+
{
171+
DEBUG(LOG_TAG_ERROR, "Failed to send data because the client is invalid or disconnected.");
158172
return false;
173+
}
159174

160175
int bytesSent = 0;
161176
while (bytesSent < size)
@@ -168,7 +183,7 @@ namespace ipc
168183
continue;
169184
if (errno == ECONNRESET || errno == EPIPE)
170185
client->invalid(true);
171-
DEBUG(LOG_TAG_IPC, "Failed to send data(bytes=%d): %s", remaining, strerror(errno));
186+
DEBUG(LOG_TAG_ERROR, "Failed to send data(bytes=%d): %s", remaining, strerror(errno));
172187
return false;
173188
}
174189
else
@@ -202,7 +217,15 @@ namespace ipc
202217
}
203218

204219
public:
205-
int getFd() { return fd; }
220+
inline int getFd() const
221+
{
222+
return fd;
223+
}
224+
225+
inline TrOneShotClient<T> *getClient() const
226+
{
227+
return client;
228+
}
206229

207230
/**
208231
* Try to receive a new instance T with a timeout.
@@ -342,7 +365,7 @@ namespace ipc
342365
disconnect();
343366
}
344367

345-
private:
368+
public:
346369
bool connect(int port, bool blocking)
347370
{
348371
if (connected == true)
@@ -377,6 +400,8 @@ namespace ipc
377400
}
378401
connected = false;
379402
}
403+
404+
private:
380405
bool sendHandshake(int customId)
381406
{
382407
if (fd == -1 || connected == false)

src/common/ipc_message.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,10 @@ namespace ipc
173173

174174
char *buffer = (char *)malloc(bufferSize);
175175
if (buffer == nullptr)
176+
{
177+
DEBUG(LOG_TAG_ERROR, "[OOM] Failed to allocate memory(%zu bytes) for IPC message", bufferSize);
176178
return false; // out of memory
179+
}
177180

178181
size_t offset = 0;
179182
// Write header

src/runtime/constellation.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ uint32_t TrConstellation::open(string url, optional<TrDocumentRequestInit> init)
121121
auto content = contentManager->makeContent();
122122
if (content == nullptr)
123123
{
124-
DEBUG(LOG_TAG_ERROR, "Failed to create a new content: %s", url.c_str());
124+
DEBUG(LOG_TAG_ERROR, "Failed to select or create a new content process: %s", url.c_str());
125125
return 0;
126126
}
127127

src/runtime/content.cpp

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,22 @@ void TrContentRuntime::preStart()
3333
// Send the create process request to the hive daemon.
3434
TrDocumentRequestInit init;
3535
init.id = id;
36-
contentManager->hived->createClient(init, [this](pid_t pid)
37-
{ this->pid = pid; });
38-
reportDocumentEvent(TrDocumentEventType::SpawnProcess);
36+
37+
auto setupContentOnCreated = [this](pid_t pid)
38+
{
39+
DEBUG(LOG_TAG_CONTENT, "Content(%d) is created with pid: %d", id, pid);
40+
assert(pid > 0 && "The process ID should be valid.");
41+
this->pid = pid;
42+
};
43+
if (contentManager->hived->createClient(init, setupContentOnCreated))
44+
{
45+
reportDocumentEvent(TrDocumentEventType::SpawnProcess);
46+
}
47+
else
48+
{
49+
DEBUG(LOG_TAG_ERROR, "Failed to pre-start the content(%d) process.", id);
50+
available = false; // If the process creation fails, set available to false.
51+
}
3952
}
4053

4154
void TrContentRuntime::start(TrDocumentRequestInit &init)
@@ -63,16 +76,15 @@ void TrContentRuntime::resume()
6376

6477
void TrContentRuntime::dispose(bool waitsForExit)
6578
{
66-
assert(pid != 0);
67-
if (pid == INVALID_PID) // The process is not started or exited.
68-
return;
69-
7079
available = false;
7180
disableRendering = true;
7281
contentManager->constellation->mediaManager->removeSoundSourcesByContent(id); // Remove the sound sources.
7382
contentManager->hived->terminateClient(id);
83+
7484
if (waitsForExit)
7585
{
86+
assert(pid != INVALID_PID);
87+
7688
DEBUG(LOG_TAG_CONTENT, "Waiting for the content(%d) to exit...", id);
7789
unique_lock<mutex> lock(exitingMutex);
7890
exitedCv.wait(lock, [this]

src/runtime/content.hpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,14 @@ class TrContentRuntime final : public std::enable_shared_from_this<TrContentRunt
232232
bool removeXRSession(xr::TrXRSession *session);
233233

234234
private:
235+
inline bool isUsed() const
236+
{
237+
return used.load();
238+
}
239+
inline void setUsed()
240+
{
241+
used = true;
242+
}
235243
void recvCommandBuffers(WorkerThread &worker, uint32_t timeout);
236244
void recvEvent();
237245
void recvMediaRequest();
@@ -297,7 +305,7 @@ class TrContentRuntime final : public std::enable_shared_from_this<TrContentRunt
297305
*/
298306
std::atomic<bool> started = false;
299307
/**
300-
* The flag `available` is to indicate the content is available for the client process, it's set to true when the client process is
308+
* The flag `available` is to indicate the content is available for the client process, it's set to `true` when the client process is
301309
* started or pre-started.
302310
*/
303311
std::atomic<bool> available = false;

src/runtime/content_manager.cpp

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,21 +105,33 @@ shared_ptr<TrContentRuntime> TrContentManager::makeContent()
105105
unique_lock<shared_mutex> lock(contentsMutex);
106106
for (auto content : contents)
107107
{
108-
if (!content->used)
108+
// Check if the content is available and not destroyed.
109+
if (!content->available ||
110+
content->pid == INVALID_PID ||
111+
content->shouldDestroy)
112+
continue;
113+
114+
if (!content->isUsed())
109115
{
110-
content->used = true;
116+
content->setUsed();
111117
contentToUse = content;
118+
DEBUG(LOG_TAG_CONTENT,
119+
"Selected an pre-content runtime #%d(%d) for use",
120+
content->id,
121+
content->pid.load());
112122
break;
113123
}
114124
}
115125
}
126+
116127
if (contentToUse == nullptr)
117128
{
118129
// Create a new content runtime when there is no available content.
119130
contentToUse = TrContentRuntime::Make(this);
131+
DEBUG(LOG_TAG_CONTENT, "Failed to find an available content, creating a new one #%d", contentToUse->id);
120132
{
121133
unique_lock<shared_mutex> lock(contentsMutex);
122-
contentToUse->used = true;
134+
contentToUse->setUsed();
123135
contents.push_back(contentToUse);
124136
}
125137
}

src/runtime/hive_daemon.cpp

Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,12 @@ void TrHiveDaemon::shutdown()
9393
bool TrHiveDaemon::createClient(TrDocumentRequestInit &requestInit, function<void(pid_t)> callback)
9494
{
9595
if (commandSender == nullptr)
96+
{
97+
DEBUG(LOG_TAG_ERROR, "Skipped creating a client because the command sender is not initialized.");
9698
return false;
99+
}
97100

101+
unique_lock<shared_mutex> lock(mutexForCommand);
98102
hive_comm::TrCreateClientRequest req(requestInit);
99103
{
100104
unique_lock<shared_mutex> lock(mutexForCreateProcessCallbacks);
@@ -106,7 +110,7 @@ bool TrHiveDaemon::createClient(TrDocumentRequestInit &requestInit, function<voi
106110
bool TrHiveDaemon::terminateClient(uint32_t id)
107111
{
108112
assert(commandSender != nullptr);
109-
113+
unique_lock<shared_mutex> lock(mutexForCommand);
110114
hive_comm::TrTerminateClientRequest req(id);
111115
return commandSender->sendCommand(req);
112116
}
@@ -115,7 +119,12 @@ void TrHiveDaemon::tick()
115119
{
116120
recvOutput();
117121
if (!checkDaemonAlive())
122+
{
123+
DEBUG(LOG_TAG_ERROR, "The hive daemon is not alive, cleaning up and restarting...");
124+
cleanupDaemonResources();
125+
start();
118126
return;
127+
}
119128

120129
// Check for new client
121130
acceptChanClient(0);
@@ -188,13 +197,17 @@ void TrHiveDaemon::onDeamonProcess()
188197

189198
void TrHiveDaemon::onNewChanClient(TrOneShotClient<hive_comm::TrHiveCommandMessage> &client)
190199
{
191-
if (commandReceiver != nullptr || commandSender != nullptr)
200+
if (commandReceiver == nullptr && commandSender == nullptr)
192201
{
193-
commandReceiver.reset();
194-
commandSender.reset();
202+
commandReceiver = make_unique<hive_comm::TrHiveCommandReceiver>(&client);
203+
commandSender = make_unique<hive_comm::TrHiveCommandSender>(&client);
204+
}
205+
else
206+
{
207+
commandChanServer->removeClient(&client);
208+
DEBUG(LOG_TAG_ERROR,
209+
"Received a new hived command channel client, but the command receiver or sender is already initialized.");
195210
}
196-
commandReceiver = make_unique<hive_comm::TrHiveCommandReceiver>(&client);
197-
commandSender = make_unique<hive_comm::TrHiveCommandSender>(&client);
198211
}
199212

200213
void TrHiveDaemon::acceptChanClient(int timeout)
@@ -238,6 +251,45 @@ bool TrHiveDaemon::checkDaemonAlive()
238251
return true;
239252
}
240253

254+
void TrHiveDaemon::cleanupDaemonResources()
255+
{
256+
// Reset daemon state
257+
daemonPid = -1;
258+
daemonReady = false;
259+
260+
// Close child pipes if they're open
261+
if (childPipes[0] != -1)
262+
{
263+
close(childPipes[0]);
264+
childPipes[0] = -1;
265+
}
266+
if (childPipes[1] != -1)
267+
{
268+
close(childPipes[1]);
269+
childPipes[1] = -1;
270+
}
271+
272+
// Stop and reset command receiver worker
273+
if (recvWorker != nullptr)
274+
{
275+
recvWorker->stop();
276+
recvWorker.reset();
277+
}
278+
279+
// Reset command channel components
280+
if (commandReceiver != nullptr)
281+
commandReceiver.reset();
282+
if (commandSender != nullptr)
283+
commandSender.reset();
284+
if (commandChanServer != nullptr)
285+
commandChanServer.reset();
286+
287+
// Clear pending callbacks and notify them of failure
288+
pendingCreateProcessCallbacks.clear();
289+
290+
DEBUG(LOG_TAG_CONTENT, "Hive daemon resources cleaned up.");
291+
}
292+
241293
bool TrHiveDaemon::recvOutput()
242294
{
243295
if (!started())
@@ -300,22 +352,29 @@ void TrHiveDaemon::recvCommand()
300352
case hive_comm::TrHiveCommandType::CreateClientResponse:
301353
{
302354
auto res = hive_comm::TrHiveCommandBase::FromMessage<hive_comm::TrCreateClientResponse>(commandMessage);
355+
DEBUG(LOG_TAG_CONTENT, "<< CreateClientResponse(%d) with pid(%d)", res.documentId, res.pid);
356+
303357
function<void(pid_t)> callback;
304358
{
305359
shared_lock<shared_mutex> lock(mutexForCreateProcessCallbacks);
306360
auto it = pendingCreateProcessCallbacks.find(res.documentId);
307361
if (it != pendingCreateProcessCallbacks.end())
308362
{
309363
callback = it->second;
364+
pendingCreateProcessCallbacks.erase(it);
310365
}
311366
}
312-
if (callback)
367+
368+
if (TR_LIKELY(callback))
313369
callback(res.pid);
370+
else
371+
DEBUG(LOG_TAG_ERROR, "No callback found for CreateClientResponse(%d)", res.documentId);
314372
break;
315373
}
316374
case hive_comm::TrHiveCommandType::TerminateClientResponse:
317375
{
318376
auto res = hive_comm::TrHiveCommandBase::FromMessage<hive_comm::TrTerminateClientResponse>(commandMessage);
377+
DEBUG(LOG_TAG_CONTENT, "<< TerminateClientResponse(%d)", res.documentId);
319378
// TODO: handle the response?
320379
break;
321380
}
@@ -327,7 +386,8 @@ void TrHiveDaemon::recvCommand()
327386
case hive_comm::TrHiveCommandType::OnExitEvent:
328387
{
329388
auto exitEvent = hive_comm::TrHiveCommandBase::FromMessage<hive_comm::TrOnExitEvent>(commandMessage);
330-
DEBUG(LOG_TAG_ERROR, "Received exit event from client(%d): %d", exitEvent.documentId, exitEvent.code);
389+
DEBUG(LOG_TAG_CONTENT, "<< OnExitEvent(%d) with code(%d)", exitEvent.documentId, exitEvent.code);
390+
331391
auto contentToExit = constellation->contentManager->getContent(exitEvent.documentId, true);
332392
if (contentToExit != nullptr)
333393
contentToExit->onClientProcessExited(exitEvent.code);

0 commit comments

Comments
 (0)