Commit 443c9bf2 by wuyang.zou

<1> 插件初始化登录ODS优化;[守护程序存在,不能及时收到POS的初始化数据]

插件收到POS的初始化数据 则更新本地文件进行持久化; 【自测OK; FileName: PosRequestInitData.json 】
插件启动后 读取初始化信息文件,如果6分钟没有收到初始数据则 检测 POS的接受消息端口是否正常监听,是则进行模拟登录;【自测OK; 端口正常:OK; 端口异常:OK ;】

<2> 插件针对启动项文件是否包含插件启动项信息进行读取,并伴随登录接口将数据上传【自测OK; KEY: plugin_boot_config 】

<3> 插件兜底长连接假死的自检:存在待推送订单且长时间没有接受到推送则进行长连接主动断开后重连;【自测OK】
10倍心跳周期中无推送订单记录(6分钟) 读取ODS心跳数据待推送订单数量[wait_push_order_sum 小于10倍心跳周期前的数量];
则断开重连(同时附带重连原因: KEY: plugin_relogin_reason );
parent cb5cd820
# CMake 最低版本要求
cmake_minimum_required (VERSION 2.8)
# 新增两个宏定义
#SET(CMAKE_BUILD_TYPE "RelWithDebInfo")
#SET(CMAKE_INSTALL_PREFIX "/usr/local")
# 指定gcc
SET(CMAKE_C_COMPILER "/usr/bin/gcc")
SET(CMAKE_CXX_COMPILER "/usr/bin/g++")
# 项目名称
project (proTakeaway)
#生成可执行文件名称
SET(TARGET_NAME takeaway)
#支持C++11
add_compile_options(-std=c++11)
#设置执行文件输出目录
SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
#设置编译后的可执行程序优先调用本地库(和可执行程序在同一个文件夹下的库)
SET(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
SET(CMAKE_SKIP_BUILD_RPATH TRUE)
SET(CMAKE_INSTALL_RPATH "\$ORIGIN")
# 查找目录下的所有源文件,并将名称保存到 DIR_ 变量中
aux_source_directory(./3rdParty/easylogging/ DIR_LOGS)
aux_source_directory(./src/ DIR_SRCS)
aux_source_directory(./utility/ DIR_UTILITY)
# 指定生成目标
add_executable (${TARGET_NAME} ${DIR_LOGS} ${DIR_SRCS} ${DIR_UTILITY})
# 添加链接库
target_link_libraries(${TARGET_NAME} pthread)
......@@ -40,4 +40,15 @@ products[i].specs[j] 数组对象字段: extraPrice:套餐子商品加价价格
3.8 2021-07-26 wuyang.zou Version: 1.2.7-2 RC //新增属性字段:
orderCoupons[i] 数组对象字段: 仅orderCoupons[i].type="P"->是商品券时存在 ①bonusDayBusinessType:日商类型 ->入门店(S)默认值,入总部(T); ② bonusDayBusinessAmt:日商金(单位分);
3. 9 2021-10-13 wuyang.zou Version: 1.2.8 RC //新增插件的备注 属性字段;
\ No newline at end of file
3.9 2021-10-13 wuyang.zou Version: 1.2.8 RC //新增插件的备注 属性字段;
4.0 2022-06-30 wuyang.zou Version: 1.3.1 RC
<4.0.1> 插件初始化登录ODS优化;[守护程序存在,不能及时收到POS的初始化数据]
插件收到POS的初始化数据 则更新本地文件进行持久化; 【自测OK; FileName: PosRequestInitData.json 】
插件启动后 读取初始化信息文件,如果6分钟没有收到初始数据则 检测 POS的接受消息端口是否正常监听,是则进行模拟登录;【自测OK; 端口正常:OK; 端口异常:OK ;】
<4.0.2> 插件针对启动项文件是否包含插件启动项信息进行读取,并伴随登录接口将数据上传【自测OK; KEY: plugin_boot_config 】
<4.0.3> 插件兜底长连接假死的自检:存在待推送订单且长时间没有接受到推送则进行长连接主动断开后重连;【自测OK】
10倍心跳周期中无推送订单记录(6分钟) 读取ODS心跳数据待推送订单数量[wait_push_order_sum 小于10倍心跳周期前的数量];
则断开重连(同时附带重连原因: KEY: plugin_relogin_reason );
\ No newline at end of file
{"fm_cmd":1000,"listen_port":24445,"storeId":"208888","pos_id":"5","operator_id":"zouwuyang","is_master":false,"plugin_comment":"123.60.107.183","ver":1,"version":"1.3.1 RC"}
\ No newline at end of file
[SYS]
port=24446
#<<贝瑞>>
#config.ini 文件 中 [ODS] 模块中的
#(生产环境外网 IP)
#ip: 103.13.247.77
#(测试环境外网 IP [内网IP:10.0.103.116] )
#ip: 103.13.247.78
#<<全家>
#config.ini 文件 中 [ODS] 模块中的
#(生产环境外网 IP [内网IP:10.0.100.70] )
#ip: 103.13.247.72
#(测试环境外网 IP [内网IP:10.0.102.14] )
#ip: 103.13.247.73
#(生产环境外网 IP [内网IP:10.251.100.70] )
#ip: 123.60.107.182
#(测试环境外网 IP [内网IP:10.251.102.14] )
#ip: 123.60.107.183
[ODS]
ip=api.ods.chinafamilymart.com.cn
......
@echo OFF
if not "%~1"=="p" start /min cmd.exe /c %0 p&exit
set _selfAppName=cmd
set _task=takeaway_d.exe
set _svr=E:\zouwuyang\FamilyMart\PosPluginClient\familyMart_takeaway\bin\takeaway_d.exe
set _svrWorkDir=E:\zouwuyang\FamilyMart\PosPluginClient\familyMart_takeaway\bin\
echo ******** %time% Daemon - Daemon Plugin Bat Begin Running ...******** >nul
rem 检查 Daemon Script Running Or Not[ Can't gt 1, Will Been Call More 2];
for /f "delims=" %%a in ('tasklist /nh^|find /i "%_selfAppName%" /c') do set _daemonCount=%%a
echo ******** %time% CheckOrStart:: %_daemonCount% ******** >nul
if %_daemonCount% geq 3 (
echo ******** %time% CheckOrStart:: Daemon Second Running, Count geq 3 [first equ 2] Then Exit ...******** >nul
exit
) else (
echo ******** %time% CheckOrStart:: Daemon Fist Running, Then Go ...******** >nul
)
:CheckOrStart
rem 检查 Plugin App Running Or Not;
ping 127.0.0.257 -n 30 >nul & tasklist /nh | findstr /i %_task% >nul
:: echo %ERRORLEVEL% >nul
if ERRORLEVEL 1 (
echo %ERRORLEVEL%
echo ******** %time% CheckOrStart:: App Not Running ...******** >nul & goto StartSvr
) else (
echo %ERRORLEVEL%
echo ******** %time% CheckOrStart:: App Running ...******** >nul & goto CheckAgain
)
:StartSvr
echo ******** %time% StartSvr:: Restart Plugin Begin...******** >nul
cd %_svrWorkDir%
start %_svr%
echo ******** %time% StartSvr:: Restart Plugin End ...******** >nul
ping 127.0.0.257 -n 20 >nul & goto CheckOrStart
:CheckAgain
echo ******** %time% CheckAgain:: Plugin App Running, Keep Check After 200s ...******** >nul
ping 127.0.0.257 -n 20 >nul & goto CheckOrStart
\ No newline at end of file
#!/bin/sh
cd /opt/pos/zhclient
nohup /opt/pos/zhclient/zh_client > /dev/null &
nohup /opt/pos/fmtakeout/takeaway_d.exe > /dev/null &
export LANG=zh_CN.GBK
export LC_ALL=zh_CN.GBK
CREAM_JAR_DIR=/opt/pos/cream
CREAM_JAR_BAK_DIR=/opt/pos/cream_bak
GROOVY_HOME=/opt/pos/groovy-1.7
POS_HOME=/opt/pos
cd $POS_HOME
export DISPLAY=:0.0
/usr/X11R6/bin/xhost +
/usr/bin/x11vnc -forever -shared -ncache 10 -clip 800x600+0+0 -display :0 &
sh bin/setvolume.sh
sh bin/deploy_sandini.sh
/bin/chmod a+rw /dev/ttyS*
#/bin/chmod a+x bin/catAdapter
/sbin/modprobe lp
# Classpath for Cream --
if test -n "$CLASSPATH" ; then
CLASSPATH="${CREAM_JAR_DIR}/cream.jar:${POS_HOME}/conf:${CLASSPATH}"
else
CLASSPATH="${CREAM_JAR_DIR}/cream.jar:${POS_HOME}/conf"
fi
CLASSPATH="${CLASSPATH}:${GROOVY_HOME}/embeddable/groovy-all-1.7.10.jar"
for n in ${POS_HOME}/lib/*.jar ; do
CLASSPATH="${n}:${CLASSPATH}"
done
# Upgrade cream --
if [ -f cream_hot.jar ]; then
/bin/cp -f cream_hot.jar ${CREAM_JAR_DIR}/cream.jar
/bin/mv -f cream_hot.jar ${CREAM_JAR_BAK_DIR}/cream_hot.jar.`date +%Y-%m-%d`
fi
if [ -f cream.jar.pack.gz ]; then
/usr/bin/unpack200 cream.jar.pack.gz ${CREAM_JAR_DIR}/cream.jar
/bin/mv -f cream.jar.pack.gz ${CREAM_JAR_BAK_DIR}/cream.jar.pack.gz.`date +%Y-%m-%d`
fi
# Run Cream POS --
LD_LIBRARY_PATH=${POS_HOME}/native:${POS_HOME}
export LD_LIBRARY_PATH
MAIN_CLASS=hyi.cream.POSTerminalApplication
#MAIN_CLASS=hyi.cream.Bootstrapper
/usr/bin/java -Duser.language="zh" -Duser.region="CN" -DuseFakeCAT=false -DskipConnectServer=true \
-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005 \
-Dcom.sun.management.jmxremote.port=1099 -Dcom.sun.management.jmxremote.ssl=false \
-Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=192.168.0.207 \
-Djava.library.path=${POS_HOME}/native -Dfile.encoding=GBK -cp $CLASSPATH ${MAIN_CLASS} \
-screensize 800x600 \
>> log/stdout.log 2>> log/stderr.log
# Shrink log files
tail -c 50240000 log/stdout.log > log/stdout_.log; mv -f log/stdout_.log log/stdout.log
tail -c 50240000 log/stderr.log > log/stderr_.log; mv -f log/stderr_.log log/stderr.log
tail -c 50240000 log/player.log > log/player_.log; mv -f log/player_.log log/player.log
tail -c 50240000 log/trace_state.log > log/trace_state_.log; mv -f log/trace_state_.log log/trace_state.log
rm -f /root/.serverauth*
pkill x11vnc

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 14
# Blend for Visual Studio 14
VisualStudioVersion = 14.0.25420.1
MinimumVisualStudioVersion = 10.0.40219.1
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "familyMart_takeaway", "familyMart_takeaway\familyMart_takeaway.vcxproj", "{B24A017B-387C-49C2-A321-3554AD9A1D48}"
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "takeaway_ut", "takeaway_ut\takeaway_ut.vcxproj", "{395675BD-3E73-4656-A9D0-24A94B28C20C}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "fmPluginDaemon", "fmPluginDaemon", "{54A3388C-3886-46E1-87A9-6865E87582E4}"
ProjectSection(SolutionItems) = preProject
fmPluginDaemon\fmPluginDaemon.c = fmPluginDaemon\fmPluginDaemon.c
fmPluginDaemon\Makefile = fmPluginDaemon\Makefile
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|x64 = Debug|x64
......
......@@ -155,6 +155,7 @@
<ClInclude Include="..\base\BaseDefine.h" />
<ClInclude Include="..\base\CommonStruct.h" />
<ClInclude Include="..\src\JsonModule.h" />
<ClInclude Include="..\src\PosHandle.h" />
<ClInclude Include="..\src\SocketModule.h" />
<ClInclude Include="..\utility\utility.h" />
<ClInclude Include="..\utility\zini.h" />
......@@ -163,6 +164,7 @@
<ClCompile Include="..\3rdParty\easylogging\easylogging++.cc" />
<ClCompile Include="..\src\JsonModule.cpp" />
<ClCompile Include="..\src\main.cpp" />
<ClCompile Include="..\src\PosHandle.cpp" />
<ClCompile Include="..\src\SocketModule.cpp" />
<ClCompile Include="..\utility\utility.cpp" />
<ClCompile Include="..\utility\zini.cpp" />
......
......@@ -51,6 +51,9 @@
<ClInclude Include="..\src\SocketModule.h">
<Filter>src</Filter>
</ClInclude>
<ClInclude Include="..\src\PosHandle.h">
<Filter>src</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="..\3rdParty\easylogging\easylogging++.cc">
......@@ -71,5 +74,8 @@
<ClCompile Include="..\src\SocketModule.cpp">
<Filter>src</Filter>
</ClCompile>
<ClCompile Include="..\src\PosHandle.cpp">
<Filter>src</Filter>
</ClCompile>
</ItemGroup>
</Project>
\ No newline at end of file
fmPluginDaemon: fmPluginDaemon.o
gcc -o fmPluginDaemon fmPluginDaemon.o
fmPluginDaemon.o: fmPluginDaemon.c
gcc -c fmPluginDaemon.c
clean:
rm *.o
rm fmPluginDaemon
\ No newline at end of file
@echo OFF
if not "%~1"=="p" start /min cmd.exe /c %0 p&exit
set _selfAppName=cmd
set _task=takeaway_d.exe
set _svr=E:\zouwuyang\FamilyMart\PosPluginClient\familyMart_takeaway\bin\takeaway_d.exe
set _svrWorkDir=E:\zouwuyang\FamilyMart\PosPluginClient\familyMart_takeaway\bin\
echo ******** %time% Daemon - Daemon Plugin Bat Begin Running ...******** >nul
rem 检查 Daemon Script Running Or Not[ Can't gt 1, Will Been Call More 2];
for /f "delims=" %%a in ('tasklist /nh^|find /i "%_selfAppName%" /c') do set _daemonCount=%%a
echo ******** %time% CheckOrStart:: %_daemonCount% ******** >nul
if %_daemonCount% geq 3 (
echo ******** %time% CheckOrStart:: Daemon Second Running, Count geq 3 [first equ 2] Then Exit ...******** >nul
exit
) else (
echo ******** %time% CheckOrStart:: Daemon Fist Running, Then Go ...******** >nul
)
:CheckOrStart
rem 检查 Plugin App Running Or Not;
ping 127.0.0.257 -n 30 >nul & tasklist /nh | findstr /i %_task% >nul
:: echo %ERRORLEVEL% >nul
if ERRORLEVEL 1 (
echo %ERRORLEVEL%
echo ******** %time% CheckOrStart:: App Not Running ...******** >nul & goto StartSvr
) else (
echo %ERRORLEVEL%
echo ******** %time% CheckOrStart:: App Running ...******** >nul & goto CheckAgain
)
:StartSvr
echo ******** %time% StartSvr:: Restart Plugin Begin...******** >nul
cd %_svrWorkDir%
start %_svr%
echo ******** %time% StartSvr:: Restart Plugin End ...******** >nul
ping 127.0.0.257 -n 20 >nul & goto CheckOrStart
:CheckAgain
echo ******** %time% CheckAgain:: Plugin App Running, Keep Check After 200s ...******** >nul
ping 127.0.0.257 -n 20 >nul & goto CheckOrStart
\ No newline at end of file
#include<unistd.h>
#include<signal.h>
#include<stdio.h>
#include<stdlib.h>
#include<sys/param.h>
#include<sys/types.h>
#include<sys/stat.h>
#include<time.h>
#include<sys/wait.h>
#include<fcntl.h>
#include<limits.h>
#define BUFSZ 150
// 监控 App 间隔周期 (单位: s)
#define MONITOR_APP_INTERVAL 60
#define MONITOR_APP_NAME "takeaway"
// POS Real Start Command :: nohup /opt/pos/fmtakeout/takeaway > /dev/null &
#define MONITOR_APP_START_COMMAND "nohup /opt/pos/fmtakeout/takeaway > /dev/null & "
#define MONITOR_APP_RUN_DIR "/opt/pos/zhclient/"
#define MONITOR_LOG_DIR "/opt/pos/fmtakeout/"
#define MONITOR_LOG_NAME "/opt/pos/fmtakeout/fmPluginDaemon.log"
void init_daemon()
{
int pid;
int i;
pid=fork();
if(pid<0)
exit(1); //创建错误,退出
else if(pid>0) //父进程退出
exit(0);
setsid(); //使子进程成为组长
pid=fork();
if(pid>0)
exit(0); //再次退出,使进程不是组长,这样进程就不会打开控制终端
else if(pid<0)
exit(1);
//关闭进程打开的文件句柄
for(i=0;i<NOFILE;i++)
close(i);
chdir(MONITOR_LOG_DIR); //改变目录
umask(0);//重设文件创建的掩码
return;
}
void err_quit(char *msg)
{
perror(msg);
exit(EXIT_FAILURE);
}
// 判断程序是否在运行
int does_service_work()
{
FILE* fp;
int count;
char buf[BUFSZ];
char command[150];
sprintf(command, "%s%s%s" ,"ps -ef | grep ", MONITOR_APP_NAME , " | grep -v grep | wc -l" );
if((fp = popen(command,"r")) == NULL)
err_quit("popen");
if( (fgets(buf,BUFSZ,fp))!= NULL )
{
count = atoi(buf);
}
pclose(fp);
return count;
}
void main()
{
FILE *fp = NULL;
time_t t;
int count;
init_daemon();
fp=fopen(MONITOR_LOG_NAME,"w+"); //守护程序初次启动,避免日志无限增大,对日志进行覆盖;
if ( fp > 0 ) {
fclose(fp);
fp = NULL;
}
while(1)
{
sleep( MONITOR_APP_INTERVAL ); //等待一分钟再写入
count = does_service_work();
fp=fopen(MONITOR_LOG_NAME,"a"); //为记录后续状态,对日志文件进行追加;
if ( fp != NULL ) {
time(&t);
if(count>0)
{
fprintf(fp,"FmPluginDaemon:: Current Time Is:%s And The Process Exists, The App Count Is %d\n",asctime(localtime(&t)), count); //转换为本地时间输出
fclose(fp);
fp = NULL ;
}
else
{
fprintf(fp,"FmPluginDaemon:: Current Time Is:%s And The Process Does Not Exist, Restart It!\n",asctime(localtime(&t))); //转换为本地时间输出
fclose(fp);
fp = NULL ;
chdir(MONITOR_APP_RUN_DIR); //改变被守护程序启动目录位置,同时也会改变DaemonApp的当前路径;
system(MONITOR_APP_START_COMMAND); //启动服务 注意自己可执行程序的目录
}
} else
{
exit(-1);
}
}
return;
}
\ No newline at end of file
......@@ -8,11 +8,6 @@
#include "../3rdParty/easylogging/easylogging++.h"
#include <iostream>
std::string g_store_id = "";
std::string g_pos_id = "";
bool g_pos_ismaster = false;
std::string g_plugin_comment = "";
using namespace rapidjson;
JsonModule::JsonModule()
......@@ -487,15 +482,32 @@ int JsonModule::isHeartbeatData(IN const char* data)
{
rapidjson::Document document; // 定义一个Document对象
document.Parse(data); // 解析,Parse()无返回值,也不会抛异常
if (document.HasParseError()) // 通过HasParseError()来判断解析是否成功
if (document.HasParseError()) // 通过HasParseError()来判断解析是否成功: 解析失败: 返回 3;
{
LOG(ERROR) << "isHeartbeatData JSON parse error:" << document.GetParseError() << ":" << document.GetErrorOffset();
return 3;
}
if(document.HasMember("fm_cmd")){
if(document["fm_cmd"].GetInt() == 3)
// ODS 正常推送 心跳消息类型;
if(document.HasMember("fm_cmd")) {
if (document["fm_cmd"].GetInt() == 3) {
g_ods_heart_10x_index++;
// 如果报文是心跳数据,且 是10次ODS心跳周期,则读取心跳报文中的待推送订单数;
if (0 == g_ods_heart_10x_index % 10) {
g_ods_wait_push_order_last_sum = g_ods_wait_push_order_this_sum;
if (document.HasMember("wait_push_order_sum")) {
g_ods_wait_push_order_this_sum = document["wait_push_order_sum"].GetInt();
}
else {
g_ods_wait_push_order_this_sum = 0; // 考虑ODS多实例升级过程中: 部分实例有返回,部分实例没有返回,将会导致待推送数据不准确;
}
}
return 1;
}
}
// ODS 正常推送消息类型;
return 2;
}
......@@ -568,7 +580,7 @@ bool JsonModule::convertInitDataPos2Ods(IN const std::string& data, OUT std::str
writer.Int(1);
writer.Key("version");
writer.String(VERSION);
writer.String(g_plugin_version.data() );
writer.EndObject();
......@@ -844,6 +856,7 @@ bool JsonModule::getOdsResponseData(IN int status_code, IN const std::string &ms
int fm_cmd = document1["fm_cmd"].GetInt();
LOG(INFO)<<"fm_cmd:"<<fm_cmd;
bool rlt=false;
if(fm_cmd==REQUEST_TYPE_NEWORDER_PUSH){
rlt=_getOrderResponseJson(posResponse,orderData,odsData,result);
......@@ -920,7 +933,7 @@ bool JsonModule::convertDataPos2Ods(IN const std::string &data, OUT std::string
bool JsonModule::convertDataOds2Pos(IN const std::string &data, OUT std::string &result)
{
bool rlt=true;
int pushType=getPushType(data.c_str());
int pushType=getPosOdsPushType(data.c_str());
if(pushType==ODS_PUSH_TYPE_STOCKWARN){ //库存预警
stockWarnObj warn_obj;
......@@ -1796,15 +1809,16 @@ void JsonModule::_getStockWarnObj(IN const char* json,OUT stockWarnObj &warn_obj
}
}
int JsonModule::getPushType(IN const char* data)
int JsonModule::getPosOdsPushType(IN const char* data)
{
rapidjson::Document document; // 定义一个Document对象
document.Parse(data); // 解析,Parse()无返回值,也不会抛异常
if (document.HasParseError()) // 通过HasParseError()来判断解析是否成功
{
LOG(ERROR) << "getPushType JSON parse error:" << document.GetParseError() << ":" << document.GetErrorOffset();
LOG(ERROR) << "getPosOdsPushType JSON parse error:" << document.GetParseError() << ":" << document.GetErrorOffset();
return 0;
}
if(document.HasMember("fm_cmd")){
return document["fm_cmd"].GetInt();
}
......@@ -1878,6 +1892,8 @@ bool JsonModule::_getOrderResponseJson(IN const std::string& posResponse, IN con
writer.Bool(g_pos_ismaster);
writer.Key("plugin_comment");
writer.String(g_plugin_comment.c_str());
writer.Key("plugin_boot_config");
writer.String(g_posPluginBootConfig.c_str());
writer.Key("child_store_id");
writer.String(child_store_id.c_str());
......@@ -2195,9 +2211,66 @@ void JsonModule::setInitData(IN const char* data)
g_pos_ismaster = document["is_master"].GetBool();
}
void JsonModule::saveLocalPosInitData(IN const char* data)
{
std::string strBinPath = GetProcDir();
std::string strFile = strBinPath + "PosRequestInitData.json";
std::ofstream fileStream(strFile.c_str(), std::ios::binary | std::ios::out );
if (!fileStream.is_open()) {
LOG(ERROR) << "saveLocalPosInitData:: Open File: PosRequestInitData.json Failed ";
return;
}
rapidjson::Document doc;
doc.Parse(data);
if (doc.HasParseError()) {
LOG(ERROR) << "saveLocalPosInitData:: Rapidjson Parse Error :" << doc.GetParseError();
return;
}
fileStream.write(data, std::strlen(data) );
fileStream.close();
return;
}
std::string JsonModule::getLocalPosInitData()
{
std::string strBinPath = GetProcDir();
std::string strFile = strBinPath + "PosRequestInitData.json";
std::ifstream fileStream(strFile.c_str(), std::ios::binary | std::ios::in | std::ios::ate );
if (!fileStream.is_open()) {
LOG(INFO) << "getLocalPosInitData:: Open File: PosRequestInitData.json Failed, May Be Not Exist ";
return "";
}
char * buffer = nullptr; long size;
size = fileStream.tellg();
buffer = new char[size + 1];
fileStream.seekg(0, std::ios::beg);
fileStream.read(buffer, size);
buffer[size] = '\0';
std::string posRequestInitData = buffer;
delete[] buffer; buffer = nullptr;
fileStream.close();
rapidjson::Document doc;
doc.Parse(posRequestInitData.c_str());
if (doc.HasParseError()) {
LOG(ERROR) << "getLocalPosInitData:: Rapidjson Parse Error :" << doc.GetParseError();
return "";
}
return posRequestInitData;
}
std::string JsonModule::getTestOrderJson() {
std::string strBinPath = GetProcDir();
std::string strTestOrderFile = strBinPath += "testOrderJson.json";
std::string strTestOrderFile = strBinPath + "testOrderJson.json";
std::ifstream fileStream(strTestOrderFile.c_str(), std::ios::binary | std::ios::in | std::ios::ate );
if (!fileStream.is_open()) {
......@@ -2213,6 +2286,7 @@ std::string JsonModule::getTestOrderJson() {
buffer[size] = '\0';
std::string testOrderData = buffer;
delete[] buffer; buffer = nullptr;
fileStream.close();
rapidjson::Document doc;
doc.Parse( testOrderData.c_str() );
......
......@@ -7,12 +7,15 @@
#include "../base/CommonStruct.h"
#include "../base/BaseDefine.h"
extern std::string g_plugin_version;
extern std::string g_store_id;
extern std::string g_pos_id;
extern bool g_pos_ismaster;
extern std::string g_plugin_comment;
#define VERSION "1.2.8 RC" //版本号;
extern std::string g_posPluginBootConfig;
extern int g_ods_wait_push_order_this_sum;
extern int g_ods_wait_push_order_last_sum;
extern int g_ods_heart_10x_index;
class JsonModule
......@@ -106,7 +109,7 @@ public:
* 参数:[1]待判断数据
* 返回:...
* */
int getPushType(IN const char* data);
int getPosOdsPushType(IN const char* data);
/* 功能:缓存初始化信息,其它接口需要门店信息字段时可以直接拿取
* 参数:[1]初始化json
......@@ -114,6 +117,21 @@ public:
* */
void setInitData(IN const char* data);
/* 功能:保存POS初始化信息到本地文件,为了插件守护程序 重启插件后直接连接ODS做准备;
* 参数:[1]初始化json
* 返回:...
* */
void saveLocalPosInitData(IN const char* data);
/* 功能:通过本地POS初始化文件 读取到应用程序中,为了插件守护程序 重启插件后直接连接ODS做准备;
* 参数:[1]初始化json
* 返回:...
*/
std::string getLocalPosInitData();
/* 功能: Win32 模式 读取 本地测试订单数据
* 参数:
* 返回: testOrderJson.json 文件的报文数据;
......
#include "PosHandle.h"
#include "../3rdParty/rapidjson/rapidjson.h"
#include "../3rdParty/rapidjson/document.h"
#include "../3rdParty/rapidjson/reader.h"
#include "../3rdParty/rapidjson/writer.h"
#include "../3rdParty/rapidjson/stringbuffer.h"
using namespace rapidjson;
PosHandle::PosHandle()
{
}
PosHandle::~PosHandle()
{
}
void PosHandle::logRolloutHandler(const char* filename, std::size_t size)
{
std::cout << "bk filename:" << filename << std::endl;
std::stringstream ss;
ss << "mv " << filename << " " << filename << "_bk";
system(ss.str().c_str());
}
// 线程处理函数:监听 Pos Send Request;
#ifdef WIN32
DWORD PosHandle::listen_pos_func(LPVOID lpParamter) {
#else
void* PosHandle::listen_pos_func(void* arg) {
#endif // WIN32
TCPServer server;
JsonModule jsonTool;
if (server.doListen(client_listen_port)) { // 监控Plugin本地端口: Plugin Liston -> port [24446]
LOG(INFO) << "[PosPlugin] Listen Port: [" << client_listen_port << "] Successful, Recv [POS] Request" << '\n';
} else {
LOG(INFO) << "[PosPlugin] Listen Port: [" << client_listen_port << "] Failed, Maybe Instance Is Exist, Exit!!!";
exit(0);
}
while (true) { // 循环接受 POS 发送 给 Plugin 的业务请求;
TCPClient pos;
std::string posRequestData;
std::string requestOdsData;
std::string responseData;
if (server.accept(pos)) { // Socket-Accept: 监控+接受 POS发送给插件的 长连接请求 Success;
char tmpBuf[FM_BUF_SIZE] = { 0 };
if (pos.read(tmpBuf, sizeof(tmpBuf))) { // Read Socket: Pos Send Login Init Request Success;
posRequestData = tmpBuf;
// 如果为初始化请求则通过长连接socket发送
int reqType = jsonTool.getPosOdsPushType(posRequestData.data());
LOG(INFO) << "Pos ReqType:" << reqType;
if (reqType == REQUEST_TYPE_INIT) {
// POS请求类型:: 登录初始化;
// 如果当前内存标识: 插件已接受到, Pos发送过来的登录请求, 初始化标识完成;将断开 插件与 ODS 的长连接, 等待后面插件再与ODS重新连接
if (bInitDone) {
LOG(INFO) << "Set Timeout 2";
longConnectionOds.setSocketTimeout(2);
longConnectionOds.close(); //关闭长连接,等待重连
}
bInitDone = false;
if (jsonTool.checkInitData(posRequestData, pos_listen_port)) {
g_init_data_ods_back.clear();
LOG(INFO) << "INIT POS ===>> PLUGIN:" << posRequestData.c_str();
jsonTool.convertInitDataPos2Ods(posRequestData, g_init_data);
LOG(INFO) << "INIT PLUGIN ===>> ODS:" << g_init_data.c_str();
jsonTool.setInitData(posRequestData.data()); //把初始化数据暂存起来
jsonTool.saveLocalPosInitData(g_init_data.data()); //把POS初始化数据 转化后的初始化数据 存储到本地文件;
//等待ods返回初始化结果
while (true) {
if (!g_init_data_ods_back.empty()) {
jsonTool.getInitBackData(g_init_data_ods_back.data(), responseData);
LOG(INFO) << "INIT PLUGIN ===>> POS:" << responseData.c_str();
break;
}
else {
LOG(INFO) << "Wait For ODS Response Pos Init Request Data";
os_sleep(3);
}
}
}
else {
jsonTool.getPosResponseData(101, "Invalid Init Data!", responseData);
}
}
else if (reqType == REQUEST_TYPE_POS_PRIORITY) {
// POS请求类型:: 优先级设置;
if (bInitDone) {
//未完成,缺接口
bPriorityDone = false;
LOG(INFO) << "POS Send Priority Data:" << posRequestData.data();
g_set_pos_priority_ods_back.clear();
jsonTool.convertSetPosPriorityDataPos2Ods(posRequestData, g_set_pos_priority);
//等待ods返回初始化结果
while (true) {
if (!g_set_pos_priority_ods_back.empty()) {
jsonTool.getSetPosPriorityBackData(g_set_pos_priority_ods_back.data(), responseData);
break;
} else {
LOG(INFO) << "Wait For ODS Response Pos Priority Request Data";
os_sleep(1);
}
}
}
}
else if (reqType == REQUEST_TYPE_GOODS_CHANGE) {
// POS请求类型:: 商品变更;
std::string posReq = charset_g2u(posRequestData);
LOG(INFO) << "POS send goods change data:" << posReq.data();
jsonTool.getPosResponseData(100, "success", responseData);
}
else if (
OPERATION_POS_CONFIRM == reqType
|| OPERATION_POS_CANCEL == reqType
|| OPERATION_POS_REFUND_AGREE == reqType
|| OPERATION_POS_REFUND_DISAGREE == reqType
|| OPERATION_POS_APPOINTMENT_MAKEING_DONE == reqType
|| OPERATION_POS_APPOINTMENT_DONE == reqType
|| OPERATION_POS_APPOINTMENT_REFUND == reqType
|| OPERATION_POS_APPOINTMENT_CONFIRM == reqType
|| OPERATION_POS_SCANCODE_DONE == reqType
|| OPERATION_POS_SCANCODE_REFUND == reqType
|| REQUEST_TYPE_QUERY_ORDER_STAUS == reqType
|| OPERATION_POS_COFFEE_MAKEING_DONE == reqType
|| OPERATION_POS_COFFEE_DONE == reqType
|| OPERATION_POS_COFFEE_REFUND == reqType)
{
// POS请求类型:: 业务类型;
LOG(INFO) << "REQ POS ===>> PLUGIN:" << posRequestData.data();
// 将POS请求数据转换为中台可接受数据格式
if (jsonTool.convertDataPos2Ods(posRequestData, requestOdsData)) {
LOG(INFO) << "REQ PLUGIN ===>> ODS:" << requestOdsData.data();
// 同步阻塞发送到ODS并等待返回
TCPClient ods;
if (ods.doConnect(ods_recv_port, ods_ip.c_str())) {
ods.setSocketTimeout(30); //设置超时
if (ods.send(requestOdsData)) {
std::string tmp;
if (ods.receive(tmp)) {
LOG(INFO) << "REQ ODS ===>> PLUGIN:" << tmp.data();
if (!jsonTool.getPosResponseData(tmp, posRequestData, responseData)) {
jsonTool.getPosResponseData(101, "receive data from [ODS] invalid!", responseData);
}
}
else {
jsonTool.getPosResponseData(101, "receive data from [ODS] failed!", responseData);
}
}
else {
jsonTool.getPosResponseData(101, "send data to [ODS] failed!", responseData);
}
ods.close();
}
else {
jsonTool.getPosResponseData(101, "connect [ODS] failed!", responseData);
}
}
else {
jsonTool.getPosResponseData(101, "convert data to [ODS] format failed!", responseData);
}
}
else if (-1 == reqType) {
// POS请求类型:: 重启类型; 接收到-1,程序需要重启,关闭本进程
LOG(INFO) << "Recive Exit Msessage, Exit!!!";
pos.write("100");
pos.close();
server.close();
exit(0);
}
else if (-2 == reqType) {
// POS请求类型:: 检查Plugin是否存在; 接收到 -2 ,表示 检查插件监听的端口是否正常;
LOG(INFO) << "Recive Check Plugin Port Is OK !!!";
responseData = "200";
}
// TODO待加入重试机制
LOG(INFO) << "PLUGIN ===>>POS:" << responseData.data();
pos.write(responseData.c_str());
pos.close();
if (reqType == REQUEST_TYPE_INIT) {
bInitDone = true; //初始化完成,可以接收订单
}
if (reqType == REQUEST_TYPE_POS_PRIORITY) {
bPriorityDone = true; //设置pos优先级完成: 应该是设置 POS 为 master POS For Recv Order : Print Tickit Or Checkout;
}
LOG(INFO) << "PLUGIN ===>>POS: Send End";
} // Read Socket: Pos Send Login Init Request Failed;
else { // 读取 Pos 发送过来的请求数据 Failed;
/****** X86 & DEBUG 模式,才支持插件脱离POS独立运行 ******/
#ifdef WIN32
LOG(INFO) << " Windows X86 模式 无需等待 POS 推送初始化数据";
#else
if (!bInitDone) {
LOG(INFO) << "Recv Pos Init Request Data Timeout, Wait Pos Next Connect PosPlugin";
}
#endif
}
} else { // Socket-Accept: 监控+接受 POS发送给插件的 长连接动态 Failed;
LOG(INFO) << "Accept Pos Connect Failed --------> ";
}
}
#ifdef WIN32
return 0;
#endif // WIN32
}
void PosHandle::getIpByDns(IN std::string &ods_ip) {
if (ods_ip.front() < '0' || ods_ip.front() > '9') {
LOG(INFO) << "DNS Resolve IP Begin , Origin ods_ip: " << ods_ip.c_str();
struct hostent* pHost = NULL;
bool bGetDnsRet = false;
// 需考虑POS机断网后,域名解析会频繁失败,频繁失败,插件有退出程序的可能,故在此进行类似死循环处理,插件永不主动退出;
for (int n = 0; n < n + 3; n++) {
pHost = gethostbyname(ods_ip.c_str());
if (!pHost) {
LOG(INFO) << "DNS Resolve Failed: ODS Domain Name: " << n << " " << ods_ip.c_str();
LOG(ERROR) << "DNS Resolve Failed: I Don't Know What To Do, Sleep 20 Seconds First ";
os_sleep(20);
} else {
bGetDnsRet = true;
break;
}
}
if (bGetDnsRet) {
int i = 0;
if (pHost->h_addrtype == AF_INET) {
char str[32];
char **pptr;
pptr = pHost->h_addr_list;
#ifdef WIN32
// ods_ip = *(ULONG*)pHost->h_addr; // 转换失败;
// ods_ip = inet_ntoa(*(in_addr*)*pHost->h_addr_list); // 转换成功
ods_ip = inet_ntoa(*(in_addr*)pHost->h_addr); // 转换首个IP成功
#else
for (; *pptr != NULL; pptr++) {
inet_ntop(pHost->h_addrtype, *pptr, str, sizeof(str));
LOG(INFO) << "DNS Resolve IP List: " << str;
}
inet_ntop(pHost->h_addrtype, pHost->h_addr, str, sizeof(str));
ods_ip = str;
#endif
}
} else {
LOG(ERROR) << "DNS Resolve Failed: I Will Exit, Bye-Bye";
exit(0);
}
LOG(INFO) << "DNS Resolve Sucessful ODS_IP: " << ods_ip.c_str();
} else {
LOG(INFO) << "No Need DNS Resolve ODS_IP: " << ods_ip.c_str();
}
}
bool PosHandle::order_send_to_pos(IN std::string &order_json, IN std::string &ods_json, OUT std::string &back_json) {
bool rlt = true;
TCPClient pos;
JsonModule jsonTool;
#ifdef WIN32
// Simulation Pos Recived New Order Data;
std::string tmp = "{\"fm_cmd\": 1001,\"status_code\": 100, \"msg\": \"Simulation Push Pos Sucessful\"}";
#else
std::string tmp = "{\"status_code\": 102, \"msg\": \"connect pos failed!\"}";
if (pos.doConnect(pos_listen_port, pos_ip.c_str())) {
pos.setSocketTimeout(60); //设置超时
if (pos.write(order_json.c_str())) {
char tmpBuf[1024 * 10] = { 0 };
if (pos.read(tmpBuf, sizeof(tmpBuf))) {
LOG(INFO) << "POS ===>> PLUGIN:" << tmpBuf;
tmp = tmpBuf;
//pos发送过来的数据为gb2312编码,需要转换为utf8
} else {
LOG(INFO) << "receive data from pos failed";
rlt = false;
}
} else {
LOG(INFO) << "send data to pos failed";
rlt = false;
}
pos.close();
} else {
LOG(INFO) << "connect pos failed,pos_listen_port:" << pos_listen_port << " pos_ip:" << pos_ip;
rlt = false;
}
#endif
jsonTool.getOdsResponseData(tmp, order_json, ods_json, back_json);
return rlt;
}
void PosHandle::attach_plugin_reloginReason_bootConfig(IN OUT std::string &init_data, IN std::string relogin_reason, IN std::string boot_config) {
if ( init_data.empty() ) {
LOG(INFO) << "attach_plugin_relogin_reason:: g_init_data empty input";
return;
}
rapidjson::Document document;
document.Parse(init_data.c_str());
if (document.HasParseError())
{
LOG(INFO) << "attach_plugin_relogin_reason:: g_init_data JSON parse error: " << document.GetParseError() << ":" << document.GetErrorOffset();
return;
}
std::string json_key1 = "";
std::string json_value1 = "";
// 插件登录报文中不存在 plugin_relogin_reason 则添加, 已存在则更新;
if ( document.HasMember("plugin_relogin_reason") )
{
document.RemoveMember("plugin_relogin_reason");
}
if ( relogin_reason.length() )
{
json_key1 = "plugin_relogin_reason";
json_value1 = relogin_reason.data();
document.AddMember(rapidjson::StringRef(json_key1.data() ), rapidjson::StringRef( json_value1.data() ), document.GetAllocator());
}
// 插件登录报文中不存在 plugin_boot_config 则添加; 已存在则更新;
if (document.HasMember("plugin_boot_config"))
{
document.RemoveMember("plugin_boot_config" );
}
std::string json_key2 = "plugin_boot_config";
std::string json_value2 = boot_config.data();
document.AddMember(rapidjson::StringRef(json_key2.data()), rapidjson::StringRef(json_value2.data()), document.GetAllocator());
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
document.Accept(writer);
init_data = std::string(buffer.GetString());
return;
}
bool PosHandle::check_plugin_already_exist() {
TCPClient simulatorPos;
std::string tmp = "{\"fm_cmd\": -2}";
bool checkExistRet = false;
if (simulatorPos.doConnect(client_listen_port, pos_ip.c_str())) {
LOG(INFO) << "check_plugin_already_exist:: Check PosPlugin Listen Port Ok , Port: " << client_listen_port;
if (simulatorPos.write(tmp.c_str())) {
char tmpBuf[100] = { 0 };
if (simulatorPos.read(tmpBuf, sizeof(tmpBuf))) {
LOG(INFO) << "check_plugin_already_exist:: Check PosPlugin Listen Port Write & Read OK, tmpBuff: " << tmpBuf;
checkExistRet = true;
}
else {
LOG(INFO) << "check_plugin_already_exist:: Check PosPlugin Listen Port Write Ok & Read Failed, tmpBuff: " << tmp;
checkExistRet = true;
}
}
else {
LOG(INFO) << "check_plugin_already_exist:: Check PosPlugin Listen Port Write Failed, tmpBuff: " << tmp;
checkExistRet = false;
}
simulatorPos.close();
}
return checkExistRet;
}
bool PosHandle::check_pos_already_exist() {
TCPClient simulatorPlugin;
// std::string tmp = "{\"fm_cmd\": -3}";
bool checkExistRet = false;
if (simulatorPlugin.doConnect(pos_listen_port, pos_ip.c_str())) {
LOG(INFO) << "check_pos_already_exist:: Check Pos Listen Port Ok , Port: " << pos_listen_port;
checkExistRet = true;
/*
if (simulatorPlugin.write(tmp.c_str())) {
char tmpBuf[100] = { 0 };
if (simulatorPlugin.read(tmpBuf, sizeof(tmpBuf))) {
LOG(INFO) << "check_pos_already_exist:: Check Pos Listen Port Write & Read OK, tmpBuff: " << tmpBuf;
checkExistRet = true;
}
else {
LOG(INFO) << "check_pos_already_exist:: Check Pos Listen Port Write Ok & Read Failed, tmpBuff: " << tmp;
checkExistRet = true;
}
}
else {
LOG(INFO) << "check_pos_already_exist:: Check Pos Listen Port Write Failed, tmpBuff: " << tmp;
checkExistRet = false;
}
*/
simulatorPlugin.close();
}
return checkExistRet;
}
std::string PosHandle::check_boot_app_enable_str(std::string filename, std::string appName ) {
std::string enableRetList = std::string();
std::string lineBuff = std::string();
std::string validBuff = std::string();
std::ifstream fileStream(filename.c_str(), std::ifstream::in);
// 1、判断POS开机启动脚本文件是否存在;
if (fileStream.is_open()) {
while (fileStream.good())
{
lineBuff = ""; // 每行读取前需 清空Buff;
std::getline(fileStream, lineBuff);
if ( lineBuff.find(appName) != std::string::npos ) {
validBuff = replace_all(lineBuff,"'"," "); // 过滤特殊字符 ' 防止出现 json 截断;
validBuff = replace_all(validBuff, "\"", " "); // 过滤特殊字符 " 防止出现 json 截断;
enableRetList.append(validBuff);
enableRetList.append(" |** Split Line **| ");
}
}
}
else {
enableRetList.append(" AppBootStriptFile Not Exist: ");
LOG(INFO) << "AppBootStriptFile Not Exist: " << filename.c_str();
}
return enableRetList;
}
//实现逻辑:向本地socket发送一个命令,程序接到命令后自杀,如果没有自杀成功,根据名称杀掉
void PosHandle::kill_origin_process()
{
std::string execName; //可执行文件名
#ifdef WIN32
#ifdef _DEBUG
execName = "takeaway_d.exe";
#else
execName = "takeaway.exe";
#endif // _DEBUG
#else
execName = "takeaway";
#endif
bool rlt = true;
TCPClient pos;
JsonModule jsonTool;
std::string tmp = "{\"fm_cmd\": -1}";
if (pos.doConnect(client_listen_port, pos_ip.c_str())) {
if (pos.write(tmp.c_str())) {
char tmpBuf[100] = { 0 };
if (pos.read(tmpBuf, sizeof(tmpBuf))) {
LOG(INFO) << "Kill Back:" << tmpBuf;
if (strcmp(tmpBuf, "100") == 0) {
os_sleep(2); //成功杀掉进程,延时2s等待释放端口资源
} else
rlt = false;
} else
rlt = false;
} else
rlt = false;
pos.close();
}
if (!rlt) { // 如果关闭失败,根据进程名称杀掉进程
kill_process_by_name(execName.c_str());
}
}
// 使用此程序模拟POS给插件发送初始化请求
#ifdef WIN32
bool PosHandle::simulator_pos_send_init() {
JsonModule jsonTool;
bool sendPluginRet = FALSE;
LOG(INFO) << "Simulator Pos Send Login Request ===>> PosPlugin " << g_simulator_pos_req_data.data();
LOG(INFO) << "Simulator Pos Connect pos_ip: " << pos_ip.c_str() << " Port:" << client_listen_port;
if (g_simulator_pos_client.doConnect(client_listen_port, pos_ip.c_str())) {
g_simulator_pos_client.setSocketTimeout(5); //设置超时
if (g_simulator_pos_client.write(g_simulator_pos_req_data.data())) {
//等待POS-Plugin返回初始化结果
char tmpBuf[FM_BUF_SIZE] = { 0 };
g_simulator_pos_client.read(tmpBuf, sizeof(tmpBuf));
g_simulator_pos_resp_data = tmpBuf;
// 解析报文状态码是否 100; 100 标识成功;
rapidjson::Document document; // 定义一个Document对象
document.Parse(g_simulator_pos_resp_data.c_str()); // 解析,Parse()无返回值,也不会抛异常
if (document.HasParseError()) // 通过HasParseError()来判断解析是否成功
LOG(INFO) << "convertInitDataPos2Ods JSON parse error:" << document.GetParseError() << ":" << document.GetErrorOffset();
else if (100 == document["fm_cmd"].GetInt()) {
sendPluginRet = TRUE;
}
}
else {
jsonTool.getPosResponseData(101, "Pos Send Login Data to [PosPlugin] failed!", g_simulator_pos_req_data);
}
}
else {
jsonTool.getPosResponseData(101, "Pos Connect [PosPlugin] failed!", g_simulator_pos_req_data);
}
// TODO待加入重试机制
LOG(INFO) << "Simulator Pos Send Login Request , Get Response: " << g_simulator_pos_resp_data.data();
return sendPluginRet;
}
#endif
#ifndef POS_HANDLE_H
#define POS_HANDLE_H
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <iostream>
#include <fstream>
#include "SocketModule.h"
#include "JsonModule.h"
#include "../utility/utility.h"
#include "../3rdParty/easylogging/easylogging++.h"
#ifdef WIN32
#include <Windows.h>
#else
#include <pthread.h>
#include <unistd.h>
#include <netdb.h>
#include <sys/socket.h>
#endif
extern std::string ods_ip;
extern std::string pos_ip;
extern int ods_push_port;
extern int ods_recv_port;
extern int client_listen_port;
extern int pos_listen_port;
extern bool bInitDone;
extern std::string g_init_data;
extern std::string g_init_data_ods_back;
extern std::string g_local_pos_init_data; // 上一次 POS 初始化请求报文 本地化数据;
extern std::string g_set_pos_priority; // 设置pos优先级
extern std::string g_set_pos_priority_ods_back;
extern bool bPriorityDone;
extern TCPClient longConnectionOds;
#ifdef WIN32
extern std::string g_simulator_role;
extern std::string g_simulator_pos_req_data;
extern std::string g_simulator_pos_resp_data;
extern std::string g_plugin_auto_login_init;
extern TCPClient g_simulator_pos_client;
#endif
class PosHandle
{
public:
static PosHandle* getInstance() {
return &handle;
}
static void logRolloutHandler(const char* filename, std::size_t size);
static void getIpByDns(IN std::string &ods_ip);
// 线程处理函数:监听 Pos Send Request;
#ifdef WIN32
static bool PosHandle::simulator_pos_send_init();
static DWORD WINAPI listen_pos_func(LPVOID lpParamter);
#else
static void* listen_pos_func(void* arg);
#endif // WIN32
static bool check_plugin_already_exist();
static bool check_pos_already_exist();
static std::string check_boot_app_enable_str(std::string fileName, std::string appName );
static void kill_origin_process();
//函数,订单信息发送给pos
static bool order_send_to_pos(IN std::string &order_json, IN std::string &ods_json, OUT std::string &back_json);
static void attach_plugin_reloginReason_bootConfig(IN OUT std::string &init_data, IN std::string relogin_reason, IN std::string boot_config);
static std::string& replace_all(std::string& str, const std::string& old_value, const std::string& new_value)
{
while (true) {
std::string::size_type pos(0);
if ((pos = str.find(old_value)) != std::string::npos)
str.replace(pos, old_value.length(), new_value);
else
break;
}
return str;
}
static std::string& replace_all_distinct(std::string& str, const std::string& old_value, const std::string& new_value)
{
for (std::string::size_type pos(0); pos != std::string::npos; pos += new_value.length()) {
if ( ( pos = str.find(old_value, pos) ) != std::string::npos )
str.replace(pos, old_value.length(), new_value);
else
break;
}
return str;
}
private:
static PosHandle handle;
PosHandle();
~PosHandle();
PosHandle(const PosHandle&) {}
};
#endif
\ No newline at end of file
......@@ -2,6 +2,7 @@
#include <fstream>
#include "JsonModule.h"
#include "SocketModule.h"
#include "PosHandle.h"
#include "../utility/utility.h"
#include "../utility/zini.h"
......@@ -19,6 +20,8 @@
#include "../3rdParty/easylogging/easylogging++.h"
INITIALIZE_EASYLOGGINGPP
PosHandle PosHandle::handle;
/******Windows X86 模式,才支持插件脱离POS独立运行 ******/
#ifdef WIN32
std::string pos_config_storeid;
......@@ -26,15 +29,18 @@ int pos_config_posid;
std::string pos_config_ismaster;
std::string g_simulator_role;
bool simulator_pos_send_init_ret = false;
std::string g_simulator_pos_req_data;
std::string g_simulator_pos_resp_data;
std::string g_plugin_auto_login_init;
TCPClient g_simulator_pos_client;
#endif
std::string g_plugin_version = "1.3.1 RC"; //插件版本号;
std::string g_init_data;
std::string g_init_data_ods_back;
std::string g_set_pos_priority; //设置pos优先级
std::string g_local_pos_init_data; // 上一次 POS 初始化请求报文 本地化数据;
std::string g_set_pos_priority; // 设置pos优先级
std::string g_set_pos_priority_ods_back;
std::string ods_ip;
......@@ -43,273 +49,43 @@ int ods_push_port;
int ods_recv_port;
int client_listen_port;
int pos_listen_port;
std::string g_posPluginBootConfig = ""; // 进程运行期间存储全局: PosPlugin 在启动项中的配置信息;
std::string g_store_id; //进程运行期间存储全局 Store 编号;
std::string g_pos_id; //进程运行期间存储全局 Pos 编号;
bool g_pos_ismaster; //进程运行期间存储全局 是否主POS 编号;
std::string g_plugin_comment; //插件的备注功能: 目前用来存储终端 使用 域名/IP 连接ODS;
int g_ods_heart_10x_index = 0; // 400s 10次心跳间隔检查一次 (长连接是否逻辑无效);
bool g_ods_pushed_10x_term = false; // 400s 10次心跳间隔检查 期间,是否存在ODS推送记录;
int g_ods_wait_push_order_this_sum = 0; // 这次心跳时 待推送订单数;
int g_ods_wait_push_order_last_sum = 0; // 上一次心跳时 待推送订单数;
std::string g_plugin_relogin_ods_reason; // 插件重新登录 ODS 的原因;
extern std::string g_store_id; //进程运行期间存储全局 Store 编号;
extern std::string g_pos_id; //进程运行期间存储全局 Pos 编号;
extern bool g_pos_ismaster; //进程运行期间存储全局 是否主POS 编号;
extern std::string g_plugin_comment; //插件的备注功能: 目前用来存储终端 使用 域名/IP 连接ODS;
/****** Windows X86 模式,才支持插件脱离POS独立运行 ******/
#ifdef WIN32
bool bInitDone = TRUE; //初始化完成
bool bInitDone = false; //初始化完成
std::string g_pluginAppName = "takeaway_d";
std::string g_appBootStriptFile = "E:\\zouwuyang\\FamilyMart\\PosPluginClient\\familyMart_takeaway\\bin\\run_cream.sh";
#else
bool bInitDone = false; //初始化完成
std::string g_pluginAppName = "takeaway";
std::string g_appBootStriptFile = "/opt/pos/bin/run_cream.sh";
#endif
bool bPriorityDone=true; //设置POS优先级
bool bPriorityDone = true; //设置POS优先级
TCPClient longConnectionOds; //长连接tcp对象
//函数,订单信息发送给pos
bool order_send_to_pos(IN std::string &order_json,IN std::string &ods_json,OUT std::string &back_json);
void getIpByDns(IN std::string &ods_ip);
//但进程运行,杀掉已有进程
void kill_origin_process();
void logRolloutHandler(const char* filename, std::size_t size)
{
std::cout<<"bk filename:"<<filename<<std::endl;
std::stringstream ss;
ss << "mv " << filename << " "<<filename<<"_bk";
system(ss.str().c_str());
}
#ifdef WIN32
DWORD WINAPI listen_pos_func(LPVOID lpParamter)
#else
void* listen_pos_func(void* arg)
#endif // WIN32
{
TCPServer server;
JsonModule jsonTool;
if( server.doListen(client_listen_port) ) { // 监控Plugin本地端口: SYS -> port [24446]
LOG(INFO) << "[PosPlugin] Listen Port: [" << client_listen_port << "] successful, Recv [POS] Request" << '\n';
} else {
LOG(INFO) << "[PosPlugin] Listen Port: [" << client_listen_port << "] Failed, Maybe Instance Is Exist, Exit!!!";
exit(0);
}
while(true) { // 循环接受 POS 发送 给 Plugin 的业务请求;
TCPClient pos;
std::string posRequestData;
std::string requestOdsData;
std::string responseData;
if( server.accept(pos) ) { // Socket-Accept: 监控+接受 POS发送给插件的 长连接请求 Success;
char tmpBuf[FM_BUF_SIZE] = {0};
if( pos.read(tmpBuf, sizeof(tmpBuf)) ) { // Read Socket: Pos Send Login Init Request Success;
posRequestData = tmpBuf;
// 如果为初始化请求则通过长连接socket发送
int reqType=jsonTool.getPushType(posRequestData.data());
LOG(INFO)<<"Pos ReqType:"<<reqType;
if(reqType==REQUEST_TYPE_INIT) {
// POS请求类型:: 登录初始化;
// 如果当前内存标识: 插件已接受到, Pos发送过来的登录请求, 初始化标识完成;将断开 插件与 ODS 的长连接, 等待后面插件再与ODS重新连接
if (bInitDone) {
LOG(INFO) << "set timeout 1";
longConnectionOds.setSocketTimeout(1);
longConnectionOds.close(); //关闭长连接,等待重连
}
bInitDone=false;
if( jsonTool.checkInitData(posRequestData, pos_listen_port) ) {
g_init_data_ods_back.clear();
LOG(INFO) <<"INIT POS ===>> PLUGIN:"<<posRequestData.c_str();
jsonTool.convertInitDataPos2Ods(posRequestData,g_init_data);
LOG(INFO) <<"INIT PLUGIN ===>> ODS:"<<g_init_data.c_str();
//等待ods返回初始化结果
while(true) {
if(!g_init_data_ods_back.empty()){
jsonTool.getInitBackData(g_init_data_ods_back.data(),responseData);
LOG(INFO) <<"INIT PLUGIN ===>> POS:"<<responseData.c_str();
break;
} else {
LOG(INFO)<<"wait for ods init back data";
os_sleep(3);
}
}
jsonTool.setInitData(posRequestData.data()); //把初始化数据暂存起来
} else {
jsonTool.getPosResponseData(101, "invalid initdata!", responseData);
}
}
else if(reqType==REQUEST_TYPE_POS_PRIORITY) {
// POS请求类型:: 优先级设置;
if(bInitDone){
//未完成,缺接口
bPriorityDone=false;
LOG(INFO)<<"POS send priority data:"<<posRequestData.data();
g_set_pos_priority_ods_back.clear();
jsonTool.convertSetPosPriorityDataPos2Ods(posRequestData,g_set_pos_priority);
//等待ods返回初始化结果
while(true) {
if ( !g_set_pos_priority_ods_back.empty() ) {
jsonTool.getSetPosPriorityBackData(g_set_pos_priority_ods_back.data(),responseData);
break;
} else {
LOG(INFO)<<"wait for ods priority back data";
os_sleep(1);
}
}
}
} else if ( reqType==REQUEST_TYPE_GOODS_CHANGE) {
// POS请求类型:: 商品变更;
std::string posReq = charset_g2u(posRequestData);
LOG(INFO)<<"POS send goods change data:"<<posReq.data();
jsonTool.getPosResponseData(100, "success", responseData);
}
else if (
OPERATION_POS_CONFIRM == reqType
|| OPERATION_POS_CANCEL == reqType
|| OPERATION_POS_REFUND_AGREE == reqType
|| OPERATION_POS_REFUND_DISAGREE == reqType
|| OPERATION_POS_APPOINTMENT_MAKEING_DONE == reqType
|| OPERATION_POS_APPOINTMENT_DONE == reqType
|| OPERATION_POS_APPOINTMENT_REFUND == reqType
|| OPERATION_POS_APPOINTMENT_CONFIRM == reqType
|| OPERATION_POS_SCANCODE_DONE == reqType
|| OPERATION_POS_SCANCODE_REFUND == reqType
|| REQUEST_TYPE_QUERY_ORDER_STAUS == reqType
|| OPERATION_POS_COFFEE_MAKEING_DONE == reqType
|| OPERATION_POS_COFFEE_DONE == reqType
|| OPERATION_POS_COFFEE_REFUND == reqType)
{
// POS请求类型:: 业务类型;
LOG(INFO)<<"REQ POS ===>> PLUGIN:"<<posRequestData.data();
// 将POS请求数据转换为中台可接受数据格式
if( jsonTool.convertDataPos2Ods(posRequestData, requestOdsData) ) {
LOG(INFO)<<"REQ PLUGIN ===>> ODS:"<<requestOdsData.data();
// 同步阻塞发送到ODS并等待返回
TCPClient ods;
if( ods.doConnect(ods_recv_port, ods_ip.c_str()) ) {
ods.setSocketTimeout(30); //设置超时
if( ods.send(requestOdsData) ) {
std::string tmp;
if( ods.receive(tmp) ) {
LOG(INFO)<<"REQ ODS ===>> PLUGIN:"<<tmp.data();
if (!jsonTool.getPosResponseData(tmp, posRequestData, responseData)) {
jsonTool.getPosResponseData(101, "receive data from [ODS] invalid!", responseData);
}
} else {
jsonTool.getPosResponseData(101, "receive data from [ODS] failed!", responseData);
}
} else {
jsonTool.getPosResponseData(101, "send data to [ODS] failed!", responseData);
}
ods.close();
} else {
jsonTool.getPosResponseData(101, "connect [ODS] failed!", responseData);
}
} else {
jsonTool.getPosResponseData(101, "convert data to [ODS] format failed!", responseData);
}
}
else if (-1 == reqType) {
// POS请求类型:: 重启类型;
//接收到-1,程序需要重启,关闭本进程
LOG(INFO) << "recive exit msessage,exit!!!";
pos.write("100");
pos.close();
server.close();
exit(0);
}
// TODO待加入重试机制
LOG(INFO) << "PLUGIN ===>>POS:"<<responseData.data();
pos.write(responseData.c_str());
pos.close();
if(reqType==REQUEST_TYPE_INIT){
bInitDone=true; //初始化完成,可以接收订单
}
if(reqType==REQUEST_TYPE_POS_PRIORITY){
bPriorityDone=true; //设置pos优先级完成
}
LOG(INFO)<<"PLUGIN ===>>POS: Send End";
} // Read Socket: Pos Send Login Init Request Failed;
else { // 读取 Pos 发送过来的请求数据 Failed;
/****** X86 & DEBUG 模式,才支持插件脱离POS独立运行 ******/
#ifdef WIN32
LOG(INFO) << " Windows X86 模式 无需等待 POS 推送初始化数据";
#else
LOG(INFO) << "recv pos pushData failed";
#endif
}
} else { // Socket-Accept: 监控+接受 POS发送给插件的 长连接动态 Failed;
LOG(INFO) << "Accept Pos Connect Failed --------> ";
}
}
#ifdef WIN32
return 0;
#endif // WIN32
}
#ifdef WIN32
void simulator_pos_send_init() {
JsonModule jsonTool;
TCPClient simulator_pos_client;
LOG(INFO) << " Simulator Pos Send Login Request ===>> PosPlugin " << g_simulator_pos_req_data.data();
LOG(INFO) << " Simulator Pos Connect pos_ip: " << pos_ip.c_str() << " Port:" << client_listen_port;
if (simulator_pos_client.doConnect(client_listen_port, pos_ip.c_str())) {
simulator_pos_client.setSocketTimeout(360); //设置超时
if (simulator_pos_client.write( g_simulator_pos_req_data.data() ) ) {
//等待POS-Plugin返回初始化结果
char tmpBuf[FM_BUF_SIZE] = { 0 };
simulator_pos_send_init_ret = simulator_pos_client.read(tmpBuf, sizeof(tmpBuf) );
g_simulator_pos_resp_data = tmpBuf;
}
else {
jsonTool.getPosResponseData(101, "Pos Send Login Data to [PosPlugin] failed!", g_simulator_pos_req_data);
}
simulator_pos_client.close();
}
else {
jsonTool.getPosResponseData(101, "Pos Connect [PosPlugin] failed!", g_simulator_pos_req_data);
}
// TODO待加入重试机制
LOG(INFO) << "Simulator Pos Send Login Request , Get Response: " << g_simulator_pos_resp_data.data();
}
#endif
int main(int argc,char *argv[])
{
if (2 == argc && 0 == strcmp("-v", argv[1])) {
printf("version:%s\n", VERSION);
return 0;
}
if (2 == argc && 0 == strcmp("-v", argv[1])) { return 0; }
#ifdef WIN32
WSADATA transData;
WSAStartup(MAKEWORD(2, 2), &transData);
/****** Windows X86 模式,才支持插件脱离POS独立运行 ******/
//ShowWindow(GetConsoleWindow(), SW_HIDE); //隐藏窗口,模拟很多程序以 后台方式 运行
JsonModule jsonToolTemp;
bool bCheckInitData = jsonToolTemp.isInitData(g_init_data);
LOG(INFO) << "Check g_init_data is OK ?: " << bCheckInitData;
#else
signal(SIGPIPE, SIG_IGN);
#endif
......@@ -323,18 +99,34 @@ int main(int argc,char *argv[])
/// 设置全部logger的配置
el::Loggers::reconfigureAllLoggers(conf);
/// 注册回调函数
el::Helpers::installPreRollOutCallback(logRolloutHandler);
el::Helpers::installPreRollOutCallback( PosHandle::logRolloutHandler );
JsonModule jsonToolTemp;
LOG(INFO) << "---------software start---------";
LOG(INFO) << "version:"<< VERSION ;
LOG(INFO) << "version:"<< g_plugin_version;
// 读取配置文件信息
std::string strIniPath(strBinPath.data());
strIniPath.append("config.ini");
LOG(INFO) << "strIniPath: " << strIniPath.data();
if ( PosHandle::check_plugin_already_exist() ) {
LOG(INFO) << "Pos Plugin Had Been Running, Can't Restart Double Pos Plugin ";
exit(0);
}
// 获取 PosPlugin 在开机启动项中的配置;
g_posPluginBootConfig = PosHandle::check_boot_app_enable_str(g_appBootStriptFile, g_pluginAppName);
// 启动另一个进程运行守护程序;
create_process_daemon();
// PosHandle::kill_origin_process(); //如果存在已有进程,杀掉(有存在自杀的风险)
g_local_pos_init_data = jsonToolTemp.getLocalPosInitData(); // 获取上一次插件运行时,接受POS发送给插件并转化完成的初始化请求;
ods_ip = ZIni::readString("ODS","ip", "", strIniPath.c_str());
g_plugin_comment = ods_ip;
getIpByDns(ods_ip);
PosHandle::getIpByDns(ods_ip);
ods_push_port = ZIni::readInt("ODS", "pushPort", 0, strIniPath.c_str());
ods_recv_port = ZIni::readInt("ODS", "recvPort", 0, strIniPath.c_str());
int ods_socket_timeout = ZIni::readInt("ODS", "socketTimeout", 0, strIniPath.c_str());
......@@ -356,24 +148,24 @@ int main(int argc,char *argv[])
if ( !g_plugin_auto_login_init.compare("false") ) {
bInitDone = false; //不自动登录,需要初始化 标识为 = false ,方便后续模拟测试;
g_init_data.clear();
}
else {
} else {
g_init_data = "{ \"fm_cmd\":1000,\"storeId\":\"" + pos_config_storeid + "\",\"pos_id\":\"" + std::to_string(pos_config_posid)
+ "\",\"operator_id\":\"zouwuyang\",\"business_date\":\"20220401\",\"is_master\":" + pos_config_ismaster + " ,\"plugin_comment\":\"" + g_plugin_comment +
+ "\",\"version\":\"" + VERSION + "\",\"listen_port\":24445 }";
+ "\",\"operator_id\":\"zouwuyang\",\"business_date\":\"20220615\",\"is_master\":" + pos_config_ismaster + " ,\"plugin_comment\":\"" + g_plugin_comment +
+ "\",\"version\":\"" + g_plugin_version + "\",\"listen_port\":24445 }";
}
LOG(INFO) << "[ODS]ip: " << ods_ip.data() << "-push port: " << ods_push_port << "-recv port: " << ods_recv_port << "-socket timeout: " << ods_socket_timeout;
LOG(INFO) << "[POS]ip:" << pos_ip << "-listen port:" << pos_listen_port << " storeId:" << pos_config_storeid << " posId:" << pos_config_posid << " is_master:" << pos_config_ismaster;
LOG(INFO) << "[PosPlugin] Listen Port: " << client_listen_port;
LOG(INFO) << "[PosPlugin] Comment: " << g_plugin_comment;
LOG(INFO) << "[POS]ip: " << pos_ip << "-listen port:" << pos_listen_port << " storeId:" << pos_config_storeid << " posId:" << pos_config_posid << " is_master:" << pos_config_ismaster;
#else
LOG(INFO) << "[ODS]ip: " << ods_ip.data() << "-push port: " << ods_push_port << "-recv port: " << ods_recv_port << "-socket timeout: " << ods_socket_timeout;
LOG(INFO) << "[POS]ip: " << pos_ip << "-listen port: " << pos_listen_port;
#endif
LOG(INFO) << "[PosPlugin] Listen Port: " << client_listen_port;
LOG(INFO) << "[PosPlugin] Comment: " << g_plugin_comment;
#endif
LOG(INFO) << "[PosPlugin] Local File -> Pos Last Time Init Request: " << g_local_pos_init_data;
LOG(INFO) << "[PosPlugin] PosPlugin Startup Boot Config: " << g_posPluginBootConfig;
#ifdef WIN32
if ( !g_simulator_role.compare("pos") ) {
......@@ -382,47 +174,69 @@ int main(int argc,char *argv[])
pos_config_storeid = ZIni::readString("POS", "storeId", "208888", strIniPath.c_str());
pos_config_posid = ZIni::readInt("POS", "posId", 1, strIniPath.c_str());
pos_config_ismaster = ZIni::readString("POS", "isMaster", "false", strIniPath.c_str());
g_simulator_pos_req_data = "{ \"fm_cmd\":1000, \"store_id\":\"" + pos_config_storeid + "\", \"pos_id\":\"" + std::to_string(pos_config_posid)
+ "\", \"operator_id\":\"zouwuyang\",\"business_date\":\"20210511\",\"is_master\":" + pos_config_ismaster + ",\"version\":\"" + VERSION + "\",\"listen_port\":24445 }";
while (!simulator_pos_send_init_ret) {
simulator_pos_send_init();
os_sleep(5);
}
LOG(INFO) << " simulator_pos_send_init Finished -------------Then Exit";
os_sleep(60);
g_simulator_pos_req_data = "{\"fm_cmd\":1000,\"store_id\":\"" + pos_config_storeid + "\",\"pos_id\":\"" + std::to_string(pos_config_posid)
+ "\",\"operator_id\":\"zouwuyang\",\"business_date\":\"20210511\",\"is_master\":" + pos_config_ismaster + ",\"version\":\"" + g_plugin_version + "\",\"listen_port\":24445}";
LOG(INFO) << g_simulator_pos_req_data;
int retry_send_max_count = 0;
while (! PosHandle::simulator_pos_send_init() && retry_send_max_count < 3 ) {
os_sleep(10);
retry_send_max_count++;
}
g_simulator_pos_client.close();
LOG(INFO) << "simulator_pos_send_init Finished,After 600s";
TCPServer simulatorPosServer;
if (simulatorPosServer.doListen ( pos_listen_port ) ) { // 监控Pos端口: Pos Liston -> port [24445]
LOG(INFO) << "[Simulator Pos] Listen Port: [" << pos_listen_port << "] Successful, Recv [Plugin] Request" << '\n';
}
else {
LOG(INFO) << "[Simulator Pos] Listen Port: [" << pos_listen_port << "] Failed, Maybe Pos Instance Is Exist, Exit!!!";
exit(0);
}
while (true) {
TCPClient tempPlugin;
if (simulatorPosServer.accept(tempPlugin)) {
char tmpBuf[FM_BUF_SIZE] = { 0 };
if (tempPlugin.read(tmpBuf, sizeof(tmpBuf)))
LOG(INFO) << "Simulator Pos , Recv Plugin Send Msg :" << tmpBuf;
}
}
os_sleep(600);
exit(0);
}
#endif
kill_origin_process(); //如果已有进程,杀掉
#ifdef WIN32
// Test Add New Feature Convert Result: Column Field
std::string testOrderJson = jsonToolTemp.getTestOrderJson();
LOG(INFO) << "----------- Plugin Read Local Test Order File Success ------------" << testOrderJson << '\n';
if (testOrderJson.length()) {
std::string convertTestOrderJson = jsonToolTemp.getConvertOrderJson(testOrderJson);
if (testOrderJson.length() && convertTestOrderJson.length() ) {
if (testOrderJson.length() && convertTestOrderJson.length()) {
LOG(INFO) << "----------- Plugin Read&Convert Local Test Order File Success ------------";
LOG(INFO) << charset_u2g(convertTestOrderJson);
LOG(INFO) << "----------No charset_u2g(*)-------------";
LOG(INFO) << convertTestOrderJson;
} else {
}
else {
LOG(INFO) << "----------- Plugin Read&Convert Local Test Order File Failed------------";
}
}
HANDLE hTakeway = CreateThread(NULL, 0, listen_pos_func, NULL, 0, NULL);
HANDLE hTakeway = CreateThread(NULL, 0, PosHandle::listen_pos_func, NULL, 0, NULL);
#else
// 监听POS请求的线程
pthread_t listen_pos_id;
/*创建 listen_pos 线程*/
if (pthread_create(&listen_pos_id, NULL, listen_pos_func, NULL))
if (pthread_create(&listen_pos_id, NULL, PosHandle::listen_pos_func, NULL))
LOG(INFO) << "create listen_pos thread failed";
#endif // WIN32
//TCPClient ods;
JsonModule jsonTool;
// 启用保存 POS初始化数据的本地文件 计时器;
int enableLocalPosInitReqTimeOutLimit = 0;
while(true) {
std::string odsPushData;
......@@ -434,11 +248,11 @@ int main(int argc,char *argv[])
if( !longConnectionOds.isValid() ) {
// 长连接断开重连时, 重新解析 域名映射地址;
ods_ip = ZIni::readString("ODS", "ip", "", strIniPath.c_str());
getIpByDns(ods_ip);
PosHandle::getIpByDns(ods_ip);
// ①插件将重连 ODS : ip + port 成功;
if ( longConnectionOds.doConnect( ods_push_port, ods_ip.c_str() ) ) {
LOG(INFO) << " Connect ODS Successful ";
LOG(INFO) << "Retry Connect ODS Successful ";
longConnectionOds.setSocketTimeout(ods_socket_timeout); //设置超时时间10s
int waitPosInitReqTimeOutCount = 0;
......@@ -446,6 +260,10 @@ int main(int argc,char *argv[])
if(!g_init_data.empty()) {
// ②与ODS建立连接后 发送登录初始化请求报文成功 完成登录; Bug: Send Faild, Will While(1) ;
// 需要手动添加登录时,插件重新登录原因, 或插件开机启动配置;
PosHandle::attach_plugin_reloginReason_bootConfig(g_init_data, g_plugin_relogin_ods_reason, g_posPluginBootConfig);
LOG(INFO) << "Plugin Convert Pos Init Request Final Init : " << g_init_data;
if(longConnectionOds.send(g_init_data) ) {
longConnectionOds.receive(odsPushData);
g_init_data_ods_back=odsPushData;
......@@ -460,10 +278,10 @@ int main(int argc,char *argv[])
int count=0;
while(!bInitDone) {
LOG(INFO)<<"wait for init done";
LOG(INFO)<<"Wait For Init Done";
if( !longConnectionOds.isValid() ){
LOG(INFO)<<"ods is not valid";
LOG(INFO)<<"ODS Is Not Valid";
os_sleep(1); //防止在ods挂掉后狂刷日志
break;
}
......@@ -475,6 +293,8 @@ int main(int argc,char *argv[])
break;
}
}
// Plugin Login Success, Need Clean Relogin Reason;
g_plugin_relogin_ods_reason = "";
break;
}
......@@ -486,10 +306,24 @@ int main(int argc,char *argv[])
}
} else {
LOG(INFO)<<"Haven't receive init data";
waitPosInitReqTimeOutCount++;
enableLocalPosInitReqTimeOutLimit++;
LOG(INFO) << "PosPlugin Haven't Receive Pos Init Tcp Data, Sleep Wait 10s......; " << "waitPosInitReqTimeOutCount: " << waitPosInitReqTimeOutCount
<< " enableLocalPosInitReqTimeOutLimit: " << enableLocalPosInitReqTimeOutLimit;
}
os_sleep(10);
// 每6次等待超时,Plugin会进行1次重连ODS, 长连接 连接ODS会有超时2分钟Block, 故调整插件启动后超时时间为 3 minutes;
// 当插件启动后超过 10s * 6 * 3 = 3 minutes; 若未接受到POS的 TCP 方式发送的初始化请求,则启用本地的初始化数据
if (g_local_pos_init_data.length() && enableLocalPosInitReqTimeOutLimit > 18 ) {
LOG(INFO) << "PosPlugin Haven't Receive Pos Init Tcp Data, Try Enable Local Pos Init Data File";
if (PosHandle::check_pos_already_exist() ) {
LOG(INFO) << "PosPlugin Haven't Receive Pos Init Tcp Data, Check Pos Listen Port Is Ok, Prepare Login ODS...... ";
g_init_data = g_local_pos_init_data;
bInitDone = true;
}
}
if ( waitPosInitReqTimeOutCount < 6 ) {
continue; //Continue / break; 6次超时(60s) 内都继续本循环; 超过6次超时(60s) 将退出本循环,进入上一层循环进行重新连接Ods的 ip+ port;
} else {
......@@ -498,6 +332,11 @@ int main(int argc,char *argv[])
} // socket: Send / Recive To ODS --> End
// 循环登录ODS最终成功,更新初始化数据值;
g_ods_heart_10x_index = 0;
g_ods_wait_push_order_this_sum = 0;
g_ods_wait_push_order_last_sum = 0;
}else {
// ①插件连接 ODS: ip + port 失败,将重连;
LOG(INFO) << "Connect ODS: ip + port Failed";
......@@ -508,12 +347,12 @@ int main(int argc,char *argv[])
// Pos 向 ODS 发送优先级设置请求,并接受结果;
if(!bPriorityDone){
LOG(INFO)<<"ODS set priority send:"<<g_set_pos_priority.data();
LOG(INFO)<<"ODS Set Priority Send:"<<g_set_pos_priority.data();
if (longConnectionOds.send( g_set_pos_priority ) ) {
longConnectionOds.receive(odsPushData);
g_set_pos_priority_ods_back=odsPushData;
LOG(INFO)<<"ODS set priority back:"<<odsPushData.data();
LOG(INFO)<<"ODS Set Priority Response:"<<odsPushData.data();
}
}
......@@ -525,6 +364,7 @@ int main(int argc,char *argv[])
int dataType=jsonTool.isHeartbeatData(odsPushData.data());
if ( 2==dataType ) { //ODS Push Order Data ReqCommand;
g_ods_pushed_10x_term = true;
if( jsonTool.convertDataOds2Pos(odsPushData, pushPosData) ) {
LOG(INFO) << "----------- PLUGIN ===>> POS ------------";
......@@ -534,12 +374,12 @@ int main(int argc,char *argv[])
#ifdef WIN32
LOG(INFO) << "-----------X86 & DEBUG 模式 无需真实推送到POS ------------";
g_store_id = pos_config_storeid; g_pos_id = std::to_string(pos_config_posid); g_pos_ismaster = pos_config_ismaster == "true" ? true : false;
if (!order_send_to_pos(pushPosData, odsPushData, responseData)) {
if (!PosHandle::order_send_to_pos(pushPosData, odsPushData, responseData)) {
jsonTool.getOdsResponseData(101, "send to pos failed or receive data illegal!", odsPushData, responseData);
}
#else
//-----------Production Environment, Need Delivery POS ------------
if (!order_send_to_pos(pushPosData, odsPushData, responseData)) {
if (!PosHandle::order_send_to_pos(pushPosData, odsPushData, responseData)) {
jsonTool.getOdsResponseData(101, "send to pos failed or receive data illegal!", odsPushData, responseData);
}
#endif
......@@ -554,23 +394,45 @@ int main(int argc,char *argv[])
LOG(INFO) << "PLUGIN ===>> ODS:"<<responseData.data();
bool rlt = longConnectionOds.send(responseData);
LOG(INFO)<<"send to ods result:"<<rlt;
LOG(INFO)<<"Plugin Send To ODS Result:"<<rlt;
} else if ( 1==dataType ) { //ODS Send Heartbeat ReqCommand;
if ( !bInitDone ) {
longConnectionOds.setSocketTimeout(1);
longConnectionOds.setValid(false);
LOG(INFO)<<"maybe pos restart,reconnect!!!";
LOG(INFO)<<"Maybe Wait Pos Restart, Reconnect!!!";
g_ods_heart_10x_index = 0;
}
// 400s 十次心跳间隔检查一次,长连接是否逻辑无效;
// ①使用一个标识来标识 此期间,Plugin是否接受到ODS推送过的记录;
// ②如果此期间无推送记录 且 心跳报文中 等待推送订单数量 >= 十次心跳前等待推送订单数量; 则标识此长连接逻辑无效;
// ③如果长连接逻辑无效,则需要将此原因添加到Plugin下次登录请求报文中;
if ( 0 == ( g_ods_heart_10x_index % 10 ) ) {
if (!g_ods_pushed_10x_term && g_ods_wait_push_order_this_sum
&& (g_ods_wait_push_order_this_sum >= g_ods_wait_push_order_last_sum) ) {
longConnectionOds.setSocketTimeout(1);
longConnectionOds.setValid(false);
g_plugin_relogin_ods_reason = "Socket Logic Invalid: Exist Wait Push Order, But Havn't Recv, Plugin setSocket Timeout, Prepare Reconnect ODS !!!";
LOG(INFO) << "Socket Logic Invalid: Exist Wait Push Order, But Havn't Recv, Plugin setSocket Timeout , Prepare Reconnect ODS !!!";
os_sleep(5); // 等待5s后再重连;
}
// 需要重新清空 ODS的推送记录,新任期内 进行重新计数;
g_ods_pushed_10x_term = false;
}
} else if ( 3==dataType ) { //ODS Undefine ReqCommand;
LOG(INFO)<<"illegal data!!";
g_ods_pushed_10x_term = true;
LOG(INFO)<<"ODS Push Illegal Data!!";
}
} else { // Long Connect Recv Failed;
longConnectionOds.setSocketTimeout(1);
longConnectionOds.close();
os_sleep(2); //防止在ods挂掉后狂刷日志
LOG(INFO) << "recv ODS pushData failed or timeout";
LOG(INFO) << "Recv ODS PushData Failed Or Timeout";
}
}
/// 注销回调函数
......@@ -581,150 +443,3 @@ int main(int argc,char *argv[])
return 0;
}
\ No newline at end of file
bool order_send_to_pos(IN std::string &order_json,IN std::string &ods_json,OUT std::string &back_json)
{
bool rlt=true;
TCPClient pos;
JsonModule jsonTool;
#ifdef WIN32
// Simulation Pos Recived New Order Data;
std::string tmp = "{\"fm_cmd\": 1001,\"status_code\": 100, \"msg\": \"Simulation Push Pos Sucessful\"}";
#else
std::string tmp = "{\"status_code\": 102, \"msg\": \"connect pos failed!\"}";
if( pos.doConnect(pos_listen_port, pos_ip.c_str()) ) {
pos.setSocketTimeout(60); //设置超时
if( pos.write(order_json.c_str()) ) {
char tmpBuf[1024*10] = {0};
if( pos.read(tmpBuf,sizeof(tmpBuf)) ) {
LOG(INFO) << "POS ===>> PLUGIN:"<<tmpBuf;
tmp=tmpBuf;
//pos发送过来的数据为gb2312编码,需要转换为utf8
} else {
LOG(INFO) << "receive data from pos failed";
rlt=false;
}
} else {
LOG(INFO) << "send data to pos failed";
rlt=false;
}
pos.close();
}else {
LOG(INFO) << "connect pos failed,pos_listen_port:" << pos_listen_port << " pos_ip:" << pos_ip;
rlt=false;
}
#endif
jsonTool.getOdsResponseData(tmp, order_json,ods_json, back_json);
return rlt;
}
void getIpByDns(IN std::string &ods_ip) {
if (ods_ip.front() < '0' || ods_ip.front() > '9') {
LOG(INFO) << "DNS Resolve IP Begin , Origin ods_ip: " << ods_ip.c_str();
struct hostent* pHost = NULL;
bool bGetDnsRet = false;
// 需考虑POS机断网后,域名解析会频繁失败,频繁失败,插件有退出程序的可能,故在此进行类似死循环处理,插件永不主动退出;
for (int n = 0; n < n + 3 ; n++) {
pHost = gethostbyname(ods_ip.c_str());
if (!pHost) {
LOG(INFO) << "DNS Resolve Failed: ods domain name: " << n << " " << ods_ip.c_str();
LOG(ERROR) << "DNS Resolve Failed: I don't know what to do, sleep 20 seconds first ";
os_sleep(20);
} else {
bGetDnsRet = true;
break;
}
}
if (bGetDnsRet) {
int i = 0;
if (pHost->h_addrtype == AF_INET) {
char str[32];
char **pptr;
pptr = pHost->h_addr_list;
#ifdef WIN32
// ods_ip = *(ULONG*)pHost->h_addr; // 转换失败;
// ods_ip = inet_ntoa(*(in_addr*)*pHost->h_addr_list); // 转换成功
ods_ip = inet_ntoa(*(in_addr*)pHost->h_addr); // 转换首个IP成功
#else
for (; *pptr != NULL; pptr++) {
inet_ntop(pHost->h_addrtype, *pptr, str, sizeof(str));
LOG(INFO) << "DNS Resolve IP List: " << str;
}
inet_ntop(pHost->h_addrtype, pHost->h_addr, str, sizeof(str));
ods_ip = str;
#endif
}
} else {
LOG(ERROR) << "DNS Resolve Failed: I will exit, bye-bye";
exit(0);
}
LOG(INFO) << "DNS Resolve Sucessful ODS_IP: " << ods_ip.c_str();
} else {
LOG(INFO) << "No Need DNS Resolve ODS_IP: " << ods_ip.c_str();
}
}
//实现逻辑:向本地socket发送一个命令,程序接到命令后自杀,如果没有自杀成功,根据名称杀掉
void kill_origin_process()
{
std::string execName; //可执行文件名
#ifdef WIN32
#ifdef _DEBUG
execName = "takeaway_d.exe";
#else
execName = "takeaway.exe";
#endif // _DEBUG
#else
execName = "takeaway";
#endif
bool rlt = true;
TCPClient pos;
JsonModule jsonTool;
std::string tmp = "{\"fm_cmd\": -1}";
if ( pos.doConnect( client_listen_port, pos_ip.c_str() ) ) {
if ( pos.write( tmp.c_str() ) ) {
char tmpBuf[100] = { 0 };
if ( pos.read( tmpBuf, sizeof(tmpBuf) ) ) {
LOG(INFO) << "kill back:" << tmpBuf;
if (strcmp(tmpBuf, "100") == 0) {
os_sleep(2); //成功杀掉进程,延时2s等待释放端口资源
} else
rlt = false;
} else
rlt = false;
} else
rlt = false;
pos.close();
}
if (!rlt) {
//如果关闭失败,根据进程名称杀掉进程
kill_process_by_name(execName.c_str());
}
}
......@@ -110,9 +110,9 @@ TEST_F(jsonModule_ut, convertDataOds2Pos)
EXPECT_TRUE(rlt2);
}
TEST_F(jsonModule_ut, getPushType)
TEST_F(jsonModule_ut, getPosOdsPushType)
{
std::string strOut;
int rlt = jsonTool.getPushType(strPosInit.data());
int rlt = jsonTool.getPosOdsPushType(strPosInit.data());
EXPECT_EQ(rlt,1000);
}
......@@ -305,3 +305,92 @@ void kill_process_by_name(const char *name)
}
#endif
}
// CreateProcess :创建另一个进程运行守护程序;
void create_process_daemon() {
#ifdef WIN32
// 1、判断daemon脚本文件是否存在,
std::string strBinPath = GetProcDir();
std::string strDaemonAppPath(strBinPath.data());
strDaemonAppPath.append("fmPluginDaemon.bat");
WIN32_FIND_DATA wfd;
HANDLE hFind = FindFirstFile(strDaemonAppPath.c_str() , &wfd );
if (INVALID_HANDLE_VALUE == hFind) {
LOG(INFO) << "strDaemonAppPath:" << strDaemonAppPath.c_str() << " No Exist !!!";
return;
}
else {
// 1.2、将daemon脚本文件进行可执行权限 赋权
CloseHandle(hFind); // Windows 忽略 文件的可执行权限;
// 1.3、启动daemon脚本程序到子进程中;
// CreateProcess(NULL, "regedit.exe"
STARTUPINFO si = { sizeof(si) };
PROCESS_INFORMATION pi;
LPTSTR pszCmd = new TCHAR[strDaemonAppPath.length() + 1];
memset(pszCmd, 0, sizeof(TCHAR)*(strDaemonAppPath.length() + 1));
strcpy(pszCmd, strDaemonAppPath.c_str() );
CreateProcess(NULL, pszCmd, NULL, NULL, FALSE, CREATE_NO_WINDOW, NULL, NULL, &si, &pi);
// WaitForSingleObject(pi.hProcess, INFINITE);
// CloseHandle(pi.hProcess);
// system("start regedit.exe"); // Meybe Block Main Process;
LOG(INFO) << "Start Another Process Success , DaemonAppPath:" << strDaemonAppPath.c_str();
}
#else
// 1、判断daemon程序文件是否存在,
// 2、将daemon程序文件进行可执行权限 赋权
std::string strBinPath = GetProcDir();
std::string strDaemonAppPath(strBinPath.data());
strDaemonAppPath.append("fmPluginDaemon");
if (access(strDaemonAppPath.c_str(), X_OK) == 0) {
LOG(INFO) << "create_process_daemon:: DaemonAppPath:" << strDaemonAppPath.c_str() << " Exist And Has Execute Permission ";
} else {
LOG(INFO) << "create_process_daemon:: DaemonAppPath:" << strDaemonAppPath.c_str() << " Non-Exist Or No Execute Permission ";
return;
}
// 3、判断daemon程序是否已经启动,已经启动着无需重复启动;
if ( does_daemon_work() )
LOG(INFO) << "create_process_daemon:: DaemonApp: Is Running No Need Run Another One";
else {
LOG(INFO) << "create_process_daemon:: DaemonApp: Haven't Run, Start DaemonApp Now ";
// 4、启动daemon程序;
system( strDaemonAppPath.c_str() );
}
#endif
}
int does_daemon_work()
{
#ifdef WIN32
LOG(INFO) << "does_daemon_work:: DaemonApp Is Running, No Need Check Status ";
return 0;
#else
FILE* fp;
int count;
char buf[150];
char command[150];
sprintf(command, "%s%s%s", "ps -ef | grep ", "fmPluginDaemon", " | grep -v grep | wc -l");
if ((fp = popen(command, "r")) == NULL) {
LOG(INFO) << "does_daemon_work:: Check DaemonApp Is Running Command Error !!! popen Failed Skip DaemonApp";
// exit(EXIT_FAILURE);
return 100;
}
if ((fgets(buf, 150, fp)) != NULL)
{
count = atoi(buf);
}
pclose(fp);
return count;
#endif
}
\ No newline at end of file
......@@ -28,4 +28,9 @@ void os_sleep(int seconds);
//根据进程名称杀死进程
void kill_process_by_name(const char *name);
// CreateProcess :创建另一个进程运行守护程序;
void create_process_daemon();
int does_daemon_work();
#endif
......@@ -166,10 +166,10 @@ std::string ZIni::readString(std::string strSectName,
{
return strDefault;
}
if (strTemp.length() > strKeyName.length() + 1 &&
0 == strTemp.compare(0,
strKeyName.length(), strKeyName) &&
'=' == strTemp[strKeyName.length()])
if ( strTemp.length() > strKeyName.length() + 1
&& 0 == strTemp.compare(0,strKeyName.length(), strKeyName)
&& '=' == strTemp[strKeyName.length()] )
{
return (strTemp.substr(strKeyName.length() + 1));
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment