Skip to content

Commit 5bee191

Browse files
committed
Document Extensions added
1 parent ec09e06 commit 5bee191

File tree

27 files changed

+345
-123
lines changed

27 files changed

+345
-123
lines changed

CHANGES.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,14 @@
22

33
## Versions
44

5+
### 1.9.0
6+
7+
* ObservableIncludes changed method names for better understanding =>
8+
- result (A), results (Seq[A]), resultOption (Option[A]), resultList (List[A])
9+
- **Breaking Change** : dropped => headResult (use now result)
10+
- **Breaking Change** : changed => result (old Option[A], new A) => use resultOption instead
11+
* Add Bulkwrite to Crud Operations
12+
513
### 1.8.2
614

715
* GridFSStreamObserver suports now completed and resultLength
@@ -83,7 +91,7 @@
8391

8492

8593
### 1.4.0
86-
* dropped operations with pattern <name>Result (use headResult, list result or implicit conversion instead)
94+
* dropped operations with pattern <name>Result (use result, list result or implicit conversion instead)
8795
* introduce Database Functions trait
8896
* update docs
8997

docs/src/main/paradox/dao/base.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import com.sfxcode.nosql.mongo._
2727

2828
and append
2929

30-
* .headResult() for Single Result object
30+
* .result() for Single Result object
3131
* .resultList() for List result object
3232

3333
or use implicit conversion:

