누락건 로직 수정, IP체크, 중복건 로직 추가

This commit is contained in:
jangdongsin 2025-02-17 23:46:04 +09:00
parent 9ea4d7653b
commit 229f464ec3
8 changed files with 369 additions and 136 deletions

View File

@ -197,9 +197,11 @@ public class RunnerConfiguration {
try {
String serviceName = "SMS_COLLECTOR";
String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE");
String msgKeyPath = serverConfig.getString(serviceName + ".MSGID_QUEUE");
int port = serverConfig.getInt(serviceName + ".SERVICE_PORT");
int msgIdQueueSize = serverConfig.getInt(serviceName + ".MSGID_QUEUE_SIZE");
// CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port);
CollectServer collectServer = new CollectServer(serviceName, serviceType, port);
CollectServer collectServer = new CollectServer(serviceName, serviceType, port, msgKeyPath, msgIdQueueSize);
ShutdownService shutdownService = new ShutdownService(collectServer);
Runtime.getRuntime().addShutdownHook(shutdownService);
@ -217,9 +219,11 @@ public class RunnerConfiguration {
try {
String serviceName = "LMS_COLLECTOR";
String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE");
String msgKeyPath = serverConfig.getString(serviceName + ".MSGID_QUEUE");
int port = serverConfig.getInt(serviceName + ".SERVICE_PORT");
int msgIdQueueSize = serverConfig.getInt(serviceName + ".MSGID_QUEUE_SIZE");
// CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port);
CollectServer collectServer = new CollectServer(serviceName, serviceType, port);
CollectServer collectServer = new CollectServer(serviceName, serviceType, port, msgKeyPath, msgIdQueueSize);
ShutdownService shutdownService = new ShutdownService(collectServer);
Runtime.getRuntime().addShutdownHook(shutdownService);
@ -237,9 +241,11 @@ public class RunnerConfiguration {
try {
String serviceName = "MMS_COLLECTOR";
String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE");
String msgKeyPath = serverConfig.getString(serviceName + ".MSGID_QUEUE");
int port = serverConfig.getInt(serviceName + ".SERVICE_PORT");
int msgIdQueueSize = serverConfig.getInt(serviceName + ".MSGID_QUEUE_SIZE");
// CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port);
CollectServer collectServer = new CollectServer(serviceName, serviceType, port);
CollectServer collectServer = new CollectServer(serviceName, serviceType, port, msgKeyPath, msgIdQueueSize);
ShutdownService shutdownService = new ShutdownService(collectServer);
Runtime.getRuntime().addShutdownHook(shutdownService);
@ -257,9 +263,11 @@ public class RunnerConfiguration {
try {
String serviceName = "KAT_COLLECTOR";
String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE");
String msgKeyPath = serverConfig.getString(serviceName + ".MSGID_QUEUE");
int port = serverConfig.getInt(serviceName + ".SERVICE_PORT");
int msgIdQueueSize = serverConfig.getInt(serviceName + ".MSGID_QUEUE_SIZE");
// CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port);
CollectServer collectServer = new CollectServer(serviceName, serviceType, port);
CollectServer collectServer = new CollectServer(serviceName, serviceType, port, msgKeyPath, msgIdQueueSize);
ShutdownService shutdownService = new ShutdownService(collectServer);
Runtime.getRuntime().addShutdownHook(shutdownService);
@ -277,9 +285,11 @@ public class RunnerConfiguration {
try {
String serviceName = "KFT_COLLECTOR";
String serviceType = serverConfig.getString(serviceName + ".SERVICE_TYPE");
String msgKeyPath = serverConfig.getString(serviceName + ".MSGID_QUEUE");
int port = serverConfig.getInt(serviceName + ".SERVICE_PORT");
int msgIdQueueSize = serverConfig.getInt(serviceName + ".MSGID_QUEUE_SIZE");
// CollectBackServerService collectServerService = new CollectBackServerService(serviceName, serviceType, port);
CollectServer collectServer = new CollectServer(serviceName, serviceType, port);
CollectServer collectServer = new CollectServer(serviceName, serviceType, port, msgKeyPath, msgIdQueueSize);
ShutdownService shutdownService = new ShutdownService(collectServer);
Runtime.getRuntime().addShutdownHook(shutdownService);

View File

@ -0,0 +1,216 @@
package com.munjaon.server.queue.pool;
import com.munjaon.server.queue.config.QueueConstants;
import com.munjaon.server.server.packet.common.Packet;
import com.munjaon.server.util.FileUtil;
import lombok.Getter;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
@Getter
public class KeyWriteQueue {
private final static int HEAD_SIZE = 40;
private final static int HEAD_TOTAL_COUNT_LOCATION = 0;
private final static int HEAD_TOTAL_COUNT_SIZE = 20;
private final static int HEAD_WRITE_POSITION_LOCATION = 20;
private final static int HEAD_WRITE_POSITION_SIZE = 20;
private final static int BODY_WRITE_KEY_SIZE = 20;
/* Queue 변수 */
private final String queuePath;
private final String queueName;
private final int MAX_QUEUE_SIZE;
private int writePosition = -1;
private int totalCount = -1;
/** Queue Header Buffer */
protected ByteBuffer headerBuffer = null;
/** Header 에서 사용하는 변수 */
protected byte[] headerArray = null;
/** Queue File Channel */
protected FileChannel channel = null;
/** pushBuffer() 함수에서 사용하는 변수 */
protected ByteBuffer dataBuffer = null;
/** isExist() 함수에서 사용하는 변수 */
protected ByteBuffer searchBuffer = null;
public KeyWriteQueue(String queuePath, String queueName, int maxQueueSize) {
this.queuePath = System.getProperty("ROOTPATH") + File.separator + queuePath;
this.queueName = queueName;
MAX_QUEUE_SIZE = maxQueueSize;
/* 큐초기화 */
try {
initQueue();
} catch (Exception e) {}
}
public boolean isExist(String key) throws Exception {
if (this.totalCount == 0) {
return false;
}
boolean exists = false;
for (int i = 0; i < this.totalCount; i++) {
int position = this.HEAD_SIZE + (i * BODY_WRITE_KEY_SIZE);
this.channel.position(position);
/* 읽기용 데이터 버퍼 초기화 */
initSearchBuffer();
this.channel.read(searchBuffer);
byte[] byteArray = new byte[BODY_WRITE_KEY_SIZE];
searchBuffer.position(0);
searchBuffer.get(byteArray);
String queueKey = QueueConstants.getString(byteArray);
if (queueKey.equals(key)) {
exists = true;
break;
}
}
return exists;
}
public void pushKeyToBuffer(String key) throws Exception {
if (key == null || key.isEmpty()) {
return;
}
/* 1. dataBuffer 초기화 */
initDataBuffer();
/* 2. messageDto >> dataBuffer */
this.dataBuffer.position(0);
this.dataBuffer.put(key.getBytes(Packet.AGENT_CHARACTER_SET));
/* 3.1 Header 정보 다시 일기 */
readHeader();
if (this.dataBuffer != null) {
int position = HEAD_SIZE + (BODY_WRITE_KEY_SIZE * this.writePosition);
this.channel.position(position);
this.dataBuffer.flip();
this.channel.write(this.dataBuffer);
/* 3.2 Push 카운터 증가 */
this.writePosition = this.writePosition + 1;
if (this.writePosition > this.MAX_QUEUE_SIZE) {
this.writePosition = 0;
}
this.totalCount = this.totalCount + 1;
if (this.totalCount > this.MAX_QUEUE_SIZE) {
this.totalCount = this.MAX_QUEUE_SIZE;
}
/* 3.3 Header 정보 변경 */
writeHeader();
}
}
public void initDataBuffer() {
if (this.dataBuffer == null) {
this.dataBuffer = ByteBuffer.allocateDirect(BODY_WRITE_KEY_SIZE);
}
this.dataBuffer.clear();
for(int loopCnt = 0; loopCnt < BODY_WRITE_KEY_SIZE; loopCnt++){
this.dataBuffer.put(QueueConstants.SET_DEFAULT_BYTE);
}
this.dataBuffer.position(0);
}
public void initSearchBuffer() {
if (this.searchBuffer == null) {
this.searchBuffer = ByteBuffer.allocateDirect(BODY_WRITE_KEY_SIZE);
}
this.searchBuffer.clear();
for(int loopCnt = 0; loopCnt < BODY_WRITE_KEY_SIZE; loopCnt++){
this.searchBuffer.put(QueueConstants.SET_DEFAULT_BYTE);
}
this.searchBuffer.position(0);
}
private void initQueue() throws Exception {
this.headerBuffer = ByteBuffer.allocateDirect(HEAD_SIZE);
try{
/* 1. 경로 체크 및 생성 */
FileUtil.mkdirs(this.queuePath);
/* 2. 파일생성 및 읽기 */
File file = new File(this.queuePath + File.separator + queueName + ".queue");
this.channel = new RandomAccessFile(file, "rw").getChannel();
if (file.length() == 0) {
// Push Pop 카운트 초기화
this.writePosition = 0;
this.totalCount = 0;
// 헤더 초기화
writeHeader();
} else {
readHeader();
}
// backupQueue();
} catch(Exception e) {
throw e;
} finally {
//lock.release();
}
}
protected void readHeader() throws Exception {
try {
initHeaderBuffer();
this.channel.position(0);
this.channel.read(this.headerBuffer);
this.headerArray = new byte[HEAD_TOTAL_COUNT_SIZE];
// 생성날짜 가져오기 - 생성날짜(10) / 읽은카운트(10) / 쓴카운트(10)
this.headerBuffer.position(HEAD_TOTAL_COUNT_LOCATION);
this.headerBuffer.get(this.headerArray);
this.totalCount = Integer.parseInt(QueueConstants.getString(this.headerArray));
// 카운트 가져오기
this.headerArray = new byte[HEAD_WRITE_POSITION_SIZE];
this.headerBuffer.position(HEAD_WRITE_POSITION_LOCATION);
this.headerBuffer.get(this.headerArray);
this.writePosition = Integer.parseInt(QueueConstants.getString(this.headerArray));
} catch(Exception e) {
throw e;
}
}
protected void writeHeader() throws Exception {
try {
initHeaderBuffer();
this.channel.position(0);
this.headerBuffer.put(Integer.toString(this.totalCount).getBytes(Packet.AGENT_CHARACTER_SET));
this.headerBuffer.position(HEAD_WRITE_POSITION_LOCATION);
this.headerBuffer.put(Integer.toString(this.writePosition).getBytes(Packet.AGENT_CHARACTER_SET));
this.headerBuffer.flip();
this.channel.write(this.headerBuffer);
} catch(Exception e) {
throw e;
}
}
protected void initHeaderBuffer(){
this.headerBuffer.clear();
for(int loopCnt = 0; loopCnt < HEAD_SIZE; loopCnt++){
this.headerBuffer.put(QueueConstants.SET_DEFAULT_BYTE);
}
this.headerBuffer.position(0);
}
public void close() throws IOException {
try {
if (isOpen()) {
channel.close();
}
} catch(IOException e) {
throw e;
}
}
protected boolean isOpen() {
if (this.channel == null) {
return false;
}
return this.channel.isOpen();
}
}

View File

@ -43,7 +43,7 @@ public abstract class QueuePool {
queue = queuePool.get(loopCnt);
if(name.equals(queue.getQueueName())){
queuePool.remove(loopCnt);
System.out.println("[" + queue.getQueueInfo().getServiceType() + " Queue] [" + queue.getQueueName() + " is Removed]");
log.info("[" + queue.getQueueInfo().getServiceType() + " Queue] [" + queue.getQueueName() + " is Removed]");
break;
}
}
@ -55,6 +55,7 @@ public abstract class QueuePool {
synchronized(lockMonitor){
if (queue != null){
queuePool.addLast(queue);
log.info("[" + queue.getQueueInfo().getServiceType() + " Queue] [" + queue.getQueueName() + " is Added]");
lockMonitor.notifyAll();
}
}

View File

@ -1,6 +1,7 @@
package com.munjaon.server.server.dto;
import com.munjaon.server.cache.dto.MemberDto;
import com.munjaon.server.queue.pool.KeyWriteQueue;
import com.munjaon.server.server.config.ServerConfig;
import lombok.Builder;
import lombok.Getter;
@ -28,6 +29,13 @@ public class ConnectUserDto {
private int command;
/* 요청을 처리중인지 여부 */
private boolean isRunningMode;
/* MSGID 중복큐 경로 */
private String msgKeyPath;
/* MSGID 중복큐 사이즈 */
private int msgIdQueueSize;
/* MSG ID 중복체크 큐 */
private KeyWriteQueue keyWriteQueue;
private MemberDto memberDto;

View File

@ -1,6 +1,7 @@
package com.munjaon.server.server.service;
import com.munjaon.server.queue.enums.QueueTypeWorker;
import com.munjaon.server.queue.pool.KeyWriteQueue;
import com.munjaon.server.server.config.ServerConfig;
import com.munjaon.server.server.dto.ConnectUserDto;
import com.munjaon.server.server.queue.CollectUserQueue;
@ -12,6 +13,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
@ -24,13 +26,17 @@ public class CollectServer extends Service {
private Selector selector;
private final String serviceType;
private final String msgKeyPath;
private final int msgIdQueueSize;
private long USER_STATUS_LAST_CHECK_TIME = System.currentTimeMillis();
public CollectServer(String serviceName, String serviceType, int port) {
public CollectServer(String serviceName, String serviceType, int port, String msgKeyPath, int msgIdQueueSize) {
super(serviceName);
this.listenAddress = new InetSocketAddress(port);
this.serviceType = serviceType;
this.msgKeyPath = msgKeyPath;
this.msgIdQueueSize = msgIdQueueSize;
}
@Override
@ -174,9 +180,12 @@ public class CollectServer extends Service {
/* 소켓 취득 */
Socket socket = channel.socket();
InetAddress inetAddress = socket.getInetAddress();
SocketAddress socketAddress = socket.getRemoteSocketAddress();
saveLog("[CLIENT CONNECTION IP : " + inetAddress.getHostAddress() + "]");
saveLog("[CLIENT CONNECTION IP : " + socketAddress.toString() + "]");
// Socket 채널을 channel에 수신 등록한다
channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().serviceType(this.serviceType).lastTrafficTime(System.currentTimeMillis()).remoteIP(inetAddress.getHostAddress()).build());
channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().serviceType(this.serviceType).msgKeyPath(this.msgKeyPath).msgIdQueueSize(this.msgIdQueueSize).lastTrafficTime(System.currentTimeMillis()).remoteIP(inetAddress.getHostAddress()).build());
} catch (Exception e) {
saveLog(e.toString());
throw new RuntimeException(e);
@ -191,6 +200,11 @@ public class CollectServer extends Service {
SocketChannel channel = (SocketChannel) key.channel();
ConnectUserDto userDto = (ConnectUserDto) key.attachment();
if (userDto != null && userDto.getUserId() != null) {
/* MSG ID 중복체크 큐 해제 */
KeyWriteQueue keyWriteQueue = userDto.getKeyWriteQueue();
if (keyWriteQueue != null) {
keyWriteQueue.close();
}
saveLog("[CLIENT USER IS DISCONNECT : " + userDto.toString() + "]");
collectUserQueue.removeUser(this.serviceType, userDto.getUserId());
key.attach(null);

View File

@ -9,6 +9,7 @@ import com.munjaon.server.server.task.ReportResultTask;
import org.json.simple.JSONObject;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
@ -173,10 +174,13 @@ public class ReportServer extends Service {
channel.configureBlocking(false);
/* 소켓 취득 */
Socket socket = channel.socket();
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
saveLog("Connected to: " + remoteAddr);
InetAddress inetAddress = socket.getInetAddress();
SocketAddress socketAddress = socket.getRemoteSocketAddress();
saveLog("[CLIENT CONNECTION IP : " + inetAddress.getHostAddress() + "]");
saveLog("[CLIENT CONNECTION IP : " + socketAddress.toString() + "]");
// Socket 채널을 channel에 수신 등록한다
channel.register(selector, SelectionKey.OP_READ, ReportUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).queuePath(System.getProperty("ROOTPATH") + getProp("QUEUE_PATH")).build());
channel.register(selector, SelectionKey.OP_READ, ReportUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(inetAddress.getHostAddress()).queuePath(System.getProperty("ROOTPATH") + getProp("QUEUE_PATH")).build());
} catch (Exception e) {
saveLog(e.toString());
throw new RuntimeException(e);

View File

@ -5,6 +5,7 @@ import com.munjaon.server.cache.enums.CacheService;
import com.munjaon.server.cache.service.MemberService;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.enums.QueueTypeWorker;
import com.munjaon.server.queue.pool.KeyWriteQueue;
import com.munjaon.server.server.config.ServerConfig;
import com.munjaon.server.server.dto.ConnectUserDto;
import com.munjaon.server.server.packet.common.*;
@ -278,9 +279,24 @@ public class CollectServerTask extends Thread {
saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]");
QueueTypeWorker worker = QueueTypeWorker.find("SMS");
if (worker != null) {
worker.pushQueue(messageDto);
channel.write(SmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
connectUserDto.updateLastTrafficTime();
boolean isQueueYn = true;
KeyWriteQueue keyWriteQueue = this.connectUserDto.getKeyWriteQueue();
try {
if (keyWriteQueue.isExist(messageDto.getUserMsgID())) {
saveSystemLog("USER_ID : " + this.connectUserDto.getUserId() + " | DUPLICATED MSG_ID : " + messageDto.getUserMsgID());
} else {
worker.pushQueue(messageDto);
}
} catch (Exception e) {
isQueueYn = false;
saveSystemLog(e);
}
if (isQueueYn) {
channel.write(SmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
connectUserDto.updateLastTrafficTime();
/* MSG_ID 큐에 저장 */
keyWriteQueue.pushKeyToBuffer(messageDto.getUserMsgID());
}
}
} catch (Exception e) {
saveLog(e);
@ -312,9 +328,25 @@ public class CollectServerTask extends Thread {
saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]");
QueueTypeWorker worker = QueueTypeWorker.find("LMS");
if (worker != null) {
worker.pushQueue(messageDto);
channel.write(LmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
connectUserDto.updateLastTrafficTime();
boolean isQueueYn = true;
KeyWriteQueue keyWriteQueue = this.connectUserDto.getKeyWriteQueue();
try {
if (keyWriteQueue.isExist(messageDto.getUserMsgID())) {
saveSystemLog("USER_ID : " + this.connectUserDto.getUserId() + " | DUPLICATED MSG_ID : " + messageDto.getUserMsgID());
} else {
worker.pushQueue(messageDto);
}
} catch (Exception e) {
isQueueYn = false;
saveSystemLog(e);
}
if (isQueueYn) {
channel.write(LmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
connectUserDto.updateLastTrafficTime();
/* MSG_ID 큐에 저장 */
keyWriteQueue.pushKeyToBuffer(messageDto.getUserMsgID());
}
}
} catch (Exception e) {
saveLog(e);
@ -434,9 +466,25 @@ public class CollectServerTask extends Thread {
saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]");
QueueTypeWorker worker = QueueTypeWorker.find("MMS");
if (worker != null) {
worker.pushQueue(messageDto);
channel.write(MmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
connectUserDto.updateLastTrafficTime();
boolean isQueueYn = true;
KeyWriteQueue keyWriteQueue = this.connectUserDto.getKeyWriteQueue();
try {
if (keyWriteQueue.isExist(messageDto.getUserMsgID())) {
saveSystemLog("USER_ID : " + this.connectUserDto.getUserId() + " | DUPLICATED MSG_ID : " + messageDto.getUserMsgID());
} else {
worker.pushQueue(messageDto);
}
} catch (Exception e) {
isQueueYn = false;
saveSystemLog(e);
}
if (isQueueYn) {
channel.write(MmsMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
connectUserDto.updateLastTrafficTime();
/* MSG_ID 큐에 저장 */
keyWriteQueue.pushKeyToBuffer(messageDto.getUserMsgID());
}
}
} catch (Exception e) {
saveLog(e);
@ -514,9 +562,25 @@ public class CollectServerTask extends Thread {
saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]");
QueueTypeWorker worker = QueueTypeWorker.find("KAT");
if (worker != null) {
worker.pushQueue(messageDto);
channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
connectUserDto.updateLastTrafficTime();
boolean isQueueYn = true;
KeyWriteQueue keyWriteQueue = this.connectUserDto.getKeyWriteQueue();
try {
if (keyWriteQueue.isExist(messageDto.getUserMsgID())) {
saveSystemLog("USER_ID : " + this.connectUserDto.getUserId() + " | DUPLICATED MSG_ID : " + messageDto.getUserMsgID());
} else {
worker.pushQueue(messageDto);
}
} catch (Exception e) {
isQueueYn = false;
saveSystemLog(e);
}
if (isQueueYn) {
channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
connectUserDto.updateLastTrafficTime();
/* MSG_ID 큐에 저장 */
keyWriteQueue.pushKeyToBuffer(messageDto.getUserMsgID());
}
} else {
saveSystemLog("worker is null");
}
@ -526,61 +590,6 @@ public class CollectServerTask extends Thread {
}
}
private void recvKatDeliver_bak(SocketChannel channel, ByteBuffer headBuffer) throws IOException {
try {
ByteBuffer bodyBuffer = ByteBuffer.allocate(KakaoMessage.DELIVER_KAKAO_BODY_LENGTH);
channel.read(bodyBuffer);
ByteBuffer deliverBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + KakaoMessage.DELIVER_KAKAO_BODY_LENGTH);
Packet.mergeBuffers(deliverBuffer, headBuffer, bodyBuffer);
BasicMessageDto messageDto = recvCommonMessage(deliverBuffer);
messageDto.setUserSubject(KakaoMessage.getSubjectForDeliver(deliverBuffer));
messageDto.setUserMessage(KakaoMessage.getMessageForDeliver(deliverBuffer));
messageDto.setKakaoSenderKey(KakaoMessage.getKakaoSenderKeyForDeliver(deliverBuffer));
messageDto.setKakaoTemplateCode(KakaoMessage.getKakaoTemplateCodeForDeliver(deliverBuffer));
String jsonPath = System.getProperty("ROOTPATH") + File.separator + "kakaofile";
jsonPath = jsonPath + File.separator + MessageUtil.getDate() + File.separator + SerialNoUtil.getSerialNo();
FileUtil.mkdirs(jsonPath);
ByteBuffer fileHeadBuffer = ByteBuffer.allocate(KakaoMessage.DELIVER_JSON_FILENAME_LENGTH + KakaoMessage.DELIVER_JSON_FILESIZE_LENGTH);
channel.read(fileHeadBuffer);
String fileName = KakaoMessage.getFileNameForDeliver(fileHeadBuffer);
String fileSize = KakaoMessage.getFileSizeForDeliver(fileHeadBuffer);
ByteBuffer fileBuffer = ByteBuffer.allocate(Integer.parseInt(fileSize));
channel.read(fileBuffer);
fileBuffer.flip();
JobFileFactory.saveFileForByteBuffer(jsonPath, fileName, fileBuffer);
messageDto.setKakaoJsonFile(jsonPath + File.separator + fileName);
saveLog(printTaskLog() + "[KAT JSON] [File : " + fileName + ", Size : " + fileSize + "]");
/* 사용자 단가, 발송망 설정 */
MemberDto savedMemberDto = null;
if (this.connectUserDto != null) {
savedMemberDto = this.connectUserDto.getMemberDto();
}
if (savedMemberDto != null) {
messageDto.setRouterSeq(savedMemberDto.getKakaoAtAgentCode());
messageDto.setUnitCost(String.valueOf(savedMemberDto.getKakaoAtPrice()));
}
saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]");
QueueTypeWorker worker = QueueTypeWorker.find("KAT");
if (worker != null) {
worker.pushQueue(messageDto);
channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
connectUserDto.updateLastTrafficTime();
} else {
saveSystemLog("worker is null");
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void recvKftDeliver(SocketChannel channel, ByteBuffer headBuffer) throws Exception {
try {
ByteBuffer bodyBuffer = ByteBuffer.allocate(KakaoMessage.DELIVER_KAKAO_BODY_LENGTH);
@ -651,9 +660,24 @@ public class CollectServerTask extends Thread {
saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]");
QueueTypeWorker worker = QueueTypeWorker.find("KFT");
if (worker != null) {
worker.pushQueue(messageDto);
channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
connectUserDto.updateLastTrafficTime();
boolean isQueueYn = true;
KeyWriteQueue keyWriteQueue = this.connectUserDto.getKeyWriteQueue();
try {
if (keyWriteQueue.isExist(messageDto.getUserMsgID())) {
saveSystemLog("USER_ID : " + this.connectUserDto.getUserId() + " | DUPLICATED MSG_ID : " + messageDto.getUserMsgID());
} else {
worker.pushQueue(messageDto);
}
} catch (Exception e) {
isQueueYn = false;
saveSystemLog(e);
}
if (isQueueYn) {
channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
connectUserDto.updateLastTrafficTime();
/* MSG_ID 큐에 저장 */
keyWriteQueue.pushKeyToBuffer(messageDto.getUserMsgID());
}
}
} catch (Exception e) {
saveLog(e);
@ -661,59 +685,6 @@ public class CollectServerTask extends Thread {
}
}
private void recvKftDeliver_bak(SocketChannel channel, ByteBuffer headBuffer) throws IOException {
try {
ByteBuffer bodyBuffer = ByteBuffer.allocate(KakaoMessage.DELIVER_KAKAO_BODY_LENGTH);
channel.read(bodyBuffer);
ByteBuffer deliverBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + KakaoMessage.DELIVER_KAKAO_BODY_LENGTH);
Packet.mergeBuffers(deliverBuffer, headBuffer, bodyBuffer);
BasicMessageDto messageDto = recvCommonMessage(deliverBuffer);
messageDto.setUserSubject(KakaoMessage.getSubjectForDeliver(deliverBuffer));
messageDto.setUserMessage(KakaoMessage.getMessageForDeliver(deliverBuffer));
messageDto.setKakaoSenderKey(KakaoMessage.getKakaoSenderKeyForDeliver(deliverBuffer));
messageDto.setKakaoTemplateCode(KakaoMessage.getKakaoTemplateCodeForDeliver(deliverBuffer));
String jsonPath = System.getProperty("ROOTPATH") + File.separator + "kakaofile";
jsonPath = jsonPath + File.separator + MessageUtil.getDate() + File.separator + SerialNoUtil.getSerialNo();
FileUtil.mkdirs(jsonPath);
ByteBuffer fileHeadBuffer = ByteBuffer.allocate(KakaoMessage.DELIVER_JSON_FILENAME_LENGTH + KakaoMessage.DELIVER_JSON_FILESIZE_LENGTH);
channel.read(fileHeadBuffer);
String fileName = KakaoMessage.getFileNameForDeliver(fileHeadBuffer);
String fileSize = KakaoMessage.getFileSizeForDeliver(fileHeadBuffer);
ByteBuffer fileBuffer = ByteBuffer.allocate(Integer.parseInt(fileSize));
channel.read(fileBuffer);
fileBuffer.flip();
JobFileFactory.saveFileForByteBuffer(jsonPath, fileName, fileBuffer);
messageDto.setKakaoJsonFile(jsonPath + File.separator + fileName);
saveLog(printTaskLog() + "[KFT JSON] [File : " + fileName + ", Size : " + fileSize + "]");
/* 사용자 단가, 발송망 설정 */
MemberDto savedMemberDto = null;
if (this.connectUserDto != null) {
savedMemberDto = this.connectUserDto.getMemberDto();
}
if (savedMemberDto != null) {
messageDto.setRouterSeq(savedMemberDto.getKakaoFtAgentCode());
messageDto.setUnitCost(String.valueOf(savedMemberDto.getKakaoFtPrice()));
}
saveLog(printTaskLog() + "[MESSAGE : " + messageDto.toString() + "]");
QueueTypeWorker worker = QueueTypeWorker.find("KFT");
if (worker != null) {
worker.pushQueue(messageDto);
channel.write(KakaoMessage.makeDeliverAckBuffer(messageDto.getUserMsgID(), messageDto.getSendStatus()));
connectUserDto.updateLastTrafficTime();
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void recvLinkCheck(SocketChannel channel) throws IOException {
/* 서비스 중지여부 체크 */
if (isExpireService()) {
@ -756,10 +727,10 @@ public class CollectServerTask extends Thread {
saveLog(printTaskLog() + "[REMOTE IP : " + connectUserDto.getRemoteIP() + "]");
saveLog(printTaskLog() + "[ALLOW IP BASIC : " + memberDto.getAllowIpBasic() + "]");
saveLog(printTaskLog() + "[ALLOW IP EXTEND : " + memberDto.getAllowIpExtend() + "]");
if (memberDto.getAllowIpBasic() != null && connectUserDto.getRemoteIP().indexOf(memberDto.getAllowIpBasic()) >= 0) {
if (memberDto.getAllowIpBasic() != null && connectUserDto.getRemoteIP().trim().equals(memberDto.getAllowIpBasic())) {
isPermit = true;
}
if (memberDto.getAllowIpExtend() != null && connectUserDto.getRemoteIP().indexOf(memberDto.getAllowIpExtend()) >= 0) {
if (memberDto.getAllowIpExtend() != null && connectUserDto.getRemoteIP().trim().equals(memberDto.getAllowIpExtend())) {
isPermit = true;
}
} else {
@ -777,6 +748,9 @@ public class CollectServerTask extends Thread {
connectUserDto.setMemberDto(memberDto);
/* 세션통신 시간 업데이트 */
connectUserDto.updateLastTrafficTime();
/* MSGID 중복체크큐 초기화 */
KeyWriteQueue keyWriteQueue = new KeyWriteQueue(connectUserDto.getMsgKeyPath(), connectUserDto.getUserId(), connectUserDto.getMsgIdQueueSize());
connectUserDto.setKeyWriteQueue(keyWriteQueue);
}
} catch (Exception e) {
resultCode = "10";
@ -864,6 +838,11 @@ public class CollectServerTask extends Thread {
if (connectUserDto != null) {
if (connectUserDto.getUserId() != null) {
collectUserQueue.removeUser(connectUserDto.getServiceType(), connectUserDto.getUserId());
/* MSG ID 중복체크 큐 해제 */
KeyWriteQueue keyWriteQueue = connectUserDto.getKeyWriteQueue();
if (keyWriteQueue != null) {
keyWriteQueue.close();
}
/* 모니터링 로그 */
HealthCheckServer.saveMonitorLog("[COLLECT SERVER][SERVICE TYPE : " + this.serviceType + "][ID : " + connectUserDto.getUserId() + "][EXPIRE CONNECT USER]");
}

View File

@ -24,6 +24,7 @@
<foreach separator="," item="item" collection="list">
(#{item.id}, #{item.msgGroupID}, #{item.userId}, #{item.userMsgID}, #{item.routerSeq}, 0, NOW(), #{item.userReceiver}, #{item.userSender}, NULL, #{item.userMessage}, '4' )
</foreach>
ON DUPLICATE KEY UPDATE REQ_DATE = VALUES(REQ_DATE)
</insert>
<insert id="insertGroupForList" parameterType="java.util.List">