使用 C++ Boost.Asio 开发异步的 TCP server 程序,核心是通过注册回调来处理读写事件,通过智能指针安全地释放资源,并借助定时器做心跳(读超时)处理,其大致的代码如下(麻雀虽小,五脏俱全):
using boost::asio::ip::tcp; class TcpConn : public std::enable_shared_from_this<TcpConn> { public: TcpConn(tcp::socket socket); ~TcpConn(); void Start(ReadCallbackFunctor& cb, CloseCallbackFunctor close_cb, std::int32_t read_timeout = 60); void Close(); int WriteData(const char* data, int len); const std::string &GetClientIP() const { return m_client_ip; } std::uint16_t GetClientPort() { return m_client_port; } private: void Write(); void Read(); void CheckTimeout(); private: tcp::socket m_socket_; std::string m_client_ip; std::uint16_t m_client_port; enum { max_length = 10240, default_read_timeout = 60 }; std::string send_data_; char recv_data_[max_length]; ReadCallbackFunctor m_message_cb_; CloseCallbackFunctor m_close_cb_; boost::asio::deadline_timer m_t; std::int32_t m_read_timeout; }; class CTcpServer : public CTransportHelp { public: CTcpServer(); ~CTcpServer(); bool Open(unsigned short port); void Run(); void Close(); int SendData(const unsigned char* data, int len); void SetReadCallback(ReadCallbackFunctor cb) { read_cb = cb; } private: void Accept(); void CloseCallback() { m_conn_ptr.reset(); } private: boost::asio::io_context* m_io_context_ptr; tcp::acceptor* m_acceptor_ptr; ReadCallbackFunctor read_cb; std::shared_ptr<TcpConn> m_conn_ptr; }; CTcpServer::CTcpServer() : m_io_context_ptr(new boost::asio::io_context()) , m_acceptor_ptr(nullptr) , read_cb(nullptr) , m_conn_ptr(nullptr) { } CTcpServer::~CTcpServer() { Close(); if (m_acceptor_ptr) { delete m_acceptor_ptr; m_acceptor_ptr = nullptr; } if (m_io_context_ptr) { delete m_io_context_ptr; m_io_context_ptr = nullptr; } } bool CTcpServer::Open(unsigned short port) { bool result = false; if (!read_cb) { LOG_ERROR("read callback is null"); return result; } LOG_INFO("Open tcp server port: %u", port); try { m_acceptor_ptr = new tcp::acceptor(*m_io_context_ptr, tcp::endpoint(tcp::v4(), port)); Accept(); result = true; } catch (boost::system::system_error & e) { LOG_ERROR("Exception: %s", e.what()); } catch 
(...) { } return result; } void CTcpServer::Run() { if (m_io_context_ptr) { m_io_context_ptr->run(); } } void CTcpServer::Close() { if (m_conn_ptr) { m_conn_ptr->Close(); } m_io_context_ptr->stop(); } int CTcpServer::SendData(const unsigned char* data, int len) { int send_len = 0; if (m_conn_ptr) { m_conn_ptr->WriteData((const char *)data, len); } return send_len; } void CTcpServer::Accept() { if (m_acceptor_ptr) { m_acceptor_ptr->async_accept( [this](boost::system::error_code ec, tcp::socket socket) { if (!ec) { LOG_INFO("[%s:%u] accept new connection", socket.remote_endpoint().address().to_string().c_str(), socket.remote_endpoint().port()); if (m_conn_ptr) { LOG_INFO("[%s:%u] close old connection", m_conn_ptr->GetClientIP().c_str(), m_conn_ptr->GetClientPort()); m_conn_ptr->Close(); } m_conn_ptr = std::make_shared<TcpConn>(std::move(socket)); m_conn_ptr->Start(read_cb, std::bind(&CTcpServer::CloseCallback, this), 600); } else { LOG_ERROR("accept error: %s", ec.message().c_str()); } Accept(); } ); } } TcpConn::TcpConn(tcp::socket socket) : m_socket_(std::move(socket)) , m_client_ip(m_socket_.remote_endpoint().address().to_string()) , m_client_port(m_socket_.remote_endpoint().port()) , send_data_("") , m_message_cb_(nullptr) , m_close_cb_(nullptr) , m_t(socket.get_io_context()) , m_read_timeout(default_read_timeout) { memset(recv_data_, 0, max_length); m_socket_.non_blocking(); } TcpConn::~TcpConn() { LOG_INFO("TcpConn is released"); } void TcpConn::Start(ReadCallbackFunctor& cb, CloseCallbackFunctor close_cb, std::int32_t read_timeout) { m_read_timeout = read_timeout; if (m_read_timeout > 0) { m_t.expires_from_now(boost::posix_time::seconds(m_read_timeout)); m_t.async_wait(std::bind(&TcpConn::CheckTimeout, this)); } m_message_cb_ = cb; m_close_cb_ = close_cb; Read(); } void TcpConn::Close() { if (m_close_cb_) { m_close_cb_(); m_close_cb_ = nullptr; // 防止重复调用 m_socket_.close(); m_t.cancel(); LOG_ERROR("Connection is closed[%s:%u]", GetClientIP().c_str(), 
GetClientPort()); } } int TcpConn::WriteData(const char* data, int len) { auto self(shared_from_this()); std::string msg(data, len); m_socket_.get_io_context().post([self, msg]() { // m_send_vec_的操作只在主线程中进行 self->send_data_.append(msg); }); Write(); return len; } void TcpConn::Write() { auto self(shared_from_this()); boost::asio::async_write(m_socket_, boost::asio::buffer(send_data_.c_str(), send_data_.size()), [self](boost::system::error_code ec, std::size_t len) { if (!ec) { if (len <= self->send_data_.size()) { self->send_data_ = self->send_data_.substr(len); } else { self->send_data_.clear(); self->send_data_ = ""; } if (self->send_data_.size() > 0) { self->Write(); } } else { LOG_ERROR("write error: %s", ec.message().c_str()); if (ec == boost::asio::error::try_again || ec == boost::asio::error::would_block || ec == boost::asio::error::timed_out || ec == boost::asio::error::interrupted) { // 不处理 } else { self->Close(); } } } ); } void TcpConn::Read() { auto self(shared_from_this()); m_socket_.async_read_some(boost::asio::buffer(recv_data_, max_length), [self](boost::system::error_code ec, std::size_t length) { // 重置定时器 if (self->m_read_timeout > 0) { // printf("重置定时器\n"); self->m_t.expires_from_now(boost::posix_time::seconds(self->m_read_timeout)); } if (!ec) { // printf("read %d bytes data: %s\n", length, self->recv_data_); if (self->m_message_cb_) { self->m_message_cb_(self, std::string(self->recv_data_, length)); } self->Read(); } else { LOG_ERROR("read error: %s", ec.message().c_str()); if (ec == boost::asio::error::try_again || ec == boost::asio::error::would_block || ec == boost::asio::error::timed_out || ec == boost::asio::error::interrupted) { // 不处理 } else { self->Close(); } } } ); } void TcpConn::CheckTimeout() { // printf("check time\n"); auto self(shared_from_this()); if (!self) return; if (self->m_t.expires_at() <= boost::asio::deadline_timer::traits_type::now()) { self->Close(); //关闭socket self->m_t.expires_at(boost::posix_time::pos_infin); } else 
{ self->m_t.async_wait(std::bind(&TcpConn::CheckTimeout, this)); } }