diff --git a/.travis.yml b/.travis.yml index 9258dbb..bd584a3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,3 +1,5 @@ +dist: trusty + language: scala scala: diff --git a/build.sbt b/build.sbt index f2e174e..887f268 100644 --- a/build.sbt +++ b/build.sbt @@ -8,7 +8,7 @@ lazy val contributors = Seq( , "mrauilm" -> "Milan Raulim" ) -val fs2Version = "1.0.0-M2" +val fs2Version = "2.0.0" lazy val commonSettings = Seq( organization := "com.spinoco", diff --git a/src/main/scala/spinoco/fs2/mail/encoding/base64.scala b/src/main/scala/spinoco/fs2/mail/encoding/base64.scala index 8a588bb..ce78975 100644 --- a/src/main/scala/spinoco/fs2/mail/encoding/base64.scala +++ b/src/main/scala/spinoco/fs2/mail/encoding/base64.scala @@ -16,7 +16,7 @@ object base64 { */ def encodeRaw[F[_]](alphabet:Base64Alphabet):Pipe[F, Byte, Byte] = { def go(rem:ByteVector)(s: Stream[F,Byte]):Pull[F, Byte, Unit] = { - s.pull.unconsChunk.flatMap { + s.pull.uncons.flatMap { case None => if (rem.size == 0) Pull.done else Pull.output(ByteVectorChunk(ByteVector.view(rem.toBase64(alphabet).getBytes))) @@ -59,10 +59,10 @@ object base64 { * Decodes base64 encoded stream with supplied alphabet. Whitespaces are ignored. * Decoding is lazy to support very large Base64 bodies (i.e. email) */ - def decodeRaw[F[_]](alphabet:Base64Alphabet):Pipe[F, Byte, Byte] = { + def decodeRaw[F[_]: RaiseThrowable](alphabet:Base64Alphabet):Pipe[F, Byte, Byte] = { val Pad = alphabet.pad def go(remAcc:BitVector)(s:Stream[F, Byte]):Pull[F, Byte, Unit] = { - s.pull.unconsChunk.flatMap { + s.pull.uncons.flatMap { case None => Pull.done case Some((chunk,tl)) => @@ -102,11 +102,11 @@ object base64 { } /** decodes base64 encoded stream [[http://tools.ietf.org/html/rfc4648#section-5 RF4648 section 5]]. Whitespaces are ignored **/ - def decodeUrl[F[_]]:Pipe[F, Byte, Byte] = + def decodeUrl[F[_]: RaiseThrowable]:Pipe[F, Byte, Byte] = decodeRaw(Alphabets.Base64Url) /** decodes base64 encoded stream [[http://tools.ietf.org/html/rfc4648#section-4 RF4648 section 4]] **/ - def decode[F[_]]:Pipe[F, Byte, Byte] = + def decode[F[_]: RaiseThrowable]:Pipe[F, Byte, Byte] = decodeRaw(Alphabets.Base64) } diff --git a/src/main/scala/spinoco/fs2/mail/encoding/charset.scala b/src/main/scala/spinoco/fs2/mail/encoding/charset.scala index 7b09c07..7a26fb0 100644 --- a/src/main/scala/spinoco/fs2/mail/encoding/charset.scala +++ b/src/main/scala/spinoco/fs2/mail/encoding/charset.scala @@ -44,7 +44,7 @@ object charset { )) flatMap { decoder => def go(buff: ByteVector)(s: Stream[F, Byte]): Pull[F, Char, Unit] = { - s.pull.unconsChunk.flatMap { + s.pull.uncons.flatMap { case Some((chunk, tail)) => if (chunk.isEmpty) go(buff)(tail) else { @@ -113,7 +113,7 @@ object charset { Stream.eval(F.delay(chs.newEncoder())) flatMap { encoder => def go(buff: String)(s: Stream[F, Char]): Pull[F, Byte, Unit] = { - s.pull.unconsChunk flatMap { + s.pull.uncons flatMap { case Some((chunk, tail)) => val s = buff + StringChunk.asString(chunk) val chb = CharBuffer.wrap(s) diff --git a/src/main/scala/spinoco/fs2/mail/encoding/lines.scala b/src/main/scala/spinoco/fs2/mail/encoding/lines.scala index e3be6fc..7d81d72 100644 --- a/src/main/scala/spinoco/fs2/mail/encoding/lines.scala +++ b/src/main/scala/spinoco/fs2/mail/encoding/lines.scala @@ -12,7 +12,7 @@ object lines { /** decodes bytes in chunk of bytes by supplied separator. Last line is emitted even when not terminated by `separator` **/ def by[F[_]](separator: ByteVector): Pipe[F, Byte, Chunk[Byte]] = { def go(buff: ByteVector)(s: Stream[F, Byte]): Pull[F, Chunk[Byte], Unit] = { - s.pull.unconsChunk.flatMap { + s.pull.uncons.flatMap { case Some((ch, tl)) => val bs = ch.toBytes val data = buff ++ ByteVector.view(bs.values, bs.offset, bs.size) @@ -64,7 +64,7 @@ object lines { go(bv)(tl) } } else { - val chunks = bv.grouped(length) + val chunks = bv.grouped(length).toList val lastChunk = chunks.lastOption.getOrElse(ByteVector.empty) val chunksOut = if (lastChunk.nonEmpty) chunks.init else chunks Pull.output(ByteVectorChunk(result ++ ByteVector.concat(chunksOut.map(h => prefixBytes ++ h ++ crlf)))) >> go(lastChunk)(tl) @@ -76,7 +76,7 @@ object lines { } } - s.pull.unconsChunk.flatMap { + s.pull.uncons.flatMap { case Some((ch, tl)) => val bs = ch.toBytes val bv = buff ++ ByteVector.view(bs.values, bs.offset, bs.size) diff --git a/src/main/scala/spinoco/fs2/mail/encoding/quotedPrintable.scala b/src/main/scala/spinoco/fs2/mail/encoding/quotedPrintable.scala index b70b90c..c3e3bb5 100644 --- a/src/main/scala/spinoco/fs2/mail/encoding/quotedPrintable.scala +++ b/src/main/scala/spinoco/fs2/mail/encoding/quotedPrintable.scala @@ -22,7 +22,7 @@ object quotedPrintable { * @tparam F * @return */ - def decode[F[_]]: Pipe[F, Byte, Byte] = { + def decode[F[_]](implicit rt: RaiseThrowable[F]): Pipe[F, Byte, Byte] = { @tailrec def decodeBV(rem: ByteVector, acc: ByteVector):Either[String, (ByteVector, ByteVector)] = { val eqIdx = rem.indexOfSlice(`=`) @@ -47,7 +47,7 @@ object quotedPrintable { def go(buff: ByteVector)(s: Stream[F, Byte]): Pull[F, Byte, Unit] = { - s.pull.unconsChunk.flatMap { + s.pull.uncons.flatMap { case Some((chunk, tl)) => val bs = chunk.toBytes val bv = buff ++ ByteVector.view(bs.values, bs.offset, bs.size) diff --git a/src/main/scala/spinoco/fs2/mail/imap/IMAPClient.scala b/src/main/scala/spinoco/fs2/mail/imap/IMAPClient.scala index 10c018b..e19d33b 100644 --- a/src/main/scala/spinoco/fs2/mail/imap/IMAPClient.scala +++ b/src/main/scala/spinoco/fs2/mail/imap/IMAPClient.scala @@ -314,7 +314,7 @@ object IMAPClient { Pull.raiseError(new Throwable("Invalid client state initial response not received")) } - .stream + .void.stream .onFinalize(requestSemaphore.release) } } @@ -429,7 +429,7 @@ object IMAPClient { * @tparam F * @return */ - def fetchBytesOf[F[_]]( + def fetchBytesOf[F[_]: RaiseThrowable]( contentIdx: Int , contentKey: String , encoding: String @@ -437,8 +437,8 @@ object IMAPClient { val decoder: Pipe[F, Byte, Byte] = { s => encoding.toUpperCase match { - case "BASE64" => base64.decode[F](s) - case "QUOTED-PRINTABLE" => quotedPrintable.decode[F](s) + case "BASE64" => s through base64.decode[F] + case "QUOTED-PRINTABLE" => s through quotedPrintable.decode[F] case _ => s } } @@ -455,7 +455,7 @@ object IMAPClient { * @param encoding Encoding of the data * @param charsetName Name of the charset of the text. If empty, UTF-8 will be used instead */ - def fetchTextOf[F[_]: Sync]( + def fetchTextOf[F[_]: Sync: RaiseThrowable]( contentIdx: Int , contentKey: String , encoding: String @@ -469,8 +469,8 @@ object IMAPClient { val decoder: Pipe[F, Byte, Char] = { s => encoding.toUpperCase match { - case "BASE64" => base64.decode[F](s) through charset.decode(chs) - case "QUOTED-PRINTABLE" => quotedPrintable.decode[F](s) through charset.decode(chs) + case "BASE64" => s through base64.decode[F] through charset.decode(chs) + case "QUOTED-PRINTABLE" => s through quotedPrintable.decode[F] through charset.decode(chs) case _ => s through charset.decode(chs) } } @@ -520,7 +520,7 @@ object IMAPClient { * * This requires UID content and BODY[HEADER] content to be present in map otherwise this will fail. */ - def mkEmailHeader[F[_]]( + def mkEmailHeader[F[_]: RaiseThrowable]( headerCodec: Codec[EmailHeader] ): Pipe[F, Map[String, Vector[IMAPData]], IMAPEmailHeader] = { def asString(data: Vector[IMAPData]): Either[String, String] = { @@ -610,7 +610,7 @@ object IMAPClient { * @tparam F * @return */ - def rawContent[F[_]](result: RequestResult[F]): Stream[F, (Int, String, IMAPData)] = { + def rawContent[F[_]: RaiseThrowable](result: RequestResult[F]): Stream[F, (Int, String, IMAPData)] = { def collectBytes(recordIdx: Int, key: String)(s: Stream[F, IMAPData]): Pull[F, (Int, String, IMAPData), Unit] = { def go(s: Stream[F, IMAPData]): Pull[F, (Int, String, IMAPData), Unit] = { s.pull.uncons1.flatMap { @@ -737,11 +737,11 @@ object IMAPClient { * various encodings. * */ - def lines[F[_]]: Pipe[F, Byte, IMAPData] = { + def lines[F[_]: RaiseThrowable]: Pipe[F, Byte, IMAPData] = { val crlf = ByteVector.view("\r\n".getBytes) def collectChunk(sz: Int)(s: Stream[F, Byte]): Pull[F, IMAPData, Unit] = { - s.pull.unconsChunk.flatMap { + s.pull.uncons.flatMap { case Some((chunk, tl)) => val bs = chunk.toBytes val bv = ByteVector.view(bs.values, bs.offset, bs.size) @@ -757,7 +757,7 @@ object IMAPClient { } def collectLines(buff: ByteVector)(s: Stream[F, Byte]): Pull[F, IMAPData, Unit] = { - s.pull.unconsChunk.flatMap { + s.pull.uncons.flatMap { case Some((chunk, tl)) => val bs = chunk.toBytes val nb = buff ++ ByteVector.view(bs.values, bs.offset, bs.size) diff --git a/src/main/scala/spinoco/fs2/mail/internal/internal.scala b/src/main/scala/spinoco/fs2/mail/internal/internal.scala index bb32f7f..c74a9ea 100644 --- a/src/main/scala/spinoco/fs2/mail/internal/internal.scala +++ b/src/main/scala/spinoco/fs2/mail/internal/internal.scala @@ -18,7 +18,7 @@ package object internal { * Once downstream finishes this synchronous queue is read until exhaustion marked by [[None]]. */ def takeThroughDrain[F[_] : Concurrent, A](predicate: A => Boolean): Pipe[F, A, A] = { source => - Stream.eval(fs2.async.synchronousQueue[F, Option[Either[Throwable, A]]]).flatMap{ feedQueue => + Stream.eval(fs2.concurrent.Queue.synchronous[F, Option[Either[Throwable, A]]]).flatMap{ feedQueue => // dequeue and propagate errors to downstream def dequeue:Stream[F, A] = { @@ -48,4 +48,4 @@ package object internal { }} } -} \ No newline at end of file +} diff --git a/src/main/scala/spinoco/fs2/mail/smtp/SMTPClient.scala b/src/main/scala/spinoco/fs2/mail/smtp/SMTPClient.scala index 333f4d4..c125011 100644 --- a/src/main/scala/spinoco/fs2/mail/smtp/SMTPClient.scala +++ b/src/main/scala/spinoco/fs2/mail/smtp/SMTPClient.scala @@ -190,9 +190,9 @@ object SMTPClient { * It assumes the server never sends more data than very last \r\n, as the other data received will be discarded. * */ - def readResponse[F[_]]: Pipe[F, Byte, SMTPResponse] = { + def readResponse[F[_]: RaiseThrowable]: Pipe[F, Byte, SMTPResponse] = { def go(buff: ByteVector)(s: Stream[F, Byte]): Pull[F, String, Unit] = { - s.pull.unconsChunk.flatMap { + s.pull.uncons.flatMap { case Some((ch, tl)) => val bs = ch.toBytes val bv = buff ++ ByteVector.view(bs.values, bs.offset, bs.size) @@ -244,7 +244,7 @@ object SMTPClient { )(data: Stream[F, Byte])(implicit socketRef: Ref[F, Socket[F]]): F[Seq[SMTPResponse]] = { socketRef.get.flatMap { socket => sending.acquire >> - (data.to(socket.writes()).compile.drain >> + (data.through(socket.writes()).compile.drain >> socket.reads(1024, Some(timeout)).through(readResponse).compile.toVector ).attempt flatMap { case Right(rslt) => sending.release as rslt @@ -414,7 +414,7 @@ object SMTPClient { */ def insertDotIfNeeded[F[_]]: Pipe[F, Byte, Byte] = { def go(buff: ByteVector)(s: Stream[F, Byte]): Pull[F, Byte, Unit] = { - s.pull.unconsChunk.flatMap { + s.pull.uncons.flatMap { case Some((ch, tl)) => val bs = ch.toBytes val bv = buff ++ ByteVector.view(bs.values, bs.offset, bs.size) @@ -511,7 +511,7 @@ object SMTPClient { * @tparam F * @return */ - def encodeMimeBody[F[_]]( + def encodeMimeBody[F[_]: RaiseThrowable]( header: EmailHeader , body: MIMEPart[F] , emailHeaderCodec: Codec[EmailHeader] diff --git a/src/test/scala/spinoco/fs2/mail/imap/IMAPClientCommandSpec.scala b/src/test/scala/spinoco/fs2/mail/imap/IMAPClientCommandSpec.scala index 1c280eb..48aaabc 100644 --- a/src/test/scala/spinoco/fs2/mail/imap/IMAPClientCommandSpec.scala +++ b/src/test/scala/spinoco/fs2/mail/imap/IMAPClientCommandSpec.scala @@ -8,9 +8,12 @@ import org.scalacheck.Properties import org.scalacheck.Prop._ import spinoco.fs2.mail.imap.IMAPClient.impl.{IMAPData, IMAPText} +import scala.concurrent.ExecutionContext + object IMAPClientCommandSpec extends Properties("IMAPClient.request") { - import scala.concurrent.ExecutionContext.Implicits.global + private implicit val cs = IO.contextShift(ExecutionContext.global) + private implicit val c = IO.ioConcurrentEffect def createTagged(shouldFail: IO[Boolean], count: Int, done: IO[Unit]): Stream[IO, IMAPData] = { Stream.unfoldEval(count){ s => diff --git a/src/test/scala/spinoco/fs2/mail/internal/TakeThroughDrainSpec.scala b/src/test/scala/spinoco/fs2/mail/internal/TakeThroughDrainSpec.scala index 43531e4..2c12c1b 100644 --- a/src/test/scala/spinoco/fs2/mail/internal/TakeThroughDrainSpec.scala +++ b/src/test/scala/spinoco/fs2/mail/internal/TakeThroughDrainSpec.scala @@ -3,14 +3,16 @@ package spinoco.fs2.mail.internal import cats.effect.IO import org.scalacheck.Properties import org.scalacheck.Prop._ - import fs2._ import spinoco.fs2.mail.internal + +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ object TakeThroughDrainSpec extends Properties("TakeThroughDrain"){ - import scala.concurrent.ExecutionContext.Implicits.global + private implicit val cs = IO.contextShift(ExecutionContext.global) + private implicit val c = IO.ioConcurrentEffect property("early-terminated.drain") = protect{ @@ -19,9 +21,9 @@ object TakeThroughDrainSpec extends Properties("TakeThroughDrain"){ // one means a tag and three means some other data val source = Stream[IO, Int](2, 2, 2, 2, 2, 2, 1, 3).covary[IO] - fs2.async.unboundedQueue[IO, Int].flatMap{ queue => + fs2.concurrent.Queue.unbounded[IO, Int].flatMap{ queue => - (source.to(queue.enqueue).drain ++ + (source.through(queue.enqueue).drain ++ queue.dequeue.through(internal.takeThroughDrain(_ != 1)).take(2).drain ++ Stream.eval(queue.dequeue1) ).compile.last @@ -35,8 +37,8 @@ object TakeThroughDrainSpec extends Properties("TakeThroughDrain"){ // one means a tag and three means some other data val source = Stream[IO, Int](2, 2, 2, 2, 2, 2, 1, 3).covary[IO] - fs2.async.unboundedQueue[IO, Int].flatMap{ queue => - (source.to(queue.enqueue).drain ++ + fs2.concurrent.Queue.unbounded[IO, Int].flatMap{ queue => + (source.through(queue.enqueue).drain ++ queue.dequeue.through(internal.takeThroughDrain(_ != 1)).drain ++ Stream.eval(queue.dequeue1) ).compile.last @@ -48,8 +50,8 @@ object TakeThroughDrainSpec extends Properties("TakeThroughDrain"){ property("propagate-failure-from-source") = protect { val source = Stream[IO, Int](2, 2, 2, 2, 2, 2, 1, 3).covary[IO] ++ Stream.raiseError(Boom) - fs2.async.unboundedQueue[IO, Int].flatMap{ queue => - (source.to(queue.enqueue).drain ++ + fs2.concurrent.Queue.unbounded[IO, Int].flatMap{ queue => + (source.through(queue.enqueue).drain ++ queue.dequeue.through(internal.takeThroughDrain(_ != 100)).drain ++ Stream.eval(queue.dequeue1) ).compile.last @@ -59,8 +61,8 @@ object TakeThroughDrainSpec extends Properties("TakeThroughDrain"){ property("propagate-failure-on-finalize") = protect { val source = Stream[IO, Int](2, 2, 2, 2, 2, 2, 1, 3).covary[IO] ++ Stream.raiseError(Boom) - fs2.async.unboundedQueue[IO, Int].flatMap{ queue => - (source.to(queue.enqueue).drain ++ + fs2.concurrent.Queue.unbounded[IO, Int].flatMap{ queue => + (source.through(queue.enqueue).drain ++ queue.dequeue.through(internal.takeThroughDrain(_ != 1)).drain ++ Stream.eval(queue.dequeue1) ).compile.last