Skip to content

Commit ab52456

Browse files
authored
[ZEPPELIN-6264] Refactor InfluxDBInterpreter for improved readability and maintainability
### What is this PR for? This PR refactors the `InfluxDBInterpreter.class` to improve code readability, maintainability, and adherence to modern Java practices, without altering its runtime behavior or core logic. - Key changes include: - Renamed getInfluxDBClient() to better reflect its purpose (e.g., getQueryApi()), improving semantic clarity. - Removed unnecessary code - (e.g., InterpreterContext) from methods where they are unused. - Throwing exceptions from methods like open(), close(), cancel(), getFormType(), and getProgress() where exceptions are not thrown. - Extracted long nested logic blocks in `internalInterpret()` into smaller, well-named private methods. - Replaced imperative loops with Stream operations for collection processing. These changes aim to make the codebase more modular, clean by reducing boilerplate code, and approachable for future contributors and reviewers. ### What type of PR is it? Refactoring ### Todos * [ ] - Task ### What is the Jira issue? * [ZEPPELIN-6264](https://issues.apache.org/jira/browse/ZEPPELIN-6264) ### How should this be tested? * No functional changes; existing tests should pass as-is. ### Screenshots (if appropriate) ### Questions: * Does the license files need to update? no * Is there breaking changes for older versions? no * Does this needs documentation? no Closes #5005 from eunhwa99/ZEPPELIN-6264. Signed-off-by: Philipp Dallig <philipp.dallig@gmail.com>
1 parent 1b90f31 commit ab52456

File tree

1 file changed

+66
-53
lines changed

1 file changed

+66
-53
lines changed

influxdb/src/main/java/org/apache/zeppelin/influxdb/InfluxDBInterpreter.java

Lines changed: 66 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
*/
1515
package org.apache.zeppelin.influxdb;
1616

17+
import com.influxdb.query.FluxRecord;
1718
import java.util.Properties;
18-
import java.util.StringJoiner;
1919
import java.util.concurrent.CountDownLatch;
2020
import java.util.concurrent.atomic.AtomicReference;
2121

