2424import ch .cyberduck .core .features .Read ;
2525import ch .cyberduck .core .io .BandwidthThrottle ;
2626import ch .cyberduck .core .io .StreamListener ;
27+ import ch .cyberduck .core .preferences .HostPreferencesFactory ;
28+ import ch .cyberduck .core .preferences .PreferencesReader ;
2729import ch .cyberduck .core .transfer .TransferStatus ;
2830
2931import org .irods .irods4j .high_level .connection .IRODSConnectionPool ;
3032import org .irods .irods4j .high_level .connection .IRODSConnectionPool .PoolConnection ;
31- import org .irods .irods4j .high_level .connection .QualifiedUsername ;
3233import org .irods .irods4j .high_level .io .IRODSDataObjectInputStream ;
3334import org .irods .irods4j .high_level .vfs .IRODSFilesystem ;
34- import org .irods .irods4j .low_level .api .IRODSApi ;
3535import org .irods .irods4j .low_level .api .IRODSApi .RcComm ;
3636
37- import java .io .BufferedOutputStream ;
3837import java .io .FileOutputStream ;
39- import java .io .RandomAccessFile ;
38+ import java .io .OutputStream ;
4039import java .nio .file .Files ;
4140import java .nio .file .Paths ;
41+ import java .nio .file .StandardOpenOption ;
4242import java .util .ArrayList ;
4343import java .util .List ;
4444import java .util .concurrent .ExecutorService ;
4949public class IRODSDownloadFeature implements Download {
5050
5151 private final IRODSSession session ;
52- private final int numThread = 3 ; // TODO Make this configurable
53- private static final int BUFFER_SIZE = 4 * 1024 * 1024 ; // TODO Make configurable
5452
5553 public IRODSDownloadFeature (final IRODSSession session ) {
5654 this .session = session ;
@@ -61,6 +59,8 @@ public void download(final Path file, final Local local, final BandwidthThrottle
6159 final StreamListener listener , final TransferStatus status ,
6260 final ConnectionCallback callback ) throws BackgroundException {
6361 try {
62+ final PreferencesReader preferences = HostPreferencesFactory .get (session .getHost ());
63+
6464 final RcComm primaryConn = session .getClient ().getRcComm ();
6565 final String logicalPath = file .getAbsolute ();
6666
@@ -72,9 +72,8 @@ public void download(final Path file, final Local local, final BandwidthThrottle
7272
7373 // Transfer the bytes over multiple connections if the size of the data object
7474 // exceeds a certain threshold - e.g. 32MB.
75- // TODO Consider making this configurable.
76- if (dataObjectSize < 32 * 1024 * 1024 ) {
77- byte [] buffer = new byte [4 * 1024 * 1024 ];
75+ if (dataObjectSize < 32 * 1024 * 1024 ) { //preferences.getInteger("irods.parallel_transfer.size_threshold")) {
76+ byte [] buffer = new byte [4 * 1024 * 1024 ]; //preferences.getInteger("irods.parallel_transfer.rbuffer_size")];
7877
7978 try (IRODSDataObjectInputStream in = new IRODSDataObjectInputStream (primaryConn , logicalPath );
8079 FileOutputStream out = new FileOutputStream (local .getAbsolute ())) {
@@ -92,58 +91,77 @@ public void download(final Path file, final Local local, final BandwidthThrottle
9291 // The data object is larger than the threshold so use parallel transfer.
9392 //
9493
95- // Step 1: Get replica token & number via primary stream
96- try (IRODSDataObjectInputStream primary = new IRODSDataObjectInputStream (primaryConn , logicalPath )) {
97- final String replicaToken = primary .getReplicaToken ();
98- final long replicaNumber = primary .getReplicaNumber ();
99-
100- // Step 2: Setup connection pool
101- try (final IRODSConnectionPool pool = new IRODSConnectionPool (numThread )) {
102- pool .start (
103- session .getHost ().getHostname (),
104- session .getHost ().getPort (),
105- new QualifiedUsername (session .getHost ().getCredentials ().getUsername (), session .getRegion ()),
106- conn -> {
107- try {
108- // TODO Needs to take the value of the Authorization profile value.
109- IRODSApi .rcAuthenticateClient (conn , "native" , session .getHost ().getCredentials ().getPassword ());
110- return true ;
111- }
112- catch (Exception e ) {
113- return false ;
114- }
115- });
116-
117- final long chunkSize = dataObjectSize / numThread ;
118- final long remainChunkSize = dataObjectSize % numThread ;
119-
120- // Step 3: Create empty target file
121- try (RandomAccessFile out = new RandomAccessFile (local .getAbsolute (), "rw" )) {
122- out .setLength (dataObjectSize );
123- }
94+ // TODO Clamp the value so that users do not specify something ridiculous.
95+ final int threadCount = 3 ; //preferences.getInteger("irods.parallel_transfer.thread_count");
96+ final ExecutorService executor = Executors .newFixedThreadPool (threadCount );
97+
98+ final long chunkSize = dataObjectSize / threadCount ;
99+ final long remainingBytes = dataObjectSize % threadCount ;
100+
101+ final List <IRODSDataObjectInputStream > secondaryIrodsStreams = new ArrayList <>();
102+ final List <OutputStream > localFileStreams = new ArrayList <>();
103+
104+ // Open the primary iRODS input stream.
105+ // TODO Needs to pass the target resource name if provided.
106+ try (IRODSDataObjectInputStream primaryStream = new IRODSDataObjectInputStream (primaryConn , logicalPath )) {
107+ // Initialize connections for secondary streams.
108+ try (IRODSConnectionPool pool = new IRODSConnectionPool (threadCount - 1 )) {
109+ IRODSConnectionUtils .startIRODSConnectionPool (session , pool );
124110
125- // Step 4: Parallel readers
126- final ExecutorService executor = Executors . newFixedThreadPool ( numThread );
111+ // Holds handles to tasks running on the thread pool. This allows us to wait for
112+ // all tasks to complete before shutting down everything.
127113 List <Future <?>> tasks = new ArrayList <>();
128- for (int i = 0 ; i < numThread ; i ++) {
129- final PoolConnection conn = pool .getConnection ();
130- tasks .add (executor .submit (new IRODSChunkWorker (
131- conn ,
132- new IRODSDataObjectInputStream (conn .getRcComm (), replicaToken , replicaNumber ),
133- new FileOutputStream (local .getAbsolute ()),
134- i * chunkSize ,
135- (numThread - 1 == i ) ? chunkSize + remainChunkSize : chunkSize ,
136- BUFFER_SIZE
137- )));
138- }
139114
140- for (Future <?> task : tasks ) {
141- task .get ();
142- }
115+ // Open the first output stream for the local file and store it for processing.
116+ // This also guarantees the target file exists and is empty (i.e. truncated to zero
117+ // if it exists).
118+ final java .nio .file .Path localFilePath = Paths .get (local .getAbsolute ());
119+ localFileStreams .add (Files .newOutputStream (localFilePath , StandardOpenOption .CREATE , StandardOpenOption .TRUNCATE_EXISTING ));
120+
121+ // Launch the first IO task.
122+ tasks .add (executor .submit (new IRODSChunkWorker (
123+ primaryStream ,
124+ localFileStreams .get (0 ),
125+ 0 ,
126+ chunkSize ,
127+ 4 * 1024 * 1024 //preferences.getInteger("irods.parallel_transfer.rbuffer_size")
128+ )));
129+
130+ try {
131+ // Launch remaining IO tasks.
132+ for (int i = 1 ; i < threadCount ; ++i ) {
133+ PoolConnection conn = pool .getConnection ();
134+ secondaryIrodsStreams .add (new IRODSDataObjectInputStream (conn .getRcComm (), logicalPath ));
135+ localFileStreams .add (Files .newOutputStream (localFilePath , StandardOpenOption .WRITE ));
136+ tasks .add (executor .submit (new IRODSChunkWorker (
137+ secondaryIrodsStreams .get (secondaryIrodsStreams .size () - 1 ),
138+ localFileStreams .get (localFileStreams .size () - 1 ),
139+ i * chunkSize ,
140+ (threadCount - 1 == i ) ? chunkSize + remainingBytes : chunkSize ,
141+ 4 * 1024 * 1024 //preferences.getInteger("irods.parallel_transfer.rbuffer_size")
142+ )));
143+ }
143144
144- executor .shutdown ();
145+ // Wait for all tasks on the thread pool to complete.
146+ for (Future <?> task : tasks ) {
147+ try {
148+ task .get ();
149+ }
150+ catch (Exception e ) { /* Ignored */ }
151+ }
152+ }
153+ finally {
154+ closeInputStreams (secondaryIrodsStreams );
155+ }
145156 }
146157 }
158+ finally {
159+ closeOutputStreams (localFileStreams );
160+ }
161+
162+ executor .shutdown ();
163+ // TODO Make this configurable.
164+ executor .awaitTermination (5 , TimeUnit .SECONDS );
147165 }
148166 catch (Exception e ) {
149167 throw new IRODSExceptionMappingService ().map ("Download {0} failed" , e );
@@ -159,4 +177,22 @@ public boolean offset(final Path file) {
159177 public Download withReader (final Read reader ) {
160178 return this ;
161179 }
180+
181+ private static void closeInputStreams (List <IRODSDataObjectInputStream > streams ) {
182+ streams .forEach (in -> {
183+ try {
184+ in .close ();
185+ }
186+ catch (Exception e ) { /* Ignored */ }
187+ });
188+ }
189+
190+ private static void closeOutputStreams (List <OutputStream > streams ) {
191+ streams .forEach (out -> {
192+ try {
193+ out .close ();
194+ }
195+ catch (Exception e ) { /* Ignored */ }
196+ });
197+ }
162198}
0 commit comments