Optimize Java streams usage for better performance #372

Draft: wants to merge 1 commit into main
2 changes: 1 addition & 1 deletion SIT/environment.sh
@@ -240,7 +240,7 @@ _Setup() {
# docker build -t ${dockerContainerVersion} ..

_info "Starting Docker container ${DOCKER_CASS}"
docker run --name ${DOCKER_CDM} --network ${NETWORK_NAME} --ip ${SUBNET}.3 -e "CASS_USERNAME=${CASS_USERNAME}" -e "CASS_PASSWORD=${CASS_PASSWORD}" -e "CASS_CLUSTER=${DOCKER_CASS}" -d ${dockerContainerVersion}
docker run --platform linux/amd64 --name ${DOCKER_CDM} --network ${NETWORK_NAME} --ip ${SUBNET}.3 -e "CASS_USERNAME=${CASS_USERNAME}" -e "CASS_PASSWORD=${CASS_PASSWORD}" -e "CASS_CLUSTER=${DOCKER_CASS}" -d ${dockerContainerVersion}
attempt=1
while [[ $attempt -le 12 && "$(_testDockerCDM)" != "yes" ]]; do
_info "waiting for CDM to start, attempt $attempt"
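Note: the added --platform linux/amd64 flag presumably pins the container to the amd64 image so the SIT environment also runs (under emulation) on arm64 hosts such as Apple Silicon, where Docker would otherwise select a native image that may not exist for this container.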
70 changes: 59 additions & 11 deletions src/main/java/com/datastax/cdm/data/CqlConversion.java
@@ -18,6 +18,8 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -352,24 +354,70 @@ protected static Object convert_COLLECTION(Type collectionType, Object value, Li
}

// If all elements in conversionTypeList are either NONE or UNSUPPORTED, then return the original value
if (conversionTypeList.stream().allMatch(t -> Type.NONE.equals(t) || Type.UNSUPPORTED.equals(t)))
// A plain loop avoids the stream overhead for this simple check
boolean allNoneOrUnsupported = true;
for (Type t : conversionTypeList) {
if (!Type.NONE.equals(t) && !Type.UNSUPPORTED.equals(t)) {
allNoneOrUnsupported = false;
break;
}
}
if (allNoneOrUnsupported) {
return value;
}

switch (collectionType) {
case LIST:
return ((List<?>) value).stream().map(v -> convert_ONE(conversionTypeList.get(0), v,
fromDataTypeList.get(0), toDataTypeList.get(0), codecRegistry)).collect(Collectors.toList());
// Pre-allocate result list with proper capacity to avoid resizing
List<?> sourceList = (List<?>) value;
List<Object> resultList = new ArrayList<>(sourceList.size());
Type elementConversionType = conversionTypeList.get(0);
DataType fromElementType = fromDataTypeList.get(0);
DataType toElementType = toDataTypeList.get(0);

// Direct iteration is cheaper than a stream for a simple per-element mapping
for (Object element : sourceList) {
resultList.add(
convert_ONE(elementConversionType, element, fromElementType, toElementType, codecRegistry));
}
return resultList;

case SET:
return ((Set<?>) value).stream().map(v -> convert_ONE(conversionTypeList.get(0), v, fromDataTypeList.get(0),
toDataTypeList.get(0), codecRegistry)).collect(Collectors.toSet());
// For sets, uniqueness is preserved by the HashSet itself
Set<?> sourceSet = (Set<?>) value;
// Pre-size so the expected number of elements never triggers a rehash
Set<Object> resultSet = new HashSet<>(Math.max((int) (sourceSet.size() / .75f) + 1, 16));
elementConversionType = conversionTypeList.get(0);
fromElementType = fromDataTypeList.get(0);
toElementType = toDataTypeList.get(0);

for (Object element : sourceSet) {
resultSet.add(
convert_ONE(elementConversionType, element, fromElementType, toElementType, codecRegistry));
}
return resultSet;

case MAP:
// There are two conversion types in the element list: one for keys and one for values
return ((Map<?, ?>) value).entrySet().stream()
.collect(Collectors.toMap(
entry -> convert_ONE(conversionTypeList.get(0), entry.getKey(), fromDataTypeList.get(0),
toDataTypeList.get(0), codecRegistry),
entry -> convert_ONE(conversionTypeList.get(1), entry.getValue(), fromDataTypeList.get(1),
toDataTypeList.get(1), codecRegistry)));
Map<?, ?> sourceMap = (Map<?, ?>) value;
// Pre-size so the expected number of entries never triggers a rehash
Map<Object, Object> resultMap = new HashMap<>(Math.max((int) (sourceMap.size() / .75f) + 1, 16));

Type keyConversionType = conversionTypeList.get(0);
Type valueConversionType = conversionTypeList.get(1);
DataType fromKeyType = fromDataTypeList.get(0);
DataType toKeyType = toDataTypeList.get(0);
DataType fromValueType = fromDataTypeList.get(1);
DataType toValueType = toDataTypeList.get(1);

for (Map.Entry<?, ?> entry : sourceMap.entrySet()) {
Object convertedKey = convert_ONE(keyConversionType, entry.getKey(), fromKeyType, toKeyType,
codecRegistry);
Object convertedValue = convert_ONE(valueConversionType, entry.getValue(), fromValueType, toValueType,
codecRegistry);
resultMap.put(convertedKey, convertedValue);
}
return resultMap;
}
return value;
}
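An aside on the pre-sizing expression used twice above: HashMap and HashSet resize once the element count exceeds capacity × load factor (default .75), so requesting a capacity of (int)(n / .75f) + 1 guarantees that n insertions never trigger a rehash. A minimal standalone sketch (class and names are illustrative, not part of the PR):

```java
import java.util.HashMap;
import java.util.Map;

public class PreSizingDemo {
    // Same expression as the PR: a requested capacity whose resize threshold
    // (capacity * 0.75, after power-of-two rounding) is at least `expected`,
    // floored at the default capacity of 16.
    static int capacityFor(int expected) {
        return Math.max((int) (expected / .75f) + 1, 16);
    }

    public static void main(String[] args) {
        int n = 1_000;
        Map<Integer, Integer> m = new HashMap<>(capacityFor(n));
        for (int i = 0; i < n; i++) {
            m.put(i, i * i); // no intermediate rehash for these n entries
        }
        System.out.println("entries: " + m.size());
    }
}
```

If the project can target JDK 19 or later, HashMap.newHashMap(n) and HashSet.newHashSet(n) encapsulate the same calculation.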
61 changes: 46 additions & 15 deletions src/main/java/com/datastax/cdm/job/DiffJobSession.java
@@ -197,7 +197,8 @@ protected void processPartitionRange(PartitionRange range) {
}

private boolean diffAndClear(List<Record> recordsToDiff, JobCounter jobCounter) {
boolean isDiff = recordsToDiff.stream().map(r -> diff(r, jobCounter)).filter(b -> b == true).count() > 0;
// Use anyMatch for short-circuit evaluation instead of filtering and counting
boolean isDiff = recordsToDiff.stream().anyMatch(r -> diff(r, jobCounter));
recordsToDiff.clear();
return isDiff;
}
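One behavioral note on the diffAndClear change above: the old filter-and-count pipeline invoked diff for every record, whereas anyMatch short-circuits on the first true. If diff's side effects (logging mismatches, updating jobCounter) must run for all records, a plain loop keeps the old semantics while still avoiding the counting overhead. A minimal sketch reusing the PR's names (diffAndClearExhaustive is a hypothetical variant, not the PR's code):

```java
// Sketch: evaluates diff for every record (no short-circuit), then reports
// whether any of them differed. Record, diff and jobCounter are the PR's own names.
private boolean diffAndClearExhaustive(List<Record> recordsToDiff, JobCounter jobCounter) {
    boolean isDiff = false;
    for (Record r : recordsToDiff) {
        isDiff |= diff(r, jobCounter);
    }
    recordsToDiff.clear();
    return isDiff;
}
```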
@@ -247,7 +248,39 @@ private String isDifferent(Record record) {
Row originRow = record.getOriginRow();
Row targetRow = record.getTargetRow();

StringBuffer diffData = new StringBuffer();
IntStream.range(0, targetColumnNames.size()).parallel().forEach(targetIndex -> {
// Shared buffer for collecting differences; StringBuilder is not thread-safe,
// so parallel appends below are guarded by synchronized blocks
final StringBuilder diffData = new StringBuilder();

// Pre-filter columns that need to be compared to avoid unnecessary processing
final List<Integer> columnsToProcess = new ArrayList<>();
for (int i = 0; i < targetColumnNames.size(); i++) {
if (!constantColumnIndexes.contains(i)) {
columnsToProcess.add(i);
}
}

// Only parallelize when there are enough columns to amortize the fork/join overhead
final int columnCount = columnsToProcess.size();
final boolean useParallel = columnCount > 10;

// Choose appropriate stream type based on column count
IntStream stream = IntStream.range(0, columnsToProcess.size());
if (useParallel) {
stream = stream.parallel();
}

// Each iteration builds its diff entry locally and synchronizes only the final append
stream.forEach(i -> {
int targetIndex = columnsToProcess.get(i);
String previousLabel = ThreadContext.get(THREAD_CONTEXT_LABEL);
try {
ThreadContext.put(THREAD_CONTEXT_LABEL, pk + ":" + targetColumnNames.get(targetIndex));
Object origin = null;
int originIndex = -2; // -2 distinguishes the unset default from indexOf's -1
Object targetAsOriginType = null;
try {
if (constantColumnIndexes.contains(targetIndex)) {
if (logTrace)
logger.trace("PK {}, targetIndex {} skipping constant column {}", pk, targetIndex,
targetColumnNames.get(targetIndex));
return; // nothing to compare in origin
}

// Already filtered out constant columns
targetAsOriginType = targetSession.getCqlTable().getAndConvertData(targetIndex, targetRow);
if (targetIndex == extractJsonFeature.getTargetColumnIndex()) {
if (!overwriteTarget && null != targetAsOriginType) {
@@ -306,18 +324,27 @@
originIndex < 0 ? "null"
: originSession.getCqlTable().getColumnNames(false).get(originIndex),
targetAsOriginType, origin);

StringBuilder diffEntry = new StringBuilder();
if (null != origin && DataUtility.diff(origin, targetAsOriginType)) {
String originContent = CqlData
.getFormattedContent(CqlData.toType(originColumnTypes.get(originIndex)), origin);
String targetContent = CqlData.getFormattedContent(
CqlData.toType(targetColumnTypes.get(targetIndex)), targetAsOriginType);
diffData.append("Target column:").append(targetColumnNames.get(targetIndex)).append("-origin[")
diffEntry.append("Target column:").append(targetColumnNames.get(targetIndex)).append("-origin[")
.append(originContent).append("]").append("-target[").append(targetContent)
.append("]; ");
} else if (null == origin && null != targetAsOriginType) {
diffData.append("Target column:").append(targetColumnNames.get(targetIndex))
diffEntry.append("Target column:").append(targetColumnNames.get(targetIndex))
.append(" origin is null, target is ").append(targetAsOriginType).append("; ");
}

// Synchronize the append so parallel iterations do not interleave writes
if (diffEntry.length() > 0) {
synchronized (diffData) {
diffData.append(diffEntry);
}
}
} catch (Exception e) {
String exceptionName;
String myClassMethodLine = DataUtility.getMyClassMethodLine(e);
@@ -326,14 +353,18 @@
} else {
exceptionName = e + "@" + myClassMethodLine;
}
diffData.append("Target column:").append(targetColumnNames.get(targetIndex)).append(" Exception ")
.append(exceptionName).append(" targetIndex:").append(targetIndex).append(" originIndex:")
.append(originIndex).append("; ");

synchronized (diffData) {
diffData.append("Target column:").append(targetColumnNames.get(targetIndex))
.append(" Exception ").append(exceptionName).append(" targetIndex:").append(targetIndex)
.append(" originIndex:").append(originIndex).append("; ");
}
}
} finally {
ThreadContext.put(THREAD_CONTEXT_LABEL, previousLabel);
}
});

return diffData.toString();
}

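If contention on the shared, synchronized StringBuilder in isDifferent ever shows up in profiles, an alternative is to have each parallel task produce its own String and join the results, which removes the shared mutable state entirely. A minimal standalone sketch; diffForColumn is a hypothetical stand-in for the per-column comparison logic above:

```java
import java.util.List;
import java.util.stream.Collectors;

public class JoinDiffDemo {
    // Hypothetical per-column check; returns "" when the column does not differ.
    static String diffForColumn(int col) {
        return (col % 3 == 0) ? "Target column:c" + col + " differs; " : "";
    }

    public static void main(String[] args) {
        List<Integer> columnsToProcess = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
        // Each task builds a private String; Collectors.joining preserves encounter
        // order even for parallel streams, so no synchronization is needed.
        String diffData = columnsToProcess.parallelStream()
                .map(JoinDiffDemo::diffForColumn)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.joining());
        System.out.println(diffData);
    }
}
```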
80 changes: 60 additions & 20 deletions src/main/java/com/datastax/cdm/schema/CqlTable.java
@@ -104,23 +104,57 @@ public CqlTable(IPropertyHelper propertyHelper, boolean isOrigin, CqlSession ses
throw new IllegalArgumentException(
"No columns defined for table " + this.keyspaceName + "." + this.tableName);
}
this.columnNames = this.cqlAllColumns.stream().map(columnMetadata -> columnMetadata.getName().asInternal())
.collect(Collectors.toList());
// Build the column-name list with a plain pre-sized loop instead of a stream
this.columnNames = new ArrayList<>(this.cqlAllColumns.size());
for (ColumnMetadata columnMetadata : this.cqlAllColumns) {
this.columnNames.add(columnMetadata.getName().asInternal());
}
}
this.columnCqlTypes = columnNames.stream().map(columnName -> this.columnNameToCqlTypeMap.get(columnName))
.collect(Collectors.toList());
this.bindClasses = columnCqlTypes.stream().map(CqlData::getBindClass).collect(Collectors.toList());

this.partitionKeyNames = cqlPartitionKey.stream().map(columnMetadata -> columnMetadata.getName().asInternal())
.collect(Collectors.toList());
this.pkNames = cqlPrimaryKey.stream().map(columnMetadata -> columnMetadata.getName().asInternal())
.collect(Collectors.toList());
List<DataType> pkTypes = cqlPrimaryKey.stream().map(ColumnMetadata::getType).collect(Collectors.toList());
this.pkClasses = pkTypes.stream().map(CqlData::getBindClass).collect(Collectors.toList());
this.pkIndexes = pkNames.stream().map(columnNames::indexOf).collect(Collectors.toList());
// Pre-allocate collections with proper capacity
this.columnCqlTypes = new ArrayList<>(columnNames.size());
for (String columnName : columnNames) {
this.columnCqlTypes.add(this.columnNameToCqlTypeMap.get(columnName));
}

this.bindClasses = new ArrayList<>(columnCqlTypes.size());
for (DataType dataType : columnCqlTypes) {
this.bindClasses.add(CqlData.getBindClass(dataType));
}

// Partition key names
this.partitionKeyNames = new ArrayList<>(cqlPartitionKey.size());
for (ColumnMetadata columnMetadata : cqlPartitionKey) {
this.partitionKeyNames.add(columnMetadata.getName().asInternal());
}

// Primary key names and types
this.pkNames = new ArrayList<>(cqlPrimaryKey.size());
List<DataType> pkTypes = new ArrayList<>(cqlPrimaryKey.size());
for (ColumnMetadata columnMetadata : cqlPrimaryKey) {
this.pkNames.add(columnMetadata.getName().asInternal());
pkTypes.add(columnMetadata.getType());
}

// Primary key bind classes
this.pkClasses = new ArrayList<>(pkTypes.size());
for (DataType dataType : pkTypes) {
this.pkClasses.add(CqlData.getBindClass(dataType));
}

// Primary key indexes within columnNames
this.pkIndexes = new ArrayList<>(pkNames.size());
for (String pkName : pkNames) {
this.pkIndexes.add(columnNames.indexOf(pkName));
}

this.counterIndexes = IntStream.range(0, columnCqlTypes.size())
.filter(i -> columnCqlTypes.get(i).equals(DataTypes.COUNTER)).boxed().collect(Collectors.toList());
// Counter column indexes
this.counterIndexes = new ArrayList<>();
for (int i = 0; i < columnCqlTypes.size(); i++) {
if (columnCqlTypes.get(i).equals(DataTypes.COUNTER)) {
this.counterIndexes.add(i);
}
}
this.isCounterTable = !this.counterIndexes.isEmpty();

this.readConsistencyLevel = mapToConsistencyLevel(propertyHelper.getString(KnownProperties.READ_CL));
@@ -479,13 +513,19 @@ private void setCqlMetadata(CqlSession cqlSession) {

boolean allowCollectionsForWritetimeTTL = propertyHelper
.getBoolean(KnownProperties.ALLOW_COLL_FOR_WRITETIME_TTL_CALC);
this.writetimeTTLColumns = tableMetadata.getColumns().values().stream()
.filter(columnMetadata -> canColumnHaveTTLorWritetime(tableMetadata, columnMetadata,
allowCollectionsForWritetimeTTL))
.map(ColumnMetadata::getName).map(CqlIdentifier::asInternal).collect(Collectors.toList());
// Columns eligible for writetime/TTL handling
this.writetimeTTLColumns = new ArrayList<>();
for (ColumnMetadata columnMetadata : tableMetadata.getColumns().values()) {
if (canColumnHaveTTLorWritetime(tableMetadata, columnMetadata, allowCollectionsForWritetimeTTL)) {
this.writetimeTTLColumns.add(columnMetadata.getName().asInternal());
}
}

this.columnNameToCqlTypeMap = this.cqlAllColumns.stream().collect(
Collectors.toMap(columnMetadata -> columnMetadata.getName().asInternal(), ColumnMetadata::getType));
// Column name to CQL type map, pre-sized to avoid rehashing
this.columnNameToCqlTypeMap = new HashMap<>(this.cqlAllColumns.size());
for (ColumnMetadata columnMetadata : this.cqlAllColumns) {
this.columnNameToCqlTypeMap.put(columnMetadata.getName().asInternal(), columnMetadata.getType());
}
}

private boolean canColumnHaveTTLorWritetime(TableMetadata tableMetadata, ColumnMetadata columnMetadata,
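Since most of this PR rests on "plain loops beat streams" claims, it may be worth validating them with a microbenchmark before merging; the gap is often small and JDK-dependent at typical column counts. A minimal JMH sketch (assumes the org.openjdk.jmh dependency is available; class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
public class StreamVsLoopBench {
    List<String> names;

    @Setup
    public void setup() {
        names = new ArrayList<>();
        for (int i = 0; i < 64; i++) {
            names.add("col_" + i); // roughly the column counts CqlTable deals with
        }
    }

    @Benchmark
    public List<String> viaStream() {
        return names.stream().map(String::toUpperCase).collect(Collectors.toList());
    }

    @Benchmark
    public List<String> viaLoop() {
        List<String> out = new ArrayList<>(names.size());
        for (String n : names) {
            out.add(n.toUpperCase());
        }
        return out;
    }
}
```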