Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,327 changes: 1,389 additions & 938 deletions Cargo.lock

Large diffs are not rendered by default.

39 changes: 20 additions & 19 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,45 @@ path = "src/main.rs"
anyhow = "1"
alloy = { version = "1.1.0", default-features = false, features = ["rand"] }
async-trait = "0.1.80"
axum = "0.6"
bigdecimal = { version = "0.3", features = ["serde"] }
axum = "0.8"
bigdecimal = { version = "0.4", features = ["serde"] }
chrono = { version = "0.4.38", features = ["serde"], default-features = false }
clap = { version = "4", features = ["derive", "env"] }
futures = "0.3.30"
hex = "0.4"
humantime-serde = "1.1.1"
hyper = "0.14"
itertools = "0.11"
hyper = "1"
itertools = "0.14"
num = "0.4"
prometheus = "0.13"
prometheus-metric-storage = "0.5.0"
reqwest = "0.11"
prometheus = "0.14"
prometheus-metric-storage = "0.6.0"
reqwest = { version = "0.13", features = ["json"] }
serde = "1"
serde_json = "1"
serde_with = "3"
serde_repr = "0.1"
thiserror = "1"
tokio = { version = "1", features = ["macros", "rt-multi-thread", "signal", "time"] }
toml = "0.7"
tower = "0.4"
tower-http = { version = "0.4", features = ["trace"] }
toml = "0.8"
tower = "0.5"
tower-http = { version = "0.6", features = ["trace"] }
tracing = "0.1"
contracts = { git = "https://github.com/cowprotocol/services.git", tag = "v2.345.4", package = "contracts" }
ethrpc = { git = "https://github.com/cowprotocol/services.git", tag = "v2.345.4", package = "ethrpc" }
observe = { git = "https://github.com/cowprotocol/services.git", tag = "v2.345.4", package = "observe" , features = ["axum-tracing"]}
shared = { git = "https://github.com/cowprotocol/services.git", tag = "v2.345.4", package = "shared" }
dto = { git = "https://github.com/cowprotocol/services.git", tag = "v2.345.4", package = "solvers-dto" }
rate-limit = { git = "https://github.com/cowprotocol/services.git", tag = "v2.345.4", package = "rate-limit" }
number = { git = "https://github.com/cowprotocol/services.git", tag = "v2.345.4", package = "number" }
contracts = { git = "https://github.com/cowprotocol/services.git", tag = "v2.354.3", package = "contracts" }
ethrpc = { git = "https://github.com/cowprotocol/services.git", tag = "v2.354.3", package = "ethrpc" }
observe = { git = "https://github.com/cowprotocol/services.git", tag = "v2.354.3", package = "observe" }
shared = { git = "https://github.com/cowprotocol/services.git", tag = "v2.354.3", package = "shared" }
dto = { git = "https://github.com/cowprotocol/services.git", tag = "v2.354.3", package = "solvers-dto" }
rate-limit = { git = "https://github.com/cowprotocol/services.git", tag = "v2.354.3", package = "rate-limit" }
configs = { git = "https://github.com/cowprotocol/services.git", tag = "v2.354.3", package = "configs" }
number = { git = "https://github.com/cowprotocol/services.git", tag = "v2.354.3", package = "number" }

# Memory allocator
tikv-jemallocator = { version = "0.6", features = ["unprefixed_malloc_on_supported_platforms", "profiling"] }
jemalloc_pprof = { version = "0.8", features = ["symbolize"] }

[dev-dependencies]
ethrpc = { git = "https://github.com/cowprotocol/services.git", tag = "v2.345.4", package = "ethrpc", features = ["test-util"] }
testlib = { git = "https://github.com/cowprotocol/services.git", tag = "v2.345.4", package = "testlib" }
ethrpc = { git = "https://github.com/cowprotocol/services.git", tag = "v2.354.3", package = "ethrpc", features = ["test-util"] }
testlib = { git = "https://github.com/cowprotocol/services.git", tag = "v2.354.3", package = "testlib" }
glob = "0.3"
maplit = "1"
tempfile = "3"
Expand Down
27 changes: 15 additions & 12 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
use {
crate::domain::solver::Solver,
axum::extract::DefaultBodyLimit,
observe::distributed_tracing::tracing_axum::{make_span, record_trace_id},
std::{future::Future, net::SocketAddr, sync::Arc},
tokio::sync::oneshot,
observe::tracing::distributed::axum::{make_span, record_trace_id},
std::{future::Future, io, net::SocketAddr, sync::Arc},
tokio::{net::TcpListener, sync::oneshot},
};

