Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions protocol/pcp/dlu/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Deployed Logic Units (DLU)

DLUs are units of logic that are not generally run within an independent process, but which must be deployed somewhere. Smart contracts are DLU.

In this directory, we include DLU and the means via which to deploy them to various environments.
35 changes: 35 additions & 0 deletions protocol/pcp/manager/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
[package]
name = "postconfirmationssettlement-manager"
version = { workspace = true }
edition = { workspace = true }
license = { workspace = true }
authors = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
publish = { workspace = true }
rust-version = { workspace = true }

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
postconfirmationsconfig = { workspace = true }
postconfirmationssettlement-client = { workspace = true }
movement-types = { workspace = true }

anyhow = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
tokio = { workspace = true }
tokio-stream = { workspace = true }
serde_json = { workspace = true }

[dev-dependencies]
postconfirmationssettlement-client = { workspace = true, features = ["mock"] }

[features]
default = ["stub"]
stub = []

[lints]
workspace = true
18 changes: 18 additions & 0 deletions protocol/pcp/manager/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use pcp_types::block_commitment::{SuperBlockCommitment, SuperBlockCommitmentEvent};
use tokio_stream::Stream;

mod manager;

pub use manager::Manager as PcpSettlementManager;

pub type CommitmentEventStream =
std::pin::Pin<Box<dyn Stream<Item = Result<SuperBlockCommitmentEvent, anyhow::Error>> + Send>>;

#[async_trait::async_trait]
pub trait PcpSettlementManagerOperations {
/// Adds a block commitment to the manager queue.
async fn post_block_commitment(
&self,
block_commitment: SuperBlockCommitment,
) -> Result<(), anyhow::Error>;
}
282 changes: 282 additions & 0 deletions protocol/pcp/manager/src/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
use crate::{CommitmentEventStream, PcpSettlementManagerOperations, SuperBlockCommitmentEvent};

use pcp_config::Config;
use postconfirmations_settlement_client::PcpSettlementClientOperations;
use pcp_types::block_commitment::{
SuperBlockCommitment, SuperBlockCommitmentRejectionReason,
};

use async_stream::stream;
use async_trait::async_trait;
use futures::future::{self, Either};
use tokio::sync::mpsc;
use tokio::time;
use tokio_stream::StreamExt;

use std::collections::BTreeMap;
use std::mem;
use std::time::Duration;

/// Public handle for the PCP settlement manager.
pub struct Manager {
sender: mpsc::Sender<SuperBlockCommitment>,
}

impl Manager {
/// Creates a new PCP settlement manager.
///
/// Returns the handle with the public API and the stream to receive commitment events.
/// The stream needs to be polled to drive the PCP settlement client and
/// process the commitments.
pub fn new<C: PcpSettlementClientOperations + Send + 'static>(
client: C,
config: &Config,
) -> (Self, CommitmentEventStream) {
let batch_timeout = Duration::from_millis(config.transactions.batch_timeout);
let (sender, receiver) = mpsc::channel(16);
let event_stream = process_commitments(receiver, client, batch_timeout);
(Self { sender }, event_stream)
}
}

#[async_trait]
impl PcpSettlementManagerOperations for Manager {
async fn post_block_commitment(
&self,
block_commitment: SuperBlockCommitment,
) -> Result<(), anyhow::Error> {
self.sender.send(block_commitment).await?;
Ok(())
}
}

