Skip to content

Commit 3c3d206

Browse files
restrict for OSS
1 parent e3cbb16 commit 3c3d206

File tree

4 files changed

+102
-37
lines changed

4 files changed

+102
-37
lines changed

src/handlers/http/ingest.rs

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -123,22 +123,30 @@ pub async fn ingest(
123123

124124
//if stream exists, fetch the stream log source
125125
//return error if the stream log source is otel traces or otel metrics
126-
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
127-
stream
128-
.get_log_source()
129-
.iter()
130-
.find(|&stream_log_source_entry| {
131-
stream_log_source_entry.log_source_format != LogSource::OtelTraces
132-
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
133-
})
134-
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
135-
}
126+
let stream = match PARSEABLE.get_stream(&stream_name) {
127+
Ok(stream) => {
128+
stream
129+
.get_log_source()
130+
.iter()
131+
.find(|&stream_log_source_entry| {
132+
stream_log_source_entry.log_source_format != LogSource::OtelTraces
133+
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
134+
})
135+
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
136+
stream
137+
}
138+
Err(e) => return Err(PostError::from(e)),
139+
};
136140

137141
PARSEABLE
138142
.add_update_log_source(&stream_name, log_source_entry)
139143
.await?;
140144

141-
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
145+
if stream.get_time_partition().is_some() {
146+
return Err(PostError::IngestionNotAllowedWithTimePartition);
147+
}
148+
149+
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?;
142150

