Skip to content

Commit 0cfdc1c

Browse files
authored
Merge pull request #26 from movementlabsxyz/feat/pcp-adds-content
[protocol] pcp: add missing files
2 parents 403a31c + 844e916 commit 0cfdc1c

File tree

16 files changed

+910
-0
lines changed

16 files changed

+910
-0
lines changed

protocol/pcp/dlu/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# Deployed Logic Units (DLU)
2+
3+
DLUs are units of logic that are not generally run within an independent process, but which must be deployed somewhere. Smart contracts are DLU.
4+
5+
In this directory, we include DLU and the means via which to deploy them to various environments.

protocol/pcp/manager/Cargo.toml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
[package]
2+
name = "postconfirmationssettlement-manager"
3+
version = { workspace = true }
4+
edition = { workspace = true }
5+
license = { workspace = true }
6+
authors = { workspace = true }
7+
repository = { workspace = true }
8+
homepage = { workspace = true }
9+
publish = { workspace = true }
10+
rust-version = { workspace = true }
11+
12+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
13+
14+
[dependencies]
15+
postconfirmationsconfig = { workspace = true }
16+
postconfirmationssettlement-client = { workspace = true }
17+
movement-types = { workspace = true }
18+
19+
anyhow = { workspace = true }
20+
async-stream = { workspace = true }
21+
async-trait = { workspace = true }
22+
futures = { workspace = true }
23+
tokio = { workspace = true }
24+
tokio-stream = { workspace = true }
25+
serde_json = { workspace = true }
26+
27+
[dev-dependencies]
28+
postconfirmationssettlement-client = { workspace = true, features = ["mock"] }
29+
30+
[features]
31+
default = ["stub"]
32+
stub = []
33+
34+
[lints]
35+
workspace = true

