Skip to content

Commit 484d25d

Browse files
committed
coverage
1 parent 9ecb06d commit 484d25d

File tree

1 file changed

+101
-0
lines changed

1 file changed

+101
-0
lines changed

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeSinkTest.java

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,27 @@
2121

2222
import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin;
2323
import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant;
24+
import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
25+
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskSinkRuntimeEnvironment;
26+
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
2427
import org.apache.iotdb.db.pipe.sink.protocol.legacy.IoTDBLegacyPipeSink;
28+
import org.apache.iotdb.db.pipe.sink.protocol.opcua.OpcUaSink;
2529
import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
2630
import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
2731
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
2832
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
2933

34+
import org.apache.tsfile.enums.TSDataType;
35+
import org.apache.tsfile.write.record.Tablet;
36+
import org.apache.tsfile.write.schema.IMeasurementSchema;
37+
import org.apache.tsfile.write.schema.MeasurementSchema;
3038
import org.junit.Assert;
3139
import org.junit.Test;
3240

41+
import java.security.SecureRandom;
42+
import java.util.Arrays;
3343
import java.util.HashMap;
44+
import java.util.List;
3445

3546
public class PipeSinkTest {
3647

@@ -92,4 +103,94 @@ public void testIoTDBThriftAsyncConnectorToOthers() {
92103
Assert.fail();
93104
}
94105
}
106+
107+
@Test
108+
public void testOpcUaSink() {
109+
final List<IMeasurementSchema> schemaList =
110+
Arrays.asList(
111+
new MeasurementSchema("s1", TSDataType.INT64),
112+
new MeasurementSchema("s2", TSDataType.INT64));
113+
114+
final Tablet tablet = new Tablet("root.db.d1.vector6", schemaList, 100);
115+
116+
long timestamp = System.currentTimeMillis();
117+
for (long row = 0; row < 100; row++) {
118+
final int rowSize = tablet.getRowSize();
119+
tablet.addTimestamp(rowSize, timestamp);
120+
for (int i = 0; i < 2; i++) {
121+
tablet.addValue(
122+
schemaList.get(i).getMeasurementName(), rowSize, new SecureRandom().nextLong());
123+
}
124+
timestamp++;
125+
}
126+
127+
final List<IMeasurementSchema> opcSchemaList =
128+
Arrays.asList(
129+
new MeasurementSchema("value1", TSDataType.INT64),
130+
new MeasurementSchema("quality1", TSDataType.BOOLEAN));
131+
final Tablet qualityTablet = new Tablet("root.db.d1.vector6.s3", opcSchemaList, 100);
132+
133+
timestamp = System.currentTimeMillis();
134+
for (long row = 0; row < 100; row++) {
135+
final int rowSize = qualityTablet.getRowSize();
136+
qualityTablet.addTimestamp(rowSize, timestamp);
137+
qualityTablet.addValue(
138+
opcSchemaList.get(0).getMeasurementName(), rowSize, new SecureRandom().nextLong());
139+
qualityTablet.addValue(opcSchemaList.get(1).getMeasurementName(), rowSize, true);
140+
timestamp++;
141+
}
142+
143+
try (final OpcUaSink qualityOPC = new OpcUaSink();
144+
final OpcUaSink normalOPC = new OpcUaSink()) {
145+
final PipeTaskRuntimeConfiguration configuration =
146+
new PipeTaskRuntimeConfiguration(new PipeTaskSinkRuntimeEnvironment("temp", 0, 1));
147+
qualityOPC.customize(
148+
new PipeParameters(
149+
new HashMap<String, String>() {
150+
{
151+
put(
152+
PipeSinkConstant.CONNECTOR_KEY,
153+
BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName());
154+
put(PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_KEY, "true");
155+
put(PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_KEY, "value1");
156+
put(PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_KEY, "quality1");
157+
}
158+
}),
159+
configuration);
160+
normalOPC.customize(
161+
new PipeParameters(
162+
new HashMap<String, String>() {
163+
{
164+
put(
165+
PipeSinkConstant.CONNECTOR_KEY,
166+
BuiltinPipePlugin.OPC_UA_SINK.getPipePluginName());
167+
}
168+
}),
169+
configuration);
170+
final PipeRawTabletInsertionEvent event =
171+
new PipeRawTabletInsertionEvent(
172+
false, "root.db", "db", "root.db", tablet, false, "pipe", 0L, null, null, false);
173+
event.increaseReferenceCount("");
174+
normalOPC.transfer(event);
175+
Assert.assertThrows(UnsupportedOperationException.class, () -> qualityOPC.transfer(event));
176+
event.decreaseReferenceCount("", false);
177+
178+
qualityOPC.transfer(
179+
new PipeRawTabletInsertionEvent(
180+
false,
181+
"root.db",
182+
"db",
183+
"root.db",
184+
qualityTablet,
185+
false,
186+
"pipe",
187+
0L,
188+
null,
189+
null,
190+
false));
191+
192+
} catch (Exception e) {
193+
Assert.fail();
194+
}
195+
}
95196
}

0 commit comments

Comments
 (0)