Skip to content

Commit c9eb22d

Browse files
authored
GEOMESA-3534 Accumulo - CLI command to update attributes/visibilities (#3462)
* New `update-features` command
1 parent fc34e5c commit c9eb22d

File tree

14 files changed

+401
-113
lines changed

14 files changed

+401
-113
lines changed

docs/user/cli/ingest.rst

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ Argument Description
4141
``--converter-error-mode`` Override the error mode defined by the converter
4242
``-t, --threads`` Number of parallel threads used
4343
``--input-format`` Format of input files (csv, tsv, avro, shp, json, etc)
44-
```--index`` Specify a particular GeoMesa index to write to, instead of all indices
44+
``--index`` Specify a particular GeoMesa index to write to, instead of all indices
4545
``--no-tracking`` This application closes when ingest job is submitted. Useful for launching jobs with a script
4646
``--run-mode`` Must be one of ``local`` or ``distributed`` (for map/reduce ingest)
4747
``--combine-inputs`` Combine multiple input files into a single input split (distributed jobs only)
@@ -135,3 +135,24 @@ For example::
135135

136136
For local ingests, feature writers will be pooled and only flushed periodically. The frequency of flushes can be
137137
controlled via the system property ``geomesa.ingest.local.batch.size``, and defaults to every 20,000 features.
138+
139+
``update-features``
140+
-------------------
141+
142+
The ``update-features`` command allows for bulk updating of attributes or visibilities in GeoMesa. For example, you may notice
143+
that records from a certain time period have an incorrect value due to a bug in your ingest process. ``update-features`` could
144+
be used to go back and correct the data.
145+
146+
========================== ==================================================================================================
147+
Argument Description
148+
========================== ==================================================================================================
149+
``-c, --catalog *`` The catalog table containing schema metadata
150+
``-f, --feature-name *`` The name of the schema
151+
``--set`` Attributes names and values to update, e.g. ``--set age=30``
152+
``--set-visibility`` A visibility expression to apply to all updated features
153+
``-q, --cql`` A CQL filter used to select the features to update
154+
``--force`` Suppress any confirmation prompts
155+
========================== ==================================================================================================
156+
157+
At least one of ``--set`` or ``--set-visibility`` must be specified. ``--set`` may be specified multiple times, in order
158+
to update multiple attributes at once.

geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/data/AccumuloDataStoreFactory.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ object AccumuloDataStoreFactory extends GeoMesaDataStoreInfo {
283283
// verify that the configured auths are valid for the connector we are using (fail-fast)
284284
val invalidAuths = configuredAuths.filterNot(masterAuths.contains)
285285
if (invalidAuths.nonEmpty) {
286-
throw new IllegalArgumentException(s"The authorizations '${invalidAuths.mkString(",")}' " +
286+
throw new IllegalArgumentException(s"The authorizations '${invalidAuths.mkString("', '")}' " +
287287
"are not valid for the Accumulo connection being used")
288288
}
289289

geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/AccumuloRunner.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ object AccumuloRunner extends RunnerWithAccumuloEnvironment {
3636
new tools.ingest.AccumuloBulkIngestCommand,
3737
new tools.ingest.AccumuloDeleteFeaturesCommand,
3838
new tools.ingest.AccumuloIngestCommand,
39+
new tools.ingest.AccumuloUpdateFeaturesCommand,
3940
new tools.schema.AccumuloCreateSchemaCommand,
4041
new tools.schema.AccumuloDeleteCatalogCommand,
4142
new tools.schema.AccumuloRemoveSchemaCommand,

geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/export/AccumuloExportCommand.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* https://www.apache.org/licenses/LICENSE-2.0
77
***********************************************************************/
88

9-
package org.locationtech.geomesa.accumulo.tools.export
9+
package org.locationtech.geomesa.accumulo.tools.`export`
1010

1111
import com.beust.jcommander.Parameters
1212
import org.apache.hadoop.mapreduce.Job

geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/export/AccumuloPlaybackCommand.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* https://www.apache.org/licenses/LICENSE-2.0
77
***********************************************************************/
88

9-
package org.locationtech.geomesa.accumulo.tools.export
9+
package org.locationtech.geomesa.accumulo.tools.`export`
1010

1111
import com.beust.jcommander.Parameters
1212
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/***********************************************************************
2+
* Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
3+
* All rights reserved. This program and the accompanying materials
4+
* are made available under the terms of the Apache License, Version 2.0
5+
* which accompanies this distribution and is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
***********************************************************************/
8+
9+
package org.locationtech.geomesa.accumulo.tools.ingest
10+
11+
import com.beust.jcommander.Parameters
12+
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore
13+
import org.locationtech.geomesa.accumulo.tools.ingest.AccumuloUpdateFeaturesCommand.AccumuloUpdateFeaturesParams
14+
import org.locationtech.geomesa.accumulo.tools.{AccumuloDataStoreCommand, AccumuloDataStoreParams}
15+
import org.locationtech.geomesa.tools.ingest.UpdateFeaturesCommand
16+
import org.locationtech.geomesa.tools.ingest.UpdateFeaturesCommand.UpdateFeaturesParams
17+
18+
class AccumuloUpdateFeaturesCommand extends UpdateFeaturesCommand[AccumuloDataStore] with AccumuloDataStoreCommand {
19+
override val params: AccumuloUpdateFeaturesParams = new AccumuloUpdateFeaturesParams()
20+
}
21+
22+
object AccumuloUpdateFeaturesCommand {
23+
@Parameters(commandDescription = "Update attributes or visibilities of features in GeoMesa.")
24+
class AccumuloUpdateFeaturesParams extends UpdateFeaturesParams with AccumuloDataStoreParams
25+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/***********************************************************************
2+
* Copyright (c) 2013-2025 General Atomics Integrated Intelligence, Inc.
3+
* All rights reserved. This program and the accompanying materials
4+
* are made available under the terms of the Apache License, Version 2.0
5+
* which accompanies this distribution and is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
***********************************************************************/
8+
9+
package org.locationtech.geomesa.accumulo.tools.ingest
10+
11+
import org.apache.accumulo.core.security.Authorizations
12+
import org.geomesa.testcontainers.AccumuloContainer
13+
import org.geotools.api.data.{Query, Transaction}
14+
import org.geotools.api.feature.simple.SimpleFeature
15+
import org.geotools.data.collection.ListFeatureCollection
16+
import org.geotools.util.factory.Hints
17+
import org.locationtech.geomesa.accumulo.tools.{AccumuloDataStoreCommand, AccumuloRunner}
18+
import org.locationtech.geomesa.features.ScalaSimpleFeature
19+
import org.locationtech.geomesa.security.SecurityUtils
20+
import org.locationtech.geomesa.utils.collection.SelfClosingIterator
21+
import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes
22+
import org.locationtech.geomesa.utils.io.WithClose
23+
import org.specs2.mutable.SpecificationWithJUnit
24+
25+
import java.util.concurrent.atomic.AtomicInteger
26+
27+
class AccumuloUpdateFeaturesCommandTest extends SpecificationWithJUnit {
28+
29+
private val sftCounter = new AtomicInteger(0)
30+
31+
val sft = SimpleFeatureTypes.createType("tools", "name:String,level:String,dtg:Date,*geom:Point:srid=4326")
32+
val features = Seq.tabulate(10) { i =>
33+
val sf = ScalaSimpleFeature.create(sft, s"id$i", s"name$i", "user", s"2016-01-01T0$i:00:00.000Z", "POINT(1 0)")
34+
sf.getUserData.put(Hints.USE_PROVIDED_FID, java.lang.Boolean.TRUE)
35+
sf.getUserData.put(SecurityUtils.FEATURE_VISIBILITY, "user")
36+
sf
37+
}
38+
39+
def baseArgs: Array[String] = Array(
40+
"update-features",
41+
"--force",
42+
"--instance", AccumuloContainer.getInstance().getInstanceName,
43+
"--zookeepers", AccumuloContainer.getInstance().getZookeepers,
44+
"--user", AccumuloContainer.getInstance().getUsername,
45+
"--password", AccumuloContainer.getInstance().getPassword,
46+
"--auths", "user,admin",
47+
"--catalog", s"gm.${getClass.getSimpleName}${sftCounter.getAndIncrement()}",
48+
"--feature-name", sft.getTypeName,
49+
)
50+
51+
def execute(args: Array[String]): List[SimpleFeature] = {
52+
val command = AccumuloRunner.parseCommand(args).asInstanceOf[AccumuloDataStoreCommand]
53+
command.withDataStore { ds =>
54+
ds.createSchema(sft)
55+
ds.getFeatureSource(sft.getTypeName).addFeatures(new ListFeatureCollection(sft, features: _*))
56+
}
57+
command.execute()
58+
command.withDataStore { ds =>
59+
try {
60+
SelfClosingIterator(ds.getFeatureReader(new Query(sft.getTypeName), Transaction.AUTO_COMMIT)).toList.sortBy(_.getID)
61+
} finally {
62+
ds.delete()
63+
}
64+
}
65+
}
66+
67+
step {
68+
WithClose(AccumuloContainer.getInstance().client()) { client =>
69+
client.securityOperations()
70+
.changeUserAuthorizations(AccumuloContainer.getInstance().getUsername, new Authorizations("user", "admin"))
71+
}
72+
}
73+
74+
"AccumuloUpdateFeaturesCommand" should {
75+
"update attributes" in {
76+
val updated = execute(baseArgs ++ Array("--set", "name=bob"))
77+
updated.map(_.getID) mustEqual features.map(_.getID)
78+
foreach(Seq("level", "dtg", "geom")) { attribute =>
79+
updated.map(_.getAttribute(attribute)) mustEqual features.map(_.getAttribute(attribute))
80+
}
81+
updated.map(_.getAttribute("name")).toSet mustEqual Set("bob")
82+
}
83+
84+
"update attributes with cql filter" in {
85+
val updated = execute(baseArgs ++ Array("--set", "name=bob", "--cql", "dtg AFTER 2016-01-01T04:00:00.000Z"))
86+
updated.map(_.getID) mustEqual features.map(_.getID)
87+
foreach(Seq("level", "dtg", "geom")) { attribute =>
88+
updated.map(_.getAttribute(attribute)) mustEqual features.map(_.getAttribute(attribute))
89+
}
90+
updated.take(5).map(_.getAttribute("name")) mustEqual features.take(5).map(_.getAttribute("name"))
91+
updated.drop(5).map(_.getAttribute("name")).toSet mustEqual Set("bob")
92+
}
93+
94+
"update visibility with cql filter" in {
95+
val updated =
96+
execute(baseArgs ++ Array("--set", "level=admin", "--set-visibility", "admin", "--cql", "dtg AFTER 2016-01-01T04:00:00.000Z"))
97+
updated.map(_.getID) mustEqual features.map(_.getID)
98+
foreach(Seq("name", "dtg", "geom")) { attribute =>
99+
updated.map(_.getAttribute(attribute)) mustEqual features.map(_.getAttribute(attribute))
100+
}
101+
updated.take(5).map(_.getAttribute("level")).toSet mustEqual Set("user")
102+
updated.take(5).map(_.getUserData.get(SecurityUtils.FEATURE_VISIBILITY)).toSet mustEqual Set("user")
103+
updated.drop(5).map(_.getAttribute("level")).toSet mustEqual Set("admin")
104+
updated.drop(5).map(_.getUserData.get(SecurityUtils.FEATURE_VISIBILITY)).toSet mustEqual Set("admin")
105+
}
106+
}
107+
}

geomesa-accumulo/geomesa-accumulo-tools/src/test/scala/org/locationtech/geomesa/accumulo/tools/ingest/IngestCommandTest.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class IngestCommandTest extends Specification {
5353

5454
command.withDataStore { ds =>
5555
try {
56-
val features = SelfClosingIterator(ds.getFeatureSource("renegades").getFeatures.features).toList
56+
val features = SelfClosingIterator(ds.getFeatureSource("renegades").getFeatures().features).toList
5757
features must haveSize(3)
5858
features.map(_.getAttribute("name")) must containTheSameElementsAs(Seq("Hermione", "Harry", "Severus"))
5959
} finally {
@@ -73,7 +73,7 @@ class IngestCommandTest extends Specification {
7373

7474
command.withDataStore { ds =>
7575
try {
76-
val features = SelfClosingIterator(ds.getFeatureSource("renegades").getFeatures.features).toList
76+
val features = SelfClosingIterator(ds.getFeatureSource("renegades").getFeatures().features).toList
7777
features.size mustEqual 3
7878
features.map(_.getAttribute("name")) must containTheSameElementsAs(Seq("Hermione", "Harry", "Severus"))
7979
} finally {
@@ -124,7 +124,7 @@ class IngestCommandTest extends Specification {
124124

125125
command.withDataStore { ds =>
126126
try {
127-
val features = SelfClosingIterator(ds.getFeatureSource(sftName).getFeatures.features).toList
127+
val features = SelfClosingIterator(ds.getFeatureSource(sftName).getFeatures().features).toList
128128
features.size mustEqual 3
129129
features.map(_.getAttribute(1)) must containTheSameElementsAs(Seq("Hermione", "Harry", "Severus"))
130130
} finally {
@@ -153,7 +153,7 @@ class IngestCommandTest extends Specification {
153153

154154
command.withDataStore { ds =>
155155
try {
156-
val features = SelfClosingIterator(ds.getFeatureSource("renegades").getFeatures.features).toList
156+
val features = SelfClosingIterator(ds.getFeatureSource("renegades").getFeatures().features).toList
157157
features.size mustEqual 3
158158
features.map(_.getAttribute("name")) must containTheSameElementsAs(Seq("Hermione", "Harry", "Severus"))
159159
} finally {
@@ -194,7 +194,7 @@ class IngestCommandTest extends Specification {
194194

195195
command.withDataStore { ds =>
196196
try {
197-
val features = SelfClosingIterator(ds.getFeatureSource("renegades2").getFeatures.features).toList
197+
val features = SelfClosingIterator(ds.getFeatureSource("renegades2").getFeatures().features).toList
198198
features must beEmpty
199199
} finally {
200200
ds.delete()
@@ -213,7 +213,7 @@ class IngestCommandTest extends Specification {
213213

214214
command.withDataStore { ds =>
215215
try {
216-
val features = SelfClosingIterator(ds.getFeatureSource("geonames").getFeatures.features).toList
216+
val features = SelfClosingIterator(ds.getFeatureSource("geonames").getFeatures().features).toList
217217
features must haveSize(3)
218218
} finally {
219219
ds.delete()
@@ -232,7 +232,7 @@ class IngestCommandTest extends Specification {
232232

233233
command.withDataStore { ds =>
234234
try {
235-
val features = SelfClosingIterator(ds.getFeatureSource("geonames").getFeatures.features).toList
235+
val features = SelfClosingIterator(ds.getFeatureSource("geonames").getFeatures().features).toList
236236
features must beEmpty
237237
} finally {
238238
ds.delete()
@@ -251,7 +251,7 @@ class IngestCommandTest extends Specification {
251251

252252
command.withDataStore { ds =>
253253
try {
254-
val features = SelfClosingIterator(ds.getFeatureSource("renegades").getFeatures.features).toList
254+
val features = SelfClosingIterator(ds.getFeatureSource("renegades").getFeatures().features).toList
255255
features must haveSize(5)
256256
features.map(_.getAttribute("name")) must
257257
containTheSameElementsAs(Seq("Hermione", "Harry", "Severus", "Ron", "Ginny"))
@@ -272,7 +272,7 @@ class IngestCommandTest extends Specification {
272272

273273
command.withDataStore { ds =>
274274
try {
275-
val features = SelfClosingIterator(ds.getFeatureSource("renegades").getFeatures.features).toList
275+
val features = SelfClosingIterator(ds.getFeatureSource("renegades").getFeatures().features).toList
276276
features must haveSize(5)
277277
features.map(_.getAttribute("name")) must
278278
containTheSameElementsAs(Seq("Hermione", "Harry", "Severus", "Ron", "Ginny"))
@@ -293,7 +293,7 @@ class IngestCommandTest extends Specification {
293293

294294
command.withDataStore { ds =>
295295
try {
296-
val features = SelfClosingIterator(ds.getFeatureSource("renegades").getFeatures.features).toList
296+
val features = SelfClosingIterator(ds.getFeatureSource("renegades").getFeatures().features).toList
297297
features must haveSize(5)
298298
features.map(_.getAttribute("name")) must
299299
containTheSameElementsAs(Seq("Hermione", "Harry", "Severus", "Ron", "Ginny"))

0 commit comments

Comments
 (0)