FAIRYFAR-INTERNAL
 
  FAIRYFAR-INTERNAL  |  SITEMAP  |  ABOUT-ME  |  HOME  
ClickHouse内存管理之MemoryTracker

本文适用于ClickHouse 20.10.*

一、分级(level)内存统计

MemoryTracker

MemoryTracker用于统计相应对象的内存当前使用量(amount)、峰值(peak)、大小限制(limit)。

snippet.cpp
class MemoryTracker
{
private:
    std::atomic<Int64> amount {0};
    std::atomic<Int64> peak {0};
    std::atomic<Int64> hard_limit {0};
    std::atomic<Int64> profiler_limit {0};
 
    /// 以下用于故障注入测试
    double fault_probability = 0;
    double sample_probability = 0;
 
    /// 父级
    std::atomic<MemoryTracker *> parent {};
……

分级定义

分层(级)内存统计与控制,分级定义:

snippet.cpp
enum class VariableContext
{
    Global = 0,
    User,           /// Group of processes
    Process,        /// For example, a query or a merge
    Thread,         /// A thread of a process
    Snapshot        /// Does not belong to anybody
};

全局MemoryTracker

统计和限制整个ClickHouse Server进程内存使用量。

snippet.cpp
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);

树形分级结构

多个MemoryTracker对象,分别统计不同层级的内存,组成一个树形结构,树的root是全局的MemoryTracker(total_memory_tracker)。

  • Global:全局MemoryTracker的level是Global。
  • User:用户有多个并发查询,每个用户有一个level是“User”的MemoryTracker。
  • Process:通常一个查询的MemoryTracker的level是“Process”。
  • Thread:查询算子可以有多个计算线程,每个线程有自己的MemoryTracker。

img

alloc和free时,子级会向上一级传递内存分配与释放统计信息:

snippet.cpp
void MemoryTracker::alloc(Int64 size)
{
……
    if (auto * loaded_next = parent.load(std::memory_order_relaxed))
        loaded_next->alloc(size);
 
//-------------------------------
void MemoryTracker::free(Int64 size)
{
……
    if (auto * loaded_next = parent.load(std::memory_order_relaxed))
        loaded_next->free(size);

二、线程的MemoryTracker

线程上下文

查询算子计算线程都带了一个线程上下文数据结构——ThreadStatus。

snippet.cpp
extern thread_local ThreadStatus * current_thread;
 
class ThreadStatus : public boost::noncopyable
{
public:
    /// Linux's PID (or TGID) (the same id is shown by ps util)
    const UInt64 thread_id = 0;
 
    MemoryTracker memory_tracker{VariableContext::Thread};
 
    /// Small amount of untracked memory (per thread atomic-less counter)
    Int64 untracked_memory = 0;
    /// Each thread could new/delete memory in range of (-untracked_memory_limit, untracked_memory_limit) without access to common counters.
    Int64 untracked_memory_limit = 4 * 1024 * 1024;
 
protected:
    ThreadGroupStatusPtr thread_group;
……

其中,

  • memory_tracker:负责内存统计与限制。
  • thread_group:该ThreadStatus所属的线程组,通常一个查询的多个计算线程成为一个线程组,对应“Process”层级。

untracked_memory

为了避免小数据量alloc和free频繁进行统计影响性能(主要是防止上级MemoryTracker锁争抢),ThreadStatus类设置成员变量untracked_memory和untracked_memory_limit,目的是,当前线程先进行计数,当计数值达到untracked_memory_limit限制(默认4MB),才会将统计值更新到MemoryTracker。

因此,当我们通过MemoryTracker::get()方法获得的内存统计值时,(每个线程)会有4MB的误差。

代码片段如下:

snippet.cpp
namespace CurrentMemoryTracker
{
    using DB::current_thread;
 
    void alloc(Int64 size)
    {
        if (auto * memory_tracker = DB::CurrentThread::getMemoryTracker())
        {
            current_thread->untracked_memory += size;
            if (current_thread->untracked_memory > current_thread->untracked_memory_limit)
            {
                /// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
                /// more. It could be useful to enlarge Exception message in rethrow logic.
                Int64 tmp = current_thread->untracked_memory;
                current_thread->untracked_memory = 0;
                memory_tracker->alloc(tmp);
            }
        }
    }
 
    void free(Int64 size)
    {
        if (auto * memory_tracker = DB::CurrentThread::getMemoryTracker())
        {
            current_thread->untracked_memory -= size;
            if (current_thread->untracked_memory < -current_thread->untracked_memory_limit)
            {
                memory_tracker->free(-current_thread->untracked_memory);
                current_thread->untracked_memory = 0;
            }
        }
    }
}

三、算子内存统计

从内存统计结构看,MemoryTracker没有统计单个算子内存,以聚集算子(Aggregator)为例:

snippet.cpp
bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedDataVariants & result,
    ColumnRawPtrs & key_columns, AggregateColumns & aggregate_columns, bool & no_more_keys)
{
  ……
    size_t result_size = result.sizeWithoutOverflowRow();
    Int64 current_memory_usage = 0;
    if (auto * memory_tracker_child = CurrentThread::getMemoryTracker())  // 当前线程的MemoryTracker
        if (auto * memory_tracker = memory_tracker_child->getParent())  // 父是Process级MemoryTracker
            current_memory_usage = memory_tracker->get();
 
    /// Here all the results in the sum are taken into account, from different threads.
    auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation;

可见,这里的current_memory_usage是当前查询的内存MemoryTracker。

通过,一个测试用例,也可证明。

用例数据为TPC-H 1s数据库,SQL及其查询计划如下:

snippet.sql
EXPLAIN SELECT MAX(cnt) FROM (SELECT COUNT(*) AS cnt FROM LINEITEM GROUP BY L_ORDERKEY);
┌─EXPLAIN───────────────────────────────────────────────┐
│ Expression (Projection)                               │
│   Expression (BEFORE ORDER BY AND SELECT)             │
│     Aggregating                                       │
│       Expression (BEFORE GROUP BY)                    │
│         Expression (Projection)                       │
│           Expression (BEFORE ORDER BY AND SELECT)     │
│             Aggregating                               │
│               Expression (BEFORE GROUP BY)            │
│                 ReadFromStorage (READ FROM MergeTree) │
└───────────────────────────────────────────────────────┘

该查询有两层聚集计算,内层聚集结果为1500000行,而外层的聚集结果为1行。

通过gdb调试clickhouse-server进程,可以知道,内层聚集current_memory_usage大小约76MB,而外层聚集current_memory_usage大小也约为76MB。实际上外层聚集算子需要内存极少。可见,这里的内存统计不是算子级的。



打赏作者以资鼓励: