DatabaseNodeServiceIT.java
@@ -15,7 +15,9 @@
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.common.hash.MessageDigests;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.xcontent.XContentType;

import java.io.BufferedInputStream;
@@ -40,6 +42,10 @@
import static org.hamcrest.Matchers.equalTo;

public class DatabaseNodeServiceIT extends AbstractGeoIpIT {

@FixForMultiProject(description = "Use random project ID after ESIntegTestCase is MP enabled")
private final ProjectId projectId = ProjectId.DEFAULT;

/*
* This test makes sure that if we index an ordinary mmdb file into the .geoip_databases index, it is correctly handled upon retrieval.
*/
@@ -50,15 +56,15 @@ public void testNonGzippedDatabase() throws Exception {
String databaseName = randomAlphaOfLength(20) + "-" + databaseFileName;
byte[] mmdbBytes = getBytesForFile(databaseFileName);
final DatabaseNodeService databaseNodeService = internalCluster().getInstance(DatabaseNodeService.class);
assertNull(databaseNodeService.getDatabase(databaseName));
assertNull(databaseNodeService.getDatabase(projectId, databaseName));
int numChunks = indexData(databaseName, mmdbBytes);
/*
* If DatabaseNodeService::checkDatabases runs it will sometimes (rarely) remove the database we are using in this test while we
* are trying to assert things about it. So if it does then we 'just' try again.
*/
assertBusy(() -> {
retrieveDatabase(databaseNodeService, databaseName, mmdbBytes, numChunks);
assertNotNull(databaseNodeService.getDatabase(databaseName));
assertNotNull(databaseNodeService.getDatabase(projectId, databaseName));
assertValidDatabase(databaseNodeService, databaseName, databaseType);
});
}
@@ -75,15 +81,15 @@ public void testGzippedDatabase() throws Exception {
byte[] mmdbBytes = getBytesForFile(databaseFileName);
byte[] gzipBytes = gzipFileBytes(databaseName, mmdbBytes);
final DatabaseNodeService databaseNodeService = internalCluster().getInstance(DatabaseNodeService.class);
assertNull(databaseNodeService.getDatabase(databaseName));
assertNull(databaseNodeService.getDatabase(projectId, databaseName));
int numChunks = indexData(databaseName, gzipBytes);
/*
* If DatabaseNodeService::checkDatabases runs it will sometimes (rarely) remove the database we are using in this test while we
* are trying to assert things about it. So if it does then we 'just' try again.
*/
assertBusy(() -> {
retrieveDatabase(databaseNodeService, databaseName, gzipBytes, numChunks);
assertNotNull(databaseNodeService.getDatabase(databaseName));
assertNotNull(databaseNodeService.getDatabase(projectId, databaseName));
assertValidDatabase(databaseNodeService, databaseName, databaseType);
});
}
@@ -93,7 +99,7 @@ public void testGzippedDatabase() throws Exception {
*/
private void assertValidDatabase(DatabaseNodeService databaseNodeService, String databaseFileName, String databaseType)
throws IOException {
IpDatabase database = databaseNodeService.getDatabase(databaseFileName);
IpDatabase database = databaseNodeService.getDatabase(projectId, databaseFileName);
assertNotNull(database);
assertThat(database.getDatabaseType(), equalTo(databaseType));
CountryResponse countryResponse = database.getResponse("89.160.20.128", GeoIpTestUtils::getCountry);
@@ -110,7 +116,7 @@ private void assertValidDatabase(DatabaseNodeService databaseNodeService, String
private void retrieveDatabase(DatabaseNodeService databaseNodeService, String databaseFileName, byte[] expectedBytes, int numChunks)
throws IOException {
GeoIpTaskState.Metadata metadata = new GeoIpTaskState.Metadata(1, 0, numChunks - 1, getMd5(expectedBytes), 1);
databaseNodeService.retrieveAndUpdateDatabase(databaseFileName, metadata);
databaseNodeService.retrieveAndUpdateDatabase(projectId, databaseFileName, metadata);
}

private String getMd5(byte[] bytes) {
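
The changes in this first test thread an explicit ProjectId through every DatabaseNodeService call. For orientation, a minimal sketch (not part of the diff) of the resulting lookup pattern; it uses only signatures visible above, and the class and method names are invented for illustration:

import java.io.IOException;

import org.elasticsearch.cluster.metadata.ProjectId;

// Sketch only; assumes the same package as the test above so that DatabaseNodeService
// and IpDatabase resolve without further imports.
final class ProjectScopedLookupSketch {

    // Tests pin the default project until the integration test base class is
    // multi-project enabled, mirroring the @FixForMultiProject field above.
    private static final ProjectId PROJECT_ID = ProjectId.DEFAULT;

    // Returns the database type for a project-scoped database, or null if that
    // project has not downloaded the database yet.
    static String databaseTypeOrNull(DatabaseNodeService databaseNodeService, String databaseName) throws IOException {
        IpDatabase database = databaseNodeService.getDatabase(PROJECT_ID, databaseName);
        return database == null ? null : database.getDatabaseType();
    }
}
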
ReloadingDatabasesWhilePerformingGeoLookupsIT.java
@@ -11,7 +11,11 @@

import org.apache.lucene.tests.util.LuceneTestCase;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.IOUtils;
@@ -32,6 +36,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.cluster.ClusterState.builder;
import static org.elasticsearch.ingest.geoip.GeoIpProcessor.GEOIP_TYPE;
import static org.elasticsearch.ingest.geoip.GeoIpTestUtils.copyDatabase;
import static org.elasticsearch.ingest.geoip.GeoIpTestUtils.copyDefaultDatabases;
@@ -62,31 +67,39 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
* geoip processor instance is using the related {@link DatabaseReaderLazyLoader} instance
*/
public void test() throws Exception {
ProjectId projectId = randomProjectIdOrDefault();
Path geoIpConfigDir = createTempDir();
Path geoIpTmpDir = createTempDir();
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
DatabaseNodeService databaseNodeService = createRegistry(geoIpConfigDir, geoIpTmpDir, clusterService);
when(clusterService.state()).thenReturn(
builder(ClusterName.DEFAULT).putProjectMetadata(ProjectMetadata.builder(projectId).build()).build()
);
DatabaseNodeService databaseNodeService = createRegistry(
geoIpConfigDir,
geoIpTmpDir,
clusterService,
TestProjectResolvers.singleProject(projectId)
);
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, databaseNodeService);
copyDatabase("GeoLite2-City-Test.mmdb", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
copyDatabase("GeoLite2-City-Test.mmdb", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseNodeService.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
lazyLoadReaders(databaseNodeService);
databaseNodeService.updateDatabase(projectId, "GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseNodeService.updateDatabase(projectId, "GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
lazyLoadReaders(projectId, databaseNodeService);

final GeoIpProcessor processor1 = (GeoIpProcessor) factory.create(
null,
"_tag",
null,
new HashMap<>(Map.of("field", "_field")),
null
projectId
);
final GeoIpProcessor processor2 = (GeoIpProcessor) factory.create(
null,
"_tag",
null,
new HashMap<>(Map.of("field", "_field", "database_file", "GeoLite2-City-Test.mmdb")),
null
projectId
);

final AtomicBoolean completed = new AtomicBoolean(false);
@@ -134,9 +147,9 @@ public void test() throws Exception {
Thread updateDatabaseThread = new Thread(() -> {
for (int i = 0; i < numberOfDatabaseUpdates; i++) {
try {
DatabaseReaderLazyLoader previous1 = databaseNodeService.get("GeoLite2-City.mmdb");
DatabaseReaderLazyLoader previous1 = databaseNodeService.get(projectId, "GeoLite2-City.mmdb");
if (Files.exists(geoIpTmpDir.resolve("GeoLite2-City.mmdb"))) {
databaseNodeService.removeStaleEntries(List.of("GeoLite2-City.mmdb"));
databaseNodeService.removeStaleEntries(projectId, List.of("GeoLite2-City.mmdb"));
assertBusy(() -> {
// lazy loader may still be in use by an ingest thread,
// wait for any potential ingest thread to release the lazy loader (DatabaseReaderLazyLoader#postLookup(...)),
@@ -146,22 +159,32 @@ public void test() throws Exception {
});
} else {
copyDatabase("GeoLite2-City-Test.mmdb", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
databaseNodeService.updateDatabase(
projectId,
"GeoLite2-City.mmdb",
"md5",
geoIpTmpDir.resolve("GeoLite2-City.mmdb")
);
}
DatabaseReaderLazyLoader previous2 = databaseNodeService.get("GeoLite2-City-Test.mmdb");
DatabaseReaderLazyLoader previous2 = databaseNodeService.get(projectId, "GeoLite2-City-Test.mmdb");
copyDatabase(
i % 2 == 0 ? "GeoIP2-City-Test.mmdb" : "GeoLite2-City-Test.mmdb",
geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")
);
databaseNodeService.updateDatabase("GeoLite2-City-Test.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
databaseNodeService.updateDatabase(
projectId,
"GeoLite2-City-Test.mmdb",
"md5",
geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb")
);

DatabaseReaderLazyLoader current1 = databaseNodeService.get("GeoLite2-City.mmdb");
DatabaseReaderLazyLoader current2 = databaseNodeService.get("GeoLite2-City-Test.mmdb");
DatabaseReaderLazyLoader current1 = databaseNodeService.get(projectId, "GeoLite2-City.mmdb");
DatabaseReaderLazyLoader current2 = databaseNodeService.get(projectId, "GeoLite2-City-Test.mmdb");
assertThat(current1, not(sameInstance(previous1)));
assertThat(current2, not(sameInstance(previous2)));

// lazy load type and reader:
lazyLoadReaders(databaseNodeService);
lazyLoadReaders(projectId, databaseNodeService);
} catch (Exception | AssertionError e) {
logger.error("error in update databases thread after run [" + i + "]", e);
failureHolder2.set(e);
@@ -193,8 +216,12 @@ public void test() throws Exception {
IOUtils.rm(geoIpConfigDir, geoIpTmpDir);
}

private static DatabaseNodeService createRegistry(Path geoIpConfigDir, Path geoIpTmpDir, ClusterService clusterService)
throws IOException {
private static DatabaseNodeService createRegistry(
Path geoIpConfigDir,
Path geoIpTmpDir,
ClusterService clusterService,
ProjectResolver projectResolver
) throws IOException {
GeoIpCache cache = new GeoIpCache(0);
ConfigDatabases configDatabases = new ConfigDatabases(geoIpConfigDir, cache);
copyDefaultDatabases(geoIpConfigDir, configDatabases);
@@ -204,19 +231,21 @@ private static DatabaseNodeService createRegistry(Path geoIpConfigDir, Path geoI
cache,
configDatabases,
Runnable::run,
clusterService
clusterService,
mock(IngestService.class),
projectResolver
);
databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class), mock(IngestService.class));
databaseNodeService.initialize("nodeId", mock(ResourceWatcherService.class));
return databaseNodeService;
}

private static void lazyLoadReaders(DatabaseNodeService databaseNodeService) throws IOException {
if (databaseNodeService.get("GeoLite2-City.mmdb") != null) {
databaseNodeService.get("GeoLite2-City.mmdb").getDatabaseType();
databaseNodeService.get("GeoLite2-City.mmdb").getResponse("2.125.160.216", GeoIpTestUtils::getCity);
private static void lazyLoadReaders(ProjectId projectId, DatabaseNodeService databaseNodeService) throws IOException {
if (databaseNodeService.get(projectId, "GeoLite2-City.mmdb") != null) {
databaseNodeService.get(projectId, "GeoLite2-City.mmdb").getDatabaseType();
databaseNodeService.get(projectId, "GeoLite2-City.mmdb").getResponse("2.125.160.216", GeoIpTestUtils::getCity);
}
databaseNodeService.get("GeoLite2-City-Test.mmdb").getDatabaseType();
databaseNodeService.get("GeoLite2-City-Test.mmdb").getResponse("2.125.160.216", GeoIpTestUtils::getCity);
databaseNodeService.get(projectId, "GeoLite2-City-Test.mmdb").getDatabaseType();
databaseNodeService.get(projectId, "GeoLite2-City-Test.mmdb").getResponse("2.125.160.216", GeoIpTestUtils::getCity);
}

}
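
Before the next file, a condensed sketch (not part of the diff) of the two project-aware collaborators this test now hands to createRegistry(...): a mocked ClusterService whose state contains the test project, and a ProjectResolver fixed to that project. Only calls that appear above are used; the wrapper class and method names are assumptions:

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.project.TestProjectResolvers;
import org.elasticsearch.cluster.service.ClusterService;

// Sketch only: mirrors the wiring added in the test above.
final class ProjectAwareTestWiringSketch {

    static ClusterService clusterServiceFor(ProjectId projectId) {
        // The mocked cluster state must contain the project's metadata, otherwise
        // project-scoped lookups in DatabaseNodeService have nothing to resolve against.
        ClusterService clusterService = mock(ClusterService.class);
        ClusterState state = ClusterState.builder(ClusterName.DEFAULT)
            .putProjectMetadata(ProjectMetadata.builder(projectId).build())
            .build();
        when(clusterService.state()).thenReturn(state);
        return clusterService;
    }

    static ProjectResolver resolverFor(ProjectId projectId) {
        // Resolves every request to the single test project.
        return TestProjectResolvers.singleProject(projectId);
    }
}
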
ConfigDatabases.java
@@ -10,6 +10,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.core.FixForMultiProject;
import org.elasticsearch.env.Environment;
import org.elasticsearch.watcher.FileChangesListener;
import org.elasticsearch.watcher.FileWatcher;
@@ -69,12 +71,13 @@ Map<String, DatabaseReaderLazyLoader> getConfigDatabases() {
return configDatabases;
}

@FixForMultiProject(description = "Replace DEFAULT project")
void updateDatabase(Path file, boolean update) {
String databaseFileName = file.getFileName().toString();
try {
if (update) {
logger.info("database file changed [{}], reloading database...", file);
DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, file, null);
DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(ProjectId.DEFAULT, cache, file, null);
DatabaseReaderLazyLoader existing = configDatabases.put(databaseFileName, loader);
if (existing != null) {
existing.shutdown();
@@ -90,6 +93,7 @@ void updateDatabase(Path file, boolean update) {
}
}

@FixForMultiProject(description = "Replace DEFAULT project")
Map<String, DatabaseReaderLazyLoader> initConfigDatabases() throws IOException {
Map<String, DatabaseReaderLazyLoader> databases = new HashMap<>();

@@ -103,7 +107,7 @@ Map<String, DatabaseReaderLazyLoader> initConfigDatabases() throws IOException {
if (Files.isRegularFile(databasePath) && pathMatcher.matches(databasePath)) {
assert Files.exists(databasePath);
String databaseFileName = databasePath.getFileName().toString();
DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(cache, databasePath, null);
DatabaseReaderLazyLoader loader = new DatabaseReaderLazyLoader(ProjectId.DEFAULT, cache, databasePath, null);
databases.put(databaseFileName, loader);
}
}
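
Finally, the ConfigDatabases changes pin config-directory databases to the default project rather than making them project-aware, and flag both call sites with @FixForMultiProject. A minimal sketch (not part of the diff) of that pattern; the helper class and method names are invented, and the constructor call mirrors the two updated call sites above:

import java.nio.file.Path;

import org.elasticsearch.cluster.metadata.ProjectId;

// Sketch only; assumes the same package as ConfigDatabases so that GeoIpCache and
// DatabaseReaderLazyLoader resolve without further imports.
final class ConfigDatabaseLoaderSketch {

    // Config-directory databases are not project-scoped yet, so every loader built from
    // the node's config directory is attributed to ProjectId.DEFAULT, exactly as the two
    // updated call sites above do (the trailing null argument is copied from the diff).
    static DatabaseReaderLazyLoader loaderFor(GeoIpCache cache, Path databasePath) {
        return new DatabaseReaderLazyLoader(ProjectId.DEFAULT, cache, databasePath, null);
    }
}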