Skip to content

Commit dd436df

Browse files
koleiniedsiper
authored andcommitted
sp: allow RECORD_TIME() as parameter in timeseries functions (#1533)
Signed-off-by: Masoud Koleini <[email protected]>
1 parent 2995289 commit dd436df

File tree

5 files changed

+39
-7
lines changed

5 files changed

+39
-7
lines changed

include/fluent-bit/stream_processor/flb_sp_parser.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ struct flb_exp_key {
205205
struct mk_list _head;
206206
flb_sds_t name;
207207
struct mk_list *subkeys;
208+
int func;
208209
};
209210

210211
struct flb_exp_func {
@@ -295,7 +296,7 @@ int flb_sp_cmd_gb_key_add(struct flb_sp_cmd *cmd, const char *key);
295296
void flb_sp_cmd_gb_key_del(struct flb_sp_cmd_gb_key *key);
296297

297298
/* Timeseries */
298-
int flb_sp_cmd_param_add(struct flb_sp_cmd *cmd, struct flb_exp *param);
299+
int flb_sp_cmd_param_add(struct flb_sp_cmd *cmd, int func, struct flb_exp *param);
299300
int flb_sp_cmd_timeseries(struct flb_sp_cmd *cmd, char *func, const char *key_alias);
300301
void flb_cmd_params_del(struct mk_list *params);
301302

src/flb_config.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -439,8 +439,7 @@ int flb_config_set_property(struct flb_config *config,
439439
flb_free(*s_val); /* release before overwriting */
440440
}
441441

442-
*s_val = malloc(flb_sds_len(tmp) * sizeof(char));
443-
strncpy(*s_val, tmp, flb_sds_len(tmp));
442+
*s_val = flb_strdup(tmp);
444443
flb_sds_destroy(tmp);
445444
break;
446445
default:

src/stream_processor/flb_sp.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1402,6 +1402,23 @@ static int sp_process_timeseries_data(struct flb_sp_cmd *cmd,
14021402

14031403
nums_ts = f->nums;
14041404

1405+
/* Populate pre-defined function parameters */
1406+
key_id_ts = 0;
1407+
mk_list_foreach(head_ts, &ckey->timeseries->params) {
1408+
param = mk_list_entry(head_ts, struct flb_exp_param, _head);
1409+
1410+
switch (param->param->type) {
1411+
case FLB_EXP_KEY:
1412+
exp_key = (struct flb_exp_key *) param->param;
1413+
1414+
if (exp_key->func == FLB_SP_RECORD_TIME) {
1415+
nums_ts[key_id_ts].type = FLB_SP_NUM_F64;
1416+
nums_ts[key_id_ts].f64 = flb_time_to_double(tms);
1417+
}
1418+
}
1419+
key_id_ts++;
1420+
}
1421+
14051422
for (i = 0; i < map_size; i++) {
14061423
key = map.via.map.ptr[i].key;
14071424

src/stream_processor/parser/flb_sp_parser.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -792,9 +792,19 @@ void flb_sp_cmd_condition_del(struct flb_sp_cmd *cmd)
792792
}
793793

794794
/* Timeseries functions */
795-
int flb_sp_cmd_param_add(struct flb_sp_cmd *cmd, struct flb_exp *param)
795+
int flb_sp_cmd_param_add(struct flb_sp_cmd *cmd, int func, struct flb_exp *param)
796796
{
797797
struct flb_exp_param *p;
798+
struct flb_exp_key *key;
799+
800+
if (func > 0) { /* Parameter is a pre-defined function */
801+
key = (struct flb_exp_key *) flb_sp_cmd_condition_key(cmd, NULL);
802+
if (!key) {
803+
return -1;
804+
}
805+
key->func = func;
806+
param = (struct flb_exp *) key;
807+
}
798808

799809
p = flb_calloc(1, sizeof(struct flb_exp_param));
800810

src/stream_processor/parser/sql.y

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -512,19 +512,24 @@ select: SELECT keys FROM source window where groupby ';'
512512
subkey subkey
513513
param: IDENTIFIER
514514
{
515-
flb_sp_cmd_param_add(cmd, flb_sp_cmd_condition_key(cmd, $1));
515+
flb_sp_cmd_param_add(cmd, -1, flb_sp_cmd_condition_key(cmd, $1));
516516
flb_free($1);
517517
}
518518
|
519519
IDENTIFIER record_subkey
520520
{
521-
flb_sp_cmd_param_add(cmd, flb_sp_cmd_condition_key(cmd, $1));
521+
flb_sp_cmd_param_add(cmd, -1, flb_sp_cmd_condition_key(cmd, $1));
522522
flb_free($1);
523523
}
524524
|
525+
RECORD_TIME '(' ')'
526+
{
527+
flb_sp_cmd_param_add(cmd, FLB_SP_RECORD_TIME, NULL);
528+
}
529+
|
525530
value
526531
{
527-
flb_sp_cmd_param_add(cmd, $1);
532+
flb_sp_cmd_param_add(cmd, -1, $1);
528533
}
529534
params: param | param ',' params
530535
value: INTEGER

0 commit comments

Comments
 (0)