Skip to content

Commit 112568c

Browse files
committed
pipe: Add Object data export support for Pipe
1 parent 0d64d51 commit 112568c

29 files changed

+3466
-129
lines changed

iotdb-core/datanode/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,11 @@
292292
<groupId>at.yawk.lz4</groupId>
293293
<artifactId>lz4-java</artifactId>
294294
</dependency>
295+
<dependency>
296+
<groupId>com.jcraft</groupId>
297+
<artifactId>jsch</artifactId>
298+
<version>0.1.55</version>
299+
</dependency>
295300
<dependency>
296301
<groupId>junit</groupId>
297302
<artifactId>junit</artifactId>

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.PipeConsensusAsyncSink;
3131
import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink;
3232
import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink;
33+
import org.apache.iotdb.db.pipe.sink.protocol.tsfile.TsFileSink;
3334
import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketSink;
3435
import org.apache.iotdb.db.pipe.sink.protocol.writeback.WriteBackSink;
3536

@@ -97,5 +98,6 @@ protected void initConstructors() {
9798
pluginConstructors.put(
9899
BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName(),
99100
PipeConsensusAsyncSink::new);
101+
pluginConstructors.put(BuiltinPipePlugin.TSFILE_SINK.getPipePluginName(), TsFileSink::new);
100102
}
101103
}
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.pipe.event.common.tablet;
21+
22+
import org.apache.tsfile.utils.Binary;
23+
import org.apache.tsfile.utils.BytesUtils;
24+
import org.apache.tsfile.utils.Pair;
25+
26+
import java.nio.charset.StandardCharsets;
27+
import java.util.Objects;
28+
29+
/**
30+
* Utility class for parsing Object type data (BLOB). Object data storage format: - First 8 bytes
31+
* (Long): file offset + content length - Following bytes: file relative path (UTF-8 encoded)
32+
*/
33+
public class ObjectDataParser {
34+
35+
/** Long type byte length */
36+
private static final int LONG_BYTES = Long.BYTES;
37+
38+
/**
39+
* Parse Binary object to extract Object data information
40+
*
41+
* @param binary Binary object
42+
* @return Pair(offsetPlusLength, relativePath), null if parse failed
43+
*/
44+
public static Pair<Long, String> parse(Binary binary) {
45+
if (Objects.isNull(binary)) {
46+
return null;
47+
}
48+
49+
byte[] bytes = binary.getValues();
50+
return parse(bytes);
51+
}
52+
53+
/**
54+
* Parse byte array to extract Object data information
55+
*
56+
* @param bytes byte array
57+
* @return Pair(offsetPlusLength, relativePath), null if parse failed
58+
*/
59+
public static Pair<Long, String> parse(byte[] bytes) {
60+
if (Objects.isNull(bytes) || bytes.length < LONG_BYTES) {
61+
return null;
62+
}
63+
64+
try {
65+
// Parse first 8 bytes: length
66+
long offsetPlusLength = BytesUtils.bytesToLong(bytes, 0);
67+
68+
// Parse following bytes: file path
69+
int pathLength = bytes.length - LONG_BYTES;
70+
if (pathLength <= 0) {
71+
return null;
72+
}
73+
74+
byte[] pathBytes = new byte[pathLength];
75+
System.arraycopy(bytes, LONG_BYTES, pathBytes, 0, pathLength);
76+
String relativePath = new String(pathBytes, StandardCharsets.UTF_8);
77+
78+
return new Pair<>(offsetPlusLength, relativePath);
79+
} catch (Exception e) {
80+
return null;
81+
}
82+
}
83+
84+
/**
85+
* Extract file length information from byte array
86+
*
87+
* @param bytes byte array
88+
* @return offset + content.length, -1 if extraction failed
89+
*/
90+
public static long extractLength(byte[] bytes) {
91+
if (Objects.isNull(bytes) || bytes.length < LONG_BYTES) {
92+
return -1;
93+
}
94+
95+
try {
96+
return BytesUtils.bytesToLong(bytes, 0);
97+
} catch (Exception e) {
98+
return -1;
99+
}
100+
}
101+
102+
/**
103+
* Extract file length information from Binary object
104+
*
105+
* @param binary Binary object
106+
* @return offset + content.length, -1 if extraction failed
107+
*/
108+
public static long extractLength(Binary binary) {
109+
if (Objects.isNull(binary)) {
110+
return -1;
111+
}
112+
return extractLength(binary.getValues());
113+
}
114+
115+
/**
116+
* Extract file relative path from byte array
117+
*
118+
* @param bytes byte array
119+
* @return file relative path, null if extraction failed
120+
*/
121+
public static String extractPath(byte[] bytes) {
122+
if (Objects.isNull(bytes) || bytes.length <= LONG_BYTES) {
123+
return null;
124+
}
125+
126+
try {
127+
int pathLength = bytes.length - LONG_BYTES;
128+
byte[] pathBytes = new byte[pathLength];
129+
System.arraycopy(bytes, LONG_BYTES, pathBytes, 0, pathLength);
130+
return new String(pathBytes, StandardCharsets.UTF_8);
131+
} catch (Exception e) {
132+
return null;
133+
}
134+
}
135+
136+
/**
137+
* Extract file relative path from Binary object
138+
*
139+
* @param binary Binary object
140+
* @return file relative path, null if extraction failed
141+
*/
142+
public static String extractPath(Binary binary) {
143+
if (Objects.isNull(binary)) {
144+
return null;
145+
}
146+
return extractPath(binary.getValues());
147+
}
148+
149+
/**
150+
* Validate if byte array is valid Object data format
151+
*
152+
* @param bytes byte array
153+
* @return true if format is valid, false otherwise
154+
*/
155+
public static boolean isValidObjectData(byte[] bytes) {
156+
if (Objects.isNull(bytes) || bytes.length <= LONG_BYTES) {
157+
return false;
158+
}
159+
160+
try {
161+
// Try to parse length
162+
long length = BytesUtils.bytesToLong(bytes, 0);
163+
if (length < 0) {
164+
return false;
165+
}
166+
167+
// Try to parse path
168+
int pathLength = bytes.length - LONG_BYTES;
169+
byte[] pathBytes = new byte[pathLength];
170+
System.arraycopy(bytes, LONG_BYTES, pathBytes, 0, pathLength);
171+
String path = new String(pathBytes, StandardCharsets.UTF_8);
172+
173+
return path != null && !path.isEmpty();
174+
} catch (Exception e) {
175+
return false;
176+
}
177+
}
178+
179+
/**
180+
* Validate if Binary object is valid Object data format
181+
*
182+
* @param binary Binary object
183+
* @return true if format is valid, false otherwise
184+
*/
185+
public static boolean isValidObjectData(Binary binary) {
186+
if (Objects.isNull(binary)) {
187+
return false;
188+
}
189+
return isValidObjectData(binary.getValues());
190+
}
191+
}

0 commit comments

Comments
 (0)