
Commit fce3fd3

refactor GridFS to reactive streams
1 parent 912ccdb commit fce3fd3

File tree: 9 files changed (+133, -57 lines)

CHANGES.md

Lines changed: 6 additions & 2 deletions

@@ -1,9 +1,13 @@
 # Changes #
 
 ## Versions
+
+### 1.8.0
+
 * mongo-scala-driver 4.0.2
-* maxWaitQueueSize is removed since 4.0.0
-*
+* maxWaitQueueSize is removed (since mongo-scala-driver 4.0.0)
+* GridFS Refactoring (Download / Upload now use reactive streams)
+* More Index Functions added
 
 ### 1.7.1
 * [mongo-scala-driver 2.9.0](https://mongodb.github.io/mongo-scala-driver/2.9/changelog/)

src/main/scala/com/sfxcode/nosql/mongo/gridfs/Base.scala

Lines changed: 33 additions & 7 deletions

@@ -1,12 +1,14 @@
 package com.sfxcode.nosql.mongo.gridfs
 
-import java.nio.channels.AsynchronousFileChannel
-import java.nio.file.{OpenOption, Path, StandardOpenOption}
+import java.io.OutputStream
+import java.nio.ByteBuffer
 
+import com.mongodb.client.gridfs.model.GridFSUploadOptions
+import com.sfxcode.nosql.mongo.Converter
 import com.typesafe.scalalogging.LazyLogging
 import org.mongodb.scala.bson.ObjectId
 import org.mongodb.scala.gridfs.{GridFSBucket, GridFSDownloadObservable}
-import org.mongodb.scala.{Observable, ReadConcern, ReadPreference, WriteConcern}
+import org.mongodb.scala.{Document, Observable, ReadConcern, ReadPreference, WriteConcern}
 
 abstract class Base extends LazyLogging {
 
@@ -20,10 +22,6 @@ abstract class Base extends LazyLogging {
     metadataKey
   }
 
-  def download(oid: ObjectId):GridFSDownloadObservable = {
-    gridfsBucket.downloadToObservable(oid)
-  }
-
  def drop(): Observable[Void] = gridfsBucket.drop()
 
  def bucketName: String = gridfsBucket.bucketName
@@ -36,4 +34,32 @@ abstract class Base extends LazyLogging {
 
  def readConcern: ReadConcern = gridfsBucket.readConcern
 
+
+  def upload(fileName: String,
+             source: Observable[ByteBuffer],
+             metadata: AnyRef = Document(),
+             chunkSizeBytes: Int = 1024 * 256): Observable[ObjectId] = {
+    val metadataDocument = {
+      metadata match {
+        case document: Document => document
+        case _ => Converter.toDocument(metadata)
+      }
+    }
+    val options: GridFSUploadOptions = new GridFSUploadOptions()
+      .chunkSizeBytes(chunkSizeBytes)
+      .metadata(metadataDocument)
+    gridfsBucket.uploadFromObservable(fileName, source, options)
+  }
+
+
+  def download(oid: ObjectId): GridFSDownloadObservable = {
+    gridfsBucket.downloadToObservable(oid)
+  }
+
+  def downloadToStream(oid: ObjectId, outputStream: OutputStream): Unit = {
+    val observable: GridFSDownloadObservable = gridfsBucket.downloadToObservable(oid)
+    observable.subscribe(GridFSStreamObserver(outputStream))
+  }
+
+
 }
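
For orientation, a minimal usage sketch of the new upload / download API (not part of the commit). The DAO name fileDAO, the file names, and the metadata document are invented; headResult() is the blocking helper the test code further down already uses, and Observable(Seq(...)) mirrors the source construction removed from the tests:

import java.io.FileOutputStream
import java.nio.ByteBuffer

import com.sfxcode.nosql.mongo._
import com.sfxcode.nosql.mongo.gridfs.Crud
import org.mongodb.scala.bson.ObjectId
import org.mongodb.scala.{Document, Observable}

object UploadDownloadSketch {

  // fileDAO stands in for any concrete DAO extending the Base / Search / Crud hierarchy.
  def roundTrip(fileDAO: Crud, bytes: Array[Byte]): Unit = {
    // Upload: any Observable[ByteBuffer] can act as the source.
    val source: Observable[ByteBuffer] = Observable(Seq(ByteBuffer.wrap(bytes)))
    val oid: ObjectId =
      fileDAO.upload("report.bin", source, Document("category" -> "reports")).headResult()

    // Download: chunks are pushed into the OutputStream by GridFSStreamObserver;
    // the subscription runs asynchronously and closes the stream on completion.
    fileDAO.downloadToStream(oid, new FileOutputStream("report-copy.bin"))
  }
}

insertOne on Crud (next file) wraps the same upload call for callers that start from an InputStream rather than an Observable[ByteBuffer].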

src/main/scala/com/sfxcode/nosql/mongo/gridfs/Crud.scala

Lines changed: 8 additions & 27 deletions

@@ -1,38 +1,19 @@
 package com.sfxcode.nosql.mongo.gridfs
 
-import java.nio.ByteBuffer
+import java.io.InputStream
 
-import com.mongodb.client.gridfs.model.GridFSUploadOptions
-import com.sfxcode.nosql.mongo.Converter
-import org.bson.types.ObjectId
-import org.mongodb.scala.{ Document, Observable }
+import org.mongodb.scala.bson.ObjectId
+import org.mongodb.scala.{Document, Observable}
 
 abstract class Crud extends Search {
 
  def deleteOne(id: ObjectId): Observable[Void] = gridfsBucket.delete(id)
 
-  def insertOne(fileName: String,
-                source: Observable[ByteBuffer],
-                metadata: AnyRef = Document(),
-                chunkSizeBytes: Int = 1024 * 1204): Observable[ObjectId] = {
-    val metadataDocument = {
-      metadata match {
-        case document: Document => document
-        case _ => Converter.toDocument(metadata)
-      }
-    }
-    val options: GridFSUploadOptions = new GridFSUploadOptions()
-      .chunkSizeBytes(chunkSizeBytes)
-      .metadata(metadataDocument)
-    gridfsBucket.uploadFromObservable(fileName, source, options)
+  def insertOne(fileName: String,
+                stream: InputStream,
+                metadata: AnyRef = Document(),
+                chunkSizeBytes: Int = 1024 * 256, bufferSize: Int = 1024 * 64): Observable[ObjectId] = {
+    upload(fileName, GridFSStreamObservable(stream, bufferSize), metadata, chunkSizeBytes)
  }
 
-  def upload(fileName: String,
-             source: Observable[ByteBuffer],
-             metadata: AnyRef = Document(),
-             chunkSizeBytes: Int = 1024 * 1204): Observable[ObjectId] =
-    insertOne(fileName, source, metadata, chunkSizeBytes)
-
-
-
 }

src/main/scala/com/sfxcode/nosql/mongo/gridfs/GridFSStreamObservable.scala

Lines changed: 53 additions & 0 deletions

@@ -0,0 +1,53 @@
+package com.sfxcode.nosql.mongo.gridfs
+
+
+import java.io.InputStream
+import java.util.concurrent.atomic.AtomicBoolean
+import java.nio.ByteBuffer
+
+import com.typesafe.scalalogging.LazyLogging
+import org.mongodb.scala.{Observable, Observer, Subscription}
+
+case class GridFSStreamObservable(inputStream: InputStream, bufferSize: Int = 1024 * 64) extends Observable[ByteBuffer] with LazyLogging {
+  val isPublishing = new AtomicBoolean(false)
+  val buffer = new Array[Byte](bufferSize)
+
+  override def subscribe(subscriber: Observer[_ >: ByteBuffer]): Unit = {
+    isPublishing.set(true)
+    subscriber.onSubscribe(GridFSSubscription(subscriber))
+  }
+
+  case class GridFSSubscription(subscriber: Observer[_ >: ByteBuffer]) extends Subscription {
+
+    override def request(n: Long): Unit = {
+      try {
+        val len = inputStream.read(buffer)
+        if (len >= 0 && isPublishing.get()) {
+          val byteBuffer = ByteBuffer.wrap(buffer, 0, len)
+          subscriber.onNext(byteBuffer)
+        }
+        else {
+          subscriber.onComplete()
+          inputStream.close()
+        }
+      } catch {
+        case e: InterruptedException =>
+          Thread.currentThread.interrupt()
+          subscriber.onError(e)
+          inputStream.close()
+          logger.error(e.getMessage, e)
+        case t: Throwable =>
+          subscriber.onError(t)
+          inputStream.close()
+          logger.error(t.getMessage, t)
+      }
+    }
+
+    override def unsubscribe(): Unit = {
+      isPublishing.set(false)
+    }
+
+    override def isUnsubscribed: Boolean = !isPublishing.get()
+  }
+}
+
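As a standalone illustration (not part of the commit), the publisher above can be driven without GridFS. One detail worth noting: request(n) reads at most one buffer per call, regardless of n, so a plain consumer has to keep requesting; the driver's GridFS upload appears to request chunk by chunk, which is what this implementation relies on. The object name, sample data, and buffer size below are invented for the sketch:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.ByteBuffer

import com.sfxcode.nosql.mongo.gridfs.GridFSStreamObservable
import org.mongodb.scala.{Observer, Subscription}

object StreamObservableSketch extends App {
  val data      = ("hello gridfs " * 1000).getBytes("UTF-8")
  val collected = new ByteArrayOutputStream()

  GridFSStreamObservable(new ByteArrayInputStream(data), bufferSize = 1024)
    .subscribe(new Observer[ByteBuffer] {
      private var subscription: Subscription = _

      override def onSubscribe(s: Subscription): Unit = {
        subscription = s
        s.request(1) // this publisher emits at most one buffer per request call
      }

      override def onNext(buffer: ByteBuffer): Unit = {
        val bytes = new Array[Byte](buffer.remaining())
        buffer.get(bytes)
        collected.write(bytes)
        subscription.request(1) // pull the next chunk
      }

      override def onError(e: Throwable): Unit = e.printStackTrace()

      override def onComplete(): Unit =
        println(s"received ${collected.size} of ${data.length} bytes")
    })
}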

src/main/scala/com/sfxcode/nosql/mongo/gridfs/GridFSStreamObserver.scala

Lines changed: 26 additions & 0 deletions

@@ -0,0 +1,26 @@
+package com.sfxcode.nosql.mongo.gridfs
+
+import java.io.OutputStream
+import java.nio.ByteBuffer
+
+import com.typesafe.scalalogging.LazyLogging
+import org.mongodb.scala.Observer
+
+case class GridFSStreamObserver(outputStream: OutputStream) extends Observer[ByteBuffer] with LazyLogging {
+
+  override def onNext(buffer: ByteBuffer): Unit = {
+    val bytes = new Array[Byte](buffer.remaining())
+    buffer.get(bytes, 0, bytes.length)
+    buffer.clear()
+    outputStream.write(bytes)
+  }
+
+  override def onError(e: Throwable): Unit = {
+    logger.error(e.getMessage, e)
+    outputStream.close()
+  }
+
+  override def onComplete(): Unit = {
+    outputStream.close()
+  }
+}
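A companion sketch for the download side (also not part of the commit): because the observer accepts any OutputStream, capturing a stored file in memory only takes a ByteArrayOutputStream. The bucket, the ObjectId, and the object name are assumed to exist:

import java.io.ByteArrayOutputStream

import com.sfxcode.nosql.mongo.gridfs.GridFSStreamObserver
import org.mongodb.scala.bson.ObjectId
import org.mongodb.scala.gridfs.GridFSBucket

object DownloadToMemorySketch {

  // Mirrors what Base.downloadToStream wires up, but keeps the bytes in memory.
  def loadBytes(bucket: GridFSBucket, oid: ObjectId): ByteArrayOutputStream = {
    val target = new ByteArrayOutputStream()
    // The observer writes every published chunk and closes the stream on
    // completion or error; the subscription itself is asynchronous.
    bucket.downloadToObservable(oid).subscribe(GridFSStreamObserver(target))
    target
  }
}

Since subscribe only starts the transfer, a caller still has to wait for onComplete before reading the buffer; Base.downloadToStream above has the same fire-and-forget character.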

src/main/scala/com/sfxcode/nosql/mongo/gridfs/Search.scala

Lines changed: 1 addition & 0 deletions

@@ -19,4 +19,5 @@ abstract class Search extends Base {
  def findByMetadataValue(key: String, value: Any): GridFSFindObservable =
    find(createMetadataKey(key), value)
 
+
 }
Binary file (31.5 KB) not shown.

src/test/scala/com/sfxcode/nosql/mongo/gridfs/GridfsDatabaseApp.scala

Lines changed: 1 addition & 4 deletions

@@ -1,11 +1,8 @@
 package com.sfxcode.nosql.mongo.gridfs
 
-import java.io.FileOutputStream
-import java.nio.ByteBuffer
-
 import com.sfxcode.nosql.mongo._
-import org.bson.types.ObjectId
 import com.sfxcode.nosql.mongo.gridfs.GridfsDatabase._
+import org.bson.types.ObjectId
 
 object GridfsDatabaseApp extends App with GridfsDatabaseFunctions {
 

src/test/scala/com/sfxcode/nosql/mongo/gridfs/GridfsDatabaseFunctions.scala

Lines changed: 5 additions & 17 deletions

@@ -1,13 +1,9 @@
 package com.sfxcode.nosql.mongo.gridfs
 
-import java.io.FileOutputStream
-import java.nio.ByteBuffer
-
 import better.files.File
 import com.sfxcode.nosql.mongo._
 import com.sfxcode.nosql.mongo.gridfs.GridfsDatabase._
 import org.bson.types.ObjectId
-import org.mongodb.scala.Observable
 import org.mongodb.scala.bson.conversions.Bson
 import org.mongodb.scala.gridfs.GridFSFile
 import org.mongodb.scala.result.UpdateResult
@@ -29,18 +25,12 @@ trait GridfsDatabaseFunctions {
 
  def insertImage(path: String, metadata: AnyRef): ObjectId = {
    val file = File(path)
-    val buffer = ByteBuffer.wrap(file.loadBytes)
-    ImageFilesDAO.upload(file.name, Observable(Seq(buffer)), metadata)
+    ImageFilesDAO.insertOne(file.name, file.newInputStream, metadata, bufferSize = 1024 * 4)
  }
 
-  def downloadImage(id: ObjectId, path: String):Unit = {
+  def downloadImage(id: ObjectId, path: String): Unit = {
    val file = File(path).touch()
-
-    val result = ImageFilesDAO.download(id)
-
-    val fc = file.fileChannel.get
-    fc.write(result.asReadOnlyBuffer())
-    fc.close()
+    ImageFilesDAO.downloadToStream(id, file.newOutputStream)
  }
 
  def findImage(id: ObjectId): GridFSFile = ImageFilesDAO.findById(id)
@@ -49,11 +39,9 @@ trait GridfsDatabaseFunctions {
 
  def findImages(key: String, value: String): List[GridFSFile] = ImageFilesDAO.findByMetadataValue(key, value)
 
-  def updateMetadata(oid: ObjectId, value: Any): UpdateResult = {
+  def updateMetadata(oid: ObjectId, value: Any): UpdateResult =
    ImageFilesDAO.updateMetadata(oid, value).headResult()
-  }
 
-  def updateMetadataElements(filter: Bson, elements: Map[String, Any]): UpdateResult = {
+  def updateMetadataElements(filter: Bson, elements: Map[String, Any]): UpdateResult =
    ImageFilesDAO.updateMetadataElements(filter, elements).headResult()
-  }
 }
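
Finally, a hypothetical round trip through the refactored test helpers, in the style of GridfsDatabaseApp and assuming a reachable MongoDB instance. The paths and metadata are placeholders, and insertImage returning a plain ObjectId presumably relies on an implicit conversion provided by com.sfxcode.nosql.mongo._:

import com.sfxcode.nosql.mongo.gridfs.GridfsDatabaseFunctions
import org.mongodb.scala.Document

// Paths and metadata are made up; GridfsDatabase supplies ImageFilesDAO behind the trait.
object GridfsRoundTripSketch extends App with GridfsDatabaseFunctions {

  val imageId = insertImage("src/test/resources/images/example.png", Document("group" -> "examples"))

  // Asynchronous: GridFSStreamObserver closes the target stream once all chunks arrived.
  downloadImage(imageId, "target/example-copy.png")

  println(findImage(imageId).getFilename)
}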
