Skip to content

Commit f40749a

Browse files
committed
feat: achievement: perfect EventSocket unlocked!
1 parent e54d3f8 commit f40749a

File tree

2 files changed

+204
-51
lines changed

2 files changed

+204
-51
lines changed

public/dashp2p.js

Lines changed: 201 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -65,18 +65,15 @@ var DashP2P = ('object' === typeof module && exports) || {};
6565
p2p.header = null;
6666
/** @type {Uint8Array?} */
6767
p2p.payload = null;
68+
let explicitEvents = ['version', 'verack', 'ping', 'pong'];
69+
p2p._promiseStream = Utils.EventSocket.create(explicitEvents);
6870

69-
p2p._generator = Utils.createPromiseGenerator();
70-
71-
p2p.accept = async function (messageTypes) {
72-
let data = await p2p._generator.next();
73-
return data;
74-
};
71+
p2p.listen = p2p._promiseStream.listen;
7572

7673
/** @param {Uint8Array?} */
7774
p2p.write = function (chunk) {
7875
if (p2p.state === 'error') {
79-
p2p._generator.reject(p2p.error);
76+
p2p._promiseStream.rejectAll(p2p.error);
8077

8178
// in the case of UDP where we miss a packet,
8279
// we can log the error but still resume on the next one.
@@ -101,18 +98,20 @@ var DashP2P = ('object' === typeof module && exports) || {};
10198
p2p.header.command,
10299
p2p.payload?.length || null,
103100
);
104-
p2p._generator.resolve({
101+
let msg = {
102+
command: p2p.header.command,
105103
header: p2p.header,
106104
payload: p2p.payload,
107-
});
105+
};
106+
p2p._promiseStream.resolveAll(msg.command, msg);
108107

109108
p2p.state = 'header';
110109
p2p.write(chunk);
111110
return;
112111
}
113112

114113
let err = new Error(`developer error: unknown state '${p2p.state}'`);
115-
p2p._generator.reject(err);
114+
p2p._promiseStream.rejectAll(err);
116115
p2p.state = 'header';
117116
p2p.write(chunk);
118117
};
@@ -715,53 +714,206 @@ var DashP2P = ('object' === typeof module && exports) || {};
715714
};
716715
Parsers.SIZES = SIZES;
717716

