Commit f49cc9e4 by guanghui.cui

本地存储发送失败的订单,加入重发逻辑

parent 536b74ed
...@@ -204,7 +204,7 @@ struct serverResponseOperationObj ...@@ -204,7 +204,7 @@ struct serverResponseOperationObj
//订单发送给pos失败结构体 //订单发送给pos失败结构体
struct orderSendFailedObj struct orderSendFailedObj
{ {
long long timestamp=0; //接收到订单时候的时间戳(精确到毫秒) int64_t timestamp=0; //接收到订单时候的时间戳(精确到毫秒)
std::string order_json; //订单json字符串 std::string order_json; //订单json字符串
}; };
......
...@@ -20,6 +20,14 @@ int ods_push_port; ...@@ -20,6 +20,14 @@ int ods_push_port;
int ods_recv_port; int ods_recv_port;
int client_listen_port; int client_listen_port;
int pos_listen_port; int pos_listen_port;
std::vector<orderSendFailedObj> vecFailedOrders;
bool bRetryThreadRunning=false; //retry线程是否正在运行
SQLite sqlite;
//函数,订单信息发送给pos
bool order_send_to_pos(IN std::string &order_json,OUT std::string &back_json);
//失败订单写入vector及sqlite
void write_failed_order(IN std::string &order_json);
void logRolloutHandler(const char* filename, std::size_t size) void logRolloutHandler(const char* filename, std::size_t size)
{ {
...@@ -122,6 +130,31 @@ void* listen_pos_func(void* arg) ...@@ -122,6 +130,31 @@ void* listen_pos_func(void* arg)
} }
} }
//发送POS订单失败,重新发送线程
void* retry_send_pos_func(void* arg)
{
bRetryThreadRunning=true;
std::vector<orderSendFailedObj>::iterator Iter;
for(Iter = vecFailedOrders.begin(); Iter != vecFailedOrders.end();) {
std::string back_json;
if(order_send_to_pos((*Iter).order_json,back_json)){
char lpSql[200] = {0};
sprintf(lpSql, "delete from fmOrderFailed where id = %lld",(*Iter).timestamp);
LOG(INFO)<<"sql:"<<lpSql;
if(!sqlite.remove(lpSql)){
LOG(ERROR) << "remove failed:"<<lpSql;
}
vecFailedOrders.erase(Iter); //Iter为删除元素的下一个元素的迭代器
Iter = vecFailedOrders.begin();
}
sleep(1); //等待1s,再次尝试发送
}
LOG(INFO)<<"retry_send_pos_func thread done";
bRetryThreadRunning=false;
}
int main() int main()
{ {
...@@ -144,23 +177,12 @@ int main() ...@@ -144,23 +177,12 @@ int main()
el::Helpers::installPreRollOutCallback(logRolloutHandler); el::Helpers::installPreRollOutCallback(logRolloutHandler);
LOG(INFO)<<"---------software start---------"; LOG(INFO)<<"---------software start---------";
//sqlite初始化
// //test sqlite if(!sqlite.initSQLite()){
// orderSendFailedObj orderFailedObj; LOG(INFO)<<"initSQLite failed";
// orderFailedObj.timestamp=1111222233334444; return 0;
// LOG(INFO)<<"--------- long long---------:"<<orderFailedObj.timestamp; }
// SQLite sqlite;
// sqlite.initSQLite();
// sqlite.insert("insert into fmOrderFailed(id,msg) values(122233464555,'asdfdfdfdfdfsdadadafdadfafaf')");
// std::vector<orderSendFailedObj> vecFailedOrders;
// sqlite.query("select * from fmOrderFailed",vecFailedOrders);
// for(auto order:vecFailedOrders){
// LOG(INFO)<<"vector element:"<<order.timestamp;
// }
// LOG(INFO)<<"seconds:"<<timestamp_seconds()<<" mSeconds:"<<timestamps_milliseconds();
// sqlite.closeSQLite();
// return 0;
// 读取配置文件信息 // 读取配置文件信息
std::string strIniPath(strBinPath.data()); std::string strIniPath(strBinPath.data());
...@@ -219,6 +241,16 @@ int main() ...@@ -219,6 +241,16 @@ int main()
if( ods.send(g_init_data) ) if( ods.send(g_init_data) )
{ {
ods.receive(odsPushData); ods.receive(odsPushData);
LOG(INFO)<<"ODS init back:"<<odsPushData.data();
//检测是否有发送失败的订单,如果有的话,启动线程,先发送原先失败的订单
sqlite.query("select * from fmOrderFailed",vecFailedOrders);
if(vecFailedOrders.size()>0){
pthread_t retry_thread_id;
if(pthread_create(&retry_thread_id,NULL,retry_send_pos_func,NULL))
LOG(INFO) << "create retry_send_pos_func thread failed";
}
break; break;
} }
} }
...@@ -245,35 +277,14 @@ int main() ...@@ -245,35 +277,14 @@ int main()
{ {
LOG(INFO) << "********convert data to pos************"; LOG(INFO) << "********convert data to pos************";
LOG(INFO) << pushPosData; LOG(INFO) << pushPosData;
TCPClient pos; if(!order_send_to_pos(pushPosData,responseData)){
if( pos.doConnect(pos_listen_port, pos_ip.c_str()) ) write_failed_order(pushPosData);
{ if(!bRetryThreadRunning){ //如果线程没有在运行
if( pos.write(pushPosData.c_str()) ) pthread_t retry_thread_id;
{ /*创建 retry send 线程*/
char tmpBuf[BUF_SIZE] = {0}; if(pthread_create(&retry_thread_id,NULL,retry_send_pos_func,NULL))
if( pos.read(tmpBuf,sizeof(tmpBuf)) ) LOG(INFO) << "create retry_send_pos_func thread failed";
{
LOG(INFO) << "pos response data:"<<tmpBuf;
std::string tmp(tmpBuf);
//pos发送过来的数据为gb2312编码,需要转换为utf8
//std::string tmpUtf8= charset_g2u(tmp);
//LOG(INFO)<<"gb2312 to utf8:"<<tmpUtf8.data();
//jsonTool.getOdsResponseData(tmpUtf8, pushPosData, responseData);
jsonTool.getOdsResponseData(tmp, pushPosData, responseData);
}else
{
jsonTool.getOdsResponseData(101, "receive data from [POS] failed!", responseData);
}
}else
{
jsonTool.getOdsResponseData(101, "send data to [POS] failed!", responseData);
} }
pos.close();
}else
{
jsonTool.getOdsResponseData(101, "connect [POS] failed!", responseData);
} }
}else }else
{ {
...@@ -297,6 +308,61 @@ int main() ...@@ -297,6 +308,61 @@ int main()
} }
/// 注销回调函数 /// 注销回调函数
el::Helpers::uninstallPreRollOutCallback(); el::Helpers::uninstallPreRollOutCallback();
sqlite.closeSQLite();
return 0; return 0;
} }
bool order_send_to_pos(IN std::string &order_json,OUT std::string &back_json)
{
bool rlt=true;
TCPClient pos;
JsonModule jsonTool;
std::string tmp="{\"status_code\": 100, \"msg\": \"success\"}";
if( pos.doConnect(pos_listen_port, pos_ip.c_str()) )
{
if( pos.write(order_json.c_str()) )
{
char tmpBuf[BUF_SIZE] = {0};
if( pos.read(tmpBuf,sizeof(tmpBuf)) )
{
LOG(INFO) << "pos response data:"<<tmpBuf;
tmp=tmpBuf;
//pos发送过来的数据为gb2312编码,需要转换为utf8
//std::string tmpUtf8= charset_g2u(tmp);
//LOG(INFO)<<"gb2312 to utf8:"<<tmpUtf8.data();
//jsonTool.getOdsResponseData(tmpUtf8, order_json, responseData);
}else
{
rlt=false;
}
}else
{
rlt=false;
}
pos.close();
}else
{
rlt=false;
}
jsonTool.getOdsResponseData(tmp, order_json, back_json);
return rlt;
}
void write_failed_order(IN std::string &order_json)
{
orderSendFailedObj orderFailedObj;
orderFailedObj.timestamp=timestamps_milliseconds();
orderFailedObj.order_json=order_json;
vecFailedOrders.push_back(orderFailedObj);
LOG(INFO)<<"---------------create timestamp:"<<orderFailedObj.timestamp<<" vector size:"<<vecFailedOrders.size();
char* lpSql = new char[BUF_SIZE];
sprintf(lpSql, "insert into fmOrderFailed(id,msg) values(%lld,'%s')",orderFailedObj.timestamp,orderFailedObj.order_json.c_str());
if(!sqlite.insert(lpSql)){
LOG(ERROR) << "write sqlite failed:"<<lpSql;
}
delete[] lpSql;
}
...@@ -94,14 +94,14 @@ std::string charset_u2g(const std::string& utf8) ...@@ -94,14 +94,14 @@ std::string charset_u2g(const std::string& utf8)
} }
//获取时间戳(毫秒) //获取时间戳(毫秒)
long long timestamps_milliseconds() int64_t timestamps_milliseconds()
{ {
struct timeval tv; struct timeval tv;
gettimeofday(&tv,NULL); gettimeofday(&tv,NULL);
return (int64_t)tv.tv_sec*1000 + tv.tv_usec/1000; return (int64_t)tv.tv_sec*1000 + tv.tv_usec/1000;
} }
//获取时间戳(秒) //获取时间戳(秒)
long long timestamp_seconds() int64_t timestamp_seconds()
{ {
struct timeval tv; struct timeval tv;
gettimeofday(&tv,NULL); gettimeofday(&tv,NULL);
......
...@@ -14,9 +14,9 @@ std::string charset_u2g(const std::string& utf8); ...@@ -14,9 +14,9 @@ std::string charset_u2g(const std::string& utf8);
std::string charset_g2u(const std::string& gb2312); std::string charset_g2u(const std::string& gb2312);
//获取时间戳(秒) //获取时间戳(秒)
long long timestamp_seconds(); int64_t timestamp_seconds();
//获取时间戳(毫秒) //获取时间戳(毫秒)
long long timestamps_milliseconds(); int64_t timestamps_milliseconds();
#endif #endif
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment