|
24 | 24 | HttpRequest
|
25 | 25 | HttpResponse
|
26 | 26 | HttpContent
|
| 27 | + HttpUtil |
| 28 | + HttpHeaderNames |
27 | 29 | LastHttpContent
|
28 | 30 | FullHttpResponse
|
29 | 31 | HttpObjectAggregator]
|
30 | 32 | [io.netty.channel
|
31 | 33 | Channel
|
32 | 34 | ChannelHandler ChannelHandlerContext
|
33 | 35 | ChannelPipeline]
|
| 36 | + [io.netty.handler.stream ChunkedWriteHandler] |
| 37 | + [io.netty.handler.codec.http FullHttpRequest] |
34 | 38 | [io.netty.handler.codec.http.websocketx
|
35 | 39 | CloseWebSocketFrame
|
36 | 40 | PingWebSocketFrame
|
|
51 | 55 | HttpProxyHandler
|
52 | 56 | Socks4ProxyHandler
|
53 | 57 | Socks5ProxyHandler]
|
| 58 | + [io.netty.handler.logging |
| 59 | + LoggingHandler |
| 60 | + LogLevel] |
54 | 61 | [java.util.concurrent.atomic
|
55 | 62 | AtomicInteger]
|
56 | 63 | [aleph.utils
|
|
171 | 178 |
|
172 | 179 | (instance? HttpResponse msg)
|
173 | 180 | (let [rsp msg]
|
174 |
| - (if (HttpHeaders/isTransferEncodingChunked rsp) |
| 181 | + (if (HttpUtil/isTransferEncodingChunked rsp) |
175 | 182 | (let [s (netty/buffered-source (netty/channel ctx) #(alength ^bytes %) buffer-capacity)
|
176 | 183 | c (d/deferred)]
|
177 | 184 | (reset! stream s)
|
|
262 | 269 | ;; * `curl` uses separate option `--proxytunnel` flag to switch tunneling on
|
263 | 270 | ;; * `curl` uses CONNECT when sending request to HTTPS destination through HTTP proxy
|
264 | 271 | ;;
|
265 |
| -;; Explicitily setting `tunnel?` to false when it's expected to use CONNECT |
| 272 | +;; Explicitly setting `tunnel?` to false when it's expected to use CONNECT |
266 | 273 | ;; throws `IllegalArgumentException` to reduce the confusion
|
267 | 274 | (defn http-proxy-handler
|
268 | 275 | [^InetSocketAddress address
|
|
343 | 350 | (.remove (.pipeline ctx) this))
|
344 | 351 | (.fireUserEventTriggered ^ChannelHandlerContext ctx evt))))
|
345 | 352 |
|
| 353 | +(defn coerce-log-level [level] |
| 354 | + (if (instance? LogLevel level) |
| 355 | + level |
| 356 | + (let [netty-level (case level |
| 357 | + :trace LogLevel/TRACE |
| 358 | + :debug LogLevel/DEBUG |
| 359 | + :info LogLevel/INFO |
| 360 | + :warn LogLevel/WARN |
| 361 | + :error LogLevel/ERROR |
| 362 | + nil)] |
| 363 | + (when (nil? netty-level) |
| 364 | + (throw (IllegalArgumentException. |
| 365 | + (str "unknown log level given: " level)))) |
| 366 | + netty-level))) |
| 367 | + |
346 | 368 | (defn pipeline-builder
|
347 | 369 | [response-stream
|
348 | 370 | {:keys
|
|
354 | 376 | raw-stream?
|
355 | 377 | proxy-options
|
356 | 378 | ssl?
|
357 |
| - idle-timeout] |
| 379 | + idle-timeout |
| 380 | + log-activity] |
358 | 381 | :or
|
359 | 382 | {pipeline-transform identity
|
360 | 383 | response-buffer-size 65536
|
|
365 | 388 | (fn [^ChannelPipeline pipeline]
|
366 | 389 | (let [handler (if raw-stream?
|
367 | 390 | (raw-client-handler response-stream response-buffer-size)
|
368 |
| - (client-handler response-stream response-buffer-size))] |
| 391 | + (client-handler response-stream response-buffer-size)) |
| 392 | + logger (when (some? log-activity) |
| 393 | + (LoggingHandler. |
| 394 | + "aleph-client" |
| 395 | + ^LogLevel (coerce-log-level log-activity)))] |
369 | 396 | (doto pipeline
|
370 | 397 | (.addLast "http-client"
|
371 | 398 | (HttpClientCodec.
|
|
374 | 401 | max-chunk-size
|
375 | 402 | false
|
376 | 403 | false))
|
| 404 | + (.addLast "streamer" ^ChannelHandler (ChunkedWriteHandler.)) |
377 | 405 | (.addLast "handler" ^ChannelHandler handler)
|
378 | 406 | (http/attach-idle-handlers idle-timeout))
|
379 | 407 | (when (some? proxy-options)
|
|
388 | 416 | "pending-proxy-connection"
|
389 | 417 | ^ChannelHandler
|
390 | 418 | (pending-proxy-connection-handler response-stream)))))
|
| 419 | + (when (some? logger) |
| 420 | + (.addFirst pipeline "activity-logger" logger)) |
391 | 421 | (pipeline-transform pipeline))))
|
392 | 422 |
|
393 | 423 | (defn close-connection [f]
|
|
448 | 478 | (assoc req :uri (:request-url req))
|
449 | 479 | req))]
|
450 | 480 | (when-not (.get (.headers req') "Host")
|
451 |
| - (HttpHeaders/setHost req' (str host (when explicit-port? (str ":" port))))) |
| 481 | + (.set (.headers req') HttpHeaderNames/HOST (str host (when explicit-port? (str ":" port))))) |
452 | 482 | (when-not (.get (.headers req') "Connection")
|
453 |
| - (HttpHeaders/setKeepAlive req' keep-alive?)) |
| 483 | + (HttpUtil/setKeepAlive req' keep-alive?)) |
454 | 484 | (when (and (non-tunnel-proxy? proxy-options')
|
455 | 485 | (get proxy-options :keep-alive? true)
|
456 | 486 | (not (.get (.headers req') "Proxy-Connection")))
|
457 | 487 | (.set (.headers req') "Proxy-Connection" "Keep-Alive"))
|
458 | 488 |
|
459 |
| - (let [body (if-let [parts (get req :multipart)] |
460 |
| - (let [boundary (multipart/boundary) |
461 |
| - content-type (str "multipart/form-data; boundary=" boundary)] |
462 |
| - (HttpHeaders/setHeader req' "Content-Type" content-type) |
463 |
| - (multipart/encode-body boundary parts)) |
464 |
| - (get req :body))] |
| 489 | + (let [body (:body req) |
| 490 | + parts (:multipart req) |
| 491 | + multipart? (some? parts) |
| 492 | + [req' body] (cond |
| 493 | + ;; RFC #7231 4.3.8. TRACE |
| 494 | + ;; A client MUST NOT send a message body... |
| 495 | + (= :trace (:request-method req)) |
| 496 | + (do |
| 497 | + (when (or (some? body) multipart?) |
| 498 | + (log/warn "TRACE request body was omitted")) |
| 499 | + [req' nil]) |
| 500 | + |
| 501 | + (not multipart?) |
| 502 | + [req' body] |
| 503 | + |
| 504 | + :else |
| 505 | + (multipart/encode-request req' parts))] |
| 506 | + |
| 507 | + (when-let [save-message (get req :aleph/save-request-message)] |
| 508 | + ;; debug purpose only |
| 509 | + ;; note, that req' is effectively mutable, so |
| 510 | + ;; it will "capture" all changes made during "send-message" |
| 511 | + ;; execution |
| 512 | + (reset! save-message req')) |
| 513 | + |
| 514 | + (when-let [save-body (get req :aleph/save-request-body)] |
| 515 | + ;; might be different in case we use :multipart |
| 516 | + (reset! save-body body)) |
| 517 | + |
465 | 518 | (netty/safe-execute ch
|
466 | 519 | (http/send-message ch true ssl? req' body))))
|
467 | 520 |
|
|
584 | 637 | #(when (.isOpen ch)
|
585 | 638 | (d/chain'
|
586 | 639 | (netty/wrap-future (.close handshaker ch (CloseWebSocketFrame.)))
|
587 |
| - (fn [_] (netty/close ch)))))))) |
| 640 | + (fn [_] (netty/close ctx)))))))) |
588 | 641 |
|
589 | 642 | (instance? FullHttpResponse msg)
|
590 | 643 | (let [rsp ^FullHttpResponse msg]
|
591 | 644 | (throw
|
592 | 645 | (IllegalStateException.
|
593 | 646 | (str "unexpected HTTP response, status: "
|
594 |
| - (.getStatus rsp) |
| 647 | + (.status rsp) |
595 | 648 | ", body: '"
|
596 | 649 | (bs/to-string (.content rsp))
|
597 | 650 | "'"))))
|
|
0 commit comments