Skip to content
This repository was archived by the owner on May 22, 2025. It is now read-only.

Commit f04a619

Browse files
authored
Merge pull request #2 from yogstation13/influxdb2
Influxdb2
2 parents daeeedf + 962089d commit f04a619

File tree

9 files changed

+152
-3
lines changed

9 files changed

+152
-3
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ rayon = { version = "1.5", optional = true }
5656
dbpnoise = { version = "0.1.2", optional = true }
5757
pathfinding = { version = "3.0.13", optional = true }
5858
num = { version = "0.4.0", optional = true }
59+
concat-string = { version = "1.0.1", optional = true }
5960

6061
[features]
6162
default = [
@@ -101,6 +102,7 @@ hash = [
101102
"serde",
102103
"serde_json",
103104
]
105+
influxdb2 = ["concat-string", "serde", "serde_json", "http"]
104106
pathfinder = ["num", "pathfinding", "serde", "serde_json"]
105107
redis_pubsub = ["flume", "redis", "serde", "serde_json"]
106108
unzip = ["zip", "jobs"]

dmsrc/influxdb2.dm

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
#define rustg_influxdb2_publish(data, endpoint, token) RUSTG_CALL(RUST_G, "influxdb2_publish")(data, endpoint, token)
2+
#define rustg_influxdb2_publish_profile(data, endpoint, token, round_id) RUSTG_CALL(RUST_G, "influxdb2_publish_profile")(data, endpoint, token, round_id)

dmsrc/time.dm

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,6 @@
44

55
/proc/rustg_unix_timestamp()
66
return text2num(RUSTG_CALL(RUST_G, "unix_timestamp")())
7+
8+
/proc/rustg_unix_timestamp_int()
9+
return RUSTG_CALL(RUST_G, "unix_timestamp_int")()

src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ pub enum Error {
6161
#[cfg(feature = "hash")]
6262
#[error("Unable to decode hex value.")]
6363
HexDecode,
64+
#[cfg(feature = "influxdb2")]
65+
#[error("Invalid metrics format")]
66+
InvalidMetrics,
6467
}
6568

6669
impl From<Utf8Error> for Error {

src/http.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,12 @@ pub static HTTP_CLIENT: Lazy<reqwest::blocking::Client> = Lazy::new(setup_http_c
8383
// ----------------------------------------------------------------------------
8484
// Request construction and execution
8585

86-
struct RequestPrep {
86+
pub struct RequestPrep {
8787
req: reqwest::blocking::RequestBuilder,
8888
output_filename: Option<String>,
8989
}
9090

91-
fn construct_request(
91+
pub fn construct_request(
9292
method: &str,
9393
url: &str,
9494
body: &str,
@@ -130,7 +130,7 @@ fn construct_request(
130130
})
131131
}
132132

133-
fn submit_request(prep: RequestPrep) -> Result<String> {
133+
pub fn submit_request(prep: RequestPrep) -> Result<String> {
134134
let mut response = prep.req.send()?;
135135

136136
let body;

src/influxdb2.rs

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
use serde::{Deserialize, Serialize};
2+
use serde_json::Value;
3+
4+
use crate::error::Error;
5+
use crate::http::{construct_request, submit_request, RequestPrep};
6+
use crate::jobs;
7+
8+
byond_fn!(
9+
fn influxdb2_publish(data, endpoint, token) {
10+
let data = data.to_owned();
11+
let endpoint = endpoint.to_owned();
12+
let token = token.to_owned();
13+
Some(jobs::start(move || {
14+
fn handle(data: &str, endpoint: &str, token: &str) -> Result<RequestPrep, Error> {
15+
let mut lines = vec!();
16+
17+
let data: Value = serde_json::from_str(data)?;
18+
for entry in data.as_array().unwrap() {
19+
let entry = entry.as_object().ok_or(Error::InvalidMetrics)?;
20+
21+
let measurement = entry.get("@measurement").ok_or(Error::InvalidMetrics)?.as_str().ok_or(Error::InvalidMetrics)?.to_owned();
22+
let mut measurement_tags = vec!{measurement};
23+
24+
let tags = entry.get("@tags").ok_or(Error::InvalidMetrics)?.as_object().ok_or(Error::InvalidMetrics)?;
25+
for (key, val) in tags {
26+
measurement_tags.push(concat_string!(key, "=", val.as_str().ok_or(Error::InvalidMetrics)?))
27+
};
28+
29+
let mut fields = vec!{};
30+
for (key, val) in entry {
31+
if key.starts_with('@') {
32+
continue;
33+
}
34+
fields.push(concat_string!(key, "=", val.to_string()))
35+
};
36+
37+
let timestamp = entry.get("@timestamp").ok_or(Error::InvalidMetrics)?.as_str().ok_or(Error::InvalidMetrics)?;
38+
lines.push(concat_string!(measurement_tags.join(","), " ", fields.join(",") , " ", timestamp));
39+
}
40+
41+
construct_request(
42+
"post",
43+
endpoint,
44+
lines.join("\n").as_str(),
45+
concat_string!("{\"Authorization\":\"Token ", token ,"\"}").as_str(),
46+
""
47+
)
48+
}
49+
50+
let req = match handle(data.as_str(), endpoint.as_str(), token.as_str()) {
51+
Ok(r) => r,
52+
Err(e) => return e.to_string()
53+
};
54+
match submit_request(req) {
55+
Ok(r) => r,
56+
Err(e) => e.to_string()
57+
}
58+
}))
59+
}
60+
);
61+
62+
#[derive(Serialize, Deserialize)]
63+
struct ProfileProcEntry {
64+
name: String,
65+
#[serde(rename = "self")]
66+
self_: f32,
67+
total: f32,
68+
real: f32,
69+
over: f32,
70+
calls: f32,
71+
}
72+
byond_fn!(
73+
fn influxdb2_publish_profile(data, endpoint, token, round_id) {
74+
let data = data.to_owned();
75+
let endpoint = endpoint.to_owned();
76+
let token = token.to_owned();
77+
let round_id = round_id.to_owned();
78+
Some(jobs::start(move || {
79+
fn handle(data: &str, endpoint: &str, token: &str, round_id: &str) -> Result<RequestPrep, Error> {
80+
let mut lines = vec!();
81+
82+
let data: Vec<ProfileProcEntry> = serde_json::from_str(data)?;
83+
let timestamp = std::time::SystemTime::now()
84+
.duration_since(std::time::UNIX_EPOCH)
85+
.unwrap()
86+
.as_secs()
87+
.to_string();
88+
for entry in data {
89+
let mut name = entry.name;
90+
if name.is_empty() {
91+
name = String::from("(no_name)");
92+
}
93+
lines.push(concat_string!("profile,proc=", name, " self=", entry.self_.to_string(), ",total=", entry.total.to_string(), ",real=", entry.real.to_string(), ",over=", entry.over.to_string(), ",calls=", entry.calls.to_string(), ",round_id=", round_id.to_string(), " ", timestamp));
94+
}
95+
96+
construct_request(
97+
"post",
98+
endpoint,
99+
lines.join("\n").as_str(),
100+
concat_string!("{\"Authorization\":\"Token ", token ,"\"}").as_str(),
101+
""
102+
)
103+
}
104+
105+
let req = match handle(data.as_str(), endpoint.as_str(), token.as_str(), round_id.as_str()) {
106+
Ok(r) => r,
107+
Err(e) => return e.to_string()
108+
};
109+
match submit_request(req) {
110+
Ok(r) => r,
111+
Err(e) => e.to_string()
112+
}
113+
}))
114+
}
115+
);

src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
#![forbid(unsafe_op_in_unsafe_fn)]
22

3+
#[cfg(feature = "concat-string")]
4+
#[macro_use(concat_string)]
5+
extern crate concat_string;
36
#[macro_use]
47
mod byond;
58
#[allow(dead_code)]
@@ -24,6 +27,8 @@ pub mod git;
2427
pub mod hash;
2528
#[cfg(feature = "http")]
2629
pub mod http;
30+
#[cfg(feature = "influxdb2")]
31+
pub mod influxdb2;
2732
#[cfg(feature = "json")]
2833
pub mod json;
2934
#[cfg(feature = "log")]

src/time.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,15 @@ byond_fn!(
4747
)
4848
}
4949
);
50+
51+
byond_fn!(
52+
fn unix_timestamp_int() {
53+
Some(
54+
std::time::SystemTime::now()
55+
.duration_since(std::time::UNIX_EPOCH)
56+
.unwrap()
57+
.as_secs()
58+
.to_string(),
59+
)
60+
}
61+
);

0 commit comments

Comments
 (0)