Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions contrib/indexer-service/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,6 @@ trigger_value_divisor = 500_000

[tap.sender_aggregator_endpoints]
"ACCOUNT0_ADDRESS_PLACEHOLDER" = "http://tap-aggregator:7610"

[horizon]
enabled = false
8 changes: 0 additions & 8 deletions contrib/indexer-service/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@ until pg_isready -h postgres -U postgres -d indexer_components_1; do
sleep 2
done

echo "Applying database migrations..."
# Get all the migration UP files and sort them by name
for migration_file in $(find /opt/migrations -name "*.up.sql" | sort); do
echo "Applying migration: $(basename $migration_file)"
psql -h postgres -U postgres postgres -f $migration_file
done
echo "Database migrations completed."

# Get network subgraph deployment ID
NETWORK_DEPLOYMENT=$(curl -s "http://graph-node:8000/subgraphs/name/graph-network" \
-H 'content-type: application/json' \
Expand Down
7 changes: 0 additions & 7 deletions contrib/tap-agent/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,6 @@ fi

echo "Postgres is ready!"

echo "Ensuring database tables exist..."
for migration_file in $(find /opt/migrations -name "*.up.sql" | sort); do
echo "Applying migration if needed: $(basename $migration_file)"
psql -h postgres -U postgres indexer_components_1 -f $migration_file 2>/dev/null || true
done
echo "Database setup completed."

# Wait for indexer-service to be ready with timeout
echo "Waiting for indexer-service to be ready..."
MAX_ATTEMPTS=30
Expand Down
3 changes: 3 additions & 0 deletions crates/config/default_values.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,6 @@ trigger_value_divisor = 10
timestamp_buffer_secs = 60
request_timeout_secs = 5
max_receipts_per_request = 10000

[horizon]
enabled = false
3 changes: 3 additions & 0 deletions crates/config/maximal-config-example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,6 @@ hardhat = "100"

[dips.additional_networks]
"eip155:1337" = "hardhat"

[horizon]
enabled = false
10 changes: 10 additions & 0 deletions crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ pub struct Config {
pub service: ServiceConfig,
pub tap: TapConfig,
pub dips: Option<DipsConfig>,
pub horizon: HorizonConfig,
}

// Newtype wrapping Config to be able use serde_ignored with Figment
Expand Down Expand Up @@ -445,6 +446,15 @@ pub struct RavRequestConfig {
pub max_receipts_per_request: u64,
}

/// Configuration for the horizon
/// standard
#[derive(Debug, Default, Deserialize)]
#[cfg_attr(test, derive(PartialEq))]
pub struct HorizonConfig {
/// Whether the horizon is enabled or not
pub enabled: bool,
}

#[cfg(test)]
mod tests {
use std::{
Expand Down
85 changes: 58 additions & 27 deletions crates/tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,9 @@ pub struct SenderAccountConfig {
/// Senders that are allowed to spend up to `max_amount_willing_to_lose_grt`
/// over the escrow balance
pub trusted_senders: HashSet<Address>,

#[doc(hidden)]
pub horizon_enabled: bool,
}

impl SenderAccountConfig {
Expand All @@ -415,6 +418,7 @@ impl SenderAccountConfig {
rav_request_timeout: config.tap.rav_request.request_timeout_secs,
tap_sender_timeout: config.tap.sender_timeout_secs,
trusted_senders: config.tap.trusted_senders.clone(),
horizon_enabled: config.horizon.enabled,
}
}
}
Expand Down Expand Up @@ -623,6 +627,13 @@ impl State {
sender_balance = self.sender_balance.to_u128(),
"Denying sender."
);
// Check if this is horizon like sender and if it is actually enable,
// otherwise just ignore.
// FIXME: This should be removed once full horizon support
// is implemented!
if matches!(self.sender_type, SenderType::Horizon) && !self.config.horizon_enabled {
return;
}

SenderAccount::deny_sender(self.sender_type, &self.pgpool, self.sender).await;
self.denied = true;
Expand Down Expand Up @@ -654,16 +665,18 @@ impl State {
.expect("Should not fail to delete from denylist");
}
SenderType::Horizon => {
sqlx::query!(
r#"
if self.config.horizon_enabled {
sqlx::query!(
r#"
DELETE FROM tap_horizon_denylist
WHERE sender_address = $1
"#,
self.sender.encode_hex(),
)
.execute(&self.pgpool)
.await
.expect("Should not fail to delete from denylist");
self.sender.encode_hex(),
)
.execute(&self.pgpool)
.await
.expect("Should not fail to delete from horizon denylist");
}
}
}
self.denied = false;
Expand Down Expand Up @@ -798,20 +811,26 @@ impl Actor for SenderAccount {
.map(|record| (record.allocation_id, record.value_aggregate))
.collect(),
// Get all ravs from v2 table
SenderType::Horizon => sqlx::query!(
r#"
SenderType::Horizon => {
if config.horizon_enabled {
sqlx::query!(
r#"
SELECT allocation_id, value_aggregate
FROM tap_horizon_ravs
WHERE payer = $1 AND last AND NOT final;
"#,
sender_id.encode_hex(),
)
.fetch_all(&pgpool)
.await
.expect("Should not fail to fetch from scalar_tap_ravs")
.into_iter()
.map(|record| (record.allocation_id, record.value_aggregate))
.collect(),
sender_id.encode_hex(),
)
.fetch_all(&pgpool)
.await
.expect("Should not fail to fetch from \"horizon\" scalar_tap_ravs")
.into_iter()
.map(|record| (record.allocation_id, record.value_aggregate))
.collect()
} else {
vec![]
}
}
};

// get a list from the subgraph of which subgraphs were already redeemed and were not marked as final
Expand Down Expand Up @@ -845,7 +864,11 @@ impl Actor for SenderAccount {
// TODO Implement query for unfinalized v2 transactions
// Depends on Escrow Subgraph Schema
SenderType::Horizon => {
todo!()
if config.horizon_enabled {
todo!("Implement query for unfinalized v2 transactions, It depends on Escrow Subgraph Schema")
}
// if we have any problems, we don't want to filter out
vec![]
}
};

Expand Down Expand Up @@ -896,20 +919,28 @@ impl Actor for SenderAccount {
.denied
.expect("Deny status cannot be null"),
// Get deny status from the tap horizon table
SenderType::Horizon => sqlx::query!(
r#"
SenderType::Horizon => {
if config.horizon_enabled {
sqlx::query!(
r#"
SELECT EXISTS (
SELECT 1
FROM tap_horizon_denylist
WHERE sender_address = $1
) as denied
"#,
sender_id.encode_hex(),
)
.fetch_one(&pgpool)
.await?
.denied
.expect("Deny status cannot be null"),
sender_id.encode_hex(),
)
.fetch_one(&pgpool)
.await?
.denied
.expect("Deny status cannot be null")
} else {
// If horizon is enabled,
// just ignore this sender
false
}
}
};

let sender_balance = escrow_accounts
Expand Down Expand Up @@ -1419,7 +1450,7 @@ impl SenderAccount {
)
.execute(pool)
.await
.expect("Should not fail to insert into denylist");
.expect("Should not fail to insert into \"horizon\" denylist");
}
}

