Skip to content

Commit ad6394d

Browse files
authored
Fix/search v4 (#201)
* fix(search): filter out tweets with deleted parents * test(search): add more unit tests * refactor(search): elasticsearch delete tweets job in a batch
1 parent f590ff6 commit ad6394d

File tree

9 files changed

+941
-219
lines changed

9 files changed

+941
-219
lines changed
Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,3 @@
1+
export class EsDeleteTweetsDto {
2+
tweet_ids: string[];
3+
}

src/background-jobs/elasticsearch/es-delete-tweet.service.spec.ts

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,7 @@ import { EsDeleteTweetJobService } from './es-delete-tweet.service';
33
import { getQueueToken } from '@nestjs/bull';
44
import { QUEUE_NAMES } from '../constants/queue.constants';
55
import type { Queue } from 'bull';
6-
import { EsSyncTweetDto } from './dtos/es-sync-tweet.dto';
6+
import { EsDeleteTweetsDto } from './dtos/es-delete-tweets.dto';
77

88
describe('EsDeleteTweetJobService', () => {
99
let service: EsDeleteTweetJobService;
@@ -37,7 +37,7 @@ describe('EsDeleteTweetJobService', () => {
3737

3838
describe('queueDeleteTweet', () => {
3939
it('should queue a delete tweet job successfully', async () => {
40-
const dto = { tweet_id: 'tweet-123' };
40+
const dto = { tweet_ids: ['tweet-123', 'tweet-321'] };
4141
const mock_job = { id: 'job-123', data: dto };
4242

4343
mock_queue.add.mockResolvedValue(mock_job as any);
@@ -56,7 +56,7 @@ describe('EsDeleteTweetJobService', () => {
5656
});
5757

5858
it('should queue job with custom priority and delay', async () => {
59-
const dto = { tweet_id: 'tweet-123' };
59+
const dto = { tweet_ids: ['tweet-123', 'tweet-321'] };
6060
const custom_priority = 5;
6161
const custom_delay = 1000;
6262
const mock_job = { id: 'job-123', data: dto };
@@ -76,7 +76,7 @@ describe('EsDeleteTweetJobService', () => {
7676
});
7777

7878
it('should handle queue errors', async () => {
79-
const dto: EsSyncTweetDto = { tweet_id: 'tweet-123' };
79+
const dto: EsDeleteTweetsDto = { tweet_ids: ['tweet-123', 'tweet-321'] };
8080
const error = new Error('Queue error');
8181

8282
mock_queue.add.mockRejectedValue(error);

src/background-jobs/elasticsearch/es-delete-tweet.service.ts

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -4,9 +4,10 @@ import type { Queue } from 'bull';
44
import { JOB_DELAYS, JOB_NAMES, JOB_PRIORITIES, QUEUE_NAMES } from '../constants/queue.constants';
55
import { BackgroundJobsService } from 'src/background-jobs/background-jobs';
66
import { EsSyncTweetDto } from './dtos/es-sync-tweet.dto';
7+
import { EsDeleteTweetsDto } from './dtos/es-delete-tweets.dto';
78

89
@Injectable()
9-
export class EsDeleteTweetJobService extends BackgroundJobsService<EsSyncTweetDto> {
10+
export class EsDeleteTweetJobService extends BackgroundJobsService<EsDeleteTweetsDto> {
1011
constructor(@InjectQueue(QUEUE_NAMES.ELASTICSEARCH) private elasticsearch_queue: Queue) {
1112
super(
1213
elasticsearch_queue,
@@ -16,7 +17,7 @@ export class EsDeleteTweetJobService extends BackgroundJobsService<EsSyncTweetDt
1617
);
1718
}
1819

19-
async queueDeleteTweet(dto: EsSyncTweetDto, priority?: number, delay?: number) {
20+
async queueDeleteTweet(dto: EsDeleteTweetsDto, priority?: number, delay?: number) {
2021
return await this.queueJob(
2122
dto,
2223
priority ?? this.priority,

src/background-jobs/elasticsearch/es-sync.processor.spec.ts

Lines changed: 37 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -29,6 +29,7 @@ describe('EsSyncProcessor', () => {
2929
const mock_elasticsearch_service = {
3030
index: jest.fn(),
3131
delete: jest.fn(),
32+
bulk: jest.fn(),
3233
updateByQuery: jest.fn(),
3334
deleteByQuery: jest.fn(),
3435
};
@@ -218,63 +219,75 @@ describe('EsSyncProcessor', () => {
218219
it('should delete a tweet successfully', async () => {
219220
const job = {
220221
data: {
221-
tweet_id: '0c059899-f706-4c8f-97d7-ba2e9fc22d6d',
222+
tweet_ids: ['tweet-123', 'tweet-321'],
222223
},
223224
} as Job;
224225

225-
mock_elasticsearch_service.delete.mockResolvedValue({} as any);
226+
mock_elasticsearch_service.bulk.mockResolvedValue({} as any);
226227

227228
const logger_spy = jest.spyOn(Logger.prototype, 'log');
228229

229230
await processor.handleDeleteTweet(job);
230231

231-
expect(mock_elasticsearch_service.delete).toHaveBeenCalledWith({
232-
index: ELASTICSEARCH_INDICES.TWEETS,
233-
id: '0c059899-f706-4c8f-97d7-ba2e9fc22d6d',
232+
expect(mock_elasticsearch_service.bulk).toHaveBeenCalledWith({
233+
body: [
234+
{ delete: { _index: ELASTICSEARCH_INDICES.TWEETS, _id: 'tweet-123' } },
235+
{ delete: { _index: ELASTICSEARCH_INDICES.TWEETS, _id: 'tweet-321' } },
236+
],
234237
});
235-
expect(logger_spy).toHaveBeenCalledWith(
236-
'Deleted tweet 0c059899-f706-4c8f-97d7-ba2e9fc22d6d from Elasticsearch'
237-
);
238+
expect(logger_spy).toHaveBeenCalledWith('Deleted 2 tweets from Elasticsearch');
238239
});
239240

240241
it('should skip if tweet not found in ES (404)', async () => {
241242
const job = {
242243
data: {
243-
tweet_id: '0c059899-f706-4c8f-97d7-ba2e9fc22d6d',
244+
tweet_ids: ['tweet-123', 'tweet-321'],
244245
},
245246
} as Job;
246247

247-
const error = {
248-
meta: { statusCode: 404 },
249-
};
250-
mock_elasticsearch_service.delete.mockRejectedValue(error);
248+
mock_elasticsearch_service.bulk.mockResolvedValue({
249+
errors: true,
250+
items: [
251+
{
252+
delete: {
253+
_id: 'tweet-123',
254+
status: 404,
255+
error: { type: 'document_missing_exception' },
256+
},
257+
},
258+
{
259+
delete: {
260+
_id: 'tweet-321',
261+
status: 404,
262+
error: { type: 'document_missing_exception' },
263+
},
264+
},
265+
],
266+
});
251267

252268
const logger_spy = jest.spyOn(Logger.prototype, 'warn');
253269

254270
await processor.handleDeleteTweet(job);
255271

256-
expect(logger_spy).toHaveBeenCalledWith(
257-
'Tweet 0c059899-f706-4c8f-97d7-ba2e9fc22d6d not found in ES, skipping delete'
258-
);
272+
expect(logger_spy).toHaveBeenCalledWith('Tweet tweet-123 not found in ES, skipping');
273+
expect(logger_spy).toHaveBeenCalledWith('Tweet tweet-321 not found in ES, skipping');
259274
});
260275

261276
it('should handle delete errors', async () => {
262277
const job = {
263278
data: {
264-
tweet_id: '0c059899-f706-4c8f-97d7-ba2e9fc22d6d',
279+
tweet_ids: ['tweet-123', 'tweet-321'],
265280
},
266281
} as Job;
267282

268-
const error = new Error('Delete failed');
269-
mock_elasticsearch_service.delete.mockRejectedValue(error);
283+
const error = new Error('Bulk delete failed');
284+
mock_elasticsearch_service.bulk.mockRejectedValue(error);
270285

271286
const logger_spy = jest.spyOn(Logger.prototype, 'error');
272287

273-
await expect(processor.handleDeleteTweet(job)).rejects.toThrow('Delete failed');
274-
expect(logger_spy).toHaveBeenCalledWith(
275-
'Failed to delete tweet 0c059899-f706-4c8f-97d7-ba2e9fc22d6d:',
276-
error
277-
);
288+
await expect(processor.handleDeleteTweet(job)).rejects.toThrow('Bulk delete failed');
289+
290+
expect(logger_spy).toHaveBeenCalledWith('Bulk delete failed:', error);
278291
});
279292
});
280293

src/background-jobs/elasticsearch/es-sync.processor.ts

Lines changed: 32 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,7 @@ import { User, UserFollows } from 'src/user/entities';
1212
import { EsSyncUserDto } from './dtos/es-sync-user.dto';
1313
import { EsSyncFollowDto } from './dtos/es-sync-follow.dto';
1414
import { TweetType } from 'src/shared/enums/tweet-types.enum';
15+
import { EsDeleteTweetsDto } from './dtos/es-delete-tweets.dto';
1516

1617
@Processor(QUEUE_NAMES.ELASTICSEARCH)
1718
export class EsSyncProcessor {
@@ -77,23 +78,41 @@ export class EsSyncProcessor {
7778
}
7879

7980
@Process(JOB_NAMES.ELASTICSEARCH.DELETE_TWEET)
80-
async handleDeleteTweet(job: Job<EsSyncTweetDto>) {
81-
const { tweet_id } = job.data;
81+
async handleDeleteTweet(job: Job<EsDeleteTweetsDto>) {
82+
const { tweet_ids } = job.data;
83+
84+
if (!tweet_ids?.length) {
85+
this.logger.warn('No tweet_ids provided, skipping ES delete');
86+
return;
87+
}
8288

8389
try {
84-
await this.elasticsearch_service.delete({
85-
index: ELASTICSEARCH_INDICES.TWEETS,
86-
id: tweet_id,
87-
});
90+
const body = tweet_ids.flatMap((tweet_id: string) => [
91+
{ delete: { _index: ELASTICSEARCH_INDICES.TWEETS, _id: tweet_id } },
92+
]);
8893

89-
this.logger.log(`Deleted tweet ${tweet_id} from Elasticsearch`);
90-
} catch (error) {
91-
if (error.meta?.statusCode === 404) {
92-
this.logger.warn(`Tweet ${tweet_id} not found in ES, skipping delete`);
93-
} else {
94-
this.logger.error(`Failed to delete tweet ${tweet_id}:`, error);
95-
throw error;
94+
const response = await this.elasticsearch_service.bulk({ body });
95+
96+
if (response.errors) {
97+
response.items.forEach((item, i) => {
98+
const result = item.delete;
99+
if (result?.error) {
100+
if (result.status === 404) {
101+
this.logger.warn(`Tweet ${tweet_ids[i]} not found in ES, skipping`);
102+
} else {
103+
this.logger.error(
104+
`Failed to delete tweet ${tweet_ids[i]}:`,
105+
result.error
106+
);
107+
}
108+
}
109+
});
96110
}
111+
112+
this.logger.log(`Deleted ${tweet_ids.length} tweets from Elasticsearch`);
113+
} catch (error) {
114+
this.logger.error('Bulk delete failed:', error);
115+
throw error;
97116
}
98117
}
99118

0 commit comments

Comments
 (0)