-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathCloudFetchMemoryBufferManager.cs
More file actions
147 lines (129 loc) · 5.38 KB
/
CloudFetchMemoryBufferManager.cs
File metadata and controls
147 lines (129 loc) · 5.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
/*
* Copyright (c) 2025 ADBC Drivers Contributors
*
* This file has been modified from its original version, which is
* under the Apache License:
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using System.Threading.Tasks;
namespace AdbcDrivers.Databricks.Reader.CloudFetch
{
/// <summary>
/// Manages memory allocation for prefetched files.
///
/// This manager tracks in-flight download memory based on the compressed file sizes
/// received from the server. It does NOT track the decompressed data size, as that
/// memory is naturally bounded by the result queue capacity and batch processing flow.
///
/// The memory limit controls how many concurrent downloads can be in-flight at once,
/// preventing unbounded parallel downloads from exhausting system resources.
/// </summary>
/// <remarks>
/// The used-memory counter is only ever modified through <see cref="Interlocked"/>
/// compare-and-swap loops, so each reservation or release is applied atomically and
/// the counter always stays within the range [0, <see cref="MaxMemory"/>].
/// </remarks>
internal sealed class CloudFetchMemoryBufferManager : ICloudFetchMemoryBufferManager
{
    /// <summary>Number of bytes in one megabyte, used to convert the configured limit.</summary>
    private const long BytesPerMegabyte = 1024L * 1024L;

    /// <summary>
    /// Delay, in milliseconds, between reservation attempts while
    /// <see cref="AcquireMemoryAsync"/> waits for memory to be released.
    /// </summary>
    private const int AcquireRetryDelayMilliseconds = 10;

    private readonly long _maxMemory;
    private long _usedMemory;

    /// <summary>
    /// Initializes a new instance of the <see cref="CloudFetchMemoryBufferManager"/> class.
    /// </summary>
    /// <param name="maxMemoryMB">The maximum memory allowed for buffering in megabytes.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxMemoryMB"/> is zero or negative.
    /// </exception>
    public CloudFetchMemoryBufferManager(int maxMemoryMB = CloudFetchConfiguration.DefaultMemoryBufferSizeMB)
    {
        if (maxMemoryMB <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxMemoryMB), "Memory buffer size must be positive.");
        }

        // Convert MB to bytes. The multiplication is done in 64-bit arithmetic, so it
        // cannot overflow for any positive int input.
        _maxMemory = maxMemoryMB * BytesPerMegabyte;
        _usedMemory = 0;
    }

    /// <inheritdoc />
    public long MaxMemory => _maxMemory;

    /// <inheritdoc />
    public long UsedMemory => Interlocked.Read(ref _usedMemory);

    /// <inheritdoc />
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="size"/> is zero or negative.</exception>
    public bool TryAcquireMemory(long size)
    {
        if (size <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(size), "Size must be positive.");
        }

        // Try to acquire memory (size is the compressed file size from the server)
        long current;
        do
        {
            current = Interlocked.Read(ref _usedMemory);

            // Compare against the remaining headroom instead of computing
            // "current + size" first: the sum can overflow to a negative value for a
            // very large size, which would wrongly pass a "sum > max" check.
            if (size > _maxMemory - current)
            {
                return false;
            }
        }
        while (Interlocked.CompareExchange(ref _usedMemory, current + size, current) != current);

        return true;
    }

    /// <inheritdoc />
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="size"/> is zero or negative, or exceeds <see cref="MaxMemory"/>.
    /// </exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled before the memory could be reserved.
    /// </exception>
    public async Task AcquireMemoryAsync(long size, CancellationToken cancellationToken)
    {
        if (size <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(size), "Size must be positive.");
        }

        // Special case: if size is greater than max memory, we'll never be able to acquire it
        // Note: size is the compressed file size from the server
        if (size > _maxMemory)
        {
            throw new ArgumentOutOfRangeException(nameof(size), $"Requested size ({size} bytes) exceeds maximum memory ({_maxMemory} bytes).");
        }

        while (true)
        {
            // Checked before every attempt so a cancelled token never reserves memory.
            cancellationToken.ThrowIfCancellationRequested();

            // Try to acquire memory without blocking
            if (TryAcquireMemory(size))
            {
                return;
            }

            // If we couldn't acquire memory, wait for some to be released.
            // Task.Delay itself throws if the token is cancelled during the wait.
            await Task.Delay(AcquireRetryDelayMilliseconds, cancellationToken).ConfigureAwait(false);
        }
    }

    /// <inheritdoc />
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="size"/> is zero or negative.</exception>
    /// <exception cref="InvalidOperationException">
    /// More memory was released than is currently reserved. The counter is reset to zero
    /// before the exception is thrown.
    /// </exception>
    public void ReleaseMemory(long size)
    {
        if (size <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(size), "Size must be positive.");
        }

        // Subtract and clamp in a single compare-and-swap so the counter is never
        // observable as negative. A separate "add, then reset to zero" sequence would
        // let a concurrent TryAcquireMemory reserve against the transient negative value
        // and then have its reservation erased by the reset.
        long current;
        long updated;
        do
        {
            current = Interlocked.Read(ref _usedMemory);
            updated = current - size;
            if (updated < 0)
            {
                updated = 0;
            }
        }
        while (Interlocked.CompareExchange(ref _usedMemory, updated, current) != current);

        // This should never happen if the calling code is correct
        if (size > current)
        {
            throw new InvalidOperationException("Memory buffer manager released more memory than was acquired.");
        }
    }
}
}