Skip to content

Commit cd41fa8

Browse files
authored
Merge pull request #90 from Altinity/89-improve-the-error-message-when-user-attempts-to-perform-a-no-copy-with-multiple-partitions-in-the-same-row-groupblock
Added logic to throw exception when the user tries to use -force-no-copy on a file that contains more than one partition value
2 parents d2d7233 + 8b5c15d commit cd41fa8

File tree

3 files changed

+31
-15
lines changed

3 files changed

+31
-15
lines changed

ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java

Lines changed: 8 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -423,17 +423,20 @@ private static List<DataFile> processFile(
423423

424424
PartitionKey partitionKey = null;
425425
if (partitionSpec.isPartitioned()) {
426-
partitionKey = Partitioning.inferPartitionKey(metadata, partitionSpec);
427-
if (partitionKey == null) {
426+
var inferResult = Partitioning.inferPartitionKey(metadata, partitionSpec);
427+
if (!inferResult.success()) {
428428
if (options.noCopy || options.s3CopyObject) {
429429
throw new BadRequestException(
430430
String.format(
431-
"Cannot infer partition key of %s from the metadata", inputFile.location()));
431+
"%s: %s. In no-copy mode, each file must contain data for only one partition value",
432+
inputFile.location(), inferResult.failureReason()));
432433
}
433434
logger.warn(
434-
"{} does not appear to be partitioned. Falling back to full scan (slow)",
435-
inputFile.location());
435+
"{}: {}. Falling back to full scan (slow)",
436+
inputFile.location(),
437+
inferResult.failureReason());
436438
} else {
439+
partitionKey = inferResult.partitionKey();
437440
logger.info("{}: using inferred partition key {}", file, partitionKey);
438441
}
439442
}

ice/src/main/java/com/altinity/ice/cli/internal/iceberg/Partitioning.java

Lines changed: 22 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,13 @@ public final class Partitioning {
4949

5050
private Partitioning() {}
5151

52+
public record InferPartitionKeyResult(
53+
@Nullable PartitionKey partitionKey, @Nullable String failureReason) {
54+
public boolean success() {
55+
return partitionKey != null;
56+
}
57+
}
58+
5259
public static PartitionSpec newPartitionSpec(Schema schema, List<Main.IcePartition> columns) {
5360
final PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
5461
if (!columns.isEmpty()) {
@@ -123,7 +130,7 @@ public static void apply(UpdatePartitionSpec op, List<Main.IcePartition> columns
123130
}
124131

125132
// TODO: fall back to path when statistics is not available
126-
public static @Nullable PartitionKey inferPartitionKey(
133+
public static InferPartitionKeyResult inferPartitionKey(
127134
ParquetMetadata metadata, PartitionSpec spec) {
128135
Schema schema = spec.schema();
129136

@@ -138,7 +145,7 @@ public static void apply(UpdatePartitionSpec op, List<Main.IcePartition> columns
138145

139146
Object value = null;
140147
Object valueTransformed = null;
141-
boolean same = true;
148+
String failureReason = null;
142149

143150
for (BlockMetaData block : blocks) {
144151
ColumnChunkMetaData columnMeta =
@@ -148,7 +155,7 @@ public static void apply(UpdatePartitionSpec op, List<Main.IcePartition> columns
148155
.orElse(null);
149156

150157
if (columnMeta == null) {
151-
same = false;
158+
failureReason = String.format("Column '%s' not found in file metadata", sourceName);
152159
break;
153160
}
154161

@@ -158,7 +165,7 @@ public static void apply(UpdatePartitionSpec op, List<Main.IcePartition> columns
158165
|| !stats.hasNonNullValue()
159166
|| stats.genericGetMin() == null
160167
|| stats.genericGetMax() == null) {
161-
same = false;
168+
failureReason = String.format("Column '%s' has no statistics", sourceName);
162169
break;
163170
}
164171

@@ -176,29 +183,35 @@ public static void apply(UpdatePartitionSpec op, List<Main.IcePartition> columns
176183
Object maxTransformed = boundTransform.apply(max);
177184

178185
if (!minTransformed.equals(maxTransformed)) {
179-
same = false;
186+
failureReason =
187+
String.format(
188+
"File contains multiple partition values for '%s' (min: %s, max: %s)",
189+
sourceName, minTransformed, maxTransformed);
180190
break;
181191
}
182192

183193
if (valueTransformed == null) {
184194
valueTransformed = minTransformed;
185195
value = min;
186196
} else if (!valueTransformed.equals(minTransformed)) {
187-
same = false;
197+
failureReason =
198+
String.format(
199+
"File contains multiple partition values for '%s' (e.g., %s and %s)",
200+
sourceName, valueTransformed, minTransformed);
188201
break;
189202
}
190203
}
191204

192-
if (same && value != null) {
205+
if (failureReason == null && value != null) {
193206
partitionRecord.setField(sourceName, decodeStatValue(value, type));
194207
} else {
195-
return null;
208+
return new InferPartitionKeyResult(null, failureReason);
196209
}
197210
}
198211

199212
PartitionKey partitionKey = new PartitionKey(spec, schema);
200213
partitionKey.wrap(partitionRecord);
201-
return partitionKey;
214+
return new InferPartitionKeyResult(partitionKey, null);
202215
}
203216

204217
// Copied from org.apache.iceberg.parquet.ParquetConversions.

ice/src/test/java/com/altinity/ice/cli/internal/iceberg/PartitioningTest.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -90,7 +90,7 @@ private PartitionKey partitionOf(
9090
throws IOException {
9191
Map<PartitionKey, List<Record>> partition =
9292
Partitioning.partition(inputFile, partitionSpec.schema(), partitionSpec);
93-
PartitionKey result = Partitioning.inferPartitionKey(metadata, partitionSpec);
93+
PartitionKey result = Partitioning.inferPartitionKey(metadata, partitionSpec).partitionKey();
9494
if (result != null) {
9595
assertThat(partition.size()).isEqualTo(1);
9696
PartitionKey expected = partition.keySet().stream().findFirst().get();

0 commit comments

Comments
 (0)