|
495 | 495 | proxy-options' (when (some? proxy-options)
|
496 | 496 | (assoc proxy-options :ssl? ssl?))
|
497 | 497 | non-tunnel-proxy? (non-tunnel-proxy? proxy-options')
|
498 |
| - keep-alive?' (or keep-alive? (when (some? proxy-options) |
499 |
| - (get proxy-options :keep-alive? true))) |
| 498 | + keep-alive?' (boolean (or keep-alive? (when (some? proxy-options) |
| 499 | + (get proxy-options :keep-alive? true)))) |
500 | 500 | host-header-value (str host (when explicit-port? (str ":" port)))
|
501 | 501 | c (netty/create-client
|
502 | 502 | (pipeline-builder responses (assoc options :ssl? ssl?))
|
|
511 | 511 | epoll?
|
512 | 512 | name-resolver)]
|
513 | 513 | (d/chain' c
|
514 |
| - (fn [^Channel ch] |
515 |
| - |
516 |
| - (s/consume |
517 |
| - (fn [req] |
518 |
| - (try |
519 |
| - (let [^HttpRequest req' (http/ring-request->netty-request |
520 |
| - (if non-tunnel-proxy? |
521 |
| - (assoc req :uri (req->proxy-url req)) |
522 |
| - req))] |
523 |
| - (when-not (.get (.headers req') "Host") |
524 |
| - (.set (.headers req') HttpHeaderNames/HOST host-header-value)) |
525 |
| - (when-not (.get (.headers req') "Connection") |
526 |
| - (HttpUtil/setKeepAlive req' keep-alive?')) |
527 |
| - |
528 |
| - (let [body (:body req) |
529 |
| - parts (:multipart req) |
530 |
| - multipart? (some? parts) |
531 |
| - [req' body] (cond |
532 |
| - ;; RFC #7231 4.3.8. TRACE |
533 |
| - ;; A client MUST NOT send a message body... |
534 |
| - (= :trace (:request-method req)) |
535 |
| - (do |
536 |
| - (when (or (some? body) multipart?) |
537 |
| - (log/warn "TRACE request body was omitted")) |
538 |
| - [req' nil]) |
539 |
| - |
540 |
| - (not multipart?) |
541 |
| - [req' body] |
542 |
| - |
543 |
| - :else |
544 |
| - (multipart/encode-request req' parts))] |
545 |
| - |
546 |
| - (when-let [save-message (get req :aleph/save-request-message)] |
547 |
| - ;; debug purpose only |
548 |
| - ;; note, that req' is effectively mutable, so |
549 |
| - ;; it will "capture" all changes made during "send-message" |
550 |
| - ;; execution |
551 |
| - (reset! save-message req')) |
552 |
| - |
553 |
| - (when-let [save-body (get req :aleph/save-request-body)] |
554 |
| - ;; might be different in case we use :multipart |
555 |
| - (reset! save-body body)) |
556 |
| - |
557 |
| - (netty/safe-execute ch |
558 |
| - (http/send-message ch true ssl? req' body)))) |
559 |
| - |
560 |
| - ;; this will usually happen because of a malformed request |
561 |
| - (catch Throwable e |
562 |
| - (s/put! responses (d/error-deferred e))))) |
563 |
| - requests) |
564 |
| - |
565 |
| - (s/on-closed responses |
566 |
| - (fn [] |
567 |
| - (when on-closed (on-closed)) |
568 |
| - (s/close! requests))) |
569 |
| - |
570 |
| - (let [t0 (System/nanoTime)] |
571 |
| - (fn [req] |
572 |
| - (if (contains? req ::close) |
573 |
| - (netty/wrap-future (netty/close ch)) |
574 |
| - (let [raw-stream? (get req :raw-stream? raw-stream?) |
575 |
| - rsp (locking ch |
576 |
| - (s/put! requests req) |
577 |
| - (s/take! responses ::closed))] |
578 |
| - (d/chain' rsp |
579 |
| - (fn [rsp] |
580 |
| - (cond |
581 |
| - (instance? Throwable rsp) |
582 |
| - (d/error-deferred rsp) |
583 |
| - |
584 |
| - (identical? ::closed rsp) |
585 |
| - (d/error-deferred |
586 |
| - (ex-info |
587 |
| - (format "connection was closed after %.3f seconds" (/ (- (System/nanoTime) t0) 1e9)) |
588 |
| - {:request req})) |
589 |
| - |
590 |
| - raw-stream? |
591 |
| - rsp |
592 |
| - |
593 |
| - :else |
594 |
| - (d/chain' rsp |
595 |
| - (fn [rsp] |
596 |
| - (let [body (:body rsp)] |
597 |
| - |
598 |
| - ;; handle connection life-cycle |
599 |
| - (when-not keep-alive? |
600 |
| - (if (s/stream? body) |
601 |
| - (s/on-closed body #(netty/close ch)) |
602 |
| - (netty/close ch))) |
603 |
| - |
604 |
| - (assoc rsp |
605 |
| - :body |
606 |
| - (bs/to-input-stream body |
607 |
| - {:buffer-size response-buffer-size})))))))))))))))) |
| 514 | + (fn [^Channel ch] |
| 515 | + |
| 516 | + (s/consume |
| 517 | + (fn [req] |
| 518 | + (try |
| 519 | + (let [^HttpRequest req' (http/ring-request->netty-request |
| 520 | + (if non-tunnel-proxy? |
| 521 | + (assoc req :uri (req->proxy-url req)) |
| 522 | + req))] |
| 523 | + (when-not (.get (.headers req') "Host") |
| 524 | + (.set (.headers req') HttpHeaderNames/HOST host-header-value)) |
| 525 | + (when-not (.get (.headers req') "Connection") |
| 526 | + (HttpUtil/setKeepAlive req' keep-alive?')) |
| 527 | + |
| 528 | + (let [body (:body req) |
| 529 | + parts (:multipart req) |
| 530 | + multipart? (some? parts) |
| 531 | + [req' body] (cond |
| 532 | + ;; RFC #7231 4.3.8. TRACE |
| 533 | + ;; A client MUST NOT send a message body... |
| 534 | + (= :trace (:request-method req)) |
| 535 | + (do |
| 536 | + (when (or (some? body) multipart?) |
| 537 | + (log/warn "TRACE request body was omitted")) |
| 538 | + [req' nil]) |
| 539 | + |
| 540 | + (not multipart?) |
| 541 | + [req' body] |
| 542 | + |
| 543 | + :else |
| 544 | + (multipart/encode-request req' parts))] |
| 545 | + |
| 546 | + (when-let [save-message (get req :aleph/save-request-message)] |
| 547 | + ;; debug purpose only |
| 548 | + ;; note, that req' is effectively mutable, so |
| 549 | + ;; it will "capture" all changes made during "send-message" |
| 550 | + ;; execution |
| 551 | + (reset! save-message req')) |
| 552 | + |
| 553 | + (when-let [save-body (get req :aleph/save-request-body)] |
| 554 | + ;; might be different in case we use :multipart |
| 555 | + (reset! save-body body)) |
| 556 | + |
| 557 | + (netty/safe-execute ch |
| 558 | + (http/send-message ch true ssl? req' body)))) |
| 559 | + |
| 560 | + ;; this will usually happen because of a malformed request |
| 561 | + (catch Throwable e |
| 562 | + (s/put! responses (d/error-deferred e))))) |
| 563 | + requests) |
| 564 | + |
| 565 | + (s/on-closed responses |
| 566 | + (fn [] |
| 567 | + (when on-closed (on-closed)) |
| 568 | + (s/close! requests))) |
| 569 | + |
| 570 | + (let [t0 (System/nanoTime)] |
| 571 | + (fn [req] |
| 572 | + (if (contains? req ::close) |
| 573 | + (netty/wrap-future (netty/close ch)) |
| 574 | + (let [raw-stream? (get req :raw-stream? raw-stream?) |
| 575 | + rsp (locking ch |
| 576 | + (s/put! requests req) |
| 577 | + (s/take! responses ::closed))] |
| 578 | + (d/chain' rsp |
| 579 | + (fn [rsp] |
| 580 | + (cond |
| 581 | + (instance? Throwable rsp) |
| 582 | + (d/error-deferred rsp) |
| 583 | + |
| 584 | + (identical? ::closed rsp) |
| 585 | + (d/error-deferred |
| 586 | + (ex-info |
| 587 | + (format "connection was closed after %.3f seconds" (/ (- (System/nanoTime) t0) 1e9)) |
| 588 | + {:request req})) |
| 589 | + |
| 590 | + raw-stream? |
| 591 | + rsp |
| 592 | + |
| 593 | + :else |
| 594 | + (d/chain' rsp |
| 595 | + (fn [rsp] |
| 596 | + (let [body (:body rsp)] |
| 597 | + |
| 598 | + ;; handle connection life-cycle |
| 599 | + (when-not keep-alive? |
| 600 | + (if (s/stream? body) |
| 601 | + (s/on-closed body #(netty/close ch)) |
| 602 | + (netty/close ch))) |
| 603 | + |
| 604 | + (assoc rsp |
| 605 | + :body |
| 606 | + (bs/to-input-stream body |
| 607 | + {:buffer-size response-buffer-size})))))))))))))))) |
608 | 608 |
|
609 | 609 | ;;;
|
610 | 610 |
|
|
0 commit comments