|
1 | 1 | (ns pcp.scgi |
2 | | - (:require [clojure.string :as str]) |
| 2 | + (:require [clojure.string :as str] |
| 3 | + [com.climate.claypoole :as cp]) |
3 | 4 | (:import [java.nio.channels ServerSocketChannel SocketChannel Selector SelectionKey] |
4 | 5 | [java.nio ByteBuffer] |
5 | 6 | [java.net InetSocketAddress InetAddress] |
6 | | - [java.io ByteArrayInputStream ByteArrayOutputStream]) |
| 7 | + [java.io ByteArrayInputStream ByteArrayOutputStream] |
| 8 | + [java.util Set]) |
7 | 9 | (:gen-class)) |
8 | 10 |
|
9 | 11 | (set! *warn-on-reflection* 1) |
|
39 | 41 | (assoc! :body (:body req)) |
40 | 42 | (persistent!)))) |
41 | 43 |
|
42 | | -(defn on-accept [selector ^SelectionKey key] |
| 44 | +(defn on-accept [selector ^SelectionKey key ^Set keys] |
43 | 45 | (let [^ServerSocketChannel channel (.channel key) |
44 | 46 | ^SocketChannel socketChannel (.accept channel)] |
45 | 47 | (.configureBlocking socketChannel false) |
46 | | - (.register socketChannel selector SelectionKey/OP_READ))) |
| 48 | + (.register socketChannel selector SelectionKey/OP_READ) |
| 49 | + (.remove keys key))) |
47 | 50 |
|
48 | 51 | (defn create-scgi-string [resp] |
49 | 52 | (let [nl "\r\n" |
50 | 53 | response (str (:status resp) nl (apply str (for [[k v] (:headers resp)] (str k ": " v nl))) nl (:body resp))] |
51 | 54 | response)) |
52 | 55 |
|
53 | | -(defn on-read [^SelectionKey key handler] |
| 56 | +(defn on-read [^SelectionKey key handler ^Set keys] |
54 | 57 | (let [^SocketChannel socket-channel (.channel key)] |
55 | 58 | (try |
56 | 59 | (let [buf (ByteBuffer/allocate 1) |
|
90 | 93 | (let [^ByteBuffer resp (-> {:header header :body (ByteArrayInputStream. (.toByteArray body-out))} extract-headers handler create-scgi-string to-byte-array)] |
91 | 94 | (.write socket-channel resp) |
92 | 95 | (.close socket-channel) |
93 | | - (.cancel key)))) |
94 | | - (catch Exception _ (.close socket-channel) (.cancel key))))) |
| 96 | + (.cancel key) |
| 97 | + (.remove keys key)))) |
| 98 | + (catch Exception e (println e) (.close socket-channel) (.cancel key) (.remove keys key))))) |
95 | 99 |
|
96 | 100 | (defn build-server [port selector] |
97 | 101 | (let [^ServerSocketChannel serverChannel (ServerSocketChannel/open) |
|
101 | 105 | (.register serverChannel selector SelectionKey/OP_ACCEPT) |
102 | 106 | serverChannel)) |
103 | 107 |
|
104 | | -(defn run-selection [active handler ^Selector selector] |
| 108 | +(defn run-selection [active handler ^Selector selector pool] |
| 109 | + (println "running...") |
105 | 110 | (while (some? @active) |
106 | | - (if (not= 0 (.select selector 50)) |
107 | | - (let [keys (.selectedKeys selector)] |
108 | | - (doseq [^SelectionKey key keys] |
109 | | - (let [ops (.readyOps key)] |
110 | | - (cond |
111 | | - (= ops SelectionKey/OP_ACCEPT) (on-accept selector key) |
112 | | - (= ops SelectionKey/OP_READ) (on-read key handler)))) |
113 | | - (.clear keys)) |
114 | | - nil))) |
| 111 | + (when (not= 0 (.select selector)) |
| 112 | + (let [keys (.selectedKeys selector) |
| 113 | + process (fn [^SelectionKey key] |
| 114 | + (let [ops (.readyOps key)] |
| 115 | + (cond |
| 116 | + (= ops SelectionKey/OP_ACCEPT) (on-accept selector key keys) |
| 117 | + (= ops SelectionKey/OP_READ) (on-read key handler keys))))] |
| 118 | + (dorun (cp/upmap pool process keys)))))) |
115 | 119 |
|
116 | 120 | (defn serve [handler port] |
117 | 121 | (let [active (atom true) |
118 | 122 | ^Selector selector (Selector/open) |
119 | | - ^ServerSocketChannel server (build-server port selector)] |
| 123 | + ^ServerSocketChannel server (build-server port selector) |
| 124 | + pool (cp/threadpool 512)] |
120 | 125 | (future |
121 | | - (run-selection active handler selector)) |
| 126 | + (run-selection active handler selector pool)) |
122 | 127 | (fn [] |
123 | 128 | (.close server) |
124 | | - (reset! active false)))) |
| 129 | + (reset! active false) |
| 130 | + (cp/shutdown pool)))) |
125 | 131 |
|
0 commit comments