Expand Down
36 changes: 23 additions & 13 deletions crates/tap-agent/src/agent/sender_accounts_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,17 @@ impl Actor for SenderAccountsManager {
.await;

// v2
let sender_allocation_v2 = select! {
sender_allocation = state.get_pending_sender_allocation_id_v2() => sender_allocation,
_ = tokio::time::sleep(state.config.tap_sender_timeout) => {
panic!("Timeout while getting pending sender allocation ids");
let sender_allocation_v2 = if state.config.horizon_enabled {
select! {
sender_allocation = state.get_pending_sender_allocation_id_v2() => sender_allocation,
_ = tokio::time::sleep(state.config.tap_sender_timeout) => {
panic!("Timeout while getting pending sender allocation ids");
}
}
} else {
HashMap::new()
};

state.sender_ids_v2.extend(sender_allocation_v2.keys());
stream::iter(sender_allocation_v2)
.map(|(sender_id, allocation_ids)| {
Expand Down Expand Up @@ -298,19 +303,24 @@ impl Actor for SenderAccountsManager {

// Start the new_receipts_watcher task that will consume from the `pglistener`
// after starting all senders
state.new_receipts_watcher_handle_v2 = Some(tokio::spawn(
new_receipts_watcher()
.actor_cell(myself.get_cell())
.pglistener(pglistener_v2)
.escrow_accounts_rx(escrow_accounts_v2)
.sender_type(SenderType::Horizon)
.maybe_prefix(prefix)
.call(),
));
state.new_receipts_watcher_handle_v2 = None;

if state.config.horizon_enabled {
state.new_receipts_watcher_handle_v2 = Some(tokio::spawn(
new_receipts_watcher()
.actor_cell(myself.get_cell())
.pglistener(pglistener_v2)
.escrow_accounts_rx(escrow_accounts_v2)
.sender_type(SenderType::Horizon)
.maybe_prefix(prefix)
.call(),
));
}

tracing::info!("SenderAccountManager created!");
Ok(state)
}

async fn post_stop(
&self,
_: ActorRef<Self::Msg>,
Expand Down
2 changes: 2 additions & 0 deletions crates/tap-agent/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub fn get_sender_account_config() -> &'static SenderAccountConfig {
escrow_polling_interval: ESCROW_POLLING_INTERVAL,
tap_sender_timeout: Duration::from_secs(63),
trusted_senders: HashSet::new(),
horizon_enabled: true,
}))
}

Expand Down Expand Up @@ -127,6 +128,7 @@ pub async fn create_sender_account(
escrow_polling_interval: Duration::default(),
tap_sender_timeout: TAP_SENDER_TIMEOUT,
trusted_senders,
horizon_enabled: false,
}));

