Skip to content

Commit 7e86528

Browse files
authored
Merge pull request #16 from SAP/develop
Added delayed processing based on memory usage. Fixing other issues
2 parents d2cb6af + 83e7cbe commit 7e86528

19 files changed

+348
-9
lines changed

commercedbsync/external-dependencies.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,5 +48,10 @@
4848
<artifactId>jackson-datatype-jsr310</artifactId>
4949
<version>2.13.3</version>
5050
</dependency>
51+
<dependency>
52+
<groupId>org.openjdk.jol</groupId>
53+
<artifactId>jol-core</artifactId>
54+
<version>0.10</version>
55+
</dependency>
5156
</dependencies>
5257
</project>

commercedbsync/project.properties

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,35 @@ migration.log.sql.source.showparameters=true
527527
##
528528
migration.data.filestorage.container.name=migration
529529
migration.data.fulldatabase.enabled=true
530+
##
531+
# Activates enhanced memory usage logging
532+
#
533+
# @values true or false
534+
# @optional false
535+
##
536+
migration.profiling=false
537+
##
538+
# Delays reading until a minimum amount of memory is available
539+
#
540+
# @values any number
541+
# @optional false
542+
##
543+
migration.memory.min=5000000
544+
##
545+
# Number of attempts to wait for free memory
546+
#
547+
# @values any number
548+
# @optional false
549+
##
550+
migration.memory.attempts=300
551+
##
552+
# Number of time to wait for free memory (milliseconds)
553+
#
554+
# @values any number
555+
# @optional false
556+
##
557+
migration.memory.wait=2000
558+
530559

531560
# Enhanced Logging
532561
log4j2.appender.migrationAppender.type=Console

commercedbsync/resources/commercedbsync-spring.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,11 @@
225225
<property name="configurationService" ref="configurationService" />
226226
</bean>
227227

228+
<bean id="updateYDeploymentsPostProcessor"
229+
class="com.sap.cx.boosters.commercedbsync.processors.impl.UpdateYDeploymentsPostProcessor">
230+
<property name="databaseCopyTaskRepository" ref="databaseCopyTaskRepository"/>
231+
</bean>
232+
228233
<alias name="defaultCopyCompleteEventListener" alias="copyCompleteEventListener"/>
229234
<bean id="defaultCopyCompleteEventListener"
230235
class="com.sap.cx.boosters.commercedbsync.events.handlers.CopyCompleteEventListener"
@@ -240,6 +245,7 @@
240245
<ref bean="reportMigrationPostProcessor"/>
241246
<ref bean="jdbcQueriesPostProcessor"/>
242247
<ref bean="adjustActiveTypeSystemPostProcessor"/>
248+
<ref bean="updateYDeploymentsPostProcessor"/>
243249
</util:list>
244250
</property>
245251
</bean>

commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/impl/task/BatchMarkerDataReaderTask.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public BatchMarkerDataReaderTask(PipeTaskContext pipeTaskContext, int batchId, S
3333

3434
@Override
3535
protected Boolean internalRun() throws Exception {
36+
waitForFreeMemory();
3637
process(batchMarkersPair.getLeft(), batchMarkersPair.getRight());
3738
return Boolean.TRUE;
3839
}
@@ -56,6 +57,9 @@ private void process(Object lastValue, Object nextValue) throws Exception {
5657
lastValue, nextValue, pageSize);
5758
}
5859
DataSet page = adapter.getBatchOrderedByColumn(ctx.getMigrationContext(), queryDefinition);
60+
61+
profileData(ctx, batchId, table, pageSize, page);
62+
5963
getPipeTaskContext().getRecorder().record(PerformanceUnit.ROWS, pageSize);
6064
getPipeTaskContext().getPipe().put(MaybeFinished.of(page));
6165
}

commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/impl/task/BatchOffsetDataReaderTask.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,15 @@
1212
import com.sap.cx.boosters.commercedbsync.context.CopyContext;
1313
import com.sap.cx.boosters.commercedbsync.dataset.DataSet;
1414
import com.sap.cx.boosters.commercedbsync.performance.PerformanceUnit;
15-
1615
import java.util.Set;
1716
import java.util.stream.Collectors;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
1819

