Skip to content

Commit e854777

Browse files
committed
Acquired mutex before calling go
1 parent 7125931 commit e854777

File tree

1 file changed

+26
-32
lines changed

1 file changed

+26
-32
lines changed

io/jvm/src/main/scala/fs2/io/net/unixsocket/UnixSocketsPlatform.scala

Lines changed: 26 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -147,21 +147,19 @@ private[unixsocket] trait UnixSocketsCompanionPlatform {
147147
case syncFileHandle: SyncFileHandle[F] =>
148148
val fileChannel = syncFileHandle.chan
149149

150-
def go(currOffset: Long, remaining: Long): Stream[F, Unit] =
151-
if (remaining <= 0) Stream.empty
150+
def go(currOffset: Long, remaining: Long): F[Unit] =
151+
if (remaining <= 0) F.unit
152152
else {
153153
val toTransfer = remaining.min(chunkSize.toLong)
154-
Stream
155-
.eval(writeMutex.lock.surround {
156-
F.blocking(fileChannel.transferTo(currOffset, toTransfer, ch))
157-
})
158-
.flatMap { written =>
159-
if (written == 0) Stream.empty
160-
else go(currOffset + written, remaining - written)
154+
F.blocking(fileChannel.transferTo(currOffset, toTransfer, ch)).flatMap { written =>
155+
if (written == 0) F.unit
156+
else {
157+
go(currOffset + written, remaining - written)
161158
}
159+
}
162160
}
163161

164-
go(offset, count).drain
162+
Stream.eval(writeMutex.lock.surround(go(offset, count))).drain
165163

166164
case _ =>
167165
super.sendfile(file, offset, count, chunkSize)
@@ -172,31 +170,27 @@ private[unixsocket] trait UnixSocketsCompanionPlatform {
172170
offset: Long,
173171
count: Long,
174172
chunkSize: Int
175-
): Stream[F, Nothing] =
176-
177-
file match {
178-
case syncFileHandle: SyncFileHandle[F] =>
179-
val fileChannel = syncFileHandle.chan
180-
181-
def go(currOffset: Long, remaining: Long): Stream[F, Unit] =
182-
if (remaining <= 0) Stream.empty
183-
else {
184-
val toTransfer = remaining.min(chunkSize.toLong)
185-
Stream
186-
.eval(readMutex.lock.surround {
187-
F.blocking(fileChannel.transferFrom(ch, currOffset, toTransfer))
188-
})
189-
.flatMap { readBytes =>
190-
if (readBytes == 0) Stream.empty
191-
else go(currOffset + readBytes, remaining - readBytes)
192-
}
173+
): Stream[F, Nothing] = file match {
174+
case syncFileHandle: SyncFileHandle[F] =>
175+
val fileChannel = syncFileHandle.chan
176+
177+
def go(currOffset: Long, remaining: Long): F[Unit] =
178+
if (remaining <= 0) F.unit
179+
else {
180+
val toTransfer = remaining.min(chunkSize.toLong)
181+
F.blocking(fileChannel.transferFrom(ch, currOffset, toTransfer)).flatMap { readBytes =>
182+
if (readBytes > 0) F.unit
183+
else {
184+
go(currOffset + readBytes, remaining - readBytes)
185+
}
193186
}
187+
}
194188

195-
go(offset, count).drain
189+
Stream.eval(readMutex.lock.surround(go(offset, count))).drain
196190

197-
case _ =>
198-
super.recvfile(file, offset, count, chunkSize)
199-
}
191+
case _ =>
192+
super.recvfile(file, offset, count, chunkSize)
193+
}
200194

201195
}
202196
}

0 commit comments

Comments
 (0)