Skip to content
2 changes: 2 additions & 0 deletions src/main/java/de/rub/nds/crawler/constant/JobStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
public enum JobStatus {
/** Job is waiting to be executed. */
TO_BE_EXECUTED(false),
/** Job is currently being executed. Partial results may be available in DB. */
RUNNING(false),
/** The domain was not resolvable. An empty result was written to DB. */
UNRESOLVABLE(true),
/** An uncaught exception occurred while resolving the host. */
Expand Down
74 changes: 62 additions & 12 deletions src/main/java/de/rub/nds/crawler/core/BulkScanWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@
package de.rub.nds.crawler.core;

import de.rub.nds.crawler.data.ScanConfig;
import de.rub.nds.crawler.data.ScanJobDescription;
import de.rub.nds.crawler.data.ScanTarget;
import de.rub.nds.crawler.persistence.IPersistenceProvider;
import de.rub.nds.crawler.util.CanceallableThreadPoolExecutor;
import de.rub.nds.scanner.core.execution.NamedThreadFactory;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.bson.Document;
Expand All @@ -41,6 +43,9 @@ public abstract class BulkScanWorker<T extends ScanConfig> {
/** The scan configuration for this worker */
protected final T scanConfig;

/** The persistence provider for writing partial results */
protected final IPersistenceProvider persistenceProvider;

/**
* Calls the inner scan function and may handle cleanup. This is needed to wrap the scanner into
* a future object such that we can handle timeouts properly.
Expand All @@ -55,10 +60,16 @@ public abstract class BulkScanWorker<T extends ScanConfig> {
* @param scanConfig The scan configuration for this worker
* @param parallelScanThreads The number of parallel scan threads to use, i.e., how many {@link
* ScanTarget}s to handle in parallel.
* @param persistenceProvider The persistence provider for writing partial results
*/
protected BulkScanWorker(String bulkScanId, T scanConfig, int parallelScanThreads) {
protected BulkScanWorker(
String bulkScanId,
T scanConfig,
int parallelScanThreads,
IPersistenceProvider persistenceProvider) {
this.bulkScanId = bulkScanId;
this.scanConfig = scanConfig;
this.persistenceProvider = persistenceProvider;

timeoutExecutor =
new CanceallableThreadPoolExecutor(
Expand All @@ -74,31 +85,59 @@ protected BulkScanWorker(String bulkScanId, T scanConfig, int parallelScanThread
* Handles a scan target by submitting it to the executor. If init was not called, it will
* initialize itself. In this case it will also clean up itself if all jobs are done.
*
* @param scanTarget The target to scan.
* @return A future that resolves to the scan result once the scan is done.
* <p>Returns a {@link ProgressableFuture} that represents the entire scan lifecycle, allowing
* callers to:
*
* <ul>
* <li>Get partial results as the scan progresses via {@link
* ProgressableFuture#getCurrentResult()}
* <li>Wait for the final result via {@link ProgressableFuture#get()}
* </ul>
*
* @param jobDescription The job description for this scan.
* @return A ProgressableFuture representing the scan lifecycle
*/
public Future<Document> handle(ScanTarget scanTarget) {
public ProgressableFuture<Document> handle(ScanJobDescription jobDescription) {
// if we initialized ourself, we also clean up ourself
shouldCleanupSelf.weakCompareAndSetAcquire(false, init());
activeJobs.incrementAndGet();
return timeoutExecutor.submit(

ProgressableFuture<Document> progressableFuture = new ProgressableFuture<>();

// Compose a consumer that both updates the future and persists partial results
Consumer<Document> progressConsumer =
partialResult -> {
progressableFuture.updateResult(partialResult);
persistPartialResult(jobDescription, partialResult);
};

timeoutExecutor.submit(
() -> {
Document result = scan(scanTarget);
if (activeJobs.decrementAndGet() == 0 && shouldCleanupSelf.get()) {
cleanup();
try {
Document result = scan(jobDescription, progressConsumer);
progressableFuture.complete(result);
} catch (Exception e) {
progressableFuture.completeExceptionally(e);
} finally {
if (activeJobs.decrementAndGet() == 0 && shouldCleanupSelf.get()) {
cleanup();
}
}
return result;
});

return progressableFuture;
}

/**
* Scans a target and returns the result as a Document. This is the core scanning functionality
* that must be implemented by subclasses.
*
* @param scanTarget The target to scan
* @param jobDescription The job description containing target and metadata
* @param progressConsumer Consumer to call with partial results during scanning
* @return The scan result as a Document
*/
public abstract Document scan(ScanTarget scanTarget);
public abstract Document scan(
ScanJobDescription jobDescription, Consumer<Document> progressConsumer);

/**
* Initializes this worker if it hasn't been initialized yet. This method is thread-safe and
Expand Down Expand Up @@ -161,4 +200,15 @@ public final boolean cleanup() {
* specific resources.
*/
protected abstract void cleanupInternal();

/**
* Persists a partial scan result. This method can be called by subclasses during scanning to
* save intermediate results.
*
* @param jobDescription The job description for the scan
* @param partialResult The partial result document to persist
*/
protected void persistPartialResult(ScanJobDescription jobDescription, Document partialResult) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this not somehow linked to ScheduledScan.updateResult?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see two approaches:
Either this function updates the results in the ScheduledScan object
Or this function is registered as an observer for updates on the ScheduledScan object; i.e. if updateResult is called, it fires an even that causes persistPratialResult to be called.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or are there cases where one function is called and not the other?

persistenceProvider.upsertPartialResult(jobDescription, partialResult);
}
}
40 changes: 27 additions & 13 deletions src/main/java/de/rub/nds/crawler/core/BulkScanWorkerManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import de.rub.nds.crawler.data.BulkScanInfo;
import de.rub.nds.crawler.data.ScanConfig;
import de.rub.nds.crawler.data.ScanJobDescription;
import de.rub.nds.crawler.persistence.IPersistenceProvider;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.exception.UncheckedException;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -58,21 +58,27 @@ public static BulkScanWorkerManager getInstance() {

/**
* Static convenience method to handle a scan job. See also {@link #handle(ScanJobDescription,
* int, int)}.
* int, int, IPersistenceProvider)}.
*
* @param scanJobDescription The scan job to handle
* @param parallelConnectionThreads The number of parallel connection threads to use (used to
* create worker if it does not exist)
* @param parallelScanThreads The number of parallel scan threads to use (used to create worker
* if it does not exist)
* @return A future that returns the scan result when the target is scanned is done
* @param persistenceProvider The persistence provider for writing partial results
* @return A ProgressableFuture representing the scan lifecycle
*/
public static Future<Document> handleStatic(
public static ProgressableFuture<Document> handleStatic(
ScanJobDescription scanJobDescription,
int parallelConnectionThreads,
int parallelScanThreads) {
int parallelScanThreads,
IPersistenceProvider persistenceProvider) {
BulkScanWorkerManager manager = getInstance();
return manager.handle(scanJobDescription, parallelConnectionThreads, parallelScanThreads);
return manager.handle(
scanJobDescription,
parallelConnectionThreads,
parallelScanThreads,
persistenceProvider);
}

private final Cache<String, BulkScanWorker<?>> bulkScanWorkers;
Expand Down Expand Up @@ -102,21 +108,26 @@ private BulkScanWorkerManager() {
* create worker if it does not exist)
* @param parallelScanThreads The number of parallel scan threads to use (used to create worker
* if it does not exist)
* @param persistenceProvider The persistence provider for writing partial results
* @return A bulk scan worker for the specified bulk scan
* @throws UncheckedException If a worker cannot be created
*/
public BulkScanWorker<?> getBulkScanWorker(
String bulkScanId,
ScanConfig scanConfig,
int parallelConnectionThreads,
int parallelScanThreads) {
int parallelScanThreads,
IPersistenceProvider persistenceProvider) {
try {
return bulkScanWorkers.get(
bulkScanId,
() -> {
BulkScanWorker<?> ret =
scanConfig.createWorker(
bulkScanId, parallelConnectionThreads, parallelScanThreads);
bulkScanId,
parallelConnectionThreads,
parallelScanThreads,
persistenceProvider);
ret.init();
return ret;
});
Expand All @@ -135,19 +146,22 @@ public BulkScanWorker<?> getBulkScanWorker(
* create worker if it does not exist)
* @param parallelScanThreads The number of parallel scan threads to use (used to create worker
* if it does not exist)
* @return A future that returns the scan result when the target is scanned is done
* @param persistenceProvider The persistence provider for writing partial results
* @return A ProgressableFuture representing the scan lifecycle
*/
public Future<Document> handle(
public ProgressableFuture<Document> handle(
ScanJobDescription scanJobDescription,
int parallelConnectionThreads,
int parallelScanThreads) {
int parallelScanThreads,
IPersistenceProvider persistenceProvider) {
BulkScanInfo bulkScanInfo = scanJobDescription.getBulkScanInfo();
BulkScanWorker<?> worker =
getBulkScanWorker(
bulkScanInfo.getBulkScanId(),
bulkScanInfo.getScanConfig(),
parallelConnectionThreads,
parallelScanThreads);
return worker.handle(scanJobDescription.getScanTarget());
parallelScanThreads,
persistenceProvider);
return worker.handle(scanJobDescription);
}
}
101 changes: 101 additions & 0 deletions src/main/java/de/rub/nds/crawler/core/ProgressableFuture.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* TLS-Crawler - A TLS scanning tool to perform large scale scans with the TLS-Scanner
*
* Copyright 2018-2023 Ruhr University Bochum, Paderborn University, and Hackmanit GmbH
*
* Licensed under Apache License, Version 2.0
* http://www.apache.org/licenses/LICENSE-2.0.txt
*/
package de.rub.nds.crawler.core;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* A Future implementation that supports tracking progress through partial results.
*
* <p>This class extends the standard {@link Future} contract with the ability to:
*
* <ul>
* <li>Get the current partial result via {@link #getCurrentResult()}
* <li>Update the partial result as work progresses via {@link #updateResult(Object)}
* <li>Wait for the final result via standard Future methods ({@link #get()}, {@link #get(long,
* TimeUnit)})
* </ul>
*
* @param <T> The type of result this future produces
*/
public class ProgressableFuture<T> implements Future<T> {

Check warning on line 31 in src/main/java/de/rub/nds/crawler/core/ProgressableFuture.java

View check run for this annotation

Jenkins CI - TLS-Attacker / JavaDoc

-

NORMAL: use of default constructor, which does not provide a comment

private volatile T currentResult;
private final CompletableFuture<T> delegate = new CompletableFuture<>();

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return delegate.cancel(mayInterruptIfRunning);
}

@Override
public boolean isCancelled() {
return delegate.isCancelled();
}

@Override
public boolean isDone() {
return delegate.isDone();
}

@Override
public T get() throws InterruptedException, ExecutionException {
return delegate.get();
}

@Override
public T get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}

/**
* Get the current result. If the operation is still in progress, this returns the latest
* partial result. If the operation is complete, this returns the final result.
*
* @return The current result, or null if no result is available yet
*/
public T getCurrentResult() {
return currentResult;
}

/**
* Update the current result with a partial result. This is called during processing when new
* partial results are available.
*
* @param partialResult The updated partial result
*/
public void updateResult(T partialResult) {
this.currentResult = partialResult;
}

/**
* Mark the operation as complete with the final result. This will complete the Future and
* notify any waiting consumers.
*
* @param result The final result
*/
void complete(T result) {
this.currentResult = result;
this.delegate.complete(result);
}

/**
* Mark the operation as failed with an exception.
*
* @param exception The exception that caused the failure
*/
void completeExceptionally(Throwable exception) {
this.delegate.completeExceptionally(exception);
}
}
Loading