12
12
13
13
import com .carrotsearch .randomizedtesting .annotations .Name ;
14
14
15
- import org .apache .http .util .EntityUtils ;
16
15
import org .elasticsearch .client .Request ;
16
+ import org .elasticsearch .client .Response ;
17
17
import org .elasticsearch .test .cluster .ElasticsearchCluster ;
18
18
import org .elasticsearch .test .cluster .FeatureFlag ;
19
19
import org .elasticsearch .test .cluster .local .distribution .DistributionType ;
20
- import org .elasticsearch .test .rest .ObjectPath ;
21
20
import org .elasticsearch .upgrades .FullClusterRestartUpgradeStatus ;
22
21
import org .elasticsearch .upgrades .ParameterizedFullClusterRestartTestCase ;
23
22
import org .junit .ClassRule ;
24
23
import org .junit .rules .RuleChain ;
25
24
import org .junit .rules .TestRule ;
26
25
27
- import java .io .IOException ;
28
26
import java .util .List ;
29
27
import java .util .Map ;
30
- import java .util .Objects ;
28
+ import java .util .Set ;
31
29
import java .util .concurrent .TimeUnit ;
32
30
33
- import static org .hamcrest .Matchers .contains ;
31
+ import static org .hamcrest .Matchers .equalTo ;
32
+ import static org .hamcrest .Matchers .hasSize ;
33
+ import static org .hamcrest .Matchers .is ;
34
34
35
35
public class FullClusterRestartIT extends ParameterizedFullClusterRestartTestCase {
36
36
37
37
private static final boolean useFixture = Boolean .getBoolean ("geoip_use_service" ) == false ;
38
38
39
- private static GeoIpHttpFixture fixture = new GeoIpHttpFixture (useFixture );
39
+ private static final GeoIpHttpFixture fixture = new GeoIpHttpFixture (useFixture );
40
40
41
- private static ElasticsearchCluster cluster = ElasticsearchCluster .local ()
41
+ private static final ElasticsearchCluster cluster = ElasticsearchCluster .local ()
42
42
.distribution (DistributionType .DEFAULT )
43
43
.version (getOldClusterTestVersion ())
44
44
.nodes (2 )
45
- .setting ("indices.memory.shard_inactive_time" , "60m" )
46
- .setting ("xpack.security.enabled" , "false" )
47
45
.setting ("ingest.geoip.downloader.endpoint" , () -> fixture .getAddress (), s -> useFixture )
46
+ .setting ("xpack.security.enabled" , "false" )
47
+ // .setting("logger.org.elasticsearch.ingest.geoip", "TRACE")
48
48
.feature (FeatureFlag .TIME_SERIES_MODE )
49
49
.build ();
50
50
@@ -60,110 +60,32 @@ protected ElasticsearchCluster getUpgradeCluster() {
60
60
return cluster ;
61
61
}
62
62
63
- public void testGeoIpSystemFeaturesMigration () throws Exception {
63
+ @ SuppressWarnings ("unchecked" )
64
+ public void testGeoIpDatabaseConfigurations () throws Exception {
64
65
if (isRunningAgainstOldCluster ()) {
65
- Request enableDownloader = new Request ("PUT" , "/_cluster/settings" );
66
- enableDownloader .setJsonEntity ("""
67
- {"persistent": {"ingest.geoip.downloader.enabled": true}}
68
- """ );
69
- assertOK (client ().performRequest (enableDownloader ));
70
-
71
- Request putPipeline = new Request ("PUT" , "/_ingest/pipeline/geoip" );
72
- putPipeline .setJsonEntity ("""
66
+ Request putConfiguration = new Request ("PUT" , "_ingest/ip_location/database/my-database-1" );
67
+ putConfiguration .setJsonEntity ("""
73
68
{
74
- "description": "Add geoip info",
75
- "processors": [{
76
- "geoip": {
77
- "field": "ip",
78
- "target_field": "geo",
79
- "database_file": "GeoLite2-Country.mmdb"
80
- }
81
- }]
69
+ "name": "GeoIP2-Domain",
70
+ "maxmind": {
71
+ "account_id": "1234567"
72
+ }
82
73
}
83
74
""" );
84
- assertOK (client ().performRequest (putPipeline ));
85
-
86
- // wait for the geo databases to all be loaded
87
- assertBusy (() -> testDatabasesLoaded (), 30 , TimeUnit .SECONDS );
88
-
89
- // the geoip index should be created
90
- assertBusy (() -> testCatIndices (".geoip_databases" ));
91
- assertBusy (() -> testIndexGeoDoc ());
92
- } else {
93
- Request migrateSystemFeatures = new Request ("POST" , "/_migration/system_features" );
94
- assertOK (client ().performRequest (migrateSystemFeatures ));
95
-
96
- assertBusy (() -> testCatIndices (".geoip_databases-reindexed-for-8" , "my-index-00001" ));
97
- assertBusy (() -> testIndexGeoDoc ());
98
-
99
- Request disableDownloader = new Request ("PUT" , "/_cluster/settings" );
100
- disableDownloader .setJsonEntity ("""
101
- {"persistent": {"ingest.geoip.downloader.enabled": false}}
102
- """ );
103
- assertOK (client ().performRequest (disableDownloader ));
104
-
105
- // the geoip index should be deleted
106
- assertBusy (() -> testCatIndices ("my-index-00001" ));
107
-
108
- Request enableDownloader = new Request ("PUT" , "/_cluster/settings" );
109
- enableDownloader .setJsonEntity ("""
110
- {"persistent": {"ingest.geoip.downloader.enabled": true}}
111
- """ );
112
- assertOK (client ().performRequest (enableDownloader ));
113
-
114
- // wait for the geo databases to all be loaded
115
- assertBusy (() -> testDatabasesLoaded (), 30 , TimeUnit .SECONDS );
116
-
117
- // the geoip index should be recreated
118
- assertBusy (() -> testCatIndices (".geoip_databases" , "my-index-00001" ));
119
- assertBusy (() -> testIndexGeoDoc ());
75
+ assertOK (client ().performRequest (putConfiguration ));
120
76
}
121
- }
122
-
123
- @ SuppressWarnings ("unchecked" )
124
- private void testDatabasesLoaded () throws IOException {
125
- Request getTaskState = new Request ("GET" , "/_cluster/state" );
126
- ObjectPath state = ObjectPath .createFromResponse (client ().performRequest (getTaskState ));
127
-
128
- List <?> tasks = state .evaluate ("metadata.persistent_tasks.tasks" );
129
- // Short-circuit to avoid using steams if the list is empty
130
- if (tasks .isEmpty ()) {
131
- fail ();
132
- }
133
- Map <String , Object > databases = (Map <String , Object >) tasks .stream ().map (task -> {
134
- try {
135
- return ObjectPath .evaluate (task , "task.geoip-downloader.state.databases" );
136
- } catch (IOException e ) {
137
- return null ;
138
- }
139
- }).filter (Objects ::nonNull ).findFirst ().orElse (null );
140
-
141
- assertNotNull (databases );
142
-
143
- for (String name : List .of ("GeoLite2-ASN.mmdb" , "GeoLite2-City.mmdb" , "GeoLite2-Country.mmdb" )) {
144
- Object database = databases .get (name );
145
- assertNotNull (database );
146
- assertNotNull (ObjectPath .evaluate (database , "md5" ));
147
- }
148
- }
149
-
150
- private void testCatIndices (String ... indexNames ) throws IOException {
151
- Request catIndices = new Request ("GET" , "_cat/indices/*?s=index&h=index&expand_wildcards=all" );
152
- String response = EntityUtils .toString (client ().performRequest (catIndices ).getEntity ());
153
- List <String > indices = List .of (response .trim ().split ("\\ s+" ));
154
- assertThat (indices , contains (indexNames ));
155
- }
156
-
157
- private void testIndexGeoDoc () throws IOException {
158
- Request putDoc = new Request ("PUT" , "/my-index-00001/_doc/my_id?pipeline=geoip" );
159
- putDoc .setJsonEntity ("""
160
- {"ip": "89.160.20.128"}
161
- """ );
162
- assertOK (client ().performRequest (putDoc ));
163
77
164
- Request getDoc = new Request ("GET" , "/my-index-00001/_doc/my_id" );
165
- ObjectPath doc = ObjectPath .createFromResponse (client ().performRequest (getDoc ));
166
- assertNull (doc .evaluate ("_source.tags" ));
167
- assertEquals ("Sweden" , doc .evaluate ("_source.geo.country_name" ));
78
+ assertBusy (() -> {
79
+ Request getConfiguration = new Request ("GET" , "_ingest/ip_location/database/my-database-1" );
80
+ Response response = assertOK (client ().performRequest (getConfiguration ));
81
+ Map <String , Object > map = responseAsMap (response );
82
+ assertThat (map .keySet (), equalTo (Set .of ("databases" )));
83
+ List <Map <String , Object >> databases = (List <Map <String , Object >>) map .get ("databases" );
84
+ assertThat (databases , hasSize (1 ));
85
+ Map <String , Object > database = databases .get (0 );
86
+ assertThat (database .get ("id" ), is ("my-database-1" ));
87
+ assertThat (database .get ("version" ), is (1 ));
88
+ assertThat (database .get ("database" ), equalTo (Map .of ("name" , "GeoIP2-Domain" , "maxmind" , Map .of ("account_id" , "1234567" ))));
89
+ }, 30 , TimeUnit .SECONDS );
168
90
}
169
91
}
0 commit comments