Skip to content

Commit 4a88079

Browse files
committed
[BUG] multi-thread flusher causes data inconsistent with chunk offsets (#275)
(cherry picked from commit cb42b2f)
1 parent f00fae8 commit 4a88079

File tree

2 files changed

+57
-40
lines changed

2 files changed

+57
-40
lines changed

server-worker/src/main/java/com/aliyun/emr/rss/service/deploy/worker/FileWriter.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public final class FileWriter extends DeviceObserver {
6060
private long bytesFlushed;
6161

6262
private final DiskFlusher flusher;
63+
private final int flushWorkerIndex;
6364
private CompositeByteBuf flushBuffer;
6465

6566
private final long chunkSize;
@@ -119,6 +120,7 @@ public FileWriter(
119120
PartitionSplitMode splitMode) throws IOException {
120121
this.file = file;
121122
this.flusher = flusher;
123+
this.flushWorkerIndex = flusher.getWorkerIndex();
122124
this.dataRootDir = workingDir;
123125
this.chunkSize = chunkSize;
124126
this.nextBoundary = chunkSize;
@@ -315,7 +317,7 @@ private void takeBuffer() {
315317
}
316318

317319
// real action
318-
flushBuffer = flusher.takeBuffer(timeoutMs);
320+
flushBuffer = flusher.takeBuffer(timeoutMs, flushWorkerIndex);
319321

320322
// metrics end
321323
if (source.samplePerfCritical()) {
@@ -330,7 +332,7 @@ private void takeBuffer() {
330332
}
331333

332334
private void addTask(FlushTask task) throws IOException {
333-
if (!flusher.addTask(task, timeoutMs)) {
335+
if (!flusher.addTask(task, timeoutMs, flushWorkerIndex)) {
334336
IOException e = new IOException("Add flush task timeout.");
335337
notifier.setException(e);
336338
throw e;
@@ -339,7 +341,7 @@ private void addTask(FlushTask task) throws IOException {
339341

340342
private synchronized void returnBuffer() {
341343
if (flushBuffer != null) {
342-
flusher.returnBuffer(flushBuffer);
344+
flusher.returnBuffer(flushBuffer, flushWorkerIndex);
343345
flushBuffer = null;
344346
}
345347
}

server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala

Lines changed: 52 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,10 @@ private[worker] final class DiskFlusher(
5353
val deviceMonitor: DeviceMonitor,
5454
val threadCount: Int) extends DeviceObserver with Logging {
5555
private lazy val diskFlusherId = System.identityHashCode(this)
56-
private val workingQueue = new LinkedBlockingQueue[FlushTask](queueCapacity)
57-
private val bufferQueue = new LinkedBlockingQueue[CompositeByteBuf](queueCapacity)
58-
private val writeActionPool = ThreadUtils.newDaemonFixedThreadPool(threadCount,
59-
workingDir.getName + "-flusher")
60-
for (_ <- 0 until queueCapacity) {
61-
bufferQueue.put(Unpooled.compositeBuffer(256))
62-
}
56+
private val workingQueues = new Array[LinkedBlockingQueue[FlushTask]](threadCount)
57+
private val bufferQueues = new Array[LinkedBlockingQueue[CompositeByteBuf]](threadCount)
58+
private val workers = new Array[Thread](threadCount)
59+
private val nextWorkerIndex = new AtomicInteger()
6360

6461
@volatile
6562
private var lastBeginFlushTime: Long = -1
@@ -68,12 +65,21 @@ private[worker] final class DiskFlusher(
6865
val stopFlag = new AtomicBoolean(false)
6966
val rand = new Random()
7067

71-
private val worker = new Thread(s"$this") {
72-
override def run(): Unit = {
73-
while (!stopFlag.get()) {
74-
val task = workingQueue.take()
75-
writeActionPool.submit(new Runnable {
76-
override def run(): Unit = {
68+
init()
69+
70+
private def init(): Unit = {
71+
val actualQueueSize = queueCapacity / threadCount + 1
72+
for (index <- 0 until (threadCount)) {
73+
workingQueues(index) = new LinkedBlockingQueue[FlushTask](actualQueueSize)
74+
bufferQueues(index) = new LinkedBlockingQueue[CompositeByteBuf](actualQueueSize)
75+
for (_ <- 0 until actualQueueSize) {
76+
bufferQueues(index).put(Unpooled.compositeBuffer(256))
77+
}
78+
79+
workers(index) = new Thread(s"$this-$index") {
80+
override def run(): Unit = {
81+
while (!stopFlag.get()) {
82+
val task = workingQueues(index).take()
7783
val key = s"DiskFlusher-$workingDir-${rand.nextInt()}"
7884
workerSource.sample(WorkerSource.FlushDataTime, key) {
7985
if (!task.notifier.hasException) {
@@ -90,55 +96,64 @@ private[worker] final class DiskFlusher(
9096
}
9197
lastBeginFlushTime = -1
9298
}
93-
returnBuffer(task.buffer)
99+
returnBuffer(task.buffer, index)
94100
task.notifier.numPendingFlushes.decrementAndGet()
95101
}
96102
}
97-
})
103+
}
98104
}
105+
workers(index).setDaemon(true)
106+
workers(index).setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
107+
override def uncaughtException(t: Thread, e: Throwable): Unit = {
108+
logError(s"$this thread terminated.", e)
109+
}
110+
})
111+
workers(index).start()
99112
}
113+
114+
deviceMonitor.registerDiskFlusher(this)
100115
}
101-
worker.setDaemon(true)
102-
worker.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
103-
override def uncaughtException(t: Thread, e: Throwable): Unit = {
104-
logError(s"$this thread terminated.", e)
105-
}
106-
})
107-
worker.start()
108116

109-
deviceMonitor.registerDiskFlusher(this)
117+
def getWorkerIndex: Int = {
118+
val nextIndex = nextWorkerIndex.getAndIncrement()
119+
if (nextIndex > threadCount) {
120+
nextWorkerIndex.set(0)
121+
}
122+
nextIndex % threadCount
123+
}
110124

111-
def takeBuffer(timeoutMs: Long): CompositeByteBuf = {
112-
bufferQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
125+
def takeBuffer(timeoutMs: Long, workerIndex: Int): CompositeByteBuf = {
126+
bufferQueues(workerIndex).poll(timeoutMs, TimeUnit.MILLISECONDS)
113127
}
114128

115-
def returnBuffer(buffer: CompositeByteBuf): Unit = {
129+
def returnBuffer(buffer: CompositeByteBuf, workerIndex: Int): Unit = {
116130
MemoryTracker.instance().releaseDiskBuffer(buffer.readableBytes())
117131
buffer.removeComponents(0, buffer.numComponents())
118132
buffer.clear()
119133

120-
bufferQueue.put(buffer)
134+
bufferQueues(workerIndex).put(buffer)
121135
}
122136

123-
def addTask(task: FlushTask, timeoutMs: Long): Boolean = {
124-
workingQueue.offer(task, timeoutMs, TimeUnit.MILLISECONDS)
137+
def addTask(task: FlushTask, timeoutMs: Long, workerIndex: Int): Boolean = {
138+
workingQueues(workerIndex).offer(task, timeoutMs, TimeUnit.MILLISECONDS)
125139
}
126140

127141
override def notifyError(deviceName: String, dirs: ListBuffer[File] = null,
128142
deviceErrorType: DeviceErrorType): Unit = {
129143
logError(s"$this is notified Device $deviceName Error $deviceErrorType! Stop Flusher.")
130144
stopFlag.set(true)
131145
try {
132-
worker.interrupt()
133-
writeActionPool.shutdown()
146+
workers.foreach(_.interrupt())
134147
} catch {
135148
case e: Exception =>
136-
logError(s"Exception when interrupt worker: $worker, $e")
149+
logError(s"Exception when interrupt worker: $workers, $e")
150+
}
151+
workingQueues.foreach { queue =>
152+
queue.asScala.foreach { task =>
153+
task.buffer.removeComponents(0, task.buffer.numComponents())
154+
task.buffer.clear()
155+
}
137156
}
138-
workingQueue.asScala.foreach(task => {
139-
task.buffer.removeComponents(0, task.buffer.numComponents())
140-
task.buffer.clear()
141-
})
142157
deviceMonitor.unregisterDiskFlusher(this)
143158
}
144159

@@ -147,7 +162,7 @@ private[worker] final class DiskFlusher(
147162
deviceMonitor.reportDeviceError(workingDir, e, deviceErrorType)
148163
}
149164

150-
def bufferQueueInfo(): String = s"$this available buffers: ${bufferQueue.size()}"
165+
def bufferQueueInfo(): String = s"$this used buffers: ${bufferQueues.map(_.size()).toList}"
151166

152167
override def hashCode(): Int = {
153168
workingDir.hashCode()

0 commit comments

Comments
 (0)