From 2679d176296337be6c49dbf44640a22aa9d0472d Mon Sep 17 00:00:00 2001 From: dsjang Date: Mon, 22 Jul 2024 03:29:59 +0900 Subject: [PATCH] =?UTF-8?q?Report=20=EA=B8=B0=EB=8A=A5=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80,=20LMS=20=EA=B8=B0=EB=8A=A5=20=EC=B6=94=EA=B0=80,=20M?= =?UTF-8?q?MS=20=EA=B8=B0=EB=8A=A5=20=ED=85=8C=EC=8A=A4=ED=8A=B8=EC=A4=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/cache/mapper/ReportMapper.java | 5 + .../server/cache/service/ReportService.java | 20 + .../server/config/RunnerConfiguration.java | 107 ++++- .../munjaon/server/config/ServiceCode.java | 9 + .../server/queue/config/ReportConfig.java | 46 ++ .../server/queue/mapper/LmsMapper.java | 7 + .../server/queue/pool/LmsReadQueue.java | 39 ++ .../server/queue/pool/LmsWriteQueue.java | 6 + .../server/queue/pool/MmsReadQueue.java | 40 ++ .../server/queue/pool/MmsWriteQueue.java | 6 + .../server/queue/pool/ReportQueue.java | 213 +++++++++ .../server/queue/service/LmsQueueService.java | 11 +- .../server/queue/service/SmsQueueService.java | 2 +- .../service/CacheScheduleService.java | 3 +- .../server/server/config/ServerConfig.java | 2 +- .../server/server/dto/ConnectUserDto.java | 1 + .../server/server/dto/ReportUserDto.java | 19 +- .../server/server/packet/LinkCheck.java | 2 +- .../server/server/packet/LmsMessage.java | 87 ++++ .../server/server/packet/MmsMessage.java | 104 +++++ .../munjaon/server/server/packet/Packet.java | 19 +- .../munjaon/server/server/packet/Report.java | 160 +++++++ .../server/server/queue/CollectUserQueue.java | 118 +++++ .../server/server/queue/ReportUserQueue.java | 76 ++++ .../munjaon/server/server/sample/Main.java | 34 ++ .../server/server/sample/MyCallable.java | 27 ++ .../server/server/sample/MyRunnable.java | 20 + .../service/CollectBackServerService.java | 403 ++++++++++++++++++ .../server/service/CollectServerService.java | 322 ++++---------- .../service/ReportQueueServerService.java | 116 +++++ .../server/service/ReportServerService.java | 165 +++++-- .../server/server/task/CollectReadTask.java | 269 ++++++++++++ .../server/server/task/ReportQueueTask.java | 85 ++++ .../server/server/task/ReportReadTask.java | 199 +++++++++ .../com/munjaon/server/util/MessageUtil.java | 161 ++++++- src/main/resources/sqlmap/lms_sql.xml | 21 + src/main/resources/sqlmap/report_sql.xml | 32 +- 37 files changed, 2632 insertions(+), 324 deletions(-) create mode 100644 src/main/java/com/munjaon/server/queue/config/ReportConfig.java create mode 100644 src/main/java/com/munjaon/server/queue/mapper/LmsMapper.java create mode 100644 src/main/java/com/munjaon/server/queue/pool/LmsReadQueue.java create mode 100644 src/main/java/com/munjaon/server/queue/pool/MmsReadQueue.java create mode 100644 src/main/java/com/munjaon/server/queue/pool/ReportQueue.java create mode 100644 src/main/java/com/munjaon/server/server/packet/LmsMessage.java create mode 100644 src/main/java/com/munjaon/server/server/packet/MmsMessage.java create mode 100644 src/main/java/com/munjaon/server/server/packet/Report.java create mode 100644 src/main/java/com/munjaon/server/server/queue/CollectUserQueue.java create mode 100644 src/main/java/com/munjaon/server/server/queue/ReportUserQueue.java create mode 100644 src/main/java/com/munjaon/server/server/sample/Main.java create mode 100644 src/main/java/com/munjaon/server/server/sample/MyCallable.java create mode 100644 src/main/java/com/munjaon/server/server/sample/MyRunnable.java create mode 100644 src/main/java/com/munjaon/server/server/service/CollectBackServerService.java create mode 100644 src/main/java/com/munjaon/server/server/service/ReportQueueServerService.java create mode 100644 src/main/java/com/munjaon/server/server/task/CollectReadTask.java create mode 100644 src/main/java/com/munjaon/server/server/task/ReportQueueTask.java create mode 100644 src/main/java/com/munjaon/server/server/task/ReportReadTask.java create mode 100644 src/main/resources/sqlmap/lms_sql.xml diff --git a/src/main/java/com/munjaon/server/cache/mapper/ReportMapper.java b/src/main/java/com/munjaon/server/cache/mapper/ReportMapper.java index ab28d58..c769f10 100644 --- a/src/main/java/com/munjaon/server/cache/mapper/ReportMapper.java +++ b/src/main/java/com/munjaon/server/cache/mapper/ReportMapper.java @@ -3,8 +3,13 @@ package com.munjaon.server.cache.mapper; import com.munjaon.server.server.dto.ReportDto; import org.apache.ibatis.annotations.Mapper; +import java.util.List; +import java.util.Map; + @Mapper public interface ReportMapper { ReportDto getReportForUser(String userId); + List getReportListForUser(String userId); int deleteReport(String msgId); + int deleteBulkReport(Map reqMap); } diff --git a/src/main/java/com/munjaon/server/cache/service/ReportService.java b/src/main/java/com/munjaon/server/cache/service/ReportService.java index a5bcd6e..f8bf70b 100644 --- a/src/main/java/com/munjaon/server/cache/service/ReportService.java +++ b/src/main/java/com/munjaon/server/cache/service/ReportService.java @@ -6,6 +6,10 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + @Slf4j @Service @RequiredArgsConstructor @@ -16,7 +20,23 @@ public class ReportService { return reportMapper.getReportForUser(userId); } + public List getReportListForUser(String userId) { + return reportMapper.getReportListForUser(userId); + } + public int deleteReport(String msgId) { return reportMapper.deleteReport(msgId); } + + public int deleteBulkReport(String msgId, String userId) { + Map reqMap = new HashMap<>(); + reqMap.put("msgId", msgId); + reqMap.put("userId", userId); + + return deleteBulkReport(reqMap); + } + + public int deleteBulkReport(Map reqMap) { + return reportMapper.deleteBulkReport(reqMap); + } } diff --git a/src/main/java/com/munjaon/server/config/RunnerConfiguration.java b/src/main/java/com/munjaon/server/config/RunnerConfiguration.java index f797ecd..8ad87e5 100644 --- a/src/main/java/com/munjaon/server/config/RunnerConfiguration.java +++ b/src/main/java/com/munjaon/server/config/RunnerConfiguration.java @@ -1,11 +1,8 @@ package com.munjaon.server.config; import com.munjaon.server.queue.dto.QueueInfo; -import com.munjaon.server.queue.pool.SmsReadQueue; -import com.munjaon.server.queue.pool.SmsWriteQueue; -import com.munjaon.server.server.service.CollectServerService; -import com.munjaon.server.server.service.PropertyLoader; -import com.munjaon.server.server.service.QueueServerService; +import com.munjaon.server.queue.pool.*; +import com.munjaon.server.server.service.*; import com.munjaon.server.util.ServiceUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -67,6 +64,60 @@ public class RunnerConfiguration { return args -> System.out.println("Runner Bean #2"); } + @Bean + @Order(2) + public CommandLineRunner getRunnerBeanForLmsQueue() { + try { + String[] svcArray = ServiceUtil.getServiceNames(serverConfig.getStringArray("LMS.SERVICE_LIST")); + if (svcArray == null || svcArray.length == 0) { + log.info("LMS service list is empty"); + } else { + if (ServiceUtil.isDuplicate(svcArray)) { + log.info("LMS service list is duplicated"); + } else { + for (String svc : svcArray) { + log.info("SERVICE CREATE : {}", svc); + QueueInfo queueInfo = QueueInfo.builder().queueName(svc).serviceType("LMS").queuePath(serverConfig.getString("LMS.QUEUE_PATH")).build(); + LmsWriteQueue lmsWriteQueue = new LmsWriteQueue(queueInfo); + LmsReadQueue lmsReadQueue = new LmsReadQueue(queueInfo); + QueueServerService queueServerService = new QueueServerService(svc, lmsWriteQueue, lmsReadQueue); + queueServerService.start(); + } + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return args -> System.out.println("Runner Bean #2"); + } + + @Bean + @Order(2) + public CommandLineRunner getRunnerBeanForMmsQueue() { + try { + String[] svcArray = ServiceUtil.getServiceNames(serverConfig.getStringArray("MMS.SERVICE_LIST")); + if (svcArray == null || svcArray.length == 0) { + log.info("MMS service list is empty"); + } else { + if (ServiceUtil.isDuplicate(svcArray)) { + log.info("MMS service list is duplicated"); + } else { + for (String svc : svcArray) { + log.info("SERVICE CREATE : {}", svc); + QueueInfo queueInfo = QueueInfo.builder().queueName(svc).serviceType("MMS").queuePath(serverConfig.getString("MMS.QUEUE_PATH")).build(); + MmsWriteQueue mmsWriteQueue = new MmsWriteQueue(queueInfo); + MmsReadQueue mmsReadQueue = new MmsReadQueue(queueInfo); + QueueServerService queueServerService = new QueueServerService(svc, mmsWriteQueue, mmsReadQueue); + queueServerService.start(); + } + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return args -> System.out.println("Runner Bean #2"); + } + @Bean @Order(3) public CommandLineRunner getRunnerBeanForSmsCollector() { @@ -74,11 +125,55 @@ public class RunnerConfiguration { String serviceName = "SMS_COLLECTOR"; String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE"); int port = serverConfig.getInt(serviceName + ".SERVICE_PORT"); +// CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port); CollectServerService collectServerService = new CollectServerService(serviceName, serviceType, port); collectServerService.start(); } catch (Exception e) { throw new RuntimeException(e); } - return args -> System.out.println("Runner Bean #2"); + return args -> System.out.println("Runner Bean SmsCollector #3"); + } + + @Bean + @Order(3) + public CommandLineRunner getRunnerBeanForLmsCollector() { + try { + String serviceName = "LMS_COLLECTOR"; + String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE"); + int port = serverConfig.getInt(serviceName + ".SERVICE_PORT"); +// CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port); + CollectServerService collectServerService = new CollectServerService(serviceName, serviceType, port); + collectServerService.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return args -> System.out.println("Runner Bean SmsCollector #3"); + } + + @Bean + @Order(3) + public CommandLineRunner getRunnerBeanForReporter() { + try { + String serviceName = "REPORTER"; + int port = serverConfig.getInt(serviceName + ".SERVICE_PORT"); + ReportServerService reportServerService = new ReportServerService(serviceName, port); + reportServerService.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return args -> System.out.println("Runner Bean Reporter #3"); + } + + @Bean + @Order(4) + public CommandLineRunner getRunnerBeanForReportQueue() { + try { + String serviceName = "REPORT_QUEUE"; + ReportQueueServerService reportQueueServerService = new ReportQueueServerService(serviceName); + reportQueueServerService.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return args -> System.out.println("Runner Bean ReporterQueue #4"); } } diff --git a/src/main/java/com/munjaon/server/config/ServiceCode.java b/src/main/java/com/munjaon/server/config/ServiceCode.java index 06f03f8..992b9c2 100644 --- a/src/main/java/com/munjaon/server/config/ServiceCode.java +++ b/src/main/java/com/munjaon/server/config/ServiceCode.java @@ -25,6 +25,15 @@ public enum ServiceCode { MSG_ERROR_MEDIA_SUBJECT(500, "LMS, MMS Subject is Null OR is Over Limit Byte"), MSG_ERROR_MEDIA_MESSAGE(501, "LMS, MMS Message is Null OR is Over Limit Byte"), + /* 카카오 알림톡, 친구톡 메시지 */ + + /* 리포트 관련 에러 메시지 */ + MSG_ERROR_REPORT(800, "REPORT DATA is Null OR is invalid"), + MSG_ERROR_REPORT_MSG_ID(801, "MSG_ID is Null OR is Over Limit Byte"), + MSG_ERROR_REPORT_AGENT_CODE(802, "AGENT_CODE is Null OR is Over Limit Byte"), + MSG_ERROR_REPORT_SEND_TIME(803, "SEND_TIME is Null OR is Over Limit Byte"), + MSG_ERROR_REPORT_TELECOM(804, "TELECOM is Null OR is Over Limit Byte"), + MSG_ERROR_REPORT_RESULT(805, "RESULT is Null OR is Over Limit Byte"), ETC_ERROR(999, "ETC ERROR"); diff --git a/src/main/java/com/munjaon/server/queue/config/ReportConfig.java b/src/main/java/com/munjaon/server/queue/config/ReportConfig.java new file mode 100644 index 0000000..ec6424c --- /dev/null +++ b/src/main/java/com/munjaon/server/queue/config/ReportConfig.java @@ -0,0 +1,46 @@ +package com.munjaon.server.queue.config; + +public final class ReportConfig { + /* USER_ID (Length : 20 / Position : 0) */ + public static final int USER_ID_LENGTH = 20; + public static final int USER_ID_POSITION = 0; + + /* WRITE_COUNT (Length : 10 / Position : 20) */ + public static final int WRITE_COUNT_LENGTH = 10; + public static final int WRITE_COUNT_POSITION = USER_ID_POSITION + USER_ID_LENGTH; + + /* READ_COUNT (Length : 10 / Position : 30) */ + public static final int READ_COUNT_LENGTH = 10; + public static final int READ_COUNT_POSITION = WRITE_COUNT_POSITION + WRITE_COUNT_LENGTH; + + /* Head 길이 */ + public static final int HEAD_LENGTH = READ_COUNT_POSITION + READ_COUNT_LENGTH; + + + + /* MSG_ID (Length : 20 / Position : 0) */ + public static final int MSG_ID_LENGTH = 20; + public static final int MSG_ID_POSITION = 0; + + /* AGENT_CODE (Length : 2 / Position : 20) */ + public static final int AGENT_CODE_LENGTH = 2; + public static final int AGENT_CODE_POSITION = MSG_ID_POSITION + MSG_ID_LENGTH; + + /* SEND_TIME (Length : 14 / Position : 22) */ + public static final int SEND_TIME_LENGTH = 14; + public static final int SEND_TIME_POSITION = AGENT_CODE_POSITION + AGENT_CODE_LENGTH; + + /* TELECOM (Length : 3 / Position : 36) */ + public static final int TELECOM_LENGTH = 3; + public static final int TELECOM_POSITION = SEND_TIME_POSITION + SEND_TIME_LENGTH; + + /* RESULT (Length : 5 / Position : 39) */ + public static final int RESULT_LENGTH = 5; + public static final int RESULT_POSITION = TELECOM_POSITION + TELECOM_LENGTH; + + /* Report 길이 */ + public static final int BODY_LENGTH = RESULT_POSITION + RESULT_LENGTH; + + // 큐에 저장하기전에 바이트를 채울 문자 + public static final byte SET_DEFAULT_BYTE = (byte) 0x20; +} diff --git a/src/main/java/com/munjaon/server/queue/mapper/LmsMapper.java b/src/main/java/com/munjaon/server/queue/mapper/LmsMapper.java new file mode 100644 index 0000000..3dbb53b --- /dev/null +++ b/src/main/java/com/munjaon/server/queue/mapper/LmsMapper.java @@ -0,0 +1,7 @@ +package com.munjaon.server.queue.mapper; + +import com.munjaon.server.queue.dto.BasicMessageDto; + +public interface LmsMapper { + int insert(BasicMessageDto messageDto); +} diff --git a/src/main/java/com/munjaon/server/queue/pool/LmsReadQueue.java b/src/main/java/com/munjaon/server/queue/pool/LmsReadQueue.java new file mode 100644 index 0000000..0755139 --- /dev/null +++ b/src/main/java/com/munjaon/server/queue/pool/LmsReadQueue.java @@ -0,0 +1,39 @@ +package com.munjaon.server.queue.pool; + +import com.munjaon.server.queue.config.MediaBodyConfig; +import com.munjaon.server.queue.config.QueueConstants; +import com.munjaon.server.queue.dto.BasicMessageDto; +import com.munjaon.server.queue.dto.QueueInfo; +import com.munjaon.server.util.MessageUtil; + +import java.nio.ByteBuffer; + +public class LmsReadQueue extends ReadQueue { + public LmsReadQueue(QueueInfo queueInfo) throws Exception { + this.queueInfo = queueInfo; +// initQueue(); + } + + @Override + void popBuffer() throws Exception { + this.channel.position(MessageUtil.calcReadPosition(this.popCounter, MediaBodyConfig.LMS_SUM_BYTE_LENGTH)); + this.channel.read(this.dataBuffer); + } + + @Override + void getBytesForExtendMessage(BasicMessageDto messageDto) { + MessageUtil.getBytesForMediaMessage(this.dataBuffer, messageDto); + } + + @Override + void initDataBuffer() { + if (this.dataBuffer == null) { + this.dataBuffer = ByteBuffer.allocateDirect(MediaBodyConfig.LMS_SUM_BYTE_LENGTH); + } + this.dataBuffer.clear(); + for(int loopCnt = 0; loopCnt < MediaBodyConfig.LMS_SUM_BYTE_LENGTH; loopCnt++){ + this.dataBuffer.put(QueueConstants.SET_DEFAULT_BYTE); + } + this.dataBuffer.position(0); + } +} diff --git a/src/main/java/com/munjaon/server/queue/pool/LmsWriteQueue.java b/src/main/java/com/munjaon/server/queue/pool/LmsWriteQueue.java index 5ea00f6..7cecda3 100644 --- a/src/main/java/com/munjaon/server/queue/pool/LmsWriteQueue.java +++ b/src/main/java/com/munjaon/server/queue/pool/LmsWriteQueue.java @@ -4,11 +4,17 @@ import com.munjaon.server.config.ServiceCode; import com.munjaon.server.queue.config.MediaBodyConfig; import com.munjaon.server.queue.config.QueueConstants; import com.munjaon.server.queue.dto.BasicMessageDto; +import com.munjaon.server.queue.dto.QueueInfo; import com.munjaon.server.util.MessageUtil; import java.nio.ByteBuffer; public class LmsWriteQueue extends WriteQueue { + public LmsWriteQueue(QueueInfo queueInfo) throws Exception { + this.queueInfo = queueInfo; + /* 큐초기화 */ +// initQueue(); + } @Override public int isValidateMessageForExtend(BasicMessageDto messageDto) { diff --git a/src/main/java/com/munjaon/server/queue/pool/MmsReadQueue.java b/src/main/java/com/munjaon/server/queue/pool/MmsReadQueue.java new file mode 100644 index 0000000..9158405 --- /dev/null +++ b/src/main/java/com/munjaon/server/queue/pool/MmsReadQueue.java @@ -0,0 +1,40 @@ +package com.munjaon.server.queue.pool; + +import com.munjaon.server.queue.config.MediaBodyConfig; +import com.munjaon.server.queue.config.QueueConstants; +import com.munjaon.server.queue.dto.BasicMessageDto; +import com.munjaon.server.queue.dto.QueueInfo; +import com.munjaon.server.util.MessageUtil; + +import java.nio.ByteBuffer; + +public class MmsReadQueue extends ReadQueue { + public MmsReadQueue(QueueInfo queueInfo) throws Exception { + this.queueInfo = queueInfo; +// initQueue(); + } + + @Override + void popBuffer() throws Exception { + this.channel.position(MessageUtil.calcReadPosition(this.popCounter, MediaBodyConfig.MMS_SUM_BYTE_LENGTH)); + this.channel.read(this.dataBuffer); + } + + @Override + void getBytesForExtendMessage(BasicMessageDto messageDto) { + MessageUtil.getBytesForMediaMessage(this.dataBuffer, messageDto); + MessageUtil.getBytesForMmsMessage(this.dataBuffer, messageDto); + } + + @Override + void initDataBuffer() { + if (this.dataBuffer == null) { + this.dataBuffer = ByteBuffer.allocateDirect(MediaBodyConfig.MMS_SUM_BYTE_LENGTH); + } + this.dataBuffer.clear(); + for(int loopCnt = 0; loopCnt < MediaBodyConfig.MMS_SUM_BYTE_LENGTH; loopCnt++){ + this.dataBuffer.put(QueueConstants.SET_DEFAULT_BYTE); + } + this.dataBuffer.position(0); + } +} diff --git a/src/main/java/com/munjaon/server/queue/pool/MmsWriteQueue.java b/src/main/java/com/munjaon/server/queue/pool/MmsWriteQueue.java index 281b01b..b6f8edf 100644 --- a/src/main/java/com/munjaon/server/queue/pool/MmsWriteQueue.java +++ b/src/main/java/com/munjaon/server/queue/pool/MmsWriteQueue.java @@ -4,11 +4,17 @@ import com.munjaon.server.config.ServiceCode; import com.munjaon.server.queue.config.MediaBodyConfig; import com.munjaon.server.queue.config.QueueConstants; import com.munjaon.server.queue.dto.BasicMessageDto; +import com.munjaon.server.queue.dto.QueueInfo; import com.munjaon.server.util.MessageUtil; import java.nio.ByteBuffer; public class MmsWriteQueue extends WriteQueue { + public MmsWriteQueue(QueueInfo queueInfo) throws Exception { + this.queueInfo = queueInfo; + /* 큐초기화 */ +// initQueue(); + } @Override public int isValidateMessageForExtend(BasicMessageDto messageDto) { diff --git a/src/main/java/com/munjaon/server/queue/pool/ReportQueue.java b/src/main/java/com/munjaon/server/queue/pool/ReportQueue.java new file mode 100644 index 0000000..74dce05 --- /dev/null +++ b/src/main/java/com/munjaon/server/queue/pool/ReportQueue.java @@ -0,0 +1,213 @@ +package com.munjaon.server.queue.pool; + +import com.munjaon.server.config.ServiceCode; +import com.munjaon.server.queue.config.QueueConstants; +import com.munjaon.server.queue.config.ReportConfig; +import com.munjaon.server.server.dto.ReportDto; +import com.munjaon.server.util.FileUtil; +import com.munjaon.server.util.MessageUtil; +import lombok.Getter; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +public class ReportQueue { + private final Object lockObject = new Object(); // Lock Object + @Getter + private String queuePathFile; // 사용자 큐 파일 + @Getter + private String userId; // 사용자 아이디 + @Getter + private int writeCounter = 0; // 쓰기 카운트 + @Getter + private int readCounter = 0; // 읽기 카운트 + private FileChannel channel = null; // Queue File Channel + private ByteBuffer headBuffer = ByteBuffer.allocateDirect(ReportConfig.HEAD_LENGTH); // Queue Head Buffer + private ByteBuffer bodyBuffer = ByteBuffer.allocateDirect(ReportConfig.BODY_LENGTH); // Queue Body Buffer + private byte[] byteArray = null; + + public ReportQueue(String queuePathFile, String userId) throws Exception { + this.queuePathFile = queuePathFile; + this.userId = userId; + + initQueue(); + } + + private void initQueue() throws Exception { + /* 2. 경로 체크 및 생성 */ + FileUtil.mkdirs(this.queuePathFile); + File file = new File(this.queuePathFile + File.separator + this.userId + ".queue"); + this.channel = new RandomAccessFile(file, "rw").getChannel(); + if (file.length() == 0) { + this.writeCounter = 0; + this.readCounter = 0; + writeHeader(); + } else { + readHeader(); + } + } + + public void pushReportToQueue(ReportDto reportDto) throws Exception { + synchronized (lockObject) { + if (ServiceCode.OK.getCode() != MessageUtil.isValidateMessageForReport(reportDto)) { + return; + } + + /* 1. buffer 초기화 및 데이터 Set */ + initBodyBuffer(); + MessageUtil.setBytesForReport(this.bodyBuffer, reportDto); + /* 2. Header 정보 다시 읽기 */ + readHeader(); + /* 3. 파일채널에 쓰기 */ + this.channel.position(MessageUtil.calcWritePositionForReport(this.writeCounter, ReportConfig.BODY_LENGTH)); + this.bodyBuffer.flip(); + this.channel.write(this.bodyBuffer); + /* 4 Push 카운터 증가 및 head 저장 */ + this.writeCounter = this.writeCounter + 1; + writeHeader(); + } + } + + public ReportDto popReportFromQueue() throws Exception { + synchronized (lockObject) { + /* 1. Header 정보 다시 읽기 */ + readHeader(); + /* 2. 읽을 데이터가 없는지 체크 */ + if (this.writeCounter <= this.readCounter) { + return null; + } + /* 3. 채널에서 읽기 */ + this.bodyBuffer.clear(); + this.channel.position(MessageUtil.calcWritePositionForReport(this.readCounter, ReportConfig.BODY_LENGTH)); + this.channel.read(this.bodyBuffer); + + return MessageUtil.getReportFromBuffer(this.bodyBuffer); + } + } + + public boolean isTruncateQueue(int maxWriteCount) { + boolean truncate = false; + synchronized (lockObject) { + if (this.writeCounter >= maxWriteCount) { + if (this.writeCounter == this.readCounter) { + truncate = true; + } + } + } + + return truncate; + } + + public boolean isWriteLimit(int maxWriteCount) { + boolean isLimit = false; + synchronized (lockObject) { + if (this.writeCounter >= maxWriteCount) { + isLimit = true; + } + } + + return isLimit; + } + + public void truncateQueue() throws Exception { + synchronized (lockObject) { + if (isOpen()) { + this.channel.truncate(0); + this.writeCounter = 0; + this.readCounter = 0; + writeHeader(); + } + } + } + + private void readHeader() throws Exception { + synchronized (lockObject) { + initHeadBuffer(); + this.channel.position(0); + this.channel.read(this.headBuffer); + this.byteArray = new byte[ReportConfig.USER_ID_LENGTH]; + // USER_ID + this.headBuffer.position(ReportConfig.USER_ID_POSITION); + this.headBuffer.get(this.byteArray); + this.userId = (new String(this.byteArray)).trim(); + // 쓰기 카운트 가져오기 + this.byteArray = new byte[ReportConfig.WRITE_COUNT_LENGTH]; + this.headBuffer.position(ReportConfig.WRITE_COUNT_POSITION); + this.headBuffer.get(this.byteArray); + this.writeCounter = Integer.parseInt((new String(this.byteArray)).trim()); + // 읽기 카운트 가져오기 + this.byteArray = new byte[ReportConfig.READ_COUNT_LENGTH]; + this.headBuffer.position(ReportConfig.READ_COUNT_POSITION); + this.headBuffer.get(this.byteArray); + this.readCounter = Integer.parseInt((new String(this.byteArray)).trim()); + } + } + + public void addReadCounter() throws Exception { + synchronized (lockObject) { + /* 읽기 카운트 증가 */ + this.readCounter = this.readCounter + 1; + + initHeadBuffer(); + this.channel.position(ReportConfig.USER_ID_POSITION); + this.headBuffer.put(this.userId.getBytes()); + this.headBuffer.position(ReportConfig.WRITE_COUNT_POSITION); + this.headBuffer.put(Integer.toString(this.writeCounter).getBytes()); + this.headBuffer.position(ReportConfig.READ_COUNT_POSITION); + this.headBuffer.put(Integer.toString(this.readCounter).getBytes()); + this.headBuffer.flip(); + this.channel.write(this.headBuffer); + } + } + + private void writeHeader() throws Exception { + synchronized (lockObject) { + initHeadBuffer(); + this.channel.position(ReportConfig.USER_ID_POSITION); + this.headBuffer.put(this.userId.getBytes()); + this.headBuffer.position(ReportConfig.WRITE_COUNT_POSITION); + this.headBuffer.put(Integer.toString(this.writeCounter).getBytes()); + this.headBuffer.position(ReportConfig.READ_COUNT_POSITION); + this.headBuffer.put(Integer.toString(this.readCounter).getBytes()); + this.headBuffer.flip(); + this.channel.write(this.headBuffer); + } + } + + private void initHeadBuffer() { + this.headBuffer.clear(); + for(int loopCnt = 0; loopCnt < ReportConfig.HEAD_LENGTH; loopCnt++){ + this.headBuffer.put(ReportConfig.SET_DEFAULT_BYTE); + } + this.headBuffer.position(0); + } + + public void initBodyBuffer() { + this.bodyBuffer.clear(); + for(int loopCnt = 0; loopCnt < ReportConfig.BODY_LENGTH; loopCnt++){ + this.bodyBuffer.put(QueueConstants.SET_DEFAULT_BYTE); + } + this.bodyBuffer.position(0); + } + + public void close() throws IOException { + try { + if (isOpen()) { + channel.close(); + } + } catch(IOException e) { + throw e; + } + } + + public boolean isOpen() { + if (this.channel == null) { + return false; + } + + return this.channel.isOpen(); + } +} diff --git a/src/main/java/com/munjaon/server/queue/service/LmsQueueService.java b/src/main/java/com/munjaon/server/queue/service/LmsQueueService.java index 3a84508..ed90b3e 100644 --- a/src/main/java/com/munjaon/server/queue/service/LmsQueueService.java +++ b/src/main/java/com/munjaon/server/queue/service/LmsQueueService.java @@ -1,6 +1,8 @@ package com.munjaon.server.queue.service; +import com.munjaon.server.cache.service.SerialNoService; import com.munjaon.server.queue.dto.BasicMessageDto; +import com.munjaon.server.queue.mapper.LmsMapper; import com.munjaon.server.queue.pool.LmsMemoryQueue; import com.munjaon.server.queue.pool.LmsQueuePool; import com.munjaon.server.queue.pool.WriteQueue; @@ -12,8 +14,10 @@ import org.springframework.stereotype.Service; @Service @RequiredArgsConstructor public class LmsQueueService implements QueueAction { + private final LmsMapper lmsMapper; private final LmsQueuePool queueInstance = LmsQueuePool.getInstance(); private final LmsMemoryQueue memoryQueue = LmsMemoryQueue.getInstance(); + private final SerialNoService serialNoService; @Override public int getQueueSize() { @@ -51,7 +55,12 @@ public class LmsQueueService implements QueueAction { @Override public int saveMessageToTable(BasicMessageDto data) { - return 0; + String serialNo = serialNoService.getSerialNo(); + String groupSerialNo = serialNo.replace("MSGID", "MGRP"); + data.setId(serialNo); + data.setMsgGroupID(groupSerialNo); + log.debug("Save message to table : {}", data); + return lmsMapper.insert(data); } @Override diff --git a/src/main/java/com/munjaon/server/queue/service/SmsQueueService.java b/src/main/java/com/munjaon/server/queue/service/SmsQueueService.java index f0963ac..7bd41d7 100644 --- a/src/main/java/com/munjaon/server/queue/service/SmsQueueService.java +++ b/src/main/java/com/munjaon/server/queue/service/SmsQueueService.java @@ -56,7 +56,7 @@ public class SmsQueueService implements QueueAction { @Override public int saveMessageToTable(BasicMessageDto data) { String serialNo = serialNoService.getSerialNo(); - String groupSerialNo = serialNo.replace("MSGID", "MSGGID"); + String groupSerialNo = serialNo.replace("MSGID", "MGRP"); data.setId(serialNo); data.setMsgGroupID(groupSerialNo); log.debug("Save message to table : {}", data); diff --git a/src/main/java/com/munjaon/server/scheduler/service/CacheScheduleService.java b/src/main/java/com/munjaon/server/scheduler/service/CacheScheduleService.java index b065dfc..ea6f97d 100644 --- a/src/main/java/com/munjaon/server/scheduler/service/CacheScheduleService.java +++ b/src/main/java/com/munjaon/server/scheduler/service/CacheScheduleService.java @@ -4,7 +4,6 @@ import com.munjaon.server.cache.service.MemberService; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @Slf4j @@ -20,7 +19,7 @@ public class CacheScheduleService { /* 사용자 설정 테이블 마지막 업데이트 시간 */ private String config_last_modified_time = null; - @Scheduled(cron="0/5 * * * * *") +// @Scheduled(cron="0/5 * * * * *") public void doService() throws Exception { doMemberService(); } diff --git a/src/main/java/com/munjaon/server/server/config/ServerConfig.java b/src/main/java/com/munjaon/server/server/config/ServerConfig.java index 2a45655..3789ddd 100644 --- a/src/main/java/com/munjaon/server/server/config/ServerConfig.java +++ b/src/main/java/com/munjaon/server/server/config/ServerConfig.java @@ -6,7 +6,7 @@ public final class ServerConfig { /* 서버 연결후 로그인 만료 시간 */ public static final int LIMIT_BIND_TIMEOUT = 5000; /* Session Check 만료 시간 */ - public static final int LIMIT_LINK_CHECK_TIMEOUT = 35000; + public static final int LIMIT_LINK_CHECK_TIMEOUT = 10000; /* 서버 프로퍼티 reload interval 시간 */ public static final Long INTERVAL_PROPERTY_RELOAD_TIME = 3000L; diff --git a/src/main/java/com/munjaon/server/server/dto/ConnectUserDto.java b/src/main/java/com/munjaon/server/server/dto/ConnectUserDto.java index 0a5da10..006f065 100644 --- a/src/main/java/com/munjaon/server/server/dto/ConnectUserDto.java +++ b/src/main/java/com/munjaon/server/server/dto/ConnectUserDto.java @@ -24,6 +24,7 @@ public class ConnectUserDto { private String remoteIP; /* 요금제(선불 : P / 후불 : A) */ private final String feeType = "A"; + private int command; //요청 command private MemberDto memberDto; diff --git a/src/main/java/com/munjaon/server/server/dto/ReportUserDto.java b/src/main/java/com/munjaon/server/server/dto/ReportUserDto.java index c4f8538..1badf5e 100644 --- a/src/main/java/com/munjaon/server/server/dto/ReportUserDto.java +++ b/src/main/java/com/munjaon/server/server/dto/ReportUserDto.java @@ -1,6 +1,7 @@ package com.munjaon.server.server.dto; import com.munjaon.server.cache.dto.MemberDto; +import com.munjaon.server.queue.pool.ReportQueue; import com.munjaon.server.server.config.ServerConfig; import lombok.Builder; import lombok.Getter; @@ -12,16 +13,14 @@ import lombok.ToString; @Builder @ToString public class ReportUserDto { - /* 로그인여부 */ - private boolean isLogin; - /* 마지막 통신 시간 */ - private Long lastTrafficTime; - /* 사용자 ID */ - private String userId; - /* 사용자 접속 IP */ - private String remoteIP; - - private MemberDto memberDto; + private boolean isLogin; //로그인여부 + private Long lastTrafficTime; //마지막 통신 시간 + private String userId; //사용자 ID + private String remoteIP; //사용자 접속 IP + private String queuePath; //저장큐경로 + private ReportQueue reportQueue; //ReportQueue + private MemberDto memberDto; //사용자 정보 + private int command; //요청 command public int isAlive() { if (isLogin) { diff --git a/src/main/java/com/munjaon/server/server/packet/LinkCheck.java b/src/main/java/com/munjaon/server/server/packet/LinkCheck.java index 4c8365b..a1a764f 100644 --- a/src/main/java/com/munjaon/server/server/packet/LinkCheck.java +++ b/src/main/java/com/munjaon/server/server/packet/LinkCheck.java @@ -23,7 +23,7 @@ public final class LinkCheck { public static ByteBuffer makeLinkCheckAckBuffer() { ByteBuffer buffer = ByteBuffer.allocate(Header.HEADER_LENGTH + LINK_CHECK_ACK_BODY_LENGTH); Packet.setDefaultByte(buffer); - Header.putHeader(buffer, Header.COMMAND_LINK_CHECK, LINK_CHECK_ACK_BODY_LENGTH); + Header.putHeader(buffer, Header.COMMAND_LINK_CHECK_ACK, LINK_CHECK_ACK_BODY_LENGTH); buffer.put(LINK_CHECK_ACK_BODY_POSITION, LINK_CHECK_ACK_VALUE.getBytes()); return buffer; diff --git a/src/main/java/com/munjaon/server/server/packet/LmsMessage.java b/src/main/java/com/munjaon/server/server/packet/LmsMessage.java new file mode 100644 index 0000000..1737a38 --- /dev/null +++ b/src/main/java/com/munjaon/server/server/packet/LmsMessage.java @@ -0,0 +1,87 @@ +package com.munjaon.server.server.packet; + +import com.munjaon.server.util.CommonUtil; + +import java.nio.ByteBuffer; + +public final class LmsMessage { + public static final int DELIVER_LMS_BODY_LENGTH = 2119; + public static final int DELIVER_LMS_ACK_BODY_LENGTH = 21; + + /* DELIVER */ + /* SUBJECT */ + public static final int DELIVER_SUBJECT_LENGTH = 40; + public static final int DELIVER_SUBJECT_POSITION = CommonMessage.DELIVER_MSG_TYPE_POSITION + CommonMessage.DELIVER_MSG_TYPE_LENGTH; + /* MESSAGE */ + public static final int DELIVER_MESSAGE_LENGTH = 2000; + public static final int DELIVER_MESSAGE_POSITION = DELIVER_SUBJECT_POSITION + DELIVER_SUBJECT_LENGTH; + + public static void putSubjectForDeliver(ByteBuffer buffer, String subject) { + if (buffer == null || subject == null) { + return; + } + subject = CommonUtil.cutString(subject, DELIVER_SUBJECT_LENGTH); + buffer.put(DELIVER_SUBJECT_POSITION, subject.getBytes()); + } + public static String getSubjectForDeliver(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(DELIVER_SUBJECT_POSITION); + byte[] destArray = new byte[DELIVER_SUBJECT_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static void putMessageForDeliver(ByteBuffer buffer, String message) { + if (buffer == null || message == null) { + return; + } + message = CommonUtil.cutString(message, DELIVER_MESSAGE_LENGTH); + buffer.put(DELIVER_MESSAGE_POSITION, message.getBytes()); + } + public static String getMessageForDeliver(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(DELIVER_MESSAGE_POSITION); + byte[] destArray = new byte[DELIVER_MESSAGE_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static ByteBuffer makeDeliverAckBuffer(String msgId, String status) { + ByteBuffer buffer = ByteBuffer.allocate(Header.HEADER_LENGTH + DELIVER_LMS_ACK_BODY_LENGTH); + Packet.setDefaultByte(buffer); + Header.putHeader(buffer, Header.COMMAND_DELIVER_ACK, DELIVER_LMS_ACK_BODY_LENGTH); + buffer.put(CommonMessage.DELIVER_ACK_MESSAGE_ID_POSITION, msgId.getBytes()); + buffer.put(CommonMessage.DELIVER_ACK_RESULT_POSITION, status.getBytes()); + + return buffer; + } +// public static void makeDataForDeliver(ByteBuffer buffer, MunjaonMsg data) { +// if (buffer == null || data == null) { +// return; +// } +// /* MSG_ID */ +// CommonMessage.putMessageIdForDeliver(buffer, data.getMsgId()); +// /* SENDER */ +// CommonMessage.putSenderForDeliver(buffer, data.getSendPhone()); +// /* RECEIVER */ +// CommonMessage.putReceiverForDeliver(buffer, data.getRecvPhone()); +// /* RESERVE_TIME */ +// CommonMessage.putReserveTimeForDeliver(buffer, data.getReserveDate()); +// /* REQUEST_TIME */ +// CommonMessage.putRequestTimeForDeliver(buffer, data.getRequestDate()); +// /* MSG_TYPE */ +// CommonMessage.putMsgTypeForDeliver(buffer, data.getMsgType()); +// /* Subject */ +// putSubjectForDeliver(buffer, data.getSubject()); +// /* MSG */ +// putMessageForDeliver(buffer, data.getMessage()); +// } +} diff --git a/src/main/java/com/munjaon/server/server/packet/MmsMessage.java b/src/main/java/com/munjaon/server/server/packet/MmsMessage.java new file mode 100644 index 0000000..fbb368b --- /dev/null +++ b/src/main/java/com/munjaon/server/server/packet/MmsMessage.java @@ -0,0 +1,104 @@ +package com.munjaon.server.server.packet; + +import com.munjaon.server.util.CommonUtil; + +import java.nio.ByteBuffer; + +public final class MmsMessage { + public static final int DELIVER_MMS_BODY_LENGTH = 2120; + public static final int DELIVER_MMS_ACK_BODY_LENGTH = 21; + + /* DELIVER */ + /* SUBJECT */ + public static final int DELIVER_SUBJECT_LENGTH = 40; + public static final int DELIVER_SUBJECT_POSITION = CommonMessage.DELIVER_MSG_TYPE_POSITION + CommonMessage.DELIVER_MSG_TYPE_LENGTH; + /* MESSAGE */ + public static final int DELIVER_MESSAGE_LENGTH = 2000; + public static final int DELIVER_MESSAGE_POSITION = DELIVER_SUBJECT_POSITION + DELIVER_SUBJECT_LENGTH; + /* FILECOUNT */ + public static final int DELIVER_FILECOUNT_LENGTH = 1; + public static final int DELIVER_FILECOUNT_POSITION = DELIVER_MESSAGE_POSITION + DELIVER_MESSAGE_LENGTH; + + public static void putSubjectForDeliver(ByteBuffer buffer, String subject) { + if (buffer == null || subject == null) { + return; + } + subject = CommonUtil.cutString(subject, DELIVER_SUBJECT_LENGTH); + buffer.put(DELIVER_SUBJECT_POSITION, subject.getBytes()); + } + public static String getSubjectForDeliver(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(DELIVER_SUBJECT_POSITION); + byte[] destArray = new byte[DELIVER_SUBJECT_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static void putMessageForDeliver(ByteBuffer buffer, String message) { + if (buffer == null || message == null) { + return; + } + message = CommonUtil.cutString(message, DELIVER_MESSAGE_LENGTH); + buffer.put(DELIVER_MESSAGE_POSITION, message.getBytes()); + } + public static String getMessageForDeliver(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(DELIVER_MESSAGE_POSITION); + byte[] destArray = new byte[DELIVER_MESSAGE_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static void putFileCountForDeliver(ByteBuffer buffer, int fileCount) { + putFileCountForDeliver(buffer, Integer.toString(fileCount)); + } + + public static void putFileCountForDeliver(ByteBuffer buffer, String fileCount) { + if (buffer == null || fileCount == null) { + return; + } + buffer.put(DELIVER_FILECOUNT_POSITION, fileCount.getBytes()); + } + public static String getFileCountForDeliver(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(DELIVER_FILECOUNT_POSITION); + byte[] destArray = new byte[DELIVER_FILECOUNT_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } +// public static void makeDataForDeliver(ByteBuffer buffer, MunjaonMsg data) { +// if (buffer == null || data == null) { +// return; +// } +// /* MSG_ID */ +// CommonMessage.putMessageIdForDeliver(buffer, data.getMsgId()); +// /* SENDER */ +// CommonMessage.putSenderForDeliver(buffer, data.getSendPhone()); +// /* RECEIVER */ +// CommonMessage.putReceiverForDeliver(buffer, data.getRecvPhone()); +// /* RESERVE_TIME */ +// CommonMessage.putReserveTimeForDeliver(buffer, data.getReserveDate()); +// /* REQUEST_TIME */ +// CommonMessage.putRequestTimeForDeliver(buffer, data.getRequestDate()); +// /* MSG_TYPE */ +// CommonMessage.putMsgTypeForDeliver(buffer, data.getMsgType()); +// /* Subject */ +// putSubjectForDeliver(buffer, data.getSubject()); +// /* MSG */ +// putMessageForDeliver(buffer, data.getMessage()); +// /* FileCount */ +// putFileCountForDeliver(buffer, data.getFileCount()); +// } +} diff --git a/src/main/java/com/munjaon/server/server/packet/Packet.java b/src/main/java/com/munjaon/server/server/packet/Packet.java index 42a5b5b..a0e3946 100644 --- a/src/main/java/com/munjaon/server/server/packet/Packet.java +++ b/src/main/java/com/munjaon/server/server/packet/Packet.java @@ -48,10 +48,21 @@ public final class Packet { if (dest.capacity() != (srcHead.capacity() + srcBody.capacity())) { return; } - byte[] srcHeadArray = srcHead.array(); - byte[] srcBodyArray = srcBody.array(); + for (int i = 0; i < srcHead.capacity(); i++) { + dest.put(i, srcHead.get(i)); + } + for (int i = 0; i < srcBody.capacity(); i++) { + dest.put((i + srcHead.capacity()), srcBody.get(i)); + } + } - dest.put(0, srcHeadArray); - dest.put(srcHeadArray.length, srcBodyArray); + public static void printBuffer(ByteBuffer buffer) { + if (buffer == null) { + return; + } + byte[] srcArray = buffer.array(); + for (int i = 0; i < srcArray.length; i++) { + System.out.print(srcArray[i] + " "); + } } } diff --git a/src/main/java/com/munjaon/server/server/packet/Report.java b/src/main/java/com/munjaon/server/server/packet/Report.java new file mode 100644 index 0000000..9bea11f --- /dev/null +++ b/src/main/java/com/munjaon/server/server/packet/Report.java @@ -0,0 +1,160 @@ +package com.munjaon.server.server.packet; + +import com.munjaon.server.server.dto.ReportDto; + +import java.nio.ByteBuffer; + +public final class Report { + /* MSG_ID (Length : 20 / Position : 0) */ + public static final int REPORT_MSG_ID_LENGTH = 20; + public static final int REPORT_MSG_ID_POSITION = Header.HEADER_LENGTH; + + /* AGENT_CODE (Length : 2 / Position : 20) */ + public static final int REPORT_AGENT_CODE_LENGTH = 2; + public static final int REPORT_AGENT_CODE_POSITION = REPORT_MSG_ID_POSITION + REPORT_MSG_ID_LENGTH; + + /* SEND_TIME (Length : 14 / Position : 22) */ + public static final int REPORT_SEND_TIME_LENGTH = 14; + public static final int REPORT_SEND_TIME_POSITION = REPORT_AGENT_CODE_POSITION + REPORT_AGENT_CODE_LENGTH; + + /* TELECOM (Length : 3 / Position : 36) */ + public static final int REPORT_TELECOM_LENGTH = 3; + public static final int REPORT_TELECOM_POSITION = REPORT_SEND_TIME_POSITION + REPORT_SEND_TIME_LENGTH; + + /* RESULT (Length : 5 / Position : 39) */ + public static final int REPORT_RESULT_LENGTH = 5; + public static final int REPORT_RESULT_POSITION = REPORT_TELECOM_POSITION + REPORT_TELECOM_LENGTH; + + /* Report 길이 */ + public static final int REPORT_BODY_LENGTH = REPORT_RESULT_POSITION + REPORT_RESULT_LENGTH; + + /* RESULT (Length : 5 / Position : 39) */ + public static final int REPORT_ACK_RESULT_LENGTH = 1; + public static final int REPORT_ACK_RESULT_POSITION = 0; + + public static final int REPORT_ACK_BODY_LENGTH = REPORT_ACK_RESULT_POSITION + REPORT_ACK_RESULT_LENGTH; + + public static void putReport(final ByteBuffer buffer, final ReportDto reportDto) { + if (reportDto == null) { + return; + } + putMsgId(buffer, reportDto.getMsgId()); + putAgentCode(buffer, reportDto.getAgentCode()); + putSendTime(buffer, reportDto.getRsltDate()); + putTelecom(buffer, reportDto.getRsltNet()); + putResult(buffer, reportDto.getRsltCode()); + } + public static void putMsgId(final ByteBuffer buffer, String msgId) { + if (buffer == null || msgId == null) { + return; + } + buffer.put(REPORT_MSG_ID_POSITION, msgId.getBytes()); + } + + public static String getMsgId(final ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(REPORT_MSG_ID_POSITION); + byte[] destArray = new byte[REPORT_MSG_ID_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static void putAgentCode(final ByteBuffer buffer, String agentCode) { + if (buffer == null || agentCode == null) { + return; + } + buffer.put(REPORT_AGENT_CODE_POSITION, agentCode.getBytes()); + } + + public static String getAgentCode(final ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(REPORT_AGENT_CODE_POSITION); + byte[] destArray = new byte[REPORT_AGENT_CODE_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static void putSendTime(final ByteBuffer buffer, String sendTime) { + if (buffer == null || sendTime == null) { + return; + } + buffer.put(REPORT_SEND_TIME_POSITION, sendTime.getBytes()); + } + + public static String getSendTime(final ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(REPORT_SEND_TIME_POSITION); + byte[] destArray = new byte[REPORT_SEND_TIME_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static void putTelecom(final ByteBuffer buffer, String telecom) { + if (buffer == null || telecom == null) { + return; + } + buffer.put(REPORT_TELECOM_POSITION, telecom.getBytes()); + } + + public static String getTelecom(final ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(REPORT_TELECOM_POSITION); + byte[] destArray = new byte[REPORT_TELECOM_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static void putResult(final ByteBuffer buffer, String result) { + if (buffer == null || result == null) { + return; + } + buffer.put(REPORT_RESULT_POSITION, result.getBytes()); + } + + public static String getResult(final ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(REPORT_RESULT_POSITION); + byte[] destArray = new byte[REPORT_RESULT_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static void putResultAck(final ByteBuffer buffer, String result) { + if (buffer == null || result == null) { + return; + } + buffer.put(REPORT_ACK_RESULT_POSITION, result.getBytes()); + } + + public static String getResultAck(final ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(REPORT_ACK_RESULT_POSITION); + byte[] destArray = new byte[REPORT_ACK_RESULT_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } +} diff --git a/src/main/java/com/munjaon/server/server/queue/CollectUserQueue.java b/src/main/java/com/munjaon/server/server/queue/CollectUserQueue.java new file mode 100644 index 0000000..53c2935 --- /dev/null +++ b/src/main/java/com/munjaon/server/server/queue/CollectUserQueue.java @@ -0,0 +1,118 @@ +package com.munjaon.server.server.queue; + +import com.munjaon.server.server.dto.ConnectUserDto; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class CollectUserQueue { + private final Object lockObject = new Object(); // Lock Object + private final Map smstUserQueue = new LinkedHashMap<>(); + private final Map lmsUserQueue = new LinkedHashMap<>(); + private final Map mmsUserQueue = new LinkedHashMap<>(); + private final Map katUserQueue = new LinkedHashMap<>(); + private final Map kftUserQueue = new LinkedHashMap<>(); + private static CollectUserQueue collectUserQueue; + + private CollectUserQueue() {} + + public synchronized static CollectUserQueue getInstance() { + if (collectUserQueue == null) { + collectUserQueue = new CollectUserQueue(); + } + return collectUserQueue; + } + + public void putUser(final String serviceType, final ConnectUserDto user) { + synchronized (lockObject) { + switch (serviceType) { + case "SMS" : smstUserQueue.put(user.getUserId(), user); break; + case "LMS" : lmsUserQueue.put(user.getUserId(), user); break; + case "MMS" : mmsUserQueue.put(user.getUserId(), user); break; + case "KAT" : katUserQueue.put(user.getUserId(), user); break; + case "KFT" : kftUserQueue.put(user.getUserId(), user); break; + default: break; + } + } + } + + public ConnectUserDto getUser(final String serviceType, final String userId) { + synchronized (lockObject) { + switch (serviceType) { + case "SMS" : return smstUserQueue.get(userId); + case "LMS" : return lmsUserQueue.get(userId); + case "MMS" : return mmsUserQueue.get(userId); + case "KAT" : return katUserQueue.get(userId); + case "KFT" : return kftUserQueue.get(userId); + default: return null; + } + } + } + + public List getUsers(final String serviceType) { + synchronized (lockObject) { + switch (serviceType) { + case "SMS" : return new ArrayList<>(smstUserQueue.values()); + case "LMS" : return new ArrayList<>(lmsUserQueue.values()); + case "MMS" : return new ArrayList<>(mmsUserQueue.values()); + case "KAT" : return new ArrayList<>(katUserQueue.values()); + case "KFT" : return new ArrayList<>(kftUserQueue.values()); + default: return null; + } + } + } + + public boolean isExist(final String serviceType, final String userId) { + synchronized (lockObject) { + switch (serviceType) { + case "SMS" : return smstUserQueue.containsKey(userId); + case "LMS" : return lmsUserQueue.containsKey(userId); + case "MMS" : return mmsUserQueue.containsKey(userId); + case "KAT" : return katUserQueue.containsKey(userId); + case "KFT" : return kftUserQueue.containsKey(userId); + default: return false; + } + } + } + + public void removeUser(final String serviceType, final String userId) { + synchronized (lockObject) { + switch (serviceType) { + case "SMS" : smstUserQueue.remove(userId); break; + case "LMS" : lmsUserQueue.remove(userId); break; + case "MMS" : mmsUserQueue.remove(userId); break; + case "KAT" : katUserQueue.remove(userId); break; + case "KFT" : kftUserQueue.remove(userId); break; + default: break; + } + } + } + + public int size(final String serviceType) { + synchronized (lockObject) { + switch (serviceType) { + case "SMS" : return smstUserQueue.size(); + case "LMS" : return lmsUserQueue.size(); + case "MMS" : return mmsUserQueue.size(); + case "KAT" : return katUserQueue.size(); + case "KFT" : return kftUserQueue.size(); + default: return 0; + } + } + } + + public boolean isEmpty(final String serviceType) { + synchronized (lockObject) { + switch (serviceType) { + case "SMS" : return smstUserQueue.isEmpty(); + case "LMS" : return lmsUserQueue.isEmpty(); + case "MMS" : return mmsUserQueue.isEmpty(); + case "KAT" : return katUserQueue.isEmpty(); + case "KFT" : return kftUserQueue.isEmpty(); + default: return true; + } + } + } +} diff --git a/src/main/java/com/munjaon/server/server/queue/ReportUserQueue.java b/src/main/java/com/munjaon/server/server/queue/ReportUserQueue.java new file mode 100644 index 0000000..82d0260 --- /dev/null +++ b/src/main/java/com/munjaon/server/server/queue/ReportUserQueue.java @@ -0,0 +1,76 @@ +package com.munjaon.server.server.queue; + +import com.munjaon.server.queue.pool.ReportQueue; +import com.munjaon.server.server.dto.ReportUserDto; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class ReportUserQueue { + private final Object lockObject = new Object(); // Lock Object + private final Map connectUserQueue = new LinkedHashMap<>(); + private static ReportUserQueue reportUserQueue; + + private ReportUserQueue() {} + + public synchronized static ReportUserQueue getInstance() { + if (reportUserQueue == null) { + reportUserQueue = new ReportUserQueue(); + } + + return reportUserQueue; + } + + public void putUser(final ReportUserDto user) { + synchronized (lockObject) { + connectUserQueue.put(user.getUserId(), user); + } + } + + public ReportUserDto getUser(final String userId) { + synchronized (lockObject) { + return connectUserQueue.get(userId); + } + } + + public List getUsers() { + synchronized (lockObject) { + return new ArrayList<>(connectUserQueue.values()); + } + } + + public boolean isExist(final String userId) { + synchronized (lockObject) { + return connectUserQueue.containsKey(userId); + } + } + + public void removeUser(final String userId) { + synchronized (lockObject) { + ReportUserDto userDto = connectUserQueue.remove(userId); + if (userDto != null) { + ReportQueue queue = userDto.getReportQueue(); + try { + queue.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + } + + public int size() { + synchronized (lockObject) { + return connectUserQueue.size(); + } + } + + public boolean isEmpty() { + synchronized (lockObject) { + return connectUserQueue.isEmpty(); + } + } +} diff --git a/src/main/java/com/munjaon/server/server/sample/Main.java b/src/main/java/com/munjaon/server/server/sample/Main.java new file mode 100644 index 0000000..a055629 --- /dev/null +++ b/src/main/java/com/munjaon/server/server/sample/Main.java @@ -0,0 +1,34 @@ +package com.munjaon.server.server.sample; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +public class Main { + public static void main(String[] args) throws InterruptedException, ExecutionException { + ExecutorService e = Executors.newFixedThreadPool(5); + // 리턴 값이 필요 없는 경우 +// for (int i = 0; i < 5; i++) { +// e.execute(new MyRunnable(i * 20)); +// } + // 작업을 기다려야 할 경우 + //e.shutdown(); + + // 리턴 값이 필요한 경우 + List> l = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + Future f = e.submit(new MyCallable(i * 20)); + l.add(f); + } + + // 작업이 완료 되길 기다려야 할 경우 + int n = 0; + for (int i = 0; i < 5; i++) { + Future f = l.get(i); + n += f.get(); + } + } +} diff --git a/src/main/java/com/munjaon/server/server/sample/MyCallable.java b/src/main/java/com/munjaon/server/server/sample/MyCallable.java new file mode 100644 index 0000000..896541d --- /dev/null +++ b/src/main/java/com/munjaon/server/server/sample/MyCallable.java @@ -0,0 +1,27 @@ +package com.munjaon.server.server.sample; + +import java.util.concurrent.Callable; + +public class MyCallable implements Callable { + private final int s; + + public MyCallable(int s) { + this.s = s; + } + + @Override + public Integer call() throws Exception { + int in = 0; + for (int i = 0; i < 20; i++) { + System.out.println(s + " : MyCallable"); + System.out.println(i + s); + in += i + s; + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + return in; + } +} diff --git a/src/main/java/com/munjaon/server/server/sample/MyRunnable.java b/src/main/java/com/munjaon/server/server/sample/MyRunnable.java new file mode 100644 index 0000000..505e9f3 --- /dev/null +++ b/src/main/java/com/munjaon/server/server/sample/MyRunnable.java @@ -0,0 +1,20 @@ +package com.munjaon.server.server.sample; + +public class MyRunnable implements Runnable { + private final int s; + public MyRunnable(int s) { + this.s = s; + } + + public void run() { + for (int i = 0; i < 20; i++) { + System.out.println(s + " : MyRunnable"); + System.out.println(i + s); + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/src/main/java/com/munjaon/server/server/service/CollectBackServerService.java b/src/main/java/com/munjaon/server/server/service/CollectBackServerService.java new file mode 100644 index 0000000..dd0a287 --- /dev/null +++ b/src/main/java/com/munjaon/server/server/service/CollectBackServerService.java @@ -0,0 +1,403 @@ +package com.munjaon.server.server.service; + +import com.munjaon.server.cache.dto.MemberDto; +import com.munjaon.server.cache.enums.CacheService; +import com.munjaon.server.cache.service.MemberService; +import com.munjaon.server.queue.dto.BasicMessageDto; +import com.munjaon.server.queue.enums.QueueTypeWorker; +import com.munjaon.server.server.dto.ConnectUserDto; +import com.munjaon.server.server.dto.HeaderDto; +import com.munjaon.server.server.packet.*; +import com.munjaon.server.util.LogUtil; +import lombok.Getter; +import org.json.simple.JSONObject; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class CollectBackServerService extends Service { + private final InetSocketAddress listenAddress; + private CollectorThreadService threadService; + private Selector selector; + private final String serviceType; + + public CollectBackServerService(String serviceName, String serviceType, int port) { + super(serviceName); + this.listenAddress = new InetSocketAddress(port); + this.serviceType = serviceType; + } + @Override + public void checkReady() { + QueueTypeWorker worker = QueueTypeWorker.find(this.serviceType); + if (worker != null && worker.getQueueSize() > 0) { + this.IS_READY_YN = true; + } else { + this.IS_READY_YN = false; + } + } + + @Override + public void initResources() { + try { + initCollectChannel(); + threadService = new CollectorThreadService(8, this.serviceType, logger); + } catch (IOException e) { + saveSystemLog(e); + throw new RuntimeException(e); + } + } + + private void initCollectChannel() throws IOException { + selector = Selector.open(); + /* 채널 생성 */ + ServerSocketChannel serverChannel = ServerSocketChannel.open(); + /* non-Blocking 설정 */ + serverChannel.configureBlocking(false); + /* 서버 ip, port 설정 */ + serverChannel.socket().bind(listenAddress); + /* 채널에 accept 대기 설정 */ + serverChannel.register(selector, SelectionKey.OP_ACCEPT); + } + + private void closeCollectChannel() throws IOException { + selector.close(); + } + + @Override + public void releaseResources() { + try { + closeCollectChannel(); + threadService.close(); + } catch (IOException e) { + saveSystemLog(e); + throw new RuntimeException(e); + } + } + + @Override + public void doService() { + while (isRun()) { + try { + execInterest(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private void execInterest() throws IOException { + if (selector.select(1000) == 0) { + return ; + } + Iterator keys = selector.selectedKeys().iterator(); + while (keys.hasNext()) { + SelectionKey key = keys.next(); + if (key.isValid()) { + if (key.isAcceptable()) { // 접속일 경우.. + saveSystemLog("isAcceptable"); + threadService.submit(selector, key, 1); + } else if (key.isReadable()) { // 수신일 경우.. + saveSystemLog("isReadable"); + threadService.submit(selector, key, 2); + } else if (key.isWritable()) { // 발신일 경우.. + saveSystemLog("isWritable"); + threadService.submit(selector, key, 3); + } + } + /* 키 셋에서 제거. */ + keys.remove(); + } + } + + @Override + public JSONObject monitorService() { + return null; + } + + private static class CollectorThreadService { + @Getter + private String serviceType; + @Getter + private final int maxCore; + private final ExecutorService executor; + private final Map connectUserMap = new ConcurrentHashMap<>(); + private final LogUtil logger; + + public CollectorThreadService(String serviceType, LogUtil logger) { + this(Runtime.getRuntime().availableProcessors(), serviceType, logger); + } + + public CollectorThreadService(int maxCore, String serviceType, LogUtil logger) { + this.maxCore = maxCore; + this.executor = Executors.newFixedThreadPool(maxCore); + this.logger = logger; + } + + public void submit(Selector selector, SelectionKey key, int interestOps) { + executor.submit(() -> { + switch (interestOps) { + case 1 : accept(selector, key); break; + case 2 : read(selector, key); break; + case 3 : write(selector, key); break; + default : break; + } + }); + } + + private void accept(Selector selector, SelectionKey key) { + try { + /* 키 채널을 가져온다. */ + ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); + /* accept을 해서 Socket 채널을 가져온다. */ + SocketChannel channel = serverChannel.accept(); + channel.configureBlocking(false); + /* 소켓 취득 */ + Socket socket = channel.socket(); + SocketAddress remoteAddr = socket.getRemoteSocketAddress(); + saveSystemLog("accept : " + Thread.currentThread().getName()); + saveSystemLog("Connected to: " + remoteAddr); + // Socket 채널을 channel에 수신 등록한다 + channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).build()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +// private void read(Selector selector, SelectionKey key) { +// try { +// saveSystemLog("read : " + Thread.currentThread().getName()); +// // 키 채널을 가져온다. +// SocketChannel channel = (SocketChannel) key.channel(); +// ConnectUserDto userDto = (ConnectUserDto) key.attachment(); +// +// int size = -1; +// +// ByteBuffer headBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH); +// ByteBuffer bindBuffer = ByteBuffer.allocate(Bind.BIND_BODY_LENGTH); +// ByteBuffer linkBuffer = ByteBuffer.allocate(LinkCheck.LINK_CHECK_BODY_LENGTH); +// ByteBuffer msgBuffer = ByteBuffer.allocate(SmsMessage.DELIVER_SMS_BODY_LENGTH); +// try { +// size = channel.read(headBuffer); +// String command = Header.getCommand(headBuffer); +// System.out.println("command : " + command); +// if ("1".equals(command)) { +// size = channel.read(bindBuffer); +// } else if ("3".equals(command)) { +// size = channel.read(msgBuffer); +// } else if ("7".equals(command)) { +// size = channel.read(linkBuffer); +// } else { +// size = -1; +// } +// +// System.out.println("size : " + size); +// } catch (IOException e) {} +// if (size < 0) { +// expireConnectUser(key); +// } else if (size > 0) { +//// String command = Header.getCommand(buffer); +//// saveSystemLog("command : " + command); +//// switch (Integer.parseInt(command)) { +//// case 1 : recvBind(channel, buffer, userDto); break; +//// case 3 : recvDeliver(channel, buffer, userDto); break; +//// case 7 : recvLinkCheck(key); break; +//// default: expireConnectUser(key); break; +//// } +// } +// } catch (Exception e) { +// e.printStackTrace(); +// } +// System.out.println("read"); +// } + + private void read(Selector selector, SelectionKey key) { + try { + saveSystemLog("read : " + Thread.currentThread().getName()); + // 키 채널을 가져온다. + SocketChannel channel = (SocketChannel) key.channel(); + ConnectUserDto userDto = (ConnectUserDto) key.attachment(); + + int size = -1; + ByteBuffer buffer = ByteBuffer.allocate(256); + try { + size = channel.read(buffer); + } catch (IOException e) {} + if (size < 0) { + expireConnectUser(key); + } else if (size > 0) { + String command = Header.getCommand(buffer); + saveSystemLog("command : " + command); + switch (Integer.parseInt(command)) { + case 1 : recvBind(channel, buffer, userDto); break; + case 3 : recvDeliver(channel, buffer, userDto); break; + case 7 : recvLinkCheck(key); break; + default: expireConnectUser(key); break; + } + } + } catch (Exception e) { + e.printStackTrace(); + } + System.out.println("read"); + } + + private void recvBind(SocketChannel channel, ByteBuffer buffer, ConnectUserDto userDto) { + String resultCode = "00"; + try { + String id = Bind.getBindId(buffer); + String pwd = Bind.getBindPwd(buffer); + saveSystemLog("id : " + id); + saveSystemLog("pwd : " + pwd); + if (id == null || pwd == null) { + resultCode = "50"; + } else { + if (connectUserMap.containsKey(id)) { + resultCode = "60"; + } else { + MemberService svc = (MemberService) CacheService.LOGIN_SERVICE.getService(); + MemberDto memberDto = null; + if (svc != null) { + memberDto = svc.get(id); + } + if (memberDto == null || !pwd.equals(memberDto.getAccessKey())) { + resultCode = "20"; + } else { + userDto.setUserId(id); + userDto.setLogin(true); + userDto.setMemberDto(memberDto); + } + } + } + } catch (Exception e) { + resultCode = "10"; + e.printStackTrace(); + } + + try { + saveSystemLog("resultCode : " + resultCode); + channel.write(Bind.makeBindAckBuffer(resultCode)); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void recvDeliver(SocketChannel channel, ByteBuffer buffer, ConnectUserDto userDto) throws IOException { + BasicMessageDto messageDto = new BasicMessageDto(); + messageDto.setRouterSeq("40"); + messageDto.setServiceType("4"); + messageDto.setUserId(userDto.getUserId()); + messageDto.setRemoteIP(userDto.getRemoteIP()); + messageDto.setSendStatus("0"); + messageDto.setUserMsgID(CommonMessage.getMessageIdForDeliver(buffer)); + messageDto.setUserSender(CommonMessage.getSenderForDeliver(buffer)); + messageDto.setUserReceiver(CommonMessage.getReceiverForDeliver(buffer)); + messageDto.setReserveDt(CommonMessage.getReserveTimeForDeliver(buffer)); + messageDto.setRequestDt(CommonMessage.getRequestTimeForDeliver(buffer)); + messageDto.setUnitCost("10.4"); + messageDto.setUserMessage(SmsMessage.getMessageForDeliver(buffer)); + + QueueTypeWorker worker = QueueTypeWorker.find("SMS"); + if (worker != null) { + worker.pushQueue(messageDto); + channel.write(SmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); + } + } + private void recvLinkCheck(SelectionKey key) throws IOException { + SocketChannel channel = (SocketChannel) key.channel(); + channel.write(LinkCheck.makeLinkCheckAckBuffer()); + } + private void expireConnectUser(SelectionKey key) { + if (key == null || !key.isValid()) { + return; + } + try { + SocketChannel channel = (SocketChannel) key.channel(); + ConnectUserDto userDto = (ConnectUserDto) key.attachment(); + if (userDto != null && userDto.getUserId() != null) { + connectUserMap.remove(userDto.getUserId()); + key.attach(null); + } + // 소켓 채널 닫기 + channel.close(); + // 키 닫기 + key.cancel(); + } catch (IOException e) { + e.printStackTrace(); + } + } + private HeaderDto getHeader(SocketChannel channel) { + HeaderDto headerDto = HeaderDto.builder().build(); + int size = -1; + ByteBuffer buffer = ByteBuffer.allocate(Header.HEADER_LENGTH); + if (channel != null) { + try { + saveSystemLog("Key is valid : "); +// SocketChannel channel = (SocketChannel) key.channel(); + size = channel.read(buffer); + } catch (IOException e) {} + } + + if (size < 0) { + saveSystemLog("Is Error : "); + headerDto.setError(true); + } else { + saveSystemLog("version : " + Header.getVersion(buffer)); + saveSystemLog("Command : " + Header.getCommand(buffer)); + saveSystemLog("BodyLength : " + Header.getBodyLength(buffer)); + headerDto.setVersion(Header.getVersion(buffer)); + headerDto.setCommand(Integer.parseInt(Header.getCommand(buffer))); + headerDto.setBodyLength(Integer.parseInt(Header.getBodyLength(buffer))); + } + + saveSystemLog("READ HEADER : " + size); + + return headerDto; + } + + private void write(Selector selector, SelectionKey key) { + System.out.println("write"); + } + + private void saveSystemLog(Object obj) { + saveLog(obj, true); + } + + private void saveLog(Object obj) { + saveLog(obj, false); + } + + private void saveLog(Object obj, boolean isConsoleOutput) { + if (isConsoleOutput) { + System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATE_FORMAT)) + " {{" + serviceType + "}} " + obj); + } + + if (logger == null) { + return; + } + + logger.log(obj); + } + + public void close() { + List unfinishedTasks = executor.shutdownNow(); + connectUserMap.clear(); + if (!unfinishedTasks.isEmpty()) { + saveSystemLog("Not all tasks finished before calling close: " + unfinishedTasks.size()); + } + } + } +} diff --git a/src/main/java/com/munjaon/server/server/service/CollectServerService.java b/src/main/java/com/munjaon/server/server/service/CollectServerService.java index ff5e230..cce8ab5 100644 --- a/src/main/java/com/munjaon/server/server/service/CollectServerService.java +++ b/src/main/java/com/munjaon/server/server/service/CollectServerService.java @@ -1,13 +1,9 @@ package com.munjaon.server.server.service; -import com.munjaon.server.cache.dto.MemberDto; -import com.munjaon.server.cache.enums.CacheService; -import com.munjaon.server.cache.service.MemberService; -import com.munjaon.server.queue.dto.BasicMessageDto; import com.munjaon.server.queue.enums.QueueTypeWorker; import com.munjaon.server.server.dto.ConnectUserDto; -import com.munjaon.server.server.dto.HeaderDto; -import com.munjaon.server.server.packet.*; +import com.munjaon.server.server.queue.CollectUserQueue; +import com.munjaon.server.server.task.CollectReadTask; import com.munjaon.server.util.LogUtil; import lombok.Getter; import org.json.simple.JSONObject; @@ -16,31 +12,33 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; -import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; public class CollectServerService extends Service { private final InetSocketAddress listenAddress; - private CollectorThreadService threadService; + private CollectThreadService threadService; + private CollectUserQueue collectUserQueue = CollectUserQueue.getInstance(); private Selector selector; private final String serviceType; + private int readMaxCore; public CollectServerService(String serviceName, String serviceType, int port) { super(serviceName); + this.readMaxCore = Integer.parseInt(getProp("READ_MAX_CORE").trim()); this.listenAddress = new InetSocketAddress(port); this.serviceType = serviceType; } + @Override public void checkReady() { QueueTypeWorker worker = QueueTypeWorker.find(this.serviceType); @@ -55,7 +53,7 @@ public class CollectServerService extends Service { public void initResources() { try { initCollectChannel(); - threadService = new CollectorThreadService(8, this.serviceType, logger); + threadService = new CollectThreadService(readMaxCore, this.serviceType, logger); } catch (IOException e) { saveSystemLog(e); throw new RuntimeException(e); @@ -105,22 +103,78 @@ public class CollectServerService extends Service { return ; } Iterator keys = selector.selectedKeys().iterator(); + List> list = new ArrayList<>(); while (keys.hasNext()) { SelectionKey key = keys.next(); + /* 키 셋에서 제거. */ + keys.remove(); + if (key.isValid()) { if (key.isAcceptable()) { // 접속일 경우.. saveSystemLog("isAcceptable"); - threadService.submit(selector, key, 1); + accept(selector, key); } else if (key.isReadable()) { // 수신일 경우.. saveSystemLog("isReadable"); - threadService.submit(selector, key, 2); - } else if (key.isWritable()) { // 발신일 경우.. - saveSystemLog("isWritable"); - threadService.submit(selector, key, 3); + Future future = threadService.submit(new CollectReadTask(selector, key, this.serviceType, logger)); + list.add(future); +// threadService.submit(selector, key, 2); } + } else { + expireConnectUser(key); } - /* 키 셋에서 제거. */ - keys.remove(); + } + for (Future future : list) { + ConnectUserDto connectUserDto = null; + try { + connectUserDto = future.get(); + } catch (InterruptedException e) { + } catch (ExecutionException e) { + } + + if (connectUserDto == null) { + saveSystemLog("Future : " + future); + } else { + saveSystemLog("Future : " + connectUserDto.toString()); + } + } + } + + private void accept(Selector selector, SelectionKey key) { + try { + /* 키 채널을 가져온다. */ + ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); + /* accept을 해서 Socket 채널을 가져온다. */ + SocketChannel channel = serverChannel.accept(); + channel.configureBlocking(false); + /* 소켓 취득 */ + Socket socket = channel.socket(); + SocketAddress remoteAddr = socket.getRemoteSocketAddress(); + saveSystemLog("Connected to: " + remoteAddr); + // Socket 채널을 channel에 수신 등록한다 + channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().serviceType(this.serviceType).lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).build()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void expireConnectUser(SelectionKey key) { + if (key == null || !key.isValid()) { + return; + } + try { + SocketChannel channel = (SocketChannel) key.channel(); + ConnectUserDto userDto = (ConnectUserDto) key.attachment(); + if (userDto != null && userDto.getUserId() != null) { + collectUserQueue.removeUser(this.serviceType, userDto.getUserId()); +// connectUserMap.remove(userDto.getUserId()); + key.attach(null); + } + // 소켓 채널 닫기 + channel.close(); + // 키 닫기 + key.cancel(); + } catch (IOException e) { + e.printStackTrace(); } } @@ -129,7 +183,7 @@ public class CollectServerService extends Service { return null; } - private static class CollectorThreadService { + private static class CollectThreadService { @Getter private String serviceType; @Getter @@ -138,238 +192,18 @@ public class CollectServerService extends Service { private final Map connectUserMap = new ConcurrentHashMap<>(); private final LogUtil logger; - public CollectorThreadService(String serviceType, LogUtil logger) { + public CollectThreadService(String serviceType, LogUtil logger) { this(Runtime.getRuntime().availableProcessors(), serviceType, logger); } - public CollectorThreadService(int maxCore, String serviceType, LogUtil logger) { + public CollectThreadService(int maxCore, String serviceType, LogUtil logger) { this.maxCore = maxCore; this.executor = Executors.newFixedThreadPool(maxCore); this.logger = logger; } - public void submit(Selector selector, SelectionKey key, int interestOps) { - executor.submit(() -> { - switch (interestOps) { - case 1 : accept(selector, key); break; - case 2 : read(selector, key); break; - case 3 : write(selector, key); break; - default : break; - } - }); - } - - private void accept(Selector selector, SelectionKey key) { - try { - /* 키 채널을 가져온다. */ - ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); - /* accept을 해서 Socket 채널을 가져온다. */ - SocketChannel channel = serverChannel.accept(); - channel.configureBlocking(false); - /* 소켓 취득 */ - Socket socket = channel.socket(); - SocketAddress remoteAddr = socket.getRemoteSocketAddress(); - saveSystemLog("accept : " + Thread.currentThread().getName()); - saveSystemLog("Connected to: " + remoteAddr); - // Socket 채널을 channel에 수신 등록한다 - channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).build()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - -// private void read(Selector selector, SelectionKey key) { -// try { -// saveSystemLog("read : " + Thread.currentThread().getName()); -// // 키 채널을 가져온다. -// SocketChannel channel = (SocketChannel) key.channel(); -// ConnectUserDto userDto = (ConnectUserDto) key.attachment(); -// -// int size = -1; -// -// ByteBuffer headBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH); -// ByteBuffer bindBuffer = ByteBuffer.allocate(Bind.BIND_BODY_LENGTH); -// ByteBuffer linkBuffer = ByteBuffer.allocate(LinkCheck.LINK_CHECK_BODY_LENGTH); -// ByteBuffer msgBuffer = ByteBuffer.allocate(SmsMessage.DELIVER_SMS_BODY_LENGTH); -// try { -// size = channel.read(headBuffer); -// String command = Header.getCommand(headBuffer); -// System.out.println("command : " + command); -// if ("1".equals(command)) { -// size = channel.read(bindBuffer); -// } else if ("3".equals(command)) { -// size = channel.read(msgBuffer); -// } else if ("7".equals(command)) { -// size = channel.read(linkBuffer); -// } else { -// size = -1; -// } -// -// System.out.println("size : " + size); -// } catch (IOException e) {} -// if (size < 0) { -// expireConnectUser(key); -// } else if (size > 0) { -//// String command = Header.getCommand(buffer); -//// saveSystemLog("command : " + command); -//// switch (Integer.parseInt(command)) { -//// case 1 : recvBind(channel, buffer, userDto); break; -//// case 3 : recvDeliver(channel, buffer, userDto); break; -//// case 7 : recvLinkCheck(key); break; -//// default: expireConnectUser(key); break; -//// } -// } -// } catch (Exception e) { -// e.printStackTrace(); -// } -// System.out.println("read"); -// } - - private void read(Selector selector, SelectionKey key) { - try { - saveSystemLog("read : " + Thread.currentThread().getName()); - // 키 채널을 가져온다. - SocketChannel channel = (SocketChannel) key.channel(); - ConnectUserDto userDto = (ConnectUserDto) key.attachment(); - - int size = -1; - ByteBuffer buffer = ByteBuffer.allocate(256); - try { - size = channel.read(buffer); - } catch (IOException e) {} - if (size < 0) { - expireConnectUser(key); - } else if (size > 0) { - String command = Header.getCommand(buffer); - saveSystemLog("command : " + command); - switch (Integer.parseInt(command)) { - case 1 : recvBind(channel, buffer, userDto); break; - case 3 : recvDeliver(channel, buffer, userDto); break; - case 7 : recvLinkCheck(key); break; - default: expireConnectUser(key); break; - } - } - } catch (Exception e) { - e.printStackTrace(); - } - System.out.println("read"); - } - - private void recvBind(SocketChannel channel, ByteBuffer buffer, ConnectUserDto userDto) { - String resultCode = "00"; - try { - String id = Bind.getBindId(buffer); - String pwd = Bind.getBindPwd(buffer); - saveSystemLog("id : " + id); - saveSystemLog("pwd : " + pwd); - if (id == null || pwd == null) { - resultCode = "50"; - } else { - if (connectUserMap.containsKey(id)) { - resultCode = "60"; - } else { - MemberService svc = (MemberService) CacheService.LOGIN_SERVICE.getService(); - MemberDto memberDto = null; - if (svc != null) { - memberDto = svc.get(id); - } - if (memberDto == null || !pwd.equals(memberDto.getAccessKey())) { - resultCode = "20"; - } else { - userDto.setUserId(id); - userDto.setLogin(true); - userDto.setMemberDto(memberDto); - } - } - } - } catch (Exception e) { - resultCode = "10"; - e.printStackTrace(); - } - - try { - saveSystemLog("resultCode : " + resultCode); - channel.write(Bind.makeBindAckBuffer(resultCode)); - } catch (IOException e) { - e.printStackTrace(); - } - } - - private void recvDeliver(SocketChannel channel, ByteBuffer buffer, ConnectUserDto userDto) throws IOException { - BasicMessageDto messageDto = new BasicMessageDto(); - messageDto.setRouterSeq("40"); - messageDto.setServiceType("4"); - messageDto.setUserId(userDto.getUserId()); - messageDto.setRemoteIP(userDto.getRemoteIP()); - messageDto.setSendStatus("0"); - messageDto.setUserMsgID(CommonMessage.getMessageIdForDeliver(buffer)); - messageDto.setUserSender(CommonMessage.getSenderForDeliver(buffer)); - messageDto.setUserReceiver(CommonMessage.getReceiverForDeliver(buffer)); - messageDto.setReserveDt(CommonMessage.getReserveTimeForDeliver(buffer)); - messageDto.setRequestDt(CommonMessage.getRequestTimeForDeliver(buffer)); - messageDto.setUnitCost("10.4"); - messageDto.setUserMessage(SmsMessage.getMessageForDeliver(buffer)); - - QueueTypeWorker worker = QueueTypeWorker.find("SMS"); - if (worker != null) { - worker.pushQueue(messageDto); - channel.write(SmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); - } - } - private void recvLinkCheck(SelectionKey key) throws IOException { - SocketChannel channel = (SocketChannel) key.channel(); - channel.write(LinkCheck.makeLinkCheckAckBuffer()); - } - private void expireConnectUser(SelectionKey key) { - if (key == null || !key.isValid()) { - return; - } - try { - SocketChannel channel = (SocketChannel) key.channel(); - ConnectUserDto userDto = (ConnectUserDto) key.attachment(); - if (userDto != null && userDto.getUserId() != null) { - connectUserMap.remove(userDto.getUserId()); - key.attach(null); - } - // 소켓 채널 닫기 - channel.close(); - // 키 닫기 - key.cancel(); - } catch (IOException e) { - e.printStackTrace(); - } - } - private HeaderDto getHeader(SocketChannel channel) { - HeaderDto headerDto = HeaderDto.builder().build(); - int size = -1; - ByteBuffer buffer = ByteBuffer.allocate(Header.HEADER_LENGTH); - if (channel != null) { - try { - saveSystemLog("Key is valid : "); -// SocketChannel channel = (SocketChannel) key.channel(); - size = channel.read(buffer); - } catch (IOException e) {} - } - - if (size < 0) { - saveSystemLog("Is Error : "); - headerDto.setError(true); - } else { - saveSystemLog("version : " + Header.getVersion(buffer)); - saveSystemLog("Command : " + Header.getCommand(buffer)); - saveSystemLog("BodyLength : " + Header.getBodyLength(buffer)); - headerDto.setVersion(Header.getVersion(buffer)); - headerDto.setCommand(Integer.parseInt(Header.getCommand(buffer))); - headerDto.setBodyLength(Integer.parseInt(Header.getBodyLength(buffer))); - } - - saveSystemLog("READ HEADER : " + size); - - return headerDto; - } - - private void write(Selector selector, SelectionKey key) { - System.out.println("write"); + public Future submit(CollectReadTask collectReadTask) { + return executor.submit(collectReadTask); } private void saveSystemLog(Object obj) { diff --git a/src/main/java/com/munjaon/server/server/service/ReportQueueServerService.java b/src/main/java/com/munjaon/server/server/service/ReportQueueServerService.java new file mode 100644 index 0000000..4d27f09 --- /dev/null +++ b/src/main/java/com/munjaon/server/server/service/ReportQueueServerService.java @@ -0,0 +1,116 @@ +package com.munjaon.server.server.service; + +import com.munjaon.server.server.dto.ReportUserDto; +import com.munjaon.server.server.queue.ReportUserQueue; +import com.munjaon.server.server.task.ReportQueueTask; +import com.munjaon.server.util.LogUtil; +import lombok.Getter; +import org.json.simple.JSONObject; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class ReportQueueServerService extends Service { + private final ReportUserQueue reportUserQueue = ReportUserQueue.getInstance(); + private QueueThreadService threadService; + private int queueMaxCore; + + public ReportQueueServerService(String serviceName) { + super(serviceName); + this.queueMaxCore = Integer.parseInt(getProp("QUEUE_MAX_CORE").trim()); + } + + @Override + public void checkReady() { + this.IS_READY_YN = true; + } + + @Override + public void initResources() { + threadService = new QueueThreadService(queueMaxCore, logger); + } + + @Override + public void releaseResources() { + threadService.close(); + } + + @Override + public void doService() { + while (isRun()) { + try { + doQueueService(); + Thread.sleep(100); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private void doQueueService() { + List reportUserList = reportUserQueue.getUsers(); + if (reportUserList == null || reportUserList.size() == 0) { + return; + } + + for (ReportUserDto reportUserDto : reportUserList) { + threadService.execute(new ReportQueueTask(reportUserDto, logger)); + } + } + + @Override + public JSONObject monitorService() { + return null; + } + + private static class QueueThreadService { + @Getter + private final int maxCore; + private final ExecutorService executor; + private final LogUtil logger; + + public QueueThreadService(LogUtil logger) { + this(Runtime.getRuntime().availableProcessors(), logger); + } + + public QueueThreadService(int maxCore, LogUtil logger) { + this.maxCore = maxCore; + this.executor = Executors.newFixedThreadPool(maxCore); + this.logger = logger; + } + + public void execute(ReportQueueTask runnable) { + executor.execute(runnable); + } + + private void saveSystemLog(Object obj) { + saveLog(obj, true); + } + + private void saveLog(Object obj) { + saveLog(obj, false); + } + + private void saveLog(Object obj, boolean isConsoleOutput) { + if (isConsoleOutput) { + System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATE_FORMAT)) + " {{QueueThreadService}} " + obj); + } + + if (logger == null) { + return; + } + + logger.log(obj); + } + + public void close() { + List unfinishedTasks = executor.shutdownNow(); + if (!unfinishedTasks.isEmpty()) { + saveSystemLog("Not all tasks finished before calling close: " + unfinishedTasks.size()); + } + } + } +} diff --git a/src/main/java/com/munjaon/server/server/service/ReportServerService.java b/src/main/java/com/munjaon/server/server/service/ReportServerService.java index 09cbd1e..54a4fe5 100644 --- a/src/main/java/com/munjaon/server/server/service/ReportServerService.java +++ b/src/main/java/com/munjaon/server/server/service/ReportServerService.java @@ -1,37 +1,58 @@ package com.munjaon.server.server.service; +import com.munjaon.server.queue.pool.ReportQueue; import com.munjaon.server.server.dto.ConnectUserDto; +import com.munjaon.server.server.dto.ReportDto; +import com.munjaon.server.server.dto.ReportUserDto; +import com.munjaon.server.server.packet.Header; +import com.munjaon.server.server.packet.LinkCheck; +import com.munjaon.server.server.packet.Packet; +import com.munjaon.server.server.packet.Report; +import com.munjaon.server.server.queue.ReportUserQueue; +import com.munjaon.server.server.task.ReportReadTask; import com.munjaon.server.util.LogUtil; import lombok.Getter; import org.json.simple.JSONObject; +import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; +import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; public class ReportServerService extends Service { private final InetSocketAddress listenAddress; - private final Map connectUserMap = new ConcurrentHashMap<>(); - private ReporterThreadService threadService; + private ReadThreadService threadService; + private final ReportUserQueue reportUserQueue = ReportUserQueue.getInstance(); private Selector selector; + private int readMaxCore; + private int queueMaxCore; +// private long CHECKED_INTEREST_TIME = System.currentTimeMillis(); +// private final long LIMIT_CYCLE_TIME = 3000; + +// private final ByteBuffer reportBuffer = ByteBuffer.allocateDirect(Header.HEADER_LENGTH + Report.REPORT_BODY_LENGTH); public ReportServerService(String serviceName, int port) { super(serviceName); + this.readMaxCore = Integer.parseInt(getProp("READ_MAX_CORE").trim()); + this.queueMaxCore = Integer.parseInt(getProp("QUEUE_MAX_CORE").trim()); this.listenAddress = new InetSocketAddress(port); } + @Override public void checkReady() { this.IS_READY_YN = true; @@ -41,7 +62,7 @@ public class ReportServerService extends Service { public void initResources() { try { initReportChannel(); - threadService = new ReporterThreadService(8, logger); + threadService = new ReadThreadService(8, logger); } catch (IOException e) { saveSystemLog(e); throw new RuntimeException(e); @@ -78,6 +99,67 @@ public class ReportServerService extends Service { @Override public void doService() { + while (isRun()) { + try { + execInterest(); + checkInterest(); + } catch (Exception e) { + saveSystemLog(e.toString()); + } + } + } + + private void checkInterest() throws IOException, InterruptedException { +// if (System.currentTimeMillis() - CHECKED_INTEREST_TIME < LIMIT_CYCLE_TIME) { +// return; +// } +// /* 체크시간 업데이트 */ +// CHECKED_INTEREST_TIME = System.currentTimeMillis(); + + Iterator keys = selector.keys().iterator(); + while (keys.hasNext()) { + SelectionKey key = keys.next(); + if (key.isValid()) { + ReportUserDto reportUserDto = (ReportUserDto) key.attachment(); + if (reportUserDto == null) { + continue; + } + SocketChannel channel = (SocketChannel) key.channel(); // 키 채널을 가져온다. + if (reportUserDto.isAlive() == 1) { + if (reportUserDto.getUserId() != null) { + reportUserQueue.removeUser(reportUserDto.getUserId()); + } + Socket socket = channel.socket(); // 소켓 취득 + channel.close(); // 소켓 채널 닫기 + socket.close(); // 소켓 닫기 + key.attach(null); // 키 닫기 + key.cancel(); + } else if (reportUserDto.isAlive() == 2) { + channel.write(LinkCheck.makeLinkCheckBuffer()); + } else { + if (reportUserDto.isLogin()) { + ReportQueue reportQueue = reportUserDto.getReportQueue(); + try { + ReportDto reportDto = reportQueue.popReportFromQueue(); + if (reportDto == null) { + saveSystemLog("reportQueue.popReportFromQueue() : null"); + continue; + } + saveSystemLog("reportQueue.popReportFromQueue() : " + reportDto.toString()); + ByteBuffer reportBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + Report.REPORT_BODY_LENGTH); + Packet.setDefaultByte(reportBuffer); + Header.putHeader(reportBuffer, Header.COMMAND_REPORT, Report.REPORT_BODY_LENGTH); + Report.putReport(reportBuffer, reportDto); + channel.write(reportBuffer); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + } else { + expireConnectUser(key); + } + } } private void execInterest() throws IOException { @@ -85,19 +167,39 @@ public class ReportServerService extends Service { return ; } Iterator keys = selector.selectedKeys().iterator(); + List> list = new ArrayList<>(); while (keys.hasNext()) { SelectionKey key = keys.next(); + /* 키 셋에서 제거. */ + keys.remove(); + if (key.isValid()) { if (key.isAcceptable()) { // 접속일 경우.. saveSystemLog("isAcceptable"); accept(selector, key); } else if (key.isReadable()) { // 수신일 경우.. saveSystemLog("isReadable"); + Future future = threadService.submit(new ReportReadTask(selector, key, logger)); + list.add(future); // threadService.submit(selector, key, 2); } + } else { + expireConnectUser(key); + } + } + for (Future future : list) { + ReportUserDto reportUserDto = null; + try { + reportUserDto = future.get(); + } catch (InterruptedException e) { + } catch (ExecutionException e) { + } + + if (reportUserDto == null) { + saveSystemLog("Future : " + future); + } else { + saveSystemLog("Future : " + reportUserDto.toString()); } - /* 키 셋에서 제거. */ - keys.remove(); } } @@ -113,42 +215,12 @@ public class ReportServerService extends Service { SocketAddress remoteAddr = socket.getRemoteSocketAddress(); saveSystemLog("Connected to: " + remoteAddr); // Socket 채널을 channel에 수신 등록한다 - channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).build()); + channel.register(selector, SelectionKey.OP_READ, ReportUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).queuePath(System.getProperty("ROOTPATH") + File.separator + getProp("QUEUE_PATH")).build()); } catch (Exception e) { throw new RuntimeException(e); } } -// private void read(Selector selector, SelectionKey key) { -// try { -// saveSystemLog("read : " + Thread.currentThread().getName()); -// // 키 채널을 가져온다. -// SocketChannel channel = (SocketChannel) key.channel(); -// ConnectUserDto userDto = (ConnectUserDto) key.attachment(); -// -// int size = -1; -// ByteBuffer buffer = ByteBuffer.allocate(256); -// try { -// size = channel.read(buffer); -// } catch (IOException e) {} -// if (size < 0) { -// expireConnectUser(key); -// } else if (size > 0) { -// String command = Header.getCommand(buffer); -// saveSystemLog("command : " + command); -// switch (Integer.parseInt(command)) { -// case 1 : recvBind(channel, buffer, userDto); break; -// case 6 : recvDeliver(channel, buffer, userDto); break; -// case 8 : recvLinkCheck(key); break; -// default: expireConnectUser(key); break; -// } -// } -// } catch (Exception e) { -// e.printStackTrace(); -// } -// System.out.println("read"); -// } - private void expireConnectUser(SelectionKey key) { if (key == null || !key.isValid()) { return; @@ -157,7 +229,8 @@ public class ReportServerService extends Service { SocketChannel channel = (SocketChannel) key.channel(); ConnectUserDto userDto = (ConnectUserDto) key.attachment(); if (userDto != null && userDto.getUserId() != null) { - connectUserMap.remove(userDto.getUserId()); + reportUserQueue.removeUser(userDto.getUserId()); +// connectUserMap.remove(userDto.getUserId()); key.attach(null); } // 소켓 채널 닫기 @@ -174,27 +247,26 @@ public class ReportServerService extends Service { return null; } - private static class ReporterThreadService { + private static class ReadThreadService { @Getter private final int maxCore; private final ExecutorService executor; - private final Map connectUserMap = new ConcurrentHashMap<>(); + private ReportUserQueue reportUserQueue = ReportUserQueue.getInstance(); private final LogUtil logger; - public ReporterThreadService(LogUtil logger) { + public ReadThreadService(LogUtil logger) { this(Runtime.getRuntime().availableProcessors(), logger); } - public ReporterThreadService(int maxCore, LogUtil logger) { + public ReadThreadService(int maxCore, LogUtil logger) { this.maxCore = maxCore; this.executor = Executors.newFixedThreadPool(maxCore); this.logger = logger; } - private void write(Selector selector, SelectionKey key) { - System.out.println("write"); + public Future submit(ReportReadTask reportReadTask) { + return executor.submit(reportReadTask); } - private void saveSystemLog(Object obj) { saveLog(obj, true); } @@ -217,7 +289,6 @@ public class ReportServerService extends Service { public void close() { List unfinishedTasks = executor.shutdownNow(); - connectUserMap.clear(); if (!unfinishedTasks.isEmpty()) { saveSystemLog("Not all tasks finished before calling close: " + unfinishedTasks.size()); } diff --git a/src/main/java/com/munjaon/server/server/task/CollectReadTask.java b/src/main/java/com/munjaon/server/server/task/CollectReadTask.java new file mode 100644 index 0000000..c72fdef --- /dev/null +++ b/src/main/java/com/munjaon/server/server/task/CollectReadTask.java @@ -0,0 +1,269 @@ +package com.munjaon.server.server.task; + +import com.munjaon.server.cache.dto.MemberDto; +import com.munjaon.server.cache.enums.CacheService; +import com.munjaon.server.cache.service.MemberService; +import com.munjaon.server.queue.dto.BasicMessageDto; +import com.munjaon.server.queue.enums.QueueTypeWorker; +import com.munjaon.server.server.dto.ConnectUserDto; +import com.munjaon.server.server.packet.*; +import com.munjaon.server.server.queue.CollectUserQueue; +import com.munjaon.server.util.LogUtil; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.Callable; + +public class CollectReadTask implements Callable { + public static final SimpleDateFormat sdf = new SimpleDateFormat("[MM-dd HH:mm:ss]"); + public static final String LOG_DATE_FORMAT = "[MM-dd HH:mm:ss]"; + + private Selector selector; + private SelectionKey key; + private CollectUserQueue collectUserQueue = CollectUserQueue.getInstance(); + private ConnectUserDto connectUserDto; + private String serviceType; + private final LogUtil logger; + + public CollectReadTask(Selector selector, SelectionKey key, String serviceType, LogUtil logger) { + this.selector = selector; + this.key = key; + this.connectUserDto = (ConnectUserDto) key.attachment(); + this.serviceType = serviceType; + this.logger = logger; + } + + @Override + public ConnectUserDto call() throws Exception { + int size = -1; + try { + SocketChannel channel = (SocketChannel) key.channel(); + /* 1. Head 읽기 */ + ByteBuffer headBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH); + try { + size = channel.read(headBuffer); + } catch (IOException e) {} + /* 2. Body 읽기 */ + if (size > 0) { +// Packet.printBuffer(headBuffer); + String command = Header.getCommand(headBuffer); + switch (Integer.parseInt(command)) { + case 1 : recvBind(channel, headBuffer); break; + case 3 : recvDeliver(channel, headBuffer); break; + case 7 : recvLinkCheck(channel); break; + default: expireConnectUser(); break; + } + } else { + expireConnectUser(); + } + } catch (Exception e) { + size = -1; + e.printStackTrace(); + } + + /* 읽은 데이터가 없는 경우 command -1 */ + if (size <= 0) { + connectUserDto.setCommand(-1); + } + + return connectUserDto; + } + + private void recvDeliver(SocketChannel channel, ByteBuffer headBuffer) throws IOException { + switch (this.serviceType) { + case "SMS": + recvSmsDeliver(channel, headBuffer); + break; + case "LMS": + recvLmsDeliver(channel, headBuffer); + break; + case "MMS": + recvMmsDeliver(channel, headBuffer); + break; + case "KAT": + recvKatDeliver(channel, headBuffer); + break; + case "KFT": + recvKftDeliver(channel, headBuffer); + break; + default:break; + } + } + + public BasicMessageDto recvCommonMessage(ByteBuffer deliverBuffer) { + if (deliverBuffer == null) { + return null; + } + BasicMessageDto messageDto = new BasicMessageDto(); + messageDto.setRouterSeq("40"); + messageDto.setServiceType("4"); + messageDto.setUserId(connectUserDto.getUserId()); + messageDto.setRemoteIP(connectUserDto.getRemoteIP()); + messageDto.setSendStatus("0"); + messageDto.setUserMsgID(CommonMessage.getMessageIdForDeliver(deliverBuffer)); + messageDto.setUserSender(CommonMessage.getSenderForDeliver(deliverBuffer)); + messageDto.setUserReceiver(CommonMessage.getReceiverForDeliver(deliverBuffer)); + messageDto.setReserveDt(CommonMessage.getReserveTimeForDeliver(deliverBuffer)); + messageDto.setRequestDt(CommonMessage.getRequestTimeForDeliver(deliverBuffer)); + messageDto.setUnitCost("10.4"); + + return messageDto; + } + private void recvSmsDeliver(SocketChannel channel, ByteBuffer headBuffer) throws IOException { + try { + ByteBuffer bodyBuffer = ByteBuffer.allocate(SmsMessage.DELIVER_SMS_BODY_LENGTH); + channel.read(bodyBuffer); + ByteBuffer deliverBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + SmsMessage.DELIVER_SMS_BODY_LENGTH); + Packet.mergeBuffers(deliverBuffer, headBuffer, bodyBuffer); + +// Packet.printBuffer(deliverBuffer); + BasicMessageDto messageDto = recvCommonMessage(deliverBuffer); + messageDto.setUserMessage(SmsMessage.getMessageForDeliver(deliverBuffer)); + System.out.println("BasicMessageDto : " + messageDto.toString()); + QueueTypeWorker worker = QueueTypeWorker.find("SMS"); + if (worker != null) { + worker.pushQueue(messageDto); + channel.write(SmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void recvLmsDeliver(SocketChannel channel, ByteBuffer headBuffer) throws IOException { + try { + ByteBuffer bodyBuffer = ByteBuffer.allocate(LmsMessage.DELIVER_LMS_BODY_LENGTH); + channel.read(bodyBuffer); + ByteBuffer deliverBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + LmsMessage.DELIVER_LMS_BODY_LENGTH); + Packet.mergeBuffers(deliverBuffer, headBuffer, bodyBuffer); + + BasicMessageDto messageDto = recvCommonMessage(deliverBuffer); + messageDto.setUserSubject(LmsMessage.getSubjectForDeliver(deliverBuffer)); + messageDto.setUserMessage(LmsMessage.getMessageForDeliver(deliverBuffer)); + + QueueTypeWorker worker = QueueTypeWorker.find("LMS"); + if (worker != null) { + worker.pushQueue(messageDto); + channel.write(LmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void recvMmsDeliver(SocketChannel channel, ByteBuffer headBuffer) throws IOException { + + } + + private void recvKatDeliver(SocketChannel channel, ByteBuffer headBuffer) throws IOException { + + } + + private void recvKftDeliver(SocketChannel channel, ByteBuffer headBuffer) throws IOException { + + } + + private void recvLinkCheck(SocketChannel channel) throws IOException { + ByteBuffer bodyBuffer = ByteBuffer.allocate(LinkCheck.LINK_CHECK_ACK_BODY_LENGTH); + channel.read(bodyBuffer); +// SocketChannel channel = (SocketChannel) key.channel(); + channel.write(LinkCheck.makeLinkCheckAckBuffer()); + } + + private void recvBind(SocketChannel channel, ByteBuffer headBuffer) { + String resultCode = "00"; + try { + ByteBuffer bindBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + Bind.BIND_BODY_LENGTH); + ByteBuffer bodyBuffer = ByteBuffer.allocate(Bind.BIND_BODY_LENGTH); + channel.read(bodyBuffer); + Packet.mergeBuffers(bindBuffer, headBuffer, bodyBuffer); + + String id = Bind.getBindId(bindBuffer); + String pwd = Bind.getBindPwd(bindBuffer); + saveSystemLog("Bind id : " + id); + saveSystemLog("Bind pwd : " + pwd); + if (id == null || pwd == null) { + resultCode = "50"; + } else { + if (collectUserQueue.isExist(this.serviceType, id)) { + resultCode = "60"; + } else { + MemberService svc = (MemberService) CacheService.LOGIN_SERVICE.getService(); + MemberDto memberDto = null; + if (svc != null) { + memberDto = svc.get(id); + } + if (memberDto == null || !pwd.equals(memberDto.getAccessKey())) { + resultCode = "20"; + } else { + connectUserDto.setUserId(id); + connectUserDto.setLogin(true); + connectUserDto.setMemberDto(memberDto); + /* 세션통신 시간 업데이트 */ + connectUserDto.updateLastTrafficTime(); + } + } + } + } catch (Exception e) { + resultCode = "10"; + e.printStackTrace(); + } + + try { + saveSystemLog("Bind ResultCode : " + resultCode); + channel.write(Bind.makeBindAckBuffer(resultCode)); + if ("00".equals(resultCode) == false) { + expireConnectUser(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void expireConnectUser() { + if (key == null || !key.isValid()) { + return; + } + try { + SocketChannel channel = (SocketChannel) key.channel(); + if (connectUserDto != null) { + if (connectUserDto.getUserId() != null) { + collectUserQueue.removeUser(connectUserDto.getServiceType(), connectUserDto.getUserId()); + } + key.attach(null); + } + // 소켓 채널 닫기 + channel.close(); + // 키 닫기 + key.cancel(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void saveSystemLog(Object obj) { + saveLog(obj, true); + } + + private void saveLog(Object obj) { + saveLog(obj, false); + } + + private void saveLog(Object obj, boolean isConsoleOutput) { + if (isConsoleOutput) { + System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATE_FORMAT)) + " {{COLLECT_READ_TASK}} " + obj); + } + + if (logger == null) { + return; + } + + logger.log(obj); + } +} diff --git a/src/main/java/com/munjaon/server/server/task/ReportQueueTask.java b/src/main/java/com/munjaon/server/server/task/ReportQueueTask.java new file mode 100644 index 0000000..37ff10b --- /dev/null +++ b/src/main/java/com/munjaon/server/server/task/ReportQueueTask.java @@ -0,0 +1,85 @@ +package com.munjaon.server.server.task; + +import com.munjaon.server.cache.enums.CacheService; +import com.munjaon.server.cache.service.ReportService; +import com.munjaon.server.queue.pool.ReportQueue; +import com.munjaon.server.server.dto.ReportDto; +import com.munjaon.server.server.dto.ReportUserDto; +import com.munjaon.server.util.LogUtil; + +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ReportQueueTask implements Runnable { + public static final SimpleDateFormat sdf = new SimpleDateFormat("[MM-dd HH:mm:ss]"); + public static final String LOG_DATE_FORMAT = "[MM-dd HH:mm:ss]"; + + private ReportUserDto reportUserDto; + private final LogUtil logger; + + public ReportQueueTask(ReportUserDto reportUserDto, LogUtil logger) { + this.reportUserDto = reportUserDto; + this.logger = logger; + } + + @Override + public void run() { + if (reportUserDto == null || reportUserDto.getUserId() == null) { + return; + } + if (reportUserDto.getReportQueue() == null || reportUserDto.getReportQueue().isOpen() == false) { + return; + } + + ReportQueue reportQueue = reportUserDto.getReportQueue(); + ReportService reportService = (ReportService) CacheService.REPORT_SERVICE.getService(); + List list = reportService.getReportListForUser(reportUserDto.getUserId()); + if (list == null || list.isEmpty()) { + return; + } + + StringBuilder builder = new StringBuilder(); + for (ReportDto dto : list) { + try { + if (builder.isEmpty()) { + builder.append(dto.getMsgId()); + } else { + builder.append(",").append(dto.getMsgId()); + } + saveSystemLog("reportDto : " + dto.toString()); + reportQueue.pushReportToQueue(dto); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + Map reqMap = new HashMap<>(); + reqMap.put("userId", reportUserDto.getUserId()); + reqMap.put("msgId", builder.toString()); + reportService.deleteBulkReport(reqMap); + } + + private void saveSystemLog(Object obj) { + saveLog(obj, true); + } + + private void saveLog(Object obj) { + saveLog(obj, false); + } + + private void saveLog(Object obj, boolean isConsoleOutput) { + if (isConsoleOutput) { + System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATE_FORMAT)) + " {{REPORT_QUEUE_TASK}} " + obj); + } + + if (logger == null) { + return; + } + + logger.log(obj); + } +} diff --git a/src/main/java/com/munjaon/server/server/task/ReportReadTask.java b/src/main/java/com/munjaon/server/server/task/ReportReadTask.java new file mode 100644 index 0000000..416f823 --- /dev/null +++ b/src/main/java/com/munjaon/server/server/task/ReportReadTask.java @@ -0,0 +1,199 @@ +package com.munjaon.server.server.task; + +import com.munjaon.server.cache.dto.MemberDto; +import com.munjaon.server.cache.enums.CacheService; +import com.munjaon.server.cache.service.MemberService; +import com.munjaon.server.queue.pool.ReportQueue; +import com.munjaon.server.server.dto.ReportUserDto; +import com.munjaon.server.server.packet.*; +import com.munjaon.server.server.queue.ReportUserQueue; +import com.munjaon.server.util.LogUtil; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.text.SimpleDateFormat; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.concurrent.Callable; + +public class ReportReadTask implements Callable { + public static final SimpleDateFormat sdf = new SimpleDateFormat("[MM-dd HH:mm:ss]"); + public static final String LOG_DATE_FORMAT = "[MM-dd HH:mm:ss]"; + + private Selector selector; + private SelectionKey key; + private final ReportUserQueue reportUserQueue = ReportUserQueue.getInstance(); + private ReportUserDto reportUserDto; + private final LogUtil logger; + + public ReportReadTask(Selector selector, SelectionKey key, LogUtil logger) { + this.selector = selector; + this.key = key; + this.reportUserDto = (ReportUserDto) key.attachment(); + this.logger = logger; + } + + @Override + public ReportUserDto call() throws Exception { + int size = -1; + try { + SocketChannel channel = (SocketChannel) key.channel(); + /* 1. Head 읽기 */ + ByteBuffer headBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH); + try { + size = channel.read(headBuffer); + } catch (IOException e) {} + /* 2. Body 읽기 */ + if (size > 0) { + String command = Header.getCommand(headBuffer); + switch (Integer.parseInt(command)) { + case 1 : recvBind(channel, headBuffer); break; + case 6 : recvReport(channel, headBuffer); break; + case 8 : recvLinkCheck(channel, headBuffer); break; + default: expireConnectUser(); break; + } + } else { + expireConnectUser(); + } + } catch (Exception e) { + size = -1; + e.printStackTrace(); + } + + /* 읽은 데이터가 없는 경우 command -1 */ + if (size <= 0) { + reportUserDto.setCommand(-1); + } + + return reportUserDto; + } + + private void recvLinkCheck(SocketChannel channel, ByteBuffer headBuffer) { + try { + ByteBuffer bodyBuffer = ByteBuffer.allocate(LinkCheck.LINK_CHECK_ACK_BODY_LENGTH); + int size = channel.read(bodyBuffer); + if (size > 0) { + saveSystemLog("Recv link check"); + reportUserDto.updateLastTrafficTime(); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void recvReport(SocketChannel channel, ByteBuffer headBuffer) { + try { + ByteBuffer bodyBuffer = ByteBuffer.allocate(Report.REPORT_ACK_BODY_LENGTH); + saveSystemLog("recv report"); + int size = channel.read(bodyBuffer); + if (size > 0) { + ReportQueue reportQueue = reportUserDto.getReportQueue(); + reportUserDto.updateLastTrafficTime(); + if (reportQueue != null) { + reportQueue.addReadCounter(); + } + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + private void recvBind(SocketChannel channel, ByteBuffer headBuffer) { + String resultCode = "00"; + try { + ByteBuffer bindBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + Bind.BIND_BODY_LENGTH); + ByteBuffer bodyBuffer = ByteBuffer.allocate(Bind.BIND_BODY_LENGTH); + channel.read(bodyBuffer); + Packet.mergeBuffers(bindBuffer, headBuffer, bodyBuffer); + + String id = Bind.getBindId(bindBuffer); + String pwd = Bind.getBindPwd(bindBuffer); + saveSystemLog("Bind id : " + id); + saveSystemLog("Bind pwd : " + pwd); + if (id == null || pwd == null) { + resultCode = "50"; + } else { + if (reportUserQueue.isExist(id)) { + resultCode = "60"; + } else { + MemberService svc = (MemberService) CacheService.LOGIN_SERVICE.getService(); + MemberDto memberDto = null; + if (svc != null) { + memberDto = svc.get(id); + } + if (memberDto == null || !pwd.equals(memberDto.getAccessKey())) { + resultCode = "20"; + } else { + reportUserDto.setUserId(id); + reportUserDto.setLogin(true); + reportUserDto.setMemberDto(memberDto); + /* 리포트 큐 생성 */ + ReportQueue reportQueue = new ReportQueue(reportUserDto.getQueuePath(), reportUserDto.getUserId()); + reportUserDto.setReportQueue(reportQueue); + /* 사용자 Pool에 저장 */ + reportUserQueue.putUser(reportUserDto); + /* 세션통신 시간 업데이트 */ + reportUserDto.updateLastTrafficTime(); + } + } + } + } catch (Exception e) { + resultCode = "10"; + e.printStackTrace(); + } + + try { + saveSystemLog("Bind ResultCode : " + resultCode); + channel.write(Bind.makeBindAckBuffer(resultCode)); + if ("00".equals(resultCode) == false) { + expireConnectUser(); + } + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void expireConnectUser() { + if (key == null || !key.isValid()) { + return; + } + try { + SocketChannel channel = (SocketChannel) key.channel(); + if (reportUserDto != null) { + if (reportUserDto.getUserId() != null) { + reportUserQueue.removeUser(reportUserDto.getUserId()); + } + key.attach(null); + } + // 소켓 채널 닫기 + channel.close(); + // 키 닫기 + key.cancel(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void saveSystemLog(Object obj) { + saveLog(obj, true); + } + + private void saveLog(Object obj) { + saveLog(obj, false); + } + + private void saveLog(Object obj, boolean isConsoleOutput) { + if (isConsoleOutput) { + System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATE_FORMAT)) + " {{REPORT_READ_TASK}} " + obj); + } + + if (logger == null) { + return; + } + + logger.log(obj); + } +} diff --git a/src/main/java/com/munjaon/server/util/MessageUtil.java b/src/main/java/com/munjaon/server/util/MessageUtil.java index d7df8d3..91ea675 100644 --- a/src/main/java/com/munjaon/server/util/MessageUtil.java +++ b/src/main/java/com/munjaon/server/util/MessageUtil.java @@ -1,10 +1,9 @@ package com.munjaon.server.util; -import com.munjaon.server.queue.config.BodyCommonConfig; -import com.munjaon.server.queue.config.MediaBodyConfig; -import com.munjaon.server.queue.config.QueueHeaderConfig; -import com.munjaon.server.queue.config.SmsBodyConfig; +import com.munjaon.server.config.ServiceCode; +import com.munjaon.server.queue.config.*; import com.munjaon.server.queue.dto.BasicMessageDto; +import com.munjaon.server.server.dto.ReportDto; import java.nio.ByteBuffer; import java.time.LocalDateTime; @@ -114,11 +113,11 @@ public final class MessageUtil { } public static int calcWritePosition(int pushCounter, int dataByteLength) { - return pushCounter < 0 ? QueueHeaderConfig.QUEUE_HEADER_LENGTH : (QueueHeaderConfig.QUEUE_HEADER_LENGTH + pushCounter + dataByteLength); + return pushCounter < 0 ? QueueHeaderConfig.QUEUE_HEADER_LENGTH : (QueueHeaderConfig.QUEUE_HEADER_LENGTH + pushCounter * dataByteLength); } public static int calcReadPosition(int popCounter, int dataByteLength) { - return popCounter < 0 ? QueueHeaderConfig.QUEUE_HEADER_LENGTH : (QueueHeaderConfig.QUEUE_HEADER_LENGTH + popCounter + dataByteLength); + return popCounter < 0 ? QueueHeaderConfig.QUEUE_HEADER_LENGTH : (QueueHeaderConfig.QUEUE_HEADER_LENGTH + popCounter * dataByteLength); } public static void setBytesForCommonMessage(ByteBuffer buffer, BasicMessageDto messageDto) { @@ -258,6 +257,23 @@ public final class MessageUtil { buffer.put(messageDto.getUserMessage().getBytes()); } + public static void getBytesForMediaMessage(ByteBuffer buffer, BasicMessageDto messageDto) { + byte[] destArray = null; + if (buffer == null || messageDto == null) { + return; + } + /* 14. 제목 */ + buffer.position(MediaBodyConfig.SUBJECT_BYTE_POSITION); + destArray = new byte[MediaBodyConfig.SUBJECT_BYTE_LENGTH]; + buffer.get(destArray); + messageDto.setUserSubject(new String(destArray)); + /* 15. 메시지 */ + buffer.position(MediaBodyConfig.MEDIA_MSG_BYTE_POSITION); + destArray = new byte[MediaBodyConfig.MEDIA_MSG_BYTE_LENGTH]; + buffer.get(destArray); + messageDto.setUserMessage(new String(destArray)); + } + public static void setBytesForMmsMessage(ByteBuffer buffer, BasicMessageDto messageDto) { /* 16. 파일카운트 */ buffer.position(MediaBodyConfig.FILECNT_BYTE_POSITION); @@ -278,4 +294,137 @@ public final class MessageUtil { buffer.put(messageDto.getUserFileName03().getBytes()); } } + + public static void getBytesForMmsMessage(ByteBuffer buffer, BasicMessageDto messageDto) { + byte[] destArray = null; + if (buffer == null || messageDto == null) { + return; + } + /* 16. 파일카운트 */ + buffer.position(MediaBodyConfig.FILECNT_BYTE_POSITION); + destArray = new byte[MediaBodyConfig.FILECNT_BYTE_LENGTH]; + buffer.get(destArray); + messageDto.setUserFileCnt(Integer.parseInt(new String(destArray))); + /* 17. 파일명 #1 */ + buffer.position(MediaBodyConfig.FILENAME_ONE_BYTE_POSITION); + destArray = new byte[MediaBodyConfig.FILENAME_ONE_BYTE_LENGTH]; + buffer.get(destArray); + messageDto.setUserFileName01(new String(destArray)); + /* 18. 파일명 #2 */ + buffer.position(MediaBodyConfig.FILENAME_TWO_BYTE_POSITION); + destArray = new byte[MediaBodyConfig.FILENAME_TWO_BYTE_LENGTH]; + buffer.get(destArray); + messageDto.setUserFileName02(new String(destArray)); + /* 19. 파일명 #3 */ + buffer.position(MediaBodyConfig.FILENAME_THREE_BYTE_POSITION); + destArray = new byte[MediaBodyConfig.FILENAME_THREE_BYTE_LENGTH]; + buffer.get(destArray); + messageDto.setUserFileName03(new String(destArray)); + } + + public static int isValidateMessageForReport(ReportDto reportDto) { + if (reportDto == null) { + return ServiceCode.MSG_ERROR_REPORT.getCode(); + } + /* MSG_ID (Length : 20 / Position : 0) */ + if (reportDto.getAgentMsgId() == null || reportDto.getAgentMsgId().trim().isEmpty()) { + return ServiceCode.MSG_ERROR_REPORT_MSG_ID.getCode(); + } + if (reportDto.getAgentMsgId().trim().length() > ReportConfig.MSG_ID_LENGTH) { + return ServiceCode.MSG_ERROR_REPORT_MSG_ID.getCode(); + } + /* AGENT_CODE (Length : 2 / Position : 20) */ + if (reportDto.getAgentCode() == null || reportDto.getAgentCode().trim().isEmpty()) { + return ServiceCode.MSG_ERROR_REPORT_AGENT_CODE.getCode(); + } + if (reportDto.getAgentCode().trim().length() > ReportConfig.AGENT_CODE_LENGTH) { + return ServiceCode.MSG_ERROR_REPORT_AGENT_CODE.getCode(); + } + /* SEND_TIME (Length : 14 / Position : 22) */ + if (reportDto.getRsltDate() == null || reportDto.getRsltDate().trim().isEmpty()) { + return ServiceCode.MSG_ERROR_REPORT_SEND_TIME.getCode(); + } + if (reportDto.getRsltDate().trim().length() > ReportConfig.SEND_TIME_LENGTH) { + return ServiceCode.MSG_ERROR_REPORT_SEND_TIME.getCode(); + } + /* TELECOM (Length : 3 / Position : 36) */ + if (reportDto.getRsltNet() == null || reportDto.getRsltNet().trim().isEmpty()) { + return ServiceCode.MSG_ERROR_REPORT_TELECOM.getCode(); + } + if (reportDto.getRsltNet().trim().length() > ReportConfig.TELECOM_LENGTH) { + return ServiceCode.MSG_ERROR_REPORT_TELECOM.getCode(); + } + /* RESULT (Length : 5 / Position : 39) */ + if (reportDto.getRsltCode() == null || reportDto.getRsltCode().trim().isEmpty()) { + return ServiceCode.MSG_ERROR_REPORT_RESULT.getCode(); + } + if (reportDto.getRsltCode().trim().length() > ReportConfig.RESULT_LENGTH) { + return ServiceCode.MSG_ERROR_REPORT_RESULT.getCode(); + } + + return ServiceCode.OK.getCode(); + } + + public static int calcWritePositionForReport(int pushCounter, int dataByteLength) { + return pushCounter < 0 ? ReportConfig.HEAD_LENGTH : (ReportConfig.HEAD_LENGTH + pushCounter * dataByteLength); + } + + public static void setBytesForReport(ByteBuffer buffer, ReportDto reportDto) { + if (buffer == null || reportDto == null) { + return; + } + + byte[] destArray = null; + /* MSG_ID (Length : 20 / Position : 0) */ + buffer.position(ReportConfig.MSG_ID_POSITION); + buffer.put(reportDto.getAgentMsgId().getBytes()); + /* AGENT_CODE (Length : 2 / Position : 20) */ + buffer.position(ReportConfig.AGENT_CODE_POSITION); + buffer.put(reportDto.getAgentCode().getBytes()); + /* SEND_TIME (Length : 14 / Position : 22) */ + buffer.position(ReportConfig.SEND_TIME_POSITION); + buffer.put(reportDto.getRsltDate().getBytes()); + /* TELECOM (Length : 3 / Position : 36) */ + buffer.position(ReportConfig.TELECOM_POSITION); + buffer.put(reportDto.getRsltNet().getBytes()); + /* RESULT (Length : 5 / Position : 39) */ + buffer.position(ReportConfig.RESULT_POSITION); + buffer.put(reportDto.getRsltCode().getBytes()); + } + + public static ReportDto getReportFromBuffer(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + byte[] destArray = null; + ReportDto reportDto = new ReportDto(); + /* MSG_ID (Length : 20 / Position : 0) */ + buffer.position(ReportConfig.MSG_ID_POSITION); + destArray = new byte[ReportConfig.MSG_ID_LENGTH]; + buffer.get(destArray); + reportDto.setMsgId(new String(destArray).trim()); + /* AGENT_CODE (Length : 2 / Position : 20) */ + buffer.position(ReportConfig.AGENT_CODE_POSITION); + destArray = new byte[ReportConfig.AGENT_CODE_LENGTH]; + buffer.get(destArray); + reportDto.setAgentCode(new String(destArray).trim()); + /* SEND_TIME (Length : 14 / Position : 22) */ + buffer.position(ReportConfig.SEND_TIME_POSITION); + destArray = new byte[ReportConfig.SEND_TIME_LENGTH]; + buffer.get(destArray); + reportDto.setRsltDate(new String(destArray).trim()); + /* TELECOM (Length : 3 / Position : 36) */ + buffer.position(ReportConfig.TELECOM_POSITION); + destArray = new byte[ReportConfig.TELECOM_LENGTH]; + buffer.get(destArray); + reportDto.setRsltNet(new String(destArray).trim()); + /* RESULT (Length : 5 / Position : 39) */ + buffer.position(ReportConfig.RESULT_POSITION); + destArray = new byte[ReportConfig.RESULT_LENGTH]; + buffer.get(destArray); + reportDto.setRsltCode(new String(destArray).trim()); + + return reportDto; + } } diff --git a/src/main/resources/sqlmap/lms_sql.xml b/src/main/resources/sqlmap/lms_sql.xml new file mode 100644 index 0000000..4de94a2 --- /dev/null +++ b/src/main/resources/sqlmap/lms_sql.xml @@ -0,0 +1,21 @@ + + + + + INSERT INTO MJ_MSG_DATA ( + MSG_ID + , MSG_GROUP_ID + , USER_ID + , AGENT_MSG_ID + , AGENT_CODE + , CUR_STATE + , REQ_DATE + , CALL_TO + , CALL_FROM + , SUBJECT + , SMS_TXT + , MSG_TYPE + , CONT_SEQ, FILE_CNT + ) VALUES (#{id}, #{msgGroupID}, #{userId}, #{userMsgID}, '04', 0, NOW(), #{userReceiver}, #{userSender}, #{userSubject}, #{userMessage}, '6', null, '0' ) + + \ No newline at end of file diff --git a/src/main/resources/sqlmap/report_sql.xml b/src/main/resources/sqlmap/report_sql.xml index a96b61d..ff74472 100644 --- a/src/main/resources/sqlmap/report_sql.xml +++ b/src/main/resources/sqlmap/report_sql.xml @@ -9,14 +9,44 @@ , AGENT_MSG_ID , AGENT_CODE , MSG_TYPE - , RSLT_DATE + , CASE WHEN RSLT_DATE IS NULL THEN DATE_FORMAT(NOW(), '%Y%m%d%H%i%S') ELSE DATE_FORMAT(RSLT_DATE, '%Y%m%d%H%i%S') END AS RSLT_DATE , RSLT_CODE , RSLT_NET FROM mj_msg_report WHERE USER_ID = #{userId} LIMIT 1 + + + DELETE FROM mj_msg_report WHERE MSG_ID = #{msgId} + + + DELETE SRC FROM mj_msg_report SRC + INNER JOIN ( + with recursive + T as ( select #{msgId} as items), + N as ( select 1 as n union select n + 1 from N, T + where n length(items) - length(replace(items, ',', ''))) + select distinct substring_index(substring_index(items, ',', n), ',', -1) + MSG_ID from N, T + ) DEST ON SRC.MSG_ID = DEST.MSG_ID + WHERE SRC.USER_ID = #{userId} + \ No newline at end of file