11require ( 'dotenv' ) . config ( ) ;
22const express = require ( 'express' ) ;
33const { v4 : uuid } = require ( 'uuid' ) ;
4+ const Consumer = require ( './Consumer' ) ;
5+ const MessageQueue = require ( './MessageQueue' ) ;
46const PORT = process . env . PORT ;
57
68const app = express ( ) ;
@@ -10,23 +12,9 @@ app.use(express.urlencoded({ extended: false }));
1012
1113// TODO check if data directory exists, create if not exists
1214
13- // In-memory queues
14- var consumers = [ ] ;
15- var queues = [ ] ;
16-
17- // consumers
18- app . post ( '/queues/:queue/consumers' , function ( req , res , next ) {
19- try {
20- var consumer = consumers . find ( c =>
21- c . clientSecret === req . body . clientSecret && c . queue === req . params . queue ) ;
22- consumer = new Consumer ( uuid ( ) , req . body . clientSecret , req . params . queue ) ;
23- consumers . push ( consumer ) ;
24- res . status ( 200 ) . json ( consumer . id ) ;
25- } catch ( error ) {
26- console . error ( 'An error occurred' , error ) ;
27- res . sendStatus ( 500 ) ;
28- }
29- } ) ;
15+ // In-memory objects
16+ var consumers = [ ] ; // current consumers
17+ var queues = [ ] ; // use this to dump data
3018
3119// queues
3220app . get ( '/queues' , function ( req , res , next ) {
@@ -42,7 +30,7 @@ app.get('/queues', function (req, res, next) {
4230
4331app . post ( '/queues/:queue' , function ( req , res , next ) {
4432 try {
45- var queue = new Queue ( req . params . queue ) ;
33+ var queue = new MessageQueue ( req . params . queue ) ;
4634 queues . push ( queue ) ;
4735 res . sendStatus ( 200 ) ;
4836 } catch ( error ) {
@@ -61,11 +49,35 @@ app.delete('/queues/:queue', function (req, res, next) {
6149 }
6250} ) ;
6351
52+ // register consumer
53+ app . post ( '/queues/:queue/consumers' , function ( req , res , next ) {
54+ try {
55+ var consumer = consumers . find ( c =>
56+ c . clientSecret === req . body . clientSecret && c . queue === req . params . queue ) ;
57+ if ( ! consumer ) {
58+ consumer = new Consumer ( uuid ( ) , req . body . clientSecret , new MessageQueue ( req . params . queue ) ) ;
59+ consumers . push ( consumer ) ;
60+ }
61+ res . status ( 200 ) . json ( consumer . id ) ;
62+ } catch ( error ) {
63+ console . error ( 'An error occurred' , error ) ;
64+ res . sendStatus ( 500 ) ;
65+ }
66+ } ) ;
67+
6468// publish
6569app . post ( '/queues/:queue/messages' , function ( req , res , next ) {
6670 try {
71+ var queues = consumers . findAll ( c => c . queue . name === req . params . queue ) ;
72+ if ( queues . length > 0 ) {
73+ for ( var q in queues ) {
74+ q . Enqueue ( req . body . message ) ;
75+ }
76+ }
6777 var queue = queues . find ( q => q . name === req . params . queue ) ;
68- queue . Enqueue ( req . body . message ) ;
78+ if ( ! queue ) {
79+ queues . push ( req . params . queue ) ;
80+ }
6981 res . sendStatus ( 200 ) ;
7082 } catch ( error ) {
7183 console . error ( 'An error occurred' , error ) ;
@@ -76,20 +88,12 @@ app.post('/queues/:queue/messages', function (req, res, next) {
7688// consume
7789app . get ( '/queues/:queue/consumers/:id/messages' , function ( req , res , next ) {
7890 try {
91+ // check if valid queue and consumer
7992 var queue = queues . find ( q => q . name === req . params . queue ) ;
80- if ( queue ) {
81- // check if valid consumer
82- var consumer = consumers . find ( c => c . id === req . params . id && c . queue === req . params . queue ) ;
83- if ( consumer ) {
84- var message = queue . Dequeue ( ) ;
85- if ( message != null ) {
86- res . status ( 200 ) . json ( message ) ;
87- } else {
88- res . sendStatus ( 204 ) ;
89- }
90- } else {
91- res . sendStatus ( 204 ) ;
92- }
93+ var consumer = consumers . find ( c => c . id === req . params . id && c . queue . name === req . params . queue ) ;
94+ if ( queue && consumer ) {
95+ var message = queue . Dequeue ( ) ;
96+ message ? res . status ( 200 ) . json ( message ) : res . sendStatus ( 204 ) ;
9397 } else {
9498 res . sendStatus ( 204 ) ;
9599 }
0 commit comments