diff --git a/SIT/environment.sh b/SIT/environment.sh
index d12d424a..3644a24c 100755
--- a/SIT/environment.sh
+++ b/SIT/environment.sh
@@ -240,7 +240,7 @@ _Setup() {
 #  docker build -t ${dockerContainerVersion} ..
 
   _info "Starting Docker container ${DOCKER_CASS}"
-  docker run --name ${DOCKER_CDM} --network ${NETWORK_NAME} --ip ${SUBNET}.3 -e "CASS_USERNAME=${CASS_USERNAME}" -e "CASS_PASSWORD=${CASS_PASSWORD}" -e "CASS_CLUSTER=${DOCKER_CASS}" -d ${dockerContainerVersion}
+  docker run --platform linux/amd64 --name ${DOCKER_CDM} --network ${NETWORK_NAME} --ip ${SUBNET}.3 -e "CASS_USERNAME=${CASS_USERNAME}" -e "CASS_PASSWORD=${CASS_PASSWORD}" -e "CASS_CLUSTER=${DOCKER_CASS}" -d ${dockerContainerVersion}
   attempt=1
   while [[ $attempt -le 12 && "$(_testDockerCDM)" != "yes" ]]; do
     _info "waiting for CDM to start, attempt $attempt"
diff --git a/src/main/java/com/datastax/cdm/data/CqlConversion.java b/src/main/java/com/datastax/cdm/data/CqlConversion.java
index 98ea9eb1..e48aae10 100644
--- a/src/main/java/com/datastax/cdm/data/CqlConversion.java
+++ b/src/main/java/com/datastax/cdm/data/CqlConversion.java
@@ -18,6 +18,8 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -352,24 +354,70 @@ protected static Object convert_COLLECTION(Type collectionType, Object value, Li
         }
 
         // If all elements in conversionTypeList are either NONE or UNKNOWN, then return the original value
-        if (conversionTypeList.stream().allMatch(t -> Type.NONE.equals(t) || Type.UNSUPPORTED.equals(t)))
+        // Avoid stream operation for simple check
+        boolean allNoneOrUnsupported = true;
+        for (Type t : conversionTypeList) {
+            if (!Type.NONE.equals(t) && !Type.UNSUPPORTED.equals(t)) {
+                allNoneOrUnsupported = false;
+                break;
+            }
+        }
+        if (allNoneOrUnsupported) {
             return value;
+        }
 
         switch (collectionType) {
         case LIST:
-            return ((List) value).stream().map(v -> convert_ONE(conversionTypeList.get(0), v,
-                    fromDataTypeList.get(0), toDataTypeList.get(0), codecRegistry)).collect(Collectors.toList());
+            // Pre-allocate result list with proper capacity to avoid resizing
+            List sourceList = (List) value;
+            List resultList = new ArrayList<>(sourceList.size());
+            Type elementConversionType = conversionTypeList.get(0);
+            DataType fromElementType = fromDataTypeList.get(0);
+            DataType toElementType = toDataTypeList.get(0);
+
+            // Direct iteration is more efficient than stream for simpler operations
+            for (Object element : sourceList) {
+                resultList.add(
+                        convert_ONE(elementConversionType, element, fromElementType, toElementType, codecRegistry));
+            }
+            return resultList;
+
         case SET:
-            return ((Set) value).stream().map(v -> convert_ONE(conversionTypeList.get(0), v, fromDataTypeList.get(0),
-                    toDataTypeList.get(0), codecRegistry)).collect(Collectors.toSet());
+            // For sets, we need to maintain uniqueness
+            Set sourceSet = (Set) value;
+            Set resultSet = new HashSet<>(Math.max((int) (sourceSet.size() / .75f) + 1, 16)); // Proper
+                                                                                              // pre-sizing
+            elementConversionType = conversionTypeList.get(0);
+            fromElementType = fromDataTypeList.get(0);
+            toElementType = toDataTypeList.get(0);
+
+            for (Object element : sourceSet) {
+                resultSet.add(
+                        convert_ONE(elementConversionType, element, fromElementType, toElementType, codecRegistry));
+            }
+            return resultSet;
+
         case MAP:
             // There are two conversion types in the element list: one for keys and one for values
-            return ((Map) value).entrySet().stream()
-                    .collect(Collectors.toMap(
-                            entry -> convert_ONE(conversionTypeList.get(0), entry.getKey(), fromDataTypeList.get(0),
-                                    toDataTypeList.get(0), codecRegistry),
-                            entry -> convert_ONE(conversionTypeList.get(1), entry.getValue(), fromDataTypeList.get(1),
-                                    toDataTypeList.get(1), codecRegistry)));
+            Map sourceMap = (Map) value;
+            Map resultMap = new HashMap<>(Math.max((int) (sourceMap.size() / .75f) + 1, 16)); // Proper
+                                                                                              // capacity
+
+            Type keyConversionType = conversionTypeList.get(0);
+            Type valueConversionType = conversionTypeList.get(1);
+            DataType fromKeyType = fromDataTypeList.get(0);
+            DataType toKeyType = toDataTypeList.get(0);
+            DataType fromValueType = fromDataTypeList.get(1);
+            DataType toValueType = toDataTypeList.get(1);
+
+            for (Map.Entry entry : sourceMap.entrySet()) {
+                Object convertedKey = convert_ONE(keyConversionType, entry.getKey(), fromKeyType, toKeyType,
+                        codecRegistry);
+                Object convertedValue = convert_ONE(valueConversionType, entry.getValue(), fromValueType, toValueType,
+                        codecRegistry);
+                resultMap.put(convertedKey, convertedValue);
+            }
+            return resultMap;
         }
         return value;
     }
diff --git a/src/main/java/com/datastax/cdm/job/DiffJobSession.java b/src/main/java/com/datastax/cdm/job/DiffJobSession.java
index 197ee6fb..e65e9028 100644
--- a/src/main/java/com/datastax/cdm/job/DiffJobSession.java
+++ b/src/main/java/com/datastax/cdm/job/DiffJobSession.java
@@ -197,7 +197,8 @@ protected void processPartitionRange(PartitionRange range) {
     }
 
     private boolean diffAndClear(List<Record> recordsToDiff, JobCounter jobCounter) {
-        boolean isDiff = recordsToDiff.stream().map(r -> diff(r, jobCounter)).filter(b -> b == true).count() > 0;
+        // Use anyMatch for short-circuit evaluation instead of filtering and counting
+        boolean isDiff = recordsToDiff.stream().anyMatch(r -> diff(r, jobCounter));
         recordsToDiff.clear();
         return isDiff;
     }
@@ -247,8 +248,31 @@ private String isDifferent(Record record) {
         Row originRow = record.getOriginRow();
         Row targetRow = record.getTargetRow();
 
-        StringBuffer diffData = new StringBuffer();
-        IntStream.range(0, targetColumnNames.size()).parallel().forEach(targetIndex -> {
+        // Collect differences in a StringBuilder; appends from the stream below are synchronized
+        final StringBuilder diffData = new StringBuilder();
+
+        // Pre-filter columns that need to be compared to avoid unnecessary processing
+        final List<Integer> columnsToProcess = new ArrayList<>();
+        for (int i = 0; i < targetColumnNames.size(); i++) {
+            if (!constantColumnIndexes.contains(i)) {
+                columnsToProcess.add(i);
+            }
+        }
+
+        // Determine parallelism based on column count
+        final int columnCount = columnsToProcess.size();
+        final int availableProcessors = Runtime.getRuntime().availableProcessors();
+        final boolean useParallel = columnCount > 10; // Only use parallel for tables with more than 10 columns
+
+        // Choose appropriate stream type based on column count
+        IntStream stream = IntStream.range(0, columnsToProcess.size());
+        if (useParallel) {
+            stream = stream.parallel();
+        }
+
+        // Appends to the shared StringBuilder are synchronized inside the lambda below
+        stream.forEach(i -> {
+            int targetIndex = columnsToProcess.get(i);
             String previousLabel = ThreadContext.get(THREAD_CONTEXT_LABEL);
             try {
                 ThreadContext.put(THREAD_CONTEXT_LABEL, pk + ":" + targetColumnNames.get(targetIndex));
@@ -256,13 +280,7 @@ private String isDifferent(Record record) {
                 int originIndex = -2; // this to distinguish default from indexOf result
                 Object targetAsOriginType = null;
                 try {
-                    if (constantColumnIndexes.contains(targetIndex)) {
-                        if (logTrace)
logger.trace("PK {}, targetIndex {} skipping constant column {}", pk, targetIndex, - targetColumnNames.get(targetIndex)); - return; // nothing to compare in origin - } - + // Already filtered out constant columns targetAsOriginType = targetSession.getCqlTable().getAndConvertData(targetIndex, targetRow); if (targetIndex == extractJsonFeature.getTargetColumnIndex()) { if (!overwriteTarget && null != targetAsOriginType) { @@ -306,18 +324,27 @@ private String isDifferent(Record record) { originIndex < 0 ? "null" : originSession.getCqlTable().getColumnNames(false).get(originIndex), targetAsOriginType, origin); + + StringBuilder diffEntry = new StringBuilder(); if (null != origin && DataUtility.diff(origin, targetAsOriginType)) { String originContent = CqlData .getFormattedContent(CqlData.toType(originColumnTypes.get(originIndex)), origin); String targetContent = CqlData.getFormattedContent( CqlData.toType(targetColumnTypes.get(targetIndex)), targetAsOriginType); - diffData.append("Target column:").append(targetColumnNames.get(targetIndex)).append("-origin[") + diffEntry.append("Target column:").append(targetColumnNames.get(targetIndex)).append("-origin[") .append(originContent).append("]").append("-target[").append(targetContent) .append("]; "); } else if (null == origin && null != targetAsOriginType) { - diffData.append("Target column:").append(targetColumnNames.get(targetIndex)) + diffEntry.append("Target column:").append(targetColumnNames.get(targetIndex)) .append(" origin is null, target is ").append(targetAsOriginType).append("; "); } + + // Synchronize on StringBuilder to avoid thread conflicts + if (diffEntry.length() > 0) { + synchronized (diffData) { + diffData.append(diffEntry); + } + } } catch (Exception e) { String exceptionName; String myClassMethodLine = DataUtility.getMyClassMethodLine(e); @@ -326,14 +353,18 @@ private String isDifferent(Record record) { } else { exceptionName = e + "@" + myClassMethodLine; } - diffData.append("Target column:").append(targetColumnNames.get(targetIndex)).append(" Exception ") - .append(exceptionName).append(" targetIndex:").append(targetIndex).append(" originIndex:") - .append(originIndex).append("; "); + + synchronized (diffData) { + diffData.append("Target column:").append(targetColumnNames.get(targetIndex)) + .append(" Exception ").append(exceptionName).append(" targetIndex:").append(targetIndex) + .append(" originIndex:").append(originIndex).append("; "); + } } } finally { ThreadContext.put(THREAD_CONTEXT_LABEL, previousLabel); } }); + return diffData.toString(); } diff --git a/src/main/java/com/datastax/cdm/schema/CqlTable.java b/src/main/java/com/datastax/cdm/schema/CqlTable.java index 0570f8fd..8c471ae8 100644 --- a/src/main/java/com/datastax/cdm/schema/CqlTable.java +++ b/src/main/java/com/datastax/cdm/schema/CqlTable.java @@ -104,23 +104,57 @@ public CqlTable(IPropertyHelper propertyHelper, boolean isOrigin, CqlSession ses throw new IllegalArgumentException( "No columns defined for table " + this.keyspaceName + "." 
             }
-            this.columnNames = this.cqlAllColumns.stream().map(columnMetadata -> columnMetadata.getName().asInternal())
-                    .collect(Collectors.toList());
+            // Replace stream operations with direct list operations for better performance
+            this.columnNames = new ArrayList<>(this.cqlAllColumns.size());
+            for (ColumnMetadata columnMetadata : this.cqlAllColumns) {
+                this.columnNames.add(columnMetadata.getName().asInternal());
+            }
         }
 
-        this.columnCqlTypes = columnNames.stream().map(columnName -> this.columnNameToCqlTypeMap.get(columnName))
-                .collect(Collectors.toList());
-        this.bindClasses = columnCqlTypes.stream().map(CqlData::getBindClass).collect(Collectors.toList());
-        this.partitionKeyNames = cqlPartitionKey.stream().map(columnMetadata -> columnMetadata.getName().asInternal())
-                .collect(Collectors.toList());
-        this.pkNames = cqlPrimaryKey.stream().map(columnMetadata -> columnMetadata.getName().asInternal())
-                .collect(Collectors.toList());
-        List<DataType> pkTypes = cqlPrimaryKey.stream().map(ColumnMetadata::getType).collect(Collectors.toList());
-        this.pkClasses = pkTypes.stream().map(CqlData::getBindClass).collect(Collectors.toList());
-        this.pkIndexes = pkNames.stream().map(columnNames::indexOf).collect(Collectors.toList());
+        // Pre-allocate collections with proper capacity
+        this.columnCqlTypes = new ArrayList<>(columnNames.size());
+        for (String columnName : columnNames) {
+            this.columnCqlTypes.add(this.columnNameToCqlTypeMap.get(columnName));
+        }
+
+        this.bindClasses = new ArrayList<>(columnCqlTypes.size());
+        for (DataType dataType : columnCqlTypes) {
+            this.bindClasses.add(CqlData.getBindClass(dataType));
+        }
+
+        // Replace stream operations for partition key names
+        this.partitionKeyNames = new ArrayList<>(cqlPartitionKey.size());
+        for (ColumnMetadata columnMetadata : cqlPartitionKey) {
+            this.partitionKeyNames.add(columnMetadata.getName().asInternal());
+        }
+
+        // Replace stream operations for primary key names
+        this.pkNames = new ArrayList<>(cqlPrimaryKey.size());
+        List<DataType> pkTypes = new ArrayList<>(cqlPrimaryKey.size());
+        for (ColumnMetadata columnMetadata : cqlPrimaryKey) {
+            this.pkNames.add(columnMetadata.getName().asInternal());
+            pkTypes.add(columnMetadata.getType());
+        }
+
+        // Replace stream operations for primary key classes
+        this.pkClasses = new ArrayList<>(pkTypes.size());
+        for (DataType dataType : pkTypes) {
+            this.pkClasses.add(CqlData.getBindClass(dataType));
+        }
+
+        // Replace stream operations for primary key indexes
+        this.pkIndexes = new ArrayList<>(pkNames.size());
+        for (String pkName : pkNames) {
+            this.pkIndexes.add(columnNames.indexOf(pkName));
+        }
 
-        this.counterIndexes = IntStream.range(0, columnCqlTypes.size())
-                .filter(i -> columnCqlTypes.get(i).equals(DataTypes.COUNTER)).boxed().collect(Collectors.toList());
+        // Replace IntStream with direct iteration for counter indexes
+        this.counterIndexes = new ArrayList<>();
+        for (int i = 0; i < columnCqlTypes.size(); i++) {
+            if (columnCqlTypes.get(i).equals(DataTypes.COUNTER)) {
+                this.counterIndexes.add(i);
+            }
+        }
         this.isCounterTable = !this.counterIndexes.isEmpty();
 
         this.readConsistencyLevel = mapToConsistencyLevel(propertyHelper.getString(KnownProperties.READ_CL));
@@ -479,13 +513,19 @@ private void setCqlMetadata(CqlSession cqlSession) {
         boolean allowCollectionsForWritetimeTTL = propertyHelper
                 .getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_CALC);
 
-        this.writetimeTTLColumns = tableMetadata.getColumns().values().stream()
-                .filter(columnMetadata -> canColumnHaveTTLorWritetime(tableMetadata, columnMetadata,
-                        allowCollectionsForWritetimeTTL))
-                .map(ColumnMetadata::getName).map(CqlIdentifier::asInternal).collect(Collectors.toList());
+        // Replace stream operations with direct filtering for writetime/TTL columns
+        this.writetimeTTLColumns = new ArrayList<>();
+        for (ColumnMetadata columnMetadata : tableMetadata.getColumns().values()) {
+            if (canColumnHaveTTLorWritetime(tableMetadata, columnMetadata, allowCollectionsForWritetimeTTL)) {
+                this.writetimeTTLColumns.add(columnMetadata.getName().asInternal());
+            }
+        }
 
-        this.columnNameToCqlTypeMap = this.cqlAllColumns.stream().collect(
-                Collectors.toMap(columnMetadata -> columnMetadata.getName().asInternal(), ColumnMetadata::getType));
+        // Replace stream with direct HashMap population
+        this.columnNameToCqlTypeMap = new HashMap<>(this.cqlAllColumns.size());
+        for (ColumnMetadata columnMetadata : this.cqlAllColumns) {
+            this.columnNameToCqlTypeMap.put(columnMetadata.getName().asInternal(), columnMetadata.getType());
+        }
     }
 
     private boolean canColumnHaveTTLorWritetime(TableMetadata tableMetadata, ColumnMetadata columnMetadata,
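Note on the pre-sizing expression used throughout this patch: the formula Math.max((int) (n / .75f) + 1, 16) relies on the JDK default HashMap/HashSet load factor of 0.75, under which the table is rehashed once its size exceeds capacity * 0.75, so an initial capacity just above n / 0.75 lets n entries be inserted without any resize. The standalone sketch below is illustrative only (the class name and numbers are hypothetical, not part of this patch or the CDM codebase):

// Illustrative sketch -- not part of this patch or the CDM codebase.
import java.util.HashMap;
import java.util.Map;

public class PreSizingSketch {
    // Same formula the patch uses when pre-sizing HashMap/HashSet instances.
    static int preSizedCapacity(int expectedEntries) {
        // With the default load factor 0.75, a HashMap resizes once size > capacity * 0.75,
        // so the capacity hint must exceed expectedEntries / 0.75 to avoid any rehash.
        // The lower bound of 16 matches the JDK's default initial capacity.
        return Math.max((int) (expectedEntries / .75f) + 1, 16);
    }

    public static void main(String[] args) {
        int n = 1000;
        // Pre-sized map: no rehash while loading n entries.
        Map<String, Integer> preSized = new HashMap<>(preSizedCapacity(n));
        for (int i = 0; i < n; i++) {
            preSized.put("key-" + i, i);
        }
        System.out.println("capacity hint for " + n + " entries: " + preSizedCapacity(n));
    }
}

The same reasoning applies to the HashSet and HashMap pre-sizing in CqlConversion.convert_COLLECTION and to the columnNameToCqlTypeMap population in CqlTable above.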