diff --git a/src/main/java/com/munjaon/server/cache/enums/CacheService.java b/src/main/java/com/munjaon/server/cache/enums/CacheService.java new file mode 100644 index 0000000..635d7a7 --- /dev/null +++ b/src/main/java/com/munjaon/server/cache/enums/CacheService.java @@ -0,0 +1,15 @@ +package com.munjaon.server.cache.enums; + +import lombok.Getter; + +@Getter +public enum CacheService { + LOGIN_SERVICE, + REPORT_SERVICE; + + private Object service; + + public void setService(Object service) { + this.service = service; + } +} diff --git a/src/main/java/com/munjaon/server/cache/mapper/ReportMapper.java b/src/main/java/com/munjaon/server/cache/mapper/ReportMapper.java new file mode 100644 index 0000000..ab28d58 --- /dev/null +++ b/src/main/java/com/munjaon/server/cache/mapper/ReportMapper.java @@ -0,0 +1,10 @@ +package com.munjaon.server.cache.mapper; + +import com.munjaon.server.server.dto.ReportDto; +import org.apache.ibatis.annotations.Mapper; + +@Mapper +public interface ReportMapper { + ReportDto getReportForUser(String userId); + int deleteReport(String msgId); +} diff --git a/src/main/java/com/munjaon/server/cache/mapper/SerialNoMapper.java b/src/main/java/com/munjaon/server/cache/mapper/SerialNoMapper.java new file mode 100644 index 0000000..fa814c9 --- /dev/null +++ b/src/main/java/com/munjaon/server/cache/mapper/SerialNoMapper.java @@ -0,0 +1,11 @@ +package com.munjaon.server.cache.mapper; + +import org.apache.ibatis.annotations.Mapper; + +@Mapper +public interface SerialNoMapper { + String getSerialNoForLock(); + int insert(); + int update(); + String getSerialNo(); +} diff --git a/src/main/java/com/munjaon/server/cache/service/CacheServiceInjector.java b/src/main/java/com/munjaon/server/cache/service/CacheServiceInjector.java new file mode 100644 index 0000000..56c0008 --- /dev/null +++ b/src/main/java/com/munjaon/server/cache/service/CacheServiceInjector.java @@ -0,0 +1,29 @@ +package com.munjaon.server.cache.service; + +import com.munjaon.server.cache.enums.CacheService; +import jakarta.annotation.PostConstruct; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.EnumSet; + +@Component +public class CacheServiceInjector { + @Autowired + private MemberService memberService; + @Autowired + private ReportService reportService; + + @PostConstruct + public void postConstruct() { + for (CacheService svc : EnumSet.allOf(CacheService.class)) { + switch (svc) { + case LOGIN_SERVICE : svc.setService(memberService); + break; + case REPORT_SERVICE: svc.setService(reportService); + break; + default:break; + } + } + } +} diff --git a/src/main/java/com/munjaon/server/cache/service/ReportService.java b/src/main/java/com/munjaon/server/cache/service/ReportService.java new file mode 100644 index 0000000..a5bcd6e --- /dev/null +++ b/src/main/java/com/munjaon/server/cache/service/ReportService.java @@ -0,0 +1,22 @@ +package com.munjaon.server.cache.service; + +import com.munjaon.server.cache.mapper.ReportMapper; +import com.munjaon.server.server.dto.ReportDto; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +@Slf4j +@Service +@RequiredArgsConstructor +public class ReportService { + private final ReportMapper reportMapper; + + public ReportDto getReportForUser(String userId) { + return reportMapper.getReportForUser(userId); + } + + public int deleteReport(String msgId) { + return reportMapper.deleteReport(msgId); + } +} diff --git a/src/main/java/com/munjaon/server/cache/service/SerialNoService.java b/src/main/java/com/munjaon/server/cache/service/SerialNoService.java new file mode 100644 index 0000000..f42fd64 --- /dev/null +++ b/src/main/java/com/munjaon/server/cache/service/SerialNoService.java @@ -0,0 +1,25 @@ +package com.munjaon.server.cache.service; + +import com.munjaon.server.cache.mapper.SerialNoMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; + +@Slf4j +@Service +@RequiredArgsConstructor +public class SerialNoService { + private final SerialNoMapper serialNoMapper; + + @Transactional + public String getSerialNo() { + if (serialNoMapper.getSerialNoForLock() == null) { + serialNoMapper.insert(); + } else { + serialNoMapper.update(); + } + + return serialNoMapper.getSerialNo(); + } +} diff --git a/src/main/java/com/munjaon/server/config/RunnerConfiguration.java b/src/main/java/com/munjaon/server/config/RunnerConfiguration.java index f85132a..f797ecd 100644 --- a/src/main/java/com/munjaon/server/config/RunnerConfiguration.java +++ b/src/main/java/com/munjaon/server/config/RunnerConfiguration.java @@ -3,6 +3,7 @@ package com.munjaon.server.config; import com.munjaon.server.queue.dto.QueueInfo; import com.munjaon.server.queue.pool.SmsReadQueue; import com.munjaon.server.queue.pool.SmsWriteQueue; +import com.munjaon.server.server.service.CollectServerService; import com.munjaon.server.server.service.PropertyLoader; import com.munjaon.server.server.service.QueueServerService; import com.munjaon.server.util.ServiceUtil; @@ -65,4 +66,19 @@ public class RunnerConfiguration { } return args -> System.out.println("Runner Bean #2"); } + + @Bean + @Order(3) + public CommandLineRunner getRunnerBeanForSmsCollector() { + try { + String serviceName = "SMS_COLLECTOR"; + String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE"); + int port = serverConfig.getInt(serviceName + ".SERVICE_PORT"); + CollectServerService collectServerService = new CollectServerService(serviceName, serviceType, port); + collectServerService.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + return args -> System.out.println("Runner Bean #2"); + } } diff --git a/src/main/java/com/munjaon/server/queue/dto/BasicMessageDto.java b/src/main/java/com/munjaon/server/queue/dto/BasicMessageDto.java index 8c7a3e5..d1b1978 100644 --- a/src/main/java/com/munjaon/server/queue/dto/BasicMessageDto.java +++ b/src/main/java/com/munjaon/server/queue/dto/BasicMessageDto.java @@ -8,6 +8,9 @@ import lombok.ToString; @Setter @ToString public class BasicMessageDto { + /* 임시로 사용하는 변수 */ + protected String id; + protected String userId = ""; // 사용자 아이디 protected final String feeType = "A"; // 요금제(선불 : P / 후불 : A) protected String unitCost = ""; // 단가 diff --git a/src/main/java/com/munjaon/server/queue/enums/QueueTypeWorker.java b/src/main/java/com/munjaon/server/queue/enums/QueueTypeWorker.java index 50ac738..a93a13f 100644 --- a/src/main/java/com/munjaon/server/queue/enums/QueueTypeWorker.java +++ b/src/main/java/com/munjaon/server/queue/enums/QueueTypeWorker.java @@ -10,6 +10,12 @@ import java.util.EnumSet; @Getter public enum QueueTypeWorker { MSG_TYPE_SMS("SMS") { + @Override + public int getQueueSize() { + SmsQueueService smsQueueService = (SmsQueueService) QueueService.SMS_QUEUE_SERVICE.getService(); + return smsQueueService.getQueueSize(); + } + @Override public boolean isExistQueue(String name) { SmsQueueService smsQueueService = (SmsQueueService) QueueService.SMS_QUEUE_SERVICE.getService(); @@ -65,6 +71,12 @@ public enum QueueTypeWorker { } }, MSG_TYPE_LMS("LMS") { + @Override + public int getQueueSize() { + LmsQueueService lmsQueueService = (LmsQueueService) QueueService.LMS_QUEUE_SERVICE.getService(); + return lmsQueueService.getQueueSize(); + } + @Override public boolean isExistQueue(String name) { LmsQueueService lmsQueueService = (LmsQueueService) QueueService.LMS_QUEUE_SERVICE.getService(); @@ -120,6 +132,12 @@ public enum QueueTypeWorker { } }, MSG_TYPE_MMS("MMS") { + @Override + public int getQueueSize() { + MmsQueueService mmsQueueService = (MmsQueueService) QueueService.MMS_QUEUE_SERVICE.getService(); + return mmsQueueService.getQueueSize(); + } + @Override public boolean isExistQueue(String name) { MmsQueueService mmsQueueService = (MmsQueueService) QueueService.MMS_QUEUE_SERVICE.getService(); @@ -175,6 +193,12 @@ public enum QueueTypeWorker { } }, MSG_TYPE_KAT("KAT") { + @Override + public int getQueueSize() { + KakaoAlarmQueueService kakaoAlarmQueueService = (KakaoAlarmQueueService) QueueService.KAT_QUEUE_SERVICE.getService(); + return kakaoAlarmQueueService.getQueueSize(); + } + @Override public boolean isExistQueue(String name) { KakaoAlarmQueueService kakaoAlarmQueueService = (KakaoAlarmQueueService) QueueService.KAT_QUEUE_SERVICE.getService(); @@ -230,6 +254,12 @@ public enum QueueTypeWorker { } }, MSG_TYPE_KFT("KFT") { + @Override + public int getQueueSize() { + KakaoFriendQueueService kakaoFriendQueueService = (KakaoFriendQueueService) QueueService.KFT_QUEUE_SERVICE.getService(); + return kakaoFriendQueueService.getQueueSize(); + } + @Override public boolean isExistQueue(String name) { KakaoFriendQueueService kakaoFriendQueueService = (KakaoFriendQueueService) QueueService.KFT_QUEUE_SERVICE.getService(); @@ -301,6 +331,7 @@ public enum QueueTypeWorker { return null; } + public abstract int getQueueSize(); public abstract boolean isExistQueue(String name); public abstract void removeQueue(String name); public abstract void addQueue(WriteQueue queue); diff --git a/src/main/java/com/munjaon/server/queue/mapper/SmsMapper.java b/src/main/java/com/munjaon/server/queue/mapper/SmsMapper.java new file mode 100644 index 0000000..7ea05f0 --- /dev/null +++ b/src/main/java/com/munjaon/server/queue/mapper/SmsMapper.java @@ -0,0 +1,9 @@ +package com.munjaon.server.queue.mapper; + +import com.munjaon.server.queue.dto.BasicMessageDto; +import org.apache.ibatis.annotations.Mapper; + +@Mapper +public interface SmsMapper { + int insert(BasicMessageDto messageDto); +} diff --git a/src/main/java/com/munjaon/server/queue/pool/QueuePool.java b/src/main/java/com/munjaon/server/queue/pool/QueuePool.java index ccfa405..1dde300 100644 --- a/src/main/java/com/munjaon/server/queue/pool/QueuePool.java +++ b/src/main/java/com/munjaon/server/queue/pool/QueuePool.java @@ -16,6 +16,11 @@ public abstract class QueuePool { /** File Queue 분배를 위한 인덱서 */ protected int queueIndex = 0; + public int getQueueSize() { + synchronized (lockMonitor) { + return queuePool.size(); + } + } /** Queue 존재하는지 조회 */ public boolean isExistQueue(String name){ synchronized (lockMonitor) { diff --git a/src/main/java/com/munjaon/server/queue/service/KakaoAlarmQueueService.java b/src/main/java/com/munjaon/server/queue/service/KakaoAlarmQueueService.java index 901d830..bbe655f 100644 --- a/src/main/java/com/munjaon/server/queue/service/KakaoAlarmQueueService.java +++ b/src/main/java/com/munjaon/server/queue/service/KakaoAlarmQueueService.java @@ -12,6 +12,12 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class KakaoAlarmQueueService implements QueueAction { private final KakaoAlarmMemoryQueue memoryQueue = KakaoAlarmMemoryQueue.getInstance(); + + @Override + public int getQueueSize() { + return 0; + } + @Override public boolean isExistQueue(String name) { return false; diff --git a/src/main/java/com/munjaon/server/queue/service/KakaoFriendQueueService.java b/src/main/java/com/munjaon/server/queue/service/KakaoFriendQueueService.java index bf89214..ee7bc00 100644 --- a/src/main/java/com/munjaon/server/queue/service/KakaoFriendQueueService.java +++ b/src/main/java/com/munjaon/server/queue/service/KakaoFriendQueueService.java @@ -12,6 +12,12 @@ import org.springframework.stereotype.Service; @RequiredArgsConstructor public class KakaoFriendQueueService implements QueueAction { private final KakaoAlarmMemoryQueue memoryQueue = KakaoAlarmMemoryQueue.getInstance(); + + @Override + public int getQueueSize() { + return 0; + } + @Override public boolean isExistQueue(String name) { return false; diff --git a/src/main/java/com/munjaon/server/queue/service/LmsQueueService.java b/src/main/java/com/munjaon/server/queue/service/LmsQueueService.java index 7753ff3..3a84508 100644 --- a/src/main/java/com/munjaon/server/queue/service/LmsQueueService.java +++ b/src/main/java/com/munjaon/server/queue/service/LmsQueueService.java @@ -15,6 +15,11 @@ public class LmsQueueService implements QueueAction { private final LmsQueuePool queueInstance = LmsQueuePool.getInstance(); private final LmsMemoryQueue memoryQueue = LmsMemoryQueue.getInstance(); + @Override + public int getQueueSize() { + return queueInstance.getQueueSize(); + } + @Override public boolean isExistQueue(String name) { return queueInstance.isExistQueue(name); diff --git a/src/main/java/com/munjaon/server/queue/service/MmsQueueService.java b/src/main/java/com/munjaon/server/queue/service/MmsQueueService.java index c3e5808..b3ea2fb 100644 --- a/src/main/java/com/munjaon/server/queue/service/MmsQueueService.java +++ b/src/main/java/com/munjaon/server/queue/service/MmsQueueService.java @@ -14,6 +14,12 @@ import org.springframework.stereotype.Service; public class MmsQueueService implements QueueAction { private final MmsQueuePool queueInstance = MmsQueuePool.getInstance(); private final MmsMemoryQueue memoryQueue = MmsMemoryQueue.getInstance(); + + @Override + public int getQueueSize() { + return queueInstance.getQueueSize(); + } + @Override public boolean isExistQueue(String name) { return queueInstance.isExistQueue(name); diff --git a/src/main/java/com/munjaon/server/queue/service/QueueAction.java b/src/main/java/com/munjaon/server/queue/service/QueueAction.java index e5d0514..2f33c1a 100644 --- a/src/main/java/com/munjaon/server/queue/service/QueueAction.java +++ b/src/main/java/com/munjaon/server/queue/service/QueueAction.java @@ -4,6 +4,7 @@ import com.munjaon.server.queue.dto.BasicMessageDto; import com.munjaon.server.queue.pool.WriteQueue; public interface QueueAction { + int getQueueSize(); boolean isExistQueue(String name); void removeQueue(String name); void addQueue(WriteQueue queue); diff --git a/src/main/java/com/munjaon/server/queue/service/SmsQueueService.java b/src/main/java/com/munjaon/server/queue/service/SmsQueueService.java index 1a911c3..f0963ac 100644 --- a/src/main/java/com/munjaon/server/queue/service/SmsQueueService.java +++ b/src/main/java/com/munjaon/server/queue/service/SmsQueueService.java @@ -1,6 +1,8 @@ package com.munjaon.server.queue.service; +import com.munjaon.server.cache.service.SerialNoService; import com.munjaon.server.queue.dto.BasicMessageDto; +import com.munjaon.server.queue.mapper.SmsMapper; import com.munjaon.server.queue.pool.SmsMemoryQueue; import com.munjaon.server.queue.pool.SmsQueuePool; import com.munjaon.server.queue.pool.WriteQueue; @@ -12,8 +14,15 @@ import org.springframework.stereotype.Service; @Service @RequiredArgsConstructor public class SmsQueueService implements QueueAction { + private final SmsMapper smsMapper; private final SmsQueuePool queueInstance = SmsQueuePool.getInstance(); private final SmsMemoryQueue memoryQueue = SmsMemoryQueue.getInstance(); + private final SerialNoService serialNoService; + + @Override + public int getQueueSize() { + return queueInstance.getQueueSize(); + } @Override public boolean isExistQueue(String name) { @@ -46,8 +55,12 @@ public class SmsQueueService implements QueueAction { @Override public int saveMessageToTable(BasicMessageDto data) { + String serialNo = serialNoService.getSerialNo(); + String groupSerialNo = serialNo.replace("MSGID", "MSGGID"); + data.setId(serialNo); + data.setMsgGroupID(groupSerialNo); log.debug("Save message to table : {}", data); - return 0; + return smsMapper.insert(data); } @Override diff --git a/src/main/java/com/munjaon/server/server/dto/ConnectUserDto.java b/src/main/java/com/munjaon/server/server/dto/ConnectUserDto.java index e239bd5..0a5da10 100644 --- a/src/main/java/com/munjaon/server/server/dto/ConnectUserDto.java +++ b/src/main/java/com/munjaon/server/server/dto/ConnectUserDto.java @@ -1,5 +1,6 @@ package com.munjaon.server.server.dto; +import com.munjaon.server.cache.dto.MemberDto; import com.munjaon.server.server.config.ServerConfig; import lombok.Builder; import lombok.Getter; @@ -24,6 +25,8 @@ public class ConnectUserDto { /* 요금제(선불 : P / 후불 : A) */ private final String feeType = "A"; + private MemberDto memberDto; + public int isAlive() { if (isLogin) { if (System.currentTimeMillis() - lastTrafficTime > ServerConfig.LIMIT_LINK_CHECK_TIMEOUT) { diff --git a/src/main/java/com/munjaon/server/server/dto/HeaderDto.java b/src/main/java/com/munjaon/server/server/dto/HeaderDto.java new file mode 100644 index 0000000..934914b --- /dev/null +++ b/src/main/java/com/munjaon/server/server/dto/HeaderDto.java @@ -0,0 +1,17 @@ +package com.munjaon.server.server.dto; + +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +@Getter +@Setter +@Builder +@ToString +public class HeaderDto { + private boolean isError; + private String version; + private int command; + private int bodyLength; +} diff --git a/src/main/java/com/munjaon/server/server/dto/ReportDto.java b/src/main/java/com/munjaon/server/server/dto/ReportDto.java new file mode 100644 index 0000000..ca3df56 --- /dev/null +++ b/src/main/java/com/munjaon/server/server/dto/ReportDto.java @@ -0,0 +1,19 @@ +package com.munjaon.server.server.dto; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +@Getter +@Setter +@ToString +public class ReportDto { + private String msgId; + private String userId; + private String agentMsgId; + private String agentCode; + private String msgType; + private String rsltDate; + private String rsltCode; + private String rsltNet; +} diff --git a/src/main/java/com/munjaon/server/server/dto/ReportUserDto.java b/src/main/java/com/munjaon/server/server/dto/ReportUserDto.java new file mode 100644 index 0000000..c4f8538 --- /dev/null +++ b/src/main/java/com/munjaon/server/server/dto/ReportUserDto.java @@ -0,0 +1,43 @@ +package com.munjaon.server.server.dto; + +import com.munjaon.server.cache.dto.MemberDto; +import com.munjaon.server.server.config.ServerConfig; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +@Getter +@Setter +@Builder +@ToString +public class ReportUserDto { + /* 로그인여부 */ + private boolean isLogin; + /* 마지막 통신 시간 */ + private Long lastTrafficTime; + /* 사용자 ID */ + private String userId; + /* 사용자 접속 IP */ + private String remoteIP; + + private MemberDto memberDto; + + public int isAlive() { + if (isLogin) { + if (System.currentTimeMillis() - lastTrafficTime > ServerConfig.LIMIT_LINK_CHECK_TIMEOUT) { + return 2; + } + } else { + if (System.currentTimeMillis() - lastTrafficTime > ServerConfig.LIMIT_BIND_TIMEOUT) { + return 1; + } + } + + return 0; + } + + public void updateLastTrafficTime() { + this.lastTrafficTime = System.currentTimeMillis(); + } +} diff --git a/src/main/java/com/munjaon/server/server/packet/Bind.java b/src/main/java/com/munjaon/server/server/packet/Bind.java new file mode 100644 index 0000000..ea2d9f1 --- /dev/null +++ b/src/main/java/com/munjaon/server/server/packet/Bind.java @@ -0,0 +1,99 @@ +package com.munjaon.server.server.packet; + +import java.nio.ByteBuffer; + +public final class Bind { + public static final int BIND_BODY_LENGTH = 41; + + public static final int BIND_ID_LENGTH = 20; + public static final int BIND_ID_POSITION = Header.BODY_POSITION + Header.BODY_LENGTH; + public static final int BIND_PWD_LENGTH = 20; + public static final int BIND_PWD_POSITION = BIND_ID_POSITION + BIND_ID_LENGTH; + public static final int BIND_ENCRYPTION_LENGTH = 1; + public static final int BIND_ENCRYPTION_POSITION = BIND_PWD_POSITION + BIND_PWD_LENGTH; + + public static final int BIND_ACK_BODY_LENGTH = 2; + public static final int BIND_ACK_RESULT_CODE_LENGTH = 2; + public static final int BIND_ACK_RESULT_CODE_POSITION = Header.HEADER_LENGTH; + + public static final String ENCRYPTION = "0"; + + public static ByteBuffer makeBindBuffer(String id, String pwd) { + ByteBuffer buffer = ByteBuffer.allocate(Header.HEADER_LENGTH + BIND_BODY_LENGTH); + Packet.setDefaultByte(buffer); + Header.putHeader(buffer, Header.COMMAND_BIND, BIND_BODY_LENGTH); + /* ID */ + if (id != null) { + buffer.put(BIND_ID_POSITION, id.getBytes()); + } + /* PWD */ + if (pwd != null) { + buffer.put(BIND_PWD_POSITION, pwd.getBytes()); + } + /* ENCRYPTION */ + buffer.put(BIND_ENCRYPTION_POSITION, ENCRYPTION.getBytes()); + + return buffer; + } + + public static ByteBuffer makeBindAckBuffer(String resultCode) { + ByteBuffer buffer = ByteBuffer.allocate(Header.HEADER_LENGTH + BIND_ACK_BODY_LENGTH); + Packet.setDefaultByte(buffer); + Header.putHeader(buffer, Header.COMMAND_BIND_ACK, BIND_ACK_BODY_LENGTH); + /* resultCode */ + if (resultCode != null) { + buffer.put(BIND_ACK_RESULT_CODE_POSITION, resultCode.getBytes()); + } + + return buffer; + } + + public static String getBindId(final ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(BIND_ID_POSITION); + byte[] destArray = new byte[BIND_ID_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static String getBindPwd(final ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(BIND_PWD_POSITION); + byte[] destArray = new byte[BIND_PWD_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static String getBindEncryption(final ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(BIND_ENCRYPTION_POSITION); + byte[] destArray = new byte[BIND_ENCRYPTION_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static String getBindAckResultCode(final ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(BIND_ACK_RESULT_CODE_POSITION); + byte[] destArray = new byte[BIND_ACK_RESULT_CODE_LENGTH]; + buffer.get(destArray); + +// return new String(destArray); + return Packet.getString(destArray); + } +} diff --git a/src/main/java/com/munjaon/server/server/packet/CommonMessage.java b/src/main/java/com/munjaon/server/server/packet/CommonMessage.java new file mode 100644 index 0000000..236f18c --- /dev/null +++ b/src/main/java/com/munjaon/server/server/packet/CommonMessage.java @@ -0,0 +1,182 @@ +package com.munjaon.server.server.packet; + +import com.munjaon.server.util.CommonUtil; + +import java.nio.ByteBuffer; + +public final class CommonMessage { + /* DELIVER */ + /* MSG_ID */ + public static final int DELIVER_MESSAGE_ID_LENGTH = 20; + public static final int DELIVER_MESSAGE_ID_POSITION = Header.BODY_POSITION + Header.BODY_LENGTH; + /* SENDER */ + public static final int DELIVER_SENDER_LENGTH = 15; + public static final int DELIVER_SENDER_POSITION = DELIVER_MESSAGE_ID_POSITION + DELIVER_MESSAGE_ID_LENGTH; + /* RECEIVER */ + public static final int DELIVER_RECEIVER_LENGTH = 15; + public static final int DELIVER_RECEIVER_POSITION = DELIVER_SENDER_POSITION + DELIVER_SENDER_LENGTH; + /* RESERVE_TIME */ + public static final int DELIVER_RESERVE_TIME_LENGTH = 14; + public static final int DELIVER_RESERVE_TIME_POSITION = DELIVER_RECEIVER_POSITION + DELIVER_RECEIVER_LENGTH; + /* REQUEST_TIME */ + public static final int DELIVER_REQUEST_TIME_LENGTH = 14; + public static final int DELIVER_REQUEST_TIME_POSITION = DELIVER_RESERVE_TIME_POSITION + DELIVER_RESERVE_TIME_LENGTH; + /* MSG_TYPE */ + public static final int DELIVER_MSG_TYPE_LENGTH = 1; + public static final int DELIVER_MSG_TYPE_POSITION = DELIVER_REQUEST_TIME_POSITION + DELIVER_REQUEST_TIME_LENGTH; + + /* DELIVER_ACK */ + /* MSG_ID */ + public static final int DELIVER_ACK_MESSAGE_ID_LENGTH = 20; + public static final int DELIVER_ACK_MESSAGE_ID_POSITION = Header.BODY_POSITION + Header.BODY_LENGTH; + /* RESULT */ + public static final int DELIVER_ACK_RESULT_LENGTH = 1; + public static final int DELIVER_ACK_RESULT_POSITION = DELIVER_ACK_MESSAGE_ID_POSITION + DELIVER_ACK_MESSAGE_ID_LENGTH; + + public static void putMessageIdForDeliver(ByteBuffer buffer, String messageId) { + if (buffer == null || messageId == null) { + return; + } + buffer.put(DELIVER_MESSAGE_ID_POSITION, messageId.getBytes()); + } + public static String getMessageIdForDeliver(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(DELIVER_MESSAGE_ID_POSITION); + byte[] destArray = new byte[DELIVER_MESSAGE_ID_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static void putSenderForDeliver(ByteBuffer buffer, String sender) { + if (buffer == null || sender == null) { + return; + } + sender = CommonUtil.cutString(CommonUtil.doNumber(sender), DELIVER_SENDER_LENGTH); + buffer.put(DELIVER_SENDER_POSITION, sender.getBytes()); + } + public static String getSenderForDeliver(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(DELIVER_SENDER_POSITION); + byte[] destArray = new byte[DELIVER_SENDER_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static void putReceiverForDeliver(ByteBuffer buffer, String receiver) { + if (buffer == null || receiver == null) { + return; + } + receiver = CommonUtil.cutString(CommonUtil.doNumber(receiver), DELIVER_RECEIVER_LENGTH); + buffer.put(DELIVER_RECEIVER_POSITION, receiver.getBytes()); + } + public static String getReceiverForDeliver(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(DELIVER_RECEIVER_POSITION); + byte[] destArray = new byte[DELIVER_RECEIVER_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static void putReserveTimeForDeliver(ByteBuffer buffer, String reserveTime) { + if (buffer == null || reserveTime == null) { + return; + } + buffer.put(DELIVER_RESERVE_TIME_POSITION, reserveTime.getBytes()); + } + public static String getReserveTimeForDeliver(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(DELIVER_RESERVE_TIME_POSITION); + byte[] destArray = new byte[DELIVER_RESERVE_TIME_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static void putRequestTimeForDeliver(ByteBuffer buffer, String requestTime) { + if (buffer == null || requestTime == null) { + return; + } + buffer.put(DELIVER_REQUEST_TIME_POSITION, requestTime.getBytes()); + } + public static String getRequestTimeForDeliver(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(DELIVER_REQUEST_TIME_POSITION); + byte[] destArray = new byte[DELIVER_REQUEST_TIME_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static void putMsgTypeForDeliver(ByteBuffer buffer, String msgType) { + if (buffer == null || msgType == null) { + return; + } + buffer.put(DELIVER_MSG_TYPE_POSITION, msgType.getBytes()); + } + public static String getMsgTypeForDeliver(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(DELIVER_MSG_TYPE_POSITION); + byte[] destArray = new byte[DELIVER_MSG_TYPE_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + + public static void putMessageIdForDeliverAck(ByteBuffer buffer, String messageId) { + if (buffer == null || messageId == null) { + return; + } + buffer.put(DELIVER_ACK_MESSAGE_ID_POSITION, messageId.getBytes()); + } + public static String getMessageIdForDeliverAck(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(DELIVER_ACK_MESSAGE_ID_POSITION); + byte[] destArray = new byte[DELIVER_ACK_MESSAGE_ID_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static void putResultForDeliverAck(ByteBuffer buffer, String result) { + if (buffer == null || result == null) { + return; + } + buffer.put(DELIVER_ACK_RESULT_POSITION, result.getBytes()); + } + public static String getResultForDeliverAck(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(DELIVER_ACK_RESULT_POSITION); + byte[] destArray = new byte[DELIVER_ACK_RESULT_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } +} diff --git a/src/main/java/com/munjaon/server/server/packet/Header.java b/src/main/java/com/munjaon/server/server/packet/Header.java new file mode 100644 index 0000000..73811ff --- /dev/null +++ b/src/main/java/com/munjaon/server/server/packet/Header.java @@ -0,0 +1,109 @@ +package com.munjaon.server.server.packet; + +import com.munjaon.server.util.ByteUtil; + +import java.nio.ByteBuffer; + +public final class Header { + public static final int HEADER_LENGTH = 10; + + public static final String VERSION = "ITN10"; + public static final int VERSION_LENGTH = 5; + public static final int VERSION_POSITION = 0; + + public static final int COMMAND_LENGTH = 1; + public static final int COMMAND_POSITION = VERSION_POSITION + VERSION_LENGTH; + + public static final String COMMAND_BIND = "1"; + public static final String COMMAND_BIND_ACK = "2"; + public static final String COMMAND_DELIVER = "3"; + public static final String COMMAND_DELIVER_ACK = "4"; + public static final String COMMAND_REPORT = "5"; + public static final String COMMAND_REPORT_ACK = "6"; + public static final String COMMAND_LINK_CHECK = "7"; + public static final String COMMAND_LINK_CHECK_ACK = "8"; + + public static final int BODY_LENGTH = 4; + public static final int BODY_POSITION = COMMAND_POSITION + COMMAND_LENGTH; + + public static final int BODY_BIND_LENGTH = 41; + public static final int BODY_BIND_ACK_LENGTH = 2; + public static final int BODY_LINK_CHECK_LENGTH = 3; + public static final int BODY_LINK_CHECK_ACK_LENGTH = 3; + public static final int BODY_DELIVER_SMS_LENGTH = 239; + public static final int BODY_DELIVER_SMS_ACK_LENGTH = 21; + public static final int BODY_DELIVER_LMS_LENGTH = 2091; + public static final int BODY_DELIVER_LMS_ACK_LENGTH = 21; + public static final int BODY_DELIVER_MMS_LENGTH = 2091; + public static final int BODY_DELIVER_MMS_ACK_LENGTH = 21; + public static final int BODY_REPORT_LENGTH = 58; + public static final int BODY_REPORT_ACK_LENGTH = 1; + + public static void putVersion(final ByteBuffer buffer) { + if (buffer == null) { + return; + } + buffer.put(VERSION_POSITION, VERSION.getBytes()); + } + public static String getVersion(final ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(VERSION_POSITION); + byte[] destArray = new byte[VERSION_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); +// return new String(destArray); + } + + public static void putCommand(final ByteBuffer buffer, String command) { + if (buffer == null) { + return; + } + buffer.put(COMMAND_POSITION, command.getBytes()); + } + public static String getCommand(final ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(COMMAND_POSITION); + byte[] destArray = new byte[COMMAND_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static void putBodyLength(final ByteBuffer buffer, int bodyLength) { + putBodyLength(buffer, Integer.toString(bodyLength)); + } + public static String getBodyLength(final ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(BODY_POSITION); + byte[] destArray = new byte[BODY_LENGTH]; + buffer.get(destArray); + System.out.println(ByteUtil.byteToHex(destArray)); + + return Packet.getString(destArray); + } + + public static void putBodyLength(final ByteBuffer buffer, String bodyLength) { + if (buffer == null) { + return; + } + buffer.put(BODY_POSITION, bodyLength.getBytes()); + } + public static void putHeader(final ByteBuffer buffer, String command, int bodyLength) { + putHeader(buffer, command, Integer.toString(bodyLength)); + } + public static void putHeader(final ByteBuffer buffer, String command, String bodyLength) { + putVersion(buffer); + putCommand(buffer, command); + putBodyLength(buffer, bodyLength); + } +} diff --git a/src/main/java/com/munjaon/server/server/packet/LinkCheck.java b/src/main/java/com/munjaon/server/server/packet/LinkCheck.java new file mode 100644 index 0000000..4c8365b --- /dev/null +++ b/src/main/java/com/munjaon/server/server/packet/LinkCheck.java @@ -0,0 +1,43 @@ +package com.munjaon.server.server.packet; + +import java.nio.ByteBuffer; + +public final class LinkCheck { + public static final int LINK_CHECK_BODY_LENGTH = 3; + public static final int LINK_CHECK_BODY_POSITION = Header.HEADER_LENGTH; + public static final int LINK_CHECK_ACK_BODY_LENGTH = 3; + public static final int LINK_CHECK_ACK_BODY_POSITION = Header.HEADER_LENGTH; + + public static String LINK_CHECK_VALUE = "100"; + public static String LINK_CHECK_ACK_VALUE = "100"; + + public static ByteBuffer makeLinkCheckBuffer() { + ByteBuffer buffer = ByteBuffer.allocate(Header.HEADER_LENGTH + LINK_CHECK_BODY_LENGTH); + Packet.setDefaultByte(buffer); + Header.putHeader(buffer, Header.COMMAND_LINK_CHECK, LINK_CHECK_BODY_LENGTH); + buffer.put(LINK_CHECK_BODY_POSITION, LINK_CHECK_VALUE.getBytes()); + + return buffer; + } + + public static ByteBuffer makeLinkCheckAckBuffer() { + ByteBuffer buffer = ByteBuffer.allocate(Header.HEADER_LENGTH + LINK_CHECK_ACK_BODY_LENGTH); + Packet.setDefaultByte(buffer); + Header.putHeader(buffer, Header.COMMAND_LINK_CHECK, LINK_CHECK_ACK_BODY_LENGTH); + buffer.put(LINK_CHECK_ACK_BODY_POSITION, LINK_CHECK_ACK_VALUE.getBytes()); + + return buffer; + } +// public static ByteBuffer bufferForSend; +// public static ByteBuffer bufferForAck; +// +// static { +// bufferForSend = ByteBuffer.allocateDirect(Header.HEADER_LENGTH + LINK_CHECK_BODY_LENGTH); +// Header.putHeader(bufferForSend, Header.COMMAND_LINK_CHECK, LINK_CHECK_BODY_LENGTH); +// bufferForSend.put(LINK_CHECK_BODY_POSITION, LINK_CHECK_VALUE.getBytes()); +// +// bufferForAck = ByteBuffer.allocateDirect(Header.HEADER_LENGTH + LINK_CHECK_ACK_BODY_LENGTH); +// Header.putHeader(bufferForAck, Header.COMMAND_LINK_CHECK_ACK, LINK_CHECK_ACK_BODY_LENGTH); +// bufferForSend.put(LINK_CHECK_ACK_BODY_POSITION, LINK_CHECK_ACK_VALUE.getBytes()); +// } +} diff --git a/src/main/java/com/munjaon/server/server/packet/Packet.java b/src/main/java/com/munjaon/server/server/packet/Packet.java new file mode 100644 index 0000000..42a5b5b --- /dev/null +++ b/src/main/java/com/munjaon/server/server/packet/Packet.java @@ -0,0 +1,57 @@ +package com.munjaon.server.server.packet; + +import java.nio.ByteBuffer; + +public final class Packet { + public static final byte SET_DEFAULT_BYTE = (byte) 0x00; + public static final long LINK_CHECK_CYCLE = 30000L; + public static void setDefaultByte(ByteBuffer buffer) { + if (buffer == null) { + return; + } +// buffer.clear(); + for (int i = 0; i < buffer.capacity(); i++) { + buffer.put(i, SET_DEFAULT_BYTE); + } + } + + public static String getString(byte[] srcArray) { + if (srcArray == null) { + return null; + } + int size = 0; + for (int i = 0, len = srcArray.length; i < len; i++) { + if (srcArray[i] == SET_DEFAULT_BYTE) { + continue; + } + size++; + } + byte[] destArray = null; + if (size > 0) { + destArray = new byte[size]; + int index = 0; + for (int i = 0, len = srcArray.length; i < len; i++) { + if (srcArray[i] == SET_DEFAULT_BYTE) { + continue; + } + destArray[index++] = srcArray[i]; + } + } + + return destArray == null ? null : new String(destArray); + } + + public static void mergeBuffers(ByteBuffer dest, ByteBuffer srcHead, ByteBuffer srcBody) { + if (dest == null || srcHead == null || srcBody == null) { + return; + } + if (dest.capacity() != (srcHead.capacity() + srcBody.capacity())) { + return; + } + byte[] srcHeadArray = srcHead.array(); + byte[] srcBodyArray = srcBody.array(); + + dest.put(0, srcHeadArray); + dest.put(srcHeadArray.length, srcBodyArray); + } +} diff --git a/src/main/java/com/munjaon/server/server/packet/SmsMessage.java b/src/main/java/com/munjaon/server/server/packet/SmsMessage.java new file mode 100644 index 0000000..188e5a8 --- /dev/null +++ b/src/main/java/com/munjaon/server/server/packet/SmsMessage.java @@ -0,0 +1,44 @@ +package com.munjaon.server.server.packet; + +import com.munjaon.server.util.CommonUtil; + +import java.nio.ByteBuffer; + +public final class SmsMessage { + public static final int DELIVER_SMS_BODY_LENGTH = 239; + public static final int DELIVER_SMS_ACK_BODY_LENGTH = 21; + + /* DELIVER */ + /* MESSAGE */ + public static final int DELIVER_MESSAGE_LENGTH = 160; + public static final int DELIVER_MESSAGE_POSITION = CommonMessage.DELIVER_MSG_TYPE_POSITION + CommonMessage.DELIVER_MSG_TYPE_LENGTH; + + public static void putMessageForDeliver(ByteBuffer buffer, String message) { + if (buffer == null || message == null) { + return; + } + message = CommonUtil.cutString(message, DELIVER_MESSAGE_LENGTH); + buffer.put(DELIVER_MESSAGE_POSITION, message.getBytes()); + } + public static String getMessageForDeliver(ByteBuffer buffer) { + if (buffer == null) { + return null; + } + + buffer.position(DELIVER_MESSAGE_POSITION); + byte[] destArray = new byte[DELIVER_MESSAGE_LENGTH]; + buffer.get(destArray); + + return Packet.getString(destArray); + } + + public static ByteBuffer makeDeliverAckBuffer(String msgId, String status) { + ByteBuffer buffer = ByteBuffer.allocate(Header.HEADER_LENGTH + DELIVER_SMS_ACK_BODY_LENGTH); + Packet.setDefaultByte(buffer); + Header.putHeader(buffer, Header.COMMAND_DELIVER_ACK, DELIVER_SMS_ACK_BODY_LENGTH); + buffer.put(CommonMessage.DELIVER_ACK_MESSAGE_ID_POSITION, msgId.getBytes()); + buffer.put(CommonMessage.DELIVER_ACK_RESULT_POSITION, status.getBytes()); + + return buffer; + } +} diff --git a/src/main/java/com/munjaon/server/server/sample/ExecutorServiceTest4.java b/src/main/java/com/munjaon/server/server/sample/ExecutorServiceTest4.java index 33d36e4..149c170 100644 --- a/src/main/java/com/munjaon/server/server/sample/ExecutorServiceTest4.java +++ b/src/main/java/com/munjaon/server/server/sample/ExecutorServiceTest4.java @@ -14,10 +14,10 @@ public class ExecutorServiceTest4 { service.submit("job3"); service.submit("job4"); - for (int i = 0 ; i < 4; i++) { - String result = service.take(); - System.out.println(result); - } +// for (int i = 0 ; i < 4; i++) { +// String result = service.take(); +// System.out.println(result); +// } System.out.println("end"); service.close(); @@ -33,14 +33,14 @@ public class ExecutorServiceTest4 { public void submit(String job) { executor.submit(() -> { - String threadName = Thread.currentThread().getName(); +// String threadName = Thread.currentThread().getName(); System.out.println("finished " + job); - String result = job + ", " + threadName; - try { - queue.put(result); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } +// String result = job + ", " + threadName; +// try { +// queue.put(result); +// } catch (InterruptedException e) { +// Thread.currentThread().interrupt(); +// } }); } diff --git a/src/main/java/com/munjaon/server/server/service/CollectServerService.java b/src/main/java/com/munjaon/server/server/service/CollectServerService.java new file mode 100644 index 0000000..ff5e230 --- /dev/null +++ b/src/main/java/com/munjaon/server/server/service/CollectServerService.java @@ -0,0 +1,403 @@ +package com.munjaon.server.server.service; + +import com.munjaon.server.cache.dto.MemberDto; +import com.munjaon.server.cache.enums.CacheService; +import com.munjaon.server.cache.service.MemberService; +import com.munjaon.server.queue.dto.BasicMessageDto; +import com.munjaon.server.queue.enums.QueueTypeWorker; +import com.munjaon.server.server.dto.ConnectUserDto; +import com.munjaon.server.server.dto.HeaderDto; +import com.munjaon.server.server.packet.*; +import com.munjaon.server.util.LogUtil; +import lombok.Getter; +import org.json.simple.JSONObject; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class CollectServerService extends Service { + private final InetSocketAddress listenAddress; + private CollectorThreadService threadService; + private Selector selector; + private final String serviceType; + + public CollectServerService(String serviceName, String serviceType, int port) { + super(serviceName); + this.listenAddress = new InetSocketAddress(port); + this.serviceType = serviceType; + } + @Override + public void checkReady() { + QueueTypeWorker worker = QueueTypeWorker.find(this.serviceType); + if (worker != null && worker.getQueueSize() > 0) { + this.IS_READY_YN = true; + } else { + this.IS_READY_YN = false; + } + } + + @Override + public void initResources() { + try { + initCollectChannel(); + threadService = new CollectorThreadService(8, this.serviceType, logger); + } catch (IOException e) { + saveSystemLog(e); + throw new RuntimeException(e); + } + } + + private void initCollectChannel() throws IOException { + selector = Selector.open(); + /* 채널 생성 */ + ServerSocketChannel serverChannel = ServerSocketChannel.open(); + /* non-Blocking 설정 */ + serverChannel.configureBlocking(false); + /* 서버 ip, port 설정 */ + serverChannel.socket().bind(listenAddress); + /* 채널에 accept 대기 설정 */ + serverChannel.register(selector, SelectionKey.OP_ACCEPT); + } + + private void closeCollectChannel() throws IOException { + selector.close(); + } + + @Override + public void releaseResources() { + try { + closeCollectChannel(); + threadService.close(); + } catch (IOException e) { + saveSystemLog(e); + throw new RuntimeException(e); + } + } + + @Override + public void doService() { + while (isRun()) { + try { + execInterest(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private void execInterest() throws IOException { + if (selector.select(1000) == 0) { + return ; + } + Iterator keys = selector.selectedKeys().iterator(); + while (keys.hasNext()) { + SelectionKey key = keys.next(); + if (key.isValid()) { + if (key.isAcceptable()) { // 접속일 경우.. + saveSystemLog("isAcceptable"); + threadService.submit(selector, key, 1); + } else if (key.isReadable()) { // 수신일 경우.. + saveSystemLog("isReadable"); + threadService.submit(selector, key, 2); + } else if (key.isWritable()) { // 발신일 경우.. + saveSystemLog("isWritable"); + threadService.submit(selector, key, 3); + } + } + /* 키 셋에서 제거. */ + keys.remove(); + } + } + + @Override + public JSONObject monitorService() { + return null; + } + + private static class CollectorThreadService { + @Getter + private String serviceType; + @Getter + private final int maxCore; + private final ExecutorService executor; + private final Map connectUserMap = new ConcurrentHashMap<>(); + private final LogUtil logger; + + public CollectorThreadService(String serviceType, LogUtil logger) { + this(Runtime.getRuntime().availableProcessors(), serviceType, logger); + } + + public CollectorThreadService(int maxCore, String serviceType, LogUtil logger) { + this.maxCore = maxCore; + this.executor = Executors.newFixedThreadPool(maxCore); + this.logger = logger; + } + + public void submit(Selector selector, SelectionKey key, int interestOps) { + executor.submit(() -> { + switch (interestOps) { + case 1 : accept(selector, key); break; + case 2 : read(selector, key); break; + case 3 : write(selector, key); break; + default : break; + } + }); + } + + private void accept(Selector selector, SelectionKey key) { + try { + /* 키 채널을 가져온다. */ + ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); + /* accept을 해서 Socket 채널을 가져온다. */ + SocketChannel channel = serverChannel.accept(); + channel.configureBlocking(false); + /* 소켓 취득 */ + Socket socket = channel.socket(); + SocketAddress remoteAddr = socket.getRemoteSocketAddress(); + saveSystemLog("accept : " + Thread.currentThread().getName()); + saveSystemLog("Connected to: " + remoteAddr); + // Socket 채널을 channel에 수신 등록한다 + channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).build()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +// private void read(Selector selector, SelectionKey key) { +// try { +// saveSystemLog("read : " + Thread.currentThread().getName()); +// // 키 채널을 가져온다. +// SocketChannel channel = (SocketChannel) key.channel(); +// ConnectUserDto userDto = (ConnectUserDto) key.attachment(); +// +// int size = -1; +// +// ByteBuffer headBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH); +// ByteBuffer bindBuffer = ByteBuffer.allocate(Bind.BIND_BODY_LENGTH); +// ByteBuffer linkBuffer = ByteBuffer.allocate(LinkCheck.LINK_CHECK_BODY_LENGTH); +// ByteBuffer msgBuffer = ByteBuffer.allocate(SmsMessage.DELIVER_SMS_BODY_LENGTH); +// try { +// size = channel.read(headBuffer); +// String command = Header.getCommand(headBuffer); +// System.out.println("command : " + command); +// if ("1".equals(command)) { +// size = channel.read(bindBuffer); +// } else if ("3".equals(command)) { +// size = channel.read(msgBuffer); +// } else if ("7".equals(command)) { +// size = channel.read(linkBuffer); +// } else { +// size = -1; +// } +// +// System.out.println("size : " + size); +// } catch (IOException e) {} +// if (size < 0) { +// expireConnectUser(key); +// } else if (size > 0) { +//// String command = Header.getCommand(buffer); +//// saveSystemLog("command : " + command); +//// switch (Integer.parseInt(command)) { +//// case 1 : recvBind(channel, buffer, userDto); break; +//// case 3 : recvDeliver(channel, buffer, userDto); break; +//// case 7 : recvLinkCheck(key); break; +//// default: expireConnectUser(key); break; +//// } +// } +// } catch (Exception e) { +// e.printStackTrace(); +// } +// System.out.println("read"); +// } + + private void read(Selector selector, SelectionKey key) { + try { + saveSystemLog("read : " + Thread.currentThread().getName()); + // 키 채널을 가져온다. + SocketChannel channel = (SocketChannel) key.channel(); + ConnectUserDto userDto = (ConnectUserDto) key.attachment(); + + int size = -1; + ByteBuffer buffer = ByteBuffer.allocate(256); + try { + size = channel.read(buffer); + } catch (IOException e) {} + if (size < 0) { + expireConnectUser(key); + } else if (size > 0) { + String command = Header.getCommand(buffer); + saveSystemLog("command : " + command); + switch (Integer.parseInt(command)) { + case 1 : recvBind(channel, buffer, userDto); break; + case 3 : recvDeliver(channel, buffer, userDto); break; + case 7 : recvLinkCheck(key); break; + default: expireConnectUser(key); break; + } + } + } catch (Exception e) { + e.printStackTrace(); + } + System.out.println("read"); + } + + private void recvBind(SocketChannel channel, ByteBuffer buffer, ConnectUserDto userDto) { + String resultCode = "00"; + try { + String id = Bind.getBindId(buffer); + String pwd = Bind.getBindPwd(buffer); + saveSystemLog("id : " + id); + saveSystemLog("pwd : " + pwd); + if (id == null || pwd == null) { + resultCode = "50"; + } else { + if (connectUserMap.containsKey(id)) { + resultCode = "60"; + } else { + MemberService svc = (MemberService) CacheService.LOGIN_SERVICE.getService(); + MemberDto memberDto = null; + if (svc != null) { + memberDto = svc.get(id); + } + if (memberDto == null || !pwd.equals(memberDto.getAccessKey())) { + resultCode = "20"; + } else { + userDto.setUserId(id); + userDto.setLogin(true); + userDto.setMemberDto(memberDto); + } + } + } + } catch (Exception e) { + resultCode = "10"; + e.printStackTrace(); + } + + try { + saveSystemLog("resultCode : " + resultCode); + channel.write(Bind.makeBindAckBuffer(resultCode)); + } catch (IOException e) { + e.printStackTrace(); + } + } + + private void recvDeliver(SocketChannel channel, ByteBuffer buffer, ConnectUserDto userDto) throws IOException { + BasicMessageDto messageDto = new BasicMessageDto(); + messageDto.setRouterSeq("40"); + messageDto.setServiceType("4"); + messageDto.setUserId(userDto.getUserId()); + messageDto.setRemoteIP(userDto.getRemoteIP()); + messageDto.setSendStatus("0"); + messageDto.setUserMsgID(CommonMessage.getMessageIdForDeliver(buffer)); + messageDto.setUserSender(CommonMessage.getSenderForDeliver(buffer)); + messageDto.setUserReceiver(CommonMessage.getReceiverForDeliver(buffer)); + messageDto.setReserveDt(CommonMessage.getReserveTimeForDeliver(buffer)); + messageDto.setRequestDt(CommonMessage.getRequestTimeForDeliver(buffer)); + messageDto.setUnitCost("10.4"); + messageDto.setUserMessage(SmsMessage.getMessageForDeliver(buffer)); + + QueueTypeWorker worker = QueueTypeWorker.find("SMS"); + if (worker != null) { + worker.pushQueue(messageDto); + channel.write(SmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); + } + } + private void recvLinkCheck(SelectionKey key) throws IOException { + SocketChannel channel = (SocketChannel) key.channel(); + channel.write(LinkCheck.makeLinkCheckAckBuffer()); + } + private void expireConnectUser(SelectionKey key) { + if (key == null || !key.isValid()) { + return; + } + try { + SocketChannel channel = (SocketChannel) key.channel(); + ConnectUserDto userDto = (ConnectUserDto) key.attachment(); + if (userDto != null && userDto.getUserId() != null) { + connectUserMap.remove(userDto.getUserId()); + key.attach(null); + } + // 소켓 채널 닫기 + channel.close(); + // 키 닫기 + key.cancel(); + } catch (IOException e) { + e.printStackTrace(); + } + } + private HeaderDto getHeader(SocketChannel channel) { + HeaderDto headerDto = HeaderDto.builder().build(); + int size = -1; + ByteBuffer buffer = ByteBuffer.allocate(Header.HEADER_LENGTH); + if (channel != null) { + try { + saveSystemLog("Key is valid : "); +// SocketChannel channel = (SocketChannel) key.channel(); + size = channel.read(buffer); + } catch (IOException e) {} + } + + if (size < 0) { + saveSystemLog("Is Error : "); + headerDto.setError(true); + } else { + saveSystemLog("version : " + Header.getVersion(buffer)); + saveSystemLog("Command : " + Header.getCommand(buffer)); + saveSystemLog("BodyLength : " + Header.getBodyLength(buffer)); + headerDto.setVersion(Header.getVersion(buffer)); + headerDto.setCommand(Integer.parseInt(Header.getCommand(buffer))); + headerDto.setBodyLength(Integer.parseInt(Header.getBodyLength(buffer))); + } + + saveSystemLog("READ HEADER : " + size); + + return headerDto; + } + + private void write(Selector selector, SelectionKey key) { + System.out.println("write"); + } + + private void saveSystemLog(Object obj) { + saveLog(obj, true); + } + + private void saveLog(Object obj) { + saveLog(obj, false); + } + + private void saveLog(Object obj, boolean isConsoleOutput) { + if (isConsoleOutput) { + System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATE_FORMAT)) + " {{" + serviceType + "}} " + obj); + } + + if (logger == null) { + return; + } + + logger.log(obj); + } + + public void close() { + List unfinishedTasks = executor.shutdownNow(); + connectUserMap.clear(); + if (!unfinishedTasks.isEmpty()) { + saveSystemLog("Not all tasks finished before calling close: " + unfinishedTasks.size()); + } + } + } +} diff --git a/src/main/java/com/munjaon/server/server/service/ReportServerService.java b/src/main/java/com/munjaon/server/server/service/ReportServerService.java new file mode 100644 index 0000000..09cbd1e --- /dev/null +++ b/src/main/java/com/munjaon/server/server/service/ReportServerService.java @@ -0,0 +1,226 @@ +package com.munjaon.server.server.service; + +import com.munjaon.server.server.dto.ConnectUserDto; +import com.munjaon.server.util.LogUtil; +import lombok.Getter; +import org.json.simple.JSONObject; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class ReportServerService extends Service { + private final InetSocketAddress listenAddress; + private final Map connectUserMap = new ConcurrentHashMap<>(); + private ReporterThreadService threadService; + private Selector selector; + + public ReportServerService(String serviceName, int port) { + super(serviceName); + this.listenAddress = new InetSocketAddress(port); + } + @Override + public void checkReady() { + this.IS_READY_YN = true; + } + + @Override + public void initResources() { + try { + initReportChannel(); + threadService = new ReporterThreadService(8, logger); + } catch (IOException e) { + saveSystemLog(e); + throw new RuntimeException(e); + } + } + + private void initReportChannel() throws IOException { + selector = Selector.open(); + /* 채널 생성 */ + ServerSocketChannel serverChannel = ServerSocketChannel.open(); + /* non-Blocking 설정 */ + serverChannel.configureBlocking(false); + /* 서버 ip, port 설정 */ + serverChannel.socket().bind(listenAddress); + /* 채널에 accept 대기 설정 */ + serverChannel.register(selector, SelectionKey.OP_ACCEPT); + } + + private void closeReportChannel() throws IOException { + selector.close(); + } + + @Override + public void releaseResources() { + try { + closeReportChannel(); + threadService.close(); + } catch (IOException e) { + saveSystemLog(e); + throw new RuntimeException(e); + } + } + + @Override + public void doService() { + + } + + private void execInterest() throws IOException { + if (selector.select(1000) == 0) { + return ; + } + Iterator keys = selector.selectedKeys().iterator(); + while (keys.hasNext()) { + SelectionKey key = keys.next(); + if (key.isValid()) { + if (key.isAcceptable()) { // 접속일 경우.. + saveSystemLog("isAcceptable"); + accept(selector, key); + } else if (key.isReadable()) { // 수신일 경우.. + saveSystemLog("isReadable"); +// threadService.submit(selector, key, 2); + } + } + /* 키 셋에서 제거. */ + keys.remove(); + } + } + + private void accept(Selector selector, SelectionKey key) { + try { + /* 키 채널을 가져온다. */ + ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); + /* accept을 해서 Socket 채널을 가져온다. */ + SocketChannel channel = serverChannel.accept(); + channel.configureBlocking(false); + /* 소켓 취득 */ + Socket socket = channel.socket(); + SocketAddress remoteAddr = socket.getRemoteSocketAddress(); + saveSystemLog("Connected to: " + remoteAddr); + // Socket 채널을 channel에 수신 등록한다 + channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).build()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +// private void read(Selector selector, SelectionKey key) { +// try { +// saveSystemLog("read : " + Thread.currentThread().getName()); +// // 키 채널을 가져온다. +// SocketChannel channel = (SocketChannel) key.channel(); +// ConnectUserDto userDto = (ConnectUserDto) key.attachment(); +// +// int size = -1; +// ByteBuffer buffer = ByteBuffer.allocate(256); +// try { +// size = channel.read(buffer); +// } catch (IOException e) {} +// if (size < 0) { +// expireConnectUser(key); +// } else if (size > 0) { +// String command = Header.getCommand(buffer); +// saveSystemLog("command : " + command); +// switch (Integer.parseInt(command)) { +// case 1 : recvBind(channel, buffer, userDto); break; +// case 6 : recvDeliver(channel, buffer, userDto); break; +// case 8 : recvLinkCheck(key); break; +// default: expireConnectUser(key); break; +// } +// } +// } catch (Exception e) { +// e.printStackTrace(); +// } +// System.out.println("read"); +// } + + private void expireConnectUser(SelectionKey key) { + if (key == null || !key.isValid()) { + return; + } + try { + SocketChannel channel = (SocketChannel) key.channel(); + ConnectUserDto userDto = (ConnectUserDto) key.attachment(); + if (userDto != null && userDto.getUserId() != null) { + connectUserMap.remove(userDto.getUserId()); + key.attach(null); + } + // 소켓 채널 닫기 + channel.close(); + // 키 닫기 + key.cancel(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public JSONObject monitorService() { + return null; + } + + private static class ReporterThreadService { + @Getter + private final int maxCore; + private final ExecutorService executor; + private final Map connectUserMap = new ConcurrentHashMap<>(); + private final LogUtil logger; + + public ReporterThreadService(LogUtil logger) { + this(Runtime.getRuntime().availableProcessors(), logger); + } + + public ReporterThreadService(int maxCore, LogUtil logger) { + this.maxCore = maxCore; + this.executor = Executors.newFixedThreadPool(maxCore); + this.logger = logger; + } + + private void write(Selector selector, SelectionKey key) { + System.out.println("write"); + } + + private void saveSystemLog(Object obj) { + saveLog(obj, true); + } + + private void saveLog(Object obj) { + saveLog(obj, false); + } + + private void saveLog(Object obj, boolean isConsoleOutput) { + if (isConsoleOutput) { + System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern(LOG_DATE_FORMAT)) + " {{ReporterThreadService}} " + obj); + } + + if (logger == null) { + return; + } + + logger.log(obj); + } + + public void close() { + List unfinishedTasks = executor.shutdownNow(); + connectUserMap.clear(); + if (!unfinishedTasks.isEmpty()) { + saveSystemLog("Not all tasks finished before calling close: " + unfinishedTasks.size()); + } + } + } +} diff --git a/src/main/java/com/munjaon/server/server/service/SocketServerService.java b/src/main/java/com/munjaon/server/server/service/SocketServerService.java deleted file mode 100644 index bdaac6a..0000000 --- a/src/main/java/com/munjaon/server/server/service/SocketServerService.java +++ /dev/null @@ -1,30 +0,0 @@ -package com.munjaon.server.server.service; - -import org.json.simple.JSONObject; - -public class SocketServerService extends Service { - @Override - public void checkReady() { - - } - - @Override - public void initResources() { - - } - - @Override - public void releaseResources() { - - } - - @Override - public void doService() { - - } - - @Override - public JSONObject monitorService() { - return null; - } -} diff --git a/src/main/java/com/munjaon/server/util/CommonUtil.java b/src/main/java/com/munjaon/server/util/CommonUtil.java index a63de59..25c2ae1 100644 --- a/src/main/java/com/munjaon/server/util/CommonUtil.java +++ b/src/main/java/com/munjaon/server/util/CommonUtil.java @@ -69,7 +69,7 @@ public final class CommonUtil { return isValid; } // 해당 길이만큼 문자열을 자르는 함수 - public static String CutString(String str,int limit) + public static String cutString(String str, int limit) { int len = str.length(); int sumLength=0; diff --git a/src/main/java/com/munjaon/server/util/SerialNoUtil.java b/src/main/java/com/munjaon/server/util/SerialNoUtil.java index 9f7717a..3971d6a 100644 --- a/src/main/java/com/munjaon/server/util/SerialNoUtil.java +++ b/src/main/java/com/munjaon/server/util/SerialNoUtil.java @@ -5,7 +5,7 @@ import java.time.format.DateTimeFormatter; public final class SerialNoUtil { public static String getSerialNo() { - return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")) + getMultiAlphabet(3) + getMultiNumeric(3) + getMultiAlphabet(2) + getMultiNumeric(6); + return getMultiAlphabet(3) + getMultiNumeric(3) + getMultiAlphabet(2) + getMultiNumeric(6); } public static String getMultiAlphabet(final int count) { diff --git a/src/main/resources/sqlmap/report_sql.xml b/src/main/resources/sqlmap/report_sql.xml new file mode 100644 index 0000000..a96b61d --- /dev/null +++ b/src/main/resources/sqlmap/report_sql.xml @@ -0,0 +1,22 @@ + + + + + + DELETE FROM mj_msg_report WHERE MSG_ID = #{msgId} + + \ No newline at end of file diff --git a/src/main/resources/sqlmap/serialno_sql.xml b/src/main/resources/sqlmap/serialno_sql.xml new file mode 100644 index 0000000..3f9c3ee --- /dev/null +++ b/src/main/resources/sqlmap/serialno_sql.xml @@ -0,0 +1,29 @@ + + + + + + + /* SerialNoMapper.insert */ + INSERT IDS (TABLE_NAME, NEXT_ID) VALUES ('MSG_ID', 1) + + + + /* SerialNoMapper.update */ + UPDATE IDS SET NEXT_ID = NEXT_ID + 1 + WHERE TABLE_NAME = 'MSG_ID' + + + + \ No newline at end of file diff --git a/src/main/resources/sqlmap/sms_sql.xml b/src/main/resources/sqlmap/sms_sql.xml new file mode 100644 index 0000000..0c1050e --- /dev/null +++ b/src/main/resources/sqlmap/sms_sql.xml @@ -0,0 +1,20 @@ + + + + + INSERT INTO MJ_MSG_DATA ( + MSG_ID + , MSG_GROUP_ID + , USER_ID + , AGENT_MSG_ID + , AGENT_CODE + , CUR_STATE + , REQ_DATE + , CALL_TO + , CALL_FROM + , SUBJECT + , SMS_TXT + , MSG_TYPE + )VALUES (#{id}, #{msgGroupID}, #{userId}, #{userMsgID}, '04', 0, NOW(), #{userReceiver}, #{userSender}, NULL, #{userMessage}, '4' ) + + \ No newline at end of file