Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 16 additions & 19 deletions core/src/tower_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,25 +241,22 @@ impl EtcdTowerStorage {
.unwrap();

let client = runtime
.block_on(async {
etcd_client::Client::connect(
endpoints,
tls_config.map(|tls_config| {
etcd_client::ConnectOptions::default().with_tls(
etcd_client::TlsOptions::new()
.domain_name(tls_config.domain_name)
.ca_certificate(etcd_client::Certificate::from_pem(
tls_config.ca_certificate,
))
.identity(etcd_client::Identity::from_pem(
tls_config.identity_certificate,
tls_config.identity_private_key,
)),
)
}),
)
.await
})
.block_on(etcd_client::Client::connect(
endpoints,
tls_config.map(|tls_config| {
etcd_client::ConnectOptions::default().with_tls(
etcd_client::TlsOptions::new()
.domain_name(tls_config.domain_name)
.ca_certificate(etcd_client::Certificate::from_pem(
tls_config.ca_certificate,
))
.identity(etcd_client::Identity::from_pem(
tls_config.identity_certificate,
tls_config.identity_private_key,
)),
)
}),
))
.map_err(Self::etdc_to_tower_error)?;

Ok(Self {
Expand Down
39 changes: 16 additions & 23 deletions program-test/tests/fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,17 @@ fn simulate_fuzz() {
processor!(process_instruction),
);

let (mut banks_client, payer, last_blockhash) =
rt.block_on(async { program_test.start().await });
let (mut banks_client, payer, last_blockhash) = rt.block_on(program_test.start());

// the honggfuzz `fuzz!` macro does not allow for async closures,
// so we have to use the runtime directly to run async functions
rt.block_on(async {
run_fuzz_instructions(
&[1, 2, 3, 4, 5],
&mut banks_client,
&payer,
last_blockhash,
&program_id,
)
.await
});
rt.block_on(run_fuzz_instructions(
&[1, 2, 3, 4, 5],
&mut banks_client,
&payer,
last_blockhash,
&program_id,
));
}

#[test]
Expand All @@ -64,20 +60,17 @@ fn simulate_fuzz_with_context() {
processor!(process_instruction),
);

let mut context = rt.block_on(async { program_test.start_with_context().await });
let mut context = rt.block_on(program_test.start_with_context());

// the honggfuzz `fuzz!` macro does not allow for async closures,
// so we have to use the runtime directly to run async functions
rt.block_on(async {
run_fuzz_instructions(
&[1, 2, 3, 4, 5],
&mut context.banks_client,
&context.payer,
context.last_blockhash,
&program_id,
)
.await
});
rt.block_on(run_fuzz_instructions(
&[1, 2, 3, 4, 5],
&mut context.banks_client,
&context.payer,
context.last_blockhash,
&program_id,
));
}

async fn run_fuzz_instructions(
Expand Down
4 changes: 2 additions & 2 deletions quic-client/src/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,14 +161,14 @@ impl ClientConnection for QuicClientConnection {
let _lock = ASYNC_TASK_SEMAPHORE.acquire();
let inner = self.inner.clone();

let _handle = RUNTIME.spawn(async move { send_data_async(inner, data).await });
let _handle = RUNTIME.spawn(send_data_async(inner, data));
Comment on lines 161 to +164
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this change affect behavior? I see we grab an async semaphore at the top of the function; does not running the send_data_async in an async block break anything?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it does not

async fn send_data_async(
    connection: Arc<NonblockingQuicConnection>,
    buffer: Vec<u8>,
) -> TransportResult<()> {

Since send_data_async is an async fn, when you call it what happens is that it builds its return future and returns it. It only builds it tho, it does not poll it. Futures don't make progress until they're polled, so effectively the future will start executing only inside the task it's spawned on.

It would be different if send_data_async built the future manually, eg:

fn send_data_async(
    connection: Arc<NonblockingQuicConnection>,
    buffer: Vec<u8>,
) -> SendDataFuture {
    do_something_before_creating_the_future();
    SendDataFuture {
       ...
    }
}

In this case do_something_before_creating_the_future does execute before the resulting future is polled. So before landing this we need to double check if we're creating any manual futures, and if we're doing actual work (eg locking things) before returning them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation!

I went through and audited all functions that were inside the async { .await } blocks, and they all were async fn (i.e. not creating any manual futures. So I think that means this PR is ok.

Ok(())
}

fn send_data_batch_async(&self, buffers: Vec<Vec<u8>>) -> TransportResult<()> {
let _lock = ASYNC_TASK_SEMAPHORE.acquire();
let inner = self.inner.clone();
let _handle = RUNTIME.spawn(async move { send_data_batch_async(inner, buffers).await });
let _handle = RUNTIME.spawn(send_data_batch_async(inner, buffers));
Ok(())
}

Expand Down
17 changes: 7 additions & 10 deletions tpu-client/src/nonblocking/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -614,16 +614,13 @@ impl LeaderTpuService {
let t_leader_tpu_service = Some({
let recent_slots = recent_slots.clone();
let leader_tpu_cache = leader_tpu_cache.clone();
tokio::spawn(async move {
Self::run(
rpc_client,
recent_slots,
leader_tpu_cache,
pubsub_client,
exit,
)
.await
})
tokio::spawn(Self::run(
rpc_client,
recent_slots,
leader_tpu_cache,
pubsub_client,
exit,
))
});

Ok(LeaderTpuService {
Expand Down