1111
1212import org .apache .lucene .tests .util .LuceneTestCase ;
1313import org .elasticsearch .client .internal .Client ;
14- import org .elasticsearch .cluster .ClusterState ;
14+ import org .elasticsearch .cluster .ClusterName ;
15+ import org .elasticsearch .cluster .metadata .ProjectId ;
16+ import org .elasticsearch .cluster .metadata .ProjectMetadata ;
17+ import org .elasticsearch .cluster .project .ProjectResolver ;
18+ import org .elasticsearch .cluster .project .TestProjectResolvers ;
1519import org .elasticsearch .cluster .service .ClusterService ;
1620import org .elasticsearch .common .util .concurrent .AtomicArray ;
1721import org .elasticsearch .core .IOUtils ;
3236import java .util .concurrent .atomic .AtomicInteger ;
3337import java .util .concurrent .atomic .AtomicReference ;
3438
39+ import static org .elasticsearch .cluster .ClusterState .builder ;
3540import static org .elasticsearch .ingest .geoip .GeoIpProcessor .GEOIP_TYPE ;
3641import static org .elasticsearch .ingest .geoip .GeoIpTestUtils .copyDatabase ;
3742import static org .elasticsearch .ingest .geoip .GeoIpTestUtils .copyDefaultDatabases ;
@@ -62,31 +67,39 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
6267 * geoip processor instance is using the related {@link DatabaseReaderLazyLoader} instance
6368 */
6469 public void test () throws Exception {
70+ ProjectId projectId = randomProjectIdOrDefault ();
6571 Path geoIpConfigDir = createTempDir ();
6672 Path geoIpTmpDir = createTempDir ();
6773 ClusterService clusterService = mock (ClusterService .class );
68- when (clusterService .state ()).thenReturn (ClusterState .EMPTY_STATE );
69- DatabaseNodeService databaseNodeService = createRegistry (geoIpConfigDir , geoIpTmpDir , clusterService );
74+ when (clusterService .state ()).thenReturn (
75+ builder (ClusterName .DEFAULT ).putProjectMetadata (ProjectMetadata .builder (projectId ).build ()).build ()
76+ );
77+ DatabaseNodeService databaseNodeService = createRegistry (
78+ geoIpConfigDir ,
79+ geoIpTmpDir ,
80+ clusterService ,
81+ TestProjectResolvers .singleProject (projectId )
82+ );
7083 GeoIpProcessor .Factory factory = new GeoIpProcessor .Factory (GEOIP_TYPE , databaseNodeService );
7184 copyDatabase ("GeoLite2-City-Test.mmdb" , geoIpTmpDir .resolve ("GeoLite2-City.mmdb" ));
7285 copyDatabase ("GeoLite2-City-Test.mmdb" , geoIpTmpDir .resolve ("GeoLite2-City-Test.mmdb" ));
73- databaseNodeService .updateDatabase ("GeoLite2-City.mmdb" , "md5" , geoIpTmpDir .resolve ("GeoLite2-City.mmdb" ));
74- databaseNodeService .updateDatabase ("GeoLite2-City-Test.mmdb" , "md5" , geoIpTmpDir .resolve ("GeoLite2-City-Test.mmdb" ));
75- lazyLoadReaders (databaseNodeService );
86+ databaseNodeService .updateDatabase (projectId , "GeoLite2-City.mmdb" , "md5" , geoIpTmpDir .resolve ("GeoLite2-City.mmdb" ));
87+ databaseNodeService .updateDatabase (projectId , "GeoLite2-City-Test.mmdb" , "md5" , geoIpTmpDir .resolve ("GeoLite2-City-Test.mmdb" ));
88+ lazyLoadReaders (projectId , databaseNodeService );
7689
7790 final GeoIpProcessor processor1 = (GeoIpProcessor ) factory .create (
7891 null ,
7992 "_tag" ,
8093 null ,
8194 new HashMap <>(Map .of ("field" , "_field" )),
82- null
95+ projectId
8396 );
8497 final GeoIpProcessor processor2 = (GeoIpProcessor ) factory .create (
8598 null ,
8699 "_tag" ,
87100 null ,
88101 new HashMap <>(Map .of ("field" , "_field" , "database_file" , "GeoLite2-City-Test.mmdb" )),
89- null
102+ projectId
90103 );
91104
92105 final AtomicBoolean completed = new AtomicBoolean (false );
@@ -134,9 +147,9 @@ public void test() throws Exception {
134147 Thread updateDatabaseThread = new Thread (() -> {
135148 for (int i = 0 ; i < numberOfDatabaseUpdates ; i ++) {
136149 try {
137- DatabaseReaderLazyLoader previous1 = databaseNodeService .get ("GeoLite2-City.mmdb" );
150+ DatabaseReaderLazyLoader previous1 = databaseNodeService .get (projectId , "GeoLite2-City.mmdb" );
138151 if (Files .exists (geoIpTmpDir .resolve ("GeoLite2-City.mmdb" ))) {
139- databaseNodeService .removeStaleEntries (List .of ("GeoLite2-City.mmdb" ));
152+ databaseNodeService .removeStaleEntries (projectId , List .of ("GeoLite2-City.mmdb" ));
140153 assertBusy (() -> {
141154 // lazy loader may still be in use by an ingest thread,
142155 // wait for any potential ingest thread to release the lazy loader (DatabaseReaderLazyLoader#postLookup(...)),
@@ -146,22 +159,32 @@ public void test() throws Exception {
146159 });
147160 } else {
148161 copyDatabase ("GeoLite2-City-Test.mmdb" , geoIpTmpDir .resolve ("GeoLite2-City.mmdb" ));
149- databaseNodeService .updateDatabase ("GeoLite2-City.mmdb" , "md5" , geoIpTmpDir .resolve ("GeoLite2-City.mmdb" ));
162+ databaseNodeService .updateDatabase (
163+ projectId ,
164+ "GeoLite2-City.mmdb" ,
165+ "md5" ,
166+ geoIpTmpDir .resolve ("GeoLite2-City.mmdb" )
167+ );
150168 }
151- DatabaseReaderLazyLoader previous2 = databaseNodeService .get ("GeoLite2-City-Test.mmdb" );
169+ DatabaseReaderLazyLoader previous2 = databaseNodeService .get (projectId , "GeoLite2-City-Test.mmdb" );
152170 copyDatabase (
153171 i % 2 == 0 ? "GeoIP2-City-Test.mmdb" : "GeoLite2-City-Test.mmdb" ,
154172 geoIpTmpDir .resolve ("GeoLite2-City-Test.mmdb" )
155173 );
156- databaseNodeService .updateDatabase ("GeoLite2-City-Test.mmdb" , "md5" , geoIpTmpDir .resolve ("GeoLite2-City-Test.mmdb" ));
174+ databaseNodeService .updateDatabase (
175+ projectId ,
176+ "GeoLite2-City-Test.mmdb" ,
177+ "md5" ,
178+ geoIpTmpDir .resolve ("GeoLite2-City-Test.mmdb" )
179+ );
157180
158- DatabaseReaderLazyLoader current1 = databaseNodeService .get ("GeoLite2-City.mmdb" );
159- DatabaseReaderLazyLoader current2 = databaseNodeService .get ("GeoLite2-City-Test.mmdb" );
181+ DatabaseReaderLazyLoader current1 = databaseNodeService .get (projectId , "GeoLite2-City.mmdb" );
182+ DatabaseReaderLazyLoader current2 = databaseNodeService .get (projectId , "GeoLite2-City-Test.mmdb" );
160183 assertThat (current1 , not (sameInstance (previous1 )));
161184 assertThat (current2 , not (sameInstance (previous2 )));
162185
163186 // lazy load type and reader:
164- lazyLoadReaders (databaseNodeService );
187+ lazyLoadReaders (projectId , databaseNodeService );
165188 } catch (Exception | AssertionError e ) {
166189 logger .error ("error in update databases thread after run [" + i + "]" , e );
167190 failureHolder2 .set (e );
@@ -193,8 +216,12 @@ public void test() throws Exception {
193216 IOUtils .rm (geoIpConfigDir , geoIpTmpDir );
194217 }
195218
196- private static DatabaseNodeService createRegistry (Path geoIpConfigDir , Path geoIpTmpDir , ClusterService clusterService )
197- throws IOException {
219+ private static DatabaseNodeService createRegistry (
220+ Path geoIpConfigDir ,
221+ Path geoIpTmpDir ,
222+ ClusterService clusterService ,
223+ ProjectResolver projectResolver
224+ ) throws IOException {
198225 GeoIpCache cache = new GeoIpCache (0 );
199226 ConfigDatabases configDatabases = new ConfigDatabases (geoIpConfigDir , cache );
200227 copyDefaultDatabases (geoIpConfigDir , configDatabases );
@@ -204,19 +231,21 @@ private static DatabaseNodeService createRegistry(Path geoIpConfigDir, Path geoI
204231 cache ,
205232 configDatabases ,
206233 Runnable ::run ,
207- clusterService
234+ clusterService ,
235+ mock (IngestService .class ),
236+ projectResolver
208237 );
209- databaseNodeService .initialize ("nodeId" , mock (ResourceWatcherService .class ), mock ( IngestService . class ) );
238+ databaseNodeService .initialize ("nodeId" , mock (ResourceWatcherService .class ));
210239 return databaseNodeService ;
211240 }
212241
213- private static void lazyLoadReaders (DatabaseNodeService databaseNodeService ) throws IOException {
214- if (databaseNodeService .get ("GeoLite2-City.mmdb" ) != null ) {
215- databaseNodeService .get ("GeoLite2-City.mmdb" ).getDatabaseType ();
216- databaseNodeService .get ("GeoLite2-City.mmdb" ).getResponse ("2.125.160.216" , GeoIpTestUtils ::getCity );
242+ private static void lazyLoadReaders (ProjectId projectId , DatabaseNodeService databaseNodeService ) throws IOException {
243+ if (databaseNodeService .get (projectId , "GeoLite2-City.mmdb" ) != null ) {
244+ databaseNodeService .get (projectId , "GeoLite2-City.mmdb" ).getDatabaseType ();
245+ databaseNodeService .get (projectId , "GeoLite2-City.mmdb" ).getResponse ("2.125.160.216" , GeoIpTestUtils ::getCity );
217246 }
218- databaseNodeService .get ("GeoLite2-City-Test.mmdb" ).getDatabaseType ();
219- databaseNodeService .get ("GeoLite2-City-Test.mmdb" ).getResponse ("2.125.160.216" , GeoIpTestUtils ::getCity );
247+ databaseNodeService .get (projectId , "GeoLite2-City-Test.mmdb" ).getDatabaseType ();
248+ databaseNodeService .get (projectId , "GeoLite2-City-Test.mmdb" ).getResponse ("2.125.160.216" , GeoIpTestUtils ::getCity );
220249 }
221250
222251}
0 commit comments