let network_subgraph = Box::leak(Box::new(
Expand Down
1 change: 1 addition & 0 deletions crates/tap-agent/tests/tap_agent_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub async fn start_agent(
escrow_polling_interval: Duration::from_secs(10),
tap_sender_timeout: Duration::from_secs(30),
trusted_senders: HashSet::new(),
horizon_enabled: false,
}));

let args = SenderAccountsManagerArgs {
Expand Down
4 changes: 2 additions & 2 deletions integration-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use anyhow::Result;
use rav_tests::tap_rav_test;
use rav_tests::test_tap_rav_v1;

mod metrics;
mod rav_tests;
Expand All @@ -13,5 +13,5 @@ use receipt::create_tap_receipt;
#[tokio::main]
async fn main() -> Result<()> {
// Run the TAP receipt test
tap_rav_test().await
test_tap_rav_v1().await
}
7 changes: 2 additions & 5 deletions integration-tests/src/rav_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ const NUM_RECEIPTS: u32 = 3;
const BATCHES: u32 = 2;
const MAX_TRIGGERS: usize = 100;

pub async fn tap_rav_test() -> Result<()> {
// Function to test the tap RAV generation
pub async fn test_tap_rav_v1() -> Result<()> {
// Setup wallet using your MnemonicBuilder
let index: u32 = 0;
let wallet: PrivateKeySigner = MnemonicBuilder::<English>::default()
Expand All @@ -62,9 +63,6 @@ pub async fn tap_rav_test() -> Result<()> {
.build()
.unwrap();

let sender_address = wallet.address();
println!("Using sender address: {}", sender_address);

// Setup HTTP client
let http_client = Arc::new(Client::new());

Expand All @@ -87,7 +85,6 @@ pub async fn tap_rav_test() -> Result<()> {

// Try to find a valid allocation
let response_text = response.text().await?;
println!("Network subgraph response: {}", response_text);

let json_value = serde_json::from_str::<serde_json::Value>(&response_text)?;
let allocation_id = json_value
Expand Down
Loading