Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 1 addition & 27 deletions crates/apollo_deployments/src/deployment_definitions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
use apollo_node_config::component_execution_config::DEFAULT_INVALID_PORT;
use serde::Serialize;
use static_assertions::const_assert_ne;
use strum::EnumIter;
use strum_macros::{AsRefStr, Display};

Expand All @@ -9,34 +7,10 @@ use strum_macros::{AsRefStr, Display};
mod deployment_definitions_test;

pub(crate) const CONFIG_BASE_DIR: &str = "crates/apollo_deployments/resources/";
pub(crate) const INFRA_PORT_PLACEHOLDER: u16 = 1;
const_assert_ne!(INFRA_PORT_PLACEHOLDER, DEFAULT_INVALID_PORT);
pub(crate) const RETRIES_FOR_L1_SERVICES: usize = 0;

const BASE_APP_CONFIGS_DIR_PATH: &str = "crates/apollo_deployments/resources/app_configs";

// TODO(Nadin): Integrate this logic with `ComponentConfigInService` once the merge from main-14.0
// is complete.

#[derive(Clone, Copy, Debug, EnumIter, Display, Serialize, Ord, PartialEq, Eq, PartialOrd)]
pub enum InfraServicePort {
Batcher,
ClassManager,
Committer,
Gateway,
L1GasPriceProvider,
L1Provider,
Mempool,
SierraCompiler,
SignatureManager,
StateSync,
}

impl InfraServicePort {
pub fn get_port(&self) -> u16 {
INFRA_PORT_PLACEHOLDER
}
}

#[derive(
Hash, Clone, Debug, Display, Serialize, PartialEq, Eq, PartialOrd, Ord, EnumIter, AsRefStr,
)]
Expand Down
66 changes: 18 additions & 48 deletions crates/apollo_deployments/src/deployments/distributed.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::collections::{BTreeSet, HashMap};

use apollo_infra::component_client::DEFAULT_RETRIES;
use apollo_node_config::component_config::ComponentConfig;
Expand All @@ -10,19 +10,14 @@ use serde::Serialize;
use strum::{Display, IntoEnumIterator};
use strum_macros::{AsRefStr, EnumIter};

use crate::deployment_definitions::{
ComponentConfigInService,
InfraServicePort,
INFRA_PORT_PLACEHOLDER,
};
use crate::deployment_definitions::{ComponentConfigInService, RETRIES_FOR_L1_SERVICES};
use crate::scale_policy::ScalePolicy;
use crate::service::{GetComponentConfigs, NodeService, ServiceNameInner};
use crate::utils::validate_ports;
use crate::utils::InfraPortAllocator;

// Number of infra-required ports for a distributed node service distribution.
pub const DISTRIBUTED_NODE_REQUIRED_PORTS_NUM: usize = 10;

pub const RETRIES_FOR_L1_SERVICES: usize = 0;

// TODO(Tsabary): define consts and functions whenever relevant.

