Skip to content

Commit c106d77

Browse files
authored
Merge pull request #3551 from rahulrangers/issue-3171
Implement `Socket#sendFile`
2 parents 85050d3 + 9c88497 commit c106d77

File tree

5 files changed

+167
-41
lines changed

5 files changed

+167
-41
lines changed

io/jvm-native/src/main/scala/fs2/io/file/FileHandlePlatform.scala

Lines changed: 43 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,47 @@ private[file] trait FileHandlePlatform[F[_]] {
6767

6868
}
6969

70+
private[io] final class SyncFileHandle[F[_]](val chan: FileChannel)(implicit F: Sync[F])
71+
extends FileHandle[F] {
72+
type Lock = FileLock
73+
74+
override def force(metaData: Boolean): F[Unit] =
75+
F.blocking(chan.force(metaData))
76+
77+
override def lock: F[Lock] =
78+
F.blocking(chan.lock)
79+
80+
override def lock(position: Long, size: Long, shared: Boolean): F[Lock] =
81+
F.blocking(chan.lock(position, size, shared))
82+
83+
override def read(numBytes: Int, offset: Long): F[Option[Chunk[Byte]]] =
84+
F.blocking {
85+
val buf = ByteBuffer.allocate(numBytes)
86+
val len = chan.read(buf, offset)
87+
if (len < 0) None
88+
else if (len == 0) Some(Chunk.empty)
89+
else Some(Chunk.array(buf.array, 0, len))
90+
}
91+
92+
override def size: F[Long] =
93+
F.blocking(chan.size)
94+
95+
override def truncate(size: Long): F[Unit] =
96+
F.blocking { chan.truncate(size); () }
97+
98+
override def tryLock: F[Option[Lock]] =
99+
F.blocking(Option(chan.tryLock()))
100+
101+
override def tryLock(position: Long, size: Long, shared: Boolean): F[Option[Lock]] =
102+
F.blocking(Option(chan.tryLock(position, size, shared)))
103+
104+
override def unlock(f: Lock): F[Unit] =
105+
F.blocking(f.release())
106+
107+
override def write(bytes: Chunk[Byte], offset: Long): F[Int] =
108+
F.blocking(chan.write(bytes.toByteBuffer, offset))
109+
}
110+
70111
private[file] trait FileHandleCompanionPlatform {
71112
@deprecated("Use Files[F].open", "3.0.0")
72113
def fromPath[F[_]: Async](path: JPath, flags: Seq[OpenOption]): Resource[F, FileHandle[F]] =
@@ -76,49 +117,11 @@ private[file] trait FileHandleCompanionPlatform {
76117
def fromFileChannel[F[_]: Async](channel: F[FileChannel]): Resource[F, FileHandle[F]] =
77118
Files.forAsync[F].openFileChannel(channel)
78119

79-
/** Creates a `FileHandle[F]` from a `java.nio.channels.FileChannel`. */
120+
/** Creates a `SyncFileHandle[F]` from a `java.nio.channels.FileChannel`. */
80121
private[file] def make[F[_]](
81122
chan: FileChannel
82123
)(implicit F: Sync[F]): FileHandle[F] =
83-
new FileHandle[F] {
84-
type Lock = FileLock
85-
86-
override def force(metaData: Boolean): F[Unit] =
87-
F.blocking(chan.force(metaData))
88-
89-
override def lock: F[Lock] =
90-
F.blocking(chan.lock)
91-
92-
override def lock(position: Long, size: Long, shared: Boolean): F[Lock] =
93-
F.blocking(chan.lock(position, size, shared))
94-
95-
override def read(numBytes: Int, offset: Long): F[Option[Chunk[Byte]]] =
96-
F.blocking {
97-
val buf = ByteBuffer.allocate(numBytes)
98-
val len = chan.read(buf, offset)
99-
if (len < 0) None
100-
else if (len == 0) Some(Chunk.empty)
101-
else Some(Chunk.array(buf.array, 0, len))
102-
}
103-
104-
override def size: F[Long] =
105-
F.blocking(chan.size)
106-
107-
override def truncate(size: Long): F[Unit] =
108-
F.blocking { chan.truncate(size); () }
109-
110-
override def tryLock: F[Option[Lock]] =
111-
F.blocking(Option(chan.tryLock()))
112-
113-
override def tryLock(position: Long, size: Long, shared: Boolean): F[Option[Lock]] =
114-
F.blocking(Option(chan.tryLock(position, size, shared)))
115-
116-
override def unlock(f: Lock): F[Unit] =
117-
F.blocking(f.release())
118-
119-
override def write(bytes: Chunk[Byte], offset: Long): F[Int] =
120-
F.blocking(chan.write(bytes.toByteBuffer, offset))
121-
}
124+
new SyncFileHandle[F](chan)
122125

123126
/** Creates a `FileHandle[F]` from a `java.nio.channels.SeekableByteChannel`. Because a `SeekableByteChannel` doesn't provide all the functionalities required by `FileHandle` some features like locking will be unavailable. */
124127
private[file] def makeFromSeekableByteChannel[F[_]](

io/jvm/src/main/scala/fs2/io/net/SelectingSocket.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
package fs2
2323
package io.net
2424

25+
import fs2.io.file.SyncFileHandle
26+
import fs2.io.file.FileHandle
2527
import cats.effect.LiftIO
2628
import cats.effect.Selector
2729
import cats.effect.kernel.Async
@@ -77,7 +79,32 @@ private final class SelectingSocket[F[_]: LiftIO] private (
7779
F.delay {
7880
ch.shutdownInput(); ()
7981
}
82+
override def sendFile(
83+
file: FileHandle[F],
84+
offset: Long,
85+
count: Long,
86+
chunkSize: Int
87+
): Stream[F, Nothing] = file match {
88+
case syncFileHandle: SyncFileHandle[F] =>
89+
val fileChannel = syncFileHandle.chan
8090

91+
def go(currOffset: Long, remaining: Long): F[Unit] =
92+
if (remaining <= 0) F.unit
93+
else {
94+
F.blocking(fileChannel.transferTo(currOffset, remaining, ch)).flatMap { written =>
95+
if (written > 0) {
96+
go(currOffset + written, remaining - written)
97+
} else {
98+
selector.select(ch, OP_WRITE).to *> go(currOffset, remaining)
99+
}
100+
}
101+
}
102+
103+
Stream.exec(writeMutex.lock.surround(go(offset, count)))
104+
105+
case _ =>
106+
super.sendFile(file, offset, count, chunkSize)
107+
}
81108
}
82109

83110
private object SelectingSocket {

io/jvm/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
*/
2121

2222
package fs2.io.net.unixsocket
23-
23+
import fs2.io.file.SyncFileHandle
2424
import cats.effect.IO
2525
import cats.effect.LiftIO
2626
import cats.effect.kernel.{Async, Resource}
@@ -34,6 +34,7 @@ import fs2.io.net.Socket
3434
import fs2.io.evalOnVirtualThreadIfAvailable
3535
import java.nio.ByteBuffer
3636
import java.nio.channels.SocketChannel
37+
import fs2.io.file.FileHandle
3738

3839
private[unixsocket] trait UnixSocketsCompanionPlatform {
3940
def forIO: UnixSockets[IO] = forLiftIO
@@ -134,5 +135,30 @@ private[unixsocket] trait UnixSocketsCompanionPlatform {
134135
ch.shutdownInput(); ()
135136
}
136137
)
138+
override def sendFile(
139+
file: FileHandle[F],
140+
offset: Long,
141+
count: Long,
142+
chunkSize: Int
143+
): Stream[F, Nothing] = file match {
144+
case syncFileHandle: SyncFileHandle[F] =>
145+
val fileChannel = syncFileHandle.chan
146+
147+
def go(currOffset: Long, remaining: Long): F[Unit] =
148+
if (remaining <= 0) F.unit
149+
else {
150+
F.blocking(fileChannel.transferTo(currOffset, remaining, ch)).flatMap { written =>
151+
if (written == 0) F.unit
152+
else {
153+
go(currOffset + written, remaining - written)
154+
}
155+
}
156+
}
157+
158+
Stream.exec(writeMutex.lock.surround(go(offset, count)))
159+
160+
case _ =>
161+
super.sendFile(file, offset, count, chunkSize)
162+
}
137163
}
138164
}

io/shared/src/main/scala/fs2/io/net/Socket.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ package io
2424
package net
2525

2626
import com.comcast.ip4s.{IpAddress, SocketAddress}
27+
import fs2.io.file.FileHandle
2728

2829
/** Provides the ability to read/write from a TCP socket in the effect `F`.
2930
*/
@@ -69,6 +70,34 @@ trait Socket[F[_]] {
6970
/** Writes the supplied stream of bytes to this socket via `write` semantics.
7071
*/
7172
def writes: Pipe[F, Byte, Nothing]
73+
74+
/** Reads a file and writes it to a socket.
75+
* Streams the file contents of the specified size and sends them over the socket.
76+
* The stream terminates when the entire file has reached end of file or the specified count is reached.
77+
*
78+
* @param file the file handle to read from
79+
* @param offset the starting position in the file
80+
* @param count the maximum number of bytes to transfer
81+
* @param chunkSize the size of each chunk to read
82+
*/
83+
def sendFile(
84+
file: FileHandle[F],
85+
offset: Long,
86+
count: Long,
87+
chunkSize: Int
88+
): Stream[F, Nothing] = {
89+
90+
def go(currOffset: Long, remaining: Long): Stream[F, Byte] =
91+
if (remaining > 0)
92+
Stream.eval(file.read(math.min(remaining, chunkSize.toLong).toInt, currOffset)).flatMap {
93+
case Some(chunk) if chunk.nonEmpty =>
94+
Stream.chunk(chunk) ++ go(currOffset + chunk.size, remaining - chunk.size)
95+
case _ => Stream.empty
96+
}
97+
else Stream.empty
98+
99+
go(offset, count).through(writes)
100+
}
72101
}
73102

74103
object Socket extends SocketCompanionPlatform

io/shared/src/test/scala/fs2/io/net/tcp/SocketSuite.scala

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import com.comcast.ip4s._
2929

3030
import scala.concurrent.duration._
3131
import scala.concurrent.TimeoutException
32+
import fs2.io.file._
3233

3334
class SocketSuite extends Fs2Suite with SocketSuitePlatform {
3435

@@ -284,5 +285,45 @@ class SocketSuite extends Fs2Suite with SocketSuitePlatform {
284285
}
285286
}
286287
}
288+
test("sendFile - sends data from file to socket from the offset") {
289+
val content = "Hello, world!"
290+
val offset = 7L
291+
val count = 5L
292+
val expected = "world"
293+
val chunkSize = 2
294+
295+
val setup = for {
296+
tempFile <- Files[IO].tempFile
297+
_ <- Stream
298+
.emits(content.getBytes)
299+
.covary[IO]
300+
.through(Files[IO].writeAll(tempFile))
301+
.compile
302+
.drain
303+
.toResource
304+
serverSetup <- Network[IO].serverResource(Some(ip"127.0.0.1"))
305+
(bindAddress, server) = serverSetup
306+
client <- Network[IO].client(bindAddress)
307+
} yield (tempFile, server, client)
308+
309+
Stream
310+
.resource(setup)
311+
.flatMap { case (tempFile, server, client) =>
312+
val serverStream = server.head.flatMap { socket =>
313+
Stream.eval(socket.reads.through(text.utf8.decode).compile.string)
314+
}
315+
val clientStream =
316+
Stream.resource(Files[IO].open(tempFile, Flags.Read)).flatMap { fileHandle =>
317+
client.sendFile(fileHandle, offset, count, chunkSize).drain ++
318+
Stream.eval(client.endOfOutput)
319+
}
320+
serverStream.concurrently(clientStream)
321+
}
322+
.compile
323+
.lastOrError
324+
.map { received =>
325+
assertEquals(received, expected)
326+
}
327+
}
287328
}
288329
}

0 commit comments

Comments
 (0)