diff --git a/pom.xml b/pom.xml index 19898025d..e9b9b9759 100644 --- a/pom.xml +++ b/pom.xml @@ -43,6 +43,11 @@ + + org.apache.commons + commons-csv + 1.8 + diff --git a/src/main/java/io/mycat/server/handler/ServerLoadDataInfileHandler.java b/src/main/java/io/mycat/server/handler/ServerLoadDataInfileHandler.java index 15c12a7da..36361ae55 100644 --- a/src/main/java/io/mycat/server/handler/ServerLoadDataInfileHandler.java +++ b/src/main/java/io/mycat/server/handler/ServerLoadDataInfileHandler.java @@ -2,8 +2,8 @@ * Copyright (c) 2013, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * This code is free software;Designed and Developed mainly by many Chinese - * opensource volunteers. you can redistribute it and/or modify it under the + * This code is free software;Designed and Developed mainly by many Chinese + * opensource volunteers. you can redistribute it and/or modify it under the * terms of the GNU General Public License version 2 only, as published by the * Free Software Foundation. * @@ -16,8 +16,8 @@ * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Any questions about this component can be directed to it's project Web address + * + * Any questions about this component can be directed to it's project Web address * https://code.google.com/p/opencloudb/. * */ @@ -32,9 +32,6 @@ import com.alibaba.druid.sql.parser.SQLStatementParser; import com.google.common.collect.Lists; import com.google.common.io.Files; -import com.univocity.parsers.csv.CsvParser; -import com.univocity.parsers.csv.CsvParserSettings; - import io.mycat.MycatServer; import io.mycat.cache.LayerCachePool; import io.mycat.config.ErrorCode; @@ -56,26 +53,27 @@ import io.mycat.sqlengine.mpp.LoadData; import io.mycat.util.ObjectUtil; import io.mycat.util.StringUtil; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.*; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.sql.SQLNonTransientException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; +import java.util.*; + +//import com.univocity.parsers.csv.CsvParser; +//import com.univocity.parsers.csv.CsvParserSettings; /** * mysql命令行客户端也需要启用local file权限,加参数--local-infile=1 * jdbc则正常,不用设置 * load data sql中的CHARACTER SET 'gbk' 其中的字符集必须引号括起来,否则druid解析出错 */ -public final class ServerLoadDataInfileHandler implements LoadDataInfileHandler -{ +public final class ServerLoadDataInfileHandler implements LoadDataInfileHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(ServerLoadDataInfileHandler.class); private ServerConnection serverConnection; private String sql; private String fileName; @@ -99,30 +97,24 @@ public final class ServerLoadDataInfileHandler implements LoadDataInfileHandler private boolean shoudAddSlot = false; - public int getPackID() - { + public int getPackID() { return packID; } - public void setPackID(byte packID) - { + public void setPackID(byte packID) { this.packID = packID; } - public ServerLoadDataInfileHandler(ServerConnection serverConnection) - { + public ServerLoadDataInfileHandler(ServerConnection serverConnection) { this.serverConnection = serverConnection; } - private static String parseFileName(String sql) - { - if (sql.contains("'")) - { + private static String parseFileName(String sql) { + if (sql.contains("'")) { int beginIndex = sql.indexOf("'"); return sql.substring(beginIndex + 1, sql.indexOf("'", beginIndex + 1)); - } else if (sql.contains("\"")) - { + } else if (sql.contains("\"")) { int beginIndex = sql.indexOf("\""); return sql.substring(beginIndex + 1, sql.indexOf("\"", beginIndex + 1)); } @@ -130,8 +122,7 @@ private static String parseFileName(String sql) } - private void parseLoadDataPram() - { + private void parseLoadDataPram() { loadData = new LoadData(); SQLTextLiteralExpr rawLineEnd = (SQLTextLiteralExpr) statement.getLinesTerminatedBy(); String lineTerminatedBy = rawLineEnd == null ? "\n" : rawLineEnd.getText(); @@ -145,8 +136,8 @@ private void parseLoadDataPram() String enclose = rawEnclosed == null ? null : rawEnclosed.getText(); loadData.setEnclose(enclose); - SQLTextLiteralExpr escapseExpr = (SQLTextLiteralExpr)statement.getColumnsEscaped() ; - String escapse=escapseExpr==null?"\\":escapseExpr.getText(); + SQLTextLiteralExpr escapseExpr = (SQLTextLiteralExpr) statement.getColumnsEscaped(); + String escapse = escapseExpr == null ? "\\" : escapseExpr.getText(); loadData.setEscape(escapse); String charset = statement.getCharset() != null ? statement.getCharset() : serverConnection.getCharset(); loadData.setCharset(charset); @@ -155,8 +146,7 @@ private void parseLoadDataPram() @Override - public void start(String sql) - { + public void start(String sql) { clear(); this.sql = sql; @@ -165,30 +155,28 @@ public void start(String sql) statement = (MySqlLoadDataInFileStatement) parser.parseStatement(); fileName = parseFileName(sql); - if (fileName == null) - { + if (fileName == null) { serverConnection.writeErrMessage(ErrorCode.ER_FILE_NOT_FOUND, " file name is null !"); clear(); return; } schema = MycatServer.getInstance().getConfig() .getSchemas().get(serverConnection.getSchema()); - if (schema == null){ + if (schema == null) { throw new RuntimeException("please sql:use schema before load data"); } tableId2DataNodeCache = (LayerCachePool) MycatServer.getInstance().getCacheService().getCachePool("TableID2DataNodeCache"); tableName = statement.getTableName().getSimpleName().toUpperCase(); tableConfig = schema.getTables().get(tableName); - if( tableConfig.getRule() != null && tableConfig.getRule().getRuleAlgorithm() instanceof SlotFunction){ - shoudAddSlot=true; - } + if (tableConfig.getRule() != null && tableConfig.getRule().getRuleAlgorithm() instanceof SlotFunction) { + shoudAddSlot = true; + } tempPath = SystemConfig.getHomePath() + File.separator + "temp" + File.separator + serverConnection.getId() + File.separator; tempFile = tempPath + "clientTemp.txt"; tempByteBuffer = new ByteArrayOutputStream(); List columns = statement.getColumns(); - if(tableConfig!=null) - { + if (tableConfig != null) { String pColumn = getPartitionColumn(); if (pColumn != null && columns != null && columns.size() > 0) { for (int i = 0, columnsSize = columns.size(); i < columnsSize; i++) { @@ -196,19 +184,18 @@ public void start(String sql) if (pColumn.equalsIgnoreCase(column)) { partitionColumnIndex = i; } - if("_slot".equalsIgnoreCase(column)){ - shoudAddSlot=false; + if ("_slot".equalsIgnoreCase(column)) { + shoudAddSlot = false; } } } } - if(shoudAddSlot){ - columns.add(new SQLIdentifierExpr("_slot")); - } + if (shoudAddSlot) { + columns.add(new SQLIdentifierExpr("_slot")); + } parseLoadDataPram(); - if (statement.isLocal()) - { + if (statement.isLocal()) { isStartLoadData = true; //向客户端请求发送文件 ByteBuffer buffer = serverConnection.allocate(); @@ -216,18 +203,14 @@ public void start(String sql) filePacket.fileName = fileName.getBytes(); filePacket.packetId = 1; filePacket.write(buffer, serverConnection, true); - } else - { - if (!new File(fileName).exists()) - { + } else { + if (!new File(fileName).exists()) { serverConnection.writeErrMessage(ErrorCode.ER_FILE_NOT_FOUND, fileName + " is not found!"); clear(); - } else - { + } else { parseFileByLine(fileName, loadData.getCharset(), loadData.getLineTerminatedBy()); RouteResultset rrs = buildResultSet(routeResultMap); - if (rrs != null) - { + if (rrs != null) { flushDataToFile(); isStartLoadData = false; serverConnection.getSession2().execute(rrs, ServerParse.LOAD_DATA_INFILE_SQL); @@ -238,13 +221,10 @@ public void start(String sql) } @Override - public void handle(byte[] data) - { + public void handle(byte[] data) { - try - { - if (sql == null) - { + try { + if (sql == null) { serverConnection.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR, "Unknown command"); clear(); @@ -257,25 +237,20 @@ public void handle(byte[] data) saveByteOrToFile(packet.data, false); - } catch (IOException e) - { + } catch (IOException e) { throw new RuntimeException(e); } } - private synchronized void saveByteOrToFile(byte[] data, boolean isForce) - { + private synchronized void saveByteOrToFile(byte[] data, boolean isForce) { - if (data != null) - { + if (data != null) { tempByteBuffrSize = tempByteBuffrSize + data.length; - try - { + try { tempByteBuffer.write(data); - } catch (IOException e) - { + } catch (IOException e) { throw new RuntimeException(e); } } @@ -283,29 +258,24 @@ private synchronized void saveByteOrToFile(byte[] data, boolean isForce) if ((isForce && isHasStoreToFile) || tempByteBuffrSize > 200 * 1024 * 1024) //超过200M 存文件 { FileOutputStream channel = null; - try - { + try { File file = new File(tempFile); Files.createParentDirs(file); channel = new FileOutputStream(file, true); tempByteBuffer.writeTo(channel); - tempByteBuffer=new ByteArrayOutputStream(); - tempByteBuffrSize = 0; + tempByteBuffer = new ByteArrayOutputStream(); + tempByteBuffrSize = 0; isHasStoreToFile = true; - } catch (IOException e) - { + } catch (IOException e) { throw new RuntimeException(e); - } finally - { - try - { + } finally { + try { if (channel != null) { channel.close(); } - } catch (IOException ignored) - { + } catch (IOException ignored) { } } @@ -315,29 +285,24 @@ private synchronized void saveByteOrToFile(byte[] data, boolean isForce) } - private RouteResultset tryDirectRoute(String sql, String[] lineList) - { + private RouteResultset tryDirectRoute(String sql, String[] lineList) { RouteResultset rrs = new RouteResultset(sql, ServerParse.INSERT); rrs.setLoadData(true); - if (tableConfig == null && schema.getDataNode() != null) - { + if (tableConfig == null && schema.getDataNode() != null) { //走默认节点 RouteResultsetNode rrNode = new RouteResultsetNode(schema.getDataNode(), ServerParse.INSERT, sql); rrNode.setSource(rrs); rrs.setNodes(new RouteResultsetNode[]{rrNode}); return rrs; - } - else if (tableConfig != null&&tableConfig.isGlobalTable()) - { - ArrayList dataNodes= tableConfig.getDataNodes(); - RouteResultsetNode[] rrsNodes= new RouteResultsetNode[dataNodes.size()]; - for (int i = 0, dataNodesSize = dataNodes.size(); i < dataNodesSize; i++) - { + } else if (tableConfig != null && tableConfig.isGlobalTable()) { + ArrayList dataNodes = tableConfig.getDataNodes(); + RouteResultsetNode[] rrsNodes = new RouteResultsetNode[dataNodes.size()]; + for (int i = 0, dataNodesSize = dataNodes.size(); i < dataNodesSize; i++) { String dataNode = dataNodes.get(i); RouteResultsetNode rrNode = new RouteResultsetNode(dataNode, ServerParse.INSERT, sql); - rrsNodes[i]=rrNode; - if(rrs.getDataNodeSlotMap().containsKey(dataNode)){ + rrsNodes[i] = rrNode; + if (rrs.getDataNodeSlotMap().containsKey(dataNode)) { rrsNodes[i].setSlot(rrs.getDataNodeSlotMap().get(dataNode)); } rrsNodes[i].setSource(rrs); @@ -345,45 +310,39 @@ else if (tableConfig != null&&tableConfig.isGlobalTable()) rrs.setNodes(rrsNodes); return rrs; - } - else if (tableConfig != null) - { + } else if (tableConfig != null) { DruidShardingParseInfo ctx = new DruidShardingParseInfo(); ctx.addTable(tableName); - if (partitionColumnIndex == -1 || partitionColumnIndex >= lineList.length) - { + if (partitionColumnIndex == -1 || partitionColumnIndex >= lineList.length) { return null; - } else - { + } else { String value = lineList[partitionColumnIndex]; RouteCalculateUnit routeCalculateUnit = new RouteCalculateUnit(); - routeCalculateUnit.addShardingExpr(tableName, getPartitionColumn(), parseFieldString(value,loadData.getEnclose())); + routeCalculateUnit.addShardingExpr(tableName, getPartitionColumn(), parseFieldString(value, loadData.getEnclose())); ctx.addRouteCalculateUnit(routeCalculateUnit); - try - { - SortedSet nodeSet = new TreeSet(); - for(RouteCalculateUnit unit : ctx.getRouteCalculateUnits()) { - RouteResultset rrsTmp = RouterUtil.tryRouteForTables(schema, ctx, unit, rrs, false, tableId2DataNodeCache); - if(rrsTmp != null) { - for(RouteResultsetNode node :rrsTmp.getNodes()) { - nodeSet.add(node); - } - } - } - - RouteResultsetNode[] nodes = new RouteResultsetNode[nodeSet.size()]; - int i = 0; - for (Iterator iterator = nodeSet.iterator(); iterator.hasNext();) { - nodes[i] = (RouteResultsetNode) iterator.next(); - i++; - } - - rrs.setNodes(nodes); + try { + SortedSet nodeSet = new TreeSet(); + for (RouteCalculateUnit unit : ctx.getRouteCalculateUnits()) { + RouteResultset rrsTmp = RouterUtil.tryRouteForTables(schema, ctx, unit, rrs, false, tableId2DataNodeCache); + if (rrsTmp != null) { + for (RouteResultsetNode node : rrsTmp.getNodes()) { + nodeSet.add(node); + } + } + } + + RouteResultsetNode[] nodes = new RouteResultsetNode[nodeSet.size()]; + int i = 0; + for (Iterator iterator = nodeSet.iterator(); iterator.hasNext(); ) { + nodes[i] = (RouteResultsetNode) iterator.next(); + i++; + } + + rrs.setNodes(nodes); return rrs; - } catch (SQLNonTransientException e) - { + } catch (SQLNonTransientException e) { throw new RuntimeException(e); } } @@ -395,29 +354,23 @@ else if (tableConfig != null) } - private void parseOneLine(List columns, String tableName, String[] line, boolean toFile, String lineEnd) - { + private void parseOneLine(List columns, String tableName, String[] line, boolean toFile, String lineEnd) { RouteResultset rrs = tryDirectRoute(sql, line); - if (rrs == null || rrs.getNodes() == null || rrs.getNodes().length == 0) - { + if (rrs == null || rrs.getNodes() == null || rrs.getNodes().length == 0) { String insertSql = makeSimpleInsert(columns, line, tableName, true); rrs = serverConnection.routeSQL(insertSql, ServerParse.INSERT); } - if (rrs == null || rrs.getNodes() == null || rrs.getNodes().length == 0) - { + if (rrs == null || rrs.getNodes() == null || rrs.getNodes().length == 0) { //无路由处理 - } else - { - for (RouteResultsetNode routeResultsetNode : rrs.getNodes()) - { + } else { + for (RouteResultsetNode routeResultsetNode : rrs.getNodes()) { String name = routeResultsetNode.getName(); LoadData data = routeResultMap.get(name); - if (data == null) - { + if (data == null) { data = new LoadData(); data.setCharset(loadData.getCharset()); data.setEnclose(loadData.getEnclose()); @@ -427,110 +380,90 @@ private void parseOneLine(List columns, String tableName, String[] line routeResultMap.put(name, data); } - String jLine = joinField(line, data); - if(shoudAddSlot){ - jLine=jLine+loadData.getFieldTerminatedBy()+routeResultsetNode.getSlot(); + String jLine = joinField(line, data); + if (shoudAddSlot) { + jLine = jLine + loadData.getFieldTerminatedBy() + routeResultsetNode.getSlot(); } - if (data.getData() == null) - { - data.setData(Lists.newArrayList(jLine)); - } else - { + if (data.getData() == null) { + data.setData(Lists.newArrayList(jLine)); + } else { - data.getData().add(jLine); + data.getData().add(jLine); - } + } if (toFile //避免当导入数据跨多分片时内存溢出的情况 - && data.getData().size()>10000) - { - saveDataToFile(data,name); + && data.getData().size() > 10000) { + saveDataToFile(data, name); } } } } - private void flushDataToFile() - { - for (Map.Entry stringLoadDataEntry : routeResultMap.entrySet()) - { + private void flushDataToFile() { + for (Map.Entry stringLoadDataEntry : routeResultMap.entrySet()) { LoadData value = stringLoadDataEntry.getValue(); - if( value.getFileName()!=null&&value.getData()!=null&&value.getData().size()>0) - { - saveDataToFile(value,stringLoadDataEntry.getKey()); + if (value.getFileName() != null && value.getData() != null && value.getData().size() > 0) { + saveDataToFile(value, stringLoadDataEntry.getKey()); } } } - private void saveDataToFile(LoadData data,String dnName) - { - if (data.getFileName() == null) - { + private void saveDataToFile(LoadData data, String dnName) { + if (data.getFileName() == null) { String dnPath = tempPath + dnName + ".txt"; data.setFileName(dnPath); } - File dnFile = new File(data.getFileName()); - try - { - if (!dnFile.exists()) { - Files.createParentDirs(dnFile); - } - Files.append(joinLine(data.getData(),data), dnFile, Charset.forName(loadData.getCharset())); - - } catch (IOException e) - { - throw new RuntimeException(e); - }finally - { - data.setData(null); - + File dnFile = new File(data.getFileName()); + try { + if (!dnFile.exists()) { + Files.createParentDirs(dnFile); } + Files.append(joinLine(data.getData(), data), dnFile, Charset.forName(loadData.getCharset())); + + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + data.setData(null); + } } - private String joinLine(List data, LoadData loadData) - { + private String joinLine(List data, LoadData loadData) { StringBuilder sb = new StringBuilder(); - for (String s : data) - { - sb.append(s).append(loadData.getLineTerminatedBy()) ; + for (String s : data) { + sb.append(s).append(loadData.getLineTerminatedBy()); } return sb.toString(); } - private String joinField(String[] src, LoadData loadData) - { + private String joinField(String[] src, LoadData loadData) { StringBuilder sb = new StringBuilder(); - for (int i = 0, srcLength = src.length; i < srcLength; i++) - { - String s = src[i]!=null?src[i]:""; - if(loadData.getEnclose()==null) - { - sb.append(s); - } else - { - sb.append(loadData.getEnclose()).append(s.replace(loadData.getEnclose(),loadData.getEscape()+loadData.getEnclose())).append(loadData.getEnclose()); + for (int i = 0, srcLength = src.length; i < srcLength; i++) { + String s = src[i] != null ? src[i] : ""; + if (loadData.getEnclose() == null) { + sb.append(s); + } else { + sb.append(loadData.getEnclose()).append(s.replace(loadData.getEnclose(), loadData.getEscape() + loadData.getEnclose())).append(loadData.getEnclose()); } - if(i!=srcLength-1) - { + if (i != srcLength - 1) { sb.append(loadData.getFieldTerminatedBy()); } } - return sb.toString(); + return sb.toString(); } - private RouteResultset buildResultSet(Map routeMap) - { + private RouteResultset buildResultSet(Map routeMap) { statement.setLocal(true);//强制local SQLLiteralExpr fn = new SQLCharExpr(fileName); //默认druid会过滤掉路径的分隔符,所以这里重新设置下 statement.setFileName(fn); @@ -543,8 +476,7 @@ private RouteResultset buildResultSet(Map routeMap) int size = routeMap.size(); RouteResultsetNode[] routeResultsetNodes = new RouteResultsetNode[size]; int index = 0; - for (String dn : routeMap.keySet()) - { + for (String dn : routeMap.keySet()) { RouteResultsetNode rrNode = new RouteResultsetNode(dn, ServerParse.LOAD_DATA_INFILE_SQL, srcStatement); rrNode.setSource(rrs); rrNode.setTotalNodeSize(size); @@ -553,12 +485,11 @@ private RouteResultset buildResultSet(Map routeMap) ObjectUtil.copyProperties(loadData, newLoadData); newLoadData.setLocal(true); LoadData loadData1 = routeMap.get(dn); - // if (isHasStoreToFile) - if (loadData1.getFileName()!=null)//此处判断是否有保存分库load的临时文件dn1.txt/dn2.txt,不是判断是否有clientTemp.txt + // if (isHasStoreToFile) + if (loadData1.getFileName() != null)//此处判断是否有保存分库load的临时文件dn1.txt/dn2.txt,不是判断是否有clientTemp.txt { newLoadData.setFileName(loadData1.getFileName()); - } else - { + } else { newLoadData.setData(loadData1.getData()); } rrNode.setLoadData(newLoadData); @@ -571,19 +502,15 @@ private RouteResultset buildResultSet(Map routeMap) } - private String makeSimpleInsert(List columns, String[] fields, String table, boolean isAddEncose) - { + private String makeSimpleInsert(List columns, String[] fields, String table, boolean isAddEncose) { StringBuilder sb = new StringBuilder(); sb.append(LoadData.loadDataHint).append("insert into ").append(table.toUpperCase()); - if (columns != null && columns.size() > 0) - { + if (columns != null && columns.size() > 0) { sb.append("("); - for (int i = 0, columnsSize = columns.size(); i < columnsSize; i++) - { + for (int i = 0, columnsSize = columns.size(); i < columnsSize; i++) { SQLExpr column = columns.get(i); sb.append(column.toString()); - if (i != columnsSize - 1) - { + if (i != columnsSize - 1) { sb.append(","); } } @@ -591,18 +518,14 @@ private String makeSimpleInsert(List columns, String[] fields, String t } sb.append(" values ("); - for (int i = 0, columnsSize = fields.length; i < columnsSize; i++) - { + for (int i = 0, columnsSize = fields.length; i < columnsSize; i++) { String column = fields[i]; - if (isAddEncose) - { + if (isAddEncose) { sb.append("'").append(parseFieldString(column, loadData.getEnclose())).append("'"); - } else - { + } else { sb.append(column); } - if (i != columnsSize - 1) - { + if (i != columnsSize - 1) { sb.append(","); } } @@ -610,13 +533,10 @@ private String makeSimpleInsert(List columns, String[] fields, String t return sb.toString(); } - private String parseFieldString(String value, String encose) - { - if (encose == null || "".equals(encose) || value == null) - { + private String parseFieldString(String value, String encose) { + if (encose == null || "".equals(encose) || value == null) { return value; - } else if (value.startsWith(encose) && value.endsWith(encose)) - { + } else if (value.startsWith(encose) && value.endsWith(encose)) { return value.substring(encose.length() - 1, value.length() - encose.length()); } return value; @@ -624,63 +544,53 @@ private String parseFieldString(String value, String encose) @Override - public void end(byte packID) - { + public void end(byte packID) { isStartLoadData = false; this.packID = packID; //load in data空包 结束 saveByteOrToFile(null, true); List columns = statement.getColumns(); String tableName = statement.getTableName().getSimpleName(); - if (isHasStoreToFile) - { + if (isHasStoreToFile) { parseFileByLine(tempFile, loadData.getCharset(), loadData.getLineTerminatedBy()); - } else - { - String content = new String(tempByteBuffer.toByteArray(), Charset.forName(loadData.getCharset())); - - // List lines = Splitter.on(loadData.getLineTerminatedBy()).omitEmptyStrings().splitToList(content); - CsvParserSettings settings = new CsvParserSettings(); - settings.setMaxColumns(65535); - settings.setMaxCharsPerColumn(65535); - settings.getFormat().setLineSeparator(loadData.getLineTerminatedBy()); - settings.getFormat().setDelimiter(loadData.getFieldTerminatedBy().charAt(0)); - if(loadData.getEnclose()!=null) - { - settings.getFormat().setQuote(loadData.getEnclose().charAt(0)); + } else { + String reader = new String(tempByteBuffer.toByteArray(), Charset.forName(loadData.getCharset())); + + CSVFormat csvFormat = CSVFormat.newFormat( + loadData.getFieldTerminatedBy().charAt(0)) + .withRecordSeparator(loadData.getLineTerminatedBy()). + withSkipHeaderRecord(false).withTrim(false); + + if (loadData.getEnclose() != null) { + csvFormat = csvFormat.withQuote(loadData.getEnclose().charAt(0)); } - if(loadData.getEscape()!=null) - { - settings.getFormat().setQuoteEscape(loadData.getEscape().charAt(0)); + if (loadData.getEscape() != null) { + csvFormat = csvFormat.withQuote(loadData.getEscape().charAt(0)); } - settings.getFormat().setNormalizedNewline(loadData.getLineTerminatedBy().charAt(0)); /* * fix bug #1074 : LOAD DATA local INFILE导入的所有Boolean类型全部变成了false * 不可见字符将在CsvParser被当成whitespace过滤掉, 使用settings.trimValues(false)来避免被过滤掉 * TODO : 设置trimValues(false)之后, 会引起字段值前后的空白字符无法被过滤! */ - settings.trimValues(false); - CsvParser parser = new CsvParser(settings); - try - { - parser.beginParsing(new StringReader(content)); + try { + Iterable records = csvFormat.parse(new StringReader(reader)); String[] row = null; - while ((row = parser.parseNext()) != null) - { + for (CSVRecord record : records) { + int size = record.size(); + row = new String[size]; + for (int i = 0; i < size; i++) { + row[i] = record.get(i); + } parseOneLine(columns, tableName, row, false, null); } - } finally - { - parser.stopParsing(); + } catch (Throwable e) { + LOGGER.error("解析csv格式错误", e); } - - } RouteResultset rrs = buildResultSet(routeResultMap); - if (rrs != null) - { + if (rrs != null) { flushDataToFile(); serverConnection.getSession2().execute(rrs, ServerParse.LOAD_DATA_INFILE_SQL); } @@ -692,167 +602,143 @@ public void end(byte packID) } - private void parseFileByLine(String file, String encode, String split) - { + private void parseFileByLine(String file, String encode, String split) { List columns = statement.getColumns(); - CsvParserSettings settings = new CsvParserSettings(); - settings.setMaxColumns(65535); - settings.setMaxCharsPerColumn(65535); - settings.getFormat().setLineSeparator(loadData.getLineTerminatedBy()); - settings.getFormat().setDelimiter(loadData.getFieldTerminatedBy().charAt(0)); - if(loadData.getEnclose()!=null) - { - settings.getFormat().setQuote(loadData.getEnclose().charAt(0)); + CSVFormat csvFormat = CSVFormat.newFormat( + loadData.getFieldTerminatedBy().charAt(0)) + .withRecordSeparator(loadData.getLineTerminatedBy()). + withSkipHeaderRecord(false).withTrim(false); + + if (loadData.getEnclose() != null) { + csvFormat = csvFormat.withQuote(loadData.getEnclose().charAt(0)); } - if(loadData.getEscape()!=null) - { - settings.getFormat().setQuoteEscape(loadData.getEscape().charAt(0)); + if (loadData.getEscape() != null) { + csvFormat = csvFormat.withQuote(loadData.getEscape().charAt(0)); } - settings.getFormat().setNormalizedNewline(loadData.getLineTerminatedBy().charAt(0)); /* - * fix #1074 : LOAD DATA local INFILE导入的所有Boolean类型全部变成了false + * fix bug #1074 : LOAD DATA local INFILE导入的所有Boolean类型全部变成了false * 不可见字符将在CsvParser被当成whitespace过滤掉, 使用settings.trimValues(false)来避免被过滤掉 * TODO : 设置trimValues(false)之后, 会引起字段值前后的空白字符无法被过滤! */ - settings.trimValues(false); - CsvParser parser = new CsvParser(settings); - InputStreamReader reader = null; - FileInputStream fileInputStream = null; - try - { - - fileInputStream = new FileInputStream(file); - reader = new InputStreamReader(fileInputStream, encode); - parser.beginParsing(reader); - String[] row = null; - while ((row = parser.parseNext()) != null) - { - parseOneLine(columns, tableName, row, true, loadData.getLineTerminatedBy()); - } + InputStreamReader reader = null; + FileInputStream fileInputStream = null; + try { - } catch (FileNotFoundException | UnsupportedEncodingException e) - { - throw new RuntimeException(e); - } finally - { - parser.stopParsing(); - if(fileInputStream!=null) - { - try - { - fileInputStream.close(); - } catch (IOException e) - { - throw new RuntimeException(e); + fileInputStream = new FileInputStream(file); + reader = new InputStreamReader(fileInputStream, encode); + Iterable records = csvFormat.parse((reader)); + String[] row = null; + for (CSVRecord record : records) { + int size = record.size(); + row = new String[size]; + for (int i = 0; i < size; i++) { + row[i] = record.get(i); + } + parseOneLine(columns, tableName, row, true, loadData.getLineTerminatedBy()); } - } - if (reader != null) - { - try - { - reader.close(); - } catch (IOException e) - { - throw new RuntimeException(e); + } catch (FileNotFoundException | UnsupportedEncodingException e) { + throw new RuntimeException(e); + } catch (IOException e) { + e.printStackTrace(); + } finally { + if (fileInputStream != null) { + try { + fileInputStream.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + if (reader != null) { + try { + reader.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } } - } - - } + } } - public void clear() - { - isStartLoadData = false; - tableId2DataNodeCache = null; - schema = null; - tableConfig = null; - isHasStoreToFile = false; - packID = 0; - tempByteBuffrSize = 0; - tableName=null; - partitionColumnIndex = -1; - if (tempFile != null) - { - File temp = new File(tempFile); - if (temp.exists()) - { - temp.delete(); + public void clear () { + isStartLoadData = false; + tableId2DataNodeCache = null; + schema = null; + tableConfig = null; + isHasStoreToFile = false; + packID = 0; + tempByteBuffrSize = 0; + tableName = null; + partitionColumnIndex = -1; + if (tempFile != null) { + File temp = new File(tempFile); + if (temp.exists()) { + temp.delete(); + } } + if (tempPath != null && new File(tempPath).exists()) { + deleteFile(tempPath); + } + tempByteBuffer = null; + loadData = null; + sql = null; + fileName = null; + statement = null; + routeResultMap.clear(); } - if (tempPath != null && new File(tempPath).exists()) - { - deleteFile(tempPath); - } - tempByteBuffer = null; - loadData = null; - sql = null; - fileName = null; - statement = null; - routeResultMap.clear(); - } - @Override - public byte getLastPackId() - { - return packID; - } - - @Override - public boolean isStartLoadData() - { - return isStartLoadData; - } + @Override + public byte getLastPackId () { + return packID; + } - private String getPartitionColumn() { - String pColumn; - if (tableConfig.isSecondLevel() - && tableConfig.getParentTC().getPartitionColumn() - .equals(tableConfig.getParentKey())) { - pColumn = tableConfig.getJoinKey(); - }else { - pColumn = tableConfig.getPartitionColumn(); - } - return pColumn; - } - - /** - * 删除目录及其所有子目录和文件 - * - * @param dirPath 要删除的目录路径 - * @throws Exception - */ - private static void deleteFile(String dirPath) - { - File fileDirToDel = new File(dirPath); - if (!fileDirToDel.exists()) - { - return; + @Override + public boolean isStartLoadData () { + return isStartLoadData; } - if (fileDirToDel.isFile()) - { - fileDirToDel.delete(); - return; + + private String getPartitionColumn () { + String pColumn; + if (tableConfig.isSecondLevel() + && tableConfig.getParentTC().getPartitionColumn() + .equals(tableConfig.getParentKey())) { + pColumn = tableConfig.getJoinKey(); + } else { + pColumn = tableConfig.getPartitionColumn(); + } + return pColumn; } - File[] fileList = fileDirToDel.listFiles(); - for (int i = 0; i < fileList.length; i++) - { - File file = fileList[i]; - if (file.isFile()&&file.exists()) - { - boolean delete = file.delete(); - } else if (file.isDirectory()) - { - deleteFile(file.getAbsolutePath()); - file.delete(); + /** + * 删除目录及其所有子目录和文件 + * + * @param dirPath 要删除的目录路径 + * @throws Exception + */ + private static void deleteFile (String dirPath){ + File fileDirToDel = new File(dirPath); + if (!fileDirToDel.exists()) { + return; + } + if (fileDirToDel.isFile()) { + fileDirToDel.delete(); + return; + } + File[] fileList = fileDirToDel.listFiles(); + + for (int i = 0; i < fileList.length; i++) { + File file = fileList[i]; + if (file.isFile() && file.exists()) { + boolean delete = file.delete(); + } else if (file.isDirectory()) { + deleteFile(file.getAbsolutePath()); + file.delete(); + } } + fileDirToDel.delete(); } - fileDirToDel.delete(); - } -} + } diff --git a/version.txt b/version.txt index 62c82fa18..81adb4444 100644 --- a/version.txt +++ b/version.txt @@ -1,5 +1,5 @@ -BuildTime 2019-09-18 16:34:05 -GitVersion ba0063ef6edd2b9a1e2cec7f8cc140ae9f4c294a +BuildTime 2020-07-27 10:17:13 +GitVersion 65167ab0f4edca3a6c6f998b5c8f1f2a7f3a8c58 MavenVersion 1.6.7.3-release GitUrl https://github.com/MyCATApache/Mycat-Server.git MyCatSite http://www.mycat.org.cn