|
1 | 1 | #include "tor/transport/obfs4_listener.hpp" |
2 | 2 | #include "tor/util/logging.hpp" |
| 3 | +#include "obfs4/transport/packet.hpp" |
| 4 | +#include "obfs4/transport/framing.hpp" |
| 5 | +#include "obfs4/common/csrand.hpp" |
3 | 6 | #include <cstring> |
| 7 | +#include <thread> |
4 | 8 |
|
5 | 9 | namespace tor::transport { |
6 | 10 |
|
@@ -70,155 +74,216 @@ void Obfs4Listener::stop() { |
70 | 74 |
|
71 | 75 | void Obfs4Listener::handle_connection(std::shared_ptr<net::TcpConnection> conn) { |
72 | 76 | LOG_INFO("obfs4: starting handshake for new connection"); |
73 | | - auto handshake = std::make_shared<Obfs4ServerHandshake>(node_id_, identity_key_); |
74 | 77 |
|
| 78 | + // Spawn a thread per connection to avoid blocking the accept loop |
| 79 | + auto handshake = std::make_shared<Obfs4ServerHandshake>(node_id_, identity_key_); |
75 | 80 | auto stats_completed = &handshakes_completed_; |
76 | 81 | auto stats_failed = &handshakes_failed_; |
77 | 82 | auto local_or_port = or_port_; |
78 | 83 | auto& io = io_context_; |
79 | 84 |
|
80 | | - auto buffer = std::make_shared<std::array<uint8_t, 4096>>(); |
81 | | - |
82 | | - // Handshake state machine: read -> consume -> check state -> loop or finish |
83 | | - struct HandshakeReader { |
84 | | - std::shared_ptr<net::TcpConnection> conn; |
85 | | - std::shared_ptr<Obfs4ServerHandshake> handshake; |
86 | | - std::shared_ptr<std::array<uint8_t, 4096>> buffer; |
87 | | - std::atomic<uint64_t>* completed; |
88 | | - std::atomic<uint64_t>* failed; |
89 | | - uint16_t or_port; |
90 | | - boost::asio::io_context* io_ctx; |
91 | | - |
92 | | - void start() { |
93 | | - read_more(); |
94 | | - } |
| 85 | + std::thread([conn, handshake, stats_completed, stats_failed, |
| 86 | + local_or_port, &io]() { |
| 87 | + auto buffer = std::array<uint8_t, 8192>{}; |
95 | 88 |
|
96 | | - void read_more() { |
| 89 | + // --- Phase 1: Handshake --- |
| 90 | + while (true) { |
97 | 91 | auto bytes_read = conn->read( |
98 | | - std::span<uint8_t>(buffer->data(), buffer->size())); |
| 92 | + std::span<uint8_t>(buffer.data(), buffer.size())); |
99 | 93 |
|
100 | 94 | if (!bytes_read || *bytes_read == 0) { |
101 | 95 | LOG_WARN("obfs4 handshake: connection closed during read"); |
102 | | - failed->fetch_add(1, std::memory_order_relaxed); |
| 96 | + stats_failed->fetch_add(1, std::memory_order_relaxed); |
103 | 97 | return; |
104 | 98 | } |
105 | 99 |
|
106 | 100 | LOG_INFO("obfs4 handshake: received {} bytes", *bytes_read); |
107 | 101 |
|
108 | | - auto data = std::span<const uint8_t>(buffer->data(), *bytes_read); |
| 102 | + auto data = std::span<const uint8_t>(buffer.data(), *bytes_read); |
109 | 103 | auto consume_result = handshake->consume(data); |
110 | 104 | if (!consume_result) { |
111 | 105 | LOG_WARN("obfs4 handshake failed: {}", |
112 | 106 | obfs4_error_message(consume_result.error())); |
113 | | - failed->fetch_add(1, std::memory_order_relaxed); |
| 107 | + stats_failed->fetch_add(1, std::memory_order_relaxed); |
114 | 108 | return; |
115 | 109 | } |
116 | 110 |
|
117 | 111 | if (handshake->state() == Obfs4ServerHandshake::State::Completed) { |
118 | | - auto hello = handshake->generate_server_hello(); |
119 | | - if (!hello) { |
120 | | - LOG_ERROR("obfs4: failed to generate server hello"); |
121 | | - failed->fetch_add(1, std::memory_order_relaxed); |
122 | | - return; |
123 | | - } |
| 112 | + break; |
| 113 | + } |
| 114 | + if (handshake->state() == Obfs4ServerHandshake::State::Failed) { |
| 115 | + stats_failed->fetch_add(1, std::memory_order_relaxed); |
| 116 | + return; |
| 117 | + } |
| 118 | + } |
124 | 119 |
|
125 | | - auto hello_span = std::span<const uint8_t>(hello->data(), hello->size()); |
126 | | - auto write_result = conn->write(hello_span); |
127 | | - if (!write_result) { |
128 | | - LOG_ERROR("obfs4: failed to send server hello"); |
129 | | - failed->fetch_add(1, std::memory_order_relaxed); |
130 | | - return; |
131 | | - } |
| 120 | + // --- Phase 2: Initialize framing and send server hello + seed frame --- |
| 121 | + const auto& keys = handshake->session_keys(); |
132 | 122 |
|
133 | | - completed->fetch_add(1, std::memory_order_relaxed); |
134 | | - LOG_INFO("obfs4 handshake completed successfully"); |
135 | | - |
136 | | - // Set up framing with session keys and DRBG seeds |
137 | | - auto framing = std::make_unique<Obfs4Framing>(); |
138 | | - const auto& keys = handshake->session_keys(); |
139 | | - framing->init_send(keys.send_key, keys.send_nonce, keys.send_drbg_seed); |
140 | | - framing->init_recv(keys.recv_key, keys.recv_nonce, keys.recv_drbg_seed); |
141 | | - |
142 | | - // Connect to local OR port and start proxying |
143 | | - auto or_conn = std::make_shared<net::TcpConnection>(*io_ctx); |
144 | | - auto connect_result = or_conn->connect("127.0.0.1", or_port); |
145 | | - if (!connect_result) { |
146 | | - LOG_ERROR("obfs4: failed to connect to local OR port {}", or_port); |
147 | | - return; |
148 | | - } |
| 123 | + auto framing = std::make_shared<Obfs4Framing>(); |
| 124 | + framing->init_send(keys.send_key, keys.send_nonce, keys.send_drbg_seed); |
| 125 | + framing->init_recv(keys.recv_key, keys.recv_nonce, keys.recv_drbg_seed); |
149 | 126 |
|
150 | | - proxy_loop(conn, or_conn, std::move(framing)); |
151 | | - } else if (handshake->state() == Obfs4ServerHandshake::State::Failed) { |
152 | | - failed->fetch_add(1, std::memory_order_relaxed); |
153 | | - return; |
154 | | - } else { |
155 | | - read_more(); |
156 | | - } |
| 127 | + // Generate the handshake response |
| 128 | + auto hello = handshake->generate_server_hello(); |
| 129 | + if (!hello) { |
| 130 | + LOG_ERROR("obfs4: failed to generate server hello"); |
| 131 | + stats_failed->fetch_add(1, std::memory_order_relaxed); |
| 132 | + return; |
| 133 | + } |
| 134 | + |
| 135 | + // Generate and append inline seed frame (PrngSeed packet) |
| 136 | + // The Go client (lyrebird) expects this immediately after the handshake. |
| 137 | + // Without it, the client waits forever → deadlock. |
| 138 | + auto seed_bytes = obfs4::common::random_bytes(24); |
| 139 | + auto seed_pkt = obfs4::transport::make_packet( |
| 140 | + obfs4::transport::PacketType::PrngSeed, |
| 141 | + std::span<const uint8_t>(seed_bytes.data(), seed_bytes.size())); |
| 142 | + auto seed_frame = framing->encode( |
| 143 | + std::span<const uint8_t>(seed_pkt.data(), seed_pkt.size())); |
| 144 | + |
| 145 | + // Append seed frame to server hello |
| 146 | + hello->insert(hello->end(), seed_frame.begin(), seed_frame.end()); |
| 147 | + |
| 148 | + LOG_INFO("obfs4: sending server hello ({} bytes handshake + {} bytes seed frame)", |
| 149 | + hello->size() - seed_frame.size(), seed_frame.size()); |
| 150 | + |
| 151 | + auto hello_span = std::span<const uint8_t>(hello->data(), hello->size()); |
| 152 | + auto write_result = conn->write(hello_span); |
| 153 | + if (!write_result) { |
| 154 | + LOG_ERROR("obfs4: failed to send server hello"); |
| 155 | + stats_failed->fetch_add(1, std::memory_order_relaxed); |
| 156 | + return; |
| 157 | + } |
| 158 | + |
| 159 | + stats_completed->fetch_add(1, std::memory_order_relaxed); |
| 160 | + LOG_INFO("obfs4 handshake completed successfully"); |
| 161 | + |
| 162 | + // --- Phase 3: Connect to local OR port --- |
| 163 | + auto or_conn = std::make_shared<net::TcpConnection>(io); |
| 164 | + auto connect_result = or_conn->connect("127.0.0.1", local_or_port); |
| 165 | + if (!connect_result) { |
| 166 | + LOG_ERROR("obfs4: failed to connect to local OR port {}", local_or_port); |
| 167 | + return; |
157 | 168 | } |
158 | 169 |
|
159 | | - void proxy_loop( |
160 | | - std::shared_ptr<net::TcpConnection> obfs4_conn, |
161 | | - std::shared_ptr<net::TcpConnection> or_conn, |
162 | | - std::unique_ptr<Obfs4Framing> framing) { |
| 170 | + LOG_INFO("obfs4: connected to local OR port {}, starting proxy", local_or_port); |
| 171 | + |
| 172 | + // --- Phase 4: Full-duplex bidirectional proxy --- |
| 173 | + // Two threads: one for each direction. |
| 174 | + // encode() and decode() access separate internal state (Encoder/Decoder), |
| 175 | + // so concurrent access from different threads is safe. |
| 176 | + std::atomic<bool> running{true}; |
163 | 177 |
|
164 | | - auto shared_framing = std::shared_ptr<Obfs4Framing>(std::move(framing)); |
165 | | - auto proxy_buf = std::make_shared<std::array<uint8_t, 4096>>(); |
| 178 | + // Thread A: obfs4 client → OR port |
| 179 | + // Decode frames, parse packets, forward Payload data to OR |
| 180 | + std::thread client_to_or([&running, conn, or_conn, framing]() { |
| 181 | + auto buf = std::array<uint8_t, 4096>{}; |
166 | 182 |
|
167 | | - while (true) { |
168 | | - // Read encrypted data from obfs4 client |
169 | | - auto obfs4_read = obfs4_conn->read( |
170 | | - std::span<uint8_t>(proxy_buf->data(), proxy_buf->size())); |
| 183 | + while (running.load(std::memory_order_relaxed)) { |
| 184 | + auto obfs4_read = conn->read( |
| 185 | + std::span<uint8_t>(buf.data(), buf.size())); |
171 | 186 | if (!obfs4_read || *obfs4_read == 0) { |
| 187 | + LOG_DEBUG("obfs4 proxy: client connection closed"); |
172 | 188 | break; |
173 | 189 | } |
174 | 190 |
|
175 | | - // Decrypt frames |
176 | | - auto encrypted = std::span<const uint8_t>(proxy_buf->data(), *obfs4_read); |
177 | | - auto frames = shared_framing->decode(encrypted); |
| 191 | + auto encrypted = std::span<const uint8_t>(buf.data(), *obfs4_read); |
| 192 | + auto frames = framing->decode(encrypted); |
178 | 193 | if (!frames) { |
179 | | - LOG_WARN("obfs4: frame decryption failed"); |
| 194 | + LOG_WARN("obfs4 proxy: frame decryption failed"); |
180 | 195 | break; |
181 | 196 | } |
182 | 197 |
|
183 | | - // Forward decrypted data to OR port |
184 | | - for (const auto& frame : frames->frames) { |
185 | | - auto frame_span = std::span<const uint8_t>(frame.data(), frame.size()); |
186 | | - auto wr = or_conn->write(frame_span); |
187 | | - if (!wr) { |
188 | | - goto done; |
| 198 | + // Process packet layer: extract payload from frames |
| 199 | + for (const auto& frame_payload : frames->frames) { |
| 200 | + auto packets = obfs4::transport::parse_packets( |
| 201 | + std::span<const uint8_t>(frame_payload.data(), |
| 202 | + frame_payload.size())); |
| 203 | + |
| 204 | + for (const auto& pkt : packets) { |
| 205 | + if (pkt.type == obfs4::transport::PacketType::Payload |
| 206 | + && !pkt.payload.empty()) { |
| 207 | + auto wr = or_conn->write( |
| 208 | + std::span<const uint8_t>(pkt.payload.data(), |
| 209 | + pkt.payload.size())); |
| 210 | + if (!wr) { |
| 211 | + LOG_DEBUG("obfs4 proxy: OR write failed"); |
| 212 | + running.store(false, std::memory_order_relaxed); |
| 213 | + return; |
| 214 | + } |
| 215 | + } |
| 216 | + // PrngSeed packets: client updating DRBG (ignored for now) |
189 | 217 | } |
190 | 218 | } |
| 219 | + } |
191 | 220 |
|
192 | | - // Read plaintext from OR port |
| 221 | + running.store(false, std::memory_order_relaxed); |
| 222 | + conn->close(); |
| 223 | + or_conn->close(); |
| 224 | + }); |
| 225 | + |
| 226 | + // Thread B: OR port → obfs4 client |
| 227 | + // Read plaintext, wrap in Payload packet, encode frame, forward to client |
| 228 | + std::thread or_to_client([&running, conn, or_conn, framing]() { |
| 229 | + auto buf = std::array<uint8_t, 4096>{}; |
| 230 | + |
| 231 | + while (running.load(std::memory_order_relaxed)) { |
193 | 232 | auto or_read = or_conn->read( |
194 | | - std::span<uint8_t>(proxy_buf->data(), proxy_buf->size())); |
195 | | - if (or_read && *or_read > 0) { |
196 | | - auto plaintext = std::span<const uint8_t>(proxy_buf->data(), *or_read); |
197 | | - auto encoded = shared_framing->encode(plaintext); |
198 | | - auto enc_span = std::span<const uint8_t>(encoded.data(), encoded.size()); |
199 | | - auto wr = obfs4_conn->write(enc_span); |
| 233 | + std::span<uint8_t>(buf.data(), buf.size())); |
| 234 | + if (!or_read || *or_read == 0) { |
| 235 | + LOG_DEBUG("obfs4 proxy: OR connection closed"); |
| 236 | + break; |
| 237 | + } |
| 238 | + |
| 239 | + auto plaintext = std::span<const uint8_t>(buf.data(), *or_read); |
| 240 | + |
| 241 | + // Split into chunks that fit in a single frame |
| 242 | + constexpr size_t max_chunk = obfs4::transport::MAX_FRAME_PAYLOAD |
| 243 | + - obfs4::transport::PACKET_OVERHEAD; |
| 244 | + size_t offset = 0; |
| 245 | + |
| 246 | + while (offset < plaintext.size()) { |
| 247 | + size_t chunk_len = std::min(plaintext.size() - offset, max_chunk); |
| 248 | + auto chunk = plaintext.subspan(offset, chunk_len); |
| 249 | + |
| 250 | + // Wrap in Payload packet |
| 251 | + auto pkt = obfs4::transport::make_packet( |
| 252 | + obfs4::transport::PacketType::Payload, chunk); |
| 253 | + |
| 254 | + // Encode as obfs4 frame |
| 255 | + auto frame = framing->encode( |
| 256 | + std::span<const uint8_t>(pkt.data(), pkt.size())); |
| 257 | + |
| 258 | + auto wr = conn->write( |
| 259 | + std::span<const uint8_t>(frame.data(), frame.size())); |
200 | 260 | if (!wr) { |
201 | | - break; |
| 261 | + LOG_DEBUG("obfs4 proxy: client write failed"); |
| 262 | + running.store(false, std::memory_order_relaxed); |
| 263 | + return; |
202 | 264 | } |
| 265 | + |
| 266 | + offset += chunk_len; |
203 | 267 | } |
204 | 268 | } |
205 | | -done: |
206 | | - obfs4_conn->close(); |
| 269 | + |
| 270 | + running.store(false, std::memory_order_relaxed); |
| 271 | + conn->close(); |
207 | 272 | or_conn->close(); |
208 | | - } |
209 | | - }; |
| 273 | + }); |
| 274 | + |
| 275 | + client_to_or.join(); |
| 276 | + or_to_client.join(); |
210 | 277 |
|
211 | | - auto reader = std::make_shared<HandshakeReader>( |
212 | | - HandshakeReader{conn, handshake, buffer, |
213 | | - stats_completed, stats_failed, local_or_port, &io}); |
214 | | - reader->start(); |
| 278 | + LOG_INFO("obfs4: proxy session ended"); |
| 279 | + }).detach(); |
215 | 280 | } |
216 | 281 |
|
217 | 282 | void Obfs4Listener::proxy_connection( |
218 | 283 | [[maybe_unused]] std::shared_ptr<net::TcpConnection> obfs4_conn, |
219 | 284 | [[maybe_unused]] std::shared_ptr<net::TcpConnection> or_conn, |
220 | 285 | [[maybe_unused]] std::unique_ptr<Obfs4Framing> framing) { |
221 | | - // Proxy logic is handled inside HandshakeReader::proxy_loop |
| 286 | + // Proxy logic is handled inside handle_connection thread |
222 | 287 | } |
223 | 288 |
|
224 | 289 | } // namespace tor::transport |
0 commit comments