Skip to content

Commit 11bb3c2

Browse files
authored
Merge pull request #1423 from cloudsufi/cherry-pick-a60783bc696a9500958087ccb229d301f933021e
[🍒][PLUGIN-698] Add wildcard support for copy and move action
2 parents 1097d60 + 72420b9 commit 11bb3c2

File tree

7 files changed

+120
-7
lines changed

7 files changed

+120
-7
lines changed

docs/GCSCopy-action.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ Properties
2525
It can be found on the Dashboard in the Google Cloud Platform Console.
2626

2727
**Source Path**: Path to a source object or directory.
28+
> Use `*` to copy multiple files. For example, `gs://demo0/prod/reports/*.csv` will copy all CSV files in the `reports` directory.
2829
2930
**Destination Path**: Path to the destination. The bucket will be created if it does not exist.
3031

docs/GCSMove-action.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Properties
2626
It can be found on the Dashboard in the Google Cloud Platform Console.
2727

2828
**Source Path**: Path to a source object or directory.
29+
> Use `*` to move multiple files. For example, `gs://demo0/prod/reports/*.csv` will move all CSV files in the `reports` directory.
2930
3031
**Destination Path**: Path to the destination. The bucket will be created if it does not exist.
3132

src/main/java/io/cdap/plugin/gcp/gcs/StorageClient.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,17 @@
3434
import org.slf4j.LoggerFactory;
3535

3636
import java.io.IOException;
37+
import java.nio.file.FileSystems;
38+
import java.nio.file.PathMatcher;
39+
import java.nio.file.Paths;
3740
import java.util.ArrayList;
41+
import java.util.HashSet;
3842
import java.util.Iterator;
3943
import java.util.List;
4044
import java.util.Map;
45+
import java.util.Set;
4146
import java.util.function.Consumer;
47+
import java.util.regex.Pattern;
4248
import javax.annotation.Nullable;
4349

4450
/**
@@ -111,7 +117,7 @@ public void mapMetaDataForAllBlobs(String path, Consumer<Map<String, String>> fu
111117
/**
112118
* Creates the given bucket if it does not exists.
113119
*
114-
* @param path the path of the bucket
120+
* @param path the path of the bucket
115121
* @param location the location of bucket
116122
* @param cmekKeyName the name of the cmek key
117123
*/
@@ -163,6 +169,42 @@ public void move(GCSPath sourcePath, GCSPath destPath, boolean recursive, boolea
163169
pairTraverse(sourcePath, destPath, recursive, overwrite, BlobPair::move);
164170
}
165171

