|
17 | 17 |
|
18 | 18 | package com.webank.wedatasphere.dss.appjoint.execution.scheduler |
19 | 19 |
|
| 20 | +import java.util.concurrent.ArrayBlockingQueue |
| 21 | + |
20 | 22 | import com.webank.wedatasphere.dss.appjoint.exception.AppJointErrorException |
21 | 23 | import com.webank.wedatasphere.dss.appjoint.execution.common.{AsyncNodeExecutionResponse, CompletedNodeExecutionResponse, LongTermNodeExecutionAction} |
22 | 24 | import com.webank.wedatasphere.dss.appjoint.execution.conf.NodeExecutionConfiguration._ |
@@ -55,7 +57,7 @@ class ListenerEventBusNodeExecutionScheduler(eventQueueCapacity: Int, name: Stri |
55 | 57 | val field1 = ru.typeOf[ListenerEventBus[_, _]].decl(ru.TermName("eventQueue")).asMethod |
56 | 58 | val result = listenerEventBusClass.reflectMethod(field1) |
57 | 59 | result() match { |
58 | | - case queue: BlockingLoopArray[AsyncNodeExecutionResponseEvent] => queue |
| 60 | + case queue: ArrayBlockingQueue[AsyncNodeExecutionResponseEvent] => queue |
59 | 61 | } |
60 | 62 | } |
61 | 63 |
|
@@ -104,18 +106,18 @@ class ListenerEventBusNodeExecutionScheduler(eventQueueCapacity: Int, name: Stri |
104 | 106 |
|
105 | 107 | protected def addEvent(event: AsyncNodeExecutionResponseEvent): Unit = synchronized { |
106 | 108 | listenerEventBus.post(event) |
107 | | - event.getResponse.getAction match { |
108 | | - case longTermAction: LongTermNodeExecutionAction => |
109 | | - longTermAction.setSchedulerId(eventQueue.max) |
110 | | - case _ => |
111 | | - } |
| 109 | +// event.getResponse.getAction match { |
| 110 | +// case longTermAction: LongTermNodeExecutionAction => |
| 111 | +// longTermAction.setSchedulerId(eventQueue.max) |
| 112 | +// case _ => |
| 113 | +// } |
112 | 114 | } |
113 | 115 |
|
114 | | - override def removeAsyncResponse(action: LongTermNodeExecutionAction): Unit = |
115 | | - getAsyncResponse(action).setCompleted(true) |
| 116 | + override def removeAsyncResponse(action: LongTermNodeExecutionAction): Unit = { |
| 117 | + |
| 118 | + } |
116 | 119 |
|
117 | | - override def getAsyncResponse(action: LongTermNodeExecutionAction): AsyncNodeExecutionResponse = |
118 | | - eventQueue.get(action.getSchedulerId).getResponse |
| 120 | + override def getAsyncResponse(action: LongTermNodeExecutionAction): AsyncNodeExecutionResponse = null |
119 | 121 |
|
120 | 122 | override def start(): Unit = listenerEventBus.start() |
121 | 123 |
|
|
0 commit comments