1616import static tech .pegasys .teku .statetransition .datacolumns .DataAvailabilitySampler .SamplingEligibilityStatus .REQUIRED ;
1717
1818import java .util .Collection ;
19+ import java .util .HashSet ;
1920import java .util .List ;
21+ import java .util .Optional ;
22+ import java .util .Set ;
23+ import java .util .function .Supplier ;
2024import org .apache .logging .log4j .LogManager ;
2125import org .apache .logging .log4j .Logger ;
26+ import org .apache .tuweni .bytes .Bytes32 ;
27+ import tech .pegasys .teku .infrastructure .async .SafeFuture ;
28+ import tech .pegasys .teku .infrastructure .unsigned .UInt64 ;
2229import tech .pegasys .teku .spec .datastructures .blocks .SignedBeaconBlock ;
30+ import tech .pegasys .teku .spec .datastructures .util .DataColumnSlotAndIdentifier ;
31+ import tech .pegasys .teku .statetransition .blobs .RemoteOrigin ;
2332import tech .pegasys .teku .statetransition .datacolumns .util .StringifyUtil ;
2433
2534public class DasPreSampler {
2635
2736 private static final Logger LOG = LogManager .getLogger ();
2837
2938 private final DataAvailabilitySampler sampler ;
39+ private final DataColumnSidecarCustody custody ;
40+ private final Supplier <CustodyGroupCountManager > custodyGroupCountManagerSupplier ;
3041
31- public DasPreSampler (final DataAvailabilitySampler sampler ) {
42+ public DasPreSampler (
43+ final DataAvailabilitySampler sampler ,
44+ final DataColumnSidecarCustody custody ,
45+ final Supplier <CustodyGroupCountManager > custodyGroupCountManagerSupplier ) {
3246 this .sampler = sampler ;
47+ this .custody = custody ;
48+ this .custodyGroupCountManagerSupplier = custodyGroupCountManagerSupplier ;
3349 }
3450
3551 private boolean isSamplingRequired (final SignedBeaconBlock block ) {
@@ -52,19 +68,52 @@ public void onNewPreImportBlocks(final Collection<SignedBeaconBlock> blocks) {
5268 }
5369
5470 private void onNewPreImportBlock (final SignedBeaconBlock block ) {
55- sampler
56- .checkDataAvailability (block .getSlot (), block .getRoot ())
57- .finish (
58- succ ->
59- LOG .debug (
60- "DasPreSampler: success pre-sampling block {} ({})" ,
61- block .getSlot (),
62- block .getRoot ()),
63- err ->
64- LOG .debug (
65- "DasPreSampler: error pre-sampling block {} ({}): {}" ,
66- block .getSlot (),
67- block .getRoot (),
68- err ));
71+ final Set <DataColumnSlotAndIdentifier > requiredColumnIdentifiers =
72+ new HashSet <>(calculateSamplingColumnIds (block .getSlot (), block .getRoot ()));
73+
74+ final SafeFuture <List <DataColumnSlotAndIdentifier >> columnsInCustodyFuture =
75+ maybeHasColumnsInCustody (requiredColumnIdentifiers );
76+
77+ columnsInCustodyFuture
78+ .thenAccept (
79+ columnsInCustody ->
80+ columnsInCustody .forEach (
81+ columnId -> sampler .onAlreadyKnownDataColumn (columnId , RemoteOrigin .CUSTODY )))
82+ .always (
83+ () ->
84+ sampler
85+ .checkDataAvailability (block .getSlot (), block .getRoot ())
86+ .finish (
87+ succ ->
88+ LOG .debug (
89+ "DasPreSampler: success pre-sampling block {} ({})" ,
90+ block .getSlot (),
91+ block .getRoot ()),
92+ err ->
93+ LOG .debug (
94+ "DasPreSampler: error pre-sampling block {} ({}): {}" ,
95+ block .getSlot (),
96+ block .getRoot (),
97+ err )));
98+ }
99+
100+ private List <DataColumnSlotAndIdentifier > calculateSamplingColumnIds (
101+ final UInt64 slot , final Bytes32 blockRoot ) {
102+ return custodyGroupCountManagerSupplier .get ().getSamplingColumnIndices ().stream ()
103+ .map (columnIndex -> new DataColumnSlotAndIdentifier (slot , blockRoot , columnIndex ))
104+ .toList ();
105+ }
106+
107+ private SafeFuture <Optional <DataColumnSlotAndIdentifier >> checkColumnInCustody (
108+ final DataColumnSlotAndIdentifier columnIdentifier ) {
109+ return custody
110+ .hasCustodyDataColumnSidecar (columnIdentifier )
111+ .thenApply (hasColumn -> hasColumn ? Optional .of (columnIdentifier ) : Optional .empty ());
112+ }
113+
114+ private SafeFuture <List <DataColumnSlotAndIdentifier >> maybeHasColumnsInCustody (
115+ final Collection <DataColumnSlotAndIdentifier > columnIdentifiers ) {
116+ return SafeFuture .collectAll (columnIdentifiers .stream ().map (this ::checkColumnInCustody ))
117+ .thenApply (list -> list .stream ().flatMap (Optional ::stream ).toList ());
69118 }
70119}
0 commit comments