Skip to content

Commit d734215

Browse files
committed
fix: fixed publish in webservice
Signed-off-by: Jan Zachmann <50990105+JanZachmann@users.noreply.github.com>
1 parent f7371c5 commit d734215

File tree

3 files changed

+47
-40
lines changed

3 files changed

+47
-40
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ license = "MIT OR Apache-2.0"
88
name = "omnect-device-service"
99
readme = "README.md"
1010
repository = "https://github.com/omnect/omnect-device-service.git"
11-
version = "0.41.8"
11+
version = "0.41.9"
1212

1313
[dependencies]
1414
actix-server = { version = "2.6", default-features = false }

src/web_service.rs

Lines changed: 45 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,25 @@ use crate::{
55
use actix_server::ServerHandle;
66
use actix_web::{App, HttpResponse, HttpServer, http::StatusCode, web};
77
use anyhow::{Context, Result};
8-
use lazy_static::lazy_static;
98
use log::{debug, error, info};
109
use reqwest::{Client, header};
1110
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
1211
use reqwest_retry::{RetryTransientMiddleware, policies::ExponentialBackoff};
1312
use serde::{Deserialize, Serialize};
1413
use serde_json::json;
15-
use std::{collections::HashMap, env, path::Path, str::FromStr, sync::LazyLock};
14+
use std::{
15+
collections::HashMap,
16+
env,
17+
path::Path,
18+
str::FromStr,
19+
sync::{LazyLock, OnceLock},
20+
};
1621
use tokio::{
1722
sync::{Mutex, mpsc, oneshot},
1823
time::Duration,
1924
};
2025

26+
static IS_WEBSERVICE_DISABLED: OnceLock<bool> = OnceLock::new();
2127
static PUBLISH_CHANNEL_MAP: LazyLock<Mutex<serde_json::Map<String, serde_json::Value>>> =
2228
LazyLock::new(|| Mutex::new(serde_json::Map::default()));
2329
static PUBLISH_STATUS_MAP: LazyLock<Mutex<serde_json::Map<String, serde_json::Value>>> =
@@ -28,7 +34,6 @@ static PUBLISH_CLIENT: LazyLock<Mutex<ClientWithMiddleware>> = LazyLock::new(||
2834
Mutex::new(
2935
ClientBuilder::new(
3036
Client::builder()
31-
.timeout(Duration::from_secs(3))
3237
.danger_accept_invalid_certs(true)
3338
.build()
3439
.expect("building ClientWithMiddleware failed"),
@@ -82,36 +87,48 @@ struct PublishEndpoint {
8287
headers: Vec<Header>,
8388
}
8489

90+
impl PublishEndpoint {
91+
fn headers(&self) -> Result<header::HeaderMap> {
92+
let mut headers = header::HeaderMap::new();
93+
94+
for h in &self.headers {
95+
headers.insert(
96+
header::HeaderName::from_str(&h.name).context("failed to get header name")?,
97+
header::HeaderValue::from_str(&h.value).context("failed to get header value")?,
98+
);
99+
}
100+
101+
Ok(headers)
102+
}
103+
}
104+
85105
#[derive(Debug, Deserialize)]
86106
struct PublishEndpointRequest {
87107
id: String,
88108
endpoint: PublishEndpoint,
89109
}
90110

91-
lazy_static! {
92-
static ref IS_WEBSERVICE_ENABLED: bool = {
93-
env::var("DISABLE_WEBSERVICE")
94-
.unwrap_or("false".to_string())
95-
.to_lowercase()
96-
!= "true"
97-
};
98-
}
99-
100111
pub struct WebService {
101112
srv_handle: ServerHandle,
102113
}
103114

104115
impl WebService {
105116
pub async fn run(tx_request: mpsc::Sender<CommandRequest>) -> Result<Option<Self>> {
106117
// we only start web service feature if not explicitly disabled by 'DISABLE_WEBSERVICE="true"' env var
107-
if !(*IS_WEBSERVICE_ENABLED) {
118+
if *IS_WEBSERVICE_DISABLED.get_or_init(|| {
119+
env::var("DISABLE_WEBSERVICE")
120+
.unwrap_or("false".to_string())
121+
.to_lowercase()
122+
== "true"
123+
}) {
108124
info!("WebService is disabled");
109125
return Ok(None);
110126
};
111127

112128
info!("WebService is enabled");
113129

114130
if matches!(Path::new(&publish_endpoints_path!()).try_exists(), Ok(true)) {
131+
debug!("restore publish endpoints");
115132
*PUBLISH_ENDPOINTS.lock().await = from_json_file(publish_endpoints_path!())?;
116133
}
117134

@@ -396,14 +413,14 @@ impl WebService {
396413
}
397414

398415
pub async fn publish(channel: PublishChannel, value: serde_json::Value) {
399-
if !(*IS_WEBSERVICE_ENABLED) {
416+
if *IS_WEBSERVICE_DISABLED.wait() {
400417
debug!("publish: skip since feature not enabled");
401418
return;
402419
}
403420

404421
debug!("publish");
405422

406-
let msg = json!({"channel": channel.to_string(), "data": value});
423+
let msg = json!({"channel": channel.to_string(), "data": value}).to_string();
407424

408425
PUBLISH_CHANNEL_MAP
409426
.lock()
@@ -416,17 +433,21 @@ pub async fn publish(channel: PublishChannel, value: serde_json::Value) {
416433
.insert(channel.to_status_string(), value.clone());
417434

418435
for endpoint in PUBLISH_ENDPOINTS.lock().await.values() {
419-
if let Err(e) = publish_to_endpoint(&msg, endpoint).await {
420-
error!("publish: {e:#}");
421-
}
436+
let msg = msg.clone();
437+
let endpoint = endpoint.clone();
438+
tokio::spawn(async move {
439+
if let Err(e) = publish_to_endpoint(msg, &endpoint).await {
440+
error!("publish: {e:#}");
441+
}
442+
});
422443
}
423444
}
424445

425446
async fn republish_to_endpoint(endpoint: &PublishEndpoint) -> HttpResponse {
426447
for (channel, value) in PUBLISH_CHANNEL_MAP.lock().await.iter() {
427-
let msg = json!({"channel": channel, "data": value});
448+
let msg = json!({"channel": channel, "data": value}).to_string();
428449

429-
if let Err(e) = publish_to_endpoint(&msg, endpoint).await {
450+
if let Err(e) = publish_to_endpoint(msg, endpoint).await {
430451
error!("republish_to_endpoint: {e:#}");
431452
return HttpResponse::InternalServerError().body(e.to_string());
432453
}
@@ -435,30 +456,16 @@ async fn republish_to_endpoint(endpoint: &PublishEndpoint) -> HttpResponse {
435456
HttpResponse::Ok().finish()
436457
}
437458

438-
async fn publish_to_endpoint(msg: &serde_json::Value, endpoint: &PublishEndpoint) -> Result<()> {
439-
let mut headers = header::HeaderMap::new();
440-
441-
info!("try to send {msg} to {}.", endpoint.url);
442-
443-
for h in &endpoint.headers {
444-
headers.insert(
445-
header::HeaderName::from_str(&h.name)?,
446-
header::HeaderValue::from_str(&h.value)?,
447-
);
448-
}
449-
459+
async fn publish_to_endpoint(msg: String, endpoint: &PublishEndpoint) -> Result<reqwest::Response> {
450460
PUBLISH_CLIENT
451461
.lock()
452462
.await
453463
.post(&endpoint.url)
454-
.headers(headers)
455-
.body(msg.to_string())
464+
.headers(endpoint.headers()?)
465+
.body(msg)
456466
.send()
457467
.await
458-
.context("sending request failed")?
459-
.error_for_status()
460-
.context("response error")
461-
.map(|_| ())
468+
.context("publish_to_endpoint")
462469
}
463470

464471
async fn save_publish_endpoints() -> Result<()> {

0 commit comments

Comments
 (0)