Skip to content

Commit 514ab23

Browse files
JsonRpcMuxer: Refactor to move job handling to Processor and simplify… (#948)
* JsonRpcMuxer: Refactor to move job handling to Processor and simplifying job dispatching * Add whitespace to re-trigger stuck copyright check --------- Co-authored-by: MFransen69 <39826971+MFransen69@users.noreply.github.com>
1 parent 21fd881 commit 514ab23

File tree

1 file changed

+127
-85
lines changed

1 file changed

+127
-85
lines changed

JsonRpcMuxer/JsonRpcMuxer.cpp

Lines changed: 127 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ namespace Plugin {
2929

3030
class JsonRpcMuxer::Processor {
3131
private:
32+
static constexpr Core::hresult ACCEPT_AND_LEAVE_CONNECTION_OPEN_TILL_PROCESSED = Core::hresult(~0);
33+
3234
class Batch {
3335
private:
3436
enum state : uint8_t {
@@ -80,15 +82,20 @@ namespace Plugin {
8082
Core::JSONRPC::Message::Info Error;
8183
};
8284

85+
public:
8386
class Request {
87+
private:
88+
friend class Batch;
89+
8490
public:
8591
Request() = delete;
8692
Request(const Request&) = delete;
8793
Request& operator=(Request&&) = delete;
8894
Request& operator=(const Request&) = delete;
8995

90-
Request(const Core::JSONRPC::Message& message)
91-
: _id(message.Id.Value())
96+
Request(Batch& batch, const Core::JSONRPC::Message& message)
97+
: _batch(batch)
98+
, _id(message.Id.Value())
9299
, _designator(message.Designator.Value())
93100
, _data(message.Parameters.Value())
94101
, _errorCode(Core::ERROR_GENERAL)
@@ -98,7 +105,8 @@ namespace Plugin {
98105
~Request() = default;
99106

100107
Request(Request&& other) noexcept
101-
: _id(other._id)
108+
: _batch(other._batch)
109+
, _id(other._id)
102110
, _designator(other._designator)
103111
, _data(std::move(other._data))
104112
, _errorCode(other._errorCode)
@@ -115,6 +123,14 @@ namespace Plugin {
115123
{
116124
return (_id);
117125
}
126+
inline uint32_t ChannelId() const
127+
{
128+
return _batch.ChannelId();
129+
}
130+
inline const string& Token() const
131+
{
132+
return _batch.Token();
133+
}
118134
inline const string& Designator() const
119135
{
120136
return (_designator);
@@ -125,65 +141,22 @@ namespace Plugin {
125141
}
126142
inline void Set(const Core::hresult errorCode, const string& output)
127143
{
128-
_completed = true;
129-
if (errorCode == Core::ERROR_NONE) {
130-
_data = output;
131-
} else {
132-
_errorCode = errorCode;
133-
_data = output;
134-
}
144+
_batch.CompleteRequest(*this, errorCode, output);
135145
}
136146
inline Core::hresult ErrorCode() const
137147
{
138148
return (_errorCode);
139149
}
140150

141151
private:
152+
Batch& _batch;
142153
const uint32_t _id;
143154
const string _designator;
144155
string _data;
145156
Core::hresult _errorCode;
146157
bool _completed;
147158
};
148159

149-
public:
150-
class Job : public Core::IDispatch {
151-
public:
152-
Job() = delete;
153-
Job(Job&&) = delete;
154-
Job(const Job&) = delete;
155-
Job& operator=(const Job&) = delete;
156-
Job& operator=(Job&&) = delete;
157-
158-
Job(Batch& parent, Request& request)
159-
: _parent(parent)
160-
, _context(request)
161-
{
162-
TRACE(Trace::Information, (_T("Constructing job %p"), this));
163-
}
164-
165-
virtual ~Job()
166-
{
167-
TRACE(Trace::Information, (_T("Destructing job %p"), this));
168-
}
169-
170-
void Dispatch() override
171-
{
172-
TRACE(Trace::Information, (_T("Dispatch job %p"), this));
173-
_parent.Process(this);
174-
TRACE(Trace::Information, (_T("Dispatch job %p Done"), this));
175-
}
176-
177-
Request& Context()
178-
{
179-
return _context;
180-
}
181-
182-
private:
183-
Batch& _parent;
184-
Request& _context;
185-
};
186-
187160
Batch() = delete;
188161
Batch(Batch&&) = delete;
189162
Batch(const Batch&) = delete;
@@ -209,7 +182,7 @@ namespace Plugin {
209182
_requests.reserve(requests.Elements().Count());
210183

211184
while (index.Next() == true) {
212-
_requests.emplace_back(index.Current());
185+
_requests.emplace_back(*this, index.Current());
213186
}
214187

215188
TRACE(Trace::Information, (_T("Created batch[responseId] with %d requests"), _requests.size()));
@@ -228,6 +201,11 @@ namespace Plugin {
228201
return _channelId;
229202
}
230203

204+
const string& Token() const
205+
{
206+
return _token;
207+
}
208+
231209
uint32_t ResponseId() const
232210
{
233211
return _responseId;
@@ -283,62 +261,79 @@ namespace Plugin {
283261
return result;
284262
}
285263

286-
Core::ProxyType<Job> GetJob()
264+
Request* GetRequest()
287265
{
288-
Core::ProxyType<Job> job;
266+
Request* request = nullptr;
267+
268+
_lock.Lock();
289269

290270
if (_current < _requests.size()) {
291-
job = Core::ProxyType<Job>::Create(*this, _requests[_current]);
271+
request = &_requests[_current];
292272
_current++;
293-
294-
_lock.Lock();
295273
_activeJobCount++;
296-
_lock.Unlock();
297274

298-
TRACE(Trace::Information, (_T("Batch[%d] Provided job %d/%d"), _responseId, _current, _requests.size()));
275+
TRACE(Trace::Information, (_T("Batch[%d] Provided request %d/%d"), _responseId, _current, _requests.size()));
299276
}
300277

301-
return job;
278+
_lock.Unlock();
279+
280+
return request;
302281
}
303282

304283
bool IsActive() const
305284
{
306-
return _activeJobCount > 0;
285+
_lock.Lock();
286+
bool active = _activeJobCount > 0;
287+
_lock.Unlock();
288+
return active;
307289
}
308290

309-
private:
310-
void Process(Job* job)
291+
void Complete()
311292
{
312-
ASSERT(job != nullptr);
313-
314-
string response;
315-
Core::hresult result = Core::ERROR_NONE;
316-
Request& request = job->Context();
317-
318293
_lock.Lock();
319294

320295
if ((_state != ABORTED) && (_state != COMPLETED)) {
321-
_lock.Unlock();
296+
++_completed;
297+
298+
if (_completed == _requests.size()) {
299+
_state = COMPLETED;
300+
}
301+
}
322302

323-
result = _parent.Invoke(_channelId, request.Id(), _token, request.Designator(), request.Parameters(), response);
324-
request.Set(result, response);
303+
_activeJobCount--;
325304

326-
_lock.Lock();
305+
TRACE(Trace::Information, (_T("Batch[%d] completed request (%d/%d)"), _responseId, _completed, _requests.size()));
327306

328-
++_completed;
307+
_lock.Unlock();
308+
}
309+
310+
void CompleteRequest(Request& request, const Core::hresult errorCode, const string& output)
311+
{
312+
_lock.Lock();
313+
314+
// Update request data under lock
315+
request._completed = true;
316+
if (errorCode == Core::ERROR_NONE) {
317+
request._data = output;
318+
} else {
319+
request._errorCode = errorCode;
320+
request._data = output;
329321
}
330322

331-
if (_completed == _requests.size()) {
332-
_state = COMPLETED;
323+
// Update batch state
324+
if ((_state != ABORTED) && (_state != COMPLETED)) {
325+
++_completed;
326+
327+
if (_completed == _requests.size()) {
328+
_state = COMPLETED;
329+
}
333330
}
334331

335332
_activeJobCount--;
336333

337-
TRACE(Trace::Information, (_T("Batch[%d] completed request[%d] (%d/%d)"), _responseId, request.Id(), _completed, _requests.size()));
334+
TRACE(Trace::Information, (_T("Batch[%d] completed request (%d/%d)"), _responseId, _completed, _requests.size()));
338335

339336
_lock.Unlock();
340-
341-
_parent.Checkout(job);
342337
}
343338

344339
private:
@@ -355,6 +350,52 @@ namespace Plugin {
355350
uint16_t _activeJobCount;
356351
};
357352

353+
class Job : public Core::IDispatch {
354+
public:
355+
Job() = delete;
356+
Job(Job&&) = delete;
357+
Job(const Job&) = delete;
358+
Job& operator=(const Job&) = delete;
359+
Job& operator=(Job&&) = delete;
360+
361+
Job(Processor& processor, Batch::Request& request)
362+
: _processor(processor)
363+
, _request(request)
364+
{
365+
TRACE(Trace::Information, (_T("Constructing job %p"), this));
366+
}
367+
368+
virtual ~Job()
369+
{
370+
TRACE(Trace::Information, (_T("Destructing job %p"), this));
371+
}
372+
373+
void Dispatch() override
374+
{
375+
TRACE(Trace::Information, (_T("Dispatch job %p"), this));
376+
377+
// Safety check: don't execute if processor is shutting down
378+
if (_processor._shuttingDown) {
379+
TRACE(Trace::Warning, (_T("Job %p aborted - processor shutting down"), this));
380+
return;
381+
}
382+
383+
string response;
384+
385+
Core::hresult result = _processor.Invoke(_request, response);
386+
387+
_request.Set(result, response);
388+
389+
_processor.Checkout(this);
390+
391+
TRACE(Trace::Information, (_T("Dispatch job %p Done"), this));
392+
}
393+
394+
private:
395+
Processor& _processor;
396+
Batch::Request& _request;
397+
};
398+
358399
public:
359400
Processor() = delete;
360401
Processor(Processor&&) = delete;
@@ -401,7 +442,7 @@ namespace Plugin {
401442

402443
void Stop()
403444
{
404-
std::vector<Core::ProxyType<Batch::Job>> jobsToRevoke;
445+
std::vector<Core::ProxyType<Job>> jobsToRevoke;
405446

406447
_lock.Lock();
407448

@@ -449,11 +490,11 @@ namespace Plugin {
449490

450491
Process();
451492

452-
return ~0;
493+
return ACCEPT_AND_LEAVE_CONNECTION_OPEN_TILL_PROCESSED;
453494
}
454495

455496
private:
456-
void Checkout(Batch::Job* job)
497+
void Checkout(Job* job)
457498
{
458499
ASSERT(job != nullptr);
459500

@@ -479,10 +520,10 @@ namespace Plugin {
479520
}
480521
}
481522

482-
uint32_t Invoke(const uint32_t channelId, const uint32_t id, const string& token, const string& method, const string& parameters, string& response)
523+
uint32_t Invoke(const Batch::Request& request, string& response)
483524
{
484525
if (_dispatch != nullptr) {
485-
return _dispatch->Invoke(channelId, id, token, method, parameters, response);
526+
return _dispatch->Invoke(request.ChannelId(), request.Id(), request.Token(), request.Designator(), request.Parameters(), response);
486527
} else {
487528
response = "IDispatcher not available";
488529
return Core::ERROR_UNAVAILABLE;
@@ -534,9 +575,10 @@ namespace Plugin {
534575
break; // This batch already has a job, move to next batch
535576
}
536577

537-
Core::ProxyType<Batch::Job> job = batch->GetJob();
578+
Batch::Request* request = batch->GetRequest();
538579

539-
if (job.IsValid() == true) {
580+
if (request != nullptr) {
581+
Core::ProxyType<Job> job = Core::ProxyType<Job>::Create(*this, *request);
540582
_activeJobs.emplace_back(job);
541583
Core::IWorkerPool::Instance().Submit(Core::ProxyType<Core::IDispatch>(job));
542584
TRACE(Trace::Information, (_T("Submitted job %p..."), job.operator->()));
@@ -588,7 +630,7 @@ namespace Plugin {
588630
const uint8_t _maxConcurrentJobs;
589631
bool _shuttingDown;
590632
std::vector<Batch*> _batches;
591-
std::vector<Core::ProxyType<Batch::Job>> _activeJobs;
633+
std::vector<Core::ProxyType<Job>> _activeJobs;
592634
};
593635

594636
JsonRpcMuxer::JsonRpcMuxer()
@@ -686,4 +728,4 @@ namespace Plugin {
686728
return result;
687729
}
688730
}
689-
}
731+
}

0 commit comments

Comments
 (0)