Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -47,19 +46,66 @@ public class Initializers {
HDF5LibraryInitializer.initializeHDF5Library();
}

/**
 * Callback used to surface long-running initialization progress to callers
 * (e.g. an HTTP handler polling for open-file status).
 *
 * <p>Marked {@code @FunctionalInterface} because call sites install it as a
 * lambda via {@link #withProgressReporter}.
 */
@FunctionalInterface
public interface ProgressReporter {
    /**
     * Reports progress of the current initialization phase.
     *
     * @param stage    human-readable name of the current phase
     * @param progress completion fraction, expected in {@code [0.0, 1.0]}
     */
    void report(@NotNull String stage, double progress);
}

private static final ThreadLocal<ProgressReporter> PROGRESS = new ThreadLocal<>();

/**
 * Runs {@code supplier} with {@code reporter} installed as the current thread's
 * progress sink and returns the supplier's result.
 *
 * <p>The previously installed reporter (if any) is restored afterwards, so
 * nested calls no longer clobber an outer reporter; when there was none, the
 * thread-local entry is removed entirely to avoid leaking it on pooled threads.
 *
 * @param reporter sink receiving progress events emitted during the call
 * @param supplier work to execute; its exceptions propagate unchanged
 * @return the value produced by {@code supplier}
 */
public static <T> T withProgressReporter(final @NotNull ProgressReporter reporter, final @NotNull java.util.function.Supplier<T> supplier) {
    final ProgressReporter previous = PROGRESS.get();
    PROGRESS.set(reporter);
    try {
        return supplier.get();
    } finally {
        if (previous != null) {
            // Restore the outer reporter instead of unconditionally clearing.
            PROGRESS.set(previous);
        } else {
            PROGRESS.remove();
        }
    }
}

/**
 * Forwards a progress event to the reporter installed on the current thread;
 * a silent no-op when no reporter is active.
 */
private static void reportProgress(final @NotNull String stage, final double progress) {
    final ProgressReporter sink = PROGRESS.get();
    if (sink == null) {
        return;
    }
    sink.report(stage, progress);
}

/**
 * Reads all stripe descriptors for the given resolution from the HDF5 file.
 *
 * <p>First reads the per-stripe length array (its length defines the stripe
 * count), then reads the (stripeCount x rowLen) bin-weight matrix in row
 * blocks sized toward ~16 MiB per read instead of re-opening the dataset once
 * per stripe. Progress is emitted via {@code reportProgress(...)}.
 *
 * <p>NOTE(review): the rendered diff contained leftover pre-refactor
 * per-stripe read lines with unbalanced braces; this body keeps only the
 * coherent blocked-read implementation.
 *
 * @param resolution resolution level whose stripe datasets are read
 * @param reader     open HDF5 reader positioned on the chunked file
 * @return stripe descriptors indexed by stripe id, in ascending order
 */
