1+ import { Game } from '@fparchive/flashpoint-archive' ;
2+ import { downloadGameData } from './download' ;
3+ import { fpDatabase } from '.' ;
4+ import * as fs from 'fs-extra' ;
5+ import * as crypto from 'crypto' ;
6+ import * as path from 'path' ;
7+ import { DownloaderStatus , DownloadTask , DownloadTaskStatus , DownloadWorkerState , GameDataSource } from 'flashpoint-launcher' ;
8+ import { axios } from './dns' ;
9+ import { BackState } from './types' ;
10+ import { BackOut } from '@shared/back/types' ;
11+ import { promiseSleep } from './util/misc' ;
12+ import { WrappedEventEmitter } from './util/WrappedEventEmitter' ;
13+ import { EventQueue } from './util/EventQueue' ;
14+
15+ export interface Downloader {
16+ on ( event : string , listener : ( ...args : any [ ] ) => void ) : this;
17+ once ( event : string , listener : ( ...args : any [ ] ) => void ) : this;
18+
19+ on ( event : 'workerChange' , handler : ( workerState : DownloadWorkerState ) => void ) : this;
20+ once ( event : 'workerChange' , handler : ( workerState : DownloadWorkerState ) => void ) : this;
21+
22+ on ( event : 'statusChange' , listener : ( status : DownloaderStatus ) => void ) : this;
23+ once ( event : 'statusChange' , listener : ( status : DownloaderStatus ) => void ) : this;
24+
25+ on ( event : 'taskChange' , listener : ( task : DownloadTask ) => void ) : this;
26+ once ( event : 'taskChange' , listener : ( task : DownloadTask ) => void ) : this;
27+ off ( event : 'taskChange' , listener : ( task : DownloadTask ) => void ) : this;
28+ }
29+
30+ export class Downloader extends WrappedEventEmitter {
31+ private tasks : Record < string , DownloadTask > = { } ; // Queue of tasks to be executed
32+ private workers : DownloadWorker [ ] = [ ] ; // Array of workers
33+ private idleWorkers : DownloadWorker [ ] = [ ] ; // Array of idle workers
34+ public databaseQueue : EventQueue = new EventQueue
35+ public status : DownloaderStatus ;
36+
37+ constructor (
38+ public readonly flashpointPath : string ,
39+ public readonly dataPacksFolderPath : string ,
40+ public readonly imageFolderPath : string ,
41+ public readonly onDemandBaseUrl : string ,
42+ public readonly sources : GameDataSource [ ] ,
43+ private state : BackState ,
44+ workerCount : number
45+ ) {
46+ super ( ) ;
47+ log . debug ( 'Downloads' , `Starting downloader with ${ workerCount } workers` ) ;
48+ for ( let i = 0 ; i < workerCount ; i ++ ) {
49+ const worker = new DownloadWorker ( this , i + 1 ) ;
50+ this . workers . push ( worker ) ;
51+ this . idleWorkers . push ( worker ) ;
52+ }
53+ this . status = 'running' ;
54+ }
55+
56+ public start ( ) {
57+ if ( this . status === 'stopped' ) {
58+ this . status = 'running' ;
59+ for ( let i = 0 ; i < this . workers . length ; i ++ ) {
60+ const nextTask = this . getNextTask ( ) ;
61+ if ( nextTask ) {
62+ this . assignTaskToIdleWorker ( nextTask ) ;
63+ }
64+ }
65+ this . state . socketServer . broadcast ( BackOut . UPDATE_DOWNLOADER_STATUS , this . status ) ;
66+ this . emit ( 'statusChange' , this . status ) ;
67+ }
68+ }
69+
70+ public stop ( ) {
71+ if ( this . status === 'running' ) {
72+ this . status = 'stopped' ;
73+ // Abort all workers and reset them to the idle queue
74+ for ( const worker of this . workers ) {
75+ worker . abort ( ) ;
76+ }
77+ this . idleWorkers = [ ...this . workers ] ; // Reset all workers to idle
78+ this . state . socketServer . broadcast ( BackOut . UPDATE_DOWNLOADER_STATUS , this . status ) ;
79+ this . emit ( 'statusChange' , this . status ) ;
80+ }
81+ }
82+
83+ public async clear ( ) {
84+ this . status = 'stopped' ;
85+ for ( const worker of this . workers ) {
86+ worker . abort ( ) ;
87+ }
88+ await promiseSleep ( 1000 ) ;
89+ this . tasks = { } ;
90+ this . status = 'running' ;
91+ }
92+
93+ public getTotal ( ) : number {
94+ return Object . values ( this . tasks ) . length ;
95+ }
96+
97+ public getTasks ( ) : Record < string , DownloadTask > {
98+ return this . tasks ;
99+ }
100+
101+ public addTask ( game : Game ) : boolean {
102+ if ( ! this . tasks [ game . id ] ) {
103+ const newTask : DownloadTask = {
104+ status : 'waiting' ,
105+ game,
106+ errors : [ ] ,
107+ } ;
108+ this . tasks [ game . id ] = newTask ;
109+ this . assignTaskToIdleWorker ( newTask ) ;
110+ this . emit ( 'taskChange' , newTask ) ;
111+ this . state . socketServer . broadcast ( BackOut . UPDATE_DOWNLOADER_TASK , newTask ) ;
112+ return true ;
113+ }
114+ return false ;
115+ }
116+
117+ public getNextTask ( ) : DownloadTask | undefined {
118+ const nextTaskKey = Object . keys ( this . tasks ) . find ( key => this . tasks [ key ] . status === 'waiting' ) ;
119+ if ( nextTaskKey ) {
120+ return this . tasks [ nextTaskKey ] ;
121+ }
122+ }
123+
124+ private assignTaskToIdleWorker ( task : DownloadTask ) : void {
125+ if ( this . status === 'running' && this . idleWorkers . length > 0 ) {
126+ log . debug ( 'Downloads' , 'Starting task' ) ;
127+ const worker = this . idleWorkers . shift ( ) ; // Get the first idle worker
128+ if ( worker ) {
129+ task . errors = [ ] ;
130+ task . status = 'in_progress' ;
131+ this . emit ( 'taskChange' , task ) ;
132+ this . state . socketServer . broadcast ( BackOut . UPDATE_DOWNLOADER_TASK , task ) ;
133+ worker . assignTask ( task ) ;
134+ }
135+ } else {
136+ log . debug ( 'Downloads' , 'Not running' ) ;
137+ }
138+ }
139+
140+ // Mark the URL as successfully processed
141+ public signalStatus ( worker : DownloadWorker , gameId : string , status : DownloadTaskStatus , errors : string [ ] ) : void {
142+ this . idleWorkers . push ( worker ) ;
143+ if ( this . tasks [ gameId ] ) {
144+ this . tasks [ gameId ] . status = status ;
145+ this . tasks [ gameId ] . errors = errors ;
146+ log . info ( 'Downloader' , `Task: ${ gameId } - Status: ${ status } ` ) ;
147+ this . state . socketServer . broadcast ( BackOut . UPDATE_DOWNLOADER_TASK , this . tasks [ gameId ] ) ;
148+ this . emit ( 'taskChange' , this . tasks [ gameId ] ) ;
149+ }
150+
151+ const nextTask = this . getNextTask ( ) ;
152+ if ( nextTask ) {
153+ this . assignTaskToIdleWorker ( nextTask ) ;
154+ }
155+ }
156+
157+ public onWorkerUpdate ( worker : DownloadWorker ) {
158+ const state = worker . getState ( ) ;
159+ this . state . socketServer . broadcast ( BackOut . UPDATE_DOWNLOADER_STATE_WORKER , state ) ;
160+ this . emit ( 'workerChange' , state ) ;
161+ }
162+ }
163+
164+ class DownloadWorker {
165+ private abortController : AbortController | null = null ;
166+ private step = 1 ;
167+ private totalSteps = 3 ;
168+ private stepProgress = 0.0 ;
169+ private statusText = '' ;
170+
171+ constructor (
172+ private downloader : Downloader ,
173+ private id : number ,
174+ ) { }
175+
176+ public getState ( ) : DownloadWorkerState {
177+ return {
178+ id : this . id ,
179+ step : this . step ,
180+ totalSteps : this . totalSteps ,
181+ stepProgress : this . stepProgress ,
182+ text : this . statusText ,
183+ }
184+ }
185+
186+ // Start the worker to begin processing URLs
187+ public async assignTask ( task : DownloadTask ) : Promise < void > {
188+ this . abortController = new AbortController ( ) ; // Create a new AbortController for the task
189+ const { signal } = this . abortController ;
190+ await this . execute ( signal , task ) ;
191+ }
192+
193+ private async execute ( signal : AbortSignal , task : DownloadTask ) : Promise < void > {
194+ this . step = 1 ;
195+ this . stepProgress = 0 ;
196+ this . statusText = 'Downloading logo...' ;
197+ this . downloader . onWorkerUpdate ( this ) ;
198+ const { game } = task ;
199+ const gameId = game . id ;
200+ const errors : string [ ] = [ ] ;
201+
202+ // Download game images
203+ const logoSubPath = `Logos/${ gameId . substring ( 0 , 2 ) } /${ gameId . substring ( 2 , 4 ) } /${ gameId } .png` ;
204+ const ssSubPath = `Screenshots/${ gameId . substring ( 0 , 2 ) } /${ gameId . substring ( 2 , 4 ) } /${ gameId } .png`
205+ const logoPath = path . join ( this . downloader . flashpointPath , this . downloader . imageFolderPath , logoSubPath ) ;
206+ const ssPath = path . join ( this . downloader . flashpointPath , this . downloader . imageFolderPath , ssSubPath ) ;
207+
208+ if ( ! fs . existsSync ( logoPath ) ) {
209+ try {
210+ await this . downloadImage ( logoSubPath , signal ) ;
211+ } catch ( e ) {
212+ this . downloader . signalStatus ( this , gameId , 'failure' , errors ) ;
213+ errors . push ( `${ e } ` ) ;
214+ }
215+ }
216+
217+ if ( signal . aborted ) {
218+ this . downloader . signalStatus ( this , gameId , 'failure' , errors ) ;
219+ return ;
220+ }
221+
222+ this . downloader . onWorkerUpdate ( this ) ;
223+ this . statusText = 'Downloading screenshot...' ;
224+ this . step = 2 ;
225+
226+ if ( ! fs . existsSync ( ssPath ) ) {
227+ try {
228+ await this . downloadImage ( ssSubPath , signal ) ;
229+ } catch ( e ) {
230+ errors . push ( `${ e } ` ) ;
231+ }
232+ }
233+
234+ if ( signal . aborted ) {
235+ this . downloader . signalStatus ( this , gameId , 'failure' , errors ) ;
236+ return ;
237+ }
238+
239+ this . downloader . onWorkerUpdate ( this ) ;
240+ this . statusText = 'Downloading game data...' ;
241+ this . step = 3 ;
242+
243+ // Download game data
244+ if ( game . gameData ) {
245+ this . stepProgress = 0 ;
246+ for ( const gameData of game . gameData ) {
247+ // Calc the path on disk and check if the file already matches
248+ const realPath = path . join ( this . downloader . flashpointPath , this . downloader . dataPacksFolderPath , `${ gameData . gameId } -${ ( new Date ( gameData . dateAdded ) ) . getTime ( ) } .zip` ) ;
249+ if ( fs . existsSync ( realPath ) ) {
250+ if ( gameData . path !== realPath || gameData . presentOnDisk === false ) {
251+ gameData . path = realPath ;
252+ gameData . presentOnDisk = true ;
253+ game . activeDataOnDisk = true ;
254+ await new Promise < void > ( ( resolve , reject ) => {
255+ this . downloader . databaseQueue . push ( async ( ) => {
256+ try {
257+ await fpDatabase . saveGameData ( gameData ) ;
258+ await fpDatabase . saveGame ( game ) ;
259+ resolve ( ) ;
260+ } catch ( err ) {
261+ reject ( err ) ;
262+ }
263+ } )
264+ } )
265+ }
266+ continue ;
267+ }
268+
269+ // Did not find matching file, try and download
270+ try {
271+ await downloadGameData ( gameData . id , path . join ( this . downloader . flashpointPath , this . downloader . dataPacksFolderPath ) , this . downloader . sources , signal , ( progress ) => {
272+ this . stepProgress = progress ;
273+ this . downloader . onWorkerUpdate ( this ) ;
274+ } , ( ) => { } , this . downloader . databaseQueue ) ;
275+ } catch ( e ) {
276+ errors . push ( `${ e } ` ) ;
277+ }
278+
279+ this . statusText = 'Done' ;
280+ this . stepProgress = 1 ;
281+ this . downloader . onWorkerUpdate ( this ) ;
282+
283+ if ( signal . aborted ) {
284+ this . downloader . signalStatus ( this , gameId , 'failure' , errors ) ;
285+ return ;
286+ }
287+ }
288+ }
289+
290+ if ( errors . length > 0 ) {
291+ this . downloader . signalStatus ( this , gameId , 'failure' , errors ) ;
292+ } else {
293+ this . downloader . signalStatus ( this , gameId , 'success' , errors ) ;
294+ }
295+ }
296+
297+ private async downloadImage ( subPath : string , signal : AbortSignal ) {
298+ let url = this . downloader . onDemandBaseUrl + ( this . downloader . onDemandBaseUrl . endsWith ( '/' ) ? '' : '/' ) + subPath ;
299+ await axios . get ( url , { responseType : 'arraybuffer' , signal } )
300+ . then ( async ( res ) => {
301+ // Save response to image file
302+ const imageData = res . data ;
303+
304+ const imageFolder = path . join ( this . downloader . flashpointPath , this . downloader . imageFolderPath ) ;
305+ const filePath = path . join ( imageFolder , subPath ) ;
306+
307+ await fs . ensureDir ( path . dirname ( filePath ) ) ;
308+ await fs . promises . writeFile ( filePath , imageData , 'binary' ) ;
309+ } ) ;
310+ }
311+
312+ private async calculateFileHash ( filePath : string ) : Promise < string > {
313+ return new Promise ( ( resolve , reject ) => {
314+ const hash = crypto . createHash ( 'sha256' ) ;
315+ const stream = fs . createReadStream ( filePath ) ;
316+
317+ stream . on ( 'data' , ( chunk ) => hash . update ( chunk ) ) ;
318+ stream . on ( 'end' , ( ) => resolve ( hash . digest ( 'hex' ) ) ) ;
319+ stream . on ( 'error' , ( error ) => reject ( error ) ) ;
320+ } ) ;
321+ }
322+
323+ // Abort the current task
324+ public abort ( ) : void {
325+ if ( this . abortController ) {
326+ this . abortController . abort ( ) ; // Trigger the abort signal
327+ }
328+ }
329+ }
0 commit comments