Skip to content

Commit ef67611

Browse files
committed
trilog
1 parent f78e6d7 commit ef67611

File tree

2 files changed

+20
-9
lines changed

2 files changed

+20
-9
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,7 @@ public void run(OpcUaClient client) throws Exception {
7676
}
7777

7878
// Only support tree model & client-server
79-
public void transfer(final Tablet tablet, final OpcUaSink sink) throws UaException {
80-
81-
}
79+
public void transfer(final Tablet tablet, final OpcUaSink sink) throws UaException {}
8280

8381
/////////////////////////////// Getter ///////////////////////////////
8482

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,14 +98,18 @@ public void shutdown() {
9898
public void transfer(final Tablet tablet, final boolean isTableModel, final OpcUaSink sink)
9999
throws UaException {
100100
if (sink.isClientServerModel()) {
101-
transferTabletForClientServerModel(tablet, isTableModel, sink);
101+
transferTabletForClientServerModel(
102+
tablet, isTableModel, sink, this::transferTabletRowForClientServerModel);
102103
} else {
103104
transferTabletForPubSubModel(tablet, isTableModel, sink);
104105
}
105106
}
106107

107-
private void transferTabletForClientServerModel(
108-
final Tablet tablet, final boolean isTableModel, final OpcUaSink sink) {
108+
public static void transferTabletForClientServerModel(
109+
final Tablet tablet,
110+
final boolean isTableModel,
111+
final OpcUaSink sink,
112+
final TabletRowConsumer consumer) {
109113
final List<IMeasurementSchema> schemas = tablet.getSchemas();
110114
final List<IMeasurementSchema> newSchemas = new ArrayList<>();
111115
if (!isTableModel) {
@@ -126,8 +130,7 @@ private void transferTabletForClientServerModel(
126130
}
127131
}
128132

129-
transferTabletRowForClientServerModel(
130-
tablet.getDeviceId().split("\\."), newSchemas, timestamps, values, sink);
133+
consumer.accept(tablet.getDeviceId().split("\\."), newSchemas, timestamps, values, sink);
131134
} else {
132135
new PipeTableModelTabletEventSorter(tablet).sortByTimestampIfNecessary();
133136

@@ -150,7 +153,7 @@ private void transferTabletForClientServerModel(
150153
}
151154

152155
final int finalI = i;
153-
transferTabletRowForClientServerModel(
156+
consumer.accept(
154157
folderSegments,
155158
newSchemas,
156159
Collections.singletonList(tablet.getTimestamp(i)),
@@ -167,6 +170,16 @@ private void transferTabletForClientServerModel(
167170
}
168171
}
169172

173+
@FunctionalInterface
174+
public interface TabletRowConsumer {
175+
void accept(
176+
final String[] segments,
177+
final List<IMeasurementSchema> measurementSchemas,
178+
final List<Long> timestamps,
179+
final List<Object> values,
180+
final OpcUaSink sink);
181+
}
182+
170183
private void transferTabletRowForClientServerModel(
171184
final String[] segments,
172185
final List<IMeasurementSchema> measurementSchemas,

0 commit comments

Comments
 (0)