1212
1313import com .carrotsearch .randomizedtesting .annotations .Name ;
1414
15- import org .apache .http .util .EntityUtils ;
1615import org .elasticsearch .client .Request ;
16+ import org .elasticsearch .client .Response ;
1717import org .elasticsearch .test .cluster .ElasticsearchCluster ;
1818import org .elasticsearch .test .cluster .FeatureFlag ;
1919import org .elasticsearch .test .cluster .local .distribution .DistributionType ;
20- import org .elasticsearch .test .rest .ObjectPath ;
2120import org .elasticsearch .upgrades .FullClusterRestartUpgradeStatus ;
2221import org .elasticsearch .upgrades .ParameterizedFullClusterRestartTestCase ;
2322import org .junit .ClassRule ;
2423import org .junit .rules .RuleChain ;
2524import org .junit .rules .TestRule ;
2625
27- import java .io .IOException ;
2826import java .util .List ;
2927import java .util .Map ;
30- import java .util .Objects ;
28+ import java .util .Set ;
3129import java .util .concurrent .TimeUnit ;
3230
33- import static org .hamcrest .Matchers .contains ;
31+ import static org .hamcrest .Matchers .equalTo ;
32+ import static org .hamcrest .Matchers .hasSize ;
33+ import static org .hamcrest .Matchers .is ;
3434
3535public class FullClusterRestartIT extends ParameterizedFullClusterRestartTestCase {
3636
3737 private static final boolean useFixture = Boolean .getBoolean ("geoip_use_service" ) == false ;
3838
39- private static GeoIpHttpFixture fixture = new GeoIpHttpFixture (useFixture );
39+ private static final GeoIpHttpFixture fixture = new GeoIpHttpFixture (useFixture );
4040
41- private static ElasticsearchCluster cluster = ElasticsearchCluster .local ()
41+ private static final ElasticsearchCluster cluster = ElasticsearchCluster .local ()
4242 .distribution (DistributionType .DEFAULT )
4343 .version (getOldClusterTestVersion ())
4444 .nodes (2 )
45- .setting ("indices.memory.shard_inactive_time" , "60m" )
46- .setting ("xpack.security.enabled" , "false" )
4745 .setting ("ingest.geoip.downloader.endpoint" , () -> fixture .getAddress (), s -> useFixture )
46+ .setting ("xpack.security.enabled" , "false" )
47+ // .setting("logger.org.elasticsearch.ingest.geoip", "TRACE")
4848 .feature (FeatureFlag .TIME_SERIES_MODE )
4949 .build ();
5050
@@ -60,110 +60,32 @@ protected ElasticsearchCluster getUpgradeCluster() {
6060 return cluster ;
6161 }
6262
63- public void testGeoIpSystemFeaturesMigration () throws Exception {
63+ @ SuppressWarnings ("unchecked" )
64+ public void testGeoIpDatabaseConfigurations () throws Exception {
6465 if (isRunningAgainstOldCluster ()) {
65- Request enableDownloader = new Request ("PUT" , "/_cluster/settings" );
66- enableDownloader .setJsonEntity ("""
67- {"persistent": {"ingest.geoip.downloader.enabled": true}}
68- """ );
69- assertOK (client ().performRequest (enableDownloader ));
70-
71- Request putPipeline = new Request ("PUT" , "/_ingest/pipeline/geoip" );
72- putPipeline .setJsonEntity ("""
66+ Request putConfiguration = new Request ("PUT" , "_ingest/ip_location/database/my-database-1" );
67+ putConfiguration .setJsonEntity ("""
7368 {
74- "description": "Add geoip info",
75- "processors": [{
76- "geoip": {
77- "field": "ip",
78- "target_field": "geo",
79- "database_file": "GeoLite2-Country.mmdb"
80- }
81- }]
69+ "name": "GeoIP2-Domain",
70+ "maxmind": {
71+ "account_id": "1234567"
72+ }
8273 }
8374 """ );
84- assertOK (client ().performRequest (putPipeline ));
85-
86- // wait for the geo databases to all be loaded
87- assertBusy (() -> testDatabasesLoaded (), 30 , TimeUnit .SECONDS );
88-
89- // the geoip index should be created
90- assertBusy (() -> testCatIndices (".geoip_databases" ));
91- assertBusy (() -> testIndexGeoDoc ());
92- } else {
93- Request migrateSystemFeatures = new Request ("POST" , "/_migration/system_features" );
94- assertOK (client ().performRequest (migrateSystemFeatures ));
95-
96- assertBusy (() -> testCatIndices (".geoip_databases-reindexed-for-8" , "my-index-00001" ));
97- assertBusy (() -> testIndexGeoDoc ());
98-
99- Request disableDownloader = new Request ("PUT" , "/_cluster/settings" );
100- disableDownloader .setJsonEntity ("""
101- {"persistent": {"ingest.geoip.downloader.enabled": false}}
102- """ );
103- assertOK (client ().performRequest (disableDownloader ));
104-
105- // the geoip index should be deleted
106- assertBusy (() -> testCatIndices ("my-index-00001" ));
107-
108- Request enableDownloader = new Request ("PUT" , "/_cluster/settings" );
109- enableDownloader .setJsonEntity ("""
110- {"persistent": {"ingest.geoip.downloader.enabled": true}}
111- """ );
112- assertOK (client ().performRequest (enableDownloader ));
113-
114- // wait for the geo databases to all be loaded
115- assertBusy (() -> testDatabasesLoaded (), 30 , TimeUnit .SECONDS );
116-
117- // the geoip index should be recreated
118- assertBusy (() -> testCatIndices (".geoip_databases" , "my-index-00001" ));
119- assertBusy (() -> testIndexGeoDoc ());
75+ assertOK (client ().performRequest (putConfiguration ));
12076 }
121- }
122-
123- @ SuppressWarnings ("unchecked" )
124- private void testDatabasesLoaded () throws IOException {
125- Request getTaskState = new Request ("GET" , "/_cluster/state" );
126- ObjectPath state = ObjectPath .createFromResponse (client ().performRequest (getTaskState ));
127-
128- List <?> tasks = state .evaluate ("metadata.persistent_tasks.tasks" );
129- // Short-circuit to avoid using steams if the list is empty
130- if (tasks .isEmpty ()) {
131- fail ();
132- }
133- Map <String , Object > databases = (Map <String , Object >) tasks .stream ().map (task -> {
134- try {
135- return ObjectPath .evaluate (task , "task.geoip-downloader.state.databases" );
136- } catch (IOException e ) {
137- return null ;
138- }
139- }).filter (Objects ::nonNull ).findFirst ().orElse (null );
140-
141- assertNotNull (databases );
142-
143- for (String name : List .of ("GeoLite2-ASN.mmdb" , "GeoLite2-City.mmdb" , "GeoLite2-Country.mmdb" )) {
144- Object database = databases .get (name );
145- assertNotNull (database );
146- assertNotNull (ObjectPath .evaluate (database , "md5" ));
147- }
148- }
149-
150- private void testCatIndices (String ... indexNames ) throws IOException {
151- Request catIndices = new Request ("GET" , "_cat/indices/*?s=index&h=index&expand_wildcards=all" );
152- String response = EntityUtils .toString (client ().performRequest (catIndices ).getEntity ());
153- List <String > indices = List .of (response .trim ().split ("\\ s+" ));
154- assertThat (indices , contains (indexNames ));
155- }
156-
157- private void testIndexGeoDoc () throws IOException {
158- Request putDoc = new Request ("PUT" , "/my-index-00001/_doc/my_id?pipeline=geoip" );
159- putDoc .setJsonEntity ("""
160- {"ip": "89.160.20.128"}
161- """ );
162- assertOK (client ().performRequest (putDoc ));
16377
164- Request getDoc = new Request ("GET" , "/my-index-00001/_doc/my_id" );
165- ObjectPath doc = ObjectPath .createFromResponse (client ().performRequest (getDoc ));
166- assertNull (doc .evaluate ("_source.tags" ));
167- assertEquals ("Sweden" , doc .evaluate ("_source.geo.country_name" ));
78+ assertBusy (() -> {
79+ Request getConfiguration = new Request ("GET" , "_ingest/ip_location/database/my-database-1" );
80+ Response response = assertOK (client ().performRequest (getConfiguration ));
81+ Map <String , Object > map = responseAsMap (response );
82+ assertThat (map .keySet (), equalTo (Set .of ("databases" )));
83+ List <Map <String , Object >> databases = (List <Map <String , Object >>) map .get ("databases" );
84+ assertThat (databases , hasSize (1 ));
85+ Map <String , Object > database = databases .get (0 );
86+ assertThat (database .get ("id" ), is ("my-database-1" ));
87+ assertThat (database .get ("version" ), is (1 ));
88+ assertThat (database .get ("database" ), equalTo (Map .of ("name" , "GeoIP2-Domain" , "maxmind" , Map .of ("account_id" , "1234567" ))));
89+ }, 30 , TimeUnit .SECONDS );
16890 }
16991}
0 commit comments