Skip to content

Commit 5017b56

Browse files
committed
bug squashing
1 parent 1e021fa commit 5017b56

File tree

8 files changed

+238
-45
lines changed

8 files changed

+238
-45
lines changed

test/external-modules/seek-tracking-directory/src/internalClusterTest/java/org/elasticsearch/test/seektracker/SeekTrackerPluginIT.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@
1212
import org.elasticsearch.index.query.QueryBuilders;
1313
import org.elasticsearch.plugins.Plugin;
1414
import org.elasticsearch.test.ESIntegTestCase;
15-
import org.junit.Test;
1615

17-
import java.io.IOException;
1816
import java.util.ArrayList;
1917
import java.util.Collection;
2018
import java.util.List;
@@ -29,7 +27,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
2927
return List.of(SeekTrackerPlugin.class);
3028
}
3129

32-
public void testSeekTrackerPlugin() throws IOException, InterruptedException {
30+
public void testSeekTrackerPlugin() throws InterruptedException {
3331

3432
assertAcked(client().admin().indices().prepareCreate("index"));
3533
List<IndexRequestBuilder> docs = new ArrayList<>();
@@ -43,7 +41,6 @@ public void testSeekTrackerPlugin() throws IOException, InterruptedException {
4341
SeekStatsResponse response = client().execute(SeekStatsAction.INSTANCE, new SeekStatsRequest("index")).actionGet();
4442
List<ShardSeekStats> shardSeekStats = response.getSeekStats().get("index");
4543
assertThat(shardSeekStats.size(), greaterThan(0));
46-
4744
}
4845

4946
}

test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/IndexSeekTracker.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
package org.elasticsearch.test.seektracker;
1010

1111
import java.util.ArrayList;
12-
import java.util.Collection;
1312
import java.util.HashMap;
1413
import java.util.List;
1514
import java.util.Map;

test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/RestSeekStatsAction.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,16 @@ public String getName() {
2626

2727
@Override
2828
public List<Route> routes() {
29-
return List.of(new RestHandler.Route(RestRequest.Method.GET, "/_seek_stats"),
30-
new RestHandler.Route(RestRequest.Method.GET, "/{index}/_seek_stats"));
29+
return List.of(
30+
new RestHandler.Route(RestRequest.Method.GET, "/_seek_stats"),
31+
new RestHandler.Route(RestRequest.Method.GET, "/{index}/_seek_stats")
32+
);
3133
}
3234

3335
@Override
3436
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
3537
String[] indices = request.paramAsStringArray("index", Strings.EMPTY_ARRAY);
3638
SeekStatsRequest seekStatsRequest = new SeekStatsRequest(indices);
37-
return channel -> client.executeLocally(SeekStatsAction.INSTANCE, seekStatsRequest, new RestToXContentListener<>(channel));
39+
return channel -> client.execute(SeekStatsAction.INSTANCE, seekStatsRequest, new RestToXContentListener<>(channel));
3840
}
3941
}

test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/SeekStatsResponse.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.action.FailedNodeException;
1212
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
1313
import org.elasticsearch.cluster.ClusterName;
14+
import org.elasticsearch.common.Strings;
1415
import org.elasticsearch.common.io.stream.StreamInput;
1516
import org.elasticsearch.common.io.stream.StreamOutput;
1617
import org.elasticsearch.xcontent.ToXContent;
@@ -58,10 +59,14 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
5859
public Map<String, List<ShardSeekStats>> getSeekStats() {
5960
Map<String, List<ShardSeekStats>> combined = new HashMap<>();
6061
for (NodeSeekStats nodeSeekStats : getNodes()) {
61-
nodeSeekStats.getSeekStats().forEach((index, shardSeekStats) -> {
62-
combined.computeIfAbsent(index, k -> new ArrayList<>()).addAll(shardSeekStats);
63-
});
62+
nodeSeekStats.getSeekStats()
63+
.forEach((index, shardSeekStats) -> combined.computeIfAbsent(index, k -> new ArrayList<>()).addAll(shardSeekStats));
6464
}
6565
return combined;
6666
}
67+
68+
@Override
69+
public String toString() {
70+
return Strings.toString(this);
71+
}
6772
}

test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/SeekTrackerPlugin.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ public Collection<Object> createComponents(
6262
return Collections.singletonList(seekStatsService);
6363
}
6464

65-
6665
// seeks per index/shard/file
6766

6867
@Override

test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/SeekTrackingDirectoryWrapper.java

