Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
a543334
Implement determine_timestamp using constraints
frankmcsherry Jun 23, 2024
a6a9a25
Merge remote-tracking branch 'origin/main' into determine_timestamp_c…
DAlperin Mar 20, 2025
f31e1c2
Better logging
DAlperin Mar 20, 2025
36a5586
Fix minimization
DAlperin Mar 20, 2025
bf997b5
Fix strong session serializable timestamp selection logic in the cons…
DAlperin Mar 20, 2025
d51ef8d
Add EXPLAIN TIMESTAMP support
DAlperin Mar 24, 2025
e2e0d34
Improve output
DAlperin Mar 24, 2025
7b8baf3
remove unused import.
DAlperin Mar 24, 2025
5552fb4
update sql test to reflect new explain contents
DAlperin Mar 24, 2025
307843b
Fix testdrive
DAlperin Mar 24, 2025
20ce3a1
Don't add unneeded constraints
DAlperin Mar 24, 2025
284d2a4
fix tests
DAlperin Mar 24, 2025
dd20c58
fix more tests
DAlperin Mar 24, 2025
f087206
dyncfg
DAlperin Mar 24, 2025
a25c4e1
fix rust tests
DAlperin Mar 25, 2025
3534a30
Add metrics
DAlperin Mar 25, 2025
11aa520
unbreak
DAlperin Mar 25, 2025
9442655
fix timestamp selection test
DAlperin Mar 25, 2025
0438397
oops, return the right selection
DAlperin Mar 25, 2025
ee6888a
clippy
DAlperin Mar 25, 2025
84b6c5f
Fix materialized_views.py
DAlperin Mar 25, 2025
c028fcf
This test file is liable to be the death of me
DAlperin Mar 25, 2025
9531fea
Address notes from frank
DAlperin Mar 25, 2025
dc672a5
add copyright header
DAlperin Mar 25, 2025
0bcb8b0
fix test build
DAlperin Mar 25, 2025
7cb1536
oh fine
DAlperin Mar 25, 2025
cf406cc
truncate if needed
DAlperin Mar 26, 2025
08c5b16
Address feedback
DAlperin Mar 26, 2025
81f7c8c
I literally can't tell what this test wants anymore
DAlperin Mar 26, 2025
4b59ced
import default directly
DAlperin Mar 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -316,16 +316,16 @@ def validate(self) -> Testdrive:
> SHOW CREATE MATERIALIZED VIEW refresh_view_late_3
"materialize.public.refresh_view_late_3" "CREATE MATERIALIZED VIEW \\"materialize\\".\\"public\\".\\"refresh_view_late_3\\" IN CLUSTER \\"quickstart\\" WITH (REFRESH = AT <TIMESTAMP>::\\"mz_catalog\\".\\"mz_timestamp\\"::\\"pg_catalog\\".\\"text\\"::\\"pg_catalog\\".\\"int8\\" + 86400000) AS SELECT DISTINCT (\\"x\\") FROM \\"materialize\\".\\"public\\".\\"refresh_table\\""

$ set-regex match=(s\\d+|\\d{13}|[ ]{12}0|u\\d{1,3}|\\(\\d+-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d:\\d\\d\\.\\d\\d\\d\\)) replacement=<>
$ set-regex match=(s\\d+|\\d{13}|[ ]{12}0|u\\d{1,3}|\\(\\d+-\\d\\d-\\d\\d\\s\\d\\d:\\d\\d:\\d\\d\\.\\d\\d\\d\\)|\\((\\d+)\\)) replacement=<>

> EXPLAIN TIMESTAMP FOR SELECT * FROM refresh_view_late_1
" query timestamp: <> <>\\n oracle read timestamp: <> <>\\nlargest not in advance of upper: <> <>\\n upper:[<> <>]\\n since:[<> <>]\\n can respond immediately: false\\n timeline: Some(EpochMilliseconds)\\n session wall time: <> <>\\n\\nsource materialize.public.refresh_view_late_1 (<>, storage):\\n read frontier:[<> <>]\\n write frontier:[<> <>]\\n"
" query timestamp: <> <>\\n oracle read timestamp: <> <>\\nlargest not in advance of upper: <> <>\\n upper:[<> <>]\\n since:[<> <>]\\n can respond immediately: false\\n timeline: Some(EpochMilliseconds)\\n session wall time: <> <>\\n\\nsource materialize.public.refresh_view_late_1 (<>, storage):\\n read frontier:[<> <>]\\n write frontier:[<> <>]\\n\\nbinding constraints:\\nlower:\\n (StorageInput([User<>])): [<> <>]\\n (ComputeInput([])): [<> <>]\\n"

