From 229f464ec3ae15ad121aa0a0010bddf62d7ba779 Mon Sep 17 00:00:00 2001 From: jangdongsin Date: Mon, 17 Feb 2025 23:46:04 +0900 Subject: [PATCH] =?UTF-8?q?=EB=88=84=EB=9D=BD=EA=B1=B4=20=EB=A1=9C?= =?UTF-8?q?=EC=A7=81=20=EC=88=98=EC=A0=95,=20IP=EC=B2=B4=ED=81=AC,=20?= =?UTF-8?q?=EC=A4=91=EB=B3=B5=EA=B1=B4=20=EB=A1=9C=EC=A7=81=20=EC=B6=94?= =?UTF-8?q?=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/config/RunnerConfiguration.java | 20 +- .../server/queue/pool/KeyWriteQueue.java | 216 +++++++++++++++++ .../munjaon/server/queue/pool/QueuePool.java | 3 +- .../server/server/dto/ConnectUserDto.java | 8 + .../server/server/service/CollectServer.java | 18 +- .../server/server/service/ReportServer.java | 10 +- .../server/server/task/CollectServerTask.java | 229 ++++++++---------- src/main/resources/sqlmap/sms_sql.xml | 1 + 8 files changed, 369 insertions(+), 136 deletions(-) create mode 100644 src/main/java/com/munjaon/server/queue/pool/KeyWriteQueue.java diff --git a/src/main/java/com/munjaon/server/config/RunnerConfiguration.java b/src/main/java/com/munjaon/server/config/RunnerConfiguration.java index 7f2f6c0..967fbbd 100644 --- a/src/main/java/com/munjaon/server/config/RunnerConfiguration.java +++ b/src/main/java/com/munjaon/server/config/RunnerConfiguration.java @@ -197,9 +197,11 @@ public class RunnerConfiguration { try { String serviceName = "SMS_COLLECTOR"; String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE"); + String msgKeyPath = serverConfig.getString(serviceName + ".MSGID_QUEUE"); int port = serverConfig.getInt(serviceName + ".SERVICE_PORT"); + int msgIdQueueSize = serverConfig.getInt(serviceName + ".MSGID_QUEUE_SIZE"); // CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port); - CollectServer collectServer = new CollectServer(serviceName, serviceType, port); + CollectServer collectServer = new CollectServer(serviceName, serviceType, port, msgKeyPath, msgIdQueueSize); ShutdownService shutdownService = new ShutdownService(collectServer); Runtime.getRuntime().addShutdownHook(shutdownService); @@ -217,9 +219,11 @@ public class RunnerConfiguration { try { String serviceName = "LMS_COLLECTOR"; String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE"); + String msgKeyPath = serverConfig.getString(serviceName + ".MSGID_QUEUE"); int port = serverConfig.getInt(serviceName + ".SERVICE_PORT"); + int msgIdQueueSize = serverConfig.getInt(serviceName + ".MSGID_QUEUE_SIZE"); // CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port); - CollectServer collectServer = new CollectServer(serviceName, serviceType, port); + CollectServer collectServer = new CollectServer(serviceName, serviceType, port, msgKeyPath, msgIdQueueSize); ShutdownService shutdownService = new ShutdownService(collectServer); Runtime.getRuntime().addShutdownHook(shutdownService); @@ -237,9 +241,11 @@ public class RunnerConfiguration { try { String serviceName = "MMS_COLLECTOR"; String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE"); + String msgKeyPath = serverConfig.getString(serviceName + ".MSGID_QUEUE"); int port = serverConfig.getInt(serviceName + ".SERVICE_PORT"); + int msgIdQueueSize = serverConfig.getInt(serviceName + ".MSGID_QUEUE_SIZE"); // CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port); - CollectServer collectServer = new CollectServer(serviceName, serviceType, port); + CollectServer collectServer = new CollectServer(serviceName, serviceType, port, msgKeyPath, msgIdQueueSize); ShutdownService shutdownService = new ShutdownService(collectServer); Runtime.getRuntime().addShutdownHook(shutdownService); @@ -257,9 +263,11 @@ public class RunnerConfiguration { try { String serviceName = "KAT_COLLECTOR"; String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE"); + String msgKeyPath = serverConfig.getString(serviceName + ".MSGID_QUEUE"); int port = serverConfig.getInt(serviceName + ".SERVICE_PORT"); + int msgIdQueueSize = serverConfig.getInt(serviceName + ".MSGID_QUEUE_SIZE"); // CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port); - CollectServer collectServer = new CollectServer(serviceName, serviceType, port); + CollectServer collectServer = new CollectServer(serviceName, serviceType, port, msgKeyPath, msgIdQueueSize); ShutdownService shutdownService = new ShutdownService(collectServer); Runtime.getRuntime().addShutdownHook(shutdownService); @@ -277,9 +285,11 @@ public class RunnerConfiguration { try { String serviceName = "KFT_COLLECTOR"; String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE"); + String msgKeyPath = serverConfig.getString(serviceName + ".MSGID_QUEUE"); int port = serverConfig.getInt(serviceName + ".SERVICE_PORT"); + int msgIdQueueSize = serverConfig.getInt(serviceName + ".MSGID_QUEUE_SIZE"); // CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port); - CollectServer collectServer = new CollectServer(serviceName, serviceType, port); + CollectServer collectServer = new CollectServer(serviceName, serviceType, port, msgKeyPath, msgIdQueueSize); ShutdownService shutdownService = new ShutdownService(collectServer); Runtime.getRuntime().addShutdownHook(shutdownService); diff --git a/src/main/java/com/munjaon/server/queue/pool/KeyWriteQueue.java b/src/main/java/com/munjaon/server/queue/pool/KeyWriteQueue.java new file mode 100644 index 0000000..7adcb1e --- /dev/null +++ b/src/main/java/com/munjaon/server/queue/pool/KeyWriteQueue.java @@ -0,0 +1,216 @@ +package com.munjaon.server.queue.pool; + +import com.munjaon.server.queue.config.QueueConstants; +import com.munjaon.server.server.packet.common.Packet; +import com.munjaon.server.util.FileUtil; +import lombok.Getter; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +@Getter +public class KeyWriteQueue { + private final static int HEAD_SIZE = 40; + private final static int HEAD_TOTAL_COUNT_LOCATION = 0; + private final static int HEAD_TOTAL_COUNT_SIZE = 20; + private final static int HEAD_WRITE_POSITION_LOCATION = 20; + private final static int HEAD_WRITE_POSITION_SIZE = 20; + private final static int BODY_WRITE_KEY_SIZE = 20; + /* Queue 변수 */ + private final String queuePath; + private final String queueName; + private final int MAX_QUEUE_SIZE; + + private int writePosition = -1; + private int totalCount = -1; + + /** Queue Header Buffer */ + protected ByteBuffer headerBuffer = null; + /** Header 에서 사용하는 변수 */ + protected byte[] headerArray = null; + /** Queue File Channel */ + protected FileChannel channel = null; + /** pushBuffer() 함수에서 사용하는 변수 */ + protected ByteBuffer dataBuffer = null; + /** isExist() 함수에서 사용하는 변수 */ + protected ByteBuffer searchBuffer = null; + + public KeyWriteQueue(String queuePath, String queueName, int maxQueueSize) { + this.queuePath = System.getProperty("ROOTPATH") + File.separator + queuePath; + this.queueName = queueName; + MAX_QUEUE_SIZE = maxQueueSize; + /* 큐초기화 */ + try { + initQueue(); + } catch (Exception e) {} + } + + public boolean isExist(String key) throws Exception { + if (this.totalCount == 0) { + return false; + } + + boolean exists = false; + for (int i = 0; i < this.totalCount; i++) { + int position = this.HEAD_SIZE + (i * BODY_WRITE_KEY_SIZE); + this.channel.position(position); + /* 읽기용 데이터 버퍼 초기화 */ + initSearchBuffer(); + this.channel.read(searchBuffer); + + byte[] byteArray = new byte[BODY_WRITE_KEY_SIZE]; + searchBuffer.position(0); + searchBuffer.get(byteArray); + String queueKey = QueueConstants.getString(byteArray); + + if (queueKey.equals(key)) { + exists = true; + break; + } + } + + return exists; + } + + public void pushKeyToBuffer(String key) throws Exception { + if (key == null || key.isEmpty()) { + return; + } + + /* 1. dataBuffer 초기화 */ + initDataBuffer(); + /* 2. messageDto >> dataBuffer */ + this.dataBuffer.position(0); + this.dataBuffer.put(key.getBytes(Packet.AGENT_CHARACTER_SET)); + + /* 3.1 Header 정보 다시 일기 */ + readHeader(); + if (this.dataBuffer != null) { + int position = HEAD_SIZE + (BODY_WRITE_KEY_SIZE * this.writePosition); + this.channel.position(position); + this.dataBuffer.flip(); + this.channel.write(this.dataBuffer); + /* 3.2 Push 카운터 증가 */ + this.writePosition = this.writePosition + 1; + if (this.writePosition > this.MAX_QUEUE_SIZE) { + this.writePosition = 0; + } + this.totalCount = this.totalCount + 1; + if (this.totalCount > this.MAX_QUEUE_SIZE) { + this.totalCount = this.MAX_QUEUE_SIZE; + } + /* 3.3 Header 정보 변경 */ + writeHeader(); + } + } + + public void initDataBuffer() { + if (this.dataBuffer == null) { + this.dataBuffer = ByteBuffer.allocateDirect(BODY_WRITE_KEY_SIZE); + } + this.dataBuffer.clear(); + for(int loopCnt = 0; loopCnt < BODY_WRITE_KEY_SIZE; loopCnt++){ + this.dataBuffer.put(QueueConstants.SET_DEFAULT_BYTE); + } + this.dataBuffer.position(0); + } + + public void initSearchBuffer() { + if (this.searchBuffer == null) { + this.searchBuffer = ByteBuffer.allocateDirect(BODY_WRITE_KEY_SIZE); + } + this.searchBuffer.clear(); + for(int loopCnt = 0; loopCnt < BODY_WRITE_KEY_SIZE; loopCnt++){ + this.searchBuffer.put(QueueConstants.SET_DEFAULT_BYTE); + } + this.searchBuffer.position(0); + } + + private void initQueue() throws Exception { + this.headerBuffer = ByteBuffer.allocateDirect(HEAD_SIZE); + try{ + /* 1. 경로 체크 및 생성 */ + FileUtil.mkdirs(this.queuePath); + /* 2. 파일생성 및 읽기 */ + File file = new File(this.queuePath + File.separator + queueName + ".queue"); + this.channel = new RandomAccessFile(file, "rw").getChannel(); + + if (file.length() == 0) { + // Push 및 Pop 카운트 초기화 + this.writePosition = 0; + this.totalCount = 0; + // 헤더 초기화 + writeHeader(); + } else { + readHeader(); + } +// backupQueue(); + } catch(Exception e) { + throw e; + } finally { + //lock.release(); + } + } + protected void readHeader() throws Exception { + try { + initHeaderBuffer(); + this.channel.position(0); + this.channel.read(this.headerBuffer); + this.headerArray = new byte[HEAD_TOTAL_COUNT_SIZE]; + // 생성날짜 가져오기 - 생성날짜(10) / 읽은카운트(10) / 쓴카운트(10) + this.headerBuffer.position(HEAD_TOTAL_COUNT_LOCATION); + this.headerBuffer.get(this.headerArray); + this.totalCount = Integer.parseInt(QueueConstants.getString(this.headerArray)); + // 쓴 카운트 가져오기 + this.headerArray = new byte[HEAD_WRITE_POSITION_SIZE]; + this.headerBuffer.position(HEAD_WRITE_POSITION_LOCATION); + this.headerBuffer.get(this.headerArray); + this.writePosition = Integer.parseInt(QueueConstants.getString(this.headerArray)); + } catch(Exception e) { + throw e; + } + } + + protected void writeHeader() throws Exception { + try { + initHeaderBuffer(); + this.channel.position(0); + this.headerBuffer.put(Integer.toString(this.totalCount).getBytes(Packet.AGENT_CHARACTER_SET)); + this.headerBuffer.position(HEAD_WRITE_POSITION_LOCATION); + this.headerBuffer.put(Integer.toString(this.writePosition).getBytes(Packet.AGENT_CHARACTER_SET)); + this.headerBuffer.flip(); + this.channel.write(this.headerBuffer); + } catch(Exception e) { + throw e; + } + } + + protected void initHeaderBuffer(){ + this.headerBuffer.clear(); + for(int loopCnt = 0; loopCnt < HEAD_SIZE; loopCnt++){ + this.headerBuffer.put(QueueConstants.SET_DEFAULT_BYTE); + } + this.headerBuffer.position(0); + } + + public void close() throws IOException { + try { + if (isOpen()) { + channel.close(); + } + } catch(IOException e) { + throw e; + } + } + + protected boolean isOpen() { + if (this.channel == null) { + return false; + } + + return this.channel.isOpen(); + } +} diff --git a/src/main/java/com/munjaon/server/queue/pool/QueuePool.java b/src/main/java/com/munjaon/server/queue/pool/QueuePool.java index a5d43a1..e4d80ce 100644 --- a/src/main/java/com/munjaon/server/queue/pool/QueuePool.java +++ b/src/main/java/com/munjaon/server/queue/pool/QueuePool.java @@ -43,7 +43,7 @@ public abstract class QueuePool { queue = queuePool.get(loopCnt); if(name.equals(queue.getQueueName())){ queuePool.remove(loopCnt); - System.out.println("[" + queue.getQueueInfo().getServiceType() + " Queue] [" + queue.getQueueName() + " is Removed]"); + log.info("[" + queue.getQueueInfo().getServiceType() + " Queue] [" + queue.getQueueName() + " is Removed]"); break; } } @@ -55,6 +55,7 @@ public abstract class QueuePool { synchronized(lockMonitor){ if (queue != null){ queuePool.addLast(queue); + log.info("[" + queue.getQueueInfo().getServiceType() + " Queue] [" + queue.getQueueName() + " is Added]"); lockMonitor.notifyAll(); } } diff --git a/src/main/java/com/munjaon/server/server/dto/ConnectUserDto.java b/src/main/java/com/munjaon/server/server/dto/ConnectUserDto.java index ecdc302..d1a5829 100644 --- a/src/main/java/com/munjaon/server/server/dto/ConnectUserDto.java +++ b/src/main/java/com/munjaon/server/server/dto/ConnectUserDto.java @@ -1,6 +1,7 @@ package com.munjaon.server.server.dto; import com.munjaon.server.cache.dto.MemberDto; +import com.munjaon.server.queue.pool.KeyWriteQueue; import com.munjaon.server.server.config.ServerConfig; import lombok.Builder; import lombok.Getter; @@ -28,6 +29,13 @@ public class ConnectUserDto { private int command; /* 요청을 처리중인지 여부 */ private boolean isRunningMode; + /* MSGID 중복큐 경로 */ + private String msgKeyPath; + /* MSGID 중복큐 사이즈 */ + private int msgIdQueueSize; + /* MSG ID 중복체크 큐 */ + private KeyWriteQueue keyWriteQueue; + private MemberDto memberDto; diff --git a/src/main/java/com/munjaon/server/server/service/CollectServer.java b/src/main/java/com/munjaon/server/server/service/CollectServer.java index 703a035..4f455a8 100644 --- a/src/main/java/com/munjaon/server/server/service/CollectServer.java +++ b/src/main/java/com/munjaon/server/server/service/CollectServer.java @@ -1,6 +1,7 @@ package com.munjaon.server.server.service; import com.munjaon.server.queue.enums.QueueTypeWorker; +import com.munjaon.server.queue.pool.KeyWriteQueue; import com.munjaon.server.server.config.ServerConfig; import com.munjaon.server.server.dto.ConnectUserDto; import com.munjaon.server.server.queue.CollectUserQueue; @@ -12,6 +13,7 @@ import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; @@ -24,13 +26,17 @@ public class CollectServer extends Service { private Selector selector; private final String serviceType; + private final String msgKeyPath; + private final int msgIdQueueSize; private long USER_STATUS_LAST_CHECK_TIME = System.currentTimeMillis(); - public CollectServer(String serviceName, String serviceType, int port) { + public CollectServer(String serviceName, String serviceType, int port, String msgKeyPath, int msgIdQueueSize) { super(serviceName); this.listenAddress = new InetSocketAddress(port); this.serviceType = serviceType; + this.msgKeyPath = msgKeyPath; + this.msgIdQueueSize = msgIdQueueSize; } @Override @@ -174,9 +180,12 @@ public class CollectServer extends Service { /* 소켓 취득 */ Socket socket = channel.socket(); InetAddress inetAddress = socket.getInetAddress(); + SocketAddress socketAddress = socket.getRemoteSocketAddress(); + saveLog("[CLIENT CONNECTION IP : " + inetAddress.getHostAddress() + "]"); + saveLog("[CLIENT CONNECTION IP : " + socketAddress.toString() + "]"); // Socket 채널을 channel에 수신 등록한다 - channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().serviceType(this.serviceType).lastTrafficTime(System.currentTimeMillis()).remoteIP(inetAddress.getHostAddress()).build()); + channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().serviceType(this.serviceType).msgKeyPath(this.msgKeyPath).msgIdQueueSize(this.msgIdQueueSize).lastTrafficTime(System.currentTimeMillis()).remoteIP(inetAddress.getHostAddress()).build()); } catch (Exception e) { saveLog(e.toString()); throw new RuntimeException(e); @@ -191,6 +200,11 @@ public class CollectServer extends Service { SocketChannel channel = (SocketChannel) key.channel(); ConnectUserDto userDto = (ConnectUserDto) key.attachment(); if (userDto != null && userDto.getUserId() != null) { + /* MSG ID 중복체크 큐 해제 */ + KeyWriteQueue keyWriteQueue = userDto.getKeyWriteQueue(); + if (keyWriteQueue != null) { + keyWriteQueue.close(); + } saveLog("[CLIENT USER IS DISCONNECT : " + userDto.toString() + "]"); collectUserQueue.removeUser(this.serviceType, userDto.getUserId()); key.attach(null); diff --git a/src/main/java/com/munjaon/server/server/service/ReportServer.java b/src/main/java/com/munjaon/server/server/service/ReportServer.java index 6728e19..b31e0ea 100644 --- a/src/main/java/com/munjaon/server/server/service/ReportServer.java +++ b/src/main/java/com/munjaon/server/server/service/ReportServer.java @@ -9,6 +9,7 @@ import com.munjaon.server.server.task.ReportResultTask; import org.json.simple.JSONObject; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketAddress; @@ -173,10 +174,13 @@ public class ReportServer extends Service { channel.configureBlocking(false); /* 소켓 취득 */ Socket socket = channel.socket(); - SocketAddress remoteAddr = socket.getRemoteSocketAddress(); - saveLog("Connected to: " + remoteAddr); + InetAddress inetAddress = socket.getInetAddress(); + SocketAddress socketAddress = socket.getRemoteSocketAddress(); + + saveLog("[CLIENT CONNECTION IP : " + inetAddress.getHostAddress() + "]"); + saveLog("[CLIENT CONNECTION IP : " + socketAddress.toString() + "]"); // Socket 채널을 channel에 수신 등록한다 - channel.register(selector, SelectionKey.OP_READ, ReportUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).queuePath(System.getProperty("ROOTPATH") + getProp("QUEUE_PATH")).build()); + channel.register(selector, SelectionKey.OP_READ, ReportUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(inetAddress.getHostAddress()).queuePath(System.getProperty("ROOTPATH") + getProp("QUEUE_PATH")).build()); } catch (Exception e) { saveLog(e.toString()); throw new RuntimeException(e); diff --git a/src/main/java/com/munjaon/server/server/task/CollectServerTask.java b/src/main/java/com/munjaon/server/server/task/CollectServerTask.java index cd9f020..a7e7c31 100644 --- a/src/main/java/com/munjaon/server/server/task/CollectServerTask.java +++ b/src/main/java/com/munjaon/server/server/task/CollectServerTask.java @@ -5,6 +5,7 @@ import com.munjaon.server.cache.enums.CacheService; import com.munjaon.server.cache.service.MemberService; import com.munjaon.server.queue.dto.BasicMessageDto; import com.munjaon.server.queue.enums.QueueTypeWorker; +import com.munjaon.server.queue.pool.KeyWriteQueue; import com.munjaon.server.server.config.ServerConfig; import com.munjaon.server.server.dto.ConnectUserDto; import com.munjaon.server.server.packet.common.*; @@ -278,9 +279,24 @@ public class CollectServerTask extends Thread { saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]"); QueueTypeWorker worker = QueueTypeWorker.find("SMS"); if (worker != null) { - worker.pushQueue(messageDto); - channel.write(SmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); - connectUserDto.updateLastTrafficTime(); + boolean isQueueYn = true; + KeyWriteQueue keyWriteQueue = this.connectUserDto.getKeyWriteQueue(); + try { + if (keyWriteQueue.isExist(messageDto.getUserMsgID())) { + saveSystemLog("USER_ID : " + this.connectUserDto.getUserId() + " | DUPLICATED MSG_ID : " + messageDto.getUserMsgID()); + } else { + worker.pushQueue(messageDto); + } + } catch (Exception e) { + isQueueYn = false; + saveSystemLog(e); + } + if (isQueueYn) { + channel.write(SmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); + connectUserDto.updateLastTrafficTime(); + /* MSG_ID 큐에 저장 */ + keyWriteQueue.pushKeyToBuffer(messageDto.getUserMsgID()); + } } } catch (Exception e) { saveLog(e); @@ -312,9 +328,25 @@ public class CollectServerTask extends Thread { saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]"); QueueTypeWorker worker = QueueTypeWorker.find("LMS"); if (worker != null) { - worker.pushQueue(messageDto); - channel.write(LmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); - connectUserDto.updateLastTrafficTime(); + boolean isQueueYn = true; + KeyWriteQueue keyWriteQueue = this.connectUserDto.getKeyWriteQueue(); + try { + if (keyWriteQueue.isExist(messageDto.getUserMsgID())) { + saveSystemLog("USER_ID : " + this.connectUserDto.getUserId() + " | DUPLICATED MSG_ID : " + messageDto.getUserMsgID()); + } else { + worker.pushQueue(messageDto); + } + + } catch (Exception e) { + isQueueYn = false; + saveSystemLog(e); + } + if (isQueueYn) { + channel.write(LmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); + connectUserDto.updateLastTrafficTime(); + /* MSG_ID 큐에 저장 */ + keyWriteQueue.pushKeyToBuffer(messageDto.getUserMsgID()); + } } } catch (Exception e) { saveLog(e); @@ -434,9 +466,25 @@ public class CollectServerTask extends Thread { saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]"); QueueTypeWorker worker = QueueTypeWorker.find("MMS"); if (worker != null) { - worker.pushQueue(messageDto); - channel.write(MmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); - connectUserDto.updateLastTrafficTime(); + boolean isQueueYn = true; + KeyWriteQueue keyWriteQueue = this.connectUserDto.getKeyWriteQueue(); + try { + if (keyWriteQueue.isExist(messageDto.getUserMsgID())) { + saveSystemLog("USER_ID : " + this.connectUserDto.getUserId() + " | DUPLICATED MSG_ID : " + messageDto.getUserMsgID()); + } else { + worker.pushQueue(messageDto); + } + + } catch (Exception e) { + isQueueYn = false; + saveSystemLog(e); + } + if (isQueueYn) { + channel.write(MmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); + connectUserDto.updateLastTrafficTime(); + /* MSG_ID 큐에 저장 */ + keyWriteQueue.pushKeyToBuffer(messageDto.getUserMsgID()); + } } } catch (Exception e) { saveLog(e); @@ -514,9 +562,25 @@ public class CollectServerTask extends Thread { saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]"); QueueTypeWorker worker = QueueTypeWorker.find("KAT"); if (worker != null) { - worker.pushQueue(messageDto); - channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); - connectUserDto.updateLastTrafficTime(); + boolean isQueueYn = true; + KeyWriteQueue keyWriteQueue = this.connectUserDto.getKeyWriteQueue(); + try { + if (keyWriteQueue.isExist(messageDto.getUserMsgID())) { + saveSystemLog("USER_ID : " + this.connectUserDto.getUserId() + " | DUPLICATED MSG_ID : " + messageDto.getUserMsgID()); + } else { + worker.pushQueue(messageDto); + } + + } catch (Exception e) { + isQueueYn = false; + saveSystemLog(e); + } + if (isQueueYn) { + channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); + connectUserDto.updateLastTrafficTime(); + /* MSG_ID 큐에 저장 */ + keyWriteQueue.pushKeyToBuffer(messageDto.getUserMsgID()); + } } else { saveSystemLog("worker is null"); } @@ -526,61 +590,6 @@ public class CollectServerTask extends Thread { } } - private void recvKatDeliver_bak(SocketChannel channel, ByteBuffer headBuffer) throws IOException { - try { - ByteBuffer bodyBuffer = ByteBuffer.allocate(KakaoMessage.DELIVER_KAKAO_BODY_LENGTH); - channel.read(bodyBuffer); - ByteBuffer deliverBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + KakaoMessage.DELIVER_KAKAO_BODY_LENGTH); - Packet.mergeBuffers(deliverBuffer, headBuffer, bodyBuffer); - - BasicMessageDto messageDto = recvCommonMessage(deliverBuffer); - messageDto.setUserSubject(KakaoMessage.getSubjectForDeliver(deliverBuffer)); - messageDto.setUserMessage(KakaoMessage.getMessageForDeliver(deliverBuffer)); - messageDto.setKakaoSenderKey(KakaoMessage.getKakaoSenderKeyForDeliver(deliverBuffer)); - messageDto.setKakaoTemplateCode(KakaoMessage.getKakaoTemplateCodeForDeliver(deliverBuffer)); - - String jsonPath = System.getProperty("ROOTPATH") + File.separator + "kakaofile"; - jsonPath = jsonPath + File.separator + MessageUtil.getDate() + File.separator + SerialNoUtil.getSerialNo(); - FileUtil.mkdirs(jsonPath); - - ByteBuffer fileHeadBuffer = ByteBuffer.allocate(KakaoMessage.DELIVER_JSON_FILENAME_LENGTH + KakaoMessage.DELIVER_JSON_FILESIZE_LENGTH); - channel.read(fileHeadBuffer); - - String fileName = KakaoMessage.getFileNameForDeliver(fileHeadBuffer); - String fileSize = KakaoMessage.getFileSizeForDeliver(fileHeadBuffer); - - ByteBuffer fileBuffer = ByteBuffer.allocate(Integer.parseInt(fileSize)); - channel.read(fileBuffer); - fileBuffer.flip(); - JobFileFactory.saveFileForByteBuffer(jsonPath, fileName, fileBuffer); - - messageDto.setKakaoJsonFile(jsonPath + File.separator + fileName); - saveLog(printTaskLog() + "[KAT JSON] [File : " + fileName + ", Size : " + fileSize + "]"); - - /* 사용자 단가, 발송망 설정 */ - MemberDto savedMemberDto = null; - if (this.connectUserDto != null) { - savedMemberDto = this.connectUserDto.getMemberDto(); - } - if (savedMemberDto != null) { - messageDto.setRouterSeq(savedMemberDto.getKakaoAtAgentCode()); - messageDto.setUnitCost(String.valueOf(savedMemberDto.getKakaoAtPrice())); - } - - saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]"); - QueueTypeWorker worker = QueueTypeWorker.find("KAT"); - if (worker != null) { - worker.pushQueue(messageDto); - channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); - connectUserDto.updateLastTrafficTime(); - } else { - saveSystemLog("worker is null"); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - private void recvKftDeliver(SocketChannel channel, ByteBuffer headBuffer) throws Exception { try { ByteBuffer bodyBuffer = ByteBuffer.allocate(KakaoMessage.DELIVER_KAKAO_BODY_LENGTH); @@ -651,9 +660,24 @@ public class CollectServerTask extends Thread { saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]"); QueueTypeWorker worker = QueueTypeWorker.find("KFT"); if (worker != null) { - worker.pushQueue(messageDto); - channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); - connectUserDto.updateLastTrafficTime(); + boolean isQueueYn = true; + KeyWriteQueue keyWriteQueue = this.connectUserDto.getKeyWriteQueue(); + try { + if (keyWriteQueue.isExist(messageDto.getUserMsgID())) { + saveSystemLog("USER_ID : " + this.connectUserDto.getUserId() + " | DUPLICATED MSG_ID : " + messageDto.getUserMsgID()); + } else { + worker.pushQueue(messageDto); + } + } catch (Exception e) { + isQueueYn = false; + saveSystemLog(e); + } + if (isQueueYn) { + channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); + connectUserDto.updateLastTrafficTime(); + /* MSG_ID 큐에 저장 */ + keyWriteQueue.pushKeyToBuffer(messageDto.getUserMsgID()); + } } } catch (Exception e) { saveLog(e); @@ -661,59 +685,6 @@ public class CollectServerTask extends Thread { } } - private void recvKftDeliver_bak(SocketChannel channel, ByteBuffer headBuffer) throws IOException { - try { - ByteBuffer bodyBuffer = ByteBuffer.allocate(KakaoMessage.DELIVER_KAKAO_BODY_LENGTH); - channel.read(bodyBuffer); - ByteBuffer deliverBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + KakaoMessage.DELIVER_KAKAO_BODY_LENGTH); - Packet.mergeBuffers(deliverBuffer, headBuffer, bodyBuffer); - - BasicMessageDto messageDto = recvCommonMessage(deliverBuffer); - messageDto.setUserSubject(KakaoMessage.getSubjectForDeliver(deliverBuffer)); - messageDto.setUserMessage(KakaoMessage.getMessageForDeliver(deliverBuffer)); - messageDto.setKakaoSenderKey(KakaoMessage.getKakaoSenderKeyForDeliver(deliverBuffer)); - messageDto.setKakaoTemplateCode(KakaoMessage.getKakaoTemplateCodeForDeliver(deliverBuffer)); - - String jsonPath = System.getProperty("ROOTPATH") + File.separator + "kakaofile"; - jsonPath = jsonPath + File.separator + MessageUtil.getDate() + File.separator + SerialNoUtil.getSerialNo(); - FileUtil.mkdirs(jsonPath); - - ByteBuffer fileHeadBuffer = ByteBuffer.allocate(KakaoMessage.DELIVER_JSON_FILENAME_LENGTH + KakaoMessage.DELIVER_JSON_FILESIZE_LENGTH); - channel.read(fileHeadBuffer); - - String fileName = KakaoMessage.getFileNameForDeliver(fileHeadBuffer); - String fileSize = KakaoMessage.getFileSizeForDeliver(fileHeadBuffer); - - ByteBuffer fileBuffer = ByteBuffer.allocate(Integer.parseInt(fileSize)); - channel.read(fileBuffer); - fileBuffer.flip(); - JobFileFactory.saveFileForByteBuffer(jsonPath, fileName, fileBuffer); - - messageDto.setKakaoJsonFile(jsonPath + File.separator + fileName); - saveLog(printTaskLog() + "[KFT JSON] [File : " + fileName + ", Size : " + fileSize + "]"); - - /* 사용자 단가, 발송망 설정 */ - MemberDto savedMemberDto = null; - if (this.connectUserDto != null) { - savedMemberDto = this.connectUserDto.getMemberDto(); - } - if (savedMemberDto != null) { - messageDto.setRouterSeq(savedMemberDto.getKakaoFtAgentCode()); - messageDto.setUnitCost(String.valueOf(savedMemberDto.getKakaoFtPrice())); - } - - saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]"); - QueueTypeWorker worker = QueueTypeWorker.find("KFT"); - if (worker != null) { - worker.pushQueue(messageDto); - channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus())); - connectUserDto.updateLastTrafficTime(); - } - } catch (Exception e) { - e.printStackTrace(); - } - } - private void recvLinkCheck(SocketChannel channel) throws IOException { /* 서비스 중지여부 체크 */ if (isExpireService()) { @@ -756,10 +727,10 @@ public class CollectServerTask extends Thread { saveLog(printTaskLog() + "[REMOTE IP : " + connectUserDto.getRemoteIP() + "]"); saveLog(printTaskLog() + "[ALLOW IP BASIC : " + memberDto.getAllowIpBasic() + "]"); saveLog(printTaskLog() + "[ALLOW IP EXTEND : " + memberDto.getAllowIpExtend() + "]"); - if (memberDto.getAllowIpBasic() != null && connectUserDto.getRemoteIP().indexOf(memberDto.getAllowIpBasic()) >= 0) { + if (memberDto.getAllowIpBasic() != null && connectUserDto.getRemoteIP().trim().equals(memberDto.getAllowIpBasic())) { isPermit = true; } - if (memberDto.getAllowIpExtend() != null && connectUserDto.getRemoteIP().indexOf(memberDto.getAllowIpExtend()) >= 0) { + if (memberDto.getAllowIpExtend() != null && connectUserDto.getRemoteIP().trim().equals(memberDto.getAllowIpExtend())) { isPermit = true; } } else { @@ -777,6 +748,9 @@ public class CollectServerTask extends Thread { connectUserDto.setMemberDto(memberDto); /* 세션통신 시간 업데이트 */ connectUserDto.updateLastTrafficTime(); + /* MSGID 중복체크큐 초기화 */ + KeyWriteQueue keyWriteQueue = new KeyWriteQueue(connectUserDto.getMsgKeyPath(), connectUserDto.getUserId(), connectUserDto.getMsgIdQueueSize()); + connectUserDto.setKeyWriteQueue(keyWriteQueue); } } catch (Exception e) { resultCode = "10"; @@ -864,6 +838,11 @@ public class CollectServerTask extends Thread { if (connectUserDto != null) { if (connectUserDto.getUserId() != null) { collectUserQueue.removeUser(connectUserDto.getServiceType(), connectUserDto.getUserId()); + /* MSG ID 중복체크 큐 해제 */ + KeyWriteQueue keyWriteQueue = connectUserDto.getKeyWriteQueue(); + if (keyWriteQueue != null) { + keyWriteQueue.close(); + } /* 모니터링 로그 */ HealthCheckServer.saveMonitorLog("[COLLECT SERVER][SERVICE TYPE : " + this.serviceType + "][ID : " + connectUserDto.getUserId() + "][EXPIRE CONNECT USER]"); } diff --git a/src/main/resources/sqlmap/sms_sql.xml b/src/main/resources/sqlmap/sms_sql.xml index 3a88a4b..778eed6 100644 --- a/src/main/resources/sqlmap/sms_sql.xml +++ b/src/main/resources/sqlmap/sms_sql.xml @@ -24,6 +24,7 @@ (#{item.id}, #{item.msgGroupID}, #{item.userId}, #{item.userMsgID}, #{item.routerSeq}, 0, NOW(), #{item.userReceiver}, #{item.userSender}, NULL, #{item.userMessage}, '4' ) + ON DUPLICATE KEY UPDATE REQ_DATE = VALUES(REQ_DATE)