Lock与LockFree浅析

1. Memory Consistency和Cache Coherence

Memory Consistency保证的是各条机器指令对内存的影响顺序,类似于数据库的隔离级别。

Sequential Consistency是一种Memory Consistency,保证了各个线程分别以程序中机器指令的顺序影响内存,类似于RC,RR等隔离级别。

而x86默认情况下并不能完全遵从Sequential Consistency。会发生无因果关系的Save Load两条指令乱序成为Load Save引起Memory Reorder,这对程序性能有提升,但是在多线程下却可能造成非预期的结果。而解决这个问题的方式就是Memory Barrier。

Cache Coherence则指的是在多个core对内存访问时,就好像看不见多层的cache一样,保持他们之间的一致性关系,其使用的的是MESI协议。

2. Memory Barrier后面简称MB

MB是一种边界,存在它的地方,上下两条指令的顺序不能被优化。

MB分为两种:一种是编译器级的,一种是CPU级的。编译器级的可以保证生成的代码中不存在乱序的发生,但是在CPU执行时还是会出现指令乱序的情况。CPU级的MB暗含了编译器级MB的语义,同时也防止了CPU执行时的乱序。下面列出两种MB的实现:

#define COMPILER_BARRIER() __asm__ __volatile__("" : : : "memory")

#define CPU_BARRIER() __sync_synchronize()

值得一提的是原子操作指令是暗含CPU级别MB语义的。

3. Acquire和Release语义

Acquire语义可以保证下面的语句不能被乱序到这条语句以上。

Release语义可以保证上面的语句不能被乱序到这条语句以下。

一对Acquire和Release可以组成一组happen-before关系。

可以轻易的发现,加锁和解锁操作为了临界区代码不逃逸就必须分别拥有Acquire和Release语义。

4. spinlock

使用一个原子变量,利用CAS机器指令即可实现一个简单的spinlock,但这样实现的spinlock不能达到先来先拿锁的公平性,共享相同高层cache相邻的core会优先感知内存变化,很有可能造成这两个core交替拿锁饿死其他core。下面列出典型代码:

class SimpleSpinLock {

public:

    SimpleSpinLock() :

            atomic_(0) {

    }

    void lock() {

        while (!(ATOMIC_LOAD(&atomic_) == 0 && CAS(&atomic_, 0, 1))) {

            PAUSE();

        }

    }

    void unlock() {

        COMPILER_BARRIER();

        atomic_ = 0;

    }

private:

    int8_t atomic_;

};

这里在CAS之前做了一个读取探测优化。因为CAS操作无论是否成功都会产生为了保证cache coherence而产生的message,降低性能。这里只有探测成功之后才会进行CAS操作,进而提升性能。

5. ticketlock

为了解决公平性的问题,想了一个办法,类似银行服务,大家进来先去机器拿一个号码然后这个号码自增,服务台按照另一个号码叫号,大家拿号之后全部查看服务台,到了自己就去接受服务,服务完成之后走出服务台,服务台将自己的号码自增继续叫下一个人。

下面列出典型代码:

class TicketSpinLock {

public:

    void lock() {

        uint64_t my_id = FETCH_AND_ADD(&next_id_, 1);

        while (ATOMIC_LOAD(&service_id_) != my_id) {

            PAUSE();

        }

    }

    void unlock() {

        ADD_AND_FETCH(&service_id_, 1);

    }

private:

    uint64_t next_id_;

    uint64_t service_id_;

};

这里解决了公平性的问题。但scale能力较差,因为大家仍然都spin在同一个service_id_变量上,当某个线程释放锁之后,因为service_id_更新,为了保证cache coherence,需要将每个core上service_id_对应的cacheline给invalid掉,然后所有等锁的线程都会通过总线读取新值造成bus traffic,在大多数体系结构中甚至被顺序执行,拿锁时间正比于等锁线程数目。

6. mcslock

mcslock维护了一个等锁线程队列。通过每个等锁线程spin在自己的tls变量上等待通知,因为他们不被多核共享,提升scala能力。

struct MCSNode {

    bool waiting_;

    MCSNode *next_;

};

class MCSSpinLock {

public:

    void lock() {

        MCSNode *tls_node = get_tls_node();

        tls_node->next_ = nullptr;

        MCSNode *prev_gnode = ATOMIC_EXCHANGE(&gnode_, tls_node);

        if (prev_gnode == nullptr) {

            return;

        }

        tls_node->waiting_ = true;

        COMPILER_BARRIER();

        prev_gnode->next_ = tls_node;

        while (ATOMIC_LOAD(&tls_node->waiting_)) {

            PAUSE();

        }

    }

    void unlock() {

        MEM_BARRIER();

        MCSNode *tls_node = get_tls_node();

        if (tls_node->next_==nullptr) {

            if (CAS(&gnode_, tls_node, nullptr)) {

                return;

            }

            while (!ATOMIC_LOAD(&tls_node->next_)) {

                PAUSE();

            }

        }

        tls_node->next_->waiting_ = false;

    }

private:

    MCSNode* gnode_ = nullptr;

    MCSNode *get_tls_node() {

        static __thread MCSNode tls_node;

        return &tls_node;

    }

};

其中gnode_始终指向链表的尾端。

7. rwlock

读写锁,将一个原子变量拆成三部分,加读锁次数,是否加写锁,是否有写锁在等待。是否有写锁在等待是为了防止写者饥饿而设置的。下面是具体实现:

class RWSpinLock {

    struct Atomic {

        union {

            uint64_t v_;

            struct {

                uint64_t r_cnt_ :62;

                uint64_t w_pending_ :1;

                uint64_t w_flag_ :1;

            };

        };

        Atomic() :

                v_(0) {

        }

    };

public:

    void rLock() {

        while (true) {

            Atomic old_v = ATOMIC_LOAD(&atomic_);

            Atomic new_v = old_v;

            new_v.r_cnt_++;

            if (old_v.w_flag_

                    == 0&& old_v.w_pending_ == 0 && CAS(&atomic_.v_, old_v.v_, new_v.v_)) {

                break;

            }

            PAUSE();

        }

    }

    void rUnLock() {

        ADD_AND_FETCH(&atomic_.v_, -1);

    }

    void wLock() {

        while (true) {

            Atomic old_v = ATOMIC_LOAD(&atomic_);

            Atomic new_v = old_v;

            bool pending = false;

            if (old_v.r_cnt_ != 0 || old_v.w_flag_ != 0) {

                new_v.w_pending_ = 1;

                pending = true;

            } else {

                new_v.w_flag_ = 1;

                new_v.w_pending_ = 0;

            }

            if (CAS(&atomic_.v_, old_v.v_, new_v.v_)) {

                if (!pending) {

                    break;

                }

            }

            PAUSE();

        }

    }

    void wUnLock() {

        COMPILER_BARRIER();

        atomic_.w_flag_ = 0;

    }

private:

    Atomic atomic_;

};

加读锁时需要没有写锁加锁并且没有写锁等待,满足则将加读锁次数加1,不满足则循环等待。解读锁时仅仅对加读锁次数减1。加写锁时判断是否有人持有读写锁,如果有则将写者等待设置为真,否则将写锁加锁并且将写者等待设置为假。解写锁时仅仅对标志位设置为假。为了读锁和写锁之间与写锁和写锁之间的公平性可以使用mcslock等spinlock来保证。对于多写的读写锁可以优化为每个线程一个读写锁,读操作CAS操作互不影响,降低bus traffic,拿写锁需要遍历所有线程拿写锁。

8. sequencelock

从上可知,读写锁在读者长期持有锁的情况下还是拿不到锁,为了避免这种情况,我们需要写者在读者持有锁的时候进行抢占,sequencelock就可以解决这个问题。实现如下:

class SequenceLock {

private:

    uint64_t seq_;

    std::mutex mu_;

public:

    SequenceLock() :

            seq_(0) {

    }

    void wLock() {

        mu_.lock();

        seq_++;

        COMPILER_BARRIER();

    }

    void wUnLock() {

        COMPILER_BARRIER();

        seq_++;

        mu_.unlock();

    }

    uint64_t rBegin() {

        uint64_t seq = 0;

        while (true) {

            seq = ATOMIC_LOAD(&seq_);

            if (!(seq & 1)) {

                break;

            }

        }

        return seq;

    }

    bool rSuccess(uint64_t seq) {

        MEM_BARRIER();

        return ATOMIC_LOAD(&seq_) == seq;

    }

};

写者之间通过互斥锁保持互斥,当写者临界区时其sequence号为奇数,否则为偶数。读者根据取到的sequence号的奇偶性决定是否可以进入临界区,在退出临界区时检查当前的sequence号和进入临界区时的sequence号是否一致,如果一致则成功,否则失败,需要整个推翻重来。

9. hazardpointer

为了防止free其他线程正在使用的对象,并避免出现ABA问题,引入了hazardpointer。每一个线程维护一个自己的use_list和retire_list。其中retire_list仅由自己读写,怎么实现都行。use_list被自己读写,被其他线程读取,因此只需要实现不支持删除的一个单写者多读者并发的单链表就可以了。保护对象时调用acquire将其指针放入本线程的use_list,使用完了之后调用release将其指针从本线程的use_list中删除。当对象被逻辑删除本线程没人引用之后调用retire将其指针放入本线程的retire_list。每一个线程负责释放自己retire_list中的对象,当需要释放时调用reclaim,它扫描自己的retire_list中的每一个指针,如果它不出现在其他线程的use_list则释放它对应对象的内存,并将其从retire_list中删除。

template<typename T>
class HazardPointer {
private:
   struct HazardNode {
       T *p_;
       HazardNode *next_;
       HazardNode() :
               p_(nullptr), next_(nullptr) {
       }
       HazardNode(T *p, HazardNode *next) :
               p_(p), next_(next) {
       }
   };
   struct HazardList {
       uint64_t len_;
       uint64_t cap_;
       HazardNode list_;
       HazardList() :
               len_(0), cap_(0) {
       }
   }CACHE_ALIGNED;
public:
   int acquire(T *p, int thread_id);
   int release(T *p, int thread_id);
   int retire(T *p, int thread_id);
   int reclaim(int thread_id);
   int help(HazardList *list, T *p, int thread_id);
private:
   static const uint64_t MAX_THREAD_NUM = 10;
   static const uint64_t MIN_LEN_TO_RECLAIM = 10;
   HazardList use_list_[MAX_THREAD_NUM];
   HazardList retire_list_[MAX_THREAD_NUM];
};
template<typename T>
int HazardPointer<T>::acquire(T *p, int thread_id) {
   return help(use_list_, p, thread_id);
}
template<typename T>
int HazardPointer<T>::release(T *p, int thread_id) {
   for (HazardNode *node = &use_list_[thread_id].list_; node != nullptr; node =
           node->next_) {
       if (node->p_ == p) {
           node->p_ = nullptr;
           use_list_[thread_id].len_--;
           return 0;
       }
   }
   return -1;
}
template<typename T>
int HazardPointer<T>::retire(T *p, int thread_id) {
   return help(retire_list_, p, thread_id);
}
template<typename T>
int HazardPointer<T>::reclaim(int thread_id) {
   if (retire_list_[thread_id].len_ < MIN_LEN_TO_RECLAIM) {
       return 0;
   }
   for (HazardNode *retire_node = &retire_list_[thread_id].list_;
           retire_node != nullptr; retire_node = retire_node->next_) {
       if (retire_node->p_ == nullptr) {
           continue;
       }
       bool found = false;
       for (int other_thread_id = 0; other_thread_id < MAX_THREAD_NUM;
               other_thread_id++) {
           if (thread_id == other_thread_id) {
               continue;
           }
           for (HazardNode *other_use_node = &use_list_[other_thread_id].list_;
                   other_use_node != nullptr;
                   other_use_node = other_use_node->next_) {
               if (other_use_node->p_ == retire_node->p_) {
                   found = true;
                   break;
               }
           }
           if (found) {
               break;
           }
       }
       if (!found) {
           retire_list_[thread_id].len_--;
           delete retire_node->p_;
           retire_node->p_ = nullptr;
       }
   }
   return 0;
}
template<typename T>
int HazardPointer<T>::help(HazardList *list, T *p, int thread_id) {
   if (p == nullptr) {
       return 0;
   }
   bool found = false;
   HazardNode *empty_node = nullptr;
   for (empty_node = &list[thread_id].list_; empty_node != nullptr;
           empty_node = empty_node->next_) {
       if (empty_node->p_ == nullptr) {
           found = true;
           break;
       }
   }
   if (found) {
       empty_node->p_ = p;
       list[thread_id].len_++;
   } else {
       HazardNode *new_node = new HazardNode(p, list[thread_id].list_.next_);
       MEM_BARRIER();
       list[thread_id].list_.next_ = new_node;
       list[thread_id].len_++;
       list[thread_id].cap_++;
   }
   return 0;
}

