Skip to content

Commit 8951a8c

Browse files
committed
http_server: storage: new api/v1/storage endpoint for storage metrics
Signed-off-by: Eduardo Silva <[email protected]>
1 parent e973f22 commit 8951a8c

File tree

7 files changed

+256
-30
lines changed

7 files changed

+256
-30
lines changed

src/http_server/api/v1/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
set(src
33
uptime.c
44
metrics.c
5+
storage.c
56
plugins.c
67
register.c
78
)

src/http_server/api/v1/metrics.c

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
/* Fluent Bit
44
* ==========
5-
* Copyright (C) 2019 The Fluent Bit Authors
5+
* Copyright (C) 2019-2020 The Fluent Bit Authors
66
* Copyright (C) 2015-2018 Treasure Data Inc.
77
*
88
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -60,7 +60,7 @@ static struct flb_hs_buf *metrics_get_latest()
6060
}
6161

6262
/* Delete unused metrics, note that we only care about the latest node */
63-
int cleanup_metrics()
63+
static int cleanup_metrics()
6464
{
6565
int c = 0;
6666
struct mk_list *tmp;
@@ -126,6 +126,7 @@ static void cb_mq_metrics(mk_mq_t *queue, void *data, size_t size)
126126
buf = flb_malloc(sizeof(struct flb_hs_buf));
127127
if (!buf) {
128128
flb_errno();
129+
flb_sds_destroy(out_data);
129130
return;
130131
}
131132
buf->users = 0;
@@ -141,13 +142,15 @@ static void cb_mq_metrics(mk_mq_t *queue, void *data, size_t size)
141142
}
142143

143144
int string_cmp(const void* a_arg, const void* b_arg) {
144-
char* a = *(char **)a_arg;
145-
char* b = *(char **)b_arg;
145+
char *a = *(char **)a_arg;
146+
char *b = *(char **)b_arg;
147+
146148
return strcmp(a, b);
147149
}
148150

