@@ -2,7 +2,6 @@ import { Injectable, Logger } from '@nestjs/common';
22import { Cron , CronExpression } from '@nestjs/schedule' ;
33import { RedisService } from '../redis/redis.service' ;
44import { PageService } from '../page/page.service' ;
5-
65@Injectable ( )
76export class TasksService {
87 private readonly logger = new Logger ( TasksService . name ) ;
@@ -11,38 +10,84 @@ export class TasksService {
1110 private readonly pageService : PageService ,
1211 ) { }
1312
14- @Cron ( CronExpression . EVERY_30_SECONDS )
13+ @Cron ( CronExpression . EVERY_10_SECONDS )
1514 async handleCron ( ) {
1615 this . logger . log ( '스케줄러 시작' ) ;
1716 // 시작 시간
1817 const startTime = performance . now ( ) ;
19-
20- // redis의 모든 값을 가져와서 database에 저장
21- const keys = await this . redisService . getAllKeys ( ) ;
22- const pages = [ ] ;
23- for await ( const key of keys ) {
24- const { title, content } = await this . redisService . get ( key ) ;
25- const jsonContent = JSON . parse ( content ) ;
26- Object . assign ( {
27- id : parseInt ( key ) ,
18+ const keys = await this . redisService . getAllKeys ( 'page:*' ) ;
19+ Promise . all (
20+ keys . map ( async ( key ) => {
21+ const { title, content } = await this . redisService . get ( key ) ;
22+ const pageId = parseInt ( key . split ( ':' ) [ 1 ] ) ;
23+ if ( title === null ) {
24+ const jsonContent = JSON . parse ( content ) ;
25+ await this . pageService . updatePage ( pageId , {
26+ content : jsonContent ,
27+ } ) ;
28+ } else if ( content === null ) {
29+ await this . pageService . updatePage ( pageId , {
30+ title,
31+ } ) ;
32+ } else {
33+ const jsonContent = JSON . parse ( content ) ;
34+ await this . pageService . updatePage ( pageId , {
35+ title,
36+ content : jsonContent ,
37+ } ) ;
38+ await this . redisService . delete ( key ) ;
39+ }
40+ } ) ,
41+ )
42+ . then ( ( ) => {
43+ const endTime = performance . now ( ) ;
44+ this . logger . log ( `갱신 개수 : ${ keys . length } 개` ) ;
45+ this . logger . log ( `실행 시간 : ${ ( endTime - startTime ) / 1000 } 초` ) ;
46+ } )
47+ . catch ( ( err ) => {
48+ this . logger . error ( err ) ;
2849 } ) ;
29- pages . push ( {
30- id : parseInt ( key ) ,
31- title,
32- content : jsonContent ,
33- version : 1 ,
34- } ) ;
35- // this.pageService.updatePage(parseInt(key), {
36- // title,
37- // content: jsonContent,
38- // });
39- // this.logger.log('데이터베이스 갱신');
40- }
41- await this . pageService . updateBulkPage ( pages ) ;
42-
43- // 끝 시간
44- const endTime = performance . now ( ) ;
45- this . logger . log ( `갱신 개수 : ${ pages . length } 개` ) ;
46- this . logger . log ( `실행 시간 : ${ ( endTime - startTime ) / 1000 } 초` ) ;
50+ // scan stream을 가져온다.
51+ // const stream = this.redisService.createStream();
52+ // let totalCount = 0;
53+ // stream.on('data', (keys) => {
54+ // console.log(keys);
55+ // totalCount += keys.length;
56+ // stream.pause();
57+ // Promise.all(
58+ // keys.map(async (key) => {
59+ // const { title, content } = await this.redisService.get(key);
60+ // if (title === null) {
61+ // const jsonContent = JSON.parse(content);
62+ // await this.pageService.updatePage(parseInt(key), {
63+ // content: jsonContent,
64+ // });
65+ // } else if (content === null) {
66+ // await this.pageService.updatePage(parseInt(key), {
67+ // title,
68+ // });
69+ // } else {
70+ // const jsonContent = JSON.parse(content);
71+ // await this.pageService.updatePage(parseInt(key), {
72+ // title,
73+ // content: jsonContent,
74+ // });
75+ // await this.redisService.delete(key);
76+ // }
77+ // }),
78+ // )
79+ // .then(() => {
80+ // stream.resume();
81+ // })
82+ // .catch((err) => {
83+ // this.logger.error(err);
84+ // stream.resume();
85+ // });
86+ // });
87+ // stream.on('end', () => {
88+ // const endTime = performance.now();
89+ // this.logger.log(`갱신 개수 : ${totalCount}개`);
90+ // this.logger.log(`실행 시간 : ${(endTime - startTime) / 1000}초`);
91+ // });
4792 }
4893}
0 commit comments