Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class ShufflePartitionedData {
private final ShufflePartitionedBlock[] blockList;
private final long totalBlockEncodedLength;
private final long totalBlockDataLength;
private int duplicateBlockCount;
private long duplicateBlockSize;

public ShufflePartitionedData(
int partitionId, long encodedLength, long dataLength, ShufflePartitionedBlock[] blockList) {
Expand Down Expand Up @@ -80,4 +82,20 @@ public long getTotalBlockEncodedLength() {
/**
 * Returns the value of the {@code totalBlockDataLength} field.
 *
 * @return the total block data length held by this instance
 */
public long getTotalBlockDataLength() {
  return this.totalBlockDataLength;
}

/**
 * Returns the duplicate block count most recently stored through
 * {@link #setDuplicateBlockCount(int)}.
 *
 * @return the stored duplicate block count; {@code 0} if it was never set
 */
public int getDuplicateBlockCount() {
  return this.duplicateBlockCount;
}

/**
 * Stores the number of blocks in this data that were identified as duplicates.
 *
 * <p>The value is stored as given; no validation is performed here.
 *
 * @param duplicateBlockCount the duplicate block count to record
 */
public void setDuplicateBlockCount(int duplicateBlockCount) {
this.duplicateBlockCount = duplicateBlockCount;
}

/**
 * Returns the duplicate block size most recently stored through
 * {@link #setDuplicateBlockSize(long)}.
 *
 * @return the stored duplicate block size; {@code 0} if it was never set
 */
public long getDuplicateBlockSize() {
  return this.duplicateBlockSize;
}

/**
 * Stores the accumulated size of the blocks in this data that were identified as duplicates.
 *
 * <p>The value is stored as given; no validation is performed here.
 *
 * @param duplicateBlockSize the duplicate block size to record
 */
public void setDuplicateBlockSize(long duplicateBlockSize) {
this.duplicateBlockSize = duplicateBlockSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ public long getDataLength() {
return dataLength;
}

/**
 * Returns how many elements {@code shuffleBlocks} currently holds.
 *
 * @return the size of {@code shuffleBlocks}, widened to {@code long}
 */
public long getBlockCount() {
  final long count = shuffleBlocks.size();
  return count;
}

/**
 * Returns the value of the {@code appId} field.
 *
 * @return the application id held by this instance
 */
public String getAppId() {
  return this.appId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,8 +458,7 @@ public void sendShuffleData(
// after each cacheShuffleData call, the `preAllocatedSize` is updated timely.
manager.releasePreAllocatedSize(toReleasedSize);
alreadyReleasedSize += toReleasedSize;
manager.updateCachedBlockIds(
appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
manager.updateCachedBlockIds(appId, shuffleId, spd.getPartitionId(), spd);
}
} catch (ExceedHugePartitionHardLimitException e) {
String errorMsg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public class ShuffleServerMetrics {
private static final String ALLOCATED_BUFFER_SIZE = "allocated_buffer_size";
private static final String IN_FLUSH_BUFFER_SIZE = "in_flush_buffer_size";
private static final String USED_BUFFER_SIZE = "used_buffer_size";
private static final String TOTAL_IN_MEMORY_BLOCK_COUNT = "total_in_memory_block_count";
private static final String TOTAL_IN_FLUSH_BLOCK_COUNT = "total_in_flush_block_count";
private static final String READ_USED_BUFFER_SIZE = "read_used_buffer_size";
public static final String USED_DIRECT_MEMORY_SIZE = "used_direct_memory_size";
public static final String USED_DIRECT_MEMORY_SIZE_BY_NETTY = "used_direct_memory_size_by_netty";
Expand Down Expand Up @@ -158,6 +160,10 @@ public class ShuffleServerMetrics {
public static final String TOPN_OF_TOTAL_DATA_SIZE_FOR_APP = "topN_of_total_data_size_for_app";
public static final String TOPN_OF_IN_MEMORY_DATA_SIZE_FOR_APP =
"topN_of_in_memory_data_size_for_app";
public static final String TOPN_OF_IN_MEMORY_BLOCK_COUNT_FOR_APP =
"topN_of_in_memory_block_count_for_app";
public static final String BOTTOMN_OF_IN_MEMORY_AVG_BLOCK_SIZE_FOR_APP =
"bottomN_of_in_memory_avg_block_size_for_app";
public static final String TOPN_OF_ON_LOCALFILE_DATA_SIZE_FOR_APP =
"topN_of_on_localfile_data_size_for_app";
public static final String TOPN_OF_ON_HADOOP_DATA_SIZE_FOR_APP =
Expand Down Expand Up @@ -239,6 +245,8 @@ public class ShuffleServerMetrics {
public static Gauge.Child gaugeAllocatedBufferSize;
public static Gauge.Child gaugeInFlushBufferSize;
public static Gauge.Child gaugeUsedBufferSize;
public static Gauge.Child gaugeTotalInMemoryBlockCount;
public static Gauge.Child gaugeTotalInFlushBlockCount;
public static Gauge.Child gaugeReadBufferUsedSize;
public static Gauge.Child gaugeWriteHandler;
public static Gauge.Child gaugeMergeEventQueueSize;
Expand All @@ -257,6 +265,8 @@ public class ShuffleServerMetrics {

public static Gauge gaugeTotalDataSizeUsage;
public static Gauge gaugeInMemoryDataSizeUsage;
public static Gauge gaugeInMemoryBlockCount;
public static Gauge gaugeInMemoryAvgBlockSize;
public static Gauge gaugeOnDiskDataSizeUsage;
public static Gauge gaugeOnHadoopDataSizeUsage;

Expand Down Expand Up @@ -477,6 +487,8 @@ private static void setUpMetrics(ShuffleServerConf serverConf) {
gaugeAllocatedBufferSize = metricsManager.addLabeledGauge(ALLOCATED_BUFFER_SIZE);
gaugeInFlushBufferSize = metricsManager.addLabeledGauge(IN_FLUSH_BUFFER_SIZE);
gaugeUsedBufferSize = metricsManager.addLabeledGauge(USED_BUFFER_SIZE);
gaugeTotalInMemoryBlockCount = metricsManager.addLabeledGauge(TOTAL_IN_MEMORY_BLOCK_COUNT);
gaugeTotalInFlushBlockCount = metricsManager.addLabeledGauge(TOTAL_IN_FLUSH_BLOCK_COUNT);
gaugeReadBufferUsedSize = metricsManager.addLabeledGauge(READ_USED_BUFFER_SIZE);
gaugeWriteHandler = metricsManager.addLabeledGauge(TOTAL_WRITE_HANDLER);
gaugeMergeEventQueueSize = metricsManager.addLabeledGauge(MERGE_EVENT_QUEUE_SIZE);
Expand Down Expand Up @@ -536,6 +548,20 @@ private static void setUpMetrics(ShuffleServerConf serverConf) {
.labelNames("app_id")
.register(metricsManager.getCollectorRegistry());

gaugeInMemoryBlockCount =
Gauge.build()
.name(TOPN_OF_IN_MEMORY_BLOCK_COUNT_FOR_APP)
.help("top N of in memory shuffle block count for app level")
.labelNames("app_id")
.register(metricsManager.getCollectorRegistry());

gaugeInMemoryAvgBlockSize =
Gauge.build()
.name(BOTTOMN_OF_IN_MEMORY_AVG_BLOCK_SIZE_FOR_APP)
.help("bottom N of in memory shuffle average block size for app level")
.labelNames("app_id")
.register(metricsManager.getCollectorRegistry());

gaugeOnDiskDataSizeUsage =
Gauge.build()
.name(TOPN_OF_ON_LOCALFILE_DATA_SIZE_FOR_APP)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class ShuffleTaskInfo {

private final AtomicLong totalDataSize = new AtomicLong(0);
private final AtomicLong inMemoryDataSize = new AtomicLong(0);
private final AtomicLong inMemoryBlockCount = new AtomicLong(0);
private final AtomicLong onLocalFileNum = new AtomicLong(0);
private final AtomicLong onLocalFileDataSize = new AtomicLong(0);
private final AtomicLong onHadoopFileNum = new AtomicLong(0);
Expand Down Expand Up @@ -176,6 +177,19 @@ public long getInMemoryDataSize() {
return inMemoryDataSize.get();
}

/**
 * Reads the current value of the in-memory block counter.
 *
 * @return the value of {@code inMemoryBlockCount} at the time of the call
 */
public long getInMemoryBlockCount() {
  final long current = inMemoryBlockCount.get();
  return current;
}

/**
 * Atomically adjusts the in-memory block counter by {@code delta}.
 *
 * @param delta the amount to add; pass a negative value to decrease the counter
 */
public void addInMemoryBlockCount(long delta) {
  // The result is not needed here, so either atomic add variant is equivalent.
  inMemoryBlockCount.getAndAdd(delta);
}

/**
 * Computes the average in-memory block size as in-memory data size divided by in-memory block
 * count, using integer division.
 *
 * <p>The block count and the data size are read separately, so under concurrent updates the two
 * values may not belong to the same instant.
 *
 * @return the truncated average block size, or {@link Long#MAX_VALUE} when the block count is
 *     zero or negative
 */
public long getInMemoryAvgBlockSize() {
  final long blocks = getInMemoryBlockCount();
  if (blocks > 0) {
    return getInMemoryDataSize() / blocks;
  }
  // No blocks to average over: report the largest possible value instead of dividing by zero.
  return Long.MAX_VALUE;
}

public long addOnLocalFileDataSize(long delta, boolean isNewlyCreated) {
if (isNewlyCreated) {
onLocalFileNum.incrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,12 +481,14 @@ public int updateAndGetCommitCount(String appId, int shuffleId) {
}

// Only for tests
public void updateCachedBlockIds(String appId, int shuffleId, ShufflePartitionedBlock[] spbs) {
updateCachedBlockIds(appId, shuffleId, 0, spbs);
public void updateCachedBlockIds(
String appId, int shuffleId, ShufflePartitionedData shufflePartitionedData) {
updateCachedBlockIds(appId, shuffleId, 0, shufflePartitionedData);
}

public void updateCachedBlockIds(
String appId, int shuffleId, int partitionId, ShufflePartitionedBlock[] spbs) {
String appId, int shuffleId, int partitionId, ShufflePartitionedData shufflePartitionedData) {
ShufflePartitionedBlock[] spbs = shufflePartitionedData.getBlockList();
if (spbs == null || spbs.length == 0) {
return;
}
Expand All @@ -512,7 +514,12 @@ public void updateCachedBlockIds(
size += spb.getEncodedLength();
}
}
long partitionSize = shuffleTaskInfo.addPartitionDataSize(shuffleId, partitionId, size);
int blockCount = spbs.length - shufflePartitionedData.getDuplicateBlockCount();
shuffleBufferManager.addInMemoryBlockCount(blockCount);
shuffleTaskInfo.addInMemoryBlockCount(blockCount);
long partitionSize =
shuffleTaskInfo.addPartitionDataSize(
shuffleId, partitionId, size - shufflePartitionedData.getDuplicateBlockSize());
HugePartitionUtils.markHugePartition(
shuffleBufferManager, shuffleTaskInfo, shuffleId, partitionId, partitionSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class TopNShuffleDataSizeOfAppCalcTask {

private final Gauge gaugeTotalDataSize;
private final Gauge gaugeInMemoryDataSize;
private final Gauge gaugeInMemoryBlockCount;
private final Gauge gaugeInMemoryAvgBlockSize;
private final Gauge gaugeOnLocalFileDataSize;
private final Gauge gaugeOnHadoopDataSize;

Expand All @@ -50,6 +52,8 @@ public TopNShuffleDataSizeOfAppCalcTask(ShuffleTaskManager taskManager, ShuffleS
shuffleTaskManager = taskManager;
this.gaugeTotalDataSize = ShuffleServerMetrics.gaugeTotalDataSizeUsage;
this.gaugeInMemoryDataSize = ShuffleServerMetrics.gaugeInMemoryDataSizeUsage;
this.gaugeInMemoryBlockCount = ShuffleServerMetrics.gaugeInMemoryBlockCount;
this.gaugeInMemoryAvgBlockSize = ShuffleServerMetrics.gaugeInMemoryAvgBlockSize;
this.gaugeOnLocalFileDataSize = ShuffleServerMetrics.gaugeOnDiskDataSizeUsage;
this.gaugeOnHadoopDataSize = ShuffleServerMetrics.gaugeOnHadoopDataSizeUsage;
this.scheduler =
Expand All @@ -72,6 +76,22 @@ private void calcTopNShuffleDataSize() {
.set(taskInfo.getValue().getInMemoryDataSize());
}

topNTaskInfo = calcTopNInMemoryBlockCountTaskInfo();
gaugeInMemoryBlockCount.clear();
for (Map.Entry<String, ShuffleTaskInfo> taskInfo : topNTaskInfo) {
gaugeInMemoryBlockCount
.labels(taskInfo.getKey())
.set(taskInfo.getValue().getInMemoryBlockCount());
}

topNTaskInfo = calcBottomNInMemoryAvgBlockSizeTaskInfo();
gaugeInMemoryAvgBlockSize.clear();
for (Map.Entry<String, ShuffleTaskInfo> taskInfo : topNTaskInfo) {
gaugeInMemoryAvgBlockSize
.labels(taskInfo.getKey())
.set(taskInfo.getValue().getInMemoryAvgBlockSize());
}

topNTaskInfo = calcTopNOnLocalFileDataSizeTaskInfo();
gaugeOnLocalFileDataSize.clear();
for (Map.Entry<String, ShuffleTaskInfo> taskInfo : topNTaskInfo) {
Expand Down Expand Up @@ -108,6 +128,27 @@ public List<Map.Entry<String, ShuffleTaskInfo>> calcTopNInMemoryDataSizeTaskInfo
.collect(Collectors.toList());
}

/**
 * Selects the task-info entries with the highest in-memory block counts.
 *
 * <p>Entries are sorted by {@code getInMemoryBlockCount()} in descending order and the result is
 * truncated to at most {@code topNShuffleDataNumber} entries.
 *
 * @return the leading entries of the descending ranking, largest block count first
 */
public List<Map.Entry<String, ShuffleTaskInfo>> calcTopNInMemoryBlockCountTaskInfo() {
  return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream()
      .sorted(
          (first, second) -> {
            long secondBlocks = second.getValue().getInMemoryBlockCount();
            long firstBlocks = first.getValue().getInMemoryBlockCount();
            // Arguments are swapped on purpose to obtain a descending order.
            return Long.compare(secondBlocks, firstBlocks);
          })
      .limit(topNShuffleDataNumber)
      .collect(Collectors.toList());
}

/**
 * Selects the task-info entries with the smallest average in-memory block size.
 *
 * <p>Entries are sorted by {@code getInMemoryAvgBlockSize()} in ascending order and the result
 * is truncated to at most {@code topNShuffleDataNumber} entries.
 *
 * @return the leading entries of the ascending ranking, smallest average block size first
 */
public List<Map.Entry<String, ShuffleTaskInfo>> calcBottomNInMemoryAvgBlockSizeTaskInfo() {
  return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream()
      .sorted(
          (first, second) -> {
            long firstAvg = first.getValue().getInMemoryAvgBlockSize();
            long secondAvg = second.getValue().getInMemoryAvgBlockSize();
            return Long.compare(firstAvg, secondAvg);
          })
      .limit(topNShuffleDataNumber)
      .collect(Collectors.toList());
}

public List<Map.Entry<String, ShuffleTaskInfo>> calcTopNOnLocalFileDataSizeTaskInfo() {
return shuffleTaskManager.getShuffleTaskInfos().entrySet().stream()
.sorted(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.uniffle.server.ShuffleFlushManager;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.server.ShuffleTaskInfo;
import org.apache.uniffle.server.ShuffleTaskManager;
import org.apache.uniffle.server.buffer.lab.ChunkCreator;
import org.apache.uniffle.server.buffer.lab.LABShuffleBufferWithLinkedList;
Expand Down Expand Up @@ -96,6 +97,8 @@ public class ShuffleBufferManager {
protected AtomicLong inFlushSize = new AtomicLong(0L);
protected AtomicLong usedMemory = new AtomicLong(0L);
private AtomicLong readDataMemory = new AtomicLong(0L);
private final AtomicLong inMemoryBlockCount = new AtomicLong(0);
private final AtomicLong inFlushBlockCount = new AtomicLong(0);
// appId -> shuffleId -> partitionId -> ShuffleBuffer to avoid too many appId
protected Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> bufferPool;
// appId -> shuffleId -> shuffle size in buffer
Expand Down Expand Up @@ -496,11 +499,22 @@ protected boolean flushBuffer(
shuffleFlushManager.getDataDistributionType(appId));
if (event != null) {
event.addCleanupCallback(() -> releaseMemory(event.getEncodedLength(), true, false));
event.addCleanupCallback(
() -> {
long blockCount = event.getBlockCount();
ShuffleTaskInfo shuffleTaskInfo = shuffleTaskManager.getShuffleTaskInfo(appId);
if (shuffleTaskInfo != null) {
shuffleTaskInfo.addInMemoryBlockCount(-blockCount);
}
addInMemoryBlockCount(-blockCount);
addInFlushBlockCount(-blockCount);
});
updateShuffleSize(appId, shuffleId, -event.getEncodedLength());
inFlushSize.addAndGet(event.getEncodedLength());
if (isHugePartition) {
event.markOwnedByHugePartition();
}
addInFlushBlockCount(event.getBlockCount());
ShuffleServerMetrics.gaugeInFlushBufferSize.set(inFlushSize.get());
shuffleFlushManager.addToFlushQueue(event);
return true;
Expand All @@ -524,6 +538,16 @@ public void removeBuffer(String appId) {
}
}

/**
 * Atomically adjusts the in-memory block counter by {@code delta} and publishes the resulting
 * value to {@code ShuffleServerMetrics.gaugeTotalInMemoryBlockCount}.
 *
 * @param delta the amount to add; pass a negative value to decrease the counter
 */
public void addInMemoryBlockCount(long delta) {
  // Publish the post-update value returned by the atomic add.
  ShuffleServerMetrics.gaugeTotalInMemoryBlockCount.set(inMemoryBlockCount.addAndGet(delta));
}

/**
 * Atomically adjusts the in-flush block counter by {@code delta} and publishes the resulting
 * value to {@code ShuffleServerMetrics.gaugeTotalInFlushBlockCount}.
 *
 * @param delta the amount to add; pass a negative value to decrease the counter
 */
public void addInFlushBlockCount(long delta) {
  // Publish the post-update value returned by the atomic add.
  ShuffleServerMetrics.gaugeTotalInFlushBlockCount.set(inFlushBlockCount.addAndGet(delta));
}

public synchronized boolean requireMemory(long size, boolean isPreAllocated) {
if (capacity - usedMemory.get() >= size) {
usedMemory.addAndGet(size);
Expand Down Expand Up @@ -891,6 +915,7 @@ public void removeBufferByShuffleId(String appId, Collection<Integer> shuffleIds
Collection<ShuffleBuffer> buffers = bufferRangeMap.asMapOfRanges().values();
if (buffers != null) {
for (ShuffleBuffer buffer : buffers) {
addInMemoryBlockCount(-buffer.getBlockCount());
// the actual released size by this thread
long releasedSize = buffer.release();
ShuffleServerMetrics.gaugeTotalPartitionNum.dec();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public synchronized long append(ShufflePartitionedData data) {
}
long currentEncodedLength = 0;
long currentDataLength = 0;
int duplicateBlockCount = 0;
long duplicateBlockSize = 0;

for (ShufflePartitionedBlock block : data.getBlockList()) {
// If sendShuffleData retried, we may receive duplicate block. The duplicate
Expand All @@ -65,11 +67,15 @@ public synchronized long append(ShufflePartitionedData data) {
currentEncodedLength += block.getEncodedLength();
currentDataLength += block.getDataLength();
} else {
duplicateBlockCount++;
duplicateBlockSize += block.getEncodedLength();
releaseBlock(block);
}
}
this.encodedLength += currentEncodedLength;
this.dataLength += currentDataLength;
data.setDuplicateBlockCount(duplicateBlockCount);
data.setDuplicateBlockSize(duplicateBlockSize);

return currentEncodedLength;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public synchronized long append(ShufflePartitionedData data) {
}
long currentEncodedLength = 0;
long currentDataLength = 0;
int duplicateBlockCount = 0;
long duplicateBlockSize = 0;

for (ShufflePartitionedBlock block : data.getBlockList()) {
// If sendShuffleData retried, we may receive duplicate block. The duplicate
Expand All @@ -74,11 +76,15 @@ public synchronized long append(ShufflePartitionedData data) {
currentEncodedLength += block.getEncodedLength();
currentDataLength += block.getDataLength();
} else {
duplicateBlockCount++;
duplicateBlockSize += block.getEncodedLength();
releaseBlock(block);
}
}
this.encodedLength += currentEncodedLength;
this.dataLength += currentDataLength;
data.setDuplicateBlockCount(duplicateBlockCount);
data.setDuplicateBlockSize(duplicateBlockSize);

return currentEncodedLength;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ private boolean cachedMergedBlock(ByteBuf byteBuf, long blockId, int length) {
shuffle
.shuffleServer
.getShuffleTaskManager()
.updateCachedBlockIds(appId, shuffle.shuffleId, spd.getPartitionId(), spd.getBlockList());
.updateCachedBlockIds(appId, shuffle.shuffleId, spd.getPartitionId(), spd);
sleepTime = initSleepTime;
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,7 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
// after each cacheShuffleData call, the `preAllocatedSize` is updated timely.
shuffleTaskManager.releasePreAllocatedSize(toReleasedSize);
alreadyReleasedSize += toReleasedSize;
shuffleTaskManager.updateCachedBlockIds(
appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, spd.getPartitionId(), spd);
}
} catch (ExceedHugePartitionHardLimitException e) {
String errorMsg =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ public void removeShuffleDataWithHdfsTest() throws Exception {
shuffleTaskManager.requireBuffer(35);
shuffleTaskManager.requireBuffer(35);
shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0.getBlockList());
shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0);
shuffleTaskManager.cacheShuffleData(appId, 1, false, partitionedData0);
shuffleTaskManager.updateCachedBlockIds(appId, 1, partitionedData0.getBlockList());
shuffleTaskManager.updateCachedBlockIds(appId, 1, partitionedData0);
shuffleTaskManager.refreshAppId(appId);
shuffleTaskManager.checkResourceStatus();

Expand Down
Loading
Loading