Skip to content

Commit 1422e3b

Browse files
committed
feat(webhook/sink): implement http request for reset action
1 parent d5c6d6a commit 1422e3b

File tree

1 file changed

+26
-15
lines changed

1 file changed

+26
-15
lines changed

src/sinks/webhook.rs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,27 +41,38 @@ impl gasket::framework::Worker<Stage> for Worker {
4141

4242
async fn execute(&mut self, unit: &ChainEvent, stage: &mut Stage) -> Result<(), WorkerError> {
4343
let point = unit.point().clone();
44-
let record = unit.record().cloned();
45-
46-
if record.is_none() {
47-
return Ok(());
48-
}
49-
50-
let body = serde_json::Value::from(record.unwrap());
5144

5245
let point_header = match &point {
5346
Point::Origin => String::from("origin"),
5447
Point::Specific(a, b) => format!("{a},{}", hex::encode(b)),
5548
};
5649

57-
let request = self
58-
.client
59-
.post(&stage.config.url)
60-
.header("x-oura-chainsync-action", "apply")
61-
.header("x-oura-chainsync-point", point_header)
62-
.json(&body)
63-
.build()
64-
.or_panic()?;
50+
let request = match unit {
51+
ChainEvent::Apply(_, record) => {
52+
let body = serde_json::Value::from(record.clone());
53+
self.client
54+
.post(&stage.config.url)
55+
.header("x-oura-chainsync-action", "apply")
56+
.header("x-oura-chainsync-point", point_header)
57+
.json(&body)
58+
}
59+
ChainEvent::Undo(_, record) => {
60+
let body = serde_json::Value::from(record.clone());
61+
self.client
62+
.post(&stage.config.url)
63+
.header("x-oura-chainsync-action", "undo")
64+
.header("x-oura-chainsync-point", point_header)
65+
.json(&body)
66+
}
67+
68+
ChainEvent::Reset(_) => self
69+
.client
70+
.post(&stage.config.url)
71+
.header("x-oura-chainsync-action", "reset")
72+
.header("x-oura-chainsync-point", point_header),
73+
}
74+
.build()
75+
.or_panic()?;
6576

6677
self.client
6778
.execute(request)

0 commit comments

Comments
 (0)