1414package io .zonky .test .db .postgres .embedded ;
1515
1616
17+ import java .io .ByteArrayInputStream ;
1718import java .io .Closeable ;
1819import java .io .File ;
1920import java .io .FileOutputStream ;
2021import java .io .IOException ;
2122import java .io .InputStream ;
22- import java .io .OutputStream ;
2323import java .net .InetAddress ;
2424import java .net .InetSocketAddress ;
2525import java .net .ServerSocket ;
2626import java .net .Socket ;
2727import java .net .SocketException ;
2828import java .net .UnknownHostException ;
29+ import java .nio .ByteBuffer ;
30+ import java .nio .channels .AsynchronousFileChannel ;
31+ import java .nio .channels .Channel ;
32+ import java .nio .channels .CompletionHandler ;
2933import java .nio .channels .FileLock ;
3034import java .nio .channels .OverlappingFileLockException ;
3135import java .nio .file .FileSystems ;
3236import java .nio .file .Files ;
3337import java .nio .file .Path ;
34- import java .nio .file .Paths ;
3538import java .security .DigestInputStream ;
3639import java .security .MessageDigest ;
3740import java .security .NoSuchAlgorithmException ;
4952import java .util .Map .Entry ;
5053import java .util .Objects ;
5154import java .util .UUID ;
55+ import java .util .concurrent .Phaser ;
5256import java .util .concurrent .TimeUnit ;
5357import java .util .concurrent .atomic .AtomicBoolean ;
5458import java .util .concurrent .atomic .AtomicReference ;
6367import org .apache .commons .compress .archivers .tar .TarArchiveInputStream ;
6468import org .apache .commons .io .FileUtils ;
6569import org .apache .commons .io .IOUtils ;
70+ import org .apache .commons .io .output .ByteArrayOutputStream ;
6671import org .apache .commons .lang3 .SystemUtils ;
6772import org .apache .commons .lang3 .time .StopWatch ;
6873import org .postgresql .ds .PGSimpleDataSource ;
6974import org .slf4j .Logger ;
7075import org .slf4j .LoggerFactory ;
7176import org .tukaani .xz .XZInputStream ;
7277
78+ import static java .nio .file .StandardOpenOption .CREATE_NEW ;
79+ import static java .nio .file .StandardOpenOption .WRITE ;
80+
7381@ SuppressWarnings ("PMD.AvoidDuplicateLiterals" ) // "postgres"
7482public class EmbeddedPostgres implements Closeable
7583{
@@ -639,16 +647,15 @@ private static String getArchitecture()
639647 * Unpack archive compressed by tar with xz compression. By default system tar is used (faster). If not found, then the
640648 * java implementation takes place.
641649 *
642- * @param tbzPath The archive path .
650+ * @param stream A stream with the postgres binaries .
643651 * @param targetDir The directory to extract the content to.
644652 */
645- private static void extractTxz (String tbzPath , String targetDir ) throws IOException
646- {
653+ private static void extractTxz (InputStream stream , String targetDir ) throws IOException {
647654 try (
648- InputStream in = Files .newInputStream (Paths .get (tbzPath ));
649- XZInputStream xzIn = new XZInputStream (in );
655+ XZInputStream xzIn = new XZInputStream (stream );
650656 TarArchiveInputStream tarIn = new TarArchiveInputStream (xzIn )
651657 ) {
658+ final Phaser phaser = new Phaser (1 );
652659 TarArchiveEntry entry ;
653660
654661 while ((entry = tarIn .getNextTarEntry ()) != null ) {
@@ -665,9 +672,33 @@ private static void extractTxz(String tbzPath, String targetDir) throws IOExcept
665672 throw new IllegalStateException ("could not read " + individualFile );
666673 }
667674 mkdirs (fsObject .getParentFile ());
668- try (OutputStream outputFile = new FileOutputStream (fsObject )) {
669- IOUtils .write (content , outputFile );
670- }
675+
676+ final AsynchronousFileChannel fileChannel = AsynchronousFileChannel .open (fsObject .toPath (), CREATE_NEW , WRITE );
677+ final ByteBuffer buffer = ByteBuffer .wrap (content );
678+
679+ phaser .register ();
680+ fileChannel .write (buffer , 0 , fileChannel , new CompletionHandler <Integer , Channel >() {
681+ @ Override
682+ public void completed (Integer written , Channel channel ) {
683+ closeChannel (channel );
684+ }
685+
686+ @ Override
687+ public void failed (Throwable error , Channel channel ) {
688+ LOG .error ("Could not write file {}" , fsObject .getAbsolutePath (), error );
689+ closeChannel (channel );
690+ }
691+
692+ private void closeChannel (Channel channel ) {
693+ try {
694+ channel .close ();
695+ } catch (IOException e ) {
696+ LOG .error ("Unexpected error while closing the channel" , e );
697+ } finally {
698+ phaser .arriveAndDeregister ();
699+ }
700+ }
701+ });
671702 } else if (entry .isDirectory ()) {
672703 mkdirs (fsObject );
673704 } else {
@@ -680,6 +711,8 @@ private static void extractTxz(String tbzPath, String targetDir) throws IOExcept
680711 fsObject .setExecutable (true );
681712 }
682713 }
714+
715+ phaser .arriveAndAwaitAdvance ();
683716 }
684717 }
685718
@@ -696,10 +729,8 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver)
696729
697730 LOG .info ("Detected a {} {} system" , system , machineHardware );
698731 File pgDir ;
699- File pgTbz ;
700732 final InputStream pgBinary ;
701733 try {
702- pgTbz = File .createTempFile ("pgpg" , "pgpg" );
703734 pgBinary = pgBinaryResolver .getPgBinary (system , machineHardware );
704735 } catch (final IOException e ) {
705736 throw new ExceptionInInitializerError (e );
@@ -709,16 +740,12 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver)
709740 throw new IllegalStateException ("No Postgres binary found for " + system + " / " + machineHardware );
710741 }
711742
712- try (DigestInputStream pgArchiveData = new DigestInputStream (
713- pgBinary , MessageDigest .getInstance ("MD5" ));
714- FileOutputStream os = new FileOutputStream (pgTbz ))
715- {
716- IOUtils .copy (pgArchiveData , os );
743+ try (DigestInputStream pgArchiveData = new DigestInputStream (pgBinary , MessageDigest .getInstance ("MD5" ));
744+ ByteArrayOutputStream baos = new ByteArrayOutputStream ()) {
745+ IOUtils .copy (pgArchiveData , baos );
717746 pgArchiveData .close ();
718- os .close ();
719747
720748 String pgDigest = Hex .encodeHexString (pgArchiveData .getMessageDigest ().digest ());
721-
722749 pgDir = new File (getWorkingDirectory (), String .format ("PG-%s" , pgDigest ));
723750
724751 mkdirs (pgDir );
@@ -727,14 +754,16 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver)
727754
728755 if (!pgDirExists .exists ()) {
729756 try (FileOutputStream lockStream = new FileOutputStream (unpackLockFile );
730- FileLock unpackLock = lockStream .getChannel ().tryLock ()) {
757+ FileLock unpackLock = lockStream .getChannel ().tryLock ()) {
731758 if (unpackLock != null ) {
732759 try {
733760 if (pgDirExists .exists ()) {
734761 throw new IllegalStateException ("unpack lock acquired but .exists file is present " + pgDirExists );
735762 }
736763 LOG .info ("Extracting Postgres..." );
737- extractTxz (pgTbz .getPath (), pgDir .getPath ());
764+ try (ByteArrayInputStream bais = new ByteArrayInputStream (baos .toByteArray ())) {
765+ extractTxz (bais , pgDir .getPath ());
766+ }
738767 if (!pgDirExists .createNewFile ()) {
739768 throw new IllegalStateException ("couldn't make .exists file " + pgDirExists );
740769 }
@@ -762,10 +791,6 @@ private static File prepareBinaries(PgBinaryResolver pgBinaryResolver)
762791 } catch (final InterruptedException ie ) {
763792 Thread .currentThread ().interrupt ();
764793 throw new ExceptionInInitializerError (ie );
765- } finally {
766- if (!pgTbz .delete ()) {
767- LOG .warn ("could not delete {}" , pgTbz );
768- }
769794 }
770795 BINARY_DIR .set (pgDir );
771796 LOG .info ("Postgres binaries at {}" , pgDir );
0 commit comments