public static @NotNull List<@NotNull StripeDescriptor> readStripeDescriptors(final long resolution, final @NotNull IHDF5Reader reader) {
    final List<StripeDescriptor> result = new ArrayList<>();

    // Stripe lengths in bins, one entry per stripe.
    final long[] stripeLengthBins;
    try (final var stripeLengthsBinsDataset = reader.object().openDataSet(getStripeLengthsBinsDatasetPath(resolution))) {
        stripeLengthBins = reader.int64().readArray(stripeLengthsBinsDataset.getDataSetPath());
    }

    // Open the weight matrix once and read it in contiguous row blocks.
    try (final var stripeBinWeightsDataset = reader.object().openDataSet(getStripeBinWeightsDatasetPath(resolution))) {
        reportProgress("Reading stripe weights", 0.0);
        final long[] dims = reader.object().getDataSetInformation(stripeBinWeightsDataset.getDataSetPath()).getDimensions();
        final int rowLen = dims.length > 1 ? (int) dims[1] : 0; // weights stored per stripe row
        final int stripeCount = stripeLengthBins.length;

        // Target ~16 MiB per HDF5 read; clamp rows-per-block to [1, 256] and
        // never more than the number of stripes available.
        final int bytesPerRow = Math.max(1, rowLen) * Double.BYTES;
        final int targetBlockBytes = 16 * 1024 * 1024;
        final int maxRowsPerBlock = Math.max(1, targetBlockBytes / bytesPerRow);
        final int blockSize = Math.min(256, Math.min(maxRowsPerBlock, stripeCount));

        for (int start = 0; start < stripeCount; start += blockSize) {
            final int count = Math.min(blockSize, stripeCount - start);
            final var block = reader.float64().readMDArrayBlockWithOffset(
                    stripeBinWeightsDataset.getDataSetPath(),
                    new int[]{count, rowLen},
                    new long[]{start, 0}
            );
            final double[] flat = block.getAsFlatArray();
            for (int i = 0; i < count; i++) {
                final int stripeId = start + i;
                final int len = (int) stripeLengthBins[stripeId];
                final int rowOffset = i * rowLen;
                // Copy only the stripe's declared length, clamped to the stored row width.
                final int copyLen = Math.min(len, Math.max(0, rowLen));
                final double[] weights = copyLen > 0
                        ? Arrays.copyOfRange(flat, rowOffset, rowOffset + copyLen)
                        : new double[0];
                result.add(new StripeDescriptor(stripeId, stripeLengthBins[stripeId], weights));
            }
            reportProgress("Reading stripe weights", (double) Math.min(stripeCount, start + count) / Math.max(1, stripeCount));
        }
        reportProgress("Reading stripe weights", 1.0);
    }

    return result;
Expand Down Expand Up @@ -121,20 +167,18 @@ public class Initializers {
// });
// }
// }
try (final var executorService = Executors.newWorkStealingPool()) {
for (int i = 1; i < resolutions.length; ++i) {
int finalI = i;
executorService.submit(() -> {
final var stripes = readStripeDescriptors(resolutions[finalI], reader);
resolutionOrderToStripes.set(finalI, stripes);
chunkedFile.getStripeCount()[finalI] = stripes.size();
final var atus = readATL(resolutions[finalI], reader, stripes);
resolutionOrderToBasisATUs.set(finalI, atus);
final var dataBundles = readContigDataBundles(resolutions[finalI], reader, atus);
contigDescriptorDataBundles.set(finalI, dataBundles);
});
}
reportProgress("Reading resolution metadata", 0.0);
for (int i = 1; i < resolutions.length; ++i) {
final var stripes = readStripeDescriptors(resolutions[i], reader);
resolutionOrderToStripes.set(i, stripes);
chunkedFile.getStripeCount()[i] = stripes.size();
final var atus = readATL(resolutions[i], reader, stripes);
resolutionOrderToBasisATUs.set(i, atus);
final var dataBundles = readContigDataBundles(resolutions[i], reader, atus);
contigDescriptorDataBundles.set(i, dataBundles);
reportProgress("Reading resolution metadata", (double) i / Math.max(1, resolutions.length - 1));
}
reportProgress("Reading resolution metadata", 1.0);

contigCount = contigDescriptorDataBundles.get(1).size();

Expand All @@ -154,17 +198,27 @@ public class Initializers {
}


final var contigDescriptors = IntStream.range(0, contigCount).parallel().mapToObj(contigId -> new ContigDescriptor(
contigId,
contigNames[contigId],
contigLengthBp[contigId],
contigDescriptorDataBundles.stream().skip(1L).mapToLong(bundlesAtResolution -> bundlesAtResolution.get(contigId).lengthBins()).boxed().toList(),
contigDescriptorDataBundles.stream().skip(1L).map(bundlesAtResolution -> bundlesAtResolution.get(contigId).hideType()).toList(),
contigDescriptorDataBundles.stream().skip(1L).map(bundlesAtResolution -> bundlesAtResolution.get(contigId).atus()).toList(),
contigNames[contigId], 0)
);
reportProgress("Building contig descriptors", 0.0);
final List<ContigTree.ContigTuple> result = new ArrayList<>(contigCount);
for (int contigId = 0; contigId < contigCount; contigId++) {
final int cid = contigId;
final var contigDescriptor = new ContigDescriptor(
cid,
contigNames[cid],
contigLengthBp[cid],
contigDescriptorDataBundles.stream().skip(1L).mapToLong(bundlesAtResolution -> bundlesAtResolution.get(cid).lengthBins()).boxed().toList(),
contigDescriptorDataBundles.stream().skip(1L).map(bundlesAtResolution -> bundlesAtResolution.get(cid).hideType()).toList(),
contigDescriptorDataBundles.stream().skip(1L).map(bundlesAtResolution -> bundlesAtResolution.get(cid).atus()).toList(),
contigNames[cid], 0
);
result.add(new ContigTree.ContigTuple(contigDescriptor, contigDirections.get(contigDescriptor.getContigId())));
if ((contigId & 1023) == 0) {
reportProgress("Building contig descriptors", (double) contigId / Math.max(1, contigCount));
}
}
reportProgress("Building contig descriptors", 1.0);

return contigDescriptors.map(contigDescriptor -> new ContigTree.ContigTuple(contigDescriptor, contigDirections.get(contigDescriptor.getContigId()))).toList();
return result;
}

public static @NotNull List<@NotNull ContigDescriptorDataBundle> readContigDataBundles(final long resolution, final @NotNull IHDF5Reader reader, final List<ATUDescriptor> basisATUs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.commons.lang3.ArrayUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import ru.itmo.ctlab.hict.hict_library.chunkedfile.Initializers;
import ru.itmo.ctlab.hict.hict_library.chunkedfile.ChunkedFile;
import ru.itmo.ctlab.hict.hict_server.HandlersHolder;
import ru.itmo.ctlab.hict.hict_server.dto.response.assembly.AssemblyInfoDTO;
Expand Down Expand Up @@ -76,16 +77,32 @@ public void addHandlersToRouter(final @NotNull Router router) {
ctx.fail(new RuntimeException("Filename must be specified to open the file"));
return;
}
final boolean verbose = Boolean.parseBoolean(System.getProperty("HICT_VERBOSE", "false"));
if (verbose) {
log.info("Opening file " + filename);
}

final @NotNull @NonNull LocalMap<String, Object> map = vertx.sharedData().getLocalMap("hict_server");

final var chunkedFile = new ChunkedFile(
final var progress = new io.vertx.core.json.JsonObject()
.put("stage", "starting")
.put("progress", 0.0);
map.put("openProgress", progress);

final var chunkedFile = Initializers.withProgressReporter((stage, progressValue) -> {
map.put("openProgress", new io.vertx.core.json.JsonObject()
.put("stage", stage)
.put("progress", progressValue));
if (verbose) {
log.info(String.format("Open progress: %s (%.1f%%)", stage, progressValue * 100.0));
}
}, () -> new ChunkedFile(
new ChunkedFile.ChunkedFileOptions(
Path.of(dataDirectory.toString(), filename),
(int) map.getOrDefault("MIN_DS_POOL", 4),
(int) map.getOrDefault("MAX_DS_POOL", 16)
)
);
));
final var chunkedFileWrapper = new ShareableWrappers.ChunkedFileWrapper(chunkedFile);

log.info("Putting chunkedFile into the local map");
Expand All @@ -94,11 +111,31 @@ public void addHandlersToRouter(final @NotNull Router router) {

map.put("TileStatisticHolder", TileStatisticHolder.newDefaultStatisticHolder(chunkedFile.getResolutions().length));

map.put("openProgress", new io.vertx.core.json.JsonObject()
.put("stage", "done")
.put("progress", 1.0));

ctx.response()
.putHeader("content-type", "application/json")
.end(Json.encode(generateOpenFileResponse(chunkedFile)));
});

// Poll endpoint: reports the progress of the most recent /open call as
// {"stage": ..., "progress": ...}; answers "idle"/0.0 when nothing has been
// opened yet (or the shared-map entry has an unexpected type).
router.post("/open_progress").handler(ctx -> {
    final @NotNull @NonNull LocalMap<String, Object> map = vertx.sharedData().getLocalMap("hict_server");
    // Pattern-matching instanceof replaces the separate type check + cast and
    // lets the "idle" fallback share a single response-writing path.
    final var payload = (map.get("openProgress") instanceof io.vertx.core.json.JsonObject progress)
            ? progress
            : new io.vertx.core.json.JsonObject().put("stage", "idle").put("progress", 0.0);
    ctx.response()
            .putHeader("content-type", "application/json")
            .end(payload.encode());
});

router.post("/attach").blockingHandler(ctx -> {
final @NotNull @NonNull LocalMap<String, Object> map = vertx.sharedData().getLocalMap("hict_server");
final var chunkedFileWrapper = ((ShareableWrappers.ChunkedFileWrapper) (map.get("chunkedFile")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public static void main(final String[] args) {
}

final CommandLine.ParseResult parseResult = commandLine.parseArgs(args);
// Mirror picocli's "verbose" option into a JVM system property so components
// that never see the parsed command line (readers of
// System.getProperty("HICT_VERBOSE"), e.g. the file-open handler) can honor it.
if (parseResult.hasMatchedOption("verbose")) {
System.setProperty("HICT_VERBOSE", "true");
}
final var subcommand = parseResult.subcommand();
final String subcommandName = subcommand != null ? subcommand.commandSpec().name() : "";

Expand Down
Loading