Skip to content

Commit 3b8e337

Browse files
authored
Verify Maxmind database types in the geoip processor (#114527) (#114532)
1 parent 1dfce84 commit 3b8e337

File tree

9 files changed

+176
-59
lines changed

9 files changed

+176
-59
lines changed

docs/changelog/114527.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 114527
2+
summary: Verify Maxmind database types in the geoip processor
3+
area: Ingest Node
4+
type: enhancement
5+
issues: []

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/ReloadingDatabasesWhilePerformingGeoLookupsIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.atomic.AtomicInteger;
3333
import java.util.concurrent.atomic.AtomicReference;
3434

35+
import static org.elasticsearch.ingest.geoip.GeoIpProcessor.GEOIP_TYPE;
3536
import static org.elasticsearch.ingest.geoip.GeoIpTestUtils.copyDatabase;
3637
import static org.elasticsearch.ingest.geoip.GeoIpTestUtils.copyDefaultDatabases;
3738
import static org.hamcrest.Matchers.equalTo;
@@ -66,7 +67,7 @@ public void test() throws Exception {
6667
ClusterService clusterService = mock(ClusterService.class);
6768
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
6869
DatabaseNodeService databaseNodeService = createRegistry(geoIpConfigDir, geoIpTmpDir, clusterService);
69-
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(databaseNodeService);
70+
GeoIpProcessor.Factory factory = new GeoIpProcessor.Factory(GEOIP_TYPE, databaseNodeService);
7071
copyDatabase("GeoLite2-City-Test.mmdb", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));
7172
copyDatabase("GeoLite2-City-Test.mmdb", geoIpTmpDir.resolve("GeoLite2-City-Test.mmdb"));
7273
databaseNodeService.updateDatabase("GeoLite2-City.mmdb", "md5", geoIpTmpDir.resolve("GeoLite2-City.mmdb"));

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX;
5555
import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
5656
import static org.elasticsearch.ingest.geoip.GeoIpProcessor.Factory.downloadDatabaseOnPipelineCreation;
57+
import static org.elasticsearch.ingest.geoip.GeoIpProcessor.GEOIP_TYPE;
5758

5859
/**
5960
* Persistent task executor that is responsible for starting {@link GeoIpDownloader} after task is allocated by master node.
@@ -296,9 +297,9 @@ private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor
296297
return false;
297298
}
298299

299-
if (processor.containsKey(GeoIpProcessor.TYPE)) {
300-
Map<String, Object> processorConfig = (Map<String, Object>) processor.get(GeoIpProcessor.TYPE);
301-
return downloadDatabaseOnPipelineCreation(processorConfig) == downloadDatabaseOnPipelineCreation;
300+
final Map<String, Object> processorConfig = (Map<String, Object>) processor.get(GEOIP_TYPE);
301+
if (processorConfig != null) {
302+
return downloadDatabaseOnPipelineCreation(GEOIP_TYPE, processorConfig, null) == downloadDatabaseOnPipelineCreation;
302303
}
303304

304305
return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation)
@@ -336,11 +337,9 @@ && hasAtLeastOneGeoipProcessor(
336337
*/
337338
@SuppressWarnings("unchecked")
338339
private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
339-
return processor.containsKey("foreach")
340-
&& hasAtLeastOneGeoipProcessor(
341-
((Map<String, Map<String, Object>>) processor.get("foreach")).get("processor"),
342-
downloadDatabaseOnPipelineCreation
343-
);
340+
final Map<String, Object> processorConfig = (Map<String, Object>) processor.get("foreach");
341+
return processorConfig != null
342+
&& hasAtLeastOneGeoipProcessor((Map<String, Object>) processorConfig.get("processor"), downloadDatabaseOnPipelineCreation);
344343
}
345344

346345
@UpdateForV9 // use MINUS_ONE once that means no timeout

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpProcessor.java

