Skip to content

Commit 81282f4

Browse files
author
Nagaraj G
committed
Fix partial cache update post snapshot restore
Signed-off-by: Nagaraj G <[email protected]>
1 parent d29095f commit 81282f4

File tree

4 files changed

+193
-20
lines changed

4 files changed

+193
-20
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1818
* [Resource Sharing] Reverts @Inject pattern usage for ResourceSharingExtension to client accessor pattern. ([#5576](https://github.com/opensearch-project/security/pull/5576))
1919
* Inject user custom attributes when injecting user and role information to the thread context ([#5560](https://github.com/opensearch-project/security/pull/5560))
2020
* Allow any plugin system request when `plugins.security.system_indices.enabled` is set to `false` ([#5579](https://github.com/opensearch-project/security/pull/5579))
21+
* Fix compilation issue after change to Subject interface in core and bump to 3.2.0 ([#5423](https://github.com/opensearch-project/security/pull/5423))
22+
* Provide SecureHttpTransportParameters to complement SecureTransportParameters counterpart ([#5432](https://github.com/opensearch-project/security/pull/5432))
23+
* Use isClusterPerm instead of requestedResolved.isLocalAll() to determine if action is a cluster action ([#5445](https://github.com/opensearch-project/security/pull/5445))
24+
* Fix config update with deprecated config types failing in mixed clusters ([#5456](https://github.com/opensearch-project/security/pull/5456))
25+
* Fix usage of jwt_clock_skew_tolerance_seconds in HTTPJwtAuthenticator ([#5506](https://github.com/opensearch-project/security/pull/5506))
26+
* Fix partial cache update post snapshot restore[#5478](https://github.com/opensearch-project/security/pull/5478)
2127

2228
### Refactoring
2329

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package org.opensearch.security;
2+
3+
import org.apache.logging.log4j.LogManager;
4+
import org.apache.logging.log4j.Logger;
5+
import org.junit.After;
6+
import org.junit.Before;
7+
import org.junit.ClassRule;
8+
import org.junit.Test;
9+
import org.opensearch.OpenSearchStatusException;
10+
import org.opensearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
11+
import org.opensearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
12+
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
13+
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
14+
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
15+
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
16+
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
17+
import org.opensearch.action.get.GetRequest;
18+
import org.opensearch.action.get.GetResponse;
19+
import org.opensearch.action.index.IndexRequest;
20+
import org.opensearch.action.support.WriteRequest;
21+
import org.opensearch.client.RequestOptions;
22+
import org.opensearch.client.RestHighLevelClient;
23+
import org.opensearch.common.xcontent.XContentType;
24+
import org.opensearch.core.rest.RestStatus;
25+
import org.opensearch.security.api.AbstractApiIntegrationTest;
26+
import org.opensearch.security.support.ConfigConstants;
27+
import org.opensearch.test.framework.TestSecurityConfig;
28+
import org.opensearch.test.framework.cluster.ClusterManager;
29+
import org.opensearch.test.framework.cluster.LocalCluster;
30+
import org.opensearch.test.framework.cluster.TestRestClient;
31+
import org.opensearch.transport.client.Client;
32+
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.concurrent.ExecutionException;
36+
37+
import static org.hamcrest.MatcherAssert.assertThat;
38+
import static org.junit.Assert.assertEquals;
39+
import static org.junit.Assert.assertThrows;
40+
import static org.junit.Assert.assertTrue;
41+
import static org.opensearch.security.CrossClusterSearchTests.TYPE_ATTRIBUTE;
42+
import static org.opensearch.security.SearchOperationTest.TEST_SNAPSHOT_REPOSITORY_NAME;
43+
import static org.opensearch.security.support.ConfigConstants.SECURITY_BACKGROUND_INIT_IF_SECURITYINDEX_NOT_EXIST;
44+
import static org.opensearch.security.support.ConfigConstants.SECURITY_RESTAPI_ROLES_ENABLED;
45+
import static org.opensearch.test.framework.TestSecurityConfig.AuthcDomain.AUTHC_HTTPBASIC_INTERNAL;
46+
import static org.opensearch.test.framework.TestSecurityConfig.Role.ALL_ACCESS;
47+
import static org.opensearch.test.framework.TestSecurityConfig.User.USER_ADMIN;
48+
import static org.opensearch.test.framework.matcher.GetResponseMatchers.containDocument;
49+
50+
public class SecurityIndexSnapshotRestoreTests extends AbstractApiIntegrationTest {
51+
private static final Logger log = LogManager.getLogger(SecurityIndexSnapshotRestoreTests.class);
52+
53+
private static final String TEST_INDEX_NAME = "my_index_001";
54+
private static final String DOC_ID = "doc_id";
55+
56+
private static final TestSecurityConfig.User ADMIN_USER = new TestSecurityConfig.User("admin").roles(ALL_ACCESS).attr(TYPE_ATTRIBUTE, "administrative");
57+
58+
private static final TestSecurityConfig.User LIMITED_READ_USER_1 = new TestSecurityConfig.User("limited_read_user").roles(
59+
new TestSecurityConfig.Role("limited-reader").indexPermissions("indices:data/read*").on(TEST_INDEX_NAME));
60+
61+
private static final TestSecurityConfig.User LIMITED_READ_USER_2 = new TestSecurityConfig.User("user2");
62+
63+
private static final TestSecurityConfig.Role LIMITED_READ_USER_2_ROLE =
64+
new TestSecurityConfig.Role("limited-reader_2").indexPermissions("indices:data/read*").on(TEST_INDEX_NAME);
65+
66+
private String securityIndex;
67+
68+
@ClassRule
69+
public static LocalCluster cluster = new LocalCluster.Builder().clusterManager(ClusterManager.THREE_CLUSTER_MANAGERS)
70+
.authc(AUTHC_HTTPBASIC_INTERNAL)
71+
.users(ADMIN_USER, LIMITED_READ_USER_1)
72+
.anonymousAuth(false)
73+
.nodeSettings(
74+
Map.of(
75+
SECURITY_RESTAPI_ROLES_ENABLED,
76+
List.of("user_" + USER_ADMIN.getName() + "__" + ALL_ACCESS.getName()),
77+
SECURITY_BACKGROUND_INIT_IF_SECURITYINDEX_NOT_EXIST,
78+
false
79+
)
80+
)
81+
.build();
82+
83+
@Before
84+
public void setUp() throws Exception {
85+
securityIndex = ConfigConstants.OPENDISTRO_SECURITY_DEFAULT_CONFIG_INDEX;
86+
87+
try (Client client = cluster.getInternalNodeClient()) {
88+
client.admin()
89+
.cluster()
90+
.putRepository(
91+
new PutRepositoryRequest(TEST_SNAPSHOT_REPOSITORY_NAME).type("fs")
92+
.settings(Map.of("location", cluster.getSnapshotDirPath()))
93+
)
94+
.actionGet();
95+
96+
CreateIndexResponse createIndexResponse = client.admin().indices()
97+
.create(new CreateIndexRequest(TEST_INDEX_NAME))
98+
.actionGet();
99+
assertTrue(createIndexResponse.isAcknowledged());
100+
101+
client.index(new IndexRequest(TEST_INDEX_NAME)
102+
.id(DOC_ID)
103+
.source("{\"message\": \"test document 1\"}", XContentType.JSON)
104+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE))
105+
.actionGet();
106+
}
107+
}
108+
109+
@After
110+
public void cleanData() throws ExecutionException, InterruptedException {
111+
try (Client client = cluster.getInternalNodeClient()) {
112+
client.admin().indices()
113+
.delete(new DeleteIndexRequest(TEST_INDEX_NAME))
114+
.actionGet();
115+
116+
client.admin().cluster().deleteRepository(new DeleteRepositoryRequest(TEST_SNAPSHOT_REPOSITORY_NAME)).actionGet();
117+
}
118+
}
119+
120+
@Test
121+
public void testSecurityCacheReloadAfterRestore() throws Exception {
122+
// 1. Read data in custom index with LIMITED_READ_USER_1
123+
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_READ_USER_1)) {
124+
GetResponse response = restHighLevelClient.get(new GetRequest(TEST_INDEX_NAME, DOC_ID), RequestOptions.DEFAULT);
125+
assertThat(response, containDocument(TEST_INDEX_NAME, DOC_ID));
126+
}
127+
128+
// 2. Create snapshot of security index
129+
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(ADMIN_USER)) {
130+
SnapshotSteps steps = new SnapshotSteps(restHighLevelClient);
131+
steps.createSnapshot(TEST_SNAPSHOT_REPOSITORY_NAME, "test-snap", securityIndex);
132+
steps.waitForSnapshotCreation(TEST_SNAPSHOT_REPOSITORY_NAME, "test-snap");
133+
}
134+
135+
// 3. Add new role and user to security index (This is not in snapshot created above)
136+
try (TestRestClient client = cluster.getRestClient(ADMIN_USER)) {
137+
client.createRole(LIMITED_READ_USER_2_ROLE.getName(), LIMITED_READ_USER_2_ROLE).assertStatusCode(201);
138+
client.createUser(LIMITED_READ_USER_2.getName(), LIMITED_READ_USER_2).assertStatusCode(201);
139+
client.assignRoleToUser(LIMITED_READ_USER_2.getName(), "limited-reader_2").assertStatusCode(200);
140+
}
141+
142+
// 4. Read data in custom index with LIMITED_READ_USER_2
143+
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_READ_USER_2)) {
144+
GetResponse response = restHighLevelClient.get(new GetRequest(TEST_INDEX_NAME, DOC_ID), RequestOptions.DEFAULT);
145+
assertThat(response, containDocument(TEST_INDEX_NAME, DOC_ID));
146+
}
147+
148+
// 5. Delete security index
149+
try (Client client = cluster.getInternalNodeClient()) {
150+
DeleteIndexRequest deleteRequest = new DeleteIndexRequest(securityIndex);
151+
client.admin().indices().delete(deleteRequest).actionGet();
152+
}
153+
154+
// 6. Restore security index
155+
try (Client client = cluster.getInternalNodeClient()) {
156+
RestoreSnapshotRequest restoreRequest = new RestoreSnapshotRequest(TEST_SNAPSHOT_REPOSITORY_NAME, "test-snap")
157+
.waitForCompletion(true)
158+
.indices(securityIndex); // restore security index
159+
160+
RestoreSnapshotResponse restoreResponse = client.admin().cluster()
161+
.restoreSnapshot(restoreRequest)
162+
.actionGet();
163+
164+
// Verify restore was successful
165+
assertEquals(RestStatus.OK, restoreResponse.status());
166+
}
167+
168+
// 7. Read data in custom index with LIMITED_READ_USER_1 because it was in snapshot
169+
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_READ_USER_1)) {
170+
GetResponse response = restHighLevelClient.get(new GetRequest(TEST_INDEX_NAME, DOC_ID), RequestOptions.DEFAULT);
171+
assertThat(response, containDocument(TEST_INDEX_NAME, DOC_ID));
172+
}
173+
174+
// 8. Should get 401 error to read custom index with LIMITED_READ_USER_2 because it was not in snapshot
175+
try (RestHighLevelClient restHighLevelClient = cluster.getRestHighLevelClient(LIMITED_READ_USER_2)) {
176+
restHighLevelClient.get(new GetRequest(TEST_INDEX_NAME, DOC_ID), RequestOptions.DEFAULT);
177+
} catch (OpenSearchStatusException exception){
178+
assertEquals(RestStatus.UNAUTHORIZED, exception.status()); // Verify it's a 401
179+
}
180+
}
181+
}

src/main/java/org/opensearch/security/configuration/ConfigurationRepository.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -681,15 +681,12 @@ public void afterIndexShardStarted(IndexShard indexShard) {
681681

682682
// Check if this is a security index shard
683683
if (securityIndex.equals(index.getName())) {
684-
// Only trigger on primary shard to avoid multiple reloads
685-
if (indexShard.routingEntry() != null && indexShard.routingEntry().primary()) {
686-
threadPool.generic().execute(() -> {
687-
if (isSecurityIndexRestoredFromSnapshot(clusterService, index, securityIndex)) {
688-
LOGGER.info("Security index primary shard {} started - config reloading for snapshot restore", shardId);
689-
reloadConfiguration(CType.values());
690-
}
691-
});
692-
}
684+
threadPool.generic().execute(() -> {
685+
if (isSecurityIndexRestoredFromSnapshot(clusterService, index, securityIndex)) {
686+
LOGGER.info("Security index shard {} started - config reloading for snapshot restore", shardId);
687+
reloadConfiguration(CType.values());
688+
}
689+
});
693690
}
694691
}
695692
}

src/test/java/org/opensearch/security/configuration/ConfigurationRepositoryTest.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,6 @@ public void getConfigurationsFromIndex_SecurityIndexNotInitiallyReady() throws I
597597
public void afterIndexShardStarted_whenSecurityIndexUpdated() throws InterruptedException, TimeoutException {
598598
Settings settings = Settings.builder().build();
599599
IndexShard indexShard = mock(IndexShard.class);
600-
ShardRouting shardRouting = mock(ShardRouting.class);
601600
ShardId shardId = mock(ShardId.class);
602601
Index index = mock(Index.class);
603602
ClusterState mockClusterState = mock(ClusterState.class);
@@ -611,20 +610,11 @@ public void afterIndexShardStarted_whenSecurityIndexUpdated() throws Interrupted
611610
when(indexShard.shardId()).thenReturn(shardId);
612611
when(shardId.getIndex()).thenReturn(index);
613612
when(index.getName()).thenReturn(ConfigConstants.OPENDISTRO_SECURITY_DEFAULT_CONFIG_INDEX);
614-
when(indexShard.routingEntry()).thenReturn(shardRouting);
615613
when(clusterService.state()).thenReturn(mockClusterState);
616614
when(mockClusterState.custom(RestoreInProgress.TYPE)).thenReturn(mockRestore);
617615
when(threadPool.generic()).thenReturn(executorService);
618616

619-
// when replica shard updated
620-
when(shardRouting.primary()).thenReturn(false);
621-
configurationRepository.afterIndexShardStarted(indexShard);
622-
verify(executorService, never()).execute(any());
623-
verify(configurationRepository, never()).reloadConfiguration(any());
624-
625-
// when primary shard updated
626617
doReturn(true).when(configurationRepository).reloadConfiguration(any());
627-
when(shardRouting.primary()).thenReturn(true);
628618
when(mockRestore.iterator()).thenReturn(Collections.singletonList(mockEntry).iterator());
629619
when(mockEntry.indices()).thenReturn(Collections.singletonList(ConfigConstants.OPENDISTRO_SECURITY_DEFAULT_CONFIG_INDEX));
630620
ArgumentCaptor<Runnable> successRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
@@ -637,7 +627,6 @@ public void afterIndexShardStarted_whenSecurityIndexUpdated() throws Interrupted
637627
Mockito.reset(configurationRepository, executorService);
638628
ArgumentCaptor<Runnable> errorRunnableCaptor = ArgumentCaptor.forClass(Runnable.class);
639629
when(clusterService.state()).thenThrow(new RuntimeException("ClusterState exception"));
640-
when(shardRouting.primary()).thenReturn(true);
641630
configurationRepository.afterIndexShardStarted(indexShard);
642631
verify(executorService).execute(errorRunnableCaptor.capture());
643632
errorRunnableCaptor.getValue().run();

0 commit comments

Comments
 (0)