Commit 1ca0bdf

Merge pull request #457 from boostcampwm-2024/refactor/merge-crawling-logic
♻️ refactor: Consolidate full feed crawling logic (전체 피드 크롤링 로직 통합)
2 parents d1328fa + 6ebc4aa commit 1ca0bdf

20 files changed: +323 -228 lines changed

feed-crawler/src/common/constant.ts

Lines changed: 1 addition & 0 deletions

@@ -2,6 +2,7 @@ export const CONNECTION_LIMIT = 50;
 export const redisConstant = {
   FEED_RECENT_ALL_KEY: 'feed:recent:*',
   FEED_AI_QUEUE: `feed:ai:queue`,
+  FULL_FEED_CRAWL_QUEUE: `feed:full-crawl:queue`,
 };
 
 export const ONE_MINUTE = 60 * 1000;

feed-crawler/src/common/parser/base-feed-parser.ts

Lines changed: 7 additions & 0 deletions

@@ -43,6 +43,13 @@ export abstract class BaseFeedParser {
     return detailedFeeds;
   }
 
+  async parseAllFeeds(rssObj: RssObj, xmlData: string): Promise<FeedDetail[]> {
+    const rawFeeds = this.extractRawFeeds(xmlData);
+    const detailedFeeds = await this.convertToFeedDetails(rssObj, rawFeeds);
+
+    return detailedFeeds;
+  }
+
   abstract canParse(xmlData: string): boolean;
   protected abstract extractRawFeeds(xmlData: string): RawFeed[];

feed-crawler/src/common/parser/feed-parser-manager.ts

Lines changed: 31 additions & 1 deletion

@@ -34,7 +34,7 @@ export class FeedParserManager
 
     const parser = this.findSuitableParser(xmlData);
     if (!parser) {
-      throw new Error(`지원하지 않는 피드 형식: ${rssObj.rssUrl} / `);
+      throw new Error(`지원하지 않는 피드 형식: ${rssObj.rssUrl}`);
     }
 
     return await parser.parseFeed(rssObj, xmlData, startTime);
@@ -44,6 +44,36 @@
     }
   }
 
+  async fetchAndParseAll(rssObj: RssObj): Promise<FeedDetail[]> {
+    try {
+      const response = await fetch(rssObj.rssUrl, {
+        headers: {
+          Accept:
+            'application/rss+xml, application/xml, text/xml, application/atom+xml',
+        },
+      });
+
+      if (!response.ok) {
+        throw new Error(`${rssObj.rssUrl}에서 피드 데이터 가져오기 실패`);
+      }
+
+      const xmlData = await response.text();
+
+      const parser = this.findSuitableParser(xmlData);
+      if (!parser) {
+        throw new Error(`지원하지 않는 피드 형식: ${rssObj.rssUrl}`);
+      }
+      logger.info(
+        `${rssObj.blogName}: ${parser.constructor.name} 사용 (전체 피드)`,
+      );
+
+      return await parser.parseAllFeeds(rssObj, xmlData);
+    } catch (error) {
+      logger.warn(`[${rssObj.rssUrl}] 전체 피드 파싱 중 오류 발생: ${error}`);
+      return [];
+    }
+  }
+
   private findSuitableParser(xmlData: string): BaseFeedParser | null {
     for (const parser of this.parsers) {
       if (parser.canParse(xmlData)) {
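
For orientation, a minimal sketch of how the new fetchAndParseAll entry point is used (its real caller is FeedCrawler.startFullCrawl further down): it downloads the XML once, selects an RSS 2.0 or Atom 1.0 parser via canParse(), and parses every entry; on any fetch or parse error it logs a warning and resolves to an empty array instead of throwing. This sketch assumes RssObj and FeedDetail are exported from common/types, which this diff does not show.

// Illustrative sketch only; not part of this commit.
import { FeedParserManager } from './common/parser/feed-parser-manager';
import { RssObj, FeedDetail } from './common/types';

async function parseWholeBlog(
  parserManager: FeedParserManager,
  rssObj: RssObj,
): Promise<FeedDetail[]> {
  // Never throws: an unreachable URL or an unsupported format yields [].
  return parserManager.fetchAndParseAll(rssObj);
}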

feed-crawler/src/common/types.ts

Lines changed: 6 additions & 0 deletions

@@ -40,3 +40,9 @@ export type FeedAIQueueItem = {
   tagList?: string[];
   summary?: string;
 };
+
+export interface FullFeedCrawlMessage {
+  rssId: number;
+  timestamp: number;
+  deathCount: number;
+}
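
The producer that pushes these messages lives in one of the other files of this commit and is not shown here. As a hedged sketch, enqueuing a full-crawl request could look like the following, reusing the rpush(key, string[]) call shape that the workers below use and the FULL_FEED_CRAWL_QUEUE key added in constant.ts.

// Hypothetical producer sketch; the actual enqueue site is elsewhere in this PR.
import { RedisConnection } from './common/redis-access';
import { redisConstant } from './common/constant';
import { FullFeedCrawlMessage } from './common/types';

async function enqueueFullCrawl(
  redisConnection: RedisConnection,
  rssId: number,
): Promise<void> {
  const message: FullFeedCrawlMessage = {
    rssId,
    timestamp: Date.now(),
    deathCount: 0, // retry counter; workers stop retrying once it reaches 3
  };
  await redisConnection.rpush(redisConstant.FULL_FEED_CRAWL_QUEUE, [
    JSON.stringify(message),
  ]);
}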

feed-crawler/src/container.ts

Lines changed: 10 additions & 4 deletions

@@ -7,11 +7,12 @@ import { FeedRepository } from './repository/feed.repository';
 import { RedisConnection } from './common/redis-access';
 import { TagMapRepository } from './repository/tag-map.repository';
 import { ParserUtil } from './common/parser/utils/parser-util';
-import { ClaudeService } from './claude.service';
+import { ClaudeEventWorker } from './event_worker/workers/claude-event-worker';
 import { FeedParserManager } from './common/parser/feed-parser-manager';
 import { Rss20Parser } from './common/parser/formats/rss20-parser';
 import { Atom10Parser } from './common/parser/formats/atom10-parser';
 import { FeedCrawler } from './feed-crawler';
+import { FullFeedCrawlEventWorker } from './event_worker/workers/full-feed-crawl-event-worker';
 
 container.registerSingleton<DatabaseConnection>(
   DEPENDENCY_SYMBOLS.DatabaseConnection,
@@ -38,9 +39,9 @@ container.registerSingleton<TagMapRepository>(
   TagMapRepository,
 );
 
-container.registerSingleton<ClaudeService>(
-  DEPENDENCY_SYMBOLS.ClaudeService,
-  ClaudeService,
+container.registerSingleton<ClaudeEventWorker>(
+  DEPENDENCY_SYMBOLS.ClaudeEventWorker,
+  ClaudeEventWorker,
 );
 
 container.registerSingleton<ParserUtil>(
@@ -68,4 +69,9 @@ container.registerSingleton<FeedCrawler>(
   FeedCrawler,
 );
 
+container.registerSingleton<FullFeedCrawlEventWorker>(
+  DEPENDENCY_SYMBOLS.FullFeedCrawlEventWorker,
+  FullFeedCrawlEventWorker,
+);
+
 export { container };
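
A minimal sketch of how the newly registered workers can be resolved and run from a scheduler or entry point. The actual entry-point wiring is in other files of this commit, so the surrounding function here is illustrative, but the tokens and types match the registrations above.

// Illustrative only: resolving the event workers registered above with tsyringe.
import { container } from './container';
import { DEPENDENCY_SYMBOLS } from './types/dependency-symbols';
import { ClaudeEventWorker } from './event_worker/workers/claude-event-worker';
import { FullFeedCrawlEventWorker } from './event_worker/workers/full-feed-crawl-event-worker';

async function runEventWorkers(): Promise<void> {
  const claudeWorker = container.resolve<ClaudeEventWorker>(
    DEPENDENCY_SYMBOLS.ClaudeEventWorker,
  );
  const fullCrawlWorker = container.resolve<FullFeedCrawlEventWorker>(
    DEPENDENCY_SYMBOLS.FullFeedCrawlEventWorker,
  );

  // start() is inherited from AbstractQueueWorker and wraps processQueue()
  // with logging, timing, and top-level error handling.
  await claudeWorker.start();
  await fullCrawlWorker.start();
}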
feed-crawler/src/event_worker/abstract-queue-worker.ts

Lines changed: 35 additions & 0 deletions

@@ -0,0 +1,35 @@
+import { RedisConnection } from '../common/redis-access';
+import logger from '../common/logger';
+
+export abstract class AbstractQueueWorker<T> {
+  protected readonly nameTag: string;
+  protected readonly redisConnection: RedisConnection;
+
+  constructor(nameTag: string, redisConnection: RedisConnection) {
+    this.nameTag = nameTag;
+    this.redisConnection = redisConnection;
+  }
+
+  async start(): Promise<void> {
+    logger.info(`========== ${this.nameTag} 작업 시작 ==========`);
+    const startTime = Date.now();
+
+    try {
+      await this.processQueue();
+    } catch (error) {
+      logger.error(`${this.nameTag} 처리 중 오류 발생: ${error.message}`);
+    }
+
+    const endTime = Date.now();
+    const executionTime = endTime - startTime;
+    logger.info(`${this.nameTag} 실행 시간: ${executionTime / 1000}seconds`);
+    logger.info(`========== ${this.nameTag} 작업 완료 ==========`);
+  }
+
+  protected abstract processQueue(): Promise<void>;
+  protected abstract getQueueKey(): string;
+  protected abstract parseQueueMessage(message: string): T;
+  protected abstract processItem(item: T): Promise<void>;
+
+  protected abstract handleFailure(item: T, error: Error): Promise<void>;
+}
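
AbstractQueueWorker is a template method: start() owns the logging, timing, and top-level error handling, while each subclass supplies the queue key, message parsing, per-item processing, and retry policy. A hypothetical minimal subclass (not part of this commit) just to show the contract:

// Hypothetical example worker, assumed to sit next to abstract-queue-worker.ts.
import { RedisConnection } from '../common/redis-access';
import logger from '../common/logger';
import { AbstractQueueWorker } from './abstract-queue-worker';

type PingMessage = { id: number };

export class PingWorker extends AbstractQueueWorker<PingMessage> {
  constructor(redisConnection: RedisConnection) {
    super('[Ping Worker]', redisConnection);
  }

  protected async processQueue(): Promise<void> {
    // Pop one message per run, mirroring FullFeedCrawlEventWorker below.
    const raw = await this.redisConnection.rpop(this.getQueueKey());
    if (!raw) return;
    await this.processItem(this.parseQueueMessage(raw));
  }

  protected getQueueKey(): string {
    return 'ping:queue'; // example key, not one of the project constants
  }

  protected parseQueueMessage(message: string): PingMessage {
    return JSON.parse(message);
  }

  protected async processItem(item: PingMessage): Promise<void> {
    logger.info(`${this.nameTag} processing ${item.id}`);
  }

  protected async handleFailure(item: PingMessage, error: Error): Promise<void> {
    logger.error(`${this.nameTag} ${item.id} failed: ${error.message}`);
  }
}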

feed-crawler/src/claude.service.ts renamed to feed-crawler/src/event_worker/workers/claude-event-worker.ts

Lines changed: 43 additions & 28 deletions

@@ -1,32 +1,57 @@
 import 'reflect-metadata';
-import { injectable } from 'tsyringe';
+import { inject, injectable } from 'tsyringe';
 import Anthropic from '@anthropic-ai/sdk';
-import { ClaudeResponse, FeedAIQueueItem } from './common/types';
-import { TagMapRepository } from './repository/tag-map.repository';
-import { FeedRepository } from './repository/feed.repository';
-import logger from './common/logger';
-import { PROMPT_CONTENT, redisConstant } from './common/constant';
-import { RedisConnection } from './common/redis-access';
+import { ClaudeResponse, FeedAIQueueItem } from '../../common/types';
+import { TagMapRepository } from '../../repository/tag-map.repository';
+import { FeedRepository } from '../../repository/feed.repository';
+import logger from '../../common/logger';
+import { PROMPT_CONTENT, redisConstant } from '../../common/constant';
+import { RedisConnection } from '../../common/redis-access';
+import { AbstractQueueWorker } from '../abstract-queue-worker';
+import { DEPENDENCY_SYMBOLS } from '../../types/dependency-symbols';
 
 @injectable()
-export class ClaudeService {
+export class ClaudeEventWorker extends AbstractQueueWorker<FeedAIQueueItem> {
   private readonly client: Anthropic;
-  private readonly nameTag: string;
 
   constructor(
+    @inject(DEPENDENCY_SYMBOLS.TagMapRepository)
     private readonly tagMapRepository: TagMapRepository,
+    @inject(DEPENDENCY_SYMBOLS.FeedRepository)
     private readonly feedRepository: FeedRepository,
-    private readonly redisConnection: RedisConnection,
+    @inject(DEPENDENCY_SYMBOLS.RedisConnection)
+    redisConnection: RedisConnection,
   ) {
+    super('[AI Service]', redisConnection);
     this.client = new Anthropic({
       apiKey: process.env.AI_API_KEY,
     });
-    this.nameTag = '[AI Service]';
   }
 
-  async startRequestAI() {
+  protected async processQueue(): Promise<void> {
     const feeds = await this.loadFeeds();
-    await Promise.all(feeds.map((feed) => this.processFeed(feed)));
+    await Promise.all(feeds.map((feed) => this.processItem(feed)));
+  }
+
+  protected getQueueKey(): string {
+    return redisConstant.FEED_AI_QUEUE;
+  }
+
+  protected parseQueueMessage(message: string): FeedAIQueueItem {
+    return JSON.parse(message);
+  }
+
+  protected async processItem(feed: FeedAIQueueItem): Promise<void> {
+    try {
+      const aiData = await this.requestAI(feed);
+      await this.saveAIResult(aiData);
+    } catch (error) {
+      logger.error(
+        `${this.nameTag} ${feed.id} 처리 중 에러 발생: ${error.message}`,
+        error.stack,
+      );
+      await this.handleFailure(feed, error);
+    }
   }
 
   private async loadFeeds() {
@@ -49,19 +74,6 @@ export class ClaudeService {
     }
   }
 
-  private async processFeed(feed: FeedAIQueueItem) {
-    try {
-      const aiData = await this.requestAI(feed);
-      await this.saveAIResult(aiData);
-    } catch (error) {
-      logger.error(
-        `${this.nameTag} ${feed.id} 처리 중 에러 발생: ${error.message}`,
-        error.stack,
-      );
-      await this.handleFailure(feed, error);
-    }
-  }
-
   private async requestAI(feed: FeedAIQueueItem) {
     logger.info(`${this.nameTag} AI 요청: ${JSON.stringify(feed)}`);
     const params: Anthropic.MessageCreateParams = {
@@ -93,7 +105,10 @@ export class ClaudeService {
     await this.feedRepository.updateSummary(feed.id, feed.summary);
   }
 
-  private async handleFailure(feed: FeedAIQueueItem, e: Error) {
+  protected async handleFailure(
+    feed: FeedAIQueueItem,
+    e: Error,
+  ): Promise<void> {
     if (feed.deathCount < 3) {
       feed.deathCount++;
       await this.redisConnection.rpush(redisConstant.FEED_AI_QUEUE, [
@@ -102,7 +117,7 @@ export class ClaudeService {
       logger.warn(`${this.nameTag} ${feed.id} 재시도 (${feed.deathCount})`);
     } else {
       logger.error(
-        `${this.nameTag} ${feed.id} 의 Death Count 3회 이상 발생 AI 요청 금지`,
+        `${this.nameTag} ${feed.id} 의 Death Count 3회 이상 발생. AI 요청 스킵`,
      );
       await this.feedRepository.updateNullSummary(feed.id);
     }
feed-crawler/src/event_worker/workers/full-feed-crawl-event-worker.ts

Lines changed: 90 additions & 0 deletions

@@ -0,0 +1,90 @@
+import { inject, injectable } from 'tsyringe';
+import { RedisConnection } from '../../common/redis-access';
+import { RssRepository } from '../../repository/rss.repository';
+import { FeedCrawler } from '../../feed-crawler';
+import { DEPENDENCY_SYMBOLS } from '../../types/dependency-symbols';
+import { redisConstant } from '../../common/constant';
+import logger from '../../common/logger';
+import { AbstractQueueWorker } from '../abstract-queue-worker';
+import { FullFeedCrawlMessage } from '../../common/types';
+
+@injectable()
+export class FullFeedCrawlEventWorker extends AbstractQueueWorker<FullFeedCrawlMessage> {
+  constructor(
+    @inject(DEPENDENCY_SYMBOLS.RedisConnection)
+    redisConnection: RedisConnection,
+    @inject(DEPENDENCY_SYMBOLS.RssRepository)
+    private readonly rssRepository: RssRepository,
+    @inject(DEPENDENCY_SYMBOLS.FeedCrawler)
+    private readonly feedCrawler: FeedCrawler,
+  ) {
+    super('[Full Feed Crawler]', redisConnection);
+  }
+
+  protected async processQueue(): Promise<void> {
+    const rssIdMessage = await this.redisConnection.rpop(this.getQueueKey());
+
+    if (!rssIdMessage) {
+      logger.info('처리할 전체 피드 크롤링 요청이 없습니다.');
+      return;
+    }
+
+    const crawlMessage = this.parseQueueMessage(rssIdMessage);
+    await this.processItem(crawlMessage);
+  }
+
+  protected getQueueKey(): string {
+    return redisConstant.FULL_FEED_CRAWL_QUEUE;
+  }
+
+  protected parseQueueMessage(message: string): FullFeedCrawlMessage {
+    return JSON.parse(message);
+  }
+
+  protected async processItem(
+    crawlMessage: FullFeedCrawlMessage,
+  ): Promise<void> {
+    const rssId = crawlMessage.rssId;
+
+    logger.info(
+      `${this.nameTag} RSS ID ${rssId}에 대한 전체 피드 크롤링을 시작합니다.`,
+    );
+
+    const rssObj = await this.rssRepository.selectRssById(rssId);
+    if (!rssObj) {
+      logger.warn(`${this.nameTag} RSS ID ${rssId}를 찾을 수 없습니다.`);
+      return;
+    }
+
+    try {
+      const insertedFeeds = await this.feedCrawler.startFullCrawl(rssObj);
+      logger.info(
+        `${this.nameTag} RSS ID ${rssId}에서 ${insertedFeeds.length}개의 피드를 처리했습니다.`,
+      );
+    } catch (error) {
+      logger.error(
+        `${this.nameTag} RSS ID ${rssId} 처리 중 오류 발생: ${error.message}`,
+      );
+      await this.handleFailure(crawlMessage, error);
+    }
+  }
+
+  protected async handleFailure(
+    crawlMessage: FullFeedCrawlMessage,
+    error: Error,
+  ): Promise<void> {
+    if (crawlMessage.deathCount < 3) {
+      crawlMessage.deathCount++;
+      await this.redisConnection.rpush(redisConstant.FULL_FEED_CRAWL_QUEUE, [
+        JSON.stringify(crawlMessage),
+      ]);
+      logger.error(
+        `${this.nameTag} ${crawlMessage.rssId} 의 Death Count 3회 이상 발생. 크롤링 스킵 처리`,
+      );
+    } else {
+      logger.error(
+        `${this.nameTag} RSS ID ${crawlMessage.rssId} 전체 피드 크롤링 실패: ${error.message}`,
+      );
+    }
+  }
+}

feed-crawler/src/feed-crawler.ts

Lines changed: 21 additions & 0 deletions

@@ -49,6 +49,27 @@ export class FeedCrawler {
     logger.info('==========작업 완료==========');
   }
 
+  async startFullCrawl(rssObj: RssObj): Promise<FeedDetail[]> {
+    logger.info(`전체 피드 크롤링 시작: ${rssObj.blogName}(${rssObj.rssUrl})`);
+
+    const newFeeds = await this.feedParserManager.fetchAndParseAll(rssObj);
+
+    if (!newFeeds.length) {
+      logger.info(`${rssObj.blogName}에서 가져올 피드가 없습니다.`);
+      return [];
+    }
+
+    logger.info(
+      `${rssObj.blogName}에서 ${newFeeds.length}개의 피드를 가져왔습니다.`,
+    );
+    const insertedData: FeedDetail[] = await this.feedRepository.insertFeeds(
+      newFeeds,
+    );
+    await this.feedRepository.saveAiQueue(insertedData);
+
+    return insertedData;
+  }
+
   private feedGroupByRss(
     rssObjects: RssObj[],
     startTime: Date,
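
Unlike the existing start() path, which crawls the recent window across every registered RSS, startFullCrawl handles a single blog end to end: fetch and parse the whole feed, insert the new rows, and hand them to the AI pipeline via feedRepository.saveAiQueue. A hedged sketch of a direct call, assuming RssObj and FeedDetail are exported from common/types (this diff only shows them being used):

// Illustrative direct call; in this PR the real caller is FullFeedCrawlEventWorker.processItem.
import { FeedCrawler } from './feed-crawler';
import { RssObj, FeedDetail } from './common/types';

async function recrawlBlog(
  feedCrawler: FeedCrawler,
  rssObj: RssObj,
): Promise<void> {
  // Inserts every parsed feed and queues the inserted rows for the AI worker.
  const inserted: FeedDetail[] = await feedCrawler.startFullCrawl(rssObj);
  console.log(`${rssObj.blogName}: inserted ${inserted.length} feeds`);
}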
