Commit 7273005e by unknown

Add 1:配置文件增加字段

Add 2:新增json请求返回的数据包装函数
parent 32f68acc
[SYS]
ip=127.0.0.1
port = 24446
[ODS]
ip = "115.234.23.176"
pushPort = 8996
recvPort = 8997
[POS]
ip = "127.0.0.1"
port = 24445
......@@ -176,6 +176,182 @@ void JsonModule::getPushOrders(IN const char* json,OUT std::vector<orderObj> &ve
}
}
bool JsonModule::isInitData(const std::string &data)
{
rapidjson::Document document; // 定义一个Document对象
document.Parse(json); // 解析,Parse()无返回值,也不会抛异常
if (document.HasParseError()) // 通过HasParseError()来判断解析是否成功
{
LOG(ERROR) << "JSON parse error:" << document.GetParseError() << ":" << document.GetErrorOffset();
return false;
}
return document["fm_cmd"].GetInt() == 1000;
}
bool JsonModule::checkInitData(const std::string &data, int &posListenPort)
{
rapidjson::Document document; // 定义一个Document对象
document.Parse(data); // 解析,Parse()无返回值,也不会抛异常
if (document.HasParseError()) // 通过HasParseError()来判断解析是否成功
{
LOG(ERROR) << "JSON parse error:" << document.GetParseError() << ":" << document.GetErrorOffset();
return false;
}
if( document["store_id"].GetString().empty()
|| document["store_id"].GetString().empty()
|| document["pos_id"].GetString().empty()
|| document["operator_id"].GetString().empty()
|| !document.HasMember("is_master")
|| document["listen_port"].GetInt()< 0 )
{
return false;
}
posListenPort = document["listen_port"].GetInt();
return true;
}
bool JsonModule::getPosResponseData(int status, const std::string &msg, std::string &result)
{
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
writer.StartObject();
writer.Key("status_code");
writer.Int(status);
writer.Key("msg");
writer.String(msg.c_str());
writer.EndObject();
result = buffer.GetString();
return true;
}
bool JsonModule::getPosResponseData(const std::string &odsResponse, std::string &result)
{
result = "{\"status_code\":100, \"msg\":\"\"}";
return true;
}
bool JsonModule::getOdsResponseData(int status_code, const std::string &msg, std::string &result)
{
/*
{
"type" : 2,
"order_id" : "xxxxx",
"status" : 1,
"status_code" : 100,
"msg" : "接收订单成功"
}
*/
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
writer.StartObject();
writer.Key("type");
writer.Int(2);
writer.Key("order_id");
writer.String("");
writer.Key("status");
writer.Int(0);
writer.Key("status_code");
writer.Int(status_code);
writer.Key("msg");
writer.String(msg.c_str());
writer.EndObject();
result = buffer.GetString();
return true;
}
bool JsonModule::getOdsResponseData(const std::string &posResponse, const std::string &orderData, std::string &result)
{
int status_code;
std::string msg;
std::string order_id;
int status;
if( posResponse.empty() || orderData.empty() )
return false;
rapidjson::Document document, document1;
document.Parse(posResponse);
document1.Parse(orderData);
if (document.HasParseError() || document1.HasParseError())
{
return false;
}
if( !document.HasMember("status_code")
|| !document1.HasMember("order_id")
|| !document1.HasMember("status") )
{
return false;
}
status_code = document["status_code"].GetInt();
msg = document["msg"].GetString();
order_id = document1["order_id"].GetString();
status = document1["status"].GetInt();
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
writer.StartObject();
writer.Key("type");
writer.Int(2);
writer.Key("order_id");
writer.String(order_id.c_str());
writer.Key("status");
writer.Int(status);
writer.Key("status_code");
writer.Int(status_code);
writer.Key("msg");
writer.String(msg.c_str());
writer.EndObject();
result = buffer.GetString();
return true;
}
bool JsonModule::convertDataPos2Ods(const std::string &data, std::string &result)
{
}
bool JsonModule::convertDataOds2Pos(const std::string &data, std::string &result)
{
rapidjson::Document data;
document.Parse(posResponse);
document1.Parse(orderData);
if (document.HasParseError() || document1.HasParseError())
{
return false;
}
if( !document.HasMember("status_code")
|| !document1.HasMember("order_id")
|| !document1.HasMember("status") )
{
return false;
}
}
std::string JsonModule::convertToNewOrderJson(orderObj &obj)
{
rapidjson::StringBuffer buffer;
......
......@@ -14,6 +14,45 @@ public:
void getPushOrders(IN const char* json,OUT std::vector<orderObj> &vecOrders);
/* 功能:判断是否是初始化数据
* 参数:[1]待判断数据
* 返回:...
* */
bool isInitData(IN const std::string& data);
/* 功能:检查初始化数据是否可用
* 参数:[1]待检查数据[2]POS监听端口
* 返回:是否正确格式
* */
bool checkInitData(IN const std::string& data, OUT int& posListenPort);
/* 功能:获取POS请求的返回数据
* 参数:[1]状态码[2]消息[3]转换后数据
* 重载:[1]ODS返回数据[2]转换后数据
* 返回:是否转换成功
* */
bool getPosResponseData(IN int status, IN const std::string& msg, OUT std::string& result);
bool getPosResponseData(IN const std::string& odsResponse, OUT std::string& result);
/* 功能:获取ODS推送的返回数据
* 参数:[1]状态码[2]消息[3]转换后数据
* 重载:[1]POS返回数据[2]订单数据[3]转换后数据
* 返回:是否转换成功
* */
bool getOdsResponseData(IN int status_code, IN const std::string& msg, OUT std::string& result);
bool getOdsResponseData(IN const std::string& posResponse, IN const std::string& orderData, OUT std::string& result);
/* 功能:转换POS数据格式到中台数据格式
* 参数:[1]POS数据格式[2]转换后数据
* 返回:是否转换成功
* */
bool convertDataPos2Ods(IN const std::string& data, OUT std::string& result);
/* 功能:转换中台数据格式到POS数据格式
* 参数:[1]中台数据格式[2]转换后数据
* 返回:是否转换成功
* */
bool convertDataOds2Pos(IN const std::string& data, OUT std::string& result);
std::string convertToNewOrderJson(orderObj &obj);
std::string convertToOrderStatusJson(orderStatusObj &obj);
std::string convertToRefundJson(refundObj &obj);
......
......@@ -12,16 +12,14 @@
INITIALIZE_EASYLOGGINGPP
std::string str_pos2ods_request;
std::string str_pos2ods_reply;
std::string str_ods2pos_push;
std::string str_ods2pos_reply;
std::string g_init_data;
std::string ods_ip;
int ods_listen_port;
std::string pos_ip = "127.0.0.1";
int ods_push_port;
int ods_recv_port;
int client_listen_port;
int pos_listen_port = 8009;
int pos_listen_port;
void logRolloutHandler(const char* filename, std::size_t size)
{
......@@ -34,107 +32,88 @@ void logRolloutHandler(const char* filename, std::size_t size)
LOG(INFO)<<"备份日志:"<<ss.str().c_str();
system(ss.str().c_str());
}
void* listen_ods_func(void* arg)
void* listen_pos_func(void* arg)
{
TCPClient *client = (TCPClient*)arg;
TCPServer server;
JsonModule jsonTool;
while(true)
{
std::string msg;
// 判断是否成功连接ODS
if( !client->isValid() )
{
if( client->doConnect(ods_listen_port, ods_ip.c_str()) )
if( server.doListen(client_listen_port) )
{
LOG(INFO) << "重连ODS成功";
LOG(INFO) << "监听端口: [" << client_listen_port << "] 成功";
}else
{
continue;
usleep(800);
}
LOG(INFO) << "监听端口: [" << client_listen_port << "] 失败";
}
if( client->receive(msg) != -1 )
while(true)
{
// 判断是推送还是回复
std::string mark("push");
if( msg.compare(0, mark.size(), mark) == 0 )
TCPClient pos;
std::string posRequestData;
std::string requestOdsData;
std::string responseData;
if( server.accept(pos) )
{
// 为ODS的推送消息
TCPClient tmp_client;
if(tmp_client.doConnect(pos_listen_port, "127.0.0.1"))
if( pos.receive(posRequestData) )
{
if(tmp_client.send(msg))
// 如果为初始化请求则通过长连接socket发送
if( jsonTool.isInitData(posRequestData) )
{
std::string tmp_msg;
if(tmp_client.receive(tmp_msg))
if( jsonTool.checkInitData(posRequestData, pos_listen_port) )
{
str_pos2ods_reply = tmp_msg;
g_init_data = posRequestData;
jsonTool.getPosResponseData(100, "successful!", responseData);
}else
{
str_pos2ods_reply = "获取POS返回值失败";
jsonTool.getPosResponseData(101, "invalid initdata!", responseData);
}
}else
{
LOG(INFO) << "转发失败: "<<msg;
str_pos2ods_reply = "推送给POS失败";
}
tmp_client.close();
}else
// 将POS请求数据转换为中台可接受数据格式
if( jsonTool.convertDataPos2Ods(posRequestData, requestOdsData) )
{
str_pos2ods_reply = "连接POS失败";
}
// 同步阻塞发送到ODS并等待返回
TCPClient ods;
if( ods.connect(ods_recv_port, ods_ip.c_str()) )
{
if( ods.send(RequestOdsData) )
{
std::string tmp;
if( ods.receive(tmp) )
{
jsonTool.getPosResponseData(tmp, responseData);
}else
{
// 为ODS的返回消息
str_ods2pos_reply = msg;
jsonTool.getPosResponseData(101, "receive data from [ODS] failed!", responseData);
}
}else
{
LOG(INFO) << "ods连接失效,尝试重连。。。。。。";
jsonTool.getPosResponseData(101, "send data to [ODS] failed!", responseData);
}
}
}
void* listen_pos_func(void* arg)
{
TCPServer server;
if( server.doListen(client_listen_port) )
ods.close();
}else
{
LOG(INFO) << "监听端口成功";
jsonTool.getPosResponseData(101, "connect [ODS] failed!", responseData);
}
while(true)
{
std::string msg;
TCPClient connect;
if( server.accept(connect) )
{
if( connect.receive(msg) )
{
// POS请求数据
// 赋值给中间变量,等待其他线程发送给ODS
str_pos2ods_request = msg;
// 阻塞等待ODS返回
while(str_ods2pos_reply.empty())
}else
{
usleep(200);
continue;
jsonTool.getPosResponseData(101, "convert data to [ODS] format failed!", responseData);
}
}
// 返回给POS
connect.send(str_ods2pos_reply);
str_ods2pos_reply.clear();
connect.close();
// TODO待加入重试机制
pos.send(responseData);
pos.close();
}else
{
LOG(INFO) << "接收POS推送消息失败";
}
}else
{
LOG(INFO) << "接受POS连接失败";
LOG(INFO) << "接收POS连接失败";
}
}
}
......@@ -142,7 +121,6 @@ void* listen_pos_func(void* arg)
int main()
{
signal(SIGPIPE, SIG_IGN);
// 初始化日志
......@@ -156,61 +134,112 @@ int main()
/// 注册回调函数
el::Helpers::installPreRollOutCallback(logRolloutHandler);
LOG(INFO)<<"--------------程序启动--------------";
LOG(INFO)<<"---------程序启动---------";
// 读取配置文件信息
std::string strIniPath(strBinPath.data());
strIniPath.append("config.ini");
ods_ip = ZIni::readString("ODS","ip", "", strIniPath.c_str());
ods_listen_port = ZIni::readInt("ODS", "listenPort", -1, strIniPath.c_str());
client_listen_port = ZIni::readInt("CLIENT", "listenPort", -1, strIniPath.c_str());
LOG(INFO) << "[ODS]服务器ip地址: " << ods_ip.data() << "-监听端口: " << ods_listen_port;
LOG(INFO) << "本地监听端口: " << client_listen_port;
ods_push_port = ZIni::readInt("ODS", "pushPort", 0, strIniPath.c_str());
ods_recv_port = ZIni::readInt("ODS", "recvPort", 0, strIniPath.c_str());
// 监听POS请求的线程
pthread_t listen_pos_id, listen_ods_id;
pos_ip = ZIni::readString("POS","ip", "", strIniPath.c_str());
pos_listen_port = ZIni::readInt("POS","port", 0, strIniPath.c_str());
// 和ODS长连接通信
TCPClient ods_client;
client_listen_port = ZIni::readInt("SYS", "port", 0, strIniPath.c_str());
if( ods_client.doConnect(ods_listen_port, ods_ip.c_str()) )
LOG(INFO) << "连接ODS成功";
LOG(INFO) << "[ODS]ip地址: " << ods_ip.data()
<< "-推送端口: " << ods_push_port
<< "-监听端口: " << ods_recv_port;
LOG(INFO) << "[CLIENT]监听端口: " << client_listen_port;
LOG(INFO) << "[POS]ip地址: " << pos_ip
<< "-监听端口: " << pos_listen_port;
//end
// 监听POS请求的线程
pthread_t listen_pos_id;
/*创建 listen_pos 线程*/
if(pthread_create(&listen_pos_id,NULL,listen_pos_func,NULL))
LOG(INFO) << "创建listen_pos线程失败";
/*创建 listen_ods 线程*/
if(pthread_create(&listen_ods_id,NULL,listen_ods_func,&ods_client))
LOG(INFO) << "创建listen_pos线程失败";
TCPClient ods;
JsonModule jsonTool;
while(true)
{
// 专门负责pos到ods的数据转发
if(!str_pos2ods_reply.empty())
std::string odsPushData;
std::string pushPosData;
std::string responseData;
// 判断是否成功连接ODS
if( !ods.isValid() )
{
// 将POS返回给ODS
if( ods_client.send(str_pos2ods_reply) )
if( ods.doConnect(ods_push_port, ods_ip.c_str()) )
{
str_pos2ods_reply.clear();
LOG(INFO) << "连接ODS成功";
// 注册socket信息
while(true)
{
if(!g_init_data.empty())
{
if( ods.send(g_init_data) )
{
break;
}
}
usleep(500);
continue;
}
}else
{
continue;
usleep(800);
}
}
if(!str_pos2ods_request.empty())
if( ods.receive(odsPushData) )
{
if( jsonTool.convertDataOds2Pos(odsPushData, pushPosData) )
{
TCPClient pos;
if( pos.connect(pos_listen_port, pos_ip.c_str()) )
{
// 将POS的请求转发给ODS
if( ods_client.send(str_pos2ods_request))
if( pos.send(pushPosData) )
{
str_pos2ods_request.clear();
std::string tmp;
if( pos.receive(tmp) )
{
jsonTool.getOdsResponseData(tmp, odsPushData, responseData);
}else
{
str_ods2pos_reply = "请求转发给ODS失败";
jsonTool.getOdsResponseData(101, "receive data from [POS] failed!", responseData);
}
}else
{
jsonTool.getOdsResponseData(101, "send data to [POS] failed!", responseData);
}
usleep(100);
pos.close();
}else
{
jsonTool.getOdsResponseData(101, "connect [POS] failed!", responseData);
}
}else
{
jsonTool.getOdsResponseData(101, "convert data to [POS] format failed!", responseData);
}
// TODO待加入重试机制
ods.send(responseData);
}else
{
LOG(INFO) << "接收ODS推送消息失败";
}
}
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment