2222import org .elasticsearch .action .index .IndexRequest ;
2323import org .elasticsearch .action .search .SearchRequest ;
2424import org .elasticsearch .action .search .SearchResponse ;
25+ import org .elasticsearch .common .Strings ;
2526import org .elasticsearch .common .settings .MockSecureSettings ;
2627import org .elasticsearch .common .settings .Settings ;
2728import org .elasticsearch .common .util .CollectionUtils ;
4041
4142import java .io .IOException ;
4243import java .util .Collection ;
44+ import java .util .List ;
4345import java .util .Map ;
4446
4547import static org .elasticsearch .ingest .EnterpriseGeoIpTask .ENTERPRISE_GEOIP_DOWNLOADER ;
48+ import static org .elasticsearch .ingest .geoip .EnterpriseGeoIpDownloaderTaskExecutor .IPINFO_TOKEN_SETTING ;
4649import static org .elasticsearch .ingest .geoip .EnterpriseGeoIpDownloaderTaskExecutor .MAXMIND_LICENSE_KEY_SETTING ;
4750import static org .hamcrest .Matchers .equalTo ;
4851
4952public class EnterpriseGeoIpDownloaderIT extends ESIntegTestCase {
5053
51- private static final String DATABASE_TYPE = "GeoIP2-City" ;
54+ private static final String MAXMIND_DATABASE_TYPE = "GeoIP2-City" ;
55+ private static final String IPINFO_DATABASE_TYPE = "asn" ;
5256
5357 @ ClassRule
54- public static final EnterpriseGeoIpHttpFixture fixture = new EnterpriseGeoIpHttpFixture (DATABASE_TYPE );
58+ public static final EnterpriseGeoIpHttpFixture fixture = new EnterpriseGeoIpHttpFixture (
59+ List .of (MAXMIND_DATABASE_TYPE ),
60+ List .of (IPINFO_DATABASE_TYPE )
61+ );
5562
5663 protected String getEndpoint () {
5764 return fixture .getAddress ();
@@ -61,6 +68,7 @@ protected String getEndpoint() {
6168 protected Settings nodeSettings (int nodeOrdinal , Settings otherSettings ) {
6269 MockSecureSettings secureSettings = new MockSecureSettings ();
6370 secureSettings .setString (MAXMIND_LICENSE_KEY_SETTING .getKey (), "license_key" );
71+ secureSettings .setString (IPINFO_TOKEN_SETTING .getKey (), "token" );
6472 Settings .Builder builder = Settings .builder ();
6573 builder .setSecureSettings (secureSettings )
6674 .put (super .nodeSettings (nodeOrdinal , otherSettings ))
@@ -87,29 +95,44 @@ public void testEnterpriseDownloaderTask() throws Exception {
8795 * Note that the "enterprise database" is actually just a geolite database being loaded by the GeoIpHttpFixture.
8896 */
8997 EnterpriseGeoIpDownloader .DEFAULT_MAXMIND_ENDPOINT = getEndpoint ();
90- final String pipelineName = "enterprise_geoip_pipeline" ;
98+ EnterpriseGeoIpDownloader . DEFAULT_IPINFO_ENDPOINT = getEndpoint () ;
9199 final String indexName = "enterprise_geoip_test_index" ;
100+ final String geoipPipelineName = "enterprise_geoip_pipeline" ;
101+ final String iplocationPipelineName = "enterprise_iplocation_pipeline" ;
92102 final String sourceField = "ip" ;
93- final String targetField = "ip-city " ;
103+ final String targetField = "ip-result " ;
94104
95105 startEnterpriseGeoIpDownloaderTask ();
96- configureDatabase (DATABASE_TYPE );
97- createGeoIpPipeline (pipelineName , DATABASE_TYPE , sourceField , targetField );
106+ configureMaxmindDatabase (MAXMIND_DATABASE_TYPE );
107+ configureIpinfoDatabase (IPINFO_DATABASE_TYPE );
108+ waitAround ();
109+ createPipeline (geoipPipelineName , "geoip" , MAXMIND_DATABASE_TYPE , sourceField , targetField );
110+ createPipeline (iplocationPipelineName , "ip_location" , IPINFO_DATABASE_TYPE , sourceField , targetField );
98111
112+ /*
113+ * We know that the databases index has been populated (because we waited around, :wink:), but we don't know for sure that
114+ * the databases have been pulled down and made available on all nodes. So we run these ingest-and-check steps in assertBusy blocks.
115+ */
99116 assertBusy (() -> {
100- /*
101- * We know that the .geoip_databases index has been populated, but we don't know for sure that the database has been pulled
102- * down and made available on all nodes. So we run this ingest-and-check step in an assertBusy.
103- */
104117 logger .info ("Ingesting a test document" );
105- String documentId = ingestDocument (indexName , pipelineName , sourceField );
118+ String documentId = ingestDocument (indexName , geoipPipelineName , sourceField , "89.160.20.128" );
106119 GetResponse getResponse = client ().get (new GetRequest (indexName , documentId )).actionGet ();
107120 Map <String , Object > returnedSource = getResponse .getSource ();
108121 assertNotNull (returnedSource );
109122 Object targetFieldValue = returnedSource .get (targetField );
110123 assertNotNull (targetFieldValue );
111124 assertThat (((Map <String , Object >) targetFieldValue ).get ("organization_name" ), equalTo ("Bredband2 AB" ));
112125 });
126+ assertBusy (() -> {
127+ logger .info ("Ingesting another test document" );
128+ String documentId = ingestDocument (indexName , iplocationPipelineName , sourceField , "12.10.66.1" );
129+ GetResponse getResponse = client ().get (new GetRequest (indexName , documentId )).actionGet ();
130+ Map <String , Object > returnedSource = getResponse .getSource ();
131+ assertNotNull (returnedSource );
132+ Object targetFieldValue = returnedSource .get (targetField );
133+ assertNotNull (targetFieldValue );
134+ assertThat (((Map <String , Object >) targetFieldValue ).get ("organization_name" ), equalTo ("OAKLAWN JOCKEY CLUB, INC." ));
135+ });
113136 }
114137
115138 private void startEnterpriseGeoIpDownloaderTask () {
@@ -128,36 +151,53 @@ private void startEnterpriseGeoIpDownloaderTask() {
128151 );
129152 }
130153
131- private void configureDatabase (String databaseType ) throws Exception {
154+ private void configureMaxmindDatabase (String databaseType ) {
132155 admin ().cluster ()
133156 .execute (
134157 PutDatabaseConfigurationAction .INSTANCE ,
135158 new PutDatabaseConfigurationAction .Request (
136159 TimeValue .MAX_VALUE ,
137160 TimeValue .MAX_VALUE ,
138- new DatabaseConfiguration ("test" , databaseType , new DatabaseConfiguration .Maxmind ("test_account" ))
161+ new DatabaseConfiguration ("test-1 " , databaseType , new DatabaseConfiguration .Maxmind ("test_account" ))
139162 )
140163 )
141164 .actionGet ();
165+ }
166+
167+ private void configureIpinfoDatabase (String databaseType ) {
168+ admin ().cluster ()
169+ .execute (
170+ PutDatabaseConfigurationAction .INSTANCE ,
171+ new PutDatabaseConfigurationAction .Request (
172+ TimeValue .MAX_VALUE ,
173+ TimeValue .MAX_VALUE ,
174+ new DatabaseConfiguration ("test-2" , databaseType , new DatabaseConfiguration .Ipinfo ())
175+ )
176+ )
177+ .actionGet ();
178+ }
179+
180+ private void waitAround () throws Exception {
142181 ensureGreen (GeoIpDownloader .DATABASES_INDEX );
143182 assertBusy (() -> {
144183 SearchResponse searchResponse = client ().search (new SearchRequest (GeoIpDownloader .DATABASES_INDEX )).actionGet ();
145184 try {
146- assertThat (searchResponse .getHits ().getHits ().length , equalTo (1 ));
185+ assertThat (searchResponse .getHits ().getHits ().length , equalTo (2 ));
147186 } finally {
148187 searchResponse .decRef ();
149188 }
150189 });
151190 }
152191
153- private void createGeoIpPipeline (String pipelineName , String databaseType , String sourceField , String targetField ) throws IOException {
192+ private void createPipeline (String pipelineName , String processorType , String databaseType , String sourceField , String targetField )
193+ throws IOException {
154194 putJsonPipeline (pipelineName , (builder , params ) -> {
155195 builder .field ("description" , "test" );
156196 builder .startArray ("processors" );
157197 {
158198 builder .startObject ();
159199 {
160- builder .startObject ("geoip" );
200+ builder .startObject (processorType );
161201 {
162202 builder .field ("field" , sourceField );
163203 builder .field ("target_field" , targetField );
@@ -171,11 +211,11 @@ private void createGeoIpPipeline(String pipelineName, String databaseType, Strin
171211 });
172212 }
173213
174- private String ingestDocument (String indexName , String pipelineName , String sourceField ) {
214+ private String ingestDocument (String indexName , String pipelineName , String sourceField , String value ) {
175215 BulkRequest bulkRequest = new BulkRequest ();
176- bulkRequest .add (
177- new IndexRequest ( indexName ). source ( "{ \" " + sourceField + " \ " : \" 89.160.20.128 \" }" , XContentType . JSON ). setPipeline ( pipelineName )
178- );
216+ bulkRequest .add (new IndexRequest ( indexName ). source ( Strings . format ( """
217+ { "%s ": "%s"}
218+ """ , sourceField , value ), XContentType . JSON ). setPipeline ( pipelineName ) );
179219 BulkResponse response = client ().bulk (bulkRequest ).actionGet ();
180220 BulkItemResponse [] bulkItemResponses = response .getItems ();
181221 assertThat (bulkItemResponses .length , equalTo (1 ));
0 commit comments