Skip to content

Commit d13f656

Browse files
committed
Handle CFS invalidate event and close the CompressionDictionaryManager
1 parent e7037f1 commit d13f656

File tree

4 files changed

+183
-27
lines changed

4 files changed

+183
-27
lines changed

src/java/org/apache/cassandra/db/ColumnFamilyStore.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,7 @@ public ColumnFamilyStore(Keyspace keyspace,
579579
streamManager = new CassandraStreamManager(this);
580580
repairManager = new CassandraTableRepairManager(this);
581581
sstableImporter = new SSTableImporter(this);
582-
compressionDictionaryManager = new CompressionDictionaryManager(this);
582+
compressionDictionaryManager = new CompressionDictionaryManager(this, registerBookeeping);
583583

584584
if (DatabaseDescriptor.isClientOrToolInitialized() || SchemaConstants.isSystemKeyspace(getKeyspaceName()))
585585
topPartitions = null;
@@ -737,6 +737,8 @@ public void invalidate(boolean expectMBean, boolean dropData)
737737
invalidateCaches();
738738
if (topPartitions != null)
739739
topPartitions.close();
740+
741+
compressionDictionaryManager.close();
740742
}
741743

742744
/**

src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.cassandra.schema.CompressionParams;
3333
import org.apache.cassandra.schema.SystemDistributedKeyspace;
3434
import org.apache.cassandra.utils.MBeanWrapper;
35+
import org.apache.cassandra.utils.MBeanWrapper.OnException;
3536

3637
public class CompressionDictionaryManager implements CompressionDictionaryManagerMBean,
3738
ICompressionDictionaryCache,
@@ -42,15 +43,16 @@ public class CompressionDictionaryManager implements CompressionDictionaryManage
4243

4344
private final String keyspaceName;
4445
private final String tableName;
46+
private volatile boolean mbeanRegistered;
47+
private volatile boolean isEnabled;
48+
4549
// Components
4650
private final ICompressionDictionaryEventHandler eventHandler;
4751
private final ICompressionDictionaryCache cache;
4852
private final ICompressionDictionaryScheduler scheduler;
4953
private ICompressionDictionaryTrainer trainer = null;
5054

51-
private volatile boolean isEnabled;
52-
53-
public CompressionDictionaryManager(ColumnFamilyStore columnFamilyStore)
55+
public CompressionDictionaryManager(ColumnFamilyStore columnFamilyStore, boolean registerBookkeeping)
5456
{
5557
this.keyspaceName = columnFamilyStore.keyspace.getName();
5658
this.tableName = columnFamilyStore.getTableName();
@@ -71,15 +73,18 @@ public CompressionDictionaryManager(ColumnFamilyStore columnFamilyStore)
7173

7274
trainer.start(false);
7375
}
74-
registerMBean(keyspaceName, tableName, this);
76+
77+
if (registerBookkeeping)
78+
{
79+
MBeanWrapper.instance.registerMBean(this, mbeanName(keyspaceName, tableName));
80+
}
81+
mbeanRegistered = registerBookkeeping;
7582
}
7683

77-
private static void registerMBean(String keyspaceName, String tableName, CompressionDictionaryManagerMBean mBean)
84+
static String mbeanName(String keyspaceName, String tableName)
7885
{
79-
// Register as MBean for this specific table
80-
String mbeanName = "org.apache.cassandra.db.compression:type=CompressionDictionaryManager" +
81-
",keyspace=" + keyspaceName + ",table=" + tableName;
82-
MBeanWrapper.instance.registerMBean(mBean, mbeanName);
86+
return "org.apache.cassandra.db.compression:type=CompressionDictionaryManager" +
87+
",keyspace=" + keyspaceName + ",table=" + tableName;
8388
}
8489

8590
public boolean isEnabled()
@@ -243,18 +248,18 @@ public void updateSamplingRate(int samplingRate)
243248

244249
/**
245250
* Close all the resources. The method can be called multiple times.
246-
* @throws Exception if this resource cannot be closed
247251
*/
248252
@Override
249-
public synchronized void close() throws Exception
253+
public synchronized void close()
250254
{
255+
unregisterMbean();
251256
if (trainer != null)
252257
{
253-
trainer.close();
258+
closeQuitely(trainer, "CompressionDictionaryTrainer");
254259
trainer = null;
255260
}
256-
cache.close();
257-
scheduler.close();
261+
closeQuitely(cache, "CompressionDictionaryCache");
262+
closeQuitely(scheduler, "CompressionDictionaryScheduler");
258263
}
259264

