Skip to content

Commit 1da9c4d

Browse files
authored
Improve activity log throttling check (#34244)
Rather than throttle on the first Prepare+Bind+Execution execution, we throttle on all statement executions similar to the sampling check. Key differences being: - `throttled_count` column is no longer the number of unique prepared statements throttled before it. It now includes all throttled statement executions too (i.e. multiple Executes of a prepared statement). See commit messages for details. <!-- Describe the contents of the PR briefly but completely. If you write detailed commit messages, it is acceptable to copy/paste them here, or write "see commit messages for details." If there is only one commit in the PR, GitHub will have already added its commit message above. --> ### Motivation * This PR fixes a recognized bug. - [ ] MaterializeInc/database-issues#9922 - [ ] MaterializeInc/database-issues#9604 <!-- Which of the following best describes the motivation behind this PR? [Ensure issue is linked somewhere.] * This PR adds a known-desirable feature. [Ensure issue is linked somewhere.] * This PR fixes a previously unreported bug. [Describe the bug in detail, as if you were filing a bug report.] * This PR adds a feature that has not yet been specified. [Write a brief specification for the feature, including justification for its inclusion in Materialize, as if you were writing the original feature specification.] * This PR refactors existing code. [Describe what was wrong with the existing code, if it is not obvious.] --> ### Tips for reviewer <!-- Leave some tips for your reviewer, like: * The diff is much smaller if viewed with whitespace hidden. * [Some function/module/file] deserves extra attention. * [Some function/module/file] is pure code movement and only needs a skim. Delete this section if no tips. --> ### Checklist - [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/)) - [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design. <!-- Reference the design in the description. --> - [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label. - [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)). <!-- Ask in #team-cloud on Slack if you need help preparing the cloud PR. --> - [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.
1 parent 70852b4 commit 1da9c4d

File tree

5 files changed

+169
-67
lines changed

5 files changed

+169
-67
lines changed

doc/user/content/sql/system-catalog/mz_internal.md

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ granted the [`mz_monitor` role](/security/appendix/appendix-built-in-roles/#syst
8787
| `session_id` | [`uuid`] | An ID that is unique for each session. Corresponds to [mz_sessions.id](#mz_sessions). |
8888
| `prepared_at` | [`timestamp with time zone`] | The time at which the statement was prepared. |
8989
| `statement_type` | [`text`] | The _type_ of the statement, e.g. `select` for a `SELECT` query, or `NULL` if the statement was empty. |
90-
| `throttled_count` | [`uint8`] | The number of statements that were dropped due to throttling before the current one was seen. If you have a very high volume of queries and need to log them without throttling, [contact our team](/support/). |
90+
| `throttled_count` | [`uint8`] | The number of statement executions that were dropped due to throttling before the current one was seen. If you have a very high volume of queries and need to log them without throttling, [contact our team](/support/). |
9191
| `connected_at` | [`timestamp with time zone`] | The time at which the session was established. |
9292
| `initial_application_name` | [`text`] | The initial value of `application_name` at the beginning of the session. |
9393
| `authenticated_user` | [`text`] | The name of the user for which the session was established. |
@@ -726,21 +726,27 @@ have one or more corresponding executions in
726726
| `prepared_at` | [`timestamp with time zone`] | The time at which the statement was prepared. |
727727
-->
728728

729-
<!--
729+
730730
## `mz_session_history`
731731

732732
The `mz_session_history` table contains all the sessions that have
733733
been established in the last 30 days, or (even if older) that are
734734
referenced from
735-
[`mz_prepared_statement_history`](#mz_prepared_statement_history).
735+
[`mz_recent_activity_log`](#mz_recent_activity_log).
736+
737+
{{< warning >}}
738+
Do not rely on all sessions being logged in this view. Materialize
739+
controls the maximum rate at which statements are sampled, and may change
740+
this rate at any time.
741+
{{< /warning >}}
736742

743+
<!-- RELATION_SPEC mz_internal.mz_session_history -->
737744
| Field | Type | Meaning |
738745
|----------------------|------------------------------|-----------------------------------------------------------------------------------------------------------------------------------|
739746
| `session_id` | [`uuid`] | The globally unique ID of the session. Corresponds to [`mz_sessions.id`](#mz_sessions). |
740747
| `connected_at` | [`timestamp with time zone`] | The time at which the session was established. |
741-
| `application_name` | [`text`] | The `application_name` session metadata field. |
748+
| `initial_application_name` | [`text`] | The `application_name` session metadata field. |
742749
| `authenticated_user` | [`text`] | The name of the user for which the session was established. |
743-
-->
744750

745751
{{< if-unreleased "v0.113" >}}
746752
### `mz_recent_storage_usage`
@@ -1338,7 +1344,6 @@ The `mz_webhook_sources` table contains a row for each webhook source in the sys
13381344
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_prepared_statement_history -->
13391345
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_recent_sql_text -->
13401346
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_recent_sql_text_redacted -->
1341-
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_session_history -->
13421347
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_all_objects -->
13431348
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_clusters -->
13441349
<!-- RELATION_SPEC_UNDOCUMENTED mz_internal.mz_show_cluster_replicas -->

src/adapter/src/coord/statement_logging.rs

Lines changed: 120 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -178,16 +178,15 @@ impl StatementLogging {
178178
}
179179

180180
/// Check if we need to drop a statement
181-
/// due to throttling, and update internal data structures appropriately.
181+
/// due to throttling, and update the number of available tokens appropriately.
182182
///
183-
/// Returns `None` if we must throttle this statement, and `Some(n)` otherwise, where `n`
184-
/// is the number of statements that were dropped due to throttling before this one.
183+
/// Returns `false` if we must throttle this statement, and `true` otherwise.
185184
fn throttling_check(
186185
&mut self,
187186
cost: u64,
188187
target_data_rate: u64,
189188
max_data_credit: Option<u64>,
190-
) -> Option<usize> {
189+
) -> bool {
191190
let ts = (self.now)() / 1000;
192191
// We use saturating_sub here because system time isn't monotonic, causing cases
193192
// when last_logged_ts_seconds is greater than ts.
@@ -202,14 +201,13 @@ impl StatementLogging {
202201
if let Some(remaining) = self.tokens.checked_sub(cost) {
203202
debug!("throttling check passed. tokens remaining: {remaining}; cost: {cost}");
204203
self.tokens = remaining;
205-
Some(std::mem::take(&mut self.throttled_count))
204+
true
206205
} else {
207206
debug!(
208207
"throttling check failed. tokens available: {}; cost: {cost}",
209208
self.tokens
210209
);
211-
self.throttled_count += 1;
212-
None
210+
false
213211
}
214212
}
215213
}
@@ -276,15 +274,25 @@ impl Coordinator {
276274
/// Check whether we need to do throttling (i.e., whether `STATEMENT_LOGGING_TARGET_DATA_RATE` is set).
277275
/// If so, actually do the check.
278276
///
279-
/// Returns `None` if we must throttle this statement, and `Some(n)` otherwise, where `n`
280-
/// is the number of statements that were dropped due to throttling before this one.
281-
fn statement_logging_throttling_check(&mut self, cost: usize) -> Option<usize> {
277+
/// We expect `rows` to be the list of rows we intend to record and calculate the cost by summing the
278+
/// byte lengths of the rows.
279+
///
280+
/// Returns `false` if we must throttle this statement, and `true` otherwise.
281+
fn statement_logging_throttling_check<'a, I>(&mut self, rows: I) -> bool
282+
where
283+
I: IntoIterator<Item = Option<&'a Row>>,
284+
{
285+
let cost = rows
286+
.into_iter()
287+
.filter_map(|row_opt| row_opt.map(|row| row.byte_len()))
288+
.fold(0_usize, |acc, x| acc.saturating_add(x));
289+
282290
let Some(target_data_rate) = self
283291
.catalog
284292
.system_config()
285293
.statement_logging_target_data_rate()
286294
else {
287-
return Some(std::mem::take(&mut self.statement_logging.throttled_count));
295+
return true;
288296
};
289297
let max_data_credit = self
290298
.catalog
@@ -297,25 +305,43 @@ impl Coordinator {
297305
)
298306
}
299307

308+
/// Marks a prepared statement as "already logged".
309+
/// Mutates the `PreparedStatementLoggingInfo` metadata.
310+
fn record_prepared_statement_as_logged(
311+
&self,
312+
uuid: Uuid,
313+
session: &mut Session,
314+
logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
315+
) {
316+
let logging = session.qcell_rw(&*logging);
317+
if let PreparedStatementLoggingInfo::StillToLog { .. } = logging {
318+
*logging = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
319+
}
320+
}
321+
300322
/// Returns any statement logging events needed for a particular
301323
/// prepared statement. Possibly mutates the `PreparedStatementLoggingInfo` metadata.
302324
///
303325
/// This function does not do a sampling check, and assumes we did so in a higher layer.
304326
///
305-
/// It _does_ do a throttling check, and returns `None` if we must not log due to throttling.
306-
pub(crate) fn log_prepared_statement(
307-
&mut self,
308-
session: &mut Session,
327+
///
328+
/// Returns A tuple containing:
329+
/// - `Option<(StatementPreparedRecord, PreparedStatementEvent)>`: If the prepared statement
330+
/// has not yet been logged, returns the prepared statement record, the packed row of the
331+
/// prepared statement record, and a row for the SQL text.
332+
/// - `Uuid`: The UUID of the prepared statement if the prepared statement has been logged
333+
pub(crate) fn get_prepared_statement_info(
334+
&self,
335+
session: &Session,
309336
logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
310-
) -> Option<(
337+
) -> (
311338
Option<(StatementPreparedRecord, PreparedStatementEvent)>,
312339
Uuid,
313-
)> {
314-
let logging = session.qcell_rw(&*logging);
315-
let mut out = None;
340+
) {
341+
let logging = session.qcell_ro(&*logging);
316342

317-
let uuid = match logging {
318-
PreparedStatementLoggingInfo::AlreadyLogged { uuid } => *uuid,
343+
match logging {
344+
PreparedStatementLoggingInfo::AlreadyLogged { uuid } => (None, *uuid),
319345
PreparedStatementLoggingInfo::StillToLog {
320346
sql,
321347
redacted_sql,
@@ -331,13 +357,11 @@ impl Coordinator {
331357
"accounting for logging should be done in `begin_statement_execution`"
332358
);
333359
let uuid = epoch_to_uuid_v7(prepared_at);
334-
let sql = std::mem::take(sql);
335-
let redacted_sql = std::mem::take(redacted_sql);
336360
let sql_hash: [u8; 32] = Sha256::digest(sql.as_bytes()).into();
337361
let record = StatementPreparedRecord {
338362
id: uuid,
339363
sql_hash,
340-
name: std::mem::take(name),
364+
name: name.to_string(),
341365
session_id: *session_id,
342366
prepared_at: *prepared_at,
343367
kind: *kind,
@@ -357,22 +381,21 @@ impl Coordinator {
357381
Datum::String(redacted_sql.as_str()),
358382
]);
359383

360-
let cost = mpsh_packer.byte_len() + sql_row.byte_len();
361-
let throttled_count = self.statement_logging_throttling_check(cost)?;
384+
let throttled_count = self.statement_logging.throttled_count;
362385
mpsh_packer.push(Datum::UInt64(throttled_count.try_into().expect("must fit")));
363-
out = Some((
364-
record,
365-
PreparedStatementEvent {
366-
prepared_statement: mpsh_row,
367-
sql_text: sql_row,
368-
},
369-
));
370386

371-
*logging = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
372-
uuid
387+
(
388+
Some((
389+
record,
390+
PreparedStatementEvent {
391+
prepared_statement: mpsh_row,
392+
sql_text: sql_row,
393+
},
394+
)),
395+
uuid,
396+
)
373397
}
374-
};
375-
Some((out, uuid))
398+
}
376399
}
377400
/// The rate at which statement execution should be sampled.
378401
/// This is the value of the session var `statement_logging_sample_rate`,
@@ -700,6 +723,8 @@ impl Coordinator {
700723
distribution.sample(&mut rand::rng())
701724
};
702725

726+
// Figure out the cost of everything before we log.
727+
703728
// Track how many statements we're recording.
704729
let sampled_label = sample.then_some("true").unwrap_or("false");
705730
self.metrics
@@ -728,7 +753,8 @@ impl Coordinator {
728753
if !sample {
729754
return None;
730755
}
731-
let (ps_record, ps_uuid) = self.log_prepared_statement(session, logging)?;
756+
757+
let (maybe_ps, ps_uuid) = self.get_prepared_statement_info(session, logging);
732758

733759
let began_at = if let Some(lifecycle_timestamps) = lifecycle_timestamps {
734760
lifecycle_timestamps.received
@@ -737,11 +763,6 @@ impl Coordinator {
737763
};
738764
let now = self.now();
739765
let execution_uuid = epoch_to_uuid_v7(&now);
740-
self.record_statement_lifecycle_event(
741-
&StatementLoggingId(execution_uuid),
742-
&StatementLifecycleEvent::ExecutionBegan,
743-
began_at,
744-
);
745766

746767
let params = std::iter::zip(params.execute_types.iter(), params.datums.iter())
747768
.map(|(r#type, datum)| {
@@ -786,26 +807,70 @@ impl Coordinator {
786807
.collect(),
787808
};
788809
let mseh_update = Self::pack_statement_began_execution_update(&record);
810+
811+
let (maybe_ps_event, maybe_sh_event) = if let Some((ps_record, ps_event)) = maybe_ps {
812+
if let Some(sh) = self
813+
.statement_logging
814+
.unlogged_sessions
815+
.get(&ps_record.session_id)
816+
{
817+
(
818+
Some(ps_event),
819+
Some((Self::pack_session_history_update(sh), ps_record.session_id)),
820+
)
821+
} else {
822+
(Some(ps_event), None)
823+
}
824+
} else {
825+
(None, None)
826+
};
827+
828+
let maybe_ps_prepared_statement = maybe_ps_event.as_ref().map(|e| &e.prepared_statement);
829+
let maybe_ps_sql_text = maybe_ps_event.as_ref().map(|e| &e.sql_text);
830+
831+
if !self.statement_logging_throttling_check([
832+
Some(&mseh_update),
833+
maybe_ps_prepared_statement,
834+
maybe_ps_sql_text,
835+
maybe_sh_event.as_ref().map(|(row, _)| row),
836+
]) {
837+
self.statement_logging.throttled_count += 1;
838+
return None;
839+
}
840+
// When we successfully log the first instance of a prepared statement
841+
// (i.e., it is not throttled), we also capture the number of previously
842+
// throttled statement executions in the builtin prepared statement history table above,
843+
// and then reset the throttled count for future tracking.
844+
else if let PreparedStatementLoggingInfo::StillToLog { .. } = session.qcell_ro(logging) {
845+
self.statement_logging.throttled_count = 0;
846+
}
847+
848+
self.record_prepared_statement_as_logged(ps_uuid, session, logging);
849+
850+
self.record_statement_lifecycle_event(
851+
&StatementLoggingId(execution_uuid),
852+
&StatementLifecycleEvent::ExecutionBegan,
853+
began_at,
854+
);
855+
789856
self.statement_logging
790857
.pending_statement_execution_events
791858
.push((mseh_update, Diff::ONE));
792859
self.statement_logging
793860
.executions_begun
794861
.insert(execution_uuid, record);
795-
if let Some((ps_record, ps_update)) = ps_record {
862+
863+
if let Some((sh_update, session_id)) = maybe_sh_event {
864+
self.statement_logging
865+
.pending_session_events
866+
.push(sh_update);
867+
// Mark the session as logged to avoid logging it again in the future
868+
self.statement_logging.unlogged_sessions.remove(&session_id);
869+
}
870+
if let Some(ps_event) = maybe_ps_event {
796871
self.statement_logging
797872
.pending_prepared_statement_events
798-
.push(ps_update);
799-
if let Some(sh) = self
800-
.statement_logging
801-
.unlogged_sessions
802-
.remove(&ps_record.session_id)
803-
{
804-
let sh_update = Self::pack_session_history_update(&sh);
805-
self.statement_logging
806-
.pending_session_events
807-
.push(sh_update);
808-
}
873+
.push(ps_event);
809874
}
810875
Some(StatementLoggingId(execution_uuid))
811876
}

src/adapter/src/session.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,10 @@ impl<T: TimestampManipulation> Session<T> {
269269
))
270270
}
271271

272+
pub(crate) fn qcell_ro<'a, T2: 'a>(&'a self, cell: &'a Arc<QCell<T2>>) -> &'a T2 {
273+
self.qcell_owner.ro(&*cell)
274+
}
275+
272276
pub(crate) fn qcell_rw<'a, T2: 'a>(&'a mut self, cell: &'a Arc<QCell<T2>>) -> &'a mut T2 {
273277
self.qcell_owner.rw(&*cell)
274278
}

0 commit comments

Comments
 (0)