Skip to content

Commit 3564be4

Browse files
author
Devdutt Shenoi
committed
clone less during ingestion
1 parent 8dee91a commit 3564be4

File tree

5 files changed

+118
-132
lines changed

5 files changed

+118
-132
lines changed

src/event/format/json.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ impl EventFormat for Event {
4949
static_schema_flag: Option<&String>,
5050
time_partition: Option<&String>,
5151
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
52-
let data = flatten_json_body(self.data, None, None, None, false)?;
52+
let data = flatten_json_body(&self.data, None, None, None, false)?;
5353
let stream_schema = schema;
5454

5555
// incoming event may be a single json or a json array

src/handlers/http/ingest.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
6161
}
6262
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
6363

64-
flatten_and_push_logs(req, body, stream_name).await?;
64+
flatten_and_push_logs(req, body, &stream_name).await?;
6565
Ok(HttpResponse::Ok().finish())
6666
} else {
6767
Err(PostError::Header(ParseHeaderError::MissingStreamName))
@@ -114,9 +114,9 @@ pub async fn handle_otel_ingestion(
114114
.iter()
115115
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
116116
{
117-
let stream_name = stream_name.to_str().unwrap().to_owned();
118-
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
119-
push_logs(stream_name.to_string(), req.clone(), body).await?;
117+
let stream_name = stream_name.to_str().unwrap();
118+
create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?;
119+
push_logs(stream_name, &req, &body).await?;
120120
} else {
121121
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
122122
}
@@ -149,7 +149,7 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, P
149149
}
150150
}
151151

152-
flatten_and_push_logs(req, body, stream_name).await?;
152+
flatten_and_push_logs(req, body, &stream_name).await?;
153153
Ok(HttpResponse::Ok().finish())
154154
}
155155

@@ -314,7 +314,7 @@ mod tests {
314314
.append_header((PREFIX_META.to_string() + "C", "meta1"))
315315
.to_http_request();
316316

317-
let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
317+
let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap();
318318

319319
assert_eq!(rb.num_rows(), 1);
320320
assert_eq!(rb.num_columns(), 6);
@@ -354,7 +354,7 @@ mod tests {
354354

355355
let req = TestRequest::default().to_http_request();
356356

357-
let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
357+
let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap();
358358

359359
assert_eq!(rb.num_rows(), 1);
360360
assert_eq!(rb.num_columns(), 5);
@@ -386,7 +386,7 @@ mod tests {
386386

387387
let req = TestRequest::default().to_http_request();
388388

389-
let (rb, _) = into_event_batch(req, json, schema, None, None).unwrap();
389+
let (rb, _) = into_event_batch(&req, &json, schema, None, None).unwrap();
390390

391391
assert_eq!(rb.num_rows(), 1);
392392
assert_eq!(rb.num_columns(), 5);
@@ -418,7 +418,7 @@ mod tests {
418418

419419
let req = TestRequest::default().to_http_request();
420420

421-
assert!(into_event_batch(req, json, schema, None, None).is_err());
421+
assert!(into_event_batch(&req, &json, schema, None, None).is_err());
422422
}
423423

424424
#[test]
@@ -436,7 +436,7 @@ mod tests {
436436

437437
let req = TestRequest::default().to_http_request();
438438

439-
let (rb, _) = into_event_batch(req, json, schema, None, None).unwrap();
439+
let (rb, _) = into_event_batch(&req, &json, schema, None, None).unwrap();
440440

441441
assert_eq!(rb.num_rows(), 1);
442442
assert_eq!(rb.num_columns(), 3);
@@ -448,7 +448,7 @@ mod tests {
448448

449449
let req = TestRequest::default().to_http_request();
450450

451-
assert!(into_event_batch(req, json, HashMap::default(), None, None).is_err())
451+
assert!(into_event_batch(&req, &json, HashMap::default(), None, None).is_err())
452452
}
453453

454454
#[test]
@@ -471,7 +471,7 @@ mod tests {
471471

472472
let req = TestRequest::default().to_http_request();
473473

474-
let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
474+
let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap();
475475

476476
assert_eq!(rb.num_rows(), 3);
477477
assert_eq!(rb.num_columns(), 6);
@@ -519,7 +519,7 @@ mod tests {
519519

520520
let req = TestRequest::default().to_http_request();
521521

522-
let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
522+
let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap();
523523

524524
assert_eq!(rb.num_rows(), 3);
525525
assert_eq!(rb.num_columns(), 6);
@@ -567,7 +567,7 @@ mod tests {
567567
);
568568
let req = TestRequest::default().to_http_request();
569569

570-
let (rb, _) = into_event_batch(req, json, schema, None, None).unwrap();
570+
let (rb, _) = into_event_batch(&req, &json, schema, None, None).unwrap();
571571

572572
assert_eq!(rb.num_rows(), 3);
573573
assert_eq!(rb.num_columns(), 6);
@@ -616,7 +616,7 @@ mod tests {
616616
.into_iter(),
617617
);
618618

619-
assert!(into_event_batch(req, json, schema, None, None).is_err());
619+
assert!(into_event_batch(&req, &json, schema, None, None).is_err());
620620
}
621621

622622
#[test]
@@ -644,7 +644,7 @@ mod tests {
644644

645645
let req = TestRequest::default().to_http_request();
646646

647-
let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
647+
let (rb, _) = into_event_batch(&req, &json, HashMap::default(), None, None).unwrap();
648648

649649
assert_eq!(rb.num_rows(), 4);
650650
assert_eq!(rb.num_columns(), 7);

0 commit comments

Comments
 (0)