172+
/**
173+
* Get all the matching wildcard paths given the regex input.
174+
*/
175+
public List<GCSPath> getMatchedPaths(GCSPath sourcePath, boolean recursive, Pattern wildcardRegex) {
176+
Page<Blob> blobPage = storage.list(sourcePath.getBucket(), Storage.BlobListOption.prefix(
177+
getWildcardPathPrefix(sourcePath, wildcardRegex)
178+
));
179+
List<String> blobPageNames = new ArrayList<>();
180+
blobPage.getValues().forEach(blob -> blobPageNames.add(blob.getName()));
181+
return getFilterMatchedPaths(sourcePath, blobPageNames, recursive);
182+
}
183+
184+
static String getWildcardPathPrefix(GCSPath sourcePath, Pattern wildcardRegex) {
185+
String pattern = sourcePath.getName();
186+
String[] patternSplits = pattern.split(wildcardRegex.pattern());
187+
// prefix may be empty
188+
return patternSplits.length >= 1 ? patternSplits[0] : "";
189+
}
190+
191+
static List<GCSPath> getFilterMatchedPaths(GCSPath sourcePath, List<String> blobPageNames, boolean recursive) {
192+
Set<GCSPath> matchedPaths = new HashSet<>();
193+
String globPattern = "glob:" + sourcePath.getName();
194+
PathMatcher matcher = FileSystems.getDefault().getPathMatcher(globPattern);
195+
for (String blobName : blobPageNames) {
196+
if (matcher.matches(Paths.get(blobName))) {
197+
LOG.debug("Blob name {} matches the glob pattern {}", blobName, globPattern);
198+
String gcsPath = String.format("gs://%s/%s", sourcePath.getBucket(), blobName);
199+
matchedPaths.add(GCSPath.from(gcsPath));
200+
}
201+
}
202+
if (!recursive) {
203+
matchedPaths.removeIf(path -> path.getName().endsWith("/"));
204+
}
205+
return new ArrayList<>(matchedPaths);
206+
}
207+
166208
/**
167209
* Gets source and destination pairs by traversing the source path. Consumes each pair after the directory structure
168210
* is completely traversed.

src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSCopy.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
import io.cdap.plugin.gcp.gcs.StorageClient;
3636

3737
import java.io.IOException;
38+
import java.util.ArrayList;
3839
import java.util.Collections;
40+
import java.util.List;
3941
import javax.annotation.Nullable;
4042

4143

@@ -76,9 +78,17 @@ public void run(ActionContext context) throws IOException {
7678
// create the destination bucket if not exist
7779
storageClient.createBucketIfNotExists(destPath, config.location, cmekKeyName);
7880

79-
//noinspection ConstantConditions
80-
storageClient.copy(config.getSourcePath(), config.getDestPath(), config.recursive, config.shouldOverwrite());
81-
81+
List<GCSPath> matchedPaths = new ArrayList<>();
82+
if (SourceDestConfig.WILDCARD_REGEX.matcher(config.getSourcePath().getName()).find()) {
83+
matchedPaths = storageClient.getMatchedPaths(config.getSourcePath(), config.recursive,
84+
SourceDestConfig.WILDCARD_REGEX);
85+
} else {
86+
matchedPaths.add(config.getSourcePath());
87+
}
88+
for (GCSPath sourcePath : matchedPaths) {
89+
//noinspection ConstantConditions
90+
storageClient.copy(sourcePath, config.getDestPath(), config.recursive, config.shouldOverwrite());
91+
}
8292
}
8393

8494
/**

src/main/java/io/cdap/plugin/gcp/gcs/actions/GCSMove.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
import io.cdap.plugin.gcp.gcs.StorageClient;
3636

3737
import java.io.IOException;
38+
import java.util.ArrayList;
3839
import java.util.Collections;
40+
import java.util.List;
3941
import javax.annotation.Nullable;
4042

4143

@@ -75,8 +77,17 @@ public void run(ActionContext context) throws IOException {
7577
// create the destination bucket if not exist
7678
storageClient.createBucketIfNotExists(destPath, config.location, cmekKeyName);
7779

78-
//noinspection ConstantConditions
79-
storageClient.move(config.getSourcePath(), config.getDestPath(), config.recursive, config.shouldOverwrite());
80+
List<GCSPath> matchedPaths = new ArrayList<>();
81+
if (SourceDestConfig.WILDCARD_REGEX.matcher(config.getSourcePath().getName()).find()) {
82+
matchedPaths = storageClient.getMatchedPaths(config.getSourcePath(), config.recursive,
83+
SourceDestConfig.WILDCARD_REGEX);
84+
} else {
85+
matchedPaths.add(config.getSourcePath());
86+
}
87+
for (GCSPath sourcePath : matchedPaths) {
88+
//noinspection ConstantConditions
89+
storageClient.move(sourcePath, config.getDestPath(), config.recursive, config.shouldOverwrite());
90+
}
8091
}
8192

8293
/**

src/main/java/io/cdap/plugin/gcp/gcs/actions/SourceDestConfig.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@
3333
import io.cdap.plugin.gcp.gcs.GCSPath;
3434

3535
import java.util.Collections;
36+
import java.util.List;
3637
import java.util.Map;
38+
import java.util.regex.Pattern;
3739
import javax.annotation.Nullable;
3840

3941
/**
@@ -44,6 +46,7 @@ public class SourceDestConfig extends GCPConfig {
4446
public static final String NAME_DEST_PATH = "destPath";
4547
public static final String NAME_LOCATION = "location";
4648
public static final String READ_TIMEOUT = "readTimeout";
49+
public static final Pattern WILDCARD_REGEX = Pattern.compile("[*]");
4750

4851
@Name(NAME_SOURCE_PATH)
4952
@Macro
@@ -132,6 +135,11 @@ public void validate(FailureCollector collector, Map<String, String> arguments)
132135
} catch (IllegalArgumentException e) {
133136
collector.addFailure(e.getMessage(), null).withConfigProperty(NAME_DEST_PATH);
134137
}
138+
if (WILDCARD_REGEX.matcher(destPath).find()) {
139+
collector.addFailure("Destination path should not contain wildcard characters.",
140+
"Please remove the wildcard characters from the destination path.")
141+
.withConfigProperty(NAME_DEST_PATH);
142+
}
135143
}
136144
if (!containsMacro(NAME_CMEK_KEY)) {
137145
validateCmekKey(collector, arguments);

src/test/java/io/cdap/plugin/gcp/gcs/StorageClientTest.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,19 @@
2020
import com.google.cloud.storage.BucketInfo;
2121
import com.google.cloud.storage.Storage;
2222
import com.google.cloud.storage.StorageException;
23+
import io.cdap.plugin.gcp.gcs.actions.SourceDestConfig;
2324
import org.junit.After;
2425
import org.junit.Assert;
2526
import org.junit.Before;
2627
import org.junit.Test;
2728
import org.mockito.Mock;
2829
import org.mockito.MockitoAnnotations;
29-
import org.slf4j.Logger;
3030

3131
import java.io.ByteArrayOutputStream;
3232
import java.io.PrintStream;
33+
import java.util.ArrayList;
34+
import java.util.Comparator;
35+
import java.util.List;
3336

3437
import static org.mockito.Mockito.any;
3538
import static org.mockito.Mockito.times;
@@ -50,12 +53,20 @@ public class StorageClientTest {
5053
private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
5154

5255
private final PrintStream originalOut = System.out;
56+
private final List<String> blobPageNames = new ArrayList<>();
5357

5458
@Before
5559
public void setUp() {
5660
MockitoAnnotations.initMocks(this);
5761
storageClient = new StorageClient(storage);
5862
System.setOut(new PrintStream(outContent));
63+
// Setup blobPageNames
64+
blobPageNames.add("mydir/test_web1/report.html");
65+
blobPageNames.add("mydir/test_web2/report.html");
66+
blobPageNames.add("mydir/test_web2/css/");
67+
blobPageNames.add("mydir/test_web2/css/foo.css");
68+
blobPageNames.add("mydir/test_mob1/report.html");
69+
blobPageNames.add("mydir/test_mob2/report.html");
5970
}
6071

6172
@After
@@ -144,4 +155,33 @@ public void testCreateBucketIfNotExists() {
144155
}
145156
Assert.fail("Test for detecting bucket creation failure did not succeed. No exception caught");
146157
}
158+
159+
@Test
160+
public void testGetWildcardPathPrefix() {
161+
Assert.assertEquals("mydir/test_web", StorageClient.getWildcardPathPrefix(
162+
GCSPath.from("gs://my-bucket/mydir/test_web*/"), SourceDestConfig.WILDCARD_REGEX));
163+
Assert.assertEquals("", StorageClient.getWildcardPathPrefix(
164+
GCSPath.from("gs://my-bucket/*"), SourceDestConfig.WILDCARD_REGEX));
165+
}
166+
167+
@Test
168+
public void testFilterMatchedPaths() {
169+
GCSPath sourcePath = GCSPath.from("gs://foobucket/mydir/test_web*/*");
170+
List<GCSPath> filterMatchedPaths = StorageClient.getFilterMatchedPaths(sourcePath, blobPageNames, false);
171+
filterMatchedPaths.sort(Comparator.comparing(GCSPath::getUri));
172+
Assert.assertEquals(2, filterMatchedPaths.size());
173+
Assert.assertEquals(GCSPath.from("gs://foobucket/mydir/test_web1/report.html"), filterMatchedPaths.get(0));
174+
Assert.assertEquals(GCSPath.from("gs://foobucket/mydir/test_web2/report.html"), filterMatchedPaths.get(1));
175+
}
176+
177+
@Test
178+
public void testFilterMatchedPathsWithRecursive() {
179+
GCSPath sourcePath = GCSPath.from("gs://foobucket/mydir/test_web*/*");
180+
List<GCSPath> filterMatchedPaths = StorageClient.getFilterMatchedPaths(sourcePath, blobPageNames, true);
181+
Assert.assertEquals(3, filterMatchedPaths.size());
182+
filterMatchedPaths.sort(Comparator.comparing(GCSPath::getUri));
183+
Assert.assertEquals(GCSPath.from("gs://foobucket/mydir/test_web1/report.html"), filterMatchedPaths.get(0));
184+
Assert.assertEquals(GCSPath.from("gs://foobucket/mydir/test_web2/css/"), filterMatchedPaths.get(1));
185+
Assert.assertEquals(GCSPath.from("gs://foobucket/mydir/test_web2/report.html"), filterMatchedPaths.get(2));
186+
}
147187
}

0 commit comments

Comments
 (0)