Skip to content

Commit 685178c

Browse files
koleiniedsiper
authored andcommitted
sp: add timeseries function TIMESERIES_FORECAST_R (#1539)
Signed-off-by: Masoud Koleini <[email protected]>
1 parent 5e6b65f commit 685178c

File tree

6 files changed

+175
-12
lines changed

6 files changed

+175
-12
lines changed

include/fluent-bit/stream_processor/flb_sp_parser.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@
4444
#define FLB_SP_RECORD_TIME 21
4545

4646
/* Timeseries functions */
47-
#define FLB_SP_FORECAST 30
47+
#define FLB_SP_TIMESERIES_START 30
48+
#define FLB_SP_FORECAST 30
49+
#define FLB_SP_FORECAST_R 31
50+
#define FLB_SP_TIMESERIES_END 39
4851

4952
/* Status */
5053
#define FLB_SP_OK 0

include/fluent-bit/stream_processor/flb_sp_timeseries.h

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@
2222
#define FLB_SP_TIMESERIES_H
2323

2424
#include <math.h>
25+
#include <float.h>
2526
#include <fluent-bit/stream_processor/flb_sp_parser.h>
2627

27-
#define TIMESERIES_FUNCTIONS_SIZE 1
28+
#define TIMESERIES_FUNCTIONS_SIZE 2
2829

2930
typedef struct timeseries *(*timeseries_function_alloc_typ)(int);
3031
typedef struct timeseries *(*timeseries_function_clone_typ)(struct timeseries *);
@@ -209,6 +210,87 @@ void cb_forecast_calc(struct timeseries *ts, struct flb_sp_cmd_key *cmd_key,
209210
msgpack_pack_float(mp_pck, result);
210211
}
211212

213+
void cb_forecast_r_calc(struct timeseries *ts, struct flb_sp_cmd_key *cmd_key,
214+
msgpack_packer *mp_pck, int records, struct flb_time *tm)
215+
{
216+
double mean_x;
217+
double mean_y;
218+
double var_x;
219+
double cov_xy;
220+
double result;
221+
/* y = b0 + b1 * x */
222+
double b0;
223+
double b1;
224+
225+
double maximum_x;
226+
struct aggr_num *val;
227+
struct timeseries_forecast *forecast;
228+
229+
forecast = (struct timeseries_forecast *) ts;
230+
231+
mean_x = forecast->sigma_x / records;
232+
mean_y = forecast->sigma_y / records;
233+
cov_xy = (forecast->sigma_xy / (double) records) - mean_x * mean_y;
234+
var_x = (forecast->sigma_x2 / records) - mean_x * mean_x;
235+
236+
b1 = cov_xy / var_x;
237+
b0 = mean_y - b1 * mean_x;
238+
239+
240+
/* Get the cap (4th arguement) */
241+
val = ts->nums + 3;
242+
switch (val->type) {
243+
case FLB_SP_NUM_I64:
244+
maximum_x = (double) val->i64;
245+
break;
246+
case FLB_SP_NUM_F64:
247+
maximum_x = val->f64;
248+
break;
249+
default:
250+
return;
251+
break;
252+
}
253+
254+
/*
255+
* calculate forecast for value (3rd argument).
256+
*/
257+
val = ts->nums + 2;
258+
259+
if (b1 == 0) {
260+
result = maximum_x;
261+
}
262+
else {
263+
switch (val->type) {
264+
case FLB_SP_NUM_I64:
265+
result = (((double) val->i64 - b0) / b1) - *forecast->latest_x;
266+
break;
267+
case FLB_SP_NUM_F64:
268+
result = ((val->i64 - b0) / b1) - *forecast->latest_x;
269+
break;
270+
default:
271+
result = nan("");
272+
break;
273+
}
274+
275+
if (result < 0) {
276+
result = maximum_x;
277+
}
278+
}
279+
280+
/* pack the result */
281+
if (cmd_key->alias) {
282+
msgpack_pack_str(mp_pck, flb_sds_len(cmd_key->alias));
283+
msgpack_pack_str_body(mp_pck,
284+
cmd_key->alias,
285+
flb_sds_len(cmd_key->alias));
286+
}
287+
else {
288+
msgpack_pack_str(mp_pck, 10);
289+
msgpack_pack_str_body(mp_pck, "FORECAST_R", 10);
290+
}
291+
msgpack_pack_float(mp_pck, result);
292+
}
293+
212294
void cb_forecast_destroy(struct timeseries *ts)
213295
{
214296
struct timeseries_forecast *forecast;
@@ -225,36 +307,43 @@ void cb_forecast_destroy(struct timeseries *ts)
225307

226308
char *timeseries_functions[TIMESERIES_FUNCTIONS_SIZE] = {
227309
"forecast",
310+
"forecast_r",
228311
};
229312

230313
/* Timeseries function memory allocation */
231314
timeseries_function_alloc_typ timeseries_functions_alloc_ptr[TIMESERIES_FUNCTIONS_SIZE] = {
232315
cb_forecast_alloc,
316+
cb_forecast_alloc,
233317
};
234318

235319
/* Timeseries function clone */
236320
timeseries_function_clone_typ timeseries_functions_clone_ptr[TIMESERIES_FUNCTIONS_SIZE] = {
237321
cb_forecast_clone,
322+
cb_forecast_clone,
238323
};
239324

240325
/* Timeseries function record addition */
241326
timeseries_function_add_typ timeseries_functions_add_ptr[TIMESERIES_FUNCTIONS_SIZE] = {
242327
cb_forecast_add,
328+
cb_forecast_add,
243329
};
244330

245331
/* Timeseries function record removal */
246332
timeseries_function_rem_typ timeseries_functions_rem_ptr[TIMESERIES_FUNCTIONS_SIZE] = {
247333
cb_forecast_rem,
334+
cb_forecast_rem,
248335
};
249336

250337
/* Timeseries function calculation */
251338
timeseries_function_calc_typ timeseries_functions_calc_ptr[TIMESERIES_FUNCTIONS_SIZE] = {
252339
cb_forecast_calc,
340+
cb_forecast_r_calc,
253341
};
254342

255343
/* Timeseries function calculation */
256344
timeseries_function_destroy_typ timeseries_functions_destroy_ptr[TIMESERIES_FUNCTIONS_SIZE] = {
257345
cb_forecast_destroy,
346+
cb_forecast_destroy,
258347
};
259348

260349
#endif

src/stream_processor/parser/flb_sp_parser.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ struct flb_sp_cmd_key *flb_sp_key_create(struct flb_sp_cmd *cmd, int func,
184184
/* Record function */
185185
record_func = func;
186186
}
187-
else if (func >= FLB_SP_FORECAST && func <= FLB_SP_FORECAST) {
187+
else if (func >= FLB_SP_TIMESERIES_START && func <= FLB_SP_TIMESERIES_END) {
188188
/* Timeseries function */
189189
timeseries_func = func;
190190
}
@@ -834,9 +834,9 @@ int flb_sp_cmd_timeseries(struct flb_sp_cmd *cmd, char *func, const char *key_al
834834
for (i = 0; i < TIMESERIES_FUNCTIONS_SIZE; i++)
835835
{
836836
ts_name = timeseries_functions[i];
837-
if (strncmp(ts_name, func, strlen(ts_name)) == 0)
837+
if (strcmp(ts_name, func) == 0)
838838
{
839-
key = flb_sp_key_create(cmd, i + FLB_SP_FORECAST, NULL, key_alias);
839+
key = flb_sp_key_create(cmd, i + FLB_SP_TIMESERIES_START, NULL, key_alias);
840840

841841
if (!key) {
842842
return -1;

src/stream_processor/parser/sql.l

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ TIME return TIME;
8181

8282
/* Timeseries Functions */
8383
TIMESERIES_FORECAST return TIMESERIES_FORECAST;
84+
TIMESERIES_FORECAST_R return TIMESERIES_FORECAST_R;
8485

8586
/* Window Types */
8687
TUMBLING return TUMBLING;

src/stream_processor/parser/sql.y

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ void yyerror(struct flb_sp_cmd *cmd, const char *query, void *scanner,
4646
%token RECORD CONTAINS TIME
4747

4848
/* Timeseries functions */
49-
%token TIMESERIES_FORECAST
49+
%token TIMESERIES_FORECAST TIMESERIES_FORECAST_R
5050

5151
/* Time functions */
5252
%token NOW UNIX_TIMESTAMP
@@ -310,15 +310,26 @@ select: SELECT keys FROM source window where groupby ';'
310310
flb_free($7);
311311
}
312312
|
313-
TIMESERIES_FORECAST '(' params ')'
313+
TIMESERIES_FORECAST '(' param ',' param ',' param ')'
314314
{
315315
flb_sp_cmd_timeseries(cmd, "forecast", NULL);
316316
}
317317
|
318-
TIMESERIES_FORECAST '(' params ')' AS alias
318+
TIMESERIES_FORECAST '(' param ',' param ',' param ')' AS alias
319319
{
320-
flb_sp_cmd_timeseries(cmd, "forecast", $6);
321-
flb_free($6);
320+
flb_sp_cmd_timeseries(cmd, "forecast", $10);
321+
flb_free($10);
322+
}
323+
|
324+
TIMESERIES_FORECAST_R '(' param ',' param ',' param ',' param ')'
325+
{
326+
flb_sp_cmd_timeseries(cmd, "forecast_r", NULL);
327+
}
328+
|
329+
TIMESERIES_FORECAST_R '(' param ',' param ',' param ',' param ')' AS alias
330+
{
331+
flb_sp_cmd_timeseries(cmd, "forecast_r", $12);
332+
flb_free($12);
322333
}
323334
|
324335
NOW '(' ')'
@@ -531,7 +542,6 @@ select: SELECT keys FROM source window where groupby ';'
531542
{
532543
flb_sp_cmd_param_add(cmd, -1, $1);
533544
}
534-
params: param | param ',' params
535545
value: INTEGER
536546
{
537547
$$ = flb_sp_cmd_condition_integer(cmd, $1);

tests/internal/stream_processor.c

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1179,6 +1179,52 @@ static void cb_forecast_hopping_window(int id, struct task_check *check,
11791179
TEST_CHECK(ret == FLB_TRUE);
11801180
}
11811181

1182+
static void cb_forecast_r_tumbling_window(int id, struct task_check *check,
1183+
char *buf, size_t size)
1184+
{
1185+
int ret;
1186+
1187+
/* Expect one record only */
1188+
ret = mp_count_rows(buf, size);
1189+
TEST_CHECK(ret == 1);
1190+
1191+
/* Check SUM value result */
1192+
ret = mp_record_key_cmp(buf, size, 0, "FORECAST_R",
1193+
MSGPACK_OBJECT_FLOAT,
1194+
NULL, 0, 39.0);
1195+
TEST_CHECK(ret == FLB_TRUE);
1196+
1197+
/* Check AVG value result */
1198+
ret = mp_record_key_cmp(buf, size, 0, "AVG(usage)",
1199+
MSGPACK_OBJECT_FLOAT,
1200+
NULL, 0, 60.0);
1201+
1202+
TEST_CHECK(ret == FLB_TRUE);
1203+
}
1204+
1205+
static void cb_forecast_r_hopping_window(int id, struct task_check *check,
1206+
char *buf, size_t size)
1207+
{
1208+
int ret;
1209+
1210+
/* Expect one record only */
1211+
ret = mp_count_rows(buf, size);
1212+
TEST_CHECK(ret == 1);
1213+
1214+
/* Check SUM value result */
1215+
ret = mp_record_key_cmp(buf, size, 0, "FORECAST_R",
1216+
MSGPACK_OBJECT_FLOAT,
1217+
NULL, 0, 24.0);
1218+
TEST_CHECK(ret == FLB_TRUE);
1219+
1220+
/* Check AVG value result */
1221+
ret = mp_record_key_cmp(buf, size, 0, "AVG(usage)",
1222+
MSGPACK_OBJECT_FLOAT,
1223+
NULL, 0, 175.0);
1224+
1225+
TEST_CHECK(ret == FLB_TRUE);
1226+
}
1227+
11821228
/* Tests for 'test_window' */
11831229
struct task_check window_checks[] = {
11841230
{
@@ -1210,7 +1256,7 @@ struct task_check window_checks[] = {
12101256
"ADVANCE BY 2 SECOND) WHERE word3 IS NOT NULL;",
12111257
cb_hopping_window_5_second
12121258
},
1213-
{
1259+
{ /* FORECAST */
12141260
4, FLB_SP_WINDOW_TUMBLING, 1, 0,
12151261
"timeseries_forecast_window_tumbling",
12161262
"SELECT AVG(usage), TIMESERIES_FORECAST(id, usage, 20) FROM " \
@@ -1224,6 +1270,20 @@ struct task_check window_checks[] = {
12241270
"STREAM:FLB WINDOW HOPPING (5 SECOND, ADVANCE BY 2 SECOND);",
12251271
cb_forecast_hopping_window
12261272
},
1273+
{ /* FORECAST_R */
1274+
6, FLB_SP_WINDOW_TUMBLING, 1, 0,
1275+
"timeseries_forecast_r_window_tumbling",
1276+
"SELECT AVG(usage), TIMESERIES_FORECAST_R(id, usage, 500, 10000) FROM " \
1277+
"STREAM:FLB WINDOW TUMBLING (5 SECOND);",
1278+
cb_forecast_r_tumbling_window
1279+
},
1280+
{
1281+
7, FLB_SP_WINDOW_HOPPING, 5, 2,
1282+
"timeseries_forecast_r_window_hopping",
1283+
"SELECT AVG(usage), TIMESERIES_FORECAST_R(id, usage, 500, 10000) FROM " \
1284+
"STREAM:FLB WINDOW HOPPING (5 SECOND, ADVANCE BY 2 SECOND);",
1285+
cb_forecast_r_hopping_window
1286+
},
12271287
};
12281288

12291289
static void test_window()

0 commit comments

Comments
 (0)