260265
private void handleNewDictionary(CompressionDictionary dictionary)
@@ -306,6 +311,27 @@ private boolean shouldCreateNewTrainer(CompressionParams newParams)
306311
return !trainer.isCompatibleWith(newParams);
307312
}
308313

314+
private void unregisterMbean()
315+
{
316+
if (mbeanRegistered)
317+
{
318+
MBeanWrapper.instance.unregisterMBean(mbeanName(keyspaceName, tableName), OnException.IGNORE);
319+
mbeanRegistered = true;
320+
}
321+
}
322+
323+
private void closeQuitely(AutoCloseable closeable, String objectName)
324+
{
325+
try
326+
{
327+
closeable.close();
328+
}
329+
catch (Exception exception)
330+
{
331+
logger.warn("Failed closing {}", objectName, exception);
332+
}
333+
}
334+
309335
@VisibleForTesting
310336
boolean isReady()
311337
{
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.cassandra.db.compression;
20+
21+
import java.util.Map;
22+
23+
import org.junit.Before;
24+
import org.junit.BeforeClass;
25+
import org.junit.Test;
26+
27+
import org.apache.cassandra.SchemaLoader;
28+
import org.apache.cassandra.ServerTestUtils;
29+
import org.apache.cassandra.db.ColumnFamilyStore;
30+
import org.apache.cassandra.db.Keyspace;
31+
import org.apache.cassandra.schema.CompressionParams;
32+
import org.apache.cassandra.schema.KeyspaceParams;
33+
import org.apache.cassandra.schema.TableMetadata;
34+
import org.apache.cassandra.utils.MBeanWrapper;
35+
import org.apache.cassandra.utils.MBeanWrapper.OnException;
36+
37+
import static org.assertj.core.api.Assertions.assertThat;
38+
39+
public class CompressionDictionaryManagerMBeanTest
40+
{
41+
private static final String KEYSPACE_WITH_DICT = "keyspace_mbean_test";
42+
private static final String TABLE = "test_table";
43+
44+
private static ColumnFamilyStore cfsWithDict;
45+
46+
@BeforeClass
47+
public static void setUpClass() throws Exception
48+
{
49+
ServerTestUtils.prepareServer();
50+
CompressionParams compressionParams = CompressionParams.zstd(CompressionParams.DEFAULT_CHUNK_LENGTH, true,
51+
Map.of("compression_level", "3"));
52+
TableMetadata.Builder tableBuilder = TableMetadata.builder(KEYSPACE_WITH_DICT, TABLE)
53+
.addPartitionKeyColumn("pk", org.apache.cassandra.db.marshal.UTF8Type.instance)
54+
.addRegularColumn("data", org.apache.cassandra.db.marshal.UTF8Type.instance)
55+
.compression(compressionParams);
56+
SchemaLoader.createKeyspace(KEYSPACE_WITH_DICT,
57+
KeyspaceParams.simple(1),
58+
tableBuilder);
59+
cfsWithDict = Keyspace.open(KEYSPACE_WITH_DICT).getColumnFamilyStore(TABLE);
60+
}
61+
62+
// Ensure no mbean is registered at the begining of the test
63+
@Before
64+
public void cleanup()
65+
{
66+
String mbeanName = CompressionDictionaryManager.mbeanName(KEYSPACE_WITH_DICT, TABLE);
67+
MBeanWrapper.instance.unregisterMBean(mbeanName, OnException.IGNORE);
68+
}
69+
70+
@Test
71+
public void testMBeanRegisteredWhenBookkeepingEnabled()
72+
{
73+
String mbeanName = CompressionDictionaryManager.mbeanName(KEYSPACE_WITH_DICT, TABLE);
74+
// Create manager with bookkeeping enabled
75+
try (CompressionDictionaryManager manager = new CompressionDictionaryManager(cfsWithDict, true))
76+
{
77+
// Verify MBean is registered
78+
assertThat(MBeanWrapper.instance.isRegistered(mbeanName))
79+
.as("MBean should be registered when bookkeeping is enabled")
80+
.isTrue();
81+
}
82+
// Closing manager should unregister the mbean; Verify it is unregistered
83+
assertThat(MBeanWrapper.instance.isRegistered(mbeanName))
84+
.as("MBean should be unregistered after unregisterMbean() call")
85+
.isFalse();
86+
}
87+
88+
@Test
89+
public void testMBeanNotRegisteredWhenBookkeepingDisabled()
90+
{
91+
// Create manager with bookkeeping disabled
92+
try (CompressionDictionaryManager manager = new CompressionDictionaryManager(cfsWithDict, false))
93+
{
94+
// Verify MBean is NOT registered
95+
String mbeanName = CompressionDictionaryManager.mbeanName(KEYSPACE_WITH_DICT, TABLE);;
96+
assertThat(MBeanWrapper.instance.isRegistered(mbeanName))
97+
.as("MBean should not be registered when bookkeeping is disabled")
98+
.isFalse();
99+
}
100+
// Closing manager should not throw due to mbean not registered
101+
}
102+
103+
@Test
104+
public void testMBeanUnregisteredOnCFSInvalidation()
105+
{
106+
String testKeyspace = "test_invalidation_mbean_ks";
107+
String testTable = "test_invalidation_mbean_table";
108+
109+
CompressionParams compressionParams = CompressionParams.zstd(CompressionParams.DEFAULT_CHUNK_LENGTH, true,
110+
Map.of("compression_level", "3"));
111+
112+
TableMetadata.Builder tableBuilder = TableMetadata.builder(testKeyspace, testTable)
113+
.addPartitionKeyColumn("pk", org.apache.cassandra.db.marshal.UTF8Type.instance)
114+
.addRegularColumn("data", org.apache.cassandra.db.marshal.UTF8Type.instance)
115+
.compression(compressionParams);
116+
117+
SchemaLoader.createKeyspace(testKeyspace,
118+
KeyspaceParams.simple(1),
119+
tableBuilder);
120+
121+
ColumnFamilyStore cfs = Keyspace.open(testKeyspace).getColumnFamilyStore(testTable);
122+
123+
String mbeanName = CompressionDictionaryManager.mbeanName(testKeyspace, testTable);
124+
125+
// Verify MBean is registered (CFS registers it during creation)
126+
assertThat(MBeanWrapper.instance.isRegistered(mbeanName))
127+
.as("MBean should be registered after CFS creation")
128+
.isTrue();
129+
130+
// Invalidate the CFS (which should unregister the MBean)
131+
cfs.invalidate(true, true);
132+
133+
// Verify MBean is unregistered
134+
assertThat(MBeanWrapper.instance.isRegistered(mbeanName))
135+
.as("MBean should be unregistered after CFS invalidation")
136+
.isFalse();
137+
}
138+
}

test/unit/org/apache/cassandra/db/compression/CompressionDictionaryManagerTest.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@
3131
import org.apache.cassandra.config.CassandraRelevantProperties;
3232
import org.apache.cassandra.db.ColumnFamilyStore;
3333
import org.apache.cassandra.db.Keyspace;
34-
import org.apache.cassandra.db.compression.CompressionDictionary.DictId;
35-
import org.apache.cassandra.db.compression.CompressionDictionary.Kind;
3634
import org.apache.cassandra.db.compression.ICompressionDictionaryTrainer.TrainingStatus;
3735
import org.apache.cassandra.schema.CompressionParams;
3836
import org.apache.cassandra.schema.KeyspaceParams;
@@ -92,9 +90,8 @@ public static void setUpClass() throws Exception
9290
@Before
9391
public void setUp()
9492
{
95-
// Create managers - they will auto-initialize based on table compression params
96-
managerWithDict = new CompressionDictionaryManager(cfsWithDict);
97-
managerWithoutDict = new CompressionDictionaryManager(cfsWithoutDict);
93+
managerWithDict = new CompressionDictionaryManager(cfsWithDict, true);
94+
managerWithoutDict = new CompressionDictionaryManager(cfsWithoutDict, true);
9895
}
9996

10097
@After
@@ -335,11 +332,4 @@ public void testUpdateSamplingRateValidation()
335332
.isInstanceOf(IllegalArgumentException.class)
336333
.hasMessageContaining("Sampling rate must be positive");
337334
}
338-
339-
private static ZstdCompressionDictionary createTestDictionary()
340-
{
341-
byte[] dictBytes = "test dictionary data for manager testing".getBytes();
342-
DictId dictId = new DictId(Kind.ZSTD, System.currentTimeMillis());
343-
return new ZstdCompressionDictionary(dictId, dictBytes);
344-
}
345335
}

0 commit comments

Comments
 (0)