Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.twitter.scalding.serialization

// We wrap types in Exported to provide low priority implicits
// the real type has a low priority implicit to extract from Exported
// into the original type.
// See more @ https://github.com/milessabin/export-hook
case class Exported[T](instance: T) extends AnyVal
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this pattern. Can you document it or link to an explanation? I know @travisbrown likes it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The general gist is just that by supplying instances of Exported you can import an object/package supplying them and inject them at a lower priority than a normal import.

So here if you import the default object it would just supply low priority implicits so won't override a user supplied one.

https://github.com/milessabin/export-hook

(i can add a link in the code too)

Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,17 @@ trait OrderedSerialization[T] extends Ordering[T] with Serialization[T] {
* the InputStreams is mutated to be the end of the record.
*/
def compareBinary(a: InputStream, b: InputStream): OrderedSerialization.Result

/**
* This compares two InputStreams. After this call, the position in
* the InputStreams may or may not be at the end of the record.
*/
def compareBinaryNoConsume(a: InputStream, b: InputStream): OrderedSerialization.Result = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get really worried about how to compose methods like this that lack a strong contract. Also, I don't see that we ever call this.

compareBinary(a, b)
}
}

object OrderedSerialization {
object OrderedSerialization extends LowPriorityOrderedSerialization {
/**
* Represents the result of a comparison that might fail due
* to an error deserializing
Expand Down Expand Up @@ -214,3 +222,8 @@ final case class DeserializingOrderedSerialization[T](serialization: Serializati
final override def staticSize = serialization.staticSize
final override def dynamicSize(t: T) = serialization.dynamicSize(t)
}

private[serialization] trait LowPriorityOrderedSerialization {
implicit final def importedOrderedSerialization[A](implicit exported: Exported[OrderedSerialization[A]]): OrderedSerialization[A] = exported.instance
}

Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@ import scala.util.hashing.Hashing
* implementation. This must satisfy:
* (!equiv(a, b)) || (hash(a) == hash(b))
*/
trait Serialization[T] extends Equiv[T] with Hashing[T] with Serializable {
trait Serialization[T] extends Equiv[T] with Hashing[T] with Serializable with LowPrioritySerialization {
def read(in: InputStream): Try[T]
def write(out: OutputStream, t: T): Try[Unit]

/**
* If all items have a static size, this returns Some, else None
* NOTE: lawful implementations that return Some here much return
Expand All @@ -53,6 +54,11 @@ trait Serialization[T] extends Equiv[T] with Hashing[T] with Serializable {
* otherwise the caller should just serialize into an ByteArrayOutputStream
*/
def dynamicSize(t: T): Option[Int]

// Override this to provide more efficient
def skip(in: InputStream): Try[Unit] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we may want def skip(count: Int, in: InputStream): Try[Unit] so in say List[T] we can skip the rest of the collection. If count <= 0 do nothing, and otherwise in the worst case just read and throw them away as you do below.

read(in).map{ _ => () }
}
}

/**
Expand Down Expand Up @@ -171,3 +177,7 @@ object Serialization {
sizeLaw,
transitivity)
}

private[serialization] trait LowPrioritySerialization {
implicit final def importedSerialization[A](implicit exported: Exported[Serialization[A]]): Serialization[A] = exported.instance
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ object OrderedSerializationProviderImpl {
val stringDispatcher = StringOrderedBuf.dispatch(c)
val traversablesDispatcher = TraversablesOrderedBuf.dispatch(c)(buildDispatcher)
val unitDispatcher = UnitOrderedBuf.dispatch(c)
val byteBufferDispatcher = ByteBufferOrderedBuf.dispatch(c)
val sealedTraitDispatcher = SealedTraitOrderedBuf.dispatch(c)(buildDispatcher)

OrderedSerializationProviderImpl
Expand All @@ -51,7 +50,6 @@ object OrderedSerializationProviderImpl {
.orElse(optionDispatcher)
.orElse(eitherDispatcher)
.orElse(stringDispatcher)
.orElse(byteBufferDispatcher)
.orElse(traversablesDispatcher)
.orElse(caseClassDispatcher)
.orElse(caseObjectDispatcher)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.twitter.scalding.serialization.provided

import com.twitter.scalding.serialization.JavaStreamEnrichments._
import java.nio.ByteBuffer
import java.io.InputStream
import com.twitter.scalding.serialization.Serialization
import com.twitter.scalding.serialization.JavaStreamEnrichments._
import scala.math.min
import scala.util.{ Failure, Try }
import scala.util.control.NonFatal

object OrderedSerializationByteBuffer extends UnsafeOrderedSerialization[ByteBuffer] {

override def staticSize: Option[Int] = None

def hash(x: ByteBuffer): Int =
x.hashCode

override def unsafeSwitchingCompareBinaryNoConsume(inputStreamA: InputStream, inputStreamB: InputStream, consumeToEnd: Boolean): Int = {
val lenA = inputStreamA.readPosVarInt
val lenB = inputStreamB.readPosVarInt
val queryLength = scala.math.min(lenA, lenB)
var incr = 0
var state = 0

while (incr < queryLength && state == 0) {
state = java.lang.Byte.compare(inputStreamA.readByte, inputStreamB.readByte)
incr = incr + 1
}
if (consumeToEnd) {
inputStreamA.skip(lenA - incr)
inputStreamB.skip(lenB - incr)
}
if (state == 0) {
java.lang.Integer.compare(lenA, lenB)
} else {
state
}
}

override def skip(inputStream: InputStream): Try[Unit] = {
try {
val lenA = inputStream.readPosVarInt
inputStream.skip(lenA)
Serialization.successUnit
} catch {
case NonFatal(e) =>
Failure(e)
}
}

def unsafeWrite(outputStream: java.io.OutputStream, element: ByteBuffer): Unit = {
outputStream.writePosVarInt(element.remaining)
outputStream
.writeBytes(element.array, element.arrayOffset + element.position, element.remaining)
}

def unsafeRead(inputStream: java.io.InputStream): ByteBuffer = {
val lenA = inputStream.readPosVarInt
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we write an additional length header on this thing currently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val bytes = new Array[Byte](lenA)
inputStream.readFully(bytes)
java.nio.ByteBuffer.wrap(bytes)
}

def compare(a: ByteBuffer, b: ByteBuffer): Int = a.compareTo(b)

def dynamicSize(element: ByteBuffer): Option[Int] = Some {
val tmpLen = element.remaining
posVarIntSize(tmpLen) + tmpLen
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2015 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.twitter.scalding.serialization.provided

import com.twitter.scalding.serialization.JavaStreamEnrichments._

import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream }
import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal
import com.twitter.scalding.serialization._

abstract class UnsafeOrderedSerialization[T] extends OrderedSerialization[T] {
// This will write out the interior data as a blob with no prepended length
// This means binary compare cannot skip on this data.
// However the contract remains that one should be able to _read_ the data
// back out again.
def unsafeWrite(out: java.io.OutputStream, t: T): Unit
// This is an internal read method that matches the unsafe write
// importantly any outer length header added to enable skipping isn't included here
def unsafeRead(in: java.io.InputStream): T

// This is an inner binary compare that the user must supply
def unsafeSwitchingCompareBinaryNoConsume(inputStreamA: InputStream, inputStreamB: InputStream, shouldConsume: Boolean): Int

// This is the public write method, if we need to inject a size head of the object
// this is where we do it!
final override def write(into: OutputStream, e: T): Try[Unit] =
try {
unsafeWrite(into, e)
Serialization.successUnit
} catch {
case NonFatal(e) =>
Failure(e)
}

final def read(in: InputStream): Try[T] =
try {
Success(unsafeRead(in))
} catch {
case NonFatal(e) =>
Failure(e)
}

override def compareBinaryNoConsume(inputStreamA: InputStream, inputStreamB: InputStream): OrderedSerialization.Result =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be final?

try {
val r = unsafeSwitchingCompareBinaryNoConsume(inputStreamA, inputStreamB, false)
OrderedSerialization.resultFrom(r)
} catch {
case NonFatal(e) =>
OrderedSerialization.CompareFailure(e)
}

override def compareBinary(inputStreamA: InputStream, inputStreamB: InputStream): OrderedSerialization.Result =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be final?

try {
val r = unsafeSwitchingCompareBinaryNoConsume(inputStreamA, inputStreamB, true)
OrderedSerialization.resultFrom(r)
} catch {
case NonFatal(e) =>
OrderedSerialization.CompareFailure(e)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.twitter.scalding.serialization

import java.nio.ByteBuffer

import com.twitter.scalding.serialization.provided.{ OrderedSerializationByteBuffer }

package object provided {
implicit val byteBufferOrderedSerialization: Exported[OrderedSerialization[ByteBuffer]] = Exported(OrderedSerializationByteBuffer)
}
Loading