This article applies to PostgreSQL (PG) 13.2.
The aggregate operator has four execution strategies:
// src\include\nodes\nodes.h:
/*
 * AggStrategy -
 *    overall execution strategies for Agg plan nodes
 *
 * This is needed in both pathnodes.h and plannodes.h, so put it here...
 */
typedef enum AggStrategy
{
    AGG_PLAIN,      /* simple agg across all input rows */
    AGG_SORTED,     /* grouped agg, input must be sorted; suitable when
                     * the child operator's output is already sorted */
    AGG_HASHED,     /* grouped agg, use internal hashtable */
    AGG_MIXED       /* grouped agg, hash and sort both used */
} AggStrategy;
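To make the "input must be sorted" requirement of AGG_SORTED concrete, here is a minimal sketch of sort-based grouping that computes count(*) per key. It is an illustration only, not PostgreSQL code: `GroupCount` and `sketch_sorted_group_count` are hypothetical names, and the caller is assumed to provide an `out` array with room for `n` entries.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical result row: one group key and its count(*). */
typedef struct GroupCount
{
    int  key;
    long count;
} GroupCount;

/*
 * Sort-based grouping sketch: a new group starts whenever the key differs
 * from the previous row's key, so equal keys must be adjacent in the input.
 * Returns the number of groups written to out (out must hold n entries).
 */
static size_t
sketch_sorted_group_count(const int *keys, size_t n, GroupCount *out)
{
    size_t ngroups = 0;

    assert(keys != NULL || n == 0);

    for (size_t i = 0; i < n; i++)
    {
        if (ngroups > 0 && out[ngroups - 1].key == keys[i])
            out[ngroups - 1].count++;       /* same group as previous row */
        else
        {
            out[ngroups].key = keys[i];     /* key changed: open a new group */
            out[ngroups].count = 1;
            ngroups++;
        }
    }
    return ngroups;
}
```

If the input is not sorted, the same key can show up as several separate groups. A sort-based strategy therefore needs ordered input, while a hash-based strategy does not.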
The planner function that generates the paths for grouping:
// src\backend\optimizer\plan\planner.c:
static void grouping_planner(PlannerInfo *root, bool inheritance_update,
                             double tuple_fraction);
Related functions:
create_ordinary_grouping_paths
add_paths_to_grouping_rel
The core executor code for the aggregate operator is in:
src\backend\executor\nodeAgg.c
Run-time initialization of the aggregate node:
/* -----------------
 * ExecInitAgg
 *
 *  Creates the run-time information for the agg node produced by the
 *  planner and initializes its outer subtree.
 *
 * -----------------
 */
AggState *
ExecInitAgg(Agg *node, EState *estate, int eflags);
Estimating the memory limits for aggregation:
/*
 * Set limits that trigger spilling to avoid exceeding hash_mem. Consider the
 * number of partitions we expect to create (if we do spill).
 *
 * There are two limits: a memory limit, and also an ngroups limit. The
 * ngroups limit becomes important when we expect transition values to grow
 * substantially larger than the initial value.
 */
void
hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
                    Size *mem_limit, uint64 *ngroups_limit,
                    int *num_partitions);
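The two limits described in that comment can be sketched as follows. This is a simplified illustration under stated assumptions, not the PostgreSQL implementation. The buffer-size constants, the rule for reserving memory for partition buffers, and the `hash_mem_bytes` and `npartitions_if_spill` parameters are all hypothetical stand-ins, and `sketch_set_limits` is a hypothetical name.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed per-file buffer sizes, for illustration only. */
#define SKETCH_READ_BUFFER_SIZE  8192
#define SKETCH_WRITE_BUFFER_SIZE 8192

static void
sketch_set_limits(double hashentrysize, double input_groups,
                  size_t hash_mem_bytes, int npartitions_if_spill,
                  size_t *mem_limit, uint64_t *ngroups_limit,
                  int *num_partitions)
{
    size_t partition_mem;

    assert(hashentrysize > 0 && input_groups >= 0 && npartitions_if_spill >= 0);

    /* Not expected to spill: the whole budget is available to the table. */
    if (input_groups * hashentrysize < (double) hash_mem_bytes)
    {
        *num_partitions = 0;
        *mem_limit = hash_mem_bytes;
        *ngroups_limit = (uint64_t) ((double) *mem_limit / hashentrysize);
        return;
    }

    /* Expected to spill: set memory aside for the partition file buffers. */
    *num_partitions = npartitions_if_spill;
    partition_mem = SKETCH_READ_BUFFER_SIZE +
        SKETCH_WRITE_BUFFER_SIZE * (size_t) npartitions_if_spill;

    if (hash_mem_bytes > 4 * partition_mem)
        *mem_limit = hash_mem_bytes - partition_mem;
    else
        *mem_limit = (size_t) ((double) hash_mem_bytes * 0.75);

    /* The group limit is how many entries of the estimated size fit. */
    if ((double) *mem_limit > hashentrysize)
        *ngroups_limit = (uint64_t) ((double) *mem_limit / hashentrysize);
    else
        *ngroups_limit = 1;
}
```

With a 4 MB budget and 64-byte entries, 1000 expected groups fit in memory, so no partitions are planned. One million expected groups do not fit, so part of the budget is reserved for spill buffers and both limits shrink.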
If spilling is needed, the number of spill partitions is estimated. This estimate is made before execution from the estimated input size, and the same evaluation is performed during execution when actual memory use exceeds the operator's limit:
/*
 * Determine the number of partitions to create when spilling, which will
 * always be a power of two. If log2_npartitions is non-NULL, set
 * *log2_npartitions to the log2() of the number of partitions.
 */
static int
hash_choose_num_partitions(double input_groups, double hashentrysize,
                           int used_bits, int *log2_npartitions);
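A minimal sketch of choosing a power-of-two partition count is shown below. It is an assumption-laden illustration rather than the real function. The minimum and maximum partition counts, the `hash_mem_bytes` parameter, and the name `sketch_choose_num_partitions` are hypothetical. Capping the bits at 32 assumes a 32-bit hash value from which `used_bits` bits have already been consumed.

```c
#include <assert.h>
#include <stddef.h>

/* Assumed bounds on the partition count, for illustration only. */
#define SKETCH_MIN_PARTITIONS 4
#define SKETCH_MAX_PARTITIONS 1024

static int
sketch_choose_num_partitions(double input_groups, double hashentrysize,
                             double hash_mem_bytes, int used_bits,
                             int *log2_npartitions)
{
    double wanted;
    int    npartitions;
    int    partition_bits = 0;

    assert(input_groups >= 0 && hashentrysize >= 0 && hash_mem_bytes > 0);
    assert(used_bits >= 0 && used_bits < 32);

    /* Enough partitions that each one is likely to fit in memory. */
    wanted = 1.0 + (input_groups * hashentrysize) / hash_mem_bytes;
    if (wanted > SKETCH_MAX_PARTITIONS)
        wanted = SKETCH_MAX_PARTITIONS;
    if (wanted < SKETCH_MIN_PARTITIONS)
        wanted = SKETCH_MIN_PARTITIONS;
    npartitions = (int) wanted;

    /* partition_bits = ceil(log2(npartitions)) */
    while ((1 << partition_bits) < npartitions)
        partition_bits++;

    /* Do not use more hash bits than remain after earlier partitioning. */
    if (partition_bits + used_bits >= 32)
        partition_bits = 32 - used_bits;

    if (log2_npartitions != NULL)
        *log2_npartitions = partition_bits;

    /* The result is always a power of two. */
    return 1 << partition_bits;
}
```

Rounding up to a power of two means a tuple's partition can be selected by taking `partition_bits` bits of its hash value, which is why the remaining hash bits bound the result.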
Deciding during execution whether to enter spill mode:
/*
 * hash_agg_check_limits
 *
 * After adding a new group to the hash table, check whether we need to enter
 * spill mode. Allocations may happen without adding new groups (for instance,
 * if the transition state size grows), so this check is imperfect.
 */
static void
hash_agg_check_limits(AggState *aggstate);
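The check can be pictured as a comparison of current usage against the two limits. The sketch below is a simplified model, not the actual function body: the `SketchHashAgg` struct and its fields are hypothetical, and the real code accounts for memory in more detail.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for the hash aggregation state. */
typedef struct SketchHashAgg
{
    size_t   mem_used;       /* bytes currently used by the hash table */
    uint64_t ngroups;        /* groups currently in the hash table */
    size_t   mem_limit;      /* memory limit set before execution */
    uint64_t ngroups_limit;  /* group-count limit set before execution */
    bool     spill_mode;     /* true once new groups must go to disk */
} SketchHashAgg;

/* Call after adding a new group: enter spill mode if either limit is hit. */
static void
sketch_check_limits(SketchHashAgg *state)
{
    assert(state != NULL);

    /*
     * Require at least one group in memory; otherwise spilling could not
     * make progress, since nothing would ever be aggregated in memory.
     */
    if (state->ngroups > 0 &&
        (state->mem_used > state->mem_limit ||
         state->ngroups > state->ngroups_limit))
        state->spill_mode = true;
}
```

Because the check runs only when a group is added, memory that grows for other reasons (for example, a larger transition state) is noticed late. This is the imperfection the source comment mentions.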
Entry point of the aggregate operator in the executor:
// src\backend\executor\nodeAgg.c:
static TupleTableSlot *
ExecAgg(PlanState *pstate)
{
    AggState   *node = castNode(AggState, pstate);
    TupleTableSlot *result = NULL;

    CHECK_FOR_INTERRUPTS();

    if (!node->agg_done)
    {
        /* Dispatch based on strategy */
        switch (node->phase->aggstrategy)
        {
            case AGG_HASHED:
                if (!node->table_filled)
                    agg_fill_hash_table(node);
                /* FALLTHROUGH */
            case AGG_MIXED:
                result = agg_retrieve_hash_table(node);
                break;
            case AGG_PLAIN:
            case AGG_SORTED:
                result = agg_retrieve_direct(node);
                break;
        }

        if (!TupIsNull(result))
            return result;
    }

    return NULL;
}
For hash aggregation, the hash table is built first: in the AGG_HASHED case above, ExecAgg calls agg_fill_hash_table while table_filled is still false.
Then agg_retrieve_hash_table is executed to return the groups:
/*
 * ExecAgg for hashed case: retrieving groups from hash table
 *
 * After exhausting in-memory tuples, also try refilling the hash table using
 * previously-spilled tuples. Only returns NULL after all in-memory and
 * spilled tuples are exhausted.
 */
static TupleTableSlot *
agg_retrieve_hash_table(AggState *aggstate)
{
    TupleTableSlot *result = NULL;

    while (result == NULL)
    {
        result = agg_retrieve_hash_table_in_memory(aggstate);
        if (result == NULL)
        {
            if (!agg_refill_hash_table(aggstate))
            {
                aggstate->agg_done = true;
                break;
            }
        }
    }

    return result;
}
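The control flow of agg_retrieve_hash_table (drain what is in memory, then refill from spilled data, and finish only when both are exhausted) can be tried out with a toy model. Everything in this sketch is hypothetical: batches of plain integers stand in for the in-memory hash table and the spilled partitions, and the `Sketch*` types and `sketch_*` functions are not PostgreSQL APIs.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define SKETCH_MAX_BATCHES 8

/* One batch of already-aggregated results (stand-in for a hash table load). */
typedef struct SketchBatch
{
    const int *rows;
    size_t     nrows;
} SketchBatch;

typedef struct SketchState
{
    SketchBatch batches[SKETCH_MAX_BATCHES]; /* [0] in memory, rest spilled */
    size_t      nbatches;
    size_t      cur_batch;
    size_t      cur_row;
    bool        agg_done;
} SketchState;

/* Return the next result of the current batch, or NULL when it is drained. */
static const int *
sketch_retrieve_in_memory(SketchState *s)
{
    const SketchBatch *b = &s->batches[s->cur_batch];

    if (s->cur_row < b->nrows)
        return &b->rows[s->cur_row++];
    return NULL;
}

/* Load the next spilled batch; return false when none is left. */
static bool
sketch_refill(SketchState *s)
{
    if (s->cur_batch + 1 >= s->nbatches)
        return false;
    s->cur_batch++;
    s->cur_row = 0;
    return true;
}

/* Same loop shape as agg_retrieve_hash_table: NULL only when all is done. */
static const int *
sketch_retrieve(SketchState *s)
{
    const int *result = NULL;

    while (result == NULL)
    {
        result = sketch_retrieve_in_memory(s);
        if (result == NULL)
        {
            if (!sketch_refill(s))
            {
                s->agg_done = true;
                break;
            }
        }
    }
    return result;
}
```

An empty refilled batch does not end the scan: the loop simply moves on to the next batch, so the caller sees NULL only once.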