本文适用于PG 13.2。

可选算法

聚集算子有4种算法:

snippet.c
// src\include\nodes\nodes.h:
 
/*
 * AggStrategy -
 *	  overall execution strategies for Agg plan nodes
 *
 * This is needed in both pathnodes.h and plannodes.h, so put it here...
 */
typedef enum AggStrategy
{
	AGG_PLAIN,					/* simple agg across all input rows */
	AGG_SORTED,					/* grouped agg, input must be sorted,适用于下层算子输出有序的情况。 */
	AGG_HASHED,					/* grouped agg, use internal hashtable */
	AGG_MIXED					/* grouped agg, hash and sort both used */
} AggStrategy;

plan

group的计划path生成函数:

snippet.c
// src\backend\optimizer\plan\planner.c:
 
static void
grouping_planner(PlannerInfo *root, bool inheritance_update,
				 double tuple_fraction);

相关函数:

snippet.c
create_ordinary_grouping_paths
add_paths_to_grouping_rel

执行

聚集算子执行器核心代码在:

src\backend\executor\nodeAgg.c

运行期聚集节点初始化:

snippet.c
/* -----------------
 * ExecInitAgg
 *
 *	Creates the run-time information for the agg node produced by the
 *	planner and initializes its outer subtree.
 *
 * -----------------
 */
AggState *
ExecInitAgg(Agg *node, EState *estate, int eflags);

聚集内存限制评估:

snippet.c
/*
 * Set limits that trigger spilling to avoid exceeding hash_mem. Consider the
 * number of partitions we expect to create (if we do spill).
 *
 * There are two limits: a memory limit, and also an ngroups limit. The
 * ngroups limit becomes important when we expect transition values to grow
 * substantially larger than the initial value.
 */
void
hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits,
					Size *mem_limit, uint64 *ngroups_limit,
					int *num_partitions);

如果需要溢出(执行前根据评估的输入行数做的预估。执行过程中,实际使用内存超出算子限制时,也会执行以下评估),则评估溢出的分片数:

snippet.c
/*
 * Determine the number of partitions to create when spilling, which will
 * always be a power of two. If log2_npartitions is non-NULL, set
 * *log2_npartitions to the log2() of the number of partitions.
 */
static int
hash_choose_num_partitions(double input_groups, double hashentrysize,
						   int used_bits, int *log2_npartitions);

执行过程中判定是否进入溢出模式:

snippet.c
/*
 * hash_agg_check_limits
 *
 * After adding a new group to the hash table, check whether we need to enter
 * spill mode. Allocations may happen without adding new groups (for instance,
 * if the transition state size grows), so this check is imperfect.
 */
static void
hash_agg_check_limits(AggState *aggstate);

聚集算子执行器入口函数:

snippet.c
// src\backend\executor\nodeAgg.c:
 
static TupleTableSlot *
ExecAgg(PlanState *pstate)
{
	AggState   *node = castNode(AggState, pstate);
	TupleTableSlot *result = NULL;
 
	CHECK_FOR_INTERRUPTS();
 
	if (!node->agg_done)
	{
		/* Dispatch based on strategy */
		switch (node->phase->aggstrategy)
		{
			case AGG_HASHED:
				if (!node->table_filled)
					agg_fill_hash_table(node);
				/* FALLTHROUGH */
			case AGG_MIXED:
				result = agg_retrieve_hash_table(node);
				break;
			case AGG_PLAIN:
			case AGG_SORTED:
				result = agg_retrieve_direct(node);
				break;
		}
 
		if (!TupIsNull(result))
			return result;
	}
 
	return NULL;
}

如果是Hash聚集,则建Hash表:

snippet.c
/*
 * ExecAgg for hashed case: retrieving groups from hash table
 *
 * After exhausting in-memory tuples, also try refilling the hash table using
 * previously-spilled tuples. Only returns NULL after all in-memory and
 * spilled tuples are exhausted.
 */
static TupleTableSlot *
agg_retrieve_hash_table(AggState *aggstate)
{
	TupleTableSlot *result = NULL;
 
	while (result == NULL)
	{
		result = agg_retrieve_hash_table_in_memory(aggstate);
		if (result == NULL)
		{
			if (!agg_refill_hash_table(aggstate))
			{
				aggstate->agg_done = true;
				break;
			}
		}
	}
 
	return result;
}

然后执行:

agg_retrieve_hash_table