1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

## 0.3.0

- **Breaking**: Prefix generated struct with `Compressed` for clarity
- **Breaking**: Skip time range filter in `decompress` when used by `delete`
- This prevents data loss when compacting data into a single row to improve compression
- Add documentation to the generated code
35 changes: 20 additions & 15 deletions README.md
@@ -12,22 +12,22 @@ To see the generated code, look in [tests/expand](tests/expand) or run `cargo ex
## Supported data types

- pco supports `u16`, `u32`, `u64`, `i16`, `i32`, `i64`, `f16`, `f32`, `f64`
- pco_store adds support for `SystemTime`, `bool`
- pco_store adds support for `SystemTime` (stored as `i64`), `bool` (stored as `u16`)
> Suggested change: pco_store adds support for `SystemTime` (stored as `i64`), `bool` (stored as `u16`)


## Performance

Numeric compression algorithms take advantage of the mathematic relationships between a series of numbers to compress them to a higher degree than binary compression can. Of the numeric compression algorithms available in Rust, pco achieves both the best compression ratio and the best round-trip read and write time.
Numeric compression algorithms take advantage of the mathematical relationships between a series of numbers to compress them to a higher degree than binary compression can. In our tests of the numeric compression algorithms available in Rust, pco achieves both the best compression ratio and the best round-trip read and write time.
> This might read better:
>
> In our tests of the numeric compression algorithms available in Rust, pco achieves both the best compression ratio and the best round-trip read and write time


Compared to Postgres array data types, pco_store improves the compression ratio by 2x and improves read and write time by 5x in the included [benchmarks](benches). Better compression ratios can be expected with larger datasets.
Compared to Postgres array data types compressed with pglz, pco_store improves the compression ratio by 2x and improves read and write time by 5x in the included [benchmarks](benches). Better compression ratios can be expected with larger datasets.

## Usage

The `pco_store::store` procedural macro accepts these arguments:

- `timestamp` accepts the field name for a timestamp in the struct. Timestamps are internally stored as an `i64` microsecond offset from the Unix epoch. This adds `start_at` and `end_at` timestamp columns to the resulting table. A composite index should cover `start_at` and `end_at`.
- `group_by` accepts one or more field names that are stored as uncompressed fields on the Postgres table that all other fields are grouped by. The fields are added as `load` filters, and `store` automatically groups the input data by them. A composite index should cover these fields.
- `float_round` sets the number of fractional decimal points to retain for float values. This helps improve the compression ratio when you don't need the full precision of the source data. Internally this stores the values as `i64`, with the fractional precision retained by multiplying by 10^N at write time, and then at read time casting to float and dividing by 10^N. Users should confirm that the generated integer values won't overflow past `i64::MAX`.
- `table_name` overrides the Postgres table name. By default it underscores and pluralizes the struct name, so `QueryStat` becomes `query_stats`.
- `timestamp` accepts the field name for a timestamp in the struct. Timestamps are internally stored as an `i64` microsecond offset from the Unix epoch. This requires `start_at` and `end_at` timestamp columns on the underlying table. A composite index should cover `start_at` and `end_at`.
- `group_by` accepts one or more field names that are stored as uncompressed fields on the Postgres table that all other fields are grouped by. The fields are required for `load`, and `store` automatically groups the input data by them. A composite index should cover these fields.
- `float_round` sets the number of fractional decimal points to retain for float values. This helps improve the compression ratio when you don't need the full precision of the source data. Internally this stores the values as `i64`, with the fractional precision retained by multiplying by 10^N at write time, and then at read time casting to float and dividing by 10^N. Users should confirm that the generated integer values won't overflow past `i64::MAX` (larger values will wrap around and become negative).
- `table_name` overrides the Postgres table name that is used. By default it underscores and pluralizes the struct name, so `QueryStat` becomes `query_stats`.
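Taken together, the `timestamp` and `float_round` encodings amount to simple integer mappings. A minimal sketch using only the standard library (the function names here are illustrative, not the macro's actual internals):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Timestamps are stored as an i64 microsecond offset from the Unix epoch.
fn to_micros(t: SystemTime) -> i64 {
    t.duration_since(UNIX_EPOCH).unwrap().as_micros() as i64
}

fn from_micros(micros: i64) -> SystemTime {
    UNIX_EPOCH + Duration::from_micros(micros as u64)
}

// float_round = N: multiply by 10^N and round at write time, so only N
// fractional digits survive; the stored value is a plain i64.
fn encode_float(value: f64, decimals: u32) -> i64 {
    (value * 10f64.powi(decimals as i32)).round() as i64
}

// Read path: cast back to float and divide by 10^N.
fn decode_float(stored: i64, decimals: u32) -> f64 {
    stored as f64 / 10f64.powi(decimals as i32)
}

fn main() {
    // Microsecond timestamps round-trip exactly.
    let t = UNIX_EPOCH + Duration::from_secs(1_700_000_000);
    assert_eq!(from_micros(to_micros(t)), t);

    // float_round = 2 keeps two fractional digits.
    let stored = encode_float(1.23456, 2);
    assert_eq!(stored, 123);
    assert_eq!(decode_float(stored, 2), 1.23);
}
```

Because the write path multiplies by 10^N, values with magnitude above `i64::MAX / 10^N` overflow, which is the `i64::MAX` caveat the argument list warns about; it is worth checking your data's range before enabling `float_round`.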

Additional notes:

@@ -67,9 +67,9 @@ CREATE INDEX ON query_stats USING btree (database_id);
CREATE INDEX ON query_stats USING btree (end_at, start_at);
```

`STORAGE EXTERNAL` is set so that Postgres doesn't try to compress the already-compressed fields
`STORAGE EXTERNAL` is recommended to improve write times by skipping the default Postgres compression step on the already-compressed fields.
> I think the TOAST sentence should be removed because it's just explaining how Postgres works, and pco_store would work whether or not TOAST existed in its current form.
>
> I'd also reword the existing sentence:
>
> `STORAGE EXTERNAL` is recommended to improve write times by skipping the default Postgres compression step.


This uses a `(end_at, start_at)` index because it's more selective than `(start_at, end_at)` for common use cases. For example when loading the last week of stats, the `end_at` filter is what's doing the work to filter out rows.
It's recommended to index `(end_at, start_at)` because it's more selective than `(start_at, end_at)` for common use cases. For example, when loading the last week of stats, the `end_at` filter is what's doing the work to filter out rows.
> Typo: It's. I'm also not sure the added comma makes sense

```sql
end_at >= now() - interval '7 days' AND start_at <= now()
```
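The predicate above is the standard interval-overlap test, which is why an index leading on `end_at` does most of the filtering for recent-window queries. A sketch of the same logic (the function and its integer time units are illustrative):

```rust
// A stored group covering [start_at, end_at] matches a query window
// [start, end] exactly when the two intervals overlap. For a recent
// window like "the last week", most historical groups already fail the
// `end_at >= start` half, so an index leading on end_at rejects them
// without ever looking at start_at.
fn overlaps(start_at: i64, end_at: i64, start: i64, end: i64) -> bool {
    end_at >= start && start_at <= end
}

fn main() {
    // Query window covers times 100..=107 (units are arbitrary here).
    assert!(overlaps(99, 101, 100, 107)); // straddles the window start
    assert!(overlaps(103, 104, 100, 107)); // fully inside the window
    assert!(!overlaps(10, 20, 100, 107)); // old group, rejected by end_at
}
```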
@@ -85,13 +85,13 @@ async fn example() -> anyhow::Result<()> {

// Write
let stats = vec![QueryStat { database_id, collected_at: end - Duration::from_secs(120), fingerprint: 1, calls: 1, total_time: 1.0 }];
QueryStats::store(db, stats).await?;
CompressedQueryStats::store(db, stats).await?;
let stats = vec![QueryStat { database_id, collected_at: end - Duration::from_secs(60), fingerprint: 1, calls: 1, total_time: 1.0 }];
QueryStats::store(db, stats).await?;
CompressedQueryStats::store(db, stats).await?;

// Read
let mut calls = 0;
for group in QueryStats::load(db, &[database_id], start, end).await? {
for group in CompressedQueryStats::load(db, &[database_id], start, end).await? {
for stat in group.decompress()? {
calls += stat.calls;
}
@@ -106,16 +106,16 @@ async fn example() -> anyhow::Result<()> {
assert_eq!(2, db.query_one("SELECT count(*) FROM query_stats", &[]).await?.get::<_, i64>(0));
transaction!(db, {
let mut stats = Vec::new();
for group in QueryStats::delete(db, &[database_id], start, end).await? {
for group in CompressedQueryStats::delete(db, &[database_id], start, end).await? {
for stat in group.decompress()? {
stats.push(stat);
}
}
assert_eq!(0, db.query_one("SELECT count(*) FROM query_stats", &[]).await?.get::<_, i64>(0));
QueryStats::store(db, stats).await?;
CompressedQueryStats::store(db, stats).await?;
});
assert_eq!(1, db.query_one("SELECT count(*) FROM query_stats", &[]).await?.get::<_, i64>(0));
let group = QueryStats::load(db, &[database_id], start, end).await?.remove(0);
let group = CompressedQueryStats::load(db, &[database_id], start, end).await?.remove(0);
assert_eq!(group.start_at, end - Duration::from_secs(120));
assert_eq!(group.end_at, end - Duration::from_secs(60));
let stats = group.decompress()?;
@@ -181,3 +181,8 @@ These crates also implement numeric compression:
[stream-vbyte]: https://crates.io/crates/stream-vbyte
[bitpacking]: https://crates.io/crates/bitpacking
[tsz-compress]: https://crates.io/crates/tsz-compress

## License

Licensed under the MIT license; see the LICENSE file for details.
Copyright (c) 2025, Duboce Labs, Inc. (pganalyze) <team@pganalyze.com>
8 changes: 4 additions & 4 deletions benches/bucket_size/one_day.rs
@@ -102,7 +102,7 @@ pub async fn store() -> Result<()> {
pub async fn load() -> Result<()> {
let db = &DB_POOL.get().await.unwrap();
let mut stats = Vec::new();
for group in QueryStats::load(db).await? {
for group in CompressedQueryStats::load(db).await? {
for stat in group.decompress() {
stats.push(stat);
}
@@ -123,7 +123,7 @@ pub async fn load() -> Result<()> {
pub shared_blks_hit: i64,
pub shared_blks_read: i64,
}
pub struct QueryStats {
pub struct CompressedQueryStats {
database_id: i64,
start_at: SystemTime,
collected_at: Vec<u8>,
@@ -137,15 +137,15 @@ pub async fn load() -> Result<()> {
shared_blks_hit: Vec<u8>,
shared_blks_read: Vec<u8>,
}
impl QueryStats {
impl CompressedQueryStats {
pub async fn load(db: &Client) -> Result<Vec<Self>> {
let sql = "
SELECT database_id, start_at, collected_at, collected_secs, fingerprint, postgres_role_id, calls, rows, total_time, io_time, shared_blks_hit, shared_blks_read
FROM bucket_size_one_day
";
let mut results = Vec::new();
for row in db.query(&db.prepare_cached(sql).await?, &[]).await? {
results.push(QueryStats {
results.push(Self {
database_id: row.get(0),
start_at: row.get(1),
collected_at: row.get(2),
8 changes: 4 additions & 4 deletions benches/bucket_size/ten_minute.rs
@@ -84,7 +84,7 @@ pub async fn store() -> Result<()> {
pub async fn load() -> Result<()> {
let db = &DB_POOL.get().await.unwrap();
let mut stats = Vec::new();
for group in QueryStats::load(db).await? {
for group in CompressedQueryStats::load(db).await? {
for stat in group.decompress() {
stats.push(stat);
}
@@ -105,7 +105,7 @@ pub async fn load() -> Result<()> {
pub shared_blks_hit: i64,
pub shared_blks_read: i64,
}
pub struct QueryStats {
pub struct CompressedQueryStats {
database_id: i64,
start_at: SystemTime,
collected_at: Vec<u8>,
@@ -119,15 +119,15 @@ pub async fn load() -> Result<()> {
shared_blks_hit: Vec<u8>,
shared_blks_read: Vec<u8>,
}
impl QueryStats {
impl CompressedQueryStats {
pub async fn load(db: &Client) -> Result<Vec<Self>> {
let sql = "
SELECT database_id, start_at, collected_at, collected_secs, fingerprint, postgres_role_id, calls, rows, total_time, io_time, shared_blks_hit, shared_blks_read
FROM bucket_size_ten_minute
";
let mut results = Vec::new();
for row in db.query(&db.prepare_cached(sql).await?, &[]).await? {
results.push(QueryStats {
results.push(Self {
database_id: row.get(0),
start_at: row.get(1),
collected_at: row.get(2),
8 changes: 4 additions & 4 deletions benches/comparison/pco.rs
@@ -104,7 +104,7 @@ pub async fn store() -> Result<()> {
pub async fn load() -> Result<()> {
let db = &DB_POOL.get().await.unwrap();
let mut stats = Vec::new();
for group in QueryStats::load(db).await? {
for group in CompressedQueryStats::load(db).await? {
for stat in group.decompress() {
stats.push(stat);
}
Expand All @@ -125,7 +125,7 @@ pub async fn load() -> Result<()> {
pub shared_blks_hit: i64,
pub shared_blks_read: i64,
}
pub struct QueryStats {
pub struct CompressedQueryStats {
database_id: i64,
start_at: SystemTime,
collected_at: Vec<u8>,
@@ -139,15 +139,15 @@ pub async fn load() -> Result<()> {
shared_blks_hit: Vec<u8>,
shared_blks_read: Vec<u8>,
}
impl QueryStats {
impl CompressedQueryStats {
pub async fn load(db: &Client) -> Result<Vec<Self>> {
let sql = "
SELECT database_id, start_at, collected_at, collected_secs, fingerprint, postgres_role_id, calls, rows, total_time, io_time, shared_blks_hit, shared_blks_read
FROM comparison_pco
";
let mut results = Vec::new();
for row in db.query(&db.prepare_cached(sql).await?, &[]).await? {
results.push(QueryStats {
results.push(Self {
database_id: row.get(0),
start_at: row.get(1),
collected_at: row.get(2),
4 changes: 2 additions & 2 deletions benches/comparison/pco_store.rs
@@ -80,7 +80,7 @@ pub async fn store() -> Result<()> {
});
}
}
QueryStats::store(db, stats).await?;
CompressedQueryStats::store(db, stats).await?;
}
}
Ok(())
Expand All @@ -90,7 +90,7 @@ pub async fn load() -> Result<()> {
let db = &DB_POOL.get().await.unwrap();
let database_ids: Vec<i64> = db.query_one("SELECT array_agg(DISTINCT database_id) FROM comparison_pco_stores", &[]).await?.get(0);
let mut stats = Vec::new();
for group in QueryStats::load(db, &database_ids, SystemTime::UNIX_EPOCH, SystemTime::now()).await? {
for group in CompressedQueryStats::load(db, &database_ids, SystemTime::UNIX_EPOCH, SystemTime::now()).await? {
for stat in group.decompress()? {
stats.push(stat);
}
8 changes: 4 additions & 4 deletions benches/float/mult.rs
@@ -108,7 +108,7 @@ pub async fn store(mult: u8) -> Result<()> {
pub async fn load(mult: u8) -> Result<()> {
let db = &DB_POOL.get().await.unwrap();
let mut stats = Vec::new();
for group in QueryStats::load(db, mult).await? {
for group in CompressedQueryStats::load(db, mult).await? {
for stat in group.decompress() {
stats.push(stat);
}
@@ -129,7 +129,7 @@ pub async fn load(mult: u8) -> Result<()> {
pub shared_blks_hit: i64,
pub shared_blks_read: i64,
}
pub struct QueryStats {
pub struct CompressedQueryStats {
database_id: i64,
start_at: SystemTime,
collected_at: Vec<u8>,
@@ -143,15 +143,15 @@ pub async fn load(mult: u8) -> Result<()> {
shared_blks_hit: Vec<u8>,
shared_blks_read: Vec<u8>,
}
impl QueryStats {
impl CompressedQueryStats {
pub async fn load(db: &Client, mult: u8) -> Result<Vec<Self>> {
let sql = &format!("
SELECT database_id, start_at, collected_at, collected_secs, fingerprint, postgres_role_id, calls, rows, total_time, io_time, shared_blks_hit, shared_blks_read
FROM float_mult_{mult}
");
let mut results = Vec::new();
for row in db.query(&db.prepare_cached(sql).await?, &[]).await? {
results.push(QueryStats {
results.push(Self {
database_id: row.get(0),
start_at: row.get(1),
collected_at: row.get(2),
8 changes: 4 additions & 4 deletions benches/float/round.rs
@@ -114,7 +114,7 @@ fn round(float: f64, decimals: u8) -> f64 {
pub async fn load(decimals: u8) -> Result<()> {
let db = &DB_POOL.get().await.unwrap();
let mut stats = Vec::new();
for group in QueryStats::load(db, decimals).await? {
for group in CompressedQueryStats::load(db, decimals).await? {
for stat in group.decompress() {
stats.push(stat);
}
@@ -135,7 +135,7 @@ pub async fn load(decimals: u8) -> Result<()> {
pub shared_blks_hit: i64,
pub shared_blks_read: i64,
}
pub struct QueryStats {
pub struct CompressedQueryStats {
database_id: i64,
start_at: SystemTime,
collected_at: Vec<u8>,
@@ -149,15 +149,15 @@ pub async fn load(decimals: u8) -> Result<()> {
shared_blks_hit: Vec<u8>,
shared_blks_read: Vec<u8>,
}
impl QueryStats {
impl CompressedQueryStats {
pub async fn load(db: &Client, decimals: u8) -> Result<Vec<Self>> {
let sql = &format!("
SELECT database_id, start_at, collected_at, collected_secs, fingerprint, postgres_role_id, calls, rows, total_time, io_time, shared_blks_hit, shared_blks_read
FROM float_round_{decimals}
");
let mut results = Vec::new();
for row in db.query(&db.prepare_cached(sql).await?, &[]).await? {
results.push(QueryStats {
results.push(Self {
database_id: row.get(0),
start_at: row.get(1),
collected_at: row.get(2),
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -54,7 +54,7 @@ pub fn store(args: TokenStream, item: TokenStream) -> TokenStream {
let model = parse_macro_input!(i as ItemStruct);
let item = proc_macro2::TokenStream::from(item);
let name = model.ident.clone();
let packed_name = Ident::new(&(model.ident.to_string() + "s"), Span::call_site());
let packed_name = Ident::new(&format!("Compressed{}s", model.ident), Span::call_site());

let table_name = if let Some(table_name) = table_name {
table_name.to_string()
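The one-line change above in `src/lib.rs` drives the `QueryStats` to `CompressedQueryStats` renames seen throughout this diff. A sketch of the resulting naming scheme (the `default_table_name` helper is a simplified stand-in for whatever inflection logic the macro actually uses):

```rust
// The generated struct name is the source struct's identifier prefixed
// with `Compressed` and pluralized with a trailing `s`.
fn packed_name(ident: &str) -> String {
    format!("Compressed{}s", ident)
}

// The default table name underscores and pluralizes the identifier:
// a naive CamelCase-to-snake_case conversion plus a trailing `s`.
fn default_table_name(ident: &str) -> String {
    let mut out = String::new();
    for (i, c) in ident.chars().enumerate() {
        if c.is_uppercase() {
            if i > 0 {
                out.push('_');
            }
            out.extend(c.to_lowercase());
        } else {
            out.push(c);
        }
    }
    out.push('s');
    out
}

fn main() {
    assert_eq!(packed_name("QueryStat"), "CompressedQueryStats");
    assert_eq!(default_table_name("QueryStat"), "query_stats");
}
```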
12 changes: 6 additions & 6 deletions tests/expand/boolean.expanded.rs
@@ -4,20 +4,20 @@ pub struct QueryStat {
pub calls: i64,
}
/// Generated by pco_store to store and load compressed versions of [QueryStat]
pub struct QueryStats {
pub struct CompressedQueryStats {
pub database_id: i64,
toplevel: Vec<u8>,
calls: Vec<u8>,
}
impl QueryStats {
impl CompressedQueryStats {
/// Loads data for the specified filters.
///
/// For models with a timestamp, [decompress][Self::decompress] automatically filters out
/// rows outside the requested time range.
pub async fn load(
db: &deadpool_postgres::Object,
database_id: &[i64],
) -> anyhow::Result<Vec<QueryStats>> {
) -> anyhow::Result<Vec<CompressedQueryStats>> {
if database_id.is_empty() {
return ::anyhow::__private::Err({
use ::anyhow::__private::kind::*;
@@ -38,7 +38,7 @@ impl QueryStats {
let mut results = Vec::new();
for row in db.query(&db.prepare_cached(&sql).await?, &[&database_id]).await? {
results
.push(QueryStats {
.push(CompressedQueryStats {
database_id: row.get(0usize),
toplevel: row.get(1usize),
calls: row.get(2usize),
@@ -53,7 +53,7 @@ impl QueryStats {
pub async fn delete(
db: &deadpool_postgres::Object,
database_id: &[i64],
) -> anyhow::Result<Vec<QueryStats>> {
) -> anyhow::Result<Vec<CompressedQueryStats>> {
if database_id.is_empty() {
return ::anyhow::__private::Err({
use ::anyhow::__private::kind::*;
@@ -74,7 +74,7 @@ impl QueryStats {
let mut results = Vec::new();
for row in db.query(&db.prepare_cached(&sql).await?, &[&database_id]).await? {
results
.push(QueryStats {
.push(CompressedQueryStats {
database_id: row.get(0usize),
toplevel: row.get(1usize),
calls: row.get(2usize),