Commit 37156c5

GEOMESA-3538 Accumulo - Support 'eventual' consistency scans (#3460)
* New parameter `accumulo.query.consistency`
* Remove compatibility checks for pre-1.2.4 schemas
* Rename AccumuloDataStore.connector -> client
1 parent 9b6f84e commit 37156c5

47 files changed (+269, -471 lines)


docs/user/accumulo/commandline.rst

Lines changed: 6 additions & 5 deletions
@@ -24,10 +24,10 @@ General Arguments
 Most commands require you to specify the connection to Accumulo. This generally includes the instance name,
 zookeeper hosts, username, and password (or Kerberos keytab file). Specify the instance with ``--instance-name``
 and ``--zookeepers``, and the username and password with ``--user`` and ``--password``. The password argument may be
-omitted in order to avoid plaintext credentials in the bash history and process list - in this case it will be
-prompted case for later. To use Kerberos authentication instead of a password, use ``--keytab`` with a path to a
-Kerberos keytab file containing an entry for the specified user. Since a keytab file allows authentication
-without any further constraints, it should be protected appropriately.
+omitted in order to avoid plaintext credentials in the process list - in this case it will be prompted for. To use
+Kerberos authentication instead of a password, use ``--keytab`` with a path to a Kerberos keytab file containing an
+entry for the specified user. Since a keytab file allows authentication without any further constraints, it should be
+protected appropriately.
 
 Instead of specifying the cluster connection explicitly, an appropriate ``accumulo-client.properties``
 may be added to the classpath. See the
@@ -36,7 +36,8 @@ for information on the necessary configuration keys. Any explicit command-line a
 the configuration file.
 
 The ``--auths`` argument corresponds to the ``AccumuloDataStore`` parameter ``geomesa.security.auths``. See
-:ref:`data_security` for more information.
+:ref:`data_security` for more information. The ``--consistency`` argument corresponds to the ``AccumuloDataStore`` parameter
+``accumulo.query.consistency``.
 
 Commands
 --------

docs/user/accumulo/usage.rst

Lines changed: 6 additions & 2 deletions
@@ -32,20 +32,24 @@ Parameter Type Description
 ``accumulo.query.record-threads``     Integer The number of threads to use for record retrieval
 ``accumulo.write.threads``            Integer The number of threads to use for writing records
 ``geomesa.stats.enable``              Boolean Toggle collection of statistics for newly created feature types
+``accumulo.query.consistency``        String  Specify the scan consistency to use for queries. Must be one of ``immediate`` or
+                                              ``eventual``. See the `Accumulo documentation`_ for details.
 ``accumulo.remote.arrow.enable``      Boolean Process Arrow encoding in Accumulo tablets servers as a distributed call
 ``accumulo.remote.bin.enable``        Boolean Process binary encoding in Accumulo tablets servers as a distributed call
 ``accumulo.remote.density.enable``    Boolean Process heatmap encoding in Accumulo tablets servers as a distributed call
 ``accumulo.remote.stats.enable``      Boolean Process statistical calculations in Accumulo tablets servers as a distributed call
 ``geomesa.partition.scan.parallel``   Boolean For partitioned schemas, execute scans in parallel instead of sequentially
 ====================================== ======= ===================================================================================
 
+.. _Accumulo documentation: https://accumulo.apache.org/docs/2.x/apidocs/org/apache/accumulo/core/client/ScannerBase.ConsistencyLevel.html
+
 .. note::
 
     It is an error to specify both ``accumulo.password`` and ``accumulo.keytab.path``.
 
 Instead of specifying the cluster connection explicitly, an appropriate ``accumulo-client.properties`` may be added
 to the classpath. See the
-`Accumulo documentation <https://accumulo.apache.org/docs/2.x/getting-started/clients#creating-an-accumulo-client>`_
+`Accumulo documentation <https://accumulo.apache.org/docs/2.x/getting-started/clients#creating-an-accumulo-client>`__
 for information on the necessary configuration keys. Any explicit data store parameters will take precedence over
 the configuration file.
 
@@ -66,4 +70,4 @@ that the GeoMesa code is on the classpath:
     org.geotools.api.data.DataStore dataStore =
        org.geotools.api.data.DataStoreFinder.getDataStore(parameters);
 
-More information on using GeoTools can be found in the `GeoTools user guide <https://docs.geotools.org/stable/userguide/>`_.
+More information on using GeoTools can be found in the `GeoTools user guide <https://docs.geotools.org/stable/userguide/>`__.
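
As a rough sketch of how the new option plugs into the GeoTools API referenced above - the connection keys and values here (``accumulo.instance.name``, ``accumulo.zookeepers``, etc.) are illustrative assumptions, only ``accumulo.query.consistency`` and its allowed values come from this commit - a Scala client might look like:

import org.geotools.api.data.{DataStore, DataStoreFinder}

import scala.collection.JavaConverters._

object EventualConsistencyExample {
  def main(args: Array[String]): Unit = {
    // connection parameters are placeholders - adjust for your cluster;
    // "accumulo.query.consistency" is the parameter documented above
    val params = Map(
      "accumulo.instance.name"     -> "myInstance",     // assumed connection keys
      "accumulo.zookeepers"        -> "zoo1,zoo2,zoo3",
      "accumulo.user"              -> "user",
      "accumulo.password"          -> "secret",
      "accumulo.catalog"           -> "geomesa.example",
      "accumulo.query.consistency" -> "eventual"        // or "immediate"
    ).asJava

    val dataStore: DataStore = DataStoreFinder.getDataStore(params)
    try {
      // queries through this store will request eventual-consistency scans
      println(dataStore.getTypeNames.mkString(", "))
    } finally {
      dataStore.dispose()
    }
  }
}

With ``eventual``, recent writes may not be immediately visible to queries; ``immediate`` preserves the previous behavior.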

geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/audit/AccumuloAuditReader.scala

Lines changed: 11 additions & 2 deletions
@@ -9,6 +9,7 @@
 package org.locationtech.geomesa.accumulo.audit
 
 import org.apache.accumulo.core.client.AccumuloClient
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel
 import org.apache.accumulo.core.security.Authorizations
 import org.locationtech.geomesa.accumulo.data.AccumuloDataStore
 import org.locationtech.geomesa.index.audit.AuditReader
@@ -24,19 +25,27 @@ import java.time.ZonedDateTime
  * @param client accumulo client - note: assumed to be shared and not cleaned up on closed
  * @param table table containing audit records
  * @param authProvider auth provider
+ * @param consistency scan consistency level
  */
-class AccumuloAuditReader(client: AccumuloClient, table: String, authProvider: AuthorizationsProvider) extends AuditReader {
+class AccumuloAuditReader(
+    client: AccumuloClient,
+    table: String,
+    authProvider: AuthorizationsProvider,
+    consistency: Option[ConsistencyLevel] = None
+  ) extends AuditReader {
 
   import scala.collection.JavaConverters._
 
-  def this(ds: AccumuloDataStore) = this(ds.connector, ds.config.auditWriter.table, ds.config.authProvider)
+  def this(ds: AccumuloDataStore) =
+    this(ds.client, ds.config.auditWriter.table, ds.config.authProvider, ds.config.queries.consistency)
 
   @volatile
   private var tableExists: Boolean = client.tableOperations().exists(table)
 
   override def getQueryEvents(typeName: String, dates: (ZonedDateTime, ZonedDateTime)): CloseableIterator[QueryEvent] = {
     if (!checkTable) { CloseableIterator.empty } else {
       val scanner = client.createScanner(table, new Authorizations(authProvider.getAuthorizations.asScala.toSeq: _*))
+      consistency.foreach(scanner.setConsistencyLevel)
       AccumuloQueryEventTransform.iterator(scanner, typeName, dates)
     }
   }
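
The ``consistency.foreach(scanner.setConsistencyLevel)`` pattern above relies on Accumulo 2.1's ``ScannerBase.setConsistencyLevel``. A standalone sketch of the same idea against the plain Accumulo client API (the properties path and table name are placeholders, not taken from this commit):

import org.apache.accumulo.core.client.Accumulo
import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel
import org.apache.accumulo.core.security.Authorizations

object EventualScannerSketch {
  def main(args: Array[String]): Unit = {
    // placeholder client properties and table name
    val client = Accumulo.newClient().from("/path/to/accumulo-client.properties").build()
    try {
      val scanner = client.createScanner("example_table", Authorizations.EMPTY)
      try {
        // same pattern as AccumuloAuditReader: only set a level when one was configured
        val consistency: Option[ConsistencyLevel] = Some(ConsistencyLevel.EVENTUAL)
        consistency.foreach(scanner.setConsistencyLevel)
        scanner.forEach(e => println(s"${e.getKey} -> ${e.getValue}"))
      } finally {
        scanner.close()
      }
    } finally {
      client.close()
    }
  }
}

Keeping the level as an ``Option`` means Accumulo's default (``IMMEDIATE``) applies when the data store parameter is not set, and every scanner-creation site can opt in with a single ``foreach``.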

geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloBackedMetadata.scala

Lines changed: 20 additions & 79 deletions
@@ -9,16 +9,21 @@
 package org.locationtech.geomesa.accumulo.data
 
 import org.apache.accumulo.core.client.AccumuloClient
+import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel
 import org.apache.accumulo.core.data.{Mutation, Range, Value}
 import org.apache.accumulo.core.security.Authorizations
 import org.apache.hadoop.io.Text
 import org.locationtech.geomesa.accumulo.util.{GeoMesaBatchWriterConfig, TableManager}
-import org.locationtech.geomesa.index.metadata.{GeoMesaMetadata, KeyValueStoreMetadata, MetadataSerializer}
+import org.locationtech.geomesa.index.metadata.{KeyValueStoreMetadata, MetadataSerializer}
 import org.locationtech.geomesa.utils.collection.CloseableIterator
-import org.locationtech.geomesa.utils.io.{CloseQuietly, WithClose}
+import org.locationtech.geomesa.utils.io.WithClose
 
-class AccumuloBackedMetadata[T](val connector: AccumuloClient, val table: String, val serializer: MetadataSerializer[T])
-    extends KeyValueStoreMetadata[T] {
+class AccumuloBackedMetadata[T](
+    val client: AccumuloClient,
+    val table: String,
+    val serializer: MetadataSerializer[T],
+    consistency: Option[ConsistencyLevel] = None
+  ) extends KeyValueStoreMetadata[T] {
 
   import scala.collection.JavaConverters._
 
@@ -28,15 +33,15 @@ class AccumuloBackedMetadata[T](val connector: AccumuloClient, val table: String
 
   private val empty = new Text()
 
-  override protected def checkIfTableExists: Boolean = connector.tableOperations().exists(table)
+  override protected def checkIfTableExists: Boolean = client.tableOperations().exists(table)
 
-  override protected def createTable(): Unit = new TableManager(connector).ensureTableExists(table)
+  override protected def createTable(): Unit = new TableManager(client).ensureTableExists(table)
 
   override protected def createEmptyBackup(timestamp: String): AccumuloBackedMetadata[T] =
-    new AccumuloBackedMetadata(connector, s"${table}_${timestamp}_bak", serializer)
+    new AccumuloBackedMetadata(client, s"${table}_${timestamp}_bak", serializer, consistency)
 
   override protected def write(rows: Seq[(Array[Byte], Array[Byte])]): Unit = {
-    WithClose(connector.createBatchWriter(table, config)) { writer =>
+    WithClose(client.createBatchWriter(table, config)) { writer =>
       rows.foreach { case (k, v) =>
         val m = new Mutation(k)
         m.put(empty, empty, new Value(v))
@@ -47,15 +52,16 @@ class AccumuloBackedMetadata[T](val connector: AccumuloClient, val table: String
 
   override protected def delete(rows: Seq[Array[Byte]]): Unit = {
     val ranges = rows.map(r => Range.exact(new Text(r))).asJava
-    WithClose(connector.createBatchDeleter(table, Authorizations.EMPTY, 1, config)) { deleter =>
+    WithClose(client.createBatchDeleter(table, Authorizations.EMPTY, 1, config)) { deleter =>
       deleter.setRanges(ranges)
       deleter.delete()
     }
   }
 
   override protected def scanValue(row: Array[Byte]): Option[Array[Byte]] = {
-    WithClose(connector.createScanner(table, Authorizations.EMPTY)) { scanner =>
+    WithClose(client.createScanner(table, Authorizations.EMPTY)) { scanner =>
       scanner.setRange(Range.exact(new Text(row)))
+      consistency.foreach(scanner.setConsistencyLevel)
       val iter = scanner.iterator
       if (iter.hasNext) {
         Some(iter.next.getValue.get)
@@ -68,77 +74,12 @@
   override protected def scanRows(prefix: Option[Array[Byte]]): CloseableIterator[(Array[Byte], Array[Byte])] = {
     // ensure we don't scan any single-row encoded values
     val range = prefix.map(p => Range.prefix(new Text(p))).getOrElse(new Range("", "~"))
-    val scanner = connector.createScanner(table, Authorizations.EMPTY)
+    val scanner = client.createScanner(table, Authorizations.EMPTY)
     scanner.setRange(range)
+    consistency.foreach(scanner.setConsistencyLevel)
     CloseableIterator(scanner.iterator.asScala.map(r => (r.getKey.getRow.copyBytes, r.getValue.get)), scanner.close())
   }
-}
-
-object AccumuloBackedMetadata {
-
-  /**
-   * Old single row metadata kept around for back-compatibility
-   */
-  class SingleRowAccumuloMetadata[T](metadata: AccumuloBackedMetadata[T]) {
-
-    import scala.collection.JavaConverters._
-
-    // if the table doesn't exist, we assume that we don't ever need to check it for old-encoded rows
-    private val tableExists = metadata.connector.tableOperations().exists(metadata.table)
 
-    def getFeatureTypes: Array[String] = {
-      if (!tableExists) { Array.empty } else {
-        val scanner = metadata.connector.createScanner(metadata.table, Authorizations.EMPTY)
-        // restrict to just one cf so we only get 1 hit per feature
-        // use attributes as it's the only thing that's been there through all geomesa versions
-        scanner.fetchColumnFamily(new Text(GeoMesaMetadata.AttributesKey))
-        try {
-          scanner.iterator.asScala.map(e => SingleRowAccumuloMetadata.getTypeName(e.getKey.getRow)).toArray
-        } finally {
-          scanner.close()
-        }
-      }
-    }
-
-    /**
-     * Migrate a table from the old single-row metadata to the new
-     *
-     * @param typeName simple feature type name
-     */
-    def migrate(typeName: String): Unit = {
-      if (tableExists) {
-        val scanner = metadata.connector.createScanner(metadata.table, Authorizations.EMPTY)
-        val writer = metadata.connector.createBatchWriter(metadata.table, GeoMesaBatchWriterConfig())
-        try {
-          scanner.setRange(SingleRowAccumuloMetadata.getRange(typeName))
-          scanner.iterator.asScala.foreach { entry =>
-            val key = entry.getKey.getColumnFamily.toString
-            metadata.insert(typeName, key, metadata.serializer.deserialize(typeName, entry.getValue.get))
-            // delete for the old entry
-            val delete = new Mutation(entry.getKey.getRow)
-            delete.putDelete(entry.getKey.getColumnFamily, entry.getKey.getColumnQualifier)
-            writer.addMutation(delete)
-          }
-        } finally {
-          CloseQuietly(scanner)
-          if (writer != null) {
-            CloseQuietly(writer)
-          }
-        }
-      }
-    }
-  }
-
-  object SingleRowAccumuloMetadata {
-
-    private val MetadataTag = "~METADATA"
-    private val MetadataRowKeyRegex = (MetadataTag + """_(.*)""").r
-
-    def getRange(typeName: String): Range = new Range(s"${MetadataTag}_$typeName")
-
-    def getTypeName(row: Text): String = {
-      val MetadataRowKeyRegex(typeName) = row.toString
-      typeName
-    }
-  }
+  @deprecated("Use `client`")
+  def connector: AccumuloClient = client
 }
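
The hunks shown here only consume an ``Option[ConsistencyLevel]``; the mapping from the ``accumulo.query.consistency`` string to that enum happens elsewhere in the commit. A hypothetical sketch of that mapping (the helper name and error handling are assumptions, not part of this diff):

import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel

object ConsistencyParam {
  // hypothetical helper - not the actual parameter-parsing code from this commit
  def parse(value: Option[String]): Option[ConsistencyLevel] =
    value.map {
      case v if v.equalsIgnoreCase("immediate") => ConsistencyLevel.IMMEDIATE
      case v if v.equalsIgnoreCase("eventual")  => ConsistencyLevel.EVENTUAL
      case v => throw new IllegalArgumentException(s"Invalid consistency level: $v")
    }
}

// e.g. ConsistencyParam.parse(Some("eventual")) == Some(ConsistencyLevel.EVENTUAL)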
