diff --git a/src/main/java/com/munjaon/server/cache/mapper/ReportMapper.java b/src/main/java/com/munjaon/server/cache/mapper/ReportMapper.java index c769f10..73455e6 100644 --- a/src/main/java/com/munjaon/server/cache/mapper/ReportMapper.java +++ b/src/main/java/com/munjaon/server/cache/mapper/ReportMapper.java @@ -11,5 +11,5 @@ public interface ReportMapper { ReportDto getReportForUser(String userId); List getReportListForUser(String userId); int deleteReport(String msgId); - int deleteBulkReport(Map reqMap); + int deleteBulkReport(Map reqMap); } diff --git a/src/main/java/com/munjaon/server/cache/service/ReportService.java b/src/main/java/com/munjaon/server/cache/service/ReportService.java index f8bf70b..a8ec8c6 100644 --- a/src/main/java/com/munjaon/server/cache/service/ReportService.java +++ b/src/main/java/com/munjaon/server/cache/service/ReportService.java @@ -29,14 +29,14 @@ public class ReportService { } public int deleteBulkReport(String msgId, String userId) { - Map reqMap = new HashMap<>(); + Map reqMap = new HashMap<>(); reqMap.put("msgId", msgId); reqMap.put("userId", userId); return deleteBulkReport(reqMap); } - public int deleteBulkReport(Map reqMap) { + public int deleteBulkReport(Map reqMap) { return reportMapper.deleteBulkReport(reqMap); } } diff --git a/src/main/java/com/munjaon/server/config/RunnerConfiguration.java b/src/main/java/com/munjaon/server/config/RunnerConfiguration.java index 30ea6a0..e37fde7 100644 --- a/src/main/java/com/munjaon/server/config/RunnerConfiguration.java +++ b/src/main/java/com/munjaon/server/config/RunnerConfiguration.java @@ -270,8 +270,8 @@ public class RunnerConfiguration { public CommandLineRunner getRunnerBeanForReportQueue() { try { String serviceName = "REPORT_QUEUE"; - ReportQueueServerService reportQueueServerService = new ReportQueueServerService(serviceName); - reportQueueServerService.start(); + ReportQueueServer reportQueueServer = new ReportQueueServer(serviceName); + reportQueueServer.start(); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/src/main/java/com/munjaon/server/server/dto/ReportUserDto.java b/src/main/java/com/munjaon/server/server/dto/ReportUserDto.java index 579286a..4359a64 100644 --- a/src/main/java/com/munjaon/server/server/dto/ReportUserDto.java +++ b/src/main/java/com/munjaon/server/server/dto/ReportUserDto.java @@ -25,6 +25,8 @@ public class ReportUserDto { private int maxQueueCount; //Report Queue에 유지할 건수 /* 요청을 처리중인지 여부 */ private boolean isRunningMode; + /* Report Queue 처리중인지 여부 */ + private boolean isQueueMode; public int isAlive() { if (isLogin) { diff --git a/src/main/java/com/munjaon/server/server/service/ReportQueueServer.java b/src/main/java/com/munjaon/server/server/service/ReportQueueServer.java new file mode 100644 index 0000000..1936510 --- /dev/null +++ b/src/main/java/com/munjaon/server/server/service/ReportQueueServer.java @@ -0,0 +1,77 @@ +package com.munjaon.server.server.service; + +import com.munjaon.server.server.dto.ReportUserDto; +import com.munjaon.server.server.queue.ReportUserQueue; +import com.munjaon.server.server.task.ReportQueueTask; +import org.json.simple.JSONObject; + +import java.util.List; + +public class ReportQueueServer extends Service { + private final ReportUserQueue reportUserQueue = ReportUserQueue.getInstance(); + private final int maxWriteCount; + private final int maxQueueCount; + + public ReportQueueServer(String serviceName) { + super(serviceName); + this.maxWriteCount = Integer.parseInt(getProp("MAX_WRITE_COUNT").trim()); + this.maxQueueCount = Integer.parseInt(getProp("MAX_QUEUE_COUNT").trim()); + } + + @Override + public void checkReady() { + this.IS_READY_YN = true; + } + + @Override + public void initResources() { + saveSystemLog("REPORT_QUEUE_SERVICE : RESOURCES INITIALIZING ... ..."); + + saveSystemLog("REPORT_QUEUE_SERVICE : RESOURCES INITIALIZED ... ..."); + } + + @Override + public void releaseResources() { + saveSystemLog("REPORT_QUEUE_SERVICE : SERVER RESOURCE RELEASING ... ..."); + + saveSystemLog("REPORT_QUEUE_SERVICE : SERVER RESOURCE RELEASED ... ..."); + } + + @Override + public void doService() { + saveSystemLog("REPORT_QUEUE_SERVICE : SERVER SERVICE STARTED ... ..."); + while (isRun()) { + try { + doQueueService(); + Thread.sleep(10); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + saveSystemLog("REPORT_QUEUE_SERVICE : SERVER SERVICE STOPPED ... ..."); + } + + private void doQueueService() { + List reportUserList = reportUserQueue.getUsers(); + if (reportUserList == null || reportUserList.isEmpty()) { + return; + } + + for (ReportUserDto reportUserDto : reportUserList) { + /* Report Queue 작업중인지 체크 */ + if (reportUserDto.isQueueMode()) { + continue; + } + /* Report Queue 작업을 실행한다. */ + reportUserDto.setQueueMode(true); + reportUserDto.setMaxWriteCount(this.maxWriteCount); + reportUserDto.setMaxQueueCount(this.maxQueueCount); + new ReportQueueTask(reportUserDto, logger).run(); + } + } + + @Override + public JSONObject monitorService() { + return null; + } +} diff --git a/src/main/java/com/munjaon/server/server/task/ReportQueueTask.java b/src/main/java/com/munjaon/server/server/task/ReportQueueTask.java index 6cb8b94..cc3c335 100644 --- a/src/main/java/com/munjaon/server/server/task/ReportQueueTask.java +++ b/src/main/java/com/munjaon/server/server/task/ReportQueueTask.java @@ -6,10 +6,12 @@ import com.munjaon.server.queue.pool.ReportQueue; import com.munjaon.server.server.dto.ReportDto; import com.munjaon.server.server.dto.ReportUserDto; import com.munjaon.server.util.LogUtil; +import lombok.SneakyThrows; import java.text.SimpleDateFormat; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -17,60 +19,106 @@ import java.util.Map; public class ReportQueueTask implements Runnable { public static final SimpleDateFormat sdf = new SimpleDateFormat("[MM-dd HH:mm:ss]"); public static final String LOG_DATE_FORMAT = "[MM-dd HH:mm:ss]"; + private final int NO_REPORT_SLEEP_TIME = 3000; + private final int REPORT_SLEEP_TIME = 10; private final ReportUserDto reportUserDto; + private final ReportQueue reportQueue; private final LogUtil logger; + private boolean isProcessRun = false; + public ReportQueueTask(ReportUserDto reportUserDto, LogUtil logger) { this.reportUserDto = reportUserDto; this.logger = logger; + + /* 유효성 체크 */ + if (validQueueTask()) { + this.reportQueue = reportUserDto.getReportQueue(); + /* 리포트큐에 최대 크기까지 쓰고 모두 리포트는 전송했는지 체크 : 테스트후 적용 */ + if (reportQueue.isWriteLimit(reportUserDto.getMaxQueueCount())) { + /* Report Queue : Report를 전부 전송한 경우 Truncate & 리포트 큐에 저장 로직 수행, Report Queue 저장로직 수행 */ + if (reportQueue.isTruncateQueue(reportUserDto.getMaxQueueCount())) { + try { + reportQueue.truncateQueue(); + /* 정상적으로 Truncate가 완료된 경우만 isProcessRun : true */ + this.isProcessRun = true; + } catch (Exception ignored) {} + } + } else { + /* Report Queue가 최대 쓰기 크기에 도달하지 않은 경우 : Report Queue 저장로직 수행 */ + this.isProcessRun = true; + } + } else { + this.reportQueue = null; + } + + /* Report Queue 모드로 설정 : Report Thread 제어용 */ + reportUserDto.setQueueMode(true); // Report Queue 모드로 설정 } - @Override - public void run() { + private boolean validQueueTask() { if (reportUserDto == null || reportUserDto.getUserId() == null) { - return; + return false; } if (reportUserDto.getReportQueue() == null || reportUserDto.getReportQueue().isOpen() == false) { - return; + return false; } - ReportQueue reportQueue = reportUserDto.getReportQueue(); - /* 리포트큐에 최대 크기까지 쓰고 모두 리포트는 전송했는지 체크 : 테스트후 적용 */ - if (reportQueue.isWriteLimit(reportUserDto.getMaxQueueCount())) { - if (reportQueue.isTruncateQueue(reportUserDto.getMaxQueueCount())) { - try { - reportQueue.truncateQueue(); - } catch (Exception ignored) {} - } else { - return; - } - } - ReportService reportService = (ReportService) CacheService.REPORT_SERVICE.getService(); - List list = reportService.getReportListForUser(reportUserDto.getUserId()); - if (list == null || list.isEmpty()) { - return; - } + return true; + } - StringBuilder builder = new StringBuilder(); - for (ReportDto dto : list) { - try { - if (builder.isEmpty()) { - builder.append(dto.getMsgId()); - } else { - builder.append(",").append(dto.getMsgId()); + @SneakyThrows + @Override + public void run() { + /* 정상적인 Report Queue 저장로직을 수행할 경우 */ + int cntReport = 0; + if (isProcessRun) { + ReportService reportService = (ReportService) CacheService.REPORT_SERVICE.getService(); + List list = reportService.getReportListForUser(reportUserDto.getUserId()); + + /* Report Queue 저장 & DB 데이터 삭제 */ + if (list != null && list.size() > 0) { + List msgList = new ArrayList<>(); + /* Report Queue에 저장 */ + for (ReportDto dto : list) { + boolean isError = false; + try { + reportQueue.pushReportToQueue(dto); + msgList.add(dto.getMsgId()); + cntReport++; // Report를 처리한 카운트 + saveLog("reportDto : " + dto.toString()); + } catch (Exception e) { + saveLog("ReportQueueTask ERROR"); + saveLog(e); + isError = true; + } + /* 에러인 경우 */ + if (isError) { + break; + } + } + + /* DB에서 데이터 삭제 */ + if (msgList.size() > 0) { + Map reqMap = new HashMap<>(); + reqMap.put("userId", reportUserDto.getUserId()); + reqMap.put("msgList", msgList); + reportService.deleteBulkReport(reqMap); } - saveLog("reportDto : " + dto.toString()); - reportQueue.pushReportToQueue(dto); - } catch (Exception e) { - throw new RuntimeException(e); } } - Map reqMap = new HashMap<>(); - reqMap.put("userId", reportUserDto.getUserId()); - reqMap.put("msgId", builder.toString()); - reportService.deleteBulkReport(reqMap); + /* Thread Sleep */ + if (cntReport > 0) { + saveLog("ReportQueueTask is Run ... ... [ID : " + reportUserDto.getUserId() + "] [REPORT COUNT : " + cntReport + "]"); + Thread.sleep(REPORT_SLEEP_TIME); + } else { + Thread.sleep(NO_REPORT_SLEEP_TIME); + } + + /* Queue 실해모드 해제 */ + reportUserDto.setQueueMode(false); // Report Queue 모드로 설정 } private void saveSystemLog(Object obj) { diff --git a/src/main/resources/sqlmap/report_sql.xml b/src/main/resources/sqlmap/report_sql.xml index ff74472..925b65b 100644 --- a/src/main/resources/sqlmap/report_sql.xml +++ b/src/main/resources/sqlmap/report_sql.xml @@ -38,14 +38,24 @@ + DELETE FROM mj_msg_report + WHERE USER_ID = #{userId} + AND MSG_ID IN + + #{item} + + + + DELETE SRC FROM mj_msg_report SRC INNER JOIN ( with recursive T as ( select #{msgId} as items), - N as ( select 1 as n union select n + 1 from N, T - where n length(items) - length(replace(items, ',', ''))) - select distinct substring_index(substring_index(items, ',', n), ',', -1) - MSG_ID from N, T + N as ( select 1 as n union select n + 1 from N, T + where n length(items) - length(replace(items, ',', ''))) + select distinct substring_index(substring_index(items, ',', n), ',', -1) + MSG_ID from N, T ) DEST ON SRC.MSG_ID = DEST.MSG_ID WHERE SRC.USER_ID = #{userId}