> EXPLAIN TIMESTAMP FOR SELECT * FROM refresh_view_late_2
" query timestamp: <> <>\\n oracle read timestamp: <> <>\\nlargest not in advance of upper: <> <>\\n upper:[<> <>]\\n since:[<> <>]\\n can respond immediately: false\\n timeline: Some(EpochMilliseconds)\\n session wall time: <> <>\\n\\nsource materialize.public.refresh_view_late_2 (<>, storage):\\n read frontier:[<> <>]\\n write frontier:[<> <>]\\n"
" query timestamp: <> <>\\n oracle read timestamp: <> <>\\nlargest not in advance of upper: <> <>\\n upper:[<> <>]\\n since:[<> <>]\\n can respond immediately: false\\n timeline: Some(EpochMilliseconds)\\n session wall time: <> <>\\n\\nsource materialize.public.refresh_view_late_2 (<>, storage):\\n read frontier:[<> <>]\\n write frontier:[<> <>]\\n\\nbinding constraints:\\nlower:\\n (StorageInput([User<>)])): [<> <>]\\n (ComputeInput([])): [<> <>]\\n"

> EXPLAIN TIMESTAMP FOR SELECT * FROM refresh_view_late_3
" query timestamp: <> <>\\n oracle read timestamp: <> <>\\nlargest not in advance of upper: <> <>\\n upper:[<> <>]\\n since:[<> <>]\\n can respond immediately: false\\n timeline: Some(EpochMilliseconds)\\n session wall time: <> <>\\n\\nsource materialize.public.refresh_view_late_3 (<>, storage):\\n read frontier:[<> <>]\\n write frontier:[<> <>]\\n"
" query timestamp: <> <>\\n oracle read timestamp: <> <>\\nlargest not in advance of upper: <> <>\\n upper:[<> <>]\\n since:[<> <>]\\n can respond immediately: false\\n timeline: Some(EpochMilliseconds)\\n session wall time: <> <>\\n\\nsource materialize.public.refresh_view_late_3 (<>, storage):\\n read frontier:[<> <>]\\n write frontier:[<> <>]\\n\\nbinding constraints:\\nlower:\\n (StorageInput([User<>])): [<> <>]\\n (ComputeInput([])): [<> <>]\\n"
"""
)
)
Expand Down
1 change: 1 addition & 0 deletions src/adapter-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mz-storage-types = { path = "../storage-types" }
serde = "1.0.219"
timely = "0.19.0"
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }
tracing = "0.1.37"

[package.metadata.cargo-udeps.ignore]
normal = ["workspace-hack"]
Expand Down
9 changes: 9 additions & 0 deletions src/adapter-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use std::time::Duration;

use mz_dyncfg::{Config, ConfigSet};

use crate::timestamp_oracle::ConstraintBasedTimestampSelection;

pub const ALLOW_USER_SESSIONS: Config<bool> = Config::new(
"allow_user_sessions",
true,
Expand Down Expand Up @@ -116,6 +118,12 @@ pub const ENABLE_MULTI_REPLICA_SOURCES: Config<bool> = Config::new(
"Enable multi-replica sources.",
);

pub const CONSTRAINT_BASED_TIMESTAMP_SELECTION: Config<&'static str> = Config::new(
"constraint_based_timestamp_selection",
ConstraintBasedTimestampSelection::const_default().as_str(),
"Whether to use the constraint-based timestamp selection, one of: enabled, disabled, verify",
);

/// Adds the full set of all compute `Config`s.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
Expand All @@ -134,4 +142,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&ENABLE_CONTINUAL_TASK_BUILTINS)
.add(&ENABLE_EXPRESSION_CACHE)
.add(&ENABLE_MULTI_REPLICA_SOURCES)
.add(&CONSTRAINT_BASED_TIMESTAMP_SELECTION)
}
43 changes: 43 additions & 0 deletions src/adapter-types/src/timestamp_oracle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

use std::time::Duration;

use serde::{Deserialize, Serialize};
use tracing;

/// Default value for `DynamicConfig::pg_connection_pool_max_size`.
pub const DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE: usize = 50;

Expand All @@ -26,3 +29,43 @@ pub const DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT: Duration = Duration::from

/// Default value for `DynamicConfig::pg_connection_pool_tcp_user_timeout`.
pub const DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT: Duration = Duration::from_secs(30);

/// Whether to use the constraint-based timestamp selection.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum ConstraintBasedTimestampSelection {
Enabled,
Disabled,
Verify,
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the right file for this? It seems less like "timestamp oracle" material and more "timestamp selection" material, which uses the oracle as input, but uses various other things as input as well. Would timestamp_selection.rs be more appropriate, or is there a reason to put it here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason is I misread it and thought it did say timestamp selection :) will fix


impl std::default::Default for ConstraintBasedTimestampSelection {
fn default() -> Self {
Self::Verify
}
}

impl ConstraintBasedTimestampSelection {
pub const fn const_default() -> Self {
Self::Verify
}

pub fn from_str(s: &str) -> Self {
match s {
"enabled" => Self::Enabled,
"disabled" => Self::Disabled,
"verify" => Self::Verify,
_ => {
tracing::error!("invalid value for ConstraintBasedTimestampSelection: {}", s);
ConstraintBasedTimestampSelection::default()
}
}
}

pub const fn as_str(&self) -> &'static str {
match self {
Self::Enabled => "enabled",
Self::Disabled => "disabled",
Self::Verify => "verify",
}
}
}
Loading