Skip to content

Commit 6218f36

Browse files
authored
Merge pull request #631 from bigmontz/testkit_wait_for_result_complete
testkit: waiting for the current result get completed
2 parents 5818546 + 1d3b942 commit 6218f36

File tree

2 files changed

+51
-10
lines changed

2 files changed

+51
-10
lines changed

testkit-backend/main.js

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,22 @@ class Backend {
124124
params[key] = cypherToNative(value)
125125
}
126126
}
127-
const result = session.run(cypher, params)
128-
this._id++
129-
const resultObserver = new ResultObserver()
130-
result.subscribe(resultObserver)
131-
this._resultObservers[this._id] = resultObserver
132-
this._writeResponse('Result', {
133-
id: this._id
134-
})
127+
128+
const observers = Object.values(this._resultObservers).filter(
129+
obs => obs.sessionId === sessionId
130+
)
131+
Promise.all(observers.map(obs => obs.completitionPromise()))
132+
.catch(_ => null)
133+
.then(_ => {
134+
this._id++
135+
const result = session.run(cypher, params)
136+
const resultObserver = new ResultObserver({ sessionId })
137+
result.subscribe(resultObserver)
138+
this._resultObservers[this._id] = resultObserver
139+
this._writeResponse('Result', {
140+
id: this._id
141+
})
142+
})
135143
}
136144
break
137145

@@ -191,7 +199,7 @@ class Backend {
191199
}
192200
const result = tx.tx.run(cypher, params)
193201
this._id++
194-
const resultObserver = new ResultObserver()
202+
const resultObserver = new ResultObserver({})
195203
result.subscribe(resultObserver)
196204
this._resultObservers[this._id] = resultObserver
197205
this._writeResponse('Result', {

testkit-backend/result-observer.js

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
const neo4j = require('neo4j-driver')
22

33
export class ResultObserver {
4-
constructor () {
4+
constructor ({ sessionId }) {
5+
this.sessionId = sessionId
56
this.keys = null
67
this._stream = []
78
this.summary = null
@@ -11,6 +12,7 @@ export class ResultObserver {
1112
this.onNext = this.onNext.bind(this)
1213
this.onCompleted = this.onCompleted.bind(this)
1314
this.onError = this.onError.bind(this)
15+
this._completitionPromise = null
1416
}
1517

1618
onKeys (keys) {
@@ -25,11 +27,15 @@ export class ResultObserver {
2527
onCompleted (summary) {
2628
this._summary = summary
2729
this._fulfill()
30+
this._resolve(this._completitionPromise, summary)
31+
this._completitionPromise = null
2832
}
2933

3034
onError (e) {
3135
this._stream.push(e)
3236
this._fulfill()
37+
this._reject(this._completitionPromise, e)
38+
this._completitionPromise = null
3339
}
3440

3541
// Returns a promise, only one outstanding next!
@@ -43,6 +49,21 @@ export class ResultObserver {
4349
})
4450
}
4551

52+
completitionPromise () {
53+
return new Promise((resolve, reject) => {
54+
if (this._summary) {
55+
resolve(this._summary)
56+
} else if (this._err) {
57+
reject(this._err)
58+
} else {
59+
this._completitionPromise = {
60+
resolve,
61+
reject
62+
}
63+
}
64+
})
65+
}
66+
4667
_fulfill () {
4768
if (!this._promise) {
4869
return
@@ -76,4 +97,16 @@ export class ResultObserver {
7697
this._promise = null
7798
}
7899
}
100+
101+
_resolve (promise, data) {
102+
if (promise) {
103+
promise.resolve(data)
104+
}
105+
}
106+
107+
_reject (promise, err) {
108+
if (promise) {
109+
promise.reject(err)
110+
}
111+
}
79112
}

0 commit comments

Comments
 (0)