mod routes;
Expand All @@ -20,24 +20,27 @@ impl Api {
self,
bind: Option<oneshot::Sender<SocketAddr>>,
shutdown: impl Future<Output = ()> + Send + 'static,
) -> Result<(), hyper::Error> {
) -> Result<(), io::Error> {
let app = axum::Router::new()
.route("/metrics", axum::routing::get(routes::metrics))
.route("/healthz", axum::routing::get(routes::healthz))
.route("/solve", axum::routing::post(routes::solve))
.layer(
tower::ServiceBuilder::new()
.layer(tower_http::trace::TraceLayer::new_for_http().make_span_with(make_span))
.map_request(record_trace_id),
)
.layer(tower_http::trace::TraceLayer::new_for_http().make_span_with(make_span))
.layer(axum::middleware::from_fn(
|request: axum::extract::Request, next: axum::middleware::Next| async {
next.run(record_trace_id(request)).await
},
))
.layer(DefaultBodyLimit::disable())
.with_state(Arc::new(self.solver));

let server = axum::Server::bind(&self.addr).serve(app.into_make_service());
let listener = TcpListener::bind(self.addr).await?;
if let Some(bind) = bind {
let _ = bind.send(server.local_addr());
let _ = bind.send(listener.local_addr()?);
}

server.with_graceful_shutdown(shutdown).await
axum::serve(listener, app)
.with_graceful_shutdown(shutdown)
.await
}
}
2 changes: 1 addition & 1 deletion src/api/routes/solve/dto/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub mod auction;
pub mod solution;

pub use dto::{auction::Auction, solution::Solutions};
pub use dto::{auction::Auction, solution::SolverResponse};
4 changes: 2 additions & 2 deletions src/api/routes/solve/dto/solution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use {
};