#[derive(Clone, Copy, Debug, Display, PartialEq, Eq, Hash, Serialize, AsRefStr, EnumIter)]
Expand Down Expand Up @@ -50,45 +45,20 @@ impl From<DistributedNodeServiceName> for NodeService {

impl GetComponentConfigs for DistributedNodeServiceName {
fn get_component_configs(ports: Option<Vec<u16>>) -> HashMap<NodeService, ComponentConfig> {
// TODO(Tsabary): style this code, i.e., no need to use a mutable map nor the for loop, and
// can simply collect the required values.
let mut service_ports: BTreeMap<InfraServicePort, u16> = BTreeMap::new();
match ports {
Some(ports) => {
// TODO(Nadin): This should compare against DistributedServicePort-specific infra
// ports, not all InfraServicePort variants.
validate_ports(&ports, InfraServicePort::iter().count());
for (service_port, port) in InfraServicePort::iter().zip(ports) {
service_ports.insert(service_port, port);
}
}
None => {
for service_port in InfraServicePort::iter() {
service_ports.insert(service_port, INFRA_PORT_PLACEHOLDER);
}
}
};

let batcher =
Self::Batcher.component_config_pair(service_ports[&InfraServicePort::Batcher]);
let class_manager = Self::ClassManager
.component_config_pair(service_ports[&InfraServicePort::ClassManager]);
let committer =
Self::Committer.component_config_pair(service_ports[&InfraServicePort::Committer]);
let gateway =
Self::Gateway.component_config_pair(service_ports[&InfraServicePort::Gateway]);
let l1_gas_price_provider =
Self::L1.component_config_pair(service_ports[&InfraServicePort::L1GasPriceProvider]);
let l1_provider =
Self::L1.component_config_pair(service_ports[&InfraServicePort::L1Provider]);
let mempool =
Self::Mempool.component_config_pair(service_ports[&InfraServicePort::Mempool]);
let sierra_compiler = Self::SierraCompiler
.component_config_pair(service_ports[&InfraServicePort::SierraCompiler]);
let signature_manager = Self::SignatureManager
.component_config_pair(service_ports[&InfraServicePort::SignatureManager]);
let state_sync =
Self::StateSync.component_config_pair(service_ports[&InfraServicePort::StateSync]);
let mut infra_port_allocator =
InfraPortAllocator::new(ports, DISTRIBUTED_NODE_REQUIRED_PORTS_NUM);
let batcher = Self::Batcher.component_config_pair(infra_port_allocator.next());
let class_manager = Self::ClassManager.component_config_pair(infra_port_allocator.next());
let committer = Self::Committer.component_config_pair(infra_port_allocator.next());
let gateway = Self::Gateway.component_config_pair(infra_port_allocator.next());
let l1_gas_price_provider = Self::L1.component_config_pair(infra_port_allocator.next());
let l1_provider = Self::L1.component_config_pair(infra_port_allocator.next());
let mempool = Self::Mempool.component_config_pair(infra_port_allocator.next());
let sierra_compiler =
Self::SierraCompiler.component_config_pair(infra_port_allocator.next());
let signature_manager =
Self::SignatureManager.component_config_pair(infra_port_allocator.next());
let state_sync = Self::StateSync.component_config_pair(infra_port_allocator.next());

let mut component_config_map = HashMap::<NodeService, ComponentConfig>::new();
for inner_service_name in Self::iter() {
Expand Down
64 changes: 17 additions & 47 deletions crates/apollo_deployments/src/deployments/hybrid.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::collections::{BTreeSet, HashMap};

use apollo_infra::component_client::DEFAULT_RETRIES;
use apollo_node_config::component_config::ComponentConfig;
Expand All @@ -10,16 +10,12 @@ use serde::Serialize;
use strum::{Display, IntoEnumIterator};
use strum_macros::{AsRefStr, EnumIter};

use crate::deployment_definitions::{
ComponentConfigInService,
InfraServicePort,
INFRA_PORT_PLACEHOLDER,
};
use crate::deployments::distributed::RETRIES_FOR_L1_SERVICES;
use crate::deployment_definitions::{ComponentConfigInService, RETRIES_FOR_L1_SERVICES};
use crate::scale_policy::ScalePolicy;
use crate::service::{GetComponentConfigs, NodeService, ServiceNameInner};
use crate::utils::validate_ports;
use crate::utils::InfraPortAllocator;

// Number of infra-required ports for a hybrid node service distribution.
pub const HYBRID_NODE_REQUIRED_PORTS_NUM: usize = 10;

#[derive(Clone, Copy, Debug, Display, PartialEq, Eq, Hash, Serialize, AsRefStr, EnumIter)]
Expand All @@ -42,45 +38,19 @@ impl From<HybridNodeServiceName> for NodeService {

impl GetComponentConfigs for HybridNodeServiceName {
fn get_component_configs(ports: Option<Vec<u16>>) -> HashMap<NodeService, ComponentConfig> {
// TODO(Tsabary): style this code, i.e., no need to use a mutable map nor the for loop, and
// can simply collect the required values.
let mut service_ports: BTreeMap<InfraServicePort, u16> = BTreeMap::new();
match ports {
Some(ports) => {
validate_ports(&ports, InfraServicePort::iter().count());
// TODO(Nadin): This should compare against HybridServicePort-specific infra ports,
// not all InfraServicePort variants.
for (service_port, port) in InfraServicePort::iter().zip(ports) {
service_ports.insert(service_port, port);
}
}
None => {
// Extract the infra service ports for all inner services of the hybrid node.
for service_port in InfraServicePort::iter() {
service_ports.insert(service_port, INFRA_PORT_PLACEHOLDER);
}
}
};

let batcher = Self::Core.component_config_pair(service_ports[&InfraServicePort::Batcher]);
let class_manager =
Self::Core.component_config_pair(service_ports[&InfraServicePort::ClassManager]);
let committer =
Self::Committer.component_config_pair(service_ports[&InfraServicePort::Committer]);
let gateway =
Self::Gateway.component_config_pair(service_ports[&InfraServicePort::Gateway]);
let l1_gas_price_provider =
Self::L1.component_config_pair(service_ports[&InfraServicePort::L1GasPriceProvider]);
let l1_provider =
Self::L1.component_config_pair(service_ports[&InfraServicePort::L1Provider]);
let mempool =
Self::Mempool.component_config_pair(service_ports[&InfraServicePort::Mempool]);
let sierra_compiler = Self::SierraCompiler
.component_config_pair(service_ports[&InfraServicePort::SierraCompiler]);
let signature_manager =
Self::Core.component_config_pair(service_ports[&InfraServicePort::SignatureManager]);
let state_sync =
Self::Core.component_config_pair(service_ports[&InfraServicePort::StateSync]);
let mut infra_port_allocator =
InfraPortAllocator::new(ports, HYBRID_NODE_REQUIRED_PORTS_NUM);
let batcher = Self::Core.component_config_pair(infra_port_allocator.next());
let class_manager = Self::Core.component_config_pair(infra_port_allocator.next());
let committer = Self::Committer.component_config_pair(infra_port_allocator.next());
let gateway = Self::Gateway.component_config_pair(infra_port_allocator.next());
let l1_gas_price_provider = Self::L1.component_config_pair(infra_port_allocator.next());
let l1_provider = Self::L1.component_config_pair(infra_port_allocator.next());
let mempool = Self::Mempool.component_config_pair(infra_port_allocator.next());
let sierra_compiler =
Self::SierraCompiler.component_config_pair(infra_port_allocator.next());
let signature_manager = Self::Core.component_config_pair(infra_port_allocator.next());
let state_sync = Self::Core.component_config_pair(infra_port_allocator.next());

let mut component_config_map = HashMap::<NodeService, ComponentConfig>::new();
for inner_service_name in Self::iter() {
Expand Down
57 changes: 55 additions & 2 deletions crates/apollo_deployments/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,60 @@
use std::collections::HashSet;

/// Validates that the provided ports vector has the correct length and all unique values.
pub(crate) fn validate_ports(ports: &[u16], required_ports_num: usize) {
use apollo_node_config::component_execution_config::DEFAULT_INVALID_PORT;
use static_assertions::const_assert_ne;

const INFRA_PORT_PLACEHOLDER: u16 = 1;
const_assert_ne!(INFRA_PORT_PLACEHOLDER, DEFAULT_INVALID_PORT);

/// A generator-like for setting infra ports in different services.
/// - If constructed with `Some(vec)`: yields values from the vec, up to `expected_len` times.
/// - If constructed with `None`: yields `INFRA_PORT_PLACEHOLDER`, up to `expected_len` times.
/// - On `Drop`: asserts it has been fully depleted (all `expected_len` values were yielded).
pub(crate) struct InfraPortAllocator {
expected_len: usize,
idx: usize,
values: Vec<u16>,
}

impl InfraPortAllocator {
pub fn new(values: Option<Vec<u16>>, expected_len: usize) -> Self {
let values = match values {
Some(v) => {
validate_ports(&v, expected_len);
v
}
None => vec![INFRA_PORT_PLACEHOLDER; expected_len],
};

Self { expected_len, idx: 0, values }
}

/// Returns the next value. Panics if called more than `expected_len` times.
pub fn next(&mut self) -> u16 {
assert!(
self.idx < self.expected_len,
"InfraPortAllocator exhausted: expected_len is {}",
self.expected_len
);
let out = self.values[self.idx];
self.idx += 1;
out
}
}

impl Drop for InfraPortAllocator {
fn drop(&mut self) {
assert!(
self.idx == self.expected_len,
"InfraPortAllocator dropped before being depleted: produced {} out of {} values",
self.idx,
self.expected_len
);
}
}

// Validates that the provided ports vector has the correct length and all unique values.
fn validate_ports(ports: &[u16], required_ports_num: usize) {
let ports_len = ports.len();
assert_eq!(
ports_len, required_ports_num,
Expand Down
Loading