docs/src/main/paradox/features/observable.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ See [MongoDB Scala Driver Documentation](http://mongodb.github.io/mongo-scala-dr
2525

2626
Convert Observable to result object. Import mongo package is needed.
2727

28-
Functions headResult anf resultList have an optional maxWait in seconds parameter (Default maxWait = 10 seconds).
28+
Functions result anf resultList have an optional maxWait in seconds parameter (Default maxWait = 10 seconds).
2929

3030
```scala
3131
import com.sfxcode.nosql.mongo._
3232

33-
def restaurantsSize = RestaurantDAO.count().headResult()
33+
def restaurantsSize = RestaurantDAO.count().result()
3434

3535
def findAllRestaurants = RestaurantDAO.find().resultList(maxWait = 20)
3636

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.sfxcode.nosql.mongo.bson.convert
2+
3+
import java.text.{ DateFormat, SimpleDateFormat }
4+
import java.util.{ Date, TimeZone }
5+
6+
import com.typesafe.scalalogging.LazyLogging
7+
import org.bson.json.{ Converter, StrictJsonWriter }
8+
9+
object JsonDateTimeConverter {
10+
val Converter = new JsonDateTimeConverter
11+
12+
val UTC: TimeZone = TimeZone.getTimeZone("UTC")
13+
14+
val DateFormat: SimpleDateFormat = {
15+
val f = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'")
16+
f.setTimeZone(UTC)
17+
f
18+
}
19+
}
20+
21+
class JsonDateTimeConverter(dateFormat: DateFormat = JsonDateTimeConverter.DateFormat)
22+
extends Converter[java.lang.Long]
23+
with LazyLogging {
24+
override def convert(value: java.lang.Long, writer: StrictJsonWriter): Unit =
25+
try {
26+
val s = dateFormat.format(value)
27+
writer.writeString(s)
28+
} catch {
29+
case e: Exception =>
30+
logger.error(String.format("Failed to convert value %d to JSON date", value), e)
31+
}
32+
}

src/main/scala/com/sfxcode/nosql/mongo/gridfs/GridFSStreamObserver.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import com.typesafe.scalalogging.LazyLogging
88
import org.mongodb.scala.Observer
99

1010
case class GridFSStreamObserver(outputStream: OutputStream) extends Observer[ByteBuffer] with LazyLogging {
11-
val completed = new AtomicBoolean(false)
11+
val completed = new AtomicBoolean(false)
1212
val resultLength = new AtomicLong(0)
1313

1414
override def onNext(buffer: ByteBuffer): Unit = {

src/main/scala/com/sfxcode/nosql/mongo/gridfs/Search.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@ import org.mongodb.scala.model.Filters.equal
88

99
abstract class Search extends Base {
1010

11-
def find(filter: Bson = Document(), sort: Bson = Document()): GridFSFindObservable =
12-
gridfsBucket.find(filter).sort(sort)
11+
def find(filter: Bson = Document(), sort: Bson = Document(), limit: Int = 0): GridFSFindObservable =
12+
if (limit > 0) {
13+
gridfsBucket.find(filter).sort(sort).limit(0)
14+
} else {
15+
gridfsBucket.find(filter).sort(sort)
16+
}
1317

1418
def findById(oid: ObjectId): GridFSFindObservable = find(equal("_id", oid))
1519

src/main/scala/com/sfxcode/nosql/mongo/operation/Crud.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
package com.sfxcode.nosql.mongo.operation
22

33
import com.sfxcode.nosql.mongo.{ Converter, _ }
4-
import org.mongodb.scala.Observable
54
import org.mongodb.scala.bson.conversions.Bson
65
import org.mongodb.scala.model.Filters._
76
import org.mongodb.scala.model._
87
import org.mongodb.scala.result.{ DeleteResult, InsertManyResult, InsertOneResult, UpdateResult }
8+
import org.mongodb.scala.{ BulkWriteResult, Document, Observable, SingleObservable }
99

10+
import scala.collection.mutable.{ ArrayBuffer, ListBuffer }
1011
import scala.reflect.ClassTag
1112

1213
abstract class Crud[A]()(implicit ct: ClassTag[A]) extends Search[A] {
@@ -23,6 +24,19 @@ abstract class Crud[A]()(implicit ct: ClassTag[A]) extends Search[A] {
2324
def insertMany(values: Seq[A], options: InsertManyOptions): Observable[InsertManyResult] =
2425
coll.insertMany(values, options)
2526

27+
// bulk write
28+
29+
def bulkWrite(requests: List[WriteModel[_ <: A]], ordered: Boolean = true): SingleObservable[BulkWriteResult] =
30+
coll.bulkWrite(requests, BulkWriteOptions().ordered(ordered))
31+
32+
def bulkWriteMany(values: Seq[A], ordered: Boolean = true): SingleObservable[BulkWriteResult] = {
33+
val requests: ArrayBuffer[WriteModel[_ <: A]] = ArrayBuffer()
34+
values.foreach(value => {
35+
requests.append(InsertOneModel(value))
36+
})
37+
bulkWrite(requests.toList, ordered)
38+
}
39+
2640
// update
2741

2842
def replaceOne(value: A): Observable[UpdateResult] = {

src/main/scala/com/sfxcode/nosql/mongo/operation/ObservableIncludes.scala

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,6 @@ trait ObservableIncludes {
1717
implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
1818
override val debugString: C => String = doc => doc.toString
1919

20-
def resultList(maxWait: Int = DefaultMaxWait): List[C] =
21-
Await.result(asFuture(), Duration(maxWait, TimeUnit.SECONDS)).toList
22-
23-
def result(maxWait: Int = DefaultMaxWait): Option[C] = {
24-
val list =
25-
Await.result(asFuture(), Duration(maxWait, TimeUnit.SECONDS)).toList
26-
list.headOption
27-
}
28-
2920
}
3021

3122
trait ImplicitObservable[C] extends LazyLogging {
@@ -34,11 +25,17 @@ trait ObservableIncludes {
3425

3526
def asFuture(): Future[Seq[C]] = observable.toFuture()
3627

28+
def result(maxWait: Int = DefaultMaxWait): C =
29+
Await.result(observable.head(), Duration(maxWait, TimeUnit.SECONDS))
30+
3731
def results(maxWait: Int = DefaultMaxWait): Seq[C] =
3832
Await.result(asFuture(), Duration(maxWait, TimeUnit.SECONDS))
3933

40-
def headResult(maxWait: Int = DefaultMaxWait): C =
41-
Await.result(observable.head(), Duration(maxWait, TimeUnit.SECONDS))
34+
def resultList(maxWait: Int = DefaultMaxWait): List[C] =
35+
Await.result(asFuture(), Duration(maxWait, TimeUnit.SECONDS)).toList
36+
37+
def resultOption(maxWait: Int = DefaultMaxWait): Option[C] =
38+
Await.result(observable.headOption(), Duration(maxWait, TimeUnit.SECONDS))
4239

4340
}
4441

src/main/scala/com/sfxcode/nosql/mongo/operation/Search.scala

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,16 @@ abstract class Search[A]()(implicit ct: ClassTag[A]) extends Base[A] {
1414

1515
protected def coll: MongoCollection[A]
1616

17-
def find(filter: Bson = Document(), sort: Bson = Document(), projection: Bson = Document()): FindObservable[A] =
18-
coll.find(filter).sort(sort).projection(projection)
17+
def find(
18+
filter: Bson = Document(),
19+
sort: Bson = Document(),
20+
projection: Bson = Document(),
21+
limit: Int = 0): FindObservable[A] =
22+
if (limit > 0) {
23+
coll.find(filter).sort(sort).projection(projection).limit(limit)
24+
} else {
25+
coll.find(filter).sort(sort).projection(projection)
26+
}
1927

2028
def findById(oid: ObjectId): FindObservable[A] = find(equal("_id", oid))
2129

@@ -26,9 +34,9 @@ abstract class Search[A]()(implicit ct: ClassTag[A]) extends Base[A] {
2634
coll.distinct[BsonValue](fieldName, filter)
2735

2836
def distinctResult[S <: Any](fieldName: String, filter: Bson = Document()): Seq[S] =
29-
distinct(fieldName, filter).results().map(v => fromBson(v).asInstanceOf[S])
37+
distinct(fieldName, filter).resultList().map(v => fromBson(v).asInstanceOf[S])
3038

31-
def findAggregated(aggregator: Seq[Bson]): AggregateObservable[A] =
32-
coll.aggregate(aggregator)
39+
def findAggregated(aggregator: Seq[Bson], allowDiskUse: Boolean = false): AggregateObservable[A] =
40+
coll.aggregate(aggregator).allowDiskUse(allowDiskUse)
3341

3442
}

src/main/scala/com/sfxcode/nosql/mongo/package.scala

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package com.sfxcode.nosql
22

3+
import com.sfxcode.nosql.mongo.Converter
34
import com.sfxcode.nosql.mongo.bson.BsonConverter
5+
import com.sfxcode.nosql.mongo.bson.convert.JsonDateTimeConverter
46
import com.sfxcode.nosql.mongo.database.MongoConfig
57
import com.sfxcode.nosql.mongo.gridfs.GridFSStreamObserver
68
import com.sfxcode.nosql.mongo.operation.ObservableIncludes
79
import org.bson.BsonValue
10+
import org.bson.json.{ JsonMode, JsonWriterSettings }
811
import org.bson.types.ObjectId
912
import org.mongodb.scala.bson.conversions.Bson
1013
import org.mongodb.scala.gridfs.{ GridFSFile, GridFSFindObservable }
@@ -13,16 +16,50 @@ import org.mongodb.scala.{ Document, FindObservable, Observable, ObservableImpli
1316
import scala.collection.JavaConverters._
1417
import scala.language.implicitConversions
1518

16-
package object mongo extends ObservableIncludes with ObservableImplicits {
19+
package object mongo extends MongoIncludes with DocumentIncludes {
20+
21+
implicit class DocumentExtensions[A <: Document](val document: A) extends AnyVal {
22+
23+
def asPlainMap: Map[String, Any] =
24+
BsonConverter.asMap(document)
25+
26+
def asPlainJson: String = {
27+
val builder = JsonWriterSettings.builder
28+
.dateTimeConverter(new JsonDateTimeConverter())
29+
.outputMode(JsonMode.RELAXED)
30+
document.toJson(builder.build())
31+
}
32+
}
33+
}
34+
35+
trait MongoIncludes extends ObservableIncludes with ObservableImplicits {
1736
implicit def stringToConfig(database: String): MongoConfig = MongoConfig(database)
1837

19-
implicit def observableToResult[T](obs: Observable[T]): T = obs.headResult()
38+
implicit def observableToResult[T](obs: Observable[T]): T = obs.result()
2039

2140
implicit def findObservableToResultList[T](obs: FindObservable[T]): List[T] =
2241
obs.resultList()
2342

24-
implicit def findObservableToResultOption[T](obs: FindObservable[T]): Option[T] = obs.result()
43+
implicit def findObservableToResultOption[T](obs: FindObservable[T]): Option[T] = obs.resultOption()
44+
45+
// gridfs
46+
47+
implicit def gridFSFindObservableToFiles(observable: GridFSFindObservable): List[GridFSFile] =
48+
observable.resultList()
49+
50+
implicit def gridFSFileToObjectId(file: GridFSFile): ObjectId =
51+
file.getObjectId
52+
53+
implicit def gridFSFileToBSonIdValue(file: GridFSFile): BsonValue = file.getId
2554

55+
implicit def observerToResultLength(observer: GridFSStreamObserver): Long = {
56+
while (!observer.completed.get) {}
57+
observer.resultLength.get()
58+
}
59+
60+
}
61+
62+
trait DocumentIncludes {
2663
implicit def mapToBson(value: Map[_, _]): Bson = Converter.toDocument(value)
2764

2865
implicit def documentFromJavaMap(map: java.util.Map[String, Any]): Document =
@@ -63,20 +100,4 @@ package object mongo extends ObservableIncludes with ObservableImplicits {
63100

64101
implicit def documentToObjectId(doc: Document): ObjectId =
65102
doc.getObjectId("_id")
66-
67-
// gridfs
68-
69-
implicit def gridFSFindObservableToFiles(observable: GridFSFindObservable): List[GridFSFile] =
70-
observable.resultList()
71-
72-
implicit def gridFSFileToObjectId(file: GridFSFile): ObjectId =
73-
file.getObjectId
74-
75-
implicit def gridFSFileToBSonIdValue(file: GridFSFile): BsonValue = file.getId
76-
77-
implicit def observerToResultLength(observer: GridFSStreamObserver): Long = {
78-
while (!observer.completed.get) {}
79-
observer.resultLength.get()
80-
}
81-
82103
}

0 commit comments

Comments
 (0)