protocol/pcp/manager/src/lib.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use pcp_types::block_commitment::{SuperBlockCommitment, SuperBlockCommitmentEvent};
2+
use tokio_stream::Stream;
3+
4+
mod manager;
5+
6+
pub use manager::Manager as PcpSettlementManager;
7+
8+
pub type CommitmentEventStream =
9+
std::pin::Pin<Box<dyn Stream<Item = Result<SuperBlockCommitmentEvent, anyhow::Error>> + Send>>;
10+
11+
#[async_trait::async_trait]
12+
pub trait PcpSettlementManagerOperations {
13+
/// Adds a block commitment to the manager queue.
14+
async fn post_block_commitment(
15+
&self,
16+
block_commitment: SuperBlockCommitment,
17+
) -> Result<(), anyhow::Error>;
18+
}
Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
use crate::{CommitmentEventStream, PcpSettlementManagerOperations, SuperBlockCommitmentEvent};
2+
3+
use pcp_config::Config;
4+
use postconfirmations_settlement_client::PcpSettlementClientOperations;
5+
use pcp_types::block_commitment::{
6+
SuperBlockCommitment, SuperBlockCommitmentRejectionReason,
7+
};
8+
9+
use async_stream::stream;
10+
use async_trait::async_trait;
11+
use futures::future::{self, Either};
12+
use tokio::sync::mpsc;
13+
use tokio::time;
14+
use tokio_stream::StreamExt;
15+
16+
use std::collections::BTreeMap;
17+
use std::mem;
18+
use std::time::Duration;
19+
20+
/// Public handle for the PCP settlement manager.
21+
pub struct Manager {
22+
sender: mpsc::Sender<SuperBlockCommitment>,
23+
}
24+
25+
impl Manager {
26+
/// Creates a new PCP settlement manager.
27+
///
28+
/// Returns the handle with the public API and the stream to receive commitment events.
29+
/// The stream needs to be polled to drive the PCP settlement client and
30+
/// process the commitments.
31+
pub fn new<C: PcpSettlementClientOperations + Send + 'static>(
32+
client: C,
33+
config: &Config,
34+
) -> (Self, CommitmentEventStream) {
35+
let batch_timeout = Duration::from_millis(config.transactions.batch_timeout);
36+
let (sender, receiver) = mpsc::channel(16);
37+
let event_stream = process_commitments(receiver, client, batch_timeout);
38+
(Self { sender }, event_stream)
39+
}
40+
}
41+
42+
#[async_trait]
43+
impl PcpSettlementManagerOperations for Manager {
44+
async fn post_block_commitment(
45+
&self,
46+
block_commitment: SuperBlockCommitment,
47+
) -> Result<(), anyhow::Error> {
48+
self.sender.send(block_commitment).await?;
49+
Ok(())
50+
}
51+
}
52+
53+
fn process_commitments<C: PcpSettlementClientOperations + Send + 'static>(
54+
mut receiver: mpsc::Receiver<SuperBlockCommitment>,
55+
client: C,
56+
batch_timeout: Duration,
57+
) -> CommitmentEventStream {
58+
// Can't mix try_stream! and select!, see https://github.com/tokio-rs/async-stream/issues/63
59+
Box::pin(stream! {
60+
let mut settlement_stream = client.stream_block_commitments().await?;
61+
let mut max_height = client.get_max_tolerable_block_height().await?;
62+
let mut ahead_of_settlement = false;
63+
let mut commitments_to_settle = BTreeMap::new();
64+
let mut batch_acc = Vec::new();
65+
let mut batch_ready = Either::Left(future::pending::<()>());
66+
loop {
67+
tokio::select! {
68+
Some(block_commitment) = receiver.recv(), if !ahead_of_settlement => {
69+
commitments_to_settle.insert(
70+
block_commitment.height(),
71+
block_commitment.commitment().clone(),
72+
);
73+
if block_commitment.height() > max_height {
74+
// Can't post this commitment to the contract yet.
75+
// Post the previously accumulated commitments as a batch
76+
// and pause reading from input.
77+
ahead_of_settlement = true;
78+
let batch = mem::replace(&mut batch_acc, Vec::new());
79+
if let Err(e) = client.post_block_commitment_batch(batch).await {
80+
yield Err(e);
81+
break;
82+
}
83+
}
84+
// If this commitment starts a new batch, start the timeout
85+
if batch_acc.is_empty() {
86+
batch_ready = Either::Right(Box::pin(time::sleep(batch_timeout)));
87+
}
88+
batch_acc.push(block_commitment);
89+
}
90+
_ = &mut batch_ready => {
91+
// Batch timeout has expired, post the commitments we have now
92+
let batch = mem::replace(&mut batch_acc, Vec::new());
93+
if let Err(e) = client.post_block_commitment_batch(batch).await {
94+
yield Err(e);
95+
break;
96+
}
97+
// Disable the batch timeout
98+
batch_ready = Either::Left(future::pending::<()>());
99+
}
100+
Some(res) = settlement_stream.next() => {
101+
let settled_commitment = match res {
102+
Ok(commitment) => commitment,
103+
Err(e) => {
104+
yield Err(e);
105+
break;
106+
}
107+
};
108+
109+
let height = settled_commitment.height();
110+
if let Some(commitment) = commitments_to_settle.remove(&height) {
111+
let event = if commitment == settled_commitment.commitment() {
112+
SuperBlockCommitmentEvent::Accepted(settled_commitment)
113+
} else {
114+
SuperBlockCommitmentEvent::Rejected {
115+
height,
116+
reason: SuperBlockCommitmentRejectionReason::InvalidCommitment,
117+
}
118+
};
119+
yield Ok(event);
120+
} else if let Some((&lh, _)) = commitments_to_settle.last_key_value() {
121+
if lh < height {
122+
// Settlement has left some commitments behind, but the client could
123+
// deliver them of order?
124+
todo!("Handle falling behind on settlement")
125+
}
126+
}
127+
// Remove back-pressure if we can proceed settling new blocks.
128+
if ahead_of_settlement {
129+
let new_max_height = match client.get_max_tolerable_block_height().await {
130+
Ok(h) => h,
131+
Err(e) => {
132+
yield Err(e);
133+
break;
134+
}
135+
};
136+
if new_max_height > max_height {
137+
max_height = new_max_height;
138+
ahead_of_settlement = false;
139+
}
140+
}
141+
}
142+
else => break
143+
}
144+
}
145+
})
146+
}
147+
148+
#[cfg(test)]
149+
mod tests {
150+
use super::*;
151+
use postconfirmations_settlement_client::mock::PcpSettlementClient;
152+
use pcp_types::block_commitment::{Commitment, SuperBlockCommitment};
153+
154+
#[tokio::test]
155+
async fn test_block_commitment_accepted() -> Result<(), anyhow::Error> {
156+
let config = Config::default();
157+
let mut client = PcpSettlementClient::new();
158+
client.block_lead_tolerance = 1;
159+
let (manager, mut event_stream) = Manager::new(client.clone(), &config);
160+
let commitment = SuperBlockCommitment::new(1, Default::default(), Commitment::new([1; 32]));
161+
manager.post_block_commitment(commitment.clone()).await?;
162+
let commitment2 =
163+
SuperBlockCommitment::new(2, Default::default(), Commitment::new([2; 32]));
164+
manager.post_block_commitment(commitment2).await?;
165+
let item = event_stream.next().await;
166+
let res = item.unwrap();
167+
let event = res.unwrap();
168+
assert_eq!(event, SuperBlockCommitmentEvent::Accepted(commitment));
169+
Ok(())
170+
}
171+
172+
#[tokio::test]
173+
async fn test_block_commitment_rejected() -> Result<(), anyhow::Error> {
174+
let config = Config::default();
175+
let mut client = PcpSettlementClient::new();
176+
client.block_lead_tolerance = 1;
177+
let (manager, mut event_stream) = Manager::new(client.clone(), &config);
178+
let commitment = SuperBlockCommitment::new(1, Default::default(), Commitment::new([1; 32]));
179+
client
180+
.override_block_commitment(SuperBlockCommitment::new(
181+
1,
182+
Default::default(),
183+
Commitment::new([3; 32]),
184+
))
185+
.await;
186+
manager.post_block_commitment(commitment.clone()).await?;
187+
let commitment2 =
188+
SuperBlockCommitment::new(2, Default::default(), Commitment::new([2; 32]));
189+
manager.post_block_commitment(commitment2).await?;
190+
let item = event_stream.next().await;
191+
let res = item.unwrap();
192+
let event = res.unwrap();
193+
assert_eq!(
194+
event,
195+
SuperBlockCommitmentEvent::Rejected {
196+
height: 1,
197+
reason: SuperBlockCommitmentRejectionReason::InvalidCommitment,
198+
}
199+
);
200+
Ok(())
201+
}
202+
203+
#[tokio::test]
204+
async fn test_back_pressure() -> Result<(), anyhow::Error> {
205+
let config = Config::default();
206+
let mut client = PcpSettlementClient::new();
207+
client.block_lead_tolerance = 2;
208+
client.pause_after(2).await;
209+
let (manager, mut event_stream) = Manager::new(client.clone(), &config);
210+
211+
let commitment1 =
212+
SuperBlockCommitment::new(1, Default::default(), Commitment::new([1; 32]));
213+
manager.post_block_commitment(commitment1.clone()).await?;
214+
let commitment2 =
215+
SuperBlockCommitment::new(2, Default::default(), Commitment::new([2; 32]));
216+
manager.post_block_commitment(commitment2.clone()).await?;
217+
let commitment3 =
218+
SuperBlockCommitment::new(3, Default::default(), Commitment::new([3; 32]));
219+
manager.post_block_commitment(commitment3.clone()).await?;
220+
221+
let event = event_stream.next().await.expect("stream has ended")?;
222+
assert_eq!(event, SuperBlockCommitmentEvent::Accepted(commitment1.clone()));
223+
let event = event_stream.next().await.expect("stream has ended")?;
224+
assert_eq!(event, SuperBlockCommitmentEvent::Accepted(commitment2.clone()));
225+
226+
// The batch of first two should have been posted,
227+
// the third commitment is batched in the manager.
228+
assert_eq!(client.get_commitment_at_height(1).await?, Some(commitment1.clone()));
229+
assert_eq!(client.get_commitment_at_height(2).await?, Some(commitment2.clone()));
230+
assert_eq!(client.get_commitment_at_height(3).await?, None);
231+
232+
// Unblock the client, allowing processing of commitments to resume.
233+
client.resume().await;
234+
235+
let commitment4 =
236+
SuperBlockCommitment::new(4, Default::default(), Commitment::new([4; 32]));
237+
manager.post_block_commitment(commitment4).await?;
238+
let commitment5 =
239+
SuperBlockCommitment::new(5, Default::default(), Commitment::new([5; 32]));
240+
manager.post_block_commitment(commitment5).await?;
241+
242+
let event = event_stream.next().await.expect("stream has ended")?;
243+
assert_eq!(event, SuperBlockCommitmentEvent::Accepted(commitment3.clone()));
244+
245+
Ok(())
246+
}
247+
248+
#[tokio::test]
249+
async fn test_batch_timeout() -> Result<(), anyhow::Error> {
250+
let mut config = Config::default();
251+
config.transactions.batch_timeout = 100;
252+
let client = PcpSettlementClient::new();
253+
let (manager, mut event_stream) = Manager::new(client.clone(), &config);
254+
255+
let commitment1 =
256+
SuperBlockCommitment::new(1, Default::default(), Commitment::new([1; 32]));
257+
manager.post_block_commitment(commitment1.clone()).await?;
258+
let commitment2 =
259+
SuperBlockCommitment::new(2, Default::default(), Commitment::new([2; 32]));
260+
manager.post_block_commitment(commitment2.clone()).await?;
261+
262+
let item = time::timeout(Duration::from_secs(2), event_stream.next())
263+
.await
264+
.expect("no timeout");
265+
let event = item.expect("stream has ended")?;
266+
assert_eq!(event, SuperBlockCommitmentEvent::Accepted(commitment1.clone()));
267+
let event = event_stream.next().await.expect("stream has ended")?;
268+
assert_eq!(event, SuperBlockCommitmentEvent::Accepted(commitment2.clone()));
269+
270+
let commitment3 =
271+
SuperBlockCommitment::new(3, Default::default(), Commitment::new([3; 32]));
272+
manager.post_block_commitment(commitment3.clone()).await?;
273+
274+
let item = time::timeout(Duration::from_secs(2), event_stream.next())
275+
.await
276+
.expect("no timeout");
277+
let event = item.expect("stream has ended")?;
278+
assert_eq!(event, SuperBlockCommitmentEvent::Accepted(commitment3));
279+
280+
Ok(())
281+
}
282+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
[package]
2+
name = "pcp-config"
3+
description = "Configuration of the PCP settlement client"
4+
version.workspace = true
5+
edition.workspace = true
6+
license.workspace = true
7+
authors.workspace = true
8+
repository.workspace = true
9+
homepage.workspace = true
10+
publish.workspace = true
11+
rust-version.workspace = true
12+
13+
[dependencies]
14+
serde = { workspace = true , features = ["derive"] }
15+
alloy = { workspace = true }
16+
ffs-environment = { workspace = true }
17+
anyhow = { workspace = true }
18+
secure-signer-loader = { workspace = true }
19+
secure-signer = { workspace = true}
20+
21+
[lints]
22+
workspace = true

0 commit comments

Comments
 (0)