Skip to content

Commit 9cf92bd

Browse files
feat: enable time partition
Create a stream with a custom time partition using headers: `X-P-Time-Partition: <field-name>` — the named field must be a timestamp field; `X-P-Time-Partition-Limit` — the maximum historical range, defaulting to 30d. The server validates that every event in the batch contains the time-partition field; if any event lacks it, the whole batch is rejected.
1 parent dc04eb3 commit 9cf92bd

File tree

5 files changed

+269
-52
lines changed

5 files changed

+269
-52
lines changed

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -108,25 +108,14 @@ pub async fn push_logs(
108108
let schema_version = stream.get_schema_version();
109109
let p_timestamp = Utc::now();
110110

111-
let data = if time_partition.is_some() || custom_partition.is_some() {
112-
convert_array_to_object(
113-
json,
114-
time_partition.as_ref(),
115-
time_partition_limit,
116-
custom_partition.as_ref(),
117-
schema_version,
118-
log_source,
119-
)?
120-
} else {
121-
vec![convert_to_array(convert_array_to_object(
122-
json,
123-
None,
124-
None,
125-
None,
126-
schema_version,
127-
log_source,
128-
)?)?]
129-
};
111+
let data = convert_array_to_object(
112+
json,
113+
time_partition.as_ref(),
114+
time_partition_limit,
115+
custom_partition.as_ref(),
116+
schema_version,
117+
log_source,
118+
)?;
130119

131120
for json in data {
132121
let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length

src/parseable/mod.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -525,17 +525,23 @@ impl Parseable {
525525
.await;
526526
}
527527

528-
if !time_partition.is_empty() || !time_partition_limit.is_empty() {
529-
return Err(StreamError::Custom {
530-
msg: "Creating stream with time partition is not supported anymore".to_string(),
531-
status: StatusCode::BAD_REQUEST,
532-
});
533-
}
528+
let time_partition_in_days = if !time_partition_limit.is_empty() {
529+
Some(validate_time_partition_limit(&time_partition_limit)?)
530+
} else {
531+
None
532+
};
534533

535534
if let Some(custom_partition) = &custom_partition {
536535
validate_custom_partition(custom_partition)?;
537536
}
538537

538+
if !time_partition.is_empty() && custom_partition.is_some() {
539+
return Err(StreamError::Custom {
540+
msg: "Cannot set both time partition and custom partition".to_string(),
541+
status: StatusCode::BAD_REQUEST,
542+
});
543+
}
544+
539545
let schema = validate_static_schema(
540546
body,
541547
stream_name,
@@ -547,7 +553,7 @@ impl Parseable {
547553
self.create_stream(
548554
stream_name.to_string(),
549555
&time_partition,
550-
None,
556+
time_partition_in_days,
551557
custom_partition.as_ref(),
552558
static_schema_flag,
553559
schema,

src/parseable/staging/writer.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ use arrow_schema::Schema;
3131
use arrow_select::concat::concat_batches;
3232
use chrono::Utc;
3333
use itertools::Itertools;
34-
use tracing::{error, warn};
34+
use rand::distributions::{Alphanumeric, DistString};
35+
use tracing::error;
3536

3637
use crate::{
3738
parseable::{ARROW_FILE_EXTENSION, PART_FILE_EXTENSION},
@@ -92,8 +93,15 @@ impl Drop for DiskWriter {
9293
let mut arrow_path = self.path.to_owned();
9394
arrow_path.set_extension(ARROW_FILE_EXTENSION);
9495

96+
// If file exists, append a random string before .date to avoid overwriting
9597
if arrow_path.exists() {
96-
warn!("File {arrow_path:?} exists and will be overwritten");
98+
let file_name = arrow_path.file_name().unwrap().to_string_lossy();
99+
let date_pos = file_name
100+
.find(".date")
101+
.expect("File name should contain .date");
102+
let random_suffix = Alphanumeric.sample_string(&mut rand::thread_rng(), 8);
103+
let new_name = format!("{}{}", random_suffix, &file_name[date_pos..]);
104+
arrow_path.set_file_name(new_name);
97105
}
98106

99107
if let Err(err) = std::fs::rename(&self.path, &arrow_path) {

src/parseable/streams.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ use crate::{
5050
DEFAULT_TIMESTAMP_KEY,
5151
format::{LogSource, LogSourceEntry},
5252
},
53-
handlers::http::modal::{ingest_server::INGESTOR_META, query_server::QUERIER_META},
5453
metadata::{LogStreamMetadata, SchemaVersion},
5554
metrics,
5655
option::Mode,
@@ -274,7 +273,7 @@ impl Stream {
274273
init_signal: bool,
275274
shutdown_signal: bool,
276275
) -> HashMap<PathBuf, Vec<PathBuf>> {
277-
let random_string = self.get_node_id_string();
276+
let random_string = ulid::Ulid::new().to_string();
278277
let inprocess_dir = Self::inprocess_folder(&self.data_path, group_minute);
279278

280279
let arrow_files = self.fetch_arrow_files_for_conversion(exclude, shutdown_signal);
@@ -322,21 +321,6 @@ impl Stream {
322321
grouped
323322
}
324323

325-
/// Returns the node id string for file naming.
326-
fn get_node_id_string(&self) -> String {
327-
match self.options.mode {
328-
Mode::Query => QUERIER_META
329-
.get()
330-
.map(|querier_metadata| querier_metadata.get_node_id())
331-
.expect("Querier metadata should be set"),
332-
Mode::Ingest => INGESTOR_META
333-
.get()
334-
.map(|ingestor_metadata| ingestor_metadata.get_node_id())
335-
.expect("Ingestor metadata should be set"),
336-
_ => "000000000000000".to_string(),
337-
}
338-
}
339-
340324
/// Returns a mapping for inprocess arrow files (init_signal=true).
341325
fn group_inprocess_arrow_files(&self, random_string: &str) -> HashMap<PathBuf, Vec<PathBuf>> {
342326
let mut grouped: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();

0 commit comments

Comments
 (0)