@@ -24,6 +24,7 @@
2424
import com.influxdb.client.InfluxDBClientFactory;
2525
import com.influxdb.client.InfluxDBClientOptions;
2626
import com.influxdb.client.QueryApi;
27+
import java.util.stream.Collectors;
2728
import org.apache.zeppelin.interpreter.AbstractInterpreter;
2829
import org.apache.zeppelin.interpreter.ZeppelinContext;
2930
import org.slf4j.Logger;
@@ -83,71 +84,83 @@ protected InterpreterResult internalInterpret(String query, InterpreterContext c
8384
LOGGER.debug("Run Flux command '{}'", query);
8485
query = query.trim();
8586

86-
QueryApi queryService = getInfluxDBClient(context);
87+
QueryApi queryService = getQueryApi();
8788

88-
final int[] actualIndex = {-1};
89+
final int[] currentTableIndex = {-1};
8990

9091
AtomicReference<InterpreterResult> resultRef = new AtomicReference<>();
9192
CountDownLatch countDownLatch = new CountDownLatch(1);
9293

93-
StringBuilder result = new StringBuilder();
94+
StringBuilder resultBuilder = new StringBuilder();
9495
queryService.query(
9596
query,
9697

9798
//process record
98-
(cancellable, fluxRecord) -> {
99-
100-
Integer tableIndex = fluxRecord.getTable();
101-
if (actualIndex[0] != tableIndex) {
102-
result.append(NEWLINE);
103-
result.append(TABLE_MAGIC_TAG);
104-
actualIndex[0] = tableIndex;
105-
106-
//add column names to table header
107-
StringJoiner joiner = new StringJoiner(TAB);
108-
fluxRecord.getValues().keySet().forEach(c -> joiner.add(replaceReservedChars(c)));
109-
result.append(joiner.toString());
110-
result.append(NEWLINE);
111-
}
112-
113-
StringJoiner rowsJoiner = new StringJoiner(TAB);
114-
for (Object value : fluxRecord.getValues().values()) {
115-
if (value == null) {
116-
value = EMPTY_COLUMN_VALUE;
117-
}
118-
rowsJoiner.add(replaceReservedChars(value.toString()));
119-
}
120-
result.append(rowsJoiner.toString());
121-
result.append(NEWLINE);
122-
},
123-
124-
throwable -> {
125-
126-
LOGGER.error(throwable.getMessage(), throwable);
127-
resultRef.set(new InterpreterResult(InterpreterResult.Code.ERROR,
128-
throwable.getMessage()));
129-
130-
countDownLatch.countDown();
131-
132-
}, () -> {
133-
//on complete
134-
InterpreterResult intpResult = new InterpreterResult(InterpreterResult.Code.SUCCESS);
135-
intpResult.add(result.toString());
136-
resultRef.set(intpResult);
137-
countDownLatch.countDown();
138-
}
99+
(cancellable, fluxRecord) -> handleRecord(fluxRecord, currentTableIndex, resultBuilder),
100+
throwable -> handleError(throwable, resultRef, countDownLatch),
101+
() -> handleComplete(resultBuilder, resultRef, countDownLatch)
139102
);
103+
104+
awaitLatch(countDownLatch);
105+
106+
return resultRef.get();
107+
}
108+
109+
private void handleRecord(FluxRecord fluxRecord, int[] currentTableIndex,
110+
StringBuilder resultBuilder) {
111+
Integer tableIndex = fluxRecord.getTable();
112+
if (currentTableIndex[0] != tableIndex) {
113+
appendTableHeader(fluxRecord, resultBuilder);
114+
currentTableIndex[0] = tableIndex;
115+
}
116+
117+
appendTableRow(fluxRecord, resultBuilder);
118+
}
119+
120+
private void appendTableHeader(FluxRecord fluxRecord, StringBuilder resultBuilder) {
121+
resultBuilder.append(NEWLINE).append(TABLE_MAGIC_TAG);
122+
String headerLine = fluxRecord.getValues().keySet().stream()
123+
.map(this::replaceReservedChars)
124+
.collect(Collectors.joining(TAB));
125+
resultBuilder.append(headerLine).append(NEWLINE);
126+
}
127+
128+
private void appendTableRow(FluxRecord fluxRecord, StringBuilder resultBuilder) {
129+
String rowLine = fluxRecord.getValues().values().stream()
130+
.map(v -> v == null ? EMPTY_COLUMN_VALUE : v.toString())
131+
.map(this::replaceReservedChars)
132+
.collect(Collectors.joining(TAB));
133+
resultBuilder.append(rowLine).append(NEWLINE);
134+
}
135+
136+
private static void handleError(Throwable throwable, AtomicReference<InterpreterResult> resultRef,
137+
CountDownLatch countDownLatch) {
138+
LOGGER.error(throwable.getMessage(), throwable);
139+
resultRef.set(new InterpreterResult(InterpreterResult.Code.ERROR,
140+
throwable.getMessage()));
141+
142+
countDownLatch.countDown();
143+
}
144+
145+
private static void handleComplete(StringBuilder resultBuilder,
146+
AtomicReference<InterpreterResult> resultRef,
147+
CountDownLatch countDownLatch) {
148+
InterpreterResult intpResult = new InterpreterResult(InterpreterResult.Code.SUCCESS);
149+
intpResult.add(resultBuilder.toString());
150+
resultRef.set(intpResult);
151+
countDownLatch.countDown();
152+
}
153+
154+
private static void awaitLatch(CountDownLatch countDownLatch) throws InterpreterException {
140155
try {
141156
countDownLatch.await();
142157
} catch (InterruptedException e) {
143158
throw new InterpreterException(e);
144159
}
145-
146-
return resultRef.get();
147160
}
148161

149162

150-
private QueryApi getInfluxDBClient(InterpreterContext context) {
163+
private QueryApi getQueryApi() {
151164
if (queryApi == null) {
152165
queryApi = this.client.getQueryApi();
153166
}
@@ -156,7 +169,7 @@ private QueryApi getInfluxDBClient(InterpreterContext context) {
156169

157170

158171
@Override
159-
public void open() throws InterpreterException {
172+
public void open() {
160173

161174
if (this.client == null) {
162175
InfluxDBClientOptions opt = InfluxDBClientOptions.builder()
@@ -172,25 +185,25 @@ public void open() throws InterpreterException {
172185
}
173186

174187
@Override
175-
public void close() throws InterpreterException {
188+
public void close() {
176189
if (this.client != null) {
177190
this.client.close();
178191
this.client = null;
179192
}
180193
}
181194

182195
@Override
183-
public void cancel(InterpreterContext context) throws InterpreterException {
196+
public void cancel(InterpreterContext context) {
184197

185198
}
186199

187200
@Override
188-
public FormType getFormType() throws InterpreterException {
201+
public FormType getFormType() {
189202
return FormType.SIMPLE;
190203
}
191204

192205
@Override
193-
public int getProgress(InterpreterContext context) throws InterpreterException {
206+
public int getProgress(InterpreterContext context) {
194207
return 0;
195208
}
196209

0 commit comments

Comments
 (0)