Skip to content

Commit c35a6f7

Browse files
authored
Replace IO by polymorphic F (#53)
* Replace IO by a polymorphic F * Updated README Co-authored-by: mark@vectos.net <mark@vectos.net>
1 parent 2e0fe42 commit c35a6f7

File tree

4 files changed

+83
-80
lines changed

4 files changed

+83
-80
lines changed

README.md

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ libraryDependencies += "com.github.regis-leray" %% "fs2-ftp" % "<version>"
2020
### FTP / FTPS
2121

2222
```scala
23+
import cats.effect.IO
2324
import ray.fs2.ftp.FtpClient._
2425
import ray.fs2.ftp.FtpSettings._
2526

@@ -28,7 +29,7 @@ val settings = UnsecureFtpSettings("127.0.0.1", 21, FtpCredentials("foo", "bar")
2829
// FTP-SSL
2930
val settings = UnsecureFtpSettings.secure("127.0.0.1", 21, FtpCredentials("foo", "bar"))
3031

31-
connect(settings).use{
32+
connect[IO](settings).use{
3233
_.ls("/").compile.toList
3334
}
3435
```
@@ -39,10 +40,11 @@ connect(settings).use{
3940
```scala
4041
import ray.fs2.ftp.FtpClient._
4142
import ray.fs2.ftp.FtpSettings._
43+
import cats.effect.IO
4244

4345
val settings = SecureFtpSettings("127.0.0.1", 22, FtpCredentials("foo", "bar"))
4446

45-
connect(settings).use(
47+
connect[IO](settings).use(
4648
_.ls("/").compile.toList
4749
)
4850
```
@@ -52,33 +54,20 @@ connect(settings).use(
5254
import ray.fs2.ftp.FtpClient._
5355
import ray.fs2.ftp.FtpSettings._
5456
import java.nio.file.Paths._
57+
import cats.effect.IO
5558

5659
// Provide a SftpIdentity implementation
5760

5861
val keyFile = KeyFileSftpIdentity(Paths.get("privateKeyStringPath"))
5962

6063
val settings = SecureFtpSettings("127.0.0.1", 22, FtpCredentials("foo", ""), keyFile)
6164

62-
connect(settings).use(
65+
connect[IO](settings).use(
6366
_.ls("/").compile.toList
6467
)
6568
```
6669

67-
## Required ContextShit[IO]
68-
69-
```scala
70-
trait FtpClient[+A] {
71-
def stat(path: String)(implicit cs: ContextShift[IO]): IO[Option[FtpResource]]
72-
def readFile(path: String, chunkSize: Int = 2048)(implicit cs: ContextShift[IO]): fs2.Stream[IO, Byte]
73-
def rm(path: String)(implicit cs: ContextShift[IO]): IO[Unit]
74-
def rmdir(path: String)(implicit cs: ContextShift[IO]): IO[Unit]
75-
def mkdir(path: String)(implicit cs: ContextShift[IO]): IO[Unit]
76-
def ls(path: String)(implicit cs: ContextShift[IO]): fs2.Stream[IO, FtpResource]
77-
def lsDescendant(path: String)(implicit cs: ContextShift[IO]): fs2.Stream[IO, FtpResource]
78-
def upload(path: String, source: fs2.Stream[IO, Byte])(implicit cs: ContextShift[IO]): IO[Unit]
79-
def execute[T](f: A => T)(implicit cs: ContextShift[IO]): IO[T]
80-
}
81-
```
70+
## Required ContextShift
8271

8372
All function required an implicit ContextShit[IO].
8473

@@ -87,7 +76,7 @@ More information here https://typelevel.org/cats-effect/datatypes/contextshift.h
8776

8877

8978

90-
Here how to provide an ContextShit
79+
Here how to provide an ContextShift
9180

9281
* you can use the default one provided by `IOApp`
9382
```scala
@@ -96,13 +85,13 @@ object MyApp extends cats.effect.IOApp {
9685
}
9786
```
9887

99-
Or create your own ContextShit
88+
Or create your own ContextShift
10089
```scala
10190
import cats.effect.IO
10291
import cats.effect.ContextShift
10392

10493
implicit val blockingIO = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
105-
implicit val cs: ContextShit[IO] = IO.contextShift(blockingIO)
94+
implicit val cs: ContextShift[IO] = IO.contextShift(blockingIO)
10695
```
10796

10897

@@ -116,7 +105,7 @@ import ray.fs2.ftp.FtpSettings._
116105

117106
val settings = SecureFtpSettings("127.0.0.1", 22, FtpCredentials("foo", "bar"))
118107

119-
connect(settings).use(
108+
connect[IO](settings).use(
120109
_.execute(_.version())
121110
)
122111
```

src/main/scala/ray/fs2/ftp/Ftp.scala

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,45 +2,50 @@ package ray.fs2.ftp
22

33
import java.io.{ FileNotFoundException, InputStream }
44

5-
import cats.effect.{ Blocker, ContextShift, IO, Resource }
6-
import cats.syntax.monadError._
5+
import cats.effect.{ Blocker, ConcurrentEffect, ContextShift, Resource }
6+
import cats.implicits._
77
import fs2.Stream
88
import org.apache.commons.net.ftp.{ FTP, FTPSClient, FTPClient => JFTPClient }
99
import ray.fs2.ftp.FtpSettings.UnsecureFtpSettings
1010

11-
final private class Ftp(unsafeClient: JFTPClient, blocker: Blocker) extends FtpClient[JFTPClient] {
11+
final private class Ftp[F[_]](unsafeClient: JFTPClient, blocker: Blocker)(
12+
implicit CE: ConcurrentEffect[F],
13+
CS: ContextShift[F]
14+
) extends FtpClient[F, JFTPClient] {
1215

13-
def stat(path: String)(implicit cs: ContextShift[IO]): IO[Option[FtpResource]] =
16+
def stat(path: String): F[Option[FtpResource]] =
1417
execute(client => Option(client.mlistFile(path)).map(FtpResource(_)))
1518

16-
def readFile(path: String, chunkSize: Int = 2048)(implicit cs: ContextShift[IO]): fs2.Stream[IO, Byte] = {
19+
def readFile(path: String, chunkSize: Int = 2048): fs2.Stream[F, Byte] = {
1720
val is = execute(client => Option(client.retrieveFileStream(path)))
18-
.flatMap(_.fold(IO.raiseError[InputStream](new FileNotFoundException(s"file doesnt exist $path")))(IO.pure))
21+
.flatMap(
22+
_.fold(CE.raiseError[InputStream](new FileNotFoundException(s"file doesnt exist $path")))(x => CE.pure(x))
23+
)
1924

2025
fs2.io.readInputStream(is, chunkSize, blocker)
2126
}
2227

23-
def rm(path: String)(implicit cs: ContextShift[IO]): IO[Unit] =
28+
def rm(path: String): F[Unit] =
2429
execute(_.deleteFile(path))
2530
.ensure(InvalidPathError(s"Path is invalid. Cannot delete file : $path"))(identity)
2631
.map(_ => ())
2732

28-
def rmdir(path: String)(implicit cs: ContextShift[IO]): IO[Unit] =
33+
def rmdir(path: String): F[Unit] =
2934
execute(_.removeDirectory(path))
3035
.ensure(InvalidPathError(s"Path is invalid. Cannot remove directory : $path"))(identity)
3136
.map(_ => ())
3237

33-
def mkdir(path: String)(implicit cs: ContextShift[IO]): IO[Unit] =
38+
def mkdir(path: String): F[Unit] =
3439
execute(_.makeDirectory(path))
3540
.ensure(InvalidPathError(s"Path is invalid. Cannot create directory : $path"))(identity)
3641
.map(_ => ())
3742

38-
def ls(path: String)(implicit cs: ContextShift[IO]): Stream[IO, FtpResource] =
43+
def ls(path: String): Stream[F, FtpResource] =
3944
fs2.Stream
4045
.evalSeq(execute(_.listFiles(path).toList))
4146
.map(FtpResource(_, Some(path)))
4247

43-
def lsDescendant(path: String)(implicit cs: ContextShift[IO]): Stream[IO, FtpResource] =
48+
def lsDescendant(path: String): Stream[F, FtpResource] =
4449
fs2.Stream
4550
.evalSeq(execute(_.listFiles(path).toList))
4651
.flatMap { f =>
@@ -51,28 +56,30 @@ final private class Ftp(unsafeClient: JFTPClient, blocker: Blocker) extends FtpC
5156
Stream(FtpResource(f, Some(path)))
5257
}
5358

54-
def upload(path: String, source: fs2.Stream[IO, Byte])(implicit cs: ContextShift[IO]): IO[Unit] =
59+
def upload(path: String, source: fs2.Stream[F, Byte]): F[Unit] =
5560
source
56-
.through(fs2.io.toInputStream[IO])
61+
.through(fs2.io.toInputStream[F])
5762
.evalMap(is =>
5863
execute(_.storeFile(path, is))
5964
.ensure(InvalidPathError(s"Path is invalid. Cannot upload data to : $path"))(identity)
6065
)
6166
.compile
6267
.drain
6368

64-
def execute[T](f: JFTPClient => T)(implicit cs: ContextShift[IO]): IO[T] =
65-
blocker.delay[IO, T](f(unsafeClient))
69+
def execute[T](f: JFTPClient => T): F[T] =
70+
blocker.delay[F, T](f(unsafeClient))
6671
}
6772

6873
object Ftp {
6974

70-
def connect(settings: UnsecureFtpSettings)(implicit cs: ContextShift[IO]): Resource[IO, FtpClient[JFTPClient]] =
75+
def connect[F[_]](
76+
settings: UnsecureFtpSettings
77+
)(implicit CS: ContextShift[F], CE: ConcurrentEffect[F]): Resource[F, FtpClient[F, JFTPClient]] =
7178
for {
72-
blocker <- Blocker[IO]
79+
blocker <- Blocker[F]
7380

74-
r <- Resource.make[IO, FtpClient[JFTPClient]] {
75-
IO.delay {
81+
r <- Resource.make[F, FtpClient[F, JFTPClient]] {
82+
CE.delay {
7683
val ftpClient = if (settings.secure) new FTPSClient() else new JFTPClient()
7784
settings.proxy.foreach(ftpClient.setProxy)
7885
ftpClient.connect(settings.host, settings.port)
@@ -90,14 +97,14 @@ object Ftp {
9097
ftpClient.enterLocalPassiveMode()
9198
}
9299

93-
success -> new Ftp(ftpClient, blocker)
100+
success -> new Ftp[F](ftpClient, blocker)
94101
}
95102
.ensure(ConnectionError(s"Fail to connect to server ${settings.host}:${settings.port}"))(_._1)
96103
.map(_._2)
97104
} { client =>
98105
for {
99106
connected <- client.execute(_.isConnected)
100-
_ <- if (!connected) IO.pure(())
107+
_ <- if (!connected) CE.pure(())
101108
else
102109
client
103110
.execute(_.logout)

src/main/scala/ray/fs2/ftp/FtpClient.scala

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,26 @@
11
package ray.fs2.ftp
22

3-
import cats.effect.{ ContextShift, IO, Resource }
3+
import cats.effect.{ ConcurrentEffect, ContextShift, Resource }
44
import fs2.Stream
55
import ray.fs2.ftp.FtpSettings.{ SecureFtpSettings, UnsecureFtpSettings }
66

7-
trait FtpClient[+A] {
8-
def stat(path: String)(implicit cs: ContextShift[IO]): IO[Option[FtpResource]]
9-
def readFile(path: String, chunkSize: Int = 2048)(implicit cs: ContextShift[IO]): fs2.Stream[IO, Byte]
10-
def rm(path: String)(implicit cs: ContextShift[IO]): IO[Unit]
11-
def rmdir(path: String)(implicit cs: ContextShift[IO]): IO[Unit]
12-
def mkdir(path: String)(implicit cs: ContextShift[IO]): IO[Unit]
13-
def ls(path: String)(implicit cs: ContextShift[IO]): Stream[IO, FtpResource]
14-
def lsDescendant(path: String)(implicit cs: ContextShift[IO]): Stream[IO, FtpResource]
15-
def upload(path: String, source: fs2.Stream[IO, Byte])(implicit cs: ContextShift[IO]): IO[Unit]
16-
def execute[T](f: A => T)(implicit cs: ContextShift[IO]): IO[T]
7+
trait FtpClient[F[_], +A] {
8+
def stat(path: String): F[Option[FtpResource]]
9+
def readFile(path: String, chunkSize: Int = 2048): fs2.Stream[F, Byte]
10+
def rm(path: String): F[Unit]
11+
def rmdir(path: String): F[Unit]
12+
def mkdir(path: String): F[Unit]
13+
def ls(path: String): Stream[F, FtpResource]
14+
def lsDescendant(path: String): Stream[F, FtpResource]
15+
def upload(path: String, source: fs2.Stream[F, Byte]): F[Unit]
16+
def execute[T](f: A => T): F[T]
1717
}
1818

1919
object FtpClient {
2020

21-
def connect[A](settings: FtpSettings[A])(implicit cs: ContextShift[IO]): Resource[IO, FtpClient[A]] = settings match {
21+
def connect[F[_], A](
22+
settings: FtpSettings[A]
23+
)(implicit CS: ContextShift[F], CE: ConcurrentEffect[F]): Resource[F, FtpClient[F, A]] = settings match {
2224
case s: UnsecureFtpSettings => Ftp.connect(s)
2325
case s: SecureFtpSettings => SFtp.connect(s)
2426
}

src/main/scala/ray/fs2/ftp/SFtp.scala

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ package ray.fs2.ftp
22

33
import java.io._
44

5-
import cats.effect.{ Blocker, ContextShift, IO, Resource }
6-
import cats.syntax.applicativeError._
5+
import cats.effect.{ Blocker, ConcurrentEffect, ContextShift, Resource }
6+
import cats.implicits._
77
import fs2.Stream
88
import fs2.Stream._
99
import net.schmizz.sshj.SSHClient
@@ -14,33 +14,36 @@ import ray.fs2.ftp.FtpSettings.{ KeyFileSftpIdentity, RawKeySftpIdentity, Secure
1414

1515
import scala.jdk.CollectionConverters._
1616

17-
final private class SFtp(unsafeClient: JSFTPClient, blocker: Blocker) extends FtpClient[JSFTPClient] {
17+
final private class SFtp[F[_]](unsafeClient: JSFTPClient, blocker: Blocker)(
18+
implicit CE: ConcurrentEffect[F],
19+
CS: ContextShift[F]
20+
) extends FtpClient[F, JSFTPClient] {
1821

19-
def ls(path: String)(implicit cs: ContextShift[IO]): fs2.Stream[IO, FtpResource] =
22+
def ls(path: String): fs2.Stream[F, FtpResource] =
2023
fs2.Stream
2124
.evalSeq(execute(_.ls(path).asScala.toSeq))
2225
.map(FtpResource(_))
2326
.recoverWith {
24-
case ex: SFTPException if ex.getStatusCode == Response.StatusCode.NO_SUCH_FILE => fs2.Stream.empty.covary[IO]
25-
case other => fs2.Stream.raiseError[IO](other)
27+
case ex: SFTPException if ex.getStatusCode == Response.StatusCode.NO_SUCH_FILE => fs2.Stream.empty.covary[F]
28+
case other => fs2.Stream.raiseError[F](other)
2629
}
2730

28-
def lsDescendant(path: String)(implicit cs: ContextShift[IO]): fs2.Stream[IO, FtpResource] =
31+
def lsDescendant(path: String): fs2.Stream[F, FtpResource] =
2932
fs2.Stream
3033
.evalSeq(execute(_.ls(path).asScala.toSeq))
3134
.flatMap(f => if (f.isDirectory) lsDescendant(f.getPath) else Stream(FtpResource(f)))
3235
.recoverWith {
33-
case ex: SFTPException if ex.getStatusCode == Response.StatusCode.NO_SUCH_FILE => fs2.Stream.empty.covary[IO]
34-
case other => fs2.Stream.raiseError[IO](other)
36+
case ex: SFTPException if ex.getStatusCode == Response.StatusCode.NO_SUCH_FILE => fs2.Stream.empty.covary[F]
37+
case other => fs2.Stream.raiseError[F](other)
3538
}
3639

37-
def stat(path: String)(implicit cs: ContextShift[IO]): IO[Option[FtpResource]] =
40+
def stat(path: String): F[Option[FtpResource]] =
3841
execute(client => Option(client.statExistence(path)).map(FtpResource(path, _)))
3942

4043
def readFile(
4144
path: String,
4245
chunkSize: Int = 10 * 1024
43-
)(implicit cs: ContextShift[IO]): fs2.Stream[IO, Byte] =
46+
): fs2.Stream[F, Byte] =
4447
for {
4548
remoteFile <- Stream.eval(execute(_.open(path, java.util.EnumSet.of(OpenMode.READ))))
4649

@@ -54,22 +57,22 @@ final private class SFtp(unsafeClient: JSFTPClient, blocker: Blocker) extends Ft
5457
}
5558
}
5659

57-
input <- fs2.io.readInputStream(IO.pure(is), chunkSize, blocker)
60+
input <- fs2.io.readInputStream(CE.pure(is), chunkSize, blocker)
5861
} yield input
5962

60-
def rm(path: String)(implicit cs: ContextShift[IO]): IO[Unit] =
63+
def rm(path: String): F[Unit] =
6164
execute(_.rm(path))
6265

63-
def rmdir(path: String)(implicit cs: ContextShift[IO]): IO[Unit] =
66+
def rmdir(path: String): F[Unit] =
6467
execute(_.rmdir(path))
6568

66-
def mkdir(path: String)(implicit cs: ContextShift[IO]): IO[Unit] =
69+
def mkdir(path: String): F[Unit] =
6770
execute(_.mkdir(path))
6871

6972
def upload(
7073
path: String,
71-
source: fs2.Stream[IO, Byte]
72-
)(implicit cs: ContextShift[IO]): IO[Unit] =
74+
source: fs2.Stream[F, Byte]
75+
): F[Unit] =
7376
(for {
7477
remoteFile <- Stream.eval(execute(_.open(path, java.util.EnumSet.of(OpenMode.WRITE, OpenMode.CREAT))))
7578

@@ -82,21 +85,23 @@ final private class SFtp(unsafeClient: JSFTPClient, blocker: Blocker) extends Ft
8285
super.close()
8386
}
8487
}
85-
_ <- source.through(fs2.io.writeOutputStream(IO.pure(os), blocker))
88+
_ <- source.through(fs2.io.writeOutputStream(CE.pure(os), blocker))
8689
} yield ()).compile.drain
8790

88-
def execute[T](f: JSFTPClient => T)(implicit cs: ContextShift[IO]): IO[T] =
89-
blocker.delay[IO, T](f(unsafeClient))
91+
def execute[T](f: JSFTPClient => T): F[T] =
92+
blocker.delay[F, T](f(unsafeClient))
9093
}
9194

9295
object SFtp {
9396

94-
def connect(settings: SecureFtpSettings)(implicit cs: ContextShift[IO]): Resource[IO, FtpClient[JSFTPClient]] =
97+
def connect[F[_]](
98+
settings: SecureFtpSettings
99+
)(implicit CS: ContextShift[F], CE: ConcurrentEffect[F]): Resource[F, FtpClient[F, JSFTPClient]] =
95100
for {
96-
ssh <- Resource.liftF(IO(new SSHClient(settings.sshConfig)))
101+
ssh <- Resource.liftF(CE.delay(new SSHClient(settings.sshConfig)))
97102

98-
blocker <- Blocker[IO]
99-
r <- Resource.make[IO, FtpClient[JSFTPClient]](IO.delay {
103+
blocker <- Blocker[F]
104+
r <- Resource.make[F, FtpClient[F, JSFTPClient]](CE.delay {
100105
import settings._
101106

102107
if (!strictHostKeyChecking)
@@ -116,7 +121,7 @@ object SFtp {
116121

117122
new SFtp(ssh.newSFTPClient(), blocker)
118123
})(client =>
119-
client.execute(_.close()).attempt.flatMap(_ => if (ssh.isConnected) IO.delay(ssh.disconnect()) else IO.unit)
124+
client.execute(_.close()).attempt.flatMap(_ => if (ssh.isConnected) CE.delay(ssh.disconnect()) else CE.unit)
120125
)
121126
} yield r
122127

0 commit comments

Comments
 (0)