|
124 | 124 | - coll of strings (e.g. [\"9222:9222\"])"
|
125 | 125 | [coll]
|
126 | 126 | (->>
|
127 |
| - (for [s coll :let [{:keys [container host]} (parse-port s)]] |
128 |
| - [(format "%s/%s" (:port container) (:protocol container)) [{:HostPort (:port host)}]]) |
129 |
| - (into {}))) |
| 127 | + (for [s coll :let [{:keys [container host]} (parse-port s)]] |
| 128 | + [(format "%s/%s" (:port container) (:protocol container)) [{:HostPort (:port host)}]]) |
| 129 | + (into {}))) |
130 | 130 |
|
131 | 131 | (comment
|
132 | 132 | (port-bindings ["9222:9222"])
|
|
142 | 142 | ;; Tty wraps the process in a pseudo terminal
|
143 | 143 | ;; StdinOnce closes the stdin after the first client detaches
|
144 | 144 | ;; OpenStdin just opens stdin
|
145 |
| -(defn create-container [{:keys [image entrypoint workdir command host-dir env thread-id opts mounts volumes ports network_mode] |
| 145 | +(defn create-container [{:keys [image entrypoint workdir command host-dir environment thread-id opts mounts volumes ports network_mode] |
146 | 146 | :or {opts {:Tty true}}}]
|
147 | 147 | (let [payload (json/generate-string
|
148 | 148 | (merge
|
149 | 149 | {:Image image}
|
150 | 150 | opts
|
151 |
| - (when env {:env (->> env |
152 |
| - (map (fn [[k v]] (format "%s=%s" (name k) v))) |
153 |
| - (into []))}) |
| 151 | + (when environment {:Env (->> environment |
| 152 | + (map (fn [[k v]] (format "%s=%s" (name k) v))) |
| 153 | + (into []))}) |
154 | 154 | {:HostConfig
|
155 | 155 | (merge
|
156 | 156 | {:Binds
|
|
433 | 433 | (logger/error "processing docker engine attach bytes: " t)
|
434 | 434 | "")))
|
435 | 435 |
|
| 436 | +(defn write-to-stdin |
| 437 | + " params |
| 438 | + client - SocketChannel for attached container" |
| 439 | + [client s] |
| 440 | + (let [buf (ByteBuffer/allocate (* 1024 20))] |
| 441 | + (.clear ^ByteBuffer buf) |
| 442 | + (try |
| 443 | + (.put ^ByteBuffer buf (.getBytes ^String s)) |
| 444 | + (.flip ^ByteBuffer buf) |
| 445 | + (while (.hasRemaining buf) |
| 446 | + (.write ^SocketChannel client buf)) |
| 447 | + (catch Throwable t |
| 448 | + (logger/error "write-string error " t))))) |
| 449 | + |
| 450 | +(defn read-loop |
| 451 | + " params |
| 452 | + in - SocketChannel for attached container |
| 453 | + c - channel to write multiplexed stdout stderr blocks" |
| 454 | + [in c] |
| 455 | + (async/go |
| 456 | + (try |
| 457 | + (let [header-buf (ByteBuffer/allocate 8)] |
| 458 | + (loop [offset 0] |
| 459 | + (let [result (.read ^SocketChannel in header-buf)] |
| 460 | + (cond |
| 461 | + ;;;;;;;;;; |
| 462 | + (= -1 result) |
| 463 | + (async/close! c) |
| 464 | + |
| 465 | + ;;;;;;;;;; |
| 466 | + (= 8 (+ offset result)) |
| 467 | + (do |
| 468 | + (.flip ^ByteBuffer header-buf) |
| 469 | + (let [size (.getInt (ByteBuffer/wrap (Arrays/copyOfRange ^bytes (.array ^ByteBuffer header-buf) 4 8))) |
| 470 | + stream-type (case (int (nth (.array ^ByteBuffer header-buf) 0)) |
| 471 | + 0 :stdin |
| 472 | + 1 :stdout |
| 473 | + 2 :stderr) |
| 474 | + buf (ByteBuffer/allocate size)] |
| 475 | + (loop [offset 0] |
| 476 | + (let [result (.read ^SocketChannel in buf)] |
| 477 | + (cond |
| 478 | + ;;;;;;;;;; |
| 479 | + (= -1 result) |
| 480 | + (async/close! c) |
| 481 | + |
| 482 | + ;;;;;;;;;; |
| 483 | + (= size (+ offset result)) |
| 484 | + (async/>! c {stream-type (String. ^bytes (.array buf))}) |
| 485 | + |
| 486 | + ;;;;;;;;;; |
| 487 | + :else |
| 488 | + (recur (+ offset result))))) |
| 489 | + (do |
| 490 | + (.clear ^ByteBuffer buf) |
| 491 | + (recur 0)))) |
| 492 | + |
| 493 | + ;;;;;;;;;; |
| 494 | + :else |
| 495 | + (do |
| 496 | + (.clear ^ByteBuffer header-buf) |
| 497 | + (recur (+ offset result))))))) |
| 498 | + (catch Throwable t |
| 499 | + (logger/error "streaming exception " t) |
| 500 | + (async/close! c))))) |
| 501 | + |
| 502 | +(defn attach-socket |
| 503 | + " returns SocketChannel" |
| 504 | + [container-id] |
| 505 | + (let [buf (ByteBuffer/allocate (* 1024 20)) |
| 506 | + address (UnixDomainSocketAddress/of "/var/run/docker.sock") |
| 507 | + client (SocketChannel/open address)] |
| 508 | + (.configureBlocking client true) |
| 509 | + (.clear buf) |
| 510 | + ;; make HTTP call |
| 511 | + (.put buf (.getBytes (String. (format "POST /containers/%s/attach?stdin=true&stdout=true&stderr=true&stream=true HTTP/1.1\n" container-id)))) |
| 512 | + (.put buf (.getBytes (String. "Host: localhost\nConnection: Upgrade\nUpgrade: tcp\n\n"))) |
| 513 | + (try |
| 514 | + (.flip ^ByteBuffer buf) |
| 515 | + (while (.hasRemaining buf) |
| 516 | + (.write client buf)) |
| 517 | + ;; TODO if successful, we should get a 101 UPGRADED response that will be exactly 117 bytes |
| 518 | + (let [buf (ByteBuffer/allocate 117)] |
| 519 | + ;; TODO read the HTTP upgrade message |
| 520 | + (.read client buf) |
| 521 | + (.read client buf)) |
| 522 | + (catch Throwable _ |
| 523 | + client)) |
| 524 | + client)) |
| 525 | + |
436 | 526 | (defn function-call-with-stdin
|
437 | 527 | "creates and starts container, then writes to stdin process
|
438 | 528 | returns container map with Id, and socket - socket is open socket to stdin"
|
|
0 commit comments