Skip to content

Commit a3c973f

Browse files
[MINOR] Respect Spark bloom filter config (#11561)
Respect the following four Spark configs. If a config is not set, Velox uses its own default value for it. ``` spark.sql.optimizer.runtime.bloomFilter.expectedNumItems spark.sql.optimizer.runtime.bloomFilter.maxNumItems spark.sql.optimizer.runtime.bloomFilter.numBits spark.sql.optimizer.runtime.bloomFilter.maxNumBits ```
1 parent 6bc71d6 commit a3c973f

File tree

6 files changed

+36
-45
lines changed

6 files changed

+36
-45
lines changed

backends-velox/src/main/scala/org/apache/gluten/config/VeloxConfig.scala

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ class VeloxConfig(conf: SQLConf) extends GlutenConfig(conf) {
5151
ResizeRange(minSize, Int.MaxValue)
5252
}
5353

54-
def veloxBloomFilterMaxNumBits: Long = getConf(COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS)
55-
5654
def castFromVarcharAddTrimNode: Boolean = getConf(CAST_FROM_VARCHAR_ADD_TRIM_NODE)
5755

5856
def enableVeloxFlushablePartialAggregation: Boolean =
@@ -441,30 +439,6 @@ object VeloxConfig extends ConfigRegistry {
441439
.intConf
442440
.createWithDefault(100000)
443441

444-
val COLUMNAR_VELOX_BLOOM_FILTER_EXPECTED_NUM_ITEMS =
445-
buildConf("spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems")
446-
.doc(
447-
"The default number of expected items for the velox bloomfilter: " +
448-
"'spark.bloom_filter.expected_num_items'")
449-
.longConf
450-
.createWithDefault(1000000L)
451-
452-
val COLUMNAR_VELOX_BLOOM_FILTER_NUM_BITS =
453-
buildConf("spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits")
454-
.doc(
455-
"The default number of bits to use for the velox bloom filter: " +
456-
"'spark.bloom_filter.num_bits'")
457-
.longConf
458-
.createWithDefault(8388608L)
459-
460-
val COLUMNAR_VELOX_BLOOM_FILTER_MAX_NUM_BITS =
461-
buildConf("spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits")
462-
.doc(
463-
"The max number of bits to use for the velox bloom filter: " +
464-
"'spark.bloom_filter.max_num_bits'")
465-
.longConf
466-
.createWithDefault(4194304L)
467-
468442
val HASH_PROBE_BLOOM_FILTER_PUSHDOWN_MAX_SIZE =
469443
buildConf("spark.gluten.sql.columnar.backend.velox.hashProbe.bloomFilterPushdown.maxSize")
470444
.doc("The maximum byte size of Bloom filter that can be generated from hash probe. When " +

cpp/velox/compute/WholeStageResultIterator.cc

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -639,17 +639,25 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
639639
} else {
640640
configs[velox::core::QueryConfig::kSpillCompressionKind] = "none";
641641
}
642-
configs[velox::core::QueryConfig::kSparkBloomFilterExpectedNumItems] =
643-
std::to_string(veloxCfg_->get<int64_t>(kBloomFilterExpectedNumItems, 1000000));
644-
configs[velox::core::QueryConfig::kSparkBloomFilterNumBits] =
645-
std::to_string(veloxCfg_->get<int64_t>(kBloomFilterNumBits, 8388608));
646-
configs[velox::core::QueryConfig::kSparkBloomFilterMaxNumBits] =
647-
std::to_string(veloxCfg_->get<int64_t>(kBloomFilterMaxNumBits, 4194304));
648642

649643
configs[velox::core::QueryConfig::kHashProbeDynamicFilterPushdownEnabled] =
650644
std::to_string(veloxCfg_->get<bool>(kHashProbeDynamicFilterPushdownEnabled, true));
651645
configs[velox::core::QueryConfig::kHashProbeBloomFilterPushdownMaxSize] =
652646
std::to_string(veloxCfg_->get<uint64_t>(kHashProbeBloomFilterPushdownMaxSize, 0));
647+
648+
if (const auto opt = veloxCfg_->get<std::string>(kSparkBloomFilterExpectedNumItems)) {
649+
configs[velox::core::QueryConfig::kSparkBloomFilterExpectedNumItems] = opt.value();
650+
}
651+
if (const auto opt = veloxCfg_->get<std::string>(kSparkBloomFilterNumBits)) {
652+
configs[velox::core::QueryConfig::kSparkBloomFilterNumBits] = opt.value();
653+
}
654+
if (const auto opt = veloxCfg_->get<std::string>(kSparkBloomFilterMaxNumBits)) {
655+
// Velox will check memory cannot exceed 4194304.
656+
configs[velox::core::QueryConfig::kSparkBloomFilterMaxNumBits] = opt.value();
657+
}
658+
if (const auto opt = veloxCfg_->get<std::string>(kSparkBloomFilterMaxNumItems)) {
659+
configs[velox::core::QueryConfig::kSparkBloomFilterMaxNumItems] = opt.value();
660+
}
653661
// spark.gluten.sql.columnar.backend.velox.SplitPreloadPerDriver takes no effect if
654662
// spark.gluten.sql.columnar.backend.velox.IOThreads is set to 0
655663
configs[velox::core::QueryConfig::kMaxSplitPreloadPerDriver] =

cpp/velox/config/VeloxConfig.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ const std::string kAbandonDedupHashMapMinRows = "spark.gluten.velox.abandonDedup
6767
const std::string kAbandonDedupHashMapMinPct = "spark.gluten.velox.abandonDedupHashMap.minPct";
6868

6969
// execution
70+
const std::string kSparkBloomFilterExpectedNumItems = "spark.sql.optimizer.runtime.bloomFilter.expectedNumItems";
71+
const std::string kSparkBloomFilterNumBits = "spark.sql.optimizer.runtime.bloomFilter.numBits";
72+
const std::string kSparkBloomFilterMaxNumBits = "spark.sql.optimizer.runtime.bloomFilter.maxNumBits";
73+
const std::string kSparkBloomFilterMaxNumItems = "spark.sql.optimizer.runtime.bloomFilter.maxNumItems";
7074
const std::string kBloomFilterExpectedNumItems = "spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems";
7175
const std::string kBloomFilterNumBits = "spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits";
7276
const std::string kBloomFilterMaxNumBits = "spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits";

docs/velox-configuration.md

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ nav_order: 16
1616
| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinPct | 90 | If partial aggregation aggregationPct greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. |
1717
| spark.gluten.sql.columnar.backend.velox.abandonPartialAggregationMinRows | 100000 | If partial aggregation input rows number greater than this value, partial aggregation may be early abandoned. Note: this option only works when flushable partial aggregation is enabled. Ignored when spark.gluten.sql.columnar.backend.velox.flushablePartialAggregation=false. |
1818
| spark.gluten.sql.columnar.backend.velox.asyncTimeoutOnTaskStopping | 30000ms | Timeout for asynchronous execution when a task is being stopped in the Velox backend. It's recommended to set this to a value larger than the network connection timeout that any async tasks may be relying on. |
19-
| spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems | 1000000 | The default number of expected items for the velox bloomfilter: 'spark.bloom_filter.expected_num_items' |
20-
| spark.gluten.sql.columnar.backend.velox.bloomFilter.maxNumBits | 4194304 | The max number of bits to use for the velox bloom filter: 'spark.bloom_filter.max_num_bits' |
21-
| spark.gluten.sql.columnar.backend.velox.bloomFilter.numBits | 8388608 | The default number of bits to use for the velox bloom filter: 'spark.bloom_filter.num_bits' |
2219
| spark.gluten.sql.columnar.backend.velox.broadcastHashTableBuildThreads | 1 | The number of threads used to build the broadcast hash table. If not set or set to 0, it will use the default number of threads (available processors). |
2320
| spark.gluten.sql.columnar.backend.velox.cacheEnabled | false | Enable Velox cache, default off. It's recommended to enable soft-affinity as well when enabling the Velox cache. |
2421
| spark.gluten.sql.columnar.backend.velox.cachePrefetchMinPct | 0 | Set prefetch cache min pct for velox file scan |

docs/velox-spark-configuration.md

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ title: Spark configurations status in Gluten Velox Backend
33
nav_order: 17
44

55
This file lists whether each Spark configuration is honored by the Gluten Velox backend. The table is taken from the Spark 4.0 configuration page. The statuses are:
6-
- H: hornored
7-
- P: Transparent to Gluten
8-
- I: ignored. Gluten doesn't use it.
6+
- ✅ Supported<br>
7+
- ❌ Not Supported<br>
8+
- ⚠️ Partial Support<br>
9+
- 🔄 In Progress<br>
10+
- 🚫 Not applied or transparent to Gluten<br>
911
- `<blank>`: unknown yet
1012

1113

@@ -2548,49 +2550,49 @@ These configurations are handled by Spark and do not affect Gluten’s behavior.
25482550
<td><code>spark.sql.optimizer.runtime.bloomFilter.applicationSideScanSizeThreshold</code></td>
25492551
<td>10GB</td>
25502552
<td>3.3.0</td>
2551-
<td></td>
2553+
<td>🚫</td>
25522554
</tr>
25532555

25542556
<tr>
25552557
<td><code>spark.sql.optimizer.runtime.bloomFilter.creationSideThreshold</code></td>
25562558
<td>10MB</td>
25572559
<td>3.3.0</td>
2558-
<td></td>
2560+
<td>🚫</td>
25592561
</tr>
25602562

25612563
<tr>
25622564
<td><code>spark.sql.optimizer.runtime.bloomFilter.enabled</code></td>
25632565
<td>true</td>
25642566
<td>3.3.0</td>
2565-
<td></td>
2567+
<td></td>
25662568
</tr>
25672569

25682570
<tr>
25692571
<td><code>spark.sql.optimizer.runtime.bloomFilter.expectedNumItems</code></td>
25702572
<td>1000000</td>
25712573
<td>3.3.0</td>
2572-
<td></td>
2574+
<td></td>
25732575
</tr>
25742576

25752577
<tr>
25762578
<td><code>spark.sql.optimizer.runtime.bloomFilter.maxNumBits</code></td>
25772579
<td>67108864</td>
25782580
<td>3.3.0</td>
2579-
<td></td>
2581+
<td></td>
25802582
</tr>
25812583

25822584
<tr>
25832585
<td><code>spark.sql.optimizer.runtime.bloomFilter.maxNumItems</code></td>
25842586
<td>4000000</td>
25852587
<td>3.3.0</td>
2586-
<td></td>
2588+
<td></td>
25872589
</tr>
25882590

25892591
<tr>
25902592
<td><code>spark.sql.optimizer.runtime.bloomFilter.numBits</code></td>
25912593
<td>8388608</td>
25922594
<td>3.3.0</td>
2593-
<td></td>
2595+
<td></td>
25942596
</tr>
25952597

25962598
<tr>
@@ -3539,3 +3541,5 @@ These configurations are handled by Spark and do not affect Gluten’s behavior.
35393541

35403542

35413543

3544+
3545+

gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,10 @@ object GlutenConfig extends ConfigRegistry {
472472
SQLConf.LEGACY_SIZE_OF_NULL.key,
473473
SQLConf.LEGACY_STATISTICAL_AGGREGATE.key,
474474
SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS.key,
475+
SQLConf.RUNTIME_BLOOM_FILTER_EXPECTED_NUM_ITEMS.key,
476+
SQLConf.RUNTIME_BLOOM_FILTER_NUM_BITS.key,
477+
SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_BITS.key,
478+
SQLConf.RUNTIME_BLOOM_FILTER_MAX_NUM_ITEMS.key,
475479
"spark.io.compression.codec",
476480
"spark.sql.decimalOperations.allowPrecisionLoss",
477481
"spark.gluten.sql.columnar.backend.velox.bloomFilter.expectedNumItems",

0 commit comments

Comments
 (0)