Thread-safe Channel implemented with C++11

Jul 6, 2016


/// Bounded, blocking, multi-producer / multi-consumer channel.
///
/// Elements are stored in a fixed-size ring buffer whose capacity is chosen at
/// construction time. Every public operation takes the internal mutex, so a
/// single Channel may be shared between threads. Writers block while there is
/// not enough free room, readers block while there are not enough buffered
/// elements, and close() wakes everybody up.
///
/// T must be default-constructible (the buffer is pre-filled with T()) and
/// copy-assignable (elements are copied in and out of the buffer).
template<typename T>
class Channel {
public:
    Channel() = delete;
    Channel(const Channel&) = delete;
    Channel(Channel&&) = delete;
    Channel& operator = (const Channel&) = delete;
    Channel& operator = (Channel&&) = delete;

    /// Creates an open channel able to buffer up to `capacity` elements.
    /// (Deliberately left non-explicit so existing call sites keep compiling.)
    Channel(size_t capacity) : _val(capacity),
                               _used_capacity(0),
                               _read_idx(0),
                               _write_idx(0),
                               _is_closed(false) {}

    /// Writes a single element; see write(size_t, const T*).
    bool write(const T& w_value) {
        return write(1, &w_value);
    }

    /// Atomically appends `write_size` elements taken from `write_val`.
    ///
    /// Blocks until the whole batch fits or the channel is closed.
    ///
    /// @param write_size number of elements to write
    /// @param write_val  pointer to at least `write_size` readable elements
    /// @return true if the batch was written; false if the channel is closed
    ///         or if `write_size` exceeds the channel capacity (such a batch
    ///         could never fit, so waiting for room would block forever).
    bool write(size_t write_size, const T* write_val) {
        // The capacity never changes after construction, so this check does
        // not need the lock.
        if (write_size > _val.size()) {
            return false;
        }

        std::unique_lock<std::mutex> lck(_mtx);
        _write_cond.wait(lck, [this, write_size]() {
            return _is_closed || _val.size() - _used_capacity >= write_size;
        });
        if (_is_closed) {
            return false;
        }

        // Copy first, commit afterwards: if T's assignment throws, the
        // channel's indices and element count are left untouched.
        size_t idx = _write_idx;
        for (size_t i = 0; i < write_size; ++i) {
            _val[idx] = write_val[i];
            idx = next_index(idx);
        }
        _write_idx = idx;
        _used_capacity += write_size;

        // Readers may be waiting for different batch sizes; waking only one
        // could pick a reader that still cannot proceed and strand the others.
        _read_cond.notify_all();
        return true;
    }

    /// Reads a single element; see read(size_t, T*).
    bool read(T* read_val) {
        return read(1, read_val);
    }

    /// Atomically removes `read_size` elements and copies them to `read_val`.
    ///
    /// Blocks until that many elements are buffered or the channel is closed.
    /// After close(), elements that are still buffered can be drained as long
    /// as a request can be satisfied completely.
    ///
    /// @param read_size number of elements to read
    /// @param read_val  pointer to at least `read_size` writable elements
    /// @return true if the batch was read; false if the channel is closed and
    ///         holds fewer than `read_size` elements, or if `read_size`
    ///         exceeds the channel capacity (it could never be satisfied).
    bool read(size_t read_size, T* read_val) {
        // A request larger than the buffer can never be satisfied; fail fast
        // instead of sleeping until somebody calls close().
        if (read_size > _val.size()) {
            return false;
        }

        std::unique_lock<std::mutex> lck(_mtx);
        _read_cond.wait(lck, [this, read_size]() {
            return _is_closed || read_size <= _used_capacity;
        });
        if (_is_closed && read_size > _used_capacity) {
            return false;
        }

        // Copy first, commit afterwards so a throwing assignment does not
        // leave the indices and element count inconsistent.
        size_t idx = _read_idx;
        for (size_t i = 0; i < read_size; ++i) {
            read_val[i] = _val[idx];
            idx = next_index(idx);
        }
        _read_idx = idx;
        _used_capacity -= read_size;

        // A multi-element read can make room for several blocked writers, and
        // writers may wait for different batch sizes, so wake all of them.
        _write_cond.notify_all();
        return true;
    }

    /// Marks the channel as closed and wakes every blocked reader and writer.
    /// Subsequent writes fail; reads keep succeeding while enough elements
    /// remain buffered.
    void close() {
        std::unique_lock<std::mutex> lck(_mtx);
        _is_closed = true;
        _write_cond.notify_all();
        _read_cond.notify_all();
    }

private:
    /// Returns the ring-buffer slot that follows `idx`, wrapping at the end.
    /// Only called while copying at least one element, which implies a
    /// non-zero capacity, so the modulo never divides by zero.
    size_t next_index(size_t idx) const {
        return (idx + 1) % _val.size();
    }

    std::vector<T> _val;                  // ring-buffer storage, size == capacity
    std::mutex _mtx;                      // guards every member below
    std::condition_variable _write_cond;  // signalled when room is freed or on close
    std::condition_variable _read_cond;   // signalled when data arrives or on close
    size_t _used_capacity;                // number of elements currently buffered
    size_t _read_idx;                     // slot of the oldest buffered element
    size_t _write_idx;                    // slot the next written element goes to
    bool _is_closed;                      // set once by close(), never reset

};