Thread 이슈 및 클라이언트 종료 조건 수정

This commit is contained in:
dsjang 2024-09-12 09:26:27 +09:00
parent 48c5207d05
commit 4bc5fbcc26
35 changed files with 329 additions and 276 deletions

View File

@ -50,6 +50,10 @@ dependencies {
// https://mvnrepository.com/artifact/org.jdom/jdom2
implementation 'org.jdom:jdom2:2.0.6.1'
implementation("com.slack.api:bolt:1.40.3")
implementation("com.slack.api:bolt-servlet:1.40.3")
implementation("com.slack.api:bolt-jetty:1.40.3")
testCompileOnly 'org.projectlombok:lombok'
testAnnotationProcessor 'org.projectlombok:lombok'
}

View File

@ -88,24 +88,18 @@ public class ReportQueue {
}
}
public boolean isTruncateQueue(int maxWriteCount) {
public boolean isTruncateQueue(int maxQueueCount) {
synchronized (lockObject) {
if (this.writeCounter >= maxWriteCount) {
if (this.writeCounter == this.readCounter) {
return true;
}
return false;
if (this.writeCounter > 0 && this.writeCounter >= maxQueueCount) {
return this.writeCounter == this.readCounter;
}
return false;
}
}
public boolean isWriteLimit(int maxWriteCount) {
public boolean isWriteLimit(int maxQueueCount) {
synchronized (lockObject) {
if (this.writeCounter >= maxWriteCount) {
return true;
}
return false;
return this.writeCounter > 0 && this.writeCounter >= maxQueueCount;
}
}

View File

@ -13,7 +13,7 @@ public final class ServerConfig {
/* 사용자 정보 조회 체크 시간 */
public static final long USER_STATUS_CYCLE_TIME = 60000;
/* Deliver Thread 실행 시간 */
public static final long DELIVER_EXEC_CYCLE_TIME = 5000;
public static final long DELIVER_EXEC_CYCLE_TIME = 3000;
/* Report Thread 실행 시간 */
public static final long REPORT_EXEC_CYCLE_TIME = 5000;
public static final long REPORT_EXEC_CYCLE_TIME = 3000;
}

View File

@ -21,7 +21,8 @@ public class ReportUserDto {
private ReportQueue reportQueue; //ReportQueue
private MemberDto memberDto; //사용자 정보
private int command; //요청 command
private int maxWriteCount; //요청 command
private int maxWriteCount; //건당 Report 개수
private int maxQueueCount; //Report Queue에 유지할 건수
/* 요청을 처리중인지 여부 */
private boolean isRunningMode;

View File

@ -1,4 +1,4 @@
package com.munjaon.server.server.packet;
package com.munjaon.server.server.packet.common;
import java.nio.ByteBuffer;

View File

@ -1,4 +1,4 @@
package com.munjaon.server.server.packet;
package com.munjaon.server.server.packet.common;
import com.munjaon.server.util.CommonUtil;

View File

@ -1,4 +1,4 @@
package com.munjaon.server.server.packet;
package com.munjaon.server.server.packet.common;
import com.munjaon.server.util.ByteUtil;

View File

@ -1,4 +1,4 @@
package com.munjaon.server.server.packet;
package com.munjaon.server.server.packet.common;
import com.munjaon.server.util.CommonUtil;

View File

@ -1,4 +1,4 @@
package com.munjaon.server.server.packet;
package com.munjaon.server.server.packet.common;
import java.nio.ByteBuffer;

View File

@ -1,4 +1,4 @@
package com.munjaon.server.server.packet;
package com.munjaon.server.server.packet.common;
import com.munjaon.server.util.CommonUtil;

View File

@ -1,4 +1,4 @@
package com.munjaon.server.server.packet;
package com.munjaon.server.server.packet.common;
import com.munjaon.server.util.CommonUtil;

View File

@ -1,10 +1,13 @@
package com.munjaon.server.server.packet;
package com.munjaon.server.server.packet.common;
import java.nio.ByteBuffer;
public final class Packet {
public static final byte SET_DEFAULT_BYTE = (byte) 0x00;
public static final long LINK_CHECK_CYCLE = 30000L;
/* LinkCheck 전송 체크 시간 간격 */
public static final long LINK_CHECK_CYCLE = 10000L;
/* 패킷 만료 시간 체크 */
public static final long LIMIT_PACKET_TIMEOUT = 25000L;
public static void setDefaultByte(ByteBuffer buffer) {
if (buffer == null) {
return;

View File

@ -1,4 +1,4 @@
package com.munjaon.server.server.packet;
package com.munjaon.server.server.packet.common;
import com.munjaon.server.server.dto.ReportDto;

View File

@ -1,4 +1,4 @@
package com.munjaon.server.server.packet;
package com.munjaon.server.server.packet.common;
import com.munjaon.server.util.CommonUtil;

View File

@ -0,0 +1,49 @@
package com.munjaon.server.server.packet.deliver;
import com.munjaon.server.cache.dto.MemberDto;
import com.munjaon.server.server.queue.CollectUserQueue;
public final class DeliverBind {
public static String checkDuplicate(CollectUserQueue collectUserQueue, String serviceType, String id) {
if (collectUserQueue == null || serviceType == null || id == null) {
return "60";
}
if (collectUserQueue.isExist(serviceType, id)) {
return "60";
}
return "00";
}
public static String checkService(MemberDto memberDto, String serviceType, String id, String pwd) {
if (id == null || pwd == null) {
return "50";
}
if (memberDto == null || !pwd.equals(memberDto.getAccessKey())) {
return "20";
}
/* 회원 사용 상태 */
if (memberDto.getMberSttus() == null || "N".equals(memberDto.getMberSttus())) {
return "30";
}
/* 서비스 이용 상태 */
if ("SMS".equals(serviceType) && "N".equals(memberDto.getSmsUseYn())) {
return "30";
}
if ("LMS".equals(serviceType) && "N".equals(memberDto.getLmsUseYn())) {
return "30";
}
if ("MMS".equals(serviceType) && "N".equals(memberDto.getMmsUseYn())) {
return "30";
}
if ("KAT".equals(serviceType) && "N".equals(memberDto.getKakaoAtUseYn())) {
return "30";
}
if ("KFT".equals(serviceType) && "N".equals(memberDto.getKakaoFtUseYn())) {
return "30";
}
return "00";
}
}

View File

@ -0,0 +1,32 @@
package com.munjaon.server.server.sample;
import com.slack.api.Slack;
import com.slack.api.methods.MethodsClient;
import com.slack.api.methods.SlackApiException;
import com.slack.api.methods.request.chat.ChatPostMessageRequest;
import java.io.IOException;
public class SlackTest {
public static void main(final String[] args) {
String token = "xoxb-7591405602163-7594011803524-gKJQfJbhRSmOFr1XFcwujWDr";
String channelAddress = "#프로젝트";
String message = "test";
try{
MethodsClient methods = Slack.getInstance().methods(token);
ChatPostMessageRequest request = ChatPostMessageRequest.builder()
.channel(channelAddress)
.text(message)
.build();
methods.chatPostMessage(request);
// log.info("Slack " + channel + " 에 메시지 보냄");
} catch (SlackApiException | IOException e) {
e.printStackTrace();
// log.error(e.getMessage());
}
}
}

View File

@ -7,7 +7,7 @@ import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.enums.QueueTypeWorker;
import com.munjaon.server.server.dto.ConnectUserDto;
import com.munjaon.server.server.dto.HeaderDto;
import com.munjaon.server.server.packet.*;
import com.munjaon.server.server.packet.common.*;
import com.munjaon.server.util.LogUtil;
import lombok.Getter;
import org.json.simple.JSONObject;

View File

@ -20,7 +20,7 @@ import java.util.Iterator;
public class CollectServer extends Service {
private final InetSocketAddress listenAddress;
private CollectUserQueue collectUserQueue = CollectUserQueue.getInstance();
private final CollectUserQueue collectUserQueue = CollectUserQueue.getInstance();
private Selector selector;
private final String serviceType;
@ -58,8 +58,7 @@ public class CollectServer extends Service {
}
private void initCollectChannel() throws IOException {
saveSystemLog("COLLECT_SERVER_SERVICE : SERVER SOCKET INITIALIZING ... ...");
saveSystemLog("COLLECT_SERVER_SERVICE : SERVER PORT [" + listenAddress.getPort() + "]");
saveSystemLog("COLLECT_SERVER_SERVICE INITIALIZING : SERVER PORT [" + listenAddress.getPort() + "]");
selector = Selector.open();
/* 채널 생성 */
ServerSocketChannel serverChannel = ServerSocketChannel.open();
@ -94,9 +93,11 @@ public class CollectServer extends Service {
while (isRun()) {
try {
execInterest();
checkInterest();
execUserStatus();
} catch (Exception e) {
throw new RuntimeException(e);
saveSystemLog(e);
// throw new RuntimeException(e);
}
}
saveSystemLog("COLLECT_SERVER_SERVICE : SERVER SERVICE STOPPED ... ...");
@ -122,7 +123,30 @@ public class CollectServer extends Service {
continue;
}
/* 사용자별 Collect Thread 실행 */
new CollectServerTask(selector, key, getName(), this.serviceType, logger).run();
new CollectServerTask(selector, key, getName(), this.serviceType, logger).start();
}
} else {
expireConnectUser(key);
}
}
}
private void checkInterest() {
for (SelectionKey key : selector.keys()) {
if (key.isValid()) {
ConnectUserDto connectUserDto = (ConnectUserDto) key.attachment();
if (connectUserDto == null) {
continue;
}
if (connectUserDto.isAlive() == 1) { // 로그인이 완료되지 않은 경우
expireConnectUser(key);
} else {
if (connectUserDto.isRunningMode()) {
continue;
}
connectUserDto.setRunningMode(true);
/* 사용자별 Collect Thread 실행 */
new CollectServerTask(selector, key, getName(), this.serviceType, logger).start();
}
} else {
expireConnectUser(key);
@ -151,6 +175,7 @@ public class CollectServer extends Service {
// Socket 채널을 channel에 수신 등록한다
channel.register(selector, SelectionKey.OP_READ, ConnectUserDto.builder().serviceType(this.serviceType).lastTrafficTime(System.currentTimeMillis()).remoteIP(inetAddress.getHostAddress()).build());
} catch (Exception e) {
saveSystemLog(e.toString());
throw new RuntimeException(e);
}
}
@ -165,7 +190,6 @@ public class CollectServer extends Service {
if (userDto != null && userDto.getUserId() != null) {
saveSystemLog("[CLIENT USER IS DISCONNECT : " + userDto.toString() + "]");
collectUserQueue.removeUser(this.serviceType, userDto.getUserId());
// connectUserMap.remove(userDto.getUserId());
key.attach(null);
}
// 소켓 채널 닫기
@ -173,6 +197,7 @@ public class CollectServer extends Service {
// 닫기
key.cancel();
} catch (IOException e) {
saveSystemLog(e.toString());
e.printStackTrace();
}
}

View File

@ -79,6 +79,7 @@ public class QueueServerService extends Service {
loadMemoryQueue();
}
} catch (Exception e) {
saveSystemLog(e.toString());
throw new RuntimeException(e);
}
}

View File

@ -16,13 +16,15 @@ import java.util.concurrent.Executors;
public class ReportQueueServerService extends Service {
private final ReportUserQueue reportUserQueue = ReportUserQueue.getInstance();
private QueueThreadService threadService;
private int queueMaxCore;
private int maxWriteCount;
private final int queueMaxCore;
private final int maxWriteCount;
private final int maxQueueCount;
public ReportQueueServerService(String serviceName) {
super(serviceName);
this.queueMaxCore = Integer.parseInt(getProp("QUEUE_MAX_CORE").trim());
this.maxWriteCount = Integer.parseInt(getProp("MAX_WRITE_COUNT").trim());
this.maxQueueCount = Integer.parseInt(getProp("MAX_QUEUE_COUNT").trim());
}
@Override
@ -32,16 +34,21 @@ public class ReportQueueServerService extends Service {
@Override
public void initResources() {
saveSystemLog("REPORT_QUEUE_SERVICE : RESOURCES INITIALIZING ... ...");
threadService = new QueueThreadService(queueMaxCore, logger);
saveSystemLog("REPORT_QUEUE_SERVICE : RESOURCES INITIALIZED ... ...");
}
@Override
public void releaseResources() {
saveSystemLog("REPORT_QUEUE_SERVICE : SERVER RESOURCE RELEASING ... ...");
threadService.close();
saveSystemLog("REPORT_QUEUE_SERVICE : SERVER RESOURCE RELEASED ... ...");
}
@Override
public void doService() {
saveSystemLog("REPORT_QUEUE_SERVICE : SERVER SERVICE STARTED ... ...");
while (isRun()) {
try {
doQueueService();
@ -50,16 +57,18 @@ public class ReportQueueServerService extends Service {
throw new RuntimeException(e);
}
}
saveSystemLog("REPORT_QUEUE_SERVICE : SERVER SERVICE STOPPED ... ...");
}
private void doQueueService() {
List<ReportUserDto> reportUserList = reportUserQueue.getUsers();
if (reportUserList == null || reportUserList.size() == 0) {
if (reportUserList == null || reportUserList.isEmpty()) {
return;
}
for (ReportUserDto reportUserDto : reportUserList) {
reportUserDto.setMaxWriteCount(this.maxWriteCount);
reportUserDto.setMaxQueueCount(this.maxQueueCount);
threadService.execute(new ReportQueueTask(reportUserDto, logger));
}
}

View File

@ -6,7 +6,6 @@ import com.munjaon.server.server.queue.ReportUserQueue;
import com.munjaon.server.server.task.ReportServerTask;
import org.json.simple.JSONObject;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
@ -38,7 +37,9 @@ public class ReportServer extends Service {
@Override
public void initResources() {
try {
saveSystemLog("REPORT_SERVER : RESOURCES INITIALIZING ... ...");
initReportChannel();
saveSystemLog("REPORT_SERVER : RESOURCES INITIALIZED ... ...");
} catch (IOException e) {
saveSystemLog(e);
throw new RuntimeException(e);
@ -46,6 +47,7 @@ public class ReportServer extends Service {
}
private void initReportChannel() throws IOException {
saveSystemLog("REPORT_SERVER INITIALIZING : SERVER PORT [" + listenAddress.getPort() + "]");
selector = Selector.open();
/* 채널 생성 */
ServerSocketChannel serverChannel = ServerSocketChannel.open();
@ -55,6 +57,7 @@ public class ReportServer extends Service {
serverChannel.socket().bind(listenAddress);
/* 채널에 accept 대기 설정 */
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
saveSystemLog("REPORT_SERVER : SERVER SOCKET INITIALIZED ... ...");
}
private void closeReportChannel() throws IOException {
@ -63,17 +66,19 @@ public class ReportServer extends Service {
@Override
public void releaseResources() {
saveSystemLog("REPORT_SERVER : SERVER RESOURCE RELEASING ... ...");
try {
closeReportChannel();
} catch (IOException e) {
saveSystemLog(e);
throw new RuntimeException(e);
}
saveSystemLog("REPORT_SERVER : SERVER RESOURCE RELEASED ... ...");
}
@Override
public void doService() {
saveSystemLog("REPORT_SERVER : SERVER SERVICE STARTED ... ...");
while (isRun()) {
try {
execInterest();
@ -82,6 +87,7 @@ public class ReportServer extends Service {
saveSystemLog(e.toString());
}
}
saveSystemLog("REPORT_SERVER : SERVER SERVICE STOPPED ... ...");
}
private void checkInterest() throws IOException, InterruptedException {
@ -93,13 +99,13 @@ public class ReportServer extends Service {
if (reportUserDto == null) {
continue;
}
SocketChannel channel = (SocketChannel) key.channel(); // 채널을 가져온다.
if (reportUserDto.isAlive() == 1) { // 로그인이 완료되지 않은 경우
expireConnectUser(key);
} else if (reportUserDto.isAlive() == 2) {
if (reportUserDto == null || reportUserDto.isRunningMode()) {
if (reportUserDto.isRunningMode()) {
continue;
}
reportUserDto.setRunningMode(true);
/* 사용자별 Report Thread 실행 */
new ReportServerTask(selector, key, getName(), logger).run();
} else {
@ -108,8 +114,9 @@ public class ReportServer extends Service {
if (reportUserDto.isRunningMode()) {
continue;
}
reportUserDto.setRunningMode(true);
/* 사용자별 Report Thread 실행 */
new ReportServerTask(selector, key, getName(), logger).run();
new ReportServerTask(selector, key, getName(), logger).start();
}
}
} else {
@ -138,14 +145,9 @@ public class ReportServer extends Service {
if (reportUserDto == null || reportUserDto.isRunningMode()) {
continue;
}
if (reportUserDto.isLogin()) {
saveSystemLog("[REPORT SERVER READ] [ID : " + reportUserDto.getUserId() + "]");
} else {
saveSystemLog("[REPORT SERVER READ] [FIRST CONNECTION ... ... ... ... ... ... ...]");
}
reportUserDto.setRunningMode(true);
/* 사용자별 Report Thread 실행 */
new ReportServerTask(selector, key, getName(), logger).run();
new ReportServerTask(selector, key, getName(), logger).start();
}
} else {
expireConnectUser(key);
@ -165,8 +167,9 @@ public class ReportServer extends Service {
SocketAddress remoteAddr = socket.getRemoteSocketAddress();
saveSystemLog("Connected to: " + remoteAddr);
// Socket 채널을 channel에 수신 등록한다
channel.register(selector, SelectionKey.OP_READ, ReportUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).queuePath(System.getProperty("ROOTPATH") + File.separator + getProp("QUEUE_PATH")).build());
channel.register(selector, SelectionKey.OP_READ, ReportUserDto.builder().lastTrafficTime(System.currentTimeMillis()).remoteIP(remoteAddr.toString()).queuePath(System.getProperty("ROOTPATH") + getProp("QUEUE_PATH")).build());
} catch (Exception e) {
saveSystemLog(e.toString());
throw new RuntimeException(e);
}
}
@ -189,6 +192,7 @@ public class ReportServer extends Service {
// 닫기
key.cancel();
} catch (IOException e) {
saveSystemLog(e.toString());
e.printStackTrace();
}
}

View File

@ -4,10 +4,10 @@ import com.munjaon.server.queue.pool.ReportQueue;
import com.munjaon.server.server.dto.ConnectUserDto;
import com.munjaon.server.server.dto.ReportDto;
import com.munjaon.server.server.dto.ReportUserDto;
import com.munjaon.server.server.packet.Header;
import com.munjaon.server.server.packet.LinkCheck;
import com.munjaon.server.server.packet.Packet;
import com.munjaon.server.server.packet.Report;
import com.munjaon.server.server.packet.common.Header;
import com.munjaon.server.server.packet.common.LinkCheck;
import com.munjaon.server.server.packet.common.Packet;
import com.munjaon.server.server.packet.common.Report;
import com.munjaon.server.server.queue.ReportUserQueue;
import com.munjaon.server.server.task.ReportReadTask;
import com.munjaon.server.util.LogUtil;

View File

@ -93,7 +93,6 @@ public abstract class Service extends Thread {
protected void initLogFile() {
LOG_FILE = System.getProperty("ROOTPATH") + getProp("LOG_FILE");
System.out.println("LOG_FILE: " + LOG_FILE);
setLogFile( LOG_FILE );
saveSystemLog("Service Log Initializing ... ...");
}

View File

@ -6,7 +6,7 @@ import com.munjaon.server.cache.service.MemberService;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.enums.QueueTypeWorker;
import com.munjaon.server.server.dto.ConnectUserDto;
import com.munjaon.server.server.packet.*;
import com.munjaon.server.server.packet.common.*;
import com.munjaon.server.server.queue.CollectUserQueue;
import com.munjaon.server.util.*;

View File

@ -7,7 +7,7 @@ import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.enums.QueueTypeWorker;
import com.munjaon.server.server.config.ServerConfig;
import com.munjaon.server.server.dto.ConnectUserDto;
import com.munjaon.server.server.packet.*;
import com.munjaon.server.server.packet.common.*;
import com.munjaon.server.server.queue.CollectUserQueue;
import com.munjaon.server.server.service.PropertyLoader;
import com.munjaon.server.util.*;
@ -22,25 +22,27 @@ import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class CollectServerTask implements Runnable {
public class CollectServerTask extends Thread {
public static final SimpleDateFormat sdf = new SimpleDateFormat("[MM-dd HH:mm:ss]");
public static final String LOG_DATE_FORMAT = "[MM-dd HH:mm:ss]";
private Selector selector;
private SelectionKey key;
private SocketChannel channel;
private CollectUserQueue collectUserQueue = CollectUserQueue.getInstance();
private ConnectUserDto connectUserDto;
private String serviceName;
private String serviceType;
private final SelectionKey key;
private final SocketChannel channel;
private final CollectUserQueue collectUserQueue = CollectUserQueue.getInstance();
private final ConnectUserDto connectUserDto;
private final String serviceName;
private final String serviceType;
private final LogUtil logger;
private boolean IS_SERVER_RUN; // 서버가 구동중인지 여부
private boolean IS_RUN_YN;
private long RUN_FLAG_CHECK_TIME;
private long LAST_PACKET_SEND_TIME = System.currentTimeMillis(); // 패킷 송수신 시간을 체크하기 위한 변수(최대 3초간 요청이 없는 경우 Thread 종료)
private boolean IS_ERROR = false;
/* 클라이언트 요청 데이터 수신 */
private ByteBuffer headBuffer = ByteBuffer.allocateDirect(Header.HEADER_LENGTH);
private final ByteBuffer headBuffer = ByteBuffer.allocateDirect(Header.HEADER_LENGTH);
/* 세션이 만료되었는지 체크 */
private boolean isExpiredYn;
@ -53,6 +55,11 @@ public class CollectServerTask implements Runnable {
this.serviceName = serviceName;
this.serviceType = serviceType;
this.logger = logger;
if (connectUserDto.isLogin()) {
saveSystemLog("[COLLECT SERVER READ] [ID : " + connectUserDto.getUserId() + "]");
} else {
saveSystemLog("[COLLECT SERVER READ] [FIRST CONNECTION ... ... ... ... ... ... ...]");
}
}
protected String getProp(String name) {
@ -121,6 +128,13 @@ public class CollectServerTask implements Runnable {
break;
}
try {
/* 2. Packet Timeout Check */
if (checkTimeOut()) {
saveSystemLog(printTaskLog() + "[checkTimeOut : Expired ... ... ... ... ... ... ...]");
saveSystemLog(printTaskLog() + "[### End ### ### ### ### ### ### ###]");
expireConnectUser();
break;
}
/* 3. HeadBuffer 읽기 */
int size = readHeader();
/* 4. Body 읽기 */
@ -133,7 +147,7 @@ public class CollectServerTask implements Runnable {
default: expireConnectUser(); break;
}
} else if (size == 0) {
if (System.currentTimeMillis() - connectUserDto.getLastTrafficTime() > ServerConfig.DELIVER_EXEC_CYCLE_TIME) {
if (System.currentTimeMillis() - LAST_PACKET_SEND_TIME > ServerConfig.DELIVER_EXEC_CYCLE_TIME) {
this.isExpiredYn = true;
}
Thread.sleep(1);
@ -143,16 +157,29 @@ public class CollectServerTask implements Runnable {
} catch (Exception e) {
/* 세션 만료 여부 */
this.isExpiredYn = true;
e.printStackTrace();
this.IS_ERROR = true;
saveSystemLog(e);
}
/* RUN Flag 체크 */
reloadRunFlag();
}
/* 중요 : 사용자 Thread 실행모드 Off */
connectUserDto.setRunningMode(false);
/* 에러가 발생한 경우 세션을 종료힌다. */
if (IS_ERROR) {
expireConnectUser();
}
saveSystemLog(printTaskLog() + "[### End ### ### ### ### ### ### ###]");
}
private boolean checkTimeOut() {
if (System.currentTimeMillis() - this.connectUserDto.getLastTrafficTime() >= Packet.LIMIT_PACKET_TIMEOUT) {
return true;
}
return false;
}
private void recvDeliver(SocketChannel channel, ByteBuffer headBuffer) throws IOException {
/* 서비스 중지여부 체크 */
if (isExpireService()) {
@ -168,6 +195,8 @@ public class CollectServerTask implements Runnable {
case "KFT": recvKftDeliver(channel, headBuffer); break;
default:break;
}
/* 마지막 패킷 수신시간 체크 */
LAST_PACKET_SEND_TIME = System.currentTimeMillis();
}
private boolean isExpireService() {
@ -468,16 +497,17 @@ public class CollectServerTask implements Runnable {
}
private void recvLinkCheck(SocketChannel channel) throws IOException {
ByteBuffer bodyBuffer = ByteBuffer.allocate(LinkCheck.LINK_CHECK_ACK_BODY_LENGTH);
channel.read(bodyBuffer);
// SocketChannel channel = (SocketChannel) key.channel();
channel.write(LinkCheck.makeLinkCheckAckBuffer());
connectUserDto.updateLastTrafficTime();
/* 서비스 중지여부 체크 */
if (isExpireService()) {
expireConnectUser();
return;
}
ByteBuffer bodyBuffer = ByteBuffer.allocate(LinkCheck.LINK_CHECK_ACK_BODY_LENGTH);
channel.read(bodyBuffer);
saveSystemLog(printTaskLog() + "[COLLECTOR LINK CHECK RECEIVE ... ... ... ... ... ... ...]");
channel.write(LinkCheck.makeLinkCheckAckBuffer());
saveSystemLog(printTaskLog() + "[COLLECTOR LINK CHECK ACK SEND ... ... ... ... ... ... ...]");
connectUserDto.updateLastTrafficTime();
}
private void recvBind(SocketChannel channel, ByteBuffer headBuffer) {
@ -490,13 +520,14 @@ public class CollectServerTask implements Runnable {
String id = Bind.getBindId(bindBuffer);
String pwd = Bind.getBindPwd(bindBuffer);
saveSystemLog(printTaskLog() + "[BIND REQUEST] [ID : " + id + ", PWD : " + pwd + "]");
MemberService svc = (MemberService) CacheService.LOGIN_SERVICE.getService();
MemberDto memberDto = null;
if (svc != null) {
memberDto = svc.get(id);
}
saveSystemLog(printTaskLog() + "[BIND REQUEST] [ID : " + id + ", PWD : " + pwd + "]");
/* Bind Check */
resultCode = checkBind(memberDto, this.serviceType, id, pwd);

View File

@ -18,7 +18,7 @@ public class ReportQueueTask implements Runnable {
public static final SimpleDateFormat sdf = new SimpleDateFormat("[MM-dd HH:mm:ss]");
public static final String LOG_DATE_FORMAT = "[MM-dd HH:mm:ss]";
private ReportUserDto reportUserDto;
private final ReportUserDto reportUserDto;
private final LogUtil logger;
public ReportQueueTask(ReportUserDto reportUserDto, LogUtil logger) {
@ -37,15 +37,15 @@ public class ReportQueueTask implements Runnable {
ReportQueue reportQueue = reportUserDto.getReportQueue();
/* 리포트큐에 최대 크기까지 쓰고 모두 리포트는 전송했는지 체크 : 테스트후 적용 */
// if (reportQueue.isWriteLimit(reportUserDto.getMaxWriteCount())) {
// if (reportQueue.isTruncateQueue(reportUserDto.getMaxWriteCount())) {
// try {
// reportQueue.truncateQueue();
// } catch (Exception e) {}
// } else {
// return;
// }
// }
if (reportQueue.isWriteLimit(reportUserDto.getMaxQueueCount())) {
if (reportQueue.isTruncateQueue(reportUserDto.getMaxQueueCount())) {
try {
reportQueue.truncateQueue();
} catch (Exception ignored) {}
} else {
return;
}
}
ReportService reportService = (ReportService) CacheService.REPORT_SERVICE.getService();
List<ReportDto> list = reportService.getReportListForUser(reportUserDto.getUserId());
if (list == null || list.isEmpty()) {

View File

@ -5,7 +5,7 @@ import com.munjaon.server.cache.enums.CacheService;
import com.munjaon.server.cache.service.MemberService;
import com.munjaon.server.queue.pool.ReportQueue;
import com.munjaon.server.server.dto.ReportUserDto;
import com.munjaon.server.server.packet.*;
import com.munjaon.server.server.packet.common.*;
import com.munjaon.server.server.queue.ReportUserQueue;
import com.munjaon.server.util.LogUtil;

View File

@ -7,7 +7,7 @@ import com.munjaon.server.queue.pool.ReportQueue;
import com.munjaon.server.server.config.ServerConfig;
import com.munjaon.server.server.dto.ReportDto;
import com.munjaon.server.server.dto.ReportUserDto;
import com.munjaon.server.server.packet.*;
import com.munjaon.server.server.packet.common.*;
import com.munjaon.server.server.queue.ReportUserQueue;
import com.munjaon.server.server.service.PropertyLoader;
import com.munjaon.server.util.LogUtil;
@ -21,28 +21,29 @@ import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class ReportServerTask implements Runnable {
public class ReportServerTask extends Thread {
public static final SimpleDateFormat sdf = new SimpleDateFormat("[MM-dd HH:mm:ss]");
public static final String LOG_DATE_FORMAT = "[MM-dd HH:mm:ss]";
private Selector selector;
private SelectionKey key;
private SocketChannel channel;
private final SelectionKey key;
private final SocketChannel channel;
private final ReportUserQueue reportUserQueue = ReportUserQueue.getInstance();
private ReportUserDto reportUserDto;
private ReportQueue reportQueue;
private String serviceName;
private final ReportUserDto reportUserDto;
private final ReportQueue reportQueue;
private final String serviceName;
private final LogUtil logger;
private boolean IS_SERVER_RUN; // 서버가 구동중인지 여부
private boolean IS_RUN_YN;
private long RUN_FLAG_CHECK_TIME;
private long SEND_CYCLE_CHECK_TIME;
private boolean IS_ERROR = false;
/* 세션이 만료되었는지 체크 */
private boolean isExpiredYn;
/* 클라이언트 요청 데이터 수신 */
private ByteBuffer headBuffer = ByteBuffer.allocateDirect(Header.HEADER_LENGTH);
private final ByteBuffer headBuffer = ByteBuffer.allocateDirect(Header.HEADER_LENGTH);
private ReportDto reportDto; // 전송 리포트
/* Packet을 전송했는지 여부 */
private boolean isPacketSendYn;
@ -56,6 +57,11 @@ public class ReportServerTask implements Runnable {
this.serviceName = serviceName;
this.reportQueue = reportUserDto.getReportQueue();
this.logger = logger;
if (reportUserDto.isLogin()) {
saveSystemLog("[REPORT SERVER READ] [ID : " + reportUserDto.getUserId() + "]");
} else {
saveSystemLog("[REPORT SERVER READ] [FIRST CONNECTION ... ... ... ... ... ... ...]");
}
}
protected String getProp(String name) {
@ -105,27 +111,42 @@ public class ReportServerTask implements Runnable {
if (isExpiredYn) {
break;
}
// saveSystemLog("ReportServerTask is Running");
try {
sendInterest();
recvInterest();
/* RUN Flag 체크 */
reloadRunFlag();
} catch (Exception e) {
this.isExpiredYn = true;
e.printStackTrace();
/* 2. Packet Timeout Check */
if (checkTimeOut()) {
saveSystemLog(printTaskLog() + "[checkTimeOut : Expired ... ... ... ... ... ... ...]");
saveSystemLog(printTaskLog() + "[### End ### ### ### ### ### ### ###]");
expireConnectUser();
break;
}
sendInterest();
recvInterest();
/* RUN Flag 체크 */
reloadRunFlag();
}
} catch (Exception e) {
/* 세션 만료 여부 */
this.isExpiredYn = true;
e.printStackTrace();
this.IS_ERROR = true;
saveSystemLog(e);
}
/* 중요 : 사용자 Thread 실행모드 Off */
reportUserDto.setRunningMode(false);
/* 에러가 발생한 경우 세션을 종료힌다. */
if (IS_ERROR) {
expireConnectUser();
}
saveSystemLog(printTaskLog() + "[### End ### ### ### ### ### ### ###]");
}
private boolean checkTimeOut() {
if (System.currentTimeMillis() - this.reportUserDto.getLastTrafficTime() >= Packet.LIMIT_PACKET_TIMEOUT) {
return true;
}
return false;
}
private void initHeaderBuffer() {
this.headBuffer.clear();
for (int loopCnt = 0; loopCnt < Header.HEADER_LENGTH; loopCnt++) {
@ -146,8 +167,7 @@ public class ReportServerTask implements Runnable {
return size;
}
private void bindInterest() throws IOException {
// if (reportUserDto.isLogin() && !isRun()) {
private void bindInterest() {
if (reportUserDto.isLogin()) {
return;
}
@ -158,9 +178,10 @@ public class ReportServerTask implements Runnable {
/* 2. Body 읽기 */
if (size > 0) {
String command = Header.getCommand(this.headBuffer);
switch (Integer.parseInt(command)) {
case 1 : recvBind(channel, headBuffer); break;
default: expireConnectUser(); break;
if (Integer.parseInt(command) == 1) {
recvBind(channel, headBuffer);
} else {
expireConnectUser();
}
/* 패킷 수신한 경우 무조건 루프를 빠져나간다 */
break;
@ -230,17 +251,17 @@ public class ReportServerTask implements Runnable {
}
} catch (Exception e) {
resultCode = "10";
e.printStackTrace();
saveSystemLog(e);
}
try {
saveSystemLog(printTaskLog() + "[BIND RESULT : " + resultCode + "]");
channel.write(Bind.makeBindAckBuffer(resultCode));
if ("00".equals(resultCode) == false) {
if (!"00".equals(resultCode)) {
expireConnectUser();
}
} catch (IOException e) {
e.printStackTrace();
saveSystemLog(e);
}
}
@ -262,7 +283,7 @@ public class ReportServerTask implements Runnable {
return "00";
}
private void recvInterest() throws IOException, InterruptedException, Exception {
private void recvInterest() throws IOException, Exception {
while (isPacketSendYn) {
/* 1. Head 읽기 */
ByteBuffer headBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH);
@ -270,8 +291,8 @@ public class ReportServerTask implements Runnable {
if (size > 0) {
String command = Header.getCommand(headBuffer);
switch (Integer.parseInt(command)) {
case 6 : recvReport(channel, headBuffer); break;
case 8 : recvLinkCheck(channel, headBuffer); break;
case 6 : recvReport(channel); break;
case 8 : recvLinkCheck(channel); break;
default: saveSystemLog(printTaskLog() + "[INVALID REQUEST][command : " + command + ";");
expireConnectUser(); break;
}
@ -289,7 +310,7 @@ public class ReportServerTask implements Runnable {
}
}
private void recvLinkCheck(SocketChannel channel, ByteBuffer headBuffer) throws IOException {
private void recvLinkCheck(SocketChannel channel) throws IOException {
ByteBuffer bodyBuffer = ByteBuffer.allocate(LinkCheck.LINK_CHECK_ACK_BODY_LENGTH);
int size = channel.read(bodyBuffer);
if (size > 0) {
@ -299,7 +320,7 @@ public class ReportServerTask implements Runnable {
}
}
private void recvReport(SocketChannel channel, ByteBuffer headBuffer) throws Exception {
private void recvReport(SocketChannel channel) throws Exception {
ByteBuffer bodyBuffer = ByteBuffer.allocate(Report.REPORT_ACK_BODY_LENGTH);
int size = channel.read(bodyBuffer);
if (size != Report.REPORT_ACK_BODY_LENGTH) {
@ -316,34 +337,30 @@ public class ReportServerTask implements Runnable {
}
}
private void sendInterest() throws IOException {
if (reportUserDto.isLogin() == false) {
private void sendInterest() throws Exception {
if (!reportUserDto.isLogin()) {
return;
}
if (reportUserDto.isAlive() == 2) {
channel.write(LinkCheck.makeLinkCheckBuffer());
SEND_CYCLE_CHECK_TIME = System.currentTimeMillis();
/* Packet 전송했는지 여부 */
/* Packet 전송했는지 여부 */
isPacketSendYn = true;
} else {
if (this.reportQueue != null && this.reportQueue.isRemainReport()) {
try {
this.reportDto = this.reportQueue.popReportFromQueue();
if (reportDto == null) {
return;
}
saveSystemLog(printTaskLog() + "[REPORT SEND : " + reportDto.toString() + "]");
ByteBuffer reportBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + Report.REPORT_BODY_LENGTH);
Packet.setDefaultByte(reportBuffer);
Header.putHeader(reportBuffer, Header.COMMAND_REPORT, Report.REPORT_BODY_LENGTH);
Report.putReport(reportBuffer, reportDto);
channel.write(reportBuffer);
/* Packet을 전송했는지 여부 */
SEND_CYCLE_CHECK_TIME = System.currentTimeMillis();
isPacketSendYn = true;
} catch (Exception e) {
e.printStackTrace();
this.reportDto = this.reportQueue.popReportFromQueue();
if (reportDto == null) {
return;
}
saveSystemLog(printTaskLog() + "[REPORT SEND : " + reportDto.toString() + "]");
ByteBuffer reportBuffer = ByteBuffer.allocate(Header.HEADER_LENGTH + Report.REPORT_BODY_LENGTH);
Packet.setDefaultByte(reportBuffer);
Header.putHeader(reportBuffer, Header.COMMAND_REPORT, Report.REPORT_BODY_LENGTH);
Report.putReport(reportBuffer, reportDto);
channel.write(reportBuffer);
/* Packet 전송했는지 여부 */
SEND_CYCLE_CHECK_TIME = System.currentTimeMillis();
isPacketSendYn = true;
}
}
/* 쓰레드 완료 시점 체크 */
@ -372,7 +389,7 @@ public class ReportServerTask implements Runnable {
// 닫기
key.cancel();
} catch (IOException e) {
e.printStackTrace();
saveSystemLog(e);
}
}

View File

@ -6,7 +6,7 @@ import com.munjaon.server.cache.service.MemberService;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.enums.QueueTypeWorker;
import com.munjaon.server.server.dto.ConnectUserDto;
import com.munjaon.server.server.packet.*;
import com.munjaon.server.server.packet.common.*;
import com.munjaon.server.server.queue.CollectUserQueue;
import com.munjaon.server.util.*;

View File

@ -16,8 +16,8 @@ public class StatusCheckTask implements Runnable {
public static final SimpleDateFormat sdf = new SimpleDateFormat("[MM-dd HH:mm:ss]");
public static final String LOG_DATE_FORMAT = "[MM-dd HH:mm:ss]";
private CollectUserQueue collectUserQueue = CollectUserQueue.getInstance();
private String serviceType;
private final CollectUserQueue collectUserQueue = CollectUserQueue.getInstance();
private final String serviceType;
private final LogUtil logger;
public StatusCheckTask(String serviceType, LogUtil logger) {
@ -29,7 +29,7 @@ public class StatusCheckTask implements Runnable {
public void run() {
saveSystemLog("[" + this.serviceType + "][USER STATUS CHECK is starting ... ...]");
List<ConnectUserDto> userList = collectUserQueue.getUsers(this.serviceType);
if (userList == null && userList.isEmpty()) {
if (userList == null || userList.isEmpty()) {
saveSystemLog("[" + this.serviceType + "][USER STATUS CHECK is empty ... ...]");
saveSystemLog("[" + this.serviceType + "][USER STATUS CHECK is ended ... ...]");
return;
@ -39,9 +39,6 @@ public class StatusCheckTask implements Runnable {
for (ConnectUserDto user : userList) {
MemberDto savedMemberDto = user.getMemberDto();
saveSystemLog("[" + this.serviceType + "][USER PREVIOUS STATUS : " + savedMemberDto.toString() + "]");
if (savedMemberDto == null) {
continue;
}
MemberDto newMemberDto = svc.get(savedMemberDto.getMberId());
saveSystemLog("[" + this.serviceType + "][USER NEW STATUS : " + newMemberDto.toString() + "]");
if (newMemberDto == null) {

View File

@ -477,27 +477,27 @@ public final class MessageUtil {
buffer.position(ReportConfig.MSG_ID_POSITION);
destArray = new byte[ReportConfig.MSG_ID_LENGTH];
buffer.get(destArray);
reportDto.setMsgId(new String(destArray).trim());
reportDto.setMsgId(QueueConstants.getString(destArray));
/* AGENT_CODE (Length : 2 / Position : 20) */
buffer.position(ReportConfig.AGENT_CODE_POSITION);
destArray = new byte[ReportConfig.AGENT_CODE_LENGTH];
buffer.get(destArray);
reportDto.setAgentCode(new String(destArray).trim());
reportDto.setAgentCode(QueueConstants.getString(destArray));
/* SEND_TIME (Length : 14 / Position : 22) */
buffer.position(ReportConfig.SEND_TIME_POSITION);
destArray = new byte[ReportConfig.SEND_TIME_LENGTH];
buffer.get(destArray);
reportDto.setRsltDate(new String(destArray).trim());
reportDto.setRsltDate(QueueConstants.getString(destArray));
/* TELECOM (Length : 3 / Position : 36) */
buffer.position(ReportConfig.TELECOM_POSITION);
destArray = new byte[ReportConfig.TELECOM_LENGTH];
buffer.get(destArray);
reportDto.setRsltNet(new String(destArray).trim());
reportDto.setRsltNet(QueueConstants.getString(destArray));
/* RESULT (Length : 5 / Position : 39) */
buffer.position(ReportConfig.RESULT_POSITION);
destArray = new byte[ReportConfig.RESULT_LENGTH];
buffer.get(destArray);
reportDto.setRsltCode(new String(destArray).trim());
reportDto.setRsltCode(QueueConstants.getString(destArray));
return reportDto;
}

View File

@ -5,9 +5,6 @@
package com.munjaon.server.util;
import java.text.DecimalFormat;
import java.util.ArrayList;
/**
* 문자열 관련 유틸리티 클래스
* @author JDS
@ -39,129 +36,4 @@ public final class StringUtil {
return ((String)obj).trim();
}
public static String ltrim(byte[] obj) {
return ltrim(new String(obj));
}
public static String ltrim(String obj) {
return ltrim(obj.toCharArray());
}
public static String ltrim(char[] obj) {
int len = obj.length;
int idx = 0;
while( idx < len && obj[idx] <= ' ' ) {
idx++;
}
return new String(obj, idx, len-idx);
}
public static String rtrim(byte[] obj) {
return rtrim(new String(obj));
}
public static String rtrim(String obj) {
return rtrim(obj.toCharArray());
}
public static String rtrim(char[] obj) {
int len = obj.length;
int idx = len-1;
while( idx >= 0 && obj[idx] <= ' ' ) {
idx--;
}
return new String(obj, 0, idx+1);
}
public static String replaceAll(String src, String from, String to) {
StringBuilder sbuf = new StringBuilder();
int len = from.length();
int idx = 0;
int stx = 0;
while( (idx=src.indexOf(from, stx)) > -1 ) {
sbuf.append(src.substring(stx, idx));
sbuf.append(to);
stx=idx+len;
}
sbuf.append(src.substring(stx));
return sbuf.toString();
}
public static String[] split(String sSrc, String sDelim) {
ArrayList aList = new ArrayList();
String sTmp;
int len = sDelim.length();
int idx = 0;
int stx = 0;
while( (idx=sSrc.indexOf(sDelim, stx)) > -1 ) {
sTmp = sSrc.substring(stx, idx);
aList.add(sTmp);
stx=idx+len;
}
if( stx <= sSrc.length() ) {
aList.add(sSrc.substring(stx));
}
String[] sRet = new String[aList.size()];
for( int i=0; i<aList.size(); i++ ) {
sRet[i] = (String) aList.get(i);
}
return sRet;
}
public static String substring(String obj, int idx, int length) {
if( obj.getBytes().length <= idx ) {
return "";
}
if( obj.getBytes().length <= length ) {
return obj;
}
int totallen=0;
int i=idx;
for( i=idx; i<obj.length(); i++ ) {
totallen += obj.substring(i, i+1).getBytes().length;
if( length < totallen ) {
break;
}
}
return obj.substring(idx, i);
}
public static String substring(String obj, int length) {
return substring(obj, 0, length);
}
public static String numFilter(String src) {
return src.replaceAll("[^0-9]", "");
}
public static String toNumberFormat(Object obj, String fmt) {
DecimalFormat df = new DecimalFormat(fmt);
return df.format(obj);
}
public static String pad0(String str, int size) {
char[] zeros = new char[size - str.length()];
for (int i = 0; i < zeros.length; i++)
zeros[i] = '0';
return new String(zeros) + str;
}
}

View File

@ -7,6 +7,11 @@ spring:
# jdbc-url: jdbc:mariadb://119.193.215.98:3306/mjon_agent_back
username: mjonUr_agent
password: mjagent123$
connectionTimeout: 30000
maximumPoolSize: 15
maxLifetime: 1800000
poolName: HikariCP
connectionTestQuery: SELECT 1
server:
port: 8090

View File

@ -2,10 +2,15 @@ spring:
datasource:
server:
driver-class-name: org.mariadb.jdbc.Driver
url: jdbc:mariadb://119.193.215.98:3306/mjon_agent_back
jdbc-url: jdbc:mariadb://119.193.215.98:3306/mjon_agent_back
username: mjonUr_agent
password: mjagent123$
url: jdbc:mariadb://localhost:3306/mjon
jdbc-url: jdbc:mariadb://localhost:3306/mjon
username: root
password: 1234
connectionTimeout: 30000
maximumPoolSize: 15
maxLifetime: 1800000
poolName: HikariCP
connectionTestQuery: SELECT 1
server:
port: 8090

View File

@ -6,6 +6,11 @@ spring:
jdbc-url: jdbc:mariadb://119.193.215.98:3306/mjon_agent
username: mjonUr_agent
password: mjagent123$
connectionTimeout: 30000
maximumPoolSize: 20
maxLifetime: 1800000
poolName: HikariCP
connectionTestQuery: SELECT 1
server:
port: 8090