Skip to content

Commit 4c00b5e

Browse files
update prism home api
1 parent 1055c81 commit 4c00b5e

File tree

2 files changed

+17
-32
lines changed

2 files changed

+17
-32
lines changed

src/connectors/kafka/processor.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,6 @@
1616
*
1717
*/
1818

19-
use async_trait::async_trait;
20-
use futures_util::StreamExt;
21-
use rdkafka::consumer::{CommitMode, Consumer};
22-
use serde_json::Value;
23-
use std::collections::HashMap;
24-
use std::sync::Arc;
25-
use tokio_stream::wrappers::ReceiverStream;
26-
use tracing::{debug, error};
2719
use crate::{
2820
connectors::common::processor::Processor,
2921
event::{
@@ -34,6 +26,14 @@ use crate::{
3426
parseable::PARSEABLE,
3527
storage::StreamType,
3628
};
29+
use async_trait::async_trait;
30+
use futures_util::StreamExt;
31+
use rdkafka::consumer::{CommitMode, Consumer};
32+
use serde_json::Value;
33+
use std::collections::HashMap;
34+
use std::sync::Arc;
35+
use tokio_stream::wrappers::ReceiverStream;
36+
use tracing::{debug, error};
3737

3838
use super::{ConsumerRecord, StreamConsumer, TopicPartition, config::BufferConfig};
3939

src/prism/home/mod.rs

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,19 @@ use tracing::error;
2929
use crate::{
3030
alerts::{ALERTS, AlertError, AlertsSummary, get_alerts_summary},
3131
correlation::{CORRELATIONS, CorrelationError},
32-
event::format::LogSource,
33-
handlers::http::{cluster::fetch_daily_stats, logstream::error::StreamError},
32+
handlers::{
33+
TelemetryType,
34+
http::{cluster::fetch_daily_stats, logstream::error::StreamError},
35+
},
3436
parseable::PARSEABLE,
3537
rbac::{Users, map::SessionKey, role::Action},
3638
stats::Stats,
3739
storage::{ObjectStorageError, ObjectStoreFormat, STREAM_ROOT_DIRECTORY, StreamType},
3840
users::{dashboards::DASHBOARDS, filters::FILTERS},
3941
};
4042

41-
type StreamMetadataResponse = Result<(String, Vec<ObjectStoreFormat>, DataSetType), PrismHomeError>;
43+
type StreamMetadataResponse =
44+
Result<(String, Vec<ObjectStoreFormat>, TelemetryType), PrismHomeError>;
4245

4346
#[derive(Debug, Serialize, Default)]
4447
pub struct DatedStats {
@@ -48,17 +51,10 @@ pub struct DatedStats {
4851
storage: u64,
4952
}
5053

51-
#[derive(Debug, Serialize)]
52-
enum DataSetType {
53-
Logs,
54-
Metrics,
55-
Traces,
56-
}
57-
5854
#[derive(Debug, Serialize)]
5955
pub struct DataSet {
6056
title: String,
61-
dataset_type: DataSetType,
57+
dataset_type: TelemetryType,
6258
}
6359

6460
#[derive(Debug, Serialize)]
@@ -208,7 +204,7 @@ fn get_top_5_streams_by_ingestion(
208204

209205
async fn get_stream_metadata(
210206
stream: String,
211-
) -> Result<(String, Vec<ObjectStoreFormat>, DataSetType), PrismHomeError> {
207+
) -> Result<(String, Vec<ObjectStoreFormat>, TelemetryType), PrismHomeError> {
212208
let path = RelativePathBuf::from_iter([&stream, STREAM_ROOT_DIRECTORY]);
213209
let obs = PARSEABLE
214210
.storage
@@ -237,18 +233,7 @@ async fn get_stream_metadata(
237233
)));
238234
}
239235

240-
// let log_source = &stream_jsons[0].clone().log_source;
241-
let log_source_format = stream_jsons
242-
.iter()
243-
.find(|sj| !sj.log_source.is_empty())
244-
.map(|sj| sj.log_source[0].log_source_format.clone())
245-
.unwrap_or_default();
246-
247-
let dataset_type = match log_source_format {
248-
LogSource::OtelMetrics => DataSetType::Metrics,
249-
LogSource::OtelTraces => DataSetType::Traces,
250-
_ => DataSetType::Logs,
251-
};
236+
let dataset_type = stream_jsons[0].telemetry_type;
252237

253238
Ok((stream, stream_jsons, dataset_type))
254239
}

0 commit comments

Comments
 (0)