2626#include <fluent-bit/flb_sds.h>
2727#include <fluent-bit/flb_upstream_ha.h>
2828
29+ #include <fluent-bit/flb_scheduler.h>
30+ #include <fluent-bit/flb_utils.h>
31+ #include <fluent-bit/flb_time.h>
32+ #include <sys/stat.h>
33+ #include <fcntl.h>
34+
2935/* refresh token every 50 minutes */
3036#define FLB_AZURE_KUSTO_TOKEN_REFRESH 3000
3137
5359
5460#define FLB_AZURE_KUSTO_INGEST_ENDPOINT_CONNECTION_TIMEOUT "60"
5561
62+ #define FLB_AZURE_KUSTO_BUFFER_DIR_MAX_SIZE "8G" /* 8GB buffer directory size */
63+ #define UPLOAD_TIMER_MAX_WAIT 180000
64+ #define UPLOAD_TIMER_MIN_WAIT 18000
65+ #define MAX_FILE_SIZE 4000000000 /* 4GB */
66+
67+ #define FLB_AZURE_IMDS_ENDPOINT "/metadata/identity/oauth2/token"
68+ #define FLB_AZURE_IMDS_API_VERSION "2018-02-01"
69+ #define FLB_AZURE_IMDS_RESOURCE "https://api.kusto.windows.net/"
70+
5671
5772struct flb_azure_kusto_resources {
5873 struct flb_upstream_ha * blob_ha ;
5974 struct flb_upstream_ha * queue_ha ;
6075 flb_sds_t identity_token ;
6176
6277 /* used to reload resouces after some time */
63- time_t load_time ;
78+ uint64_t load_time ;
6479};
6580
6681struct flb_azure_kusto {
@@ -75,6 +90,7 @@ struct flb_azure_kusto {
7590 flb_sds_t ingestion_mapping_reference ;
7691
7792 int ingestion_endpoint_connect_timeout ;
93+ int io_timeout ;
7894
7995 /* compress payload */
8096 int compression_enabled ;
@@ -88,14 +104,17 @@ struct flb_azure_kusto {
88104 int include_time_key ;
89105 flb_sds_t time_key ;
90106
91- /* --- internal data --- */
107+ flb_sds_t azure_kusto_buffer_key ;
92108
93- flb_sds_t ingestion_mgmt_endpoint ;
109+ /* --- internal data --- */
94110
95111 /* oauth2 context */
96112 flb_sds_t oauth_url ;
97113 struct flb_oauth2 * o ;
98114
115+ int timer_created ;
116+ int timer_ms ;
117+
99118 /* mutex for acquiring oauth tokens */
100119 pthread_mutex_t token_mutex ;
101120
@@ -107,9 +126,36 @@ struct flb_azure_kusto {
107126
108127 pthread_mutex_t blob_mutex ;
109128
129+ pthread_mutex_t buffer_mutex ;
130+
131+ int buffering_enabled ;
132+
133+ size_t file_size ;
134+ time_t upload_timeout ;
135+ time_t retry_time ;
136+
137+ int buffer_file_delete_early ;
138+ int unify_tag ;
139+ int blob_uri_length ;
140+ int scheduler_max_retries ;
141+ int delete_on_max_upload_error ;
142+
143+ int has_old_buffers ;
144+ size_t store_dir_limit_size ;
145+ /* track the total amount of buffered data */
146+ size_t current_buffer_size ;
147+ flb_sds_t buffer_dir ;
148+ char * store_dir ;
149+ struct flb_fstore * fs ;
150+ struct flb_fstore_stream * stream_active ; /* default active stream */
151+ struct flb_fstore_stream * stream_upload ;
152+
153+
110154 /* Upstream connection to the backend server */
111155 struct flb_upstream * u ;
112156
157+ struct flb_upstream * imds_upstream ;
158+
113159 /* Fluent Bit context */
114160 struct flb_config * config ;
115161
@@ -120,4 +166,4 @@ struct flb_azure_kusto {
120166flb_sds_t get_azure_kusto_token (struct flb_azure_kusto * ctx );
121167flb_sds_t execute_ingest_csl_command (struct flb_azure_kusto * ctx , const char * csl );
122168
123- #endif
169+ #endif
0 commit comments