@@ -54,7 +54,7 @@ private[worker] final class DiskFlusher(
5454 val threadCount : Int ) extends DeviceObserver with Logging {
5555 private lazy val diskFlusherId = System .identityHashCode(this )
5656 private val workingQueues = new Array [LinkedBlockingQueue [FlushTask ]](threadCount)
57- private val bufferQueues = new Array [ LinkedBlockingQueue [CompositeByteBuf ]](threadCount )
57+ private val bufferQueue = new LinkedBlockingQueue [CompositeByteBuf ](queueCapacity )
5858 private val workers = new Array [Thread ](threadCount)
5959 private val nextWorkerIndex = new AtomicInteger ()
6060
@@ -68,14 +68,11 @@ private[worker] final class DiskFlusher(
6868 init()
6969
7070 private def init (): Unit = {
71- val actualQueueSize = queueCapacity / threadCount + 1
71+ for (_ <- 0 until queueCapacity) {
72+ bufferQueue.put(Unpooled .compositeBuffer(256 ))
73+ }
7274 for (index <- 0 until (threadCount)) {
73- workingQueues(index) = new LinkedBlockingQueue [FlushTask ](actualQueueSize)
74- bufferQueues(index) = new LinkedBlockingQueue [CompositeByteBuf ](actualQueueSize)
75- for (_ <- 0 until actualQueueSize) {
76- bufferQueues(index).put(Unpooled .compositeBuffer(256 ))
77- }
78-
75+ workingQueues(index) = new LinkedBlockingQueue [FlushTask ](queueCapacity)
7976 workers(index) = new Thread (s " $this- $index" ) {
8077 override def run (): Unit = {
8178 while (! stopFlag.get()) {
@@ -96,7 +93,7 @@ private[worker] final class DiskFlusher(
9693 }
9794 lastBeginFlushTime = - 1
9895 }
99- returnBuffer(task.buffer, index )
96+ returnBuffer(task.buffer)
10097 task.notifier.numPendingFlushes.decrementAndGet()
10198 }
10299 }
@@ -122,16 +119,16 @@ private[worker] final class DiskFlusher(
122119 nextIndex % threadCount
123120 }
124121
125- def takeBuffer (timeoutMs : Long , workerIndex : Int ): CompositeByteBuf = {
126- bufferQueues(workerIndex) .poll(timeoutMs, TimeUnit .MILLISECONDS )
122+ def takeBuffer (timeoutMs : Long ): CompositeByteBuf = {
123+ bufferQueue .poll(timeoutMs, TimeUnit .MILLISECONDS )
127124 }
128125
129- def returnBuffer (buffer : CompositeByteBuf , workerIndex : Int ): Unit = {
126+ def returnBuffer (buffer : CompositeByteBuf ): Unit = {
130127 MemoryTracker .instance().releaseDiskBuffer(buffer.readableBytes())
131128 buffer.removeComponents(0 , buffer.numComponents())
132129 buffer.clear()
133130
134- bufferQueues(workerIndex) .put(buffer)
131+ bufferQueue .put(buffer)
135132 }
136133
137134 def addTask (task : FlushTask , timeoutMs : Long , workerIndex : Int ): Boolean = {
@@ -162,7 +159,7 @@ private[worker] final class DiskFlusher(
162159 deviceMonitor.reportDeviceError(workingDir, e, deviceErrorType)
163160 }
164161
165- def bufferQueueInfo (): String = s " $this used buffers: ${bufferQueues.map(_. size()).toList }"
162+ def bufferQueueInfo (): String = s " $this used buffers: ${bufferQueue. size()}"
166163
167164 override def hashCode (): Int = {
168165 workingDir.hashCode()
0 commit comments