2424import org .elasticsearch .action .search .SearchRequest ;
2525import org .elasticsearch .action .search .SearchResponse ;
2626import org .elasticsearch .common .bytes .BytesReference ;
27+ import org .elasticsearch .common .Strings ;
2728import org .elasticsearch .common .settings .MockSecureSettings ;
2829import org .elasticsearch .common .settings .Settings ;
2930import org .elasticsearch .common .util .CollectionUtils ;
4445
4546import java .io .IOException ;
4647import java .util .Collection ;
48+ import java .util .List ;
4749import java .util .Map ;
4850
4951import static org .elasticsearch .ingest .EnterpriseGeoIpTask .ENTERPRISE_GEOIP_DOWNLOADER ;
52+ import static org .elasticsearch .ingest .geoip .EnterpriseGeoIpDownloaderTaskExecutor .IPINFO_TOKEN_SETTING ;
5053import static org .elasticsearch .ingest .geoip .EnterpriseGeoIpDownloaderTaskExecutor .MAXMIND_LICENSE_KEY_SETTING ;
5154import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
5255import static org .hamcrest .Matchers .equalTo ;
5356
5457public class EnterpriseGeoIpDownloaderIT extends ESIntegTestCase {
5558
56- private static final String DATABASE_TYPE = "GeoIP2-City" ;
59+ private static final String MAXMIND_DATABASE_TYPE = "GeoIP2-City" ;
60+ private static final String IPINFO_DATABASE_TYPE = "asn" ;
5761
5862 @ ClassRule
59- public static final EnterpriseGeoIpHttpFixture fixture = new EnterpriseGeoIpHttpFixture (DATABASE_TYPE );
63+ public static final EnterpriseGeoIpHttpFixture fixture = new EnterpriseGeoIpHttpFixture (
64+ List .of (MAXMIND_DATABASE_TYPE ),
65+ List .of (IPINFO_DATABASE_TYPE )
66+ );
6067
6168 protected String getEndpoint () {
6269 return fixture .getAddress ();
@@ -66,6 +73,7 @@ protected String getEndpoint() {
6673 protected Settings nodeSettings (int nodeOrdinal , Settings otherSettings ) {
6774 MockSecureSettings secureSettings = new MockSecureSettings ();
6875 secureSettings .setString (MAXMIND_LICENSE_KEY_SETTING .getKey (), "license_key" );
76+ secureSettings .setString (IPINFO_TOKEN_SETTING .getKey (), "token" );
6977 Settings .Builder builder = Settings .builder ();
7078 builder .setSecureSettings (secureSettings )
7179 .put (super .nodeSettings (nodeOrdinal , otherSettings ))
@@ -92,29 +100,44 @@ public void testEnterpriseDownloaderTask() throws Exception {
92100 * Note that the "enterprise database" is actually just a geolite database being loaded by the GeoIpHttpFixture.
93101 */
94102 EnterpriseGeoIpDownloader .DEFAULT_MAXMIND_ENDPOINT = getEndpoint ();
95- final String pipelineName = "enterprise_geoip_pipeline" ;
103+ EnterpriseGeoIpDownloader . DEFAULT_IPINFO_ENDPOINT = getEndpoint () ;
96104 final String indexName = "enterprise_geoip_test_index" ;
105+ final String geoipPipelineName = "enterprise_geoip_pipeline" ;
106+ final String iplocationPipelineName = "enterprise_iplocation_pipeline" ;
97107 final String sourceField = "ip" ;
98- final String targetField = "ip-city " ;
108+ final String targetField = "ip-result " ;
99109
100110 startEnterpriseGeoIpDownloaderTask ();
101- configureDatabase (DATABASE_TYPE );
102- createGeoIpPipeline (pipelineName , DATABASE_TYPE , sourceField , targetField );
111+ configureMaxmindDatabase (MAXMIND_DATABASE_TYPE );
112+ configureIpinfoDatabase (IPINFO_DATABASE_TYPE );
113+ waitAround ();
114+ createPipeline (geoipPipelineName , "geoip" , MAXMIND_DATABASE_TYPE , sourceField , targetField );
115+ createPipeline (iplocationPipelineName , "ip_location" , IPINFO_DATABASE_TYPE , sourceField , targetField );
103116
117+ /*
118+ * We know that the databases index has been populated (because we waited around, :wink:), but we don't know for sure that
119+ * the databases have been pulled down and made available on all nodes. So we run these ingest-and-check steps in assertBusy blocks.
120+ */
104121 assertBusy (() -> {
105- /*
106- * We know that the .geoip_databases index has been populated, but we don't know for sure that the database has been pulled
107- * down and made available on all nodes. So we run this ingest-and-check step in an assertBusy.
108- */
109122 logger .info ("Ingesting a test document" );
110- String documentId = ingestDocument (indexName , pipelineName , sourceField );
123+ String documentId = ingestDocument (indexName , geoipPipelineName , sourceField , "89.160.20.128" );
111124 GetResponse getResponse = client ().get (new GetRequest (indexName , documentId )).actionGet ();
112125 Map <String , Object > returnedSource = getResponse .getSource ();
113126 assertNotNull (returnedSource );
114127 Object targetFieldValue = returnedSource .get (targetField );
115128 assertNotNull (targetFieldValue );
116129 assertThat (((Map <String , Object >) targetFieldValue ).get ("organization_name" ), equalTo ("Bredband2 AB" ));
117130 });
131+ assertBusy (() -> {
132+ logger .info ("Ingesting another test document" );
133+ String documentId = ingestDocument (indexName , iplocationPipelineName , sourceField , "12.10.66.1" );
134+ GetResponse getResponse = client ().get (new GetRequest (indexName , documentId )).actionGet ();
135+ Map <String , Object > returnedSource = getResponse .getSource ();
136+ assertNotNull (returnedSource );
137+ Object targetFieldValue = returnedSource .get (targetField );
138+ assertNotNull (targetFieldValue );
139+ assertThat (((Map <String , Object >) targetFieldValue ).get ("organization_name" ), equalTo ("OAKLAWN JOCKEY CLUB, INC." ));
140+ });
118141 }
119142
120143 private void startEnterpriseGeoIpDownloaderTask () {
@@ -133,29 +156,46 @@ private void startEnterpriseGeoIpDownloaderTask() {
133156 );
134157 }
135158
136- private void configureDatabase (String databaseType ) throws Exception {
159+ private void configureMaxmindDatabase (String databaseType ) {
137160 admin ().cluster ()
138161 .execute (
139162 PutDatabaseConfigurationAction .INSTANCE ,
140163 new PutDatabaseConfigurationAction .Request (
141164 TimeValue .MAX_VALUE ,
142165 TimeValue .MAX_VALUE ,
143- new DatabaseConfiguration ("test" , databaseType , new DatabaseConfiguration .Maxmind ("test_account" ))
166+ new DatabaseConfiguration ("test-1 " , databaseType , new DatabaseConfiguration .Maxmind ("test_account" ))
144167 )
145168 )
146169 .actionGet ();
170+ }
171+
172+ private void configureIpinfoDatabase (String databaseType ) {
173+ admin ().cluster ()
174+ .execute (
175+ PutDatabaseConfigurationAction .INSTANCE ,
176+ new PutDatabaseConfigurationAction .Request (
177+ TimeValue .MAX_VALUE ,
178+ TimeValue .MAX_VALUE ,
179+ new DatabaseConfiguration ("test-2" , databaseType , new DatabaseConfiguration .Ipinfo ())
180+ )
181+ )
182+ .actionGet ();
183+ }
184+
185+ private void waitAround () throws Exception {
147186 ensureGreen (GeoIpDownloader .DATABASES_INDEX );
148187 assertBusy (() -> {
149188 SearchResponse searchResponse = client ().search (new SearchRequest (GeoIpDownloader .DATABASES_INDEX )).actionGet ();
150189 try {
151- assertThat (searchResponse .getHits ().getHits ().length , equalTo (1 ));
190+ assertThat (searchResponse .getHits ().getHits ().length , equalTo (2 ));
152191 } finally {
153192 searchResponse .decRef ();
154193 }
155194 });
156195 }
157196
158- private void createGeoIpPipeline (String pipelineName , String databaseType , String sourceField , String targetField ) throws IOException {
197+ private void createPipeline (String pipelineName , String processorType , String databaseType ,
198+ String sourceField , String targetField ) throws IOException {
159199 final BytesReference bytes ;
160200 try (XContentBuilder builder = JsonXContent .contentBuilder ()) {
161201 builder .startObject ();
@@ -165,7 +205,7 @@ private void createGeoIpPipeline(String pipelineName, String databaseType, Strin
165205 {
166206 builder .startObject ();
167207 {
168- builder .startObject ("geoip" );
208+ builder .startObject (processorType );
169209 {
170210 builder .field ("field" , sourceField );
171211 builder .field ("target_field" , targetField );
@@ -183,11 +223,11 @@ private void createGeoIpPipeline(String pipelineName, String databaseType, Strin
183223 assertAcked (clusterAdmin ().putPipeline (new PutPipelineRequest (pipelineName , bytes , XContentType .JSON )).actionGet ());
184224 }
185225
186- private String ingestDocument (String indexName , String pipelineName , String sourceField ) {
226+ private String ingestDocument (String indexName , String pipelineName , String sourceField , String value ) {
187227 BulkRequest bulkRequest = new BulkRequest ();
188- bulkRequest .add (
189- new IndexRequest ( indexName ). source ( "{ \" " + sourceField + " \ " : \" 89.160.20.128 \" }" , XContentType . JSON ). setPipeline ( pipelineName )
190- );
228+ bulkRequest .add (new IndexRequest ( indexName ). source ( Strings . format ( """
229+ { "%s ": "%s"}
230+ """ , sourceField , value ), XContentType . JSON ). setPipeline ( pipelineName ) );
191231 BulkResponse response = client ().bulk (bulkRequest ).actionGet ();
192232 BulkItemResponse [] bulkItemResponses = response .getItems ();
193233 assertThat (bulkItemResponses .length , equalTo (1 ));
0 commit comments