Skip to content

Commit 3ed5c71

Browse files
committed
Merge branch 'release/1.8.3' into show-my-search-results
# Conflicts: # CHANGELOG.md
2 parents 854a734 + 8bb7ce8 commit 3ed5c71

14 files changed

+98
-65
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
66

77
## 1.8.3 - 2020-03-12
88

9+
### Changed
10+
- Elasticsearch indexer will new add new metadata fields as strings to avoid unexpected behavior on date fields.
11+
912
### Fixed
1013
- Ability to delete tags from sections on file [CATS-1042](https://opensource.ncsa.illinois.edu/jira/browse/CATS-1046)
1114
- Ability to delete tags on file page.

app/api/Admin.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class Admin @Inject() (userService: UserService,
3434
*/
3535
def deleteAllData(resetAll: Boolean) = ServerAdminAction { implicit request =>
3636
current.plugin[MongoSalatPlugin].map(_.dropAllData(resetAll))
37-
current.plugin[ElasticsearchPlugin].map(_.deleteAll)
37+
current.plugin[ElasticsearchPlugin].map(_.deleteAll())
3838

3939
Ok(toJson("done"))
4040
}

app/api/Comments.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class Comments @Inject()(datasets: DatasetService, comments: CommentService, eve
3838
datasets.get(parent.dataset_id.get) match {
3939
case Some(dataset) => {
4040
current.plugin[ElasticsearchPlugin].foreach {
41-
_.index(dataset, false)
41+
_.index(dataset, false, None)
4242
}
4343
}
4444
case None => Logger.error("Dataset not found: " + id)

app/models/QueuedAction.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,15 @@ import play.api.libs.json._
1212
*/
1313
// ElasticsearchQueue service action parameters
1414
case class ElasticsearchParameters(
15-
recursive: Boolean = false
15+
recursive: Boolean = false,
16+
index: Option[String] = None
1617
) {}
1718

1819
object ElasticsearchParameters {
1920
implicit val elasticParamsWrites = new Writes[ElasticsearchParameters] {
2021
def writes(params: ElasticsearchParameters): JsValue = Json.obj(
21-
"recursive" -> params.recursive.toString
22+
"recursive" -> params.recursive.toString,
23+
"index" -> params.index.getOrElse("").toString
2224
)
2325
}
2426
}

app/services/CollectionService.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,10 +243,10 @@ trait CollectionService {
243243
def syncUpRootSpaces(collectionId: UUID, initialParents: List[UUID])
244244

245245
/** Queue all collections to be indexed in Elasticsearch. */
246-
def indexAll()
246+
def indexAll(idx: Option[String] = None)
247247

248248
/** Queue a collection to be indexed in Elasticsearch. */
249-
def index(id: UUID)
249+
def index(id: UUID, idx: Option[String] = None)
250250

251251
def incrementViews(id: UUID, user: Option[User]): (Int, Date)
252252

app/services/DatasetService.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,10 +250,10 @@ trait DatasetService {
250250
def selectNewThumbnailFromFiles(datasetId: UUID)
251251

252252
/** Queue all datasets to be indexed in Elasticsearch. */
253-
def indexAll()
253+
def indexAll(idx: Option[String] = None)
254254

255255
/** Queue a dataset to be indexed in Elasticsearch. */
256-
def index(id: UUID)
256+
def index(id: UUID, idx: Option[String] = None)
257257

258258
def removeTags(id: UUID, userIdStr: Option[String], eid: Option[String], tags: List[String])
259259

app/services/ElasticsearchPlugin.scala

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,26 +6,28 @@ import org.elasticsearch.common.xcontent.XContentBuilder
66
import org.elasticsearch.search.aggregations.AggregationBuilders
77
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms.Bucket
88
import play.api.libs.json.Json._
9+
910
import scala.util.Try
10-
import scala.collection.mutable.{MutableList, ListBuffer}
11+
import scala.collection.mutable.{ListBuffer, MutableList}
1112
import scala.collection.immutable.List
12-
import play.api.{Plugin, Logger, Application}
13+
import play.api.{Application, Logger, Plugin}
1314
import org.elasticsearch.common.settings.Settings
1415
import org.elasticsearch.client.transport.TransportClient
1516
import org.elasticsearch.common.transport.InetSocketTransportAddress
1617
import java.net.InetAddress
1718
import java.util.regex.Pattern
1819

1920
import org.elasticsearch.common.xcontent.XContentFactory._
20-
import org.elasticsearch.action.search.{SearchPhaseExecutionException, SearchType, SearchResponse}
21+
import org.elasticsearch.action.search.{SearchPhaseExecutionException, SearchResponse, SearchType}
2122
import org.elasticsearch.client.transport.NoNodeAvailableException
2223
import org.elasticsearch.ElasticsearchException
2324
import org.elasticsearch.indices.IndexAlreadyExistsException
24-
25-
import models.{Collection, Dataset, File, Folder, UUID, ResourceRef, Section, ElasticsearchResult, User}
25+
import org.elasticsearch.index.reindex.{ReindexAction, ReindexPlugin, ReindexRequestBuilder}
26+
import models.{Collection, Dataset, ElasticsearchResult, File, Folder, ResourceRef, Section, UUID, User}
2627
import play.api.Play.current
2728
import play.api.libs.json._
2829
import _root_.util.SearchUtils
30+
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest
2931

3032

3133
/**
@@ -67,7 +69,7 @@ class ElasticsearchPlugin(application: Application) extends Plugin {
6769
} else {
6870
Settings.settingsBuilder().build()
6971
}
70-
client = Some(TransportClient.builder().settings(settings).build()
72+
client = Some(TransportClient.builder().settings(settings).addPlugin(classOf[ReindexPlugin]).build()
7173
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(serverAddress), serverPort)))
7274
Logger.debug("--- Elasticsearch Client is being created----")
7375
client match {
@@ -324,16 +326,33 @@ class ElasticsearchPlugin(application: Application) extends Plugin {
324326
}
325327
case None =>
326328
}
329+
}
327330

331+
def swapIndex(idx: String): Unit = {
332+
client match {
333+
case Some(x) => {
334+
// Check if swap index exists before swapping
335+
if (x.admin.indices.exists(new IndicesExistsRequest(idx)).get().isExists()) {
336+
Logger.debug("Deleting "+nameOfIndex+" index...")
337+
deleteAll(nameOfIndex)
338+
createIndex(nameOfIndex)
339+
Logger.debug("Replacing with "+idx+"...")
340+
ReindexAction.INSTANCE.newRequestBuilder(x).source(idx).destination(nameOfIndex).get()
341+
Logger.debug("Deleting "+idx+" index...")
342+
deleteAll(idx)
343+
}
344+
}
345+
case None =>
346+
}
328347
}
329348

330-
/** Delete all indices */
331-
def deleteAll {
349+
/** Delete all documents in default index */
350+
def deleteAll(idx: String = nameOfIndex) {
332351
connect()
333352
client match {
334353
case Some(x) => {
335354
try {
336-
val response = x.admin().indices().prepareDelete(nameOfIndex).get()
355+
val response = x.admin().indices().prepareDelete(idx).get()
337356
if (!response.isAcknowledged())
338357
Logger.error("Did not delete all data from elasticsearch.")
339358
} catch {
@@ -398,51 +417,51 @@ class ElasticsearchPlugin(application: Application) extends Plugin {
398417
* Reindex the given collection, if recursive is set to true it will
399418
* also reindex all datasets and files.
400419
*/
401-
def index(collection: Collection, recursive: Boolean) {
420+
def index(collection: Collection, recursive: Boolean, idx: Option[String]) {
402421
connect()
403422
// Perform recursion first if necessary
404423
if (recursive) {
405424
for (dataset <- datasets.listCollection(collection.id.toString)) {
406-
index(dataset, recursive)
425+
index(dataset, recursive, idx)
407426
}
408427
}
409-
index(SearchUtils.getElasticsearchObject(collection))
428+
index(SearchUtils.getElasticsearchObject(collection), idx.getOrElse(nameOfIndex))
410429
}
411430

412431
/**
413432
* Reindex the given dataset, if recursive is set to true it will
414433
* also reindex all files.
415434
*/
416-
def index(dataset: Dataset, recursive: Boolean) {
435+
def index(dataset: Dataset, recursive: Boolean, idx: Option[String]) {
417436
connect()
418437
// Perform recursion first if necessary
419438
if (recursive) {
420-
files.get(dataset.files).found.foreach(f => index(f))
439+
files.get(dataset.files).found.foreach(f => index(f, idx))
421440
for (folderid <- dataset.folders) {
422441
folders.get(folderid) match {
423442
case Some(f) => {
424-
files.get(f.files).found.foreach(fi => index(fi))
443+
files.get(f.files).found.foreach(fi => index(fi, idx))
425444
}
426445
case None => Logger.error(s"Error getting file $folderid for recursive indexing")
427446
}
428447
}
429448
}
430-
index(SearchUtils.getElasticsearchObject(dataset))
449+
index(SearchUtils.getElasticsearchObject(dataset), idx.getOrElse(nameOfIndex))
431450
}
432451

433452
/** Reindex the given file. */
434-
def index(file: File) {
453+
def index(file: File, idx: Option[String]) {
435454
connect()
436455
// Index sections first so they register for tag counts
437456
for (section <- file.sections) {
438-
index(section)
457+
index(section, idx)
439458
}
440-
index(SearchUtils.getElasticsearchObject(file))
459+
index(SearchUtils.getElasticsearchObject(file), idx.getOrElse(nameOfIndex))
441460
}
442461

443-
def index(section: Section) {
462+
def index(section: Section, idx: Option[String]) {
444463
connect()
445-
index(SearchUtils.getElasticsearchObject(section))
464+
index(SearchUtils.getElasticsearchObject(section), idx.getOrElse(nameOfIndex))
446465
}
447466

448467
/** Index document using an arbitrary map of fields. */
@@ -687,19 +706,12 @@ class ElasticsearchPlugin(application: Application) extends Plugin {
687706

688707
/** Return string-encoded JSON object describing field types */
689708
def getElasticsearchObjectMappings(): String = {
690-
"""dynamic_templates": [{
691-
"nonindexer": {
692-
"match": "*",
693-
"match_mapping_type":"string",
694-
"mapping": {
695-
"type": "string",
696-
"index": "not_analyzed"
697-
}
698-
}
699-
}
700-
],"""
701-
709+
/** The dynamic template will restrict all dynamic metadata fields to be indexed
710+
* as strings for datatypes besides Objects. In the future, this could
711+
* be removed, but only once the Search API better supports those data types (e.g. Date).
712+
*/
702713
"""{"clowder_object": {
714+
|"date_detection": false,
703715
|"properties": {
704716
|"name": {"type": "string"},
705717
|"description": {"type": "string"},

app/services/FileService.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,9 @@ trait FileService {
123123
*/
124124
def first(): Option[File]
125125

126-
def indexAll()
126+
def indexAll(idx: Option[String] = None)
127127

128-
def index(id: UUID)
128+
def index(id: UUID, idx: Option[String] = None)
129129

130130
/**
131131
* Directly insert file into database, for example if the file path is local.

app/services/mongodb/ElasticsearchQueue.scala

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,29 +29,31 @@ class ElasticsearchQueue @Inject() (
2929
// process the next entry in the queue
3030
def handler(action: QueuedAction) = {
3131
val recursive = action.elastic_parameters.fold(false)(_.recursive)
32+
val idx: Option[String] = action.elastic_parameters.fold[Option[String]](None)(_.index)
3233

3334
action.target match {
3435
case Some(targ) => {
3536
action.action match {
3637
case "index_file" => {
3738
val target = files.get(targ.id) match {
38-
case Some(f) => current.plugin[ElasticsearchPlugin].foreach(p => p.index(f))
39+
case Some(f) => current.plugin[ElasticsearchPlugin].foreach(p => p.index(f, idx))
3940
case None => throw new NullPointerException(s"File ${targ.id.stringify} no longer found for indexing")
4041
}
4142
}
4243
case "index_dataset" => {
4344
val target = datasets.get(targ.id) match {
44-
case Some(ds) => current.plugin[ElasticsearchPlugin].foreach(p => p.index(ds, recursive))
45+
case Some(ds) => current.plugin[ElasticsearchPlugin].foreach(p => p.index(ds, recursive, idx))
4546
case None => throw new NullPointerException(s"Dataset ${targ.id.stringify} no longer found for indexing")
4647
}
4748
}
4849
case "index_collection" => {
4950
val target = collections.get(targ.id) match {
50-
case Some(c) => current.plugin[ElasticsearchPlugin].foreach(p => p.index(c, recursive))
51+
case Some(c) => current.plugin[ElasticsearchPlugin].foreach(p => p.index(c, recursive, idx))
5152
case None => throw new NullPointerException(s"Collection ${targ.id.stringify} no longer found for indexing")
5253
}
5354
}
5455
case "index_all" => _indexAll()
56+
case "index_swap" => _swapIndex()
5557
case _ => throw new IllegalArgumentException(s"Unrecognized action: ${action.action}")
5658
}
5759
}
@@ -61,6 +63,7 @@ class ElasticsearchQueue @Inject() (
6163
case "index_dataset" => throw new IllegalArgumentException(s"No target specified for action ${action.action}")
6264
case "index_collection" => throw new IllegalArgumentException(s"No target specified for action ${action.action}")
6365
case "index_all" => _indexAll()
66+
case "index_swap" => _swapIndex()
6467
case _ => throw new IllegalArgumentException(s"Unrecognized action: ${action.action}")
6568
}
6669
}
@@ -70,13 +73,25 @@ class ElasticsearchQueue @Inject() (
7073
def _indexAll() = {
7174
// Add all individual entries to the queue and delete this action
7275
current.plugin[ElasticsearchPlugin].foreach(p => {
73-
// delete & recreate index
74-
p.deleteAll
75-
p.createIndex()
76+
val idx = p.nameOfIndex + "_reindex_temp_swap"
77+
Logger.debug("Reindexing database into temporary reindex file: "+idx)
78+
p.createIndex(idx)
79+
7680
// queue everything for each resource type
77-
collections.indexAll()
78-
datasets.indexAll()
79-
files.indexAll()
81+
collections.indexAll(Some(idx))
82+
datasets.indexAll(Some(idx))
83+
files.indexAll(Some(idx))
84+
85+
// queue action to swap index once we're done reindexing
86+
p.queue.queue("index_swap")
87+
})
88+
}
89+
90+
// Replace the main index with the newly reindexed temp file
91+
def _swapIndex() = {
92+
Logger.debug("Swapping temporary reindex for main index")
93+
current.plugin[ElasticsearchPlugin].foreach(p => {
94+
p.swapIndex(p.nameOfIndex + "_reindex_temp_swap")
8095
})
8196
}
8297
}

app/services/mongodb/MongoDBCollectionService.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -892,16 +892,16 @@ class MongoDBCollectionService @Inject() (
892892
}
893893
}
894894

895-
def indexAll() = {
895+
def indexAll(idx: Option[String] = None) = {
896896
// Bypass Salat in case any of the file records are malformed to continue past them
897897
Collection.dao.collection.find(MongoDBObject(), MongoDBObject("_id" -> 1)).foreach(c => {
898-
index(new UUID(c.get("_id").toString))
898+
index(new UUID(c.get("_id").toString), idx)
899899
})
900900
}
901901

902-
def index(id: UUID) {
902+
def index(id: UUID, idx: Option[String] = None) {
903903
try
904-
esqueue.queue("index_collection", new ResourceRef('collection, id))
904+
esqueue.queue("index_collection", new ResourceRef('collection, id), new ElasticsearchParameters(index=idx))
905905
catch {
906906
case except: Throwable => Logger.error(s"Error queuing collection ${id.stringify}: ${except}")
907907
case _ => Logger.error(s"Error queuing collection ${id.stringify}")

0 commit comments

Comments
 (0)