226  
查询码:00000276
C++11 高性能线程安全的队列
作者: 董康康 于 2020年06月30日 发布在分类 / 物联网组 / 第三方代理网关 下,并于 2020年06月30日 编辑

#pragma once
#ifndef K_THREADSAFEQUEUE_H_
#define K_THREADSAFEQUEUE_H_


#include "stdio.h"
#include <condition_variable>
#include <mutex>
#include <queue>
#include <memory>
#include <atomic>

/// @brief Bounded, blocking FIFO queue guarded by a single mutex.
///
/// Producers block in Push() while the queue holds the maximum number of
/// elements; consumers block in WaitAndPop() while it is empty. Destroy()
/// puts the queue into a permanent "stopped" state and wakes every blocked
/// producer and consumer.
///
/// @tparam T          Element type.
/// @tparam Container  Underlying container adaptor; must provide push(),
///                    pop(), front(), empty(), size() and size_type.
///
/// @note The object is neither copyable nor movable. All threads that use the
///       queue must have stopped calling into it before it is destroyed.
template<class T, class Container = std::queue<T>>
class KThreadSafeQueue
{
public:
    using size_type = typename Container::size_type;

    /// Creates a queue holding at most 100000 elements.
    KThreadSafeQueue() : KThreadSafeQueue(static_cast<size_type>(100000))
    {
    }

    /// @brief Creates a queue holding at most @p maxCount elements.
    /// @param maxCount Capacity limit. With 0 every Push() blocks until
    ///                 Destroy() is called.
    explicit KThreadSafeQueue(size_type maxCount)
        : m_bStopFlag(false), m_maxCount(maxCount)
    {
    }

    /// Stops the queue (waking any blocked thread) and logs the destruction.
    ~KThreadSafeQueue()
    {
        Destroy();
        printf("\nKThreadSafeQueue object destruct\n");
    }

    /// @brief Appends an element, blocking while the queue is full.
    /// @param element Value to enqueue; perfectly forwarded into the container.
    /// @note If the queue has been stopped via Destroy() the element is
    ///       silently dropped and the call returns immediately.
    template <class Element>
    void Push(Element&& element)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_cv_.wait(lock, [this]() {
            return !full() || m_bStopFlag;
        });

        if (m_bStopFlag)
        {
            return;
        }

        queue_.push(std::forward<Element>(element));
        // Exactly one element became available, so one consumer is enough.
        not_empty_cv_.notify_one();
    }

    /// @brief Removes the front element into @p t, blocking while empty.
    /// @param t Receives the popped element. Left untouched if the queue has
    ///          been stopped via Destroy().
    void WaitAndPop(T& t)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_cv_.wait(lock, [this]() {
            return !queue_.empty() || m_bStopFlag;
        });

        if (m_bStopFlag)
        {
            return;
        }

        t = std::move(queue_.front());
        queue_.pop();
        // Exactly one slot was freed, so one producer is enough.
        not_full_cv_.notify_one();
    }

    /// @brief Removes the front element, blocking while the queue is empty.
    /// @return Shared pointer owning the popped element, or an empty pointer
    ///         if the queue has been stopped via Destroy().
    std::shared_ptr<T> WaitAndPop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_cv_.wait(lock, [this]() {
            return !queue_.empty() || m_bStopFlag;
        });

        if (m_bStopFlag)
        {
            return std::shared_ptr<T>();
        }

        // Move rather than copy: cheaper, and allows move-only element types.
        std::shared_ptr<T> t_ptr = std::make_shared<T>(std::move(queue_.front()));
        queue_.pop();
        not_full_cv_.notify_one();
        return t_ptr;
    }

    /// @return true if the queue currently holds no elements.
    bool IsEmpty() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }

    /// @return true if the queue currently holds the maximum number of elements.
    bool IsFull() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return full();
    }

    /// @brief Stops the queue and wakes every blocked producer and consumer.
    ///
    /// After this call Push() drops its argument and both WaitAndPop()
    /// overloads return without popping. The stop state is permanent.
    void Destroy()
    {
        // The flag is set under the mutex so that a thread which has just
        // evaluated its wait predicate cannot miss the notification.
        std::lock_guard<std::mutex> lock(mutex_);
        m_bStopFlag = true;
        not_empty_cv_.notify_all();
        not_full_cv_.notify_all();  // producers blocked on a full queue
    }

protected:
    /// Capacity check; the caller must hold mutex_.
    bool full() const { return queue_.size() >= m_maxCount; }

private:
    KThreadSafeQueue(const KThreadSafeQueue&) = delete;
    KThreadSafeQueue& operator=(const KThreadSafeQueue&) = delete;
    KThreadSafeQueue(KThreadSafeQueue&&) = delete;
    KThreadSafeQueue& operator=(KThreadSafeQueue&&) = delete;

private:
    Container queue_;                       // guarded by mutex_

    std::condition_variable not_empty_cv_;  // signalled when an element is pushed

    std::condition_variable not_full_cv_;   // signalled when an element is popped

    mutable std::mutex mutex_;              // mutable so const observers can lock

    std::atomic_bool m_bStopFlag;           // set once by Destroy(), never cleared

    const size_type m_maxCount;             // capacity limit, fixed at construction
};


#endif


 推荐知识

 历史版本

修改日期 修改人 备注
2020-06-30 19:52:11[当前版本] 董康康 创建版本

知识分享平台 -V 4.8.7 -wcp