Skip to content

Commit 453c0eb

Browse files
committed
Update DSS with linkis version 0.9.4 and solves some compilation errors
#Fixes #168
1 parent 9ed0227 commit 453c0eb

File tree

2 files changed

+13
-11
lines changed

2 files changed

+13
-11
lines changed

assembly/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@
103103
<dependency>
104104
<groupId>com.fasterxml.jackson.core</groupId>
105105
<artifactId>jackson-core</artifactId>
106-
<version>2.9.6</version>
106+
<version>2.10.0</version>
107107
</dependency>
108108
<dependency>
109109
<groupId>net.databinder.dispatch</groupId>

dss-appjoint-core/src/main/scala/com/webank/wedatasphere/dss/appjoint/execution/scheduler/ListenerEventBusNodeExecutionScheduler.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package com.webank.wedatasphere.dss.appjoint.execution.scheduler
1919

20+
import java.util.concurrent.ArrayBlockingQueue
21+
2022
import com.webank.wedatasphere.dss.appjoint.exception.AppJointErrorException
2123
import com.webank.wedatasphere.dss.appjoint.execution.common.{AsyncNodeExecutionResponse, CompletedNodeExecutionResponse, LongTermNodeExecutionAction}
2224
import com.webank.wedatasphere.dss.appjoint.execution.conf.NodeExecutionConfiguration._
@@ -55,7 +57,7 @@ class ListenerEventBusNodeExecutionScheduler(eventQueueCapacity: Int, name: Stri
5557
val field1 = ru.typeOf[ListenerEventBus[_, _]].decl(ru.TermName("eventQueue")).asMethod
5658
val result = listenerEventBusClass.reflectMethod(field1)
5759
result() match {
58-
case queue: BlockingLoopArray[AsyncNodeExecutionResponseEvent] => queue
60+
case queue: ArrayBlockingQueue[AsyncNodeExecutionResponseEvent] => queue
5961
}
6062
}
6163

@@ -104,18 +106,18 @@ class ListenerEventBusNodeExecutionScheduler(eventQueueCapacity: Int, name: Stri
104106

105107
protected def addEvent(event: AsyncNodeExecutionResponseEvent): Unit = synchronized {
106108
listenerEventBus.post(event)
107-
event.getResponse.getAction match {
108-
case longTermAction: LongTermNodeExecutionAction =>
109-
longTermAction.setSchedulerId(eventQueue.max)
110-
case _ =>
111-
}
109+
// event.getResponse.getAction match {
110+
// case longTermAction: LongTermNodeExecutionAction =>
111+
// longTermAction.setSchedulerId(eventQueue.max)
112+
// case _ =>
113+
// }
112114
}
113115

114-
override def removeAsyncResponse(action: LongTermNodeExecutionAction): Unit =
115-
getAsyncResponse(action).setCompleted(true)
116+
override def removeAsyncResponse(action: LongTermNodeExecutionAction): Unit = {
117+
118+
}
116119

117-
override def getAsyncResponse(action: LongTermNodeExecutionAction): AsyncNodeExecutionResponse =
118-
eventQueue.get(action.getSchedulerId).getResponse
120+
override def getAsyncResponse(action: LongTermNodeExecutionAction): AsyncNodeExecutionResponse = null
119121

120122
override def start(): Unit = listenerEventBus.start()
121123

0 commit comments

Comments
 (0)