Skip to content

Commit 10bae98

Browse files
author
du
authored
[Spark] Implement new compaction strategy (lakesoul-io#585)
* new compaction Signed-off-by: fphantam <dongf@dmetasoul.com> * remove debug info and change some params Signed-off-by: fphantam <dongf@dmetasoul.com> * remove debug code Signed-off-by: fphantam <dongf@dmetasoul.com> * new compaction Signed-off-by: fphantam <dongf@dmetasoul.com> * add License Signed-off-by: fphantam <dongf@dmetasoul.com> * not use DynamicBucket if table hashBucketNum not change in new compaction task Signed-off-by: fphantam <dongf@dmetasoul.com> * remove useless code Signed-off-by: fphantam <dongf@dmetasoul.com> * fix new compaction bug Signed-off-by: fphantam <dongf@dmetasoul.com> * add CompressDataFileInfo for midle file info Signed-off-by: fphantam <dongf@dmetasoul.com> --------- Signed-off-by: fphantam <dongf@dmetasoul.com>
1 parent d9f5dd3 commit 10bae98

File tree

22 files changed

+2630
-19
lines changed

22 files changed

+2630
-19
lines changed

lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBFactory.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,7 @@
44

55
package com.dmetasoul.lakesoul.meta;
66

7-
import com.dmetasoul.lakesoul.meta.dao.DataCommitInfoDao;
8-
import com.dmetasoul.lakesoul.meta.dao.NamespaceDao;
9-
import com.dmetasoul.lakesoul.meta.dao.PartitionInfoDao;
10-
import com.dmetasoul.lakesoul.meta.dao.TableInfoDao;
11-
import com.dmetasoul.lakesoul.meta.dao.TablePathIdDao;
12-
import com.dmetasoul.lakesoul.meta.dao.TableNameIdDao;
7+
import com.dmetasoul.lakesoul.meta.dao.*;
138

149
public class DBFactory {
1510

@@ -19,6 +14,7 @@ public class DBFactory {
1914
private static volatile TablePathIdDao tablePathIdDao;
2015
private static volatile DataCommitInfoDao dataCommitInfoDao;
2116
private static volatile PartitionInfoDao partitionInfoDao;
17+
private static volatile DiscardCompressedFileDao discardCompressedFileDao;
2218

2319
private DBFactory(){}
2420

@@ -87,4 +83,15 @@ public static PartitionInfoDao getPartitionInfoDao() {
8783
}
8884
return partitionInfoDao;
8985
}
86+
87+
public static DiscardCompressedFileDao getDiscardCompressedFileDao() {
88+
if (discardCompressedFileDao == null) {
89+
synchronized (DiscardCompressedFileDao.class) {
90+
if (discardCompressedFileDao == null) {
91+
discardCompressedFileDao = new DiscardCompressedFileDao();
92+
}
93+
}
94+
}
95+
return discardCompressedFileDao;
96+
}
9097
}

lakesoul-common/src/main/java/com/dmetasoul/lakesoul/meta/DBManager.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class DBManager {
3030
private final TablePathIdDao tablePathIdDao;
3131
private final DataCommitInfoDao dataCommitInfoDao;
3232
private final PartitionInfoDao partitionInfoDao;
33+
private final DiscardCompressedFileDao discardCompressedFileDao;
3334

3435
public DBManager() {
3536
namespaceDao = DBFactory.getNamespaceDao();
@@ -38,6 +39,7 @@ public DBManager() {
3839
tablePathIdDao = DBFactory.getTablePathIdDao();
3940
dataCommitInfoDao = DBFactory.getDataCommitInfoDao();
4041
partitionInfoDao = DBFactory.getPartitionInfoDao();
42+
discardCompressedFileDao = DBFactory.getDiscardCompressedFileDao();
4143
}
4244

4345
public boolean isNamespaceExists(String table_namespace) {
@@ -943,6 +945,42 @@ public void deleteNamespace(String namespace) {
943945
namespaceDao.deleteByNamespace(namespace);
944946
}
945947

948+
public void insertDiscardCompressedFile(DiscardCompressedFileInfo discardCompressedFileInfo) {
949+
discardCompressedFileDao.insert(discardCompressedFileInfo);
950+
}
951+
952+
public void batchInsertDiscardCompressedFile(List<DiscardCompressedFileInfo> discardCompressedFileInfoList) {
953+
discardCompressedFileDao.batchInsert(discardCompressedFileInfoList);
954+
}
955+
956+
public DiscardCompressedFileInfo getDiscardCompressedFileByFilePath(String filePath) {
957+
return discardCompressedFileDao.findByFilePath(filePath);
958+
}
959+
960+
public void deleteDiscardCompressedFileByFilePath(String filePath) {
961+
discardCompressedFileDao.deleteByFilePath(filePath);
962+
}
963+
964+
public void deleteDiscardCompressedFileByByFilterCondition(String tablePath, String partition, long timestamp) {
965+
discardCompressedFileDao.deleteDiscardCompressedFileByFilterCondition(tablePath, partition, timestamp);
966+
}
967+
968+
public List<DiscardCompressedFileInfo> getOutOfDateDiscardCompressedFile(long timestamp) {
969+
return discardCompressedFileDao.getDiscardCompressedFileBeforeTimestamp(timestamp);
970+
}
971+
972+
public List<DiscardCompressedFileInfo> getDiscardCompressedFileByFilterCondition(String tablePath, String partition, long timestamp) {
973+
return discardCompressedFileDao.getDiscardCompressedFileByFilterCondition(tablePath, partition, timestamp);
974+
}
975+
976+
public List<DiscardCompressedFileInfo> getAllDiscardCompressedFile() {
977+
return discardCompressedFileDao.listAllDiscardCompressedFile();
978+
}
979+
980+
public void cleanMetaDiscardCompressedFile() {
981+
discardCompressedFileDao.clean();
982+
}
983+
946984
// just for test
947985
public void cleanMeta() {
948986
if (NativeUtils.NATIVE_METADATA_UPDATE_ENABLED) {
Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
// SPDX-FileCopyrightText: 2023 LakeSoul Contributors
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
package com.dmetasoul.lakesoul.meta.dao;
6+
7+
import com.dmetasoul.lakesoul.meta.DBConnector;
8+
import com.dmetasoul.lakesoul.meta.entity.DiscardCompressedFileInfo;
9+
import com.dmetasoul.lakesoul.meta.entity.JniWrapper;
10+
import com.dmetasoul.lakesoul.meta.jnr.NativeMetadataJavaClient;
11+
import com.dmetasoul.lakesoul.meta.jnr.NativeUtils;
12+
13+
import java.sql.Connection;
14+
import java.sql.PreparedStatement;
15+
import java.sql.ResultSet;
16+
import java.sql.SQLException;
17+
import java.util.ArrayList;
18+
import java.util.Arrays;
19+
import java.util.Collections;
20+
import java.util.List;
21+
22+
public class DiscardCompressedFileDao {
23+
24+
public DiscardCompressedFileInfo findByFilePath(String filePath) {
25+
if (NativeUtils.NATIVE_METADATA_QUERY_ENABLED) {
26+
JniWrapper jniWrapper = NativeMetadataJavaClient.query(
27+
NativeUtils.CodedDaoType.SelectDiscardCompressedFileInfoByFilePath,
28+
Collections.singletonList(filePath));
29+
if (jniWrapper == null) return null;
30+
List<DiscardCompressedFileInfo> discardCompressedFileInfoList = jniWrapper.getDiscardCompressedFileInfoList();
31+
return discardCompressedFileInfoList.isEmpty() ? null : discardCompressedFileInfoList.get(0);
32+
}
33+
Connection conn = null;
34+
PreparedStatement pstmt = null;
35+
ResultSet rs = null;
36+
String sql = String.format("select file_path, table_path, partition_desc, timestamp, t_date "
37+
+ "from discard_compressed_file_info where file_path = '%s'", filePath);
38+
DiscardCompressedFileInfo discardCompressedFileInfo = null;
39+
try {
40+
conn = DBConnector.getConn();
41+
pstmt = conn.prepareStatement(sql);
42+
rs = pstmt.executeQuery();
43+
while (rs.next()) {
44+
discardCompressedFileInfo = discardCompressedFileInfoFromResultSet(rs);
45+
}
46+
} catch (SQLException e) {
47+
throw new RuntimeException(e);
48+
} finally {
49+
DBConnector.closeConn(rs, pstmt, conn);
50+
}
51+
return discardCompressedFileInfo;
52+
}
53+
54+
public List<DiscardCompressedFileInfo> listAllDiscardCompressedFile() {
55+
if (NativeUtils.NATIVE_METADATA_QUERY_ENABLED) {
56+
JniWrapper jniWrapper = NativeMetadataJavaClient.query(
57+
NativeUtils.CodedDaoType.ListAllDiscardCompressedFileInfo,
58+
Collections.emptyList());
59+
if (jniWrapper == null) return null;
60+
return jniWrapper.getDiscardCompressedFileInfoList();
61+
}
62+
Connection conn = null;
63+
PreparedStatement pstmt = null;
64+
ResultSet rs = null;
65+
String sql = "select file_path, table_path, partition_desc, timestamp, t_date from discard_compressed_file_info";
66+
List<DiscardCompressedFileInfo> list = new ArrayList<>();
67+
try {
68+
conn = DBConnector.getConn();
69+
pstmt = conn.prepareStatement(sql);
70+
rs = pstmt.executeQuery();
71+
while (rs.next()) {
72+
list.add(discardCompressedFileInfoFromResultSet(rs));
73+
}
74+
} catch (SQLException e) {
75+
throw new RuntimeException(e);
76+
} finally {
77+
DBConnector.closeConn(rs, pstmt, conn);
78+
}
79+
return list;
80+
}
81+
82+
public List<DiscardCompressedFileInfo> getDiscardCompressedFileBeforeTimestamp(long timestamp) {
83+
if (NativeUtils.NATIVE_METADATA_QUERY_ENABLED) {
84+
JniWrapper jniWrapper = NativeMetadataJavaClient.query(
85+
NativeUtils.CodedDaoType.ListDiscardCompressedFileInfoBeforeTimestamp,
86+
Collections.singletonList(Long.toString(timestamp)));
87+
if (jniWrapper == null) return null;
88+
return jniWrapper.getDiscardCompressedFileInfoList();
89+
}
90+
Connection conn = null;
91+
PreparedStatement pstmt = null;
92+
ResultSet rs = null;
93+
String sql = String.format("select file_path, table_path, partition_desc, timestamp, t_date "
94+
+ "from discard_compressed_file_info where timestamp < %s", timestamp);
95+
List<DiscardCompressedFileInfo> list = new ArrayList<>();
96+
try {
97+
conn = DBConnector.getConn();
98+
pstmt = conn.prepareStatement(sql);
99+
rs = pstmt.executeQuery();
100+
while (rs.next()) {
101+
list.add(discardCompressedFileInfoFromResultSet(rs));
102+
}
103+
} catch (SQLException e) {
104+
throw new RuntimeException(e);
105+
} finally {
106+
DBConnector.closeConn(rs, pstmt, conn);
107+
}
108+
return list;
109+
}
110+
111+
public List<DiscardCompressedFileInfo> getDiscardCompressedFileByFilterCondition(String tablePath, String partition, long timestamp) {
112+
if (NativeUtils.NATIVE_METADATA_QUERY_ENABLED) {
113+
JniWrapper jniWrapper = NativeMetadataJavaClient.query(
114+
NativeUtils.CodedDaoType.ListDiscardCompressedFileByFilterCondition,
115+
Arrays.asList(tablePath, partition, Long.toString(timestamp)));
116+
if (jniWrapper == null) return null;
117+
return jniWrapper.getDiscardCompressedFileInfoList();
118+
}
119+
Connection conn = null;
120+
PreparedStatement pstmt = null;
121+
ResultSet rs = null;
122+
String sql = String.format("select file_path, table_path, partition_desc, timestamp, t_date "
123+
+ "from discard_compressed_file_info where table_path = '%s' and partition_desc = '%s' and timestamp < %s",
124+
timestamp, partition, timestamp);
125+
List<DiscardCompressedFileInfo> list = new ArrayList<>();
126+
try {
127+
conn = DBConnector.getConn();
128+
pstmt = conn.prepareStatement(sql);
129+
rs = pstmt.executeQuery();
130+
while (rs.next()) {
131+
list.add(discardCompressedFileInfoFromResultSet(rs));
132+
}
133+
} catch (SQLException e) {
134+
throw new RuntimeException(e);
135+
} finally {
136+
DBConnector.closeConn(rs, pstmt, conn);
137+
}
138+
return list;
139+
}
140+
141+
public void insert(DiscardCompressedFileInfo discardCompressedFileInfo) {
142+
if (NativeUtils.NATIVE_METADATA_UPDATE_ENABLED) {
143+
Integer count = NativeMetadataJavaClient.insert(
144+
NativeUtils.CodedDaoType.InsertDiscardCompressedFileInfo,
145+
JniWrapper.newBuilder().addDiscardCompressedFileInfo(discardCompressedFileInfo).build());
146+
return;
147+
}
148+
Connection conn = null;
149+
PreparedStatement pstmt = null;
150+
try {
151+
conn = DBConnector.getConn();
152+
pstmt = conn.prepareStatement("insert into discard_compressed_file_info (file_path, table_path, partition_desc, timestamp, t_date) values (?, ?, ?, ?, ?)");
153+
dataCommitInsert(pstmt, discardCompressedFileInfo);
154+
} catch (SQLException e) {
155+
throw new RuntimeException(e);
156+
} finally {
157+
DBConnector.closeConn(pstmt, conn);
158+
}
159+
}
160+
161+
public boolean batchInsert(List<DiscardCompressedFileInfo> discardCompressedFileInfoList) {
162+
if (NativeUtils.NATIVE_METADATA_UPDATE_ENABLED) {
163+
Integer count = NativeMetadataJavaClient.insert(
164+
NativeUtils.CodedDaoType.TransactionInsertDiscardCompressedFile,
165+
JniWrapper.newBuilder().addAllDiscardCompressedFileInfo(discardCompressedFileInfoList).build());
166+
return count > 0;
167+
}
168+
Connection conn = null;
169+
PreparedStatement pstmt = null;
170+
boolean result = true;
171+
try {
172+
conn = DBConnector.getConn();
173+
pstmt = conn.prepareStatement("insert into discard_compressed_file_info (file_path, table_path, partition_desc, timestamp, t_date) values (?, ?, ?, ?, ?)");
174+
conn.setAutoCommit(false);
175+
for (DiscardCompressedFileInfo discardCompressedFileInfo : discardCompressedFileInfoList) {
176+
dataCommitInsert(pstmt, discardCompressedFileInfo);
177+
}
178+
conn.commit();
179+
} catch (SQLException e) {
180+
try {
181+
if (conn != null) {
182+
conn.rollback();
183+
}
184+
} catch (SQLException ex) {
185+
ex.printStackTrace();
186+
}
187+
throw new RuntimeException(e);
188+
} finally {
189+
DBConnector.closeConn(pstmt, conn);
190+
}
191+
return result;
192+
}
193+
194+
private void dataCommitInsert(PreparedStatement pstmt, DiscardCompressedFileInfo discardCompressedFileInfo)
195+
throws SQLException {
196+
pstmt.setString(1, discardCompressedFileInfo.getFilePath());
197+
pstmt.setString(2, discardCompressedFileInfo.getTablePath());
198+
pstmt.setString(3, discardCompressedFileInfo.getPartitionDesc());
199+
pstmt.setLong(4, discardCompressedFileInfo.getTimestamp());
200+
pstmt.setString(5, discardCompressedFileInfo.getTDate());
201+
pstmt.execute();
202+
}
203+
204+
public void deleteByFilePath(String filePath) {
205+
if (NativeUtils.NATIVE_METADATA_UPDATE_ENABLED) {
206+
Integer count = NativeMetadataJavaClient.update(
207+
NativeUtils.CodedDaoType.DeleteDiscardCompressedFileInfoByFilePath,
208+
Collections.singletonList(filePath));
209+
return;
210+
}
211+
Connection conn = null;
212+
PreparedStatement pstmt = null;
213+
String sql = String.format("delete from discard_compressed_file_info where file_path = '%s' ", filePath);
214+
try {
215+
conn = DBConnector.getConn();
216+
pstmt = conn.prepareStatement(sql);
217+
pstmt.execute();
218+
} catch (SQLException e) {
219+
throw new RuntimeException(e);
220+
} finally {
221+
DBConnector.closeConn(pstmt, conn);
222+
}
223+
}
224+
225+
public void deleteDiscardCompressedFileByFilterCondition(String tablePath, String partition, long timestamp) {
226+
if (NativeUtils.NATIVE_METADATA_UPDATE_ENABLED) {
227+
Integer count = NativeMetadataJavaClient.update(
228+
NativeUtils.CodedDaoType.DeleteDiscardCompressedFileByFilterCondition,
229+
Arrays.asList(tablePath, partition, Long.toString(timestamp)));
230+
return;
231+
}
232+
Connection conn = null;
233+
PreparedStatement pstmt = null;
234+
String sql = String.format("delete from discard_compressed_file_info where table_path = '%s' "
235+
+ "and partition_desc = '%s' and timestamp < %s", tablePath, partition, timestamp);
236+
try {
237+
conn = DBConnector.getConn();
238+
pstmt = conn.prepareStatement(sql);
239+
pstmt.execute();
240+
} catch (SQLException e) {
241+
throw new RuntimeException(e);
242+
} finally {
243+
DBConnector.closeConn(pstmt, conn);
244+
}
245+
}
246+
247+
public void clean() {
248+
Connection conn = null;
249+
PreparedStatement pstmt = null;
250+
String sql = "delete from discard_compressed_file_info;";
251+
try {
252+
conn = DBConnector.getConn();
253+
pstmt = conn.prepareStatement(sql);
254+
pstmt.execute();
255+
} catch (SQLException e) {
256+
throw new RuntimeException(e);
257+
} finally {
258+
DBConnector.closeConn(pstmt, conn);
259+
}
260+
}
261+
262+
public static DiscardCompressedFileInfo discardCompressedFileInfoFromResultSet(ResultSet rs) throws SQLException {
263+
return DiscardCompressedFileInfo.newBuilder()
264+
.setFilePath(rs.getString("file_path"))
265+
.setTablePath(rs.getString("table_path"))
266+
.setPartitionDesc(rs.getString("partition_desc"))
267+
.setTimestamp(rs.getLong("timestamp"))
268+
.setTDate(rs.getString("t_date"))
269+
.build();
270+
}
271+
}

0 commit comments

Comments
 (0)