Skip to content

Commit 09120dd

Browse files
Leonardo Alminanaedsiper
authored andcommitted
task: added locking and route dropping mechanisms
A lock was added because it needs to be used when checking the route status from the main thread when trying to drop roll chunks over. Signed-off-by: Leonardo Alminana <[email protected]>
1 parent 5c3dcfe commit 09120dd

File tree

2 files changed

+102
-1
lines changed

2 files changed

+102
-1
lines changed

include/fluent-bit/flb_task.h

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,13 @@
5050
#define FLB_TASK_SET(ret, task_id, out_id) \
5151
(uint32_t) ((ret << 28) | (task_id << 14) | out_id)
5252

53+
/* Route status */
54+
#define FLB_TASK_ROUTE_INACTIVE 0
55+
#define FLB_TASK_ROUTE_ACTIVE 1
56+
#define FLB_TASK_ROUTE_DROPPED 2
57+
5358
struct flb_task_route {
59+
int status;
5460
struct flb_output_instance *out;
5561
struct mk_list _head;
5662
};
@@ -86,6 +92,7 @@ struct flb_task {
8692
struct mk_list _head; /* link to input_instance */
8793
struct flb_input_instance *i_ins; /* input instance */
8894
struct flb_config *config; /* parent flb config */
95+
pthread_mutex_t lock;
8996
};
9097

9198
/*
@@ -177,5 +184,96 @@ static inline void flb_task_users_dec(struct flb_task *task, int release_check)
177184
}
178185
}
179186

187+
static inline void flb_task_acquire_lock(struct flb_task *task)
188+
{
189+
pthread_mutex_lock(&task->lock);
190+
}
191+
192+
static inline void flb_task_release_lock(struct flb_task *task)
193+
{
194+
pthread_mutex_unlock(&task->lock);
195+
}
196+
197+
static FLB_INLINE int flb_task_get_active_route_count(
198+
struct flb_task *task)
199+
{
200+
struct mk_list *iterator;
201+
int result;
202+
struct flb_task_route *route;
203+
204+
result = 0;
205+
206+
mk_list_foreach(iterator, &task->routes) {
207+
route = mk_list_entry(iterator, struct flb_task_route, _head);
208+
209+
if (route->status == FLB_TASK_ROUTE_ACTIVE) {
210+
result++;
211+
}
212+
}
213+
214+
return result;
215+
}
216+
217+
static FLB_INLINE size_t flb_task_get_route_status(
218+
struct flb_task *task,
219+
struct flb_output_instance *o_ins)
220+
{
221+
struct mk_list *iterator;
222+
size_t result;
223+
struct flb_task_route *route;
224+
225+
result = FLB_TASK_ROUTE_INACTIVE;
226+
227+
mk_list_foreach(iterator, &task->routes) {
228+
route = mk_list_entry(iterator, struct flb_task_route, _head);
229+
230+
if (route->out == o_ins) {
231+
result = route->status;
232+
break;
233+
}
234+
}
235+
236+
return result;
237+
}
238+
239+
static FLB_INLINE void flb_task_set_route_status(
240+
struct flb_task *task,
241+
struct flb_output_instance *o_ins,
242+
int new_status)
243+
{
244+
struct mk_list *iterator;
245+
struct flb_task_route *route;
246+
247+
mk_list_foreach(iterator, &task->routes) {
248+
route = mk_list_entry(iterator, struct flb_task_route, _head);
249+
250+
if (route->out == o_ins) {
251+
route->status = new_status;
252+
break;
253+
}
254+
}
255+
}
256+
257+
258+
static FLB_INLINE void flb_task_activate_route(
259+
struct flb_task *task,
260+
struct flb_output_instance *o_ins)
261+
{
262+
flb_task_set_route_status(task, o_ins, FLB_TASK_ROUTE_ACTIVE);
263+
}
264+
265+
static FLB_INLINE void flb_task_deactivate_route(
266+
struct flb_task *task,
267+
struct flb_output_instance *o_ins)
268+
{
269+
flb_task_set_route_status(task, o_ins, FLB_TASK_ROUTE_INACTIVE);
270+
}
271+
272+
static FLB_INLINE void flb_task_drop_route(
273+
struct flb_task *task,
274+
struct flb_output_instance *o_ins)
275+
{
276+
flb_task_set_route_status(task, o_ins, FLB_TASK_ROUTE_DROPPED);
277+
}
180278

181279
#endif

src/flb_task.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,8 @@ static struct flb_task *task_alloc(struct flb_config *config)
256256
mk_list_init(&task->routes);
257257
mk_list_init(&task->retries);
258258

259+
pthread_mutex_init(&task->lock, NULL);
260+
259261
return task;
260262
}
261263

@@ -420,12 +422,13 @@ struct flb_task *flb_task_create(uint64_t ref_id,
420422
}
421423

422424
if (flb_routes_mask_get_bit(task_ic->routes_mask, o_ins->id) != 0) {
423-
route = flb_malloc(sizeof(struct flb_task_route));
425+
route = flb_calloc(1, sizeof(struct flb_task_route));
424426
if (!route) {
425427
flb_errno();
426428
continue;
427429
}
428430

431+
route->status = FLB_TASK_ROUTE_INACTIVE;
429432
route->out = o_ins;
430433
mk_list_add(&route->_head, &task->routes);
431434
count++;

0 commit comments

Comments
 (0)