1920
public class BatchOffsetDataReaderTask extends DataReaderTask {
2021

22+
private static final Logger LOG = LoggerFactory.getLogger(BatchOffsetDataReaderTask.class);
23+
2124
private final long offset;
2225
private final Set<String> batchColumns;
2326
private final int batchId;
@@ -32,6 +35,7 @@ public BatchOffsetDataReaderTask(PipeTaskContext pipeTaskContext, int batchId, l
3235

3336
@Override
3437
protected Boolean internalRun() throws Exception {
38+
waitForFreeMemory();
3539
process();
3640
return Boolean.TRUE;
3741
}
@@ -50,7 +54,9 @@ private void process() throws Exception {
5054
queryDefinition.setDeletionEnabled(context.getMigrationContext().isDeletionEnabled());
5155
queryDefinition.setLpTableEnabled(context.getMigrationContext().isLpTableMigrationEnabled());
5256
DataSet result = adapter.getBatchWithoutIdentifier(context.getMigrationContext(), queryDefinition);
57+
profileData(context, batchId, table, pageSize, result);
5358
getPipeTaskContext().getRecorder().record(PerformanceUnit.ROWS, result.getAllResults().size());
5459
getPipeTaskContext().getPipe().put(MaybeFinished.of(result));
5560
}
61+
5662
}

commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/impl/task/DataReaderTask.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,19 @@
66

77
package com.sap.cx.boosters.commercedbsync.concurrent.impl.task;
88

9+
import com.sap.cx.boosters.commercedbsync.concurrent.PipeAbortedException;
10+
import com.sap.cx.boosters.commercedbsync.constants.CommercedbsyncConstants;
11+
import com.sap.cx.boosters.commercedbsync.context.CopyContext;
12+
import com.sap.cx.boosters.commercedbsync.dataset.DataSet;
13+
import de.hybris.platform.core.MasterTenant;
14+
import org.openjdk.jol.info.GraphLayout;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
918
public abstract class DataReaderTask extends RetriableTask {
1019

20+
private static final Logger LOG = LoggerFactory.getLogger(DataReaderTask.class);
21+
1122
private final PipeTaskContext pipeTaskContext;
1223

1324
protected DataReaderTask(PipeTaskContext pipeTaskContext) {
@@ -18,4 +29,34 @@ protected DataReaderTask(PipeTaskContext pipeTaskContext) {
1829
public PipeTaskContext getPipeTaskContext() {
1930
return pipeTaskContext;
2031
}
32+
33+
protected void waitForFreeMemory() throws Exception {
34+
CopyContext context = getPipeTaskContext().getContext();
35+
final long minMem = context.getMigrationContext().getMemoryMin();
36+
long freeMem = Runtime.getRuntime().freeMemory();
37+
38+
int cnt = 0;
39+
while (freeMem < minMem) {
40+
LOG.trace("Waiting for freeMem {} / {} Attempts={}", freeMem, minMem, cnt);
41+
Thread.sleep(context.getMigrationContext().getMemoryWait());
42+
cnt++;
43+
if (cnt >= context.getMigrationContext().getMemoryMaxAttempts()) {
44+
throw new PipeAbortedException("Maximum wait time exceeded. See property "
45+
+ CommercedbsyncConstants.MIGRATION_PROFILING_MEMORY_ATTEMPTS + " for more details.");
46+
}
47+
freeMem = Runtime.getRuntime().freeMemory();
48+
}
49+
}
50+
51+
protected void profileData(final CopyContext context, final int batchId, final String table, final long pageSize,
52+
final DataSet result) {
53+
if (context.getMigrationContext().isProfiling() && result != null) {
54+
final long objSize = GraphLayout.parseInstance(result.getAllResults()).totalSize();
55+
final long freeMem = Runtime.getRuntime().freeMemory();
56+
final int clusterID = MasterTenant.getInstance().getClusterID();
57+
LOG.trace(
58+
"Memory usage: [{}], Table = {}, BatchId = {}, Page Size = {}, Batch Memory Size = {}, Free System Memory = {}",
59+
clusterID, table, batchId, pageSize, objSize, freeMem);
60+
}
61+
}
2162
}

commercedbsync/src/com/sap/cx/boosters/commercedbsync/concurrent/impl/task/DefaultDataReaderTask.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public DefaultDataReaderTask(PipeTaskContext pipeTaskContext) {
1919

2020
@Override
2121
protected Boolean internalRun() throws Exception {
22+
waitForFreeMemory();
2223
process();
2324
return Boolean.TRUE;
2425
}
@@ -27,6 +28,8 @@ private void process() throws Exception {
2728
MigrationContext migrationContext = getPipeTaskContext().getContext().getMigrationContext();
2829
DataSet all = getPipeTaskContext().getDataRepositoryAdapter().getAll(migrationContext,
2930
getPipeTaskContext().getTable());
31+
profileData(getPipeTaskContext().getContext(), -1, getPipeTaskContext().getTable(),
32+
getPipeTaskContext().getPageSize(), all);
3033
getPipeTaskContext().getRecorder().record(PerformanceUnit.ROWS, all.getAllResults().size());
3134
getPipeTaskContext().getPipe().put(MaybeFinished.of(all));
3235
}

commercedbsync/src/com/sap/cx/boosters/commercedbsync/constants/CommercedbsyncConstants.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ public final class CommercedbsyncConstants extends GeneratedCommercedbsyncConsta
5858
public static final String MIGRATION_FILE_STORAGE_CONTAINER_NAME = "migration.data.filestorage.container.name";
5959
public static final String MIGRATION_INPUT_PROFILES = "migration.input.profiles";
6060
public static final String MIGRATION_OUTPUT_PROFILES = "migration.output.profiles";
61+
public static final String MIGRATION_PROFILING = "migration.profiling";
62+
public static final String MIGRATION_PROFILING_MEMORY_MIN = "migration.memory.min";
63+
public static final String MIGRATION_PROFILING_MEMORY_ATTEMPTS = "migration.memory.attempts";
64+
public static final String MIGRATION_PROFILING_MEMORY_WAIT = "migration.memory.wait";
6165

6266
public static final String MIGRATION_DATA_READTASK_KEEPALIVE_SECONDS = "migration.data.readtask.keepaliveseconds";
6367
public static final String MIGRATION_DATA_READTASK_QUEUE_CAPACITY = "migration.data.readtask.queuecapacity";

commercedbsync/src/com/sap/cx/boosters/commercedbsync/context/MigrationContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,14 @@ public interface MigrationContext {
108108

109109
void setFullDatabaseMigrationEnabled(boolean enabled);
110110

111+
boolean isProfiling();
112+
113+
long getMemoryMin();
114+
115+
int getMemoryMaxAttempts();
116+
117+
int getMemoryWait();
118+
111119
void refreshSelf();
112120

113121
/**

commercedbsync/src/com/sap/cx/boosters/commercedbsync/context/impl/DefaultMigrationContext.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,26 @@ public Set<String> getOutputProfiles() {
282282
return getListProperty(CommercedbsyncConstants.MIGRATION_OUTPUT_PROFILES);
283283
}
284284

285+
@Override
286+
public boolean isProfiling() {
287+
return getBooleanProperty(CommercedbsyncConstants.MIGRATION_PROFILING);
288+
}
289+
290+
@Override
291+
public long getMemoryMin() {
292+
return getLongProperty(CommercedbsyncConstants.MIGRATION_PROFILING_MEMORY_MIN);
293+
}
294+
295+
@Override
296+
public int getMemoryMaxAttempts() {
297+
return getNumericProperty(CommercedbsyncConstants.MIGRATION_PROFILING_MEMORY_ATTEMPTS);
298+
}
299+
300+
@Override
301+
public int getMemoryWait() {
302+
return getNumericProperty(CommercedbsyncConstants.MIGRATION_PROFILING_MEMORY_WAIT);
303+
}
304+
285305
@Override
286306
public boolean isDeletionEnabled() {
287307
return this.deletionEnabled;
@@ -322,6 +342,10 @@ protected int getNumericProperty(final String key) {
322342
return configuration.getInt(key);
323343
}
324344

345+
protected long getLongProperty(final String key) {
346+
return configuration.getLong(key);
347+
}
348+
325349
protected String getStringProperty(final String key) {
326350
return configuration.getString(key);
327351
}

0 commit comments

Comments
 (0)