Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
Copyright 2015 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.twitter.scalding.serialization

import com.twitter.scalding.serialization.JavaStreamEnrichments._

import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream }
import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal

abstract class ComplexHelper[T] extends HasUnsafeCompareBinary[T] {
/**
 * Fixed serialized size shared by every value of `T`, if there is one.
 * Defaults to `None`; when defined, `write` emits no length prefix.
 */
def staticSize: Option[Int] = None

/**
 * Number of bytes `unsafeWrite` produces for `e`, excluding the length prefix
 * that `write` places in front of them. `None` means the size is not known up
 * front, in which case `write` serializes into a side buffer to measure it.
 */
protected def dynamicSizeWithoutLen(e: T): Option[Int]

/**
 * Total serialized size of `e`: the static size when one is defined, otherwise
 * the payload size plus the bytes needed for its var-int length prefix.
 */
final def dynamicSize(e: T): Option[Int] =
  staticSize.orElse {
    dynamicSizeWithoutLen(e).map { payloadSize =>
      payloadSize + posVarIntSize(payloadSize)
    }
  }

/** Size of the payload alone, i.e. without the length prefix added by `write`. */
final def unsafeSize(t: T): Option[Int] = dynamicSizeWithoutLen(t)

/**
* This is the worst case: we have to serialize into a side buffer
* and then see how large the result actually is. This happens for cases, like
* strings, where computing the serialized size is no cheaper than
* serializing directly.
*/
private[this] def noLengthWrite(element: T, outerOutputStream: OutputStream): Unit = {
// Start with pretty big buffers because reallocation will be expensive
val baos = new ByteArrayOutputStream(512)
unsafeWrite(baos, element)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this implies that unsafeWrite means no size. Can we add that to the comments below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It implies no outer size, it can be a bit wasteful. I'm adding a comment to the definition of this method that I hope will help here a little:

  // This will write out the interior data as a blob with no prepended length
  // This means binary compare cannot skip on this data.
  // However the contract remains that one should be able to _read_ the data
  // back out again.
  def unsafeWrite(out: java.io.OutputStream, t: T): Unit

val len = baos.size
outerOutputStream.writePosVarInt(len)
baos.writeTo(outerOutputStream)
}

/**
 * Serializes `e` into `into`.
 *
 * With a static size the payload is written bare. Otherwise the payload is
 * preceded by its length as a positive var-int: taken from
 * `dynamicSizeWithoutLen` when available, or measured by buffering the payload
 * first (`noLengthWrite`). Any non-fatal exception is returned as a `Failure`.
 */
final override def write(into: OutputStream, e: T): Try[Unit] =
  try {
    staticSize match {
      case Some(_) =>
        unsafeWrite(into, e)
      case None =>
        dynamicSizeWithoutLen(e) match {
          case Some(payloadSize) =>
            into.writePosVarInt(payloadSize)
            unsafeWrite(into, e)
          case None =>
            noLengthWrite(e, into)
        }
    }
    Serialization.successUnit
  } catch {
    case NonFatal(err) => Failure(err)
  }

/**
 * Deserializes one value from `in`.
 *
 * When there is no static size, the var-int length prefix written by `write`
 * is consumed and discarded before the payload is read with `unsafeRead`.
 * Non-fatal exceptions are captured in the returned `Try`.
 */
final def read(in: InputStream): Try[T] =
  Try {
    // Skip over the length header; its value is not needed to decode the payload.
    if (staticSize.isEmpty) in.readPosVarInt
    unsafeRead(in)
  }

/**
 * Compares two serialized values directly from their streams.
 *
 * The payload length of each side (static, or read from the var-int prefix)
 * is used to move both streams to the end of their payload after the inner
 * comparison, regardless of how many bytes `unsafeCompareBinary` consumed.
 * Non-fatal exceptions are reported as a `CompareFailure`.
 */
final def compareBinary(inputStreamA: InputStream,
  inputStreamB: InputStream): OrderedSerialization.Result =
  try OrderedSerialization.resultFrom {
    val sizeA = staticSize.getOrElse(inputStreamA.readPosVarInt)
    val sizeB = staticSize.getOrElse(inputStreamB.readPosVarInt)

    // Record where each payload ends before the comparison moves the streams.
    val trackedA = PositionInputStream(inputStreamA)
    val endOfA = trackedA.position + sizeA

    val trackedB = PositionInputStream(inputStreamB)
    val endOfB = trackedB.position + sizeB

    val outcome = unsafeCompareBinary(trackedA, trackedB)

    trackedA.seekToPosition(endOfA)
    trackedB.seekToPosition(endOfB)
    outcome
  } catch {
    case NonFatal(err) =>
      OrderedSerialization.CompareFailure(err)
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.twitter.scalding.serialization

// We wrap types in Exported to provide low priority implicits
// the real type has a low priority implicit to extract from Exported
// into the original type.
// See more @ https://github.com/milessabin/export-hook
/**
 * Wrapper used to supply an instance at low implicit priority: the real type
 * provides a low priority implicit that unwraps `instance` from an `Exported`.
 */
final case class Exported[T](instance: T) extends AnyVal
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this pattern. Can you document it or link to an explanation? I know @travisbrown likes it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The general gist is just that by supplying instances of Exported you can import an object/package supplying them and inject them at a lower priority than a normal import.

So here if you import the default object it would just supply low priority implicits so won't override a user supplied one.

https://github.com/milessabin/export-hook

(i can add a link in the code too)

Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
Copyright 2015 Twitter, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.twitter.scalding.serialization

import com.twitter.scalding.serialization.JavaStreamEnrichments._
import java.io.InputStream
import java.io.OutputStream

trait HasUnsafeCompareBinary[T] extends OrderedSerialization[T] {
/** Compares two serialized values read from the streams, returning a raw `Int` ordering. */
def unsafeCompareBinary(inputStreamA: InputStream, inputStreamB: InputStream): Int

/**
 * Writes the interior data of `t` as a blob with no prepended length.
 * This means binary compare cannot skip over this data. However, the contract
 * remains that one should be able to read the data back out again.
 */
def unsafeWrite(out: OutputStream, t: T): Unit
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we drop java.io? Also, I think this has to be the unsized output if there is ever a size header added, right? It seems confusing above since it is used that way, but it is not clear.

It may or may not be sized? Can you add some laws about how to reason about these things?

/** Reads back a value from `in`; presumably the inverse of `unsafeWrite` (confirm). */
def unsafeRead(in: InputStream): T

/**
 * Size in bytes for `t`, when known.
 * NOTE(review): whether this includes any length header is not fixed by this
 * trait; implementations should state it explicitly.
 */
def unsafeSize(t: T): Option[Int]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the contract here? Again similar concerns as above.

}

object HasUnsafeCompareBinary {
def apply[T](ord: OrderedSerialization[T]): HasUnsafeCompareBinary[T] = ord match {
case e: HasUnsafeCompareBinary[T] => e
case o =>
new HasUnsafeCompareBinary[T] {
// Members declared in scala.math.Ordering
def compare(x: T, y: T): Int = o.compare(x, y)

// Members declared in com.twitter.scalding.serialization.OrderedSerialization
def compareBinary(a: InputStream, b: InputStream): OrderedSerialization.Result =
  o.compareBinary(a, b)

// Members declared in com.twitter.scalding.serialization.Serialization
def hash(x: T): Int = o.hash(x)
def staticSize: Option[Int] = o.staticSize
def dynamicSize(e: T) = o.dynamicSize(e)
def read(in: InputStream): scala.util.Try[T] = o.read(in)

// "Unsafe" members: each one delegates to the safe operation of the wrapped
// instance and unwraps its result.
def unsafeCompareBinary(inputStreamA: InputStream, inputStreamB: InputStream): Int =
  o.compareBinary(inputStreamA, inputStreamB).unsafeToInt

def unsafeSize(t: T): Option[Int] = o.dynamicSize(t)

// NOTE(review): o.write may already emit its own size header, so a caller that
// adds another length prefix around this would write two sizes — confirm.
def unsafeWrite(out: OutputStream, t: T): Unit = o.write(out, t).get
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here, unsafeWrite could have a size if the original did, but then I guess you could add two sizes, couldn't you (since above we might call noLengthWrite)?


// Plain write is forwarded untouched to the wrapped instance.
def write(out: OutputStream, t: T): scala.util.Try[Unit] = o.write(out, t)

// Unwraps the Try from the wrapped read; a failed read is rethrown here.
def unsafeRead(in: InputStream): T = o.read(in).get
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ trait OrderedSerialization[T] extends Ordering[T] with Serialization[T] {
def compareBinary(a: InputStream, b: InputStream): OrderedSerialization.Result
}

object OrderedSerialization {
object OrderedSerialization extends LowPriorityOrderedSerialization {
/**
* Represents the result of a comparison that might fail due
* to an error deserializing
Expand Down Expand Up @@ -214,3 +214,8 @@ final case class DeserializingOrderedSerialization[T](serialization: Serializati
final override def staticSize = serialization.staticSize
final override def dynamicSize(t: T) = serialization.dynamicSize(t)
}

/**
 * Low priority implicit that unwraps an `Exported[OrderedSerialization[A]]`
 * into the `OrderedSerialization[A]` it carries.
 */
private[serialization] trait LowPriorityOrderedSerialization {
  implicit final def importedOrderedSerialization[A](
    implicit exported: Exported[OrderedSerialization[A]]): OrderedSerialization[A] =
    exported.instance
}

Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import scala.util.hashing.Hashing
* implementation. This must satisfy:
* (!equiv(a, b)) || (hash(a) == hash(b))
*/
trait Serialization[T] extends Equiv[T] with Hashing[T] with Serializable {
trait Serialization[T] extends Equiv[T] with Hashing[T] with Serializable with LowPrioritySerialization {
def read(in: InputStream): Try[T]
def write(out: OutputStream, t: T): Try[Unit]
/**
Expand Down Expand Up @@ -171,3 +171,7 @@ object Serialization {
sizeLaw,
transitivity)
}

/**
 * Low priority implicit that unwraps an `Exported[Serialization[A]]`
 * into the `Serialization[A]` it carries.
 */
private[serialization] trait LowPrioritySerialization {
  implicit final def importedSerialization[A](
    implicit exported: Exported[Serialization[A]]): Serialization[A] =
    exported.instance
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ object OrderedSerializationProviderImpl {
val stringDispatcher = StringOrderedBuf.dispatch(c)
val traversablesDispatcher = TraversablesOrderedBuf.dispatch(c)(buildDispatcher)
val unitDispatcher = UnitOrderedBuf.dispatch(c)
val byteBufferDispatcher = ByteBufferOrderedBuf.dispatch(c)
val sealedTraitDispatcher = SealedTraitOrderedBuf.dispatch(c)(buildDispatcher)

OrderedSerializationProviderImpl
Expand All @@ -51,7 +50,6 @@ object OrderedSerializationProviderImpl {
.orElse(optionDispatcher)
.orElse(eitherDispatcher)
.orElse(stringDispatcher)
.orElse(byteBufferDispatcher)
.orElse(traversablesDispatcher)
.orElse(caseClassDispatcher)
.orElse(caseObjectDispatcher)
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.twitter.scalding.serialization.provided

import com.twitter.scalding.serialization.JavaStreamEnrichments._
import java.nio.ByteBuffer
import java.io.InputStream
import com.twitter.scalding.serialization.ComplexHelper

object OrderedSerializationByteBuffer extends ComplexHelper[ByteBuffer] {
/** Hash of the buffer, delegated to `ByteBuffer.hashCode`. */
def hash(x: ByteBuffer): Int = x.hashCode

/**
 * Compares two serialized buffers byte by byte.
 *
 * Each stream starts with the payload length as a positive var-int. Bytes are
 * compared pairwise (as signed bytes) over the shorter length, stopping at the
 * first difference; if that common prefix is equal, the shorter payload sorts first.
 */
def unsafeCompareBinary(inputStreamA: InputStream, inputStreamB: InputStream): Int = {
  val sizeA = inputStreamA.readPosVarInt
  val sizeB = inputStreamB.readPosVarInt
  val sharedLength = _root_.scala.math.min(sizeA, sizeB)

  // Reads one byte from each stream per step and stops at the first mismatch.
  @scala.annotation.tailrec
  def firstDifference(index: Int): Int =
    if (index >= sharedLength) 0
    else {
      val byteOrder = java.lang.Byte.compare(inputStreamA.readByte, inputStreamB.readByte)
      if (byteOrder != 0) byteOrder
      else firstDifference(index + 1)
    }

  val prefixOrder = firstDifference(0)
  if (prefixOrder != 0) prefixOrder
  else java.lang.Integer.compare(sizeA, sizeB)
}

/**
 * Writes the remaining bytes of `element`, preceded by their count as a
 * positive var-int. The position of `element` is left untouched.
 *
 * Array-backed buffers are written straight from their backing array. Direct
 * and read-only buffers expose no accessible array (`array` would throw), so
 * their bytes are copied out through a duplicate instead.
 */
def unsafeWrite(outputStream: java.io.OutputStream, element: ByteBuffer): Unit = {
  val length = element.remaining
  outputStream.writePosVarInt(length)
  if (element.hasArray) {
    outputStream
      .writeBytes(element.array, element.arrayOffset + element.position, length)
  } else {
    // duplicate shares the content but has its own position, so reading
    // from it does not advance the caller's buffer.
    val copied = new Array[Byte](length)
    element.duplicate.get(copied)
    outputStream.writeBytes(copied, 0, length)
  }
}

def unsafeRead(inputStream: java.io.InputStream): ByteBuffer = {
val lenA = inputStream.readPosVarInt
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we write an additional length header on this thing currently?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

val bytes = new Array[Byte](lenA)
inputStream.readFully(bytes)
java.nio.ByteBuffer.wrap(bytes)
}

/** In-memory ordering, delegated to `ByteBuffer.compareTo`. */
def compare(a: ByteBuffer, b: ByteBuffer): Int = a.compareTo(b)

/**
 * Bytes produced by `unsafeWrite` for `element`: the var-int that holds the
 * remaining byte count, plus the remaining bytes themselves.
 */
def dynamicSizeWithoutLen(element: ByteBuffer): Option[Int] = {
  val payloadLength = element.remaining
  Some(posVarIntSize(payloadLength) + payloadLength)
}
}
Loading