Skip to content

Commit 6ca587f

Browse files
authored
Modify thread (thulab#483)
* When TEST_DATA_PERSISTENCE is equal to none, no longer create a ResultPersistence thread. * fix printService thread. * PeriodicalUpdateDNList * add ThreadName.class add ThreadName.java add ThreadName. delete log * Fix concurrency issue about threadNameLoopIndexMap * Fix the Printing Thread. * Drop ENABLE_AUTO_FETCH. * Replacing data types within TaskProgress: replacing atomic with volatile * Spotless
1 parent 05b154d commit 6ca587f

File tree

16 files changed

+205
-72
lines changed

16 files changed

+205
-72
lines changed

core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/DataClient.java

Lines changed: 17 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import cn.edu.tsinghua.iot.benchmark.client.generate.GenerateDataDeviceClient;
2323
import cn.edu.tsinghua.iot.benchmark.client.generate.GenerateDataMixClient;
2424
import cn.edu.tsinghua.iot.benchmark.client.generate.GenerateDataWriteClient;
25+
import cn.edu.tsinghua.iot.benchmark.client.progress.TaskProgress;
2526
import cn.edu.tsinghua.iot.benchmark.client.real.RealDataSetQueryClient;
2627
import cn.edu.tsinghua.iot.benchmark.client.real.RealDataSetWriteClient;
2728
import cn.edu.tsinghua.iot.benchmark.conf.Config;
@@ -32,7 +33,6 @@
3233
import cn.edu.tsinghua.iot.benchmark.tsdb.DBConfig;
3334
import cn.edu.tsinghua.iot.benchmark.tsdb.DBWrapper;
3435
import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException;
35-
import cn.edu.tsinghua.iot.benchmark.utils.NamedThreadFactory;
3636
import cn.edu.tsinghua.iot.benchmark.workload.DataWorkLoad;
3737
import cn.edu.tsinghua.iot.benchmark.workload.QueryWorkLoad;
3838
import cn.edu.tsinghua.iot.benchmark.workload.interfaces.IDataWorkLoad;
@@ -53,20 +53,20 @@ public abstract class DataClient implements Runnable {
5353

5454
/** The id of client */
5555
protected final int clientThreadId;
56+
5657
/** RealDataWorkload */
5758
protected final IDataWorkLoad dataWorkLoad;
59+
5860
/** QueryWorkload */
5961
protected final IQueryWorkLoad queryWorkLoad;
60-
/** Log related */
61-
protected final ScheduledExecutorService service;
62+
6263
/** Tested DataBase */
6364
protected DBWrapper dbWrapper = null;
65+
6466
/** Related Schema */
6567
protected final List<DeviceSchema> clientDeviceSchemas;
66-
/** Total number of loop */
67-
protected long totalLoop = 0;
68-
/** Loop Index, using for loop and log */
69-
protected long loopIndex = 0;
68+
69+
protected TaskProgress taskProgress;
7070

7171
/** Control the status */
7272
protected AtomicBoolean isStop = new AtomicBoolean(false);
@@ -76,35 +76,34 @@ public abstract class DataClient implements Runnable {
7676

7777
private final CyclicBarrier barrier;
7878

79-
public DataClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier) {
79+
public DataClient(
80+
int id, CountDownLatch countDownLatch, CyclicBarrier barrier, TaskProgress taskProgress) {
8081
this.countDownLatch = countDownLatch;
8182
this.barrier = barrier;
8283
this.dataWorkLoad = DataWorkLoad.getInstance(id);
8384
this.queryWorkLoad = QueryWorkLoad.getInstance(id);
8485
this.clientThreadId = id;
8586
this.clientDeviceSchemas =
8687
MetaDataSchema.getInstance().getDeviceSchemaByDataClientId(clientThreadId);
87-
this.service =
88-
Executors.newSingleThreadScheduledExecutor(
89-
new NamedThreadFactory("ShowWorkProgress-" + clientThreadId));
88+
this.taskProgress = taskProgress;
9089
initDBWrappers();
9190
}
9291

9392
public static DataClient getInstance(
94-
int id, CountDownLatch countDownLatch, CyclicBarrier barrier) {
93+
int id, CountDownLatch countDownLatch, CyclicBarrier barrier, TaskProgress taskProgress) {
9594
switch (config.getBENCHMARK_WORK_MODE()) {
9695
case TEST_WITH_DEFAULT_PATH:
9796
if (config.isIS_POINT_COMPARISON()) {
98-
return new GenerateDataDeviceClient(id, countDownLatch, barrier);
97+
return new GenerateDataDeviceClient(id, countDownLatch, barrier, taskProgress);
9998
} else {
100-
return new GenerateDataMixClient(id, countDownLatch, barrier);
99+
return new GenerateDataMixClient(id, countDownLatch, barrier, taskProgress);
101100
}
102101
case GENERATE_DATA:
103-
return new GenerateDataWriteClient(id, countDownLatch, barrier);
102+
return new GenerateDataWriteClient(id, countDownLatch, barrier, taskProgress);
104103
case VERIFICATION_WRITE:
105-
return new RealDataSetWriteClient(id, countDownLatch, barrier);
104+
return new RealDataSetWriteClient(id, countDownLatch, barrier, taskProgress);
106105
case VERIFICATION_QUERY:
107-
return new RealDataSetQueryClient(id, countDownLatch, barrier);
106+
return new RealDataSetQueryClient(id, countDownLatch, barrier, taskProgress);
108107
default:
109108
LOGGER.warn("No need to create client" + config.getBENCHMARK_WORK_MODE());
110109
break;
@@ -123,28 +122,14 @@ public void run() {
123122
if (dbWrapper != null) {
124123
dbWrapper.init();
125124
}
125+
taskProgress.setThreadName(Thread.currentThread().getName());
126126
// wait for that all dataClients start test simultaneously
127127
barrier.await();
128128

129-
String currentThread = Thread.currentThread().getName();
130-
131-
if (!config.isIS_POINT_COMPARISON()) {
132-
// print current progress periodically
133-
service.scheduleAtFixedRate(
134-
() -> {
135-
String percent = String.format("%.2f", loopIndex * 100.0D / this.totalLoop);
136-
LOGGER.info("{} {}% workload is done.", currentThread, percent);
137-
},
138-
1,
139-
config.getLOG_PRINT_INTERVAL(),
140-
TimeUnit.SECONDS);
141-
}
142-
143129
doTest();
144130
} catch (Exception e) {
145131
LOGGER.error("Unexpected error: ", e);
146132
} finally {
147-
service.shutdown();
148133
try {
149134
if (dbWrapper != null) {
150135
dbWrapper.close();

core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateBaseClient.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package cn.edu.tsinghua.iot.benchmark.client.generate;
2121

2222
import cn.edu.tsinghua.iot.benchmark.client.DataClient;
23+
import cn.edu.tsinghua.iot.benchmark.client.progress.TaskProgress;
2324
import cn.edu.tsinghua.iot.benchmark.distribution.ProbTool;
2425
import cn.edu.tsinghua.iot.benchmark.entity.Batch.IBatch;
2526
import cn.edu.tsinghua.iot.benchmark.schema.MetaUtil;
@@ -46,8 +47,9 @@ public abstract class GenerateBaseClient extends DataClient implements Runnable
4647
/** Actual deviceFloor */
4748
protected int actualDeviceFloor;
4849

49-
public GenerateBaseClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier) {
50-
super(id, countDownLatch, barrier);
50+
public GenerateBaseClient(
51+
int id, CountDownLatch countDownLatch, CyclicBarrier barrier, TaskProgress taskProgress) {
52+
super(id, countDownLatch, barrier, taskProgress);
5153
insertLoopIndex = 0;
5254
actualDeviceFloor = (int) (config.getDEVICE_NUMBER() * config.getREAL_INSERT_RATE());
5355
actualDeviceFloor = MetaUtil.getDeviceId(actualDeviceFloor);
@@ -56,7 +58,7 @@ public GenerateBaseClient(int id, CountDownLatch countDownLatch, CyclicBarrier b
5658
@Override
5759
protected void initDBWrappers() {
5860
super.initDBWrappers();
59-
this.totalLoop = config.getLOOP();
61+
taskProgress.setTotalLoop(config.getLOOP());
6062
}
6163

6264
/** Check whether write batch */

core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataDeviceClient.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package cn.edu.tsinghua.iot.benchmark.client.generate;
2121

22+
import cn.edu.tsinghua.iot.benchmark.client.progress.TaskProgress;
2223
import cn.edu.tsinghua.iot.benchmark.entity.DeviceSummary;
2324
import cn.edu.tsinghua.iot.benchmark.tsdb.TsdbException;
2425
import cn.edu.tsinghua.iot.benchmark.workload.query.impl.DeviceQuery;
@@ -37,8 +38,9 @@ public class GenerateDataDeviceClient extends GenerateBaseClient {
3738
* config.getBATCH_SIZE_PER_WRITE();
3839
private int now = 0;
3940

40-
public GenerateDataDeviceClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier) {
41-
super(id, countDownLatch, barrier);
41+
public GenerateDataDeviceClient(
42+
int id, CountDownLatch countDownLatch, CyclicBarrier barrier, TaskProgress taskProgress) {
43+
super(id, countDownLatch, barrier, taskProgress);
4244
}
4345

4446
@Override

core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataMixClient.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import cn.edu.tsinghua.iot.benchmark.client.operation.Operation;
2323
import cn.edu.tsinghua.iot.benchmark.client.operation.OperationController;
24+
import cn.edu.tsinghua.iot.benchmark.client.progress.TaskProgress;
2425
import cn.edu.tsinghua.iot.benchmark.entity.Batch.IBatch;
2526

2627
import java.util.Random;
@@ -34,8 +35,9 @@ public class GenerateDataMixClient extends GenerateBaseClient {
3435

3536
private final Random random = new Random(config.getDATA_SEED() + clientThreadId);
3637

37-
public GenerateDataMixClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier) {
38-
super(id, countDownLatch, barrier);
38+
public GenerateDataMixClient(
39+
int id, CountDownLatch countDownLatch, CyclicBarrier barrier, TaskProgress taskProgress) {
40+
super(id, countDownLatch, barrier, taskProgress);
3941
// TODO exclude control model
4042
this.operationController = new OperationController(id);
4143
}
@@ -44,7 +46,8 @@ public GenerateDataMixClient(int id, CountDownLatch countDownLatch, CyclicBarrie
4446
@Override
4547
protected void doTest() {
4648
long start = 0;
47-
for (loopIndex = 0; loopIndex < config.getLOOP(); loopIndex++) {
49+
taskProgress.resetLoopIndex();
50+
for (; taskProgress.getLoopIndex() < config.getLOOP(); taskProgress.incrementLoopIndex()) {
4851
Operation operation = operationController.getNextOperationType();
4952
if (config.getOP_MIN_INTERVAL() > 0) {
5053
start = System.currentTimeMillis();

core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/generate/GenerateDataWriteClient.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package cn.edu.tsinghua.iot.benchmark.client.generate;
2121

22+
import cn.edu.tsinghua.iot.benchmark.client.progress.TaskProgress;
2223
import cn.edu.tsinghua.iot.benchmark.entity.Batch.IBatch;
2324
import cn.edu.tsinghua.iot.benchmark.extern.DataWriter;
2425

@@ -29,14 +30,16 @@
2930
public class GenerateDataWriteClient extends GenerateBaseClient {
3031
private DataWriter dataWriter = DataWriter.getDataWriter();
3132

32-
public GenerateDataWriteClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier) {
33-
super(id, countDownLatch, barrier);
33+
public GenerateDataWriteClient(
34+
int id, CountDownLatch countDownLatch, CyclicBarrier barrier, TaskProgress taskProgress) {
35+
super(id, countDownLatch, barrier, taskProgress);
3436
}
3537

3638
/** Do Operations */
3739
@Override
3840
protected void doTest() {
39-
for (loopIndex = 0; loopIndex < config.getLOOP(); loopIndex++) {
41+
taskProgress.resetLoopIndex();
42+
for (; taskProgress.getLoopIndex() < config.getLOOP(); taskProgress.incrementLoopIndex()) {
4043
if (!doGenerate()) {
4144
break;
4245
}
@@ -68,6 +71,6 @@ private boolean doGenerate() {
6871
@Override
6972
protected void initDBWrappers() {
7073
// do nothing
71-
this.totalLoop = config.getLOOP();
74+
taskProgress.setTotalLoop(config.getLOOP());
7275
}
7376
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package cn.edu.tsinghua.iot.benchmark.client.progress;
2+
3+
import java.util.concurrent.atomic.AtomicReference;
4+
5+
public class TaskProgress {
6+
/** Thread name */
7+
private final AtomicReference<String> threadNameAtomic;
8+
/** Total number of loop */
9+
private volatile long totalLoop;
10+
/** Loop Index, using for loop and log */
11+
private volatile long loopIndex;
12+
13+
public TaskProgress() {
14+
this.threadNameAtomic = new AtomicReference<>();
15+
this.totalLoop = 1L;
16+
this.loopIndex = 0L;
17+
}
18+
19+
public void setThreadName(String threadName) {
20+
threadNameAtomic.set(threadName);
21+
}
22+
23+
public String getThreadName() {
24+
return threadNameAtomic.get();
25+
}
26+
27+
public void setTotalLoop(Long value) {
28+
totalLoop = value;
29+
}
30+
31+
public long getLoopIndex() {
32+
return loopIndex;
33+
}
34+
35+
public void resetLoopIndex() {
36+
loopIndex = 0L;
37+
}
38+
39+
public void incrementLoopIndex() {
40+
loopIndex++;
41+
}
42+
43+
public double getPercent() {
44+
if (totalLoop == 0) {
45+
return 0.00D;
46+
}
47+
return (double) loopIndex * 100.0D / totalLoop;
48+
}
49+
}

core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/real/RealBaseClient.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package cn.edu.tsinghua.iot.benchmark.client.real;
2121

2222
import cn.edu.tsinghua.iot.benchmark.client.DataClient;
23+
import cn.edu.tsinghua.iot.benchmark.client.progress.TaskProgress;
2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
2526

@@ -30,16 +31,17 @@ public abstract class RealBaseClient extends DataClient implements Runnable {
3031

3132
protected static final Logger LOGGER = LoggerFactory.getLogger(RealBaseClient.class);
3233

33-
public RealBaseClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier) {
34-
super(id, countDownLatch, barrier);
34+
public RealBaseClient(
35+
int id, CountDownLatch countDownLatch, CyclicBarrier barrier, TaskProgress taskProgress) {
36+
super(id, countDownLatch, barrier, taskProgress);
3537
}
3638

3739
@Override
3840
protected void initDBWrappers() {
3941
super.initDBWrappers();
40-
this.totalLoop = this.dataWorkLoad.getBatchNumber();
42+
taskProgress.setTotalLoop(this.dataWorkLoad.getBatchNumber());
4143
if (!config.isIS_SENSOR_TS_ALIGNMENT()) {
42-
this.totalLoop *= config.getSENSOR_NUMBER();
44+
taskProgress.setTotalLoop(this.dataWorkLoad.getBatchNumber() * config.getSENSOR_NUMBER());
4345
}
4446
}
4547
}

core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/real/RealDataSetQueryClient.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package cn.edu.tsinghua.iot.benchmark.client.real;
2121

22+
import cn.edu.tsinghua.iot.benchmark.client.progress.TaskProgress;
2223
import cn.edu.tsinghua.iot.benchmark.entity.Batch.IBatch;
2324
import cn.edu.tsinghua.iot.benchmark.workload.query.impl.VerificationQuery;
2425

@@ -30,8 +31,9 @@ public class RealDataSetQueryClient extends RealBaseClient {
3031

3132
private final Random random = new Random(config.getDATA_SEED() + clientThreadId);
3233

33-
public RealDataSetQueryClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier) {
34-
super(id, countDownLatch, barrier);
34+
public RealDataSetQueryClient(
35+
int id, CountDownLatch countDownLatch, CyclicBarrier barrier, TaskProgress taskProgress) {
36+
super(id, countDownLatch, barrier, taskProgress);
3537
}
3638

3739
/** Do Operations */
@@ -49,7 +51,7 @@ protected void doTest() {
4951
}
5052
VerificationQuery verificationQuery = queryWorkLoad.getVerifiedQuery(batch);
5153
dbWrapper.verificationQuery(verificationQuery);
52-
loopIndex++;
54+
taskProgress.incrementLoopIndex();
5355
if (isStop.get()) {
5456
break;
5557
}

core/src/main/java/cn/edu/tsinghua/iot/benchmark/client/real/RealDataSetWriteClient.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package cn.edu.tsinghua.iot.benchmark.client.real;
2121

22+
import cn.edu.tsinghua.iot.benchmark.client.progress.TaskProgress;
2223
import cn.edu.tsinghua.iot.benchmark.entity.Batch.IBatch;
2324
import cn.edu.tsinghua.iot.benchmark.exception.DBConnectException;
2425

@@ -30,8 +31,9 @@ public class RealDataSetWriteClient extends RealBaseClient {
3031

3132
private final Random random = new Random(config.getDATA_SEED() + clientThreadId);
3233

33-
public RealDataSetWriteClient(int id, CountDownLatch countDownLatch, CyclicBarrier barrier) {
34-
super(id, countDownLatch, barrier);
34+
public RealDataSetWriteClient(
35+
int id, CountDownLatch countDownLatch, CyclicBarrier barrier, TaskProgress taskProgress) {
36+
super(id, countDownLatch, barrier, taskProgress);
3537
}
3638

3739
/** Do Operations */
@@ -48,7 +50,7 @@ protected void doTest() {
4850
start = System.currentTimeMillis();
4951
}
5052
dbWrapper.insertOneBatchWithCheck(batch);
51-
loopIndex++;
53+
taskProgress.incrementLoopIndex();
5254
if (isStop.get()) {
5355
break;
5456
}

0 commit comments

Comments
 (0)