170  
查询码:00000159
如何使用boost_asio搭建异步tcp服务端
作者: 盖杰 于 2020年06月04日 发布在分类 / 物联网组 / 边缘接入网关 下,并于 2020年06月04日 编辑
boost使用技巧

使用C++ boost_asio开发异步的tcp server程序,核心是通过回调注册事件并处理读写事件,然后通过智能指针安全释放资源,并且借助定时器来做心跳处理,其大致的代码如下(麻雀虽小五脏俱全):

using boost::asio::ip::tcp;

class TcpConn : public std::enable_shared_from_this<TcpConn>
{
public:
	TcpConn(tcp::socket socket);
	~TcpConn();

	void Start(ReadCallbackFunctor& cb, CloseCallbackFunctor close_cb, std::int32_t read_timeout = 60);

	void Close();

	int WriteData(const char* data, int len);

	const std::string &GetClientIP() const {
		return m_client_ip;
	}
	
	std::uint16_t GetClientPort() {
		return m_client_port;
	}
private:
	void Write();

	void Read();

	void CheckTimeout();

private:
	tcp::socket m_socket_;
	std::string m_client_ip;
	std::uint16_t m_client_port;

	enum { 
		max_length = 10240,
		default_read_timeout = 60
	};
	std::string send_data_;
	char recv_data_[max_length];
	ReadCallbackFunctor m_message_cb_;
	CloseCallbackFunctor m_close_cb_;
	boost::asio::deadline_timer m_t;
	std::int32_t m_read_timeout;
};

class CTcpServer : public CTransportHelp
{
public:
	CTcpServer();
	~CTcpServer();

	bool Open(unsigned short port);

	void Run();

	void Close();

	int SendData(const unsigned char* data, int len);

	void SetReadCallback(ReadCallbackFunctor cb) {
		read_cb = cb;
	}

private:
	void Accept();

	void CloseCallback() {
		m_conn_ptr.reset();
	}

private:
	boost::asio::io_context* m_io_context_ptr;
	tcp::acceptor* m_acceptor_ptr;

	ReadCallbackFunctor read_cb;

	std::shared_ptr<TcpConn>    m_conn_ptr;
};


CTcpServer::CTcpServer() : m_io_context_ptr(new boost::asio::io_context())
				, m_acceptor_ptr(nullptr)
				, read_cb(nullptr)
				, m_conn_ptr(nullptr)
{
}

CTcpServer::~CTcpServer()
{
	Close();
	if (m_acceptor_ptr)
	{
		delete m_acceptor_ptr;
		m_acceptor_ptr = nullptr;
	}

	if (m_io_context_ptr)
	{
		delete m_io_context_ptr;
		m_io_context_ptr = nullptr;
	}
}

bool CTcpServer::Open(unsigned short port)
{
	bool result = false;
	if (!read_cb)
	{
		LOG_ERROR("read callback is null");
		return result;
	}
	LOG_INFO("Open tcp server port: %u", port);

	try
	{
		m_acceptor_ptr = new tcp::acceptor(*m_io_context_ptr, tcp::endpoint(tcp::v4(), port));
		Accept();
		result = true;
	}
	catch (boost::system::system_error & e)
	{
		LOG_ERROR("Exception: %s", e.what());
	}
	catch (...)
	{
	}
	
	return result;
}

void CTcpServer::Run()
{
	if (m_io_context_ptr)
	{
		m_io_context_ptr->run();
	}
}

void CTcpServer::Close()
{
	if (m_conn_ptr)
	{
		m_conn_ptr->Close();
	}

	m_io_context_ptr->stop();
}

int CTcpServer::SendData(const unsigned char* data, int len)
{
	int send_len = 0;
	if (m_conn_ptr)
	{
		m_conn_ptr->WriteData((const char *)data, len);
	}

	return send_len;
}

void CTcpServer::Accept()
{
	if (m_acceptor_ptr)
	{
		m_acceptor_ptr->async_accept(
			[this](boost::system::error_code ec, tcp::socket socket)
			{
				if (!ec)
				{
					LOG_INFO("[%s:%u] accept new connection", socket.remote_endpoint().address().to_string().c_str(), socket.remote_endpoint().port());
					if (m_conn_ptr)
					{
						LOG_INFO("[%s:%u] close old connection", m_conn_ptr->GetClientIP().c_str(), m_conn_ptr->GetClientPort());
						m_conn_ptr->Close();
					}

					m_conn_ptr = std::make_shared<TcpConn>(std::move(socket));
					m_conn_ptr->Start(read_cb, std::bind(&CTcpServer::CloseCallback, this), 600);
				}
				else
				{
					LOG_ERROR("accept error: %s", ec.message().c_str());
				}

				Accept();
			}
		);
	}
}