/// Creates a new solution DTO from its domain object.
pub fn from_domain(solutions: &[solution::Solution]) -> super::Solutions {
super::Solutions {
pub fn from_domain(solutions: &[solution::Solution]) -> dto::solution::SolverResponse {
dto::solution::SolverResponse::Solutions {
solutions: solutions
.iter()
.map(|solution| Solution {
Expand Down
2 changes: 1 addition & 1 deletion src/api/routes/solve/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub async fn solve(
auction: axum::extract::Json<dto::Auction>,
) -> (
axum::http::StatusCode,
axum::response::Json<Response<dto::Solutions>>,
axum::response::Json<Response<dto::SolverResponse>>,
) {
let handle_request = async {
let auction = match dto::auction::to_domain(&auction) {
Expand Down
11 changes: 1 addition & 10 deletions src/infra/blockchain.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,4 @@
use std::time::Duration;

/// Creates a node RPC instance.
pub fn rpc(url: &reqwest::Url) -> ethrpc::Web3 {
ethrpc::web3(
Default::default(),
reqwest::ClientBuilder::new()
.timeout(Duration::from_secs(10))
.user_agent("cowprotocol-solver-engine/1.0.0"),
url,
"base",
)
ethrpc::web3(Default::default(), url, Some("base"))
}
6 changes: 2 additions & 4 deletions src/infra/config/dex/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,7 @@ pub async fn load<T: DeserializeOwned>(path: &Path) -> (super::Config, T) {
let (settlement, authenticator) = if let Some(settlement) = config.settlement {
let authenticator = {
let web3 = blockchain::rpc(&config.node_url);
let settlement =
::contracts::alloy::GPv2Settlement::Instance::new(settlement, web3.alloy.clone());
settlement
::contracts::alloy::GPv2Settlement::Instance::new(settlement, web3.provider.clone())
.authenticator()
.call()
.await
Expand Down Expand Up @@ -190,7 +188,7 @@ pub async fn load<T: DeserializeOwned>(path: &Path) -> (super::Config, T) {
.expect("invalid minimum surplus limits"),
concurrent_requests: config.concurrent_requests,
smallest_partial_fill: eth::Ether(config.smallest_partial_fill),
rate_limiting_strategy: rate_limit::Strategy::try_new(
rate_limiting_strategy: configs::rate_limit::Strategy::try_new(
config.back_off_growth_factor,
config.min_back_off,
config.max_back_off,
Expand Down
2 changes: 1 addition & 1 deletion src/infra/config/dex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub struct Config {
pub minimum_surplus: MinimumSurplusLimits,
pub concurrent_requests: NonZeroUsize,
pub smallest_partial_fill: eth::Ether,
pub rate_limiting_strategy: rate_limit::Strategy,
pub rate_limiting_strategy: configs::rate_limit::Strategy,
pub gas_offset: eth::Gas,
pub block_stream: Option<CurrentBlockWatcher>,
pub internalize_interactions: bool,
Expand Down
2 changes: 1 addition & 1 deletion src/infra/dex/balancer/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl Serialize for HumanReadableAmount {
where
S: Serializer,
{
self.value().serialize(serializer)
serializer.serialize_str(&self.value().to_plain_string())
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/infra/dex/balancer/query_swap_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,10 @@ impl OnChainQuerySwapProvider {
let web3 = blockchain::rpc(&node_url);
Self {
queries: queries.map(|addr| {
contracts::alloy::BalancerQueries::Instance::new(addr, web3.alloy.clone())
contracts::alloy::BalancerQueries::Instance::new(addr, web3.provider.clone())
}),
v3_batch_router: v3_batch_router.map(|addr| v3::Router::new(addr, web3.alloy.clone())),
v3_batch_router: v3_batch_router
.map(|addr| v3::Router::new(addr, web3.provider.clone())),
settlement,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/infra/dex/simulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Simulator {
/// Create a new simulator for computing DEX swap gas usage.
pub fn new(url: &reqwest::Url, settlement: Address, authenticator: Address) -> Self {
Self {
web3: blockchain::rpc(url).alloy,
web3: blockchain::rpc(url).provider,
settlement,
authenticator,
}
Expand Down
4 changes: 2 additions & 2 deletions src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn run_with(args: cli::Args, bind: Option<oneshot::Sender<SocketAddr>>) {
args.use_json_logs,
None,
);
observe::tracing::initialize_reentrant(&obs_config);
observe::tracing::init::initialize_reentrant(&obs_config);
#[cfg(unix)]
observe::heap_dump_handler::spawn_heap_dump_handler();
tracing::info!("running solver engine with {args:#?}");
Expand All @@ -57,7 +57,7 @@ async fn run_with(args: cli::Args, bind: Option<oneshot::Sender<SocketAddr>>) {
));
Solver::Dex(solver::Dex::new(
dex::Dex::Balancer(Box::new(
dex::balancer::Sor::new(config.sor, web3.alloy, query_swap_provider)
dex::balancer::Sor::new(config.sor, web3.provider, query_swap_provider)
.expect("invalid Balancer configuration"),
)),
config.base.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/tests/balancer/market_order.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ async fn sell_v3() {
#[tokio::test]
async fn buy_v2() {
let obs_config = observe::Config::new("", tracing::Level::DEBUG.into(), false, None);
observe::tracing::initialize_reentrant(&obs_config);
observe::tracing::init::initialize_reentrant(&obs_config);

let api = mock::http::setup(vec![mock::http::Expectation::Post {
path: mock::http::Path::exact("sor"),
Expand Down
2 changes: 1 addition & 1 deletion src/tests/balancer/mock_query_swap_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async fn test_mock_provider_affects_swap_result() {
};
let web3 = ethrpc::mock::web3();

let sor = balancer::Sor::new(config, web3.alloy.clone(), Box::new(mock_provider))
let sor = balancer::Sor::new(config, web3.provider.clone(), Box::new(mock_provider))
.expect("Failed to create Sor with mock provider");

// Create a test order (sell order)
Expand Down
8 changes: 4 additions & 4 deletions src/tests/mock/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub async fn setup(mut expectations: Vec<Expectation>) -> ServerHandle {

let app = axum::Router::new()
.route(
"/*path",
"/{*path}",
axum::routing::get(
|axum::extract::State(state),
axum::extract::Path(path),
Expand Down Expand Up @@ -169,9 +169,9 @@ pub async fn setup(mut expectations: Vec<Expectation>) -> ServerHandle {
failed_assert: failed_assert.clone(),
});

let server = axum::Server::bind(&"0.0.0.0:0".parse().unwrap()).serve(app.into_make_service());
let address = server.local_addr();
let handle = tokio::spawn(async move { server.await.unwrap() });
let listener = tokio::net::TcpListener::bind("0.0.0.0:0").await.unwrap();
let address = listener.local_addr().unwrap();
let handle = tokio::spawn(async move { axum::serve(listener, app).await.unwrap() });

ServerHandle {
handle,
Expand Down
Loading