Skip to content

Commit f74f074

Browse files
committed
* minor GridFSStreamObserver update
1 parent 4df5905 commit f74f074

File tree

6 files changed

+41
-13
lines changed

6 files changed

+41
-13
lines changed

CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
## Versions
44

5+
### 1.8.1
6+
7+
* GridFSStreamObserver suports now completed and resultLength
8+
59
### 1.8.0
610

711
* mongo-scala-driver 4.0.2

src/main/scala/com/sfxcode/nosql/mongo/gridfs/Base.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,14 @@ abstract class Base extends LazyLogging {
3434

3535
def readConcern: ReadConcern = gridfsBucket.readConcern
3636

37-
def upload(
38-
fileName: String,
39-
source: Observable[ByteBuffer],
40-
metadata: AnyRef = Document(),
41-
chunkSizeBytes: Int = 1024 * 256): Observable[ObjectId] = {
37+
def upload(fileName: String,
38+
source: Observable[ByteBuffer],
39+
metadata: AnyRef = Document(),
40+
chunkSizeBytes: Int = 1024 * 256): Observable[ObjectId] = {
4241
val metadataDocument = {
4342
metadata match {
4443
case document: Document => document
45-
case _ => Converter.toDocument(metadata)
44+
case _ => Converter.toDocument(metadata)
4645
}
4746
}
4847
val options: GridFSUploadOptions = new GridFSUploadOptions()
@@ -54,9 +53,11 @@ abstract class Base extends LazyLogging {
5453
def download(oid: ObjectId): GridFSDownloadObservable =
5554
gridfsBucket.downloadToObservable(oid)
5655

57-
def downloadToStream(oid: ObjectId, outputStream: OutputStream): Unit = {
56+
def downloadToStream(oid: ObjectId, outputStream: OutputStream): GridFSStreamObserver = {
5857
val observable: GridFSDownloadObservable = gridfsBucket.downloadToObservable(oid)
59-
observable.subscribe(GridFSStreamObserver(outputStream))
58+
val observer = GridFSStreamObserver(outputStream)
59+
observable.subscribe(observer)
60+
observer
6061
}
6162

6263
}

src/main/scala/com/sfxcode/nosql/mongo/gridfs/GridFSStreamObserver.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,18 @@ package com.sfxcode.nosql.mongo.gridfs
22

33
import java.io.OutputStream
44
import java.nio.ByteBuffer
5+
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong }
56

67
import com.typesafe.scalalogging.LazyLogging
78
import org.mongodb.scala.Observer
89

910
case class GridFSStreamObserver(outputStream: OutputStream) extends Observer[ByteBuffer] with LazyLogging {
11+
val completed = new AtomicBoolean(false)
12+
val resultLength = new AtomicInteger(0)
1013

1114
override def onNext(buffer: ByteBuffer): Unit = {
1215
val bytes = new Array[Byte](buffer.remaining())
16+
resultLength.set(resultLength.get() + bytes.length)
1317
buffer.get(bytes, 0, bytes.length)
1418
buffer.clear()
1519
outputStream.write(bytes)
@@ -18,8 +22,12 @@ case class GridFSStreamObserver(outputStream: OutputStream) extends Observer[Byt
1822
override def onError(e: Throwable): Unit = {
1923
logger.error(e.getMessage, e)
2024
outputStream.close()
25+
resultLength.set(-1)
26+
completed.set(true)
2127
}
2228

23-
override def onComplete(): Unit =
29+
override def onComplete(): Unit = {
2430
outputStream.close()
31+
completed.set(true)
32+
}
2533
}

src/main/scala/com/sfxcode/nosql/mongo/package.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.sfxcode.nosql
22

33
import com.sfxcode.nosql.mongo.bson.BsonConverter
44
import com.sfxcode.nosql.mongo.database.MongoConfig
5+
import com.sfxcode.nosql.mongo.gridfs.GridFSStreamObserver
56
import com.sfxcode.nosql.mongo.operation.ObservableIncludes
67
import org.bson.BsonValue
78
import org.bson.types.ObjectId
@@ -73,4 +74,9 @@ package object mongo extends ObservableIncludes with ObservableImplicits {
7374

7475
implicit def gridFSFileToBSonIdValue(file: GridFSFile): BsonValue = file.getId
7576

77+
implicit def observerToResult(observer: GridFSStreamObserver): Int = {
78+
while (!observer.completed.get) {}
79+
observer.resultLength.get()
80+
}
81+
7682
}

src/test/scala/com/sfxcode/nosql/mongo/gridfs/GridfsDatabase.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
package com.sfxcode.nosql.mongo.gridfs
22

3+
import better.files.File
34
import com.sfxcode.nosql.mongo.GridFSDAO
45
import com.sfxcode.nosql.mongo.database.DatabaseProvider
56

67
/**
7-
* GridFS Database Sample
8-
*/
8+
* GridFS Database Sample
9+
*/
910
object GridfsDatabase {
1011
val SourcePath = "src/test/resources/images/"
1112
val TargetPath = "/tmp/_files/"
1213

14+
File("/tmp/_files").createIfNotExists()
15+
1316
case class ImageMetadata(name: String, group: String = "logos", version: Int = 1, indexSet: Set[Int] = Set(1, 2, 3))
1417

1518
val provider = DatabaseProvider("test")

src/test/scala/com/sfxcode/nosql/mongo/gridfs/GridfsDatabaseFunctions.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,14 @@ trait GridfsDatabaseFunctions {
2929
}
3030

3131
def downloadImage(id: ObjectId, path: String): Unit = {
32-
val file = File(path).touch()
33-
ImageFilesDAO.downloadToStream(id, file.newOutputStream)
32+
val file = File(path)
33+
val start = System.currentTimeMillis()
34+
val size: Int = ImageFilesDAO.downloadToStream(id, file.newOutputStream)
35+
36+
println(
37+
"file: %s with size %s Bytes written in %s ms "
38+
.format(file.pathAsString, size, System.currentTimeMillis() - start)
39+
)
3440
}
3541

3642
def findImage(id: ObjectId): GridFSFile = ImageFilesDAO.findById(id)

0 commit comments

Comments
 (0)