Skip to content

Commit 5e26c8f

Browse files
committed
add IT cases and fix checkstle
1 parent 39f5128 commit 5e26c8f

File tree

5 files changed

+118
-13
lines changed

5 files changed

+118
-13
lines changed

paimon-common/src/main/java/org/apache/paimon/fs/WeightedExternalPathProvider.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,11 @@
99
*
1010
* http://www.apache.org/licenses/LICENSE-2.0
1111
*
12-
* Unless required by applicable law or agreed to in writing,
13-
* software distributed under the License is distributed on an
14-
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15-
* KIND, either express or implied. See the License for the
16-
* specific language governing permissions and limitations
17-
* under the License.
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
1817
*/
1918

2019
package org.apache.paimon.fs;

paimon-common/src/test/java/org/apache/paimon/fs/WeightedExternalPathProviderTest.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,11 @@
99
*
1010
* http://www.apache.org/licenses/LICENSE-2.0
1111
*
12-
* Unless required by applicable law or agreed to in writing,
13-
* software distributed under the License is distributed on an
14-
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15-
* KIND, either express or implied. See the License for the
16-
* specific language governing permissions and limitations
17-
* under the License.
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
1817
*/
1918

2019
package org.apache.paimon.fs;

paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,10 @@ private ExternalPathProvider createExternalPathProvider(BinaryRow partition, int
218218
return null;
219219
}
220220
return ExternalPathProvider.create(
221-
strategy, externalPaths, relativeBucketPath(partition, bucket), externalPathWeights);
221+
strategy,
222+
externalPaths,
223+
relativeBucketPath(partition, bucket),
224+
externalPathWeights);
222225
}
223226

224227
public List<Path> getExternalPaths() {

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/AppendOnlyTableITCase.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@
3434
import org.junit.jupiter.api.Test;
3535
import org.junit.jupiter.api.io.TempDir;
3636

37+
import java.io.IOException;
38+
import java.nio.file.Files;
3739
import java.nio.file.Path;
40+
import java.nio.file.Paths;
3841
import java.time.LocalDateTime;
3942
import java.time.ZoneId;
4043
import java.util.ArrayList;
@@ -225,6 +228,49 @@ public void testReadWriteWithExternalPathSpecificFSStrategy() {
225228
Row.of(1, "AAA"), Row.of(2, "BBB"), Row.of(3, "CCC"), Row.of(4, "DDD"));
226229
}
227230

231+
@Test
232+
public void testReadWriteWithExternalPathWeightRobinStrategy() throws IOException {
233+
String externalPaths =
234+
TraceableFileIO.SCHEME
235+
+ "://"
236+
+ tempExternalPath1.toString()
237+
+ ","
238+
+ LocalFileIOLoader.SCHEME
239+
+ "://"
240+
+ tempExternalPath2.toString();
241+
batchSql(
242+
"ALTER TABLE append_table SET ("
243+
+ "'data-file.external-paths' = '"
244+
+ externalPaths
245+
+ "', "
246+
+ "'data-file.external-paths.strategy' = 'weight-robin', "
247+
+ "'data-file.external-paths.weights' = '1,3', "
248+
+ "'write-only' = 'true'"
249+
+ ")");
250+
251+
int fileNum = 50;
252+
for (int i = 1; i <= fileNum; i++) {
253+
batchSql("INSERT INTO append_table VALUES (" + i + ", 'AAA')");
254+
}
255+
256+
List<Row> rows = batchSql("SELECT * FROM append_table");
257+
assertThat(rows.size()).isEqualTo(fileNum);
258+
259+
// Verify file distribution based on weights
260+
long filesInPath1 =
261+
Files.list(Paths.get(tempExternalPath1.toString() + "/bucket-0")).count();
262+
long filesInPath2 =
263+
Files.list(Paths.get(tempExternalPath2.toString() + "/bucket-0")).count();
264+
long totalFiles = filesInPath1 + filesInPath2;
265+
266+
// Since the file sample size is small in IT case, we only verify that higher-weighted path
267+
// has more files
268+
assertThat(filesInPath1).isGreaterThan(0);
269+
assertThat(filesInPath2).isGreaterThan(0);
270+
assertThat(filesInPath2).isGreaterThan(filesInPath1);
271+
assertThat(totalFiles).isEqualTo(fileNum);
272+
}
273+
228274
@Test
229275
public void testReadWriteWithExternalPathNoneStrategy() {
230276
String externalPaths = TraceableFileIO.SCHEME + "://" + tempExternalPath1.toString();

paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252

5353
import java.io.IOException;
5454
import java.nio.file.Files;
55+
import java.nio.file.Paths;
5556
import java.util.ArrayList;
5657
import java.util.Arrays;
5758
import java.util.Collections;
@@ -306,6 +307,63 @@ public void testTableReadWriteWithExternalPathRoundRobin() throws Exception {
306307
assertThat(actual).containsExactlyInAnyOrder("+I[1, A]", "+I[2, B]", "+I[3, C]");
307308
}
308309

310+
@Test
311+
public void testTableReadWriteWithExternalPathWeightRobin() throws Exception {
312+
TableEnvironment sEnv =
313+
tableEnvironmentBuilder()
314+
.streamingMode()
315+
.checkpointIntervalMs(ThreadLocalRandom.current().nextInt(900) + 100)
316+
.parallelism(1)
317+
.build();
318+
319+
sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
320+
sEnv.executeSql("USE CATALOG testCatalog");
321+
String externalPaths =
322+
TraceableFileIO.SCHEME
323+
+ "://"
324+
+ externalPath1.toString()
325+
+ ","
326+
+ LocalFileIOLoader.SCHEME
327+
+ "://"
328+
+ externalPath2.toString();
329+
sEnv.executeSql(
330+
"CREATE TABLE T2 ( k INT, v STRING, PRIMARY KEY (k) NOT ENFORCED ) "
331+
+ "WITH ( "
332+
+ "'bucket' = '1',"
333+
+ "'write-only' = 'true',"
334+
+ "'data-file.external-paths' = '"
335+
+ externalPaths
336+
+ "',"
337+
+ "'data-file.external-paths.strategy' = 'weight-robin',"
338+
+ "'data-file.external-paths.weights' = '10,5'"
339+
+ ")");
340+
341+
CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T2"));
342+
343+
int fileNum = 50;
344+
for (int i = 1; i <= fileNum; i++) {
345+
sEnv.executeSql("INSERT INTO T2 VALUES (" + i + ", 'data" + i + "')").await();
346+
}
347+
348+
List<String> actual = new ArrayList<>();
349+
for (int i = 0; i < fileNum; i++) {
350+
actual.add(it.next().toString());
351+
}
352+
// Verify all data is readable
353+
assertThat(actual).hasSize(fileNum);
354+
355+
long filesInPath1 = Files.list(Paths.get(externalPath1.toString() + "/bucket-0")).count();
356+
long filesInPath2 = Files.list(Paths.get(externalPath2.toString() + "/bucket-0")).count();
357+
long totalFiles = filesInPath1 + filesInPath2;
358+
359+
// Since the file sample size is small in IT case, we only verify that higher-weighted path
360+
// has more files
361+
assertThat(filesInPath1).isGreaterThan(0);
362+
assertThat(filesInPath2).isGreaterThan(0);
363+
assertThat(filesInPath1).isGreaterThan(filesInPath2);
364+
assertThat(totalFiles).isEqualTo(fileNum);
365+
}
366+
309367
@Test
310368
public void testDropTableWithExternalPaths() throws Exception {
311369
TableEnvironment sEnv =

0 commit comments

Comments
 (0)