Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,058 changes: 562 additions & 496 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ publish = false
default-run = "sundae-sync-v2"

[dependencies]
pallas = "0.33"
pallas = { version = "1.0.0-alpha.4" }

tokio = { version = "1.37", features = ["rt-multi-thread", "macros", "signal"] }
anyhow = "1.0"
clap = { version = "4.5", features = ["derive", "env"] }
hex = "0.4.3"
tracing = "0.1"
tracing-subscriber = "0.3"
utxorpc = "0.11"
utxorpc = "0.13"
aws-config = "1.5.4"
aws-sdk-dynamodb = "1.38.0"
aws-sdk-s3 = "1.41.0"
Expand All @@ -31,6 +31,8 @@ tokio-util = "0.7.11"
bytes = "1.11"
prost = { version = "0.14", features = ["no-recursion-limit"] }
futures = "0.3.30"
num-bigint = "0.4.6"
rustls = "0.23"
serde_bytes = "0.11.15"
serde_bytes_base64 = "0.1.1"

Expand Down
47 changes: 43 additions & 4 deletions src/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use serde::{Deserialize, Serialize};
use serde_bytes_base64::Bytes;
use serde_dynamo::{to_attribute_value, to_item};
use tracing::trace;
use utxorpc::spec::cardano::{Block, Datum as utxorpcDatum, Multiasset, Script};
use utxorpc::spec::cardano::{asset::Quantity, Block, Datum as utxorpcDatum, Redeemer, Script};

use crate::utils::elapsed;
use crate::utils::{bigint_to_string, bigint_to_u64, elapsed};

