转自:https://zhuanlan.zhihu.com/p/112003245
注解:
并行查询是 PostgreSQL 的一个大特性,对于单一 SQL 查询,从传统单一进程,提升到多个进程协同完成。
并行查询使用了更多的 cpu/mem/io 资源,提高了任务的执行效率,在cpu=8以内执行效率线性增长。
并行查询设计的难点是需要协调多个worker进程间的同步:
本文主要讲述并行查询中最复杂的功能:paralle hash join。先从单进程的hashjoin逐步过渡到多进程并行的hashjoin。
从9.6和10版本就开始支持并行join,但是只支持outer table的并行;11版本支持outer和inner都并行执行;
SELECT COUNT(*) FROM lineitem JOIN orders ON l_orderkey = o_orderkey WHERE o_totalprice > 5.00;
Finalize Aggregate -> Gather Workers Planned: 2 -> Partial Aggregate -> Hash Join Hash Cond: (lineitem.l_orderkey = orders.o_orderkey) -> Parallel Seq Scan on lineitem -> Hash -> Seq Scan on orders Filter: (o_totalprice > 5.00)
过程是:
Finalize Aggregate -> Gather Workers Planned: 2 -> Partial Aggregate -> Parallel Hash Join Hash Cond: (lineitem.l_orderkey = orders.o_orderkey) -> Parallel Seq Scan on lineitem -> Parallel Hash -> Parallel Seq Scan on orders Filter: (o_totalprice > 5.00)
过程是:
在PG10中,每个worker进程都各自在自己的私有内存中建立inner表的hash,然后并发的扫描outer表,并执行join。
下图是3个worker并发执行的时间线。
如前面所说,每个worker完整的执行inner plan,在各自的私有内存build hash table,outer table是并行的扫描,每个worker扫描一部分,并执行join,这样所有worker的结果集的并集就是最终的结果集。
只对outer plan进行了并行化。该方法适用于inner table很小的时候。它的问题是:
阿姆达尔定律指出:对系统中的一个子系统优化改进,受限于可使用该优化方法的时间占比。说白了,顶了天了优化的效果最多是把该被优化的子系统降到接近0。
但是该定律的重要性是说明了:收益递减。要把一个子系统优化从80%优化到40%付出的代价是变多的。
上图中,worker2执行的比较慢,总的执行时间可能取决于worker2,但是也可能worker1和worker3在执行outer plan进行并行的join时适当的多干些活。
平均总的执行时间 = inner plan + (outer plan / worker_num)
该方案是Posrgres11中的并行hashjoin算法。
inner plan也被多个worker并行执行,在共享内存中进程安全的build出一个共享的hash table。
注意点是:在进行join之前,要等待所有的worker都把inner plan执行完成,也就是要build出一份完成的hash table。因此图中的虚线是所有worker进程的barrier点,只有等所有的进程都达到了虚线,才能继续下面的步骤。
这是通过PG的barrier机制实现。
在build hash table阶段,worker2执行的慢,该阶段的执行速度也不一定取决于最慢的worker,同理,worker1和worker3可能会多干些活。但是一定要满足在barrier处等待所有的woker都完成build阶段; 平均总的执行时间 = (inner plan + outer plan) / worker_num
从中可以看出,Postgres社区对并行执行方案的演进路径:现在PG10版本并行化了outer plan,稳定后,再在PG11版本并行化inner plan。
不论是并行的hashjoin还是单进程的hash join都要解决inner table在内存装不下的问题。
解决方法是:
a) 对属于batch-0的tuple,直接在内存中构建hash table,和上面讲的单个batch相同;
b) 对不属于batch-0的tuple,写入相应batch的inner tuple文件;
每个worker在扫描inner table时做2件事情:
a)对属于batch-0的tuple,直接在内存中构建hash table,和上面讲的单个batch相同;
b)对不属于batch-0的tuple,写入相应batch的inner tuple文件;
为了避免多个worker同时往一个batch写文件加锁的问题,每个batch给每个worker都分配一个inner文件和outer文件。
对于batch-5:
attach的细节:先对batch5进行处理的worker3可能build完hash table,也可能没有。因此内部维护了一个简单的状态机:
在hash join中关键数据结构是HashJoinState和HashState。
HashJoinState对应的exec函数是ExecHashJoinImpl负责维护驱动整个join的过程:
HashState的exec函数是MultiExecParallelHash,负责并行的build inner表。
PostgreSQL是多进程的模型,进程间通过share memory来通信。PG在启动时申请一大片固定的share memory,在这片内存上分配大概50多个结构体以及buffer。每当有新连接进来时就fork一个新的进程,这个进程和main进程共享这些内存。
基于多进程共享内存的编程范式:
对于并行查询,多个worker间需要通信,但是需要的内存量是不确定的。PG提供动态的创建共享内存机制(dsm)给并行查询分配内存。
并行查询的leader进程负责创建dsm,follower进程attach上去,attach动作是为了生成一个私有的dsm描述符,并把dsm中的refcnt加1。
dsm机制是通过系统调用(linux平台是mmap)来分配出来的内存,不支持小内存分配;而且如果在dsm上分配几个有关联关系的的小对象是有问题的,
比如:
struct A { B *b; };
dsm在每个进程上的地址空间是不同,指针b的地址是创建该dsm的进程所看到的地址,为了解决这两个问题,提供dsa机制:
一个batch的构成:
在插入bucket时,通过对buckets[bucketno]的head进行CAS(Compare-and-Swap),实现线程安全的hash insert。
for (;;) { tuple->next.shared = dsa_pointer_atomic_read(head); if (dsa_pointer_atomic_compare_exchange(head, &tuple->next.shared, tuple_shared)) break; }
该图是不精确的,worker1和worker2都应该attach到batch-0上:一边扫描一遍插入batch-0的内存,其他的batch则是写入文件,这样总的内存使用量就是一个bath了,因为batch的引入本来就是为了解决内存不足的问题。
a)给这个batch在dsa上分配bucket(batch0除外);
b)等待所有woker完成分配内存;
c)把该batch上的inner文件中的tuple加载该batch的hashtable中;
d) 选定一个batch开始处理,其他的batch可能还没有开始处理:
外连接,需要填充所有的outer,因此需要回到OUTER
内连接,需要填充所有的inner,因此需要回到NEW_BATCH;
并行的关键是:每次一个worker处理完一个batch后,都回到HJNEEDNEW_BATCH去领取下一个batch,如果当前只剩下一个batch了,那么所有的workerattach上去并行的处理。
PostgreSQL的并行hashjoin的演进,体现了复杂feature的迭代过程。多个worker进程通过共享内存‘争抢’一些工作量来处理,‘能者多劳’处理快的worker不会因为得不到任务而处于idle的状态。
并行查询需要高改造10来个算子,使得这些算子感知多进程之间通信,可想代码量是很大的。
可以考虑使用类似gp的motion也许会有意外的收获,只不过gp中节点间是通过interconnect通信,这里通过共享内存通信。