10. hazardversion

因为要找出哪些指针需要被保护非常增加心智负担,因此引入hazardversion。它提供简便的acquire、release和retire接口,使用起来非常方便。每个线程维护自己的retire_list和version,全局存一个version,此外retire_list中每一个元素除了待释放的指针外也有一个version。当调用acquire时,将全局版本号设置给自己。当调用release时,将自己的版本置为max,并判断自己的retire_list是否太长需要reclaim和所有的retire_list之和是否太长需要reclaim。retire操作将全局版本加1后赋予retire元素,并将其加入retire_list。对于自己的reclaim,首先获得所有线程的最小版本,然后首先将整个retire_list队列CAS到局部,然后在局部遍历删除释放版本号小于等于所有线程最小版本的元素,最后将剩下的元素列表CAS到链表头部。对于全局的reclaim,首先获得所有线程的最小版本,然后遍历所有线程的retire_list,将其CAS到局部,然后在局部遍历删除释放版本号小于等于所有线程最小版本的元素,最后将剩下的元素列表CAS到自己retire_list的链表头部。由于只有retire_list只有自己能添加,其他线程只能将其清空,因此不会出现ABA问题。为什么是小于等于?首先逻辑删除,然后版本号加1赋予retire节点,但是拿到增加之后版本号的是不可能使用retire节点内存的,因为它已经被逻辑删除。

template<typename T>

class HazardVersion {

    struct Node {

        Node *next_;

        T *data_;

        uint64_t version_;

        Node(T *data);

        ~Node();

    };

    struct ThreadStore {

        uint64_t version_;

        uint64_t seq_;

        Node *retire_list_;

        uint64_t retire_list_len_;

        uint16_t tid_;

        bool enabled_;

        uint64_t last_reclaim_version_;

        ThreadStore *next_;

        ThreadStore();

        ~ThreadStore();

        uint64_t acquire(const uint64_t version, const uint16_t tid);

        void release(const uint64_t tid, uint64_t &handle);

        void add_node(uint64_t version, Node *node);

        uint64_t reclaim(const uint64_t version, ThreadStore *node_receiver);

        void add_nodes(Node *head, Node *tail, const int64_t cnt);

    };

public:

    void reclaim(uint16_t tid);

    uint64_t acquire(uint16_t tid);

    void retire(uint16_t tid, T *p);

    void release(uint16_t tid, uint64_t handle);

    HazardVersion() :

            min_num_to_reclaim_(64), threads_list_(nullptr), threads_cnt_(0), min_version_(

                    0), min_version_ts_(0), min_version_cache_interval_(200000), version_(

                    0), retire_cnt_(0) {

    }

private:

    int64_t tv_to_usec(const timeval &tv);

    int64_t now();

    uint64_t get_min_version(bool force);

    ThreadStore* get_thread_store(int16_t tid) {

        if (tid > MAX_THREAD_CNT) {

            assert(false);

        }

        ThreadStore *ts = &threads_[tid];

        if (!ts->enabled_) {

            threads_lock_.lock();

            if (!ts->enabled_) {

                ts->tid_ = tid;

                ts->version_ = UINT64_MAX;

                ts->next_ = threads_list_;

                ATOMIC_STORE(&threads_list_, ts);

                ADD_AND_FETCH(&threads_cnt_, 1);

                ts->enabled_ = true;

            }

            threads_lock_.unlock();

        }

        return ts;

    }

    const static uint16_t MAX_THREAD_CNT = 10;

    uint64_t min_num_to_reclaim_;

    SimpleSpinLock threads_lock_;

    ThreadStore threads_[MAX_THREAD_CNT];

    ThreadStore *threads_list_;

    uint64_t threads_cnt_;

    uint64_t min_version_;

    int64_t min_version_ts_;

    int64_t min_version_cache_interval_;

    uint64_t version_;

    uint64_t retire_cnt_;

};

template<typename T>

HazardVersion<T>::Node::Node(T *data) :

        next_(nullptr), version_(UINT64_MAX), data_(data) {

}

template<typename T>

HazardVersion<T>::Node::~Node() {

    delete data_;

}

template<typename T>

HazardVersion<T>::ThreadStore::ThreadStore() :

        enabled_(false), tid_(0), last_reclaim_version_(0), seq_(0), version_(

                0), retire_list_(nullptr), retire_list_len_(0), next_(nullptr) {

}