fn process_commitments<C: PcpSettlementClientOperations + Send + 'static>(
mut receiver: mpsc::Receiver<SuperBlockCommitment>,
client: C,
batch_timeout: Duration,
) -> CommitmentEventStream {
// Can't mix try_stream! and select!, see https://github.com/tokio-rs/async-stream/issues/63
Box::pin(stream! {
let mut settlement_stream = client.stream_block_commitments().await?;
let mut max_height = client.get_max_tolerable_block_height().await?;
let mut ahead_of_settlement = false;
let mut commitments_to_settle = BTreeMap::new();
let mut batch_acc = Vec::new();
let mut batch_ready = Either::Left(future::pending::<()>());
loop {
tokio::select! {
Some(block_commitment) = receiver.recv(), if !ahead_of_settlement => {
commitments_to_settle.insert(
block_commitment.height(),
block_commitment.commitment().clone(),
);
if block_commitment.height() > max_height {
// Can't post this commitment to the contract yet.
// Post the previously accumulated commitments as a batch
// and pause reading from input.
ahead_of_settlement = true;
let batch = mem::replace(&mut batch_acc, Vec::new());
if let Err(e) = client.post_block_commitment_batch(batch).await {
yield Err(e);
break;
}
}
// If this commitment starts a new batch, start the timeout
if batch_acc.is_empty() {
batch_ready = Either::Right(Box::pin(time::sleep(batch_timeout)));
}
batch_acc.push(block_commitment);
}
_ = &mut batch_ready => {
// Batch timeout has expired, post the commitments we have now
let batch = mem::replace(&mut batch_acc, Vec::new());
if let Err(e) = client.post_block_commitment_batch(batch).await {
yield Err(e);
break;
}
// Disable the batch timeout
batch_ready = Either::Left(future::pending::<()>());
}
Some(res) = settlement_stream.next() => {
let settled_commitment = match res {
Ok(commitment) => commitment,
Err(e) => {
yield Err(e);
break;
}
};

let height = settled_commitment.height();
if let Some(commitment) = commitments_to_settle.remove(&height) {
let event = if commitment == settled_commitment.commitment() {
SuperBlockCommitmentEvent::Accepted(settled_commitment)
} else {
SuperBlockCommitmentEvent::Rejected {
height,
reason: SuperBlockCommitmentRejectionReason::InvalidCommitment,
}
};
yield Ok(event);
} else if let Some((&lh, _)) = commitments_to_settle.last_key_value() {
if lh < height {
// Settlement has left some commitments behind, but the client could
// deliver them of order?
todo!("Handle falling behind on settlement")
}
}
// Remove back-pressure if we can proceed settling new blocks.
if ahead_of_settlement {
let new_max_height = match client.get_max_tolerable_block_height().await {
Ok(h) => h,
Err(e) => {
yield Err(e);
break;
}
};
if new_max_height > max_height {
max_height = new_max_height;
ahead_of_settlement = false;
}
}
}
else => break
}
}
})
}