143151
Ok(HttpResponse::Ok().finish())
144152
}
@@ -267,6 +275,7 @@ where
267275
stream_name,
268276
log_source,
269277
&p_custom_fields,
278+
None,
270279
)
271280
.await?;
272281
} else if content_type == CONTENT_TYPE_PROTOBUF {
@@ -281,7 +290,8 @@ where
281290
match decode_protobuf(body) {
282291
Ok(decoded) => {
283292
for record in flatten_protobuf(&decoded) {
284-
push_logs(stream_name, record, log_source, &p_custom_fields).await?;
293+
push_logs(stream_name, record, log_source, &p_custom_fields, None)
294+
.await?;
285295
}
286296
}
287297
Err(e) => {
@@ -452,18 +462,31 @@ pub async fn post_event(
452462

453463
//if stream exists, fetch the stream log source
454464
//return error if the stream log source is otel traces or otel metrics
455-
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
456-
stream
457-
.get_log_source()
458-
.iter()
459-
.find(|&stream_log_source_entry| {
460-
stream_log_source_entry.log_source_format != LogSource::OtelTraces
461-
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
462-
})
463-
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
464-
}
465+
let stream = match PARSEABLE.get_stream(&stream_name) {
466+
Ok(stream) => {
467+
stream
468+
.get_log_source()
469+
.iter()
470+
.find(|&stream_log_source_entry| {
471+
stream_log_source_entry.log_source_format != LogSource::OtelTraces
472+
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
473+
})
474+
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
475+
stream
476+
}
477+
Err(e) => return Err(PostError::from(e)),
478+
};
465479

466-
flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
480+
let time_partition = stream.get_time_partition();
481+
482+
flatten_and_push_logs(
483+
json,
484+
&stream_name,
485+
&log_source,
486+
&p_custom_fields,
487+
time_partition,
488+
)
489+
.await?;
467490

468491
Ok(HttpResponse::Ok().finish())
469492
}
@@ -543,6 +566,8 @@ pub enum PostError {
543566
InvalidQueryParameter,
544567
#[error("Missing query parameter")]
545568
MissingQueryParameter,
569+
#[error("Ingestion is not allowed to stream with time partition")]
570+
IngestionNotAllowedWithTimePartition,
546571
}
547572

548573
impl actix_web::ResponseError for PostError {
@@ -574,6 +599,7 @@ impl actix_web::ResponseError for PostError {
574599
PostError::FieldsCountLimitExceeded(_, _, _) => StatusCode::BAD_REQUEST,
575600
PostError::InvalidQueryParameter => StatusCode::BAD_REQUEST,
576601
PostError::MissingQueryParameter => StatusCode::BAD_REQUEST,
602+
PostError::IngestionNotAllowedWithTimePartition => StatusCode::BAD_REQUEST,
577603
}
578604
}
579605

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ pub async fn flatten_and_push_logs(
5353
stream_name: &str,
5454
log_source: &LogSource,
5555
p_custom_fields: &HashMap<String, String>,
56+
time_partition: Option<String>,
5657
) -> Result<(), PostError> {
5758
// Verify the dataset fields count
5859
verify_dataset_fields_count(stream_name)?;
@@ -63,30 +64,67 @@ pub async fn flatten_and_push_logs(
6364
let message: Message = serde_json::from_value(json)?;
6465
let flattened_kinesis_data = flatten_kinesis_logs(message).await?;
6566
let record = convert_to_array(flattened_kinesis_data)?;
66-
push_logs(stream_name, record, log_source, p_custom_fields).await?;
67+
push_logs(
68+
stream_name,
69+
record,
70+
log_source,
71+
p_custom_fields,
72+
time_partition,
73+
)
74+
.await?;
6775
}
6876
LogSource::OtelLogs => {
6977
//custom flattening required for otel logs
7078
let logs: LogsData = serde_json::from_value(json)?;
7179
for record in flatten_otel_logs(&logs) {
72-
push_logs(stream_name, record, log_source, p_custom_fields).await?;
80+
push_logs(
81+
stream_name,
82+
record,
83+
log_source,
84+
p_custom_fields,
85+
time_partition.clone(),
86+
)
87+
.await?;
7388
}
7489
}
7590
LogSource::OtelTraces => {
7691
//custom flattening required for otel traces
7792
let traces: TracesData = serde_json::from_value(json)?;
7893
for record in flatten_otel_traces(&traces) {
79-
push_logs(stream_name, record, log_source, p_custom_fields).await?;
94+
push_logs(
95+
stream_name,
96+
record,
97+
log_source,
98+
p_custom_fields,
99+
time_partition.clone(),
100+
)
101+
.await?;
80102
}
81103
}
82104
LogSource::OtelMetrics => {
83105
//custom flattening required for otel metrics
84106
let metrics: MetricsData = serde_json::from_value(json)?;
85107
for record in flatten_otel_metrics(metrics) {
86-
push_logs(stream_name, record, log_source, p_custom_fields).await?;
108+
push_logs(
109+
stream_name,
110+
record,
111+
log_source,
112+
p_custom_fields,
113+
time_partition.clone(),
114+
)
115+
.await?;
87116
}
88117
}
89-
_ => push_logs(stream_name, json, log_source, p_custom_fields).await?,
118+
_ => {
119+
push_logs(
120+
stream_name,
121+
json,
122+
log_source,
123+
p_custom_fields,
124+
time_partition,
125+
)
126+
.await?
127+
}
90128
}
91129

92130
Ok(())
@@ -97,9 +135,9 @@ pub async fn push_logs(
97135
json: Value,
98136
log_source: &LogSource,
99137
p_custom_fields: &HashMap<String, String>,
138+
time_partition: Option<String>,
100139
) -> Result<(), PostError> {
101140
let stream = PARSEABLE.get_stream(stream_name)?;
102-
let time_partition = stream.get_time_partition();
103141
let time_partition_limit = PARSEABLE
104142
.get_stream(stream_name)?
105143
.get_time_partition_limit();

src/handlers/mod.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,14 @@ pub mod http;
2525
pub mod livetail;
2626

2727
pub const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
28-
const LOG_SOURCE_KEY: &str = "x-p-log-source";
29-
const EXTRACT_LOG_KEY: &str = "x-p-extract-log";
30-
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
31-
const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
32-
const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition";
33-
const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
34-
const AUTHORIZATION_KEY: &str = "authorization";
35-
const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
28+
pub const LOG_SOURCE_KEY: &str = "x-p-log-source";
29+
pub const EXTRACT_LOG_KEY: &str = "x-p-extract-log";
30+
pub const TIME_PARTITION_KEY: &str = "x-p-time-partition";
31+
pub const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
32+
pub const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition";
33+
pub const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
34+
pub const AUTHORIZATION_KEY: &str = "authorization";
35+
pub const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
3636
pub const STREAM_TYPE_KEY: &str = "x-p-stream-type";
3737
pub const TELEMETRY_TYPE_KEY: &str = "x-p-telemetry-type";
3838
const COOKIE_AGE_DAYS: usize = 7;

src/storage/field_stats.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ pub async fn calculate_field_stats(
123123
DATASET_STATS_STREAM_NAME,
124124
&LogSource::Json,
125125
&HashMap::new(),
126+
None,
126127
)
127128
.await?;
128129
Ok(stats_calculated)

0 commit comments

Comments
 (0)