TcpConn::TcpConn(tcp::socket socket) : m_socket_(std::move(socket))
					, m_client_ip(m_socket_.remote_endpoint().address().to_string())
					, m_client_port(m_socket_.remote_endpoint().port())
					, send_data_("")
					, m_message_cb_(nullptr)
					, m_close_cb_(nullptr)
					, m_t(socket.get_io_context())
					, m_read_timeout(default_read_timeout)
{
	memset(recv_data_, 0, max_length);
	m_socket_.non_blocking();
}

TcpConn::~TcpConn()
{
	LOG_INFO("TcpConn is released");
}

void TcpConn::Start(ReadCallbackFunctor& cb, CloseCallbackFunctor close_cb, std::int32_t read_timeout)
{
	m_read_timeout = read_timeout;
	if (m_read_timeout > 0)
	{
		m_t.expires_from_now(boost::posix_time::seconds(m_read_timeout));
		m_t.async_wait(std::bind(&TcpConn::CheckTimeout, this));
	}

	m_message_cb_ = cb;
	m_close_cb_ = close_cb;
	Read();
}

void TcpConn::Close()
{
	if (m_close_cb_)
	{
		m_close_cb_();
		m_close_cb_ = nullptr;  // 防止重复调用
		m_socket_.close();
		m_t.cancel();
		LOG_ERROR("Connection is closed[%s:%u]", GetClientIP().c_str(), GetClientPort());
	}
}

int TcpConn::WriteData(const char* data, int len)
{
	auto self(shared_from_this());
	std::string msg(data, len);
	m_socket_.get_io_context().post([self, msg]() {
		// m_send_vec_的操作只在主线程中进行
		self->send_data_.append(msg);
	});

	Write();

	return len;
}

void TcpConn::Write()
{
	auto self(shared_from_this());
	boost::asio::async_write(m_socket_, boost::asio::buffer(send_data_.c_str(), send_data_.size()),
		[self](boost::system::error_code ec, std::size_t len)
		{
			if (!ec)
			{
				if (len <= self->send_data_.size())
				{
					self->send_data_ = self->send_data_.substr(len);
				}
				else
				{
					self->send_data_.clear();
					self->send_data_ = "";
				}

				if (self->send_data_.size() > 0)
				{
					self->Write();
				}
			}
			else
			{
				LOG_ERROR("write error: %s", ec.message().c_str());
				if (ec == boost::asio::error::try_again
					|| ec == boost::asio::error::would_block
					|| ec == boost::asio::error::timed_out
					|| ec == boost::asio::error::interrupted)
				{
					// 不处理
				}
				else
				{
					self->Close();
				}
			}
		}
	);
}

void TcpConn::Read()
{
	auto self(shared_from_this());
	m_socket_.async_read_some(boost::asio::buffer(recv_data_, max_length),
		[self](boost::system::error_code ec, std::size_t length)
		{
			// 重置定时器
			if (self->m_read_timeout > 0)
			{
				// printf("重置定时器\n");
				self->m_t.expires_from_now(boost::posix_time::seconds(self->m_read_timeout));
			}

			if (!ec)
			{
				// printf("read %d bytes data: %s\n", length, self->recv_data_);
				if (self->m_message_cb_)
				{
					self->m_message_cb_(self, std::string(self->recv_data_, length));
				}
				self->Read();
			}
			else
			{
				LOG_ERROR("read error: %s", ec.message().c_str());
				if (ec == boost::asio::error::try_again
					|| ec == boost::asio::error::would_block
					|| ec == boost::asio::error::timed_out
					|| ec == boost::asio::error::interrupted)
				{
					// 不处理
				}
				else
				{
					self->Close();
				}
			}

		}
	);
}

void TcpConn::CheckTimeout()
{
	// printf("check time\n");
	auto self(shared_from_this());
	if (!self)
		return;

	if (self->m_t.expires_at() <= boost::asio::deadline_timer::traits_type::now())
	{
		self->Close();    //关闭socket
		self->m_t.expires_at(boost::posix_time::pos_infin);
	}
	else
	{
		self->m_t.async_wait(std::bind(&TcpConn::CheckTimeout, this));
	}
}



 推荐知识

 历史版本

修改日期 修改人 备注
2020-06-04 16:39:44[当前版本] 盖杰 boost使用v1.0

知识分享平台 -V 4.8.7 -wcp