1414package io .zonky .test .db .postgres .embedded ;
1515
1616
17+ import java .io .ByteArrayInputStream ;
1718import java .io .Closeable ;
1819import java .io .File ;
1920import java .io .FileOutputStream ;
2021import java .io .IOException ;
2122import java .io .InputStream ;
22- import java .io .OutputStream ;
2323import java .net .InetAddress ;
2424import java .net .InetSocketAddress ;
2525import java .net .ServerSocket ;
2626import java .net .Socket ;
2727import java .net .SocketException ;
2828import java .net .UnknownHostException ;
29+ import java .nio .ByteBuffer ;
30+ import java .nio .channels .AsynchronousFileChannel ;
31+ import java .nio .channels .Channel ;
32+ import java .nio .channels .CompletionHandler ;
2933import java .nio .channels .FileLock ;
3034import java .nio .channels .OverlappingFileLockException ;
3135import java .nio .file .FileSystems ;
3236import java .nio .file .Files ;
3337import java .nio .file .Path ;
34- import java .nio .file .Paths ;
3538import java .security .DigestInputStream ;
3639import java .security .MessageDigest ;
3740import java .security .NoSuchAlgorithmException ;
4952import java .util .Map .Entry ;
5053import java .util .Objects ;
5154import java .util .UUID ;
55+ import java .util .concurrent .Phaser ;
5256import java .util .concurrent .TimeUnit ;
5357import java .util .concurrent .atomic .AtomicBoolean ;
5458import java .util .concurrent .atomic .AtomicReference ;
6367import org .apache .commons .compress .archivers .tar .TarArchiveInputStream ;
6468import org .apache .commons .io .FileUtils ;
6569import org .apache .commons .io .IOUtils ;
70+ import org .apache .commons .io .output .ByteArrayOutputStream ;
6671import org .apache .commons .lang3 .SystemUtils ;
6772import org .apache .commons .lang3 .time .StopWatch ;
6873import org .postgresql .ds .PGSimpleDataSource ;
6974import org .slf4j .Logger ;
7075import org .slf4j .LoggerFactory ;
7176import org .tukaani .xz .XZInputStream ;
7277
78+ import static java .nio .file .StandardOpenOption .CREATE_NEW ;
79+ import static java .nio .file .StandardOpenOption .WRITE ;
80+
7381@ SuppressWarnings ("PMD.AvoidDuplicateLiterals" ) // "postgres"
7482public class EmbeddedPostgres implements Closeable
7583{
@@ -637,16 +645,15 @@ private static String getArchitecture()
637645 * Unpack archive compressed by tar with xz compression. By default system tar is used (faster). If not found, then the
638646 * java implementation takes place.
639647 *
640- * @param tbzPath The archive path .
648+ * @param stream A stream with the postgres binaries .
641649 * @param targetDir The directory to extract the content to.
642650 */
643- private static void extractTxz (String tbzPath , String targetDir ) throws IOException
644- {
651+ private static void extractTxz (InputStream stream , String targetDir ) throws IOException {
645652 try (
646- InputStream in = Files .newInputStream (Paths .get (tbzPath ));
647- XZInputStream xzIn = new XZInputStream (in );
653+ XZInputStream xzIn = new XZInputStream (stream );
648654 TarArchiveInputStream tarIn = new TarArchiveInputStream (xzIn )
649655 ) {
656+ final Phaser phaser = new Phaser (1 );
650657 TarArchiveEntry entry ;
651658
652659 while ((entry = tarIn .getNextTarEntry ()) != null ) {
@@ -663,9 +670,33 @@ private static void extractTxz(String tbzPath, String targetDir) throws IOExcept
663670 throw new IllegalStateException ("could not read " + individualFile );
664671 }
665672 mkdirs (fsObject .getParentFile ());
666- try (OutputStream outputFile = new FileOutputStream (fsObject )) {
667- IOUtils .write (content , outputFile );
668- }
673+
674+ final AsynchronousFileChannel fileChannel = AsynchronousFileChannel .open (fsObject .toPath (), CREATE_NEW , WRITE );
675+ final ByteBuffer buffer = ByteBuffer .wrap (content );
676+
677+ phaser .register ();
678+ fileChannel .write (buffer , 0 , fileChannel , new CompletionHandler <Integer , Channel >() {
679+ @ Override
680+ public void completed (Integer written , Channel channel ) {
681+ closeChannel (channel );
682+ }
683+
684+ @ Override
685+ public void failed (Throwable error , Channel channel ) {
686+ LOG .error ("Could not write file {}" , fsObject .getAbsolutePath (), error );
687+ closeChannel (channel );
688+ }
689+
690+ private void closeChannel (Channel channel ) {
691+ try {
692+ channel .close ();
693+ } catch (IOException e ) {
694+ LOG .error ("Unexpected error while closing the channel" , e );
695+ } finally {
696+ phaser .arriveAndDeregister ();
697+ }
698+ }
699+ });
669700 } else if (entry .isDirectory ()) {
670701 mkdirs (fsObject );
671702 } else {
@@ -678,6 +709,8 @@ private static void extractTxz(String tbzPath, String targetDir) throws IOExcept
678709 fsObject .setExecutable (true );
679710 }
680711 }
712+
713+ phaser .arriveAndAwaitAdvance ();
681714 }
682715 }
683716
@@ -694,10 +727,8 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver)
694727
695728 LOG .info ("Detected a {} {} system" , system , machineHardware );
696729 File pgDir ;
697- File pgTbz ;
698730 final InputStream pgBinary ;
699731 try {
700- pgTbz = File .createTempFile ("pgpg" , "pgpg" );
701732 pgBinary = pgBinaryResolver .getPgBinary (system , machineHardware );
702733 } catch (final IOException e ) {
703734 throw new ExceptionInInitializerError (e );
@@ -707,16 +738,12 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver)
707738 throw new IllegalStateException ("No Postgres binary found for " + system + " / " + machineHardware );
708739 }
709740
710- try (DigestInputStream pgArchiveData = new DigestInputStream (
711- pgBinary , MessageDigest .getInstance ("MD5" ));
712- FileOutputStream os = new FileOutputStream (pgTbz ))
713- {
714- IOUtils .copy (pgArchiveData , os );
741+ try (DigestInputStream pgArchiveData = new DigestInputStream (pgBinary , MessageDigest .getInstance ("MD5" ));
742+ ByteArrayOutputStream baos = new ByteArrayOutputStream ()) {
743+ IOUtils .copy (pgArchiveData , baos );
715744 pgArchiveData .close ();
716- os .close ();
717745
718746 String pgDigest = Hex .encodeHexString (pgArchiveData .getMessageDigest ().digest ());
719-
720747 pgDir = new File (getWorkingDirectory (), String .format ("PG-%s" , pgDigest ));
721748
722749 mkdirs (pgDir );
@@ -725,14 +752,16 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver)
725752
726753 if (!pgDirExists .exists ()) {
727754 try (FileOutputStream lockStream = new FileOutputStream (unpackLockFile );
728- FileLock unpackLock = lockStream .getChannel ().tryLock ()) {
755+ FileLock unpackLock = lockStream .getChannel ().tryLock ()) {
729756 if (unpackLock != null ) {
730757 try {
731758 if (pgDirExists .exists ()) {
732759 throw new IllegalStateException ("unpack lock acquired but .exists file is present " + pgDirExists );
733760 }
734761 LOG .info ("Extracting Postgres..." );
735- extractTxz (pgTbz .getPath (), pgDir .getPath ());
762+ try (ByteArrayInputStream bais = new ByteArrayInputStream (baos .toByteArray ())) {
763+ extractTxz (bais , pgDir .getPath ());
764+ }
736765 if (!pgDirExists .createNewFile ()) {
737766 throw new IllegalStateException ("couldn't make .exists file " + pgDirExists );
738767 }
@@ -760,10 +789,6 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver)
760789 } catch (final InterruptedException ie ) {
761790 Thread .currentThread ().interrupt ();
762791 throw new ExceptionInInitializerError (ie );
763- } finally {
764- if (!pgTbz .delete ()) {
765- LOG .warn ("could not delete {}" , pgTbz );
766- }
767792 }
768793 BINARY_DIR .set (pgDir );
769794 LOG .info ("Postgres binaries at {}" , pgDir );
0 commit comments