#[cfg(test)]
mod tests {
use super::*;
use postconfirmations_settlement_client::mock::PcpSettlementClient;
use pcp_types::block_commitment::{Commitment, SuperBlockCommitment};

#[tokio::test]
async fn test_block_commitment_accepted() -> Result<(), anyhow::Error> {
let config = Config::default();
let mut client = PcpSettlementClient::new();
client.block_lead_tolerance = 1;
let (manager, mut event_stream) = Manager::new(client.clone(), &config);
let commitment = SuperBlockCommitment::new(1, Default::default(), Commitment::new([1; 32]));
manager.post_block_commitment(commitment.clone()).await?;
let commitment2 =
SuperBlockCommitment::new(2, Default::default(), Commitment::new([2; 32]));
manager.post_block_commitment(commitment2).await?;
let item = event_stream.next().await;
let res = item.unwrap();
let event = res.unwrap();
assert_eq!(event, SuperBlockCommitmentEvent::Accepted(commitment));
Ok(())
}

#[tokio::test]
async fn test_block_commitment_rejected() -> Result<(), anyhow::Error> {
let config = Config::default();
let mut client = PcpSettlementClient::new();
client.block_lead_tolerance = 1;
let (manager, mut event_stream) = Manager::new(client.clone(), &config);
let commitment = SuperBlockCommitment::new(1, Default::default(), Commitment::new([1; 32]));
client
.override_block_commitment(SuperBlockCommitment::new(
1,
Default::default(),
Commitment::new([3; 32]),
))
.await;
manager.post_block_commitment(commitment.clone()).await?;
let commitment2 =
SuperBlockCommitment::new(2, Default::default(), Commitment::new([2; 32]));
manager.post_block_commitment(commitment2).await?;
let item = event_stream.next().await;
let res = item.unwrap();
let event = res.unwrap();
assert_eq!(
event,
SuperBlockCommitmentEvent::Rejected {
height: 1,
reason: SuperBlockCommitmentRejectionReason::InvalidCommitment,
}
);
Ok(())
}

#[tokio::test]
async fn test_back_pressure() -> Result<(), anyhow::Error> {
let config = Config::default();
let mut client = PcpSettlementClient::new();
client.block_lead_tolerance = 2;
client.pause_after(2).await;
let (manager, mut event_stream) = Manager::new(client.clone(), &config);

let commitment1 =
SuperBlockCommitment::new(1, Default::default(), Commitment::new([1; 32]));
manager.post_block_commitment(commitment1.clone()).await?;
let commitment2 =
SuperBlockCommitment::new(2, Default::default(), Commitment::new([2; 32]));
manager.post_block_commitment(commitment2.clone()).await?;
let commitment3 =
SuperBlockCommitment::new(3, Default::default(), Commitment::new([3; 32]));
manager.post_block_commitment(commitment3.clone()).await?;

let event = event_stream.next().await.expect("stream has ended")?;
assert_eq!(event, SuperBlockCommitmentEvent::Accepted(commitment1.clone()));
let event = event_stream.next().await.expect("stream has ended")?;
assert_eq!(event, SuperBlockCommitmentEvent::Accepted(commitment2.clone()));

// The batch of first two should have been posted,
// the third commitment is batched in the manager.
assert_eq!(client.get_commitment_at_height(1).await?, Some(commitment1.clone()));
assert_eq!(client.get_commitment_at_height(2).await?, Some(commitment2.clone()));
assert_eq!(client.get_commitment_at_height(3).await?, None);

// Unblock the client, allowing processing of commitments to resume.
client.resume().await;

let commitment4 =
SuperBlockCommitment::new(4, Default::default(), Commitment::new([4; 32]));
manager.post_block_commitment(commitment4).await?;
let commitment5 =
SuperBlockCommitment::new(5, Default::default(), Commitment::new([5; 32]));
manager.post_block_commitment(commitment5).await?;

let event = event_stream.next().await.expect("stream has ended")?;
assert_eq!(event, SuperBlockCommitmentEvent::Accepted(commitment3.clone()));

Ok(())
}

#[tokio::test]
async fn test_batch_timeout() -> Result<(), anyhow::Error> {
let mut config = Config::default();
config.transactions.batch_timeout = 100;
let client = PcpSettlementClient::new();
let (manager, mut event_stream) = Manager::new(client.clone(), &config);

let commitment1 =
SuperBlockCommitment::new(1, Default::default(), Commitment::new([1; 32]));
manager.post_block_commitment(commitment1.clone()).await?;
let commitment2 =
SuperBlockCommitment::new(2, Default::default(), Commitment::new([2; 32]));
manager.post_block_commitment(commitment2.clone()).await?;

let item = time::timeout(Duration::from_secs(2), event_stream.next())
.await
.expect("no timeout");
let event = item.expect("stream has ended")?;
assert_eq!(event, SuperBlockCommitmentEvent::Accepted(commitment1.clone()));
let event = event_stream.next().await.expect("stream has ended")?;
assert_eq!(event, SuperBlockCommitmentEvent::Accepted(commitment2.clone()));

let commitment3 =
SuperBlockCommitment::new(3, Default::default(), Commitment::new([3; 32]));
manager.post_block_commitment(commitment3.clone()).await?;

let item = time::timeout(Duration::from_secs(2), event_stream.next())
.await
.expect("no timeout");
let event = item.expect("stream has ended")?;
assert_eq!(event, SuperBlockCommitmentEvent::Accepted(commitment3));

Ok(())
}
}
22 changes: 22 additions & 0 deletions protocol/pcp/util/config/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "pcp-config"
description = "Configuration of the PCP settlement client"
version.workspace = true
edition.workspace = true
license.workspace = true
authors.workspace = true
repository.workspace = true
homepage.workspace = true
publish.workspace = true
rust-version.workspace = true

[dependencies]
serde = { workspace = true , features = ["derive"] }
alloy = { workspace = true }
ffs-environment = { workspace = true }
anyhow = { workspace = true }
secure-signer-loader = { workspace = true }
secure-signer = { workspace = true}

[lints]
workspace = true
Loading