2424import java .net .ServerSocket ;
2525import java .net .Socket ;
2626import java .nio .ByteBuffer ;
27- import java .nio .channels .AsynchronousFileChannel ;
28- import java .nio .channels .Channel ;
29- import java .nio .channels .CompletionHandler ;
30- import java .nio .channels .FileLock ;
31- import java .nio .channels .OverlappingFileLockException ;
27+ import java .nio .channels .*;
3228import java .nio .file .FileSystems ;
3329import java .nio .file .Files ;
3430import java .nio .file .Path ;
4945import java .util .Objects ;
5046import java .util .Optional ;
5147import java .util .UUID ;
52- import java .util .concurrent .Phaser ;
53- import java .util .concurrent .TimeUnit ;
48+ import java .util .concurrent .*;
5449import java .util .concurrent .atomic .AtomicBoolean ;
50+ import java .util .concurrent .atomic .AtomicInteger ;
5551import java .util .concurrent .locks .Lock ;
5652import java .util .concurrent .locks .ReentrantLock ;
5753
@@ -87,6 +83,8 @@ public class EmbeddedPostgres implements Closeable
8783 private static final String PG_SUPERUSER = "postgres" ;
8884 private static final Duration DEFAULT_PG_STARTUP_WAIT = Duration .ofSeconds (10 );
8985 private static final String LOCK_FILE_NAME = "epg-lock" ;
86+ private static final ThreadPoolExecutor THREAD_POOL = (ThreadPoolExecutor ) Executors .newCachedThreadPool ();
87+ private static final AtomicInteger active = new AtomicInteger (0 );
9088
9189 private final File pgDir ;
9290
@@ -108,6 +106,8 @@ public class EmbeddedPostgres implements Closeable
108106 private final ProcessBuilder .Redirect errorRedirector ;
109107 private final ProcessBuilder .Redirect outputRedirector ;
110108
109+ private Process pgProcess ;
110+
111111 EmbeddedPostgres (File parentDirectory , File dataDirectory , boolean cleanDataDirectory ,
112112 Map <String , String > postgresConfig , Map <String , String > localeConfig , int port , Map <String , String > connectConfig ,
113113 PgBinaryResolver pgBinaryResolver , ProcessBuilder .Redirect errorRedirector , ProcessBuilder .Redirect outputRedirector ) throws IOException
@@ -237,16 +237,19 @@ private void lock() throws IOException
237237 }
238238 }
239239
240- private void initdb ()
241- {
240+ private void initdb () throws IOException {
242241 final StopWatch watch = new StopWatch ();
243242 watch .start ();
244243 List <String > args = new ArrayList <>();
245244 args .addAll (Arrays .asList (
246245 "-A" , "trust" , "-U" , PG_SUPERUSER ,
247246 "-D" , dataDirectory .getPath (), "-E" , "UTF-8" ));
248247 args .addAll (createLocaleOptions ());
249- system (INIT_DB , args );
248+ try {
249+ system (INIT_DB , args , true , true ).exit .get ();
250+ } catch (InterruptedException | ExecutionException e ) {
251+ throw new IOException (e .getMessage ());
252+ }
250253 LOG .info ("{} initdb completed in {}" , instanceId , watch );
251254 }
252255
@@ -262,16 +265,13 @@ private void startPostmaster() throws IOException
262265 args .addAll (Arrays .asList ("-D" , dataDirectory .getPath ()));
263266 args .addAll (createInitOptions ());
264267
265- final ProcessBuilder builder = new ProcessBuilder ();
266- POSTGRES .applyTo (builder , args );
268+ SystemResult result = system (POSTGRES , args );
267269
268- builder .redirectErrorStream (true );
269- builder .redirectError (errorRedirector );
270- builder .redirectOutput (outputRedirector );
271- final Process postmaster = builder .start ();
272-
273- if (outputRedirector .type () == ProcessBuilder .Redirect .Type .PIPE ) {
274- ProcessOutputLogger .logOutput (LOG , postmaster , POSTGRES .processName ());
270+ final Process postmaster ;
271+ try {
272+ postmaster = result .initProcess .get ();
273+ } catch (InterruptedException | ExecutionException e ) {
274+ throw new IOException (e .getMessage ());
275275 }
276276
277277 LOG .info ("{} postmaster started as {} on port {}. Waiting up to {} for server startup to finish." , instanceId , postmaster .toString (), port , pgStartupWait );
@@ -311,7 +311,7 @@ private void waitForServerStartup(StopWatch watch) throws IOException
311311 Throwable lastCause = null ;
312312 final long start = System .nanoTime ();
313313 final long maxWaitNs = TimeUnit .NANOSECONDS .convert (pgStartupWait .toMillis (), TimeUnit .MILLISECONDS );
314- while (System .nanoTime () - start < maxWaitNs ) {
314+ while (System .nanoTime () - start < ( maxWaitNs * Math . max ( THREAD_POOL . getActiveCount (), 1 )) ) {
315315 try {
316316 verifyReady ();
317317 LOG .info ("{} postmaster startup finished in {}" , instanceId , watch );
@@ -328,7 +328,7 @@ private void waitForServerStartup(StopWatch watch) throws IOException
328328 return ;
329329 }
330330 }
331- throw new IOException ("Gave up waiting for server to start after " + pgStartupWait .toMillis () + "ms" , lastCause );
331+ throw new IOException ("Gave up waiting for " + instanceId + " server to start after " + ( pgStartupWait .toMillis () * Math . max ( THREAD_POOL . getActiveCount (), 1 ) ) + "ms" , lastCause );
332332 }
333333
334334 private void verifyReady () throws SQLException
@@ -383,7 +383,8 @@ public void close() throws IOException
383383 final StopWatch watch = new StopWatch ();
384384 watch .start ();
385385 try {
386- pgCtl (dataDirectory , "stop" );
386+ if (pgProcess != null )
387+ pgProcess .destroy ();
387388 LOG .info ("{} shut down postmaster in {}" , instanceId , watch );
388389 } catch (final Exception e ) {
389390 LOG .error ("Could not stop postmaster " + instanceId , e );
@@ -408,15 +409,18 @@ public void close() throws IOException
408409 }
409410 }
410411
411- private void pgCtl (File dir , String action )
412- {
412+ private void pgCtl (File dir , String action ) throws IOException {
413413 final List <String > args = new ArrayList <>();
414414 args .addAll (Arrays .asList (
415415 "-D" , dir .getPath (), action ,
416416 "-m" , PG_STOP_MODE , "-t" ,
417417 PG_STOP_WAIT_S , "-w"
418418 ));
419- system (PG_CTL , args );
419+ try {
420+ system (PG_CTL , args , true ).exit .get ();
421+ } catch (InterruptedException | ExecutionException e ) {
422+ throw new IOException (e .getMessage ());
423+ }
420424 }
421425
422426 private void cleanOldDataDirectories (File parentDirectory )
@@ -610,29 +614,121 @@ public int hashCode() {
610614 }
611615 }
612616
613- private void system (Command command , List <String > args )
617+ private int systemThread (ProcessBuilder builder , Command command , SystemResult result , boolean retry , boolean clean ) {
618+ AtomicInteger exit = new AtomicInteger (-1 );
619+ int retries = 0 ;
620+ while (exit .get () != 0 ) {
621+ final Process [] process = new Process [1 ];
622+ CompletableFuture <Process > initProcess ;
623+ if (retries == 0 ) {
624+ initProcess = result .initProcess ;
625+ } else {
626+ initProcess = new CompletableFuture <>();
627+ }
628+ int lastActive = THREAD_POOL .getActiveCount ();
629+ if (lastActive > active .get ())
630+ active .set (lastActive );
631+ Callable <Process > task = () -> {
632+ try {
633+ process [0 ] = builder .start ();
634+ pgProcess = process [0 ];
635+ result .process = process [0 ];
636+ result .initProcess = initProcess ;
637+ initProcess .complete (process [0 ]);
638+
639+ if (outputRedirector .type () == ProcessBuilder .Redirect .Type .PIPE ) {
640+ ProcessOutputLogger .logOutput (LOG , process [0 ], command .processName ());
641+ }
642+ return process [0 ];
643+ } catch (IOException e ) {
644+ initProcess .completeExceptionally (e );
645+ throw e ;
646+ }
647+ };
648+ Future <Process > thread = THREAD_POOL .submit (task );
649+ if (retry ) {
650+ try {
651+ exit .set (thread .get ().waitFor ());
652+ } catch (InterruptedException | ExecutionException e ) {
653+ e .printStackTrace ();
654+ }
655+ if (0 != exit .get ()) {
656+ LOG .info ("Active threads running {}" , THREAD_POOL .getActiveCount ());
657+ int currentActive = THREAD_POOL .getActiveCount ();
658+ if (currentActive >= lastActive )
659+ lastActive = currentActive ;
660+ if (lastActive > active .get ())
661+ active .set (lastActive );
662+ if (lastActive >= active .get () - 1 && active .get () > 0 && THREAD_POOL .getActiveCount () <= THREAD_POOL .getPoolSize ()) {
663+ THREAD_POOL .setMaximumPoolSize (active .decrementAndGet ());
664+ LOG .info ("Reduced thread pool size to {}" , active .get ());
665+ }
666+ retries ++;
667+ if (clean ) {
668+ try {
669+ FileUtils .cleanDirectory (dataDirectory );
670+ } catch (IOException e ) {
671+ LOG .error ("Could not clean up directory {} for retry" , dataDirectory .getAbsolutePath ());
672+ result .initProcess .completeExceptionally (e );
673+ result .exit .completeExceptionally (e );
674+ break ;
675+ }
676+ }
677+ }
678+ } else {
679+ try {
680+ thread .wait ();
681+ } catch (InterruptedException e ) {
682+ result .initProcess .completeExceptionally (new IOException ());
683+ result .exit .completeExceptionally (new IOException ("Failed to execute: " + command .processName ()));
684+ e .printStackTrace ();
685+ break ;
686+ }
687+ }
688+
689+ if (retries >= 10 ) {
690+ result .initProcess .completeExceptionally (new IOException ());
691+ result .exit .completeExceptionally (new IOException ("Failed to execute: " + command .processName () + ", too many failures." ));
692+ break ;
693+ }
694+
695+ if (!retry )
696+ break ;
697+ }
698+ return exit .get ();
699+ }
700+
701+ private SystemResult system (Command command , List <String > args , boolean retry , boolean clean )
614702 {
615- try {
616- final ProcessBuilder builder = new ProcessBuilder ();
703+ final ProcessBuilder builder = new ProcessBuilder ();
617704
618- command .applyTo (builder , args );
619- builder .redirectErrorStream (true );
620- builder .redirectError (errorRedirector );
621- builder .redirectOutput (outputRedirector );
705+ command .applyTo (builder , args );
706+ builder .redirectErrorStream (true );
707+ builder .redirectError (errorRedirector );
708+ builder .redirectOutput (outputRedirector );
622709
623- final Process process = builder .start ();
710+ SystemResult result = new SystemResult ();
711+ result .initProcess = new CompletableFuture <>();
712+ result .builder = builder ;
713+ result .exit = CompletableFuture .supplyAsync (() -> systemThread (builder , command , result , retry , clean ));
714+ return result ;
715+ }
624716
625- if (outputRedirector .type () == ProcessBuilder .Redirect .Type .PIPE ) {
626- ProcessOutputLogger .logOutput (LOG , process , command .processName ());
627- }
628- if (0 != process .waitFor ()) {
629- throw new IllegalStateException (String .format ("Process %s failed" , builder .command ()));
630- }
631- } catch (final RuntimeException e ) { // NOPMD
632- throw e ;
633- } catch (final Exception e ) {
634- throw new RuntimeException (e );
635- }
717+ private SystemResult system (Command command , List <String > args , boolean retry )
718+ {
719+ return system (command , args , retry , false );
720+ }
721+
722+ private SystemResult system (Command command , List <String > args )
723+ {
724+ return system (command , args , false , false );
725+ }
726+
727+ private class SystemResult {
728+ ProcessBuilder builder ;
729+ CompletableFuture <Process > initProcess ;
730+ Process process ;
731+ CompletableFuture <Integer > exit ;
636732 }
637733
638734 private static void mkdirs (File dir )
0 commit comments