When building an asynchronous TCP server with Boost.Asio in C++, the core ideas are: register callbacks and handle the accept/read/write events in their completion handlers, rely on smart pointers (enable_shared_from_this) so the connection object is only released after all pending handlers have finished, and drive a deadline timer for read-timeout / heartbeat handling. The rough code looks like this (small, but all the essential pieces are there):
#include <boost/asio.hpp>
#include <cstdint>
#include <cstring>
#include <deque>
#include <functional>
#include <memory>
#include <string>

using boost::asio::ip::tcp;

class TcpConn : public std::enable_shared_from_this<TcpConn>
{
public:
    TcpConn(tcp::socket socket);
    ~TcpConn();
    void Start(ReadCallbackFunctor& cb, CloseCallbackFunctor close_cb, std::int32_t read_timeout = 60);
    void Close();
    int WriteData(const char* data, int len);
    const std::string& GetClientIP() const {
        return m_client_ip;
    }
    std::uint16_t GetClientPort() const {
        return m_client_port;
    }

private:
    void Write();
    void Read();
    void CheckTimeout();

private:
    tcp::socket m_socket_;
    std::string m_client_ip;
    std::uint16_t m_client_port;
    enum {
        max_length = 10240,
        default_read_timeout = 60
    };
    std::deque<std::string> send_queue_;   // pending messages; front() is the one currently being written
    char recv_data_[max_length];
    ReadCallbackFunctor m_message_cb_;
    CloseCallbackFunctor m_close_cb_;
    boost::asio::deadline_timer m_t;
    std::int32_t m_read_timeout;
};

class CTcpServer : public CTransportHelp
{
public:
    CTcpServer();
    ~CTcpServer();
    bool Open(unsigned short port);
    void Run();
    void Close();
    int SendData(const unsigned char* data, int len);
    void SetReadCallback(ReadCallbackFunctor cb) {
        read_cb = cb;
    }

private:
    void Accept();
    void CloseCallback() {
        m_conn_ptr.reset();
    }

private:
    boost::asio::io_context* m_io_context_ptr;
    tcp::acceptor* m_acceptor_ptr;
    ReadCallbackFunctor read_cb;
    std::shared_ptr<TcpConn> m_conn_ptr;
};

CTcpServer::CTcpServer() : m_io_context_ptr(new boost::asio::io_context())
    , m_acceptor_ptr(nullptr)
    , read_cb(nullptr)
    , m_conn_ptr(nullptr)
{
}

CTcpServer::~CTcpServer()
{
    Close();
    if (m_acceptor_ptr)
    {
        delete m_acceptor_ptr;
        m_acceptor_ptr = nullptr;
    }
    if (m_io_context_ptr)
    {
        delete m_io_context_ptr;
        m_io_context_ptr = nullptr;
    }
}

bool CTcpServer::Open(unsigned short port)
{
    bool result = false;
    if (!read_cb)
    {
        LOG_ERROR("read callback is null");
        return result;
    }
    LOG_INFO("Open tcp server port: %u", port);
    try
    {
        m_acceptor_ptr = new tcp::acceptor(*m_io_context_ptr, tcp::endpoint(tcp::v4(), port));
        Accept();
        result = true;
    }
    catch (boost::system::system_error& e)
    {
        LOG_ERROR("Exception: %s", e.what());
    }
    catch (...)
    {
        LOG_ERROR("Exception: unknown error while opening acceptor");
    }
    return result;
}

void CTcpServer::Run()
{
    if (m_io_context_ptr)
    {
        m_io_context_ptr->run();
    }
}

void CTcpServer::Close()
{
    if (m_conn_ptr)
    {
        m_conn_ptr->Close();
    }
    m_io_context_ptr->stop();
}

int CTcpServer::SendData(const unsigned char* data, int len)
{
    int send_len = 0;
    if (m_conn_ptr)
    {
        send_len = m_conn_ptr->WriteData((const char*)data, len);
    }
    return send_len;
}

void CTcpServer::Accept()
{
    if (m_acceptor_ptr)
    {
        m_acceptor_ptr->async_accept(
            [this](boost::system::error_code ec, tcp::socket socket)
            {
                if (!ec)
                {
                    LOG_INFO("[%s:%u] accept new connection", socket.remote_endpoint().address().to_string().c_str(), socket.remote_endpoint().port());
                    if (m_conn_ptr)
                    {
                        // This server keeps only one connection at a time: drop the old one first.
                        LOG_INFO("[%s:%u] close old connection", m_conn_ptr->GetClientIP().c_str(), m_conn_ptr->GetClientPort());
                        m_conn_ptr->Close();
                    }
                    m_conn_ptr = std::make_shared<TcpConn>(std::move(socket));
                    m_conn_ptr->Start(read_cb, std::bind(&CTcpServer::CloseCallback, this), 600);
                }
                else
                {
                    LOG_ERROR("accept error: %s", ec.message().c_str());
                }
                // Keep accepting regardless of the outcome.
                Accept();
            }
        );
    }
}

TcpConn::TcpConn(tcp::socket socket) : m_socket_(std::move(socket))
    , m_client_ip(m_socket_.remote_endpoint().address().to_string())
    , m_client_port(m_socket_.remote_endpoint().port())
    , m_message_cb_(nullptr)
    , m_close_cb_(nullptr)
    , m_t(m_socket_.get_io_context())   // use m_socket_: the socket parameter has already been moved from
    , m_read_timeout(default_read_timeout)
{
    memset(recv_data_, 0, max_length);
    m_socket_.non_blocking(true);       // non_blocking() with no argument only queries the flag
}

TcpConn::~TcpConn()
{
    LOG_INFO("TcpConn is released");
}

void TcpConn::Start(ReadCallbackFunctor& cb, CloseCallbackFunctor close_cb, std::int32_t read_timeout)
{
    m_read_timeout = read_timeout;
    if (m_read_timeout > 0)
    {
        m_t.expires_from_now(boost::posix_time::seconds(m_read_timeout));
        // Bind the shared_ptr rather than the raw this pointer, so the pending
        // wait keeps the connection alive until its handler has run.
        m_t.async_wait(std::bind(&TcpConn::CheckTimeout, shared_from_this()));
    }
    m_message_cb_ = cb;
    m_close_cb_ = close_cb;
    Read();
}

void TcpConn::Close()
{
    if (!m_socket_.is_open())
    {
        return; // already closed
    }
    boost::system::error_code ignored;
    m_socket_.close(ignored);   // the error_code overload never throws
    m_t.cancel();
    LOG_INFO("Connection is closed [%s:%u]", GetClientIP().c_str(), GetClientPort());
    if (m_close_cb_)
    {
        auto cb = m_close_cb_;
        m_close_cb_ = nullptr;  // prevent a second invocation
        cb();                   // may release the server's shared_ptr, so call it last
    }
}

int TcpConn::WriteData(const char* data, int len)
{
    auto self(shared_from_this());
    std::string msg(data, len);
    // send_queue_ is only touched on the io_context thread, so the append is
    // posted there instead of racing with the completion handlers.
    m_socket_.get_io_context().post([self, msg]() {
        bool write_in_progress = !self->send_queue_.empty();
        self->send_queue_.push_back(msg);
        if (!write_in_progress)
        {
            // No write chain is running: start one for the new message.
            self->Write();
        }
    });
    return len;
}

void TcpConn::Write()
{
    auto self(shared_from_this());
    // front() keeps a stable address until it is popped, so the buffer handed
    // to async_write stays valid even if more messages are queued meanwhile.
    boost::asio::async_write(m_socket_, boost::asio::buffer(send_queue_.front()),
        [self](boost::system::error_code ec, std::size_t /*len*/)
        {
            if (!ec)
            {
                self->send_queue_.pop_front();
                if (!self->send_queue_.empty())
                {
                    self->Write();
                }
            }
            else
            {
                LOG_ERROR("write error: %s", ec.message().c_str());
                if (ec == boost::asio::error::try_again
                    || ec == boost::asio::error::would_block
                    || ec == boost::asio::error::timed_out
                    || ec == boost::asio::error::interrupted)
                {
                    // transient error: leave the message queued
                }
                else
                {
                    self->Close();
                }
            }
        }
    );
}

void TcpConn::Read()
{
    auto self(shared_from_this());
    m_socket_.async_read_some(boost::asio::buffer(recv_data_, max_length),
        [self](boost::system::error_code ec, std::size_t length)
        {
            // Push the read deadline back; the cancelled wait makes
            // CheckTimeout re-arm itself against the new expiry.
            if (self->m_read_timeout > 0)
            {
                self->m_t.expires_from_now(boost::posix_time::seconds(self->m_read_timeout));
            }
            if (!ec)
            {
                if (self->m_message_cb_)
                {
                    self->m_message_cb_(self, std::string(self->recv_data_, length));
                }
                self->Read();
            }
            else
            {
                LOG_ERROR("read error: %s", ec.message().c_str());
                if (ec == boost::asio::error::try_again
                    || ec == boost::asio::error::would_block
                    || ec == boost::asio::error::timed_out
                    || ec == boost::asio::error::interrupted)
                {
                    // transient error: ignore
                }
                else
                {
                    self->Close();
                }
            }
        }
    );
}

void TcpConn::CheckTimeout()
{
    // Once the connection has been closed, the timer chain simply ends here.
    if (!m_socket_.is_open())
    {
        return;
    }
    if (m_t.expires_at() <= boost::asio::deadline_timer::traits_type::now())
    {
        // Nothing was read within m_read_timeout seconds: close the connection.
        Close();
    }
    else
    {
        // The deadline was pushed back by a read; wait again for the new expiry.
        m_t.async_wait(std::bind(&TcpConn::CheckTimeout, shared_from_this()));
    }
}
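
For completeness, here is a minimal sketch of how this server might be driven. It assumes ReadCallbackFunctor is std::function<void(std::shared_ptr<TcpConn>, const std::string&)> and CloseCallbackFunctor is std::function<void()> (the signatures implied by the calls above), and that CTransportHelp is default-constructible; adjust to your actual definitions:

// Hypothetical driver: an echo server on port 9000.
// The callback types are not shown in the excerpt above; plausible definitions
// would be:
//   using ReadCallbackFunctor  = std::function<void(std::shared_ptr<TcpConn>, const std::string&)>;
//   using CloseCallbackFunctor = std::function<void()>;
int main()
{
    CTcpServer server;

    // Echo every received chunk straight back to the client.
    server.SetReadCallback([](std::shared_ptr<TcpConn> conn, const std::string& data) {
        conn->WriteData(data.data(), static_cast<int>(data.size()));
    });

    if (!server.Open(9000))
    {
        return 1;
    }

    server.Run();   // blocks in io_context::run() until Close() stops it
    return 0;
}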