diff --git a/src/main/java/ru/itmo/ctlab/hict/hict_library/chunkedfile/Initializers.java b/src/main/java/ru/itmo/ctlab/hict/hict_library/chunkedfile/Initializers.java index 1f93bd5..d0d7e63 100644 --- a/src/main/java/ru/itmo/ctlab/hict/hict_library/chunkedfile/Initializers.java +++ b/src/main/java/ru/itmo/ctlab/hict/hict_library/chunkedfile/Initializers.java @@ -35,7 +35,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Executors; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -47,6 +46,28 @@ public class Initializers { HDF5LibraryInitializer.initializeHDF5Library(); } + public interface ProgressReporter { + void report(@NotNull String stage, double progress); + } + + private static final ThreadLocal<ProgressReporter> PROGRESS = new ThreadLocal<>(); + + public static <T> T withProgressReporter(final @NotNull ProgressReporter reporter, final @NotNull java.util.function.Supplier<T> supplier) { + PROGRESS.set(reporter); + try { + return supplier.get(); + } finally { + PROGRESS.remove(); + } + } + + private static void reportProgress(final @NotNull String stage, final double progress) { + final var reporter = PROGRESS.get(); + if (reporter != null) { + reporter.report(stage, progress); + } + } + public static @NotNull List<@NotNull StripeDescriptor> readStripeDescriptors(final long resolution, final @NotNull IHDF5Reader reader) { final List result = new ArrayList<>(); final long[] stripeLengthBins; @@ -54,12 +75,37 @@ public class Initializers { stripeLengthBins = reader.int64().readArray(stripeLengthsBinsDataset.getDataSetPath()); } - for (int stripeId = 0; stripeId < stripeLengthBins.length; ++stripeId) { - try (final var stripeBinWeightsDataset = reader.object().openDataSet(getStripeBinWeightsDatasetPath(resolution))) { - final var stripeBinWeights = reader.float64().readMDArraySlice(stripeBinWeightsDataset, new long[]{stripeId, -1L}); - final var newStripe = new StripeDescriptor(stripeId, 
stripeLengthBins[stripeId], stripeBinWeights.getAsFlatArray()); - result.add(newStripe); + try (final var stripeBinWeightsDataset = reader.object().openDataSet(getStripeBinWeightsDatasetPath(resolution))) { + reportProgress("Reading stripe weights", 0.0); + final long[] dims = reader.object().getDataSetInformation(stripeBinWeightsDataset.getDataSetPath()).getDimensions(); + final int rowLen = dims.length > 1 ? (int) dims[1] : 0; + final int stripeCount = stripeLengthBins.length; + final int bytesPerRow = Math.max(1, rowLen) * Double.BYTES; + final int targetBlockBytes = 16 * 1024 * 1024; + final int maxRowsPerBlock = Math.max(1, targetBlockBytes / bytesPerRow); + final int blockSize = Math.min(256, Math.min(maxRowsPerBlock, stripeCount)); + + for (int start = 0; start < stripeCount; start += blockSize) { + final int count = Math.min(blockSize, stripeCount - start); + final var block = reader.float64().readMDArrayBlockWithOffset( + stripeBinWeightsDataset.getDataSetPath(), + new int[]{count, rowLen}, + new long[]{start, 0} + ); + final double[] flat = block.getAsFlatArray(); + for (int i = 0; i < count; i++) { + final int stripeId = start + i; + final int len = (int) stripeLengthBins[stripeId]; + final int rowOffset = i * rowLen; + final int copyLen = Math.min(len, Math.max(0, rowLen)); + final double[] weights = copyLen > 0 + ? 
Arrays.copyOfRange(flat, rowOffset, rowOffset + copyLen) + : new double[0]; + result.add(new StripeDescriptor(stripeId, stripeLengthBins[stripeId], weights)); + } + reportProgress("Reading stripe weights", (double) Math.min(stripeCount, start + count) / Math.max(1, stripeCount)); } + reportProgress("Reading stripe weights", 1.0); } return result; @@ -121,20 +167,18 @@ public class Initializers { // }); // } // } - try (final var executorService = Executors.newWorkStealingPool()) { - for (int i = 1; i < resolutions.length; ++i) { - int finalI = i; - executorService.submit(() -> { - final var stripes = readStripeDescriptors(resolutions[finalI], reader); - resolutionOrderToStripes.set(finalI, stripes); - chunkedFile.getStripeCount()[finalI] = stripes.size(); - final var atus = readATL(resolutions[finalI], reader, stripes); - resolutionOrderToBasisATUs.set(finalI, atus); - final var dataBundles = readContigDataBundles(resolutions[finalI], reader, atus); - contigDescriptorDataBundles.set(finalI, dataBundles); - }); - } + reportProgress("Reading resolution metadata", 0.0); + for (int i = 1; i < resolutions.length; ++i) { + final var stripes = readStripeDescriptors(resolutions[i], reader); + resolutionOrderToStripes.set(i, stripes); + chunkedFile.getStripeCount()[i] = stripes.size(); + final var atus = readATL(resolutions[i], reader, stripes); + resolutionOrderToBasisATUs.set(i, atus); + final var dataBundles = readContigDataBundles(resolutions[i], reader, atus); + contigDescriptorDataBundles.set(i, dataBundles); + reportProgress("Reading resolution metadata", (double) i / Math.max(1, resolutions.length - 1)); } + reportProgress("Reading resolution metadata", 1.0); contigCount = contigDescriptorDataBundles.get(1).size(); @@ -154,17 +198,27 @@ public class Initializers { } - final var contigDescriptors = IntStream.range(0, contigCount).parallel().mapToObj(contigId -> new ContigDescriptor( - contigId, - contigNames[contigId], - contigLengthBp[contigId], - 
contigDescriptorDataBundles.stream().skip(1L).mapToLong(bundlesAtResolution -> bundlesAtResolution.get(contigId).lengthBins()).boxed().toList(), - contigDescriptorDataBundles.stream().skip(1L).map(bundlesAtResolution -> bundlesAtResolution.get(contigId).hideType()).toList(), - contigDescriptorDataBundles.stream().skip(1L).map(bundlesAtResolution -> bundlesAtResolution.get(contigId).atus()).toList(), - contigNames[contigId], 0) - ); + reportProgress("Building contig descriptors", 0.0); + final List result = new ArrayList<>(contigCount); + for (int contigId = 0; contigId < contigCount; contigId++) { + final int cid = contigId; + final var contigDescriptor = new ContigDescriptor( + cid, + contigNames[cid], + contigLengthBp[cid], + contigDescriptorDataBundles.stream().skip(1L).mapToLong(bundlesAtResolution -> bundlesAtResolution.get(cid).lengthBins()).boxed().toList(), + contigDescriptorDataBundles.stream().skip(1L).map(bundlesAtResolution -> bundlesAtResolution.get(cid).hideType()).toList(), + contigDescriptorDataBundles.stream().skip(1L).map(bundlesAtResolution -> bundlesAtResolution.get(cid).atus()).toList(), + contigNames[cid], 0 + ); + result.add(new ContigTree.ContigTuple(contigDescriptor, contigDirections.get(contigDescriptor.getContigId()))); + if ((contigId & 1023) == 0) { + reportProgress("Building contig descriptors", (double) contigId / Math.max(1, contigCount)); + } + } + reportProgress("Building contig descriptors", 1.0); - return contigDescriptors.map(contigDescriptor -> new ContigTree.ContigTuple(contigDescriptor, contigDirections.get(contigDescriptor.getContigId()))).toList(); + return result; } public static @NotNull List<@NotNull ContigDescriptorDataBundle> readContigDataBundles(final long resolution, final @NotNull IHDF5Reader reader, final List basisATUs) { diff --git a/src/main/java/ru/itmo/ctlab/hict/hict_server/handlers/fileop/FileOpHandlersHolder.java b/src/main/java/ru/itmo/ctlab/hict/hict_server/handlers/fileop/FileOpHandlersHolder.java index 
517706f..3df8ecc 100644 --- a/src/main/java/ru/itmo/ctlab/hict/hict_server/handlers/fileop/FileOpHandlersHolder.java +++ b/src/main/java/ru/itmo/ctlab/hict/hict_server/handlers/fileop/FileOpHandlersHolder.java @@ -35,6 +35,7 @@ import org.apache.commons.lang3.ArrayUtils; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import ru.itmo.ctlab.hict.hict_library.chunkedfile.Initializers; import ru.itmo.ctlab.hict.hict_library.chunkedfile.ChunkedFile; import ru.itmo.ctlab.hict.hict_server.HandlersHolder; import ru.itmo.ctlab.hict.hict_server.dto.response.assembly.AssemblyInfoDTO; @@ -76,16 +77,32 @@ public void addHandlersToRouter(final @NotNull Router router) { ctx.fail(new RuntimeException("Filename must be specified to open the file")); return; } + final boolean verbose = Boolean.parseBoolean(System.getProperty("HICT_VERBOSE", "false")); + if (verbose) { + log.info("Opening file " + filename); + } final @NotNull @NonNull LocalMap map = vertx.sharedData().getLocalMap("hict_server"); - final var chunkedFile = new ChunkedFile( + final var progress = new io.vertx.core.json.JsonObject() + .put("stage", "starting") + .put("progress", 0.0); + map.put("openProgress", progress); + + final var chunkedFile = Initializers.withProgressReporter((stage, progressValue) -> { + map.put("openProgress", new io.vertx.core.json.JsonObject() + .put("stage", stage) + .put("progress", progressValue)); + if (verbose) { + log.info(String.format("Open progress: %s (%.1f%%)", stage, progressValue * 100.0)); + } + }, () -> new ChunkedFile( new ChunkedFile.ChunkedFileOptions( Path.of(dataDirectory.toString(), filename), (int) map.getOrDefault("MIN_DS_POOL", 4), (int) map.getOrDefault("MAX_DS_POOL", 16) ) - ); + )); final var chunkedFileWrapper = new ShareableWrappers.ChunkedFileWrapper(chunkedFile); log.info("Putting chunkedFile into the local map"); @@ -94,11 +111,31 @@ public void addHandlersToRouter(final @NotNull Router router) { 
map.put("TileStatisticHolder", TileStatisticHolder.newDefaultStatisticHolder(chunkedFile.getResolutions().length)); + map.put("openProgress", new io.vertx.core.json.JsonObject() + .put("stage", "done") + .put("progress", 1.0)); + ctx.response() .putHeader("content-type", "application/json") .end(Json.encode(generateOpenFileResponse(chunkedFile))); }); + router.post("/open_progress").handler(ctx -> { + final @NotNull @NonNull LocalMap map = vertx.sharedData().getLocalMap("hict_server"); + final var progressObj = map.get("openProgress"); + if (!(progressObj instanceof io.vertx.core.json.JsonObject)) { + ctx.response() + .putHeader("content-type", "application/json") + .end(Json.encode(new io.vertx.core.json.JsonObject() + .put("stage", "idle") + .put("progress", 0.0))); + return; + } + ctx.response() + .putHeader("content-type", "application/json") + .end(((io.vertx.core.json.JsonObject) progressObj).encode()); + }); + router.post("/attach").blockingHandler(ctx -> { final @NotNull @NonNull LocalMap map = vertx.sharedData().getLocalMap("hict_server"); final var chunkedFileWrapper = ((ShareableWrappers.ChunkedFileWrapper) (map.get("chunkedFile"))); diff --git a/src/main/java/ru/itmo/ctlab/hict/hict_server/tools/HictCli.java b/src/main/java/ru/itmo/ctlab/hict/hict_server/tools/HictCli.java index 61b3470..f8f9d78 100644 --- a/src/main/java/ru/itmo/ctlab/hict/hict_server/tools/HictCli.java +++ b/src/main/java/ru/itmo/ctlab/hict/hict_server/tools/HictCli.java @@ -48,6 +48,9 @@ public static void main(final String[] args) { } final CommandLine.ParseResult parseResult = commandLine.parseArgs(args); + if (parseResult.hasMatchedOption("verbose")) { + System.setProperty("HICT_VERBOSE", "true"); + } final var subcommand = parseResult.subcommand(); final String subcommandName = subcommand != null ? subcommand.commandSpec().name() : "";