Lines changed: 216 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@
1212
import org.apache.lucene.store.FilterDirectory;
1313
import org.apache.lucene.store.IOContext;
1414
import org.apache.lucene.store.IndexInput;
15+
import org.apache.lucene.store.RandomAccessInput;
1516
import org.elasticsearch.cluster.routing.ShardRouting;
1617
import org.elasticsearch.index.IndexModule;
1718

1819
import java.io.IOException;
20+
import java.util.Map;
21+
import java.util.Set;
1922

2023
public class SeekTrackingDirectoryWrapper implements IndexModule.DirectoryWrapper {
2124

@@ -31,48 +34,231 @@ public Directory wrap(Directory directory, ShardRouting shardRouting) {
3134
return new FilterDirectory(directory) {
3235
@Override
3336
public IndexInput openInput(String name, IOContext context) throws IOException {
34-
return wrapIndexInput(shardRouting.shardId().toString(), name, super.openInput(name, context));
37+
IndexInput input = super.openInput(name, context);
38+
if (input instanceof RandomAccessInput) {
39+
return new RandomAccessSeekCountingIndexInput(input, shardRouting.shardId().toString(), name);
40+
}
41+
return wrapIndexInput(shardRouting.shardId().toString(), name, input);
3542
}
3643
};
3744
}
3845

3946
private IndexInput wrapIndexInput(String directory, String name, IndexInput in) {
40-
return new IndexInput(in.toString()) {
41-
@Override
42-
public void close() throws IOException {
43-
in.close();
44-
}
47+
return new SeekCountingIndexInput(in, directory, name);
48+
}
4549

46-
@Override
47-
public long getFilePointer() {
48-
return in.getFilePointer();
49-
}
50+
class RandomAccessSeekCountingIndexInput extends SeekCountingIndexInput implements RandomAccessInput {
5051

51-
@Override
52-
public void seek(long pos) throws IOException {
53-
in.seek(pos);
54-
seekTracker.increment(directory, name);
55-
}
52+
private final RandomAccessInput randomAccessInput;
5653

57-
@Override
58-
public long length() {
59-
return in.length();
60-
}
54+
RandomAccessSeekCountingIndexInput(IndexInput in, String directory, String name) {
55+
super(in, directory, name);
56+
randomAccessInput = (RandomAccessInput) unwrap(in);
57+
}
6158

62-
@Override
63-
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
64-
return wrapIndexInput(directory, name, in.slice(sliceDescription, offset, length));
65-
}
59+
@Override
60+
public IndexInput clone() {
61+
return new RandomAccessSeekCountingIndexInput(super.clone(), directory, name);
62+
}
6663

67-
@Override
68-
public byte readByte() throws IOException {
69-
return in.readByte();
64+
@Override
65+
public byte readByte(long pos) throws IOException {
66+
return randomAccessInput.readByte(pos);
67+
}
68+
69+
@Override
70+
public short readShort(long pos) throws IOException {
71+
return randomAccessInput.readShort(pos);
72+
}
73+
74+
@Override
75+
public int readInt(long pos) throws IOException {
76+
return randomAccessInput.readInt(pos);
77+
}
78+
79+
@Override
80+
public long readLong(long pos) throws IOException {
81+
return randomAccessInput.readLong(pos);
82+
}
83+
}
84+
85+
class SeekCountingIndexInput extends IndexInput {
86+
87+
public static IndexInput unwrap(IndexInput input) {
88+
while (input instanceof SeekCountingIndexInput) {
89+
input = ((SeekCountingIndexInput) input).in;
7090
}
91+
return input;
92+
}
7193

72-
@Override
73-
public void readBytes(byte[] b, int offset, int len) throws IOException {
74-
in.readBytes(b, offset, len);
94+
final IndexInput in;
95+
final String directory;
96+
final String name;
97+
98+
SeekCountingIndexInput(IndexInput in, String directory, String name) {
99+
super(unwrap(in).toString() + "[seek_tracked]");
100+
this.in = unwrap(in);
101+
this.directory = directory;
102+
this.name = name;
103+
}
104+
105+
@Override
106+
public IndexInput clone() {
107+
return new SeekCountingIndexInput(in.clone(), directory, name);
108+
}
109+
110+
@Override
111+
public void close() throws IOException {
112+
in.close();
113+
}
114+
115+
@Override
116+
public long getFilePointer() {
117+
return in.getFilePointer();
118+
}
119+
120+
@Override
121+
public void seek(long pos) throws IOException {
122+
in.seek(pos);
123+
seekTracker.increment(directory, name);
124+
}
125+
126+
@Override
127+
public long length() {
128+
return in.length();
129+
}
130+
131+
@Override
132+
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
133+
return wrapIndexInput(directory, name, in.slice(sliceDescription + "[seek_tracked]", offset, length));
134+
}
135+
136+
@Override
137+
public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException {
138+
final IndexInput innerSlice = in.slice("randomaccess", offset, length);
139+
if (innerSlice instanceof RandomAccessInput) {
140+
// slice() already supports random access
141+
return new RandomAccessSeekCountingIndexInput(innerSlice, directory, name);
142+
} else {
143+
IndexInput slice = wrapIndexInput(directory, name, innerSlice);
144+
// return default impl
145+
return new RandomAccessInput() {
146+
@Override
147+
public byte readByte(long pos) throws IOException {
148+
slice.seek(pos);
149+
return slice.readByte();
150+
}
151+
152+
@Override
153+
public short readShort(long pos) throws IOException {
154+
slice.seek(pos);
155+
return slice.readShort();
156+
}
157+
158+
@Override
159+
public int readInt(long pos) throws IOException {
160+
slice.seek(pos);
161+
return slice.readInt();
162+
}
163+
164+
@Override
165+
public long readLong(long pos) throws IOException {
166+
slice.seek(pos);
167+
return slice.readLong();
168+
}
169+
170+
@Override
171+
public String toString() {
172+
return "RandomAccessInput(" + slice + ")";
173+
}
174+
};
75175
}
76-
};
176+
}
177+
178+
@Override
179+
public byte readByte() throws IOException {
180+
return in.readByte();
181+
}
182+
183+
@Override
184+
public void readBytes(byte[] b, int offset, int len) throws IOException {
185+
in.readBytes(b, offset, len);
186+
}
187+
188+
@Override
189+
public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
190+
in.readBytes(b, offset, len, useBuffer);
191+
}
192+
193+
@Override
194+
public short readShort() throws IOException {
195+
return in.readShort();
196+
}
197+
198+
@Override
199+
public int readInt() throws IOException {
200+
return in.readInt();
201+
}
202+
203+
@Override
204+
public int readVInt() throws IOException {
205+
return in.readVInt();
206+
}
207+
208+
@Override
209+
public int readZInt() throws IOException {
210+
return in.readZInt();
211+
}
212+
213+
@Override
214+
public long readLong() throws IOException {
215+
return in.readLong();
216+
}
217+
218+
@Override
219+
public long readVLong() throws IOException {
220+
return in.readVLong();
221+
}
222+
223+
@Override
224+
public long readZLong() throws IOException {
225+
return in.readZLong();
226+
}
227+
228+
@Override
229+
public String readString() throws IOException {
230+
return in.readString();
231+
}
232+
233+
@Override
234+
public Map<String, String> readMapOfStrings() throws IOException {
235+
return in.readMapOfStrings();
236+
}
237+
238+
@Override
239+
public Set<String> readSetOfStrings() throws IOException {
240+
return in.readSetOfStrings();
241+
}
242+
243+
@Override
244+
public void skipBytes(long numBytes) throws IOException {
245+
in.skipBytes(numBytes);
246+
}
247+
248+
@Override
249+
public void readFloats(float[] floats, int offset, int len) throws IOException {
250+
in.readFloats(floats, offset, len);
251+
}
252+
253+
@Override
254+
public void readLongs(long[] dst, int offset, int length) throws IOException {
255+
in.readLongs(dst, offset, length);
256+
}
257+
258+
@Override
259+
public void readInts(int[] dst, int offset, int length) throws IOException {
260+
in.readInts(dst, offset, length);
261+
}
262+
77263
}
78264
}

test/external-modules/seek-tracking-directory/src/main/java/org/elasticsearch/test/seektracker/ShardSeekStats.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
1313
import org.elasticsearch.common.io.stream.Writeable;
14+
import org.elasticsearch.xcontent.ToXContentObject;
15+
import org.elasticsearch.xcontent.XContentBuilder;
1416

1517
import java.io.IOException;
1618
import java.util.Map;
1719

18-
public record ShardSeekStats(String shard, Map<String, Long> seeksPerFile) implements Writeable {
20+
public record ShardSeekStats(String shard, Map<String, Long> seeksPerFile) implements Writeable, ToXContentObject {
1921

2022
public ShardSeekStats(StreamInput in) throws IOException {
2123
this(in.readString(), in.readMap(StreamInput::readString, StreamInput::readLong));
@@ -26,4 +28,9 @@ public void writeTo(StreamOutput out) throws IOException {
2628
out.writeString(this.shard);
2729
out.writeMap(this.seeksPerFile, StreamOutput::writeString, StreamOutput::writeLong);
2830
}
31+
32+
@Override
33+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
34+
return builder.startObject().field("shard", this.shard).field("seeks", seeksPerFile).endObject();
35+
}
2936
}

0 commit comments

Comments
 (0)