template<typename T>

HazardVersion<T>::ThreadStore::~ThreadStore() {

    while (retire_list_ != nullptr) {

        Node *retire_node = retire_list_;

        retire_list_ = retire_list_->next_;

        delete retire_node;

    }

}

template<typename T>

uint64_t HazardVersion<T>::ThreadStore::acquire(const uint64_t version,

        const uint16_t tid) {

    if (version_ != UINT64_MAX || tid_ != tid) {

        assert(false);

    }

    version_ = version;

    return seq_;

}

template<typename T>

void HazardVersion<T>::ThreadStore::release(const uint64_t tid,

        uint64_t &handle) {

    if (seq_ != handle || tid != tid_) {

        assert(false);

    }

    version_ = UINT64_MAX;

    seq_++;

}

template<typename T>

void HazardVersion<T>::ThreadStore::add_nodes(Node *head, Node *tail,

        const int64_t cnt) {

    if (cnt > 0) {

        Node *curr = ATOMIC_LOAD(&retire_list_);

        Node *old = curr;

        tail->next_ = old;

        while (old != (curr = CAS_RET_VAL(&retire_list_, old, head))) {

            old = curr;

            tail->next_ = old;

        }

        ADD_AND_FETCH(&retire_list_len_, cnt);

    }

}

template<typename T>

void HazardVersion<T>::ThreadStore::add_node(uint64_t version, Node *node) {

    node->version_ = version;

    add_nodes(node, node, 1);

}

template<typename T>

uint64_t HazardVersion<T>::ThreadStore::reclaim(const uint64_t version,

        ThreadStore *node_receiver) {

    if (last_reclaim_version_ == version) {

        return 0;

    }

    last_reclaim_version_ = version;

    Node *curr = ATOMIC_LOAD(&retire_list_);

    Node *old = curr;

    while (old != (curr = CAS_RET_VAL(&retire_list_, old, nullptr))) {

        old = curr;

    }

    Node *list2free = nullptr;

    uint64_t move_cnt = 0;

    uint64_t free_cnt = 0;

    Node dummy(nullptr);

    dummy.next_ = curr;

    Node *iter = &dummy;

    while (iter->next_ != nullptr) {

        if (iter->next_->version_ <= version) {

            free_cnt++;

            Node *node2free = iter->next_;

            iter->next_ = iter->next_->next_;

            node2free->next_ = list2free;

            list2free = node2free;

        } else {

            move_cnt++;

            iter = iter->next_;

        }

    }

    node_receiver->add_nodes(dummy.next_, iter, move_cnt);

    ADD_AND_FETCH(&retire_list_len_, -(move_cnt + free_cnt));

    while (list2free != nullptr) {

        Node *node2free = list2free;

        list2free = list2free->next_;

        delete node2free;

    }

    return free_cnt;

}

template<typename T>

int64_t HazardVersion<T>::tv_to_usec(const timeval &tv) {

        return (((int64_t) tv.tv_sec) * 1000000 + (int64_t) tv.tv_usec);

    }

template<typename T>

int64_t HazardVersion<T>::now() {

    struct timeval tv;

    gettimeofday(&tv, nullptr);

    return tv_to_usec(tv);

}

template<typename T>

uint64_t HazardVersion<T>::get_min_version(bool force) {

    uint64_t ret = 0;

    int64_t now_ts = now();

    if (!force && (ret = ATOMIC_LOAD(&min_version_)) != 0

            && ATOMIC_LOAD(&min_version_ts_) + min_version_cache_interval_

                    > now_ts) {

        return ret;

    }

    ret = ATOMIC_LOAD(&version_);

    ThreadStore *iter = ATOMIC_LOAD(&threads_list_);

    while (iter != nullptr) {

        uint64_t ts_version = iter->version_;

        if (ret > ts_version) {

            ret = ts_version;

        }

        iter = iter->next_;

    }

    ATOMIC_STORE(&min_version_, ret);

    ATOMIC_STORE(&min_version_ts_, now_ts);

    return ret;

}

template<typename T>

