Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 34 additions & 26 deletions src/check_config_provider/redis_config_provider.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
use anyhow::Result;
use std::sync::Arc;
use std::{collections::HashSet, time::Instant};
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

use crate::{
app::config::Config, manager::Manager, redis::RedisClient, types::check_config::CheckConfig,
};

use anyhow::Result;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::sync::Arc;
use std::time::Duration;
use std::{collections::HashSet, time::Instant};
use tokio::task::JoinHandle;
use tokio::time::interval;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

#[derive(Debug)]
Expand Down Expand Up @@ -73,6 +71,20 @@ pub struct RedisConfigProvider {
check_interval: Duration,
}

fn deserialize_rmp(config_payload: &Vec<u8>) -> Option<CheckConfig> {
match rmp_serde::from_slice::<CheckConfig>(config_payload) {
Ok(config) => Some(config),
Err(err) => {
if let Ok(config) = rmp_serde::from_slice::<serde_json::Value>(config_payload) {
tracing::error!(?err, ?config, "config_consumer.invalid_config_message");
} else {
tracing::error!(?err, "config_consumer.invalid_config_message");
}
None
}
}
}

impl RedisConfigProvider {
pub fn new(
redis_url: &str,
Expand Down Expand Up @@ -158,14 +170,13 @@ impl RedisConfigProvider {
.set(config_payloads.len() as f64);

for config_payload in config_payloads {
match rmp_serde::from_slice(&config_payload) {
Ok(config) => manager
if let Some(config) = deserialize_rmp(&config_payload) {
manager
.get_service(partition.number)
.get_config_store()
.write()
.expect("lock not poisoned")
.add_config(config),
Err(err) => tracing::error!(?err, "config_consumer.invalid_config_message"),
.add_config(config);
}
}
let partition_loading_time = partition_start_loading.elapsed().as_secs_f64();
Expand Down Expand Up @@ -250,21 +261,18 @@ impl RedisConfigProvider {
.await;

for config_payload in config_payloads {
match rmp_serde::from_slice::<CheckConfig>(&config_payload) {
Ok(config) => {
tracing::debug!(
partition = partition.number,
subscription_id = %config.subscription_id,
"redis_config_provider.upserting_config"
);
manager
.get_service(partition.number)
.get_config_store()
.write()
.expect("lock not poisoned")
.add_config(config);
}
Err(err) => tracing::error!(?err, "config_consumer.invalid_config_message"),
if let Some(config) = deserialize_rmp(&config_payload) {
tracing::debug!(
partition = partition.number,
subscription_id = %config.subscription_id,
"redis_config_provider.upserting_config"
);
manager
.get_service(partition.number)
.get_config_store()
.write()
.expect("lock not poisoned")
.add_config(config);
}
}
let partition_update_duration = partition_update_start.elapsed().as_secs_f64();
Expand Down
Loading