149151
size_t extract_metric_name_end_position(char *s) {
150152
int i;
153+
151154
for (i = 0; i < flb_sds_len(s); i++) {
152155
if (s[i] == '{') {
153156
return i;
@@ -157,41 +160,51 @@ size_t extract_metric_name_end_position(char *s) {
157160
}
158161

159162
int is_same_metric(char *s1, char *s2) {
160-
int i;
161-
int p1 = extract_metric_name_end_position(s1);
162-
int p2 = extract_metric_name_end_position(s2);
163-
if (p1 != p2) {
164-
return 0;
165-
}
166-
for (i = 0; i < p1; i++) {
167-
if (s1[i] != s2[i]) {
168-
return 0;
163+
int i;
164+
int p1 = extract_metric_name_end_position(s1);
165+
int p2 = extract_metric_name_end_position(s2);
166+
167+
if (p1 != p2) {
168+
return 0;
169+
}
170+
171+
for (i = 0; i < p1; i++) {
172+
if (s1[i] != s2[i]) {
173+
return 0;
174+
}
169175
}
170-
}
171-
return 1;
176+
return 1;
172177
}
173178

174179
/* derive HELP text from metricname */
175180
/* if help text length > 128, increase init memory for metric_helptxt */
176181
flb_sds_t metrics_help_txt(char *metric_name, flb_sds_t *metric_helptxt)
177182
{
178-
if (strstr(metric_name, "input_bytes")) {
183+
if (strstr(metric_name, "input_bytes")) {
179184
return flb_sds_cat(*metric_helptxt, " Number of input bytes.\n", 24);
180-
} else if (strstr(metric_name, "input_records")) {
185+
}
186+
else if (strstr(metric_name, "input_records")) {
181187
return flb_sds_cat(*metric_helptxt, " Number of input records.\n", 26);
182-
} else if (strstr(metric_name, "output_bytes")) {
188+
}
189+
else if (strstr(metric_name, "output_bytes")) {
183190
return flb_sds_cat(*metric_helptxt, " Number of output bytes.\n", 25);
184-
} else if (strstr(metric_name, "output_records")) {
191+
}
192+
else if (strstr(metric_name, "output_records")) {
185193
return flb_sds_cat(*metric_helptxt, " Number of output records.\n", 27);
186-
} else if (strstr(metric_name, "output_errors")) {
194+
}
195+
else if (strstr(metric_name, "output_errors")) {
187196
return flb_sds_cat(*metric_helptxt, " Number of output errors.\n", 26);
188-
} else if (strstr(metric_name, "output_retries_failed")) {
197+
}
198+
else if (strstr(metric_name, "output_retries_failed")) {
189199
return flb_sds_cat(*metric_helptxt, " Number of output retries failed.\n", 34);
190-
} else if (strstr(metric_name, "output_retries")) {
200+
}
201+
else if (strstr(metric_name, "output_retries")) {
191202
return flb_sds_cat(*metric_helptxt, " Number of output retries.\n", 27);
192-
} else if (strstr(metric_name, "output_proc_records")) {
203+
}
204+
else if (strstr(metric_name, "output_proc_records")) {
193205
return flb_sds_cat(*metric_helptxt, " Number of processed output records.\n", 37);
194-
} else if (strstr(metric_name, "output_proc_bytes")) {
206+
}
207+
else if (strstr(metric_name, "output_proc_bytes")) {
195208
return flb_sds_cat(*metric_helptxt, " Number of processed output bytes.\n", 35);
196209
}
197210
else {
@@ -473,7 +486,8 @@ int api_v1_metrics(struct flb_hs *hs)
473486
pthread_key_create(&hs_metrics_key, NULL);
474487

475488
/* Create a message queue */
476-
hs->qid = mk_mq_create(hs->ctx, "/metrics", cb_mq_metrics, NULL);
489+
hs->qid_metrics = mk_mq_create(hs->ctx, "/metrics",
490+
cb_mq_metrics, NULL);
477491

478492
/* HTTP end-points */
479493
mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/metrics/prometheus",

src/http_server/api/v1/metrics.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
#include <fluent-bit/flb_sds.h>
2727

2828
int api_v1_metrics(struct flb_hs *hs);
29-
3029
flb_sds_t metrics_help_txt(char *metric_name, flb_sds_t *metric_helptxt);
3130

3231
#endif

src/http_server/api/v1/register.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
/* Fluent Bit
44
* ==========
5-
* Copyright (C) 2019 The Fluent Bit Authors
5+
* Copyright (C) 2019-2020 The Fluent Bit Authors
66
* Copyright (C) 2015-2018 Treasure Data Inc.
77
*
88
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -23,6 +23,7 @@
2323

2424
#include "uptime.h"
2525
#include "metrics.h"
26+
#include "storage.h"
2627
#include "plugins.h"
2728

2829
int api_v1_registration(struct flb_hs *hs)
@@ -31,5 +32,9 @@ int api_v1_registration(struct flb_hs *hs)
3132
api_v1_metrics(hs);
3233
api_v1_plugins(hs);
3334

35+
if (hs->config->storage_metrics == FLB_TRUE) {
36+
api_v1_storage_metrics(hs);
37+
}
38+
3439
return 0;
3540
}

src/http_server/api/v1/storage.c

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2019-2020 The Fluent Bit Authors
6+
* Copyright (C) 2015-2018 Treasure Data Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
#include <fluent-bit/flb_info.h>
22+
#include <fluent-bit/flb_pack.h>
23+
#include <fluent-bit/flb_sds.h>
24+
#include "storage.h"
25+
26+
#include <fluent-bit/flb_http_server.h>
27+
#include <msgpack.h>
28+
29+
pthread_key_t hs_storage_metrics_key;
30+
31+
/* Return the newest storage metrics buffer */
32+
static struct flb_hs_buf *storage_metrics_get_latest()
33+
{
34+
struct flb_hs_buf *buf;
35+
struct mk_list *metrics_list;
36+
37+
metrics_list = pthread_getspecific(hs_storage_metrics_key);
38+
if (!metrics_list) {
39+
return NULL;
40+
}
41+
42+
if (mk_list_size(metrics_list) == 0) {
43+
return NULL;
44+
}
45+
46+
buf = mk_list_entry_last(metrics_list, struct flb_hs_buf, _head);
47+
return buf;
48+
}
49+
50+
/* Delete unused metrics, note that we only care about the latest node */
51+
static int cleanup_metrics()
52+
{
53+
int c = 0;
54+
struct mk_list *tmp;
55+
struct mk_list *head;
56+
struct mk_list *metrics_list;
57+
struct flb_hs_buf *last;
58+
struct flb_hs_buf *entry;
59+
60+
metrics_list = pthread_getspecific(hs_storage_metrics_key);
61+
if (!metrics_list) {
62+
return -1;
63+
}
64+
65+
last = storage_metrics_get_latest();
66+
if (!last) {
67+
return -1;
68+
}
69+
70+
mk_list_foreach_safe(head, tmp, metrics_list) {
71+
entry = mk_list_entry(head, struct flb_hs_buf, _head);
72+
if (entry != last && entry->users == 0) {
73+
mk_list_del(&entry->_head);
74+
flb_sds_destroy(entry->data);
75+
flb_free(entry->raw_data);
76+
flb_free(entry);
77+
c++;
78+
}
79+
}
80+
81+
return c;
82+
}
83+
84+
/*
85+
* Callback invoked every time some storage metrics are received through a
86+
* message queue channel. This function runs in a Monkey HTTP thread
87+
* worker and it purpose is to take the metrics data and store it
88+
* somewhere so then it can be available by the end-points upon
89+
* HTTP client requests.
90+
*/
91+
static void cb_mq_storage_metrics(mk_mq_t *queue, void *data, size_t size)
92+
{
93+
flb_sds_t out_data;
94+
struct flb_hs_buf *buf;
95+
struct mk_list *metrics_list = NULL;
96+
97+
metrics_list = pthread_getspecific(hs_storage_metrics_key);
98+
if (!metrics_list) {
99+
metrics_list = flb_malloc(sizeof(struct mk_list));
100+
if (!metrics_list) {
101+
flb_errno();
102+
return;
103+
}
104+
mk_list_init(metrics_list);
105+
pthread_setspecific(hs_storage_metrics_key, metrics_list);
106+
}
107+
108+
/* Convert msgpack to JSON */
109+
out_data = flb_msgpack_raw_to_json_sds(data, size);
110+
if (!out_data) {
111+
return;
112+
}
113+
114+
buf = flb_malloc(sizeof(struct flb_hs_buf));
115+
if (!buf) {
116+
flb_errno();
117+
flb_sds_destroy(out_data);
118+
return;
119+
}
120+
buf->users = 0;
121+
buf->data = out_data;
122+
123+
buf->raw_data = flb_malloc(size);
124+
memcpy(buf->raw_data, data, size);
125+
buf->raw_size = size;
126+
127+
mk_list_add(&buf->_head, metrics_list);
128+
129+
cleanup_metrics();
130+
}
131+
132+
static void cb_mq_storage_metrics_exit(mk_mq_t *queue, void *data)
133+
{
134+
135+
}
136+
137+
/* API: expose built-in storage metrics /api/v1/storage */
138+
static void cb_storage(mk_request_t *request, void *data)
139+
{
140+
struct flb_hs_buf *buf;
141+
142+
buf = storage_metrics_get_latest();
143+
if (!buf) {
144+
mk_http_status(request, 404);
145+
mk_http_done(request);
146+
return;
147+
}
148+
149+
buf->users++;
150+
151+
mk_http_status(request, 200);
152+
mk_http_send(request, buf->data, flb_sds_len(buf->data), NULL);
153+
mk_http_done(request);
154+
155+
buf->users--;
156+
}
157+
158+
/* Perform registration */
159+
int api_v1_storage_metrics(struct flb_hs *hs)
160+
{
161+
pthread_key_create(&hs_storage_metrics_key, NULL);
162+
163+
/* Create a message queue */
164+
hs->qid_storage = mk_mq_create(hs->ctx, "/storage",
165+
cb_mq_storage_metrics,
166+
NULL);
167+
168+
/* HTTP end-point */
169+
mk_vhost_handler(hs->ctx, hs->vid, "/api/v1/storage", cb_storage, hs);
170+
171+
return 0;
172+
}

src/http_server/api/v1/storage.h

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2019 The Fluent Bit Authors
6+
* Copyright (C) 2015-2017 Treasure Data Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
#ifndef FLB_HS_API_V1_STORAGE_METRICS_H
22+
#define FLB_HS_API_V1_STORAGE_METRICS_H
23+
24+
#include <fluent-bit/flb_info.h>
25+
#include <fluent-bit/flb_http_server.h>
26+
27+
int api_v1_storage_metrics(struct flb_hs *hs);
28+
29+
#endif

src/http_server/flb_hs.c

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,16 @@ static void cb_root(mk_request_t *request, void *data)
3636
mk_http_done(request);
3737
}
3838

39-
/* Ingest metrics into the web service context */
40-
int flb_hs_push_metrics(struct flb_hs *hs, void *data, size_t size)
39+
/* Ingest pipeline metrics into the web service context */
40+
int flb_hs_push_pipeline_metrics(struct flb_hs *hs, void *data, size_t size)
4141
{
42-
return mk_mq_send(hs->ctx, hs->qid, data, size);
42+
return mk_mq_send(hs->ctx, hs->qid_metrics, data, size);
43+
}
44+
45+
/* Ingest storage metrics into the web service context */
46+
int flb_hs_push_storage_metrics(struct flb_hs *hs, void *data, size_t size)
47+
{
48+
return mk_mq_send(hs->ctx, hs->qid_storage, data, size);
4349
}
4450

4551
/* Create ROOT endpoints */

0 commit comments

Comments
 (0)