718-
Utils.createPromiseGenerator = function () {
719-
let g = {};
720-
721-
g._settled = true;
722-
g._promise = Promise.resolve(); // for type hint
723-
g._results = [];
724-
725-
g.resolve = function (result) {};
726-
g.reject = function (err) {};
727-
g.next = async function () {
728-
if (!g._settled) {
729-
console.warn('g.accept() called before previous call was settled');
730-
return await g._promise;
731-
}
732-
g._settled = false;
733-
g._promise = new Promise(function (_resolve, _reject) {
734-
g.resolve = function (result) {
735-
if (g._settled) {
736-
g._results.push(result);
737-
return;
738-
}
739-
g._settled = true;
740-
_resolve(result);
717+
Utils.EventSocket = {};
718+
719+
/** @param {String} events */
720+
Utils.EventSocket.create = function (explicitEvents) {
721+
let stream = {};
722+
723+
stream._explicitEvents = explicitEvents;
724+
725+
/** @type {Array<any>} */
726+
stream._connections = [];
727+
728+
/**
729+
* @param {Array<String>} events - ex: ['*', 'error'] for default events, or list by name
730+
*/
731+
stream.listen = function (events = null) {
732+
let conn = Utils.EventSocket.createConnection(stream, events);
733+
return conn;
734+
};
735+
736+
stream.resolveAll = function (eventname, msg) {
737+
for (let p of stream._connections) {
738+
let isSubscribed = p._events.includes(eventname);
739+
if (isSubscribed) {
740+
p._resolve(msg);
741+
continue;
742+
}
743+
744+
let isExplicit = stream._explicitEvents.includes(eventname);
745+
if (isExplicit) {
746+
continue;
747+
}
748+
749+
let hasCatchall = p._events.includes('*');
750+
if (hasCatchall) {
751+
p._resolve(msg);
752+
}
753+
}
754+
};
755+
756+
stream.rejectAll = function (err) {
757+
let handled = false;
758+
for (let p of stream._connections) {
759+
let handlesErrors = p._events.includes('error');
760+
if (!handlesErrors) {
761+
continue;
762+
}
763+
764+
handled = true;
765+
p._reject(err);
766+
}
767+
if (!handled) {
768+
for (let p of stream._connections) {
769+
p._reject(err);
770+
}
771+
}
772+
};
773+
774+
return stream;
775+
};
776+
777+
Utils.EventSocket.createConnection = function (stream, defaultEvents = null) {
778+
let p = {};
779+
stream._connections.push(p);
780+
781+
p._events = defaultEvents;
782+
783+
p.closed = false;
784+
p._settled = false;
785+
p._resolve = function (msg) {};
786+
p._reject = function (err) {};
787+
p._promise = Promise.resolve(null);
788+
p._next = async function () {
789+
p._settled = false;
790+
p._promise = new Promise(function (_resolve, _reject) {
791+
p._resolve = function (msg) {
792+
p._close(true);
793+
_resolve(msg);
741794
};
742-
g.reject = function (error) {
743-
if (g._settled) {
744-
g._results.push(error);
745-
return;
746-
}
747-
g._settled = true;
748-
_reject(error);
795+
p._reject = function (err) {
796+
p._close(true);
797+
_reject(err);
749798
};
750799
});
751-
if (g._results.length) {
752-
let result = g._results.shift();
753-
if (result instanceof Error) {
754-
g.reject(result);
755-
} else {
756-
g.resolve(result);
757-
}
800+
801+
return await p._promise;
802+
};
803+
804+
/**
805+
* Accepts the next message of the given event name,
806+
* or of any of the default event names.
807+
* @param {String} eventname - '*' for default events, 'error' for error, or others by name
808+
*/
809+
p.accept = async function (eventname) {
810+
if (p.closed) {
811+
let err = new Error('cannot accept new events after close');
812+
Object.assign(err, { code: 'E_ALREADY_CLOSED' });
813+
throw err;
758814
}
759-
return await g._promise;
815+
816+
if (eventname) {
817+
p.events = [eventname];
818+
} else if (defaultEvents?.length) {
819+
p.events = defaultEvents;
820+
} else {
821+
let err = new Error(
822+
`call stream.listen(['*']) or conn.accept('*') for default events`,
823+
);
824+
Object.assign(err, { code: 'E_NO_EVENTS' });
825+
throw err;
826+
}
827+
828+
return await p._next();
760829
};
761830

762-
return g;
831+
p._close = function (_settle) {
832+
if (p.closed) {
833+
return;
834+
}
835+
p.closed = true;
836+
837+
let index = stream._connections.indexOf(p);
838+
if (index >= 0) {
839+
void stream._connections.splice(index, 1);
840+
}
841+
if (_settle) {
842+
p._settled = true;
843+
}
844+
if (p._settled) {
845+
return;
846+
}
847+
848+
p._settled = true;
849+
let err = new Error('promise stream closed');
850+
Object.assign(err, { code: 'E_CLOSE' });
851+
p._reject(err);
852+
};
853+
854+
/**
855+
* Causes `let msg = conn.accept()` to fail with E_CLOSE or E_ALREADY_CLOSED
856+
*/
857+
p.close = function () {
858+
p._close(false);
859+
};
860+
861+
return p;
763862
};
764863

864+
// /** @param {String} events */
865+
// Utils.createPromiseGenerator = function (events) {
866+
// let g = {};
867+
868+
// g.events = events;
869+
870+
// // g._settled = true;
871+
// g._promise = Promise.resolve(); // for type hint
872+
// g._results = [];
873+
874+
// g.resolve = function (result) {};
875+
// g.reject = function (err) {};
876+
877+
// // g.init = async function () {
878+
// // if (!g._settled) {
879+
// // console.warn('g.init() called again before previous call was settled');
880+
// // return await g._promise;
881+
// // }
882+
// // g._settled = false;
883+
// g._promise = new Promise(function (_resolve, _reject) {
884+
// g.resolve = _resolve;
885+
// g.reject = _reject;
886+
// // g.resolve = function (result) {
887+
// // if (g._settled) {
888+
// // g._results.push(result);
889+
// // return;
890+
// // }
891+
// // g._settled = true;
892+
// // _resolve(result);
893+
// // };
894+
// // g.reject = function (error) {
895+
// // if (g._settled) {
896+
// // g._results.push(error);
897+
// // return;
898+
// // }
899+
// // g._settled = true;
900+
// // _reject(error);
901+
// // };
902+
// });
903+
// // if (g._results.length) {
904+
// // let result = g._results.shift();
905+
// // if (result instanceof Error) {
906+
// // g.reject(result);
907+
// // } else {
908+
// // g.resolve(result);
909+
// // }
910+
// // }
911+
// // return await g._promise;
912+
// // };
913+
914+
// return g;
915+
// };
916+
765917
/**
766918
* @param {Array<Uint8Array>} byteArrays
767919
* @param {Number?} [len]

public/wallet-app.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,9 +1104,10 @@
11041104

11051105
for (;;) {
11061106
let subs = ['*', 'inv', 'ping', 'pong', 'version', 'verack'];
1107-
let msg = await p2p.accept(subs);
1107+
let conn = p2p.listen(subs);
1108+
let msg = await conn.accept();
11081109
let command = msg.header.command;
1109-
console.log('p2p.accept():', command);
1110+
console.log('conn.accept():', command);
11101111
let isSub = subs.includes(command);
11111112
if (isSub) {
11121113
console.log(msg);

0 commit comments

Comments
 (0)