|
1 | 1 | (ns hello.handler |
2 | 2 | (:require |
3 | | - [aleph.http :as http] |
4 | | - [aleph.netty :as netty] |
5 | | - [byte-streams :as bs] |
6 | | - [clj-async-profiler.core :as prof] |
7 | | - [hiccup.page :as hp] |
8 | | - [hiccup.util :as hu] |
9 | | - [jsonista.core :as json] |
10 | | - [manifold.deferred :as d] |
11 | | - [porsas.async :as async]) |
12 | | - (:import (clojure.lang IDeref) |
13 | | - (io.netty.channel ChannelOption) |
| 3 | + [aleph.http :as http] |
| 4 | + [aleph.netty :as netty] |
| 5 | + [hiccup.page :as hp] |
| 6 | + [hiccup.util :as hu] |
| 7 | + [jsonista.core :as json] |
| 8 | + [manifold.deferred :as d] |
| 9 | + [next.jdbc :as jdbc] |
| 10 | + [next.jdbc.connection :as connection] |
| 11 | + [next.jdbc.result-set :as rs]) |
| 12 | + |
| 13 | + (:import (com.zaxxer.hikari HikariDataSource) |
14 | 14 | (io.netty.buffer PooledByteBufAllocator) |
15 | | - (java.util.function Supplier) |
16 | | - (java.util.concurrent ThreadLocalRandom) |
17 | | - (porsas.async Context)) |
| 15 | + (io.netty.channel ChannelOption) |
| 16 | + (java.util.concurrent ThreadLocalRandom)) |
18 | 17 | (:gen-class)) |
19 | 18 |
|
| 19 | +(def jdbc-opts {:builder-fn rs/as-unqualified-maps}) |
| 20 | + |
| 21 | +(def db-spec |
| 22 | + {:jdbcUrl "jdbc:postgresql://tfb-database/hello_world?user=benchmarkdbuser&password=benchmarkdbpass"}) |
| 23 | + |
| 24 | +(def datasource |
| 25 | + (connection/->pool HikariDataSource db-spec)) |
| 26 | + |
20 | 27 | (def plaintext-response |
21 | | - {:status 200 |
| 28 | + {:status 200 |
22 | 29 | :headers {"Content-Type" "text/plain"} |
23 | | - :body (bs/to-byte-array "Hello, World!")}) |
| 30 | + :body (.getBytes "Hello, World!")}) |
24 | 31 |
|
25 | 32 | (def json-response |
26 | | - {:status 200 |
| 33 | + {:status 200 |
27 | 34 | :headers {"Content-Type" "application/json"}}) |
28 | 35 |
|
29 | 36 | (def html-response |
30 | | - {:status 200 |
| 37 | + {:status 200 |
31 | 38 | :headers {"Content-Type" "text/html; charset=utf-8"}}) |
32 | 39 |
|
33 | | -(def db-spec |
34 | | - {:uri "postgresql://tfb-database:5432/hello_world" |
35 | | - :user "benchmarkdbuser" |
36 | | - :password "benchmarkdbpass" |
37 | | - :size 1}) |
38 | | - |
39 | | -(defmacro thread-local [& body] |
40 | | - `(let [tl# (ThreadLocal/withInitial (reify Supplier (get [_] ~@body)))] |
41 | | - (reify IDeref (deref [_] (.get tl#))))) |
42 | 40 |
|
43 | | -(def pool |
44 | | - "PostgreSQL pool of connections (`PgPool`)." |
45 | | - (thread-local (async/pool db-spec))) |
46 | | - |
47 | | -(defn random |
| 41 | +(defn- random |
48 | 42 | "Generate a random number between 1 and 10'000." |
49 | 43 | [] |
50 | 44 | (unchecked-inc (.nextInt (ThreadLocalRandom/current) 10000))) |
51 | 45 |
|
52 | | -(defn sanitize-queries-param |
53 | | - "Sanitizes the `queries` parameter. Clamps the value between 1 and 500. |
54 | | - Invalid (string) values become 1." |
| 46 | +(defn- sanitize-queries-param |
55 | 47 | [request] |
56 | 48 | (let [queries (-> request |
57 | 49 | :query-string |
58 | 50 | (subs 8)) |
59 | 51 | n (try (Integer/parseInt queries) |
60 | | - (catch Exception _ 1))] ; default to 1 on parse failure |
| 52 | + (catch Exception _ 1))] ; default to 1 on parse failure |
61 | 53 | (cond |
62 | 54 | (< n 1) 1 |
63 | 55 | (> n 500) 500 |
64 | 56 | :else n))) |
65 | 57 |
|
66 | | -(def ^Context |
67 | | - query-mapper |
68 | | - "Map each row into a record." |
69 | | - (async/context {:row (async/rs->compiled-record)})) |
70 | 58 |
|
71 | | -(defn query-one-random-world |
72 | | - "Query a random world on the database. |
73 | | - Return a `CompletableFuture`." |
74 | | - [] |
75 | | - (async/query-one query-mapper |
76 | | - @pool |
77 | | - ["SELECT id, randomnumber FROM world WHERE id=$1" (random)])) |
78 | | - |
79 | | -(defn update-world |
80 | | - "Update a world on the database. |
81 | | - Return a `CompletableFuture`." |
82 | | - [{:keys [randomNumber id]}] |
83 | | - (async/query @pool |
84 | | - ["UPDATE world SET randomnumber=$1 WHERE id=$2" randomNumber id])) |
85 | | - |
86 | | -(defn run-queries |
| 59 | +(defn- query-one-random-world [] |
| 60 | + (jdbc/execute-one! datasource |
| 61 | + ["select * from \"World\" where id = ?;" (random)] |
| 62 | + jdbc-opts)) |
| 63 | + |
| 64 | +(defn- update-world |
| 65 | + [{:keys [randomnumber id]}] |
| 66 | + (jdbc/execute-one! datasource |
| 67 | + ["update \"World\" set randomNumber = ? where id = ? returning *;" randomnumber id] |
| 68 | + jdbc-opts)) |
| 69 | + |
| 70 | +(defn- run-queries |
87 | 71 | "Run a number of `queries` on the database to fetch a random world. |
88 | 72 | Return a `manifold.deferred`." |
89 | 73 | [queries] |
90 | 74 | (apply d/zip |
91 | 75 | (take queries |
92 | 76 | (repeatedly query-one-random-world)))) |
93 | 77 |
|
94 | | -(defn query-fortunes |
95 | | - "Query the fortunes on database. |
96 | | - Return a `CompletableFuture`." |
97 | | - [] |
98 | | - (async/query query-mapper @pool ["SELECT id, message from FORTUNE"])) |
99 | 78 |
|
100 | | -(defn get-fortunes |
| 79 | +(defn query-fortunes [] |
| 80 | + (jdbc/execute! datasource |
| 81 | + ["select * from \"Fortune\";"] |
| 82 | + jdbc-opts)) |
| 83 | + |
| 84 | +(defn- get-fortunes |
101 | 85 | "Fetch the full list of Fortunes from the database, sort them by the fortune |
102 | 86 | message text. |
103 | 87 | Return a `CompletableFuture` with the results." |
104 | 88 | [] |
105 | | - (d/chain (query-fortunes) |
106 | | - (fn [fortunes] |
107 | | - (sort-by :message |
108 | | - (conj fortunes |
109 | | - {:id 0 |
110 | | - :message "Additional fortune added at request time."}))))) |
111 | | - |
112 | | -(defn update-and-persist |
113 | | - "Fetch a number of `queries` random world from the database. |
114 | | - Compute a new `randomNumber` for each of them a return a `CompletableFuture` |
115 | | - with the updated worlds." |
116 | | - [queries] |
| 89 | + (try |
| 90 | + (d/chain (query-fortunes) |
| 91 | + (fn [fortunes] |
| 92 | + (sort-by :message |
| 93 | + (conj fortunes |
| 94 | + {:id 0 |
| 95 | + :message "Additional fortune added at request time."})))) |
| 96 | + (catch Exception e |
| 97 | + (.getStackTrace ^Exception e)))) |
| 98 | + |
| 99 | +(defn- update-and-persist [queries] |
117 | 100 | (d/chain' (run-queries queries) |
118 | 101 | (fn [worlds] |
119 | | - (let [worlds' (mapv #(assoc % :randomNumber (random)) worlds)] |
| 102 | + (let [worlds' (mapv #(assoc % :randomnumber (random)) worlds)] |
120 | 103 | (d/chain' (apply d/zip (mapv update-world worlds')) |
121 | 104 | (fn [_] worlds')))))) |
122 | 105 |
|
123 | | -(defn fortunes-hiccup |
| 106 | +(defn- fortunes-hiccup |
124 | 107 | "Render the given fortunes to simple HTML using Hiccup." |
125 | 108 | [fortunes] |
126 | 109 | (hp/html5 |
127 | | - [:head |
128 | | - [:title "Fortunes"]] |
129 | | - [:body |
130 | | - [:table |
131 | | - [:tr |
132 | | - [:th "id"] |
133 | | - [:th "message"]] |
134 | | - (for [x fortunes] |
135 | | - [:tr |
136 | | - [:td (:id x)] |
137 | | - [:td (hu/escape-html (:message x))]])]])) |
| 110 | + [:head |
| 111 | + [:title "Fortunes"]] |
| 112 | + [:body |
| 113 | + [:table |
| 114 | + [:tr |
| 115 | + [:th "id"] |
| 116 | + [:th "message"]] |
| 117 | + (for [x fortunes] |
| 118 | + [:tr |
| 119 | + [:td (:id x)] |
| 120 | + [:td (hu/escape-html (:message x))]])]])) |
138 | 121 |
|
139 | 122 | (defn handler |
140 | 123 | "Ring handler representing the different tests." |
141 | 124 | [req] |
142 | 125 | (let [uri (:uri req)] |
143 | 126 | (cond |
144 | 127 | (.equals "/plaintext" uri) plaintext-response |
145 | | - (.equals "/json" uri) (assoc json-response |
146 | | - :body (json/write-value-as-bytes {:message "Hello, World!"})) |
147 | | - (.equals "/db" uri) (-> (query-one-random-world) |
148 | | - (d/chain (fn [world] |
149 | | - (assoc json-response |
150 | | - :body (json/write-value-as-bytes world))))) |
151 | | - (.equals "/queries" uri) (-> (sanitize-queries-param req) |
152 | | - (run-queries) |
153 | | - (d/chain (fn [worlds] |
154 | | - (assoc json-response |
155 | | - :body (json/write-value-as-bytes worlds))))) |
156 | | - (.equals "/fortunes" uri) (d/chain' (get-fortunes) |
157 | | - fortunes-hiccup |
158 | | - (fn [body] |
159 | | - (assoc html-response :body body))) |
160 | | - (.equals "/updates" uri) (-> (sanitize-queries-param req) |
161 | | - (update-and-persist) |
162 | | - (d/chain (fn [worlds] |
163 | | - (assoc json-response |
164 | | - :body (json/write-value-as-bytes worlds))))) |
165 | | - :else {:status 404}))) |
166 | | - |
167 | | -;;; |
| 128 | + (.equals "/json" uri) (assoc json-response |
| 129 | + :body (json/write-value-as-bytes {:message "Hello, World!"})) |
| 130 | + (.equals "/db" uri) (-> (query-one-random-world) |
| 131 | + (d/chain (fn [world] |
| 132 | + (assoc json-response |
| 133 | + :body (json/write-value-as-bytes world))))) |
| 134 | + (.equals "/queries" uri) (-> (sanitize-queries-param req) |
| 135 | + (run-queries) |
| 136 | + (d/chain (fn [worlds] |
| 137 | + (assoc json-response |
| 138 | + :body (json/write-value-as-bytes worlds))))) |
| 139 | + (.equals "/fortunes" uri) (d/chain' (get-fortunes) |
| 140 | + fortunes-hiccup |
| 141 | + (fn [body] |
| 142 | + (assoc html-response :body body))) |
| 143 | + (.equals "/updates" uri) (-> (sanitize-queries-param req) |
| 144 | + (update-and-persist) |
| 145 | + (d/chain (fn [worlds] |
| 146 | + (assoc json-response |
| 147 | + :body (json/write-value-as-bytes worlds))))) |
| 148 | + :else {:body "Not found" |
| 149 | + :status 404}))) |
| 150 | + |
168 | 151 |
|
169 | 152 | (defn -main [& _] |
170 | 153 | (netty/leak-detector-level! :disabled) |
171 | | - (http/start-server handler {:port 8080 |
172 | | - :raw-stream? true |
173 | | - :epoll? true |
174 | | - :executor :none |
| 154 | + (println "starting server on port 8080") |
| 155 | + (http/start-server handler {:port 8080 |
| 156 | + :raw-stream? true |
| 157 | + :executor :none |
175 | 158 | :bootstrap-transform (fn [bootstrap] |
176 | 159 | (.option bootstrap ChannelOption/ALLOCATOR PooledByteBufAllocator/DEFAULT) |
177 | 160 | (.childOption bootstrap ChannelOption/ALLOCATOR PooledByteBufAllocator/DEFAULT)) |
178 | | - :pipeline-transform (fn [pipeline] |
179 | | - (.remove pipeline "continue-handler"))}) |
180 | | - ;; Uncomment to enable async-profiler |
181 | | - #_ |
182 | | - (do |
183 | | - (prof/profile-for 60 |
184 | | - #_ |
185 | | - {:transform (fn [s] |
186 | | - (when-not (re-find #"(writev|__libc|epoll_wait|write|__pthread)" s) |
187 | | - s))}) |
188 | | - (prof/serve-files 8081))) |
| 161 | + :pipeline-transform (fn [pipeline] |
| 162 | + (.remove pipeline "continue-handler"))})) |
0 commit comments