Commit dd1d157

Enable analytics geoip in behavioral analytics. (#96624)
* When using a managed pipeline GeoIpDownloader is triggered only when an index exists for the pipeline.
* When using a managed pipeline GeoIpDownloader is triggered only when an index exists for the pipeline.
* Adding the geoip processor back
* Adding tags to the events mapping.
* Fix a forbidden API call into tests.
* lint
* Adding an integration tests for managed pipelines.
* lint
* Add a geoip_database_lazy_download param to pipelines and use it instead of managed.
* Fix a edge case: pipeline can be set after index is created.
* lint.
* Update docs/changelog/96624.yaml
* Update 96624.yaml
* Uses a processor setting (download_database_on_pipeline_creation) to decide database download strategy.
* Removing debug instruction.
* Improved documentation.
* Improved the way to check for referenced pipelines.
* Fixing an error in test.
* Improved integration tests.
* Lint.
* Fix failing tests.
* Fix failing tests (2).
* Adding javadoc.
* lint javadoc.
* Using a set instead of a list to store checked pipelines.
1 parent 163fa8c commit dd1d157

10 files changed, +275 -68 lines changed
docs/changelog/96624.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 96624
+summary: Enable analytics geoip in behavioral analytics
+area: "Application"
+type: feature
+issues: []

docs/reference/ingest/processors/geoip.asciidoc

Lines changed: 9 additions & 8 deletions
@@ -14,7 +14,7 @@ CC BY-SA 4.0 license. It automatically downloads these databases if your nodes c
 
 * `ingest.geoip.downloader.eager.download` is set to true
 * your cluster has at least one pipeline with a `geoip` processor
-
+
 {es} automatically downloads updates for these databases from the Elastic GeoIP endpoint:
 https://geoip.elastic.co/v1/database. To get download statistics for these
 updates, use the <<geoip-stats-api,GeoIP stats API>>.
@@ -33,13 +33,14 @@ field instead.
 .`geoip` options
 [options="header"]
 |======
-| Name | Required | Default | Description
-| `field` | yes | - | The field to get the ip address from for the geographical lookup.
-| `target_field` | no | geoip | The field that will hold the geographical information looked up from the MaxMind database.
-| `database_file` | no | GeoLite2-City.mmdb | The database filename referring to a database the module ships with (GeoLite2-City.mmdb, GeoLite2-Country.mmdb, or GeoLite2-ASN.mmdb) or a custom database in the `ingest-geoip` config directory.
-| `properties` | no | [`continent_name`, `country_iso_code`, `country_name`, `region_iso_code`, `region_name`, `city_name`, `location`] * | Controls what properties are added to the `target_field` based on the geoip lookup.
-| `ignore_missing` | no | `false` | If `true` and `field` does not exist, the processor quietly exits without modifying the document
-| `first_only` | no | `true` | If `true` only first found geoip data will be returned, even if `field` contains array
+| Name | Required | Default | Description
+| `field` | yes | - | The field to get the ip address from for the geographical lookup.
+| `target_field` | no | geoip | The field that will hold the geographical information looked up from the MaxMind database.
+| `database_file` | no | GeoLite2-City.mmdb | The database filename referring to a database the module ships with (GeoLite2-City.mmdb, GeoLite2-Country.mmdb, or GeoLite2-ASN.mmdb) or a custom database in the `ingest-geoip` config directory.
+| `properties` | no | [`continent_name`, `country_iso_code`, `country_name`, `region_iso_code`, `region_name`, `city_name`, `location`] * | Controls what properties are added to the `target_field` based on the geoip lookup.
+| `ignore_missing` | no | `false` | If `true` and `field` does not exist, the processor quietly exits without modifying the document
+| `first_only` | no | `true` | If `true` only first found geoip data will be returned, even if `field` contains array
+| `download_database_on_pipeline_creation` | no | `true` | If `true` (and if `ingest.geoip.downloader.eager.download` is `false`), the missing database is downloaded when the pipeline is created. Otherwise, the download is triggered when the pipeline is used as the `default_pipeline` or `final_pipeline` in an index.
 |======
 
 *Depends on what is available in `database_file`:
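
As a rough illustration of the rule that the `download_database_on_pipeline_creation` row describes, the decision can be written as a small truth function. This is only a sketch: `GeoIpDownloadRule` and `shouldDownload` are hypothetical names and not part of Elasticsearch, and the sketch assumes the downloader is enabled and the pipeline in question contains a `geoip` processor.

```java
// Hypothetical helper, not an Elasticsearch API: it only restates the documented rule.
final class GeoIpDownloadRule {
    private GeoIpDownloadRule() {}

    /**
     * @param eagerDownload              value of ingest.geoip.downloader.eager.download
     * @param downloadOnPipelineCreation value of download_database_on_pipeline_creation (documented default: true)
     * @param referencedByIndex          whether some index uses the pipeline as default_pipeline or final_pipeline
     * @return true if, under the assumptions above, a missing database would be downloaded
     */
    static boolean shouldDownload(boolean eagerDownload, boolean downloadOnPipelineCreation, boolean referencedByIndex) {
        if (eagerDownload) {
            return true; // eager mode downloads regardless of how the pipeline is used
        }
        if (downloadOnPipelineCreation) {
            return true; // creating the pipeline is enough to trigger the download
        }
        return referencedByIndex; // lazy mode waits until an index references the pipeline
    }

    public static void main(String[] args) {
        System.out.println(shouldDownload(false, true, false));  // true
        System.out.println(shouldDownload(false, false, false)); // false
        System.out.println(shouldDownload(false, false, true));  // true
    }
}
```

The lazy case (second and third calls) is the behavior this commit adds: the download waits until an index actually points at the pipeline.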

modules/ingest-geoip/src/internalClusterTest/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderIT.java

Lines changed: 59 additions & 0 deletions
@@ -18,11 +18,13 @@
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.env.Environment;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.query.BoolQueryBuilder;
 import org.elasticsearch.index.query.MatchQueryBuilder;
 import org.elasticsearch.index.query.RangeQueryBuilder;
@@ -150,6 +152,7 @@ public void cleanUp() throws Exception {
     @TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/75221")
     public void testInvalidTimestamp() throws Exception {
         assumeTrue("only test with fixture to have stable results", getEndpoint() != null);
+        setupDatabasesInConfigDirectory();
         putGeoIpPipeline();
         updateClusterSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true));
         assertBusy(() -> {
@@ -283,6 +286,42 @@ public void testGeoIpDatabasesDownloadNoGeoipProcessors() throws Exception {
         }, 2, TimeUnit.MINUTES);
     }
 
+    public void testDoNotDownloadDatabaseOnPipelineCreation() throws Exception {
+        assumeTrue("only test with fixture to have stable results", getEndpoint() != null);
+        String pipelineId = randomIdentifier();
+
+        // Removing databases from tmp dir. So we can test the downloader.
+        deleteDatabasesInConfigDirectory();
+
+        // Enabling the downloader.
+        putGeoIpPipeline("_id", false);
+        updateClusterSettings(Settings.builder().put(GeoIpDownloaderTaskExecutor.ENABLED_SETTING.getKey(), true));
+        assertBusy(() -> assertNotNull(getTask()));
+
+        // Creating a pipeline containing a geo processor with lazy download enabled.
+        // Download should not be triggered and task state should stay null.
+        putGeoIpPipeline(pipelineId, false);
+        assertNull(getTask().getState());
+
+        // Creating an index which does not reference the pipeline should not trigger the database download.
+        String indexIdentifier = randomIdentifier();
+        assertAcked(client().admin().indices().prepareCreate(indexIdentifier).get());
+        assertNull(getTask().getState());
+
+        // Set the pipeline as default_pipeline or final_pipeline for the index.
+        // This should trigger the database download.
+        Setting<String> pipelineSetting = randomFrom(IndexSettings.FINAL_PIPELINE, IndexSettings.DEFAULT_PIPELINE);
+        Settings indexSettings = Settings.builder().put(pipelineSetting.getKey(), pipelineId).build();
+        assertAcked(client().admin().indices().prepareUpdateSettings(indexIdentifier).setSettings(indexSettings).get());
+        assertBusy(() -> {
+            GeoIpTaskState state = getGeoIpTaskState();
+            assertEquals(Set.of("GeoLite2-ASN.mmdb", "GeoLite2-City.mmdb", "GeoLite2-Country.mmdb"), state.getDatabases().keySet());
+        }, 2, TimeUnit.MINUTES);
+
+        // Remove the created index.
+        assertAcked(client().admin().indices().prepareDelete(indexIdentifier).get());
+    }
+
     @TestLogging(value = "org.elasticsearch.ingest.geoip:TRACE", reason = "https://github.com/elastic/elasticsearch/issues/69972")
     public void testUseGeoIpProcessorWithDownloadedDBs() throws Exception {
         assumeTrue("only test with fixture to have stable results", getEndpoint() != null);
@@ -450,6 +489,17 @@ private void putGeoIpPipeline() throws IOException {
      * @throws IOException
      */
     private void putGeoIpPipeline(String pipelineId) throws IOException {
+        putGeoIpPipeline(pipelineId, true);
+    }
+
+    /**
+     * This creates a pipeline named pipelineId with a geoip processor, which ought to cause the geoip downloader to begin (assuming it is
+     * enabled).
+     * @param pipelineId The name of the new pipeline with a geoip processor
+     * @param downloadDatabaseOnPipelineCreation Indicates whether the pipeline creation should trigger database download or not.
+     * @throws IOException
+     */
+    private void putGeoIpPipeline(String pipelineId, boolean downloadDatabaseOnPipelineCreation) throws IOException {
         BytesReference bytes;
         try (XContentBuilder builder = JsonXContent.contentBuilder()) {
             builder.startObject();
@@ -463,6 +513,9 @@ private void putGeoIpPipeline(String pipelineId) throws IOException {
                             builder.field("field", "ip");
                             builder.field("target_field", "ip-city");
                             builder.field("database_file", "GeoLite2-City.mmdb");
+                            if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) {
+                                builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation);
+                            }
                         }
                         builder.endObject();
                     }
@@ -474,6 +527,9 @@ private void putGeoIpPipeline(String pipelineId) throws IOException {
                             builder.field("field", "ip");
                             builder.field("target_field", "ip-country");
                             builder.field("database_file", "GeoLite2-Country.mmdb");
+                            if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) {
+                                builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation);
+                            }
                         }
                         builder.endObject();
                     }
@@ -485,6 +541,9 @@ private void putGeoIpPipeline(String pipelineId) throws IOException {
                             builder.field("field", "ip");
                             builder.field("target_field", "ip-asn");
                             builder.field("database_file", "GeoLite2-ASN.mmdb");
+                            if (downloadDatabaseOnPipelineCreation == false || randomBoolean()) {
+                                builder.field("download_database_on_pipeline_creation", downloadDatabaseOnPipelineCreation);
+                            }
                         }
                         builder.endObject();
                     }

modules/ingest-geoip/src/main/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTaskExecutor.java

Lines changed: 99 additions & 17 deletions
@@ -27,6 +27,7 @@
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.ingest.IngestMetadata;
 import org.elasticsearch.ingest.IngestService;
 import org.elasticsearch.ingest.Pipeline;
@@ -43,11 +44,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.ingest.geoip.GeoIpDownloader.DATABASES_INDEX;
 import static org.elasticsearch.ingest.geoip.GeoIpDownloader.GEOIP_DOWNLOADER;
+import static org.elasticsearch.ingest.geoip.GeoIpProcessor.Factory.downloadDatabaseOnPipelineCreation;
 
 /**
  * Persistent task executor that is responsible for starting {@link GeoIpDownloader} after task is allocated by master node.
@@ -207,7 +211,14 @@ public void clusterChanged(ClusterChangedEvent event) {
             }
         }
 
-        if (event.metadataChanged() && event.changedCustomMetadataSet().contains(IngestMetadata.TYPE)) {
+        if (event.metadataChanged() == false) {
+            return;
+        }
+
+        boolean hasIndicesChanges = event.previousState().metadata().indices().equals(event.state().metadata().indices()) == false;
+        boolean hasIngestPipelineChanges = event.changedCustomMetadataSet().contains(IngestMetadata.TYPE);
+
+        if (hasIngestPipelineChanges || hasIndicesChanges) {
             boolean newAtLeastOneGeoipProcessor = hasAtLeastOneGeoipProcessor(event.state());
             if (newAtLeastOneGeoipProcessor && atLeastOneGeoipProcessor == false) {
                 atLeastOneGeoipProcessor = true;
@@ -222,41 +233,112 @@ public void clusterChanged(ClusterChangedEvent event) {
         }
     }
 
-    @SuppressWarnings("unchecked")
     static boolean hasAtLeastOneGeoipProcessor(ClusterState clusterState) {
-        List<PipelineConfiguration> pipelineDefinitions = IngestService.getPipelines(clusterState);
-        return pipelineDefinitions.stream().anyMatch(pipelineDefinition -> {
-            Map<String, Object> pipelineMap = pipelineDefinition.getConfigAsMap();
-            return hasAtLeastOneGeoipProcessor((List<Map<String, Object>>) pipelineMap.get(Pipeline.PROCESSORS_KEY));
+        if (pipelineConfigurationsWithGeoIpProcessor(clusterState, true).isEmpty() == false) {
+            return true;
+        }
+
+        Set<String> checkReferencedPipelines = pipelineConfigurationsWithGeoIpProcessor(clusterState, false).stream()
+            .map(PipelineConfiguration::getId)
+            .collect(Collectors.toSet());
+
+        if (checkReferencedPipelines.isEmpty()) {
+            return false;
+        }
+
+        return clusterState.getMetadata().indices().values().stream().anyMatch(indexMetadata -> {
+            String defaultPipeline = IndexSettings.DEFAULT_PIPELINE.get(indexMetadata.getSettings());
+            String finalPipeline = IndexSettings.FINAL_PIPELINE.get(indexMetadata.getSettings());
+            return checkReferencedPipelines.contains(defaultPipeline) || checkReferencedPipelines.contains(finalPipeline);
         });
     }
 
-    private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors) {
-        return processors != null && processors.stream().anyMatch(GeoIpDownloaderTaskExecutor::hasAtLeastOneGeoipProcessor);
+    /**
+     * Retrieve list of pipelines that have at least one geoip processor.
+     * @param clusterState Cluster state.
+     * @param downloadDatabaseOnPipelineCreation Filter the list to include only pipelines with the download_database_on_pipeline_creation
+     *                                           matching the param.
+     * @return A list of {@link PipelineConfiguration} matching criteria.
+     */
+    @SuppressWarnings("unchecked")
+    private static List<PipelineConfiguration> pipelineConfigurationsWithGeoIpProcessor(
+        ClusterState clusterState,
+        boolean downloadDatabaseOnPipelineCreation
+    ) {
+        List<PipelineConfiguration> pipelineDefinitions = IngestService.getPipelines(clusterState);
+        return pipelineDefinitions.stream().filter(pipelineConfig -> {
+            List<Map<String, Object>> processors = (List<Map<String, Object>>) pipelineConfig.getConfigAsMap().get(Pipeline.PROCESSORS_KEY);
+            return hasAtLeastOneGeoipProcessor(processors, downloadDatabaseOnPipelineCreation);
+        }).collect(Collectors.toList());
     }
 
-    private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor) {
-        return processor != null
-            && (processor.containsKey(GeoIpProcessor.TYPE)
-                || isProcessorWithOnFailureGeoIpProcessor(processor)
-                || isForeachProcessorWithGeoipProcessor(processor));
+    /**
+     * Check if a list of processors contains at least one geoip processor.
+     * @param processors List of processors.
+     * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
+     * @return true if a geoip processor is found in the processor list.
+     */
+    private static boolean hasAtLeastOneGeoipProcessor(List<Map<String, Object>> processors, boolean downloadDatabaseOnPipelineCreation) {
+        return processors != null && processors.stream().anyMatch(p -> hasAtLeastOneGeoipProcessor(p, downloadDatabaseOnPipelineCreation));
+    }
+
+    /**
+     * Check if a processor config is a geoip processor or contains at least one geoip processor.
+     * @param processor Processor config.
+     * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
+     * @return true if a geoip processor is found in the processor list.
+     */
+    @SuppressWarnings("unchecked")
+    private static boolean hasAtLeastOneGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
+        if (processor == null) {
+            return false;
+        }
+
+        if (processor.containsKey(GeoIpProcessor.TYPE)) {
+            Map<String, Object> processorConfig = (Map<String, Object>) processor.get(GeoIpProcessor.TYPE);
+            return downloadDatabaseOnPipelineCreation(processorConfig) == downloadDatabaseOnPipelineCreation;
+        }
+
+        return isProcessorWithOnFailureGeoIpProcessor(processor, downloadDatabaseOnPipelineCreation)
+            || isForeachProcessorWithGeoipProcessor(processor, downloadDatabaseOnPipelineCreation);
     }
 
+    /**
+     * Check if a processor config has an on_failure clause containing at least one geoip processor.
+     * @param processor Processor config.
+     * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
+     * @return true if a geoip processor is found in the processor list.
+     */
     @SuppressWarnings("unchecked")
-    private static boolean isProcessorWithOnFailureGeoIpProcessor(Map<String, Object> processor) {
+    private static boolean isProcessorWithOnFailureGeoIpProcessor(
+        Map<String, Object> processor,
+        boolean downloadDatabaseOnPipelineCreation
+    ) {
         return processor != null
             && processor.values()
                 .stream()
                 .anyMatch(
                     value -> value instanceof Map
-                        && hasAtLeastOneGeoipProcessor(((Map<String, List<Map<String, Object>>>) value).get("on_failure"))
+                        && hasAtLeastOneGeoipProcessor(
+                            ((Map<String, List<Map<String, Object>>>) value).get("on_failure"),
+                            downloadDatabaseOnPipelineCreation
+                        )
                 );
     }
 
+    /**
+     * Check if a processor is a foreach processor containing at least one geoip processor.
+     * @param processor Processor config.
+     * @param downloadDatabaseOnPipelineCreation Should the download_database_on_pipeline_creation of the geoip processor be true or false.
+     * @return true if a geoip processor is found in the processor list.
+     */
     @SuppressWarnings("unchecked")
-    private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object> processor) {
+    private static boolean isForeachProcessorWithGeoipProcessor(Map<String, Object> processor, boolean downloadDatabaseOnPipelineCreation) {
         return processor.containsKey("foreach")
-            && hasAtLeastOneGeoipProcessor(((Map<String, Map<String, Object>>) processor.get("foreach")).get("processor"));
+            && hasAtLeastOneGeoipProcessor(
+                ((Map<String, Map<String, Object>>) processor.get("foreach")).get("processor"),
+                downloadDatabaseOnPipelineCreation
+            );
     }
 
     private void startTask(Runnable onFailure) {
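
The three-step check in the new `hasAtLeastOneGeoipProcessor(ClusterState)` may be easier to follow outside the cluster-state types. The following is a simplified sketch, not the committed implementation: `GeoIpPipelineCheck`, `hasGeoIp` and `shouldStartDownloader` are hypothetical names, pipelines and index settings are modeled as plain maps, nested `on_failure` and `foreach` processors are ignored, and a missing flag is assumed to mean `true` (the documented default). The `_none` fallback for an unset pipeline setting is likewise an assumption.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-in for the check in GeoIpDownloaderTaskExecutor.
final class GeoIpPipelineCheck {
    private GeoIpPipelineCheck() {}

    // True if any top-level geoip processor carries the wanted flag value.
    // Assumption: a missing flag counts as true, the documented default.
    static boolean hasGeoIp(List<Map<String, Object>> processors, boolean wantedFlag) {
        if (processors == null) {
            return false;
        }
        return processors.stream().anyMatch(processor -> {
            Object config = processor.get("geoip");
            if (!(config instanceof Map)) {
                return false;
            }
            Object flag = ((Map<?, ?>) config).get("download_database_on_pipeline_creation");
            boolean actual = flag == null || Boolean.parseBoolean(flag.toString());
            return actual == wantedFlag;
        });
    }

    // pipelines: pipeline id -> processor list; indexSettings: one settings map per index.
    static boolean shouldStartDownloader(
        Map<String, List<Map<String, Object>>> pipelines,
        List<Map<String, String>> indexSettings
    ) {
        // Step 1: any pipeline that downloads on creation is enough.
        if (pipelines.values().stream().anyMatch(p -> hasGeoIp(p, true))) {
            return true;
        }
        // Step 2: collect the ids of pipelines that defer the download.
        Set<String> lazyIds = pipelines.entrySet()
            .stream()
            .filter(e -> hasGeoIp(e.getValue(), false))
            .map(Map.Entry::getKey)
            .collect(Collectors.toSet());
        if (lazyIds.isEmpty()) {
            return false;
        }
        // Step 3: a deferred pipeline counts only once an index references it.
        return indexSettings.stream()
            .anyMatch(
                s -> lazyIds.contains(s.getOrDefault("index.default_pipeline", "_none"))
                    || lazyIds.contains(s.getOrDefault("index.final_pipeline", "_none"))
            );
    }

    public static void main(String[] args) {
        Map<String, Object> lazyGeoIp = Map.of(
            "geoip",
            Map.of("field", "ip", "download_database_on_pipeline_creation", false)
        );
        Map<String, List<Map<String, Object>>> pipelines = Map.of("lazy", List.of(lazyGeoIp));

        // No index references the pipeline yet.
        System.out.println(shouldStartDownloader(pipelines, List.of())); // false

        // An index now uses it as its default pipeline.
        List<Map<String, String>> indices = List.of(Map.of("index.default_pipeline", "lazy"));
        System.out.println(shouldStartDownloader(pipelines, indices)); // true
    }
}
```

Collecting the deferred pipeline ids into a set first keeps the per-index lookup cheap, which matches the last item of the commit message ("Using a set instead of a list to store checked pipelines").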
