Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package org.devkor.apu.saerok_server.domain.announcement.application;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.devkor.apu.saerok_server.domain.admin.announcement.core.entity.Announcement;
import org.devkor.apu.saerok_server.domain.admin.announcement.core.repository.AnnouncementRepository;
import org.devkor.apu.saerok_server.domain.notification.application.assembly.render.NotificationRenderer;
import org.devkor.apu.saerok_server.domain.notification.application.dto.PushMessageCommand;
import org.devkor.apu.saerok_server.domain.notification.application.model.payload.SystemNotificationPayload;
import org.devkor.apu.saerok_server.domain.notification.core.entity.Notification;
import org.devkor.apu.saerok_server.domain.notification.core.entity.NotificationType;
import org.devkor.apu.saerok_server.domain.notification.core.repository.NotificationRepository;
import org.devkor.apu.saerok_server.domain.notification.core.repository.NotificationSettingRepository;
import org.devkor.apu.saerok_server.domain.notification.core.repository.UserDeviceRepository;
import org.devkor.apu.saerok_server.domain.notification.infra.fcm.FcmMessageClient;
import org.devkor.apu.saerok_server.domain.user.core.entity.User;
import org.devkor.apu.saerok_server.domain.user.core.repository.UserRepository;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@Service
@RequiredArgsConstructor
public class AnnouncementBulkNotificationService {

private static final int CHUNK_SIZE = 100;
private static final int BATCH_INSERT_SIZE = 100;
private static final int FCM_MAX_TOKENS = 500;

private final AnnouncementRepository announcementRepository;
private final UserRepository userRepository;
private final NotificationRepository notificationRepository;
private final UserDeviceRepository userDeviceRepository;
private final NotificationSettingRepository settingRepository;
private final NotificationRenderer renderer;
private final FcmMessageClient fcmMessageClient;

@Transactional
public void sendBulkNotification(Long announcementId) {
Announcement ann = announcementRepository.findById(announcementId).orElse(null);
if (ann == null || !ann.isPublished() || !Boolean.TRUE.equals(ann.getSendNotification())) {
return;
}

Long annId = ann.getId();
NotificationType type = NotificationType.SYSTEM_PUBLISHED_ANNOUNCEMENT;
Map<String, Object> extras = Map.of(
"announcementId", annId,
"title", ann.getPushTitle(),
"body", ann.getPushBody(),
"inAppBody", ann.getInAppBody()
);

SystemNotificationPayload templatePayload = new SystemNotificationPayload(0L, type, annId, extras);
NotificationRenderer.RenderedMessage rendered = renderer.render(templatePayload);

int offset = 0;
while (true) {
List<Long> userIds = userRepository.findActiveUserIds(offset, CHUNK_SIZE);
if (userIds.isEmpty()) break;

processChunk(userIds, type, extras, rendered, annId);
offset += userIds.size();

if (userIds.size() < CHUNK_SIZE) break;
}

log.info("Bulk announcement notification completed: announcementId={}, totalUsers={}",
annId, offset);
}

private void processChunk(List<Long> userIds, NotificationType type,
Map<String, Object> extras,
NotificationRenderer.RenderedMessage rendered,
Long announcementId) {
// (a) 유저 배치 조회
List<User> users = userRepository.findByIds(userIds);

// (b) 인앱 알림 batch insert
List<Notification> notifications = users.stream()
.map(user -> Notification.builder()
.user(user)
.type(type)
.isRead(false)
.payload(new HashMap<>(extras))
.build())
.toList();
notificationRepository.batchInsert(notifications, BATCH_INSERT_SIZE);

// (c) 활성 디바이스 조회 (1 쿼리)
List<Long> enabledDeviceIds = settingRepository
.findEnabledDeviceIdsByUserIdsAndType(userIds, type);
if (enabledDeviceIds.isEmpty()) return;

// (d) 토큰 조회 (1 쿼리)
List<String> tokens = userDeviceRepository.findTokensByUserDeviceIds(enabledDeviceIds);
if (tokens.isEmpty()) return;

// (e) FCM 전송
PushMessageCommand cmd = PushMessageCommand.createPushMessageCommand(
rendered.pushTitle(), rendered.pushBody(),
type.name(), announcementId, 1, null);

for (int i = 0; i < tokens.size(); i += FCM_MAX_TOKENS) {
List<String> batch = tokens.subList(i, Math.min(i + FCM_MAX_TOKENS, tokens.size()));
fcmMessageClient.sendToDevices(batch, cmd);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,15 @@
import lombok.RequiredArgsConstructor;
import org.devkor.apu.saerok_server.domain.admin.announcement.core.entity.Announcement;
import org.devkor.apu.saerok_server.domain.admin.announcement.core.repository.AnnouncementRepository;
import org.devkor.apu.saerok_server.domain.notification.application.facade.NotifySystemService;
import org.devkor.apu.saerok_server.domain.notification.core.entity.NotificationType;
import org.devkor.apu.saerok_server.domain.user.core.repository.UserRepository;
import org.devkor.apu.saerok_server.domain.announcement.application.event.AnnouncementPublishedEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.util.List;
import java.util.Map;

@Service
@Transactional
Expand All @@ -23,8 +21,7 @@ public class AnnouncementPublicationService {
private static final ZoneId KST = ZoneId.of("Asia/Seoul");

private final AnnouncementRepository announcementRepository;
private final UserRepository userRepository;
private final NotifySystemService notifySystemService;
private final ApplicationEventPublisher eventPublisher;

public OffsetDateTime toKstOffset(LocalDateTime localDateTime) {
if (localDateTime == null) return null;
Expand Down Expand Up @@ -59,22 +56,6 @@ public void notifyPublishedAnnouncement(Announcement announcement) {
if (!announcement.isPublished() || !Boolean.TRUE.equals(announcement.getSendNotification())) {
return;
}
List<Long> userIds = userRepository.findActiveUserIds();
if (userIds.isEmpty()) {
return;
}
Map<String, Object> extras = Map.of(
"announcementId", announcement.getId(),
"title", announcement.getPushTitle(),
"body", announcement.getPushBody(),
"inAppBody", announcement.getInAppBody()
);

notifySystemService.notifyUsersDeduplicatedPush(
userIds,
NotificationType.SYSTEM_PUBLISHED_ANNOUNCEMENT,
announcement.getId(),
extras
);
eventPublisher.publishEvent(new AnnouncementPublishedEvent(announcement.getId()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.devkor.apu.saerok_server.domain.announcement.application.event;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.devkor.apu.saerok_server.domain.announcement.application.AnnouncementBulkNotificationService;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

@Slf4j
@Component
@RequiredArgsConstructor
public class AnnouncementNotificationWorker {

private final AnnouncementBulkNotificationService bulkNotificationService;

@Async("announcementNotificationExecutor")
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handle(AnnouncementPublishedEvent event) {
try {
bulkNotificationService.sendBulkNotification(event.announcementId());
} catch (Exception e) {
log.error("Failed to send bulk announcement notification: announcementId={}",
event.announcementId(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package org.devkor.apu.saerok_server.domain.announcement.application.event;

public record AnnouncementPublishedEvent(Long announcementId) {}
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,18 @@ public void markAllAsReadByUserId(Long userId) {
.setParameter("userId", userId)
.executeUpdate();
}

public void batchInsert(List<Notification> notifications, int batchSize) {
for (int i = 0; i < notifications.size(); i++) {
em.persist(notifications.get(i));
if ((i + 1) % batchSize == 0) {
em.flush();
em.clear();
}
}
if (!notifications.isEmpty() && notifications.size() % batchSize != 0) {
em.flush();
em.clear();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,20 @@ public List<Long> findEnabledDeviceIdsByUserAndType(Long userId, NotificationTyp
.getResultList();
}

public List<Long> findEnabledDeviceIdsByUserIdsAndType(List<Long> userIds, NotificationType type) {
if (userIds == null || userIds.isEmpty()) return List.of();
return em.createQuery("""
select ns.userDevice.id
from NotificationSetting ns
where ns.userDevice.user.id in :userIds
and ns.type = :type
and ns.enabled = true
""", Long.class)
.setParameter("userIds", userIds)
.setParameter("type", type)
.getResultList();
}

public void save(NotificationSetting setting) {
em.persist(setting);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,25 @@ public List<Long> findActiveUserIds() {
.setParameter("withdrawn", SignupStatusType.WITHDRAWN)
.getResultList();
}

public List<Long> findActiveUserIds(int offset, int limit) {
return em.createQuery(
"SELECT u.id FROM User u " +
"WHERE u.deletedAt IS NULL AND u.signupStatus <> :withdrawn " +
"ORDER BY u.id",
Long.class)
.setParameter("withdrawn", SignupStatusType.WITHDRAWN)
.setFirstResult(offset)
.setMaxResults(limit)
.getResultList();
}

public List<User> findByIds(List<Long> ids) {
if (ids == null || ids.isEmpty()) return List.of();
return em.createQuery(
"SELECT u FROM User u WHERE u.id IN :ids",
User.class)
.setParameter("ids", ids)
.getResultList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,15 @@ public Executor pushNotificationExecutor() {
executor.initialize();
return executor;
}

@Bean(name = "announcementNotificationExecutor")
public Executor announcementNotificationExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(10);
executor.setThreadNamePrefix("ann-notify-");
executor.initialize();
return executor;
}
}
Loading