Skip to content

Commit 3faf2a7

Browse files
authored
Merge pull request #150 from pomerium/kralicky/ssh-graceful-shutdown
This PR includes refactors to channel lifetime management, and new APIs to allow for graceful interrupt handling. Using a new ChannelControlAction type, the management server can preconfigure a ChannelData message to be sent to the downstream client in the event that the connection is interrupted. Currently the API is limited in what it allows the server to configure, but the underlying mechanism is much more flexible should we need to support more complex logic in the future. One major limitation of the existing code here was that it would treat a management server gRPC disconnection as a connection-level error, which would send an immediate DisconnectMsg to the downstream, which ignores all other channel state and forcibly ends the whole connection. Sending channel messages on an open channel, then relying on the Disconnect message to clean things up, is rather unreliable for several reasons. Instead, we are now able to force a proper channel close handshake for any/all active channels, which will guarantee delivery of the configured interrupt channel data messages (as long as Envoy isn't killed during this, of course). This also allows us to handle server-wide errors, _and_ hook into the built-in Envoy drain system to force graceful shutdown of all active connections if Pomerium is shutting down. This is implemented using a new mechanism that allows Envoy to "preempt" an active channel, which kicks off the channel close handshake sequence. This sequence of messages is described in more detail in the code. There are several different states the pair of channels could be in when this happens, so the channel state tracking in ChannelIDManager was made more granular to be able to capture all the edge cases. Some other smaller changes include:
- Simplified the Channel interface, and added a new `wire::ChannelMessage` type alias which contains only a subset of all message types. This is used in places where only channel-related messages would be received.
- The logic for dealing with gRPC errors/disconnects in HandoffChannel has been improved to work properly with the graceful shutdown system.
- Some connection service tests were split up for compile-time reasons.
2 parents 4e2fc39 + 88c915f commit 3faf2a7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+3539
-794
lines changed

WORKSPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ http_archive(
5454
"//patches/envoy:0005-suppress-duplicate-wip-warnings.patch",
5555
"//patches/envoy:0006-coverage-format.patch",
5656
"//patches/envoy:0007-user-space-io-handle.patch",
57+
"//patches/envoy:0008-fake-upstream.patch",
5758
"//patches/envoy:tmp-transport-socket-options.patch",
5859
],
5960
sha256 = "bb111b2037e35d8732f12f003ccf82e0d09dfc8a8b7810e849eb081f36d50ddc",

api/extensions/filters/network/ssh/ssh.pb.go

Lines changed: 184 additions & 114 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/extensions/filters/network/ssh/ssh.proto

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,10 +343,20 @@ message SSHChannelControlAction {
343343
AllowResponse upstream_auth = 4;
344344
}
345345

346+
message InterruptOptions {
347+
// If set, Envoy will send a channel data message with the specified contents to the client
348+
// just before disconnecting.
349+
bytes send_channel_data = 1;
350+
}
351+
346352
oneof action {
347353
// HandOffUpstream instructs Envoy to end the "hijacked" internal stream, and connect the
348354
// downstream client to the real upstream server.
349355
HandOffUpstream hand_off = 2;
356+
357+
// Configures logic for handling cleanup or other finalization tasks if the connection
358+
// is interrupted.
359+
InterruptOptions set_interrupt_options = 3;
350360
}
351361
}
352362

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc
2+
index 559484e147..fc2d34f653 100644
3+
--- a/test/integration/fake_upstream.cc
4+
+++ b/test/integration/fake_upstream.cc
5+
@@ -681,7 +681,8 @@ FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transp
6+
http3_options_(config.http3_options_), quic_options_(config.quic_options_),
7+
socket_(Network::SocketSharedPtr(listen_socket.release())),
8+
api_(Api::createApiForTest(stats_store_)), time_system_(config.time_system_),
9+
- dispatcher_(api_->allocateDispatcher("fake_upstream")),
10+
+ dispatcher_(api_->allocateDispatcher(config.dispatcher_name_ != "" ? config.dispatcher_name_
11+
+ : "fake_upstream")),
12+
handler_(new Server::ConnectionHandlerImpl(*dispatcher_, 0)), config_(config),
13+
read_disable_on_new_connection_(true), enable_half_close_(config.enable_half_close_),
14+
listener_(*this, http_type_ == Http::CodecType::HTTP3),
15+
diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h
16+
index 42799230cc..382c147ffe 100644
17+
--- a/test/integration/fake_upstream.h
18+
+++ b/test/integration/fake_upstream.h
19+
@@ -716,6 +716,7 @@ struct FakeUpstreamConfig {
20+
uint32_t max_request_headers_count_ = Http::DEFAULT_MAX_HEADERS_COUNT;
21+
envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
22+
headers_with_underscores_action_ = envoy::config::core::v3::HttpProtocolOptions::ALLOW;
23+
+ std::string dispatcher_name_;
24+
};
25+
26+
/**
27+
@@ -765,6 +766,10 @@ public:
28+
ABSL_MUST_USE_RESULT
29+
testing::AssertionResult assertPendingConnectionsEmpty();
30+
31+
+ bool hasNewConnections() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
32+
+ return !new_connections_.empty();
33+
+ }
34+
+
35+
ABSL_MUST_USE_RESULT
36+
testing::AssertionResult
37+
waitForRawConnection(FakeRawConnectionPtr& connection,
38+
@@ -844,9 +849,13 @@ public:
39+
const envoy::config::core::v3::Http3ProtocolOptions& http3Options() { return http3_options_; }
40+
41+
Event::DispatcherPtr& dispatcher() { return dispatcher_; }
42+
- absl::Mutex& lock() { return lock_; }
43+
+ absl::Mutex& lock() ABSL_LOCK_RETURNED(lock_) { return lock_; }
44+
45+
void runOnDispatcherThread(std::function<void()> cb);
46+
+ AssertionResult
47+
+ runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb,
48+
+ std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
49+
+ SharedConnectionWrapper& consumeConnection() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_);
50+
51+
protected:
52+
const FakeUpstreamConfig& config() const { return config_; }
53+
@@ -993,11 +1002,7 @@ private:
54+
};
55+
56+
void threadRoutine();
57+
- SharedConnectionWrapper& consumeConnection() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_);
58+
Network::FilterStatus onRecvDatagram(Network::UdpRecvData& data);
59+
- AssertionResult
60+
- runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb,
61+
- std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
62+
63+
const envoy::config::core::v3::Http2ProtocolOptions http2_options_;
64+
const envoy::config::core::v3::Http3ProtocolOptions http3_options_;
65+
diff --git a/test/integration/server.cc b/test/integration/server.cc
66+
index 3a75dba9d0..cc5239a4e2 100644
67+
--- a/test/integration/server.cc
68+
+++ b/test/integration/server.cc
69+
@@ -282,6 +282,9 @@ void IntegrationTestServerImpl::createAndRunEnvoyServer(
70+
server.run();
71+
}
72+
server_gone_.Notify();
73+
+ server_ = nullptr;
74+
+ admin_address_ = nullptr;
75+
+ stat_store_ = nullptr;
76+
}
77+
78+
IntegrationTestServerImpl::~IntegrationTestServerImpl() {

source/common/type_traits.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
#include "source/common/optref.h"
44
#include <source_location>
5+
#include <type_traits>
56

67
// Supplemental type traits
78

@@ -187,6 +188,21 @@ constexpr bool all_types_equal = all_types_equal_to<Rest...>;
187188
template <typename T, typename... Ts>
188189
constexpr bool contains_type = index_of_type<T, Ts...>::found;
189190

191+
template <typename T, typename U>
192+
struct strict_subset : std::false_type {};
193+
194+
template <typename... Ts, typename... Us>
195+
requires (sizeof...(Us) > 0 &&
196+
sizeof...(Us) < sizeof...(Ts) &&
197+
(contains_type<Us, Ts...> && ...))
198+
struct strict_subset<std::tuple<Ts...>, std::tuple<Us...>>
199+
: std::true_type {};
200+
201+
// strict_subset_v is true if T and U are tuples, and the types in U are a non-empty strict subset of the
202+
// types in T, otherwise false. The tuples should not contain duplicate types.
203+
template <typename T, typename U>
204+
constexpr bool strict_subset_v = strict_subset<T, U>::value;
205+
190206
// is_vector<T> is true if T is a vector of any type, otherwise false.
191207
template <typename T>
192208
struct is_vector : std::false_type {};

source/extensions/filters/network/ssh/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ licenses(["notice"])
1010
envoy_cc_library(
1111
name = "pomerium_ssh",
1212
srcs = [
13+
"channel.cc",
1314
"client_transport.cc",
1415
"config.cc",
1516
"extension_ping.cc",
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
#include "source/extensions/filters/network/ssh/channel.h"
2+
3+
namespace Envoy::Extensions::NetworkFilters::GenericProxy::Codec {
4+
5+
Channel::~Channel() {
6+
if (callbacks_ != nullptr) {
7+
callbacks_->cleanup();
8+
}
9+
};
10+
11+
absl::Status Channel::setChannelCallbacks(ChannelCallbacks& callbacks) {
12+
callbacks_ = &callbacks;
13+
return absl::OkStatus();
14+
}
15+
16+
absl::Status PassthroughChannel::readMessage(wire::ChannelMessage&& msg) {
17+
return callbacks_->sendMessageRemote(std::move(msg));
18+
}
19+
20+
absl::Status ForceCloseChannel::readMessage(wire::ChannelMessage&& msg) {
21+
return msg.visit(
22+
[&](wire::ChannelOpenConfirmationMsg&) {
23+
ENVOY_LOG(debug, "channel {}: closing due to peer preemption", callbacks_->channelId());
24+
callbacks_->sendMessageLocal(wire::ChannelCloseMsg{
25+
.recipient_channel = callbacks_->channelId(),
26+
});
27+
return absl::OkStatus();
28+
},
29+
[&](wire::ChannelOpenFailureMsg&) {
30+
return absl::OkStatus();
31+
},
32+
[](wire::ChannelCloseMsg&) {
33+
return absl::OkStatus();
34+
},
35+
[&](auto& msg) {
36+
// Ignore any messages received before the reply to our ChannelClose request.
37+
// Note: The only way to get here is after receiving a ChannelOpenConfirmation.
38+
ENVOY_LOG(debug, "channel {}: dropping message: {}", callbacks_->channelId(), msg.msg_type());
39+
return absl::OkStatus();
40+
});
41+
}
42+
43+
} // namespace Envoy::Extensions::NetworkFilters::GenericProxy::Codec

source/extensions/filters/network/ssh/channel.h

Lines changed: 33 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
#pragma once
22

3+
#include "source/extensions/filters/network/ssh/transport.h"
34
#include "source/extensions/filters/network/ssh/wire/messages.h"
45
#pragma clang unsafe_buffer_usage begin
56
#include "envoy/stats/scope.h"
7+
#include "envoy/common/callback.h"
68
#include "api/extensions/filters/network/ssh/ssh.pb.h"
79
#pragma clang unsafe_buffer_usage end
810

@@ -41,11 +43,34 @@ class ChannelCallbacks {
4143
// for the channel.
4244
virtual void setStatsProvider(ChannelStatsProvider& stats_provider) PURE;
4345

46+
// Adds an interrupt callback to be invoked before a channel is closed to perform graceful
47+
// shutdown in the event of an unexpected disconnect or other non-connection-fatal issue.
48+
// These callbacks can be invoked using runInterruptCallbacks(). The returned handle can be
49+
// deleted to remove the callback.
50+
[[nodiscard]]
51+
virtual Common::CallbackHandlePtr addInterruptCallback(std::function<void(absl::Status, TransportCallbacks&)> cb) PURE;
52+
53+
// Invokes all previously added interrupt callbacks, then clears the interrupt callback list.
54+
// Deleting a callback handle obtained from addInterruptCallback after calling this function
55+
// is a no-op; it is safe to let the callback handles go out of scope normally.
56+
// This function can be invoked manually from a Channel implementation. It may also be invoked
57+
// by the ConnectionService itself. Either way, any added callbacks are only invoked once.
58+
virtual void runInterruptCallbacks(absl::Status err) PURE;
59+
60+
// Terminates the connection with an error. This will send an immediate Disconnect message.
61+
virtual void terminate(absl::Status err) PURE;
62+
4463
private:
4564
friend class Channel;
4665
virtual void cleanup() PURE;
4766
};
4867

68+
class ChannelEventCallbacks {
69+
public:
70+
virtual ~ChannelEventCallbacks() = default;
71+
virtual void sendChannelEvent(const pomerium::extensions::ssh::ChannelEvent& ev) PURE;
72+
};
73+
4974
// Channel handles the read path for a single peer (upstream or downstream) for messages on a
5075
// SSH channel. For channels known to both the upstream server and downstream client, two Channel
5176
// objects will exist: one managed by the upstream ConnectionService, and one by the downstream
@@ -57,27 +82,12 @@ class ChannelCallbacks {
5782
// upstream.
5883
class Channel {
5984
public:
60-
virtual ~Channel() {
61-
if (callbacks_ != nullptr) {
62-
callbacks_->cleanup();
63-
}
64-
};
65-
virtual absl::Status setChannelCallbacks(ChannelCallbacks& callbacks) {
66-
callbacks_ = &callbacks;
67-
return absl::OkStatus();
68-
}
85+
virtual ~Channel();
86+
virtual absl::Status setChannelCallbacks(ChannelCallbacks& callbacks);
6987

7088
// Handles a channel message (see concept ChannelMsg) read from the local peer, to be sent to
7189
// the remote peer.
72-
virtual absl::Status readMessage(wire::Message&& msg) PURE;
73-
74-
// Called when the channel is successfully opened. ChannelOpenConfirmation messages are only
75-
// sent here, not to readMessage().
76-
virtual absl::Status onChannelOpened(wire::ChannelOpenConfirmationMsg&&) PURE;
77-
78-
// Called when the channel failed to open. ChannelOpenFailure messages are only sent here,
79-
// not to readMessage().
80-
virtual absl::Status onChannelOpenFailed(wire::ChannelOpenFailureMsg&&) PURE;
90+
virtual absl::Status readMessage(wire::ChannelMessage&& msg) PURE;
8191

8292
protected:
8393
ChannelCallbacks* callbacks_{};
@@ -87,23 +97,14 @@ class PassthroughChannel : public Channel {
8797
public:
8898
PassthroughChannel() = default;
8999

90-
absl::Status readMessage(wire::Message&& msg) override {
91-
return callbacks_->sendMessageRemote(std::move(msg));
92-
}
93-
94-
absl::Status onChannelOpened(wire::ChannelOpenConfirmationMsg&& msg) override {
95-
return callbacks_->sendMessageRemote(std::move(msg));
96-
}
97-
98-
absl::Status onChannelOpenFailed(wire::ChannelOpenFailureMsg&& msg) override {
99-
return callbacks_->sendMessageRemote(std::move(msg));
100-
}
100+
absl::Status readMessage(wire::ChannelMessage&& msg) override;
101101
};
102102

103-
class ChannelEventCallbacks {
103+
class ForceCloseChannel : public Channel, public Logger::Loggable<Logger::Id::filter> {
104104
public:
105-
virtual ~ChannelEventCallbacks() = default;
106-
virtual void sendChannelEvent(const pomerium::extensions::ssh::ChannelEvent& ev) PURE;
105+
ForceCloseChannel() = default;
106+
107+
absl::Status readMessage(wire::ChannelMessage&& msg) override;
107108
};
108109

109110
} // namespace Envoy::Extensions::NetworkFilters::GenericProxy::Codec

source/extensions/filters/network/ssh/client_transport.cc

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -274,34 +274,7 @@ class HandoffChannel : public Channel, public Logger::Loggable<Logger::Id::filte
274274
: info_(info),
275275
handoff_callbacks_(callbacks) {}
276276

277-
absl::Status onChannelOpened(wire::ChannelOpenConfirmationMsg&&) override {
278-
if (info_.pty_info == nullptr) {
279-
return absl::InvalidArgumentError("session is not interactive");
280-
}
281-
282-
ENVOY_LOG(debug, "handoff started");
283-
// 2: PTY open request
284-
wire::ChannelRequestMsg channelReq{
285-
.recipient_channel = callbacks_->channelId(),
286-
.want_reply = true,
287-
.request = wire::PtyReqChannelRequestMsg{
288-
.term_env = info_.pty_info->term_env(),
289-
.width_columns = info_.pty_info->width_columns(),
290-
.height_rows = info_.pty_info->height_rows(),
291-
.width_px = info_.pty_info->width_px(),
292-
.height_px = info_.pty_info->height_px(),
293-
.modes = info_.pty_info->modes(),
294-
},
295-
};
296-
callbacks_->sendMessageLocal(std::move(channelReq));
297-
return absl::OkStatus();
298-
}
299-
absl::Status onChannelOpenFailed(wire::ChannelOpenFailureMsg&& msg) override {
300-
// this should end the connection
301-
return absl::UnavailableError(*msg.description);
302-
}
303-
304-
absl::Status readMessage(wire::Message&& msg) override {
277+
absl::Status readMessage(wire::ChannelMessage&& msg) override {
305278
if (handoff_complete_) {
306279
return callbacks_->sendMessageRemote(std::move(msg));
307280
}
@@ -324,12 +297,42 @@ class HandoffChannel : public Channel, public Logger::Loggable<Logger::Id::filte
324297
[&](const wire::ChannelFailureMsg&) {
325298
return absl::InternalError("failed to open upstream tty");
326299
},
300+
[&](const wire::ChannelOpenConfirmationMsg& msg) {
301+
return onChannelOpened(msg);
302+
},
303+
[&](const wire::ChannelOpenFailureMsg& msg) {
304+
// this should end the connection
305+
return absl::UnavailableError(*msg.description);
306+
},
327307
[](const auto& msg) {
328308
return absl::InternalError(fmt::format("invalid message received during handoff: {}", msg.msg_type()));
329309
});
330310
}
331311

332312
private:
313+
absl::Status onChannelOpened(const wire::ChannelOpenConfirmationMsg&) {
314+
if (info_.pty_info == nullptr) {
315+
return absl::InvalidArgumentError("session is not interactive");
316+
}
317+
318+
ENVOY_LOG(debug, "handoff started");
319+
// 2: PTY open request
320+
wire::ChannelRequestMsg channelReq{
321+
.recipient_channel = callbacks_->channelId(),
322+
.want_reply = true,
323+
.request = wire::PtyReqChannelRequestMsg{
324+
.term_env = info_.pty_info->term_env(),
325+
.width_columns = info_.pty_info->width_columns(),
326+
.height_rows = info_.pty_info->height_rows(),
327+
.width_px = info_.pty_info->width_px(),
328+
.height_px = info_.pty_info->height_px(),
329+
.modes = info_.pty_info->modes(),
330+
},
331+
};
332+
callbacks_->sendMessageLocal(std::move(channelReq));
333+
return absl::OkStatus();
334+
}
335+
333336
bool handoff_complete_{false};
334337
const HandoffInfo& info_;
335338
HandoffChannelCallbacks& handoff_callbacks_;

0 commit comments

Comments
 (0)