-
Notifications
You must be signed in to change notification settings - Fork 705
Make projected parquet collection schema forward compatible with the given file schema #1921
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from 31 commits
e373ea2
721a490
0adbfae
84c119a
6bf7652
018f267
ae89c06
ca67991
b7363b8
2996317
e02d0a8
4d40c06
6e551e5
436def8
7ba2305
d2f2fb5
9c96d23
09bd4dd
83b6898
d0ed5d6
5de8b64
db937c3
0c38af8
3700a85
3b7255b
3a9be2e
5b5cf2b
e29c7e9
cd7e69c
b05f13c
7bb0382
6365db8
b6c8a92
579905c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,5 +1,5 @@ | ||
| language: scala | ||
| jdk: oraclejdk8 | ||
| jdk: openjdk8 | ||
| sudo: false | ||
|
|
||
| before_install: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,163 @@ | ||
| package com.twitter.scalding.parquet.scrooge | ||
|
|
||
| import org.apache.parquet.schema.Type.Repetition | ||
| import org.apache.parquet.schema.{GroupType, MessageType, Type} | ||
| import org.apache.parquet.thrift.DecodingSchemaMismatchException | ||
| import org.slf4j.LoggerFactory | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
|
|
||
| /** | ||
| * Project file schema based on projected read schema, which may use a different format | ||
| * for collection groups (list and map). This is currently used in [[ScroogeReadSupport]] where | ||
| * projected read schema can come from: | ||
| * 1) Thrift struct via [[org.apache.parquet.thrift.ThriftSchemaConvertVisitor]], which always | ||
| * describes lists with the `_tuple` format and maps with the `MAP_KEY_VALUE` annotation. | ||
| * 2) User-supplied schema string via config key | ||
| * [[org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA]] | ||
| * | ||
| * By definition, the projected read schema is a "sub-graph" of file schema in terms of field names. | ||
| * (An optional field in the projected read schema is kept in | ||
| * the projected file schema even if the file schema does not contain it.) | ||
| * The graphs of the two schemas may, however, differ for list and map type because of multiple | ||
| * legacy formats and the canonical one. This class supports all directions of conversion. | ||
| * | ||
| * The projection strategy is: | ||
| * 1) traverse the two schemas and maintain only the fields in the read schema. | ||
| * 2) find collection type indicated by `repeated` type, and delegate it to respective list/map formatter. | ||
| * 3) wrap back the formatted repeated type with group type from projected read schema. This | ||
| * means the optional/required remains the same as that from projected read schema. | ||
| */ | ||
| private[scrooge] object ParquetCollectionFormatCompatibility { | ||
|
|
||
| private val logger = LoggerFactory.getLogger(getClass) | ||
|
|
||
| /** | ||
| * Project file schema to contain the same fields as the given projected read schema. | ||
| * The result is projected file schema with the same optional/required fields as the | ||
| * projected read schema, but collection type format as the file schema. | ||
| * | ||
| * @param projectedReadSchema read schema specifying field projection | ||
| * @param fileSchema file schema to be projected | ||
| */ | ||
| def projectFileSchema(fileSchema: MessageType, projectedReadSchema: MessageType): MessageType = { | ||
| val projectedFileSchema = projectFileType(fileSchema, projectedReadSchema, FieldContext()).asGroupType() | ||
| logger.debug(s"Projected read schema:\n${projectedReadSchema}\n" + | ||
| s"File schema:\n${fileSchema}\n" + | ||
| s"Projected file schema:\n${projectedFileSchema}") | ||
| new MessageType(projectedFileSchema.getName, projectedFileSchema.getFields) | ||
| } | ||
|
|
||
| /** | ||
| * Main recursion to get projected file type. Traverse given schemas, filter out unneeded | ||
| * fields, and format read schema's list/map node to file schema's structure. | ||
| * Formatting a repeated type is not a one-to-one node swap because we also have to | ||
| * handle projection and possibly nested collection types inside the repeated type. | ||
| */ | ||
| private def projectFileType(fileType: Type, projectedReadType: Type, fieldContext: FieldContext): Type = { | ||
| (extractCollectionGroup(projectedReadType), extractCollectionGroup(fileType)) match { | ||
| case _ if projectedReadType.isPrimitive && fileType.isPrimitive => | ||
| projectedReadType | ||
| case _ if projectedReadType.isPrimitive != fileType.isPrimitive => | ||
| throw new DecodingSchemaMismatchException( | ||
| s"Found schema mismatch between projected read type:\n$projectedReadType\n" + | ||
| s"and file type:\n${fileType}" | ||
| ) | ||
| case (Some(projectedReadGroup: ListGroup), Some(fileGroup: ListGroup)) => | ||
| projectFileGroup(fileGroup, projectedReadGroup, fieldContext.copy(nestedListLevel = fieldContext.nestedListLevel + 1), formatter=ParquetListFormatter) | ||
| case (Some(projectedReadGroup: MapGroup), Some(fileGroup: MapGroup)) => | ||
| projectFileGroup(fileGroup, projectedReadGroup, fieldContext, formatter=ParquetMapFormatter) | ||
| case _ => // Struct projection | ||
| val projectedReadGroupType = projectedReadType.asGroupType | ||
| val fileGroupType = fileType.asGroupType | ||
| val projectedReadFields = projectedReadGroupType.getFields.asScala.map { projectedReadField => | ||
| if (!fileGroupType.containsField(projectedReadField.getName)) { | ||
| if (!projectedReadField.isRepetition(Repetition.OPTIONAL)) { | ||
| throw new DecodingSchemaMismatchException( | ||
| s"Found non-optional projected read field ${projectedReadField.getName}:\n$projectedReadField\n\n" + | ||
| s"not present in the given file group type:\n${fileGroupType}" | ||
| ) | ||
| } | ||
| projectedReadField | ||
| } else { | ||
| val fileFieldIndex = fileGroupType.getFieldIndex(projectedReadField.getName) | ||
| val fileField = fileGroupType.getFields.get(fileFieldIndex) | ||
| if (fileField.isRepetition(Repetition.OPTIONAL) && projectedReadField.isRepetition(Repetition.REQUIRED)) { | ||
| throw new DecodingSchemaMismatchException( | ||
| s"Found required projected read field ${projectedReadField.getName}:\n$projectedReadField\n\n" + | ||
| s"on optional file field:\n${fileField}" | ||
| ) | ||
| } | ||
| projectFileType(fileField, projectedReadField, FieldContext(projectedReadField.getName)) | ||
| } | ||
| } | ||
| projectedReadGroupType.withNewFields(projectedReadFields.asJava) | ||
| } | ||
| } | ||
|
|
||
| private def projectFileGroup(fileGroup: CollectionGroup, | ||
| projectedReadGroup: CollectionGroup, | ||
| fieldContext: FieldContext, | ||
| formatter: ParquetCollectionFormatter) = { | ||
| val projectedFileRepeatedType = formatter.formatCompatibleRepeatedType( | ||
| fileGroup.repeatedType, | ||
| projectedReadGroup.repeatedType, | ||
| fieldContext, | ||
| projectFileType | ||
| ) | ||
| // Respect optional/required from the projected read group. | ||
| projectedReadGroup.groupType.withNewFields(projectedFileRepeatedType) | ||
| } | ||
|
|
||
| private def extractCollectionGroup(typ: Type): Option[CollectionGroup] = { | ||
| ParquetListFormatter.extractGroup(typ).orElse(ParquetMapFormatter.extractGroup(typ)) | ||
| } | ||
| } | ||
|
|
||
| private[scrooge] trait ParquetCollectionFormatter { | ||
| /** | ||
| * Format the read repeated type in the structure of the file repeated type. | ||
| * | ||
| * @param readRepeatedType repeated type from which the formatted result gets its content | ||
| * @param fileRepeatedType repeated type from which the formatted result gets its structure | ||
| * @param fieldContext name and nested list level of the field being formatted | ||
| * @param recursiveSolver solver for the inner content of the repeated type | ||
| * @return formatted result | ||
| */ | ||
| def formatCompatibleRepeatedType(fileRepeatedType: Type, | ||
| readRepeatedType: Type, | ||
| fieldContext: FieldContext, | ||
| recursiveSolver: (Type, Type, FieldContext) => Type): Type | ||
|
||
|
|
||
| /** | ||
| * Extract collection group containing repeated type of different formats. | ||
| */ | ||
| def extractGroup(typ: Type): Option[CollectionGroup] | ||
| } | ||
|
|
||
| /** | ||
| * Helper class to carry information about the field. Currently it only carries information specific to list collections. | ||
| * @param name field name | ||
| * @param nestedListLevel number of list groups entered so far while projecting this field | ||
| */ | ||
| private[scrooge] case class FieldContext(name: String="", nestedListLevel: Int=0) | ||
|
|
||
| private[scrooge] sealed trait CollectionGroup { | ||
| /** | ||
| * Type for the collection. | ||
| * For example, given the schema, | ||
| * required group my_list (LIST) { | ||
| * repeated group list { | ||
| * optional binary element (UTF8); | ||
| * } | ||
| * } | ||
| * [[groupType]] refers to this whole schema | ||
| * [[repeatedType]] refers to inner `repeated` schema | ||
| */ | ||
| def groupType: GroupType | ||
|
|
||
| def repeatedType: Type | ||
| } | ||
|
|
||
| private[scrooge] sealed case class MapGroup(groupType: GroupType, repeatedType: Type) extends CollectionGroup | ||
|
|
||
| private[scrooge] sealed case class ListGroup(groupType: GroupType, repeatedType: Type) extends CollectionGroup | ||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to check that both types are the same primitive type? Or is that handled elsewhere? I imagine that certain types of primitive-type-changing projections might be supported (e.g. widening an int to a long) but others might not be (e.g. converting a string to an int).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That type compatibility is handled in a separate place here, so by the time this is called we should already have type compatibility.
scalding/scalding-parquet-scrooge/src/main/java/com/twitter/scalding/parquet/scrooge/ScroogeReadSupport.java
Lines 133 to 136 in 73ce124