Skip to content

Commit c84e6a1

Browse files
committed
GridFS Support
1 parent 3abc850 commit c84e6a1

File tree

16 files changed

+176
-29
lines changed

16 files changed

+176
-29
lines changed

build.sbt

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,18 @@ buildInfoOptions += BuildInfoOption.BuildTime
2323

2424
// Test
2525

26-
libraryDependencies += "org.specs2" %% "specs2-core" % "4.3.0" % "test"
26+
libraryDependencies += "org.specs2" %% "specs2-core" % "4.3.1" % Test
2727

28-
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3" % "test"
28+
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3" % Test
2929

30-
libraryDependencies += "com.typesafe" % "config" % "1.3.3" % "test"
30+
libraryDependencies += "com.typesafe" % "config" % "1.3.3" % Test
3131

32-
libraryDependencies += "joda-time" % "joda-time" % "2.10" % "test"
32+
libraryDependencies += "joda-time" % "joda-time" % "2.10" % Test
33+
34+
libraryDependencies += "org.json4s" %% "json4s-native" % "3.5.4" % Test
35+
36+
libraryDependencies += "com.github.pathikrit" %% "better-files" % "3.5.0" % Test
3337

34-
libraryDependencies += "org.json4s" %% "json4s-native" % "3.5.4" % "test"
3538

3639
libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "2.4.0"
3740

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package com.sfxcode.nosql.mongo
2+
3+
import com.sfxcode.nosql.mongo.gridfs.Crud
4+
import org.bson.types.ObjectId
5+
import org.mongodb.scala.bson.conversions.Bson
6+
import org.mongodb.scala.gridfs.{GridFSBucket, GridFSFile}
7+
import org.mongodb.scala.model.CountOptions
8+
import org.mongodb.scala.{Completed, Document, MongoDatabase, Observable, ReadConcern, ReadPreference, SingleObservable, WriteConcern}
9+
10+
abstract class GridFSDAO(database: MongoDatabase, bucketName: String) extends Crud {
11+
12+
var bucket = GridFSBucket(database, bucketName)
13+
14+
protected def gridfsBucket: GridFSBucket = bucket
15+
16+
def filesCollectionName: String = "%s.%s".format(bucketName, "files")
17+
18+
def count(filter: Bson = Document(), options: CountOptions = CountOptions()): Observable[Long] = Files.count(filter, options)
19+
20+
def createrMetadataIndex(key: String, sortAscending: Boolean = true): SingleObservable[String] = Files.createIndexForField(createMetadataKey(key), sortAscending)
21+
22+
def renameFile(id: ObjectId, newFilename: String): Observable[Completed] = gridfsBucket.rename(id, newFilename)
23+
24+
def renameFile(file: GridFSFile, newFilename: String): Observable[Completed] = gridfsBucket.rename(file.getId, newFilename)
25+
26+
def withReadConcern(readConcern: ReadConcern): Unit = {
27+
bucket = GridFSBucket(database, bucketName).withReadConcern(readConcern)
28+
}
29+
30+
def withWriteConcern(writeConcern: WriteConcern): Unit = {
31+
bucket = GridFSBucket(database, bucketName).withWriteConcern(writeConcern)
32+
}
33+
34+
def withChunkSizeBytes(chunkSizeBytes: Int): Unit = {
35+
bucket = GridFSBucket(database, bucketName).withChunkSizeBytes(chunkSizeBytes)
36+
}
37+
38+
def withReadPreference(readPreference: ReadPreference): Unit = {
39+
bucket = GridFSBucket(database, bucketName).withReadPreference(readPreference)
40+
}
41+
42+
def withDisableMD5(disableMD5: Boolean): Unit = {
43+
bucket = GridFSBucket(database, bucketName).withDisableMD5(disableMD5)
44+
}
45+
46+
object Files extends MongoDAO[Document](database, filesCollectionName)
47+
48+
}

src/main/scala/com/sfxcode/nosql/mongo/MongoDAO.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package com.sfxcode.nosql.mongo
22

