#pragma once
#ifndef K_THREADSAFEQUEUE_H_
#define K_THREADSAFEQUEUE_H_
#include "stdio.h"
#include <condition_variable>
#include <mutex>
#include <queue>
#include <memory>
#include <atomic>
/// Bounded, blocking FIFO queue guarded by a single mutex.
///
/// Producers call Push(), which blocks while the queue is at capacity.
/// Consumers call one of the WaitAndPop() overloads, which block while the
/// queue is empty. Destroy() switches the queue into a permanent "stopped"
/// state: every blocked call is released and every later Push()/WaitAndPop()
/// returns immediately without touching the stored elements.
///
/// @tparam T          Element type.
/// @tparam Container  Adaptor providing push/pop/front/empty/size and a
///                    nested size_type (std::queue<T> by default).
template<class T, class Container = std::queue<T>>
class KThreadSafeQueue
{
public:
    /// Unsigned type the underlying container uses to report its size.
    using size_type = typename Container::size_type;

    /// Creates a queue limited to 100000 elements (the historical default).
    KThreadSafeQueue() : KThreadSafeQueue(static_cast<size_type>(100000))
    {
    }

    /// Creates a queue limited to @p maxCount elements.
    /// With a limit of 0 every Push() blocks until Destroy() is called.
    explicit KThreadSafeQueue(size_type maxCount)
        : m_bStopFlag(false), m_maxCount(maxCount)
    {
    }

    /// Stops the queue (see Destroy()) and logs the destruction.
    /// Destroy() only wakes waiters; it does not wait for them to leave, so
    /// the owner must make sure no thread is still inside a member function.
    ~KThreadSafeQueue()
    {
        Destroy();
        printf("\nKThreadSafeQueue object destruct\n");
    }

    /// Appends @p element, blocking while the queue is full.
    /// If the queue has been stopped the element is discarded and the call
    /// returns without modifying the queue.
    template <class Element>
    void Push(Element&& element)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_full_cv_.wait(lock, [this]() {
            return !full() || m_bStopFlag;
        });
        if (m_bStopFlag)
        {
            return;
        }
        queue_.push(std::forward<Element>(element));
        not_empty_cv_.notify_all();
    }

    /// Blocks until an element is available and moves it into @p t.
    /// If the queue has been stopped, returns immediately and leaves @p t
    /// untouched, even when elements are still stored.
    void WaitAndPop(T& t)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_cv_.wait(lock, [this]() {
            return !queue_.empty() || m_bStopFlag;
        });
        if (m_bStopFlag)
        {
            return;
        }
        t = std::move(queue_.front());
        queue_.pop();
        not_full_cv_.notify_all();
    }

    /// Blocks until an element is available and returns it in a shared_ptr.
    /// @return The popped element, or an empty pointer if the queue has been
    ///         stopped.
    std::shared_ptr<T> WaitAndPop()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        not_empty_cv_.wait(lock, [this]() {
            return !queue_.empty() || m_bStopFlag;
        });
        if (m_bStopFlag)
        {
            return std::shared_ptr<T>();
        }
        // Move rather than copy: the element is popped right after, and this
        // keeps the overload usable with move-only element types.
        std::shared_ptr<T> t_ptr = std::make_shared<T>(std::move(queue_.front()));
        queue_.pop();
        not_full_cv_.notify_all();
        return t_ptr;
    }

    /// @return true if no element is stored at the moment of the call.
    bool IsEmpty() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }

    /// @return true if the queue holds at least its capacity limit.
    bool IsFull() const
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return full();
    }

    /// Puts the queue into the stopped state and releases all blocked callers.
    void Destroy()
    {
        {
            // Set the flag under the mutex so a waiter cannot evaluate its
            // predicate, miss the flag, and then block after the notification.
            std::lock_guard<std::mutex> lock(mutex_);
            m_bStopFlag = true;
        }
        // Wake consumers and producers alike; both predicates test the flag.
        not_empty_cv_.notify_all();
        not_full_cv_.notify_all();
    }

protected:
    /// Capacity check; the caller must already hold mutex_.
    bool full() const { return queue_.size() >= m_maxCount; }

private:
    KThreadSafeQueue(const KThreadSafeQueue&) = delete;
    KThreadSafeQueue& operator=(const KThreadSafeQueue&) = delete;
    KThreadSafeQueue(KThreadSafeQueue&&) = delete;
    KThreadSafeQueue& operator=(KThreadSafeQueue&&) = delete;

private:
    Container queue_;                       // stored elements, guarded by mutex_
    std::condition_variable not_empty_cv_;  // signalled after a push and on stop
    std::condition_variable not_full_cv_;   // signalled after a pop and on stop
    mutable std::mutex mutex_;              // mutable so const queries can lock
    std::atomic_bool m_bStopFlag;           // set once by Destroy(), never cleared
    const size_type m_maxCount;             // capacity limit, fixed at construction
};
#endif