Skip to content

Commit 35ac3b3

Browse files
committed
adapter: Fix ExecuteContextExtra drop panics from connection drops
Fixes MaterializeInc/database-issues#7304
1 parent 49f79b0 commit 35ac3b3

File tree

3 files changed

+49
-27
lines changed

3 files changed

+49
-27
lines changed

src/adapter/src/coord.rs

Lines changed: 43 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,7 @@ use mz_ore::task::{JoinHandle, spawn};
127127
use mz_ore::thread::JoinHandleExt;
128128
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
129129
use mz_ore::url::SensitiveUrl;
130-
use mz_ore::{
131-
assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, soft_panic_or_log, stack,
132-
};
130+
use mz_ore::{assert_none, instrument, soft_assert_eq_or_log, soft_assert_or_log, stack};
133131
use mz_persist_client::PersistClient;
134132
use mz_persist_client::batch::ProtoBatch;
135133
use mz_persist_client::usage::{ShardsUsageReferenced, StorageUsageClient};
@@ -1315,48 +1313,65 @@ impl PendingRead {
13151313
/// is intended for use by code that invokes the execution processing flow
13161314
/// (i.e., `sequence_plan`) without actually being a statement execution.
13171315
///
1318-
/// This struct must not be dropped if it contains non-trivial
1319-
/// state. The only valid way to get rid of it is to pass it to the
1320-
/// coordinator for retirement. To enforce this, we assert in the
1321-
/// `Drop` implementation.
1316+
/// If this struct is dropped with Some `statement_uuid`, the `Drop`
1317+
/// implementation will automatically send a `Message::RetireExecute` to log
1318+
/// the statement ending (with `Canceled` reason). This handles cases like
1319+
/// connection drops where the context cannot be explicitly retired.
1320+
/// See <https://github.com/MaterializeInc/database-issues/issues/7304>
13221321
#[derive(Debug, Default)]
13231322
#[must_use]
13241323
pub struct ExecuteContextExtra {
13251324
statement_uuid: Option<StatementLoggingId>,
1325+
/// Channel for sending messages to the coordinator. Used for auto-retiring on drop.
1326+
/// If `None`, dropping will not send a retire message (this can happen for `Default` instances).
1327+
coordinator_tx: Option<mpsc::UnboundedSender<Message>>,
13261328
}
13271329

13281330
impl ExecuteContextExtra {
1329-
pub(crate) fn new(statement_uuid: Option<StatementLoggingId>) -> Self {
1330-
Self { statement_uuid }
1331+
pub(crate) fn new(
1332+
statement_uuid: Option<StatementLoggingId>,
1333+
coordinator_tx: mpsc::UnboundedSender<Message>,
1334+
) -> Self {
1335+
Self {
1336+
statement_uuid,
1337+
coordinator_tx: Some(coordinator_tx),
1338+
}
13311339
}
13321340
pub fn is_trivial(&self) -> bool {
1333-
let Self { statement_uuid } = self;
1334-
statement_uuid.is_none()
1341+
self.statement_uuid.is_none()
13351342
}
13361343
pub fn contents(&self) -> Option<StatementLoggingId> {
1337-
let Self { statement_uuid } = self;
1338-
*statement_uuid
1344+
self.statement_uuid
13391345
}
13401346
/// Take responsibility for the contents. This should only be
13411347
/// called from code that knows what to do to finish up logging
13421348
/// based on the inner value.
13431349
#[must_use]
13441350
pub(crate) fn retire(mut self) -> Option<StatementLoggingId> {
1345-
let Self { statement_uuid } = &mut self;
1346-
statement_uuid.take()
1351+
self.coordinator_tx = None;
1352+
self.statement_uuid.take()
13471353
}
13481354
}
13491355

13501356
impl Drop for ExecuteContextExtra {
13511357
fn drop(&mut self) {
1352-
let Self { statement_uuid } = &*self;
1353-
if let Some(statement_uuid) = statement_uuid {
1354-
// Note: the impact when this error hits
1355-
// is that the statement will never be marked
1356-
// as finished in the statement log.
1357-
soft_panic_or_log!(
1358-
"execute context for statement {statement_uuid:?} dropped without being properly retired."
1359-
);
1358+
if let Some(statement_uuid) = self.statement_uuid.take() {
1359+
if let Some(tx) = &self.coordinator_tx {
1360+
// Auto-retire with Canceled reason since the context was dropped
1361+
// without explicit retirement (likely due to connection drop).
1362+
let msg = Message::RetireExecute {
1363+
data: ExecuteContextExtra {
1364+
statement_uuid: Some(statement_uuid),
1365+
coordinator_tx: None, // Prevent recursion
1366+
},
1367+
otel_ctx: OpenTelemetryContext::obtain(),
1368+
reason: StatementEndedExecutionReason::Canceled,
1369+
};
1370+
let _ = tx.send(msg);
1371+
}
1372+
// If no channel is available (e.g., Default instance), we silently
1373+
// drop. This is acceptable because Default instances should only be
1374+
// used for non-logged statements.
13601375
}
13611376
}
13621377
}
@@ -4312,6 +4327,10 @@ pub fn serve(
43124327

43134328
let (group_commit_tx, group_commit_rx) = appends::notifier();
43144329

4330+
// Clone cmd_tx before moving it into the coordinator thread closure,
4331+
// so it can be used for Client::new() outside the closure.
4332+
let cmd_tx_for_client = cmd_tx.clone();
4333+
43154334
let parent_span = tracing::Span::current();
43164335
let thread = thread::Builder::new()
43174336
// The Coordinator thread tends to keep a lot of data on its stack. To
@@ -4456,7 +4475,7 @@ pub fn serve(
44564475
};
44574476
let client = Client::new(
44584477
build_info,
4459-
cmd_tx.clone(),
4478+
cmd_tx_for_client,
44604479
metrics_clone,
44614480
now,
44624481
environment_id,

src/adapter/src/coord/command_handler.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -948,7 +948,7 @@ impl Coordinator {
948948
lifecycle_timestamps,
949949
);
950950

951-
ExecuteContextExtra::new(maybe_uuid)
951+
ExecuteContextExtra::new(maybe_uuid, self.internal_cmd_tx.clone())
952952
};
953953
let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
954954
(stmt, ctx, params)
@@ -1934,7 +1934,10 @@ impl Coordinator {
19341934
conn_id: conn_id.clone(),
19351935
cluster_id,
19361936
depends_on,
1937-
ctx_extra: ExecuteContextExtra::new(statement_logging_id),
1937+
ctx_extra: ExecuteContextExtra::new(
1938+
statement_logging_id,
1939+
self.internal_cmd_tx.clone(),
1940+
),
19381941
is_fast_path,
19391942
},
19401943
);

src/adapter/src/coord/peek.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1243,7 +1243,7 @@ impl crate::coord::Coordinator {
12431243
// relevant parts of the old `implement_peek_plan` into this method, and remove the old
12441244
// `implement_peek_plan`.
12451245
self.implement_peek_plan(
1246-
&mut ExecuteContextExtra::new(statement_logging_id),
1246+
&mut ExecuteContextExtra::new(statement_logging_id, self.internal_cmd_tx.clone()),
12471247
planned_peek,
12481248
finishing,
12491249
compute_instance,

0 commit comments

Comments
 (0)