Greenplum 6.X Resource Management Source Code Analysis

This article applies to the 6X_STABLE branch of the Greenplum Database (GPDB) source code.

1. Source Code Paths

  • src/backend/utils/resgroup
  • src/backend/utils/resource_manager
  • src/backend/utils/resowner
  • src/backend/utils/resscheduler

2. Memory Management

Initializing shared memory:

snippet.gdb
#2  0x0000000000ddf80d in ResManagerShmemInit () at resource_manager.c:45
#3  0x0000000000be253d in CreateSharedMemoryAndSemaphores (port=6407) at ipci.c:284
#4  0x0000000000b8ce01 in reset_shared (port=6407) at postmaster.c:2915
#5  0x0000000000b8a825 in PostmasterMain (argc=5, argv=0x2f3c390) at postmaster.c:1331
#6  0x0000000000a93fc2 in main (argc=5, argv=0x2f3c390) at main.c:249

Creating a memory context, which palloc later picks up as its allocation context:

snippet.c
MemoryContext
AllocSetContextCreate(MemoryContext parent,
					  const char *name,
					  Size minContextSize,
					  Size initBlockSize,
					  Size maxBlockSize);

The current memory context is changed by switching contexts, for example:

snippet.c
old_ctx = MemoryContextSwitchTo(new_ctx);
…… do something
MemoryContextSwitchTo(old_ctx);
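
A fuller sketch of the typical lifecycle, using only the standard PostgreSQL memory-context API (the context name and the stock size macros below are illustrative; this is a sketch, not code from GPDB):

snippet.c
/* Create a child context, switch into it, allocate, switch back,
 * and eventually free everything in the context in one shot. */
MemoryContext tmp_ctx;
MemoryContext old_ctx;

tmp_ctx = AllocSetContextCreate(CurrentMemoryContext,
								"MyTempContext",
								ALLOCSET_DEFAULT_MINSIZE,
								ALLOCSET_DEFAULT_INITSIZE,
								ALLOCSET_DEFAULT_MAXSIZE);

old_ctx = MemoryContextSwitchTo(tmp_ctx);
char *buf = palloc(128);		/* allocated in tmp_ctx */
MemoryContextSwitchTo(old_ctx);

MemoryContextDelete(tmp_ctx);	/* frees buf and everything else in tmp_ctx */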

When palloc is called:

snippet.c
void *
palloc(Size size)
{
	……
  // The CurrentMemoryContext->methods.alloc function pointer points to the memory-allocation handler.
	ret = (*CurrentMemoryContext->methods.alloc) (CurrentMemoryContext, size);
	……
	return ret;
}

The VmemTracker Stack

A typical stack under vmtracker: when palloc is called, the allocation is intercepted by gp_malloc:

snippet.gdb
#0  VmemTracker_ReserveVmem (newlyRequestedBytes=32776)	at vmem_tracker.c:524
#1  0x0000000000da8712 in gp_malloc_internal (requested_size=32768) at memprot.c:445
#2  0x0000000000da879f in gp_malloc (sz=32768) at memprot.c:479
#3  0x0000000000da1afe in AllocSetAllocImpl (context=0x2e92fa8, size=160, isHeader=0 '\000') at aset.c:1360
#4  0x0000000000da2016 in AllocSetAlloc (context=0x2e92fa8, size=160) at aset.c:1451
#5  0x0000000000da475d in palloc (size=160) at mcxt.c:1274
#6  0x0000000000c1208b in CreateQueryDesc (plannedstmt=0x2dbd120, sourceText=0x2dbc8a2 "select n_nationkey from nation order by n_nationkey desc;", snapshot=0x2dc4828, crosscheck_snapshot=0x0, dest=0x17d1b60 <donothingDR>, params=0x0, instrument_options=0)
    at pquery.c:91
#7  0x0000000000c12dfb in PortalStart (portal=0x2dc64e8, params=0x0, eflags=0, snapshot=0x0, ddesc=0x2dbdeb0) at pquery.c:655
#8  0x0000000000c0b299 in exec_mpp_query
……

Memory Model Casting

PG and GP frequently cast between memory layouts, for example:

snippet.c
typedef AllocSetContext *AllocSet;
typedef struct MemoryContextData *MemoryContext;
 
MemoryContext
AllocSetContextCreate(……)
{
	AllocSet	context;
	context = (AllocSet) MemoryContextCreate(T_AllocSetContext,
											 sizeof(AllocSetContext),
											 &AllocSetMethods,
											 parent,
											 name);
  ……

This cast is legal because the AllocSetContext layout embeds a MemoryContextData, and MemoryContextData is the first member of AllocSetContext:

snippet.c
typedef struct MemoryContextData
{
	NodeTag		type;
	MemoryContextMethods methods;
	MemoryContext parent;
  ……
} MemoryContextData;
 
typedef struct AllocSetContext
{
	MemoryContextData header;	// must be the first member
	AllocBlock	blocks;
	……
} AllocSetContext;

This technique rests on Section 6.7.2.1 of the C99 standard: the compiler does not reorder structure members:

Within a structure object, the non-bit-field members and the units in which bit-fields reside have addresses that increase in the order in which they are declared.
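
A minimal, self-contained illustration of the same first-member guarantee (all names here are made up for the example):

snippet.c
#include <assert.h>
#include <stdio.h>

typedef struct Header { int type; } Header;

typedef struct Derived
{
	Header	header;		/* must be the first member */
	int		payload;
} Derived;

int main(void)
{
	Derived d = { { 42 }, 7 };
	/* C99 6.7.2.1 guarantees that &d and &d.header share the same
	 * address, so a Derived * may be treated as a Header * and back. */
	Header *h = (Header *) &d;
	assert(h->type == 42);
	Derived *back = (Derived *) h;
	printf("%d\n", back->payload);	/* prints 7 */
	return 0;
}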

3. Resource Groups

Creating the cgroups Hierarchy

src/backend/utils/resgroup/resgroup-ops-linux.c:buildPathSafe

Data Structures and Interfaces

  • src/include/utils/resgroup.h: basic resource group data structures and interface definitions

Handler functions for resource group CREATE, DROP, and ALTER:

snippet.c
extern void CreateResourceGroup(CreateResourceGroupStmt *stmt);
extern void DropResourceGroup(DropResourceGroupStmt *stmt);
extern void AlterResourceGroup(AlterResourceGroupStmt *stmt);

The enumeration of resource limit types:

snippet.c
src/include/catalog/pg_resgroup.h:
typedef enum ResGroupLimitType
{
	RESGROUP_LIMIT_TYPE_UNKNOWN = 0,
 
	RESGROUP_LIMIT_TYPE_CONCURRENCY,
	RESGROUP_LIMIT_TYPE_CPU,
	RESGROUP_LIMIT_TYPE_MEMORY,
	RESGROUP_LIMIT_TYPE_MEMORY_SHARED_QUOTA,
	RESGROUP_LIMIT_TYPE_MEMORY_SPILL_RATIO,
	RESGROUP_LIMIT_TYPE_MEMORY_AUDITOR,
	RESGROUP_LIMIT_TYPE_CPUSET,
 
	RESGROUP_LIMIT_TYPE_COUNT,
} ResGroupLimitType;
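
These values correspond to the CONCURRENCY, CPU_RATE_LIMIT, MEMORY_LIMIT, MEMORY_SHARED_QUOTA, MEMORY_SPILL_RATIO, MEMORY_AUDITOR, and CPUSET attributes of CREATE RESOURCE GROUP, with RESGROUP_LIMIT_TYPE_COUNT serving as a sentinel for the number of limit types.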

The stack when creating a resource group:

snippet.gdb
#0  CreateResourceGroup (stmt=stmt@entry=0x26723e8) at resgroupcmds.c:103
#1  0x0000000000a7e7bb in standard_ProcessUtility (parsetree=0x26723e8, 
    queryString=0x2670e08 "CREATE RESOURCE GROUP rgroup1 WITH (CPU_RATE_LIMIT=5, MEMORY_LIMIT=25, MEMORY_SPILL_RATIO=20);", context=PROCESS_UTILITY_TOPLEVEL, 
    params=0x0, dest=0x2672728, completionTag=0x7ffd9572a250 "") at utility.c:863
#2  0x0000000000a7b8b5 in PortalRunUtility (portal=portal@entry=0x287cc58, utilityStmt=utilityStmt@entry=0x26723e8, isTopLevel=isTopLevel@entry=1 '\001', 
    dest=dest@entry=0x2672728, completionTag=completionTag@entry=0x7ffd9572a250 "") at pquery.c:1381
#3  0x0000000000a7c2b5 in PortalRunMulti (portal=portal@entry=0x287cc58, isTopLevel=isTopLevel@entry=1 '\001', dest=dest@entry=0x2672728, 
    altdest=altdest@entry=0x2672728, completionTag=completionTag@entry=0x7ffd9572a250 "") at pquery.c:1512
#4  0x0000000000a7d711 in PortalRun (portal=portal@entry=0x287cc58, count=count@entry=9223372036854775807, isTopLevel=isTopLevel@entry=1 '\001', 
    dest=dest@entry=0x2672728, altdest=altdest@entry=0x2672728, completionTag=completionTag@entry=0x7ffd9572a250 "") at pquery.c:1018
#5  0x0000000000a78514 in exec_simple_query (query_string=0x2670e08 "CREATE RESOURCE GROUP rgroup1 WITH (CPU_RATE_LIMIT=5, MEMORY_LIMIT=25, MEMORY_SPILL_RATIO=20);")
    at postgres.c:1824
#6  0x0000000000a7b470 in PostgresMain (argc=<optimized out>, argv=argv@entry=0x2650788, dbname=0x2650640 "tpch1s", username=<optimized out>) at postgres.c:5246
#7  0x00000000006bf09b in BackendRun (port=0x2680890) at postmaster.c:4811
#8  BackendStartup (port=0x2680890) at postmaster.c:4468
#9  ServerLoop () at postmaster.c:1948
#10 0x0000000000a025c9 in PostmasterMain (argc=argc@entry=6, argv=argv@entry=0x264e810) at postmaster.c:1518
#11 0x00000000006c3e4b in main (argc=6, argv=0x264e810) at main.c:245

Resource registration at query start:

snippet.gdb
#0  ResLockUpdateLimit (increment=1 '\001', proclock=0x7fb305aa29b0, inError=0 '\000', incrementSet=0x7fb308a6cf68, lock=0x7fb305517130) at resqueue.c:834
#1  ResLockAcquire (locktag=locktag@entry=0x7ffebec95270, incrementSet=<optimized out>,	incrementSet@entry=0x7ffebec95280) at resqueue.c:406
#2  0x0000000000c22a08 in ResLockPortal (portal=portal@entry=0x3bc5148,	qDesc=qDesc@entry=0x3a8c1b8) at resscheduler.c:684
#3  0x0000000000a7d366 in PortalStart (portal=portal@entry=0x3bc5148, params=params@entry=0x0, eflags=eflags@entry=0, snapshot=snapshot@entry=0x0, ddesc=ddesc@entry=0x0) at pquery.c:713
#4  0x0000000000a784a4 in exec_simple_query (query_string=0x38d0c88 "select\n\tl_returnflag,\n\tl_linestatus,\n\tsum(l_quantity) as sum_qty,\n\tsum(l_extendedprice) as sum_base_price,\n\tsum(l_extendedprice * (1 - l_discount)) as sum_disc_price,\n\tsum(l_extendedprice * (1 - l_dis"...) at postgres.c:1785
#5  0x0000000000a7b470 in PostgresMain (argc=<optimized out>, argv=argv@entry=0x38b06b8, dbname=0x38b0570 "tpch1s", username=<optimized out>) at postgres.c:5246

Resource deregistration at query end:

snippet.gdb
#0  ResLockUpdateLimit (proclock=0x7fb305aa29b0, inError=0 '\000', increment=0 '\000', incrementSet=0x7fb308a6cf68, lock=0x7fb305517130) at resqueue.c:834
#1  ResLockRelease (locktag=locktag@entry=0x7ffebec953d0, resPortalId=0) at resqueue.c:605
#2  0x0000000000c22d5f in ResUnLockPortal (portal=portal@entry=0x3bc5148) at resscheduler.c:852
#3  0x000000000085fc2a in PortalCleanup (portal=0x3bc5148) at portalcmds.c:344
#4  0x0000000000c102ca in PortalDrop (portal=0x3bc5148,	isTopCommit=<optimized out>) at portalmem.c:535

How PIDs Are Written to tasks Under CPUSET Limits

Suppose resource group 16466 uses CPUSET to bind CPU cores. When an executor process starts on a segment, the PID of the forked executor process must be written to /sys/fs/cgroup/cpuset/gpdb/16466/cgroup.procs and /sys/fs/cgroup/cpuset/gpdb/16466/tasks. The stack of this process is as follows:

snippet.gdb
#0  writeInt64 (group=16466, base=BASETYPE_GPDB, comp=RESGROUP_COMP_TYPE_FIRST, prop=0x12f46a1 "cgroup.procs", x=49095) at resgroup-ops-linux.c:951
#1  0x0000000000db64aa in ResGroupOps_AssignGroup (group=16466, caps=0x1849840 <__self+32>, pid=49095) at resgroup-ops-linux.c:1565
#2  0x0000000000db008a in SwitchResGroupOnSegment (buf=0x349d812 "", len=37) at resgroup.c:2836
#3  0x0000000000c10f8d in PostgresMain (argc=1, argv=0x347c290, dbname=0x347c1a0 "tpch1s", username=0x347c188 "yz") at postgres.c:5345
#4  0x0000000000b8fbe4 in BackendRun (port=0x34aa840) at postmaster.c:4811
#5  0x0000000000b8f2b5 in BackendStartup (port=0x34aa840) at postmaster.c:4468
#6  0x0000000000b8b487 in ServerLoop () at postmaster.c:1948
#7  0x0000000000b8aa2c in PostmasterMain (argc=5, argv=0x347a390) at postmaster.c:1518
#8  0x0000000000a93f6e in main (argc=5, argv=0x347a390) at main.c:245
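
Stripped of GPDB's wrappers, what writeInt64 ultimately performs is a plain write of the PID into a cgroup interface file. A minimal sketch under the path layout above (cgroup_attach_pid is a hypothetical helper name; error handling is reduced to the essentials):

snippet.c
#include <stdio.h>
#include <sys/types.h>

/* Append a pid to a cgroup interface file such as
 * /sys/fs/cgroup/cpuset/gpdb/16466/cgroup.procs. */
static int
cgroup_attach_pid(const char *cgroup_dir, const char *prop, pid_t pid)
{
	char	path[1024];
	FILE   *f;

	snprintf(path, sizeof(path), "%s/%s", cgroup_dir, prop);
	f = fopen(path, "w");
	if (f == NULL)
		return -1;
	fprintf(f, "%d", (int) pid);
	return fclose(f) == 0 ? 0 : -1;
}

/* Usage: cgroup_attach_pid("/sys/fs/cgroup/cpuset/gpdb/16466",
 *                          "cgroup.procs", 49095); */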

buildPath

All access to the cgroups hierarchy goes through this interface function; if the corresponding hierarchy level does not exist, it is created, which prevents access to an invalid hierarchy.

snippet.c
src/backend/utils/resgroup/resgroup-ops-linux.c:
static char *
buildPath(Oid group,
		  BaseType base,
		  ResGroupCompType comp,
		  const char *prop,
		  char *path,
		  size_t pathsize)
{
	char	   *result = buildPathSafe(group, base, comp, prop, path, pathsize);
  ……
	return result;
}

4. Dispatcher Resource Control

Memory Quota

At the very start of a query, the memory reservation is obtained via the function below:

snippet.c
void
PortalStart(Portal portal, ParamListInfo params,
			int eflags, Snapshot snapshot,
			QueryDispatchDesc *ddesc)
{
	// …
	// query_mem = 0 means unlimited; unit: bytes.
	queryDesc->plannedstmt->query_mem = ResourceManagerGetQueryMemoryLimit(queryDesc->plannedstmt);

ResourceManagerGetQueryMemoryLimit obtains the query's memory quota:

snippet.c
// Calculate the amount of memory reserved for the query
int64
ResourceManagerGetQueryMemoryLimit(PlannedStmt* stmt)
{
	if (Gp_role != GP_ROLE_DISPATCH)
		return 0;
 
	/* no limits in single user mode. */
	if (!IsUnderPostmaster)   // IsUnderPostmaster is true in a child process of the postmaster.
		return 0;
 
	Assert(gp_session_id > -1);
	Assert(ActivePortal != NULL);
 
	if (IsResQueueEnabled())
		return ResourceQueueGetQueryMemoryLimit(stmt, ActivePortal->queueId);
	if (IsResGroupActivated())
		return ResourceGroupGetQueryMemoryLimit();
 
	return 0;
}

ResourceGroupGetQueryMemoryLimit needs to know the total memory each segment may use; that total is computed by decideTotalChunks:

snippet.c
// Calculate the total memory chunks of the segment
static void
decideTotalChunks(int32 *totalChunks, int32 *chunkSizeInBits)
{
  ……
	nsegments = Gp_role == GP_ROLE_EXECUTE ? host_segments : pResGroupControl->segmentsOnMaster;
  // ResGroupOps_GetTotalMemory returns the system's available memory:
  // RAM * overcommit_ratio + Swap
	tmptotalChunks = ResGroupOps_GetTotalMemory() * gp_resource_group_memory_limit / nsegments;
 
	/* If vmem is larger than 16GB (i.e., 16K MB), we make the chunks bigger
	 * so that the vmem limit in chunks unit is not larger than 16K.*/
	tmpchunkSizeInBits = BITS_IN_MB;
	while(tmptotalChunks > (16 * 1024))
	{
		tmpchunkSizeInBits++;
		tmptotalChunks >>= 1;
	}
  ……
}
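
To see the doubling loop's effect, here is a standalone rehearsal of the same arithmetic with hypothetical inputs (512 GB of eligible memory, gp_resource_group_memory_limit = 0.7, 8 segments per host), assuming chunks start at 1 MB (BITS_IN_MB = 20):

snippet.c
#include <stdio.h>

#define BITS_IN_MB 20	/* chunks start at 1 MB */

int main(void)
{
	/* Hypothetical inputs, in MB as decideTotalChunks uses them. */
	int		totalMemMB = 524288;	/* 512 GB */
	double	memLimit   = 0.7;		/* gp_resource_group_memory_limit */
	int		nsegments  = 8;

	int tmptotalChunks     = (int) (totalMemMB * memLimit / nsegments);	/* 45875 */
	int tmpchunkSizeInBits = BITS_IN_MB;

	/* Same doubling loop as decideTotalChunks: keep chunk count <= 16K. */
	while (tmptotalChunks > 16 * 1024)
	{
		tmpchunkSizeInBits++;
		tmptotalChunks >>= 1;
	}

	/* Prints: 11468 chunks of 4 MB each. */
	printf("%d chunks of %d MB each\n",
		   tmptotalChunks, 1 << (tmpchunkSizeInBits - BITS_IN_MB));
	return 0;
}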

A Note on the Segment Count

One input to the segment memory quota calculation above is the segment count. Note that pResGroupControl->segmentsOnMaster in the code includes the master, i.e. segmentsOnMaster = number of segments + number of masters. segmentsOnMaster is computed in the following location:

snippet.c
/*
 *  Internal function to initialize each component info
 */
static CdbComponentDatabases *
getCdbComponentInfo(void)
{
  ……
  HTAB	   *hostSegsHash = hostSegsHashTableInit();
  ……
  hsEntry->segmentCount++;
  ……

Concurrency Control

When a resource group's concurrency exceeds its limit, new transactions are queued, with the following stack:

snippet.gdb
#0  WaitLatchOrSocket (latch=0x7f6d848679a4, wakeEvents=17, sock=-1, timeout=-1) at latch.c:322
#1  0x0000000000b5fb6d in WaitLatch (latch=0x7f6d848679a4, wakeEvents=17, timeout=-1) at latch.c:303
#2  0x0000000000d4c5c8 in waitOnGroup (group=0x7f6d84800928, isMoveQuery=0 '\000') at resgroup.c:2918
#3  0x0000000000d4ada6 in groupAcquireSlot (pGroupInfo=0x7ffd422a3d70, isMoveQuery=0 '\000') at resgroup.c:1934
#4  0x0000000000d4bd7b in AssignResGroupOnMaster () at resgroup.c:2636
#5  0x0000000000787133 in StartTransaction () at xact.c:2606
#6  0x00000000007881a4 in StartTransactionCommand () at xact.c:3576
#7  0x0000000000b8f77c in start_xact_command ()	at postgres.c:3183
#8  0x0000000000b8cdb6 in exec_simple_query (query_string=0x31090e0 "select * from region limit 1;") at postgres.c:1573
#9  0x0000000000b91ca3 in PostgresMain (argc=1,	argv=0x3107220,	dbname=0x31070f8 "tpch1s", username=0x31070e0 "yz") at postgres.c:5085
……

5. Resource Queues

Obtaining the resource queue memory limit:

snippet.c
uint64 ResourceQueueGetQueryMemoryLimit(PlannedStmt *stmt, Oid queueId)

The memory allocation rules in ResourceQueueGetQueryMemoryLimit:

snippet.c
// Superusers: memory is not limited.
if (superuser())
    return ResourceQueueGetSuperuserQueryMemoryLimit();
// When gp_resqueue_memory_policy is none, memory is not limited.
if (IsResManagerMemoryPolicyNone())
    return 0;
// First, compute the minimum share of memory from the concurrency limit (numSlots) and the plan's cost:
double minRatio = Min( 1.0/ (double) numSlots, planCost / costLimit);
……
// If the memory computed from the ratio above is smaller than statement_mem, use statement_mem instead.
if (queryMem < (uint64) statement_mem * 1024L)
{
    queryMem = (uint64) statement_mem * 1024L;
}
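
For example, with numSlots = 20 and planCost / costLimit = 0.02, minRatio = Min(0.05, 0.02) = 0.02; assuming the elided code scales a hypothetical 8 GB queue memory limit by this ratio, queryMem comes to roughly 164 MB, above the 125 MB default of statement_mem, so the floor does not kick in.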

6. Operator Buffers

Assigning Memory Quotas to Operators

In the dispatcher, memory quotas are handed out to operators:

snippet.c
src/backend/executor/execMain.c:
void standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
{
  ……
	/*Distribute memory to operators.*/
	if (Gp_role == GP_ROLE_DISPATCH)
	{
		……
			switch(*gp_resmanager_memory_policy)
			{
				case RESMANAGER_MEMORY_POLICY_AUTO:
					PolicyAutoAssignOperatorMemoryKB(queryDesc->plannedstmt,
													 queryDesc->plannedstmt->query_mem);
					break;
				case RESMANAGER_MEMORY_POLICY_EAGER_FREE:
					PolicyEagerFreeAssignOperatorMemoryKB(queryDesc->plannedstmt,
														  queryDesc->plannedstmt->query_mem);
					break;
				default:
					Assert(IsResManagerMemoryPolicyNone());
					break;
			}

The function that computes the size of an operator's buffer quota:

snippet.c
src/backend/executor/execUtils.c:
uint64 PlanStateOperatorMemKB(const PlanState *ps)
{
	Assert(ps);
	Assert(ps->plan);
	uint64 result = 0;
	if (ps->plan->operatorMemKB == 0)
	{
		/**
		 * There are some statements that do not go through the resource queue and these
		 * plans dont get decorated with the operatorMemKB. Someday, we should fix resource queues.
		 */
		result = work_mem;
	}
	else
	{
		if (IsA(ps, AggState))
		{
			/* Retrieve all relinquished memory (quota the other node not using) */
			result = ps->plan->operatorMemKB + (MemoryAccounting_RequestQuotaIncrease() >> 10);
		}
		else
			result = ps->plan->operatorMemKB;
	}
 
	return result;
}

For example, when an aggregate builds its hash table, it determines the hash table's maximum available memory (operatorMemKB is in KB, so multiplying by 1024 gives bytes):

snippet.c
HashAggTable *
create_agg_hash_table(AggState *aggstate)
{
	……
	HashAggTable *hashtable;
	……
	hashtable = (HashAggTable *) palloc0(sizeof(HashAggTable));
  ……
  uint64 operatorMemKB = PlanStateOperatorMemKB( (PlanState *) aggstate);
  ……
  hashtable->max_mem = 1024.0 * operatorMemKB;
  ……
}

Determining Whether an Operator Is Memory-Intensive

The predicate function:

snippet.c
static bool
IsMemoryIntensiveOperator(Node *node, PlannedStmt *stmt)
{
	Assert(is_plan_node(node));
	switch(nodeTag(node))
	{
		case T_Material:
		case T_Sort:
		case T_ShareInputScan:
		case T_Hash:
		case T_BitmapIndexScan:
		case T_WindowAgg:
		case T_TableFunctionScan:
		case T_FunctionScan:
			return true;
		case T_Agg:
			{
				Agg *agg = (Agg *) node;
				return IsAggMemoryIntensive(agg);
			}
		case T_Result:
			{
				Result *res = (Result *) node;
				return IsResultMemoryIntensive(res);
			}
		default:
			return false;
	}
}

Computing Memory for Memory-Intensive Operators

snippet.c
src/backend/utils/resource_manager/memquota.c:
/*
 * ComputeAvgMemKBForMemIntenseOp
 *    Compute the average memory limit for each memory-intensive operators
 * in a given group.
 *
 * If there is no memory-intensive operators in this group, return 0.
 */
static uint64
ComputeAvgMemKBForMemIntenseOp(OperatorGroupNode *groupNode)
{
	if (groupNode->numMemIntenseOps == 0)
	{
		return 0;
	}
 
	const uint64 nonMemIntenseOpMemKB = (uint64)(*gp_resmanager_memory_policy_auto_fixed_mem);
 
	return (((double)groupNode->groupMemKB -
			 (double)groupNode->numNonMemIntenseOps * nonMemIntenseOpMemKB) /
			groupNode->numMemIntenseOps);
}
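
For example, with hypothetical values groupMemKB = 102400 (100 MB), three non-memory-intensive operators at a fixed 100 KB each (a stand-in for gp_resmanager_memory_policy_auto_fixed_mem), and two memory-intensive operators, each memory-intensive operator is granted (102400 - 3 × 100) / 2 = 51050 KB.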

The Operator Buffer Quota Data Structure

snippet.c
typedef struct Plan
{
	NodeTag		type;
	……
	/**
	 * How much memory (in KB) should be used to execute this plan node?
	 */
	uint64 operatorMemKB;
	……
} Plan;

7. Parameter Control

Memory Resource Management Policies

The enum type behind the gp_resqueue_memory_policy and gp_resgroup_memory_policy parameters:

snippet.c
src/include/cdb/memquota.h:
typedef enum ResManagerMemoryPolicy
{
  RESMANAGER_MEMORY_POLICY_NONE,
  RESMANAGER_MEMORY_POLICY_AUTO,
  RESMANAGER_MEMORY_POLICY_EAGER_FREE
} ResManagerMemoryPolicy;

The memory-assignment functions for the auto and eager_free policies:

snippet.c
src/include/cdb/memquota.h:
extern void PolicyAutoAssignOperatorMemoryKB(PlannedStmt *stmt, uint64 memoryAvailable);
extern void PolicyEagerFreeAssignOperatorMemoryKB(PlannedStmt *stmt, uint64 memoryAvailable);
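
Broadly speaking, auto divides query_mem evenly among the memory-intensive operators (non-intensive ones get a small fixed amount), while eager_free exploits the fact that not all operators execute concurrently and redistributes quota relinquished by operators that have already finished.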

max_statement_mem Has No Real Enforcement Effect

According to the GP documentation, this parameter is supposed to act as a guard on statement_mem, but experiments and the code both show that the documented behavior is not implemented:

snippet.c
src/backend/cdb/cdbvars.c:
bool
gpvars_check_statement_mem(int *newval, void **extra, GucSource source)
{
	if (*newval >= max_statement_mem)
	{
		GUC_check_errmsg("Invalid input for statement_mem, must be less than max_statement_mem (%d kB)",
						 max_statement_mem);
	}
 
	return true;
}
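
Note that the check hook above always returns true, so although the error message is emitted, the new value is accepted anyway; in the PostgreSQL GUC framework a check hook must return false to reject a setting. A sketch of what an enforcing version would presumably look like:

snippet.c
bool
gpvars_check_statement_mem(int *newval, void **extra, GucSource source)
{
	if (*newval >= max_statement_mem)
	{
		GUC_check_errmsg("Invalid input for statement_mem, must be less than max_statement_mem (%d kB)",
						 max_statement_mem);
		return false;		/* reject the new value */
	}

	return true;
}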
