Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.twitter.scalding.serialization

// We wrap types in Exported to provide low priority implicits
// the real type has a low priority implicit to extract from Exported
// into the original type.
// See more @ https://github.com/milessabin/export-hook
case class Exported[T](instance: T) extends AnyVal
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,17 @@ trait OrderedSerialization[T] extends Ordering[T] with Serialization[T] {
* the InputStreams is mutated to be the end of the record.
*/
def compareBinary(a: InputStream, b: InputStream): OrderedSerialization.Result

/**
* This compares two InputStreams. After this call, the position in
* the InputStreams may or may not be at the end of the record.
*/
def compareBinaryNoConsume(a: InputStream, b: InputStream): OrderedSerialization.Result = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get really worried about how to compose methods like this that lack a strong contract. Also, I don't see that we ever call this.

compareBinary(a, b)
}
}

object OrderedSerialization {
object OrderedSerialization extends LowPriorityOrderedSerialization {
/**
* Represents the result of a comparison that might fail due
* to an error deserializing
Expand Down Expand Up @@ -214,3 +222,8 @@ final case class DeserializingOrderedSerialization[T](serialization: Serializati
final override def staticSize = serialization.staticSize
final override def dynamicSize(t: T) = serialization.dynamicSize(t)
}

private[serialization] trait LowPriorityOrderedSerialization {
implicit final def importedOrderedSerialization[A](implicit exported: Exported[OrderedSerialization[A]]): OrderedSerialization[A] = exported.instance
}

Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ import scala.util.hashing.Hashing
* implementation. This must satisfy:
* (!equiv(a, b)) || (hash(a) == hash(b))
*/
trait Serialization[T] extends Equiv[T] with Hashing[T] with Serializable {
trait Serialization[T] extends Equiv[T] with Hashing[T] with Serializable with LowPrioritySerialization {
def read(in: InputStream): Try[T]
def write(out: OutputStream, t: T): Try[Unit]

/**
* If all items have a static size, this returns Some, else None
* NOTE: lawful implementations that return Some here much return
Expand All @@ -53,6 +54,11 @@ trait Serialization[T] extends Equiv[T] with Hashing[T] with Serializable {
* otherwise the caller should just serialize into an ByteArrayOutputStream
*/
def dynamicSize(t: T): Option[Int]

// Override this to provide more efficient
def skip(in: InputStream): Try[Unit] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may want def skip(count: Int, in: InputStream): Try[Unit] so in say List[T] we can skip the rest of the collection. If count <= 0 do nothing, and otherwise in the worst case just read and throw them away as you do below.

read(in).map{ _ => () }
}
}

/**
Expand Down Expand Up @@ -171,3 +177,7 @@ object Serialization {
sizeLaw,
transitivity)
}

private[serialization] trait LowPrioritySerialization {
implicit final def importedSerialization[A](implicit exported: Exported[Serialization[A]]): Serialization[A] = exported.instance
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,20 @@ object OrderedSerializationProviderImpl {

val primitiveDispatcher = PrimitiveOrderedBuf.dispatch(c)
val optionDispatcher = OptionOrderedBuf.dispatch(c)(buildDispatcher)
val eitherDispatcher = EitherOrderedBuf.dispatch(c)(buildDispatcher)
val caseClassDispatcher = CaseClassOrderedBuf.dispatch(c)(buildDispatcher)
val caseObjectDispatcher = CaseObjectOrderedBuf.dispatch(c)
val productDispatcher = ProductOrderedBuf.dispatch(c)(buildDispatcher)
val stringDispatcher = StringOrderedBuf.dispatch(c)
val traversablesDispatcher = TraversablesOrderedBuf.dispatch(c)(buildDispatcher)
val unitDispatcher = UnitOrderedBuf.dispatch(c)
val byteBufferDispatcher = ByteBufferOrderedBuf.dispatch(c)
val sealedTraitDispatcher = SealedTraitOrderedBuf.dispatch(c)(buildDispatcher)

OrderedSerializationProviderImpl
.normalizedDispatcher(c)(buildDispatcher)
.orElse(primitiveDispatcher)
.orElse(unitDispatcher)
.orElse(optionDispatcher)
.orElse(eitherDispatcher)
.orElse(stringDispatcher)
.orElse(byteBufferDispatcher)
.orElse(traversablesDispatcher)
.orElse(caseClassDispatcher)
.orElse(caseObjectDispatcher)
Expand Down

This file was deleted.

This file was deleted.

Loading