Skip to content

Commit 8e047cf

Browse files
authored
Merge pull request #458 from district0x/syncer-fixes
Keep event count to avoid duplicates and syncer fixes
2 parents 46e9ad3 + 7f6c17d commit 8e047cf

File tree

4 files changed

+72
-23
lines changed

4 files changed

+72
-23
lines changed

server/src/district/server/web3_events.cljs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -166,11 +166,10 @@
166166
(fn [err checkpoint]
167167
(web3-eth/get-block-number
168168
@web3
169-
(fn [_err-block last-block-number]
169+
(fn [err-block last-block-number]
170+
(when (or err-block (nil? last-block-number))
171+
(throw (js/Error. "Failed to get current block number")))
170172
(let [{:keys [last-processed-block processed-log-indexes]} checkpoint
171-
; Current backtrack parameter is not considered (and therefore next-block-to-process neither)
172-
; If want to use it, it needs to be made sure that event handlers are idempotent (i.e. can be run twice without the DB changing)
173-
; This is not the case for FundsIn/FundsOut and some other messages in Ethlance for example
174173
next-block-to-process (max 0 (- last-processed-block backtrack))]
175174
(if skip-past-events-replay?
176175
(do
@@ -179,7 +178,7 @@
179178
(smart-contracts/replay-past-events-in-order
180179
events
181180
dispatch
182-
{:from-block (max last-processed-block from-block 0)
181+
{:from-block (max next-block-to-process from-block 0)
183182
:crash-on-event-fail? crash-on-event-fail?
184183
:skip-log-indexes processed-log-indexes
185184
:to-block last-block-number

server/src/ethlance/server/db.cljs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,27 @@
645645
:token-detail/decimals (:decimals token-details)}))))
646646

647647

648+
(defn get-last-event
649+
[conn contract-key event-name]
650+
(safe-go
651+
(<? (db/get conn {:select [:event/last-log-index :event/last-block-number :event/count]
652+
:from [:Event]
653+
:where [:and
654+
[:= :event/contract-key contract-key]
655+
[:= :event/event-name event-name]]}))))
656+
657+
658+
(defn upsert-event!
659+
[conn event]
660+
(safe-go
661+
(<?
662+
(db/run! conn {:insert-into :Event
663+
:values [(select-keys event (get-table-column-names :Event))]
664+
:upsert (array-map
665+
:on-conflict [:event/event-name :event/contract-key]
666+
:do-update-set (keys event))}))))
667+
668+
648669
(def get-checkpoint-query
649670
{:select [:*]
650671
:from [:ContractEventCheckpoint]

server/src/ethlance/server/db/schema.cljs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,4 +414,14 @@
414414
[[:id :serial]
415415
[:checkpoint :json]
416416
[:created-at :timestamp]
417-
[(sql/call :primary-key :id)]]}])
417+
[(sql/call :primary-key :id)]]}
418+
419+
{:table-name :Event
420+
:table-columns
421+
[[:event/contract-key :varchar not-nil]
422+
[:event/event-name :varchar not-nil]
423+
[:event/last-log-index :integer not-nil]
424+
[:event/last-block-number :integer not-nil]
425+
[:event/count :integer not-nil]
426+
;; PK
427+
[(sql/call :primary-key :event/contract-key :event/event-name)]]}])

server/src/ethlance/server/syncer.cljs

Lines changed: 36 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@
99
[district.server.async-db :as db]
1010
[district.server.smart-contracts :as smart-contracts]
1111
[district.time :as time]
12+
[district.server.config :refer [config]]
1213
[district.server.web3 :refer [ping-start ping-stop web3]]
1314
[district.server.web3-events :as web3-events]
1415
[district.shared.async-helpers :refer [<? safe-go]]
16+
[ethlance.server.db :as ethlance-db]
1517
[ethlance.server.event-replay-queue :as replay-queue]
1618
[ethlance.server.tracing.api :as t-api]
1719
[ethlance.server.syncer.handlers :as handlers]
@@ -24,8 +26,9 @@
2426

2527

2628
(defstate ^{:on-reload :noop} syncer
27-
:start (start {})
28-
:stop (stop syncer))
29+
:start (start (merge (:syncer @config)
30+
(:syncer (mount/args))))
31+
:stop (stop))
2932

3033

3134
;;
@@ -60,6 +63,8 @@
6063
(async/go
6164
(let [contract-key (-> event :contract :contract-key)
6265
event-key (-> event :event)
66+
event-name (name event-key)
67+
log-index (-> event :log-index)
6368
handler (get contract-ev->handler [contract-key event-key])
6469
span (t-api/start-span (str (name (or contract-key "UnnamedContract")) "." (name (or event-key "UnnamedEvent"))))
6570
conn (<? (db/get-connection))]
@@ -72,21 +77,35 @@
7277
(if timestamp
7378
(bn/number timestamp)
7479
block-timestamp))))
75-
_ (db/begin-tx conn)
76-
res (t-api/with-span-context span #(handler conn err event))
77-
_ (db/commit-tx conn)]
78-
(t-api/set-span-ok! span)
79-
;; Calling a handler can throw or return a go block (when using safe-go)
80-
;; in the case of async ones, the go block will return the js/Error.
81-
;; In either cases push the event to the queue, so it can be replayed later
82-
(when (satisfies? ReadPort res)
83-
(let [r (<! res)]
84-
(when (instance? js/Error r)
85-
(throw r))
80+
{:keys [:event/last-block-number :event/last-log-index :event/count]
81+
:or {last-block-number -1
82+
last-log-index -1
83+
count 0}} (<? (ethlance-db/get-last-event conn (name contract-key) event-name))]
84+
(log/debug "Handling event..." event)
85+
(if (or (> block-number last-block-number)
86+
(and (= block-number last-block-number) (> log-index last-log-index)))
87+
(let [_ (db/begin-tx conn)
88+
res (t-api/with-span-context span #(handler conn err event))]
8689
(t-api/set-span-ok! span)
87-
(t-api/end-span! span)
88-
(log/info "Syncer: OK" r)
89-
r)))
90+
;; Calling a handler can throw or return a go block (when using safe-go)
91+
;; in the case of async ones, the go block will return the js/Error.
92+
;; In either cases push the event to the queue, so it can be replayed later
93+
(when (satisfies? ReadPort res)
94+
(let [r (<! res)]
95+
(when (instance? js/Error r)
96+
(throw r))
97+
(<? (ethlance-db/upsert-event! conn {:event/last-log-index log-index
98+
:event/last-block-number block-number
99+
:event/count (inc count)
100+
:event/event-name event-name
101+
:event/contract-key (name contract-key)}))
102+
(db/commit-tx conn)
103+
(log/info "Handled new event" event)
104+
(t-api/set-span-ok! span)
105+
(t-api/end-span! span)
106+
(log/info "Syncer: OK" r)
107+
r)))
108+
(log/info "Skipping handling of a persisted event" event)))
90109
(catch js/Error error
91110
(log/error "Syncer: ERROR" error)
92111
(replay-queue/push-event conn event)
@@ -105,7 +124,7 @@
105124
(let [connected? (true? (<! (web3-eth/is-listening? @web3)))]
106125
(when connected?
107126
(do
108-
(log/debug (str "disconnecting from provider to force reload. Last block: " @last-block-number))
127+
(log/debug (str "disconnecting from provider to force reload"))
109128
(web3-core/disconnect @web3))))))
110129
interval))
111130

0 commit comments

Comments
 (0)