write 큐 기능 개선, 통신 프로세스 변경, Read 큐 프로세스 개선, 서버, 클라이언트 통신 오류 수정중

This commit is contained in:
dsjang 2024-07-01 01:31:50 +09:00
parent e857310b62
commit 3950fba677
29 changed files with 1517 additions and 232 deletions

View File

@ -0,0 +1,34 @@
package com.munjaon.server.api.controller;
import com.munjaon.server.api.dto.base.ApiResponse;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.enums.QueueTypeWorker;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RequestMapping("/api/message")
@RestController
@RequiredArgsConstructor
public class MessageController {
@PostMapping("/save")
public ResponseEntity postSample(@RequestBody BasicMessageDto reqDto) throws ConfigurationException {
log.debug("BasicMessageDto : {}", reqDto);
if (reqDto.getServiceType() != null && reqDto.getServiceType().equals("4")) {
QueueTypeWorker worker = QueueTypeWorker.find("SMS");
if (worker != null) {
log.debug("queue size : {}", worker.isExistQueue("SMS01"));
worker.pushQueue(reqDto);
}
}
return new ResponseEntity(ApiResponse.toResponse(200, "OK", reqDto), HttpStatus.OK);
}
}

View File

@ -1,13 +1,20 @@
package com.munjaon.server.config;
import com.munjaon.server.queue.dto.QueueInfo;
import com.munjaon.server.queue.pool.SmsReadQueue;
import com.munjaon.server.queue.pool.SmsWriteQueue;
import com.munjaon.server.server.service.PropertyLoader;
import com.munjaon.server.server.service.QueueServerService;
import com.munjaon.server.util.ServiceUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
@Slf4j
@Configuration
@RequiredArgsConstructor
public class RunnerConfiguration {
@ -34,7 +41,28 @@ public class RunnerConfiguration {
@Bean
@Order(2)
public CommandLineRunner getRunnerBeanForService() {
public CommandLineRunner getRunnerBeanForSmsQueue() {
try {
String[] svcArray = ServiceUtil.getServiceNames(serverConfig.getStringArray("SMS.SERVICE_LIST"));
if (svcArray == null || svcArray.length == 0) {
log.info("SMS service list is empty");
} else {
if (ServiceUtil.isDuplicate(svcArray)) {
log.info("SMS service list is duplicated");
} else {
for (String svc : svcArray) {
log.info("SERVICE CREATE : {}", svc);
QueueInfo queueInfo = QueueInfo.builder().queueName(svc).serviceType("SMS").queuePath(serverConfig.getString("SMS.QUEUE_PATH")).build();
SmsWriteQueue smsWriteQueue = new SmsWriteQueue(queueInfo);
SmsReadQueue smsReadQueue = new SmsReadQueue(queueInfo);
QueueServerService queueServerService = new QueueServerService(svc, smsWriteQueue, smsReadQueue);
queueServerService.start();
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return args -> System.out.println("Runner Bean #2");
}
}

View File

@ -10,10 +10,10 @@ import lombok.ToString;
@Builder
@ToString
public class QueueInfo {
private String queueName = "";
private String queueFileName = "";
private String queuePath;
private String queueName;
private String queueFileName;
// private int queueDataLength = 0;
private String serviceType = "";
private String readXMLFileName = "";
private boolean isRun;
private String serviceType;
private String readXMLFileName;
}

View File

@ -39,6 +39,30 @@ public enum QueueTypeWorker {
SmsQueueService smsQueueService = (SmsQueueService) QueueService.SMS_QUEUE_SERVICE.getService();
return smsQueueService.saveMessageToTable(data);
}
@Override
public void memoryEnQueue(BasicMessageDto data) {
SmsQueueService smsQueueService = (SmsQueueService) QueueService.SMS_QUEUE_SERVICE.getService();
smsQueueService.memoryEnQueue(data);
}
@Override
public BasicMessageDto memoryDeQueue() {
SmsQueueService smsQueueService = (SmsQueueService) QueueService.SMS_QUEUE_SERVICE.getService();
return smsQueueService.memoryDeQueue();
}
@Override
public int getMemorySize() {
SmsQueueService smsQueueService = (SmsQueueService) QueueService.SMS_QUEUE_SERVICE.getService();
return smsQueueService.getMemorySize();
}
@Override
public boolean isMemoryEmpty() {
SmsQueueService smsQueueService = (SmsQueueService) QueueService.SMS_QUEUE_SERVICE.getService();
return smsQueueService.isMemoryEmpty();
}
},
MSG_TYPE_LMS("LMS") {
@Override
@ -70,6 +94,30 @@ public enum QueueTypeWorker {
LmsQueueService lmsQueueService = (LmsQueueService) QueueService.LMS_QUEUE_SERVICE.getService();
return lmsQueueService.saveMessageToTable(data);
}
@Override
public void memoryEnQueue(BasicMessageDto data) {
LmsQueueService lmsQueueService = (LmsQueueService) QueueService.LMS_QUEUE_SERVICE.getService();
lmsQueueService.memoryEnQueue(data);
}
@Override
public BasicMessageDto memoryDeQueue() {
LmsQueueService lmsQueueService = (LmsQueueService) QueueService.LMS_QUEUE_SERVICE.getService();
return lmsQueueService.memoryDeQueue();
}
@Override
public int getMemorySize() {
LmsQueueService lmsQueueService = (LmsQueueService) QueueService.LMS_QUEUE_SERVICE.getService();
return lmsQueueService.getMemorySize();
}
@Override
public boolean isMemoryEmpty() {
LmsQueueService lmsQueueService = (LmsQueueService) QueueService.LMS_QUEUE_SERVICE.getService();
return lmsQueueService.isMemoryEmpty();
}
},
MSG_TYPE_MMS("MMS") {
@Override
@ -101,6 +149,30 @@ public enum QueueTypeWorker {
MmsQueueService mmsQueueService = (MmsQueueService) QueueService.MMS_QUEUE_SERVICE.getService();
return mmsQueueService.saveMessageToTable(data);
}
@Override
public void memoryEnQueue(BasicMessageDto data) {
MmsQueueService mmsQueueService = (MmsQueueService) QueueService.MMS_QUEUE_SERVICE.getService();
mmsQueueService.memoryEnQueue(data);
}
@Override
public BasicMessageDto memoryDeQueue() {
MmsQueueService mmsQueueService = (MmsQueueService) QueueService.MMS_QUEUE_SERVICE.getService();
return mmsQueueService.memoryDeQueue();
}
@Override
public int getMemorySize() {
MmsQueueService mmsQueueService = (MmsQueueService) QueueService.MMS_QUEUE_SERVICE.getService();
return mmsQueueService.getMemorySize();
}
@Override
public boolean isMemoryEmpty() {
MmsQueueService mmsQueueService = (MmsQueueService) QueueService.MMS_QUEUE_SERVICE.getService();
return mmsQueueService.isMemoryEmpty();
}
},
MSG_TYPE_KAT("KAT") {
@Override
@ -132,6 +204,30 @@ public enum QueueTypeWorker {
KakaoAlarmQueueService kakaoAlarmQueueService = (KakaoAlarmQueueService) QueueService.KAT_QUEUE_SERVICE.getService();
return kakaoAlarmQueueService.saveMessageToTable(data);
}
@Override
public void memoryEnQueue(BasicMessageDto data) {
KakaoAlarmQueueService kakaoAlarmQueueService = (KakaoAlarmQueueService) QueueService.KAT_QUEUE_SERVICE.getService();
kakaoAlarmQueueService.memoryEnQueue(data);
}
@Override
public BasicMessageDto memoryDeQueue() {
KakaoAlarmQueueService kakaoAlarmQueueService = (KakaoAlarmQueueService) QueueService.KAT_QUEUE_SERVICE.getService();
return kakaoAlarmQueueService.memoryDeQueue();
}
@Override
public int getMemorySize() {
KakaoAlarmQueueService kakaoAlarmQueueService = (KakaoAlarmQueueService) QueueService.KAT_QUEUE_SERVICE.getService();
return kakaoAlarmQueueService.getMemorySize();
}
@Override
public boolean isMemoryEmpty() {
KakaoAlarmQueueService kakaoAlarmQueueService = (KakaoAlarmQueueService) QueueService.KAT_QUEUE_SERVICE.getService();
return kakaoAlarmQueueService.isMemoryEmpty();
}
},
MSG_TYPE_KFT("KFT") {
@Override
@ -163,6 +259,30 @@ public enum QueueTypeWorker {
KakaoFriendQueueService kakaoFriendQueueService = (KakaoFriendQueueService) QueueService.KFT_QUEUE_SERVICE.getService();
return kakaoFriendQueueService.saveMessageToTable(data);
}
@Override
public void memoryEnQueue(BasicMessageDto data) {
KakaoFriendQueueService kakaoFriendQueueService = (KakaoFriendQueueService) QueueService.KFT_QUEUE_SERVICE.getService();
kakaoFriendQueueService.memoryEnQueue(data);
}
@Override
public BasicMessageDto memoryDeQueue() {
KakaoFriendQueueService kakaoFriendQueueService = (KakaoFriendQueueService) QueueService.KFT_QUEUE_SERVICE.getService();
return kakaoFriendQueueService.memoryDeQueue();
}
@Override
public int getMemorySize() {
KakaoFriendQueueService kakaoFriendQueueService = (KakaoFriendQueueService) QueueService.KFT_QUEUE_SERVICE.getService();
return kakaoFriendQueueService.getMemorySize();
}
@Override
public boolean isMemoryEmpty() {
KakaoFriendQueueService kakaoFriendQueueService = (KakaoFriendQueueService) QueueService.KFT_QUEUE_SERVICE.getService();
return kakaoFriendQueueService.isMemoryEmpty();
}
};
QueueTypeWorker(final String name) {
@ -186,4 +306,9 @@ public enum QueueTypeWorker {
public abstract void addQueue(WriteQueue queue);
public abstract void pushQueue(BasicMessageDto data);
public abstract int saveMessageToTable(BasicMessageDto data);
public abstract void memoryEnQueue(BasicMessageDto data);
public abstract BasicMessageDto memoryDeQueue();
public abstract int getMemorySize();
public abstract boolean isMemoryEmpty();
}

View File

@ -0,0 +1,47 @@
package com.munjaon.server.queue.pool;
import com.munjaon.server.queue.dto.BasicMessageDto;
import java.util.LinkedList;
public class KakaoAlarmMemoryQueue {
/** Lock Object */
private final Object msgMonitor = new Object();
/** Message Queue */
private final LinkedList<BasicMessageDto> msgQueue = new LinkedList<>();
/** Singleton Instance */
private static KakaoAlarmMemoryQueue memoryQueue;
public synchronized static KakaoAlarmMemoryQueue getInstance() {
if(memoryQueue == null){
memoryQueue = new KakaoAlarmMemoryQueue();
}
return memoryQueue;
}
/** MESSAGE QUEUE ************************************************************************ */
/** MESSAGE enQueue */
public void memoryEnQueue(BasicMessageDto data){
synchronized(msgMonitor){
msgQueue.addLast(data);
}
}
/** SMS deQueue */
public BasicMessageDto memoryDeQueue() {
synchronized(msgMonitor){
if (msgQueue.isEmpty()) return null;
else return msgQueue.removeFirst();
}
}
/** SMS size */
public int getMemorySize() {
synchronized(msgMonitor) {
return msgQueue.size();
}
}
/** SMS isEmpty */
public boolean isMemoryEmpty() {
synchronized(msgMonitor) {
return msgQueue.isEmpty();
}
}
}

View File

@ -5,17 +5,17 @@ import com.munjaon.server.queue.dto.BasicMessageDto;
public class KakaoAlarmWriteQueue extends WriteQueue {
@Override
int isValidateMessageForExtend(BasicMessageDto messageDto) {
public int isValidateMessageForExtend(BasicMessageDto messageDto) {
return 0;
}
@Override
void pushMessageToBuffer(BasicMessageDto messageDto) throws Exception {
public void pushMessageToBuffer(BasicMessageDto messageDto) throws Exception {
}
@Override
void initDataBuffer() {
public void initDataBuffer() {
}
}

View File

@ -0,0 +1,47 @@
package com.munjaon.server.queue.pool;
import com.munjaon.server.queue.dto.BasicMessageDto;
import java.util.LinkedList;
public class KakaoFriendMemoryQueue {
/** SMS Lock Object */
private final Object msgMonitor = new Object();
/** SMS Message Queue */
private final LinkedList<BasicMessageDto> msgQueue = new LinkedList<>();
/** Singleton Instance */
private static KakaoFriendMemoryQueue memoryQueue;
public synchronized static KakaoFriendMemoryQueue getInstance() {
if(memoryQueue == null){
memoryQueue = new KakaoFriendMemoryQueue();
}
return memoryQueue;
}
/** MESSAGE QUEUE ************************************************************************ */
/** MESSAGE enQueue */
public void memoryEnQueue(BasicMessageDto data){
synchronized(msgMonitor){
msgQueue.addLast(data);
}
}
/** SMS deQueue */
public BasicMessageDto memoryDeQueue() {
synchronized(msgMonitor){
if (msgQueue.isEmpty()) return null;
else return msgQueue.removeFirst();
}
}
/** SMS size */
public int getMemorySize() {
synchronized(msgMonitor) {
return msgQueue.size();
}
}
/** SMS isEmpty */
public boolean isMemoryEmpty() {
synchronized(msgMonitor) {
return msgQueue.isEmpty();
}
}
}

View File

@ -4,17 +4,17 @@ import com.munjaon.server.queue.dto.BasicMessageDto;
public class KakaoFriendWriteQueue extends WriteQueue {
@Override
int isValidateMessageForExtend(BasicMessageDto messageDto) {
public int isValidateMessageForExtend(BasicMessageDto messageDto) {
return 0;
}
@Override
void pushMessageToBuffer(BasicMessageDto messageDto) throws Exception {
public void pushMessageToBuffer(BasicMessageDto messageDto) throws Exception {
}
@Override
void initDataBuffer() {
public void initDataBuffer() {
}
}

View File

@ -0,0 +1,47 @@
package com.munjaon.server.queue.pool;
import com.munjaon.server.queue.dto.BasicMessageDto;
import java.util.LinkedList;
public class LmsMemoryQueue {
/** Lock Object */
private final Object msgMonitor = new Object();
/** Message Queue */
private final LinkedList<BasicMessageDto> msgQueue = new LinkedList<>();
/** Singleton Instance */
private static LmsMemoryQueue memoryQueue;
public synchronized static LmsMemoryQueue getInstance() {
if(memoryQueue == null){
memoryQueue = new LmsMemoryQueue();
}
return memoryQueue;
}
/** MESSAGE QUEUE ************************************************************************ */
/** MESSAGE enQueue */
public void memoryEnQueue(BasicMessageDto data){
synchronized(msgMonitor){
msgQueue.addLast(data);
}
}
/** SMS deQueue */
public BasicMessageDto memoryDeQueue() {
synchronized(msgMonitor){
if (msgQueue.isEmpty()) return null;
else return msgQueue.removeFirst();
}
}
/** SMS size */
public int getMemorySize() {
synchronized(msgMonitor) {
return msgQueue.size();
}
}
/** SMS isEmpty */
public boolean isMemoryEmpty() {
synchronized(msgMonitor) {
return msgQueue.isEmpty();
}
}
}

View File

@ -0,0 +1,47 @@
package com.munjaon.server.queue.pool;
import com.munjaon.server.queue.dto.BasicMessageDto;
import java.util.LinkedList;
public class MmsMemoryQueue {
/** Lock Object */
private final Object msgMonitor = new Object();
/** Message Queue */
private final LinkedList<BasicMessageDto> msgQueue = new LinkedList<>();
/** Singleton Instance */
private static MmsMemoryQueue memoryQueue;
public synchronized static MmsMemoryQueue getInstance() {
if(memoryQueue == null){
memoryQueue = new MmsMemoryQueue();
}
return memoryQueue;
}
/** MESSAGE QUEUE ************************************************************************ */
/** MESSAGE enQueue */
public void memoryEnQueue(BasicMessageDto data){
synchronized(msgMonitor){
msgQueue.addLast(data);
}
}
/** SMS deQueue */
public BasicMessageDto memoryDeQueue() {
synchronized(msgMonitor){
if (msgQueue.isEmpty()) return null;
else return msgQueue.removeFirst();
}
}
/** SMS size */
public int getMemorySize() {
synchronized(msgMonitor) {
return msgQueue.size();
}
}
/** SMS isEmpty */
public boolean isMemoryEmpty() {
synchronized(msgMonitor) {
return msgQueue.isEmpty();
}
}
}

View File

@ -1,9 +1,11 @@
package com.munjaon.server.queue.pool;
import com.munjaon.server.queue.dto.BasicMessageDto;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
@Slf4j
public abstract class QueuePool {
/** Lock Object */
protected final Object lockMonitor = new Object();
@ -69,6 +71,7 @@ public abstract class QueuePool {
}
// 파일큐에 Push 한다.
queue = queuePool.get(queueIndex);
log.info("Adding queue : {}, Data : {}", queue.getQueueName(), data);
queue.pushMessageToBuffer(data);
// 큐인덱서를 증가시킨다.
queueIndex++;

View File

@ -0,0 +1,211 @@
package com.munjaon.server.queue.pool;
import com.munjaon.server.queue.config.QueueConstants;
import com.munjaon.server.queue.config.QueueHeaderConfig;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.dto.QueueInfo;
import com.munjaon.server.util.FileUtil;
import com.munjaon.server.util.MessageUtil;
import lombok.Getter;
import org.jdom2.Document;
import org.jdom2.Element;
import org.jdom2.JDOMException;
import org.jdom2.input.SAXBuilder;
import org.jdom2.output.Format;
import org.jdom2.output.XMLOutputter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
public abstract class ReadQueue {
/** Queue Header Buffer */
protected ByteBuffer headerBuffer = null;
/** Header 에서 사용하는 변수 */
protected byte[] headerArray = null;
/** Queue Create Date - [Format:YYYYMMDD] */
@Getter
protected String createDate;
/** XML 읽은 날짜 */
@Getter
protected String readXMLDate;
/** Queue Pop Counter */
@Getter
protected int popCounter = -1;
/** Queue Push Counter */
@Getter
protected int pushCounter = 0;
/* 큐경로 */
@Getter
protected String queuePath;
/** Queue File Channel */
protected FileChannel channel = null;
protected FileOutputStream fileOutputStream = null;
/** Queue Information */
@Getter
protected QueueInfo queueInfo = null;
/** pushBuffer() 함수에서 사용하는 변수 */
protected ByteBuffer dataBuffer = null;
protected void initQueuePath() {
/* 1. 큐경로 */
this.queuePath = System.getProperty("ROOTPATH") + File.separator + queueInfo.getQueuePath();
/* 2. 경로 체크 및 생성 */
FileUtil.mkdirs(this.queuePath);
}
public void initReadQueue() throws IOException, JDOMException {
this.queueInfo.setReadXMLFileName(this.queuePath + File.separator + this.queueInfo.getQueueName() + "_Read.xml");
File file = new File(this.queueInfo.getReadXMLFileName());
if (file.exists()) {
readPopCounter(file);
} else {
this.readXMLDate = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
this.popCounter = 0;
writePopCounter();
}
}
public void initPopCounter() throws IOException {
initHeaderBuffer();
// 데이터 초기화
this.readXMLDate = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
this.popCounter = 0;
writePopCounter();
}
protected void readPopCounter(File file) throws IOException, JDOMException {
SAXBuilder sax = new SAXBuilder();
// String that contains XML
Document doc = (Document) sax.build(file);
Element rootNode = doc.getRootElement();
List<Element> childElements = rootNode.getChildren();
for (Element child : childElements) {
if ("createDate".equals(child.getName())) {
this.readXMLDate = child.getValue();
}
if ("popCounter".equals(child.getName())) {
this.popCounter = Integer.parseInt(child.getValue());
}
}
}
protected void readPopCounter() throws IOException, JDOMException {
// String that contains XML
File file = new File(this.queueInfo.getReadXMLFileName());
readPopCounter(file);
}
public void writePopCounter() throws IOException {
SAXBuilder sb = new SAXBuilder();
Document docFile = new Document();
Element rootElement = new Element("ReadQueue");
rootElement.addContent(new Element("createDate").setText(this.readXMLDate));
rootElement.addContent(new Element("popCounter").setText(Integer.toString(this.popCounter)));
docFile.setRootElement(rootElement);
// pretty print format
XMLOutputter xmlOutputter = new XMLOutputter(Format.getPrettyFormat());
// output to console
this.fileOutputStream = new FileOutputStream(this.queueInfo.getReadXMLFileName());
xmlOutputter.output(docFile, fileOutputStream);
}
public void initQueue() throws Exception {
this.headerBuffer = ByteBuffer.allocateDirect(QueueHeaderConfig.QUEUE_HEADER_LENGTH);
try {
/* 1. 큐경로 초기화 */
initQueuePath();
this.queueInfo.setQueueFileName(this.queuePath + File.separator + this.queueInfo.getQueueName() + ".queue");
File file = new File(this.queueInfo.getQueueFileName());
if (file.exists()) {
this.channel = new RandomAccessFile(file, "r").getChannel();
// 파일큐의 헤더 정보를 읽어온다.
readHeader();
/* 읽기 큐 XML Load Or Create */
initReadQueue();
} else {
throw new Exception(this.queueInfo.getQueueName() + "'s Queue is Not Exists!!");
}
} catch(Exception e) {
throw e;
} finally {
//lock.release();
}
}
protected void readHeader() throws Exception {
try {
initHeaderBuffer();
this.channel.position(0);
this.channel.read(this.headerBuffer);
this.headerArray = new byte[QueueHeaderConfig.CREATE_DATE_LENGTH];
// 생성날짜 가져오기 - 생성날짜(10) / 읽은카운트(10) / 쓴카운트(10)
this.headerBuffer.position(QueueHeaderConfig.CREATE_DATE_POSITION);
this.headerBuffer.get(this.headerArray);
this.createDate = (new String(this.headerArray)).trim();
// 카운트 가져오기
this.headerArray = new byte[QueueHeaderConfig.PUSH_COUNT_LENGTH];
this.headerBuffer.position(QueueHeaderConfig.PUSH_COUNT_POSITION);
this.headerBuffer.get(this.headerArray);
this.pushCounter = Integer.parseInt((new String(this.headerArray)).trim());
} catch(Exception e) {
throw e;
}
}
protected void initHeaderBuffer(){
this.headerBuffer.clear();
for(int loopCnt = 0; loopCnt < QueueHeaderConfig.QUEUE_HEADER_LENGTH; loopCnt++){
this.headerBuffer.put(QueueConstants.SET_DEFAULT_BYTE);
}
this.headerBuffer.position(0);
}
public void close() throws IOException {
try {
if (this.channel != null && this.channel.isOpen()) {
channel.close();
}
if (this.fileOutputStream != null) {
this.fileOutputStream.close();
}
} catch(IOException e) {
throw e;
}
}
public BasicMessageDto popMessageFromBuffer() throws IOException, JDOMException, Exception {
readHeader();
readPopCounter();
BasicMessageDto messageDto = null;
if (this.popCounter < this.pushCounter) {
/* Read buffer 초기화 */
initDataBuffer();
/* Read Queue 읽기 */
popBuffer();
/* 메시지 추출 */
messageDto = new BasicMessageDto();
MessageUtil.getBytesForCommonMessage(this.dataBuffer, messageDto);
getBytesForExtendMessage(messageDto);
/* Pop Counter 증가 및 저장 */
this.popCounter = this.popCounter + 1;
writePopCounter();
}
return messageDto;
}
abstract void popBuffer() throws Exception;
abstract void getBytesForExtendMessage(BasicMessageDto messageDto);
abstract void initDataBuffer();
}

View File

@ -0,0 +1,47 @@
package com.munjaon.server.queue.pool;
import com.munjaon.server.queue.dto.BasicMessageDto;
import java.util.LinkedList;
public class SmsMemoryQueue {
/** Lock Object */
private final Object msgMonitor = new Object();
/** Message Queue */
private final LinkedList<BasicMessageDto> msgQueue = new LinkedList<>();
/** Singleton Instance */
private static SmsMemoryQueue memoryQueue;
public synchronized static SmsMemoryQueue getInstance() {
if(memoryQueue == null){
memoryQueue = new SmsMemoryQueue();
}
return memoryQueue;
}
/** MESSAGE QUEUE ************************************************************************ */
/** MESSAGE enQueue */
public void memoryEnQueue(BasicMessageDto data) {
synchronized(msgMonitor) {
msgQueue.addLast(data);
}
}
/** SMS deQueue */
public BasicMessageDto memoryDeQueue() {
synchronized(msgMonitor) {
if(msgQueue.isEmpty()) return null;
else return msgQueue.removeFirst();
}
}
/** SMS size */
public int getMemorySize(){
synchronized(msgMonitor) {
return msgQueue.size();
}
}
/** SMS isEmpty */
public boolean isMemoryEmpty() {
synchronized(msgMonitor) {
return msgQueue.isEmpty();
}
}
}

View File

@ -0,0 +1,39 @@
package com.munjaon.server.queue.pool;
import com.munjaon.server.queue.config.QueueConstants;
import com.munjaon.server.queue.config.SmsBodyConfig;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.dto.QueueInfo;
import com.munjaon.server.util.MessageUtil;
import java.nio.ByteBuffer;
public class SmsReadQueue extends ReadQueue {
public SmsReadQueue(QueueInfo queueInfo) throws Exception {
this.queueInfo = queueInfo;
// initQueue();
}
@Override
void popBuffer() throws Exception {
this.channel.position(MessageUtil.calcReadPosition(this.popCounter, SmsBodyConfig.SMS_SUM_BYTE_LENGTH));
this.channel.read(this.dataBuffer);
}
@Override
void getBytesForExtendMessage(BasicMessageDto messageDto) {
MessageUtil.getBytesForSmsMessage(this.dataBuffer, messageDto);
}
@Override
void initDataBuffer() {
if (this.dataBuffer == null) {
this.dataBuffer = ByteBuffer.allocateDirect(SmsBodyConfig.SMS_SUM_BYTE_LENGTH);
}
this.dataBuffer.clear();
for(int loopCnt = 0; loopCnt < SmsBodyConfig.SMS_SUM_BYTE_LENGTH; loopCnt++){
this.dataBuffer.put(QueueConstants.SET_DEFAULT_BYTE);
}
this.dataBuffer.position(0);
}
}

View File

@ -13,7 +13,7 @@ public class SmsWriteQueue extends WriteQueue {
public SmsWriteQueue(QueueInfo queueInfo) throws Exception {
this.queueInfo = queueInfo;
/* 큐초기화 */
initQueue();
// initQueue();
}
@Override

View File

@ -6,6 +6,8 @@ import com.munjaon.server.queue.config.QueueConstants;
import com.munjaon.server.queue.config.QueueHeaderConfig;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.dto.QueueInfo;
import com.munjaon.server.util.FileUtil;
import com.munjaon.server.util.JobFileFactory;
import com.munjaon.server.util.MessageUtil;
import lombok.Getter;
@ -29,6 +31,10 @@ public abstract class WriteQueue {
@Getter
protected int pushCounter = 0;
/* 큐경로 */
@Getter
protected String queuePath;
/** Queue File Channel */
protected FileChannel channel = null;
/** Queue Information */
@ -37,12 +43,21 @@ public abstract class WriteQueue {
/** pushBuffer() 함수에서 사용하는 변수 */
protected ByteBuffer dataBuffer = null;
protected void initQueue() throws Exception {
protected void initQueuePath() {
/* 1. 큐경로 */
this.queuePath = System.getProperty("ROOTPATH") + File.separator + queueInfo.getQueuePath();
/* 2. 경로 체크 및 생성 */
FileUtil.mkdirs(this.queuePath);
}
public void initQueue() throws Exception {
this.headerBuffer = ByteBuffer.allocateDirect(QueueHeaderConfig.QUEUE_HEADER_LENGTH);
try{
/* 1. 큐경로 초기화 */
initQueuePath();
this.queueInfo.setQueueFileName(this.queuePath + File.separator + this.queueInfo.getQueueName() + ".queue");
File file = new File(this.queueInfo.getQueueFileName());
this.channel = new RandomAccessFile(file, "rw").getChannel();
//this.lock = this.channel.lock();
if (file.length() == 0) {
// Push Pop 카운트 초기화
@ -53,13 +68,22 @@ public abstract class WriteQueue {
} else {
readHeader();
}
}catch(Exception e){
// backupQueue();
} catch(Exception e) {
throw e;
} finally {
//lock.release();
}
}
public void backupQueue() throws IOException {
/* 1. 백업경로 */
String backupDir = this.queuePath + File.separator + "backup" + File.separator + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
/* 2. 파일 백업 */
JobFileFactory.transferFile(this.queuePath, this.queueInfo.getQueueName() + ".queue", backupDir, this.queueInfo.getQueueName() + ".queue");
}
protected void readHeader() throws Exception {
try {
initHeaderBuffer();
@ -102,7 +126,7 @@ public abstract class WriteQueue {
this.headerBuffer.position(0);
}
protected void close() throws IOException {
public void close() throws IOException {
try {
if (isOpen()) {
channel.close();
@ -120,7 +144,7 @@ public abstract class WriteQueue {
return this.channel.isOpen();
}
protected void truncateQueue() throws Exception{
public void truncateQueue() throws Exception{
try {
/* 1. 날짜가 지난경우와 더이상 읽을 데이터가 없을 경우 큐를 초기화 */
String thisDate = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
@ -213,7 +237,7 @@ public abstract class WriteQueue {
return ServiceCode.OK.getCode();
}
abstract int isValidateMessageForExtend(BasicMessageDto messageDto);
abstract void pushMessageToBuffer(BasicMessageDto messageDto) throws Exception;
abstract void initDataBuffer();
public abstract int isValidateMessageForExtend(BasicMessageDto messageDto);
public abstract void pushMessageToBuffer(BasicMessageDto messageDto) throws Exception;
public abstract void initDataBuffer();
}

View File

@ -1,6 +1,7 @@
package com.munjaon.server.queue.service;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.pool.KakaoAlarmMemoryQueue;
import com.munjaon.server.queue.pool.WriteQueue;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -10,6 +11,7 @@ import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class KakaoAlarmQueueService implements QueueAction {
private final KakaoAlarmMemoryQueue memoryQueue = KakaoAlarmMemoryQueue.getInstance();
@Override
public boolean isExistQueue(String name) {
return false;
@ -34,4 +36,24 @@ public class KakaoAlarmQueueService implements QueueAction {
public int saveMessageToTable(BasicMessageDto data) {
return 0;
}
@Override
public void memoryEnQueue(BasicMessageDto data) {
memoryQueue.memoryEnQueue(data);
}
@Override
public BasicMessageDto memoryDeQueue() {
return memoryQueue.memoryDeQueue();
}
@Override
public int getMemorySize() {
return memoryQueue.getMemorySize();
}
@Override
public boolean isMemoryEmpty() {
return memoryQueue.isMemoryEmpty();
}
}

View File

@ -1,6 +1,7 @@
package com.munjaon.server.queue.service;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.pool.KakaoAlarmMemoryQueue;
import com.munjaon.server.queue.pool.WriteQueue;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -10,6 +11,7 @@ import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class KakaoFriendQueueService implements QueueAction {
private final KakaoAlarmMemoryQueue memoryQueue = KakaoAlarmMemoryQueue.getInstance();
@Override
public boolean isExistQueue(String name) {
return false;
@ -34,4 +36,24 @@ public class KakaoFriendQueueService implements QueueAction {
public int saveMessageToTable(BasicMessageDto data) {
return 0;
}
@Override
public void memoryEnQueue(BasicMessageDto data) {
memoryQueue.memoryEnQueue(data);
}
@Override
public BasicMessageDto memoryDeQueue() {
return memoryQueue.memoryDeQueue();
}
@Override
public int getMemorySize() {
return memoryQueue.getMemorySize();
}
@Override
public boolean isMemoryEmpty() {
return memoryQueue.isMemoryEmpty();
}
}

View File

@ -1,6 +1,7 @@
package com.munjaon.server.queue.service;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.pool.LmsMemoryQueue;
import com.munjaon.server.queue.pool.LmsQueuePool;
import com.munjaon.server.queue.pool.WriteQueue;
import lombok.RequiredArgsConstructor;
@ -12,6 +13,7 @@ import org.springframework.stereotype.Service;
@RequiredArgsConstructor
public class LmsQueueService implements QueueAction {
private final LmsQueuePool queueInstance = LmsQueuePool.getInstance();
private final LmsMemoryQueue memoryQueue = LmsMemoryQueue.getInstance();
@Override
public boolean isExistQueue(String name) {
@ -46,4 +48,24 @@ public class LmsQueueService implements QueueAction {
public int saveMessageToTable(BasicMessageDto data) {
return 0;
}
@Override
public void memoryEnQueue(BasicMessageDto data) {
memoryQueue.memoryEnQueue(data);
}
@Override
public BasicMessageDto memoryDeQueue() {
return memoryQueue.memoryDeQueue();
}
@Override
public int getMemorySize() {
return memoryQueue.getMemorySize();
}
@Override
public boolean isMemoryEmpty() {
return memoryQueue.isMemoryEmpty();
}
}

View File

@ -1,6 +1,7 @@
package com.munjaon.server.queue.service;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.pool.MmsMemoryQueue;
import com.munjaon.server.queue.pool.MmsQueuePool;
import com.munjaon.server.queue.pool.WriteQueue;
import lombok.RequiredArgsConstructor;
@ -12,6 +13,7 @@ import org.springframework.stereotype.Service;
@RequiredArgsConstructor
public class MmsQueueService implements QueueAction {
private final MmsQueuePool queueInstance = MmsQueuePool.getInstance();
private final MmsMemoryQueue memoryQueue = MmsMemoryQueue.getInstance();
@Override
public boolean isExistQueue(String name) {
return queueInstance.isExistQueue(name);
@ -45,4 +47,24 @@ public class MmsQueueService implements QueueAction {
public int saveMessageToTable(BasicMessageDto data) {
return 0;
}
@Override
public void memoryEnQueue(BasicMessageDto data) {
memoryQueue.memoryEnQueue(data);
}
@Override
public BasicMessageDto memoryDeQueue() {
return memoryQueue.memoryDeQueue();
}
@Override
public int getMemorySize() {
return memoryQueue.getMemorySize();
}
@Override
public boolean isMemoryEmpty() {
return memoryQueue.isMemoryEmpty();
}
}

View File

@ -9,4 +9,9 @@ public interface QueueAction {
void addQueue(WriteQueue queue);
void pushQueue(BasicMessageDto data);
int saveMessageToTable(BasicMessageDto data);
void memoryEnQueue(BasicMessageDto data);
BasicMessageDto memoryDeQueue();
int getMemorySize();
boolean isMemoryEmpty();
}

View File

@ -1,197 +0,0 @@
package com.munjaon.server.queue.service;
import com.munjaon.server.queue.dto.QueueInfo;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class ReadQueue {
/** Queue Header Size - [Create Date:10 Byte, Push Count:10 Byte] */
private static final int QUEUE_HEADER_LENGTH = 20;
/** Queue Create Date - [Format:YYYYMMDD] */
private String createDate = "";
/** Queue Pop Counter */
private int popCounter = 0;
/** Queue Push Counter */
private int pushCounter = 0;
/** XML 읽은 날짜 */
private String readXMLDate = "";
/** Queue Header Buffer */
private ByteBuffer headerBuffer = null;
/** Queue Data Read Buffer */
private ByteBuffer readBuffer = null;
/** Queue Information */
private QueueInfo queueInfo = null;
/** Queue File Channel */
private FileChannel channel = null;
/** Header 에서 사용하는 변수 */
byte[] headerArray = null;
/** XML Control */
// private Document document = null; // XML Document
// private Element rootElement = null;
// private Element childElement = null;
// private final Format format = Format.getPrettyFormat();
// private XMLOutputter outputter = null;
// private FileWriter fileWriter = null;
//
// public ReadQueue(QueueInfo info) throws Exception{
// this.queueInfo = info;
// this.headerBuffer = ByteBuffer.allocateDirect(this.QUEUE_HEADER_LENGTH);
// this.readBuffer = ByteBuffer.allocateDirect(this.queueInfo.getQueueDataLength());
// try{
// File file = new File(this.queueInfo.getQueueFileName());
// if(file.exists()){
// this.channel = new RandomAccessFile(file, "r").getChannel();
// // 파일큐의 헤더 정보를 읽어온다.
// readHeader();
// // XML Write Formater
// this.format.setEncoding("EUC-KR");
// this.format.setIndent("\t");
//
// // 해당큐를 읽은 PopCounter 정보를 가져온다.
// file = new File(this.queueInfo.getReadXMLFileName());
// if(file.exists()){
// readPopCounter();
// }else{
// this.readXMLDate = MessageUtil.currentDay();
// this.popCounter = 0;
// writePopCounter();
// }
// }else{
// throw new Exception(this.queueInfo.getQueueName() + "'s Queue is Not Exists!!");
// }
//
// }catch(Exception e){
// throw e;
// }
// }
// public ByteBuffer popBuffer() throws Exception{
// try {
// readHeader();
// readPopCounter();
//
// if(this.popCounter == this.pushCounter){
// // 더이상 읽을 데이터가 없는 경우
// return null;
// }else{
// initReadBuffer();
// this.channel.position(QUEUE_HEADER_LENGTH + (this.queueInfo.getQueueDataLength() * this.popCounter));
// this.channel.read(this.readBuffer);
// this.popCounter = this.popCounter + 1;
// // Header 정보 변경
// writePopCounter();
//
// return this.readBuffer;
// }
// } catch(Exception e) {
// throw e;
// }
// }
// public void readPopCounter() throws Exception{
// try {
// this.document = new SAXBuilder().build(this.queueInfo.getReadXMLFileName());
// this.rootElement = this.document.getRootElement();
// this.readXMLDate = this.rootElement.getChild("createDate").getText().trim();
// this.popCounter = Integer.parseInt(this.rootElement.getChild("PopCounter").getText().trim());
// } catch(Exception e) {
// throw e;
// }
// }
// public void writePopCounter() throws Exception{
// try {
// // Root Element
// this.rootElement = new Element("ReadQueue");
// // 생성날짜
// this.childElement = new Element("createDate");
// this.childElement.setText(this.readXMLDate);
// this.rootElement.addContent(this.childElement);
// // 읽은 카운트
// this.childElement = new Element("PopCounter");
// this.childElement.setText(Integer.toString(this.popCounter));
// this.rootElement.addContent(this.childElement);
//
// this.document = new Document(this.rootElement);
// this.outputter = new XMLOutputter();
// this.outputter.setFormat(this.format);
// this.fileWriter = new FileWriter(this.queueInfo.getReadXMLFileName());
// this.outputter.output(this.document, this.fileWriter);
// } catch(Exception e) {
// throw e;
// }finally{
// if(this.fileWriter != null){this.fileWriter.close(); this.fileWriter = null; }
// }
// }
// public void initPopCounter() throws Exception{
// try {
// initHeaderBuffer();
// // 데이터 초기화
// this.readXMLDate = MessageUtil.currentDay();
// this.popCounter = 0;
//
// writePopCounter();
// } catch(Exception e) {
// throw e;
// }
// }
// public void readHeader() throws Exception {
// try {
// initHeaderBuffer();
// this.channel.position(0);
// this.channel.read(this.headerBuffer);
// this.headerArray = new byte[10];
// // 생성날짜 가져오기 - 생성날짜(10) / 쓴카운트(10)
// this.headerBuffer.position(0);
// this.headerBuffer.get(this.headerArray);
// this.createDate = (new String(this.headerArray)).trim();
// // 카운트 가져오기
// this.headerArray = new byte[10];
// this.headerBuffer.position(10);
// this.headerBuffer.get(this.headerArray);
// this.pushCounter = Integer.parseInt((new String(this.headerArray)).trim());
// } catch(Exception e) {
// throw e;
// }
// }
// public void close() throws IOException {
// try {
// if(channel != null && channel.isOpen()) {
// channel.close();
// }
// } catch(IOException e) {
// throw e;
// }
// }
// public void initHeaderBuffer(){
// this.headerBuffer.clear();
// for(int loopCnt=0;loopCnt<this.QUEUE_HEADER_LENGTH;loopCnt++){
// this.headerBuffer.put(QueueVariable.SET_DEFAULT_BYTE);
// }
// this.headerBuffer.position(0);
// }
// public void initReadBuffer(){
// this.readBuffer.clear();
// for(int loopCnt=0;loopCnt<this.queueInfo.getQueueDataLength();loopCnt++){
// this.readBuffer.put(QueueVariable.SET_DEFAULT_BYTE);
// }
// this.readBuffer.position(0);
// }
// public String getQueueName(){
// if(this.queueInfo == null)
// return null;
// else
// return this.queueInfo.getQueueName();
// }
// public String getCreateDate() {
// return createDate;
// }
// public String getReadXMLDate() {
// return readXMLDate;
// }
// public int getPushCounter() {
// return pushCounter;
// }
// public int getPopCounter() {
// return popCounter;
// }
}

View File

@ -1,6 +1,7 @@
package com.munjaon.server.queue.service;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.pool.SmsMemoryQueue;
import com.munjaon.server.queue.pool.SmsQueuePool;
import com.munjaon.server.queue.pool.WriteQueue;
import lombok.RequiredArgsConstructor;
@ -12,6 +13,7 @@ import org.springframework.stereotype.Service;
@RequiredArgsConstructor
public class SmsQueueService implements QueueAction {
private final SmsQueuePool queueInstance = SmsQueuePool.getInstance();
private final SmsMemoryQueue memoryQueue = SmsMemoryQueue.getInstance();
@Override
public boolean isExistQueue(String name) {
@ -44,6 +46,27 @@ public class SmsQueueService implements QueueAction {
@Override
public int saveMessageToTable(BasicMessageDto data) {
log.debug("Save message to table : {}", data);
return 0;
}
@Override
public void memoryEnQueue(BasicMessageDto data) {
memoryQueue.memoryEnQueue(data);
}
@Override
public BasicMessageDto memoryDeQueue() {
return memoryQueue.memoryDeQueue();
}
@Override
public int getMemorySize() {
return memoryQueue.getMemorySize();
}
@Override
public boolean isMemoryEmpty() {
return memoryQueue.isMemoryEmpty();
}
}

View File

@ -1,19 +1,51 @@
package com.munjaon.server.server.service;
import com.munjaon.server.queue.dto.BasicMessageDto;
import com.munjaon.server.queue.enums.QueueTypeWorker;
import com.munjaon.server.queue.pool.ReadQueue;
import com.munjaon.server.queue.pool.WriteQueue;
import com.munjaon.server.util.CommonUtil;
import com.munjaon.server.util.ServiceUtil;
import org.json.simple.JSONObject;
import java.util.HashMap;
import java.util.Map;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class QueueServerService extends Service {
private int SMS_QUEUE_SIZE = 0;
private int LMS_QUEUE_SIZE = 0;
private int MMS_QUEUE_SIZE = 0;
private int KAT_QUEUE_SIZE = 0;
private int KFT_QUEUE_SIZE = 0;
private Map<String, String> queueConfigMap = new HashMap<String, String>();
/** 큐모드 설정(WAIT : 파일에 쓰기모드만 / DB : DB에 적재) */
private String QUEUE_MODE = null;
/** 큐모드 설정 모니터링(10초단위로 체크한다.) */
private long QUEUE_MODE_CHECK_TIME = 0;
/** 큐 초기화 모니터링(1분단위로 체크한다.) 및 초기화 여부 */
private long QUEUE_INIT_CHECK_TIME = 0;
/** Commit 누적 카운트 */
private long SUM_COMMIT_COUNT = 0;
/* 쓰기큐 */
private WriteQueue writeQueue;
private ReadQueue readQueue;
private QueueTypeWorker worker;
public QueueServerService(String serviceName, WriteQueue writeQueue, ReadQueue readQueue) {
super(serviceName);
this.writeQueue = writeQueue;
this.readQueue = readQueue;
this.worker = QueueTypeWorker.find(writeQueue.getQueueInfo().getServiceType());
if (this.worker == null) {
throw new RuntimeException("No worker found for " + writeQueue.getQueueInfo().getServiceType());
}
}
/** 큐모드 설정(WAIT : 파일에 쓰기모드만 / DB : DB에 적재) */
public void checkMode(){
if((System.currentTimeMillis() - QUEUE_MODE_CHECK_TIME) > 10000){
QUEUE_MODE = PropertyLoader.get("COMMON.QUEUE_MODE");
// 예외체크
if(QUEUE_MODE == null) QUEUE_MODE = "DB";
QUEUE_MODE = QUEUE_MODE.toUpperCase();
// 읽은 시간 업데이트
QUEUE_MODE_CHECK_TIME = System.currentTimeMillis();
}
}
@Override
public void checkReady() {
@ -22,17 +54,172 @@ public class QueueServerService extends Service {
@Override
public void initResources() {
SMS_QUEUE_SIZE =
// 큐모드 정보를 읽어온다.
QUEUE_MODE = getProp("COMMON", "QUEUE_MODE");
if (QUEUE_MODE == null) {
QUEUE_MODE = "DB"; // 예외체크
}
try {
if (isRun()) {
/* 쓰기 큐 */
writeQueue.initQueue();
/* 읽기 큐 */
readQueue.initQueue();
/* 큐 초기화 */
initQueue();
/* 큐 pool에 등록 */
worker.addQueue(writeQueue);
/* 메모리큐에서 다시 적재하기 */
loadMemoryQueue();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void initQueue() throws Exception {
boolean isQueueInit = false; // 초기화 여부
String writeQueueCreateDate = writeQueue.getCreateDate();
String readQueueCreateDate = readQueue.getReadXMLDate();
String thisDate = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
if (readQueueCreateDate.equals(readQueueCreateDate)) {
if (thisDate.equals(writeQueueCreateDate)) {
isQueueInit = false;
} else {
String preDate = CommonUtil.getTargetDay(-24);
// 하루전이면 남은 큐데이터를 DB 처리한다.
if (writeQueueCreateDate.equals(preDate)) {
while (true) {
BasicMessageDto messageDto = readQueue.popMessageFromBuffer();
if (messageDto == null) {
break;
} else {
worker.memoryEnQueue(messageDto);
}
}
isQueueInit = true;
} else {
isQueueInit = true;
}
}
} else {
isQueueInit = true;
}
// 큐초기화 대상이면
if (isQueueInit) {
/* 커밋 카운트 초기화 */
SUM_COMMIT_COUNT = 0;
writeQueue.truncateQueue();
readQueue.initPopCounter();
}
}
private void loadMemoryQueue() throws Exception {
if (worker.getMemorySize() > 0) {
while (true) {
BasicMessageDto messageDto = worker.memoryDeQueue();
if (messageDto == null) {
break;
} else {
writeQueue.pushMessageToBuffer(messageDto);
}
}
}
}
private void checkQueue() throws Exception {
if((System.currentTimeMillis() - QUEUE_INIT_CHECK_TIME) < 60000){
return;
}
// 초기화 체크시간 업데이트
QUEUE_INIT_CHECK_TIME = System.currentTimeMillis();
// 초기화 여부
boolean isQueueInit = false;
// 오늘 날짜 체크 생성날짜 저장
String thisDate = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
String writeQueueCreateDate = writeQueue.getCreateDate();
if (writeQueueCreateDate.equals(thisDate)) {
// 아무 처리도 하지 않는다.
} else {
/* 큐 pool에서 제거 */
if (worker != null) {
worker.removeQueue(writeQueue.getQueueInfo().getQueueName());
}
while(true){
BasicMessageDto messageDto = readQueue.popMessageFromBuffer();
if (messageDto == null) {
break;
} else {
worker.memoryEnQueue(messageDto);
}
}
isQueueInit = true;
}
if (isQueueInit) {
/* 커밋 카운트 초기화 */
SUM_COMMIT_COUNT = 0;
// 큐를 초기화한다.
writeQueue.truncateQueue();
readQueue.initPopCounter();
/* 큐에 등록 */
worker.addQueue(writeQueue);
/* 메모리큐에서 다시 적재하기 */
loadMemoryQueue();
}
}
private void messageService() throws Exception {
int DB_PROC_COUNT = 0;
for (int loopCnt = 0; loopCnt < ServiceUtil.COMMIT_COUNT; loopCnt++) {
BasicMessageDto messageDto = readQueue.popMessageFromBuffer();
if (messageDto == null) {
break;
}
worker.saveMessageToTable(messageDto);
DB_PROC_COUNT++;
SUM_COMMIT_COUNT++;
}
// DB 처리한 카운트에 대한 처리
if (DB_PROC_COUNT > 0) {
Thread.sleep(10);
} else {
Thread.sleep(1000);
}
}
@Override
public void releaseResources() {
try {
if (writeQueue != null) {
/* 쓰기 Pool에서 제거 */
worker.removeQueue(writeQueue.getQueueInfo().getQueueName());
/* 자원 해제 */
writeQueue.close();
}
if (readQueue != null) {
readQueue.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void doService() {
while (isRun()) {
try {
checkMode();
checkQueue();
messageService();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
@Override

View File

@ -24,7 +24,7 @@ public abstract class Service extends Thread {
public Service() {}
public Service(String serviceName) {
super(serviceName);
LOG_FILE = getProp("LOG_FILE");
LOG_FILE = System.getProperty("ROOTPATH") + getProp("LOG_FILE");
}
protected String getProp(String name) {
@ -92,7 +92,8 @@ public abstract class Service extends Thread {
}
protected void initLogFile() {
LOG_FILE = getProp("LOG_FILE");
LOG_FILE = System.getProperty("ROOTPATH") + getProp("LOG_FILE");
System.out.println("LOG_FILE: " + LOG_FILE);
setLogFile( LOG_FILE );
saveSystemLog("Service Log Initializing ... ...");
}
@ -103,9 +104,12 @@ public abstract class Service extends Thread {
try {
/* 1. 서비스간의 dependency에 따른 체크 */
checkReady();
/* 2. 로그 초기화 */
initLogFile();
/* 3. Runflag reload */
reloadCheckRun();
if (isRun() && isReady()) {
/* 2. 로그 초기화 */
initLogFile();
/* 3. 서비스 초기화 */
initResources();
/* 4. 서비스 시작 */

View File

@ -0,0 +1,359 @@
package com.munjaon.server.util;
import java.text.SimpleDateFormat;
import java.text.ParseException;
import java.util.Calendar;
import java.util.Date;
public final class CommonUtil {
public static final String DATE_FORMAT_14 = "yyyyMMddHHmmss";
public static final int TIMEOUT_DIVIDE = 1000 * 60;
// NewLine 공백처리하기
public static String cutNewLine(String msg){
String encString = "";
if(msg != null){
encString = msg.replaceAll("\r", "");
encString = encString.replaceAll("\n", "");
}
return encString;
}
// NewLine 인코딩 돌리기
public static String encodeNewLine(String msg){
String encString = "";
if(msg != null){
/*encString = msg.replaceAll("\r", "\n");
encString = encString.replaceAll("\n\n", "\n");
encString = encString.replaceAll("\n", "ⓝⓛ");*/
encString = msg.replaceAll("\r", "");
encString = encString.replaceAll("\n", "ⓝⓛ");
}
return encString;
}
// NewLine 디코딩 돌리기
public static String decodeNewLine(String msg){
String encString = "";
if(msg != null){
encString = msg.replaceAll("ⓝⓛ", "\n");
}
return encString;
}
// Comma 인코딩 돌리기
public static String encodeComma(String msg){
String encString = "";
if(msg != null){
encString = msg.replaceAll("\"", "ⓒⓞⓜⓜⓐ");
}
return encString;
}
// Comma 디코딩 돌리기
public static String decodeComma(String msg){
String decString = "";
if(msg != null){
decString = msg.replaceAll("ⓒⓞⓜⓜⓐ", "\"");
}
return decString;
}
// 문자열이 널인지 체크하는 함수
public static boolean checkMsgNull(String msgStr){
boolean isValid = false;
String[] msgArray = msgStr.split("ⓜⓢ");
int msgArrayLen = msgArray.length;
for(int i=0;i<msgArrayLen;i++){
msgArray[i] = msgArray[i].trim();
if(msgArray[i].length() < 1){
isValid = true;
break;
}
}
return isValid;
}
// 해당 길이만큼 문자열을 자르는 함수
public static String CutString(String str,int limit)
{
int len = str.length();
int sumLength=0;
String cutString = null;
byte[] toByte = str.getBytes();
if(limit < 2)
return "";
if(toByte.length > limit){
for(int cnt = 0 ; cnt < len ;cnt++){
if ((str.charAt(cnt) & 0xFF00) == 0) // 1 Byte 문자이면
sumLength++;
else // 2바이트 문자라면...
sumLength = sumLength + 2;
if(sumLength > limit){
cutString = str.substring(0, cnt);
break;
}
}
}else{
cutString = str;
}
return cutString;
}
// 날짜를 구간별로 자르는 함수
// (inDate:날짜 >> (입력형식:2012년 12월 20일일 경우 : 20121220))
public static int substringDate(String inDate, int beginIndex, int lastIndex){
int subDate = -1;
inDate = doNumber(inDate);
if(inDate.length() == 8){
if((beginIndex < lastIndex) && (lastIndex <= 8))
subDate = Integer.parseInt(inDate.substring(beginIndex, lastIndex));
}
return subDate;
}
// 시간을 구간별로 자르는 함수
public static int substringTime(String inTime, int beginIndex, int lastIndex){
int subDate = -1;
inTime = doNumber(inTime);
if(inTime.length() == 6){
if((beginIndex < lastIndex) && (lastIndex <= 6))
subDate = Integer.parseInt(inTime.substring(beginIndex, lastIndex));
}
return subDate;
}
public static boolean validDate(String fullDate){
boolean validFlag = true;
fullDate = doNumber(fullDate);
if(fullDate.length() != 14){
validFlag = false;
}else{
if(!(isValidDay(fullDate.substring(0, 8)) && isValidTime(fullDate.substring(8, 14))))
validFlag = false;
}
return validFlag;
}
// 유효한 날짜 형식인지 체크하는 함수
public static boolean isValidDay(String inDate){
boolean validFlag = true;
int year = substringDate(inDate, 0, 4);
// 년도에서 앞에 두자리 체크
if(year < 2013){
validFlag = false;
}
int month = substringDate(inDate, 4, 6);
int day = substringDate(inDate, 6, 8);
if (month < 1 || month > 12) { // check month range
validFlag = false;
}
if (day < 1 || day > 31) {
validFlag = false;
}
if ((month==4 || month==6 || month==9 || month==11) && day==31) {
validFlag = false;
}
if (month == 2) { // check for february 29th
boolean isleap = (year % 4 == 0 && (year % 100 != 0 || year % 400 == 0));
if (day>29 || (day==29 && !isleap)) {
validFlag = false;
}
}
return validFlag;
}
// 유효한 날짜 형식인지 체크하는 함수
public static boolean isValidTime(String inTime){
boolean validFlag = true;
int hour = substringTime(inTime, 0, 2);
int minute = substringTime(inTime, 2, 4);
int second = substringTime(inTime, 4, 6);
if(hour < 0 || hour > 23)
validFlag = false;
if(minute < 0 || minute > 59)
validFlag = false;
if(second < 0 || second > 59)
validFlag = false;
return validFlag;
}
// 예약일이 하루전인지 체크하는 함수
// 앞의 날짜가 뒤의 날짜보다 이전이면: true / 그렇지않으면 false)
public static boolean isBeforeDate(String srcDate, String destDate){
boolean validFlag = false;
Calendar srcCal = getCalendar(srcDate);
Calendar destCal = getCalendar(destDate);
if(srcCal == null || destCal == null){
validFlag = true;
}else{
if(srcCal.before(destCal))
validFlag = true;
}
return validFlag;
}
public static Calendar getCalendar(int hour){
Calendar thisCal = null;
thisCal.add(Calendar.HOUR, hour);
return thisCal;
}
public static Calendar getCalendar(String inDate){
Calendar thisCal = null;
inDate = doNumber(inDate);
if(inDate.length() == 14 && validDate(inDate)){
String date = inDate.substring(0, 8);
String time = inDate.substring(8, 14);
int year = substringDate(date, 0, 4);
int month = substringDate(date, 4, 6);
int day = substringDate(date, 6, 8);
int hour = substringTime(time, 0, 2);
int minute = substringTime(time, 2, 4);
int second = substringTime(time, 4, 6);
thisCal = Calendar.getInstance();
thisCal.set(year, (month-1), day, hour, minute, second);
}
return thisCal;
}
public static String currentTime(){
Calendar currDate = Calendar.getInstance();
SimpleDateFormat dateForm = new SimpleDateFormat("HHmmss");
return dateForm.format(currDate.getTime());
}
public static String getLastDate(String year, String month, String day){
String dateStr = null;
String lastDay = null;
Calendar currDate = Calendar.getInstance();
currDate.set(Integer.parseInt(year),Integer.parseInt(month)-1,Integer.parseInt(day));
lastDay = Integer.toString(currDate.getActualMaximum(Calendar.DAY_OF_MONTH));
dateStr = year+month+lastDay;
return dateStr;
}
public static String currentDay(){
Calendar currDate = Calendar.getInstance();
SimpleDateFormat dateForm = new SimpleDateFormat("yyyyMMdd");
return dateForm.format(currDate.getTime());
}
public static String currentDate(){
Calendar currDate = Calendar.getInstance();
SimpleDateFormat dateForm = new SimpleDateFormat("yyyyMMddHHmmss");
return dateForm.format(currDate.getTime());
}
public static String currentLogDate(){
Calendar currDate = Calendar.getInstance();
SimpleDateFormat dateForm = new SimpleDateFormat("[yyyy-MM-dd][HH:mm:ss]");
return dateForm.format(currDate.getTime());
}
// "yyyyMMddHHmmss" 조합
public static String formatDate(String format){
Calendar currDate = Calendar.getInstance();
SimpleDateFormat dateForm = new SimpleDateFormat(format);
return dateForm.format(currDate.getTime());
}
public static String getTargetDate(int term){
Calendar currDate = Calendar.getInstance();
currDate.add(Calendar.HOUR, term);
SimpleDateFormat dateForm = new SimpleDateFormat("yyyyMMddHHmmss");
return dateForm.format(currDate.getTime());
}
public static String getTargetDay(int term){
Calendar currDate = Calendar.getInstance();
currDate.add(Calendar.HOUR, term);
SimpleDateFormat dateForm = new SimpleDateFormat("yyyyMMdd");
return dateForm.format(currDate.getTime());
}
public static Date parseDate14(String sDay){
Date date = null;
if(sDay != null || sDay.length() == 14){
SimpleDateFormat dateForm = new SimpleDateFormat(DATE_FORMAT_14);
try {
date = dateForm.parse(sDay);
} catch (ParseException e) {
}
}
return date;
}
public static int validTimeOut(String sDay){
int interval = 100000;
Date fromDate = parseDate14(sDay);
Date toDate = parseDate14(currentDate());
if(fromDate != null && toDate != null){
long duration = toDate.getTime() - fromDate.getTime();
interval = (int)(duration / TIMEOUT_DIVIDE);
}
return interval;
}
// 숫자만 반환하는 함수
public static String doNumber(String spell){
String phoneNumber = "";
if(spell == null){
return phoneNumber;
}
spell = spell.trim();
int spell_Length = spell.length();
if(spell_Length < 1){
return phoneNumber;
}
for(int i=0;i<spell_Length;i++){
char eachChar = spell.charAt(i);
if( 0x30 <= eachChar && eachChar <= 0x39 ){
phoneNumber += eachChar;
}
}
return phoneNumber;
}
// 소수점 뒤에 해당하는 자리만큼 자르기
public static String cutFloatNumber(String srcNum, int digit){
String headNum = "";
String tailNum = "";
String retNum = "";
if(!(srcNum == null || srcNum.trim().equals(""))){
srcNum = srcNum.trim();
int index = srcNum.indexOf(".");
// 소수점 위치가 0보다 큰경우만 처리
if(index > 0){
headNum = srcNum.substring(0, index);
tailNum = srcNum.substring((index + 1), srcNum.length());
if(tailNum.length() == 0){
tailNum = "0";
}
if(tailNum.length() > digit){
tailNum = tailNum.substring(0, digit);
}
retNum = headNum + "." + tailNum;
}
}
return retNum;
}
// 수신번호 체크하기
public static boolean CheckPhone(String src) {
if(src == null || src.trim().length() < 10) {
return false;
}
if(!src.startsWith("0")) {
return false;
}
return true;
}
// 문자열 공백 제거
public static String trim(String obj) {
return StringUtil.trim(obj);
}
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
//System.out.println(QueueUtil.cutFloatNumber("10.2", 2));
//System.out.println(QueueUtil.isValidTime("20131207173555"));
/*Calendar currDate = Calendar.getInstance();
currDate.add(Calendar.HOUR, 0); // 1년후의 시간
System.out.println("하루전 시간 : " + currDate.getTime());*/
System.out.println(CommonUtil.isBeforeDate("20131206121212", "20131207121212"));
System.out.println("하루전 : " + CommonUtil.getTargetDay(-24));
System.out.println(CommonUtil.currentLogDate());
//Date date = MessageUtil.parseDate14("20141208125958");
System.out.println("validTimeOut() : " + CommonUtil.validTimeOut("20141209154558"));
}
}

View File

@ -117,6 +117,10 @@ public final class MessageUtil {
return pushCounter < 0 ? QueueHeaderConfig.QUEUE_HEADER_LENGTH : (QueueHeaderConfig.QUEUE_HEADER_LENGTH + pushCounter + dataByteLength);
}
public static int calcReadPosition(int popCounter, int dataByteLength) {
return popCounter < 0 ? QueueHeaderConfig.QUEUE_HEADER_LENGTH : (QueueHeaderConfig.QUEUE_HEADER_LENGTH + popCounter + dataByteLength);
}
public static void setBytesForCommonMessage(ByteBuffer buffer, BasicMessageDto messageDto) {
/* 1. 사용자 아이디 */
buffer.position(BodyCommonConfig.USERID_BYTE_POSITION);
@ -159,12 +163,92 @@ public final class MessageUtil {
buffer.put(messageDto.getRouterSeq().getBytes());
}
public static void getBytesForCommonMessage(ByteBuffer buffer, BasicMessageDto messageDto) {
byte[] destArray = null;
if (buffer == null || messageDto == null) {
return;
}
/* 1. 사용자 아이디 */
buffer.position(BodyCommonConfig.USERID_BYTE_POSITION);
destArray = new byte[BodyCommonConfig.USERID_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setUserId(new String(destArray));
/* 2. 요금제(선불 : P / 후불 : A) : final : 아무작업도 하지 않음 */
/* 3. 단가 */
buffer.position(BodyCommonConfig.UNITCOST_BYTE_POSITION);
destArray = new byte[BodyCommonConfig.UNITCOST_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setUnitCost(new String(destArray));
/* 4. MSG Group ID */
buffer.position(BodyCommonConfig.MSGGROUPID_BYTE_POSITION);
destArray = new byte[BodyCommonConfig.MSGGROUPID_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setMsgGroupID(new String(destArray));
/* 5. MSG ID */
buffer.position(BodyCommonConfig.MSGID_BYTE_POSITION);
destArray = new byte[BodyCommonConfig.MSGID_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setUserMsgID(new String(destArray));
/* 6. Service Type */
buffer.position(BodyCommonConfig.SERVICETYPE_BYTE_POSITION);
destArray = new byte[BodyCommonConfig.SERVICETYPE_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setServiceType(new String(destArray));
/* 7. 메시지 전송 결과 >> 성공 : 0 / 필터링 : 기타값 */
buffer.position(BodyCommonConfig.SENDSTATUS_BYTE_POSITION);
destArray = new byte[BodyCommonConfig.SENDSTATUS_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setSendStatus(new String(destArray));
/* 8. 회신번호 */
buffer.position(BodyCommonConfig.SENDER_BYTE_POSITION);
destArray = new byte[BodyCommonConfig.SENDER_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setUserSender(new String(destArray));
/* 9. 수신번호 */
buffer.position(BodyCommonConfig.RECEIVER_BYTE_POSITION);
destArray = new byte[BodyCommonConfig.RECEIVER_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setUserReceiver(new String(destArray));
/* 10. 예약시간 */
buffer.position(BodyCommonConfig.RESERVEDT_BYTE_POSITION);
destArray = new byte[BodyCommonConfig.RESERVEDT_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setReserveDt(new String(destArray));
/* 11. 요청시간 */
buffer.position(BodyCommonConfig.REQUESTDT_BYTE_POSITION);
destArray = new byte[BodyCommonConfig.REQUESTDT_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setRequestDt(new String(destArray));
/* 12. 원격 주소 */
buffer.position(BodyCommonConfig.REMOTEIP_BYTE_POSITION);
destArray = new byte[BodyCommonConfig.REMOTEIP_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setRemoteIP(new String(destArray));
/* 13. 발송망 */
buffer.position(BodyCommonConfig.AGENT_CODE_BYTE_POSITION);
destArray = new byte[BodyCommonConfig.AGENT_CODE_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setRouterSeq(new String(destArray));
}
public static void setBytesForSmsMessage(ByteBuffer buffer, BasicMessageDto messageDto) {
/* 14. 메시지 */
buffer.position(SmsBodyConfig.SMS_MSG_BYTE_POSITION);
buffer.put(messageDto.getUserMessage().getBytes());
}
public static void getBytesForSmsMessage(ByteBuffer buffer, BasicMessageDto messageDto) {
byte[] destArray = null;
if (buffer == null || messageDto == null) {
return;
}
/* 14. 메시지 */
buffer.position(SmsBodyConfig.SMS_MSG_BYTE_POSITION);
destArray = new byte[SmsBodyConfig.SMS_MSG_BYTE_LENGTH];
buffer.get(destArray);
messageDto.setUserMessage(new String(destArray));
}
public static void setBytesForMediaMessage(ByteBuffer buffer, BasicMessageDto messageDto) {
/* 14. 제목 */
buffer.position(MediaBodyConfig.SUBJECT_BYTE_POSITION);

View File

@ -0,0 +1,32 @@
package com.munjaon.server.util;
public final class ServiceUtil {
/** DB Commit Check Counter */
public static final int COMMIT_COUNT = 30;
public static String[] getServiceNames(String[] serviceNames) {
if (serviceNames == null) return null;
for (int i = 0; i < serviceNames.length; i++) {
serviceNames[i] = serviceNames[i].trim();
}
return serviceNames;
}
public static boolean isDuplicate(String[] serviceNames) {
if (serviceNames == null) return false;
boolean duplicate = false;
for (int i = 0; i < serviceNames.length; i++) {
for (int j = (i + 1); j < serviceNames.length; j++) {
if (serviceNames[i].equals(serviceNames[j])) {
duplicate = true;
break;
}
}
if (duplicate) break;
}
return duplicate;
}
}

View File

@ -84,6 +84,7 @@ public class XmlUtil {
String value = bookElement.getValue();
System.out.println(name + " : " + value);
}
} catch (IOException | JDOMException e) {
e.printStackTrace();
}