Skip to content

Commit e66accc

Browse files
JisoLyaPengzna
andauthored
fix(store): improve some potential lock & type cast issues (#2895)
* update(store): fix some problem and clean up code - chore(store): clean some comments - chore(store): using Slf4j instead of System.out to print log - update(store): update more reasonable timeout setting - update(store): add close method for CopyOnWriteCache to avoid potential memory leak - update(store): delete duplicated beginTx() statement - update(store): extract parameter for compaction thread pool(move to configuration file in the future) - update(store): add default logic in AggregationFunctions - update(store): fix potential concurrency problem in QueryExecutor * Update hugegraph-store/hg-store-common/src/main/java/org/apache/hugegraph/store/query/func/AggregationFunctions.java --------- Co-authored-by: Peng Junzhi <78788603+Pengzna@users.noreply.github.com>
1 parent f92c5a4 commit e66accc

File tree

11 files changed

+70
-34
lines changed

11 files changed

+70
-34
lines changed

hugegraph-pd/hg-pd-client/src/main/java/org/apache/hugegraph/pd/client/PDConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public String toString() {
103103
public PDConfig setAuthority(String userName, String pwd) {
104104
this.userName = userName;
105105
String auth = userName + ':' + pwd;
106-
this.authority = new String(Base64.getEncoder().encode(auth.getBytes(UTF_8)));
106+
this.authority = Base64.getEncoder().encodeToString(auth.getBytes(UTF_8));
107107
return this;
108108
}
109109

hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/Load.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,9 @@ public void action(String[] params) throws InterruptedException {
121121
for (int i = 0; i < readerSize; i++) {
122122
int fi = i;
123123
new Thread(() -> {
124-
try {
125-
InputStreamReader isr = new InputStreamReader(new FileInputStream(split[fi]),
126-
StandardCharsets.UTF_8);
127-
BufferedReader reader = new BufferedReader(isr);
124+
try(InputStreamReader isr = new InputStreamReader(new FileInputStream(split[fi]),
125+
StandardCharsets.UTF_8);
126+
BufferedReader reader = new BufferedReader(isr)) {
128127
long count = 0;
129128
String line;
130129
try {
@@ -146,9 +145,6 @@ public void action(String[] params) throws InterruptedException {
146145
}
147146
} catch (Exception e) {
148147
throw new RuntimeException(e);
149-
} finally {
150-
isr.close();
151-
reader.close();
152148
}
153149
} catch (Exception e) {
154150
log.error("send data with error:", e);
@@ -158,13 +154,12 @@ public void action(String[] params) throws InterruptedException {
158154
}).start();
159155
}
160156
latch.await();
161-
loadThread.join();
162157
completed.set(true);
158+
loadThread.join();
163159
}
164160

165161
public boolean put(String table, List<String> keys) {
166162
HgStoreSession session = storeClient.openSession(graph);
167-
session.beginTx();
168163
try {
169164
session.beginTx();
170165
for (String key : keys) {

hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/MultiQuery.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public boolean hasNext() {
101101
current = (HgOwnerKey) queue[finalI].poll(1,
102102
TimeUnit.SECONDS);
103103
} catch (InterruptedException e) {
104-
//
104+
Thread.currentThread().interrupt();
105105
}
106106
}
107107
if (current == null) {

hugegraph-store/hg-store-cli/src/main/java/org/apache/hugegraph/store/cli/cmd/ScanTable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public void action(String[] params) throws PDException {
7373
if (iterator.hasNext()) {
7474
iterator.next();
7575
position = iterator.position();
76-
System.out.println("count is " + count);
76+
log.info("count is {}", count);
7777
} else {
7878
position = null;
7979
}

hugegraph-store/hg-store-client/src/main/java/org/apache/hugegraph/store/client/query/QueryExecutor.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,15 @@ public class QueryExecutor {
7373

7474
private final HugeGraphSupplier supplier;
7575

76-
private long timeout = 1800_000;
76+
/**
77+
* Timeout duration for StreamObserver receiving response
78+
*/
79+
private long timeout = 60_000;
7780

7881
/**
7982
* Used for testing single machine
8083
*/
81-
public static String filterStore = null;
84+
private static final ThreadLocal<String> filterStore = new ThreadLocal<>();
8285

8386
public QueryExecutor(HgStoreNodePartitioner nodePartitioner, HugeGraphSupplier supplier,
8487
Long timeout) {
@@ -123,12 +126,20 @@ public List<HgKvIterator<BaseElement>> getIterators(StoreQueryParam query) throw
123126
if (o1 == null && o2 == null) {
124127
return 0;
125128
}
126-
127-
if (o1 != null) {
128-
return ((KvElement) o1).compareTo((KvElement) o2);
129+
if (o1 != null && o2 != null) {
130+
if (o1 instanceof KvElement && o2 instanceof KvElement) {
131+
return ((KvElement) o1).compareTo((KvElement) o2);
132+
}
133+
if (!(o1 instanceof KvElement)) {
134+
throw new IllegalStateException(
135+
"Expected KvElement but got: " + o1.getClass().getName());
136+
}
137+
// !(o2 instanceof KvElement)
138+
throw new IllegalStateException(
139+
"Expected KvElement but got: " + o2.getClass().getName());
129140
}
130141

131-
return 0;
142+
return o1 != null ? 1 : -1;
132143
});
133144

134145
iterator = new StreamFinalAggregationIterator<>(iterator, query.getFuncList());
@@ -277,9 +288,10 @@ private List<Tuple2<String, QueryRequest.Builder>> getNodeTasks(StoreQueryParam
277288
}
278289
}
279290

280-
if (filterStore != null) {
281-
return tasks.containsKey(filterStore) ?
282-
List.of(Tuple2.of(filterStore, tasks.get(filterStore))) : List.of();
291+
if (filterStore.get() != null) {
292+
String filterStoreStr = filterStore.get();
293+
return tasks.containsKey(filterStoreStr) ?
294+
List.of(Tuple2.of(filterStoreStr, tasks.get(filterStoreStr))) : List.of();
283295
}
284296

285297
return tasks.entrySet().stream()

hugegraph-store/hg-store-common/src/main/java/org/apache/hugegraph/store/query/func/AggregationFunctions.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,11 @@ public void iterate(T record) {
7979
((AtomicFloat) buffer).getAndAdd((Float) record);
8080
break;
8181
default:
82-
// throw new Exception ?
83-
break;
82+
// throw new Exception
83+
throw new IllegalStateException(
84+
"Unsupported buffer type: " + buffer.getClass().getName() +
85+
". Supported types: AtomicLong, AtomicInteger, AtomicDouble, AtomicFloat"
86+
);
8487
}
8588
}
8689
}

hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/business/BusinessHandlerImpl.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,17 @@ public class BusinessHandlerImpl implements BusinessHandler {
130130
}};
131131
private static final Map<Integer, String> dbNames = new ConcurrentHashMap<>();
132132
private static HugeGraphSupplier mockGraphSupplier = null;
133-
private static final int compactionThreadCount = 64;
134133
private static final ConcurrentMap<String, AtomicInteger> pathLock = new ConcurrentHashMap<>();
135134
private static final ConcurrentMap<Integer, AtomicInteger> compactionState =
136135
new ConcurrentHashMap<>();
136+
// Default core thread count
137+
private static final int compactionThreadCount = 64;
138+
private static final int compactionMaxThreadCount = 256;
139+
// Max size of compaction queue
140+
private static final int compactionQueueSize = 1000;
137141
private static final ThreadPoolExecutor compactionPool =
138142
ExecutorUtil.createExecutor(PoolNames.COMPACT, compactionThreadCount,
139-
compactionThreadCount * 4, Integer.MAX_VALUE);
143+
compactionMaxThreadCount, compactionQueueSize);
140144
private static final int timeoutMillis = 6 * 3600 * 1000;
141145
private final BinaryElementSerializer serializer = BinaryElementSerializer.getInstance();
142146
private final DirectBinarySerializer directBinarySerializer = new DirectBinarySerializer();
@@ -1667,4 +1671,8 @@ public void rollback() throws HgStoreException {
16671671
};
16681672
}
16691673
}
1674+
1675+
public static void clearCache() {
1676+
GRAPH_SUPPLIER_CACHE.clear();
1677+
}
16701678
}

hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/cmd/request/DestroyRaftRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
@Data
2828
public class DestroyRaftRequest extends HgCmdBase.BaseRequest {
2929

30-
private List<String> graphNames = new ArrayList<>();
30+
private final List<String> graphNames = new ArrayList<>();
3131

3232
public void addGraphName(String graphName) {
3333
graphNames.add(graphName);

hugegraph-store/hg-store-core/src/main/java/org/apache/hugegraph/store/util/CopyOnWriteCache.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Collection;
2121
import java.util.Collections;
2222
import java.util.HashMap;
23+
import java.util.List;
2324
import java.util.Map;
2425
import java.util.Set;
2526
import java.util.concurrent.ConcurrentMap;
@@ -29,7 +30,9 @@
2930

3031
import org.jetbrains.annotations.NotNull;
3132

32-
//FIXME Missing shutdown method
33+
import lombok.extern.slf4j.Slf4j;
34+
35+
@Slf4j
3336
public class CopyOnWriteCache<K, V> implements ConcurrentMap<K, V> {
3437

3538
// Scheduled executor service for periodically clearing the cache.
@@ -263,4 +266,23 @@ public synchronized V replace(K k, V v) {
263266
return null;
264267
}
265268
}
269+
270+
public void close(){
271+
scheduledExecutor.shutdown();
272+
try {
273+
boolean isTerminated = scheduledExecutor.awaitTermination(30, TimeUnit.SECONDS);
274+
if (!isTerminated) {
275+
List<Runnable> runnables = scheduledExecutor.shutdownNow();
276+
log.info("CopyOnWriteCache shutting down with {} tasks left", runnables.size());
277+
278+
boolean isNowTerminated = scheduledExecutor.awaitTermination(30, TimeUnit.SECONDS);
279+
if (!isNowTerminated) {
280+
log.warn("Failed to shutdown CopyOnWriteCache thread pool");
281+
}
282+
}
283+
}catch (InterruptedException e) {
284+
scheduledExecutor.shutdownNow();
285+
Thread.currentThread().interrupt();
286+
}
287+
}
266288
}

hugegraph-store/hg-store-node/src/main/java/org/apache/hugegraph/store/node/AppConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,9 +278,9 @@ public class QueryPushDownConfig {
278278
private int fetchBatchSize;
279279

280280
/**
281-
* the timeout of request fetch
281+
* the timeout of request fetch (ms)
282282
*/
283-
@Value("${query.push-down.fetch_timeout:3600000}")
283+
@Value("${query.push-down.fetch_timeout:300000}")
284284
private long fetchTimeOut;
285285

286286
/**

0 commit comments

Comments
 (0)