Commit 6365db8

Review suggestions for twitter#1921 (#1)
* Add more comments; add explicit else conditions to some if cases; lift base case out of pattern match
* Formatting changes
* More minor formatting.

1 parent 7bb0382 · commit 6365db8

3 files changed (+75, -48 lines)


scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/ParquetCollectionFormatCompatibility.scala

Lines changed: 53 additions & 33 deletions
@@ -37,8 +37,8 @@ private[scrooge] object ParquetCollectionFormatCompatibility {
    * The result is projected file schema with the same optional/required fields as the
    * projected read schema, but collection type format as the file schema.
    *
-   * @param projectedReadSchema read schema specifying field projection
    * @param fileSchema file schema to be projected
+   * @param projectedReadSchema read schema specifying field projection
    */
   def projectFileSchema(fileSchema: MessageType, projectedReadSchema: MessageType): MessageType = {
     val projectedFileSchema = projectFileType(fileSchema, projectedReadSchema, FieldContext()).asGroupType()
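For context (not part of this commit): a minimal sketch of how projectFileSchema might be exercised, assuming same-package access to the private[scrooge] object (e.g. from a test) and using parquet-column's MessageTypeParser. The schemas are invented for illustration; with plain struct fields like these the result mirrors the projected read schema, while list/map fields would keep the file schema's collection layout.

import org.apache.parquet.schema.{MessageType, MessageTypeParser}

val fileSchema: MessageType = MessageTypeParser.parseMessageType(
  """message FileSchema {
    |  required int64 id;
    |  optional binary name (UTF8);
    |  optional group address {
    |    optional binary city (UTF8);
    |    optional binary zip (UTF8);
    |  }
    |}""".stripMargin)

// Projection: keep only `id` and `address.city`.
val projectedReadSchema: MessageType = MessageTypeParser.parseMessageType(
  """message ReadSchema {
    |  required int64 id;
    |  optional group address {
    |    optional binary city (UTF8);
    |  }
    |}""".stripMargin)

val projectedFileSchema: MessageType =
  ParquetCollectionFormatCompatibility.projectFileSchema(fileSchema, projectedReadSchema)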
@@ -55,50 +55,70 @@ private[scrooge] object ParquetCollectionFormatCompatibility {
    * handle projection and possible nested collection types in the repeated type.
    */
   private def projectFileType(fileType: Type, projectedReadType: Type, fieldContext: FieldContext): Type = {
-    (extractCollectionGroup(projectedReadType), extractCollectionGroup(fileType)) match {
-      case _ if projectedReadType.isPrimitive && fileType.isPrimitive =>
+    if (projectedReadType.isPrimitive || fileType.isPrimitive) {
+      // Base-cases to handle primitive types:
+      if (projectedReadType.isPrimitive && fileType.isPrimitive) {
+        // The field is a primitive in both schemas
         projectedReadType
-      case _ if projectedReadType.isPrimitive != fileType.isPrimitive =>
+      } else {
+        // The field is primitive in one schema but non-primitive in the other
         throw new DecodingSchemaMismatchException(
           s"Found schema mismatch between projected read type:\n$projectedReadType\n" +
             s"and file type:\n${fileType}"
         )
-      case (Some(projectedReadGroup: ListGroup), Some(fileGroup: ListGroup)) =>
-        projectFileGroup(fileGroup, projectedReadGroup, fieldContext.copy(nestedListLevel = fieldContext.nestedListLevel + 1), formatter=ParquetListFormatter)
-      case (Some(projectedReadGroup: MapGroup), Some(fileGroup: MapGroup)) =>
-        projectFileGroup(fileGroup, projectedReadGroup, fieldContext, formatter=ParquetMapFormatter)
-      case _ => // Struct projection
-        val projectedReadGroupType = projectedReadType.asGroupType
-        val fileGroupType = fileType.asGroupType
-        val projectedReadFields = projectedReadGroupType.getFields.asScala.map { projectedReadField =>
-          if (!fileGroupType.containsField(projectedReadField.getName)) {
-            if (!projectedReadField.isRepetition(Repetition.OPTIONAL)) {
-              throw new DecodingSchemaMismatchException(
-                s"Found non-optional projected read field ${projectedReadField.getName}:\n$projectedReadField\n\n" +
-                  s"not present in the given file group type:\n${fileGroupType}"
-              )
-            }
-            projectedReadField
-          } else {
-            val fileFieldIndex = fileGroupType.getFieldIndex(projectedReadField.getName)
-            val fileField = fileGroupType.getFields.get(fileFieldIndex)
-            if (fileField.isRepetition(Repetition.OPTIONAL) && projectedReadField.isRepetition(Repetition.REQUIRED)) {
-              throw new DecodingSchemaMismatchException(
-                s"Found required projected read field ${projectedReadField.getName}:\n$projectedReadField\n\n" +
-                  s"on optional file field:\n${fileField}"
-              )
+      }
+    } else {
+      // Recursive cases to handle non-primitives (lists, maps, and structs):
+      (extractCollectionGroup(projectedReadType), extractCollectionGroup(fileType)) match {
+        case (Some(projectedReadGroup: ListGroup), Some(fileGroup: ListGroup)) =>
+          projectFileGroup(fileGroup, projectedReadGroup, fieldContext.copy(nestedListLevel = fieldContext.nestedListLevel + 1), formatter=ParquetListFormatter)
+        case (Some(projectedReadGroup: MapGroup), Some(fileGroup: MapGroup)) =>
+          projectFileGroup(fileGroup, projectedReadGroup, fieldContext, formatter=ParquetMapFormatter)
+        case _ => // Struct projection
+          val projectedReadGroupType = projectedReadType.asGroupType
+          val fileGroupType = fileType.asGroupType
+          val projectedReadFields = projectedReadGroupType.getFields.asScala.map { projectedReadField =>
+            if (!fileGroupType.containsField(projectedReadField.getName)) {
+              // The projected read schema includes a field which is missing from the file schema.
+              if (projectedReadField.isRepetition(Repetition.OPTIONAL)) {
+                // The missing field is optional in the projected read schema. Since the file schema
+                // doesn't contain this field there are no collection compatibility concerns to worry
+                // about and we can simply use the supplied schema:
+                projectedReadField
+              } else {
+                // The missing field is repeated or required, which is an error:
+                throw new DecodingSchemaMismatchException(
+                  s"Found non-optional projected read field ${projectedReadField.getName}:\n$projectedReadField\n\n" +
+                    s"not present in the given file group type:\n${fileGroupType}"
+                )
+              }
+            } else {
+              // The field is present in both schemas, so first check that the schemas specify compatible repetition
+              // values for the field, then recursively process the fields:
+              val fileFieldIndex = fileGroupType.getFieldIndex(projectedReadField.getName)
+              val fileField = fileGroupType.getFields.get(fileFieldIndex)
+              if (fileField.isRepetition(Repetition.OPTIONAL) && projectedReadField.isRepetition(Repetition.REQUIRED)) {
+                // The field is optional in the file schema but required in the projected read schema; this is an error:
+                throw new DecodingSchemaMismatchException(
+                  s"Found required projected read field ${projectedReadField.getName}:\n$projectedReadField\n\n" +
+                    s"on optional file field:\n${fileField}"
+                )
+              } else {
+                // The field's repetitions are compatible in both schemas (e.g. optional in both schemas or required
+                // in both), so recursively process the field:
+                projectFileType(fileField, projectedReadField, FieldContext(projectedReadField.getName))
+              }
             }
-            projectFileType(fileField, projectedReadField, FieldContext(projectedReadField.getName))
           }
-        }
-        projectedReadGroupType.withNewFields(projectedReadFields.asJava)
+          projectedReadGroupType.withNewFields(projectedReadFields.asJava)
+      }
     }
   }
 
   private def projectFileGroup(fileGroup: CollectionGroup,
                                projectedReadGroup: CollectionGroup,
                                fieldContext: FieldContext,
-                               formatter: ParquetCollectionFormatter) = {
+                               formatter: ParquetCollectionFormatter): GroupType = {
     val projectedFileRepeatedType = formatter.formatCompatibleRepeatedType(
       fileGroup.repeatedType,
       projectedReadGroup.repeatedType,
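As an aside (not from the commit), the repetition-compatibility check in the struct-projection branch can be seen in isolation with two hand-built fields; PrimitiveType and Repetition come from parquet-column, and the field name is made up:

import org.apache.parquet.schema.PrimitiveType
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32
import org.apache.parquet.schema.Type.Repetition

// Hypothetical field that is optional in the file schema but required in the read schema.
val fileField = new PrimitiveType(Repetition.OPTIONAL, INT32, "user_id")
val readField = new PrimitiveType(Repetition.REQUIRED, INT32, "user_id")

// This is the condition the struct-projection branch tests before recursing;
// here it evaluates to true, so projectFileType would throw DecodingSchemaMismatchException.
val incompatible =
  fileField.isRepetition(Repetition.OPTIONAL) && readField.isRepetition(Repetition.REQUIRED)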
@@ -118,8 +138,8 @@ private[scrooge] trait ParquetCollectionFormatter {
   /**
    * Format source repeated type in the structure of target repeated type.
    *
-   * @param readRepeatedType repeated type from which the formatted result get content
    * @param fileRepeatedType repeated type from which the formatted result get the structure
+   * @param readRepeatedType repeated type from which the formatted result get content
    * @param recursiveSolver solver for the inner content of the repeated type
    * @return formatted result
    */

scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/ParquetListFormatter.scala

Lines changed: 19 additions & 13 deletions
@@ -19,19 +19,20 @@ private[scrooge] object ParquetListFormatter extends ParquetCollectionFormatter
   private val logger = LoggerFactory.getLogger(getClass)
 
   private val rules: Seq[ParquetListFormatRule] = Seq(
-    PrimitiveElementRule, PrimitiveArrayRule,
-    GroupElementRule, GroupArrayRule,
-    TupleRule, StandardRule, SparkLegacyNullableElementRule
+    PrimitiveElementRule,
+    PrimitiveArrayRule,
+    GroupElementRule,
+    GroupArrayRule,
+    TupleRule,
+    StandardRule,
+    SparkLegacyNullableElementRule
   )
 
   def formatCompatibleRepeatedType(fileRepeatedType: Type,
                                    readRepeatedType: Type,
                                    fieldContext: FieldContext,
                                    recursiveSolver: (Type, Type, FieldContext) => Type): Type = {
-    (
-      findRule(fileRepeatedType),
-      findRule(readRepeatedType)
-    ) match {
+    (findRule(fileRepeatedType), findRule(readRepeatedType)) match {
       case (Some(fileRule), Some(readRule)) => {
         val readElementType = readRule.elementType(readRepeatedType)
         val fileElementType = fileRule.elementType(fileRepeatedType)
@@ -42,7 +43,7 @@ private[scrooge] object ParquetListFormatter extends ParquetCollectionFormatter
           elementName = readRule.elementName(readRepeatedType),
           isElementRequired = readRule.isElementRequired(readRepeatedType),
           elementOriginalType = readRule.elementOriginalType(readRepeatedType),
-          fieldContext=fieldContext
+          fieldContext = fieldContext
         )
       }
 
@@ -165,8 +166,9 @@ private[scrooge] sealed trait GroupListRule extends ParquetListFormatRule {
   override def elementName(repeatedType: Type): String = this.constantElementName
 
   override def appliesToType(repeatedType: Type): Boolean = {
-    if (repeatedType.isPrimitive) false
-    else {
+    if (repeatedType.isPrimitive) {
+      false
+    } else {
       val groupType = repeatedType.asGroupType
       groupType.getFields.size > 0 && groupType.getName == this.constantElementName
     }
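For illustration only: a repeated group of the shape appliesToType accepts, built with parquet-column's Types builder. The element name "element" is an assumption standing in for the rule's constantElementName.

import org.apache.parquet.schema.{OriginalType, Types}
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY

// A non-empty repeated group named like the rule's constant element name.
val repeatedGroup = Types.repeatedGroup()
  .required(BINARY).as(OriginalType.UTF8).named("name")
  .named("element")

// Mirrors the checks in appliesToType: not primitive, has fields, and the name matches.
val applies =
  !repeatedGroup.isPrimitive &&
    repeatedGroup.getFields.size > 0 &&
    repeatedGroup.getName == "element"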
@@ -216,8 +218,11 @@ private[scrooge] object TupleRule extends ParquetListFormatRule {
   override def createCompliantRepeatedType(typ: Type, name: String, isElementRequired: Boolean, originalType: OriginalType, fieldContext: FieldContext): Type = {
     // nested list has type name of the form: `field_original_name_tuple_tuple..._tuple` for the depth of list
     val suffixed_name = (List(fieldContext.name) ++ (1 to fieldContext.nestedListLevel).toList.map(_ => "tuple")).mkString("_")
-    if (typ.isPrimitive) new PrimitiveType(Type.Repetition.REPEATED, typ.asPrimitiveType.getPrimitiveTypeName, suffixed_name, originalType)
-    else new GroupType(Type.Repetition.REPEATED, suffixed_name, originalType, typ.asGroupType.getFields)
+    if (typ.isPrimitive) {
+      new PrimitiveType(Type.Repetition.REPEATED, typ.asPrimitiveType.getPrimitiveTypeName, suffixed_name, originalType)
+    } else {
+      new GroupType(Type.Repetition.REPEATED, suffixed_name, originalType, typ.asGroupType.getFields)
+    }
   }
 }
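A quick worked example of the suffixed-name construction above, with plain values standing in for fieldContext.name and fieldContext.nestedListLevel:

val name = "my_list"      // fieldContext.name
val nestedListLevel = 2   // depth of list nesting
val suffixed_name = (List(name) ++ (1 to nestedListLevel).toList.map(_ => "tuple")).mkString("_")
// suffixed_name == "my_list_tuple_tuple"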

@@ -304,7 +309,8 @@ private[scrooge] object SparkLegacyNullableElementRule extends ThreeLevelRule {
   override def createCompliantRepeatedType(originalElementType: Type, name: String, isElementRequired: Boolean, originalType: OriginalType, fieldContext: FieldContext): Type = {
     if (isElementRequired) {
       throw new IllegalArgumentException(s"Spark legacy mode for nullable element cannot take required element. Found: ${originalElementType}")
+    } else {
+      super.createCompliantRepeatedType(originalElementType, name, isElementRequired, originalType, fieldContext)
     }
-    super.createCompliantRepeatedType(originalElementType, name, isElementRequired, originalType, fieldContext)
   }
 }

scalding-parquet-scrooge/src/main/scala/com/twitter/scalding/parquet/scrooge/ParquetMapFormatter.scala

Lines changed: 3 additions & 2 deletions
@@ -6,7 +6,7 @@ import org.apache.parquet.schema.{OriginalType, Type}
  * Format parquet map schema of read type to structure of file type.
  * The supported formats are:
  * 1) Standard repeated type of `key_value` without annotation
- * 2) Legacy repeated `map field annotated with (MAP_KEY_VALUE)
+ * 2) Legacy repeated `map` field annotated with (MAP_KEY_VALUE)
  * as described in
  * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps
  *
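For reference (not part of the commit), the two supported layouts can be written out as parquet schema strings; the key/value types here are invented, and MessageTypeParser is parquet-column's standard parser:

import org.apache.parquet.schema.MessageTypeParser

// 1) Standard layout: repeated group named `key_value`, no annotation on the repeated group.
val standardMap = MessageTypeParser.parseMessageType(
  """message Standard {
    |  optional group my_map (MAP) {
    |    repeated group key_value {
    |      required binary key (UTF8);
    |      optional int32 value;
    |    }
    |  }
    |}""".stripMargin)

// 2) Legacy layout: repeated group named `map`, annotated with (MAP_KEY_VALUE).
val legacyMap = MessageTypeParser.parseMessageType(
  """message Legacy {
    |  optional group my_map (MAP) {
    |    repeated group map (MAP_KEY_VALUE) {
    |      required binary key (UTF8);
    |      optional int32 value;
    |    }
    |  }
    |}""".stripMargin)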
@@ -17,7 +17,8 @@ private[scrooge] object ParquetMapFormatter extends ParquetCollectionFormatter {
 
   def formatCompatibleRepeatedType(fileRepeatedMapType: Type,
                                    readRepeatedMapType: Type,
-                                   fieldContext: FieldContext, recursiveSolver: (Type, Type, FieldContext) => Type): Type = {
+                                   fieldContext: FieldContext,
+                                   recursiveSolver: (Type, Type, FieldContext) => Type): Type = {
     val solvedRepeatedType = recursiveSolver(fileRepeatedMapType, readRepeatedMapType, fieldContext)
     fileRepeatedMapType.asGroupType().withNewFields(solvedRepeatedType.asGroupType().getFields)
   }
