Skip to content

Commit 65e02a0

Browse files
committed
Add Async Copy methods.
1 parent fbf4652 commit 65e02a0

File tree

1 file changed

+81
-12
lines changed
  • util/src/main/java/io/kubernetes/client

1 file changed

+81
-12
lines changed

util/src/main/java/io/kubernetes/client/Copy.java

Lines changed: 81 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
import java.io.OutputStream;
3131
import java.nio.file.Path;
3232
import java.nio.file.Paths;
33+
import java.util.concurrent.ExecutionException;
34+
import java.util.concurrent.Future;
35+
import java.util.concurrent.TimeUnit;
3336
import org.apache.commons.codec.binary.Base64InputStream;
3437
import org.apache.commons.codec.binary.Base64OutputStream;
3538
import org.apache.commons.compress.archivers.ArchiveEntry;
@@ -153,6 +156,21 @@ public void copyDirectoryFromPod(
153156
createDirectoryStructureFromTree(tree, namespace, pod, container, srcPath, destination);
154157
return;
155158
}
159+
Future<Integer> future =
160+
copyDirectoryFromPodAsync(namespace, pod, container, srcPath, destination);
161+
try {
162+
int code = future.get().intValue();
163+
if (code != 0) {
164+
throw new IOException("Copy failed (" + code + ")");
165+
}
166+
} catch (InterruptedException | ExecutionException ex) {
167+
throw new IOException(ex);
168+
}
169+
}
170+
171+
public Future<Integer> copyDirectoryFromPodAsync(
172+
String namespace, String pod, String container, String srcPath, Path destination)
173+
throws IOException, ApiException {
156174
final Process proc =
157175
this.exec(
158176
namespace,
@@ -187,14 +205,7 @@ public void copyDirectoryFromPod(
187205
}
188206
}
189207
}
190-
try {
191-
int status = proc.waitFor();
192-
if (status != 0) {
193-
throw new IOException("Copy command failed with status " + status);
194-
}
195-
} catch (InterruptedException ex) {
196-
throw new IOException(ex);
197-
}
208+
return new ProcessFuture(proc);
198209
}
199210

200211
// This creates directories and files using tree of files and directories under container
@@ -330,6 +341,19 @@ public void copyFileToPod(
330341
String namespace, String pod, String container, Path srcPath, Path destPath)
331342
throws ApiException, IOException {
332343

344+
try {
345+
int exit = copyFileToPodAsync(namespace, pod, container, srcPath, destPath).get();
346+
if (exit != 0) {
347+
throw new IOException("Failed to copy: " + exit);
348+
}
349+
} catch (InterruptedException | ExecutionException ex) {
350+
throw new IOException(ex);
351+
}
352+
}
353+
354+
public Future<Integer> copyFileToPodAsync(
355+
String namespace, String pod, String container, Path srcPath, Path destPath)
356+
throws ApiException, IOException {
333357
// Run decoding and extracting processes
334358
final Process proc = execCopyToPod(namespace, pod, container, destPath);
335359

@@ -344,15 +368,27 @@ public void copyFileToPod(
344368
archiveOutputStream.putArchiveEntry(tarEntry);
345369
ByteStreams.copy(input, archiveOutputStream);
346370
archiveOutputStream.closeArchiveEntry();
347-
} finally {
348-
proc.destroy();
371+
372+
return new ProcessFuture(proc);
349373
}
350374
}
351375

352376
public void copyFileToPod(
353377
String namespace, String pod, String container, byte[] src, Path destPath)
354378
throws ApiException, IOException {
379+
try {
380+
int exit = copyFileToPodAsync(namespace, pod, container, src, destPath).get();
381+
if (exit != 0) {
382+
throw new IOException("Copy failed: " + exit);
383+
}
384+
} catch (InterruptedException | ExecutionException ex) {
385+
throw new IOException(ex);
386+
}
387+
}
355388

389+
public Future<Integer> copyFileToPodAsync(
390+
String namespace, String pod, String container, byte[] src, Path destPath)
391+
throws ApiException, IOException {
356392
// Run decoding and extracting processes
357393
final Process proc = execCopyToPod(namespace, pod, container, destPath);
358394

@@ -365,8 +401,8 @@ public void copyFileToPod(
365401
archiveOutputStream.putArchiveEntry(tarEntry);
366402
ByteStreams.copy(new ByteArrayInputStream(src), archiveOutputStream);
367403
archiveOutputStream.closeArchiveEntry();
368-
} finally {
369-
proc.destroy();
404+
405+
return new ProcessFuture(proc);
370406
}
371407
}
372408

@@ -397,4 +433,37 @@ private boolean isTarPresentInContainer(String namespace, String pod, String con
397433
proc.destroy();
398434
}
399435
}
436+
437+
private static class ProcessFuture implements Future<Integer> {
438+
private Process proc;
439+
440+
ProcessFuture(Process proc) {
441+
this.proc = proc;
442+
}
443+
444+
// TODO: support cancelling?
445+
public boolean cancel(boolean interupt) {
446+
return false;
447+
}
448+
449+
public boolean isCancelled() {
450+
return false;
451+
}
452+
453+
public Integer get() throws InterruptedException {
454+
proc.waitFor();
455+
proc.destroy();
456+
return proc.exitValue();
457+
}
458+
459+
public Integer get(long timeout, TimeUnit unit) throws InterruptedException {
460+
proc.waitFor(timeout, unit);
461+
proc.destroy();
462+
return proc.exitValue();
463+
}
464+
465+
public boolean isDone() {
466+
return proc.isAlive();
467+
}
468+
}
400469
}

0 commit comments

Comments
 (0)