Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
dist: trusty

language: scala

scala:
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ lazy val contributors = Seq(
, "mrauilm" -> "Milan Raulim"
)

val fs2Version = "1.0.0-M2"
val fs2Version = "2.0.0"

lazy val commonSettings = Seq(
organization := "com.spinoco",
Expand Down
10 changes: 5 additions & 5 deletions src/main/scala/spinoco/fs2/mail/encoding/base64.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object base64 {
*/
def encodeRaw[F[_]](alphabet:Base64Alphabet):Pipe[F, Byte, Byte] = {
def go(rem:ByteVector)(s: Stream[F,Byte]):Pull[F, Byte, Unit] = {
s.pull.unconsChunk.flatMap {
s.pull.uncons.flatMap {
case None =>
if (rem.size == 0) Pull.done
else Pull.output(ByteVectorChunk(ByteVector.view(rem.toBase64(alphabet).getBytes)))
Expand Down Expand Up @@ -59,10 +59,10 @@ object base64 {
* Decodes base64 encoded stream with supplied alphabet. Whitespaces are ignored.
* Decoding is lazy to support very large Base64 bodies (i.e. email)
*/
def decodeRaw[F[_]](alphabet:Base64Alphabet):Pipe[F, Byte, Byte] = {
def decodeRaw[F[_]: RaiseThrowable](alphabet:Base64Alphabet):Pipe[F, Byte, Byte] = {
val Pad = alphabet.pad
def go(remAcc:BitVector)(s:Stream[F, Byte]):Pull[F, Byte, Unit] = {
s.pull.unconsChunk.flatMap {
s.pull.uncons.flatMap {
case None => Pull.done

case Some((chunk,tl)) =>
Expand Down Expand Up @@ -102,11 +102,11 @@ object base64 {
}

/** decodes base64 encoded stream [[http://tools.ietf.org/html/rfc4648#section-5 RF4648 section 5]]. Whitespaces are ignored **/
def decodeUrl[F[_]]:Pipe[F, Byte, Byte] =
def decodeUrl[F[_]: RaiseThrowable]:Pipe[F, Byte, Byte] =
decodeRaw(Alphabets.Base64Url)

/** decodes base64 encoded stream [[http://tools.ietf.org/html/rfc4648#section-4 RF4648 section 4]] **/
def decode[F[_]]:Pipe[F, Byte, Byte] =
def decode[F[_]: RaiseThrowable]:Pipe[F, Byte, Byte] =
decodeRaw(Alphabets.Base64)

}
4 changes: 2 additions & 2 deletions src/main/scala/spinoco/fs2/mail/encoding/charset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object charset {
)) flatMap { decoder =>

def go(buff: ByteVector)(s: Stream[F, Byte]): Pull[F, Char, Unit] = {
s.pull.unconsChunk.flatMap {
s.pull.uncons.flatMap {
case Some((chunk, tail)) =>
if (chunk.isEmpty) go(buff)(tail)
else {
Expand Down Expand Up @@ -113,7 +113,7 @@ object charset {
Stream.eval(F.delay(chs.newEncoder())) flatMap { encoder =>

def go(buff: String)(s: Stream[F, Char]): Pull[F, Byte, Unit] = {
s.pull.unconsChunk flatMap {
s.pull.uncons flatMap {
case Some((chunk, tail)) =>
val s = buff + StringChunk.asString(chunk)
val chb = CharBuffer.wrap(s)
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/spinoco/fs2/mail/encoding/lines.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ object lines {
/** decodes bytes in chunk of bytes by supplied separator. Last line is emitted even when not terminated by `separator` **/
def by[F[_]](separator: ByteVector): Pipe[F, Byte, Chunk[Byte]] = {
def go(buff: ByteVector)(s: Stream[F, Byte]): Pull[F, Chunk[Byte], Unit] = {
s.pull.unconsChunk.flatMap {
s.pull.uncons.flatMap {
case Some((ch, tl)) =>
val bs = ch.toBytes
val data = buff ++ ByteVector.view(bs.values, bs.offset, bs.size)
Expand Down Expand Up @@ -64,7 +64,7 @@ object lines {
go(bv)(tl)
}
} else {
val chunks = bv.grouped(length)
val chunks = bv.grouped(length).toList
val lastChunk = chunks.lastOption.getOrElse(ByteVector.empty)
val chunksOut = if (lastChunk.nonEmpty) chunks.init else chunks
Pull.output(ByteVectorChunk(result ++ ByteVector.concat(chunksOut.map(h => prefixBytes ++ h ++ crlf)))) >> go(lastChunk)(tl)
Expand All @@ -76,7 +76,7 @@ object lines {
}
}

s.pull.unconsChunk.flatMap {
s.pull.uncons.flatMap {
case Some((ch, tl)) =>
val bs = ch.toBytes
val bv = buff ++ ByteVector.view(bs.values, bs.offset, bs.size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ object quotedPrintable {
* @tparam F
* @return
*/
def decode[F[_]]: Pipe[F, Byte, Byte] = {
def decode[F[_]](implicit rt: RaiseThrowable[F]): Pipe[F, Byte, Byte] = {
@tailrec
def decodeBV(rem: ByteVector, acc: ByteVector):Either[String, (ByteVector, ByteVector)] = {
val eqIdx = rem.indexOfSlice(`=`)
Expand All @@ -47,7 +47,7 @@ object quotedPrintable {


def go(buff: ByteVector)(s: Stream[F, Byte]): Pull[F, Byte, Unit] = {
s.pull.unconsChunk.flatMap {
s.pull.uncons.flatMap {
case Some((chunk, tl)) =>
val bs = chunk.toBytes
val bv = buff ++ ByteVector.view(bs.values, bs.offset, bs.size)
Expand Down
24 changes: 12 additions & 12 deletions src/main/scala/spinoco/fs2/mail/imap/IMAPClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ object IMAPClient {
Pull.raiseError(new Throwable("Invalid client state initial response not received"))

}
.stream
.void.stream
.onFinalize(requestSemaphore.release)
}
}
Expand Down Expand Up @@ -429,16 +429,16 @@ object IMAPClient {
* @tparam F
* @return
*/
def fetchBytesOf[F[_]](
def fetchBytesOf[F[_]: RaiseThrowable](
contentIdx: Int
, contentKey: String
, encoding: String
): Pipe[F, (Int, String, IMAPData), Byte] = {

val decoder: Pipe[F, Byte, Byte] = { s =>
encoding.toUpperCase match {
case "BASE64" => base64.decode[F](s)
case "QUOTED-PRINTABLE" => quotedPrintable.decode[F](s)
case "BASE64" => s through base64.decode[F]
case "QUOTED-PRINTABLE" => s through quotedPrintable.decode[F]
case _ => s
}
}
Expand All @@ -455,7 +455,7 @@ object IMAPClient {
* @param encoding Encoding of the data
* @param charsetName Name of the charset of the text. If empty, UTF-8 will be used instead
*/
def fetchTextOf[F[_]: Sync](
def fetchTextOf[F[_]: Sync: RaiseThrowable](
contentIdx: Int
, contentKey: String
, encoding: String
Expand All @@ -469,8 +469,8 @@ object IMAPClient {

val decoder: Pipe[F, Byte, Char] = { s =>
encoding.toUpperCase match {
case "BASE64" => base64.decode[F](s) through charset.decode(chs)
case "QUOTED-PRINTABLE" => quotedPrintable.decode[F](s) through charset.decode(chs)
case "BASE64" => s through base64.decode[F] through charset.decode(chs)
case "QUOTED-PRINTABLE" => s through quotedPrintable.decode[F] through charset.decode(chs)
case _ => s through charset.decode(chs)
}
}
Expand Down Expand Up @@ -520,7 +520,7 @@ object IMAPClient {
*
* This requires UID content and BODY[HEADER] content to be present in map otherwise this will fail.
*/
def mkEmailHeader[F[_]](
def mkEmailHeader[F[_]: RaiseThrowable](
headerCodec: Codec[EmailHeader]
): Pipe[F, Map[String, Vector[IMAPData]], IMAPEmailHeader] = {
def asString(data: Vector[IMAPData]): Either[String, String] = {
Expand Down Expand Up @@ -610,7 +610,7 @@ object IMAPClient {
* @tparam F
* @return
*/
def rawContent[F[_]](result: RequestResult[F]): Stream[F, (Int, String, IMAPData)] = {
def rawContent[F[_]: RaiseThrowable](result: RequestResult[F]): Stream[F, (Int, String, IMAPData)] = {
def collectBytes(recordIdx: Int, key: String)(s: Stream[F, IMAPData]): Pull[F, (Int, String, IMAPData), Unit] = {
def go(s: Stream[F, IMAPData]): Pull[F, (Int, String, IMAPData), Unit] = {
s.pull.uncons1.flatMap {
Expand Down Expand Up @@ -737,11 +737,11 @@ object IMAPClient {
* various encodings.
*
*/
def lines[F[_]]: Pipe[F, Byte, IMAPData] = {
def lines[F[_]: RaiseThrowable]: Pipe[F, Byte, IMAPData] = {
val crlf = ByteVector.view("\r\n".getBytes)

def collectChunk(sz: Int)(s: Stream[F, Byte]): Pull[F, IMAPData, Unit] = {
s.pull.unconsChunk.flatMap {
s.pull.uncons.flatMap {
case Some((chunk, tl)) =>
val bs = chunk.toBytes
val bv = ByteVector.view(bs.values, bs.offset, bs.size)
Expand All @@ -757,7 +757,7 @@ object IMAPClient {
}

def collectLines(buff: ByteVector)(s: Stream[F, Byte]): Pull[F, IMAPData, Unit] = {
s.pull.unconsChunk.flatMap {
s.pull.uncons.flatMap {
case Some((chunk, tl)) =>
val bs = chunk.toBytes
val nb = buff ++ ByteVector.view(bs.values, bs.offset, bs.size)
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/spinoco/fs2/mail/internal/internal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package object internal {
* Once downstream finishes this synchronous queue is read until exhaustion marked by [[None]].
*/
def takeThroughDrain[F[_] : Concurrent, A](predicate: A => Boolean): Pipe[F, A, A] = { source =>
Stream.eval(fs2.async.synchronousQueue[F, Option[Either[Throwable, A]]]).flatMap{ feedQueue =>
Stream.eval(fs2.concurrent.Queue.synchronous[F, Option[Either[Throwable, A]]]).flatMap{ feedQueue =>

// dequeue and propagate errors to downstream
def dequeue:Stream[F, A] = {
Expand Down Expand Up @@ -48,4 +48,4 @@ package object internal {
}}
}

}
}
10 changes: 5 additions & 5 deletions src/main/scala/spinoco/fs2/mail/smtp/SMTPClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ object SMTPClient {
* It assumes the server never sends more data than very last \r\n, as the other data received will be discarded.
*
*/
def readResponse[F[_]]: Pipe[F, Byte, SMTPResponse] = {
def readResponse[F[_]: RaiseThrowable]: Pipe[F, Byte, SMTPResponse] = {
def go(buff: ByteVector)(s: Stream[F, Byte]): Pull[F, String, Unit] = {
s.pull.unconsChunk.flatMap {
s.pull.uncons.flatMap {
case Some((ch, tl)) =>
val bs = ch.toBytes
val bv = buff ++ ByteVector.view(bs.values, bs.offset, bs.size)
Expand Down Expand Up @@ -244,7 +244,7 @@ object SMTPClient {
)(data: Stream[F, Byte])(implicit socketRef: Ref[F, Socket[F]]): F[Seq[SMTPResponse]] = {
socketRef.get.flatMap { socket =>
sending.acquire >>
(data.to(socket.writes()).compile.drain >>
(data.through(socket.writes()).compile.drain >>
socket.reads(1024, Some(timeout)).through(readResponse).compile.toVector
).attempt flatMap {
case Right(rslt) => sending.release as rslt
Expand Down Expand Up @@ -414,7 +414,7 @@ object SMTPClient {
*/
def insertDotIfNeeded[F[_]]: Pipe[F, Byte, Byte] = {
def go(buff: ByteVector)(s: Stream[F, Byte]): Pull[F, Byte, Unit] = {
s.pull.unconsChunk.flatMap {
s.pull.uncons.flatMap {
case Some((ch, tl)) =>
val bs = ch.toBytes
val bv = buff ++ ByteVector.view(bs.values, bs.offset, bs.size)
Expand Down Expand Up @@ -511,7 +511,7 @@ object SMTPClient {
* @tparam F
* @return
*/
def encodeMimeBody[F[_]](
def encodeMimeBody[F[_]: RaiseThrowable](
header: EmailHeader
, body: MIMEPart[F]
, emailHeaderCodec: Codec[EmailHeader]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ import org.scalacheck.Properties
import org.scalacheck.Prop._
import spinoco.fs2.mail.imap.IMAPClient.impl.{IMAPData, IMAPText}

import scala.concurrent.ExecutionContext

object IMAPClientCommandSpec extends Properties("IMAPClient.request") {

import scala.concurrent.ExecutionContext.Implicits.global
private implicit val cs = IO.contextShift(ExecutionContext.global)
private implicit val c = IO.ioConcurrentEffect

def createTagged(shouldFail: IO[Boolean], count: Int, done: IO[Unit]): Stream[IO, IMAPData] = {
Stream.unfoldEval(count){ s =>
Expand Down
22 changes: 12 additions & 10 deletions src/test/scala/spinoco/fs2/mail/internal/TakeThroughDrainSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ package spinoco.fs2.mail.internal
import cats.effect.IO
import org.scalacheck.Properties
import org.scalacheck.Prop._

import fs2._
import spinoco.fs2.mail.internal

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object TakeThroughDrainSpec extends Properties("TakeThroughDrain"){

import scala.concurrent.ExecutionContext.Implicits.global
private implicit val cs = IO.contextShift(ExecutionContext.global)
private implicit val c = IO.ioConcurrentEffect

property("early-terminated.drain") = protect{

Expand All @@ -19,9 +21,9 @@ object TakeThroughDrainSpec extends Properties("TakeThroughDrain"){
// one means a tag and three means some other data
val source = Stream[IO, Int](2, 2, 2, 2, 2, 2, 1, 3).covary[IO]

fs2.async.unboundedQueue[IO, Int].flatMap{ queue =>
fs2.concurrent.Queue.unbounded[IO, Int].flatMap{ queue =>

(source.to(queue.enqueue).drain ++
(source.through(queue.enqueue).drain ++
queue.dequeue.through(internal.takeThroughDrain(_ != 1)).take(2).drain ++
Stream.eval(queue.dequeue1)
).compile.last
Expand All @@ -35,8 +37,8 @@ object TakeThroughDrainSpec extends Properties("TakeThroughDrain"){
// one means a tag and three means some other data
val source = Stream[IO, Int](2, 2, 2, 2, 2, 2, 1, 3).covary[IO]

fs2.async.unboundedQueue[IO, Int].flatMap{ queue =>
(source.to(queue.enqueue).drain ++
fs2.concurrent.Queue.unbounded[IO, Int].flatMap{ queue =>
(source.through(queue.enqueue).drain ++
queue.dequeue.through(internal.takeThroughDrain(_ != 1)).drain ++
Stream.eval(queue.dequeue1)
).compile.last
Expand All @@ -48,8 +50,8 @@ object TakeThroughDrainSpec extends Properties("TakeThroughDrain"){
property("propagate-failure-from-source") = protect {
val source = Stream[IO, Int](2, 2, 2, 2, 2, 2, 1, 3).covary[IO] ++ Stream.raiseError(Boom)

fs2.async.unboundedQueue[IO, Int].flatMap{ queue =>
(source.to(queue.enqueue).drain ++
fs2.concurrent.Queue.unbounded[IO, Int].flatMap{ queue =>
(source.through(queue.enqueue).drain ++
queue.dequeue.through(internal.takeThroughDrain(_ != 100)).drain ++
Stream.eval(queue.dequeue1)
).compile.last
Expand All @@ -59,8 +61,8 @@ object TakeThroughDrainSpec extends Properties("TakeThroughDrain"){
property("propagate-failure-on-finalize") = protect {
val source = Stream[IO, Int](2, 2, 2, 2, 2, 2, 1, 3).covary[IO] ++ Stream.raiseError(Boom)

fs2.async.unboundedQueue[IO, Int].flatMap{ queue =>
(source.to(queue.enqueue).drain ++
fs2.concurrent.Queue.unbounded[IO, Int].flatMap{ queue =>
(source.through(queue.enqueue).drain ++
queue.dequeue.through(internal.takeThroughDrain(_ != 1)).drain ++
Stream.eval(queue.dequeue1)
).compile.last
Expand Down