Skip to content

Commit 3b98dd8

Browse files
trueleotheteachr
andauthored
feat: add custom table catalogue format (#561)
This PR introduces a new table format that keeps track of all the data files in the data storage. The format is inspired by Apache Iceberg so has similar naming scheme for things. Snapshot which is stored in the stream metadata file is the main entry point to a table. A snapshot essentially is a list of URL to manifest file and primary time statistics for pruning said manifest during query. A manifest file contains list of all the actual files along with their file level statistics. Currently a manifest file is generated per top level partition ( i.e date ). For old data (data ingested by older versions of Parseable) the query mechanism falls back to old style of query data. Signed-off-by: Satyam Singh <[email protected]> Co-authored-by: Nick <[email protected]>
1 parent 05de709 commit 3b98dd8

File tree

18 files changed

+1686
-423
lines changed

18 files changed

+1686
-423
lines changed

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ humantime = "2.1.0"
9999
openid = { version = "0.12.0", default-features = false, features = ["rustls"] }
100100
url = "2.4.0"
101101
http-auth-basic = "0.3.3"
102+
serde_repr = "0.1.17"
102103

103104
[build-dependencies]
104105
cargo_toml = "0.15"

server/src/catalog.rs

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use std::sync::Arc;
20+
21+
use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc};
22+
use relative_path::RelativePathBuf;
23+
24+
use crate::{
25+
catalog::manifest::Manifest,
26+
query::PartialTimeFilter,
27+
storage::{ObjectStorage, ObjectStorageError},
28+
};
29+
30+
use self::{column::Column, snapshot::ManifestItem};
31+
32+
pub mod column;
33+
pub mod manifest;
34+
pub mod snapshot;
35+
36+
pub use manifest::create_from_parquet_file;
37+
38+
pub trait Snapshot {
39+
fn manifests(&self, time_predicates: Vec<PartialTimeFilter>) -> Vec<ManifestItem>;
40+
}
41+
42+
pub trait ManifestFile {
43+
fn file_name(&self) -> &str;
44+
fn ingestion_size(&self) -> u64;
45+
fn file_size(&self) -> u64;
46+
fn num_rows(&self) -> u64;
47+
fn columns(&self) -> &[Column];
48+
}
49+
50+
impl ManifestFile for manifest::File {
51+
fn file_name(&self) -> &str {
52+
&self.file_path
53+
}
54+
55+
fn ingestion_size(&self) -> u64 {
56+
self.ingestion_size
57+
}
58+
59+
fn file_size(&self) -> u64 {
60+
self.file_size
61+
}
62+
63+
fn num_rows(&self) -> u64 {
64+
self.num_rows
65+
}
66+
67+
fn columns(&self) -> &[Column] {
68+
self.columns.as_slice()
69+
}
70+
}
71+
72+
pub async fn update_snapshot(
73+
storage: Arc<dyn ObjectStorage + Send>,
74+
stream_name: &str,
75+
change: manifest::File,
76+
) -> Result<(), ObjectStorageError> {
77+
fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
78+
match file
79+
.columns()
80+
.iter()
81+
.find(|col| col.name == "p_timestamp")
82+
.unwrap()
83+
.stats
84+
.clone()
85+
.unwrap()
86+
{
87+
column::TypedStatistics::Int(stats) => (
88+
NaiveDateTime::from_timestamp_millis(stats.min)
89+
.unwrap()
90+
.and_utc(),
91+
NaiveDateTime::from_timestamp_millis(stats.min)
92+
.unwrap()
93+
.and_utc(),
94+
),
95+
_ => unreachable!(),
96+
}
97+
}
98+
99+
// get current snapshot
100+
let mut meta = storage.get_snapshot(stream_name).await?;
101+
let manifests = &mut meta.manifest_list;
102+
103+
let (lower_bound, _) = get_file_bounds(&change);
104+
let pos = manifests.iter().position(|item| {
105+
item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
106+
});
107+
108+
// We update the manifest referenced by this position
109+
// This updates an existing file so there is no need to create a snapshot entry.
110+
if let Some(pos) = pos {
111+
let info = &mut manifests[pos];
112+
let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound);
113+
let Some(mut manifest) = storage.get_manifest(&path).await? else {
114+
return Err(ObjectStorageError::UnhandledError(
115+
"Manifest found in snapshot but not in object-storage"
116+
.to_string()
117+
.into(),
118+
));
119+
};
120+
manifest.apply_change(change);
121+
storage.put_manifest(&path, manifest).await?;
122+
} else {
123+
let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc();
124+
let upper_bound = lower_bound
125+
.date_naive()
126+
.and_time(
127+
NaiveTime::from_num_seconds_from_midnight_opt(
128+
23 * 3600 + 59 * 60 + 59,
129+
999_999_999,
130+
)
131+
.unwrap(),
132+
)
133+
.and_utc();
134+
135+
let manifest = Manifest {
136+
files: vec![change],
137+
..Manifest::default()
138+
};
139+
140+
let path = partition_path(stream_name, lower_bound, upper_bound).join("manifest.json");
141+
storage
142+
.put_object(&path, serde_json::to_vec(&manifest).unwrap().into())
143+
.await?;
144+
let path = storage.absolute_url(&path);
145+
let new_snapshot_entriy = snapshot::ManifestItem {
146+
manifest_path: path.to_string(),
147+
time_lower_bound: lower_bound,
148+
time_upper_bound: upper_bound,
149+
};
150+
manifests.push(new_snapshot_entriy);
151+
storage.put_snapshot(stream_name, meta).await?;
152+
}
153+
154+
Ok(())
155+
}
156+
157+
/// Partition the path to which this manifest belongs.
158+
/// Useful when uploading the manifest file.
159+
fn partition_path(
160+
stream: &str,
161+
lower_bound: DateTime<Utc>,
162+
upper_bound: DateTime<Utc>,
163+
) -> RelativePathBuf {
164+
let lower = lower_bound.date_naive().format("%Y-%m-%d").to_string();
165+
let upper = upper_bound.date_naive().format("%Y-%m-%d").to_string();
166+
if lower == upper {
167+
RelativePathBuf::from_iter([stream, &format!("date={}", lower)])
168+
} else {
169+
RelativePathBuf::from_iter([stream, &format!("date={}:{}", lower, upper)])
170+
}
171+
}

