Skip to content

Comments

[Ingestion] Fix stale session handles causing permanent SessionClosed errors#4433

Open
muhamadazmy wants to merge 1 commit intorestatedev:mainfrom
muhamadazmy:pr4433
Open

[Ingestion] Fix stale session handles causing permanent SessionClosed errors#4433
muhamadazmy wants to merge 1 commit intorestatedev:mainfrom
muhamadazmy:pr4433

Conversation

@muhamadazmy
Copy link
Contributor

[Ingestion] Fix stale session handles causing permanent SessionClosed errors

SessionManagerInner caches SessionHandles in a DashMap that is never
cleaned up. If a PartitionSession background task terminates (e.g. due
to ConnectError::Shutdown or cancellation), the cached handle becomes
permanently stale — every subsequent ingest() call for that partition
returns SessionClosed forever.

Fix this by checking if the cached handle is closed (via the underlying
mpsc sender) before returning it from SessionManagerInner::get(). When
a stale handle is detected, it is evicted from the cache and a fresh
session is started transparently.

/// It guarantees that only one session is started per partition id.
pub fn get(&self, id: PartitionId) -> SessionHandle {
// Evict stale handle if the underlying session has terminated.
if let Some(handle) = self.handles.get(&id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a drop implementation that auto-remove the handle from the map be a reliable alternative here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed a change that just uses self.handlers.remove_if().

Was also thinking of auto removing this when the session task is dropped or exited, but it felt that doing this proactively is easier.

I am reconsidering now after your comment :)

@muhamadazmy muhamadazmy force-pushed the pr4433 branch 2 times, most recently from 5b9b7f8 to 2c0a1f4 Compare February 23, 2026 19:35
… errors

SessionManagerInner caches SessionHandles in a DashMap that is never
cleaned up. If a PartitionSession background task terminates (e.g. due
to ConnectError::Shutdown or cancellation), the cached handle becomes
permanently stale — every subsequent ingest() call for that partition
returns SessionClosed forever.

Fix this by checking if the cached handle is closed (via the underlying
mpsc sender) before returning it from SessionManagerInner::get(). When
a stale handle is detected, it is evicted from the cache and a fresh
session is started transparently.

impl<T> Drop for PartitionSession<T> {
fn drop(&mut self) {
self.manager.handles.remove(&self.partition);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it possible for multiple sessions to exist for the same key?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that should not happen. The ingestion client opens the sessions re-actively when they are first needed. The session should never terminate (it can reconnect internally) but should remain running until the client is closed by the .close() function.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants