Report 기능 추가, LMS 기능 추가, MMS 기능 테스트중

This commit is contained in:
dsjang 2024-07-22 03:29:59 +09:00
parent 08277e679d
commit 2679d17629
37 changed files with 2632 additions and 324 deletions

View File

@ -3,8 +3,13 @@ package com.munjaon.server.cache.mapper;
import com.munjaon.server.server.dto.ReportDto; import com.munjaon.server.server.dto.ReportDto;
import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Mapper;
import java.util.List;
import java.util.Map;
@Mapper @Mapper
public interface ReportMapper { public interface ReportMapper {
ReportDto getReportForUser(String userId); ReportDto getReportForUser(String userId);
List<ReportDto> getReportListForUser(String userId);
int deleteReport(String msgId); int deleteReport(String msgId);
int deleteBulkReport(Map<String, String> reqMap);
} }

View File

@ -6,6 +6,10 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j @Slf4j
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@ -16,7 +20,23 @@ public class ReportService {
return reportMapper.getReportForUser(userId); return reportMapper.getReportForUser(userId);
} }
public List<ReportDto> getReportListForUser(String userId) {
return reportMapper.getReportListForUser(userId);
}
public int deleteReport(String msgId) { public int deleteReport(String msgId) {
return reportMapper.deleteReport(msgId); return reportMapper.deleteReport(msgId);
} }
public int deleteBulkReport(String msgId, String userId) {
Map<String, String> reqMap = new HashMap<>();
reqMap.put("msgId", msgId);
reqMap.put("userId", userId);
return deleteBulkReport(reqMap);
}
public int deleteBulkReport(Map<String, String> reqMap) {
return reportMapper.deleteBulkReport(reqMap);
}
} }

View File

@ -1,11 +1,8 @@
package com.munjaon.server.config; package com.munjaon.server.config;
import com.munjaon.server.queue.dto.QueueInfo; import com.munjaon.server.queue.dto.QueueInfo;
import com.munjaon.server.queue.pool.SmsReadQueue; import com.munjaon.server.queue.pool.*;
import com.munjaon.server.queue.pool.SmsWriteQueue; import com.munjaon.server.server.service.*;
import com.munjaon.server.server.service.CollectServerService;
import com.munjaon.server.server.service.PropertyLoader;
import com.munjaon.server.server.service.QueueServerService;
import com.munjaon.server.util.ServiceUtil; import com.munjaon.server.util.ServiceUtil;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -67,6 +64,60 @@ public class RunnerConfiguration {
return args -> System.out.println("Runner Bean #2"); return args -> System.out.println("Runner Bean #2");
} }
@Bean
@Order(2)
public CommandLineRunner getRunnerBeanForLmsQueue() {
try {
String[] svcArray = ServiceUtil.getServiceNames(serverConfig.getStringArray("LMS.SERVICE_LIST"));
if (svcArray == null || svcArray.length == 0) {
log.info("LMS service list is empty");
} else {
if (ServiceUtil.isDuplicate(svcArray)) {
log.info("LMS service list is duplicated");
} else {
for (String svc : svcArray) {
log.info("SERVICE CREATE : {}", svc);
QueueInfo queueInfo = QueueInfo.builder().queueName(svc).serviceType("LMS").queuePath(serverConfig.getString("LMS.QUEUE_PATH")).build();
LmsWriteQueue lmsWriteQueue = new LmsWriteQueue(queueInfo);
LmsReadQueue lmsReadQueue = new LmsReadQueue(queueInfo);
QueueServerService queueServerService = new QueueServerService(svc, lmsWriteQueue, lmsReadQueue);
queueServerService.start();
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return args -> System.out.println("Runner Bean #2");
}
@Bean
@Order(2)
public CommandLineRunner getRunnerBeanForMmsQueue() {
try {
String[] svcArray = ServiceUtil.getServiceNames(serverConfig.getStringArray("MMS.SERVICE_LIST"));
if (svcArray == null || svcArray.length == 0) {
log.info("MMS service list is empty");
} else {
if (ServiceUtil.isDuplicate(svcArray)) {
log.info("MMS service list is duplicated");
} else {
for (String svc : svcArray) {
log.info("SERVICE CREATE : {}", svc);
QueueInfo queueInfo = QueueInfo.builder().queueName(svc).serviceType("MMS").queuePath(serverConfig.getString("MMS.QUEUE_PATH")).build();
MmsWriteQueue mmsWriteQueue = new MmsWriteQueue(queueInfo);
MmsReadQueue mmsReadQueue = new MmsReadQueue(queueInfo);
QueueServerService queueServerService = new QueueServerService(svc, mmsWriteQueue, mmsReadQueue);
queueServerService.start();
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return args -> System.out.println("Runner Bean #2");
}
@Bean @Bean
@Order(3) @Order(3)
public CommandLineRunner getRunnerBeanForSmsCollector() { public CommandLineRunner getRunnerBeanForSmsCollector() {
@ -74,11 +125,55 @@ public class RunnerConfiguration {
String serviceName = "SMS_COLLECTOR"; String serviceName = "SMS_COLLECTOR";
String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE"); String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE");
int port = serverConfig.getInt(serviceName + ".SERVICE_PORT"); int port = serverConfig.getInt(serviceName + ".SERVICE_PORT");
// CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port);
CollectServerService collectServerService = new CollectServerService(serviceName, serviceType, port); CollectServerService collectServerService = new CollectServerService(serviceName, serviceType, port);
collectServerService.start(); collectServerService.start();
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return args -> System.out.println("Runner Bean #2"); return args -> System.out.println("Runner Bean SmsCollector #3");
}
@Bean
@Order(3)
public CommandLineRunner getRunnerBeanForLmsCollector() {
try {
String serviceName = "LMS_COLLECTOR";
String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE");
int port = serverConfig.getInt(serviceName + ".SERVICE_PORT");
// CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port);
CollectServerService collectServerService = new CollectServerService(serviceName, serviceType, port);
collectServerService.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
return args -> System.out.println("Runner Bean SmsCollector #3");
}
@Bean
@Order(3)
public CommandLineRunner getRunnerBeanForReporter() {
try {
String serviceName = "REPORTER";
int port = serverConfig.getInt(serviceName + ".SERVICE_PORT");
ReportServerService reportServerService = new ReportServerService(serviceName, port);
reportServerService.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
return args -> System.out.println("Runner Bean Reporter #3");
}
@Bean
@Order(4)
public CommandLineRunner getRunnerBeanForReportQueue() {
try {
String serviceName = "REPORT_QUEUE";
ReportQueueServerService reportQueueServerService = new ReportQueueServerService(serviceName);
reportQueueServerService.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
return args -> System.out.println("Runner Bean ReporterQueue #4");
} }
} }

View File

@ -25,6 +25,15 @@ public enum ServiceCode {
MSG_ERROR_MEDIA_SUBJECT(500, "LMS, MMS Subject is Null OR is Over Limit Byte"), MSG_ERROR_MEDIA_SUBJECT(500, "LMS, MMS Subject is Null OR is Over Limit Byte"),
MSG_ERROR_MEDIA_MESSAGE(501, "LMS, MMS Message is Null OR is Over Limit Byte"), MSG_ERROR_MEDIA_MESSAGE(501, "LMS, MMS Message is Null OR is Over Limit Byte"),
/* 카카오 알림톡, 친구톡 메시지 */
/* 리포트 관련 에러 메시지 */
MSG_ERROR_REPORT(800, "REPORT DATA is Null OR is invalid"),
MSG_ERROR_REPORT_MSG_ID(801, "MSG_ID is Null OR is Over Limit Byte"),
MSG_ERROR_REPORT_AGENT_CODE(802, "AGENT_CODE is Null OR is Over Limit Byte"),
MSG_ERROR_REPORT_SEND_TIME(803, "SEND_TIME is Null OR is Over Limit Byte"),
MSG_ERROR_REPORT_TELECOM(804, "TELECOM is Null OR is Over Limit Byte"),
MSG_ERROR_REPORT_RESULT(805, "RESULT is Null OR is Over Limit Byte"),
ETC_ERROR(999, "ETC ERROR"); ETC_ERROR(999, "ETC ERROR");

View File

@ -0,0 +1,46 @@
package com.munjaon.server.queue.config;
public final class ReportConfig {
/* USER_ID (Length : 20 / Position : 0) */
public static final int USER_ID_LENGTH = 20;
public static final int USER_ID_POSITION = 0;
/* WRITE_COUNT (Length : 10 / Position : 20) */
public static final int WRITE_COUNT_LENGTH = 10;
public static final int WRITE_COUNT_POSITION = USER_ID_POSITION + USER_ID_LENGTH;
/* READ_COUNT (Length : 10 / Position : 30) */
public static final int READ_COUNT_LENGTH = 10;
public static final int READ_COUNT_POSITION = WRITE_COUNT_POSITION + WRITE_COUNT_LENGTH;
/* Head 길이 */
public static final int HEAD_LENGTH = READ_COUNT_POSITION + READ_COUNT_LENGTH;
/* MSG_ID (Length : 20 / Position : 0) */
public static final int MSG_ID_LENGTH = 20;
public static final int MSG_ID_POSITION = 0;
/* AGENT_CODE (Length : 2 / Position : 20) */
public static final int AGENT_CODE_LENGTH = 2;
public static final int AGENT_CODE_POSITION = MSG_ID_POSITION + MSG_ID_LENGTH;
/* SEND_TIME (Length : 14 / Position : 22) */
public static final int SEND_TIME_LENGTH = 14;
public static final int SEND_TIME_POSITION = AGENT_CODE_POSITION + AGENT_CODE_LENGTH;
/* TELECOM (Length : 3 / Position : 36) */
public static final int TELECOM_LENGTH = 3;
public static final int TELECOM_POSITION = SEND_TIME_POSITION + SEND_TIME_LENGTH;
/* RESULT (Length : 5 / Position : 39) */
public static final int RESULT_LENGTH = 5;
public static final int RESULT_POSITION = TELECOM_POSITION + TELECOM_LENGTH;
/* Report 길이 */
public static final int BODY_LENGTH = RESULT_POSITION + RESULT_LENGTH;
// 큐에 저장하기전에 바이트를 채울 문자
public static final byte SET_DEFAULT_BYTE = (byte) 0x20;
}

View File

@ -0,0 +1,7 @@
package com.munjaon.server.queue.mapper;
import com.munjaon.server.queue.dto.BasicMessageDto;
public interface LmsMapper {
int insert(BasicMessageDto messageDto);
}

View File

@ -0,0 +1,39 @@
package com.munjaon.server.queue.pool;
import com.munjaon.server.queue.config.MediaBodyConfig;
import com.munjaon.server.queue.config.QueueConstants;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.dto.QueueInfo;
import com.munjaon.server.util.MessageUtil;
import java.nio.ByteBuffer;
public class LmsReadQueue extends ReadQueue {
public LmsReadQueue(QueueInfo queueInfo) throws Exception {
this.queueInfo = queueInfo;
// initQueue();
}
@Override
void popBuffer() throws Exception {
this.channel.position(MessageUtil.calcReadPosition(this.popCounter, MediaBodyConfig.LMS_SUM_BYTE_LENGTH));
this.channel.read(this.dataBuffer);
}
@Override
void getBytesForExtendMessage(BasicMessageDto messageDto) {
MessageUtil.getBytesForMediaMessage(this.dataBuffer, messageDto);
}
@Override
void initDataBuffer() {
if (this.dataBuffer == null) {
this.dataBuffer = ByteBuffer.allocateDirect(MediaBodyConfig.LMS_SUM_BYTE_LENGTH);
}
this.dataBuffer.clear();
for(int loopCnt = 0; loopCnt < MediaBodyConfig.LMS_SUM_BYTE_LENGTH; loopCnt++){
this.dataBuffer.put(QueueConstants.SET_DEFAULT_BYTE);
}
this.dataBuffer.position(0);
}
}

View File

@ -4,11 +4,17 @@ import com.munjaon.server.config.ServiceCode;
import com.munjaon.server.queue.config.MediaBodyConfig; import com.munjaon.server.queue.config.MediaBodyConfig;
import com.munjaon.server.queue.config.QueueConstants; import com.munjaon.server.queue.config.QueueConstants;
import com.munjaon.server.queue.dto.BasicMessageDto; import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.dto.QueueInfo;
import com.munjaon.server.util.MessageUtil; import com.munjaon.server.util.MessageUtil;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
public class LmsWriteQueue extends WriteQueue { public class LmsWriteQueue extends WriteQueue {
public LmsWriteQueue(QueueInfo queueInfo) throws Exception {
this.queueInfo = queueInfo;
/* 큐초기화 */
// initQueue();
}
@Override @Override
public int isValidateMessageForExtend(BasicMessageDto messageDto) { public int isValidateMessageForExtend(BasicMessageDto messageDto) {

View File

@ -0,0 +1,40 @@
package com.munjaon.server.queue.pool;
import com.munjaon.server.queue.config.MediaBodyConfig;
import com.munjaon.server.queue.config.QueueConstants;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.dto.QueueInfo;
import com.munjaon.server.util.MessageUtil;
import java.nio.ByteBuffer;
public class MmsReadQueue extends ReadQueue {
public MmsReadQueue(QueueInfo queueInfo) throws Exception {
this.queueInfo = queueInfo;
// initQueue();
}
@Override
void popBuffer() throws Exception {
this.channel.position(MessageUtil.calcReadPosition(this.popCounter, MediaBodyConfig.MMS_SUM_BYTE_LENGTH));
this.channel.read(this.dataBuffer);
}
@Override
void getBytesForExtendMessage(BasicMessageDto messageDto) {
MessageUtil.getBytesForMediaMessage(this.dataBuffer, messageDto);
MessageUtil.getBytesForMmsMessage(this.dataBuffer, messageDto);
}
@Override
void initDataBuffer() {
if (this.dataBuffer == null) {
this.dataBuffer = ByteBuffer.allocateDirect(MediaBodyConfig.MMS_SUM_BYTE_LENGTH);
}
this.dataBuffer.clear();
for(int loopCnt = 0; loopCnt < MediaBodyConfig.MMS_SUM_BYTE_LENGTH; loopCnt++){
this.dataBuffer.put(QueueConstants.SET_DEFAULT_BYTE);
}
this.dataBuffer.position(0);
}
}

View File

@ -4,11 +4,17 @@ import com.munjaon.server.config.ServiceCode;
import com.munjaon.server.queue.config.MediaBodyConfig; import com.munjaon.server.queue.config.MediaBodyConfig;
import com.munjaon.server.queue.config.QueueConstants; import com.munjaon.server.queue.config.QueueConstants;
import com.munjaon.server.queue.dto.BasicMessageDto; import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.dto.QueueInfo;
import com.munjaon.server.util.MessageUtil; import com.munjaon.server.util.MessageUtil;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
public class MmsWriteQueue extends WriteQueue { public class MmsWriteQueue extends WriteQueue {
public MmsWriteQueue(QueueInfo queueInfo) throws Exception {
this.queueInfo = queueInfo;
/* 큐초기화 */
// initQueue();
}
@Override @Override
public int isValidateMessageForExtend(BasicMessageDto messageDto) { public int isValidateMessageForExtend(BasicMessageDto messageDto) {

View File

@ -0,0 +1,213 @@
package com.munjaon.server.queue.pool;
import com.munjaon.server.config.ServiceCode;
import com.munjaon.server.queue.config.QueueConstants;
import com.munjaon.server.queue.config.ReportConfig;
import com.munjaon.server.server.dto.ReportDto;
import com.munjaon.server.util.FileUtil;
import com.munjaon.server.util.MessageUtil;
import lombok.Getter;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class ReportQueue {
private final Object lockObject = new Object(); // Lock Object
@Getter
private String queuePathFile; // 사용자 파일
@Getter
private String userId; // 사용자 아이디
@Getter
private int writeCounter = 0; // 쓰기 카운트
@Getter
private int readCounter = 0; // 읽기 카운트
private FileChannel channel = null; // Queue File Channel
private ByteBuffer headBuffer = ByteBuffer.allocateDirect(ReportConfig.HEAD_LENGTH); // Queue Head Buffer
private ByteBuffer bodyBuffer = ByteBuffer.allocateDirect(ReportConfig.BODY_LENGTH); // Queue Body Buffer
private byte[] byteArray = null;
public ReportQueue(String queuePathFile, String userId) throws Exception {
this.queuePathFile = queuePathFile;
this.userId = userId;
initQueue();
}
private void initQueue() throws Exception {
/* 2. 경로 체크 및 생성 */
FileUtil.mkdirs(this.queuePathFile);
File file = new File(this.queuePathFile + File.separator + this.userId + ".queue");
this.channel = new RandomAccessFile(file, "rw").getChannel();
if (file.length() == 0) {
this.writeCounter = 0;
this.readCounter = 0;
writeHeader();
} else {
readHeader();
}
}
public void pushReportToQueue(ReportDto reportDto) throws Exception {
synchronized (lockObject) {
if (ServiceCode.OK.getCode() != MessageUtil.isValidateMessageForReport(reportDto)) {
return;
}
/* 1. buffer 초기화 및 데이터 Set */
initBodyBuffer();
MessageUtil.setBytesForReport(this.bodyBuffer, reportDto);
/* 2. Header 정보 다시 읽기 */
readHeader();
/* 3. 파일채널에 쓰기 */
this.channel.position(MessageUtil.calcWritePositionForReport(this.writeCounter, ReportConfig.BODY_LENGTH));
this.bodyBuffer.flip();
this.channel.write(this.bodyBuffer);
/* 4 Push 카운터 증가 및 head 저장 */
this.writeCounter = this.writeCounter + 1;
writeHeader();
}
}
public ReportDto popReportFromQueue() throws Exception {
synchronized (lockObject) {
/* 1. Header 정보 다시 읽기 */
readHeader();
/* 2. 읽을 데이터가 없는지 체크 */
if (this.writeCounter <= this.readCounter) {
return null;
}
/* 3. 채널에서 읽기 */
this.bodyBuffer.clear();
this.channel.position(MessageUtil.calcWritePositionForReport(this.readCounter, ReportConfig.BODY_LENGTH));
this.channel.read(this.bodyBuffer);
return MessageUtil.getReportFromBuffer(this.bodyBuffer);
}
}
public boolean isTruncateQueue(int maxWriteCount) {
boolean truncate = false;
synchronized (lockObject) {
if (this.writeCounter >= maxWriteCount) {
if (this.writeCounter == this.readCounter) {
truncate = true;
}
}
}
return truncate;
}
public boolean isWriteLimit(int maxWriteCount) {
boolean isLimit = false;
synchronized (lockObject) {
if (this.writeCounter >= maxWriteCount) {
isLimit = true;
}
}
return isLimit;
}
public void truncateQueue() throws Exception {
synchronized (lockObject) {
if (isOpen()) {
this.channel.truncate(0);
this.writeCounter = 0;
this.readCounter = 0;
writeHeader();
}
}
}
private void readHeader() throws Exception {
synchronized (lockObject) {
initHeadBuffer();
this.channel.position(0);
this.channel.read(this.headBuffer);
this.byteArray = new byte[ReportConfig.USER_ID_LENGTH];
// USER_ID
this.headBuffer.position(ReportConfig.USER_ID_POSITION);
this.headBuffer.get(this.byteArray);
this.userId = (new String(this.byteArray)).trim();
// 쓰기 카운트 가져오기
this.byteArray = new byte[ReportConfig.WRITE_COUNT_LENGTH];
this.headBuffer.position(ReportConfig.WRITE_COUNT_POSITION);
this.headBuffer.get(this.byteArray);
this.writeCounter = Integer.parseInt((new String(this.byteArray)).trim());
// 읽기 카운트 가져오기
this.byteArray = new byte[ReportConfig.READ_COUNT_LENGTH];
this.headBuffer.position(ReportConfig.READ_COUNT_POSITION);
this.headBuffer.get(this.byteArray);
this.readCounter = Integer.parseInt((new String(this.byteArray)).trim());
}
}
public void addReadCounter() throws Exception {
synchronized (lockObject) {
/* 읽기 카운트 증가 */
this.readCounter = this.readCounter + 1;
initHeadBuffer();
this.channel.position(ReportConfig.USER_ID_POSITION);
this.headBuffer.put(this.userId.getBytes());
this.headBuffer.position(ReportConfig.WRITE_COUNT_POSITION);
this.headBuffer.put(Integer.toString(this.writeCounter).getBytes());
this.headBuffer.position(ReportConfig.READ_COUNT_POSITION);
this.headBuffer.put(Integer.toString(this.readCounter).getBytes());
this.headBuffer.flip();
this.channel.write(this.headBuffer);
}
}
private void writeHeader() throws Exception {
synchronized (lockObject) {
initHeadBuffer();
this.channel.position(ReportConfig.USER_ID_POSITION);
this.headBuffer.put(this.userId.getBytes());
this.headBuffer.position(ReportConfig.WRITE_COUNT_POSITION);
this.headBuffer.put(Integer.toString(this.writeCounter).getBytes());
this.headBuffer.position(ReportConfig.READ_COUNT_POSITION);
this.headBuffer.put(Integer.toString(this.readCounter).getBytes());
this.headBuffer.flip();
this.channel.write(this.headBuffer);
}
}
private void initHeadBuffer() {
this.headBuffer.clear();
for(int loopCnt = 0; loopCnt < ReportConfig.HEAD_LENGTH; loopCnt++){
this.headBuffer.put(ReportConfig.SET_DEFAULT_BYTE);
}
this.headBuffer.position(0);
}
public void initBodyBuffer() {
this.bodyBuffer.clear();
for(int loopCnt = 0; loopCnt < ReportConfig.BODY_LENGTH; loopCnt++){
this.bodyBuffer.put(QueueConstants.SET_DEFAULT_BYTE);
}
this.bodyBuffer.position(0);
}
public void close() throws IOException {
try {
if (isOpen()) {
channel.close();
}
} catch(IOException e) {
throw e;
}
}
public boolean isOpen() {
if (this.channel == null) {
return false;
}
return this.channel.isOpen();
}
}

View File

@ -1,6 +1,8 @@
package com.munjaon.server.queue.service; package com.munjaon.server.queue.service;
import com.munjaon.server.cache.service.SerialNoService;
import com.munjaon.server.queue.dto.BasicMessageDto; import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.mapper.LmsMapper;
import com.munjaon.server.queue.pool.LmsMemoryQueue; import com.munjaon.server.queue.pool.LmsMemoryQueue;
import com.munjaon.server.queue.pool.LmsQueuePool; import com.munjaon.server.queue.pool.LmsQueuePool;
import com.munjaon.server.queue.pool.WriteQueue; import com.munjaon.server.queue.pool.WriteQueue;
@ -12,8 +14,10 @@ import org.springframework.stereotype.Service;
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
public class LmsQueueService implements QueueAction { public class LmsQueueService implements QueueAction {
private final LmsMapper lmsMapper;
private final LmsQueuePool queueInstance = LmsQueuePool.getInstance(); private final LmsQueuePool queueInstance = LmsQueuePool.getInstance();
private final LmsMemoryQueue memoryQueue = LmsMemoryQueue.getInstance(); private final LmsMemoryQueue memoryQueue = LmsMemoryQueue.getInstance();
private final SerialNoService serialNoService;
@Override @Override
public int getQueueSize() { public int getQueueSize() {
@ -51,7 +55,12 @@ public class LmsQueueService implements QueueAction {
@Override @Override
public int saveMessageToTable(BasicMessageDto data) { public int saveMessageToTable(BasicMessageDto data) {
return 0; String serialNo = serialNoService.getSerialNo();
String groupSerialNo = serialNo.replace("MSGID", "MGRP");
data.setId(serialNo);
data.setMsgGroupID(groupSerialNo);
log.debug("Save message to table : {}", data);
return lmsMapper.insert(data);
} }
@Override @Override

View File

@ -56,7 +56,7 @@ public class SmsQueueService implements QueueAction {
@Override @Override
public int saveMessageToTable(BasicMessageDto data) { public int saveMessageToTable(BasicMessageDto data) {
String serialNo = serialNoService.getSerialNo(); String serialNo = serialNoService.getSerialNo();
String groupSerialNo = serialNo.replace("MSGID", "MSGGID"); String groupSerialNo = serialNo.replace("MSGID", "MGRP");
data.setId(serialNo); data.setId(serialNo);
data.setMsgGroupID(groupSerialNo); data.setMsgGroupID(groupSerialNo);
log.debug("Save message to table : {}", data); log.debug("Save message to table : {}", data);

View File

@ -4,7 +4,6 @@ import com.munjaon.server.cache.service.MemberService;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@Slf4j @Slf4j
@ -20,7 +19,7 @@ public class CacheScheduleService {
/* 사용자 설정 테이블 마지막 업데이트 시간 */ /* 사용자 설정 테이블 마지막 업데이트 시간 */
private String config_last_modified_time = null; private String config_last_modified_time = null;
@Scheduled(cron="0/5 * * * * *") // @Scheduled(cron="0/5 * * * * *")
public void doService() throws Exception { public void doService() throws Exception {
doMemberService(); doMemberService();
} }

View File

@ -6,7 +6,7 @@ public final class ServerConfig {
/* 서버 연결후 로그인 만료 시간 */ /* 서버 연결후 로그인 만료 시간 */
public static final int LIMIT_BIND_TIMEOUT = 5000; public static final int LIMIT_BIND_TIMEOUT = 5000;
/* Session Check 만료 시간 */ /* Session Check 만료 시간 */
public static final int LIMIT_LINK_CHECK_TIMEOUT = 35000; public static final int LIMIT_LINK_CHECK_TIMEOUT = 10000;
/* 서버 프로퍼티 reload interval 시간 */ /* 서버 프로퍼티 reload interval 시간 */
public static final Long INTERVAL_PROPERTY_RELOAD_TIME = 3000L; public static final Long INTERVAL_PROPERTY_RELOAD_TIME = 3000L;

View File

@ -24,6 +24,7 @@ public class ConnectUserDto {
private String remoteIP; private String remoteIP;
/* 요금제(선불 : P / 후불 : A) */ /* 요금제(선불 : P / 후불 : A) */
private final String feeType = "A"; private final String feeType = "A";
private int command; //요청 command
private MemberDto memberDto; private MemberDto memberDto;

View File

@ -1,6 +1,7 @@
package com.munjaon.server.server.dto; package com.munjaon.server.server.dto;
import com.munjaon.server.cache.dto.MemberDto; import com.munjaon.server.cache.dto.MemberDto;
import com.munjaon.server.queue.pool.ReportQueue;
import com.munjaon.server.server.config.ServerConfig; import com.munjaon.server.server.config.ServerConfig;
import lombok.Builder; import lombok.Builder;
import lombok.Getter; import lombok.Getter;
@ -12,16 +13,14 @@ import lombok.ToString;
@Builder @Builder
@ToString @ToString
public class ReportUserDto { public class ReportUserDto {
/* 로그인여부 */ private boolean isLogin; //로그인여부
private boolean isLogin; private Long lastTrafficTime; //마지막 통신 시간
/* 마지막 통신 시간 */ private String userId; //사용자 ID
private Long lastTrafficTime; private String remoteIP; //사용자 접속 IP
/* 사용자 ID */ private String queuePath; //저장큐경로
private String userId; private ReportQueue reportQueue; //ReportQueue
/* 사용자 접속 IP */ private MemberDto memberDto; //사용자 정보
private String remoteIP; private int command; //요청 command
private MemberDto memberDto;
public int isAlive() { public int isAlive() {
if (isLogin) { if (isLogin) {

View File

@ -23,7 +23,7 @@ public final class LinkCheck {
public static ByteBuffer makeLinkCheckAckBuffer() { public static ByteBuffer makeLinkCheckAckBuffer() {
ByteBuffer buffer = ByteBuffer.allocate(Header.HEADER_LENGTH + LINK_CHECK_ACK_BODY_LENGTH); ByteBuffer buffer = ByteBuffer.allocate(Header.HEADER_LENGTH + LINK_CHECK_ACK_BODY_LENGTH);
Packet.setDefaultByte(buffer); Packet.setDefaultByte(buffer);
Header.putHeader(buffer, Header.COMMAND_LINK_CHECK, LINK_CHECK_ACK_BODY_LENGTH); Header.putHeader(buffer, Header.COMMAND_LINK_CHECK_ACK, LINK_CHECK_ACK_BODY_LENGTH);
buffer.put(LINK_CHECK_ACK_BODY_POSITION, LINK_CHECK_ACK_VALUE.getBytes()); buffer.put(LINK_CHECK_ACK_BODY_POSITION, LINK_CHECK_ACK_VALUE.getBytes());
return buffer; return buffer;

View File

@ -0,0 +1,87 @@
package com.munjaon.server.server.packet;
import com.munjaon.server.util.CommonUtil;
import java.nio.ByteBuffer;
public final class LmsMessage {
public static final int DELIVER_LMS_BODY_LENGTH = 2119;
public static final int DELIVER_LMS_ACK_BODY_LENGTH = 21;
/* DELIVER */
/* SUBJECT */
public static final int DELIVER_SUBJECT_LENGTH = 40;
public static final int DELIVER_SUBJECT_POSITION = CommonMessage.DELIVER_MSG_TYPE_POSITION + CommonMessage.DELIVER_MSG_TYPE_LENGTH;
/* MESSAGE */
public static final int DELIVER_MESSAGE_LENGTH = 2000;
public static final int DELIVER_MESSAGE_POSITION = DELIVER_SUBJECT_POSITION + DELIVER_SUBJECT_LENGTH;
public static void putSubjectForDeliver(ByteBuffer buffer, String subject) {
if (buffer == null || subject == null) {
return;
}
subject = CommonUtil.cutString(subject, DELIVER_SUBJECT_LENGTH);
buffer.put(DELIVER_SUBJECT_POSITION, subject.getBytes());
}
public static String getSubjectForDeliver(ByteBuffer buffer) {
if (buffer == null) {
return null;
}
buffer.position(DELIVER_SUBJECT_POSITION);
byte[] destArray = new byte[DELIVER_SUBJECT_LENGTH];
buffer.get(destArray);
return Packet.getString(destArray);
}
public static void putMessageForDeliver(ByteBuffer buffer, String message) {
if (buffer == null || message == null) {
return;
}
message = CommonUtil.cutString(message, DELIVER_MESSAGE_LENGTH);
buffer.put(DELIVER_MESSAGE_POSITION, message.getBytes());
}
public static String getMessageForDeliver(ByteBuffer buffer) {
if (buffer == null) {
return null;
}
buffer.position(DELIVER_MESSAGE_POSITION);
byte[] destArray = new byte[DELIVER_MESSAGE_LENGTH];
buffer.get(destArray);
return Packet.getString(destArray);
}
public static ByteBuffer makeDeliverAckBuffer(String msgId, String status) {
ByteBuffer buffer = ByteBuffer.allocate(Header.HEADER_LENGTH + DELIVER_LMS_ACK_BODY_LENGTH);
Packet.setDefaultByte(buffer);
Header.putHeader(buffer, Header.COMMAND_DELIVER_ACK, DELIVER_LMS_ACK_BODY_LENGTH);
buffer.put(CommonMessage.DELIVER_ACK_MESSAGE_ID_POSITION, msgId.getBytes());
buffer.put(CommonMessage.DELIVER_ACK_RESULT_POSITION, status.getBytes());
return buffer;
}
// public static void makeDataForDeliver(ByteBuffer buffer, MunjaonMsg data) {
// if (buffer == null || data == null) {
// return;
// }
// /* MSG_ID */
// CommonMessage.putMessageIdForDeliver(buffer, data.getMsgId());
// /* SENDER */
// CommonMessage.putSenderForDeliver(buffer, data.getSendPhone());
// /* RECEIVER */
// CommonMessage.putReceiverForDeliver(buffer, data.getRecvPhone());
// /* RESERVE_TIME */
// CommonMessage.putReserveTimeForDeliver(buffer, data.getReserveDate());
// /* REQUEST_TIME */
// CommonMessage.putRequestTimeForDeliver(buffer, data.getRequestDate());
// /* MSG_TYPE */
// CommonMessage.putMsgTypeForDeliver(buffer, data.getMsgType());
// /* Subject */
// putSubjectForDeliver(buffer, data.getSubject());
// /* MSG */
// putMessageForDeliver(buffer, data.getMessage());
// }
}

View File

@ -0,0 +1,104 @@
package com.munjaon.server.server.packet;
import com.munjaon.server.util.CommonUtil;
import java.nio.ByteBuffer;
public final class MmsMessage {
public static final int DELIVER_MMS_BODY_LENGTH = 2120;
public static final int DELIVER_MMS_ACK_BODY_LENGTH = 21;
/* DELIVER */
/* SUBJECT */
public static final int DELIVER_SUBJECT_LENGTH = 40;
public static final int DELIVER_SUBJECT_POSITION = CommonMessage.DELIVER_MSG_TYPE_POSITION + CommonMessage.DELIVER_MSG_TYPE_LENGTH;
/* MESSAGE */
public static final int DELIVER_MESSAGE_LENGTH = 2000;
public static final int DELIVER_MESSAGE_POSITION = DELIVER_SUBJECT_POSITION + DELIVER_SUBJECT_LENGTH;
/* FILECOUNT */
public static final int DELIVER_FILECOUNT_LENGTH = 1;
public static final int DELIVER_FILECOUNT_POSITION = DELIVER_MESSAGE_POSITION + DELIVER_MESSAGE_LENGTH;
public static void putSubjectForDeliver(ByteBuffer buffer, String subject) {
if (buffer == null || subject == null) {
return;
}
subject = CommonUtil.cutString(subject, DELIVER_SUBJECT_LENGTH);
buffer.put(DELIVER_SUBJECT_POSITION, subject.getBytes());
}
public static String getSubjectForDeliver(ByteBuffer buffer) {
if (buffer == null) {
return null;
}
buffer.position(DELIVER_SUBJECT_POSITION);
byte[] destArray = new byte[DELIVER_SUBJECT_LENGTH];
buffer.get(destArray);
return Packet.getString(destArray);
}
public static void putMessageForDeliver(ByteBuffer buffer, String message) {
if (buffer == null || message == null) {
return;
}
message = CommonUtil.cutString(message, DELIVER_MESSAGE_LENGTH);
buffer.put(DELIVER_MESSAGE_POSITION, message.getBytes());
}
public static String getMessageForDeliver(ByteBuffer buffer) {
if (buffer == null) {
return null;
}
buffer.position(DELIVER_MESSAGE_POSITION);
byte[] destArray = new byte[DELIVER_MESSAGE_LENGTH];
buffer.get(destArray);
return Packet.getString(destArray);
}
public static void putFileCountForDeliver(ByteBuffer buffer, int fileCount) {
putFileCountForDeliver(buffer, Integer.toString(fileCount));
}
public static void putFileCountForDeliver(ByteBuffer buffer, String fileCount) {
if (buffer == null || fileCount == null) {
return;
}
buffer.put(DELIVER_FILECOUNT_POSITION, fileCount.getBytes());
}
public static String getFileCountForDeliver(ByteBuffer buffer) {
if (buffer == null) {
return null;
}
buffer.position(DELIVER_FILECOUNT_POSITION);
byte[] destArray = new byte[DELIVER_FILECOUNT_LENGTH];
buffer.get(destArray);
return Packet.getString(destArray);
}
// public static void makeDataForDeliver(ByteBuffer buffer, MunjaonMsg data) {
// if (buffer == null || data == null) {
// return;
// }
// /* MSG_ID */
// CommonMessage.putMessageIdForDeliver(buffer, data.getMsgId());
// /* SENDER */
// CommonMessage.putSenderForDeliver(buffer, data.getSendPhone());
// /* RECEIVER */
// CommonMessage.putReceiverForDeliver(buffer, data.getRecvPhone());
// /* RESERVE_TIME */
// CommonMessage.putReserveTimeForDeliver(buffer, data.getReserveDate());
// /* REQUEST_TIME */
// CommonMessage.putRequestTimeForDeliver(buffer, data.getRequestDate());
// /* MSG_TYPE */
// CommonMessage.putMsgTypeForDeliver(buffer, data.getMsgType());
// /* Subject */
// putSubjectForDeliver(buffer, data.getSubject());
// /* MSG */
// putMessageForDeliver(buffer, data.getMessage());
// /* FileCount */
// putFileCountForDeliver(buffer, data.getFileCount());
// }
}

View File

@ -48,10 +48,21 @@ public final class Packet {
if (dest.capacity() != (srcHead.capacity() + srcBody.capacity())) { if (dest.capacity() != (srcHead.capacity() + srcBody.capacity())) {
return; return;
} }
byte[] srcHeadArray = srcHead.array(); for (int i = 0; i < srcHead.capacity(); i++) {
byte[] srcBodyArray = srcBody.array(); dest.put(i, srcHead.get(i));
}
for (int i = 0; i < srcBody.capacity(); i++) {
dest.put((i + srcHead.capacity()), srcBody.get(i));
}
}
dest.put(0, srcHeadArray); public static void printBuffer(ByteBuffer buffer) {
dest.put(srcHeadArray.length, srcBodyArray); if (buffer == null) {
return;
}
byte[] srcArray = buffer.array();
for (int i = 0; i < srcArray.length; i++) {
System.out.print(srcArray[i] + " ");
}
} }
} }

View File

@ -0,0 +1,160 @@
package com.munjaon.server.server.packet;
import com.munjaon.server.server.dto.ReportDto;
import java.nio.ByteBuffer;
public final class Report {
/* MSG_ID (Length : 20 / Position : 0) */
public static final int REPORT_MSG_ID_LENGTH = 20;
public static final int REPORT_MSG_ID_POSITION = Header.HEADER_LENGTH;
/* AGENT_CODE (Length : 2 / Position : 20) */
public static final int REPORT_AGENT_CODE_LENGTH = 2;
public static final int REPORT_AGENT_CODE_POSITION = REPORT_MSG_ID_POSITION + REPORT_MSG_ID_LENGTH;
/* SEND_TIME (Length : 14 / Position : 22) */
public static final int REPORT_SEND_TIME_LENGTH = 14;
public static final int REPORT_SEND_TIME_POSITION = REPORT_AGENT_CODE_POSITION + REPORT_AGENT_CODE_LENGTH;
/* TELECOM (Length : 3 / Position : 36) */
public static final int REPORT_TELECOM_LENGTH = 3;
public static final int REPORT_TELECOM_POSITION = REPORT_SEND_TIME_POSITION + REPORT_SEND_TIME_LENGTH;
/* RESULT (Length : 5 / Position : 39) */
public static final int REPORT_RESULT_LENGTH = 5;
public static final int REPORT_RESULT_POSITION = REPORT_TELECOM_POSITION + REPORT_TELECOM_LENGTH;
/* Report 길이 */
public static final int REPORT_BODY_LENGTH = REPORT_RESULT_POSITION + REPORT_RESULT_LENGTH;
/* RESULT (Length : 5 / Position : 39) */
public static final int REPORT_ACK_RESULT_LENGTH = 1;
public static final int REPORT_ACK_RESULT_POSITION = 0;
public static final int REPORT_ACK_BODY_LENGTH = REPORT_ACK_RESULT_POSITION + REPORT_ACK_RESULT_LENGTH;
public static void putReport(final ByteBuffer buffer, final ReportDto reportDto) {
if (reportDto == null) {
return;
}
putMsgId(buffer, reportDto.getMsgId());
putAgentCode(buffer, reportDto.getAgentCode());
putSendTime(buffer, reportDto.getRsltDate());
putTelecom(buffer, reportDto.getRsltNet());
putResult(buffer, reportDto.getRsltCode());
}
public static void putMsgId(final ByteBuffer buffer, String msgId) {
if (buffer == null || msgId == null) {
return;
}
buffer.put(REPORT_MSG_ID_POSITION, msgId.getBytes());
}
public static String getMsgId(final ByteBuffer buffer) {
if (buffer == null) {
return null;
}
buffer.position(REPORT_MSG_ID_POSITION);
byte[] destArray = new byte[REPORT_MSG_ID_LENGTH];
buffer.get(destArray);
return Packet.getString(destArray);
}
public static void putAgentCode(final ByteBuffer buffer, String agentCode) {
if (buffer == null || agentCode == null) {
return;
}
buffer.put(REPORT_AGENT_CODE_POSITION, agentCode.getBytes());
}
public static String getAgentCode(final ByteBuffer buffer) {
if (buffer == null) {
return null;
}
buffer.position(REPORT_AGENT_CODE_POSITION);
byte[] destArray = new byte[REPORT_AGENT_CODE_LENGTH];
buffer.get(destArray);
return Packet.getString(destArray);
}
public static void putSendTime(final ByteBuffer buffer, String sendTime) {
if (buffer == null || sendTime == null) {
return;
}
buffer.put(REPORT_SEND_TIME_POSITION, sendTime.getBytes());
}
public static String getSendTime(final ByteBuffer buffer) {
if (buffer == null) {
return null;
}
buffer.position(REPORT_SEND_TIME_POSITION);
byte[] destArray = new byte[REPORT_SEND_TIME_LENGTH];
buffer.get(destArray);
return Packet.getString(destArray);
}
public static void putTelecom(final ByteBuffer buffer, String telecom) {
if (buffer == null || telecom == null) {
return;
}
buffer.put(REPORT_TELECOM_POSITION, telecom.getBytes());
}
public static String getTelecom(final ByteBuffer buffer) {
if (buffer == null) {
return null;
}
buffer.position(REPORT_TELECOM_POSITION);
byte[] destArray = new byte[REPORT_TELECOM_LENGTH];
buffer.get(destArray);
return Packet.getString(destArray);
}
public static void putResult(final ByteBuffer buffer, String result) {
if (buffer == null || result == null) {
return;
}
buffer.put(REPORT_RESULT_POSITION, result.getBytes());
}
public static String getResult(final ByteBuffer buffer) {
if (buffer == null) {
return null;
}
buffer.position(REPORT_RESULT_POSITION);
byte[] destArray = new byte[REPORT_RESULT_LENGTH];
buffer.get(destArray);
return Packet.getString(destArray);
}
public static void putResultAck(final ByteBuffer buffer, String result) {
if (buffer == null || result == null) {
return;
}
buffer.put(REPORT_ACK_RESULT_POSITION, result.getBytes());
}
public static String getResultAck(final ByteBuffer buffer) {
if (buffer == null) {
return null;
}
buffer.position(REPORT_ACK_RESULT_POSITION);
byte[] destArray = new byte[REPORT_ACK_RESULT_LENGTH];
buffer.get(destArray);
return Packet.getString(destArray);
}
}

View File

@ -0,0 +1,118 @@
package com.munjaon.server.server.queue;
import com.munjaon.server.server.dto.ConnectUserDto;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
public class CollectUserQueue {
private final Object lockObject = new Object(); // Lock Object
private final Map<String, ConnectUserDto> smstUserQueue = new LinkedHashMap<>();
private final Map<String, ConnectUserDto> lmsUserQueue = new LinkedHashMap<>();
private final Map<String, ConnectUserDto> mmsUserQueue = new LinkedHashMap<>();
private final Map<String, ConnectUserDto> katUserQueue = new LinkedHashMap<>();
private final Map<String, ConnectUserDto> kftUserQueue = new LinkedHashMap<>();
private static CollectUserQueue collectUserQueue;
private CollectUserQueue() {}
public synchronized static CollectUserQueue getInstance() {
if (collectUserQueue == null) {
collectUserQueue = new CollectUserQueue();
}
return collectUserQueue;
}
public void putUser(final String serviceType, final ConnectUserDto user) {
synchronized (lockObject) {
switch (serviceType) {
case "SMS" : smstUserQueue.put(user.getUserId(), user); break;
case "LMS" : lmsUserQueue.put(user.getUserId(), user); break;
case "MMS" : mmsUserQueue.put(user.getUserId(), user); break;
case "KAT" : katUserQueue.put(user.getUserId(), user); break;
case "KFT" : kftUserQueue.put(user.getUserId(), user); break;
default: break;
}
}
}
public ConnectUserDto getUser(final String serviceType, final String userId) {
synchronized (lockObject) {
switch (serviceType) {
case "SMS" : return smstUserQueue.get(userId);
case "LMS" : return lmsUserQueue.get(userId);
case "MMS" : return mmsUserQueue.get(userId);
case "KAT" : return katUserQueue.get(userId);
case "KFT" : return kftUserQueue.get(userId);
default: return null;
}
}
}
public List<ConnectUserDto> getUsers(final String serviceType) {
synchronized (lockObject) {
switch (serviceType) {
case "SMS" : return new ArrayList<>(smstUserQueue.values());
case "LMS" : return new ArrayList<>(lmsUserQueue.values());
case "MMS" : return new ArrayList<>(mmsUserQueue.values());
case "KAT" : return new ArrayList<>(katUserQueue.values());
case "KFT" : return new ArrayList<>(kftUserQueue.values());
default: return null;
}
}
}
public boolean isExist(final String serviceType, final String userId) {
synchronized (lockObject) {
switch (serviceType) {
case "SMS" : return smstUserQueue.containsKey(userId);
case "LMS" : return lmsUserQueue.containsKey(userId);
case "MMS" : return mmsUserQueue.containsKey(userId);
case "KAT" : return katUserQueue.containsKey(userId);
case "KFT" : return kftUserQueue.containsKey(userId);
default: return false;
}
}
}
public void removeUser(final String serviceType, final String userId) {
synchronized (lockObject) {
switch (serviceType) {
case "SMS" : smstUserQueue.remove(userId); break;
case "LMS" : lmsUserQueue.remove(userId); break;
case "MMS" : mmsUserQueue.remove(userId); break;
case "KAT" : katUserQueue.remove(userId); break;
case "KFT" : kftUserQueue.remove(userId); break;
default: break;
}
}
}
public int size(final String serviceType) {
synchronized (lockObject) {
switch (serviceType) {
case "SMS" : return smstUserQueue.size();
case "LMS" : return lmsUserQueue.size();
case "MMS" : return mmsUserQueue.size();
case "KAT" : return katUserQueue.size();
case "KFT" : return kftUserQueue.size();
default: return 0;
}
}
}
public boolean isEmpty(final String serviceType) {
synchronized (lockObject) {
switch (serviceType) {
case "SMS" : return smstUserQueue.isEmpty();
case "LMS" : return lmsUserQueue.isEmpty();
case "MMS" : return mmsUserQueue.isEmpty();
case "KAT" : return katUserQueue.isEmpty();
case "KFT" : return kftUserQueue.isEmpty();
default: return true;
}
}
}
}

View File

@ -0,0 +1,76 @@
package com.munjaon.server.server.queue;
import com.munjaon.server.queue.pool.ReportQueue;
import com.munjaon.server.server.dto.ReportUserDto;
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
public class ReportUserQueue {
private final Object lockObject = new Object(); // Lock Object
private final Map<String, ReportUserDto> connectUserQueue = new LinkedHashMap<>();
private static ReportUserQueue reportUserQueue;
private ReportUserQueue() {}
public synchronized static ReportUserQueue getInstance() {
if (reportUserQueue == null) {
reportUserQueue = new ReportUserQueue();
}
return reportUserQueue;
}
public void putUser(final ReportUserDto user) {
synchronized (lockObject) {
connectUserQueue.put(user.getUserId(), user);
}
}
public ReportUserDto getUser(final String userId) {
synchronized (lockObject) {
return connectUserQueue.get(userId);
}
}
public List<ReportUserDto> getUsers() {
synchronized (lockObject) {
return new ArrayList<>(connectUserQueue.values());
}
}
public boolean isExist(final String userId) {
synchronized (lockObject) {
return connectUserQueue.containsKey(userId);
}
}
public void removeUser(final String userId) {
synchronized (lockObject) {
ReportUserDto userDto = connectUserQueue.remove(userId);
if (userDto != null) {
ReportQueue queue = userDto.getReportQueue();
try {
queue.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
public int size() {
synchronized (lockObject) {
return connectUserQueue.size();
}
}
public boolean isEmpty() {
synchronized (lockObject) {
return connectUserQueue.isEmpty();
}
}
}

View File

@ -0,0 +1,34 @@
package com.munjaon.server.server.sample;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService e = Executors.newFixedThreadPool(5);
// 리턴 값이 필요 없는 경우
// for (int i = 0; i < 5; i++) {
// e.execute(new MyRunnable(i * 20));
// }
// 작업을 기다려야 경우
//e.shutdown();
// 리턴 값이 필요한 경우
List<Future<Integer>> l = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Future<Integer> f = e.submit(new MyCallable(i * 20));
l.add(f);
}
// 작업이 완료 되길 기다려야 경우
int n = 0;
for (int i = 0; i < 5; i++) {
Future<Integer> f = l.get(i);
n += f.get();
}
}
}

View File

@ -0,0 +1,27 @@
package com.munjaon.server.server.sample;
import java.util.concurrent.Callable;
public class MyCallable implements Callable {
private final int s;
public MyCallable(int s) {
this.s = s;
}
@Override
public Integer call() throws Exception {
int in = 0;
for (int i = 0; i < 20; i++) {
System.out.println(s + " : MyCallable");
System.out.println(i + s);
in += i + s;
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return in;
}
}

View File

@ -0,0 +1,20 @@
package com.munjaon.server.server.sample;
public class MyRunnable implements Runnable {
private final int s;
public MyRunnable(int s) {
this.s = s;
}
public void run() {
for (int i = 0; i < 20; i++) {
System.out.println(s + " : MyRunnable");
System.out.println(i + s);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@ -0,0 +1,403 @@
package com.munjaon.server.server.service;
import com.munjaon.server.cache.dto.MemberDto;
import com.munjaon.server.cache.enums.CacheService;
import com.munjaon.server.cache.service.MemberService;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.enums.QueueTypeWorker;
import com.munjaon.server.server.dto.ConnectUserDto;
import com.munjaon.server.server.dto.HeaderDto;
import com.munjaon.server.server.packet.*;
import com.munjaon.server.util.LogUtil;
import lombok.Getter;
import org.json.simple.JSONObject;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CollectBackServerService extends Service {
private final InetSocketAddress listenAddress;
private CollectorThreadService threadService;
private Selector selector;
private final String serviceType;
public CollectBackServerService(String serviceName, String serviceType, int port) {
super(serviceName);
this.listenAddress = new InetSocketAddress(port);
this.serviceType = serviceType;
}
@Override
public void checkReady() {
QueueTypeWorker worker = QueueTypeWorker.find(this.serviceType);
if (worker != null && worker.getQueueSize() > 0) {
this.IS_READY_YN = true;
} else {
this.IS_READY_YN = false;
}
}
@Override
public void initResources() {
try {
initCollectChannel();
threadService = new CollectorThreadService(8, this.serviceType, logger);
} catch (IOException e) {
saveSystemLog(e);
throw new RuntimeException(e);
}
}
private void initCollectChannel() throws IOException {
selector = Selector.open();
/* 채널 생성 */
ServerSocketChannel serverChannel = ServerSocketChannel.open();
/* non-Blocking 설정 */
serverChannel.configureBlocking(false);
/* 서버 ip, port 설정 */
serverChannel.socket().bind(listenAddress);
/* 채널에 accept 대기 설정 */
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
}
private void closeCollectChannel() throws IOException {
selector.close();
}
@Override
public void releaseResources() {
try {
closeCollectChannel();
threadService.close();
} catch (IOException e) {
saveSystemLog(e);
throw new RuntimeException(e);
}
}
@Override
public void doService() {
while (isRun()) {
try {
execInterest();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
private void execInterest() throws IOException {
if (selector.select(1000) == 0) {
return ;
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
if (key.isValid()) {
if (key.isAcceptable()) { // 접속일 경우..
saveSystemLog("isAcceptable");
threadService.submit(selector, key, 1);
} else if (key.isReadable()) { // 수신일 경우..
saveSystemLog("isReadable");
threadService.submit(selector, key, 2);
} else if (key.isWritable()) { // 발신일 경우..
saveSystemLog("isWritable");
threadService.submit(selector, key, 3);
}
}
/* 키 셋에서 제거. */
keys.remove();
}
}
@Override
public JSONObject monitorService() {
return null;
}
private static class CollectorThreadService {
@Getter
private String serviceType;
@Getter
private final int maxCore;
private final ExecutorService executor;
private final Map<String, ConnectUserDto> connectUserMap = new ConcurrentHashMap<>();
private final LogUtil logger;
public CollectorThreadService(String serviceType, LogUtil logger) {
this(Runtime.getRuntime().availableProcessors(), serviceType, logger);
}
public CollectorThreadService(int maxCore, String serviceType, LogUtil logger) {
this.maxCore = maxCore;
this.executor = Executors.newFixedThreadPool(maxCore);
this.logger = logger;
}
public void submit(Selector selector, SelectionKey key, int interestOps) {
executor.submit(() -> {
switch (interestOps) {
case 1 : accept(selector, key); break;
case 2 : read(selector, key); break;
case 3 : write(selector, key); break;
default : break;
}
});
}
private void accept(Selector selector, SelectionKey key) {
try {
/* 키 채널을 가져온다. */
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
/* accept을 해서 Socket 채널을 가져온다. */
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);
/* 소켓 취득 */
Socket socket = channel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
saveSystemLog("accept : " + Thread.currentThread().getName());
saveSystemLog("Connected to: " + remoteAddr);
// Socket 채널을 channel에 수신 등록한다
channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).build());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// private void read(Selector selector, SelectionKey key) {
// try {
// saveSystemLog("read : " + Thread.currentThread().getName());
// // 채널을 가져온다.
// SocketChannel channel = (SocketChannel) key.channel();
// ConnectUserDto userDto = (ConnectUserDto) key.attachment();
//
// int size = -1;
//
// ByteBuffer headBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH);
// ByteBuffer bindBuffer = ByteBuffer.allocate(Bind.BIND_BODY_LENGTH);
// ByteBuffer linkBuffer = ByteBuffer.allocate(LinkCheck.LINK_CHECK_BODY_LENGTH);
// ByteBuffer msgBuffer = ByteBuffer.allocate(SmsMessage.DELIVER_SMS_BODY_LENGTH);
// try {
// size = channel.read(headBuffer);
// String command = Header.getCommand(headBuffer);
// System.out.println("command : " + command);
// if ("1".equals(command)) {
// size = channel.read(bindBuffer);
// } else if ("3".equals(command)) {
// size = channel.read(msgBuffer);
// } else if ("7".equals(command)) {
// size = channel.read(linkBuffer);
// } else {
// size = -1;
// }
//
// System.out.println("size : " + size);
// } catch (IOException e) {}
// if (size < 0) {
// expireConnectUser(key);
// } else if (size > 0) {
//// String command = Header.getCommand(buffer);
//// saveSystemLog("command : " + command);
//// switch (Integer.parseInt(command)) {
//// case 1 : recvBind(channel, buffer, userDto); break;
//// case 3 : recvDeliver(channel, buffer, userDto); break;
//// case 7 : recvLinkCheck(key); break;
//// default: expireConnectUser(key); break;
//// }
// }
// } catch (Exception e) {
// e.printStackTrace();
// }
// System.out.println("read");
// }
private void read(Selector selector, SelectionKey key) {
try {
saveSystemLog("read : " + Thread.currentThread().getName());
// 채널을 가져온다.
SocketChannel channel = (SocketChannel) key.channel();
ConnectUserDto userDto = (ConnectUserDto) key.attachment();
int size = -1;
ByteBuffer buffer = ByteBuffer.allocate(256);
try {
size = channel.read(buffer);
} catch (IOException e) {}
if (size < 0) {
expireConnectUser(key);
} else if (size > 0) {
String command = Header.getCommand(buffer);
saveSystemLog("command : " + command);
switch (Integer.parseInt(command)) {
case 1 : recvBind(channel, buffer, userDto); break;
case 3 : recvDeliver(channel, buffer, userDto); break;
case 7 : recvLinkCheck(key); break;
default: expireConnectUser(key); break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("read");
}
private void recvBind(SocketChannel channel, ByteBuffer buffer, ConnectUserDto userDto) {
String resultCode = "00";
try {
String id = Bind.getBindId(buffer);
String pwd = Bind.getBindPwd(buffer);
saveSystemLog("id : " + id);
saveSystemLog("pwd : " + pwd);
if (id == null || pwd == null) {
resultCode = "50";
} else {
if (connectUserMap.containsKey(id)) {
resultCode = "60";
} else {
MemberService svc = (MemberService) CacheService.LOGIN_SERVICE.getService();
MemberDto memberDto = null;
if (svc != null) {
memberDto = svc.get(id);
}
if (memberDto == null || !pwd.equals(memberDto.getAccessKey())) {
resultCode = "20";
} else {
userDto.setUserId(id);
userDto.setLogin(true);
userDto.setMemberDto(memberDto);
}
}
}
} catch (Exception e) {
resultCode = "10";
e.printStackTrace();
}
try {
saveSystemLog("resultCode : " + resultCode);
channel.write(Bind.makeBindAckBuffer(resultCode));
} catch (IOException e) {
e.printStackTrace();
}
}
private void recvDeliver(SocketChannel channel, ByteBuffer buffer, ConnectUserDto userDto) throws IOException {
BasicMessageDto messageDto = new BasicMessageDto();
messageDto.setRouterSeq("40");
messageDto.setServiceType("4");
messageDto.setUserId(userDto.getUserId());
messageDto.setRemoteIP(userDto.getRemoteIP());
messageDto.setSendStatus("0");
messageDto.setUserMsgID(CommonMessage.getMessageIdForDeliver(buffer));
messageDto.setUserSender(CommonMessage.getSenderForDeliver(buffer));
messageDto.setUserReceiver(CommonMessage.getReceiverForDeliver(buffer));
messageDto.setReserveDt(CommonMessage.getReserveTimeForDeliver(buffer));
messageDto.setRequestDt(CommonMessage.getRequestTimeForDeliver(buffer));
messageDto.setUnitCost("10.4");
messageDto.setUserMessage(SmsMessage.getMessageForDeliver(buffer));
QueueTypeWorker worker = QueueTypeWorker.find("SMS");
if (worker != null) {
worker.pushQueue(messageDto);
channel.write(SmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
}
}
private void recvLinkCheck(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
channel.write(LinkCheck.makeLinkCheckAckBuffer());
}
private void expireConnectUser(SelectionKey key) {
if (key == null || !key.isValid()) {
return;
}
try {
SocketChannel channel = (SocketChannel) key.channel();
ConnectUserDto userDto = (ConnectUserDto) key.attachment();
if (userDto != null && userDto.getUserId() != null) {
connectUserMap.remove(userDto.getUserId());
key.attach(null);
}
// 소켓 채널 닫기
channel.close();
// 닫기
key.cancel();
} catch (IOException e) {
e.printStackTrace();
}
}
private HeaderDto getHeader(SocketChannel channel) {
HeaderDto headerDto = HeaderDto.builder().build();
int size = -1;
ByteBuffer buffer = ByteBuffer.allocate(Header.HEADER_LENGTH);
if (channel != null) {
try {
saveSystemLog("Key is valid : ");
// SocketChannel channel = (SocketChannel) key.channel();
size = channel.read(buffer);
} catch (IOException e) {}
}
if (size < 0) {
saveSystemLog("Is Error : ");
headerDto.setError(true);
} else {
saveSystemLog("version : " + Header.getVersion(buffer));
saveSystemLog("Command : " + Header.getCommand(buffer));
saveSystemLog("BodyLength : " + Header.getBodyLength(buffer));
headerDto.setVersion(Header.getVersion(buffer));
headerDto.setCommand(Integer.parseInt(Header.getCommand(buffer)));
headerDto.setBodyLength(Integer.parseInt(Header.getBodyLength(buffer)));
}
saveSystemLog("READ HEADER : " + size);
return headerDto;
}
private void write(Selector selector, SelectionKey key) {
System.out.println("write");
}
private void saveSystemLog(Object obj) {
saveLog(obj, true);
}
private void saveLog(Object obj) {
saveLog(obj, false);
}
private void saveLog(Object obj, boolean isConsoleOutput) {
if (isConsoleOutput) {
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATE_FORMAT)) + " {{" + serviceType + "}} " + obj);
}
if (logger == null) {
return;
}
logger.log(obj);
}
public void close() {
List<Runnable> unfinishedTasks = executor.shutdownNow();
connectUserMap.clear();
if (!unfinishedTasks.isEmpty()) {
saveSystemLog("Not all tasks finished before calling close: " + unfinishedTasks.size());
}
}
}
}

View File

@ -1,13 +1,9 @@
package com.munjaon.server.server.service; package com.munjaon.server.server.service;
import com.munjaon.server.cache.dto.MemberDto;
import com.munjaon.server.cache.enums.CacheService;
import com.munjaon.server.cache.service.MemberService;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.enums.QueueTypeWorker; import com.munjaon.server.queue.enums.QueueTypeWorker;
import com.munjaon.server.server.dto.ConnectUserDto; import com.munjaon.server.server.dto.ConnectUserDto;
import com.munjaon.server.server.dto.HeaderDto; import com.munjaon.server.server.queue.CollectUserQueue;
import com.munjaon.server.server.packet.*; import com.munjaon.server.server.task.CollectReadTask;
import com.munjaon.server.util.LogUtil; import com.munjaon.server.util.LogUtil;
import lombok.Getter; import lombok.Getter;
import org.json.simple.JSONObject; import org.json.simple.JSONObject;
@ -16,31 +12,33 @@ import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CollectServerService extends Service { public class CollectServerService extends Service {
private final InetSocketAddress listenAddress; private final InetSocketAddress listenAddress;
private CollectorThreadService threadService; private CollectThreadService threadService;
private CollectUserQueue collectUserQueue = CollectUserQueue.getInstance();
private Selector selector; private Selector selector;
private final String serviceType; private final String serviceType;
private int readMaxCore;
public CollectServerService(String serviceName, String serviceType, int port) { public CollectServerService(String serviceName, String serviceType, int port) {
super(serviceName); super(serviceName);
this.readMaxCore = Integer.parseInt(getProp("READ_MAX_CORE").trim());
this.listenAddress = new InetSocketAddress(port); this.listenAddress = new InetSocketAddress(port);
this.serviceType = serviceType; this.serviceType = serviceType;
} }
@Override @Override
public void checkReady() { public void checkReady() {
QueueTypeWorker worker = QueueTypeWorker.find(this.serviceType); QueueTypeWorker worker = QueueTypeWorker.find(this.serviceType);
@ -55,7 +53,7 @@ public class CollectServerService extends Service {
public void initResources() { public void initResources() {
try { try {
initCollectChannel(); initCollectChannel();
threadService = new CollectorThreadService(8, this.serviceType, logger); threadService = new CollectThreadService(readMaxCore, this.serviceType, logger);
} catch (IOException e) { } catch (IOException e) {
saveSystemLog(e); saveSystemLog(e);
throw new RuntimeException(e); throw new RuntimeException(e);
@ -105,22 +103,78 @@ public class CollectServerService extends Service {
return ; return ;
} }
Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
List<Future<ConnectUserDto>> list = new ArrayList<>();
while (keys.hasNext()) { while (keys.hasNext()) {
SelectionKey key = keys.next(); SelectionKey key = keys.next();
/* 키 셋에서 제거. */
keys.remove();
if (key.isValid()) { if (key.isValid()) {
if (key.isAcceptable()) { // 접속일 경우.. if (key.isAcceptable()) { // 접속일 경우..
saveSystemLog("isAcceptable"); saveSystemLog("isAcceptable");
threadService.submit(selector, key, 1); accept(selector, key);
} else if (key.isReadable()) { // 수신일 경우.. } else if (key.isReadable()) { // 수신일 경우..
saveSystemLog("isReadable"); saveSystemLog("isReadable");
threadService.submit(selector, key, 2); Future<ConnectUserDto> future = threadService.submit(new CollectReadTask(selector, key, this.serviceType, logger));
} else if (key.isWritable()) { // 발신일 경우.. list.add(future);
saveSystemLog("isWritable"); // threadService.submit(selector, key, 2);
threadService.submit(selector, key, 3);
} }
} else {
expireConnectUser(key);
} }
/* 키 셋에서 제거. */ }
keys.remove(); for (Future<ConnectUserDto> future : list) {
ConnectUserDto connectUserDto = null;
try {
connectUserDto = future.get();
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}
if (connectUserDto == null) {
saveSystemLog("Future : " + future);
} else {
saveSystemLog("Future : " + connectUserDto.toString());
}
}
}
private void accept(Selector selector, SelectionKey key) {
try {
/* 키 채널을 가져온다. */
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
/* accept을 해서 Socket 채널을 가져온다. */
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);
/* 소켓 취득 */
Socket socket = channel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
saveSystemLog("Connected to: " + remoteAddr);
// Socket 채널을 channel에 수신 등록한다
channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().serviceType(this.serviceType).lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).build());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void expireConnectUser(SelectionKey key) {
if (key == null || !key.isValid()) {
return;
}
try {
SocketChannel channel = (SocketChannel) key.channel();
ConnectUserDto userDto = (ConnectUserDto) key.attachment();
if (userDto != null && userDto.getUserId() != null) {
collectUserQueue.removeUser(this.serviceType, userDto.getUserId());
// connectUserMap.remove(userDto.getUserId());
key.attach(null);
}
// 소켓 채널 닫기
channel.close();
// 닫기
key.cancel();
} catch (IOException e) {
e.printStackTrace();
} }
} }
@ -129,7 +183,7 @@ public class CollectServerService extends Service {
return null; return null;
} }
private static class CollectorThreadService { private static class CollectThreadService {
@Getter @Getter
private String serviceType; private String serviceType;
@Getter @Getter
@ -138,238 +192,18 @@ public class CollectServerService extends Service {
private final Map<String, ConnectUserDto> connectUserMap = new ConcurrentHashMap<>(); private final Map<String, ConnectUserDto> connectUserMap = new ConcurrentHashMap<>();
private final LogUtil logger; private final LogUtil logger;
public CollectorThreadService(String serviceType, LogUtil logger) { public CollectThreadService(String serviceType, LogUtil logger) {
this(Runtime.getRuntime().availableProcessors(), serviceType, logger); this(Runtime.getRuntime().availableProcessors(), serviceType, logger);
} }
public CollectorThreadService(int maxCore, String serviceType, LogUtil logger) { public CollectThreadService(int maxCore, String serviceType, LogUtil logger) {
this.maxCore = maxCore; this.maxCore = maxCore;
this.executor = Executors.newFixedThreadPool(maxCore); this.executor = Executors.newFixedThreadPool(maxCore);
this.logger = logger; this.logger = logger;
} }
public void submit(Selector selector, SelectionKey key, int interestOps) { public Future<ConnectUserDto> submit(CollectReadTask collectReadTask) {
executor.submit(() -> { return executor.submit(collectReadTask);
switch (interestOps) {
case 1 : accept(selector, key); break;
case 2 : read(selector, key); break;
case 3 : write(selector, key); break;
default : break;
}
});
}
private void accept(Selector selector, SelectionKey key) {
try {
/* 키 채널을 가져온다. */
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
/* accept을 해서 Socket 채널을 가져온다. */
SocketChannel channel = serverChannel.accept();
channel.configureBlocking(false);
/* 소켓 취득 */
Socket socket = channel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
saveSystemLog("accept : " + Thread.currentThread().getName());
saveSystemLog("Connected to: " + remoteAddr);
// Socket 채널을 channel에 수신 등록한다
channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).build());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// private void read(Selector selector, SelectionKey key) {
// try {
// saveSystemLog("read : " + Thread.currentThread().getName());
// // 채널을 가져온다.
// SocketChannel channel = (SocketChannel) key.channel();
// ConnectUserDto userDto = (ConnectUserDto) key.attachment();
//
// int size = -1;
//
// ByteBuffer headBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH);
// ByteBuffer bindBuffer = ByteBuffer.allocate(Bind.BIND_BODY_LENGTH);
// ByteBuffer linkBuffer = ByteBuffer.allocate(LinkCheck.LINK_CHECK_BODY_LENGTH);
// ByteBuffer msgBuffer = ByteBuffer.allocate(SmsMessage.DELIVER_SMS_BODY_LENGTH);
// try {
// size = channel.read(headBuffer);
// String command = Header.getCommand(headBuffer);
// System.out.println("command : " + command);
// if ("1".equals(command)) {
// size = channel.read(bindBuffer);
// } else if ("3".equals(command)) {
// size = channel.read(msgBuffer);
// } else if ("7".equals(command)) {
// size = channel.read(linkBuffer);
// } else {
// size = -1;
// }
//
// System.out.println("size : " + size);
// } catch (IOException e) {}
// if (size < 0) {
// expireConnectUser(key);
// } else if (size > 0) {
//// String command = Header.getCommand(buffer);
//// saveSystemLog("command : " + command);
//// switch (Integer.parseInt(command)) {
//// case 1 : recvBind(channel, buffer, userDto); break;
//// case 3 : recvDeliver(channel, buffer, userDto); break;
//// case 7 : recvLinkCheck(key); break;
//// default: expireConnectUser(key); break;
//// }
// }
// } catch (Exception e) {
// e.printStackTrace();
// }
// System.out.println("read");
// }
private void read(Selector selector, SelectionKey key) {
try {
saveSystemLog("read : " + Thread.currentThread().getName());
// 채널을 가져온다.
SocketChannel channel = (SocketChannel) key.channel();
ConnectUserDto userDto = (ConnectUserDto) key.attachment();
int size = -1;
ByteBuffer buffer = ByteBuffer.allocate(256);
try {
size = channel.read(buffer);
} catch (IOException e) {}
if (size < 0) {
expireConnectUser(key);
} else if (size > 0) {
String command = Header.getCommand(buffer);
saveSystemLog("command : " + command);
switch (Integer.parseInt(command)) {
case 1 : recvBind(channel, buffer, userDto); break;
case 3 : recvDeliver(channel, buffer, userDto); break;
case 7 : recvLinkCheck(key); break;
default: expireConnectUser(key); break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("read");
}
private void recvBind(SocketChannel channel, ByteBuffer buffer, ConnectUserDto userDto) {
String resultCode = "00";
try {
String id = Bind.getBindId(buffer);
String pwd = Bind.getBindPwd(buffer);
saveSystemLog("id : " + id);
saveSystemLog("pwd : " + pwd);
if (id == null || pwd == null) {
resultCode = "50";
} else {
if (connectUserMap.containsKey(id)) {
resultCode = "60";
} else {
MemberService svc = (MemberService) CacheService.LOGIN_SERVICE.getService();
MemberDto memberDto = null;
if (svc != null) {
memberDto = svc.get(id);
}
if (memberDto == null || !pwd.equals(memberDto.getAccessKey())) {
resultCode = "20";
} else {
userDto.setUserId(id);
userDto.setLogin(true);
userDto.setMemberDto(memberDto);
}
}
}
} catch (Exception e) {
resultCode = "10";
e.printStackTrace();
}
try {
saveSystemLog("resultCode : " + resultCode);
channel.write(Bind.makeBindAckBuffer(resultCode));
} catch (IOException e) {
e.printStackTrace();
}
}
private void recvDeliver(SocketChannel channel, ByteBuffer buffer, ConnectUserDto userDto) throws IOException {
BasicMessageDto messageDto = new BasicMessageDto();
messageDto.setRouterSeq("40");
messageDto.setServiceType("4");
messageDto.setUserId(userDto.getUserId());
messageDto.setRemoteIP(userDto.getRemoteIP());
messageDto.setSendStatus("0");
messageDto.setUserMsgID(CommonMessage.getMessageIdForDeliver(buffer));
messageDto.setUserSender(CommonMessage.getSenderForDeliver(buffer));
messageDto.setUserReceiver(CommonMessage.getReceiverForDeliver(buffer));
messageDto.setReserveDt(CommonMessage.getReserveTimeForDeliver(buffer));
messageDto.setRequestDt(CommonMessage.getRequestTimeForDeliver(buffer));
messageDto.setUnitCost("10.4");
messageDto.setUserMessage(SmsMessage.getMessageForDeliver(buffer));
QueueTypeWorker worker = QueueTypeWorker.find("SMS");
if (worker != null) {
worker.pushQueue(messageDto);
channel.write(SmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
}
}
private void recvLinkCheck(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
channel.write(LinkCheck.makeLinkCheckAckBuffer());
}
private void expireConnectUser(SelectionKey key) {
if (key == null || !key.isValid()) {
return;
}
try {
SocketChannel channel = (SocketChannel) key.channel();
ConnectUserDto userDto = (ConnectUserDto) key.attachment();
if (userDto != null && userDto.getUserId() != null) {
connectUserMap.remove(userDto.getUserId());
key.attach(null);
}
// 소켓 채널 닫기
channel.close();
// 닫기
key.cancel();
} catch (IOException e) {
e.printStackTrace();
}
}
private HeaderDto getHeader(SocketChannel channel) {
HeaderDto headerDto = HeaderDto.builder().build();
int size = -1;
ByteBuffer buffer = ByteBuffer.allocate(Header.HEADER_LENGTH);
if (channel != null) {
try {
saveSystemLog("Key is valid : ");
// SocketChannel channel = (SocketChannel) key.channel();
size = channel.read(buffer);
} catch (IOException e) {}
}
if (size < 0) {
saveSystemLog("Is Error : ");
headerDto.setError(true);
} else {
saveSystemLog("version : " + Header.getVersion(buffer));
saveSystemLog("Command : " + Header.getCommand(buffer));
saveSystemLog("BodyLength : " + Header.getBodyLength(buffer));
headerDto.setVersion(Header.getVersion(buffer));
headerDto.setCommand(Integer.parseInt(Header.getCommand(buffer)));
headerDto.setBodyLength(Integer.parseInt(Header.getBodyLength(buffer)));
}
saveSystemLog("READ HEADER : " + size);
return headerDto;
}
private void write(Selector selector, SelectionKey key) {
System.out.println("write");
} }
private void saveSystemLog(Object obj) { private void saveSystemLog(Object obj) {

View File

@ -0,0 +1,116 @@
package com.munjaon.server.server.service;
import com.munjaon.server.server.dto.ReportUserDto;
import com.munjaon.server.server.queue.ReportUserQueue;
import com.munjaon.server.server.task.ReportQueueTask;
import com.munjaon.server.util.LogUtil;
import lombok.Getter;
import org.json.simple.JSONObject;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ReportQueueServerService extends Service {
private final ReportUserQueue reportUserQueue = ReportUserQueue.getInstance();
private QueueThreadService threadService;
private int queueMaxCore;
public ReportQueueServerService(String serviceName) {
super(serviceName);
this.queueMaxCore = Integer.parseInt(getProp("QUEUE_MAX_CORE").trim());
}
@Override
public void checkReady() {
this.IS_READY_YN = true;
}
@Override
public void initResources() {
threadService = new QueueThreadService(queueMaxCore, logger);
}
@Override
public void releaseResources() {
threadService.close();
}
@Override
public void doService() {
while (isRun()) {
try {
doQueueService();
Thread.sleep(100);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
private void doQueueService() {
List<ReportUserDto> reportUserList = reportUserQueue.getUsers();
if (reportUserList == null || reportUserList.size() == 0) {
return;
}
for (ReportUserDto reportUserDto : reportUserList) {
threadService.execute(new ReportQueueTask(reportUserDto, logger));
}
}
@Override
public JSONObject monitorService() {
return null;
}
private static class QueueThreadService {
@Getter
private final int maxCore;
private final ExecutorService executor;
private final LogUtil logger;
public QueueThreadService(LogUtil logger) {
this(Runtime.getRuntime().availableProcessors(), logger);
}
public QueueThreadService(int maxCore, LogUtil logger) {
this.maxCore = maxCore;
this.executor = Executors.newFixedThreadPool(maxCore);
this.logger = logger;
}
public void execute(ReportQueueTask runnable) {
executor.execute(runnable);
}
private void saveSystemLog(Object obj) {
saveLog(obj, true);
}
private void saveLog(Object obj) {
saveLog(obj, false);
}
private void saveLog(Object obj, boolean isConsoleOutput) {
if (isConsoleOutput) {
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATE_FORMAT)) + " {{QueueThreadService}} " + obj);
}
if (logger == null) {
return;
}
logger.log(obj);
}
public void close() {
List<Runnable> unfinishedTasks = executor.shutdownNow();
if (!unfinishedTasks.isEmpty()) {
saveSystemLog("Not all tasks finished before calling close: " + unfinishedTasks.size());
}
}
}
}

View File

@ -1,37 +1,58 @@
package com.munjaon.server.server.service; package com.munjaon.server.server.service;
import com.munjaon.server.queue.pool.ReportQueue;
import com.munjaon.server.server.dto.ConnectUserDto; import com.munjaon.server.server.dto.ConnectUserDto;
import com.munjaon.server.server.dto.ReportDto;
import com.munjaon.server.server.dto.ReportUserDto;
import com.munjaon.server.server.packet.Header;
import com.munjaon.server.server.packet.LinkCheck;
import com.munjaon.server.server.packet.Packet;
import com.munjaon.server.server.packet.Report;
import com.munjaon.server.server.queue.ReportUserQueue;
import com.munjaon.server.server.task.ReportReadTask;
import com.munjaon.server.util.LogUtil; import com.munjaon.server.util.LogUtil;
import lombok.Getter; import lombok.Getter;
import org.json.simple.JSONObject; import org.json.simple.JSONObject;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
import java.net.SocketAddress; import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel; import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ReportServerService extends Service { public class ReportServerService extends Service {
private final InetSocketAddress listenAddress; private final InetSocketAddress listenAddress;
private final Map<String, ConnectUserDto> connectUserMap = new ConcurrentHashMap<>(); private ReadThreadService threadService;
private ReporterThreadService threadService; private final ReportUserQueue reportUserQueue = ReportUserQueue.getInstance();
private Selector selector; private Selector selector;
private int readMaxCore;
private int queueMaxCore;
// private long CHECKED_INTEREST_TIME = System.currentTimeMillis();
// private final long LIMIT_CYCLE_TIME = 3000;
// private final ByteBuffer reportBuffer = ByteBuffer.allocateDirect(Header.HEADER_LENGTH + Report.REPORT_BODY_LENGTH);
public ReportServerService(String serviceName, int port) { public ReportServerService(String serviceName, int port) {
super(serviceName); super(serviceName);
this.readMaxCore = Integer.parseInt(getProp("READ_MAX_CORE").trim());
this.queueMaxCore = Integer.parseInt(getProp("QUEUE_MAX_CORE").trim());
this.listenAddress = new InetSocketAddress(port); this.listenAddress = new InetSocketAddress(port);
} }
@Override @Override
public void checkReady() { public void checkReady() {
this.IS_READY_YN = true; this.IS_READY_YN = true;
@ -41,7 +62,7 @@ public class ReportServerService extends Service {
public void initResources() { public void initResources() {
try { try {
initReportChannel(); initReportChannel();
threadService = new ReporterThreadService(8, logger); threadService = new ReadThreadService(8, logger);
} catch (IOException e) { } catch (IOException e) {
saveSystemLog(e); saveSystemLog(e);
throw new RuntimeException(e); throw new RuntimeException(e);
@ -78,6 +99,67 @@ public class ReportServerService extends Service {
@Override @Override
public void doService() { public void doService() {
while (isRun()) {
try {
execInterest();
checkInterest();
} catch (Exception e) {
saveSystemLog(e.toString());
}
}
}
private void checkInterest() throws IOException, InterruptedException {
// if (System.currentTimeMillis() - CHECKED_INTEREST_TIME < LIMIT_CYCLE_TIME) {
// return;
// }
// /* 체크시간 업데이트 */
// CHECKED_INTEREST_TIME = System.currentTimeMillis();
Iterator<SelectionKey> keys = selector.keys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
if (key.isValid()) {
ReportUserDto reportUserDto = (ReportUserDto) key.attachment();
if (reportUserDto == null) {
continue;
}
SocketChannel channel = (SocketChannel) key.channel(); // 채널을 가져온다.
if (reportUserDto.isAlive() == 1) {
if (reportUserDto.getUserId() != null) {
reportUserQueue.removeUser(reportUserDto.getUserId());
}
Socket socket = channel.socket(); // 소켓 취득
channel.close(); // 소켓 채널 닫기
socket.close(); // 소켓 닫기
key.attach(null); // 닫기
key.cancel();
} else if (reportUserDto.isAlive() == 2) {
channel.write(LinkCheck.makeLinkCheckBuffer());
} else {
if (reportUserDto.isLogin()) {
ReportQueue reportQueue = reportUserDto.getReportQueue();
try {
ReportDto reportDto = reportQueue.popReportFromQueue();
if (reportDto == null) {
saveSystemLog("reportQueue.popReportFromQueue() : null");
continue;
}
saveSystemLog("reportQueue.popReportFromQueue() : " + reportDto.toString());
ByteBuffer reportBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + Report.REPORT_BODY_LENGTH);
Packet.setDefaultByte(reportBuffer);
Header.putHeader(reportBuffer, Header.COMMAND_REPORT, Report.REPORT_BODY_LENGTH);
Report.putReport(reportBuffer, reportDto);
channel.write(reportBuffer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
} else {
expireConnectUser(key);
}
}
} }
private void execInterest() throws IOException { private void execInterest() throws IOException {
@ -85,19 +167,39 @@ public class ReportServerService extends Service {
return ; return ;
} }
Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
List<Future<ReportUserDto>> list = new ArrayList<>();
while (keys.hasNext()) { while (keys.hasNext()) {
SelectionKey key = keys.next(); SelectionKey key = keys.next();
/* 키 셋에서 제거. */
keys.remove();
if (key.isValid()) { if (key.isValid()) {
if (key.isAcceptable()) { // 접속일 경우.. if (key.isAcceptable()) { // 접속일 경우..
saveSystemLog("isAcceptable"); saveSystemLog("isAcceptable");
accept(selector, key); accept(selector, key);
} else if (key.isReadable()) { // 수신일 경우.. } else if (key.isReadable()) { // 수신일 경우..
saveSystemLog("isReadable"); saveSystemLog("isReadable");
Future<ReportUserDto> future = threadService.submit(new ReportReadTask(selector, key, logger));
list.add(future);
// threadService.submit(selector, key, 2); // threadService.submit(selector, key, 2);
} }
} else {
expireConnectUser(key);
}
}
for (Future<ReportUserDto> future : list) {
ReportUserDto reportUserDto = null;
try {
reportUserDto = future.get();
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}
if (reportUserDto == null) {
saveSystemLog("Future : " + future);
} else {
saveSystemLog("Future : " + reportUserDto.toString());
} }
/* 키 셋에서 제거. */
keys.remove();
} }
} }
@ -113,42 +215,12 @@ public class ReportServerService extends Service {
SocketAddress remoteAddr = socket.getRemoteSocketAddress(); SocketAddress remoteAddr = socket.getRemoteSocketAddress();
saveSystemLog("Connected to: " + remoteAddr); saveSystemLog("Connected to: " + remoteAddr);
// Socket 채널을 channel에 수신 등록한다 // Socket 채널을 channel에 수신 등록한다
channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).build()); channel.register(selector, SelectionKey.OP_READ, ReportUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).queuePath(System.getProperty("ROOTPATH") + File.separator + getProp("QUEUE_PATH")).build());
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
// private void read(Selector selector, SelectionKey key) {
// try {
// saveSystemLog("read : " + Thread.currentThread().getName());
// // 채널을 가져온다.
// SocketChannel channel = (SocketChannel) key.channel();
// ConnectUserDto userDto = (ConnectUserDto) key.attachment();
//
// int size = -1;
// ByteBuffer buffer = ByteBuffer.allocate(256);
// try {
// size = channel.read(buffer);
// } catch (IOException e) {}
// if (size < 0) {
// expireConnectUser(key);
// } else if (size > 0) {
// String command = Header.getCommand(buffer);
// saveSystemLog("command : " + command);
// switch (Integer.parseInt(command)) {
// case 1 : recvBind(channel, buffer, userDto); break;
// case 6 : recvDeliver(channel, buffer, userDto); break;
// case 8 : recvLinkCheck(key); break;
// default: expireConnectUser(key); break;
// }
// }
// } catch (Exception e) {
// e.printStackTrace();
// }
// System.out.println("read");
// }
private void expireConnectUser(SelectionKey key) { private void expireConnectUser(SelectionKey key) {
if (key == null || !key.isValid()) { if (key == null || !key.isValid()) {
return; return;
@ -157,7 +229,8 @@ public class ReportServerService extends Service {
SocketChannel channel = (SocketChannel) key.channel(); SocketChannel channel = (SocketChannel) key.channel();
ConnectUserDto userDto = (ConnectUserDto) key.attachment(); ConnectUserDto userDto = (ConnectUserDto) key.attachment();
if (userDto != null && userDto.getUserId() != null) { if (userDto != null && userDto.getUserId() != null) {
connectUserMap.remove(userDto.getUserId()); reportUserQueue.removeUser(userDto.getUserId());
// connectUserMap.remove(userDto.getUserId());
key.attach(null); key.attach(null);
} }
// 소켓 채널 닫기 // 소켓 채널 닫기
@ -174,27 +247,26 @@ public class ReportServerService extends Service {
return null; return null;
} }
private static class ReporterThreadService { private static class ReadThreadService {
@Getter @Getter
private final int maxCore; private final int maxCore;
private final ExecutorService executor; private final ExecutorService executor;
private final Map<String, ConnectUserDto> connectUserMap = new ConcurrentHashMap<>(); private ReportUserQueue reportUserQueue = ReportUserQueue.getInstance();
private final LogUtil logger; private final LogUtil logger;
public ReporterThreadService(LogUtil logger) { public ReadThreadService(LogUtil logger) {
this(Runtime.getRuntime().availableProcessors(), logger); this(Runtime.getRuntime().availableProcessors(), logger);
} }
public ReporterThreadService(int maxCore, LogUtil logger) { public ReadThreadService(int maxCore, LogUtil logger) {
this.maxCore = maxCore; this.maxCore = maxCore;
this.executor = Executors.newFixedThreadPool(maxCore); this.executor = Executors.newFixedThreadPool(maxCore);
this.logger = logger; this.logger = logger;
} }
private void write(Selector selector, SelectionKey key) { public Future<ReportUserDto> submit(ReportReadTask reportReadTask) {
System.out.println("write"); return executor.submit(reportReadTask);
} }
private void saveSystemLog(Object obj) { private void saveSystemLog(Object obj) {
saveLog(obj, true); saveLog(obj, true);
} }
@ -217,7 +289,6 @@ public class ReportServerService extends Service {
public void close() { public void close() {
List<Runnable> unfinishedTasks = executor.shutdownNow(); List<Runnable> unfinishedTasks = executor.shutdownNow();
connectUserMap.clear();
if (!unfinishedTasks.isEmpty()) { if (!unfinishedTasks.isEmpty()) {
saveSystemLog("Not all tasks finished before calling close: " + unfinishedTasks.size()); saveSystemLog("Not all tasks finished before calling close: " + unfinishedTasks.size());
} }

View File

@ -0,0 +1,269 @@
package com.munjaon.server.server.task;
import com.munjaon.server.cache.dto.MemberDto;
import com.munjaon.server.cache.enums.CacheService;
import com.munjaon.server.cache.service.MemberService;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.enums.QueueTypeWorker;
import com.munjaon.server.server.dto.ConnectUserDto;
import com.munjaon.server.server.packet.*;
import com.munjaon.server.server.queue.CollectUserQueue;
import com.munjaon.server.util.LogUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Callable;
public class CollectReadTask implements Callable<ConnectUserDto> {
public static final SimpleDateFormat sdf = new SimpleDateFormat("[MM-dd HH:mm:ss]");
public static final String LOG_DATE_FORMAT = "[MM-dd HH:mm:ss]";
private Selector selector;
private SelectionKey key;
private CollectUserQueue collectUserQueue = CollectUserQueue.getInstance();
private ConnectUserDto connectUserDto;
private String serviceType;
private final LogUtil logger;
public CollectReadTask(Selector selector, SelectionKey key, String serviceType, LogUtil logger) {
this.selector = selector;
this.key = key;
this.connectUserDto = (ConnectUserDto) key.attachment();
this.serviceType = serviceType;
this.logger = logger;
}
@Override
public ConnectUserDto call() throws Exception {
int size = -1;
try {
SocketChannel channel = (SocketChannel) key.channel();
/* 1. Head 읽기 */
ByteBuffer headBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH);
try {
size = channel.read(headBuffer);
} catch (IOException e) {}
/* 2. Body 읽기 */
if (size > 0) {
// Packet.printBuffer(headBuffer);
String command = Header.getCommand(headBuffer);
switch (Integer.parseInt(command)) {
case 1 : recvBind(channel, headBuffer); break;
case 3 : recvDeliver(channel, headBuffer); break;
case 7 : recvLinkCheck(channel); break;
default: expireConnectUser(); break;
}
} else {
expireConnectUser();
}
} catch (Exception e) {
size = -1;
e.printStackTrace();
}
/* 읽은 데이터가 없는 경우 command -1 */
if (size <= 0) {
connectUserDto.setCommand(-1);
}
return connectUserDto;
}
private void recvDeliver(SocketChannel channel, ByteBuffer headBuffer) throws IOException {
switch (this.serviceType) {
case "SMS":
recvSmsDeliver(channel, headBuffer);
break;
case "LMS":
recvLmsDeliver(channel, headBuffer);
break;
case "MMS":
recvMmsDeliver(channel, headBuffer);
break;
case "KAT":
recvKatDeliver(channel, headBuffer);
break;
case "KFT":
recvKftDeliver(channel, headBuffer);
break;
default:break;
}
}
public BasicMessageDto recvCommonMessage(ByteBuffer deliverBuffer) {
if (deliverBuffer == null) {
return null;
}
BasicMessageDto messageDto = new BasicMessageDto();
messageDto.setRouterSeq("40");
messageDto.setServiceType("4");
messageDto.setUserId(connectUserDto.getUserId());
messageDto.setRemoteIP(connectUserDto.getRemoteIP());
messageDto.setSendStatus("0");
messageDto.setUserMsgID(CommonMessage.getMessageIdForDeliver(deliverBuffer));
messageDto.setUserSender(CommonMessage.getSenderForDeliver(deliverBuffer));
messageDto.setUserReceiver(CommonMessage.getReceiverForDeliver(deliverBuffer));
messageDto.setReserveDt(CommonMessage.getReserveTimeForDeliver(deliverBuffer));
messageDto.setRequestDt(CommonMessage.getRequestTimeForDeliver(deliverBuffer));
messageDto.setUnitCost("10.4");
return messageDto;
}
private void recvSmsDeliver(SocketChannel channel, ByteBuffer headBuffer) throws IOException {
try {
ByteBuffer bodyBuffer = ByteBuffer.allocate(SmsMessage.DELIVER_SMS_BODY_LENGTH);
channel.read(bodyBuffer);
ByteBuffer deliverBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + SmsMessage.DELIVER_SMS_BODY_LENGTH);
Packet.mergeBuffers(deliverBuffer, headBuffer, bodyBuffer);
// Packet.printBuffer(deliverBuffer);
BasicMessageDto messageDto = recvCommonMessage(deliverBuffer);
messageDto.setUserMessage(SmsMessage.getMessageForDeliver(deliverBuffer));
System.out.println("BasicMessageDto : " + messageDto.toString());
QueueTypeWorker worker = QueueTypeWorker.find("SMS");
if (worker != null) {
worker.pushQueue(messageDto);
channel.write(SmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void recvLmsDeliver(SocketChannel channel, ByteBuffer headBuffer) throws IOException {
try {
ByteBuffer bodyBuffer = ByteBuffer.allocate(LmsMessage.DELIVER_LMS_BODY_LENGTH);
channel.read(bodyBuffer);
ByteBuffer deliverBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + LmsMessage.DELIVER_LMS_BODY_LENGTH);
Packet.mergeBuffers(deliverBuffer, headBuffer, bodyBuffer);
BasicMessageDto messageDto = recvCommonMessage(deliverBuffer);
messageDto.setUserSubject(LmsMessage.getSubjectForDeliver(deliverBuffer));
messageDto.setUserMessage(LmsMessage.getMessageForDeliver(deliverBuffer));
QueueTypeWorker worker = QueueTypeWorker.find("LMS");
if (worker != null) {
worker.pushQueue(messageDto);
channel.write(LmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void recvMmsDeliver(SocketChannel channel, ByteBuffer headBuffer) throws IOException {
}
private void recvKatDeliver(SocketChannel channel, ByteBuffer headBuffer) throws IOException {
}
private void recvKftDeliver(SocketChannel channel, ByteBuffer headBuffer) throws IOException {
}
private void recvLinkCheck(SocketChannel channel) throws IOException {
ByteBuffer bodyBuffer = ByteBuffer.allocate(LinkCheck.LINK_CHECK_ACK_BODY_LENGTH);
channel.read(bodyBuffer);
// SocketChannel channel = (SocketChannel) key.channel();
channel.write(LinkCheck.makeLinkCheckAckBuffer());
}
private void recvBind(SocketChannel channel, ByteBuffer headBuffer) {
String resultCode = "00";
try {
ByteBuffer bindBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + Bind.BIND_BODY_LENGTH);
ByteBuffer bodyBuffer = ByteBuffer.allocate(Bind.BIND_BODY_LENGTH);
channel.read(bodyBuffer);
Packet.mergeBuffers(bindBuffer, headBuffer, bodyBuffer);
String id = Bind.getBindId(bindBuffer);
String pwd = Bind.getBindPwd(bindBuffer);
saveSystemLog("Bind id : " + id);
saveSystemLog("Bind pwd : " + pwd);
if (id == null || pwd == null) {
resultCode = "50";
} else {
if (collectUserQueue.isExist(this.serviceType, id)) {
resultCode = "60";
} else {
MemberService svc = (MemberService) CacheService.LOGIN_SERVICE.getService();
MemberDto memberDto = null;
if (svc != null) {
memberDto = svc.get(id);
}
if (memberDto == null || !pwd.equals(memberDto.getAccessKey())) {
resultCode = "20";
} else {
connectUserDto.setUserId(id);
connectUserDto.setLogin(true);
connectUserDto.setMemberDto(memberDto);
/* 세션통신 시간 업데이트 */
connectUserDto.updateLastTrafficTime();
}
}
}
} catch (Exception e) {
resultCode = "10";
e.printStackTrace();
}
try {
saveSystemLog("Bind ResultCode : " + resultCode);
channel.write(Bind.makeBindAckBuffer(resultCode));
if ("00".equals(resultCode) == false) {
expireConnectUser();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void expireConnectUser() {
if (key == null || !key.isValid()) {
return;
}
try {
SocketChannel channel = (SocketChannel) key.channel();
if (connectUserDto != null) {
if (connectUserDto.getUserId() != null) {
collectUserQueue.removeUser(connectUserDto.getServiceType(), connectUserDto.getUserId());
}
key.attach(null);
}
// 소켓 채널 닫기
channel.close();
// 닫기
key.cancel();
} catch (IOException e) {
e.printStackTrace();
}
}
private void saveSystemLog(Object obj) {
saveLog(obj, true);
}
private void saveLog(Object obj) {
saveLog(obj, false);
}
private void saveLog(Object obj, boolean isConsoleOutput) {
if (isConsoleOutput) {
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATE_FORMAT)) + " {{COLLECT_READ_TASK}} " + obj);
}
if (logger == null) {
return;
}
logger.log(obj);
}
}

View File

@ -0,0 +1,85 @@
package com.munjaon.server.server.task;
import com.munjaon.server.cache.enums.CacheService;
import com.munjaon.server.cache.service.ReportService;
import com.munjaon.server.queue.pool.ReportQueue;
import com.munjaon.server.server.dto.ReportDto;
import com.munjaon.server.server.dto.ReportUserDto;
import com.munjaon.server.util.LogUtil;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ReportQueueTask implements Runnable {
public static final SimpleDateFormat sdf = new SimpleDateFormat("[MM-dd HH:mm:ss]");
public static final String LOG_DATE_FORMAT = "[MM-dd HH:mm:ss]";
private ReportUserDto reportUserDto;
private final LogUtil logger;
public ReportQueueTask(ReportUserDto reportUserDto, LogUtil logger) {
this.reportUserDto = reportUserDto;
this.logger = logger;
}
@Override
public void run() {
if (reportUserDto == null || reportUserDto.getUserId() == null) {
return;
}
if (reportUserDto.getReportQueue() == null || reportUserDto.getReportQueue().isOpen() == false) {
return;
}
ReportQueue reportQueue = reportUserDto.getReportQueue();
ReportService reportService = (ReportService) CacheService.REPORT_SERVICE.getService();
List<ReportDto> list = reportService.getReportListForUser(reportUserDto.getUserId());
if (list == null || list.isEmpty()) {
return;
}
StringBuilder builder = new StringBuilder();
for (ReportDto dto : list) {
try {
if (builder.isEmpty()) {
builder.append(dto.getMsgId());
} else {
builder.append(",").append(dto.getMsgId());
}
saveSystemLog("reportDto : " + dto.toString());
reportQueue.pushReportToQueue(dto);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
Map<String, String> reqMap = new HashMap<>();
reqMap.put("userId", reportUserDto.getUserId());
reqMap.put("msgId", builder.toString());
reportService.deleteBulkReport(reqMap);
}
private void saveSystemLog(Object obj) {
saveLog(obj, true);
}
private void saveLog(Object obj) {
saveLog(obj, false);
}
private void saveLog(Object obj, boolean isConsoleOutput) {
if (isConsoleOutput) {
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATE_FORMAT)) + " {{REPORT_QUEUE_TASK}} " + obj);
}
if (logger == null) {
return;
}
logger.log(obj);
}
}

View File

@ -0,0 +1,199 @@
package com.munjaon.server.server.task;
import com.munjaon.server.cache.dto.MemberDto;
import com.munjaon.server.cache.enums.CacheService;
import com.munjaon.server.cache.service.MemberService;
import com.munjaon.server.queue.pool.ReportQueue;
import com.munjaon.server.server.dto.ReportUserDto;
import com.munjaon.server.server.packet.*;
import com.munjaon.server.server.queue.ReportUserQueue;
import com.munjaon.server.util.LogUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.Callable;
public class ReportReadTask implements Callable<ReportUserDto> {
public static final SimpleDateFormat sdf = new SimpleDateFormat("[MM-dd HH:mm:ss]");
public static final String LOG_DATE_FORMAT = "[MM-dd HH:mm:ss]";
private Selector selector;
private SelectionKey key;
private final ReportUserQueue reportUserQueue = ReportUserQueue.getInstance();
private ReportUserDto reportUserDto;
private final LogUtil logger;
public ReportReadTask(Selector selector, SelectionKey key, LogUtil logger) {
this.selector = selector;
this.key = key;
this.reportUserDto = (ReportUserDto) key.attachment();
this.logger = logger;
}
@Override
public ReportUserDto call() throws Exception {
int size = -1;
try {
SocketChannel channel = (SocketChannel) key.channel();
/* 1. Head 읽기 */
ByteBuffer headBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH);
try {
size = channel.read(headBuffer);
} catch (IOException e) {}
/* 2. Body 읽기 */
if (size > 0) {
String command = Header.getCommand(headBuffer);
switch (Integer.parseInt(command)) {
case 1 : recvBind(channel, headBuffer); break;
case 6 : recvReport(channel, headBuffer); break;
case 8 : recvLinkCheck(channel, headBuffer); break;
default: expireConnectUser(); break;
}
} else {
expireConnectUser();
}
} catch (Exception e) {
size = -1;
e.printStackTrace();
}
/* 읽은 데이터가 없는 경우 command -1 */
if (size <= 0) {
reportUserDto.setCommand(-1);
}
return reportUserDto;
}
private void recvLinkCheck(SocketChannel channel, ByteBuffer headBuffer) {
try {
ByteBuffer bodyBuffer = ByteBuffer.allocate(LinkCheck.LINK_CHECK_ACK_BODY_LENGTH);
int size = channel.read(bodyBuffer);
if (size > 0) {
saveSystemLog("Recv link check");
reportUserDto.updateLastTrafficTime();
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void recvReport(SocketChannel channel, ByteBuffer headBuffer) {
try {
ByteBuffer bodyBuffer = ByteBuffer.allocate(Report.REPORT_ACK_BODY_LENGTH);
saveSystemLog("recv report");
int size = channel.read(bodyBuffer);
if (size > 0) {
ReportQueue reportQueue = reportUserDto.getReportQueue();
reportUserDto.updateLastTrafficTime();
if (reportQueue != null) {
reportQueue.addReadCounter();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void recvBind(SocketChannel channel, ByteBuffer headBuffer) {
String resultCode = "00";
try {
ByteBuffer bindBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + Bind.BIND_BODY_LENGTH);
ByteBuffer bodyBuffer = ByteBuffer.allocate(Bind.BIND_BODY_LENGTH);
channel.read(bodyBuffer);
Packet.mergeBuffers(bindBuffer, headBuffer, bodyBuffer);
String id = Bind.getBindId(bindBuffer);
String pwd = Bind.getBindPwd(bindBuffer);
saveSystemLog("Bind id : " + id);
saveSystemLog("Bind pwd : " + pwd);
if (id == null || pwd == null) {
resultCode = "50";
} else {
if (reportUserQueue.isExist(id)) {
resultCode = "60";
} else {
MemberService svc = (MemberService) CacheService.LOGIN_SERVICE.getService();
MemberDto memberDto = null;
if (svc != null) {
memberDto = svc.get(id);
}
if (memberDto == null || !pwd.equals(memberDto.getAccessKey())) {
resultCode = "20";
} else {
reportUserDto.setUserId(id);
reportUserDto.setLogin(true);
reportUserDto.setMemberDto(memberDto);
/* 리포트 큐 생성 */
ReportQueue reportQueue = new ReportQueue(reportUserDto.getQueuePath(), reportUserDto.getUserId());
reportUserDto.setReportQueue(reportQueue);
/* 사용자 Pool에 저장 */
reportUserQueue.putUser(reportUserDto);
/* 세션통신 시간 업데이트 */
reportUserDto.updateLastTrafficTime();
}
}
}
} catch (Exception e) {
resultCode = "10";
e.printStackTrace();
}
try {
saveSystemLog("Bind ResultCode : " + resultCode);
channel.write(Bind.makeBindAckBuffer(resultCode));
if ("00".equals(resultCode) == false) {
expireConnectUser();
}
} catch (IOException e) {
e.printStackTrace();
}
}
private void expireConnectUser() {
if (key == null || !key.isValid()) {
return;
}
try {
SocketChannel channel = (SocketChannel) key.channel();
if (reportUserDto != null) {
if (reportUserDto.getUserId() != null) {
reportUserQueue.removeUser(reportUserDto.getUserId());
}
key.attach(null);
}
// 소켓 채널 닫기
channel.close();
// 닫기
key.cancel();
} catch (IOException e) {
e.printStackTrace();
}
}
private void saveSystemLog(Object obj) {
saveLog(obj, true);
}
private void saveLog(Object obj) {
saveLog(obj, false);
}
private void saveLog(Object obj, boolean isConsoleOutput) {
if (isConsoleOutput) {
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATE_FORMAT)) + " {{REPORT_READ_TASK}} " + obj);
}
if (logger == null) {
return;
}
logger.log(obj);
}
}

View File

@ -1,10 +1,9 @@
package com.munjaon.server.util; package com.munjaon.server.util;
import com.munjaon.server.queue.config.BodyCommonConfig; import com.munjaon.server.config.ServiceCode;
import com.munjaon.server.queue.config.MediaBodyConfig; import com.munjaon.server.queue.config.*;
import com.munjaon.server.queue.config.QueueHeaderConfig;
import com.munjaon.server.queue.config.SmsBodyConfig;
import com.munjaon.server.queue.dto.BasicMessageDto; import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.server.dto.ReportDto;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@ -114,11 +113,11 @@ public final class MessageUtil {
} }
public static int calcWritePosition(int pushCounter, int dataByteLength) { public static int calcWritePosition(int pushCounter, int dataByteLength) {
return pushCounter < 0 ? QueueHeaderConfig.QUEUE_HEADER_LENGTH : (QueueHeaderConfig.QUEUE_HEADER_LENGTH + pushCounter + dataByteLength); return pushCounter < 0 ? QueueHeaderConfig.QUEUE_HEADER_LENGTH : (QueueHeaderConfig.QUEUE_HEADER_LENGTH + pushCounter * dataByteLength);
} }
public static int calcReadPosition(int popCounter, int dataByteLength) { public static int calcReadPosition(int popCounter, int dataByteLength) {
return popCounter < 0 ? QueueHeaderConfig.QUEUE_HEADER_LENGTH : (QueueHeaderConfig.QUEUE_HEADER_LENGTH + popCounter + dataByteLength); return popCounter < 0 ? QueueHeaderConfig.QUEUE_HEADER_LENGTH : (QueueHeaderConfig.QUEUE_HEADER_LENGTH + popCounter * dataByteLength);
} }
public static void setBytesForCommonMessage(ByteBuffer buffer, BasicMessageDto messageDto) { public static void setBytesForCommonMessage(ByteBuffer buffer, BasicMessageDto messageDto) {
@ -258,6 +257,23 @@ public final class MessageUtil {
buffer.put(messageDto.getUserMessage().getBytes()); buffer.put(messageDto.getUserMessage().getBytes());
} }
public static void getBytesForMediaMessage(ByteBuffer buffer, BasicMessageDto messageDto) {
byte[] destArray = null;
if (buffer == null || messageDto == null) {
return;
}
/* 14. 제목 */
buffer.position(MediaBodyConfig.SUBJECT_BYTE_POSITION);
destArray = new byte[MediaBodyConfig.SUBJECT_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setUserSubject(new String(destArray));
/* 15. 메시지 */
buffer.position(MediaBodyConfig.MEDIA_MSG_BYTE_POSITION);
destArray = new byte[MediaBodyConfig.MEDIA_MSG_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setUserMessage(new String(destArray));
}
public static void setBytesForMmsMessage(ByteBuffer buffer, BasicMessageDto messageDto) { public static void setBytesForMmsMessage(ByteBuffer buffer, BasicMessageDto messageDto) {
/* 16. 파일카운트 */ /* 16. 파일카운트 */
buffer.position(MediaBodyConfig.FILECNT_BYTE_POSITION); buffer.position(MediaBodyConfig.FILECNT_BYTE_POSITION);
@ -278,4 +294,137 @@ public final class MessageUtil {
buffer.put(messageDto.getUserFileName03().getBytes()); buffer.put(messageDto.getUserFileName03().getBytes());
} }
} }
public static void getBytesForMmsMessage(ByteBuffer buffer, BasicMessageDto messageDto) {
byte[] destArray = null;
if (buffer == null || messageDto == null) {
return;
}
/* 16. 파일카운트 */
buffer.position(MediaBodyConfig.FILECNT_BYTE_POSITION);
destArray = new byte[MediaBodyConfig.FILECNT_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setUserFileCnt(Integer.parseInt(new String(destArray)));
/* 17. 파일명 #1 */
buffer.position(MediaBodyConfig.FILENAME_ONE_BYTE_POSITION);
destArray = new byte[MediaBodyConfig.FILENAME_ONE_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setUserFileName01(new String(destArray));
/* 18. 파일명 #2 */
buffer.position(MediaBodyConfig.FILENAME_TWO_BYTE_POSITION);
destArray = new byte[MediaBodyConfig.FILENAME_TWO_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setUserFileName02(new String(destArray));
/* 19. 파일명 #3 */
buffer.position(MediaBodyConfig.FILENAME_THREE_BYTE_POSITION);
destArray = new byte[MediaBodyConfig.FILENAME_THREE_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setUserFileName03(new String(destArray));
}
public static int isValidateMessageForReport(ReportDto reportDto) {
if (reportDto == null) {
return ServiceCode.MSG_ERROR_REPORT.getCode();
}
/* MSG_ID (Length : 20 / Position : 0) */
if (reportDto.getAgentMsgId() == null || reportDto.getAgentMsgId().trim().isEmpty()) {
return ServiceCode.MSG_ERROR_REPORT_MSG_ID.getCode();
}
if (reportDto.getAgentMsgId().trim().length() > ReportConfig.MSG_ID_LENGTH) {
return ServiceCode.MSG_ERROR_REPORT_MSG_ID.getCode();
}
/* AGENT_CODE (Length : 2 / Position : 20) */
if (reportDto.getAgentCode() == null || reportDto.getAgentCode().trim().isEmpty()) {
return ServiceCode.MSG_ERROR_REPORT_AGENT_CODE.getCode();
}
if (reportDto.getAgentCode().trim().length() > ReportConfig.AGENT_CODE_LENGTH) {
return ServiceCode.MSG_ERROR_REPORT_AGENT_CODE.getCode();
}
/* SEND_TIME (Length : 14 / Position : 22) */
if (reportDto.getRsltDate() == null || reportDto.getRsltDate().trim().isEmpty()) {
return ServiceCode.MSG_ERROR_REPORT_SEND_TIME.getCode();
}
if (reportDto.getRsltDate().trim().length() > ReportConfig.SEND_TIME_LENGTH) {
return ServiceCode.MSG_ERROR_REPORT_SEND_TIME.getCode();
}
/* TELECOM (Length : 3 / Position : 36) */
if (reportDto.getRsltNet() == null || reportDto.getRsltNet().trim().isEmpty()) {
return ServiceCode.MSG_ERROR_REPORT_TELECOM.getCode();
}
if (reportDto.getRsltNet().trim().length() > ReportConfig.TELECOM_LENGTH) {
return ServiceCode.MSG_ERROR_REPORT_TELECOM.getCode();
}
/* RESULT (Length : 5 / Position : 39) */
if (reportDto.getRsltCode() == null || reportDto.getRsltCode().trim().isEmpty()) {
return ServiceCode.MSG_ERROR_REPORT_RESULT.getCode();
}
if (reportDto.getRsltCode().trim().length() > ReportConfig.RESULT_LENGTH) {
return ServiceCode.MSG_ERROR_REPORT_RESULT.getCode();
}
return ServiceCode.OK.getCode();
}
public static int calcWritePositionForReport(int pushCounter, int dataByteLength) {
return pushCounter < 0 ? ReportConfig.HEAD_LENGTH : (ReportConfig.HEAD_LENGTH + pushCounter * dataByteLength);
}
public static void setBytesForReport(ByteBuffer buffer, ReportDto reportDto) {
if (buffer == null || reportDto == null) {
return;
}
byte[] destArray = null;
/* MSG_ID (Length : 20 / Position : 0) */
buffer.position(ReportConfig.MSG_ID_POSITION);
buffer.put(reportDto.getAgentMsgId().getBytes());
/* AGENT_CODE (Length : 2 / Position : 20) */
buffer.position(ReportConfig.AGENT_CODE_POSITION);
buffer.put(reportDto.getAgentCode().getBytes());
/* SEND_TIME (Length : 14 / Position : 22) */
buffer.position(ReportConfig.SEND_TIME_POSITION);
buffer.put(reportDto.getRsltDate().getBytes());
/* TELECOM (Length : 3 / Position : 36) */
buffer.position(ReportConfig.TELECOM_POSITION);
buffer.put(reportDto.getRsltNet().getBytes());
/* RESULT (Length : 5 / Position : 39) */
buffer.position(ReportConfig.RESULT_POSITION);
buffer.put(reportDto.getRsltCode().getBytes());
}
public static ReportDto getReportFromBuffer(ByteBuffer buffer) {
if (buffer == null) {
return null;
}
byte[] destArray = null;
ReportDto reportDto = new ReportDto();
/* MSG_ID (Length : 20 / Position : 0) */
buffer.position(ReportConfig.MSG_ID_POSITION);
destArray = new byte[ReportConfig.MSG_ID_LENGTH];
buffer.get(destArray);
reportDto.setMsgId(new String(destArray).trim());
/* AGENT_CODE (Length : 2 / Position : 20) */
buffer.position(ReportConfig.AGENT_CODE_POSITION);
destArray = new byte[ReportConfig.AGENT_CODE_LENGTH];
buffer.get(destArray);
reportDto.setAgentCode(new String(destArray).trim());
/* SEND_TIME (Length : 14 / Position : 22) */
buffer.position(ReportConfig.SEND_TIME_POSITION);
destArray = new byte[ReportConfig.SEND_TIME_LENGTH];
buffer.get(destArray);
reportDto.setRsltDate(new String(destArray).trim());
/* TELECOM (Length : 3 / Position : 36) */
buffer.position(ReportConfig.TELECOM_POSITION);
destArray = new byte[ReportConfig.TELECOM_LENGTH];
buffer.get(destArray);
reportDto.setRsltNet(new String(destArray).trim());
/* RESULT (Length : 5 / Position : 39) */
buffer.position(ReportConfig.RESULT_POSITION);
destArray = new byte[ReportConfig.RESULT_LENGTH];
buffer.get(destArray);
reportDto.setRsltCode(new String(destArray).trim());
return reportDto;
}
} }

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.munjaon.server.queue.mapper.LmsMapper">
<insert id="insert">
INSERT INTO MJ_MSG_DATA (
MSG_ID
, MSG_GROUP_ID
, USER_ID
, AGENT_MSG_ID
, AGENT_CODE
, CUR_STATE
, REQ_DATE
, CALL_TO
, CALL_FROM
, SUBJECT
, SMS_TXT
, MSG_TYPE
, CONT_SEQ, FILE_CNT
) VALUES (#{id}, #{msgGroupID}, #{userId}, #{userMsgID}, '04', 0, NOW(), #{userReceiver}, #{userSender}, #{userSubject}, #{userMessage}, '6', null, '0' )
</insert>
</mapper>

View File

@ -9,14 +9,44 @@
, AGENT_MSG_ID , AGENT_MSG_ID
, AGENT_CODE , AGENT_CODE
, MSG_TYPE , MSG_TYPE
, RSLT_DATE , CASE WHEN RSLT_DATE IS NULL THEN DATE_FORMAT(NOW(), '%Y%m%d%H%i%S') ELSE DATE_FORMAT(RSLT_DATE, '%Y%m%d%H%i%S') END AS RSLT_DATE
, RSLT_CODE , RSLT_CODE
, RSLT_NET , RSLT_NET
FROM mj_msg_report FROM mj_msg_report
WHERE USER_ID = #{userId} WHERE USER_ID = #{userId}
LIMIT 1 LIMIT 1
</select> </select>
<select id="getReportListForUser" resultType="ReportDto">
/* ReportMapper.getReportListForUser */
SELECT
MSG_ID
, USER_ID
, AGENT_MSG_ID
, AGENT_CODE
, MSG_TYPE
, CASE WHEN RSLT_DATE IS NULL THEN DATE_FORMAT(NOW(), '%Y%m%d%H%i%S') ELSE DATE_FORMAT(RSLT_DATE, '%Y%m%d%H%i%S') END AS RSLT_DATE
, RSLT_CODE
, RSLT_NET
FROM mj_msg_report
WHERE USER_ID = #{userId}
LIMIT 50
</select>
<delete id="deleteReport"> <delete id="deleteReport">
DELETE FROM mj_msg_report WHERE MSG_ID = #{msgId} DELETE FROM mj_msg_report WHERE MSG_ID = #{msgId}
</delete> </delete>
<delete id="deleteBulkReport">
DELETE SRC FROM mj_msg_report SRC
INNER JOIN (
with recursive
T as ( select #{msgId} as items),
N as ( select 1 as n union select n + 1 from N, T
where n <![CDATA[<=]]> length(items) - length(replace(items, ',', '')))
select distinct substring_index(substring_index(items, ',', n), ',', -1)
MSG_ID from N, T
) DEST ON SRC.MSG_ID = DEST.MSG_ID
WHERE SRC.USER_ID = #{userId}
</delete>
</mapper> </mapper>