-
Notifications
You must be signed in to change notification settings - Fork 320
Expand file tree
/
Copy pathtask.h
More file actions
618 lines (545 loc) · 20.5 KB
/
task.h
File metadata and controls
618 lines (545 loc) · 20.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#pragma once
#include <absl/utility/utility.h>
#include <stdint.h>
#include <atomic>
#include <chrono>
#include <functional>
#include <list>
#include <tuple>
#include <utility>
#include <vector>
#include "rpc/rpc_message.h"
#include "runtime/api_layer1.h"
#include "runtime/api_task.h"
#include "task_code.h"
#include "task_spec.h"
#include "task_tracker.h"
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/extensible_object.h"
#include "utils/fmt_logging.h"
#include "utils/join_point.h"
#include "utils/ports.h"
#include "utils/synchronize.h"
#include "utils/utils.h"
namespace dsn {
class env_provider;
class rpc_engine;
class service_node;
class task;
class task_worker;
class task_worker_pool;
class threadpool_code;
struct __tls_dsn__
{
uint32_t magic;
task *current_task;
task_worker *worker;
int worker_index;
service_node *node;
int node_id;
rpc_engine *rpc;
env_provider *env;
int last_worker_queue_size;
uint64_t node_pool_thread_ids; // 8,8,16 bits
uint32_t last_lower32_task_id; // 32bits
};
extern __thread struct __tls_dsn__ tls_dsn;
///
/// Task is a thread-like execution piece that is much lighter than a normal thread.
/// Huge number of tasks may be hosted by a small number of actual threads run within
/// a thread pool.
///
/// When creating the task, user must use 3 parameters to specify in which thread the
/// callback should run:
///
/// 1. node: specifies the computation engine of the callback, i.e, the "pool" of "thread_pool"
/// 2. task_code: a index to the "task_spec". task_spec specifies which thread pool of
/// the computation engine to run the callback. some other task information is also
/// recorded in task_spec. please refer to @task_code, @task_spec, @thread_pool_code
/// for more details.
/// 3. hash: specifies which thread in the thread pool to execute the callback. (This is
/// somewhat not accurate, coz "hash" will work together with thread_pool's "partition"
/// option. Please refer to @task_worker_pool for more details).
///
/// So the running thread of callback will be determined hierarchically:
///
/// |<---determined by "node"
/// | |<-----determined by "code"
/// | | |-------------determined by "hash"
/// | | |
/// |------V--------|-------|---------------------------| |--------------|
/// | |-------------V-------|-----| |-------------| | | |
/// | | |--------| |----V---| | | | | | |
/// | | | thread | ... | thread | | | | | | |
/// | | |--------| |--------| | | | | | |
/// | | thread pool | ... | thread pool | | | |
/// | |---------------------------| |-------------| | | |
/// | service node | ... | service node |
/// |---------------------------------------------------| |--------------|
///
/// A status value called "task_state" is kept in each task to indicate the task running state,
/// the transition among different states are:
///
/// |
/// (new created task)
/// |
/// V
/// |-----------> ready------------|
/// | | |
/// (timer task) (execute) (cancel)
/// | | |
/// | V V
/// |------------ running cancelled
/// |
/// (one shot task)
/// |
/// V
/// finished
///
/// As shown above, a new created task will be in "ready" state. After obtaining the data
/// (like a rpc gets the response, or a disk io succeeds), the data provider module will store
/// proper value into the task and dispatch it to some thread to execute the task with "enqueue".
///
/// After a task is "dequeued" from thread, it will be in "running" state.
///
/// For one shot tasks(in which callback can ONLY be executed ONCE), the task will be in
/// "finished" state after running of callbacks.
///
/// But for timer tasks, the state will transit to "ready" again after "running"
/// as the callback need to be executed periodically.
///
/// The callers can cancel the execution of "ready" tasks with method "cancel". However,
/// if a task is in not in "ready" state, the cancel will fail (returning false).
///
/// So from the perspective of task user, a created task can only be controlled by "enqueue" or
/// "cancel". An "enqueue" operation will make the callback to execute at some time in the future,
/// and an "cancel" operation will prevent the callback from executing.
///
/// So please take care when you call cancel. Memory leak may occur if you don't pay attention.
/// For example:
///
/// int *a = new int(5);
/// raw_task t = new raw_task(code, [a](){ std::cout << *a << std::endl; delete a; }, hash, node);
/// t->enqueue(10_seconds_latey);
/// if (t->cancel()) {
/// std::cout << "cancel succeed, the callback will not execute and a won't be deleted"
/// << std::endl;
/// }
///
/// In order to prevent this, we recommend you to pass RAII objects to callback:
///
/// std::shared_ptr<int> a = std::make_shared<int>(5);
/// raw_task t = new raw_task(code, [a]() { std::cout << *a << std::endl; }, hash, node);
///
/// In this case, the callback object will be destructed if t is cancelled.
///
/// Another key design in rDSN is that we add some "hook points" when the state is in transition,
/// like "on_task_create", "on_task_enqueue", "on_task_dequeue", etc. We can execute different
/// functions for different purposes on these hook points, you may want to refer to
/// "tracer", "profiler" and "fault_injector" for details.
///
class task : public ref_counter, public extensible_object<task, 4>
{
public:
task(task_code code, int hash = 0, service_node *node = nullptr);
virtual ~task();
virtual void enqueue();
//
// if we successfully change a task's state from ready to cancelled, then return true,
// otherwise, false is returned.
//
// if wait_until_finished is true, the function will wait a running task to be finished
//
// if finished isn't nullptr, it will be used to indicate
// whether the task is finished/cancelled, that is to say:
// *finished == true <- the task has been finished/cancelled when this method returns
// *finished == false <- the task may still be running or ready(timer tasks)
//
bool cancel(bool wait_until_finished, /*out*/ bool *finished = nullptr);
// wait until a task to finished/cancelled or timeout
bool wait(int timeout_milliseconds = TIME_MS_MAX);
// TODO: modify delay from chrono to int, keep in consistency with other API
void enqueue(std::chrono::milliseconds delay)
{
set_delay(static_cast<int>(delay.count()));
enqueue();
}
// used for task_worker to execute the task
void exec_internal();
// only call this function when task is running
//
// @param enqueue_immediately
// whether we should enqueue right now
// return:
// true : change task state from TASK_STATE_RUNNING to TASK_STATE_READY succeed
// false : change task state failed
bool set_retry(bool enqueue_immediately = true);
void set_error_code(error_code err) { _error = err; }
void set_delay(int delay_milliseconds = 0) { _delay_milliseconds = delay_milliseconds; }
void set_tracker(task_tracker *tracker) { _context_tracker.set_tracker(tracker, this); }
uint64_t id() const { return _task_id; }
task_state state() const { return _state.load(std::memory_order_acquire); }
task_code code() const { return _spec->code; }
task_spec &spec() const { return *_spec; }
int hash() const { return _hash; }
int delay_milliseconds() const { return _delay_milliseconds; }
error_code error() const { return _error; }
service_node *node() const { return _node; }
task_tracker *tracker() const { return _context_tracker.tracker(); }
bool is_empty() const { return _is_null; }
// static helper utilities
static task *get_current_task();
static uint64_t get_current_task_id();
static task_worker *get_current_worker();
static task_worker *get_current_worker2();
static service_node *get_current_node();
static service_node *get_current_node2();
static int get_current_node_id();
static int get_current_worker_index();
static const char *get_current_node_name();
static rpc_engine *get_current_rpc();
static rpc_engine *get_current_rpc2();
static env_provider *get_current_env();
static void set_tls_dsn_context(
service_node *node, // cannot be null
task_worker *worker // null for io or timer threads if they are not worker threads
);
protected:
void enqueue(task_worker_pool *pool);
void set_task_id(uint64_t tid) { _task_id = tid; }
virtual void exec() = 0;
//
// this function is used for clearing the non-trivial objects assigned to this task, like
// callback functors and some task-specific values.
//
// circular reference may occur if we don't clear them manually, for example:
//
// class A: public dsn::ref_counter {
// public:
// int value;
// task_ptr my_task;
// };
//
// dsn::ref_ptr<A> a_obj = new A();
// a_obj->my_task = tasking::enqueue(task_code,
// [a_obj](){ std::cout << value << std::endl; });
//
// in the case above, a_obj holds a ref_counter for my_task,
// my task holds a ref_counter for a_obj because it owns a functor
// which captures a_obj by value
//
// in order to prevent this case, we let the task to clear these non-trival objects when
// a task is finished or cancelled.
//
// we may call this function in "exec_internal" or "cancel". however, it's still subclass's
// duty to define "how to clear the callback".
//
// don't declare this as pure virtual function, coz it is not necessary for every subclass
// to have non trivial objects to clear.
//
virtual void clear_non_trivial_on_task_end() {}
bool _is_null;
error_code _error;
private:
friend class task_test;
task(const task &);
bool wait_on_cancel();
// return true if some waiters have been notified
bool signal_waiters();
static void check_tls_dsn();
static void on_tls_dsn_not_set();
mutable std::atomic<task_state> _state;
uint64_t _task_id;
std::atomic<void *> _wait_event;
int _hash;
int _delay_milliseconds;
bool _wait_for_cancel;
task_spec *_spec;
service_node *_node;
trackable_task _context_tracker; // when tracker is gone, the task is cancelled automatically
public:
// used by task queue only
task *next;
};
typedef dsn::ref_ptr<dsn::task> task_ptr;
class raw_task : public task
{
public:
raw_task(task_code code, const task_handler &cb, int hash = 0, service_node *node = nullptr)
: task(code, hash, node), _cb(cb)
{
}
raw_task(task_code code, task_handler &&cb, int hash = 0, service_node *node = nullptr)
: task(code, hash, node), _cb(std::move(cb))
{
}
~raw_task() override
{
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_lock);
_cb = nullptr;
}
void exec() override
{
if (dsn_likely(_cb != nullptr)) {
_cb();
}
}
protected:
void clear_non_trivial_on_task_end() override
{
::dsn::utils::auto_lock<::dsn::utils::ex_lock_nr> l(_lock);
_cb = nullptr;
}
task_handler _cb;
::dsn::utils::ex_lock_nr _lock;
};
//----------------- timer task -------------------------------------------------------
class timer_task : public task
{
public:
timer_task(task_code code,
const task_handler &cb,
int interval_milliseconds,
int hash = 0,
service_node *node = nullptr);
timer_task(task_code code,
task_handler &&cb,
int interval_milliseconds,
int hash = 0,
service_node *node = nullptr);
// for timer task, we will reset its state to TASK_READY after exec
void exec() override;
void enqueue() override;
void update_interval(int interval_ms);
protected:
void clear_non_trivial_on_task_end() override { _cb = nullptr; }
private:
// ATTENTION: if _interval_ms == 0, then timer task will just be executed once;
// otherwise, timer task will be executed periodically(period = _interval_ms)
int _interval_ms;
task_handler _cb;
};
typedef dsn::ref_ptr<dsn::timer_task> timer_task_ptr;
template <typename First, typename... Remaining>
class future_task : public task
{
public:
typedef std::function<void(const First, const Remaining &...)> TCallback;
future_task(task_code code, const TCallback &cb, int hash, service_node *node = nullptr)
: task(code, hash, node), _cb(cb)
{
}
future_task(task_code code, TCallback &&cb, int hash, service_node *node = nullptr)
: task(code, hash, node), _cb(std::move(cb))
{
}
virtual void exec() override { absl::apply(_cb, std::move(_values)); }
void enqueue_with(const First &t, const Remaining &...r, int delay_ms = 0)
{
_values = std::make_tuple(t, r...);
set_delay(delay_ms);
enqueue();
}
void enqueue_with(First &&t, Remaining &&...r, int delay_ms = 0)
{
_values = std::make_tuple(std::move(t), std::forward<Remaining>(r)...);
set_delay(delay_ms);
enqueue();
}
protected:
void clear_non_trivial_on_task_end() override
{
_cb = nullptr;
_values = {};
}
private:
TCallback _cb;
std::tuple<First, Remaining...> _values;
};
class rpc_request_task : public task
{
public:
rpc_request_task(message_ex *request, rpc_request_handler &&h, service_node *node);
virtual ~rpc_request_task() override;
message_ex *get_request() const { return _request; }
void enqueue() override;
void exec() override
{
if (0 == _enqueue_ts_ns ||
dsn_now_ns() - _enqueue_ts_ns <
static_cast<uint64_t>(_request->header->client.timeout_ms) * 1000000ULL) {
if (dsn_likely(nullptr != _handler)) {
_handler(_request);
}
} else {
LOG_DEBUG("rpc_request_task({}) from({}) stop to execute due to timeout_ms({}) exceed",
spec().name,
_request->header->from_address,
_request->header->client.timeout_ms);
spec().on_rpc_task_dropped.execute(this);
}
}
protected:
void clear_non_trivial_on_task_end() override { _handler = nullptr; }
protected:
message_ex *_request;
rpc_request_handler _handler;
uint64_t _enqueue_ts_ns;
};
typedef dsn::ref_ptr<rpc_request_task> rpc_request_task_ptr;
class rpc_response_task : public task
{
public:
rpc_response_task(message_ex *request,
const rpc_response_handler &cb,
int hash = 0,
service_node *node = nullptr);
rpc_response_task(message_ex *request,
rpc_response_handler &&cb,
int hash,
service_node *node = nullptr);
virtual ~rpc_response_task() override;
// return true for normal case, false for fault injection applied
bool enqueue(error_code err, message_ex *reply);
// re-enqueue after above enqueue, e.g., after delay
void enqueue() override;
void exec() override
{
if (dsn_likely(nullptr != _cb)) {
_cb(_error, _request, _response);
}
}
message_ex *get_request() const { return _request; }
message_ex *get_response() const { return _response; }
//
// rpc_response_task is a special kind of task, because
// we support the semantic of distributed service in sending rpc request, for example:
// "querying meta-server and send message to proper replica-server based on gpid".
//
// in order to support this, we need to replace the original rpc response callback
// to another one:
// 1. which can proecss the distributed service semantics, AND
// 2. which won't call the original callback until real results are got
//
// we supply 2 functions to meet this demand:
// 1. current_handler, you can get the original handler if you want to replace it
// 2. replace_callback, replace the callback to any one you like. there are two varieties
// for coping or move the new callback
//
// yo may want to refer to rpc_engine::call_uri for details
//
// TODO(sunweijie): totally elimite this feature
//
void fetch_current_handler(rpc_response_handler &cb) { cb = std::move(_cb); }
void replace_callback(rpc_response_handler &&cb)
{
task_state cur_state = state();
CHECK(cur_state == TASK_STATE_READY || cur_state == TASK_STATE_RUNNING,
"invalid task_state: {}",
enum_to_string(cur_state));
_cb = std::move(cb);
}
void replace_callback(const rpc_response_handler &cb)
{
replace_callback(rpc_response_handler(cb));
}
task_worker_pool *caller_pool() const { return _caller_pool; }
void set_caller_pool(task_worker_pool *pl) { _caller_pool = pl; }
protected:
void clear_non_trivial_on_task_end() override { _cb = nullptr; }
private:
message_ex *_request;
message_ex *_response;
task_worker_pool *_caller_pool;
rpc_response_handler _cb;
friend class rpc_engine;
};
typedef dsn::ref_ptr<rpc_response_task> rpc_response_task_ptr;
const std::vector<task_worker *> &get_threadpool_threads_info(threadpool_code code);
// ------------------------ inline implementations --------------------
__inline /*static*/ void task::check_tls_dsn()
{
if (tls_dsn.magic != 0xdeadbeef) {
on_tls_dsn_not_set();
}
}
__inline /*static*/ task *task::get_current_task()
{
check_tls_dsn();
return tls_dsn.current_task;
}
__inline /*static*/ uint64_t task::get_current_task_id()
{
if (tls_dsn.magic == 0xdeadbeef)
return tls_dsn.current_task ? tls_dsn.current_task->id() : 0;
else
return 0;
}
__inline /*static*/ task_worker *task::get_current_worker()
{
check_tls_dsn();
return tls_dsn.worker;
}
__inline /*static*/ task_worker *task::get_current_worker2()
{
return tls_dsn.magic == 0xdeadbeef ? tls_dsn.worker : nullptr;
}
__inline /*static*/ service_node *task::get_current_node()
{
check_tls_dsn();
return tls_dsn.node;
}
__inline /*static*/ int task::get_current_node_id()
{
return tls_dsn.magic == 0xdeadbeef ? tls_dsn.node_id : 0;
}
__inline /*static*/ service_node *task::get_current_node2()
{
return tls_dsn.magic == 0xdeadbeef ? tls_dsn.node : nullptr;
}
__inline /*static*/ int task::get_current_worker_index()
{
check_tls_dsn();
return tls_dsn.worker_index;
}
__inline /*static*/ rpc_engine *task::get_current_rpc()
{
check_tls_dsn();
return tls_dsn.rpc;
}
__inline /*static*/ rpc_engine *task::get_current_rpc2()
{
return tls_dsn.magic == 0xdeadbeef ? tls_dsn.rpc : nullptr;
}
__inline /*static*/ env_provider *task::get_current_env()
{
check_tls_dsn();
return tls_dsn.env;
}
} // namespace dsn