Commits (34)
e373ea2
forward list compat
mickjermsurawong-stripe Sep 27, 2019
721a490
bump travis to openjdk8
mickjermsurawong-stripe Sep 28, 2019
0adbfae
fail when projecting on list of struct
mickjermsurawong-stripe Sep 29, 2019
84c119a
recurse only on elements
mickjermsurawong-stripe Sep 29, 2019
6bf7652
check for non-optional extra fields for projection
mickjermsurawong-stripe Sep 30, 2019
018f267
move to scala
mickjermsurawong-stripe Sep 30, 2019
ae89c06
migrate test to different class
mickjermsurawong-stripe Sep 30, 2019
ca67991
handle map legacy format
mickjermsurawong-stripe Sep 30, 2019
b7363b8
format import
mickjermsurawong-stripe Sep 30, 2019
2996317
improve docs
mickjermsurawong-stripe Sep 30, 2019
e02d0a8
fix style warning
mickjermsurawong-stripe Sep 30, 2019
4d40c06
address PR feedback
mickjermsurawong-stripe Oct 1, 2019
6e551e5
undo import wildcard
mickjermsurawong-stripe Oct 1, 2019
436def8
fix warning for descendants of sealed traits
mickjermsurawong-stripe Oct 1, 2019
7ba2305
remove duplicate test
mickjermsurawong-stripe Oct 1, 2019
d2f2fb5
improve test names and remove one duplicate
mickjermsurawong-stripe Oct 1, 2019
9c96d23
add docs
mickjermsurawong-stripe Oct 1, 2019
09bd4dd
Check on schema type mismatch
mickjermsurawong-stripe Oct 3, 2019
83b6898
explicit rename from source/target to projected read schema and file …
mickjermsurawong-stripe Oct 3, 2019
d0ed5d6
support creating _tuple format and generalize compat in all directions
mickjermsurawong-stripe Oct 3, 2019
5de8b64
support legacy spark list of nullable elements
mickjermsurawong-stripe Oct 3, 2019
db937c3
improve docs
mickjermsurawong-stripe Oct 3, 2019
0c38af8
add tests for all supported list compat conversions
mickjermsurawong-stripe Oct 3, 2019
3700a85
improve docs
mickjermsurawong-stripe Oct 4, 2019
3b7255b
remove classtag inference
mickjermsurawong-stripe Oct 4, 2019
3a9be2e
rename schema name and use consistent method
mickjermsurawong-stripe Oct 4, 2019
5b5cf2b
file rename to drop "forward" compat
mickjermsurawong-stripe Oct 4, 2019
e29c7e9
test rename and make variables consistent
mickjermsurawong-stripe Oct 4, 2019
cd7e69c
make names file/read oriented
mickjermsurawong-stripe Oct 4, 2019
b05f13c
improve test make sure formatted type is still compatible with given …
mickjermsurawong-stripe Oct 4, 2019
7bb0382
check for field optional/required
mickjermsurawong-stripe Oct 4, 2019
6365db8
Review suggestions for https://github.com/twitter/scalding/pull/1921 …
joshrosen-stripe Oct 5, 2019
b6c8a92
improve code coverage and remove dead code after restructuring
mickjermsurawong-stripe Oct 5, 2019
579905c
auto-format from running sbt test
mickjermsurawong-stripe Oct 5, 2019
.travis.yml (2 changes: 1 addition & 1 deletion)
@@ -1,5 +1,5 @@
language: scala
-jdk: oraclejdk8
+jdk: openjdk8
sudo: false

before_install:
@@ -132,7 +132,9 @@ public static MessageType getSchemaForRead(MessageType fileMessageType, String p
   */
  public static MessageType getSchemaForRead(MessageType fileMessageType, MessageType projectedMessageType) {
    assertGroupsAreCompatible(fileMessageType, projectedMessageType);
-   return projectedMessageType;
+   return ParquetCollectionFormatForwardCompatibility.projectFileSchema(
+       projectedMessageType, fileMessageType
+   );
  }

  /**
@@ -0,0 +1,146 @@
package com.twitter.scalding.parquet.scrooge

import scala.collection.JavaConverters._
import org.apache.parquet.schema.Type.Repetition
import org.apache.parquet.schema.{GroupType, MessageType, Type}
import org.apache.parquet.thrift.DecodingSchemaMismatchException
import org.slf4j.LoggerFactory

import scala.reflect.ClassTag

/**
* Project the file schema so that collection types (list and map) have the same structure
* as in the projected read schema. This is currently used in [[ScroogeReadSupport]], where the
* projected read schema can come from:
* 1) a Thrift struct via [[org.apache.parquet.thrift.ThriftSchemaConvertVisitor]], which always
*    describes lists with the `_tuple` format and maps with the `MAP_KEY_VALUE` annotation, or
* 2) a user-supplied schema string via the config key
*    [[org.apache.parquet.hadoop.api.ReadSupport.PARQUET_READ_SCHEMA]].
*
* The strategy of this class is to first assume that the projected read schema is a "sub-graph"
* of the file schema in terms of field names. (An optional field in the projected read schema
* that is absent from the file schema is still allowed, and is carried over to the projected
* file schema.) However, collection types can differ in graph structure between the two schemas.
* We thus need to:
* 1) traverse the two schemas until we find a collection type, indicated by a `repeated` type, and
* 2) delegate the collection types found to the respective list/map formatter.
*/
private[scrooge] object ParquetCollectionFormatForwardCompatibility {

private val logger = LoggerFactory.getLogger(getClass)

/**
* Project the file schema to contain the same fields as the given projected read schema.
* The resulting projected file schema has the same optional/required fields as the
* projected read schema, but keeps the collection type format of the file schema.
*
* @param projectedReadSchema read schema specifying field projection
* @param fileSchema file schema to be projected
*/
def projectFileSchema(projectedReadSchema: MessageType, fileSchema: MessageType): MessageType = {
val projectedFileSchema = projectFileType(projectedReadSchema, fileSchema).asGroupType()
logger.debug(s"Projected read schema:\n${projectedReadSchema}\n" +
s"File schema:\n${fileSchema}\n" +
s"Projected file schema:\n${projectedFileSchema}")
new MessageType(projectedFileSchema.getName, projectedFileSchema.getFields)
}
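
// Illustrative sketch of the mismatch this object reconciles (the field name `my_list` is
// hypothetical). A thrift-derived projected read schema encodes a list element with the
// `_tuple` suffix:
//   required group my_list (LIST) {
//     repeated binary my_list_tuple (UTF8);
//   }
// whereas the file schema may use the standard nested encoding:
//   required group my_list (LIST) {
//     repeated group list {
//       optional binary element (UTF8);
//     }
//   }
// projectFileSchema keeps the second (file) structure while applying the read-side projection.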

/**
* Traverse the given schemas and format any list or map node of the projected read type to the
* structure of the file schema. The formatting is not a one-to-one node swap between the two
* schemas because of the projection requirement.
*/
private def projectFileType(projectedReadType: Type, fileType: Type): Type = {
(extractCollectionGroup(projectedReadType), extractCollectionGroup(fileType)) match {
case _ if projectedReadType.isPrimitive && fileType.isPrimitive =>
projectedReadType
case _ if projectedReadType.isPrimitive != fileType.isPrimitive =>
throw new DecodingSchemaMismatchException(
s"Found schema mismatch between projected read type:\n$projectedReadType\n" +
s"and file type:\n${fileType}"
)
case (Some(projectedReadGroup: ListGroup), Some(fileGroup: ListGroup)) =>
projectFileGroup[ListGroup](projectedReadGroup, fileGroup)
case (Some(projectedReadGroup: MapGroup), Some(fileGroup: MapGroup)) =>
projectFileGroup[MapGroup](projectedReadGroup, fileGroup)
case _ => // Field projection
val projectedReadGroupType = projectedReadType.asGroupType
val fileGroupType = fileType.asGroupType
val projectedReadFields = projectedReadGroupType.getFields.asScala.map { projectedReadField =>
if (!fileGroupType.containsField(projectedReadField.getName)) {
if (!projectedReadField.isRepetition(Repetition.OPTIONAL)) {
throw new DecodingSchemaMismatchException(
s"Found non-optional projected read field ${projectedReadField.getName}:\n$projectedReadField\n\n" +
s"not present in the given file group type:\n${fileGroupType}"
)
}
projectedReadField
} else {
val fileFieldIndex = fileGroupType.getFieldIndex(projectedReadField.getName)
val fileField = fileGroupType.getFields.get(fileFieldIndex)
projectFileType(projectedReadField, fileField)
}
}
projectedReadGroupType.withNewFields(projectedReadFields.asJava)
}
}

private def projectFileGroup[T <: CollectionGroup](projectedReadGroup: T,
fileGroup: T)(implicit t: ClassTag[T]): GroupType = {

val formatter = t.runtimeClass.asInstanceOf[Class[T]] match {
case c if c == classOf[MapGroup] => ParquetMapFormatter
case c if c == classOf[ListGroup] => ParquetListFormatter
}
val projectedFileRepeatedType = formatter.formatForwardCompatibleRepeatedType(
projectedReadGroup.repeatedType,
fileGroup.repeatedType,
projectFileType(_, _))
// Respect optional/required from the projected read group.
projectedReadGroup.groupType.withNewFields(projectedFileRepeatedType)
Review comment:
Could this end up letting through cases where the file's field is optional and the projection's field is required? If so we should explicitly detect and throw for that (probably in projectFileType).

Reply:
Added the assertion at the "struct projection" case, which has access to the field-level optional/required repetition.
}

private def extractCollectionGroup(typ: Type): Option[CollectionGroup] = {
ParquetListFormatter.extractGroup(typ).orElse(ParquetMapFormatter.extractGroup(typ))
}
}

private[scrooge] trait ParquetCollectionFormatter {
/**
* Format the source repeated type in the structure of the target repeated type.
* @param sourceRepeatedType repeated type from which the formatted result gets its content
* @param targetRepeatedType repeated type from which the formatted result gets its structure
* @param recursiveSolver solver for the inner content of the repeated type
* @return formatted result
*/
def formatForwardCompatibleRepeatedType(sourceRepeatedType: Type,
targetRepeatedType: Type,
recursiveSolver: (Type, Type) => Type): Type

/**
* Extract a collection group containing the repeated type, for any of the supported formats.
*/
def extractGroup(typ: Type): Option[CollectionGroup]
}

private[scrooge] sealed trait CollectionGroup {
/**
* Type for the collection.
* For example, given the schema:
*   required group my_list (LIST) {
*     repeated group list {
*       optional binary element (UTF8);
*     }
*   }
* [[groupType]] refers to this whole schema, and
* [[repeatedType]] refers to the inner `repeated` type.
*/
def groupType: GroupType

def repeatedType: Type
}

private[scrooge] sealed case class MapGroup(groupType: GroupType, repeatedType: Type) extends CollectionGroup

private[scrooge] sealed case class ListGroup(groupType: GroupType, repeatedType: Type) extends CollectionGroup
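
A minimal usage sketch, assuming a test in the same package (the schema strings and names below are illustrative and not taken from the PR's test suite; conversion between the `_tuple` and nested list encodings is expected to be handled by ParquetListFormatter per the commits above):

package com.twitter.scalding.parquet.scrooge

import org.apache.parquet.schema.MessageTypeParser

object ProjectFileSchemaSketch {
  def main(args: Array[String]): Unit = {
    // Projected read schema in the thrift-style `_tuple` list encoding.
    val projectedReadSchema = MessageTypeParser.parseMessageType(
      """message ReadSchema {
        |  required group my_list (LIST) {
        |    repeated binary my_list_tuple (UTF8);
        |  }
        |}""".stripMargin)

    // File schema using the standard nested list encoding, plus an extra field
    // that the projection should drop.
    val fileSchema = MessageTypeParser.parseMessageType(
      """message FileSchema {
        |  required group my_list (LIST) {
        |    repeated group list {
        |      optional binary element (UTF8);
        |    }
        |  }
        |  required binary extra_field (UTF8);
        |}""".stripMargin)

    // Per the scaladoc of ParquetCollectionFormatForwardCompatibility, the result should
    // contain only `my_list`, but in the file schema's nested list structure.
    val projectedFileSchema = ParquetCollectionFormatForwardCompatibility
      .projectFileSchema(projectedReadSchema, fileSchema)
    println(projectedFileSchema)
  }
}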