Commit 32f68acc by shangshang.dai

Add 1:新增Socket类

parent bbce0280
#include <iostream>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include "SocketModule.h"
#include "../3rdParty/easylogging/easylogging++.h"
#include <stdio.h>
#define SOCKET_ERROR (-1)
#define LISTEN_PORT 23775
#define BUFFER_SIZE 1024*10 //接收数据缓冲区大小
TCPSocket::TCPSocket(): m_sockfd(-1){}
TCPSocket::~TCPSocket()
{
if ( m_sockfd != -1 )
::close(m_sockfd);
}
bool g_exit = false;
//socket 数据接收
//返回值 成功:接收长度
// 失败: -1
int socketRecvData(char * recvBuf, int bufSize, int socket_fd)
bool TCPSocket::create()
{
int rcvedLength = 0;
int rcvFlag = 0; //firstly, to rcv header
int needLength = sizeof(FMSOCKHEADER);
int rlt = 0;
close();
if ((m_sockfd = ::socket(AF_INET, SOCK_STREAM, 0)) == -1)
return false;
return true;
}
FMSOCKHEADER headx;
char * pcRcvBufer = (char *)(&headx);
bool TCPSocket::bind(unsigned short int port, const char *ip) const
{
if ( m_sockfd == -1 )
return false;
while(0 < needLength)
{
//接收并打印客户端数据
int length = recv(socket_fd, pcRcvBufer + rcvedLength, needLength, 0);
if (0 < length)
{
needLength -= length;
rcvedLength += length;
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
if (ip == NULL)
addr.sin_addr.s_addr = htonl(INADDR_ANY);
else
addr.sin_addr.s_addr = inet_addr(ip);
if ( ::bind(m_sockfd, (const struct sockaddr *)&addr, sizeof(addr)) == -1 )
return false;
return true;
}
bool TCPSocket::listen(int backlog) const
{
if ( m_sockfd == -1 )
return false;
if (0 == needLength)
{
if (0 == rcvFlag)
{
//header rcved complete
rcvFlag = 1;//to rcv payload
if ( ::listen(m_sockfd, backlog) == -1)
return false;
return true;
}
bool TCPSocket::accept(TCPSocket &clientSocket) const
{
if ( m_sockfd == -1 )
return false;
int flag = headx.flag;
int len = headx.len;
int ver = headx.ver;
clientSocket.m_sockfd =
::accept(this->m_sockfd, NULL, NULL);
if (clientSocket.m_sockfd == -1)
return false;
return true;
}
//need to check payloadLength is valid or not
if(len > bufSize)
{
rlt = -1;
LOG(ERROR)<<"socket recv 接收数据长度大于接收缓冲区:"<<len;
break;
}
needLength = len;
pcRcvBufer = recvBuf;
rcvedLength = 0;
continue;
}
bool TCPSocket::connect(unsigned short int port, const char *ip) const
{
if ( m_sockfd == -1 )
return false;
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip);
if ( ::connect(m_sockfd, (const struct sockaddr *)&addr, sizeof(addr)) == -1)
return false;
return true;
}
bool TCPSocket::setNonBlocking(bool flag) const
{
if ( m_sockfd == -1 )
return false;
int opt = fcntl(m_sockfd, F_GETFL, 0);
if (opt == -1)
return false;
if (flag)
opt |= O_NONBLOCK;
else
opt &= ~O_NONBLOCK;
if (fcntl(m_sockfd, F_SETFL, opt) == -1)
return false;
return true;
}
bool TCPSocket::reuseaddr() const
{
if ( m_sockfd == -1 )
return false;
int on = 1;
if (setsockopt(m_sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1)
return false;
return true;
}
bool TCPSocket::close()
{
if ( m_sockfd == -1 )
return false;
::close(m_sockfd);
m_sockfd = -1;
return true;
}
/** Server TCP Socket**/
bool TCPServer::doListen(unsigned short int port, const char *ip, int backlog)
{
if ( create() )
if ( reuseaddr() )
if ( bind(port, ip) )
if ( listen(backlog) )
{
//payload rcved complete
rlt = rcvedLength;
break;
}
}
}
else if(length == 0)
{
//socket error
rlt = -1;
LOG(ERROR)<<"socket recv 返回值:"<<length;
break;
}
return true;
}
return rlt;
return false;
}
//通过 sockClient 发送数据
int socketSendData(const char * sendBuf, int socket_fd)
bool TCPServer::accept(TCPSocket &clientSocket) const
{
int toSendLength = strlen(sendBuf);
char * pcSendBuf = (char *)sendBuf;
int curSendLength = 0;
int rlt=0;
char* m_pFmPackage = new char[toSendLength + sizeof(FMSOCKHEADER)];
FMSOCKHEADER header = { 0, 0, 0 };
header.flag = 0x4d46;
header.len = toSendLength;
header.ver = 0x1;
return TCPSocket::accept(clientSocket);
}
memcpy(m_pFmPackage, &header, sizeof(FMSOCKHEADER));
memcpy(m_pFmPackage+sizeof(FMSOCKHEADER), pcSendBuf, toSendLength);
toSendLength = toSendLength + sizeof(FMSOCKHEADER);
/** client端特有的send/receive **/
static ssize_t readn(int fd, void *buf, size_t count);
static ssize_t writen(int fd, const void *buf, size_t count);
while(curSendLength < toSendLength)
/** client TCP Socket **/
bool TCPClient::doConnect(unsigned short port, const char *ip)
{
if( create() )
{
int res = send(socket_fd, pcSendBuf + curSendLength, toSendLength - curSendLength, 0);
if(res == SOCKET_ERROR)
if( connect(port, ip) )
{
LOG(ERROR)<<"发送数据给fm服务端失败!";
rlt=-1;
break;
m_bValid = true;
return true;
}
curSendLength += res;
}
if(curSendLength != toSendLength)
return false;
}
//send
bool TCPClient::send(const std::string& message)
{
Packet buf;
buf.msgLen = htonl(message.length());
strcpy(buf.text, message.c_str());
if (writen(m_sockfd, &buf, sizeof(buf.msgLen)+message.length()) == -1)
{
rlt=-1;
LOG(ERROR)<<"socket数据发送不完整!";
m_bValid = false;
return false;
}
return rlt;
return true;
}
//SOCKET服务线程
void *FunSocketServer(void* lpParamter)
bool TCPClient::receive(std::string &message)
{
LOG(INFO)<<"SOCKET SERVER";
int socket_fd, connect_fd;
struct sockaddr_in servaddr;
char* pRecvBuf = new char[BUFFER_SIZE]; //打印数据接收区
int nRecvLen = 0;
int nErrCode = 0;
//初始化Socket
if((socket_fd = socket(AF_INET, SOCK_STREAM, 0)) == -1 ){
LOG(ERROR)<<"create socket error:"<<strerror(errno)<<" errno:"<<errno;
exit(0);
//首先读取头部
Packet buf = {0, 0};
size_t readBytes = readn(m_sockfd, &buf.msgLen, sizeof(buf.msgLen));
if (readBytes == (size_t)-1)
{
m_bValid = false;
return false;
}
//初始化
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);//IP地址设置成INADDR_ANY,让系统自动获取本机的IP地址。
servaddr.sin_port = htons(LISTEN_PORT); //设置的端口为LISTEN_PORT
// 设置套接字选项避免地址使用错误
int flag = 1;
if ((setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag)))<0)
else if (readBytes != sizeof(buf.msgLen))
{
LOG(ERROR)<<"setsockopt failed";
exit(0);
m_bValid = false;
return false;
}
//将本地地址绑定到所创建的套接字上
if( bind(socket_fd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1){
LOG(ERROR)<<"bind socket error:"<<strerror(errno)<<" errno:"<<errno;
exit(0);
//然后读取数据部分
unsigned int lenHost = ntohl(buf.msgLen);
readBytes = readn(m_sockfd, buf.text, lenHost);
if (readBytes == (size_t)-1)
{
m_bValid = false;
return false;
}
//开始监听是否有客户端连接
if( listen(socket_fd, 10) == -1){
LOG(ERROR)<<"listen socket error:"<<strerror(errno)<<" errno:"<<errno;
exit(0);
else if (readBytes != lenHost)
{
m_bValid = false;
return false;
}
LOG(INFO)<<"======waiting for client's request======";
message = buf.text;
return true;
}
while(!g_exit){
//阻塞直到有客户端连接,不然多浪费CPU资源。
if( (connect_fd = accept(socket_fd, (struct sockaddr*)NULL, NULL)) == -1){
LOG(ERROR)<<"accept socket error:"<<strerror(errno)<<" errno:"<<errno;
continue;
bool TCPClient::read(void *buf, size_t count)
{
ssize_t readBytes = ::read(m_sockfd, buf, count);
if (readBytes == -1)
{
m_bValid = false;
return false;
}
//接受客户端传过来的数据
int rlt = socketRecvData(pRecvBuf, BUFFER_SIZE, connect_fd);
if (rlt > 0)
return true;
}
bool TCPClient::write(const char *msg)
{
if( ::write(m_sockfd, msg, strlen(msg)) == -1 )
{
pRecvBuf[rlt] = '\0';
//
LOG(INFO)<<"打印请求:"<<pRecvBuf;
m_bValid = false;
return false;
}
return true;
}
//std::string strReturnJson = GetTakeawayResultJson(statusCode,msg.data(),"");
//发送返回数据
//int result = socketSendData(strReturnJson.data(), connect_fd);
//LOG(INFO)<<"外卖发送处理结果:"<<strReturnJson.c_str();
/** readn/writen实现部分 **/
static ssize_t readn(int fd, void *buf, size_t count)
{
size_t nLeft = count;
ssize_t nRead = 0;
char *pBuf = (char *)buf;
while (nLeft > 0)
{
if ((nRead = read(fd, pBuf, nLeft)) < 0)
{
//如果读取操作是被信号打断了, 则说明还可以继续读
if (errno == EINTR)
continue;
//否则就是其他错误
else
return -1;
}
//读取到末尾
else if (nRead == 0)
return count-nLeft;
close(connect_fd);
//正常读取
nLeft -= nRead;
pBuf += nRead;
}
return count;
}
static ssize_t writen(int fd, const void *buf, size_t count)
{
size_t nLeft = count;
ssize_t nWritten = 0;
char *pBuf = (char *)buf;
while (nLeft > 0)
{
if ((nWritten = write(fd, pBuf, nLeft)) < 0)
{
//如果写入操作是被信号打断了, 则说明还可以继续写入
if (errno == EINTR)
continue;
//否则就是其他错误
else
return -1;
}
//如果 ==0则说明是什么也没写入, 可以继续写
else if (nWritten == 0)
continue;
//正常写入
nLeft -= nWritten;
pBuf += nWritten;
}
close(socket_fd);
delete[] pRecvBuf;
return count;
}
#ifndef SOCKET_MODULE_H
#define SOCKET_MODULE_H
typedef struct {
unsigned int flag;
unsigned int ver;
unsigned int len;
}FMSOCKHEADER, *LPFMSOCKHEADER;
//socket 数据接收
//返回值 成功:接收长度
// 失败: -1
int socketRecvData(char * recvBuf, int bufSize, int socket_fd);
//通过 sockClient 发送数据
int socketSendData(const char * sendBuf, int socket_fd);
//socket server线程
void *FunSocketServer(void* lpParamter);
#include <string>
#include <string.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#define BUF_SIZE 1024*2
class TCPSocket
{
protected:
TCPSocket();
virtual ~TCPSocket();
bool create();
bool bind(unsigned short int port, const char *ip = NULL) const;
bool listen(int backlog = 1) const;
bool accept(TCPSocket &clientSocket) const;
bool connect(unsigned short int port, const char *ip) const;
bool reuseaddr() const;
public:
bool close();
int getfd() const{ return m_sockfd; }
//flag: true=SetNonBlock, false=SetBlock
bool setNonBlocking(bool flag) const;
protected:
int m_sockfd;
};
/** TCP Client **/
class TCPClient : public TCPSocket
{
private:
struct Packet
{
unsigned int msgLen; //数据部分的长度(网络字节序)
char text[BUF_SIZE]; //报文的数据部分
};
public:
TCPClient():m_bValid(false){}
~TCPClient(){}
/* 该函数会强制删除现有socket重新创建连接 */
bool doConnect(unsigned short port, const char *ip);
bool read(void *buf, size_t count);
bool write(const char *msg);
bool receive(std::string& message);
bool send(const std::string& message);
bool isValid(){ return m_bValid; }
private:
bool m_bValid;
};
/** TCP Server **/
class TCPServer : public TCPSocket
{
public:
TCPServer(){}
~TCPServer(){}
/* 该函数会强制删除现有socket重新创建监听 */
bool doListen(unsigned short int port, const char *ip = NULL, int backlog = SOMAXCONN);
bool accept(TCPSocket &clientSocket) const;
};
#endif
......@@ -12,6 +12,17 @@
INITIALIZE_EASYLOGGINGPP
std::string str_pos2ods_request;
std::string str_pos2ods_reply;
std::string str_ods2pos_push;
std::string str_ods2pos_reply;
std::string ods_ip;
int ods_listen_port;
int client_listen_port;
int pos_listen_port = 8009;
void logRolloutHandler(const char* filename, std::size_t size)
{
/// 备份日志
......@@ -23,10 +34,118 @@ void logRolloutHandler(const char* filename, std::size_t size)
LOG(INFO)<<"备份日志:"<<ss.str().c_str();
system(ss.str().c_str());
}
void* listen_ods_func(void* arg)
{
TCPClient *client = (TCPClient*)arg;
while(true)
{
std::string msg;
// 判断是否成功连接ODS
if( !client->isValid() )
{
if( client->doConnect(ods_listen_port, ods_ip.c_str()) )
{
LOG(INFO) << "重连ODS成功";
}else
{
continue;
usleep(800);
}
}
if( client->receive(msg) != -1 )
{
// 判断是推送还是回复
std::string mark("push");
if( msg.compare(0, mark.size(), mark) == 0 )
{
// 为ODS的推送消息
TCPClient tmp_client;
if(tmp_client.doConnect(pos_listen_port, "127.0.0.1"))
{
if(tmp_client.send(msg))
{
std::string tmp_msg;
if(tmp_client.receive(tmp_msg))
{
str_pos2ods_reply = tmp_msg;
}else
{
str_pos2ods_reply = "获取POS返回值失败";
}
}else
{
LOG(INFO) << "转发失败: "<<msg;
str_pos2ods_reply = "推送给POS失败";
}
tmp_client.close();
}else
{
str_pos2ods_reply = "连接POS失败";
}
}else
{
// 为ODS的返回消息
str_ods2pos_reply = msg;
}
}else
{
LOG(INFO) << "ods连接失效,尝试重连。。。。。。";
}
}
}
void* listen_pos_func(void* arg)
{
TCPServer server;
if( server.doListen(client_listen_port) )
{
LOG(INFO) << "监听端口成功";
}
while(true)
{
std::string msg;
TCPClient connect;
if( server.accept(connect) )
{
if( connect.receive(msg) )
{
// POS请求数据
// 赋值给中间变量,等待其他线程发送给ODS
str_pos2ods_request = msg;
// 阻塞等待ODS返回
while(str_ods2pos_reply.empty())
{
usleep(200);
continue;
}
// 返回给POS
connect.send(str_ods2pos_reply);
str_ods2pos_reply.clear();
connect.close();
}else
{
LOG(INFO) << "接收POS推送消息失败";
}
}else
{
LOG(INFO) << "接受POS连接失败";
}
}
}
int main()
{
signal(SIGPIPE, SIG_IGN);
// 初始化日志
el::Loggers::addFlag(el::LoggingFlag::StrictLogFileSizeCheck);
std::string strBinPath = GetProcDir();
std::string strLogPath(strBinPath.data());
......@@ -37,55 +156,61 @@ int main()
/// 注册回调函数
el::Helpers::installPreRollOutCallback(logRolloutHandler);
LOG(INFO)<<"日志测试";
LOG(INFO)<<"--------------程序启动--------------";
//---------ini test-------------
// 读取配置文件信息
std::string strIniPath(strBinPath.data());
strIniPath.append("config.ini");
std::string ip = ZIni::readString("SYS","ip", "",strIniPath.c_str());
LOG(INFO)<<"读取配置文件ip:"<<ip.data();
//------------end---------------
//---------- json test----------
// std::string json = GetTestJson(100,"test data","123456789");
// LOG(INFO)<<"生成JSON" <<json.data();
// LOG(INFO)<<"JSON解析";
// parseJson(json.data());
JsonModule jsonMod;
orderObj obj;
productAttr product;
product.pro.source="123";
productSpec spec;
spec.name="只";
product.vecSpec.push_back(spec);
obj.vecProducts.push_back(product);
std::string orderInfo = jsonMod.convertToNewOrderJson(obj);
LOG(INFO)<<"订单信息转换成JSON:"<<orderInfo.data();
//------------end---------------
//---------- pthread test ---------
LOG(INFO)<<"启动SOCKET线程";
pthread_t printId;
int ret = pthread_create(&printId, NULL, FunSocketServer, NULL);
//---------- end ------------------
//---------- sqlite test-----------
SQLite sqlite;
sqlite.initSQLite();
sqlite.insert("insert into fmTest(fm_id, statusCode,msg,prompt,fm_open_id,total_amount,paid_total_amount,invoice_amount,incentives_amount)\
values('aabbcc',111,'abc',123,'cba',1,1,1,1)");
sqlite.query("select * from fmTest");
sqlite.update("update fmTest set statusCode=200 where fm_id='aabbcc'");
sqlite.query("select * from fmTest");
sqlite.remove("delete from fmTest where fm_id='aabbcc'");
sqlite.query("select * from fmTest");
sqlite.closeSQLite();
//-----------end------------------
char pStr[20];
std::cin>>pStr;
ods_ip = ZIni::readString("ODS","ip", "", strIniPath.c_str());
ods_listen_port = ZIni::readInt("ODS", "listenPort", -1, strIniPath.c_str());
client_listen_port = ZIni::readInt("CLIENT", "listenPort", -1, strIniPath.c_str());
LOG(INFO) << "[ODS]服务器ip地址: " << ods_ip.data() << "-监听端口: " << ods_listen_port;
LOG(INFO) << "本地监听端口: " << client_listen_port;
// 监听POS请求的线程
pthread_t listen_pos_id, listen_ods_id;
// 和ODS长连接通信
TCPClient ods_client;
if( ods_client.doConnect(ods_listen_port, ods_ip.c_str()) )
LOG(INFO) << "连接ODS成功";
/*创建 listen_pos 线程*/
if(pthread_create(&listen_pos_id,NULL,listen_pos_func,NULL))
LOG(INFO) << "创建listen_pos线程失败";
/*创建 listen_ods 线程*/
if(pthread_create(&listen_ods_id,NULL,listen_ods_func,&ods_client))
LOG(INFO) << "创建listen_pos线程失败";
while(true)
{
// 专门负责pos到ods的数据转发
if(!str_pos2ods_reply.empty())
{
// 将POS返回给ODS
if( ods_client.send(str_pos2ods_reply) )
{
str_pos2ods_reply.clear();
}else
{
}
}
if(!str_pos2ods_request.empty())
{
// 将POS的请求转发给ODS
if( ods_client.send(str_pos2ods_request))
{
str_pos2ods_request.clear();
}else
{
str_ods2pos_reply = "请求转发给ODS失败";
}
}
usleep(100);
}
return 0;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment