Skip to content

Commit 4af9c59

Browse files
authored
fix: export-tsfile change pull mode (apache#15306)
* fix: export-tsfile remove table mode * fix: export-tsfile remove table mode * restore table mode * fix build * fix build * restore export-tsfile script * remove load-tsfile * temporary removal empty export
1 parent 86178d7 commit 4af9c59

File tree

9 files changed

+116
-118
lines changed

9 files changed

+116
-118
lines changed

integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java

Lines changed: 44 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -68,35 +68,36 @@ public static void tearDown() throws Exception {
6868
public void test() throws IOException {
6969
String os = System.getProperty("os.name").toLowerCase();
7070
if (os.startsWith("windows")) {
71-
// testOnWindows();
71+
testOnWindows();
7272
} else {
73-
// testOnUnix();
73+
testOnUnix();
7474
}
7575
}
7676

7777
@Override
7878
protected void testOnWindows() throws IOException {
79-
final String[] output = {"Export TsFile Count: 0"};
80-
ProcessBuilder builder =
81-
new ProcessBuilder(
82-
"cmd.exe",
83-
"/c",
84-
toolsPath + File.separator + "windows" + File.separator + "export-tsfile.bat",
85-
"-h",
86-
ip,
87-
"-p",
88-
port,
89-
"-u",
90-
"root",
91-
"-pw",
92-
"root",
93-
"-path",
94-
"root.test.t2.**",
95-
"&",
96-
"exit",
97-
"%^errorlevel%");
98-
builder.environment().put("CLASSPATH", libPath);
99-
testOutput(builder, output, 0);
79+
// Test for empty export, temporary removal
80+
// final String[] output = {"Export TsFile Count: 0"};
81+
// ProcessBuilder builder =
82+
// new ProcessBuilder(
83+
// "cmd.exe",
84+
// "/c",
85+
// toolsPath + File.separator + "windows" + File.separator + "export-tsfile.bat",
86+
// "-h",
87+
// ip,
88+
// "-p",
89+
// port,
90+
// "-u",
91+
// "root",
92+
// "-pw",
93+
// "root",
94+
// "-path",
95+
// "root.test.t2.**",
96+
// "&",
97+
// "exit",
98+
// "%^errorlevel%");
99+
// builder.environment().put("CLASSPATH", libPath);
100+
// testOutput(builder, output, 0);
100101

101102
prepareData();
102103

@@ -125,24 +126,25 @@ protected void testOnWindows() throws IOException {
125126

126127
@Override
127128
protected void testOnUnix() throws IOException {
128-
final String[] output = {"Export TsFile Count: 0"};
129-
// -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -q "select * from root.**"
130-
ProcessBuilder builder =
131-
new ProcessBuilder(
132-
"bash",
133-
toolsPath + File.separator + "export-tsfile.sh",
134-
"-h",
135-
ip,
136-
"-p",
137-
port,
138-
"-u",
139-
"root",
140-
"-pw",
141-
"root",
142-
"-path",
143-
"root.**");
144-
builder.environment().put("CLASSPATH", libPath);
145-
testOutput(builder, output, 0);
129+
// Test for empty export, temporary removal
130+
// final String[] output = {"Export TsFile Count: 0"};
131+
// // -h 127.0.0.1 -p 6667 -u root -pw root -td ./ -q "select * from root.**"
132+
// ProcessBuilder builder =
133+
// new ProcessBuilder(
134+
// "bash",
135+
// toolsPath + File.separator + "export-tsfile.sh",
136+
// "-h",
137+
// ip,
138+
// "-p",
139+
// port,
140+
// "-u",
141+
// "root",
142+
// "-pw",
143+
// "root",
144+
// "-path",
145+
// "root.**");
146+
// builder.environment().put("CLASSPATH", libPath);
147+
// testOutput(builder, output, 0);
146148

147149
prepareData();
148150

@@ -181,6 +183,7 @@ public void prepareData() {
181183
values.add("bbbbb");
182184
values.add("abbes");
183185
session.insertRecord(deviceId, 1L, measurements, values);
186+
session.executeNonQueryStatement("flush");
184187
} catch (IoTDBConnectionException | StatementExecutionException e) {
185188
throw new RuntimeException(e);
186189
}

iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/Constants.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ public class Constants {
285285
public static final String LOOSE_RANGE = "";
286286
public static final boolean STRICT = false;
287287
public static final String MODE = "snapshot";
288-
public static final boolean AUTO_COMMIT = false;
288+
public static final boolean AUTO_COMMIT = true;
289289
public static final String TABLE_MODEL = "table";
290290
public static final long AUTO_COMMIT_INTERVAL = 5000;
291291
public static final long POLL_MESSAGE_TIMEOUT = 10000;

iotdb-client/cli/src/main/java/org/apache/iotdb/tool/common/OptionsUtil.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -968,7 +968,6 @@ public static Options createTableImportTsFileOptions() {
968968

969969
public static Options createSubscriptionTsFileOptions() {
970970
Options options = new Options();
971-
972971
Option opSqlDialect =
973972
Option.builder(SQL_DIALECT_ARGS)
974973
.longOpt(SQL_DIALECT_ARGS)
@@ -1036,7 +1035,6 @@ public static Options createSubscriptionTsFileOptions() {
10361035
.desc(TABLE_DESC)
10371036
.build();
10381037
options.addOption(opTable);
1039-
10401038
Option opStartTime =
10411039
Option.builder(START_TIME_ARGS)
10421040
.longOpt(START_TIME_ARGS)
@@ -1073,7 +1071,8 @@ public static Options createSubscriptionTsFileOptions() {
10731071
.build();
10741072
options.addOption(opThreadNum);
10751073

1076-
Option opHelp = Option.builder(HELP_ARGS).longOpt(HELP_ARGS).hasArg().desc(HELP_DESC).build();
1074+
Option opHelp =
1075+
Option.builder(HELP_ARGS).longOpt(HELP_ARGS).hasArg(false).desc(HELP_DESC).build();
10771076
options.addOption(opHelp);
10781077
return options;
10791078
}

iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/ExportTsFile.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,15 @@ public static void main(String[] args) throws Exception {
7474

7575
private static void parseParams(String[] args, Options options) {
7676
HelpFormatter hf = new HelpFormatter();
77+
hf.setOptionComparator(null);
7778
CommandLine cli = null;
7879
CommandLineParser cliParser = new DefaultParser();
7980
try {
8081
cli = cliParser.parse(options, args);
82+
if (cli.hasOption(Constants.HELP_ARGS) || args.length == 0) {
83+
hf.printHelp(Constants.SUBSCRIPTION_CLI_PREFIX, options, true);
84+
System.exit(0);
85+
}
8186
if (cli.hasOption(Constants.SQL_DIALECT_ARGS)) {
8287
commonParam.setSqlDialect(cli.getOptionValue(Constants.SQL_DIALECT_ARGS));
8388
}

iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/AbstractSubscriptionTsFile.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public static void setSubscriptionSession() throws IoTDBConnectionException {
4747
.build());
4848
commonParam.getTableSubs().open();
4949
} else {
50+
5051
commonParam.setSubscriptionTsFile(new SubscriptionTreeTsFile());
5152
commonParam.setTreeSubs(
5253
new SubscriptionTreeSessionBuilder()

iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTableTsFile.java

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import java.io.File;
3838
import java.io.IOException;
3939
import java.nio.file.Paths;
40-
import java.time.Duration;
4140
import java.util.ArrayList;
4241
import java.util.List;
4342
import java.util.Properties;
@@ -144,31 +143,22 @@ public void consumerPoll(ExecutorService executor, String topicName) {
144143
for (int i = commonParam.getStartIndex(); i < pullTableConsumers.size(); i++) {
145144
SubscriptionTablePullConsumer consumer =
146145
(SubscriptionTablePullConsumer) pullTableConsumers.get(i);
147-
final String consumerGroupId = consumer.getConsumerGroupId();
148146
executor.submit(
149147
new Runnable() {
150148
@Override
151149
public void run() {
152-
int retryCount = 0;
153-
while (true) {
150+
final String consumerGroupId = consumer.getConsumerGroupId();
151+
while (!consumer.allSnapshotTopicMessagesHaveBeenConsumed()) {
154152
try {
155-
List<SubscriptionMessage> messages =
156-
consumer.poll(Duration.ofMillis(Constants.POLL_MESSAGE_TIMEOUT));
157-
consumer.commitSync(messages);
158-
if (messages.isEmpty()) {
159-
retryCount++;
160-
if (retryCount >= Constants.MAX_RETRY_TIMES) {
161-
break;
162-
}
163-
}
164-
for (final SubscriptionMessage message : messages) {
165-
SubscriptionTsFileHandler fp = message.getTsFileHandler();
166-
ioTPrinter.println(fp.getFile().getName());
153+
for (final SubscriptionMessage message :
154+
consumer.poll(Constants.POLL_MESSAGE_TIMEOUT)) {
155+
final SubscriptionTsFileHandler handler = message.getTsFileHandler();
156+
ioTPrinter.println(handler.getFile().getName());
167157
try {
168-
fp.moveFile(
158+
handler.moveFile(
169159
Paths.get(
170160
commonParam.getTargetDir() + File.separator + consumerGroupId,
171-
fp.getPath().getFileName().toString()));
161+
handler.getPath().getFileName().toString()));
172162
} catch (IOException e) {
173163
throw new RuntimeException(e);
174164
}

iotdb-client/cli/src/main/java/org/apache/iotdb/tool/tsfile/subscription/SubscriptionTreeTsFile.java

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import java.io.File;
3636
import java.io.IOException;
3737
import java.nio.file.Paths;
38-
import java.time.Duration;
3938
import java.util.ArrayList;
4039
import java.util.List;
4140
import java.util.Properties;
@@ -140,32 +139,22 @@ public void consumerPoll(ExecutorService executor, String topicName) {
140139
List<SubscriptionTreePullConsumer> pullTreeConsumers = commonParam.getPullTreeConsumers();
141140
for (int i = commonParam.getStartIndex(); i < pullTreeConsumers.size(); i++) {
142141
SubscriptionTreePullConsumer consumer = commonParam.getPullTreeConsumers().get(i);
142+
final String consumerGroupId = consumer.getConsumerGroupId();
143143
executor.submit(
144144
new Runnable() {
145145
@Override
146146
public void run() {
147-
int retryCount = 0;
148-
while (true) {
147+
while (!consumer.allSnapshotTopicMessagesHaveBeenConsumed()) {
149148
try {
150-
List<SubscriptionMessage> messages =
151-
consumer.poll(Duration.ofMillis(Constants.POLL_MESSAGE_TIMEOUT));
152-
consumer.commitSync(messages);
153-
if (messages.isEmpty()) {
154-
retryCount++;
155-
if (retryCount >= Constants.MAX_RETRY_TIMES) {
156-
break;
157-
}
158-
}
159-
for (final SubscriptionMessage message : messages) {
160-
SubscriptionTsFileHandler fp = message.getTsFileHandler();
161-
ioTPrinter.println(fp.getFile().getName());
149+
for (final SubscriptionMessage message :
150+
consumer.poll(Constants.POLL_MESSAGE_TIMEOUT)) {
151+
final SubscriptionTsFileHandler handler = message.getTsFileHandler();
152+
ioTPrinter.println(handler.getFile().getName());
162153
try {
163-
fp.moveFile(
154+
handler.moveFile(
164155
Paths.get(
165-
commonParam.getTargetDir()
166-
+ File.separator
167-
+ consumer.getConsumerGroupId(),
168-
fp.getPath().getFileName().toString()));
156+
commonParam.getTargetDir() + File.separator + consumerGroupId,
157+
handler.getPath().getFileName().toString()));
169158
} catch (IOException e) {
170159
throw new RuntimeException(e);
171160
}
Lines changed: 24 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,20 @@
1818
# under the License.
1919
#
2020

21-
echo ---------------------
22-
echo Start Loading TsFile
23-
echo ---------------------
24-
25-
source "$(dirname "$0")/../conf/iotdb-common.sh"
26-
#get_iotdb_include and checkAllVariables is in iotdb-common.sh
27-
VARS=$(get_iotdb_include "$*")
28-
checkAllVariables
29-
export IOTDB_HOME="${IOTDB_HOME}"
30-
eval set -- "$VARS"
31-
32-
PARAMETERS=$@
33-
34-
IOTDB_CLI_CONF=${IOTDB_HOME}/conf
35-
36-
MAIN_CLASS=org.apache.iotdb.tool.tsfile.ImportTsFile
21+
echo ------------------------------------------
22+
echo Starting IoTDB Client Export Script
23+
echo ------------------------------------------
24+
25+
if [ -z "${IOTDB_INCLUDE}" ]; then
26+
#do nothing
27+
:
28+
elif [ -r "$IOTDB_INCLUDE" ]; then
29+
. "$IOTDB_INCLUDE"
30+
fi
3731

38-
CLASSPATH=""
39-
for f in ${IOTDB_HOME}/lib/*.jar; do
40-
CLASSPATH=${CLASSPATH}":"$f
41-
done
32+
if [ -z "${IOTDB_HOME}" ]; then
33+
export IOTDB_HOME="$(cd "`dirname "$0"`"/..; pwd)"
34+
fi
4235

4336
if [ -n "$JAVA_HOME" ]; then
4437
for java in "$JAVA_HOME"/bin/amd64/java "$JAVA_HOME"/bin/java; do
@@ -51,10 +44,16 @@ else
5144
JAVA=java
5245
fi
5346

54-
set -o noglob
55-
iotdb_cli_params="-Dlogback.configurationFile=${IOTDB_CLI_CONF}/logback-tool.xml"
47+
if [ -z $JAVA ] ; then
48+
echo Unable to find java executable. Check JAVA_HOME and PATH environment variables. > /dev/stderr
49+
exit 1;
50+
fi
51+
52+
for f in ${IOTDB_HOME}/lib/*.jar; do
53+
CLASSPATH=${CLASSPATH}":"$f
54+
done
5655

57-
echo "Starting..."
58-
exec "$JAVA" $iotdb_cli_params -cp "$CLASSPATH" "$MAIN_CLASS" $PARAMETERS
56+
MAIN_CLASS=org.apache.iotdb.tool.tsfile.ExportTsFile
5957

60-
exit $?
58+
"$JAVA" -DIOTDB_HOME=${IOTDB_HOME} -cp "$CLASSPATH" "$MAIN_CLASS" "$@"
59+
exit $?
Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,44 @@
1919

2020
@echo off
2121

22-
@REM You can put your env variable here
23-
@REM set JAVA_HOME=%JAVA_HOME%
22+
title IoTDB Export
2423

25-
title IoTDB Load
24+
echo ````````````````````````````````````````````````
25+
echo Starting IoTDB Client Export Script
26+
echo ````````````````````````````````````````````````
2627

2728
if "%OS%" == "Windows_NT" setlocal
2829

2930
pushd %~dp0..\..
3031
if NOT DEFINED IOTDB_HOME set IOTDB_HOME=%CD%
3132
popd
3233

33-
if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.tsfile.ImportTsFile
34+
if NOT DEFINED MAIN_CLASS set MAIN_CLASS=org.apache.iotdb.tool.tsfile.ExportTsFile
3435
if NOT DEFINED JAVA_HOME goto :err
3536

3637
@REM -----------------------------------------------------------------------------
3738
@REM JVM Opts we'll use in legacy run or installation
3839
set JAVA_OPTS=-ea^
3940
-DIOTDB_HOME="%IOTDB_HOME%"
4041

41-
REM For each jar in the IOTDB_HOME lib directory call append to build the CLASSPATH variable.
42-
if EXIST %IOTDB_HOME%\lib (set CLASSPATH="%IOTDB_HOME%\lib\*") else set CLASSPATH="%IOTDB_HOME%\..\lib\*"
42+
@REM ***** CLASSPATH library setting *****
43+
set CLASSPATH=%CLASSPATH%;"%IOTDB_HOME%\lib\*"
4344

44-
set PARAMETERS=%*
45+
REM -----------------------------------------------------------------------------
4546

46-
echo Starting...
47-
"%JAVA_HOME%\bin\java" %JAVA_OPTS% -cp %CLASSPATH% %MAIN_CLASS% %PARAMETERS%
47+
"%JAVA_HOME%\bin\java" -DIOTDB_HOME="%IOTDB_HOME%" %JAVA_OPTS% -cp %CLASSPATH% %MAIN_CLASS% %*
4848
set ret_code=%ERRORLEVEL%
49+
goto finally
4950

50-
EXIT /B %ret_code%
51+
52+
:err
53+
echo JAVA_HOME environment variable must be set!
54+
set ret_code=1
55+
pause
56+
57+
@REM -----------------------------------------------------------------------------
58+
:finally
59+
60+
ENDLOCAL
61+
62+
EXIT /B %ret_code%

0 commit comments

Comments
 (0)