Skip to content

Commit 8e8a81e

Browse files
[GLUTEN-10933][VL] feat: Support cached the batches in cpu cache (#11758)
Cache the batch in cpu cache, and wait for the join threads to fetch one by one, the build threads will start to fetch as soon as possible, but the probe thread need to wait for build finished. The buffer size is controlled by `spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes` temporally, the size may be changed by the remaining memory in the server. Test: Test in local SF100, adjust the config to enable caching batch. ``` --conf spark.gluten.sql.columnar.backend.velox.cudf.batchSize=10000 \ --conf spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes=1024MB ``` The log prints `Prefetched 171 batches (24057900 bytes) before blocking on GPU lock` Next step: Prefetch the probe side batch when build starts. Related issue: #10933
1 parent e0b6081 commit 8e8a81e

File tree

13 files changed

+110
-24
lines changed

13 files changed

+110
-24
lines changed

backends-velox/src/main/java/org/apache/gluten/utils/GpuBufferBatchResizerJniWrapper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,6 @@ public long rtHandle() {
3636
return runtime.getHandle();
3737
}
3838

39-
public native long create(int minOutputBatchSize, ColumnarBatchInIterator itr);
39+
public native long create(
40+
int minOutputBatchSize, long maxPrefetchBatchBytes, ColumnarBatchInIterator itr);
4041
}

backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
8686

8787
def cudfBatchSize: Int = getConf(CUDF_BATCH_SIZE)
8888

89+
def cudfShuffleMaxPrefetchBytes: Long = getConf(CUDF_SHUFFLE_MAX_PREFETCH_BYTES)
90+
8991
def orcUseColumnNames: Boolean = getConf(ORC_USE_COLUMN_NAMES)
9092

