Skip to content

Commit 4071352

Browse files
committed
refactor: handle logs async with chans
1 parent a3b810e commit 4071352

File tree

2 files changed

+61
-34
lines changed

2 files changed

+61
-34
lines changed

deps.edn

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{:paths ["resources" "src"]
22
:deps {org.clojure/clojure {:mvn/version "1.11.1"}
3+
org.clojure/core.async {:mvn/version "1.6.681"}
34
com.moclojer/moclojer {:mvn/version "0.3.2"}
45
com.moclojer/rq {:mvn/version "0.1.4"}
56
clj-http/clj-http {:mvn/version "3.12.3"}
Lines changed: 60 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
11
(ns com.moclojer.components.logs
22
(:require
3+
[clojure.core.async :as async]
34
[clojure.data.json :as json]
45
[clj-http.client :as http-client]
5-
[taoensso.telemere :as t]
6-
[taoensso.telemere.timbre :as timbre])
7-
(:import (java.util.concurrent TimeoutException TimeUnit)))
6+
[taoensso.telemere :as t])
7+
(:import [clojure.core.async.impl.channels ManyToManyChannel]))
88

99
(defn build-opensearch-base-req
1010
[config]
1111
(let [{:keys [username password host port]} config
1212
url (str "https://" host ":" port "/_bulk")]
1313
{:method :post
1414
:url url
15-
:async? true
1615
:basic-auth [username password]
1716
:content-type :json
1817
:body (json/write-str {:index {:_index "logs"}})}))
@@ -27,33 +26,46 @@
2726
:else (pr-str v))))
2827
{} m))
2928

30-
(defn send-opensearch-signal-req
31-
[base-req signal]
32-
(let [log (-> (select-keys signal [:level :ctx :data
33-
:msg_ :error :thread
34-
:uid :inst])
35-
(->str-values)
36-
(json/write-str))
37-
future (http-client/request
38-
(update
39-
base-req :body
40-
str \newline log \newline)
41-
identity #(throw ^Exception %))]
42-
(try
43-
(.get future 1 TimeUnit/SECONDS)
44-
(catch TimeoutException _
45-
(.cancel future true)))))
29+
(defn send-opensearch-log-req
30+
[base-req log]
31+
(http-client/request
32+
(update
33+
base-req :body
34+
str \newline (json/write-str (->str-values log)) \newline)
35+
identity #(throw ^Exception %)))
36+
37+
(defonce log-ch (atom (async/chan)))
4638

4739
(defn setup [config level env]
48-
(let [prod? (= env "prod")
49-
os-cfg (when prod? (:opensearch config))
50-
os-base-req (build-opensearch-base-req os-cfg)]
40+
(let [prod? (= env :prod)
41+
log-ch' (swap!
42+
log-ch
43+
(fn [ch]
44+
(when ch (async/close! ch))
45+
(async/chan)))]
46+
5147
(t/set-min-level! level)
52-
(t/set-ns-filter! {:disallow "*" :allow "com.moclojer.*"})
53-
(when prod?
54-
(telemere/add-signal!
55-
:opensearch
56-
(fn [signal])))))
48+
49+
(when (and prod? (instance? ManyToManyChannel log-ch'))
50+
(let [os-cfg (when prod? (:opensearch config))
51+
os-base-req (build-opensearch-base-req os-cfg)]
52+
53+
(t/set-ns-filter! {:disallow #{"*jetty*" "*hikari*"
54+
"*pedestal*" "*migratus*"}})
55+
56+
(t/add-handler!
57+
:opensearch
58+
(fn [signal]
59+
(async/go
60+
(async/>!
61+
log-ch'
62+
(select-keys signal [:level :ctx :data :msg_ :error
63+
:thread :uid :inst])))))
64+
65+
(async/go
66+
(while true
67+
(let [[log _] (async/alts! [log-ch'])]
68+
(send-opensearch-log-req os-base-req log))))))))
5769

5870
(comment
5971
(def my-signal
@@ -70,8 +82,6 @@
7082
:host "foobar"
7183
:port 25060}))
7284

73-
(send-opensearch-signal-req base-req my-signal)
74-
7585
(http-client/request
7686
(update
7787
base-req :body
@@ -82,15 +92,31 @@
8292
;;
8393
)
8494

85-
(defmacro log [level & args]
86-
`(timbre/log ~level ~@args))
95+
(defn log [level msg & [:as data]]
96+
(t/log! {:level level
97+
:data (first data)}
98+
(str msg)))
99+
100+
(comment
101+
;; DEPRECATED
102+
(defmacro log [level & args]
103+
`(timbre/log ~level ~@args))
104+
;;
105+
)
87106

88107
(defn gen-ctx-with-cid []
89108
{:cid (str "cid-" (random-uuid) "-" (System/currentTimeMillis))})
90109

91110
(comment
92-
(setup :dev)
111+
log-ch
112+
113+
(setup {:opensearch
114+
{:username "foobar"
115+
:password "foobar"
116+
:host "foobar"
117+
:port 25060}}
118+
:info :prod)
93119

94-
(log :info :world)
120+
(log :info "something happened" {:user "j0suetm"})
95121
;;
96122
)

0 commit comments

Comments
 (0)