Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 49 additions & 73 deletions query/src/main/java/tech/ydb/query/tools/QueryReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import tech.ydb.core.Issue;
import tech.ydb.core.Result;
Expand All @@ -25,12 +24,12 @@
*/
public class QueryReader implements Iterable<ResultSetReader> {
private final QueryInfo info;
private final List<Issue> isssues;
private final List<ResultSetParts> results;
private final List<Issue> issues;
private final List<ResultSetReader> results;

QueryReader(QueryInfo info, List<Issue> issues, List<ResultSetParts> results) {
private QueryReader(QueryInfo info, List<Issue> issues, List<ResultSetReader> results) {
this.info = info;
this.isssues = issues;
this.issues = issues;
this.results = results;
}

Expand All @@ -43,11 +42,11 @@ public int getResultSetCount() {
}

public ResultSetReader getResultSet(int index) {
return new CompositeResultSet(results.get(index).getParts());
return results.get(index);
}

public List<Issue> getIssueList() {
return this.isssues;
return this.issues;
}

public static CompletableFuture<Result<QueryReader>> readFrom(QueryStream stream) {
Expand All @@ -57,45 +56,32 @@ public static CompletableFuture<Result<QueryReader>> readFrom(QueryStream stream

@Override
public Iterator<ResultSetReader> iterator() {
return new IteratorImpl(results.iterator());
}

private class IteratorImpl implements Iterator<ResultSetReader> {
private final Iterator<ResultSetParts> iter;

IteratorImpl(Iterator<ResultSetParts> iter) {
this.iter = iter;
}

@Override
public boolean hasNext() {
return iter.hasNext();
}

@Override
public ResultSetReader next() {
return new CompositeResultSet(iter.next().getParts());
}
return results.iterator();
}

private static class PartsCollector implements QueryStream.PartsHandler {
private final List<Issue> issueList = new ArrayList<>();
private final SortedMap<Long, ResultSetParts> results = new TreeMap<>();
private final SortedMap<Long, List<QueryResultPart>> results = new TreeMap<>();

QueryReader toReader(QueryInfo info) {
List<ResultSetParts> ordered = new ArrayList<>();
List<List<QueryResultPart>> ordered = new ArrayList<>();
long lastInserted = 0;
for (Map.Entry<Long, ResultSetParts> entry: results.entrySet()) {
for (Map.Entry<Long, List<QueryResultPart>> entry: results.entrySet()) {
long key = entry.getKey();
while (lastInserted + 1 < key) {
ordered.add(new ResultSetParts(lastInserted));
ordered.add(new ArrayList<>()); // add empty result for skipped indexes
lastInserted++;
}
ordered.add(entry.getValue());
lastInserted = key;
}

return new QueryReader(info, issueList, ordered);
List<ResultSetReader> resultsList = new ArrayList<>(ordered.size());
for (List<QueryResultPart> queryResult: ordered) {
resultsList.add(new CompositeResultSet(queryResult));
}

return new QueryReader(info, issueList, resultsList);
}

@Override
Expand All @@ -107,42 +93,27 @@ public void onIssues(Issue[] issues) {
public void onNextPart(QueryResultPart part) {
Long index = part.getResultSetIndex();
if (!results.containsKey(index)) {
results.put(index, new ResultSetParts(index));
results.put(index, new ArrayList<>());
}
results.get(index).addPart(part);
}
}

static class ResultSetParts {
private final long resultSetIndex;
private final List<QueryResultPart> parts = new ArrayList<>();

ResultSetParts(long index) {
this.resultSetIndex = index;
}

public void addPart(QueryResultPart part) {
parts.add(part);
}

public long getIndex() {
return resultSetIndex;
}

public List<QueryResultPart> getParts() {
return parts;
results.get(index).add(part);
}
}

private static class CompositeResultSet implements ResultSetReader {
private final List<ResultSetReader> parts;
private final ResultSetReader[] parts;
private final int rowsCount;
private int partIndex = -1;

CompositeResultSet(List<QueryResultPart> list) {
this.parts = list.stream().map(QueryResultPart::getResultSetReader).collect(Collectors.toList());
this.rowsCount = list.stream().mapToInt(QueryResultPart::getResultSetRowsCount).sum();
this.partIndex = parts.isEmpty() ? -1 : 0;
this.parts = new ResultSetReader[list.size()];
int count = 0;
int idx = 0;
for (QueryResultPart part: list) {
this.parts[idx++] = part.getResultSetReader();
count += part.getResultSetRowsCount();
}
this.rowsCount = count;
this.partIndex = list.isEmpty() ? -1 : 0;
}

@Override
Expand All @@ -155,47 +126,47 @@ public int getColumnCount() {
if (partIndex < 0) {
return 0;
}
return parts.get(partIndex).getColumnCount();
return parts[partIndex].getColumnCount();
}

@Override
public String getColumnName(int index) {
if (partIndex < 0) {
return null;
}
return parts.get(partIndex).getColumnName(index);
return parts[partIndex].getColumnName(index);
}

@Override
public int getColumnIndex(String name) {
if (partIndex < 0) {
return -1;
}
return parts.get(partIndex).getColumnIndex(name);
return parts[partIndex].getColumnIndex(name);
}

@Override
public ValueReader getColumn(int index) {
if (partIndex < 0) {
return null;
}
return parts.get(partIndex).getColumn(index);
return parts[partIndex].getColumn(index);
}

@Override
public ValueReader getColumn(String name) {
if (partIndex < 0) {
return null;
}
return parts.get(partIndex).getColumn(name);
return parts[partIndex].getColumn(name);
}

@Override
public Type getColumnType(int index) {
if (partIndex < 0) {
return null;
}
return parts.get(partIndex).getColumnType(index);
return parts[partIndex].getColumnType(index);
}

@Override
Expand All @@ -205,20 +176,25 @@ public int getRowCount() {

@Override
public void setRowIndex(int index) {
if (index < 0 || index >= rowsCount) {
throw new IndexOutOfBoundsException(String.format("Index %s out of bounds for length %s",
index, rowsCount));
}
partIndex = 0;
int currentIdx = index;
while (partIndex < parts.size()) {
int readerRows = parts.get(partIndex).getRowCount();
while (partIndex < parts.length) {
int readerRows = parts[partIndex].getRowCount();
if (currentIdx < readerRows) {
parts.get(partIndex).setRowIndex(currentIdx);
parts[partIndex].setRowIndex(currentIdx);
break;
}
parts.get(partIndex).setRowIndex(readerRows - 1);
parts[partIndex].setRowIndex(readerRows);
currentIdx -= readerRows;
partIndex++;
}
for (int partStep = partIndex + 1; partStep < parts.size(); partStep++) {
parts.get(partStep).setRowIndex(0);

for (int partStep = partIndex + 1; partStep < parts.length; partStep++) {
parts[partStep].setRowIndex(0);
}
}

Expand All @@ -227,10 +203,10 @@ public boolean next() {
if (partIndex < 0) {
return false;
}
boolean res = parts.get(partIndex).next();
while (!res && partIndex < parts.size() - 1) {
boolean res = parts[partIndex].next();
while (!res && partIndex < parts.length - 1) {
partIndex++;
res = parts.get(partIndex).next();
res = parts[partIndex].next();
}
return res;
}
Expand Down
Loading
Loading