
Commit e61e884

Per-partition memory pressure threshold (#47)
* Move to per-partition memory pressure thresholds, with a default setting of none.
* Rename config to be more descriptive.
* Format fix.
* Addressing PR feedback.
1 parent 4f09c0b commit e61e884

File tree

3 files changed: +101 −7 lines

README.md

Lines changed: 70 additions & 0 deletions
@@ -216,6 +216,76 @@ This will partition the table by:

- Month extracted from the integer timestamp
- Event type as a string column

## Memory Sizing

The connector uses two distinct memory regions:

| Region | Controlled by | Contents |
|--------|--------------|----------|
| JVM heap | `-Xmx` | Kafka Connect framework, record deserialization, connector bookkeeping |
| Off-heap (direct) | Arrow `RootAllocator` | Arrow columnar buffers (the bulk of memory under load) |

Arrow off-heap memory is **not** subject to `-Xmx`. It is bounded only by the OS/container memory limit, which means an under-sized pod will be OOMKilled before the JVM ever notices pressure.
### Per-Partition Memory Pressure

The `memory.pressure.per.partition.bytes` setting defines a per-partition threshold for Arrow buffer accumulation. When a single partition's buffered data exceeds this value, that partition is flushed immediately, independent of the normal `flush.size` / `file.size.bytes` / `flush.interval.ms` thresholds.

Setting it to `0` (the default) disables the check entirely.
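For example, the per-partition threshold can sit alongside the normal flush settings in a connector config (the values below are illustrative, not recommendations):

```json
{
  "flush.size": 100000,
  "flush.interval.ms": 60000,
  "file.size.bytes": 268435456,
  "memory.pressure.per.partition.bytes": 536870912
}
```

Here the 512MB per-partition check acts as a safety valve above the normal thresholds: a partition flushes as soon as any one of the four trips.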
### Sizing Formula

Given:

- **T** = maximum number of sink tasks that can be assigned to a single worker/pod
  - In distributed mode, this is ≤ the connector-wide `tasks.max` and depends on how many workers you run and how tasks are balanced.
- **P** = max partitions assigned to a single task
- **M** = `memory.pressure.per.partition.bytes` (per-partition threshold)
- **H** = JVM heap (`-Xmx`)
The worst-case Arrow off-heap usage **per pod** is:

```
arrow_max = T × P × M
```

This formula is per pod/worker and uses **T** as "tasks that can co-reside on a single worker":

- Worst-case placement (all tasks on one worker): `T = tasks.max`.
- With `W` workers and roughly even distribution: `T ≈ ceil(tasks.max / W)`.
Each partition can buffer up to `M` bytes before being force-flushed. The container memory limit must cover both heap and off-heap:

```
container_memory ≥ H + arrow_max + overhead
```

Where `overhead` accounts for JVM metaspace, thread stacks, OS buffers, etc. (typically 512MB–1GB).
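The sizing arithmetic above can be checked with a small stand-alone helper (class and method names are illustrative, not part of the connector):

```java
public class MemorySizing {

  // Tasks that can co-reside on one worker with even balancing:
  // T ≈ ceil(tasksMax / workers).
  static long tasksPerWorker(long tasksMax, long workers) {
    return (tasksMax + workers - 1) / workers;
  }

  // Worst-case Arrow off-heap bytes per worker: arrow_max = T × P × M.
  static long arrowMax(long tasks, long partitionsPerTask, long perPartitionBytes) {
    return tasks * partitionsPerTask * perPartitionBytes;
  }

  // Minimum container memory: container ≥ H + arrow_max + overhead.
  static long containerFloor(long heapBytes, long arrowMaxBytes, long overheadBytes) {
    return heapBytes + arrowMaxBytes + overheadBytes;
  }

  public static void main(String[] args) {
    final long MB = 1024L * 1024L, GB = 1024L * MB;
    long arrow = arrowMax(3, 8, 512 * MB);                  // 3 × 8 × 512MB = 12GB
    long container = containerFloor(2 * GB, arrow, 1 * GB); // 2GB + 12GB + 1GB = 15GB
    System.out.println(arrow / GB + " " + container / GB);  // prints "12 15"
  }
}
```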
### Example

| Parameter | Value |
|-----------|-------|
| `tasks.max` | 3 |
| Partitions per task | 8 |
| `memory.pressure.per.partition.bytes` | 536870912 (512MB) |
| `-Xmx` | 2g |

```
arrow_max = 3 × 8 × 512MB = 12GB
container ≥ 2GB + 12GB + 1GB = 15GB
```
If 15GB is too large, reduce the per-partition threshold or reduce `tasks.max`:

```
# 2 tasks, 256MB threshold:
arrow_max = 2 × 8 × 256MB = 4GB
container ≥ 2GB + 4GB + 1GB = 7GB
```
### Disk Spill as an Alternative

If off-heap memory is constrained, enable `spill.enabled=true` instead. This writes Arrow batches to local disk immediately after conversion, reducing per-task Arrow memory to a few MB at the cost of additional disk I/O. When spilling is enabled, `memory.pressure.per.partition.bytes` has no effect: there is no in-memory accumulation to threshold against.
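A minimal spill setup might look like the following (`spill.enabled` and `spill.directory` are the keys defined in `DucklakeSinkConfig`; the directory path is illustrative):

```json
{
  "spill.enabled": true,
  "spill.directory": "/var/tmp/ducklake-spill"
}
```

Provision the volume backing `spill.directory` for the data that would otherwise accumulate in memory between flushes.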
## Example Connector Config (Kafka Connect REST)

```json

src/main/java/com/inyo/ducklake/connect/DucklakeSinkConfig.java

Lines changed: 22 additions & 0 deletions
```diff
@@ -61,6 +61,10 @@ public class DucklakeSinkConfig extends AbstractConfig {
   // DuckLake retry configuration for handling PostgreSQL serialization conflicts
   public static final String DUCKLAKE_MAX_RETRY_COUNT = "ducklake.max_retry_count";
 
+  // Per-partition memory pressure threshold
+  public static final String MEMORY_PRESSURE_PER_PARTITION_BYTES =
+      "memory.pressure.per.partition.bytes";
+
   // Disk spill configuration for reducing memory pressure
   public static final String SPILL_ENABLED = "spill.enabled";
   public static final String SPILL_DIRECTORY = "spill.directory";
@@ -160,6 +164,15 @@ private static ConfigDef newConfigDef() {
             true,
             ConfigDef.Importance.MEDIUM,
             "Enable parallel flushing of partitions for higher throughput. Default: true")
+        .define(
+            MEMORY_PRESSURE_PER_PARTITION_BYTES,
+            ConfigDef.Type.LONG,
+            0L,
+            ConfigDef.Range.atLeast(0L),
+            ConfigDef.Importance.MEDIUM,
+            "Per-partition memory pressure threshold in bytes. When a partition's buffered data "
+                + "exceeds this value, it is flushed immediately. 0 disables memory pressure checks. "
+                + "Default: 0 (disabled)")
         .define(
             DUCKLAKE_MAX_RETRY_COUNT,
             ConfigDef.Type.INT,
@@ -259,6 +272,15 @@ public int getDuckDbThreads() {
     return threads;
   }
 
+  /**
+   * Returns the memory pressure threshold in bytes for each partition.
+   *
+   * @return memory pressure threshold in bytes per partition
+   */
+  public long getMemoryPressurePerPartitionBytes() {
+    return getLong(MEMORY_PRESSURE_PER_PARTITION_BYTES);
+  }
+
   /**
    * Returns whether parallel partition flushing is enabled.
    *
```

src/main/java/com/inyo/ducklake/connect/DucklakeSinkTask.java

Lines changed: 9 additions & 7 deletions
```diff
@@ -66,6 +66,7 @@ public class DucklakeSinkTask extends SinkTask {
   private ScheduledExecutorService flushScheduler;
   private ExecutorService partitionExecutor;
   private boolean parallelPartitionFlush;
+  private long memoryPressureBytes;
 
   // Spill configuration
   private boolean spillEnabled;
@@ -161,6 +162,7 @@ public void start(Map<String, String> map) {
     this.flushIntervalMs = config.getFlushIntervalMs();
     this.fileSizeBytes = config.getFileSizeBytes();
     this.parallelPartitionFlush = config.isParallelPartitionFlushEnabled();
+    this.memoryPressureBytes = config.getMemoryPressurePerPartitionBytes();
 
     // Initialize spill configuration
     this.spillEnabled = config.isSpillEnabled();
@@ -191,12 +193,13 @@ public void start(Map<String, String> map) {
     int threadCount = config.getDuckDbThreads();
     LOG.info(
         "Buffering config: flushSize={}, flushIntervalMs={}, fileSizeBytes={}, "
-            + "parallelPartitionFlush={}, duckdbThreads={}, spillEnabled={}",
+            + "parallelPartitionFlush={}, duckdbThreads={}, memoryPressureBytes={}, spillEnabled={}",
         flushSize,
         flushIntervalMs,
         fileSizeBytes,
         parallelPartitionFlush,
         threadCount,
+        memoryPressureBytes,
         spillEnabled);
 
     // Create executor for parallel partition processing
@@ -687,16 +690,15 @@ private FlushData checkAndExtractFlushData(TopicPartition partition) {
       return null;
     }
 
-    // Check for global memory pressure from Arrow allocator
-    long allocatedMemory = allocator.getAllocatedMemory();
-    boolean memoryPressure = allocatedMemory > fileSizeBytes * buffers.size();
+    // Check for per-partition memory pressure (0 = disabled)
+    boolean memoryPressure = memoryPressureBytes > 0 && buffer.estimatedBytes > memoryPressureBytes;
 
     if (memoryPressure) {
       LOG.warn(
-          "Memory pressure detected for partition {}: allocatorBytes={}, threshold={}",
+          "Memory pressure detected for partition {}: bufferBytes={}, threshold={}",
           partition,
-          allocatedMemory,
-          fileSizeBytes * buffers.size());
+          buffer.estimatedBytes,
+          memoryPressureBytes);
     }
 
     // Flush if normal thresholds exceeded OR if under memory pressure with data buffered
```
