Skip to content

Commit 39acec1

Browse files
authored
Merge branch 'main' into fuse_out_of_snapshot
2 parents aa1c221 + 552da22 commit 39acec1

File tree

8 files changed

+138
-98
lines changed

8 files changed

+138
-98
lines changed

docs/changelog/135402.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 135402
2+
summary: Improve TSDB ingestion by hashing dimensions only once, using a new auto-populeted `index.dimensions` private index setting
3+
area: TSDB
4+
type: enhancement
5+
issues: []

libs/entitlement/tools/common/src/main/java/org/elasticsearch/entitlement/tools/Utils.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.stream.Collectors;
2525

2626
public class Utils {
27+
private static final FileSystem JRT_FS = FileSystems.getFileSystem(URI.create("jrt:/"));
2728

2829
// TODO Currently ServerProcessBuilder is using --add-modules=ALL-MODULE-PATH, should this rather
2930
// reflect below excludes (except for java.desktop which requires a special handling)?
@@ -47,9 +48,9 @@ public class Utils {
4748
&& m.contains(".internal.") == false
4849
&& m.contains(".incubator.") == false;
4950

50-
private static Map<String, Set<String>> findModuleExports(FileSystem fs) throws IOException {
51+
public static Map<String, Set<String>> findModuleExports() throws IOException {
5152
var modulesExports = new HashMap<String, Set<String>>();
52-
try (var stream = Files.walk(fs.getPath("modules"))) {
53+
try (var stream = Files.walk(JRT_FS.getPath("modules"))) {
5354
stream.filter(p -> p.getFileName().toString().equals("module-info.class")).forEach(x -> {
5455
try (var is = Files.newInputStream(x)) {
5556
var md = ModuleDescriptor.read(is);
@@ -74,21 +75,20 @@ public interface JdkModuleConsumer {
7475
}
7576

7677
public static void walkJdkModules(JdkModuleConsumer c) throws IOException {
77-
walkJdkModules(DEFAULT_MODULE_PREDICATE, c);
78+
walkJdkModules(DEFAULT_MODULE_PREDICATE, Utils.findModuleExports(), c);
7879
}
7980

80-
public static void walkJdkModules(Predicate<String> modulePredicate, JdkModuleConsumer c) throws IOException {
81-
FileSystem fs = FileSystems.getFileSystem(URI.create("jrt:/"));
82-
83-
var moduleExports = Utils.findModuleExports(fs);
84-
try (var stream = Files.walk(fs.getPath("modules"))) {
85-
var modules = stream.filter(x -> x.toString().endsWith(".class"))
86-
.collect(Collectors.groupingBy(x -> x.subpath(1, 2).toString()));
81+
public static void walkJdkModules(Predicate<String> modulePredicate, Map<String, Set<String>> exportsByModule, JdkModuleConsumer c)
82+
throws IOException {
83+
try (var stream = Files.walk(JRT_FS.getPath("modules"))) {
84+
var modules = stream.filter(
85+
x -> x.toString().endsWith(".class") && x.getFileName().toString().equals("module-info.class") == false
86+
).collect(Collectors.groupingBy(x -> x.subpath(1, 2).toString()));
8787

8888
for (var kv : modules.entrySet()) {
8989
var moduleName = kv.getKey();
9090
if (modulePredicate.test(moduleName)) {
91-
var thisModuleExports = moduleExports.get(moduleName);
91+
var thisModuleExports = exportsByModule.get(moduleName);
9292
c.accept(moduleName, kv.getValue(), thisModuleExports);
9393
}
9494
}

libs/entitlement/tools/jdk-api-extractor/src/main/java/org/elasticsearch/entitlement/tools/jdkapi/JdkApiExtractor.java

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.Arrays;
2525
import java.util.Collections;
2626
import java.util.Comparator;
27+
import java.util.HashMap;
2728
import java.util.Map;
2829
import java.util.Set;
2930
import java.util.TreeMap;
@@ -62,21 +63,34 @@ public static void main(String[] args) throws IOException {
6263
validateArgs(args);
6364
boolean deprecationsOnly = optionalArgs(args).anyMatch(DEPRECATIONS_ONLY::equals);
6465

65-
Map<String, Set<AccessibleMethod>> accessibleImplementationsByClass = new TreeMap<>();
66-
Map<String, Set<AccessibleMethod>> accessibleForOverridesByClass = new TreeMap<>();
67-
Map<String, Set<AccessibleMethod>> deprecationsByClass = new TreeMap<>();
66+
final Map<String, String> moduleNameByClass = new HashMap<>();
67+
final Map<String, Set<AccessibleMethod>> accessibleImplementationsByClass = new TreeMap<>();
68+
final Map<String, Set<AccessibleMethod>> accessibleForOverridesByClass = new TreeMap<>();
69+
final Map<String, Set<AccessibleMethod>> deprecationsByClass = new TreeMap<>();
6870

71+
final Map<String, Set<String>> exportsByModule = Utils.findModuleExports();
72+
// 1st: map class names to module names (including later excluded modules) for lookup in 2nd step
73+
Utils.walkJdkModules(m -> true, exportsByModule, (moduleName, moduleClasses, moduleExports) -> {
74+
for (var classFile : moduleClasses) {
75+
String prev = moduleNameByClass.put(internalClassName(classFile, moduleName), moduleName);
76+
if (prev != null) {
77+
throw new IllegalStateException("Class " + classFile + " is in both modules " + prev + " and " + moduleName);
78+
}
79+
}
80+
});
81+
82+
var visitor = new AccessibleClassVisitor(
83+
moduleNameByClass,
84+
exportsByModule,
85+
accessibleImplementationsByClass,
86+
accessibleForOverridesByClass,
87+
deprecationsByClass
88+
);
6989
Predicate<String> modulePredicate = Utils.DEFAULT_MODULE_PREDICATE.or(
7090
m -> optionalArgs(args).anyMatch(INCLUDE_INCUBATOR::equals) && m.contains(".incubator.")
7191
);
72-
73-
Utils.walkJdkModules(modulePredicate, (moduleName, moduleClasses, moduleExports) -> {
74-
var visitor = new AccessibleClassVisitor(
75-
moduleExports,
76-
accessibleImplementationsByClass,
77-
accessibleForOverridesByClass,
78-
deprecationsByClass
79-
);
92+
// 2nd: calculate accessible implementations of classes in included modules
93+
Utils.walkJdkModules(modulePredicate, exportsByModule, (moduleName, moduleClasses, moduleExports) -> {
8094
for (var classFile : moduleClasses) {
8195
// skip if class was already visited earlier due to a dependency on it
8296
if (accessibleImplementationsByClass.containsKey(internalClassName(classFile, moduleName))) {
@@ -91,7 +105,18 @@ public static void main(String[] args) throws IOException {
91105
}
92106
});
93107

94-
writeFile(Path.of(args[0]), deprecationsOnly ? deprecationsByClass : accessibleImplementationsByClass);
108+
// finally, skip some implementations we're not interested in
109+
Predicate<Map.Entry<String, Set<AccessibleMethod>>> predicate = entry -> {
110+
if (entry.getKey().startsWith("com/sun/") && entry.getKey().contains("/internal/")) {
111+
// skip com.sun.*.internal classes as they are not part of the supported JDK API
112+
// even if methods override some publicly visible API
113+
return false;
114+
}
115+
// skip classes that are not part of included modules, but checked due to dependencies
116+
String moduleName = moduleNameByClass.get(entry.getKey());
117+
return modulePredicate.test(moduleName);
118+
};
119+
writeFile(Path.of(args[0]), deprecationsOnly ? deprecationsByClass : accessibleImplementationsByClass, predicate);
95120
}
96121

97122
private static String internalClassName(Path clazz, String moduleName) {
@@ -132,9 +157,17 @@ private static boolean isWritableOutputPath(String pathStr) {
132157
}
133158

134159
@SuppressForbidden(reason = "cli tool printing to standard err/out")
135-
private static void writeFile(Path path, Map<String, Set<AccessibleMethod>> methods) throws IOException {
160+
private static void writeFile(
161+
Path path,
162+
Map<String, Set<AccessibleMethod>> methods,
163+
Predicate<Map.Entry<String, Set<AccessibleMethod>>> predicate
164+
) throws IOException {
136165
System.out.println("Writing result for " + Runtime.version() + " to " + path.toAbsolutePath());
137-
Files.write(path, () -> methods.entrySet().stream().flatMap(AccessibleMethod::toLines).iterator(), StandardCharsets.UTF_8);
166+
Files.write(
167+
path,
168+
() -> methods.entrySet().stream().filter(predicate).flatMap(AccessibleMethod::toLines).iterator(),
169+
StandardCharsets.UTF_8
170+
);
138171
}
139172

140173
record AccessibleMethod(String method, String descriptor, boolean isPublic, boolean isFinal, boolean isStatic) {
@@ -163,7 +196,8 @@ static Stream<CharSequence> toLines(Map.Entry<String, Set<AccessibleMethod>> ent
163196
}
164197

165198
static class AccessibleClassVisitor extends ClassVisitor {
166-
private final Set<String> moduleExports;
199+
private final Map<String, String> moduleNameByClass;
200+
private final Map<String, Set<String>> exportsByModule;
167201
private final Map<String, Set<AccessibleMethod>> accessibleImplementationsByClass;
168202
private final Map<String, Set<AccessibleMethod>> accessibleForOverridesByClass;
169203
private final Map<String, Set<AccessibleMethod>> deprecationsByClass;
@@ -179,13 +213,15 @@ static class AccessibleClassVisitor extends ClassVisitor {
179213
private boolean isExported;
180214

181215
AccessibleClassVisitor(
182-
Set<String> moduleExports,
216+
Map<String, String> moduleNameByClass,
217+
Map<String, Set<String>> exportsByModule,
183218
Map<String, Set<AccessibleMethod>> accessibleImplementationsByClass,
184219
Map<String, Set<AccessibleMethod>> accessibleForOverridesByClass,
185220
Map<String, Set<AccessibleMethod>> deprecationsByClass
186221
) {
187222
super(ASM9);
188-
this.moduleExports = moduleExports;
223+
this.moduleNameByClass = moduleNameByClass;
224+
this.exportsByModule = exportsByModule;
189225
this.accessibleImplementationsByClass = accessibleImplementationsByClass;
190226
this.accessibleForOverridesByClass = accessibleForOverridesByClass;
191227
this.deprecationsByClass = deprecationsByClass;
@@ -214,7 +250,7 @@ public void visit(int version, int access, String name, String signature, String
214250
}
215251
// only initialize local state AFTER visiting all dependencies above!
216252
super.visit(version, access, name, signature, superName, interfaces);
217-
this.isExported = moduleExports.contains(getPackageName(name));
253+
this.isExported = getModuleExports(getModuleName(name)).contains(getPackageName(name));
218254
this.className = name;
219255
this.isPublicClass = (access & ACC_PUBLIC) != 0;
220256
this.isFinalClass = (access & ACC_FINAL) != 0;
@@ -224,6 +260,22 @@ public void visit(int version, int access, String name, String signature, String
224260
this.deprecations = newSortedSet();
225261
}
226262

263+
private String getModuleName(String name) {
264+
String module = moduleNameByClass.get(name);
265+
if (module == null) {
266+
throw new IllegalStateException("Unknown module for class: " + name);
267+
}
268+
return module;
269+
}
270+
271+
private Set<String> getModuleExports(String module) {
272+
Set<String> exports = exportsByModule.get(module);
273+
if (exports == null) {
274+
throw new IllegalStateException("Unknown exports for module: " + module);
275+
}
276+
return exports;
277+
}
278+
227279
@Override
228280
public void visitEnd() {
229281
super.visitEnd();

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBIndexingIT.java

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
*/
99
package org.elasticsearch.datastreams;
1010

11+
import org.elasticsearch.action.ActionFuture;
1112
import org.elasticsearch.action.DocWriteRequest;
1213
import org.elasticsearch.action.admin.indices.diskusage.AnalyzeIndexDiskUsageRequest;
1314
import org.elasticsearch.action.admin.indices.diskusage.TransportAnalyzeIndexDiskUsageAction;
@@ -31,6 +32,7 @@
3132
import org.elasticsearch.action.index.IndexRequest;
3233
import org.elasticsearch.action.search.SearchRequest;
3334
import org.elasticsearch.action.support.WriteRequest;
35+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
3436
import org.elasticsearch.cluster.metadata.ComponentTemplate;
3537
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
3638
import org.elasticsearch.cluster.metadata.IndexMetadata;
@@ -65,7 +67,6 @@
6567
import java.util.Map;
6668
import java.util.concurrent.CountDownLatch;
6769

68-
import static org.elasticsearch.datastreams.DataStreamIndexSettingsProvider.INDEX_DIMENSIONS_TSID_OPTIMIZATION_FEATURE_FLAG;
6970
import static org.elasticsearch.test.MapMatcher.assertMap;
7071
import static org.elasticsearch.test.MapMatcher.matchesMap;
7172
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
@@ -331,10 +332,7 @@ public void testTsdbTemplatesNoKeywordFieldType() throws Exception {
331332
new Template(
332333
Settings.builder()
333334
.put("index.mode", "time_series")
334-
.put(
335-
"index.routing_path",
336-
randomBoolean() && INDEX_DIMENSIONS_TSID_OPTIMIZATION_FEATURE_FLAG ? null : "metricset"
337-
)
335+
.put("index.routing_path", randomBoolean() ? null : "metricset")
338336
.build(),
339337
new CompressedXContent(mappingTemplate),
340338
null
@@ -664,13 +662,8 @@ public void testAddDimensionToMapping() throws Exception {
664662
"my-ds"
665663
);
666664
assertAcked(client().execute(CreateDataStreamAction.INSTANCE, createDsRequest));
667-
if (INDEX_DIMENSIONS_TSID_OPTIMIZATION_FEATURE_FLAG) {
668-
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), equalTo(List.of("metricset")));
669-
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_ROUTING_PATH), empty());
670-
} else {
671-
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), empty());
672-
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_ROUTING_PATH), equalTo(List.of("metricset")));
673-
}
665+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), equalTo(List.of("metricset")));
666+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_ROUTING_PATH), empty());
674667

675668
// put mapping with k8s.pod.uid as another time series dimension
676669
var putMappingRequest = new PutMappingRequest(dataStreamName).source("""
@@ -684,13 +677,8 @@ public void testAddDimensionToMapping() throws Exception {
684677
}
685678
""", XContentType.JSON);
686679
assertAcked(client().execute(TransportPutMappingAction.TYPE, putMappingRequest).actionGet());
687-
if (INDEX_DIMENSIONS_TSID_OPTIMIZATION_FEATURE_FLAG) {
688-
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), containsInAnyOrder("metricset", "k8s.pod.name"));
689-
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_ROUTING_PATH), empty());
690-
} else {
691-
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), empty());
692-
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_ROUTING_PATH), equalTo(List.of("metricset")));
693-
}
680+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), containsInAnyOrder("metricset", "k8s.pod.name"));
681+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_ROUTING_PATH), empty());
694682

695683
// put dynamic template defining time series dimensions
696684
// we don't support index.dimensions in that case
@@ -709,17 +697,20 @@ public void testAddDimensionToMapping() throws Exception {
709697
]
710698
}
711699
""", XContentType.JSON);
712-
assertAcked(client().execute(TransportPutMappingAction.TYPE, putMappingRequest).actionGet());
700+
ActionFuture<AcknowledgedResponse> putMappingFuture = client().execute(TransportPutMappingAction.TYPE, putMappingRequest);
713701
if (INDEX_DIMENSIONS_TSID_OPTIMIZATION_FEATURE_FLAG) {
702+
IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, putMappingFuture::actionGet);
714703
assertThat(
715-
getSetting(dataStreamName, IndexMetadata.INDEX_ROUTING_PATH),
716-
containsInAnyOrder("metricset", "labels.*", "k8s.pod.name")
704+
exception.getMessage(),
705+
containsString("Cannot add dynamic templates that define dimension fields on an existing index with index.dimensions")
717706
);
707+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), containsInAnyOrder("metricset", "k8s.pod.name"));
708+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_ROUTING_PATH), empty());
718709
} else {
710+
assertAcked(putMappingFuture);
719711
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_ROUTING_PATH), containsInAnyOrder("metricset"));
712+
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), empty());
720713
}
721-
assertThat(getSetting(dataStreamName, IndexMetadata.INDEX_DIMENSIONS), empty());
722-
723714
indexWithPodNames(dataStreamName, Instant.now(), Map.of(), "dog", "cat");
724715
}
725716

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/TSDBPassthroughIndexingIT.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import java.util.List;
4545
import java.util.Map;
4646

47-
import static org.elasticsearch.datastreams.DataStreamIndexSettingsProvider.INDEX_DIMENSIONS_TSID_OPTIMIZATION_FEATURE_FLAG;
4847
import static org.elasticsearch.test.MapMatcher.assertMap;
4948
import static org.elasticsearch.test.MapMatcher.matchesMap;
5049
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@@ -186,13 +185,8 @@ public void testIndexingGettingAndSearching() throws Exception {
186185

187186
// validate index:
188187
var getIndexResponse = client().admin().indices().getIndex(new GetIndexRequest(TEST_REQUEST_TIMEOUT).indices(index)).actionGet();
189-
if (INDEX_DIMENSIONS_TSID_OPTIMIZATION_FEATURE_FLAG) {
190-
assertThat(getIndexResponse.getSettings().get(index).get("index.dimensions"), equalTo("[attributes.*]"));
191-
assertThat(getIndexResponse.getSettings().get(index).get("index.routing_path"), nullValue());
192-
} else {
193-
assertThat(getIndexResponse.getSettings().get(index).get("index.dimensions"), nullValue());
194-
assertThat(getIndexResponse.getSettings().get(index).get("index.routing_path"), equalTo("[attributes.*]"));
195-
}
188+
assertThat(getIndexResponse.getSettings().get(index).get("index.dimensions"), equalTo("[attributes.*]"));
189+
assertThat(getIndexResponse.getSettings().get(index).get("index.routing_path"), nullValue());
196190
// validate mapping
197191
var mapping = getIndexResponse.mappings().get(index).getSourceAsMap();
198192
assertMap(

0 commit comments

Comments
 (0)