Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,9 @@ public void writeTo(StreamOutput out) throws IOException {

out.writeBoolean(supported);
}

@Override
public String toString() {
return "NodeCapability{supported=" + supported + '}';
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,46 @@ default boolean mvSortedAscending() {
/**
* Expand multivalued fields into one row per value. Returns the same block if there aren't any multivalued
* fields to expand. The returned block needs to be closed by the caller to release the block's resources.
* TODO: pass BlockFactory
*/
Block expand();

/**
* Build a {@link Block} with a {@code null} inserted {@code before} each
* listed position.
* <p>
* Note: {@code before} must be non-decreasing.
* </p>
*/
default Block insertNulls(IntVector before) {
// TODO remove default and scatter to implementation where it can be a lot more efficient
int myCount = getPositionCount();
int beforeCount = before.getPositionCount();
try (Builder builder = elementType().newBlockBuilder(myCount + beforeCount, blockFactory())) {
int beforeP = 0;
int nextNull = before.getInt(beforeP);
for (int mainP = 0; mainP < myCount; mainP++) {
while (mainP == nextNull) {
builder.appendNull();
beforeP++;
if (beforeP >= beforeCount) {
builder.copyFrom(this, mainP, myCount);
return builder.build();
}
nextNull = before.getInt(beforeP);
}
// This line right below this is the super inefficient one.
builder.copyFrom(this, mainP, mainP + 1);
}
assert nextNull == myCount;
while (beforeP < beforeCount) {
nextNull = before.getInt(beforeP++);
assert nextNull == myCount;
builder.appendNull();
}
return builder.build();
}
}

/**
* Builds {@link Block}s. Typically, you use one of it's direct supinterfaces like {@link IntBlock.Builder}.
* This is {@link Releasable} and should be released after building the block or if building the block fails.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,9 @@ public OrdinalBytesRefBlock expand() {
public long ramBytesUsed() {
return ordinals.ramBytesUsed() + bytes.ramBytesUsed();
}

@Override
public String toString() {
return getClass().getSimpleName() + "[ordinals=" + ordinals + ", bytes=" + bytes + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ $endif$
int valueCount = getValueCount(pos);
int first = getFirstValueIndex(pos);
if (valueCount == 1) {
builder.append$Type$(get$Type$(getFirstValueIndex(pos)$if(BytesRef)$, scratch$endif$));
builder.append$Type$(get$Type$(first$if(BytesRef)$, scratch$endif$));
} else {
builder.beginPositionEntry();
for (int c = 0; c < valueCount; c++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@
import java.util.concurrent.atomic.LongAdder;

/**
* {@link AsyncOperator} performs an external computation specified in {@link #performAsync(Page, ActionListener)}.
* This operator acts as a client and operates on a per-page basis to reduce communication overhead.
* {@link AsyncOperator} performs an external computation specified in
* {@link #performAsync(Page, ActionListener)}. This operator acts as a client
* to reduce communication overhead and fetches a {@code Fetched} at a time.
* It's the responsibility of subclasses to transform that {@code Fetched} into
* output.
* @see #performAsync(Page, ActionListener)
*/
public abstract class AsyncOperator implements Operator {
public abstract class AsyncOperator<Fetched> implements Operator {

private volatile SubscribableListener<Void> blockedFuture;

private final Map<Long, Page> buffers = ConcurrentCollections.newConcurrentMap();
private final Map<Long, Fetched> buffers = ConcurrentCollections.newConcurrentMap();
private final FailureCollector failureCollector = new FailureCollector();
private final DriverContext driverContext;

Expand Down Expand Up @@ -83,7 +86,7 @@ public void addInput(Page input) {
driverContext.addAsyncAction();
boolean success = false;
try {
final ActionListener<Page> listener = ActionListener.wrap(output -> {
final ActionListener<Fetched> listener = ActionListener.wrap(output -> {
buffers.put(seqNo, output);
onSeqNoCompleted(seqNo);
}, e -> {
Expand All @@ -104,18 +107,20 @@ public void addInput(Page input) {
}
}

private void releasePageOnAnyThread(Page page) {
protected static void releasePageOnAnyThread(Page page) {
page.allowPassingToDifferentDriver();
page.releaseBlocks();
}

protected abstract void releaseFetchedOnAnyThread(Fetched result);

/**
* Performs an external computation and notify the listener when the result is ready.
*
* @param inputPage the input page
* @param listener the listener
*/
protected abstract void performAsync(Page inputPage, ActionListener<Page> listener);
protected abstract void performAsync(Page inputPage, ActionListener<Fetched> listener);

protected abstract void doClose();

Expand All @@ -125,7 +130,7 @@ private void onSeqNoCompleted(long seqNo) {
notifyIfBlocked();
}
if (closed || failureCollector.hasFailure()) {
discardPages();
discardResults();
}
}

Expand All @@ -145,18 +150,18 @@ private void notifyIfBlocked() {
private void checkFailure() {
Exception e = failureCollector.getFailure();
if (e != null) {
discardPages();
discardResults();
throw ExceptionsHelper.convertToRuntime(e);
}
}

private void discardPages() {
private void discardResults() {
long nextCheckpoint;
while ((nextCheckpoint = checkpoint.getPersistedCheckpoint() + 1) <= checkpoint.getProcessedCheckpoint()) {
Page page = buffers.remove(nextCheckpoint);
Fetched result = buffers.remove(nextCheckpoint);
checkpoint.markSeqNoAsPersisted(nextCheckpoint);
if (page != null) {
releasePageOnAnyThread(page);
if (result != null) {
releaseFetchedOnAnyThread(result);
}
}
}
Expand All @@ -165,7 +170,7 @@ private void discardPages() {
public final void close() {
finish();
closed = true;
discardPages();
discardResults();
doClose();
}

Expand All @@ -184,15 +189,18 @@ public boolean isFinished() {
}
}

@Override
public Page getOutput() {
/**
* Get a {@link Fetched} from the buffer.
* @return a result if one is ready or {@code null} if none are available.
*/
public final Fetched fetchFromBuffer() {
checkFailure();
long persistedCheckpoint = checkpoint.getPersistedCheckpoint();
if (persistedCheckpoint < checkpoint.getProcessedCheckpoint()) {
persistedCheckpoint++;
Page page = buffers.remove(persistedCheckpoint);
Fetched result = buffers.remove(persistedCheckpoint);
checkpoint.markSeqNoAsPersisted(persistedCheckpoint);
return page;
return result;
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,32 @@
import java.util.Objects;

/**
* Combines values at the given blocks with the same positions into a single position for the blocks at the given channels
* Combines values at the given blocks with the same positions into a single position
* for the blocks at the given channels.
* <p>
* Example, input pages consisting of three blocks:
* positions | field-1 | field-2 |
* -----------------------------------
* </p>
* <pre>{@code
* | positions | field-1 | field-2 |
* ------------------------------------
* Page 1:
* 1 | a,b | 2020 |
* 1 | c | 2021 |
* ---------------------------------
* | 1 | a,b | 2020 |
* | 1 | c | 2021 |
* Page 2:
* 2 | a,e | 2021 |
* ---------------------------------
* | 2 | a,e | 2021 |
* Page 3:
* 4 | d | null |
* ---------------------------------
* | 4 | d | null |
* }</pre>
* Output:
* <pre>{@code
* | field-1 | field-2 |
* ---------------------------
* | null | null |
* | a,b,c | 2020,2021 |
* | a,e | 2021 |
* | null | null |
* | d | 2023 |
* }</pre>
*/
public final class MergePositionsOperator implements Operator {
private boolean finished = false;
Expand Down
Loading