Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
experimental = ["setup-scripts"]
experimental = ["setup-scripts", "wrapper-scripts"]

[scripts.setup.replace-node-and-runtime]
command = ["bash", "-c", '''
rsync -av gear-cli-and-runtime-release/ target/release/
chmod +x target/release/gear
''']

[scripts.setup.set-envs]
command = ["bash", "-c", '''
echo "CARGO_BUILD_JOBS=1" >> "$NEXTEST_ENV"
echo "RAYON_NUM_THREADS=1" >> "$NEXTEST_ENV"
echo "ETHEXE_PROCESSOR_NUM_THREADS=1" >> "$NEXTEST_ENV"
''']

[scripts.wrapper.compute-4-cores]
command = "env RAYON_NUM_THREADS=4 ETHEXE_PROCESSOR_NUM_THREADS=4"

[profile.default]
leak-timeout = { period = "5s", result = "fail" }
slow-timeout = { period = "1m", terminate-after = 5 }
Expand All @@ -15,6 +25,10 @@ path = "junit.xml"
store-success-output = false
store-failure-output = true

[[profile.default.scripts]]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Given the PR title chore(ci): Limited parallelism, it seems these parallelism limits are intended only for the CI environment. Applying this to the default profile will also slow down local test runs for all developers, which might be an unintended side effect.

To scope this change only to CI, consider applying this script to the ci profile instead.

Suggested change
[[profile.default.scripts]]
[[profile.ci.scripts]]

filter = 'all()'
setup = "set-envs"

[profile.ci]
fail-fast = false
archive.include = [
Expand All @@ -26,25 +40,17 @@ path = "junit.xml"
store-success-output = false
store-failure-output = true

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

While limiting parallelism is a good strategy to improve test stability, removing all retries might be too aggressive. Retries can still be valuable for handling transient, non-resource-contention-related flakes (e.g., network glitches).

Consider re-introducing the [[profile.ci.overrides]] blocks but with a reduced number of retries (e.g., retries = 2) and without the threads-required setting, which is now globally controlled.

# sdk
[[profile.ci.overrides]]
filter = 'package(gsdk) or package(gcli)'
retries = 5

[[profile.ci.scripts]]
filter = 'package(gsdk) or package(gcli)'
platform = "cfg(unix)"
setup = "replace-node-and-runtime"

# sometimes fails on CI machine in debug profile
# due to an inconsistent machine load and unoptimized code
[[profile.ci.overrides]]
filter = 'package(gear-authorship)'
retries = 5
threads-required = 4
# tests of such crates have timeouts, so we need to compile runtime faster
[[profile.ci.scripts]]
filter = 'package(ethexe-service) or package(ethexe-rpc) or package(ethexe-compute)'
run-wrapper = "compute-4-cores"

# ethexe
[[profile.ci.overrides]]
filter = 'package(ethexe-service) or package(ethexe-observer)'
filter = 'package(ethexe-service) or package(ethexe-rpc) or package(ethexe-compute)'
retries = 5
threads-required = 4
80 changes: 42 additions & 38 deletions ethexe/compute/src/codes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::{ComputeError, ProcessorExt, Result, service::SubService};
use crate::{ProcessorExt, Result, service::SubService};
use ethexe_common::{
CodeAndIdUnchecked,
db::{CodesStorageRO, CodesStorageRW},
};
use ethexe_db::Database;
use ethexe_processor::{ProcessedCodeInfo, ValidCodeInfo};
use futures::{FutureExt, StreamExt, future::BoxFuture, stream::FuturesUnordered};
use gprimitives::CodeId;
use metrics::Gauge;
use std::task::{Context, Poll};
use tokio::task::JoinSet;
use std::{
future,
task::{Context, Poll},
};

/// Metrics for the [`CodesSubService`].
#[derive(Clone, metrics_derive::Metrics)]
Expand All @@ -41,7 +44,7 @@ pub struct CodesSubService<P: ProcessorExt> {
processor: P,
metrics: Metrics,

processions: JoinSet<Result<CodeId>>,
processions: FuturesUnordered<BoxFuture<'static, Result<CodeId>>>,
}

impl<P: ProcessorExt> CodesSubService<P> {
Expand All @@ -50,7 +53,7 @@ impl<P: ProcessorExt> CodesSubService<P> {
db,
processor,
metrics: Metrics::default(),
processions: JoinSet::new(),
processions: FuturesUnordered::new(),
}
}

Expand All @@ -71,36 +74,37 @@ impl<P: ProcessorExt> CodesSubService<P> {
"Instrumented code {code_id:?} must exist in database"
);
}
self.processions.spawn(async move { Ok(code_id) });
self.processions.push(future::ready(Ok(code_id)).boxed());
} else {
let db = self.db.clone();
let mut processor = self.processor.clone();

self.processions.spawn_blocking(move || {
processor
.process_code(code_and_id)
.map(|ProcessedCodeInfo { code_id, valid }| {
if let Some(ValidCodeInfo {
code,
self.processions.push(
async move {
let ProcessedCodeInfo { code_id, valid } =
processor.process_code(code_and_id).await?;
if let Some(ValidCodeInfo {
code,
instrumented_code,
code_metadata,
}) = valid
{
db.set_original_code(&code);
db.set_instrumented_code(
ethexe_runtime_common::VERSION,
code_id,
instrumented_code,
code_metadata,
}) = valid
{
db.set_original_code(&code);
db.set_instrumented_code(
ethexe_runtime_common::VERSION,
code_id,
instrumented_code,
);
db.set_code_metadata(code_id, code_metadata);
db.set_code_valid(code_id, true);
} else {
db.set_code_valid(code_id, false);
}

code_id
})
});
);
db.set_code_metadata(code_id, code_metadata);
db.set_code_valid(code_id, true);
} else {
db.set_code_valid(code_id, false);
}

Ok(code_id)
}
.boxed(),
);
}

