Skip to content

Commit d14c1fc

Browse files
koleiniedsiper
authored andcommitted
sp: add hopping window to Stream Processor
Signed-off-by: Masoud Koleini <[email protected]>
1 parent 4b5ad3b commit d14c1fc

File tree

8 files changed

+360
-54
lines changed

8 files changed

+360
-54
lines changed

include/fluent-bit/stream_processor/flb_sp.h

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,33 @@ struct flb_sp_window_data {
6161
struct mk_list _head;
6262
};
6363

64+
struct flb_sp_hopping_slot {
65+
struct rb_tree aggr_tree;
66+
struct mk_list aggr_list;
67+
int records;
68+
struct mk_list _head;
69+
};
70+
6471
struct flb_sp_task_window {
6572
int type;
6673

6774
int fd;
6875
struct mk_event event;
76+
struct mk_event event_hop;
6977

7078
struct rb_tree aggr_tree;
7179
struct mk_list aggr_list;
80+
81+
/* Hopping window parameters */
82+
/*
83+
* first hopping window. Timer event is set to window size for the first,
84+
* and will change to the advance_by time thereafter
85+
*/
86+
bool first_hop;
87+
int fd_hop;
88+
int advance_by;
89+
struct mk_list hopping_slot;
90+
7291
int records;
7392

7493
struct mk_list data;
@@ -114,7 +133,8 @@ int flb_sp_test_do(struct flb_sp *sp, struct flb_sp_task *task,
114133
const char *tag, int tag_len,
115134
const char *buf_data, size_t buf_size,
116135
char **out_data, size_t *out_size);
117-
int flb_sp_test_fd_event(struct flb_sp_task *task, char **out_data, size_t *out_size);
136+
int flb_sp_test_fd_event(int fd, struct flb_sp_task *task, char **out_data,
137+
size_t *out_size);
118138

119139
struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name,
120140
const char *query);

include/fluent-bit/stream_processor/flb_sp_parser.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ struct flb_sp_cmd_key {
117117
struct flb_sp_window {
118118
int type;
119119
time_t size;
120+
time_t advance_by;
120121
};
121122

122123
struct flb_sp_cmd {
@@ -222,8 +223,9 @@ void flb_sp_cmd_key_del(struct flb_sp_cmd_key *key);
222223
int flb_sp_cmd_source(struct flb_sp_cmd *cmd, int type, const char *source);
223224
void flb_sp_cmd_dump(struct flb_sp_cmd *cmd);
224225

225-
void flb_sp_cmd_window(struct flb_sp_cmd *cmd,
226-
int window_type, int size, int time_unit);
226+
int flb_sp_cmd_window(struct flb_sp_cmd *cmd, int window_type,
227+
int size, int time_unit,
228+
int advance_by_size, int advance_by_time_unit);
227229

228230
void flb_sp_cmd_condition_add(struct flb_sp_cmd *cmd, struct flb_exp *e);
229231
struct flb_exp *flb_sp_cmd_operation(struct flb_sp_cmd *cmd,

include/fluent-bit/stream_processor/flb_sp_window.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#define FLB_SP_WINDOW_DEFAULT 0
2222
#define FLB_SP_WINDOW_TUMBLING 1
23+
#define FLB_SP_WINDOW_HOPPING 2
2324

2425
void flb_sp_window_prune(struct flb_sp_task *task);
2526
int flb_sp_window_populate(struct flb_sp_task *task, const char *buf_data,

0 commit comments

Comments
 (0)