Skip to content

Commit bfde6df

Browse files
authored
sp: refactor aggregate functions (#3288)
Signed-off-by: Masoud Koleini <[email protected]>
1 parent 1623bac commit bfde6df

File tree

15 files changed

+869
-1507
lines changed

15 files changed

+869
-1507
lines changed

include/fluent-bit/stream_processor/flb_sp.h

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@
2929
#include <monkey/mk_core.h>
3030
#include <rbtree.h>
3131

32-
/* Aggr num type */
32+
/* Aggregate num type */
3333
#define FLB_SP_NUM_I64 0
3434
#define FLB_SP_NUM_F64 1
3535
#define FLB_SP_BOOLEAN 2
3636
#define FLB_SP_STRING 3
3737

38-
struct aggr_num {
38+
struct aggregate_num {
3939
int type;
4040
int ops;
4141
int64_t i64;
@@ -44,17 +44,21 @@ struct aggr_num {
4444
flb_sds_t string;
4545
};
4646

47-
struct timeseries {
48-
struct aggr_num *nums;
47+
struct aggregate_data {
48+
struct aggregate_num *nums;
4949
struct mk_list _head;
5050
};
5151

5252
struct timeseries_forecast {
53-
struct aggr_num *nums;
53+
struct aggregate_num *nums;
5454
struct mk_list _head;
5555

56-
double *offset;
57-
double *latest_x;
56+
// future time to forecast
57+
double future_time;
58+
59+
// time offset (the first time value captured)
60+
double offset;
61+
double latest_x;
5862

5963
double sigma_x;
6064
double sigma_y;
@@ -63,15 +67,15 @@ struct timeseries_forecast {
6367
double sigma_x2;
6468
};
6569

66-
struct aggr_node {
70+
struct aggregate_node {
6771
int groupby_keys;
6872
int records;
6973
int nums_size;
70-
struct aggr_num *nums;
71-
struct aggr_num *groupby_nums;
74+
struct aggregate_num *nums;
75+
struct aggregate_num *groupby_nums;
7276

73-
/* Timeseries data */
74-
struct timeseries **ts;
77+
/* Aggregate data */
78+
struct aggregate_data **aggregate_data;
7579

7680
/* To keep track of the aggregation nodes */
7781
struct rb_tree_node _rb_head;
@@ -85,8 +89,8 @@ struct flb_sp_window_data {
8589
};
8690

8791
struct flb_sp_hopping_slot {
88-
struct rb_tree aggr_tree;
89-
struct mk_list aggr_list;
92+
struct rb_tree aggregate_tree;
93+
struct mk_list aggregate_list;
9094
int records;
9195
struct mk_list _head;
9296
};
@@ -98,8 +102,8 @@ struct flb_sp_task_window {
98102
struct mk_event event;
99103
struct mk_event event_hop;
100104

101-
struct rb_tree aggr_tree;
102-
struct mk_list aggr_list;
105+
struct rb_tree aggregate_tree;
106+
struct mk_list aggregate_list;
103107

104108
/* Hopping window parameters */
105109
/*
@@ -133,7 +137,7 @@ struct flb_sp_task {
133137
*/
134138
void *stream;
135139

136-
int aggr_keys; /* do commands contains aggregated keys ? */
140+
int aggregate_keys; /* do commands contains aggregate keys? */
137141
struct flb_sp *sp; /* parent context */
138142
struct flb_sp_cmd *cmd; /* (SQL) commands */
139143

@@ -167,8 +171,8 @@ struct flb_sp_task *flb_sp_task_create(struct flb_sp *sp, const char *name,
167171
const char *query);
168172
int flb_sp_fd_event(int fd, struct flb_sp *sp);
169173
void flb_sp_task_destroy(struct flb_sp_task *task);
170-
void groupby_nums_destroy(struct aggr_num *groupby_nums, int size);
171-
void flb_sp_aggr_node_destroy(struct flb_sp_cmd *cmd,
172-
struct aggr_node *aggr_node);
174+
void groupby_nums_destroy(struct aggregate_num *groupby_nums, int size);
175+
void flb_sp_aggregate_node_destroy(struct flb_sp_cmd *cmd,
176+
struct aggregate_node *aggregate_node);
173177

174178
#endif
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2019-2020 The Fluent Bit Authors
6+
* Copyright (C) 2015-2018 Treasure Data Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
#ifndef FLB_SP_AGGREGATE_FUNC_H
22+
#define FLB_SP_AGGREGATE_FUNC_H
23+
24+
25+
typedef void (*aggregate_function_destroy)(struct aggregate_node *,
26+
int);
27+
28+
typedef int (*aggregate_function_clone)(struct aggregate_node *,
29+
struct aggregate_node *,
30+
struct flb_sp_cmd_key *,
31+
int);
32+
33+
typedef void (*aggregate_function_add)(struct aggregate_node *,
34+
struct flb_sp_cmd_key *,
35+
int,
36+
struct flb_time *,
37+
int64_t, double);
38+
39+
typedef void (*aggregate_function_calc)(struct aggregate_node *,
40+
struct flb_sp_cmd_key *,
41+
msgpack_packer *,
42+
int);
43+
44+
typedef void (*aggregate_function_remove)(struct aggregate_node *,
45+
struct aggregate_node *,
46+
int);
47+
48+
extern char aggregate_func_string[AGGREGATE_FUNCTIONS][sizeof("TIMESERIES_FORECAST") + 1];
49+
50+
extern aggregate_function_clone aggregate_func_clone[AGGREGATE_FUNCTIONS];
51+
extern aggregate_function_add aggregate_func_add[AGGREGATE_FUNCTIONS];
52+
extern aggregate_function_calc aggregate_func_calc[AGGREGATE_FUNCTIONS];
53+
extern aggregate_function_remove aggregate_func_remove[AGGREGATE_FUNCTIONS];
54+
extern aggregate_function_destroy aggregate_func_destroy[AGGREGATE_FUNCTIONS];
55+
56+
#endif

include/fluent-bit/stream_processor/flb_sp_parser.h

Lines changed: 12 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@
3434
#define FLB_SP_COUNT 3
3535
#define FLB_SP_MIN 4
3636
#define FLB_SP_MAX 5
37+
#define FLB_SP_FORECAST 6
38+
39+
/* Update this whenever a new aggregate function is added */
40+
#define AGGREGATE_FUNCTIONS 6
3741

3842
/* Date time functions */
3943
#define FLB_SP_NOW 10
@@ -43,12 +47,6 @@
4347
#define FLB_SP_RECORD_TAG 20
4448
#define FLB_SP_RECORD_TIME 21
4549

46-
/* Timeseries functions */
47-
#define FLB_SP_TIMESERIES_START 30
48-
#define FLB_SP_FORECAST 30
49-
#define FLB_SP_FORECAST_R 31
50-
#define FLB_SP_TIMESERIES_END 39
51-
5250
/* Status */
5351
#define FLB_SP_OK 0
5452
#define FLB_SP_ERROR -1
@@ -78,7 +76,6 @@ enum Expressions {
7876
FLB_EXP_NULL,
7977
FLB_EXP_FUNC,
8078
FLB_EXP_PARAM,
81-
FLB_EXP_TIMESERIES,
8279
};
8380

8481
/* Logical operation */
@@ -124,13 +121,13 @@ struct flb_sp_cmd_key {
124121
int aggr_func; /* Aggregation function */
125122
int time_func; /* Time function */
126123
int record_func; /* Record function */
127-
int timeseries_func; /* Timeseries function */
128124
flb_sds_t name; /* Parent Key name */
129125
flb_sds_t alias; /* Key output alias (key AS alias) */
130-
flb_sds_t name_keys; /* Key name with sub-keys */
131126
void *gb_key; /* Key name reference to gb_key */
127+
// TODO: make it a general union type (or array of values)
128+
int constant; /* constant parameter value
129+
(used specifically for timeseries_forecast) */
132130
struct mk_list *subkeys; /* sub-keys selection */
133-
struct flb_exp_timeseries *timeseries; /* Timeseries functions */
134131
struct mk_list _head; /* Link to flb_sp_cmd->keys */
135132
};
136133

@@ -158,13 +155,7 @@ struct flb_sp_cmd {
158155
struct flb_sp_window window; /* WINDOW window in select statement */
159156

160157
struct mk_list gb_keys; /* list head of group-by record fields */
161-
162-
int timeseries_num; /* Number of timeseries functions */
163-
/*
164-
* This keeps the temporary list of parameters parameters in timeseries
165-
* functions during SQL statement parsing.
166-
*/
167-
struct mk_list *tmp_params;
158+
char *alias;
168159

169160
/*
170161
* When parsing a SQL statement that have references to keys with sub-keys
@@ -244,21 +235,6 @@ struct flb_exp_param {
244235
struct flb_exp *param;
245236
};
246237

247-
struct flb_exp_timeseries {
248-
int type;
249-
struct mk_list _head;
250-
struct mk_list params;
251-
252-
struct timeseries *(*cb_func_alloc) (int);
253-
struct timeseries *(*cb_func_clone) (struct timeseries *);
254-
void (*cb_func_add) (struct timeseries *, struct flb_time *);
255-
void (*cb_func_rem) (struct timeseries *, struct timeseries *,
256-
struct flb_time *);
257-
void (*cb_func_calc) (struct timeseries *, struct flb_sp_cmd_key *,
258-
msgpack_packer *, int, struct flb_time *);
259-
void (*cb_func_destroy) (struct timeseries *);
260-
};
261-
262238
struct flb_sp_cmd *flb_sp_cmd_create(const char *sql);
263239
void flb_sp_cmd_destroy(struct flb_sp_cmd *cmd);
264240

@@ -271,9 +247,9 @@ void flb_sp_cmd_stream_prop_del(struct flb_sp_cmd_prop *prop);
271247
const char *flb_sp_cmd_stream_prop_get(struct flb_sp_cmd *cmd, const char *key);
272248

273249
/* Selection keys */
274-
int flb_sp_cmd_key_add(struct flb_sp_cmd *cmd, int func,
275-
const char *key_name, const char *key_alias);
250+
int flb_sp_cmd_key_add(struct flb_sp_cmd *cmd, int func, const char *key_name);
276251
void flb_sp_cmd_key_del(struct flb_sp_cmd_key *key);
252+
void flb_sp_cmd_alias_add(struct flb_sp_cmd *cmd, const char *key_alias);
277253
int flb_sp_cmd_source(struct flb_sp_cmd *cmd, int type, const char *source);
278254
void flb_sp_cmd_dump(struct flb_sp_cmd *cmd);
279255

@@ -307,9 +283,7 @@ void flb_sp_cmd_gb_key_del(struct flb_sp_cmd_gb_key *key);
307283

308284
void flb_sp_cmd_limit_add(struct flb_sp_cmd *cmd, int limit);
309285

310-
/* Timeseries */
311-
int flb_sp_cmd_param_add(struct flb_sp_cmd *cmd, int func, struct flb_exp *param);
312-
int flb_sp_cmd_timeseries(struct flb_sp_cmd *cmd, char *func, const char *key_alias);
313-
void flb_cmd_params_del(struct mk_list *params);
286+
int flb_sp_cmd_timeseries_forecast(struct flb_sp_cmd *cmd, int func,
287+
const char *key_name, int seconds);
314288

315289
#endif

0 commit comments

Comments
 (0)