Skip to content

Commit f414380

Browse files
authored
Batch deal activations (#1310)
* wip: batch onboarding deals test works * fix activate deals failures tests * verified deal activation test * fix market tests * refactor to a map based pattern to ensure parallel return structure * fix deal failure test expectations * adjust market tests for new failure expectations * cron epoch test * fix the tests * remain ActivateDealsResult to DealActivation * commit deal states into state once for all sectors * use sectordeals for batchactivate * cleanup logic for marketactor::BatchActivateDealsResult shortcut * refactor Market::BatchActivateDeals to use BatchReturn * revert verifreg to use BatchReturn * better error context when deal activation fails * remove shortcut path, market actor already handles empty sectors * don't activate sectors with duplicate deals * use a batch activation helper * de duplicate harness deal activation paths * drop Copy requirement on BatchGen::success * simple tests for batch_activate_deals * fix tests
1 parent b712d1e commit f414380

19 files changed

+795
-366
lines changed

actors/market/src/lib.rs

Lines changed: 117 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
// SPDX-License-Identifier: Apache-2.0, MIT
33

44
use std::cmp::min;
5-
use std::collections::{BTreeMap, BTreeSet};
5+
use std::collections::{BTreeMap, BTreeSet, HashSet};
66

77
use cid::multihash::{Code, MultihashDigest, MultihashGeneric};
88
use cid::Cid;
9-
use fil_actors_runtime::{extract_send_result, FIRST_ACTOR_SPECIFIC_EXIT_CODE};
9+
use fil_actors_runtime::{extract_send_result, BatchReturnGen, FIRST_ACTOR_SPECIFIC_EXIT_CODE};
1010
use frc46_token::token::types::{BalanceReturn, TransferFromParams, TransferFromReturn};
1111
use fvm_ipld_bitfield::BitField;
1212
use fvm_ipld_blockstore::Blockstore;
@@ -74,7 +74,7 @@ pub enum Method {
7474
WithdrawBalance = 3,
7575
PublishStorageDeals = 4,
7676
VerifyDealsForActivation = 5,
77-
ActivateDeals = 6,
77+
BatchActivateDeals = 6,
7878
OnMinerSectorsTerminate = 7,
7979
ComputeDataCommitment = 8,
8080
CronTick = 9,
@@ -530,93 +530,145 @@ impl Actor {
530530
Ok(VerifyDealsForActivationReturn { sectors: sectors_data })
531531
}
532532

533-
/// Activate a set of deals, returning the combined deal space and extra info for verified deals.
534-
fn activate_deals(
533+
/// Activate a set of deals, returning the deal space and extra info for sectors containing
534+
/// verified deals. Sectors are activated in parameter-defined order and can fail independently of
535+
/// each other with the responsible ExitCode recorded in a BatchReturn.
536+
fn batch_activate_deals(
535537
rt: &impl Runtime,
536-
params: ActivateDealsParams,
537-
) -> Result<ActivateDealsResult, ActorError> {
538+
params: BatchActivateDealsParams,
539+
) -> Result<BatchActivateDealsResult, ActorError> {
538540
rt.validate_immediate_caller_type(std::iter::once(&Type::Miner))?;
539541
let miner_addr = rt.message().caller();
540542
let curr_epoch = rt.curr_epoch();
541543

542-
let (deal_spaces, verified_infos) = rt.transaction(|st: &mut State, rt| {
543-
let proposal_array = st.get_proposal_array(rt.store())?;
544-
let proposals = get_proposals(&proposal_array, &params.deal_ids, st.next_id)?;
544+
let (activations, batch_ret) = rt.transaction(|st: &mut State, rt| {
545+
let mut deal_states: Vec<(DealID, DealState)> = vec![];
546+
let mut batch_gen = BatchReturnGen::new(params.sectors.len());
547+
let mut activations: Vec<DealActivation> = vec![];
548+
let mut activated_deals: HashSet<DealID> = HashSet::new();
549+
550+
for p in params.sectors {
551+
let proposal_array = st.get_proposal_array(rt.store())?;
552+
553+
if p.deal_ids.iter().any(|id| activated_deals.contains(id)) {
554+
log::warn!(
555+
"failed to activate sector containing duplicate deals {:?}",
556+
p.deal_ids
557+
);
558+
batch_gen.add_fail(ExitCode::USR_ILLEGAL_ARGUMENT);
559+
continue;
560+
}
545561

546-
let deal_spaces = {
547-
validate_and_return_deal_space(
562+
let proposals = match get_proposals(&proposal_array, &p.deal_ids, st.next_id) {
563+
Ok(proposals) => proposals,
564+
Err(e) => {
565+
log::warn!("failed to get proposals for deals {:?}: {:?}", p.deal_ids, e);
566+
batch_gen.add_fail(e.exit_code());
567+
continue;
568+
}
569+
};
570+
571+
let deal_spaces = match validate_and_return_deal_space(
548572
&proposals,
549573
&miner_addr,
550-
params.sector_expiry,
574+
p.sector_expiry,
551575
curr_epoch,
552576
None,
553-
)
554-
.context("failed to validate deal proposals for activation")?
555-
};
577+
) {
578+
Ok(ds) => ds,
579+
Err(e) => {
580+
log::warn!("failed validate deals {:?}: {}", p.deal_ids, e);
581+
batch_gen.add_fail(e.exit_code());
582+
continue;
583+
}
584+
};
556585

557-
// Update deal states
558-
let mut verified_infos = Vec::new();
559-
let mut deal_states: Vec<(DealID, DealState)> = vec![];
586+
// Update deal states
587+
let mut verified_infos = Vec::new();
560588

561-
for (deal_id, proposal) in proposals {
562589
// This construction could be replaced with a single "update deal state"
563590
// state method, possibly batched over all deal ids at once.
564-
let s = st.find_deal_state(rt.store(), deal_id)?;
591+
let update_result: Result<(), ActorError> =
592+
proposals.into_iter().try_for_each(|(deal_id, proposal)| {
593+
let s = st
594+
.find_deal_state(rt.store(), deal_id)
595+
.context(format!("error looking up deal state for {}", deal_id))?;
565596

566-
if s.is_some() {
567-
return Err(actor_error!(
568-
illegal_argument,
569-
"deal {} already activated",
570-
deal_id
571-
));
572-
}
597+
if s.is_some() {
598+
return Err(actor_error!(
599+
illegal_argument,
600+
"deal {} already activated",
601+
deal_id
602+
));
603+
}
573604

574-
let propc = rt_deal_cid(rt, &proposal)?;
605+
let propc = rt_deal_cid(rt, &proposal)?;
575606

576-
// Confirm the deal is in the pending proposals queue.
577-
// It will be removed from this queue later, during cron.
578-
let has = st.has_pending_deal(rt.store(), propc)?;
607+
// Confirm the deal is in the pending proposals queue.
608+
// It will be removed from this queue later, during cron.
609+
let has = st.has_pending_deal(rt.store(), propc)?;
579610

580-
if !has {
581-
return Err(actor_error!(
582-
illegal_state,
583-
"tried to activate deal that was not in the pending set ({})",
584-
propc
585-
));
586-
}
611+
if !has {
612+
return Err(actor_error!(
613+
illegal_state,
614+
"tried to activate deal that was not in the pending set ({})",
615+
propc
616+
));
617+
}
587618

588-
// Extract and remove any verified allocation ID for the pending deal.
589-
let allocation = st
590-
.remove_pending_deal_allocation_id(rt.store(), &deal_id_key(deal_id))?
591-
.unwrap_or((BytesKey(vec![]), NO_ALLOCATION_ID))
592-
.1;
593-
594-
if allocation != NO_ALLOCATION_ID {
595-
verified_infos.push(VerifiedDealInfo {
596-
client: proposal.client.id().unwrap(),
597-
allocation_id: allocation,
598-
data: proposal.piece_cid,
599-
size: proposal.piece_size,
600-
})
601-
}
619+
// Extract and remove any verified allocation ID for the pending deal.
620+
let allocation = st
621+
.remove_pending_deal_allocation_id(rt.store(), &deal_id_key(deal_id))
622+
.context(format!(
623+
"failed to remove pending deal allocation id {}",
624+
deal_id
625+
))?
626+
.unwrap_or((BytesKey(vec![]), NO_ALLOCATION_ID))
627+
.1;
628+
629+
if allocation != NO_ALLOCATION_ID {
630+
verified_infos.push(VerifiedDealInfo {
631+
client: proposal.client.id().unwrap(),
632+
allocation_id: allocation,
633+
data: proposal.piece_cid,
634+
size: proposal.piece_size,
635+
})
636+
}
602637

603-
deal_states.push((
604-
deal_id,
605-
DealState {
606-
sector_start_epoch: curr_epoch,
607-
last_updated_epoch: EPOCH_UNDEFINED,
608-
slash_epoch: EPOCH_UNDEFINED,
609-
verified_claim: allocation,
610-
},
611-
));
638+
deal_states.push((
639+
deal_id,
640+
DealState {
641+
sector_start_epoch: curr_epoch,
642+
last_updated_epoch: EPOCH_UNDEFINED,
643+
slash_epoch: EPOCH_UNDEFINED,
644+
verified_claim: allocation,
645+
},
646+
));
647+
activated_deals.insert(deal_id);
648+
Ok(())
649+
});
650+
651+
match update_result {
652+
Ok(_) => {
653+
activations.push(DealActivation {
654+
nonverified_deal_space: deal_spaces.deal_space,
655+
verified_infos,
656+
});
657+
batch_gen.add_success();
658+
}
659+
Err(e) => {
660+
log::warn!("failed to activate deals {:?}: {}", p.deal_ids, e);
661+
batch_gen.add_fail(e.exit_code());
662+
}
663+
}
612664
}
613665

614666
st.put_deal_states(rt.store(), &deal_states)?;
615667

616-
Ok((deal_spaces, verified_infos))
668+
Ok((activations, batch_gen.gen()))
617669
})?;
618670

619-
Ok(ActivateDealsResult { nonverified_deal_space: deal_spaces.deal_space, verified_infos })
671+
Ok(BatchActivateDealsResult { activations, activation_results: batch_ret })
620672
}
621673

622674
/// Terminate a set of deals in response to their containing sector being terminated.
@@ -634,7 +686,6 @@ impl Actor {
634686

635687
for id in params.deal_ids {
636688
let deal = st.find_proposal(rt.store(), id)?;
637-
638689
// The deal may have expired and been deleted before the sector is terminated.
639690
// Nothing to do, but continue execution for the other deals.
640691
if deal.is_none() {
@@ -1403,7 +1454,7 @@ impl ActorCode for Actor {
14031454
WithdrawBalance|WithdrawBalanceExported => withdraw_balance,
14041455
PublishStorageDeals|PublishStorageDealsExported => publish_storage_deals,
14051456
VerifyDealsForActivation => verify_deals_for_activation,
1406-
ActivateDeals => activate_deals,
1457+
BatchActivateDeals => batch_activate_deals,
14071458
OnMinerSectorsTerminate => on_miner_sectors_terminate,
14081459
ComputeDataCommitment => compute_data_commitment,
14091460
CronTick => cron_tick,

actors/market/src/types.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use super::ext::verifreg::AllocationID;
55
use cid::Cid;
66
use fil_actors_runtime::Array;
7+
use fil_actors_runtime::BatchReturn;
78
use fvm_ipld_bitfield::BitField;
89
use fvm_ipld_encoding::strict_bytes;
910
use fvm_ipld_encoding::tuple::*;
@@ -97,9 +98,9 @@ pub struct SectorDealData {
9798
}
9899

99100
#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
100-
pub struct ActivateDealsParams {
101-
pub deal_ids: Vec<DealID>,
102-
pub sector_expiry: ChainEpoch,
101+
#[serde(transparent)]
102+
pub struct BatchActivateDealsParams {
103+
pub sectors: Vec<SectorDeals>,
103104
}
104105

105106
#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
@@ -111,12 +112,18 @@ pub struct VerifiedDealInfo {
111112
}
112113

113114
#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
114-
pub struct ActivateDealsResult {
115+
pub struct DealActivation {
115116
#[serde(with = "bigint_ser")]
116117
pub nonverified_deal_space: BigInt,
117118
pub verified_infos: Vec<VerifiedDealInfo>,
118119
}
119120

121+
#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
122+
pub struct BatchActivateDealsResult {
123+
pub activation_results: BatchReturn,
124+
pub activations: Vec<DealActivation>,
125+
}
126+
120127
#[derive(Serialize_tuple, Deserialize_tuple, Debug, Clone, Eq, PartialEq)]
121128
pub struct DealSpaces {
122129
#[serde(with = "bigint_ser")]

0 commit comments

Comments
 (0)