void HazardVersion<T>::reclaim(uint16_t tid) {

    ThreadStore *ts = get_thread_store(tid);

    uint64_t min_version = get_min_version(true);

    int64_t reclaim_cnt = ts->reclaim(min_version, ts);

    ADD_AND_FETCH(&retire_cnt_, -reclaim_cnt);

    ThreadStore *iter = ATOMIC_LOAD(&threads_list_);

    while (iter != nullptr) {

        if (iter != ts) {

            reclaim_cnt = iter->reclaim(min_version, ts);

            ADD_AND_FETCH(&retire_cnt_, -reclaim_cnt);

        }

        iter = iter->next_;

    }

}

template<typename T>

uint64_t HazardVersion<T>::acquire(uint16_t tid) {

    ThreadStore *ts = get_thread_store(tid);

    uint64_t version = ATOMIC_LOAD(&version_);

    uint64_t handle = ts->acquire(version, tid);

    return handle;

}

template<typename T>

void HazardVersion<T>::retire(uint16_t tid, T *p) {

    Node *node = new Node(p);

    ThreadStore *ts = get_thread_store(tid);

    ts->add_node(ADD_AND_FETCH(&version_, 1), node);

    ADD_AND_FETCH(&retire_cnt_, 1);

}

template<typename T>

void HazardVersion<T>::release(uint16_t tid, uint64_t handle) {

    if (tid > MAX_THREAD_CNT) {

        assert(false);

    }

    ThreadStore *ts = get_thread_store(tid);

    ts->release(tid, handle);

    if (ts->retire_list_len_ > min_num_to_reclaim_) {

        uint64_t min_version = get_min_version(false);

        uint64_t reclaim_cnt = ts->reclaim(min_version, ts);

        ADD_AND_FETCH(&retire_cnt_, -reclaim_cnt);

    } else if (ATOMIC_LOAD(&retire_cnt_)

            > ATOMIC_LOAD(&threads_cnt_) * min_num_to_reclaim_) {

        reclaim(tid);

    }

}

10. lock free queue

插入队列操作:首先拿到全局tail,然后拿到局部tail的next。判断局部tail是否等于全局tail,如果不等则继续循环,如果等说明next是全局tail的next,即有效的,其实这步可有可无。如果next不为空说明局部tail不再是最后一个元素,尝试前进全局tail并继续循环。尝试以null和新元素CAS局部tail的next,尝试前进全局tail并跳出循环。

移出队列操作:首先拿到全局的head和全局的tail,然后拿到局部head的next。判断局部head是否等于全局head,如果不等则继续循环,如果等说明next是全局head的next,即有效的,其实这步可有可无。如果next为空说明队列为空,返回空值。判断head是否等于tail,如果是说明队列不为空但是tail指针没能及时前进,尝试前进全局tail并继续循环。拿到next的数据,并尝试前进head,如果失败则继续循环,如果成功则返回取到的数据。

template<typename T>

class ConcurrentQueueWithVersion {

private:

    struct Item {

        T data_;

        Item *volatile next_;

        Item(): next_(nullptr){}

        Item(T data, Item *next) : data_(data),

                next_(next) {

        }

    };

    Item * volatile head_;

    Item * volatile tail_;

    Item dummy_item_;

    HazardVersion<Item> hv_;

public:

    ConcurrentQueueWithVersion() {

        head_ = &dummy_item_;

        tail_ = &dummy_item_;

    }

    void enqueue(const T element, int thread_id) {

        Item *item = new Item(element, nullptr);

        uint64_t handle = hv_.acquire(thread_id);

        while (true) {

            Item *tail = tail_;

            Item *next = tail->next_;

            if (tail != tail_) {

                continue;

            }

            if (next != nullptr) {

                CAS(&tail_, tail, next);

                continue;

            }

            if (CAS(&tail->next_, nullptr, item)) {

                CAS(&tail_, tail, item);

                break;

            }

        }

        hv_.release(thread_id, handle);

    }

    T dequeue(int thread_id) {

        Item *head = nullptr;

        T data;

        uint64_t handle = hv_.acquire(thread_id);

        while (true) {

            head = head_;

            Item *tail = tail_;

            Item *next = head->next_;

            if (head != head_) {

                continue;

            }

            if (next == nullptr) {

                hv_.release(thread_id, handle);

                return T();

            }

            if (head == tail) {

                CAS(&tail_, tail, next);

                continue;

            }

            data = next->data_;

            if (CAS(&head_, head, next)) {

                break;

            }

        }

        hv_.release(thread_id, handle);

        if (head != &dummy_item_) {

            hv_.retire(thread_id, head);

        }

        return data;

    }

};