Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ dependencies {

api("com.asarkar.grpc:grpc-test:2.0.0")
api("com.esaulpaugh:headlong:13.3.1")
api("com.github.luben:zstd-jni:1.5.7-7")
api("com.github.meanbeanlib:meanbean:3.0.0-M9")
api("com.github.vertical-blank:sql-formatter:2.0.5")
api("com.bucket4j:bucket4j-core:8.10.1")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public enum StreamType {
List.of("csv", "pb"),
Duration.ofMinutes(15L)),
RECORD(RecordFile::new, "recordstreams", "record", "", List.of("rcd"), Duration.ofSeconds(2L)),
BLOCK(BlockFile::new, "", "", "", List.of("blk"), Duration.ofMillis(500L));
BLOCK(BlockFile::new, "block", "", "", List.of("blk"), Duration.ofMillis(500L));

public static final String SIGNATURE_SUFFIX = "_sig";

Expand All @@ -41,12 +41,12 @@ public enum StreamType {
private final Duration fileCloseInterval;

StreamType(
Supplier<? extends StreamFile<?>> supplier,
String path,
String nodePrefix,
String suffix,
List<String> extensions,
Duration fileCloseInterval) {
final Supplier<? extends StreamFile<?>> supplier,
final String path,
final String nodePrefix,
final String suffix,
final List<String> extensions,
final Duration fileCloseInterval) {
this.supplier = supplier;
this.path = path;
this.nodePrefix = nodePrefix;
Expand All @@ -66,6 +66,14 @@ public boolean isChained() {
return this != BALANCE;
}

public String toBucketFilename(final String filename) {
if (this != BLOCK) {
return filename;
}

return filename.replaceAll("(\\d{4})", "$1/");
}

@SuppressWarnings("unchecked")
public <T extends StreamFile<?>> T newStreamFile() {
return (T) supplier.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@
import org.hiero.mirror.common.domain.DigestAlgorithm;
import org.hiero.mirror.common.domain.StreamFile;
import org.hiero.mirror.common.domain.StreamType;
import org.jspecify.annotations.NonNull;

@Builder(toBuilder = true)
@Data
@AllArgsConstructor
@NoArgsConstructor
public class BlockFile implements StreamFile<BlockTransaction> {
public final class BlockFile implements StreamFile<BlockTransaction> {

private static final int BASENAME_LENGTH = 36;
private static final int BASENAME_LENGTH = 19;
private static final char BASENAME_PADDING = '0';
private static final String COMPRESSED_FILE_SUFFIX = ".blk.gz";
private static final String COMPRESSED_FILE_SUFFIX = ".blk.zstd";
private static final String FILE_SUFFIX = ".blk";
private static final Predicate<String> STREAMED_FILENAME_PREDICATE =
Pattern.compile("^\\d{36}.blk$").asPredicate();
Pattern.compile("^\\d{19}.blk$").asPredicate();

@ToString.Exclude
private BlockHeader blockHeader;
Expand Down Expand Up @@ -71,8 +72,6 @@ public class BlockFile implements StreamFile<BlockTransaction> {

private String name;

private Long nodeId;

private String node;

@ToString.Exclude
Expand All @@ -88,13 +87,13 @@ public class BlockFile implements StreamFile<BlockTransaction> {

private int version;

public static String getFilename(long blockNumber, boolean gzipped) {
public static String getFilename(final long blockNumber, final boolean compressed) {
if (blockNumber < 0) {
throw new IllegalArgumentException("Block number must be non-negative");
}

var filename = leftPad(Long.toString(blockNumber), BASENAME_LENGTH, BASENAME_PADDING);
return gzipped ? filename + COMPRESSED_FILE_SUFFIX : filename + FILE_SUFFIX;
return compressed ? filename + COMPRESSED_FILE_SUFFIX : filename + FILE_SUFFIX;
}

@Override
Expand All @@ -107,6 +106,10 @@ public String getFileHash() {
return StringUtils.EMPTY;
}

public Long getNodeId() {
return -1L;
}

public BlockSourceType getSourceType() {
if (StringUtils.isBlank(name)) {
return null;
Expand All @@ -124,9 +127,14 @@ public StreamType getType() {
return StreamType.BLOCK;
}

@Override
public void setNodeId(@NonNull Long nodeId) {
// ignored
}

public static class BlockFileBuilder {

public BlockFileBuilder onNewRound(long roundNumber) {
public BlockFileBuilder onNewRound(final long roundNumber) {
if (roundStart == null) {
roundStart = roundNumber;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
import java.util.Map;
import java.util.SortedSet;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

class StreamTypeTest {
final class StreamTypeTest {

private static final Map<StreamType, List<String>> DATA_EXTENSIONS =
ImmutableMap.<StreamType, List<String>>builder()
Expand Down Expand Up @@ -69,6 +70,18 @@ void getSignatureExtensions(StreamType streamType, List<String> signatureExtensi
assertPriorities(streamType.getSignatureExtensions(), signatureExtensions);
}

@Test
void toBucketFilename() {
assertThat(StreamType.BALANCE.toBucketFilename("2020-06-03T16_45_00.1Z_Balances.csv"))
.isEqualTo("2020-06-03T16_45_00.1Z_Balances.csv");
assertThat(StreamType.RECORD.toBucketFilename("2020-06-03T16_45_00.1Z.rcd"))
.isEqualTo("2020-06-03T16_45_00.1Z.rcd");
assertThat(StreamType.BLOCK.toBucketFilename("0000000000000000000.blk.zstd"))
.isEqualTo("0000/0000/0000/0000/000.blk.zstd");
assertThat(StreamType.BLOCK.toBucketFilename("0000000000000000000.blk"))
.isEqualTo("0000/0000/0000/0000/000.blk");
}

void assertPriorities(SortedSet<StreamType.Extension> actual, List<String> expected) {
// ensures extensions are ordered by priority
assertThat(actual.stream().map(StreamType.Extension::getName)).containsExactlyElementsOf(expected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,15 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

class BlockFileTest {

@ParameterizedTest(name = "gzipped={0}")
@CsvSource(value = {"true, '.gz'", "false, ''"})
void getBlockStreamFilename(boolean gzipped, String extraSuffix) {
assertThat(BlockFile.getFilename(0, gzipped))
.isEqualTo("000000000000000000000000000000000000.blk" + extraSuffix);
assertThat(BlockFile.getFilename(1, gzipped))
.isEqualTo("000000000000000000000000000000000001.blk" + extraSuffix);
assertThat(BlockFile.getFilename(0, gzipped))
.isEqualTo("000000000000000000000000000000000000.blk" + extraSuffix);
assertThat(BlockFile.getFilename(1, gzipped))
.isEqualTo("000000000000000000000000000000000001.blk" + extraSuffix);
final class BlockFileTest {

@ParameterizedTest(name = "compressed={0}")
@CsvSource(value = {"true, '.zstd'", "false, ''"})
void getBlockStreamFilename(final boolean compressed, final String extraSuffix) {
assertThat(BlockFile.getFilename(0, compressed)).isEqualTo("0000000000000000000.blk" + extraSuffix);
assertThat(BlockFile.getFilename(1, compressed)).isEqualTo("0000000000000000001.blk" + extraSuffix);
assertThat(BlockFile.getFilename(0, compressed)).isEqualTo("0000000000000000000.blk" + extraSuffix);
assertThat(BlockFile.getFilename(1, compressed)).isEqualTo("0000000000000000001.blk" + extraSuffix);
assertThatThrownBy(() -> BlockFile.getFilename(-1, true))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Block number must be non-negative");
Expand All @@ -30,11 +26,11 @@ void getBlockStreamFilename(boolean gzipped, String extraSuffix) {
@ParameterizedTest
@CsvSource(textBlock = """
,
000000000000000000000000000000000000.blk, BLOCK_NODE
000000000000000000000000000000000000.blk.gz, FILE
0000000000000000000.blk, BLOCK_NODE
0000000000000000000.blk.zstd, FILE
""")
void getSourceType(String name, BlockSourceType type) {
var blockFile = new BlockFile();
void getSourceType(final String name, final BlockSourceType type) {
final var blockFile = new BlockFile();
blockFile.setName(name);
assertThat(blockFile.getSourceType()).isEqualTo(type);
}
Expand Down
1 change: 1 addition & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ value, it is recommended to only populate overridden properties in the custom `a

| Name | Default | Description |
| ------------------------------------------------------------------------------- | ---------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `hiero.mirror.importer.block.bucketName` | | The cloud storage bucket name to download blockstream files. This value takes priority over network hardcoded bucket names. |
| `hiero.mirror.importer.block.cutover` | | Whether to auto switch from record stream to block stream. This overrides the default set for the network. |
| `hiero.mirror.importer.block.cutoverThreshold` | 8s | The amount of time to wait to switch between block stream and record stream during cutover. |
| `hiero.mirror.importer.block.enabled` | false | Whether to enable block stream source |
Expand Down
1 change: 1 addition & 0 deletions importer/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ dependencies {
implementation("software.amazon.awssdk:s3")
implementation("software.amazon.awssdk:sts")
protobuf("org.hiero.block:block-node-protobuf-sources:$blockNodeVersion")
runtimeOnly("com.github.luben:zstd-jni")
runtimeOnly("io.grpc:grpc-netty")
testImplementation(project(path = ":common", configuration = "testClasses"))
testImplementation("com.asarkar.grpc:grpc-test")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.nio.file.Paths;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down Expand Up @@ -105,17 +106,25 @@ public final class HederaNetwork {
public static final String PREVIEWNET = "previewnet";
public static final String TESTNET = "testnet";

private static final Map<String, String> NETWORK_DEFAULT_BUCKETS = Map.of(
DEMO, "hedera-demo-streams",
MAINNET, "hedera-mainnet-streams",
private static final Map<String, CloudBucket> NETWORK_DEFAULT_BUCKETS = Map.of(
DEMO, new CloudBucket("hedera-demo-recent-block-streams", "hedera-demo-streams"),
MAINNET, new CloudBucket("hedera-mainnet-recent-block-streams", "hedera-mainnet-streams"),
// OTHER has no default bucket
PREVIEWNET, "hedera-preview-testnet-streams",
TESTNET, "hedera-testnet-streams-2024-02");
PREVIEWNET, new CloudBucket("hedera-previewnet-recent-block-streams", "hedera-preview-testnet-streams"),
TESTNET, new CloudBucket("hedera-testnet-recent-block-streams", "hedera-testnet-streams-2024-02"));

private HederaNetwork() {}

public static String getBlockStreamBucketName(final String network) {
return Optional.ofNullable(NETWORK_DEFAULT_BUCKETS.get(network))
.map(CloudBucket::blockStream)
.orElse("");
}

public static String getBucketName(final String network) {
return NETWORK_DEFAULT_BUCKETS.getOrDefault(network, "");
return Optional.ofNullable(NETWORK_DEFAULT_BUCKETS.get(network))
.map(CloudBucket::recordStream)
.orElse("");
}

public static boolean hasCutover(final String network) {
Expand All @@ -126,4 +135,6 @@ public static boolean isAllowAnonymousAccess(final String network) {
return DEMO.equals(network);
}
}

private record CloudBucket(String blockStream, String recordStream) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.hiero.mirror.common.CommonProperties;
import org.hiero.mirror.importer.downloader.CommonDownloaderProperties;
import org.hiero.mirror.importer.downloader.StreamSourceProperties;
import org.hiero.mirror.importer.downloader.block.BlockProperties;
import org.hiero.mirror.importer.downloader.provider.LocalStreamFileProperties;
import org.hiero.mirror.importer.downloader.provider.LocalStreamFileProvider;
import org.hiero.mirror.importer.downloader.provider.S3StreamFileProvider;
Expand All @@ -37,6 +38,7 @@
@RequiredArgsConstructor
class CloudStorageConfiguration {

private final BlockProperties blockProperties;
private final CommonProperties commonProperties;
private final CommonDownloaderProperties commonDownloaderProperties;
private final LocalStreamFileProperties localProperties;
Expand All @@ -45,12 +47,16 @@ class CloudStorageConfiguration {
@Bean
List<StreamFileProvider> streamFileProviders() {
if (StringUtils.isBlank(commonDownloaderProperties.getPathPrefix())) {
log.info("Configured to download from bucket {}", commonDownloaderProperties.getBucketName());
log.info(
"Configured to download record streams from bucket {} and block streams from bucket {}",
commonDownloaderProperties.getBucketName(),
blockProperties.getBucketName());
} else {
log.info(
"Configured to download from bucket {} with path prefix {}",
"Configured to download record streams from bucket {} with path prefix {} and block streams from bucket {}",
commonDownloaderProperties.getBucketName(),
commonDownloaderProperties.getPathPrefix());
commonDownloaderProperties.getPathPrefix(),
blockProperties.getBucketName());
}

var providers = new ArrayList<StreamFileProvider>();
Expand All @@ -61,7 +67,8 @@ List<StreamFileProvider> streamFileProviders() {
case LOCAL ->
new LocalStreamFileProvider(commonProperties, commonDownloaderProperties, localProperties);
case GCP, S3 ->
new S3StreamFileProvider(commonProperties, commonDownloaderProperties, s3Client(source));
new S3StreamFileProvider(
blockProperties, commonProperties, commonDownloaderProperties, s3Client(source));
};

providers.add(provider);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ public static StreamFileData from(File file, StreamFilename streamFilename) {
return readStreamFileData(file, streamFilename);
}

public static StreamFileData from(Path basePath, StreamFilename streamFilename) {
var streamFile = new File(basePath.toFile(), streamFilename.getFilePath());
public static StreamFileData from(final Path basePath, final StreamFilename streamFilename) {
final var streamFile = new File(basePath.toFile(), streamFilename.getBucketFilePath());
return readStreamFileData(streamFile, streamFilename);
}

Expand Down Expand Up @@ -94,7 +94,7 @@ public String getFilename() {
}

public String getFilePath() {
return streamFilename.getFilePath();
return streamFilename.getBucketFilePath();
}

@Override
Expand Down
Loading
Loading