1111
1212import org .apache .lucene .tests .util .LuceneTestCase ;
1313import org .elasticsearch .client .internal .Client ;
14+ import org .elasticsearch .cluster .ClusterName ;
1415import org .elasticsearch .cluster .ClusterState ;
16+ import org .elasticsearch .cluster .metadata .ProjectId ;
17+ import org .elasticsearch .cluster .metadata .ProjectMetadata ;
18+ import org .elasticsearch .cluster .project .ProjectResolver ;
19+ import org .elasticsearch .cluster .project .TestProjectResolvers ;
1520import org .elasticsearch .cluster .service .ClusterService ;
1621import org .elasticsearch .common .util .concurrent .AtomicArray ;
1722import org .elasticsearch .core .IOUtils ;
3237import java .util .concurrent .atomic .AtomicInteger ;
3338import java .util .concurrent .atomic .AtomicReference ;
3439
40+ import static org .elasticsearch .cluster .ClusterState .builder ;
3541import static org .elasticsearch .ingest .geoip .GeoIpProcessor .GEOIP_TYPE ;
3642import static org .elasticsearch .ingest .geoip .GeoIpTestUtils .copyDatabase ;
3743import static org .elasticsearch .ingest .geoip .GeoIpTestUtils .copyDefaultDatabases ;
@@ -62,31 +68,36 @@ public class ReloadingDatabasesWhilePerformingGeoLookupsIT extends ESTestCase {
6268 * geoip processor instance is using the related {@link DatabaseReaderLazyLoader} instance
6369 */
6470 public void test () throws Exception {
71+ ProjectId projectId = randomProjectIdOrDefault ();
6572 Path geoIpConfigDir = createTempDir ();
6673 Path geoIpTmpDir = createTempDir ();
6774 ClusterService clusterService = mock (ClusterService .class );
68- when (clusterService .state ()).thenReturn (ClusterState .EMPTY_STATE );
69- DatabaseNodeService databaseNodeService = createRegistry (geoIpConfigDir , geoIpTmpDir , clusterService );
75+ when (clusterService .state ()).thenReturn (builder (ClusterName .DEFAULT )
76+ .putProjectMetadata (ProjectMetadata .builder (projectId ).build ())
77+ .build ()
78+ );
79+ DatabaseNodeService databaseNodeService = createRegistry (geoIpConfigDir , geoIpTmpDir , clusterService ,
80+ TestProjectResolvers .singleProject (projectId ));
7081 GeoIpProcessor .Factory factory = new GeoIpProcessor .Factory (GEOIP_TYPE , databaseNodeService );
7182 copyDatabase ("GeoLite2-City-Test.mmdb" , geoIpTmpDir .resolve ("GeoLite2-City.mmdb" ));
7283 copyDatabase ("GeoLite2-City-Test.mmdb" , geoIpTmpDir .resolve ("GeoLite2-City-Test.mmdb" ));
73- databaseNodeService .updateDatabase ("GeoLite2-City.mmdb" , "md5" , geoIpTmpDir .resolve ("GeoLite2-City.mmdb" ));
74- databaseNodeService .updateDatabase ("GeoLite2-City-Test.mmdb" , "md5" , geoIpTmpDir .resolve ("GeoLite2-City-Test.mmdb" ));
75- lazyLoadReaders (databaseNodeService );
84+ databaseNodeService .updateDatabase (projectId , "GeoLite2-City.mmdb" , "md5" , geoIpTmpDir .resolve ("GeoLite2-City.mmdb" ));
85+ databaseNodeService .updateDatabase (projectId , "GeoLite2-City-Test.mmdb" , "md5" , geoIpTmpDir .resolve ("GeoLite2-City-Test.mmdb" ));
86+ lazyLoadReaders (projectId , databaseNodeService );
7687
7788 final GeoIpProcessor processor1 = (GeoIpProcessor ) factory .create (
7889 null ,
7990 "_tag" ,
8091 null ,
8192 new HashMap <>(Map .of ("field" , "_field" )),
82- null
93+ projectId
8394 );
8495 final GeoIpProcessor processor2 = (GeoIpProcessor ) factory .create (
8596 null ,
8697 "_tag" ,
8798 null ,
8899 new HashMap <>(Map .of ("field" , "_field" , "database_file" , "GeoLite2-City-Test.mmdb" )),
89- null
100+ projectId
90101 );
91102
92103 final AtomicBoolean completed = new AtomicBoolean (false );
@@ -134,9 +145,9 @@ public void test() throws Exception {
134145 Thread updateDatabaseThread = new Thread (() -> {
135146 for (int i = 0 ; i < numberOfDatabaseUpdates ; i ++) {
136147 try {
137- DatabaseReaderLazyLoader previous1 = databaseNodeService .get ("GeoLite2-City.mmdb" );
148+ DatabaseReaderLazyLoader previous1 = databaseNodeService .get (projectId , "GeoLite2-City.mmdb" );
138149 if (Files .exists (geoIpTmpDir .resolve ("GeoLite2-City.mmdb" ))) {
139- databaseNodeService .removeStaleEntries (List .of ("GeoLite2-City.mmdb" ));
150+ databaseNodeService .removeStaleEntries (projectId , List .of ("GeoLite2-City.mmdb" ));
140151 assertBusy (() -> {
141152 // lazy loader may still be in use by an ingest thread,
142153 // wait for any potential ingest thread to release the lazy loader (DatabaseReaderLazyLoader#postLookup(...)),
@@ -146,22 +157,24 @@ public void test() throws Exception {
146157 });
147158 } else {
148159 copyDatabase ("GeoLite2-City-Test.mmdb" , geoIpTmpDir .resolve ("GeoLite2-City.mmdb" ));
149- databaseNodeService .updateDatabase ("GeoLite2-City.mmdb" , "md5" , geoIpTmpDir .resolve ("GeoLite2-City.mmdb" ));
160+ databaseNodeService .updateDatabase (projectId ,"GeoLite2-City.mmdb" , "md5" ,
161+ geoIpTmpDir .resolve ("GeoLite2-City.mmdb" ));
150162 }
151- DatabaseReaderLazyLoader previous2 = databaseNodeService .get ("GeoLite2-City-Test.mmdb" );
163+ DatabaseReaderLazyLoader previous2 = databaseNodeService .get (projectId , "GeoLite2-City-Test.mmdb" );
152164 copyDatabase (
153165 i % 2 == 0 ? "GeoIP2-City-Test.mmdb" : "GeoLite2-City-Test.mmdb" ,
154166 geoIpTmpDir .resolve ("GeoLite2-City-Test.mmdb" )
155167 );
156- databaseNodeService .updateDatabase ("GeoLite2-City-Test.mmdb" , "md5" , geoIpTmpDir .resolve ("GeoLite2-City-Test.mmdb" ));
168+ databaseNodeService .updateDatabase (projectId ,"GeoLite2-City-Test.mmdb" , "md5" ,
169+ geoIpTmpDir .resolve ("GeoLite2-City-Test.mmdb" ));
157170
158- DatabaseReaderLazyLoader current1 = databaseNodeService .get ("GeoLite2-City.mmdb" );
159- DatabaseReaderLazyLoader current2 = databaseNodeService .get ("GeoLite2-City-Test.mmdb" );
171+ DatabaseReaderLazyLoader current1 = databaseNodeService .get (projectId , "GeoLite2-City.mmdb" );
172+ DatabaseReaderLazyLoader current2 = databaseNodeService .get (projectId , "GeoLite2-City-Test.mmdb" );
160173 assertThat (current1 , not (sameInstance (previous1 )));
161174 assertThat (current2 , not (sameInstance (previous2 )));
162175
163176 // lazy load type and reader:
164- lazyLoadReaders (databaseNodeService );
177+ lazyLoadReaders (projectId , databaseNodeService );
165178 } catch (Exception | AssertionError e ) {
166179 logger .error ("error in update databases thread after run [" + i + "]" , e );
167180 failureHolder2 .set (e );
@@ -193,7 +206,8 @@ public void test() throws Exception {
193206 IOUtils .rm (geoIpConfigDir , geoIpTmpDir );
194207 }
195208
196- private static DatabaseNodeService createRegistry (Path geoIpConfigDir , Path geoIpTmpDir , ClusterService clusterService )
209+ private static DatabaseNodeService createRegistry (Path geoIpConfigDir , Path geoIpTmpDir , ClusterService clusterService ,
210+ ProjectResolver projectResolver )
197211 throws IOException {
198212 GeoIpCache cache = new GeoIpCache (0 );
199213 ConfigDatabases configDatabases = new ConfigDatabases (geoIpConfigDir , cache );
@@ -206,17 +220,17 @@ private static DatabaseNodeService createRegistry(Path geoIpConfigDir, Path geoI
206220 Runnable ::run ,
207221 clusterService
208222 );
209- databaseNodeService .initialize ("nodeId" , mock (ResourceWatcherService .class ), mock (IngestService .class ));
223+ databaseNodeService .initialize ("nodeId" , mock (ResourceWatcherService .class ), mock (IngestService .class ), projectResolver );
210224 return databaseNodeService ;
211225 }
212226
213- private static void lazyLoadReaders (DatabaseNodeService databaseNodeService ) throws IOException {
214- if (databaseNodeService .get ("GeoLite2-City.mmdb" ) != null ) {
215- databaseNodeService .get ("GeoLite2-City.mmdb" ).getDatabaseType ();
216- databaseNodeService .get ("GeoLite2-City.mmdb" ).getResponse ("2.125.160.216" , GeoIpTestUtils ::getCity );
227+ private static void lazyLoadReaders (ProjectId projectId , DatabaseNodeService databaseNodeService ) throws IOException {
228+ if (databaseNodeService .get (projectId , "GeoLite2-City.mmdb" ) != null ) {
229+ databaseNodeService .get (projectId , "GeoLite2-City.mmdb" ).getDatabaseType ();
230+ databaseNodeService .get (projectId , "GeoLite2-City.mmdb" ).getResponse ("2.125.160.216" , GeoIpTestUtils ::getCity );
217231 }
218- databaseNodeService .get ("GeoLite2-City-Test.mmdb" ).getDatabaseType ();
219- databaseNodeService .get ("GeoLite2-City-Test.mmdb" ).getResponse ("2.125.160.216" , GeoIpTestUtils ::getCity );
232+ databaseNodeService .get (projectId , "GeoLite2-City-Test.mmdb" ).getDatabaseType ();
233+ databaseNodeService .get (projectId , "GeoLite2-City-Test.mmdb" ).getResponse ("2.125.160.216" , GeoIpTestUtils ::getCity );
220234 }
221235
222236}
0 commit comments