1212
1313import com .carrotsearch .randomizedtesting .annotations .Name ;
1414
15- import org .apache .http .util .EntityUtils ;
1615import org .elasticsearch .client .Request ;
17- import org .elasticsearch .client .RequestOptions ;
1816import org .elasticsearch .client .Response ;
19- import org .elasticsearch .client .WarningsHandler ;
20- import org .elasticsearch .common .settings .Settings ;
21- import org .elasticsearch .common .util .concurrent .ThreadContext ;
22- import org .elasticsearch .core .Nullable ;
23- import org .elasticsearch .rest .RestStatus ;
2417import org .elasticsearch .test .cluster .ElasticsearchCluster ;
2518import org .elasticsearch .test .cluster .FeatureFlag ;
2619import org .elasticsearch .test .cluster .local .distribution .DistributionType ;
2720import org .elasticsearch .test .cluster .util .Version ;
28- import org .elasticsearch .test .rest .ObjectPath ;
2921import org .elasticsearch .upgrades .FullClusterRestartUpgradeStatus ;
3022import org .elasticsearch .upgrades .ParameterizedFullClusterRestartTestCase ;
3123import org .junit .ClassRule ;
3224import org .junit .rules .RuleChain ;
3325import org .junit .rules .TestRule ;
3426
35- import java .io .IOException ;
36- import java .nio .charset .StandardCharsets ;
37- import java .util .ArrayList ;
38- import java .util .Base64 ;
39- import java .util .HashSet ;
4027import java .util .List ;
4128import java .util .Map ;
42- import java .util .Objects ;
29+ import java .util .Set ;
4330import java .util .concurrent .TimeUnit ;
4431
32+ import static org .hamcrest .Matchers .equalTo ;
33+ import static org .hamcrest .Matchers .hasSize ;
4534import static org .hamcrest .Matchers .is ;
4635
4736public class FullClusterRestartIT extends ParameterizedFullClusterRestartTestCase {
@@ -50,29 +39,16 @@ public class FullClusterRestartIT extends ParameterizedFullClusterRestartTestCas
5039
5140 private static final GeoIpHttpFixture fixture = new GeoIpHttpFixture (useFixture );
5241
53- // e.g. use ./gradlew -Dtests.jvm.argline="-Dgeoip_test_with_security=false" ":modules:ingest-geoip:qa:full-cluster-restart:check"
54- // to set this to false, if you so desire
55- private static final boolean useSecurity = Boolean .parseBoolean (System .getProperty ("geoip_test_with_security" , "true" ));
56-
5742 private static final ElasticsearchCluster cluster = ElasticsearchCluster .local ()
5843 .distribution (DistributionType .DEFAULT )
5944 .version (Version .fromString (OLD_CLUSTER_VERSION ))
6045 .nodes (2 )
6146 .setting ("ingest.geoip.downloader.endpoint" , () -> fixture .getAddress (), s -> useFixture )
62- .setting ("xpack.security.enabled" , useSecurity ? "true" : "false" )
47+ .setting ("xpack.security.enabled" , "false" )
48+ // .setting("logger.org.elasticsearch.ingest.geoip", "TRACE")
6349 .feature (FeatureFlag .TIME_SERIES_MODE )
6450 .build ();
6551
66- @ Override
67- protected Settings restClientSettings () {
68- Settings settings = super .restClientSettings ();
69- if (useSecurity ) {
70- String token = "Basic " + Base64 .getEncoder ().encodeToString ("test_user:x-pack-test-password" .getBytes (StandardCharsets .UTF_8 ));
71- settings = Settings .builder ().put (settings ).put (ThreadContext .PREFIX + ".Authorization" , token ).build ();
72- }
73- return settings ;
74- }
75-
7652 @ ClassRule
7753 public static TestRule ruleChain = RuleChain .outerRule (fixture ).around (cluster );
7854
@@ -85,196 +61,32 @@ protected ElasticsearchCluster getUpgradeCluster() {
8561 return cluster ;
8662 }
8763
88- public void testGeoIpSystemFeaturesMigration () throws Exception {
89- final List <String > maybeSecurityIndex = useSecurity ? List .of (".security-7" ) : List .of ();
90- final List <String > maybeSecurityIndexReindexed = useSecurity ? List .of (".security-7-reindexed-for-10" ) : List .of ();
91-
64+ @ SuppressWarnings ("unchecked" )
65+ public void testGeoIpDatabaseConfigurations () throws Exception {
9266 if (isRunningAgainstOldCluster ()) {
93- Request enableDownloader = new Request ("PUT" , "/_cluster/settings" );
94- enableDownloader .setJsonEntity ("""
95- {"persistent": {"ingest.geoip.downloader.enabled": true}}
96- """ );
97- assertOK (client ().performRequest (enableDownloader ));
98-
99- Request putPipeline = new Request ("PUT" , "/_ingest/pipeline/geoip" );
100- putPipeline .setJsonEntity ("""
67+ Request putConfiguration = new Request ("PUT" , "_ingest/ip_location/database/my-database-1" );
68+ putConfiguration .setJsonEntity ("""
10169 {
102- "description": "Add geoip info",
103- "processors": [{
104- "geoip": {
105- "field": "ip",
106- "target_field": "geo",
107- "database_file": "GeoLite2-Country.mmdb"
108- }
109- }]
70+ "name": "GeoIP2-Domain",
71+ "maxmind": {
72+ "account_id": "1234567"
73+ }
11074 }
11175 """ );
112- assertOK (client ().performRequest (putPipeline ));
113-
114- // wait for the geo databases to all be loaded
115- assertBusy (() -> testDatabasesLoaded (), 30 , TimeUnit .SECONDS );
116-
117- // the geoip index should be created
118- assertBusy (() -> testCatIndices (List .of (".geoip_databases" ), maybeSecurityIndex ));
119- assertBusy (() -> testIndexGeoDoc ());
120-
121- // before the upgrade, Kibana should work
122- assertBusy (() -> testGetStarAsKibana (List .of ("my-index-00001" ), maybeSecurityIndex ));
123-
124- // as should a normal get *
125- assertBusy (() -> testGetStar (List .of ("my-index-00001" ), maybeSecurityIndex ));
126-
127- // and getting data streams
128- assertBusy (() -> testGetDatastreams ());
129- } else {
130- // after the upgrade, but before the migration, Kibana should work
131- assertBusy (() -> testGetStarAsKibana (List .of ("my-index-00001" ), maybeSecurityIndex ));
132-
133- // as should a normal get *
134- assertBusy (() -> testGetStar (List .of ("my-index-00001" ), maybeSecurityIndex ));
135-
136- // and getting data streams
137- assertBusy (() -> testGetDatastreams ());
138-
139- // migrate the system features and give the cluster a moment to settle
140- Request migrateSystemFeatures = new Request ("POST" , "/_migration/system_features" );
141- assertOK (client ().performRequest (migrateSystemFeatures ));
142- ensureHealth (request -> request .addParameter ("wait_for_status" , "yellow" ));
143-
144- assertBusy (() -> testCatIndices (List .of (".geoip_databases-reindexed-for-10" , "my-index-00001" ), maybeSecurityIndexReindexed ));
145- assertBusy (() -> testIndexGeoDoc ());
146-
147- // after the migration, Kibana should work
148- assertBusy (() -> testGetStarAsKibana (List .of ("my-index-00001" ), maybeSecurityIndexReindexed ));
149-
150- // as should a normal get *
151- assertBusy (() -> testGetStar (List .of ("my-index-00001" ), maybeSecurityIndexReindexed ));
152-
153- // and getting data streams
154- assertBusy (() -> testGetDatastreams ());
155-
156- Request disableDownloader = new Request ("PUT" , "/_cluster/settings" );
157- disableDownloader .setJsonEntity ("""
158- {"persistent": {"ingest.geoip.downloader.enabled": false}}
159- """ );
160- assertOK (client ().performRequest (disableDownloader ));
161-
162- // the geoip index should be deleted
163- assertBusy (() -> testCatIndices (List .of ("my-index-00001" ), maybeSecurityIndexReindexed ));
164-
165- Request enableDownloader = new Request ("PUT" , "/_cluster/settings" );
166- enableDownloader .setJsonEntity ("""
167- {"persistent": {"ingest.geoip.downloader.enabled": true}}
168- """ );
169- assertOK (client ().performRequest (enableDownloader ));
170-
171- // wait for the geo databases to all be loaded
172- assertBusy (() -> testDatabasesLoaded (), 30 , TimeUnit .SECONDS );
173-
174- // the geoip index should be recreated
175- assertBusy (() -> testCatIndices (List .of (".geoip_databases" , "my-index-00001" ), maybeSecurityIndexReindexed ));
176- assertBusy (() -> testIndexGeoDoc ());
76+ assertOK (client ().performRequest (putConfiguration ));
17777 }
178- }
179-
180- @ SuppressWarnings ("unchecked" )
181- private void testDatabasesLoaded () throws IOException {
182- Request getTaskState = new Request ("GET" , "/_cluster/state" );
183- ObjectPath state = ObjectPath .createFromResponse (assertOK (client ().performRequest (getTaskState )));
184-
185- List <?> tasks = state .evaluate ("metadata.persistent_tasks.tasks" );
186- // Short-circuit to avoid using steams if the list is empty
187- if (tasks .isEmpty ()) {
188- fail ();
189- }
190- Map <String , Object > databases = (Map <String , Object >) tasks .stream ().map (task -> {
191- try {
192- return ObjectPath .evaluate (task , "task.geoip-downloader.state.databases" );
193- } catch (IOException e ) {
194- return null ;
195- }
196- }).filter (Objects ::nonNull ).findFirst ().orElse (null );
197-
198- assertNotNull (databases );
199-
200- for (String name : List .of ("GeoLite2-ASN.mmdb" , "GeoLite2-City.mmdb" , "GeoLite2-Country.mmdb" )) {
201- Object database = databases .get (name );
202- assertNotNull (database );
203- assertNotNull (ObjectPath .evaluate (database , "md5" ));
204- }
205- }
206-
207- private void testCatIndices (List <String > indexNames , @ Nullable List <String > additionalIndexNames ) throws IOException {
208- Request catIndices = new Request ("GET" , "_cat/indices/*?s=index&h=index&expand_wildcards=all" );
209- // the cat APIs can sometimes 404, erroneously
210- // see https://github.com/elastic/elasticsearch/issues/104371
211- setIgnoredErrorResponseCodes (catIndices , RestStatus .NOT_FOUND );
212- String response = EntityUtils .toString (assertOK (client ().performRequest (catIndices )).getEntity ());
213- List <String > indices = List .of (response .trim ().split ("\\ s+" ));
214-
215- if (additionalIndexNames != null && additionalIndexNames .isEmpty () == false ) {
216- indexNames = new ArrayList <>(indexNames ); // recopy into a mutable list
217- indexNames .addAll (additionalIndexNames );
218- }
219-
220- assertThat (new HashSet <>(indices ), is (new HashSet <>(indexNames )));
221- }
222-
223- private void testIndexGeoDoc () throws IOException {
224- Request putDoc = new Request ("PUT" , "/my-index-00001/_doc/my_id?pipeline=geoip" );
225- putDoc .setJsonEntity ("""
226- {"ip": "89.160.20.128"}
227- """ );
228- assertOK (client ().performRequest (putDoc ));
229-
230- Request getDoc = new Request ("GET" , "/my-index-00001/_doc/my_id" );
231- ObjectPath doc = ObjectPath .createFromResponse (assertOK (client ().performRequest (getDoc )));
232- assertNull (doc .evaluate ("_source.tags" ));
233- assertEquals ("Sweden" , doc .evaluate ("_source.geo.country_name" ));
234- }
235-
236- private void testGetStar (List <String > indexNames , @ Nullable List <String > additionalIndexNames ) throws IOException {
237- Request getStar = new Request ("GET" , "*?expand_wildcards=all" );
238- getStar .setOptions (
239- RequestOptions .DEFAULT .toBuilder ().setWarningsHandler (WarningsHandler .PERMISSIVE ) // we don't care about warnings, just errors
240- );
241- Response response = assertOK (client ().performRequest (getStar ));
242-
243- if (additionalIndexNames != null && additionalIndexNames .isEmpty () == false ) {
244- indexNames = new ArrayList <>(indexNames ); // recopy into a mutable list
245- indexNames .addAll (additionalIndexNames );
246- }
247-
248- Map <String , Object > map = responseAsMap (response );
249- assertThat (map .keySet (), is (new HashSet <>(indexNames )));
250- }
251-
252- private void testGetStarAsKibana (List <String > indexNames , @ Nullable List <String > additionalIndexNames ) throws IOException {
253- Request getStar = new Request ("GET" , "*?expand_wildcards=all" );
254- getStar .setOptions (
255- RequestOptions .DEFAULT .toBuilder ()
256- .addHeader ("X-elastic-product-origin" , "kibana" )
257- .setWarningsHandler (WarningsHandler .PERMISSIVE ) // we don't care about warnings, just errors
258- );
259- Response response = assertOK (client ().performRequest (getStar ));
260-
261- if (additionalIndexNames != null && additionalIndexNames .isEmpty () == false ) {
262- indexNames = new ArrayList <>(indexNames ); // recopy into a mutable list
263- indexNames .addAll (additionalIndexNames );
264- }
265-
266- Map <String , Object > map = responseAsMap (response );
267- assertThat (map .keySet (), is (new HashSet <>(indexNames )));
268- }
269-
270- private void testGetDatastreams () throws IOException {
271- Request getStar = new Request ("GET" , "_data_stream" );
272- getStar .setOptions (
273- RequestOptions .DEFAULT .toBuilder ().setWarningsHandler (WarningsHandler .PERMISSIVE ) // we don't care about warnings, just errors
274- );
275- Response response = client ().performRequest (getStar );
276- assertOK (response );
27778
278- // note: we don't actually care about the response, just that there was one and that it didn't error out on us
79+ assertBusy (() -> {
80+ Request getConfiguration = new Request ("GET" , "_ingest/ip_location/database/my-database-1" );
81+ Response response = assertOK (client ().performRequest (getConfiguration ));
82+ Map <String , Object > map = responseAsMap (response );
83+ assertThat (map .keySet (), equalTo (Set .of ("databases" )));
84+ List <Map <String , Object >> databases = (List <Map <String , Object >>) map .get ("databases" );
85+ assertThat (databases , hasSize (1 ));
86+ Map <String , Object > database = databases .get (0 );
87+ assertThat (database .get ("id" ), is ("my-database-1" ));
88+ assertThat (database .get ("version" ), is (1 ));
89+ assertThat (database .get ("database" ), equalTo (Map .of ("name" , "GeoIP2-Domain" , "maxmind" , Map .of ("account_id" , "1234567" ))));
90+ }, 30 , TimeUnit .SECONDS );
27991 }
28092}
0 commit comments