3-
import com.sfxcode.nosql.mongo.operation.Search
3+
import com.sfxcode.nosql.mongo.operation.Crud
44
import org.mongodb.scala.{ Document, MongoCollection, MongoDatabase }
55

66
import scala.reflect.ClassTag
77

88
/**
99
* Created by tom on 20.01.17.
1010
*/
11-
abstract class MongoDAO[A](database: MongoDatabase, collectionName: String)(implicit ct: ClassTag[A]) extends Search[A] {
11+
abstract class MongoDAO[A](database: MongoDatabase, collectionName: String)(implicit ct: ClassTag[A]) extends Crud[A] {
1212

1313
val collection: MongoCollection[A] = database.getCollection[A](collectionName)
1414

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.sfxcode.nosql.mongo.gridfs
2+
3+
import java.nio.channels.AsynchronousFileChannel
4+
import java.nio.file.{OpenOption, Path, StandardOpenOption}
5+
6+
import com.typesafe.scalalogging.LazyLogging
7+
import org.mongodb.scala.bson.ObjectId
8+
import org.mongodb.scala.gridfs.GridFSBucket
9+
import org.mongodb.scala.gridfs.helpers.AsynchronousChannelHelper.channelToOutputStream
10+
import org.mongodb.scala.{Completed, Observable, ReadConcern, ReadPreference, WriteConcern}
11+
12+
abstract class Base extends LazyLogging {
13+
14+
protected def gridfsBucket: GridFSBucket
15+
16+
def createMetadataKey(key:String):String = {
17+
var metadataKey = key
18+
if (!metadataKey.startsWith("metadata"))
19+
metadataKey = "%s.%s".format("metadata", key)
20+
metadataKey
21+
}
22+
23+
def downloadToStream(oid: ObjectId, outputPath: Path, openOptions: Seq[OpenOption] = List(StandardOpenOption.CREATE, StandardOpenOption.WRITE)): Observable[Long] = {
24+
var result: Long = -1
25+
var streamToDownloadTo: AsynchronousFileChannel = AsynchronousFileChannel.open(outputPath, openOptions: _*)
26+
gridfsBucket.downloadToStream(oid, channelToOutputStream(streamToDownloadTo))
27+
}
28+
29+
def drop(): Observable[Completed] = gridfsBucket.drop()
30+
31+
def bucketName: String = gridfsBucket.bucketName
32+
33+
def chunkSizeBytes: Int = gridfsBucket.chunkSizeBytes
34+
35+
def writeConcern: WriteConcern = gridfsBucket.writeConcern
36+
37+
def readPreference: ReadPreference = gridfsBucket.readPreference
38+
39+
def readConcern: ReadConcern = gridfsBucket.readConcern
40+
41+
def disableMD5: Boolean = gridfsBucket.disableMD5
42+
43+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.sfxcode.nosql.mongo.gridfs
2+
3+
import java.io.InputStream
4+
5+
import com.mongodb.client.gridfs.model.GridFSUploadOptions
6+
import com.sfxcode.nosql.mongo.Converter
7+
import org.bson.types.ObjectId
8+
import org.mongodb.scala.gridfs.AsyncInputStream
9+
import org.mongodb.scala.gridfs.helpers.AsyncStreamHelper.toAsyncInputStream
10+
import org.mongodb.scala.{Completed, Document, Observable}
11+
12+
abstract class Crud extends Search {
13+
14+
def deleteOne(id: ObjectId): Observable[Completed] = gridfsBucket.delete(id)
15+
16+
def insertOne(fileName: String, stream: InputStream, metadata: AnyRef = Document(), chunkSizeBytes: Int = chunkSizeBytes): Observable[ObjectId] = {
17+
val streamToUploadFrom: AsyncInputStream = toAsyncInputStream(stream)
18+
val metadataDocument = {
19+
metadata match {
20+
case document: Document => document
21+
case _ => Converter.toDocument(metadata)
22+
}
23+
}
24+
val options: GridFSUploadOptions = new GridFSUploadOptions().chunkSizeBytes(1024 * 1204).metadata(metadataDocument)
25+
gridfsBucket.uploadFromStream(fileName, streamToUploadFrom, options)
26+
}
27+
28+
}
29+
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package com.sfxcode.nosql.mongo.gridfs
2+
3+
import org.mongodb.scala.Document
4+
import org.mongodb.scala.bson.ObjectId
5+
import org.mongodb.scala.bson.conversions.Bson
6+
import org.mongodb.scala.gridfs.GridFSFindObservable
7+
import org.mongodb.scala.model.Filters.equal
8+
9+
abstract class Search extends Base {
10+
11+
def find(filter: Bson = Document(), sort: Bson = Document(), projection: Bson = Document()): GridFSFindObservable = gridfsBucket.find(filter).sort(sort)
12+
13+
def findById(oid: ObjectId): GridFSFindObservable = find(equal("_id", oid))
14+
15+
def find(key: String, value: Any): GridFSFindObservable = find(equal(key, value))
16+
17+
def findByMetadataValue(key: String, value: Any): GridFSFindObservable = find(createMetadataKey(key), value)
18+
19+
20+
21+
}
22+

src/main/scala/com/sfxcode/nosql/mongo/operation/Base.scala

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import com.typesafe.scalalogging.LazyLogging
44
import org.mongodb.scala.bson.conversions.Bson
55
import org.mongodb.scala.model.Sorts._
66
import org.mongodb.scala.model.{ CountOptions, IndexOptions }
7-
import org.mongodb.scala.{ Completed, Document, MongoCollection, Observable, Observer, SingleObservable }
7+
import org.mongodb.scala.{ Completed, Document, MongoCollection, Observable, SingleObservable }
88

99
import scala.reflect.ClassTag
1010

@@ -31,11 +31,3 @@ abstract class Base[A]()(implicit ct: ClassTag[A]) extends LazyLogging {
3131

3232
}
3333

34-
class SimpleCompletedObserver[T] extends Observer[T] with LazyLogging {
35-
override def onError(e: Throwable): Unit = logger.error(e.getMessage, e)
36-
37-
override def onComplete(): Unit = {}
38-
39-
override def onNext(result: T): Unit = {}
40-
}
41-

src/main/scala/com/sfxcode/nosql/mongo/operation/Crud.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import org.mongodb.scala.{ Completed, Observable }
99

1010
import scala.reflect.ClassTag
1111

12-
abstract class Crud[A]()(implicit ct: ClassTag[A]) extends Base[A] {
12+
abstract class Crud[A]()(implicit ct: ClassTag[A]) extends Search[A] {
1313

1414
// create
1515
def insertOne(value: A): Observable[Completed] = coll.insertOne(value)

src/main/scala/com/sfxcode/nosql/mongo/operation/CrudObserver.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.sfxcode.nosql.mongo.operation
22

33
import com.sfxcode.nosql.mongo.Converter
4+
import com.typesafe.scalalogging.LazyLogging
45
import org.mongodb.scala.bson.conversions.Bson
56
import org.mongodb.scala.model.Filters.equal
67
import org.mongodb.scala.result.{ DeleteResult, UpdateResult }
@@ -31,3 +32,12 @@ trait CrudObserver[A] extends Crud[A] {
3132
}
3233

3334
}
35+
36+
class SimpleCompletedObserver[T] extends Observer[T] with LazyLogging {
37+
override def onError(e: Throwable): Unit = logger.error(e.getMessage, e)
38+
39+
override def onComplete(): Unit = {}
40+
41+
override def onNext(result: T): Unit = {}
42+
}
43+

src/main/scala/com/sfxcode/nosql/mongo/operation/ObservableIncludes.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import java.util.concurrent.TimeUnit
55
import org.mongodb.scala._
66

77
import scala.concurrent.duration.Duration
8-
import scala.concurrent.{ Await, Future }
8+
import scala.concurrent.{Await, Future}
99

1010
object ObservableIncludes extends ObservableIncludes
1111

0 commit comments

Comments
 (0)