/// @brief Fixed-capacity blocking FIFO channel backed by a ring buffer.
///
/// All state is guarded by a single mutex. Writers block while there is not
/// enough free space, readers block while there are not enough buffered
/// elements, and close() releases every blocked caller.
///
/// T must be default-constructible (the buffer is pre-sized with
/// value-initialized elements) and copy-assignable.
template<typename T>
class Channel {
public:
    // A channel must be given a capacity and is neither copyable nor movable.
    Channel() = delete;
    Channel(const Channel&) = delete;
    Channel(Channel&&) = delete;
    Channel& operator = (const Channel&) = delete;
    Channel& operator = (Channel&&) = delete;

    /// @brief Creates an open, empty channel.
    /// @param capacity Maximum number of elements buffered at any time.
    Channel(size_t capacity) : _val(capacity),
                               _used_capacity(0),
                               _read_idx(0),
                               _write_idx(0),
                               _is_closed(false) {}

    /// @brief Writes a single element, blocking until a slot is free.
    /// @param w_value Element to copy into the channel.
    /// @return true if the element was stored; false if the channel is closed
    ///         (or has zero capacity).
    bool write(const T& w_value) {
        return write(1, &w_value);
    }

    /// @brief Writes a batch of elements as one unit.
    ///
    /// Blocks until `write_size` free slots are available or the channel is
    /// closed. The whole batch is stored in order under a single lock
    /// acquisition.
    ///
    /// @param write_size Number of elements to write.
    /// @param write_val  Pointer to at least `write_size` readable elements.
    /// @return true if the batch was stored; false if the channel is closed or
    ///         if `write_size` exceeds the channel capacity.
    bool write(size_t write_size, const T* write_val) {
        // A batch larger than the buffer could never fit; fail fast instead of
        // blocking until close(). The buffer size is fixed at construction, so
        // this check does not need the lock.
        if (!fits_in_buffer(write_size)) {
            return false;
        }
        std::unique_lock<std::mutex> lck(_mtx);
        _write_cond.wait(lck, [this, write_size]() {
            return free_slots() >= write_size || _is_closed;
        });
        // Writes are refused once the channel is closed, even if space exists.
        if (_is_closed) {
            return false;
        }
        _used_capacity += write_size;
        for (size_t i = 0; i < write_size; ++i) {
            _val[_write_idx] = write_val[i];
            advance_write_idx();
        }
        // Readers may be waiting for different amounts, and one batch can
        // satisfy several of them, so wake them all and let each re-check its
        // own predicate. notify_one could wake only a reader that still cannot
        // proceed and leave a satisfiable one asleep.
        _read_cond.notify_all();
        return true;
    }

    /// @brief Reads a single element, blocking until one is available.
    /// @param read_val Destination for the element.
    /// @return true if an element was read; false if the channel is closed and
    ///         empty (or has zero capacity).
    bool read(T* read_val) {
        return read(1, read_val);
    }

    /// @brief Reads a batch of elements as one unit.
    ///
    /// Blocks until `read_size` elements are buffered or the channel is closed.
    /// A closed channel can still be drained: the read succeeds as long as at
    /// least `read_size` elements remain buffered.
    ///
    /// @param read_size Number of elements to read.
    /// @param read_val  Pointer to at least `read_size` writable elements.
    /// @return true if the batch was read; false if the channel is closed with
    ///         fewer than `read_size` elements left, or if `read_size` exceeds
    ///         the channel capacity.
    bool read(size_t read_size, T* read_val) {
        // The buffer can never hold more than its capacity, so such a request
        // could only ever end in failure; report it immediately.
        if (!fits_in_buffer(read_size)) {
            return false;
        }
        std::unique_lock<std::mutex> lck(_mtx);
        _read_cond.wait(lck, [this, read_size]() {
            return read_size <= _used_capacity || _is_closed;
        });
        if (_is_closed && read_size > _used_capacity) {
            return false;
        }
        _used_capacity -= read_size;
        for (size_t i = 0; i < read_size; ++i) {
            read_val[i] = _val[_read_idx];
            advance_read_idx();
        }
        // Same reasoning as in write(): the freed space may satisfy several
        // writers, or a different writer than the one notify_one would pick.
        _write_cond.notify_all();
        return true;
    }

    /// @brief Closes the channel and wakes every blocked reader and writer.
    ///
    /// After closing, writes fail; reads keep succeeding while enough buffered
    /// elements remain.
    void close() {
        std::unique_lock<std::mutex> lck(_mtx);
        _is_closed = true;
        _write_cond.notify_all();
        _read_cond.notify_all();
    }

private:
    /// True if a request for `count` elements can ever be satisfied.
    bool fits_in_buffer(size_t count) const {
        return count <= _val.size();
    }

    /// Number of unused slots; the caller must hold _mtx.
    size_t free_slots() const {
        return _val.size() - _used_capacity;
    }

    /// Moves a ring-buffer index one slot forward, wrapping at the end.
    void advance(size_t* idx) {
        *idx = (*idx + 1) % _val.size();
    }

    void advance_write_idx() {
        advance(&_write_idx);
    }

    void advance_read_idx() {
        advance(&_read_idx);
    }

    std::vector<T> _val;                  // ring-buffer storage; size is the capacity
    std::mutex _mtx;                      // guards every member below
    std::condition_variable _write_cond;  // signalled when space is freed or on close
    std::condition_variable _read_cond;   // signalled when data is added or on close
    size_t _used_capacity;                // number of buffered elements
    size_t _read_idx;                     // slot of the next element to read
    size_t _write_idx;                    // slot the next written element goes to
    bool _is_closed;                      // set once by close()
};
Content