This article covers the Greenplum Database (GPDB) source code on the 6X_STABLE branch.
1. Source code paths
- src\backend\utils\resgroup
- src\backend\utils\resource_manager
- src\backend\utils\resowner
- src\backend\utils\resscheduler
2. Memory management
Shared memory initialization:
- snippet.gdb
#2 0x0000000000ddf80d in ResManagerShmemInit () at resource_manager.c:45
#3 0x0000000000be253d in CreateSharedMemoryAndSemaphores (port=6407) at ipci.c:284
#4 0x0000000000b8ce01 in reset_shared (port=6407) at postmaster.c:2915
#5 0x0000000000b8a825 in PostmasterMain (argc=5, argv=0x2f3c390) at postmaster.c:1331
#6 0x0000000000a93fc2 in main (argc=5, argv=0x2f3c390) at main.c:249
Create a memory context, which palloc uses as its allocation context:
- snippet.c
MemoryContext AllocSetContextCreate(MemoryContext parent, const char *name, Size minContextSize, Size initBlockSize, Size maxBlockSize);
The current memory context is changed by switching contexts, saving the old one so it can be restored, for example:
- snippet.c
old_ctx = MemoryContextSwitchTo(new_ctx);
/* ...... do something ...... */
MemoryContextSwitchTo(old_ctx);
When palloc is called:
- snippet.c
void *
palloc(Size size)
{
    ……
    // CurrentMemoryContext->methods.alloc points to the allocation handler of the current context.
    ret = (*CurrentMemoryContext->methods.alloc) (CurrentMemoryContext, size);
    ……
    return ret;
}
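Putting the pieces above together, here is a minimal usage sketch. It runs inside a backend; the context name and the choice of default size constants are illustrative, and the AllocSetContextCreate signature matches the 6X_STABLE declaration shown above.
- snippet.c
#include "postgres.h"
#include "utils/memutils.h"

/* Sketch: allocate temporary data in a private context,
 * then free everything at once by deleting the context. */
static void
example_private_context(void)
{
    MemoryContext tmp_ctx;
    MemoryContext old_ctx;
    char         *buf;

    /* Create a child of the current context. */
    tmp_ctx = AllocSetContextCreate(CurrentMemoryContext,
                                    "ExampleTempContext",
                                    ALLOCSET_DEFAULT_MINSIZE,
                                    ALLOCSET_DEFAULT_INITSIZE,
                                    ALLOCSET_DEFAULT_MAXSIZE);

    /* Allocations between the two switches are served by tmp_ctx. */
    old_ctx = MemoryContextSwitchTo(tmp_ctx);
    buf = palloc(1024);          /* dispatched through methods.alloc of tmp_ctx */
    snprintf(buf, 1024, "hello");
    MemoryContextSwitchTo(old_ctx);

    /* Frees buf and everything else allocated in tmp_ctx. */
    MemoryContextDelete(tmp_ctx);
}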
VmemTracker call stack
A typical stack with vmtracker in use: when palloc is called, the allocation is intercepted by gp_malloc:
- snippet.gdb
#0 VmemTracker_ReserveVmem (newlyRequestedBytes=32776) at vmem_tracker.c:524
#1 0x0000000000da8712 in gp_malloc_internal (requested_size=32768) at memprot.c:445
#2 0x0000000000da879f in gp_malloc (sz=32768) at memprot.c:479
#1 0x0000000000da1afe in AllocSetAllocImpl (context=0x2e92fa8, size=160, isHeader=0 '\000') at aset.c:1360
#2 0x0000000000da2016 in AllocSetAlloc (context=0x2e92fa8, size=160) at aset.c:1451
#3 0x0000000000da475d in palloc (size=160) at mcxt.c:1274
#4 0x0000000000c1208b in CreateQueryDesc (plannedstmt=0x2dbd120, sourceText=0x2dbc8a2 "select n_nationkey from nation order by n_nationkey desc;", snapshot=0x2dc4828, crosscheck_snapshot=0x0, dest=0x17d1b60 <donothingDR>, params=0x0, instrument_options=0) at pquery.c:91
#5 0x0000000000c12dfb in PortalStart (portal=0x2dc64e8, params=0x0, eflags=0, snapshot=0x0, ddesc=0x2dbdeb0) at pquery.c:655
#6 0x0000000000c0b299 in exec_mpp_query ……
Memory model casting
PG and GP frequently cast between memory layouts this way, for example:
- snippet.c
typedef AllocSetContext *AllocSet;
typedef struct MemoryContextData *MemoryContext;

MemoryContext
AllocSetContextCreate(……)
{
    AllocSet    context;

    context = (AllocSet) MemoryContextCreate(T_AllocSetContext,
                                              sizeof(AllocSetContext),
                                              &AllocSetMethods,
                                              parent,
                                              name);
    ……
This works because the AllocSetContext layout embeds a MemoryContextData, and that MemoryContextData is the first member of AllocSetContext:
- snippet.c
typedef struct MemoryContextData
{
    NodeTag     type;
    MemoryContextMethods methods;
    MemoryContext parent;
    ……
} MemoryContextData;

typedef struct AllocSetContext
{
    MemoryContextData header;   /* must be the first member */
    AllocBlock  blocks;
    ……
} AllocSetContext;
This pattern relies on the C99 standard (Section 6.7.2.1): the compiler does not reorder structure members:
Within a structure object, the non-bit-field members and the units in which bit-fields reside have addresses that increase in the order in which they are declared.
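The same first-member pattern, stripped down to a standalone illustration (the type names here are made up for the example):
- snippet.c
#include <stdio.h>

typedef struct Base
{
    int         type;
} Base;

typedef struct Derived
{
    Base        header;     /* must be the first member */
    int         payload;
} Derived;

int
main(void)
{
    Derived     d = {{42}, 7};
    /* A pointer to d also points to d.header, so both casts are valid. */
    Base       *b = (Base *) &d;
    Derived    *back = (Derived *) b;

    printf("%d %d\n", b->type, back->payload);  /* prints: 42 7 */
    return 0;
}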
3. Resource groups
Creating the cgroups hierarchy
src\backend\utils\resgroup\resgroup-ops-linux.c:buildPathSafe
Data structures and interfaces
- src\include\utils\resgroup.h: basic resource group data structures and interface definitions
Handlers for creating, dropping, and altering resource groups
- snippet.c
extern void CreateResourceGroup(CreateResourceGroupStmt *stmt);
extern void DropResourceGroup(DropResourceGroupStmt *stmt);
extern void AlterResourceGroup(AlterResourceGroupStmt *stmt);
Enumeration of resource limit types
- snippet.c
src\include\catalog\pg_resgroup.h:

typedef enum ResGroupLimitType
{
    RESGROUP_LIMIT_TYPE_UNKNOWN = 0,
    RESGROUP_LIMIT_TYPE_CONCURRENCY,
    RESGROUP_LIMIT_TYPE_CPU,
    RESGROUP_LIMIT_TYPE_MEMORY,
    RESGROUP_LIMIT_TYPE_MEMORY_SHARED_QUOTA,
    RESGROUP_LIMIT_TYPE_MEMORY_SPILL_RATIO,
    RESGROUP_LIMIT_TYPE_MEMORY_AUDITOR,
    RESGROUP_LIMIT_TYPE_CPUSET,

    RESGROUP_LIMIT_TYPE_COUNT,
} ResGroupLimitType;
Call stack when creating a resource group
- snippet.gdb
#0 CreateResourceGroup (stmt=stmt@entry=0x26723e8) at resgroupcmds.c:103
#1 0x0000000000a7e7bb in standard_ProcessUtility (parsetree=0x26723e8, queryString=0x2670e08 "CREATE RESOURCE GROUP rgroup1 WITH (CPU_RATE_LIMIT=5, MEMORY_LIMIT=25, MEMORY_SPILL_RATIO=20);", context=PROCESS_UTILITY_TOPLEVEL, params=0x0, dest=0x2672728, completionTag=0x7ffd9572a250 "") at utility.c:863
#2 0x0000000000a7b8b5 in PortalRunUtility (portal=portal@entry=0x287cc58, utilityStmt=utilityStmt@entry=0x26723e8, isTopLevel=isTopLevel@entry=1 '\001', dest=dest@entry=0x2672728, completionTag=completionTag@entry=0x7ffd9572a250 "") at pquery.c:1381
#3 0x0000000000a7c2b5 in PortalRunMulti (portal=portal@entry=0x287cc58, isTopLevel=isTopLevel@entry=1 '\001', dest=dest@entry=0x2672728, altdest=altdest@entry=0x2672728, completionTag=completionTag@entry=0x7ffd9572a250 "") at pquery.c:1512
#4 0x0000000000a7d711 in PortalRun (portal=portal@entry=0x287cc58, count=count@entry=9223372036854775807, isTopLevel=isTopLevel@entry=1 '\001', dest=dest@entry=0x2672728, altdest=altdest@entry=0x2672728, completionTag=completionTag@entry=0x7ffd9572a250 "") at pquery.c:1018
#5 0x0000000000a78514 in exec_simple_query (query_string=0x2670e08 "CREATE RESOURCE GROUP rgroup1 WITH (CPU_RATE_LIMIT=5, MEMORY_LIMIT=25, MEMORY_SPILL_RATIO=20);") at postgres.c:1824
#6 0x0000000000a7b470 in PostgresMain (argc=<optimized out>, argv=argv@entry=0x2650788, dbname=0x2650640 "tpch1s", username=<optimized out>) at postgres.c:5246
#7 0x00000000006bf09b in BackendRun (port=0x2680890) at postmaster.c:4811
#8 BackendStartup (port=0x2680890) at postmaster.c:4468
#9 ServerLoop () at postmaster.c:1948
#10 0x0000000000a025c9 in PostmasterMain (argc=argc@entry=6, argv=argv@entry=0x264e810) at postmaster.c:1518
#11 0x00000000006c3e4b in main (argc=6, argv=0x264e810) at main.c:245
Resource registration at query start
- snippet.gdb
#0 ResLockUpdateLimit (increment=1 '\001', proclock=0x7fb305aa29b0, inError=0 '\000', incrementSet=0x7fb308a6cf68, lock=0x7fb305517130) at resqueue.c:834
#1 ResLockAcquire (locktag=locktag@entry=0x7ffebec95270, incrementSet=<optimized out>, incrementSet@entry=0x7ffebec95280) at resqueue.c:406
#2 0x0000000000c22a08 in ResLockPortal (portal=portal@entry=0x3bc5148, qDesc=qDesc@entry=0x3a8c1b8) at resscheduler.c:684
#3 0x0000000000a7d366 in PortalStart (portal=portal@entry=0x3bc5148, params=params@entry=0x0, eflags=eflags@entry=0, snapshot=snapshot@entry=0x0, ddesc=ddesc@entry=0x0) at pquery.c:713
#4 0x0000000000a784a4 in exec_simple_query (query_string=0x38d0c88 "select\n\tl_returnflag,\n\tl_linestatus,\n\tsum(l_quantity) as sum_qty,\n\tsum(l_extendedprice) as sum_base_price,\n\tsum(l_extendedprice * (1 - l_discount)) as sum_disc_price,\n\tsum(l_extendedprice * (1 - l_dis"...) at postgres.c:1785
#5 0x0000000000a7b470 in PostgresMain (argc=<optimized out>, argv=argv@entry=0x38b06b8, dbname=0x38b0570 "tpch1s", username=<optimized out>) at postgres.c:5246
Resource release at query end
- snippet.gdb
#0 ResLockUpdateLimit (proclock=0x7fb305aa29b0, inError=0 '\000', increment=0 '\000', incrementSet=0x7fb308a6cf68, lock=0x7fb305517130) at resqueue.c:834
#1 ResLockRelease (locktag=locktag@entry=0x7ffebec953d0, resPortalId=0) at resqueue.c:605
#2 0x0000000000c22d5f in ResUnLockPortal (portal=portal@entry=0x3bc5148) at resscheduler.c:852
#3 0x000000000085fc2a in PortalCleanup (portal=0x3bc5148) at portalcmds.c:344
#4 0x0000000000c102ca in PortalDrop (portal=0x3bc5148, isTopCommit=<optimized out>) at portalmem.c:535
Writing PIDs into tasks under a CPUSET limit
Suppose a resource group with groupid 16466 uses CPUSET to bind CPU cores. When an executor process starts on a segment, the PID of the forked executor process must be written into /sys/fs/cgroup/cpuset/gpdb/16466/cgroup.procs and /sys/fs/cgroup/cpuset/gpdb/16466/tasks. The call stack of this step is as follows:
- snippet.gdb
#0 writeInt64 (group=16466, base=BASETYPE_GPDB, comp=RESGROUP_COMP_TYPE_FIRST, prop=0x12f46a1 "cgroup.procs", x=49095) at resgroup-ops-linux.c:951
#1 0x0000000000db64aa in ResGroupOps_AssignGroup (group=16466, caps=0x1849840 <__self+32>, pid=49095) at resgroup-ops-linux.c:1565
#2 0x0000000000db008a in SwitchResGroupOnSegment (buf=0x349d812 "", len=37) at resgroup.c:2836
#3 0x0000000000c10f8d in PostgresMain (argc=1, argv=0x347c290, dbname=0x347c1a0 "tpch1s", username=0x347c188 "yz") at postgres.c:5345
#4 0x0000000000b8fbe4 in BackendRun (port=0x34aa840) at postmaster.c:4811
#5 0x0000000000b8f2b5 in BackendStartup (port=0x34aa840) at postmaster.c:4468
#6 0x0000000000b8b487 in ServerLoop () at postmaster.c:1948
#7 0x0000000000b8aa2c in PostmasterMain (argc=5, argv=0x347a390) at postmaster.c:1518
#8 0x0000000000a93f6e in main (argc=5, argv=0x347a390) at main.c:245
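What the write itself amounts to can be sketched outside GPDB roughly as follows. The helper name and hard-coded path are hypothetical; the real code builds the path with buildPath and does this inside writeInt64 with proper error handling.
- snippet.c
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

/* Sketch: append a pid to a cgroup membership file. */
static int
attach_pid_to_cgroup(const char *procs_path, pid_t pid)
{
    FILE   *f = fopen(procs_path, "w");

    if (f == NULL)
        return -1;
    fprintf(f, "%d\n", (int) pid);
    return fclose(f);
}

int
main(void)
{
    /* e.g. the cgroup.procs file of resource group 16466 */
    return attach_pid_to_cgroup("/sys/fs/cgroup/cpuset/gpdb/16466/cgroup.procs",
                                getpid());
}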
buildPath
All access to the cgroups hierarchy goes through this interface function; if the corresponding hierarchy directory does not exist, it is created, which prevents access to an invalid hierarchy.
- snippet.c
src/backend/utils/resgroup/resgroup-ops-linux.c:

static char *
buildPath(Oid group, BaseType base, ResGroupCompType comp, const char *prop,
          char *path, size_t pathsize)
{
    char       *result = buildPathSafe(group, base, comp, prop, path, pathsize);

    ……
    return result;
}
4. Dispatcher resource control
Memory quota
At the start of a query, the memory reservation is obtained in PortalStart:
- snippet.c
void
PortalStart(Portal portal, ParamListInfo params, int eflags,
            Snapshot snapshot, QueryDispatchDesc *ddesc)
{
    // …
    // query_mem = 0 means unlimited; unit: bytes.
    queryDesc->plannedstmt->query_mem =
        ResourceManagerGetQueryMemoryLimit(queryDesc->plannedstmt);
The ResourceManagerGetQueryMemoryLimit function returns the query's memory quota:
- snippet.c
// Calculate the amount of memory reserved for the query
int64
ResourceManagerGetQueryMemoryLimit(PlannedStmt *stmt)
{
    if (Gp_role != GP_ROLE_DISPATCH)
        return 0;

    /* no limits in single user mode. */
    if (!IsUnderPostmaster)     // IsUnderPostmaster is true in a child process of postmaster.
        return 0;

    Assert(gp_session_id > -1);
    Assert(ActivePortal != NULL);

    if (IsResQueueEnabled())
        return ResourceQueueGetQueryMemoryLimit(stmt, ActivePortal->queueId);
    if (IsResGroupActivated())
        return ResourceGroupGetQueryMemoryLimit();

    return 0;
}
ResourceGroupGetQueryMemoryLimit needs the total amount of memory each segment may use; that total is computed by the decideTotalChunks function:
- snippet.c
// Calculate the total memory chunks of the segment
static void
decideTotalChunks(int32 *totalChunks, int32 *chunkSizeInBits)
{
    ……
    nsegments = Gp_role == GP_ROLE_EXECUTE ? host_segments :
        pResGroupControl->segmentsOnMaster;

    // ResGroupOps_GetTotalMemory returns the memory available to the system:
    // RAM * overcommit_ratio + Swap
    tmptotalChunks = ResGroupOps_GetTotalMemory() *
        gp_resource_group_memory_limit / nsegments;

    /*
     * If vmem is larger than 16GB (i.e., 16K MB), we make the chunks bigger
     * so that the vmem limit in chunks unit is not larger than 16K.
     */
    tmpchunkSizeInBits = BITS_IN_MB;
    while (tmptotalChunks > (16 * 1024))
    {
        tmpchunkSizeInBits++;
        tmptotalChunks >>= 1;
    }
    ……
}
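A worked example of the loop above, assuming (purely for illustration) that the per-segment vmem budget works out to 64 GB, i.e. 65536 chunks of 1 MB before scaling:
- snippet.c
#include <stdio.h>

#define BITS_IN_MB 20

int
main(void)
{
    /* Assumed per-segment budget: 64 GB = 65536 MB. */
    long long   totalChunks = 65536;
    int         chunkSizeInBits = BITS_IN_MB;

    /* Same scaling loop as decideTotalChunks. */
    while (totalChunks > 16 * 1024)
    {
        chunkSizeInBits++;
        totalChunks >>= 1;
    }

    /* Prints: chunks=16384, chunk size=4 MB */
    printf("chunks=%lld, chunk size=%d MB\n",
           totalChunks, 1 << (chunkSizeInBits - BITS_IN_MB));
    return 0;
}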
The segment count
One of the inputs to the per-segment memory quota above is the number of segments. Note that pResGroupControl->segmentsOnMaster in the code above includes the master, i.e. $$ segmentsOnMaster = \text{number of segments} + \text{number of masters} $$ segmentsOnMaster is computed here:
- snippet.c
/*
 * Internal function to initialize each component info
 */
static CdbComponentDatabases *
getCdbComponentInfo(void)
{
    ……
    HTAB       *hostSegsHash = hostSegsHashTableInit();
    ……
    hsEntry->segmentCount++;
    ……
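Following the formula above, a host running, say, 4 segments plus the master gives $$ segmentsOnMaster = 4 + 1 = 5 $$ so the per-segment budget in decideTotalChunks is divided by 5 rather than 4 (the numbers here are only an example).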
Concurrency control
When a resource group's concurrency exceeds its limit, new transactions are queued; the stack looks like this:
- snippet.gdb
#0 WaitLatchOrSocket (latch=0x7f6d848679a4, wakeEvents=17, sock=-1, timeout=-1) at latch.c:322
#1 0x0000000000b5fb6d in WaitLatch (latch=0x7f6d848679a4, wakeEvents=17, timeout=-1) at latch.c:303
#2 0x0000000000d4c5c8 in waitOnGroup (group=0x7f6d84800928, isMoveQuery=0 '\000') at resgroup.c:2918
#3 0x0000000000d4ada6 in groupAcquireSlot (pGroupInfo=0x7ffd422a3d70, isMoveQuery=0 '\000') at resgroup.c:1934
#4 0x0000000000d4bd7b in AssignResGroupOnMaster () at resgroup.c:2636
#5 0x0000000000787133 in StartTransaction () at xact.c:2606
#6 0x00000000007881a4 in StartTransactionCommand () at xact.c:3576
#7 0x0000000000b8f77c in start_xact_command () at postgres.c:3183
#8 0x0000000000b8cdb6 in exec_simple_query (query_string=0x31090e0 "select * from region limit 1;") at postgres.c:1573
#9 0x0000000000b91ca3 in PostgresMain (argc=1, argv=0x3107220, dbname=0x31070f8 "tpch1s", username=0x31070e0 "yz") at postgres.c:5085
……
5. Resource queues
Obtaining the resource queue memory limit
- snippet.c
uint64 ResourceQueueGetQueryMemoryLimit(PlannedStmt *stmt, Oid queueId)
The rules ResourceQueueGetQueryMemoryLimit uses to assign memory:
- snippet.c
// Superusers are not memory-limited.
if (superuser())
    return ResourceQueueGetSuperuserQueryMemoryLimit();

// When the gp_resqueue_memory_policy parameter is none, memory is not limited.
if (IsResManagerMemoryPolicyNone())
    return 0;

// First, use the concurrency limit and the plan cost to compute the minimum
// share of memory:
double      minRatio = Min(1.0 / (double) numSlots, planCost / costLimit);
……
// If the memory computed from that ratio is smaller than statement_mem,
// use statement_mem instead.
if (queryMem < (uint64) statement_mem * 1024L)
{
    queryMem = (uint64) statement_mem * 1024L;
}
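To make the two rules concrete, here is a standalone sketch with made-up numbers. The queue memory value, and the assumption that it is scaled by minRatio, stand in for the elided middle of the real function and are illustrative only.
- snippet.c
#include <stdio.h>

int
main(void)
{
    double      numSlots = 10;      /* concurrency limit of the queue */
    double      planCost = 5000;    /* cost of this plan */
    double      costLimit = 100000; /* cost limit of the queue */
    unsigned long long queueMemBytes = 2ULL * 1024 * 1024 * 1024;  /* assumed 2 GB */
    unsigned long long statementMemKB = 128 * 1024;                /* 128 MB */

    /* minRatio = min(1/numSlots, planCost/costLimit) = min(0.1, 0.05) = 0.05 */
    double      minRatio = 1.0 / numSlots;

    if (planCost / costLimit < minRatio)
        minRatio = planCost / costLimit;

    unsigned long long queryMem = (unsigned long long) (queueMemBytes * minRatio);

    /* 0.05 * 2 GB ~= 102 MB, which is below 128 MB, so statement_mem wins. */
    if (queryMem < statementMemKB * 1024ULL)
        queryMem = statementMemKB * 1024ULL;

    printf("queryMem = %llu bytes\n", queryMem);
    return 0;
}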
6. Operator buffers
Assigning memory quota to operators
In the dispatcher, memory quota is distributed to operators:
- snippet.c
src\backend\executor\execMain.c:

void
standard_ExecutorStart(QueryDesc *queryDesc, int eflags)
{
    ……
    /* Distribute memory to operators. */
    if (Gp_role == GP_ROLE_DISPATCH)
    {
        ……
        switch (*gp_resmanager_memory_policy)
        {
            case RESMANAGER_MEMORY_POLICY_AUTO:
                PolicyAutoAssignOperatorMemoryKB(queryDesc->plannedstmt,
                                                 queryDesc->plannedstmt->query_mem);
                break;
            case RESMANAGER_MEMORY_POLICY_EAGER_FREE:
                PolicyEagerFreeAssignOperatorMemoryKB(queryDesc->plannedstmt,
                                                      queryDesc->plannedstmt->query_mem);
                break;
            default:
                Assert(IsResManagerMemoryPolicyNone());
                break;
        }
The function that computes an operator's buffer quota:
- snippet.c
src\backend\executor\execUtils.c:

uint64
PlanStateOperatorMemKB(const PlanState *ps)
{
    Assert(ps);
    Assert(ps->plan);

    uint64      result = 0;

    if (ps->plan->operatorMemKB == 0)
    {
        /**
         * There are some statements that do not go through the resource queue and these
         * plans dont get decorated with the operatorMemKB. Someday, we should fix resource queues.
         */
        result = work_mem;
    }
    else
    {
        if (IsA(ps, AggState))
        {
            /* Retrieve all relinquished memory (quota the other node not using) */
            result = ps->plan->operatorMemKB +
                (MemoryAccounting_RequestQuotaIncrease() >> 10);
        }
        else
            result = ps->plan->operatorMemKB;
    }

    return result;
}
For example, when an aggregate builds its hash table, the maximum memory available for the table is determined as follows:
- snippet.c
HashAggTable *
create_agg_hash_table(AggState *aggstate)
{
    ……
    HashAggTable *hashtable;
    ……
    hashtable = (HashAggTable *) palloc0(sizeof(HashAggTable));
    ……
    uint64      operatorMemKB = PlanStateOperatorMemKB((PlanState *) aggstate);
    ……
    hashtable->max_mem = 1024.0 * operatorMemKB;
    ……
}
Determining whether an operator is memory-intensive
The check function:
- snippet.c
static bool
IsMemoryIntensiveOperator(Node *node, PlannedStmt *stmt)
{
    Assert(is_plan_node(node));

    switch (nodeTag(node))
    {
        case T_Material:
        case T_Sort:
        case T_ShareInputScan:
        case T_Hash:
        case T_BitmapIndexScan:
        case T_WindowAgg:
        case T_TableFunctionScan:
        case T_FunctionScan:
            return true;
        case T_Agg:
            {
                Agg        *agg = (Agg *) node;

                return IsAggMemoryIntensive(agg);
            }
        case T_Result:
            {
                Result     *res = (Result *) node;

                return IsResultMemoryIntensive(res);
            }
        default:
            return false;
    }
}
Memory calculation for memory-intensive operators
- snippet.c
src/backend/utils/resource_manager/memquota.c:

/*
 * ComputeAvgMemKBForMemIntenseOp
 *    Compute the average memory limit for each memory-intensive operators
 *    in a given group.
 *
 *    If there is no memory-intensive operators in this group, return 0.
 */
static uint64
ComputeAvgMemKBForMemIntenseOp(OperatorGroupNode *groupNode)
{
    if (groupNode->numMemIntenseOps == 0)
    {
        return 0;
    }

    const uint64 nonMemIntenseOpMemKB =
        (uint64) (*gp_resmanager_memory_policy_auto_fixed_mem);

    return (((double) groupNode->groupMemKB -
             (double) groupNode->numNonMemIntenseOps * nonMemIntenseOpMemKB) /
            groupNode->numMemIntenseOps);
}
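Plugging illustrative numbers into the formula above: with groupMemKB = 1,000,000, three non-memory-intensive operators at a fixed allowance of 100 KB each (gp_resmanager_memory_policy_auto_fixed_mem = 100), and two memory-intensive operators, each memory-intensive operator receives $$ \frac{1000000 - 3 \times 100}{2} = 499850 \ \mathrm{KB} $$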
Operator buffer quota data structure
- snippet.c
typedef struct Plan
{
    NodeTag     type;
    ……

    /**
     * How much memory (in KB) should be used to execute this plan node?
     */
    uint64      operatorMemKB;
    ……
} Plan;
7. Parameter control
Memory resource management policy
The enum behind the gp_resqueue_memory_policy and gp_resgroup_memory_policy parameters:
- snippet.c
src\include\cdb\memquota.h:

typedef enum ResManagerMemoryPolicy
{
    RESMANAGER_MEMORY_POLICY_NONE,
    RESMANAGER_MEMORY_POLICY_AUTO,
    RESMANAGER_MEMORY_POLICY_EAGER_FREE
} ResManagerMemoryPolicy;
Memory assignment functions for the auto and eager_free policies:
- snippet.c
src\include\cdb\memquota.h:

extern void PolicyAutoAssignOperatorMemoryKB(PlannedStmt *stmt, uint64 memoryAvailable);
extern void PolicyEagerFreeAssignOperatorMemoryKB(PlannedStmt *stmt, uint64 memoryAvailable);
max_statement_mem has no real effect
According to the GP documentation, this parameter is supposed to cap statement_mem, but both experimentation and the code show that the documented behavior is not implemented.
- snippet.c
src\backend\cdb\cdbvars.c:

bool
gpvars_check_statement_mem(int *newval, void **extra, GucSource source)
{
    if (*newval >= max_statement_mem)
    {
        GUC_check_errmsg("Invalid input for statement_mem, must be less than max_statement_mem (%d kB)",
                         max_statement_mem);
        /* Note: there is no "return false" here, so the value is still accepted. */
    }

    return true;
}
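For comparison, a check hook that actually rejected an oversized value would have to return false, roughly like this (a sketch, not code that exists in GPDB):
- snippet.c
bool
gpvars_check_statement_mem(int *newval, void **extra, GucSource source)
{
    if (*newval >= max_statement_mem)
    {
        GUC_check_errmsg("Invalid input for statement_mem, must be less than max_statement_mem (%d kB)",
                         max_statement_mem);
        return false;           /* reject the new value */
    }

    return true;
}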