# RFC-0001 for Presto

## Table Sampling in the Iceberg Connector

Proposers

* @ZacBlanco

## [Related Issues]

- prestodb/presto#20937
- prestodb/presto#20993
- prestodb/presto#20776
- prestodb/presto#20720


## Summary

Collecting statistics and keeping them up to date is costly. This proposal seeks to introduce the
concept of "stored samples" in the Iceberg connector. A stored sample is a small sample of rows kept
alongside the true table, from which statistics for the table and its partitions can be derived.

## Background

Query optimizers use table statistics to make cost-based decisions at optimization time, such as
estimating the output size of operators like joins and filters. Accurate statistics are imperative
for the optimizer to make good query planning decisions. The downside is that for large tables, such
as those on the scale of tens to thousands of terabytes, scanning the entire table to compute
statistics is costly.

There are ways to reduce the cost of generating statistics. One such way is through table samples.
Table samples allow a DBMS to calculate statistics without resorting to full table scans every time
the statistics need to be updated. A sample is essentially a subset of rows from the original table.
For the sample to be representative, each row of the original table should have an equal likelihood
of being chosen[^4]. Recently, we added support for fixed-size Bernoulli sampling in #21296.

In this RFC I'm proposing we add a full set of features to support statistics via sampling in the
Iceberg connector. The details are described in the following sections.

### [Optional] Goals

- Provide the optimizer with table statistics without having to resort to full table scans
- Provide a cost-effective way to keep statistics up to date as the table changes.

### [Optional] Non-goals


## Proposed Implementation

The first section describes the general user experience. The following section describes some of the
implementation details.

### UX

- The overall usage would resemble the following steps (a consolidated sketch follows this list):
- Introduce a boolean configuration property in the iceberg connector `iceberg.use_sample_tables` (to be used later)
- Iceberg users run a procedure which generates (but does not populate) a sample table within the existing table's directory structure. The generated sample table is also an Iceberg table.
- ```SQL
CALL {catalog}.system.create_sample_table('{table}')
```
- If the Iceberg table's location is `/warehouse/database/table`, then a new Iceberg table is created at `/warehouse/database/table/samples`. Because Iceberg tables only use files that are specifically listed in a manifest, we can add to the directory structure around the table without changing query behavior for the main table.
- Iceberg users populate sample tables with a specific query using the recently-added `reservoir_sample` function:
- ```sql
INSERT INTO "{schema}.{table}$samples"
SELECT reservoir_sample(...)
FROM {schema}.{table};
```
- Currently, `reservoir_sample` needs to process every single row to guarantee a [Bernoulli sample](https://en.wikipedia.org/wiki/Bernoulli_sampling). However, if a perfect sample is not a requirement, the reservoir sample can be generated from an even smaller subset of data to avoid scanning a large amount of it, e.g.
- ```SQL
INSERT INTO "{schema}.{table}$samples"
SELECT reservoir_sample(...)
FROM {schema}.{table} TABLESAMPLE SYSTEM (1)
```
- This produces a sample using very few resources at the cost of accuracy. It may only be desirable for extremely large tables (e.g. 1000s of TB).
- Users run `ANALYZE` command on all desired tables.
- The connector selects the sample table handle rather than the true table handle when executing the `ANALYZE` query if `iceberg.use_sample_tables` is set to true.
- The resulting stats are then stored alongside the sample table, completely independent of the real table.
- A new query using one or more of the iceberg tables comes in:
- ```SQL
SELECT ... FROM {table};
```
- When the optimizer calls `getTableStatistics` on `{table}`, the connector checks `iceberg.use_sample_tables`, and if the sample table exists, it reads the statistics from the sample table rather than the real table.
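
Putting the steps above together, a minimal end-to-end sketch of the workflow might look like the
following. The placeholders mirror the ones used above, and the exact `reservoir_sample` invocation
depends on the signature introduced in #21296:

```sql
-- 1. Create the (empty) sample table alongside the real table
CALL {catalog}.system.create_sample_table('{table}');

-- 2. Populate the sample using the reservoir_sample aggregation
INSERT INTO "{schema}.{table}$samples"
SELECT reservoir_sample(...)
FROM {schema}.{table};

-- 3. Collect statistics; with iceberg.use_sample_tables=true the connector
--    analyzes the sample table instead of the full table
ANALYZE {schema}.{table};

-- 4. Subsequent queries are planned with statistics derived from the sample
SELECT ... FROM {schema}.{table};
```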

### Implementation Details

There are four cases where we need to inject logic into the existing operation of the Iceberg
connector in order to support sample tables. All other changes should be logically separate.

1. Using the sample table handle when the user executes `ANALYZE` when the configuration is enabled.
2. Returning statistics from the sample when the optimizer calls `getTableStatistics`.
3. Generating splits from the sample table in the `IcebergSplitManager` when reading from it.
4. Writing pages to the sample table when populating data within the `IcebergPageSinkProvider`.

These should each be straightforward, as it is just a matter of first checking the
`iceberg.use_sample_tables` feature flag and then checking whether the sample table exists. If
either of these checks fails, the connector continues using its existing logic.

The following sections flesh out a few areas of the implementation where additional detail may be
useful.

#### Sample Table Handles

Within the Iceberg connector, there is an `IcebergTableType` enum. We would add a new entry to the
enum named `SAMPLES`, which a table handle could have set as its table type. When generating splits,
inserting data, or retrieving statistics, the table type on the handle should be checked and the
logic adjusted accordingly to use the Iceberg table stored at the sample path rather than the true
table.

One benefit of creating the sample tables as an iceberg table is that we can re-use the
infrastructure of the Iceberg connector to read the tables. All that is necessary is to construct an
`IcebergTableHandle` with the correct table type and pass it to the necessary functions.

#### Time Travel and Schema Updates

The Iceberg connector supports time travel and schema updates to a table's columns and partitions.
Whenever the real table's schema changes, the implementation should apply the same schema update to
the sample table.

In the implementation, this means that when the connector calls `Metadata#renameColumn`,
`dropColumn`, etc., the operation should run on both the real table and the sample table.
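
For illustration, a schema change issued against the real table would be mirrored on the sample
table. Conceptually this is equivalent to running the same DDL against both tables (the column name
below is hypothetical, and in practice the second statement happens inside the connector's metadata
code rather than as a separate user query):

```sql
-- User issues a schema change against the real table
ALTER TABLE {schema}.{table} ADD COLUMN new_col BIGINT;

-- The connector applies the equivalent change to the sample table
ALTER TABLE "{schema}.{table}$samples" ADD COLUMN new_col BIGINT;
```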

#### Correctly Generating Stats through ANALYZE

When using samples, generally all of the stats except for distinct values can map 1:1 onto the stats
for the full table. So, there needs to be some additional logic to estimate the number of distinct
values in the real dataset based on the sample[^2].

A recent PR, #20993, introduced the ability for connectors to specify the function used for a
specific column statistic aggregation. When analyzing a sample table, we can replace the
`approx_distinct` function with a separate function which uses one of the sample-based NDV
estimators. We then write the distinct-value estimate produced by that function into the Iceberg
table. Initially we can use the Chao estimator[^1], as it is easy to implement, but we can replace
it with different estimators as we see fit.
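
As a sketch of what such an estimator computes, the Chao estimator can be expressed directly in SQL
over the sample. The column name `c` is a placeholder, and running it as a standalone query (rather
than through the `ANALYZE` aggregation path) is purely for illustration:

```sql
-- d  : number of distinct values observed in the sample
-- f1 : number of values appearing exactly once in the sample
-- f2 : number of values appearing exactly twice in the sample
-- Chao estimate of the table's NDV: d + f1^2 / (2 * f2)
SELECT d + IF(f2 > 0, (f1 * f1) / (2e0 * f2), 0e0) AS estimated_ndv
FROM (
    SELECT
        COUNT(*)          AS d,
        COUNT_IF(cnt = 1) AS f1,
        COUNT_IF(cnt = 2) AS f2
    FROM (
        SELECT c, COUNT(*) AS cnt
        FROM "{schema}.{table}$samples"
        GROUP BY c
    ) AS value_counts
) AS frequency_counts;
```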

#### Statistics for Partitioned Tables

Currently, Iceberg does not fully support partition-level statistics[^3] (statistics such as row
counts and NDVs tracked per partition rather than for the table as a whole). Once partition
statistics are officially released, the Iceberg connector should be updated to support collecting
and reading the partition-level stats. As long as the sample table creation code ensures the sample
table's layout exactly matches the true table's, we should not need to change the stats calculation
code path in the connector's implementation of `getTableStatistics`.

#### Sample Metadata

For external systems to determine when and how to update the sample, additional metadata may
need to be stored, such as the last snapshot ID of the table used to create or update the sample,
or the sampling method and/or source used.

As long as the stored metadata is limited to something small, say, less than 1KiB, it is probably
fine to store this information in the Iceberg table's properties, which are a simple
`map<string, string>`.
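
For illustration only, the stored metadata might amount to a handful of small key/value entries in
the sample table's properties. These key names and values are hypothetical, not part of the
proposal:

```
sample.source-snapshot-id=8156582391201934423
sample.method=reservoir_sample
sample.size=100000
sample.last-updated=2024-06-28T00:00:00Z
```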

#### Sample Maintenance

Sample maintenance can be performed through a set of SQL statements which replace records in the
sample with updates, deletes, or inserts that occur on the real table. With the introduction of
Iceberg's changelog table in #20937 we can efficiently update the sample after the table has been
updated without needing to re-sample the entire table.
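
As a concrete example, the simplest maintenance strategy is a periodic full refresh run on whatever
cadence matches the user's freshness requirements. This sketch assumes the sample table accepts
ordinary `DELETE` and `INSERT` statements like any other Iceberg table; an incremental version would
instead read only the changed rows from the changelog table:

```sql
-- Clear the old sample, then draw a fresh one from the current table state
DELETE FROM "{schema}.{table}$samples";

INSERT INTO "{schema}.{table}$samples"
SELECT reservoir_sample(...)
FROM {schema}.{table};

-- Re-derive statistics from the refreshed sample
ANALYZE {schema}.{table};
```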

Incremental sample maintenance will be out of scope for the main PR, but all of the infrastructure
for sample maintenance is already available once the samples are generated. The biggest hurdle for
users is that sample maintenance needs to be done manually. We can document the maintenance process
for samples, but there currently isn't a way to schedule or automatically run the correct
set of maintenance queries for the sample tables.

Further discussion on sample maintenance can be done in another issue.

### Prototype

A prototype containing most of the implementation described above is available at
https://github.com/prestodb/presto/pull/21955.

## [Optional] Metrics

We should be able to see improvements in query plans and execution times without the full table
scans normally required to collect statistics. Additionally, the cost of maintaining statistics
should decrease. I would recommend measuring the amount of data scanned across multiple `ANALYZE`
queries and comparing it to the amount of data scanned for sample creation and maintenance.

## [Optional] Other Approaches Considered

### Table-Level Storage in Puffin files

One alternative approach is to generate the samples and store them in Puffin files[^puffin_files].
Puffin files may store statistics or other arbitrary data at the table level. This approach allows
us to store samples without having to generate a secondary table, and still enables us to version
the samples because each snapshot can have its own association with a specific Puffin file. I like
this approach because it doesn't require us to "own" any kind of secondary table and would let other
systems read the sample generated by Presto. However, I have a few large concerns, noted below.

The first downside to this approach is that you lose fine-grained control over the sample because
you can no longer treat it as a regular table. Having the sample be a separate Iceberg table lets
you control it at a fine granularity using SQL: you can update/delete/insert/select specific rows
from the sample or run queries against it. `ANALYZE` is basically just a SQL query with many
aggregate functions, so it would require work to figure out how to use the Puffin file as a data
source for the `ANALYZE` query.

Second, it isn't clear to me yet exactly how we would go from the sample to a set of statistics.
To generate a Puffin file blob with the sample, we would need to run the `reservoir_sample`
aggregation function. This would require updating the `TableStatisticType` enum to include a new
`SAMPLE` entry; the connector could then specify that it supports samples. After specifying support
and running the `ANALYZE` aggregation query, we could store the sample in the table. However, the
part I don't have a clear answer for is how you would then run the `ANALYZE` aggregation again on
the sample to generate all the necessary column statistics, and where exactly to store those.

My trouble with this is that I can't envision a clear path from a Puffin file blob to a set of
column statistics that the optimizer can use without having to re-implement a lot of what `ANALYZE`
already does.

## Adoption Plan

Users would need to opt in to this feature by enabling the `iceberg.use_sample_tables` feature flag,
and the samples would only be used if a sample table exists for the given table.
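
Assuming the flag is exposed as a catalog configuration property (the file path and layout below are
illustrative), enabling the feature would be a one-line change:

```
# etc/catalog/iceberg.properties
iceberg.use_sample_tables=true
```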

## Test Plan

The prototype at https://github.com/prestodb/presto/pull/21955 can serve as a starting point. The
feature should be covered by unit and integration tests in the Iceberg connector, including: the
`create_sample_table` procedure, inserting into the sample table, `ANALYZE` using the sample table
when `iceberg.use_sample_tables` is enabled, `getTableStatistics` returning sample-derived
statistics (including the NDV estimation path), and schema changes propagating to the sample table.
Existing tests should confirm that behavior is unchanged when the flag is disabled or when no sample
table exists.

### References

[^1]: [Chao Estimator](https://www.jstor.org/stable/2290471)
[^2]: [Sample-Based NDV Estimation](https://vldb.org/conf/1995/P311.PDF)
[^3]: Iceberg Partition Stats Issue Tracker - Issue 8450
[^4]: [Bernoulli Sampling](https://en.wikipedia.org/wiki/Bernoulli_sampling)
[^puffin_files]: [Puffin Spec](https://iceberg.apache.org/puffin-spec/)