self.metrics
Expand All @@ -113,14 +117,14 @@ impl<P: ProcessorExt> SubService for CodesSubService<P> {
type Output = CodeId;

fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::Output>> {
futures::ready!(self.processions.poll_join_next(cx))
.map(|res| {
self.metrics
.processing_codes
.set(self.processions.len() as f64);
res.map_err(ComputeError::CodeProcessJoin)?
})
.map_or(Poll::Pending, Poll::Ready)
if let Poll::Ready(Some(res)) = self.processions.poll_next_unpin(cx) {
self.metrics
.processing_codes
.set(self.processions.len() as f64);
return Poll::Ready(res);
}

Poll::Pending
}
}

Expand Down
9 changes: 6 additions & 3 deletions ethexe/compute/src/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ mod tests {

const USER_ID: ActorId = ActorId::new([1u8; 32]);

pub fn upload_code(processor: &mut Processor, code: &[u8], db: &Database) -> CodeId {
pub async fn upload_code(processor: &mut Processor, code: &[u8], db: &Database) -> CodeId {
let code_id = CodeId::generate(code);

let ValidCodeInfo {
Expand All @@ -448,6 +448,7 @@ mod tests {
code: code.to_vec(),
code_id,
})
.await
.expect("failed to process code")
.valid
.expect("code is invalid");
Expand Down Expand Up @@ -579,7 +580,8 @@ mod tests {

let db = Database::memory();
let mut processor = Processor::new(db.clone()).unwrap();
let ping_code_id = test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db);
let ping_code_id =
test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db).await;
let ping_id = ActorId::from(0x10000);

let blockchain = BlockChain::mock(BLOCKCHAIN_LEN as u32).setup(&db);
Expand Down Expand Up @@ -702,7 +704,8 @@ mod tests {
let db = Database::memory();
let mut processor = Processor::new(db.clone()).unwrap();

let ping_code_id = test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db);
let ping_code_id =
test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db).await;
let ping_id = ActorId::from(0x10000);

let blockchain = BlockChain::mock(3).setup(&db);
Expand Down
11 changes: 6 additions & 5 deletions ethexe/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ pub enum ComputeError {
BlockHeaderNotFound(H256),
#[error("block validators committed for era not found for block({0})")]
CommittedEraNotFound(H256),
#[error("process code join error")]
CodeProcessJoin(#[from] tokio::task::JoinError),
#[error("codes queue not found for computed block({0})")]
CodesQueueNotFound(H256),
#[error("last committed batch not found for computed block({0})")]
Expand Down Expand Up @@ -101,7 +99,10 @@ pub trait ProcessorExt: Sized + Unpin + Send + Clone + 'static {
executable: ExecutableData,
promise_out_tx: Option<mpsc::UnboundedSender<Promise>>,
) -> impl Future<Output = Result<FinalizedBlockTransitions>> + Send;
fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result<ProcessedCodeInfo>;
fn process_code(
&mut self,
code_and_id: CodeAndIdUnchecked,
) -> impl Future<Output = Result<ProcessedCodeInfo>> + Send;
}

impl ProcessorExt for Processor {
Expand All @@ -115,7 +116,7 @@ impl ProcessorExt for Processor {
.map_err(Into::into)
}

fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result<ProcessedCodeInfo> {
self.process_code(code_and_id).map_err(Into::into)
async fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result<ProcessedCodeInfo> {
self.process_code(code_and_id).await.map_err(Into::into)
}
}
2 changes: 1 addition & 1 deletion ethexe/compute/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl ProcessorExt for MockProcessor {
Ok(self.process_programs_result.take().unwrap_or_default())
}

fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result<ProcessedCodeInfo> {
async fn process_code(&mut self, code_and_id: CodeAndIdUnchecked) -> Result<ProcessedCodeInfo> {
Ok(self
.process_codes_result
.take()
Expand Down
1 change: 0 additions & 1 deletion ethexe/processor/src/handling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use gprimitives::ActorId;
pub(crate) mod events;
pub(crate) mod overlaid;
pub(crate) mod run;
mod thread_pool;

/// A high-level interface for executing ops,
/// which mutate states based on the current block request events.
Expand Down
Loading
Loading