server/src/catalog/column.rs

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2023 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
19+
use std::cmp::{max, min};
20+
21+
use parquet::file::statistics::Statistics;
22+
23+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
24+
pub struct BoolType {
25+
pub min: bool,
26+
pub max: bool,
27+
}
28+
29+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
30+
pub struct Float64Type {
31+
pub min: f64,
32+
pub max: f64,
33+
}
34+
35+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
36+
pub struct Int64Type {
37+
pub min: i64,
38+
pub max: i64,
39+
}
40+
41+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
42+
pub struct Utf8Type {
43+
pub min: String,
44+
pub max: String,
45+
}
46+
47+
// Typed statistics are typed variant of statistics
48+
// Currently all parquet types are casted down to these 4 types
49+
// Binary types are assumed to be of valid Utf8
50+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
51+
pub enum TypedStatistics {
52+
Bool(BoolType),
53+
Int(Int64Type),
54+
Float(Float64Type),
55+
String(Utf8Type),
56+
}
57+
58+
impl TypedStatistics {
59+
pub fn update(self, other: Self) -> Self {
60+
match (self, other) {
61+
(TypedStatistics::Bool(this), TypedStatistics::Bool(other)) => {
62+
TypedStatistics::Bool(BoolType {
63+
min: min(this.min, other.min),
64+
max: max(this.max, other.max),
65+
})
66+
}
67+
(TypedStatistics::Float(this), TypedStatistics::Float(other)) => {
68+
TypedStatistics::Float(Float64Type {
69+
min: this.min.min(other.min),
70+
max: this.max.max(other.max),
71+
})
72+
}
73+
(TypedStatistics::Int(this), TypedStatistics::Int(other)) => {
74+
TypedStatistics::Int(Int64Type {
75+
min: min(this.min, other.min),
76+
max: max(this.max, other.max),
77+
})
78+
}
79+
(TypedStatistics::String(this), TypedStatistics::String(other)) => {
80+
TypedStatistics::String(Utf8Type {
81+
min: min(this.min, other.min),
82+
max: max(this.max, other.max),
83+
})
84+
}
85+
_ => panic!("Cannot update wrong types"),
86+
}
87+
}
88+
}
89+
90+
/// Column statistics are used to track statistics for a column in a given file.
91+
/// This is similar to and derived from parquet statistics.
92+
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
93+
pub struct Column {
94+
pub name: String,
95+
pub stats: Option<TypedStatistics>,
96+
pub uncompressed_size: u64,
97+
pub compressed_size: u64,
98+
}
99+
100+
impl TryFrom<&Statistics> for TypedStatistics {
101+
type Error = parquet::errors::ParquetError;
102+
fn try_from(value: &Statistics) -> Result<Self, Self::Error> {
103+
if !value.has_min_max_set() {
104+
return Err(parquet::errors::ParquetError::General(
105+
"min max is not set".to_string(),
106+
));
107+
}
108+
109+
let res = match value {
110+
Statistics::Boolean(stats) => TypedStatistics::Bool(BoolType {
111+
min: *stats.min(),
112+
max: *stats.max(),
113+
}),
114+
Statistics::Int32(stats) => TypedStatistics::Int(Int64Type {
115+
min: *stats.min() as i64,
116+
max: *stats.max() as i64,
117+
}),
118+
Statistics::Int64(stats) => TypedStatistics::Int(Int64Type {
119+
min: *stats.min(),
120+
max: *stats.max(),
121+
}),
122+
Statistics::Int96(stats) => TypedStatistics::Int(Int64Type {
123+
min: stats.min().to_i64(),
124+
max: stats.max().to_i64(),
125+
}),
126+
Statistics::Float(stats) => TypedStatistics::Float(Float64Type {
127+
min: *stats.min() as f64,
128+
max: *stats.max() as f64,
129+
}),
130+
Statistics::Double(stats) => TypedStatistics::Float(Float64Type {
131+
min: *stats.min(),
132+
max: *stats.max(),
133+
}),
134+
Statistics::ByteArray(stats) => TypedStatistics::String(Utf8Type {
135+
min: stats.min().as_utf8()?.to_owned(),
136+
max: stats.max().as_utf8()?.to_owned(),
137+
}),
138+
Statistics::FixedLenByteArray(stats) => TypedStatistics::String(Utf8Type {
139+
min: stats.min().as_utf8()?.to_owned(),
140+
max: stats.max().as_utf8()?.to_owned(),
141+
}),
142+
};
143+
144+
Ok(res)
145+
}
146+
}

0 commit comments

Comments
 (0)