thread_local_value实现原理

多线程
Author

0warning0error

Published

May 16, 2025

今天无意看到了yalantinglibs,雅兰亭库,名字很优雅,也很强大,是阿里开源的一个现代C++基础工具库的集合。看到里面的实现尤为惊天。其中的thread_local_value类打开了我新世界的大门。

下面是thread_local_value的实现细节(经过简单的修改,可以以C++11标准编译)。

#pragma once
#include <atomic>
#include <vector>
#include <thread>

template <typename value_type>
class thread_local_value {
    using Wrapped = struct alignas(64)  {
        std::atomic<value_type> inner;
    };
 public:
  explicit thread_local_value(uint32_t dupli_count = std::thread::hardware_concurrency())
      : duplicates_(dupli_count) { 
    for(auto & a : duplicates_) {
      a.inner.store(0);
    }
  }

  ~thread_local_value() {}

  thread_local_value(const thread_local_value& other)
      : duplicates_(other.duplicates_.size()) {
    for (size_t i = 0; i < other.duplicates_.size(); i++) {
        duplicates_[i].inner.store(other.duplicates[i].load());
    }
  }

  thread_local_value& operator=(const thread_local_value& other) {
    for (size_t i = 0; i < other.duplicates_.size(); i++) {
      duplicates_[i].inner.store(other.duplicates[i].load());
    }
    return *this;
  }

  thread_local_value(thread_local_value&& other) noexcept
      : duplicates_(std::move(other.duplicates_)) {}

  thread_local_value& operator=(thread_local_value&& other) noexcept {
    duplicates_ = std::move(other.duplicates_);
    return *this;
  }

  void inc(value_type value = 1) {
    auto& local = local_value();
    local.fetch_add(value, std::memory_order_relaxed);
  }

  void dec(value_type value = 1) {
    auto& local = local_value();
    local.fetch_sub(value, std::memory_order_relaxed);
  }

  value_type update(value_type value = 1) {
    value_type val = get_value(0).exchange(value, std::memory_order_relaxed);
    for (size_t i = 1; i < duplicates_.size(); i++) {
      val += duplicates_[i].inner.exchange(0, std::memory_order_relaxed);
    }
    return val;
  }

  value_type reset() { return update(0); }

  std::atomic<value_type> & local_value() {
    auto index = get_round_index(duplicates_.size());
    return get_value(index);
  }

  std::atomic<value_type> & get_value(size_t index) {
    return duplicates_[index].inner;
  }

  value_type value() const {
    value_type val = 0;
    for (auto& t : duplicates_) {
      val += t.inner.load();
    }
    return val;
  }

 private:
  std::vector<Wrapped> duplicates_;

  static uint32_t get_round_index(uint32_t size) {
    static std::atomic<uint32_t> round{0};
    static thread_local uint32_t index = round++;
    return index % size;
  }
};

为什么需要设计这样的东西,或者说什么场景下需要这样的东西。

传统的全局计数器往往是使用原子变量,多线程对这个原子变量进行加一操作,虽然可以保证原子操作,但是频繁的读取写入,导致硬件同步与缓存一致性开销​。 thread_local_value 将单个原子变量拆分为多个原子变量,借鉴分片的思想,线程各自修改不同的原子变量,分散共享压力,需要汇总时最后合并结果,减少直接竞争。 比较适合写多读少的场景。

同时,为了避免多线程下的伪共享问题,在模板内部设计了结构体,强制使用64字节对齐(假设缓存行最大为64个字节),这样不同CPU的缓存行存储的不是同一个原子变量,避免了伪共享的问题,从而加速多线程下的并发。