9193
def parquetUseColumnNames: Boolean = getConf(PARQUET_USE_COLUMN_NAMES)
@@ -685,6 +687,13 @@ object VeloxConfig extends ConfigRegistry {
685687
.intConf
686688
.createWithDefault(Integer.MAX_VALUE)
687689

690+
val CUDF_SHUFFLE_MAX_PREFETCH_BYTES =
691+
buildConf("spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes")
692+
.doc("Maximum bytes to prefetch in CPU memory during GPU shuffle read while waiting" +
693+
"for GPU available.")
694+
.bytesConf(ByteUnit.BYTE)
695+
.createWithDefaultString("1028MB")
696+
688697
val MEMORY_DUMP_ON_EXIT =
689698
buildConf("spark.gluten.monitor.memoryDumpOnExit")
690699
.internal()

backends-velox/src/main/scala/org/apache/gluten/execution/GpuResizeBufferColumnarBatchExec.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ import scala.collection.JavaConverters._
3434
/**
3535
* An operator to resize input BufferBatches generated by shuffle reader, and convert to cudf table.
3636
*/
37-
case class GpuResizeBufferColumnarBatchExec(override val child: SparkPlan, minOutputBatchSize: Int)
37+
case class GpuResizeBufferColumnarBatchExec(
38+
override val child: SparkPlan,
39+
minOutputBatchSize: Int,
40+
maxPrefetchBatchBytes: Long)
3841
extends ColumnarToColumnarExec(child) {
3942

4043
override protected def mapIterator(in: Iterator[ColumnarBatch]): Iterator[ColumnarBatch] = {
@@ -44,6 +47,7 @@ case class GpuResizeBufferColumnarBatchExec(override val child: SparkPlan, minOu
4447
.create(runtime)
4548
.create(
4649
minOutputBatchSize,
50+
maxPrefetchBatchBytes,
4751
new ColumnarBatchInIterator(BackendsApiManager.getBackendName, in.asJava))
4852
new ColumnarBatchOutIterator(runtime, outHandle).asScala
4953
}

backends-velox/src/main/scala/org/apache/gluten/extension/GpuBufferBatchResizeForShuffleInputOutput.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ case class GpuBufferBatchResizeForShuffleInputOutput() extends Rule[SparkPlan] {
3636
val range = VeloxConfig.get.veloxResizeBatchesShuffleInputOutputRange
3737
val preferredBatchBytes = VeloxConfig.get.veloxPreferredBatchBytes
3838
val batchSize = VeloxConfig.get.cudfBatchSize
39+
val prefetchBatchBytes = VeloxConfig.get.cudfShuffleMaxPrefetchBytes
3940
plan.transformUp {
4041
case shuffle: ColumnarShuffleExchangeExec
4142
if shuffle.shuffleWriterType == HashShuffleWriterType &&
@@ -46,36 +47,38 @@ case class GpuBufferBatchResizeForShuffleInputOutput() extends Rule[SparkPlan] {
4647
case a @ AQEShuffleReadExec(
4748
ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExecBase, _),
4849
_) =>
49-
GpuResizeBufferColumnarBatchExec(a, batchSize)
50+
GpuResizeBufferColumnarBatchExec(a, batchSize, prefetchBatchBytes)
5051
case a @ AQEShuffleReadExec(
5152
ShuffleQueryStageExec(_, ReusedExchangeExec(_, _: ColumnarShuffleExchangeExecBase), _),
5253
_) =>
53-
GpuResizeBufferColumnarBatchExec(a, batchSize)
54+
GpuResizeBufferColumnarBatchExec(a, batchSize, prefetchBatchBytes)
5455
// Since it's transformed in a bottom to up order, so we may first encounter
5556
// ShuffeQueryStageExec, which is transformed to VeloxResizeBatchesExec(ShuffeQueryStageExec),
5657
// then we see AQEShuffleReadExec
5758
case a @ AQEShuffleReadExec(
5859
GpuResizeBufferColumnarBatchExec(
5960
s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExecBase, _),
61+
_,
6062
_),
6163
_) =>
62-
GpuResizeBufferColumnarBatchExec(a.copy(child = s), batchSize)
64+
GpuResizeBufferColumnarBatchExec(a.copy(child = s), batchSize, prefetchBatchBytes)
6365
case a @ AQEShuffleReadExec(
6466
GpuResizeBufferColumnarBatchExec(
6567
s @ ShuffleQueryStageExec(
6668
_,
6769
ReusedExchangeExec(_, _: ColumnarShuffleExchangeExecBase),
6870
_),
71+
_,
6972
_),
7073
_) =>
71-
GpuResizeBufferColumnarBatchExec(a.copy(child = s), batchSize)
74+
GpuResizeBufferColumnarBatchExec(a.copy(child = s), batchSize, prefetchBatchBytes)
7275
case s @ ShuffleQueryStageExec(_, _: ColumnarShuffleExchangeExecBase, _) =>
73-
GpuResizeBufferColumnarBatchExec(s, batchSize)
76+
GpuResizeBufferColumnarBatchExec(s, batchSize, prefetchBatchBytes)
7477
case s @ ShuffleQueryStageExec(
7578
_,
7679
ReusedExchangeExec(_, _: ColumnarShuffleExchangeExecBase),
7780
_) =>
78-
GpuResizeBufferColumnarBatchExec(s, batchSize)
81+
GpuResizeBufferColumnarBatchExec(s, batchSize, prefetchBatchBytes)
7982
}
8083
}
8184
}

cpp/velox/config/VeloxConfig.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,8 @@ const std::string kVeloxPreferredBatchBytes = "spark.gluten.sql.columnar.backend
208208
const std::string kCudfEnableTableScan = "spark.gluten.sql.columnar.backend.velox.cudf.enableTableScan";
209209
const bool kCudfEnableTableScanDefault = false;
210210
const std::string kCudfHiveConnectorId = "cudf-hive";
211+
const std::string kCudfShuffleMaxPrefetchBytes = "spark.gluten.sql.columnar.backend.velox.cudf.shuffleMaxPrefetchBytes";
212+
const int64_t kCudfShuffleMaxPrefetchBytesDefault = 1028L * 1024 * 1024; // 1028MB
211213

212214
const std::string kStaticBackendConfPrefix = "spark.gluten.velox.";
213215
const std::string kDynamicBackendConfPrefix = "spark.gluten.sql.columnar.backend.velox.";

cpp/velox/cudf/GpuLock.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,19 @@ void lockGpu() {
5555
getGpuLockState().gGpuOwner = tid;
5656
}
5757

58+
bool tryLockGpu() {
59+
std::thread::id tid = std::this_thread::get_id();
60+
std::unique_lock<std::mutex> lock(getGpuLockState().gGpuMutex);
61+
if (getGpuLockState().gGpuOwner == tid) {
62+
return true;
63+
}
64+
if (getGpuLockState().gGpuOwner.has_value()) {
65+
return false;
66+
}
67+
getGpuLockState().gGpuOwner = tid;
68+
return true;
69+
}
70+
5871
void unlockGpu() {
5972
std::thread::id tid = std::this_thread::get_id();
6073
std::unique_lock<std::mutex> lock(getGpuLockState().gGpuMutex);

cpp/velox/cudf/GpuLock.h

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,9 @@
2121

2222
namespace gluten {
2323

24-
/**
25-
* @brief Acquire the GPU lock (reentrant within the same thread)
26-
*/
24+
/// Acquire the GPU lock, blocking until available. Reentrant for the same thread.
2725
void lockGpu();
28-
29-
/**
30-
* @brief Release the GPU lock (must be called by the owning thread)
31-
*/
26+
bool tryLockGpu();
3227
void unlockGpu();
3328

3429
} // namespace gluten

cpp/velox/jni/VeloxJniWrapper.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,14 +454,15 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_utils_GpuBufferBatchResizerJniWra
454454
JNIEnv* env,
455455
jobject wrapper,
456456
jint minOutputBatchSize,
457+
jlong maxPrefetchBatchBytes,
457458
jobject jIter) {
458459
JNI_METHOD_START
459460
auto ctx = getRuntime(env, wrapper);
460461
auto arrowPool = dynamic_cast<VeloxMemoryManager*>(ctx->memoryManager())->defaultArrowMemoryPool();
461462
auto pool = dynamic_cast<VeloxMemoryManager*>(ctx->memoryManager())->getLeafMemoryPool();
462463
auto iter = makeJniColumnarBatchIterator(env, jIter, ctx);
463464
auto appender = std::make_shared<ResultIterator>(
464-
std::make_unique<GpuBufferBatchResizer>(arrowPool, pool.get(), minOutputBatchSize, std::move(iter)));
465+
std::make_unique<GpuBufferBatchResizer>(arrowPool, pool.get(), minOutputBatchSize, maxPrefetchBatchBytes, std::move(iter)));
465466
return ctx->saveObject(appender);
466467
JNI_METHOD_END(kInvalidObjectHandle)
467468
}

cpp/velox/memory/GpuBufferColumnarBatch.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ int64_t GpuBufferColumnarBatch::numBytes() {
5454
if (!numBytes_.has_value()) {
5555
int64_t bytes = 0;
5656
for (const auto& buffer : buffers_) {
57-
bytes += buffer->size();
57+
if (buffer) {
58+
bytes += buffer->size();
59+
}
5860
}
5961
numBytes_ = bytes;
6062
}

cpp/velox/tests/VeloxGpuShuffleWriterTest.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ RowVectorPtr mergeBufferColumnarBatches(std::vector<std::shared_ptr<GpuBufferCol
116116
getDefaultMemoryManager()->defaultArrowMemoryPool(),
117117
getDefaultMemoryManager()->getLeafMemoryPool().get(),
118118
1200, // output one batch
119+
102400000, // prefetch up to 100MB data
119120
std::make_unique<ColumnarBatchArray>(bufferBatches));
120121
auto cb = resizer.next();
121122
auto batch = std::dynamic_pointer_cast<VeloxColumnarBatch>(cb);

0 commit comments

Comments
 (0)