Commit da77d83b by guanghui.cui

去除发送失败本地sqlite存储,直接返回服务端失败

parent f4550690
......@@ -412,79 +412,6 @@ bool JsonModule::getPushOrders(IN const char* json,OUT orderObj &order)
}
}
//积分信息
// if(orderContent_obj.HasMember("points"))
// {
// rapidjson::Value& points_obj = orderContent_obj["points"];
// if(points_obj.IsObject())
// {
// order.bonusInfo.summary = std::to_string(GetJsonIntSafe(points_obj,"totalPoint"));
// //积分信息详情
// if(points_obj.HasMember("pointDetails"))
// {
// rapidjson::Value& pointDetails_array = points_obj["pointDetails"];
// if(pointDetails_array.IsArray())
// {
// for(unsigned int i=0;i<pointDetails_array.Size();i++){
// rapidjson::Value& pointDetails_obj = pointDetails_array[i];
// bonusDetail detail;
// GetJsonStringSafe(pointDetails_obj,"bomId");
// GetJsonStringSafe(pointDetails_obj,"comboId");
// detail.desc = GetJsonStringSafe(pointDetails_obj,"desc");
// GetJsonStringSafe(pointDetails_obj,"groupId");
// detail.point = GetJsonIntSafe(pointDetails_obj,"point");
// detail.sku = GetJsonStringSafe(pointDetails_obj,"sku");
// detail.type = atoi(GetJsonStringSafe(pointDetails_obj,"type"));
// order.bonusInfo.vecDetail.push_back(detail);
// }
// }
// }
// }
// }
//优惠信息
// if(orderContent_obj.HasMember("promotions"))
// {
// rapidjson::Value& promotions_obj = orderContent_obj["promotions"];
// if(promotions_obj.IsObject())
// {
// order.promotionInfo.summary=GetJsonIntSafe(promotions_obj,"totalDiscount");
// GetJsonIntSafe(promotions_obj,"totalOriginalPrice");
// GetJsonIntSafe(promotions_obj,"totalPrmotionPrice");
// //优惠信息详情
// if(promotions_obj.HasMember("promtionDetails"))
// {
// rapidjson::Value& promtionDetails_array = promotions_obj["promtionDetails"];
// if(promtionDetails_array.IsArray())
// {
// for(unsigned int i=0;i<promtionDetails_array.Size();i++){
// rapidjson::Value& promtionDetails_obj = promtionDetails_array[i];
// promotionsDetail detail;
// detail.pro_id = GetJsonStringSafe(promtionDetails_obj,"proId");
// GetJsonStringSafe(promtionDetails_obj,"bomId");
// GetJsonStringSafe(promtionDetails_obj,"comboId");
// detail.desc = GetJsonStringSafe(promtionDetails_obj,"desc");
// detail.offer = GetJsonIntSafe(promtionDetails_obj,"discount");
// GetJsonStringSafe(promtionDetails_obj,"groupId");
// GetJsonIntSafe(promtionDetails_obj,"originalPrice");
// GetJsonStringSafe(promtionDetails_obj,"pcode");
// GetJsonIntSafe(promtionDetails_obj,"prmotionPrice");
// detail.sku = GetJsonStringSafe(promtionDetails_obj,"sku");
// detail.type = atoi(GetJsonStringSafe(promtionDetails_obj,"type"));
// order.promotionInfo.vecDetail.push_back(detail);
// }
// }
// }
// }
// }
GetJsonStringSafe(orderContent_obj,"sessionId");
order.reduced_price = GetJsonIntSafe(orderContent_obj,"totalDiscount");
......@@ -642,13 +569,8 @@ bool JsonModule::getInitBackData(IN const char* inJson,OUT std::string& outJson)
rapidjson::Value& code = document["code"];
status_code = atoi(code.GetString());
if(document.HasMember("orderConfirmType")){
autoconfirm = document["orderConfirmType"].GetInt();
}
if(document.HasMember("orderPushPosIndex")){
default_pos = document["orderPushPosIndex"].GetString();
}
autoconfirm = GetJsonIntSafe(document,"orderConfirmType");
default_pos = GetJsonStringSafe(document,"orderPushPosIndex");
// if(document.HasMember("message")){
// msg = document["message"].GetString();
......@@ -1283,78 +1205,6 @@ std::string JsonModule::_convertToNewOrderJson(orderObj &obj)
writer.EndArray();
//----------------电子点标 结束-----------
// //----------------消费积分----------------
// writer.Key("bonus");
// writer.StartObject();
// writer.Key("summary");
// writer.String(obj.bonusInfo.summary.c_str());
// //-----------------start 积分详情------------------
// writer.Key("details");
// writer.StartArray();
// for(unsigned int i=0;i<obj.bonusInfo.vecDetail.size();i++)
// {
// writer.StartObject();
// writer.Key("type");
// writer.Int(obj.bonusInfo.vecDetail[i].type);
// writer.Key("desc");
// writer.String(obj.bonusInfo.vecDetail[i].desc.c_str());
// writer.Key("point");
// writer.Int(obj.bonusInfo.vecDetail[i].point);
// writer.Key("sku");
// writer.String(obj.bonusInfo.vecDetail[i].sku.c_str());
// writer.EndObject();
// }
// writer.EndArray();
// //-----------------end 积分详情--------------------
// writer.EndObject();
// //----------------消费积分 结束-----------
// //----------------促销列表----------------
// writer.Key("promotions");
// writer.StartObject();
// writer.Key("summary");
// writer.String(obj.promotionInfo.summary.c_str());
// //-----------------start 促销详情------------------
// writer.Key("details");
// writer.StartArray();
// for(unsigned int i=0;i<obj.promotionInfo.vecDetail.size();i++)
// {
// writer.StartObject();
// writer.Key("pro_id");
// writer.String(obj.promotionInfo.vecDetail[i].pro_id.c_str());
// writer.Key("type");
// writer.Int(obj.promotionInfo.vecDetail[i].type);
// writer.Key("desc");
// writer.String(obj.promotionInfo.vecDetail[i].desc.c_str());
// writer.Key("offer");
// writer.Int(obj.promotionInfo.vecDetail[i].offer);
// writer.Key("sku");
// writer.String(obj.promotionInfo.vecDetail[i].sku.c_str());
// writer.EndObject();
// }
// writer.EndArray();
// //-----------------end 促销详情--------------------
// writer.EndObject();
// //----------------促销列表 结束-----------
writer.EndObject();
return buffer.GetString();
......
......@@ -24,16 +24,16 @@ int ods_push_port;
int ods_recv_port;
int client_listen_port;
int pos_listen_port;
std::vector<orderSendFailedObj> vecFailedOrders;
bool bRetryThreadRunning=false; //retry线程是否正在运行
//std::vector<orderSendFailedObj> vecFailedOrders;
//bool bRetryThreadRunning=false; //retry线程是否正在运行
bool bInitDone=false; //初始化完成
bool bPriorityDone=true; //设置POS优先级
SQLite sqlite;
//SQLite sqlite;
//函数,订单信息发送给pos
bool order_send_to_pos(IN std::string &order_json,IN std::string &ods_json,OUT std::string &back_json);
//失败订单写入vector及sqlite
void write_failed_order(IN std::string &order_json);
//void write_failed_order(IN std::string &order_json);
void logRolloutHandler(const char* filename, std::size_t size)
{
......@@ -77,7 +77,7 @@ void* listen_pos_func(void* arg)
// 如果为初始化请求则通过长连接socket发送
//bool isInit=jsonTool.isInitData(posRequestData);
int reqType=jsonTool.getPushType(posRequestData.data());
LOG(INFO)<<"reqType.:"<<reqType;
LOG(INFO)<<"pos reqType:"<<reqType;
if(reqType==REQUEST_TYPE_INIT)
{
bInitDone=false;
......@@ -85,15 +85,15 @@ void* listen_pos_func(void* arg)
{
g_init_data_ods_back.clear();
// g_init_data = posRequestData;
LOG(INFO) <<"POS req data:"<<posRequestData.c_str();
LOG(INFO) <<"INIT POS ===>> PLUGIN:"<<posRequestData.c_str();
jsonTool.convertInitDataPos2Ods(posRequestData,g_init_data);
//jsonTool.getPosResponseData(100, "successful!", responseData);
LOG(INFO) <<"POS init data:"<<g_init_data.c_str();
LOG(INFO) <<"INIT PLUGIN ===>> ODS:"<<g_init_data.c_str();
//等待ods返回初始化结果
while(true){
if(!g_init_data_ods_back.empty()){
jsonTool.getInitBackData(g_init_data_ods_back.data(),responseData);
LOG(INFO) <<"responseData:"<<responseData.c_str();
LOG(INFO) <<"INIT PLUGIN ===>> POS:"<<responseData.c_str();
break;
}
else{
......@@ -135,11 +135,11 @@ void* listen_pos_func(void* arg)
}
else
{
LOG(INFO)<<"POS send data:"<<posRequestData.data();
LOG(INFO)<<"REQ POS ===>> PLUGIN:"<<posRequestData.data();
// 将POS请求数据转换为中台可接受数据格式
if( jsonTool.convertDataPos2Ods(posRequestData, requestOdsData) )
{
LOG(INFO)<<"convert pos data to ods:"<<requestOdsData.data();
LOG(INFO)<<"REQ PLUGIN ===>> ODS:"<<requestOdsData.data();
// 同步阻塞发送到ODS并等待返回
TCPClient ods;
if( ods.doConnect(ods_recv_port, ods_ip.c_str()) )
......@@ -149,7 +149,7 @@ void* listen_pos_func(void* arg)
std::string tmp;
if( ods.receive(tmp) )
{
LOG(INFO)<<"receive ods back:"<<tmp.data();
LOG(INFO)<<"REQ ODS ===>> PLUGIN:"<<tmp.data();
jsonTool.getPosResponseData(tmp,posRequestData, responseData);
}else
{
......@@ -172,7 +172,7 @@ void* listen_pos_func(void* arg)
}
// TODO待加入重试机制
LOG(INFO) << "ODS response data send to pos:"<<responseData.data();
LOG(INFO) << "PLUGIN ===>>POS:"<<responseData.data();
pos.write(responseData.c_str());
pos.close();
if(reqType==REQUEST_TYPE_INIT){
......@@ -181,7 +181,7 @@ void* listen_pos_func(void* arg)
if(reqType==REQUEST_TYPE_POS_PRIORITY){
bPriorityDone=true; //设置pos优先级完成
}
LOG(INFO)<<"pos.close";
LOG(INFO)<<"SEND END";
}else
{
LOG(INFO) << "recv pos pushDate failed";
......@@ -193,37 +193,37 @@ void* listen_pos_func(void* arg)
}
}
//发送POS订单失败,重新发送线程
void* retry_send_pos_func(void* arg)
{
bRetryThreadRunning=true;
JsonModule jsonTool;
std::vector<orderSendFailedObj>::iterator Iter;
for(Iter = vecFailedOrders.begin(); Iter != vecFailedOrders.end();) {
Iter = vecFailedOrders.begin(); //一直发送第一个,直到发送成功
std::string back_json;
std::string ods_json=(*Iter).order_json;
std::string order_json;
if( jsonTool.convertDataOds2Pos(ods_json, order_json) ){
if(order_send_to_pos(order_json,ods_json,back_json)){
char lpSql[200] = {0};
sprintf(lpSql, "delete from fmOrderFailed where id = %lld",(*Iter).timestamp);
LOG(INFO)<<"sql:"<<lpSql;
if(!sqlite.remove(lpSql)){
LOG(ERROR) << "remove failed:"<<lpSql;
}
vecFailedOrders.erase(Iter); //Iter为删除元素的下一个元素的迭代器
}
}
sleep(1); //等待1s,再次尝试发送
}
LOG(INFO)<<"retry_send_pos_func thread done";
bRetryThreadRunning=false;
pthread_detach(pthread_self());
}
// //发送POS订单失败,重新发送线程
// void* retry_send_pos_func(void* arg)
// {
// bRetryThreadRunning=true;
// JsonModule jsonTool;
// std::vector<orderSendFailedObj>::iterator Iter;
// for(Iter = vecFailedOrders.begin(); Iter != vecFailedOrders.end();) {
// Iter = vecFailedOrders.begin(); //一直发送第一个,直到发送成功
// std::string back_json;
// std::string ods_json=(*Iter).order_json;
// std::string order_json;
// if( jsonTool.convertDataOds2Pos(ods_json, order_json) ){
// if(order_send_to_pos(order_json,ods_json,back_json)){
// char lpSql[200] = {0};
// sprintf(lpSql, "delete from fmOrderFailed where id = %lld",(*Iter).timestamp);
// LOG(INFO)<<"sql:"<<lpSql;
// if(!sqlite.remove(lpSql)){
// LOG(ERROR) << "remove failed:"<<lpSql;
// }
// vecFailedOrders.erase(Iter); //Iter为删除元素的下一个元素的迭代器
// }
// }
// sleep(1); //等待1s,再次尝试发送
// }
// LOG(INFO)<<"retry_send_pos_func thread done";
// bRetryThreadRunning=false;
// pthread_detach(pthread_self());
// }
int main()
......@@ -249,10 +249,10 @@ int main()
LOG(INFO)<<"---------software start---------";
//sqlite初始化
if(!sqlite.initSQLite()){
LOG(INFO)<<"initSQLite failed";
return 0;
}
// if(!sqlite.initSQLite()){
// LOG(INFO)<<"initSQLite failed";
// return 0;
// }
// 读取配置文件信息
std::string strIniPath(strBinPath.data());
......@@ -312,7 +312,7 @@ int main()
{
ods.receive(odsPushData);
g_init_data_ods_back=odsPushData;
LOG(INFO)<<"ODS init back:"<<odsPushData.data();
LOG(INFO)<<"INIT ODS ===>> PLUGIN:"<<odsPushData.data();
while(!bInitDone){
LOG(INFO)<<"wait for init done";
if( !ods.isValid() ){
......@@ -322,23 +322,22 @@ int main()
}
sleep(1);
}
break;
//如果连接非法,跳出循环重新连接
if( !ods.isValid() ){
break;
}
// //如果连接非法,跳出循环重新连接
// if( !ods.isValid() ){
// break;
// }
//检测是否有发送失败的订单,如果有的话,启动线程,先发送原先失败的订单
sqlite.query("select * from fmOrderFailed",vecFailedOrders);
LOG(INFO)<<"vecFailedOrders size:"<<vecFailedOrders.size();
if(vecFailedOrders.size()>0){
pthread_t retry_thread_id;
if(pthread_create(&retry_thread_id,NULL,retry_send_pos_func,NULL))
LOG(INFO) << "create retry_send_pos_func thread failed";
}
break;
// //检测是否有发送失败的订单,如果有的话,启动线程,先发送原先失败的订单
// sqlite.query("select * from fmOrderFailed",vecFailedOrders);
// LOG(INFO)<<"vecFailedOrders size:"<<vecFailedOrders.size();
// if(vecFailedOrders.size()>0){
// pthread_t retry_thread_id;
// if(pthread_create(&retry_thread_id,NULL,retry_send_pos_func,NULL))
// LOG(INFO) << "create retry_send_pos_func thread failed";
// }
}
}
else{
......@@ -374,15 +373,16 @@ int main()
{
LOG(INFO) << "----------- PLUGIN ===>> POS ------------";
LOG(INFO) << pushPosData;
if(!order_send_to_pos(pushPosData,odsPushData,responseData)){
write_failed_order(odsPushData);
if(!bRetryThreadRunning){ //如果线程没有在运行
pthread_t retry_thread_id;
/*创建 retry send 线程*/
if(pthread_create(&retry_thread_id,NULL,retry_send_pos_func,NULL))
LOG(INFO) << "create retry_send_pos_func thread failed";
}
}
order_send_to_pos(pushPosData,odsPushData,responseData);
// if(!order_send_to_pos(pushPosData,odsPushData,responseData)){
// write_failed_order(odsPushData);
// if(!bRetryThreadRunning){ //如果线程没有在运行
// pthread_t retry_thread_id;
// /*创建 retry send 线程*/
// if(pthread_create(&retry_thread_id,NULL,retry_send_pos_func,NULL))
// LOG(INFO) << "create retry_send_pos_func thread failed";
// }
// }
}else
{
jsonTool.getOdsResponseData(101, "convert data to [POS] format failed!", responseData);
......@@ -394,7 +394,7 @@ int main()
//responseData="{\"fm_cmd\": 2,\"order_id\" : \"1234\",\"status\" : 1,\"channel\" : \"1234\",\"status_code\" : 100,\"msg\" : \"\"}";
bool rlt = ods.send(responseData);
LOG(INFO)<<"send result:"<<rlt;
LOG(INFO)<<"send to ods result:"<<rlt;
}
else{
if(!bInitDone){
......@@ -406,12 +406,12 @@ int main()
}else
{
ods.close();
LOG(INFO) << "recv ODS pushDate failed or timeout";
LOG(INFO) << "recv ODS pushData failed or timeout";
}
}
/// 注销回调函数
el::Helpers::uninstallPreRollOutCallback();
sqlite.closeSQLite();
//sqlite.closeSQLite();
return 0;
}
......@@ -420,7 +420,7 @@ bool order_send_to_pos(IN std::string &order_json,IN std::string &ods_json,OUT s
bool rlt=true;
TCPClient pos;
JsonModule jsonTool;
std::string tmp="{\"status_code\": 100, \"msg\": \"success\"}";
std::string tmp="{\"status_code\": 102, \"msg\": \"connect pos failed!\"}";
if( pos.doConnect(pos_listen_port, pos_ip.c_str()) )
{
if( pos.write(order_json.c_str()) )
......@@ -453,20 +453,20 @@ bool order_send_to_pos(IN std::string &order_json,IN std::string &ods_json,OUT s
return rlt;
}
void write_failed_order(IN std::string &order_json)
{
orderSendFailedObj orderFailedObj;
orderFailedObj.timestamp=timestamps_milliseconds();
orderFailedObj.order_json=order_json;
vecFailedOrders.push_back(orderFailedObj);
LOG(INFO)<<"---------------create timestamp:"<<orderFailedObj.timestamp<<" vector size:"<<vecFailedOrders.size();
// void write_failed_order(IN std::string &order_json)
// {
// orderSendFailedObj orderFailedObj;
// orderFailedObj.timestamp=timestamps_milliseconds();
// orderFailedObj.order_json=order_json;
// vecFailedOrders.push_back(orderFailedObj);
// LOG(INFO)<<"---------------create timestamp:"<<orderFailedObj.timestamp<<" vector size:"<<vecFailedOrders.size();
char* lpSql = new char[BUF_SIZE];
sprintf(lpSql, "insert into fmOrderFailed(id,msg) values(%lld,'%s')",orderFailedObj.timestamp,orderFailedObj.order_json.c_str());
// char* lpSql = new char[BUF_SIZE];
// sprintf(lpSql, "insert into fmOrderFailed(id,msg) values(%lld,'%s')",orderFailedObj.timestamp,orderFailedObj.order_json.c_str());
if(!sqlite.insert(lpSql)){
LOG(ERROR) << "write sqlite failed:"<<lpSql;
}
// if(!sqlite.insert(lpSql)){
// LOG(ERROR) << "write sqlite failed:"<<lpSql;
// }
delete[] lpSql;
}
// delete[] lpSql;
// }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment