diff --git a/src/main/java/com/munjaon/server/config/RunnerConfiguration.java b/src/main/java/com/munjaon/server/config/RunnerConfiguration.java
index 7f2f6c0..967fbbd 100644
--- a/src/main/java/com/munjaon/server/config/RunnerConfiguration.java
+++ b/src/main/java/com/munjaon/server/config/RunnerConfiguration.java
@@ -197,9 +197,11 @@ public class RunnerConfiguration {
try {
String serviceName = "SMS_COLLECTOR";
String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE");
+ String msgKeyPath = serverConfig.getString(serviceName + ".MSGID_QUEUE");
int port = serverConfig.getInt(serviceName + ".SERVICE_PORT");
+ int msgIdQueueSize = serverConfig.getInt(serviceName + ".MSGID_QUEUE_SIZE");
// CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port);
- CollectServer collectServer = new CollectServer(serviceName, serviceType, port);
+ CollectServer collectServer = new CollectServer(serviceName, serviceType, port, msgKeyPath, msgIdQueueSize);
ShutdownService shutdownService = new ShutdownService(collectServer);
Runtime.getRuntime().addShutdownHook(shutdownService);
@@ -217,9 +219,11 @@ public class RunnerConfiguration {
try {
String serviceName = "LMS_COLLECTOR";
String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE");
+ String msgKeyPath = serverConfig.getString(serviceName + ".MSGID_QUEUE");
int port = serverConfig.getInt(serviceName + ".SERVICE_PORT");
+ int msgIdQueueSize = serverConfig.getInt(serviceName + ".MSGID_QUEUE_SIZE");
// CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port);
- CollectServer collectServer = new CollectServer(serviceName, serviceType, port);
+ CollectServer collectServer = new CollectServer(serviceName, serviceType, port, msgKeyPath, msgIdQueueSize);
ShutdownService shutdownService = new ShutdownService(collectServer);
Runtime.getRuntime().addShutdownHook(shutdownService);
@@ -237,9 +241,11 @@ public class RunnerConfiguration {
try {
String serviceName = "MMS_COLLECTOR";
String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE");
+ String msgKeyPath = serverConfig.getString(serviceName + ".MSGID_QUEUE");
int port = serverConfig.getInt(serviceName + ".SERVICE_PORT");
+ int msgIdQueueSize = serverConfig.getInt(serviceName + ".MSGID_QUEUE_SIZE");
// CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port);
- CollectServer collectServer = new CollectServer(serviceName, serviceType, port);
+ CollectServer collectServer = new CollectServer(serviceName, serviceType, port, msgKeyPath, msgIdQueueSize);
ShutdownService shutdownService = new ShutdownService(collectServer);
Runtime.getRuntime().addShutdownHook(shutdownService);
@@ -257,9 +263,11 @@ public class RunnerConfiguration {
try {
String serviceName = "KAT_COLLECTOR";
String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE");
+ String msgKeyPath = serverConfig.getString(serviceName + ".MSGID_QUEUE");
int port = serverConfig.getInt(serviceName + ".SERVICE_PORT");
+ int msgIdQueueSize = serverConfig.getInt(serviceName + ".MSGID_QUEUE_SIZE");
// CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port);
- CollectServer collectServer = new CollectServer(serviceName, serviceType, port);
+ CollectServer collectServer = new CollectServer(serviceName, serviceType, port, msgKeyPath, msgIdQueueSize);
ShutdownService shutdownService = new ShutdownService(collectServer);
Runtime.getRuntime().addShutdownHook(shutdownService);
@@ -277,9 +285,11 @@ public class RunnerConfiguration {
try {
String serviceName = "KFT_COLLECTOR";
String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE");
+ String msgKeyPath = serverConfig.getString(serviceName + ".MSGID_QUEUE");
int port = serverConfig.getInt(serviceName + ".SERVICE_PORT");
+ int msgIdQueueSize = serverConfig.getInt(serviceName + ".MSGID_QUEUE_SIZE");
// CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port);
- CollectServer collectServer = new CollectServer(serviceName, serviceType, port);
+ CollectServer collectServer = new CollectServer(serviceName, serviceType, port, msgKeyPath, msgIdQueueSize);
ShutdownService shutdownService = new ShutdownService(collectServer);
Runtime.getRuntime().addShutdownHook(shutdownService);
diff --git a/src/main/java/com/munjaon/server/queue/pool/KeyWriteQueue.java b/src/main/java/com/munjaon/server/queue/pool/KeyWriteQueue.java
new file mode 100644
index 0000000..7adcb1e
--- /dev/null
+++ b/src/main/java/com/munjaon/server/queue/pool/KeyWriteQueue.java
@@ -0,0 +1,216 @@
+package com.munjaon.server.queue.pool;
+
+import com.munjaon.server.queue.config.QueueConstants;
+import com.munjaon.server.server.packet.common.Packet;
+import com.munjaon.server.util.FileUtil;
+import lombok.Getter;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+@Getter
+public class KeyWriteQueue {
+ private final static int HEAD_SIZE = 40;
+ private final static int HEAD_TOTAL_COUNT_LOCATION = 0;
+ private final static int HEAD_TOTAL_COUNT_SIZE = 20;
+ private final static int HEAD_WRITE_POSITION_LOCATION = 20;
+ private final static int HEAD_WRITE_POSITION_SIZE = 20;
+ private final static int BODY_WRITE_KEY_SIZE = 20;
+ /* Queue 변수 */
+ private final String queuePath;
+ private final String queueName;
+ private final int MAX_QUEUE_SIZE;
+
+ private int writePosition = -1;
+ private int totalCount = -1;
+
+ /** Queue Header Buffer */
+ protected ByteBuffer headerBuffer = null;
+ /** Header 에서 사용하는 변수 */
+ protected byte[] headerArray = null;
+ /** Queue File Channel */
+ protected FileChannel channel = null;
+ /** pushBuffer() 함수에서 사용하는 변수 */
+ protected ByteBuffer dataBuffer = null;
+ /** isExist() 함수에서 사용하는 변수 */
+ protected ByteBuffer searchBuffer = null;
+
+ public KeyWriteQueue(String queuePath, String queueName, int maxQueueSize) {
+ this.queuePath = System.getProperty("ROOTPATH") + File.separator + queuePath;
+ this.queueName = queueName;
+ MAX_QUEUE_SIZE = maxQueueSize;
+ /* 큐초기화 */
+ try {
+ initQueue();
+ } catch (Exception e) {}
+ }
+
+ public boolean isExist(String key) throws Exception {
+ if (this.totalCount == 0) {
+ return false;
+ }
+
+ boolean exists = false;
+ for (int i = 0; i < this.totalCount; i++) {
+ int position = this.HEAD_SIZE + (i * BODY_WRITE_KEY_SIZE);
+ this.channel.position(position);
+ /* 읽기용 데이터 버퍼 초기화 */
+ initSearchBuffer();
+ this.channel.read(searchBuffer);
+
+ byte[] byteArray = new byte[BODY_WRITE_KEY_SIZE];
+ searchBuffer.position(0);
+ searchBuffer.get(byteArray);
+ String queueKey = QueueConstants.getString(byteArray);
+
+ if (queueKey.equals(key)) {
+ exists = true;
+ break;
+ }
+ }
+
+ return exists;
+ }
+
+ public void pushKeyToBuffer(String key) throws Exception {
+ if (key == null || key.isEmpty()) {
+ return;
+ }
+
+ /* 1. dataBuffer 초기화 */
+ initDataBuffer();
+ /* 2. messageDto >> dataBuffer */
+ this.dataBuffer.position(0);
+ this.dataBuffer.put(key.getBytes(Packet.AGENT_CHARACTER_SET));
+
+ /* 3.1 Header 정보 다시 일기 */
+ readHeader();
+ if (this.dataBuffer != null) {
+ int position = HEAD_SIZE + (BODY_WRITE_KEY_SIZE * this.writePosition);
+ this.channel.position(position);
+ this.dataBuffer.flip();
+ this.channel.write(this.dataBuffer);
+ /* 3.2 Push 카운터 증가 */
+ this.writePosition = this.writePosition + 1;
+ if (this.writePosition > this.MAX_QUEUE_SIZE) {
+ this.writePosition = 0;
+ }
+ this.totalCount = this.totalCount + 1;
+ if (this.totalCount > this.MAX_QUEUE_SIZE) {
+ this.totalCount = this.MAX_QUEUE_SIZE;
+ }
+ /* 3.3 Header 정보 변경 */
+ writeHeader();
+ }
+ }
+
+ public void initDataBuffer() {
+ if (this.dataBuffer == null) {
+ this.dataBuffer = ByteBuffer.allocateDirect(BODY_WRITE_KEY_SIZE);
+ }
+ this.dataBuffer.clear();
+ for(int loopCnt = 0; loopCnt < BODY_WRITE_KEY_SIZE; loopCnt++){
+ this.dataBuffer.put(QueueConstants.SET_DEFAULT_BYTE);
+ }
+ this.dataBuffer.position(0);
+ }
+
+ public void initSearchBuffer() {
+ if (this.searchBuffer == null) {
+ this.searchBuffer = ByteBuffer.allocateDirect(BODY_WRITE_KEY_SIZE);
+ }
+ this.searchBuffer.clear();
+ for(int loopCnt = 0; loopCnt < BODY_WRITE_KEY_SIZE; loopCnt++){
+ this.searchBuffer.put(QueueConstants.SET_DEFAULT_BYTE);
+ }
+ this.searchBuffer.position(0);
+ }
+
+ private void initQueue() throws Exception {
+ this.headerBuffer = ByteBuffer.allocateDirect(HEAD_SIZE);
+ try{
+ /* 1. 경로 체크 및 생성 */
+ FileUtil.mkdirs(this.queuePath);
+ /* 2. 파일생성 및 읽기 */
+ File file = new File(this.queuePath + File.separator + queueName + ".queue");
+ this.channel = new RandomAccessFile(file, "rw").getChannel();
+
+ if (file.length() == 0) {
+ // Push 및 Pop 카운트 초기화
+ this.writePosition = 0;
+ this.totalCount = 0;
+ // 헤더 초기화
+ writeHeader();
+ } else {
+ readHeader();
+ }
+// backupQueue();
+ } catch(Exception e) {
+ throw e;
+ } finally {
+ //lock.release();
+ }
+ }
+ protected void readHeader() throws Exception {
+ try {
+ initHeaderBuffer();
+ this.channel.position(0);
+ this.channel.read(this.headerBuffer);
+ this.headerArray = new byte[HEAD_TOTAL_COUNT_SIZE];
+ // 생성날짜 가져오기 - 생성날짜(10) / 읽은카운트(10) / 쓴카운트(10)
+ this.headerBuffer.position(HEAD_TOTAL_COUNT_LOCATION);
+ this.headerBuffer.get(this.headerArray);
+ this.totalCount = Integer.parseInt(QueueConstants.getString(this.headerArray));
+ // 쓴 카운트 가져오기
+ this.headerArray = new byte[HEAD_WRITE_POSITION_SIZE];
+ this.headerBuffer.position(HEAD_WRITE_POSITION_LOCATION);
+ this.headerBuffer.get(this.headerArray);
+ this.writePosition = Integer.parseInt(QueueConstants.getString(this.headerArray));
+ } catch(Exception e) {
+ throw e;
+ }
+ }
+
+ protected void writeHeader() throws Exception {
+ try {
+ initHeaderBuffer();
+ this.channel.position(0);
+ this.headerBuffer.put(Integer.toString(this.totalCount).getBytes(Packet.AGENT_CHARACTER_SET));
+ this.headerBuffer.position(HEAD_WRITE_POSITION_LOCATION);
+ this.headerBuffer.put(Integer.toString(this.writePosition).getBytes(Packet.AGENT_CHARACTER_SET));
+ this.headerBuffer.flip();
+ this.channel.write(this.headerBuffer);
+ } catch(Exception e) {
+ throw e;
+ }
+ }
+
+ protected void initHeaderBuffer(){
+ this.headerBuffer.clear();
+ for(int loopCnt = 0; loopCnt < HEAD_SIZE; loopCnt++){
+ this.headerBuffer.put(QueueConstants.SET_DEFAULT_BYTE);
+ }
+ this.headerBuffer.position(0);
+ }
+
+ public void close() throws IOException {
+ try {
+ if (isOpen()) {
+ channel.close();
+ }
+ } catch(IOException e) {
+ throw e;
+ }
+ }
+
+ protected boolean isOpen() {
+ if (this.channel == null) {
+ return false;
+ }
+
+ return this.channel.isOpen();
+ }
+}
diff --git a/src/main/java/com/munjaon/server/queue/pool/QueuePool.java b/src/main/java/com/munjaon/server/queue/pool/QueuePool.java
index a5d43a1..e4d80ce 100644
--- a/src/main/java/com/munjaon/server/queue/pool/QueuePool.java
+++ b/src/main/java/com/munjaon/server/queue/pool/QueuePool.java
@@ -43,7 +43,7 @@ public abstract class QueuePool {
queue = queuePool.get(loopCnt);
if(name.equals(queue.getQueueName())){
queuePool.remove(loopCnt);
- System.out.println("[" + queue.getQueueInfo().getServiceType() + " Queue] [" + queue.getQueueName() + " is Removed]");
+ log.info("[" + queue.getQueueInfo().getServiceType() + " Queue] [" + queue.getQueueName() + " is Removed]");
break;
}
}
@@ -55,6 +55,7 @@ public abstract class QueuePool {
synchronized(lockMonitor){
if (queue != null){
queuePool.addLast(queue);
+ log.info("[" + queue.getQueueInfo().getServiceType() + " Queue] [" + queue.getQueueName() + " is Added]");
lockMonitor.notifyAll();
}
}
diff --git a/src/main/java/com/munjaon/server/server/dto/ConnectUserDto.java b/src/main/java/com/munjaon/server/server/dto/ConnectUserDto.java
index ecdc302..d1a5829 100644
--- a/src/main/java/com/munjaon/server/server/dto/ConnectUserDto.java
+++ b/src/main/java/com/munjaon/server/server/dto/ConnectUserDto.java
@@ -1,6 +1,7 @@
package com.munjaon.server.server.dto;
import com.munjaon.server.cache.dto.MemberDto;
+import com.munjaon.server.queue.pool.KeyWriteQueue;
import com.munjaon.server.server.config.ServerConfig;
import lombok.Builder;
import lombok.Getter;
@@ -28,6 +29,13 @@ public class ConnectUserDto {
private int command;
/* 요청을 처리중인지 여부 */
private boolean isRunningMode;
+ /* MSGID 중복큐 경로 */
+ private String msgKeyPath;
+ /* MSGID 중복큐 사이즈 */
+ private int msgIdQueueSize;
+ /* MSG ID 중복체크 큐 */
+ private KeyWriteQueue keyWriteQueue;
+
private MemberDto memberDto;
diff --git a/src/main/java/com/munjaon/server/server/service/CollectServer.java b/src/main/java/com/munjaon/server/server/service/CollectServer.java
index 703a035..4f455a8 100644
--- a/src/main/java/com/munjaon/server/server/service/CollectServer.java
+++ b/src/main/java/com/munjaon/server/server/service/CollectServer.java
@@ -1,6 +1,7 @@
package com.munjaon.server.server.service;
import com.munjaon.server.queue.enums.QueueTypeWorker;
+import com.munjaon.server.queue.pool.KeyWriteQueue;
import com.munjaon.server.server.config.ServerConfig;
import com.munjaon.server.server.dto.ConnectUserDto;
import com.munjaon.server.server.queue.CollectUserQueue;
@@ -12,6 +13,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
@@ -24,13 +26,17 @@ public class CollectServer extends Service {
private Selector selector;
private final String serviceType;
+ private final String msgKeyPath;
+ private final int msgIdQueueSize;
private long USER_STATUS_LAST_CHECK_TIME = System.currentTimeMillis();
- public CollectServer(String serviceName, String serviceType, int port) {
+ public CollectServer(String serviceName, String serviceType, int port, String msgKeyPath, int msgIdQueueSize) {
super(serviceName);
this.listenAddress = new InetSocketAddress(port);
this.serviceType = serviceType;
+ this.msgKeyPath = msgKeyPath;
+ this.msgIdQueueSize = msgIdQueueSize;
}
@Override
@@ -174,9 +180,12 @@ public class CollectServer extends Service {
/* 소켓 취득 */
Socket socket = channel.socket();
InetAddress inetAddress = socket.getInetAddress();
+ SocketAddress socketAddress = socket.getRemoteSocketAddress();
+
saveLog("[CLIENT CONNECTION IP : " + inetAddress.getHostAddress() + "]");
+ saveLog("[CLIENT CONNECTION IP : " + socketAddress.toString() + "]");
// Socket 채널을 channel에 수신 등록한다
- channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().serviceType(this.serviceType).lastTrafficTime(System.currentTimeMillis()).remoteIP(inetAddress.getHostAddress()).build());
+ channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().serviceType(this.serviceType).msgKeyPath(this.msgKeyPath).msgIdQueueSize(this.msgIdQueueSize).lastTrafficTime(System.currentTimeMillis()).remoteIP(inetAddress.getHostAddress()).build());
} catch (Exception e) {
saveLog(e.toString());
throw new RuntimeException(e);
@@ -191,6 +200,11 @@ public class CollectServer extends Service {
SocketChannel channel = (SocketChannel) key.channel();
ConnectUserDto userDto = (ConnectUserDto) key.attachment();
if (userDto != null && userDto.getUserId() != null) {
+ /* MSG ID 중복체크 큐 해제 */
+ KeyWriteQueue keyWriteQueue = userDto.getKeyWriteQueue();
+ if (keyWriteQueue != null) {
+ keyWriteQueue.close();
+ }
saveLog("[CLIENT USER IS DISCONNECT : " + userDto.toString() + "]");
collectUserQueue.removeUser(this.serviceType, userDto.getUserId());
key.attach(null);
diff --git a/src/main/java/com/munjaon/server/server/service/ReportServer.java b/src/main/java/com/munjaon/server/server/service/ReportServer.java
index 6728e19..b31e0ea 100644
--- a/src/main/java/com/munjaon/server/server/service/ReportServer.java
+++ b/src/main/java/com/munjaon/server/server/service/ReportServer.java
@@ -9,6 +9,7 @@ import com.munjaon.server.server.task.ReportResultTask;
import org.json.simple.JSONObject;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
@@ -173,10 +174,13 @@ public class ReportServer extends Service {
channel.configureBlocking(false);
/* 소켓 취득 */
Socket socket = channel.socket();
- SocketAddress remoteAddr = socket.getRemoteSocketAddress();
- saveLog("Connected to: " + remoteAddr);
+ InetAddress inetAddress = socket.getInetAddress();
+ SocketAddress socketAddress = socket.getRemoteSocketAddress();
+
+ saveLog("[CLIENT CONNECTION IP : " + inetAddress.getHostAddress() + "]");
+ saveLog("[CLIENT CONNECTION IP : " + socketAddress.toString() + "]");
// Socket 채널을 channel에 수신 등록한다
- channel.register(selector, SelectionKey.OP_READ, ReportUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).queuePath(System.getProperty("ROOTPATH") + getProp("QUEUE_PATH")).build());
+ channel.register(selector, SelectionKey.OP_READ, ReportUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(inetAddress.getHostAddress()).queuePath(System.getProperty("ROOTPATH") + getProp("QUEUE_PATH")).build());
} catch (Exception e) {
saveLog(e.toString());
throw new RuntimeException(e);
diff --git a/src/main/java/com/munjaon/server/server/task/CollectServerTask.java b/src/main/java/com/munjaon/server/server/task/CollectServerTask.java
index cd9f020..a7e7c31 100644
--- a/src/main/java/com/munjaon/server/server/task/CollectServerTask.java
+++ b/src/main/java/com/munjaon/server/server/task/CollectServerTask.java
@@ -5,6 +5,7 @@ import com.munjaon.server.cache.enums.CacheService;
import com.munjaon.server.cache.service.MemberService;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.enums.QueueTypeWorker;
+import com.munjaon.server.queue.pool.KeyWriteQueue;
import com.munjaon.server.server.config.ServerConfig;
import com.munjaon.server.server.dto.ConnectUserDto;
import com.munjaon.server.server.packet.common.*;
@@ -278,9 +279,24 @@ public class CollectServerTask extends Thread {
saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]");
QueueTypeWorker worker = QueueTypeWorker.find("SMS");
if (worker != null) {
- worker.pushQueue(messageDto);
- channel.write(SmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
- connectUserDto.updateLastTrafficTime();
+ boolean isQueueYn = true;
+ KeyWriteQueue keyWriteQueue = this.connectUserDto.getKeyWriteQueue();
+ try {
+ if (keyWriteQueue.isExist(messageDto.getUserMsgID())) {
+ saveSystemLog("USER_ID : " + this.connectUserDto.getUserId() + " | DUPLICATED MSG_ID : " + messageDto.getUserMsgID());
+ } else {
+ worker.pushQueue(messageDto);
+ }
+ } catch (Exception e) {
+ isQueueYn = false;
+ saveSystemLog(e);
+ }
+ if (isQueueYn) {
+ channel.write(SmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
+ connectUserDto.updateLastTrafficTime();
+ /* MSG_ID 큐에 저장 */
+ keyWriteQueue.pushKeyToBuffer(messageDto.getUserMsgID());
+ }
}
} catch (Exception e) {
saveLog(e);
@@ -312,9 +328,25 @@ public class CollectServerTask extends Thread {
saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]");
QueueTypeWorker worker = QueueTypeWorker.find("LMS");
if (worker != null) {
- worker.pushQueue(messageDto);
- channel.write(LmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
- connectUserDto.updateLastTrafficTime();
+ boolean isQueueYn = true;
+ KeyWriteQueue keyWriteQueue = this.connectUserDto.getKeyWriteQueue();
+ try {
+ if (keyWriteQueue.isExist(messageDto.getUserMsgID())) {
+ saveSystemLog("USER_ID : " + this.connectUserDto.getUserId() + " | DUPLICATED MSG_ID : " + messageDto.getUserMsgID());
+ } else {
+ worker.pushQueue(messageDto);
+ }
+
+ } catch (Exception e) {
+ isQueueYn = false;
+ saveSystemLog(e);
+ }
+ if (isQueueYn) {
+ channel.write(LmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
+ connectUserDto.updateLastTrafficTime();
+ /* MSG_ID 큐에 저장 */
+ keyWriteQueue.pushKeyToBuffer(messageDto.getUserMsgID());
+ }
}
} catch (Exception e) {
saveLog(e);
@@ -434,9 +466,25 @@ public class CollectServerTask extends Thread {
saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]");
QueueTypeWorker worker = QueueTypeWorker.find("MMS");
if (worker != null) {
- worker.pushQueue(messageDto);
- channel.write(MmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
- connectUserDto.updateLastTrafficTime();
+ boolean isQueueYn = true;
+ KeyWriteQueue keyWriteQueue = this.connectUserDto.getKeyWriteQueue();
+ try {
+ if (keyWriteQueue.isExist(messageDto.getUserMsgID())) {
+ saveSystemLog("USER_ID : " + this.connectUserDto.getUserId() + " | DUPLICATED MSG_ID : " + messageDto.getUserMsgID());
+ } else {
+ worker.pushQueue(messageDto);
+ }
+
+ } catch (Exception e) {
+ isQueueYn = false;
+ saveSystemLog(e);
+ }
+ if (isQueueYn) {
+ channel.write(MmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
+ connectUserDto.updateLastTrafficTime();
+ /* MSG_ID 큐에 저장 */
+ keyWriteQueue.pushKeyToBuffer(messageDto.getUserMsgID());
+ }
}
} catch (Exception e) {
saveLog(e);
@@ -514,9 +562,25 @@ public class CollectServerTask extends Thread {
saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]");
QueueTypeWorker worker = QueueTypeWorker.find("KAT");
if (worker != null) {
- worker.pushQueue(messageDto);
- channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
- connectUserDto.updateLastTrafficTime();
+ boolean isQueueYn = true;
+ KeyWriteQueue keyWriteQueue = this.connectUserDto.getKeyWriteQueue();
+ try {
+ if (keyWriteQueue.isExist(messageDto.getUserMsgID())) {
+ saveSystemLog("USER_ID : " + this.connectUserDto.getUserId() + " | DUPLICATED MSG_ID : " + messageDto.getUserMsgID());
+ } else {
+ worker.pushQueue(messageDto);
+ }
+
+ } catch (Exception e) {
+ isQueueYn = false;
+ saveSystemLog(e);
+ }
+ if (isQueueYn) {
+ channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
+ connectUserDto.updateLastTrafficTime();
+ /* MSG_ID 큐에 저장 */
+ keyWriteQueue.pushKeyToBuffer(messageDto.getUserMsgID());
+ }
} else {
saveSystemLog("worker is null");
}
@@ -526,61 +590,6 @@ public class CollectServerTask extends Thread {
}
}
- private void recvKatDeliver_bak(SocketChannel channel, ByteBuffer headBuffer) throws IOException {
- try {
- ByteBuffer bodyBuffer = ByteBuffer.allocate(KakaoMessage.DELIVER_KAKAO_BODY_LENGTH);
- channel.read(bodyBuffer);
- ByteBuffer deliverBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + KakaoMessage.DELIVER_KAKAO_BODY_LENGTH);
- Packet.mergeBuffers(deliverBuffer, headBuffer, bodyBuffer);
-
- BasicMessageDto messageDto = recvCommonMessage(deliverBuffer);
- messageDto.setUserSubject(KakaoMessage.getSubjectForDeliver(deliverBuffer));
- messageDto.setUserMessage(KakaoMessage.getMessageForDeliver(deliverBuffer));
- messageDto.setKakaoSenderKey(KakaoMessage.getKakaoSenderKeyForDeliver(deliverBuffer));
- messageDto.setKakaoTemplateCode(KakaoMessage.getKakaoTemplateCodeForDeliver(deliverBuffer));
-
- String jsonPath = System.getProperty("ROOTPATH") + File.separator + "kakaofile";
- jsonPath = jsonPath + File.separator + MessageUtil.getDate() + File.separator + SerialNoUtil.getSerialNo();
- FileUtil.mkdirs(jsonPath);
-
- ByteBuffer fileHeadBuffer = ByteBuffer.allocate(KakaoMessage.DELIVER_JSON_FILENAME_LENGTH + KakaoMessage.DELIVER_JSON_FILESIZE_LENGTH);
- channel.read(fileHeadBuffer);
-
- String fileName = KakaoMessage.getFileNameForDeliver(fileHeadBuffer);
- String fileSize = KakaoMessage.getFileSizeForDeliver(fileHeadBuffer);
-
- ByteBuffer fileBuffer = ByteBuffer.allocate(Integer.parseInt(fileSize));
- channel.read(fileBuffer);
- fileBuffer.flip();
- JobFileFactory.saveFileForByteBuffer(jsonPath, fileName, fileBuffer);
-
- messageDto.setKakaoJsonFile(jsonPath + File.separator + fileName);
- saveLog(printTaskLog() + "[KAT JSON] [File : " + fileName + ", Size : " + fileSize + "]");
-
- /* 사용자 단가, 발송망 설정 */
- MemberDto savedMemberDto = null;
- if (this.connectUserDto != null) {
- savedMemberDto = this.connectUserDto.getMemberDto();
- }
- if (savedMemberDto != null) {
- messageDto.setRouterSeq(savedMemberDto.getKakaoAtAgentCode());
- messageDto.setUnitCost(String.valueOf(savedMemberDto.getKakaoAtPrice()));
- }
-
- saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]");
- QueueTypeWorker worker = QueueTypeWorker.find("KAT");
- if (worker != null) {
- worker.pushQueue(messageDto);
- channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
- connectUserDto.updateLastTrafficTime();
- } else {
- saveSystemLog("worker is null");
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
private void recvKftDeliver(SocketChannel channel, ByteBuffer headBuffer) throws Exception {
try {
ByteBuffer bodyBuffer = ByteBuffer.allocate(KakaoMessage.DELIVER_KAKAO_BODY_LENGTH);
@@ -651,9 +660,24 @@ public class CollectServerTask extends Thread {
saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]");
QueueTypeWorker worker = QueueTypeWorker.find("KFT");
if (worker != null) {
- worker.pushQueue(messageDto);
- channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
- connectUserDto.updateLastTrafficTime();
+ boolean isQueueYn = true;
+ KeyWriteQueue keyWriteQueue = this.connectUserDto.getKeyWriteQueue();
+ try {
+ if (keyWriteQueue.isExist(messageDto.getUserMsgID())) {
+ saveSystemLog("USER_ID : " + this.connectUserDto.getUserId() + " | DUPLICATED MSG_ID : " + messageDto.getUserMsgID());
+ } else {
+ worker.pushQueue(messageDto);
+ }
+ } catch (Exception e) {
+ isQueueYn = false;
+ saveSystemLog(e);
+ }
+ if (isQueueYn) {
+ channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
+ connectUserDto.updateLastTrafficTime();
+ /* MSG_ID 큐에 저장 */
+ keyWriteQueue.pushKeyToBuffer(messageDto.getUserMsgID());
+ }
}
} catch (Exception e) {
saveLog(e);
@@ -661,59 +685,6 @@ public class CollectServerTask extends Thread {
}
}
- private void recvKftDeliver_bak(SocketChannel channel, ByteBuffer headBuffer) throws IOException {
- try {
- ByteBuffer bodyBuffer = ByteBuffer.allocate(KakaoMessage.DELIVER_KAKAO_BODY_LENGTH);
- channel.read(bodyBuffer);
- ByteBuffer deliverBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + KakaoMessage.DELIVER_KAKAO_BODY_LENGTH);
- Packet.mergeBuffers(deliverBuffer, headBuffer, bodyBuffer);
-
- BasicMessageDto messageDto = recvCommonMessage(deliverBuffer);
- messageDto.setUserSubject(KakaoMessage.getSubjectForDeliver(deliverBuffer));
- messageDto.setUserMessage(KakaoMessage.getMessageForDeliver(deliverBuffer));
- messageDto.setKakaoSenderKey(KakaoMessage.getKakaoSenderKeyForDeliver(deliverBuffer));
- messageDto.setKakaoTemplateCode(KakaoMessage.getKakaoTemplateCodeForDeliver(deliverBuffer));
-
- String jsonPath = System.getProperty("ROOTPATH") + File.separator + "kakaofile";
- jsonPath = jsonPath + File.separator + MessageUtil.getDate() + File.separator + SerialNoUtil.getSerialNo();
- FileUtil.mkdirs(jsonPath);
-
- ByteBuffer fileHeadBuffer = ByteBuffer.allocate(KakaoMessage.DELIVER_JSON_FILENAME_LENGTH + KakaoMessage.DELIVER_JSON_FILESIZE_LENGTH);
- channel.read(fileHeadBuffer);
-
- String fileName = KakaoMessage.getFileNameForDeliver(fileHeadBuffer);
- String fileSize = KakaoMessage.getFileSizeForDeliver(fileHeadBuffer);
-
- ByteBuffer fileBuffer = ByteBuffer.allocate(Integer.parseInt(fileSize));
- channel.read(fileBuffer);
- fileBuffer.flip();
- JobFileFactory.saveFileForByteBuffer(jsonPath, fileName, fileBuffer);
-
- messageDto.setKakaoJsonFile(jsonPath + File.separator + fileName);
- saveLog(printTaskLog() + "[KFT JSON] [File : " + fileName + ", Size : " + fileSize + "]");
-
- /* 사용자 단가, 발송망 설정 */
- MemberDto savedMemberDto = null;
- if (this.connectUserDto != null) {
- savedMemberDto = this.connectUserDto.getMemberDto();
- }
- if (savedMemberDto != null) {
- messageDto.setRouterSeq(savedMemberDto.getKakaoFtAgentCode());
- messageDto.setUnitCost(String.valueOf(savedMemberDto.getKakaoFtPrice()));
- }
-
- saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]");
- QueueTypeWorker worker = QueueTypeWorker.find("KFT");
- if (worker != null) {
- worker.pushQueue(messageDto);
- channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
- connectUserDto.updateLastTrafficTime();
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
private void recvLinkCheck(SocketChannel channel) throws IOException {
/* 서비스 중지여부 체크 */
if (isExpireService()) {
@@ -756,10 +727,10 @@ public class CollectServerTask extends Thread {
saveLog(printTaskLog() + "[REMOTE IP : " + connectUserDto.getRemoteIP() + "]");
saveLog(printTaskLog() + "[ALLOW IP BASIC : " + memberDto.getAllowIpBasic() + "]");
saveLog(printTaskLog() + "[ALLOW IP EXTEND : " + memberDto.getAllowIpExtend() + "]");
- if (memberDto.getAllowIpBasic() != null && connectUserDto.getRemoteIP().indexOf(memberDto.getAllowIpBasic()) >= 0) {
+ if (memberDto.getAllowIpBasic() != null && connectUserDto.getRemoteIP().trim().equals(memberDto.getAllowIpBasic())) {
isPermit = true;
}
- if (memberDto.getAllowIpExtend() != null && connectUserDto.getRemoteIP().indexOf(memberDto.getAllowIpExtend()) >= 0) {
+ if (memberDto.getAllowIpExtend() != null && connectUserDto.getRemoteIP().trim().equals(memberDto.getAllowIpExtend())) {
isPermit = true;
}
} else {
@@ -777,6 +748,9 @@ public class CollectServerTask extends Thread {
connectUserDto.setMemberDto(memberDto);
/* 세션통신 시간 업데이트 */
connectUserDto.updateLastTrafficTime();
+ /* MSGID 중복체크큐 초기화 */
+ KeyWriteQueue keyWriteQueue = new KeyWriteQueue(connectUserDto.getMsgKeyPath(), connectUserDto.getUserId(), connectUserDto.getMsgIdQueueSize());
+ connectUserDto.setKeyWriteQueue(keyWriteQueue);
}
} catch (Exception e) {
resultCode = "10";
@@ -864,6 +838,11 @@ public class CollectServerTask extends Thread {
if (connectUserDto != null) {
if (connectUserDto.getUserId() != null) {
collectUserQueue.removeUser(connectUserDto.getServiceType(), connectUserDto.getUserId());
+ /* MSG ID 중복체크 큐 해제 */
+ KeyWriteQueue keyWriteQueue = connectUserDto.getKeyWriteQueue();
+ if (keyWriteQueue != null) {
+ keyWriteQueue.close();
+ }
/* 모니터링 로그 */
HealthCheckServer.saveMonitorLog("[COLLECT SERVER][SERVICE TYPE : " + this.serviceType + "][ID : " + connectUserDto.getUserId() + "][EXPIRE CONNECT USER]");
}
diff --git a/src/main/resources/sqlmap/sms_sql.xml b/src/main/resources/sqlmap/sms_sql.xml
index 3a88a4b..778eed6 100644
--- a/src/main/resources/sqlmap/sms_sql.xml
+++ b/src/main/resources/sqlmap/sms_sql.xml
@@ -24,6 +24,7 @@
(#{item.id}, #{item.msgGroupID}, #{item.userId}, #{item.userMsgID}, #{item.routerSeq}, 0, NOW(), #{item.userReceiver}, #{item.userSender}, NULL, #{item.userMessage}, '4' )
+ ON DUPLICATE KEY UPDATE REQ_DATE = VALUES(REQ_DATE)