Lines changed: 64 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.common.logging.DeprecationCategory;
1414
import org.elasticsearch.common.logging.DeprecationLogger;
1515
import org.elasticsearch.core.Assertions;
16+
import org.elasticsearch.core.Strings;
1617
import org.elasticsearch.ingest.AbstractProcessor;
1718
import org.elasticsearch.ingest.IngestDocument;
1819
import org.elasticsearch.ingest.Processor;
@@ -22,6 +23,7 @@
2223
import java.io.IOException;
2324
import java.util.ArrayList;
2425
import java.util.List;
26+
import java.util.Locale;
2527
import java.util.Map;
2628
import java.util.Set;
2729
import java.util.function.Supplier;
@@ -36,9 +38,12 @@ public final class GeoIpProcessor extends AbstractProcessor {
3638
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(GeoIpProcessor.class);
3739
static final String DEFAULT_DATABASES_DEPRECATION_MESSAGE = "the [fallback_to_default_databases] has been deprecated, because "
3840
+ "Elasticsearch no longer includes the default Maxmind geoip databases. This setting will be removed in Elasticsearch 9.0";
41+
static final String UNSUPPORTED_DATABASE_DEPRECATION_MESSAGE = "the geoip processor will no longer support database type [{}] "
42+
+ "in a future version of Elasticsearch"; // TODO add a message about migration?
3943

40-
public static final String TYPE = "geoip";
44+
public static final String GEOIP_TYPE = "geoip";
4145

46+
private final String type;
4247
private final String field;
4348
private final Supplier<Boolean> isValid;
4449
private final String targetField;
@@ -62,6 +67,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
6267
* @param databaseFile the name of the database file being queried; used only for tagging documents if the database is unavailable
6368
*/
6469
GeoIpProcessor(
70+
final String type,
6571
final String tag,
6672
final String description,
6773
final String field,
@@ -74,6 +80,7 @@ public final class GeoIpProcessor extends AbstractProcessor {
7480
final String databaseFile
7581
) {
7682
super(tag, description);
83+
this.type = type;
7784
this.field = field;
7885
this.isValid = isValid;
7986
this.targetField = targetField;
@@ -93,7 +100,7 @@ public IngestDocument execute(IngestDocument document) throws IOException {
93100
Object ip = document.getFieldValue(field, Object.class, ignoreMissing);
94101

95102
if (isValid.get() == false) {
96-
document.appendFieldValue("tags", "_geoip_expired_database", false);
103+
document.appendFieldValue("tags", "_" + type + "_expired_database", false);
97104
return document;
98105
} else if (ip == null && ignoreMissing) {
99106
return document;
@@ -104,7 +111,7 @@ public IngestDocument execute(IngestDocument document) throws IOException {
104111
try (IpDatabase ipDatabase = this.supplier.get()) {
105112
if (ipDatabase == null) {
106113
if (ignoreMissing == false) {
107-
tag(document, databaseFile);
114+
tag(document, type, databaseFile);
108115
}
109116
return document;
110117
}
@@ -146,7 +153,7 @@ public IngestDocument execute(IngestDocument document) throws IOException {
146153

147154
@Override
148155
public String getType() {
149-
return TYPE;
156+
return type;
150157
}
151158

152159
String getField() {
@@ -202,9 +209,11 @@ public IpDatabase get() throws IOException {
202209

203210
public static final class Factory implements Processor.Factory {
204211

212+
private final String type; // currently always just "geoip"
205213
private final IpDatabaseProvider ipDatabaseProvider;
206214

207-
public Factory(IpDatabaseProvider ipDatabaseProvider) {
215+
public Factory(String type, IpDatabaseProvider ipDatabaseProvider) {
216+
this.type = type;
208217
this.ipDatabaseProvider = ipDatabaseProvider;
209218
}
210219

@@ -215,16 +224,16 @@ public Processor create(
215224
final String description,
216225
final Map<String, Object> config
217226
) throws IOException {
218-
String ipField = readStringProperty(TYPE, processorTag, config, "field");
219-
String targetField = readStringProperty(TYPE, processorTag, config, "target_field", "geoip");
220-
String databaseFile = readStringProperty(TYPE, processorTag, config, "database_file", "GeoLite2-City.mmdb");
221-
List<String> propertyNames = readOptionalList(TYPE, processorTag, config, "properties");
222-
boolean ignoreMissing = readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
223-
boolean firstOnly = readBooleanProperty(TYPE, processorTag, config, "first_only", true);
227+
String ipField = readStringProperty(type, processorTag, config, "field");
228+
String targetField = readStringProperty(type, processorTag, config, "target_field", "geoip");
229+
String databaseFile = readStringProperty(type, processorTag, config, "database_file", "GeoLite2-City.mmdb");
230+
List<String> propertyNames = readOptionalList(type, processorTag, config, "properties");
231+
boolean ignoreMissing = readBooleanProperty(type, processorTag, config, "ignore_missing", false);
232+
boolean firstOnly = readBooleanProperty(type, processorTag, config, "first_only", true);
224233

225234
// Validating the download_database_on_pipeline_creation even if the result
226235
// is not used directly by the factory.
227-
downloadDatabaseOnPipelineCreation(config, processorTag);
236+
downloadDatabaseOnPipelineCreation(type, config, processorTag);
228237

229238
// noop, should be removed in 9.0
230239
Object value = config.remove("fallback_to_default_databases");
@@ -239,7 +248,7 @@ public Processor create(
239248
// at a later moment, so a processor impl is returned that tags documents instead. If a database cannot be sourced
240249
// then the processor will continue to tag documents with a warning until it is remediated by providing a database
241250
// or changing the pipeline.
242-
return new DatabaseUnavailableProcessor(processorTag, description, databaseFile);
251+
return new DatabaseUnavailableProcessor(type, processorTag, description, databaseFile);
243252
}
244253
databaseType = ipDatabase.getDatabaseType();
245254
}
@@ -248,17 +257,48 @@ public Processor create(
248257
try {
249258
factory = IpDataLookupFactories.get(databaseType, databaseFile);
250259
} catch (IllegalArgumentException e) {
251-
throw newConfigurationException(TYPE, processorTag, "database_file", e.getMessage());
260+
throw newConfigurationException(type, processorTag, "database_file", e.getMessage());
261+
}
262+
263+
// the "geoip" processor type does additional validation of the database_type
264+
if (GEOIP_TYPE.equals(type)) {
265+
// type sniffing is done with the lowercased type
266+
final String lowerCaseDatabaseType = databaseType.toLowerCase(Locale.ROOT);
267+
268+
// start with a strict positive rejection check -- as we support addition database providers,
269+
// we should expand these checks when possible
270+
if (lowerCaseDatabaseType.startsWith(IpinfoIpDataLookups.IPINFO_PREFIX)) {
271+
throw newConfigurationException(
272+
type,
273+
processorTag,
274+
"database_file",
275+
Strings.format("Unsupported database type [%s] for file [%s]", databaseType, databaseFile)
276+
);
277+
}
278+
279+
// end with a lax negative rejection check -- if we aren't *certain* it's a maxmind database, then we'll warn --
280+
// it's possible for example that somebody cooked up a custom database of their own that happened to work with
281+
// our preexisting code, they should migrate to the new processor, but we're not going to break them right now
282+
if (lowerCaseDatabaseType.startsWith(MaxmindIpDataLookups.GEOIP2_PREFIX) == false
283+
&& lowerCaseDatabaseType.startsWith(MaxmindIpDataLookups.GEOLITE2_PREFIX) == false) {
284+
deprecationLogger.warn(
285+
DeprecationCategory.OTHER,
286+
"unsupported_database_type",
287+
UNSUPPORTED_DATABASE_DEPRECATION_MESSAGE,
288+
databaseType
289+
);
290+
}
252291
}
253292

254293
final IpDataLookup ipDataLookup;
255294
try {
256295
ipDataLookup = factory.create(propertyNames);
257296
} catch (IllegalArgumentException e) {
258-
throw newConfigurationException(TYPE, processorTag, "properties", e.getMessage());
297+
throw newConfigurationException(type, processorTag, "properties", e.getMessage());
259298
}
260299

261300
return new GeoIpProcessor(
301+
type,
262302
processorTag,
263303
description,
264304
ipField,
@@ -272,42 +312,39 @@ public Processor create(
272312
);
273313
}
274314

275-
public static boolean downloadDatabaseOnPipelineCreation(Map<String, Object> config) {
276-
return downloadDatabaseOnPipelineCreation(config, null);
277-
}
278-
279-
public static boolean downloadDatabaseOnPipelineCreation(Map<String, Object> config, String processorTag) {
280-
return readBooleanProperty(GeoIpProcessor.TYPE, processorTag, config, "download_database_on_pipeline_creation", true);
315+
public static boolean downloadDatabaseOnPipelineCreation(String type, Map<String, Object> config, String processorTag) {
316+
return readBooleanProperty(type, processorTag, config, "download_database_on_pipeline_creation", true);
281317
}
282-
283318
}
284319

285320
static class DatabaseUnavailableProcessor extends AbstractProcessor {
286321

322+
private final String type;
287323
private final String databaseName;
288324

289-
DatabaseUnavailableProcessor(String tag, String description, String databaseName) {
325+
DatabaseUnavailableProcessor(String type, String tag, String description, String databaseName) {
290326
super(tag, description);
327+
this.type = type;
291328
this.databaseName = databaseName;
292329
}
293330

294331
@Override
295332
public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
296-
tag(ingestDocument, databaseName);
333+
tag(ingestDocument, this.type, databaseName);
297334
return ingestDocument;
298335
}
299336

300337
@Override
301338
public String getType() {
302-
return TYPE;
339+
return type;
303340
}
304341

305342
public String getDatabaseName() {
306343
return databaseName;
307344
}
308345
}
309346

310-
private static void tag(IngestDocument ingestDocument, String databaseName) {
311-
ingestDocument.appendFieldValue("tags", "_geoip_database_unavailable_" + databaseName, true);
347+
private static void tag(IngestDocument ingestDocument, String type, String databaseName) {
348+
ingestDocument.appendFieldValue("tags", "_" + type + "_database_unavailable_" + databaseName, true);
312349
}
313350
}

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IngestGeoIpPlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
129129
parameters.ingestService.getClusterService()
130130
);
131131
databaseRegistry.set(registry);
132-
return Map.of(GeoIpProcessor.TYPE, new GeoIpProcessor.Factory(registry));
132+
return Map.of(GeoIpProcessor.GEOIP_TYPE, new GeoIpProcessor.Factory(GeoIpProcessor.GEOIP_TYPE, registry));
133133
}
134134

135135
@Override

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/IpinfoIpDataLookups.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ private IpinfoIpDataLookups() {
3939

4040
private static final Logger logger = LogManager.getLogger(IpinfoIpDataLookups.class);
4141

42+
// the actual prefix from the metadata is cased like the literal string, and
43+
// prefix dispatch and checks case-insensitive, so that works out nicely
44+
static final String IPINFO_PREFIX = "ipinfo";
45+
4246
/**
4347
* Lax-ly parses a string that (ideally) looks like 'AS123' into a Long like 123L (or null, if such parsing isn't possible).
4448
* @param asn a potentially empty (or null) ASN string that is expected to contain 'AS' and then a parsable long

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/MaxmindIpDataLookups.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.net.InetAddress;
3535
import java.util.HashMap;
3636
import java.util.List;
37+
import java.util.Locale;
3738
import java.util.Map;
3839
import java.util.Set;
3940

@@ -46,6 +47,11 @@ private MaxmindIpDataLookups() {
4647
// utility class
4748
}
4849

50+
// the actual prefixes from the metadata are cased like the literal strings, but
51+
// prefix dispatch and checks case-insensitive, so the actual constants are lowercase
52+
static final String GEOIP2_PREFIX = "GeoIP2".toLowerCase(Locale.ROOT);
53+
static final String GEOLITE2_PREFIX = "GeoLite2".toLowerCase(Locale.ROOT);
54+
4955
static class AnonymousIp extends AbstractBase<AnonymousIpResponse> {
5056
AnonymousIp(final Set<Database.Property> properties) {
5157
super(

0 commit comments

Comments
 (0)