#[derive(Clone)]
pub struct Archive {
Expand All @@ -33,6 +33,19 @@ pub struct HeightRef {
pub location: String,
}

/// Archive-side, serde-friendly mirror of `utxorpc::spec::cardano::Multiasset`:
/// all assets grouped under one policy id. Constructed in the
/// `From<utxorpc::spec::cardano::TxOutput>` impl in this file, which converts
/// each asset quantity to a decimal string via `bigint_to_string`.
#[derive(Serialize, Deserialize, Debug)]
pub struct Multiasset {
    /// Policy id bytes; `Bytes` here is `serde_bytes_base64::Bytes`, so this
    /// serializes as base64 rather than a byte array.
    pub policy_id: Bytes,
    /// The individual assets issued under this policy.
    pub assets: Vec<Asset>,
    /// Redeemer carried over unchanged from the utxorpc representation, if any.
    pub redeemer: Option<Redeemer>,
}

/// One asset entry within a `Multiasset` policy group.
#[derive(Serialize, Deserialize, Debug)]
pub struct Asset {
    /// Asset name bytes, base64-serialized via `serde_bytes_base64::Bytes`.
    pub name: Bytes,
    /// Output quantity rendered as a decimal string (the source value is a
    /// protobuf big integer, so it may exceed u64 — see the conversion in the
    /// `From<TxOutput>` impl, which falls back to "0" when the quantity is
    /// absent or not an `OutputCoin`).
    pub output_coin: String,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum Datum {
Expand All @@ -58,8 +71,30 @@ impl From<utxorpc::spec::cardano::TxOutput> for TxOutput {
fn from(value: utxorpc::spec::cardano::TxOutput) -> Self {
TxOutput {
address: value.address.to_vec().into(),
coin: value.coin,
assets: value.assets.to_vec(),
coin: value
.coin
.as_ref()
.and_then(bigint_to_u64)
.expect("value did not fit in bigint"),
assets: value
.assets
.into_iter()
.map(|m| Multiasset {
policy_id: m.policy_id.to_vec().into(),
redeemer: m.redeemer,
assets: m
.assets
.into_iter()
.map(|a| Asset {
name: a.name.to_vec().into(),
output_coin: match a.quantity {
Some(Quantity::OutputCoin(o)) => bigint_to_string(&o),
_ => "0".to_string(),
},
})
.collect(),
})
.collect(),
datum: value
.datum
.map(|d| Datum::Raw(d.original_cbor.to_vec().into())),
Expand Down Expand Up @@ -95,6 +130,10 @@ impl LedgerContext for NoContext {
) -> Option<pallas::interop::utxorpc::UtxoMap> {
None
}

fn get_slot_timestamp(&self, _slot: u64) -> Option<u64> {
None
}
}

impl Archive {
Expand Down
9 changes: 8 additions & 1 deletion src/bin/restore-history/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let args = Args::parse();
info!("Starting sundae-sync-v2 restore history");
if rustls::crypto::CryptoProvider::install_default(rustls::crypto::ring::default_provider())
.is_err()
{
warn!("Could not configure CryptoProvider");
}

let archive = construct_archive(&args).await?;

Expand Down Expand Up @@ -155,7 +160,9 @@ async fn restore_history(
return Err(e).context("archive.save failed after 10 retries");
}
let delay = Duration::from_millis(100 * 2u64.pow(attempt.min(7)));
warn!("archive.save failed (attempt {attempt}/10, retrying in {delay:?}): {e:#}");
warn!(
"archive.save failed (attempt {attempt}/10, retrying in {delay:?}): {e:#}"
);
sleep(delay).await;
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/broadcast/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl Broadcaster {
// For each destination
for destination in &mut self.destinations {
// Ignore this destination if we're further back in the chain
if destination.last_seen_point.index > message.advance.index {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this gracefully upgrade?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a rename of a grpc field, doesn't affect semantics

if destination.last_seen_point.slot > message.advance.slot {
continue;
}
// Check if we *should* send to this destination,
Expand Down
70 changes: 45 additions & 25 deletions src/broadcast/destination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl Destination {
.await
{
Ok(_) => {
info!("Repaired destination {} to point {}", self.pk, point.index);
info!("Repaired destination {} to point {}", self.pk, point.slot);
}
Err(e) => {
// Check if this is a conditional check failure
Expand All @@ -185,7 +185,7 @@ impl Destination {
// Another worker already advanced past this point - we're caught up
info!(
"Destination {} already advanced past repair point {}, repair complete",
self.pk, point.index
self.pk, point.slot
);
break;
}
Expand Down Expand Up @@ -216,7 +216,7 @@ impl Destination {
}

pub fn point_to_string(point: &BlockRef) -> String {
format!("{}/{}", point.index, point.hash.encode_hex::<String>())
format!("{}/{}", point.slot, point.hash.encode_hex::<String>())
}
pub fn serialize_point<S>(point: &BlockRef, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
Expand All @@ -243,9 +243,14 @@ pub fn string_to_point(s: String) -> Result<BlockRef> {
if parts.len() != 2 {
bail!("invalid point: multiple slashes!");
}
let index = parts[0].parse()?;
let slot = parts[0].parse()?;
let hash = Bytes::from_iter(hex::decode(parts[1])?);
Ok(BlockRef { index, hash })
Ok(BlockRef {
slot,
hash,
height: 0,
timestamp: 0,
})
}
pub fn deserialize_point<'de, D>(deserializer: D) -> std::result::Result<BlockRef, D::Error>
where
Expand Down Expand Up @@ -280,8 +285,10 @@ mod tests {
#[test]
fn test_point_to_string() {
let point = BlockRef {
index: 12345,
slot: 12345,
hash: bytes::Bytes::from(vec![0xde, 0xad, 0xbe, 0xef]),
height: 0,
timestamp: 0,
};
let result = point_to_string(&point);
assert_eq!(result, "12345/deadbeef");
Expand All @@ -291,7 +298,7 @@ mod tests {
fn test_string_to_point() {
let input = "12345/deadbeef".to_string();
let result = string_to_point(input).unwrap();
assert_eq!(result.index, 12345);
assert_eq!(result.slot, 12345);
assert_eq!(
result.hash,
bytes::Bytes::from(vec![0xde, 0xad, 0xbe, 0xef])
Expand All @@ -301,12 +308,14 @@ mod tests {
/// Serializing a point and parsing it back preserves both slot and hash.
#[test]
fn test_point_roundtrip() {
    let hash_bytes = vec![0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef];
    let point = BlockRef {
        slot: 98765,
        hash: bytes::Bytes::from(hash_bytes.clone()),
        height: 0,
        timestamp: 0,
    };
    let parsed = string_to_point(point_to_string(&point)).unwrap();
    assert_eq!(parsed.slot, 98765);
    assert_eq!(parsed.hash, bytes::Bytes::from(hash_bytes));
}

Expand Down Expand Up @@ -337,17 +346,23 @@ mod tests {
filter: None,
sequence_number: Some("12345".to_string()),
last_seen_point: BlockRef {
index: 100,
slot: 100,
hash: bytes::Bytes::from(vec![0xaa, 0xbb, 0xcc, 0xdd]),
height: 0,
timestamp: 0,
},
recovery_points: vec![
BlockRef {
index: 90,
slot: 90,
hash: bytes::Bytes::from(vec![0x11, 0x22, 0x33, 0x44]),
height: 0,
timestamp: 0,
},
BlockRef {
index: 95,
slot: 95,
hash: bytes::Bytes::from(vec![0x55, 0x66, 0x77, 0x88]),
height: 0,
timestamp: 0,
},
],
enabled: true,
Expand All @@ -358,18 +373,15 @@ mod tests {
let deserialized: Destination = serde_json::from_str(&json).unwrap();

assert_eq!(dest.pk, deserialized.pk);
assert_eq!(
dest.last_seen_point.index,
deserialized.last_seen_point.index
);
assert_eq!(dest.last_seen_point.slot, deserialized.last_seen_point.slot);
assert_eq!(dest.last_seen_point.hash, deserialized.last_seen_point.hash);
assert_eq!(
dest.recovery_points.len(),
deserialized.recovery_points.len()
);
assert_eq!(
dest.recovery_points[0].index,
deserialized.recovery_points[0].index
dest.recovery_points[0].slot,
deserialized.recovery_points[0].slot
);
}

Expand All @@ -378,26 +390,30 @@ mod tests {
/// A point with an empty hash serializes to `"0/"` and parses back to an
/// empty byte string without error.
#[test]
fn test_point_serialization_with_empty_hash() {
    let point = BlockRef {
        slot: 0,
        hash: bytes::Bytes::from(vec![]),
        height: 0,
        timestamp: 0,
    };
    let text = point_to_string(&point);
    assert_eq!(text, "0/");
    let parsed = string_to_point(text).unwrap();
    assert_eq!(parsed.slot, 0);
    assert_eq!(parsed.hash, bytes::Bytes::from(vec![]));
}

/// `u64::MAX` survives the serialize/parse round trip without overflow or
/// truncation in the decimal slot rendering.
#[test]
fn test_point_serialization_with_large_index() {
    let point = BlockRef {
        slot: u64::MAX,
        hash: bytes::Bytes::from(vec![0xff, 0xff]),
        height: 0,
        timestamp: 0,
    };
    let text = point_to_string(&point);
    assert_eq!(text, "18446744073709551615/ffff");
    let round_tripped = string_to_point(text).unwrap();
    assert_eq!(round_tripped.slot, u64::MAX);
}

#[test]
Expand Down Expand Up @@ -428,12 +444,14 @@ mod tests {
0xcc, 0xdd, 0xee, 0xff,
];
let point = BlockRef {
index: 42,
slot: 42,
hash: bytes::Bytes::from(hash.clone()),
height: 0,
timestamp: 0,
};
let serialized = point_to_string(&point);
let deserialized = string_to_point(serialized).unwrap();
assert_eq!(deserialized.index, 42);
assert_eq!(deserialized.slot, 42);
assert_eq!(deserialized.hash, bytes::Bytes::from(hash));
}

Expand All @@ -448,8 +466,10 @@ mod tests {
fn test_point_to_string_lowercase_hex() {
// Verify that hex encoding produces lowercase
let point = BlockRef {
index: 1,
slot: 1,
hash: bytes::Bytes::from(vec![0xAB, 0xCD, 0xEF]),
height: 0,
timestamp: 0,
};
let result = point_to_string(&point);
assert_eq!(result, "1/abcdef");
Expand Down
20 changes: 12 additions & 8 deletions src/broadcast/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ impl FilterConfig {
#[cfg(test)]
mod tests {
use super::*;
use crate::utils::u64_to_bigint;

#[test]
fn test_token_filter_policy_match() {
Expand Down Expand Up @@ -175,8 +176,9 @@ mod tests {
policy_id: policy_id.clone().into(),
assets: vec![utxorpc::spec::cardano::Asset {
name: asset_name.clone().into(),
output_coin: 100,
mint_coin: 0,
quantity: Some(utxorpc::spec::cardano::asset::Quantity::OutputCoin(
u64_to_bigint(100),
)),
}],
redeemer: None,
}];
Expand All @@ -195,8 +197,9 @@ mod tests {
policy_id: vec![0xff, 0xee, 0xdd].into(),
assets: vec![utxorpc::spec::cardano::Asset {
name: vec![0x04, 0x05].into(),
output_coin: 100,
mint_coin: 0,
quantity: Some(utxorpc::spec::cardano::asset::Quantity::OutputCoin(
u64_to_bigint(100),
)),
}],
redeemer: None,
}];
Expand All @@ -216,8 +219,9 @@ mod tests {
policy_id: policy_id.clone().into(),
assets: vec![utxorpc::spec::cardano::Asset {
name: vec![0xff, 0xee].into(),
output_coin: 100,
mint_coin: 0,
quantity: Some(utxorpc::spec::cardano::asset::Quantity::OutputCoin(
u64_to_bigint(100),
)),
}],
redeemer: None,
}];
Expand Down Expand Up @@ -371,7 +375,7 @@ mod tests {
let tx = Tx {
withdrawals: vec![utxorpc::spec::cardano::Withdrawal {
reward_account: credential.into(),
coin: 1000,
coin: Some(u64_to_bigint(1000)),
redeemer: None,
}],
..Default::default()
Expand All @@ -389,7 +393,7 @@ mod tests {
let tx = Tx {
withdrawals: vec![utxorpc::spec::cardano::Withdrawal {
reward_account: vec![0xff, 0xee, 0xdd].into(),
coin: 1000,
coin: Some(u64_to_bigint(1000)),
redeemer: None,
}],
..Default::default()
Expand Down
Loading