@@ -12,7 +12,6 @@ var trace = require('debug')('arkivo:trace');
1212var B = require ( 'bluebird' ) ;
1313
1414var config = require ( './config' ) . controller ;
15- var zotero = require ( './zotero' ) ;
1615var common = require ( './common' ) ;
1716
1817var extend = common . extend ;
@@ -29,6 +28,78 @@ var Listener = require('./listener');
2928
3029/** @module arkivo */
3130
31+ /**
32+ * # How does it work?
33+ * ## What happens on startup?
34+ * 1. Arkivo is started.
35+ * 2. Continues processing previous Kue jobs [subscribe, unsubscribe, sync].
36+ * 3. Creates listener, connects to stream-server, subscribes to all
37+ * topics from all subscriptions.
38+ * 4. Stream-server responds with subscriptionsCreated, and the listener
39+ * emits 'update' event for each subscription that is listening
40+ * for the specific topic and key pair.
41+ * 5. Sync jobs are scheduled for each subscription. And this basically means
42+ * that on each stream-server (re)connect we (re)synchronize all subscriptions.
43+ *
44+ * ## What happens when creating a subscription?
45+ * 1. The URL is validated to make sure it's an items url and we have access to it.
46+ * 2. If the URL is ok, the new subscription is created and saved.
47+ * 3. Two concurrent operations are issued:
48+ * subscribe to stream-server, and schedule the first sync job.
49+ * 4. Listener receives 'subscriptionsCreated' response, and emits
50+ * 'update' events only for subscriptions which are newly subscribed
51+ * to stream-server.
52+ * 5. Each 'update' event should schedule a 'sync' job for the updated
53+ * subscription, but in practice the job will be skipped because
54+ * it should already exist, having been created in point 3.
55+ *
56+ * ## What happens when a topic is updated?
57+ * 1. Listener receives 'topicUpdated' from stream-server.
58+ * 2. Each subscription that is listening for the updated topic
59+ * will get the 'update' event.
60+ * 3. Each update event should schedule a 'sync' job for that subscription,
61+ * unless there already exists a 'sync' job for the subscription.
62+ *
63+ * ## What happens when removing subscription?
64+ * 1. Destroys the actual subscription.
65+ * 2. Removes the subscription from the listener.
66+ * 3. Listener checks whether any other subscriptions are still listening
67+ * for the same topic(+key) combination, and unsubscribes from
68+ * stream-server if there are none.
69+ *
70+ * ---
71+ *
72+ * # Optional stream-server
73+ * Stream-server can be disabled by setting listen:false in config.
74+ * ## What would this change?
75+ * * On startup we only continue to do the previously scheduled jobs.
76+ * * But we don't initialize the listener.
77+ * * Therefore we don't add subscriptions to it.
78+ * * Therefore the listener doesn't subscribe to stream-server.
79+ * * Therefore we don't get the subscriptionsCreated response from the server,
80+ * which would otherwise emit 'update' events and schedule 'sync' jobs.
81+ * * But, 'sync' job is scheduled on subscription creation.
82+ * * And also it can be triggered by `POST /api/sync` API request.
83+ *
84+ * ---
85+ *
86+ * # The Kue based 'sync' queue
87+ * The Kue based sync queue is probably unnecessary and can be replaced with
88+ * a simple array marking subscriptions that need to be updated:
89+ * var subscriptionsToUpdate = [
90+ * "vrrui780gb",
91+ * "olxbwld2t4",
92+ * ]
93+ *
94+ * Especially because Kue doesn't have a simple way to deduplicate jobs.
95+ *
96+ * The parameter 'skip' (to skip fetching already existing items)
97+ * can be saved together with the subscription,
98+ * therefore we would no longer need to persist the sync queue.
99+ * On each restart all subscriptions are resynchronized anyway.
100+ * Unless there is some other reason why we need a persistent sync queue?
101+ *
102+ */
32103
33104/**
34105 * @class Controller
@@ -103,13 +174,14 @@ Controller.prototype.subscribe = function (data, job) {
103174 // Theoretically the validated url can become invalid
104175 // when stream-server tries to create subscription.
105176 // But in practice this shouldn't be a problem.
106- return validateUrl ( data . url , data . key )
177+ return Subscription . validateItemsUrl ( data . url , data . key )
107178 . then ( function ( ) {
108179 return subscription . save ( ) ;
109180 } )
110181 . then ( function ( ) {
111- if ( this . options . listen )
182+ if ( this . options . listen ) {
112183 this . listener . add ( subscription ) ;
184+ }
113185 // If sync job would finish faster than stream-server
114186 // would add the new subscription, this could result in missed updates,
115187 // but each new stream-server subscription triggers another sync,
@@ -118,6 +190,7 @@ Controller.prototype.subscribe = function (data, job) {
118190 id : subscription . id ,
119191 skip : data . skip
120192 } , 0 , this . options . attempts ) ;
193+
121194 return subscription ;
122195 } . bind ( this ) ) ;
123196} ;
@@ -151,7 +224,7 @@ Controller.prototype.unsubscribe = function (data, job) {
151224 . call ( 'destroy' )
152225
153226 . tap ( function ( s ) {
154- if ( this . options . listen ) this . listener . remove ( s ) ;
227+ if ( this . options . listen ) this . listener . remove ( s . id ) ;
155228 } . bind ( this ) ) ;
156229} ;
157230
@@ -206,7 +279,7 @@ Controller.prototype.synchronize = function (data, job) {
206279
207280 subscriptions = Subscription
208281 . load ( data . id )
209- . then ( listify ) ;
282+ . then ( function ( subscription ) { return [ subscription ] ; } ) ;
210283 }
211284
212285 return subscriptions
@@ -463,29 +536,5 @@ function progressor(job, total) {
463536// };
464537// }
465538
466- function listify ( i ) { return [ i ] ; }
467-
468- function validateUrl ( url , key ) {
469- return new B ( function ( resolve , reject ) {
470- if ( ! ( / ^ ( \/ ? ( u s e r s | g r o u p s ) \/ \d + \/ ? ( p u b l i c a t i o n s \/ i t e m s | i t e m s ) ) / ) . test ( url ) ) {
471- return reject ( new Error ( 'Invalid URL' ) ) ;
472- }
473-
474- var headers = { } ;
475- if ( key ) {
476- headers [ 'Zotero-API-Key' ] = key ;
477- }
478- zotero . request ( {
479- method : 'get' ,
480- path : url ,
481- headers : headers
482- } , null , function ( error , resp ) {
483- // This directly returns all dataserver errors
484- if ( error ) return reject ( error ) ;
485- resolve ( resp ) ;
486- } ) ;
487- } ) ;
488- }
489-
490539// --- Exports ---
491540module . exports = Controller ;
0 commit comments