本文代码基于:openGauss 2021年3月中旬的代码(openGauss 1.0.1 build 6942b898)。
openGauss对原生PostgreSQL的信号处理有很大改动,主要有以下几个方面:
信号接收线程(signal receiver thread)是新增的线程,线程名为GaussMaster,线程函数为gs_signal_receiver_thread。
gdb可以看到,thread 3是信号接收线程:
(gdb) info threads
  Id   Target Id                                             Frame
  1    Thread 0x7fbd2397d480 (LWP 2592557) "GaussMaster"     0x00007fbd246f2bed in poll () from /usr/lib64/libc.so.6
  2    Thread 0x7fbd22fff700 (LWP 2592558) "jemalloc_bg_thd" 0x00007fbd249d89f5 in pthread_cond_wait@@GLIBC_2.3.2 () from /usr/lib64/libpthread.so.0
* 3    Thread 0x7fbcfe1ff700 (LWP 2592588) "GaussMaster"     gs_signal_receiver_thread (args=0x7fbd234e51b0) at gs_signal.cpp:893
信号接收函数:
/*
 * Entry point of the signal receiver thread.
 *
 * Builds a wait set containing SIGINT, SIGTERM, SIGQUIT, SIGHUP and SIGUSR1,
 * then loops forever: every signal number returned by sigwait() is handed to
 * gs_signal_send() with PostmasterPid as the target.
 *
 * args: not referenced anywhere in this function body.
 * Returns NULL, but the return statement is unreachable because the loop
 * has no exit.
 */
void* gs_signal_receiver_thread(void* args)
{
    sigset_t waitMask;

    /* wait below signals: SIGINT, SIGTERM, SIGQUIT, SIGHUP, SIGUSR1 */
    sigemptyset(&waitMask);
    sigaddset(&waitMask, SIGINT);
    sigaddset(&waitMask, SIGTERM);
    sigaddset(&waitMask, SIGQUIT);
    sigaddset(&waitMask, SIGHUP);
    sigaddset(&waitMask, SIGUSR1);

    gs_signal_block_sigusr2();

    /* add just for memcheck */
    gs_thread_args_free();

    for (;;) {
        int signo;

        /* Wait for signals arrival. */
        /*
         * NOTE(review): the return value of sigwait() is not checked; if the
         * call failed, signo would be passed on uninitialized.
         */
        sigwait(&waitMask, &signo);

        /* send signal to thread */
        (void)gs_signal_send(PostmasterPid, signo);
    }
    return NULL;
}
可见信号接收函数会sigwait等待 SIGINT, SIGTERM, SIGQUIT, SIGHUP, SIGUSR1 五种信号。
当这五种信号到达时:
使用 gs_signal_send 函数,将信号 signo 转换成模拟信号,转发给 PostmasterPid 线程(注:这是Postmaster的线程TID,不是进程PID)。
以下是 gs_signal_send 的代码逻辑:
#define RES_SIGNAL SIGUSR2 int gs_signal_send(ThreadId thread_id, int signo, int nowait) { 参数检查……; // 屏蔽用户信号,防止信号重入。 sigset_t old_sigset = gs_signal_block_sigusr2(); // 信号作为模拟信号发送给目标线程thread_id code = gs_signal_set_signal_by_threadid(thread_id, signo); // 给目标线程发送 SIGUSR2信号,等待目标信号处理模拟信号。 code = gs_signal_thread_kill(thread_id, RES_SIGNAL); // 取消屏蔽用户信号,允许信号继续进入。 gs_signal_recover_mask(old_sigset); return code; }
信号插槽用于处理模拟信号,即要解决:
哪个线程给哪个线程发送了什么信号?
可以结合 gs_signal_slots_init 函数来了解信号插槽数据结构。
体现数据结构的代码片段如下:
int PostmasterMain(int argc, char* argv[]) { …… gs_signal_slots_init(GLOBAL_ALL_PROCS + EXTERN_SLOTS_NUM); //信号插槽初始化 gs_signal_startup_siginfo("PostmasterMain"); gs_signal_monitor_startup(); …… } void gs_signal_slots_init(unsigned long int size) { g_instance.signal_base->slots = (GsSignalSlot*)MemoryContextAlloc(t_thrd.mem_cxt.gs_signal_mem_cxt, (sizeof(GsSignalSlot) * size)); /* create GsSignal for ever slot */ g_instance.signal_base->slots_size = size; for (loop = 0; loop < g_instance.signal_base->slots_size; loop++) { int cnt_nodes = ((loop > 0) ? SUB_HODLER_SIZE : size); // SUB_HODLER_SIZE = 100 GsSignalSlot* tmp_sig_slot = tmp_sig_slot = &(g_instance.signal_base->slots[loop]); tmp_sig_slot->gssignal = gs_signal_init(cnt_nodes); } } static GsSignal* gs_signal_init(int cnt_nodes) { GsSignal* gs_signal = (GsSignal*)MemoryContextAlloc(t_thrd.mem_cxt.gs_signal_mem_cxt, sizeof(struct GsSignal)); gs_signal_sigpool_init(gs_signal, cnt_nodes); return gs_signal; } static void gs_signal_sigpool_init(GsSignal* gs_signal, int cnt_nodes) { SignalPool* sigpool = &gs_signal->sig_pool; sigpool->free_head = (GsNode*)MemoryContextAlloc(t_thrd.mem_cxt.gs_signal_mem_cxt, (unsigned int)(cnt_nodes) * sizeof(GsNode)); for (loop = 0; loop < (unsigned int)cnt_nodes - 1; loop++) { sigpool->free_head[loop].next = &(sigpool->free_head[loop + 1]); } sigpool->free_head[cnt_nodes - 1].next = NULL; sigpool->free_head = &sigpool->free_head[0]; sigpool->free_tail = &sigpool->free_head[cnt_nodes - 1]; sigpool->used_head = NULL; sigpool->used_tail = NULL; sigpool->pool_size = cnt_nodes; }
/*
 * Pool of GsNode entries belonging to one GsSignal.
 *
 * The nodes are tracked with two lists: a free list (free_head/free_tail)
 * and a used list (used_head/used_tail).
 */
typedef struct SignalPool {
    GsNode* free_head; /* the head of free signal list */
    GsNode* free_tail; /* the tail of free signal list */
    GsNode* used_head; /* the head of used signal list */
    GsNode* used_tail; /* the tail of used signal list */
    int pool_size;     /* the size of the array list */
    /* NOTE(review): presumably protects the two lists above; its users are not shown here. */
    pthread_mutex_t sigpool_lock;
} SignalPool;
信号源的管理使用两个链表,free_head和free_tail 管理尚未使用的信号源节点,而used_head和used_tail管理正在使用的信号源节点。
Postmaster(线程)的ServerLoop在侦听网络或者耗时操作过程中,会对信号的屏蔽状态进行切换:
static int ServerLoop(void) { …… // 所有模拟信号都不阻塞 gs_signal_setmask(&t_thrd.libpq_cxt.UnBlockSig, NULL); // SIGUSR2, SIGPROF, SIGSEGV, SIGBUS, SIGFPE, SIGILL, SIGSYS 不阻塞。 (void)gs_signal_unblock_sigusr2(); if (pmState == PM_WAIT_DEAD_END) { pg_usleep(100000L); /* 100 msec seems reasonable */ } else { poll 或者 select } /* * Block all signals until we wait again. (This makes it safe for our * signal handlers to do nontrivial work.) */ // 除了SIGTRAP, SIGABRT, SIGILL, SIGFPE, SIGSEGV, SIGBUS, SIGSYS, SIGCONT,其它模拟信号全部阻塞。 gs_signal_setmask(&t_thrd.libpq_cxt.BlockSig, NULL); // 除了SIGPROF, SIGSEGV, SIGBUS, SIGFPE, SIGILL, SIGSYS,其它信号全部阻塞。 gs_signal_block_sigusr2(); …… }
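gs_signal_startup_siginfo 函数的作用是:调用 pqinitmask() 初始化信号屏蔽集,并通过 gs_signal_alloc_slot_for_new_thread() 为当前线程分配信号插槽: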
/*
 * Per-thread signal start-up: calls pqinitmask(), then
 * gs_signal_alloc_slot_for_new_thread() for the calling thread
 * (identified by gs_thread_self()).
 *
 * thread_name: passed through unchanged to the slot allocation call.
 * The result of the slot allocation is discarded.
 */
void gs_signal_startup_siginfo(char* thread_name)
{
    pqinitmask();
    (void)gs_signal_alloc_slot_for_new_thread(thread_name, gs_thread_self());
}
注册信号处理函数,用于处理 SIGUSR2(即RES_SIGNAL)模拟信号。
/*
 * Install gs_res_signal_handler as the handler for RES_SIGNAL.
 *
 * The handler is registered with an empty sa_mask and the flags
 * SA_SIGINFO | SA_RESTART.
 *
 * Returns the sa_sigaction value reported by sigaction() as the previously
 * installed action.
 */
static gs_sigaction_func gs_signal_install_handler(void)
{
    struct sigaction act, oact;

    sigemptyset(&act.sa_mask);
    act.sa_sigaction = gs_res_signal_handler;
    act.sa_flags = 0;
    act.sa_flags |= SA_SIGINFO;
    act.sa_flags |= SA_RESTART;

    /*
     * NOTE(review): the return value of sigaction() is not checked; on
     * failure oact would be read uninitialized below.
     */
    sigaction(RES_SIGNAL, &act, &oact);

    return oact.sa_sigaction;
}
以下两个函数主要作为调试用,在信号插槽加锁阶段标注是哪个函数在持有锁,例如:
/*
 * Lock bookkeeping helpers. In the usage below, the first is called right
 * after slots_lock is taken (with the caller's function name) and the second
 * right before it is released.
 */
static void gs_signal_location_base_signal_lock_info(const char* funname, int just_init);
static void gs_signal_unlocation_base_signal_lock_info(void);

/*
 * Excerpt: walks the signal slots while holding
 * g_instance.signal_base->slots_lock. The loop body, the declaration of
 * `loop` and the return statement are elided here.
 */
static int gs_signal_thread_kill(ThreadId tid, int signo)
{
    (void)pthread_mutex_lock(&(g_instance.signal_base->slots_lock));
    /* Record which function is holding slots_lock. */
    gs_signal_location_base_signal_lock_info(__func__, 0);
    ……
    for (loop = 0; loop < g_instance.signal_base->slots_size; loop++) {
        ……
    }
    ……
    /* Clear the holder record before releasing slots_lock. */
    gs_signal_unlocation_base_signal_lock_info();
    (void)pthread_mutex_unlock(&(g_instance.signal_base->slots_lock));
}
gs_ctl stop 默认工作在 FAST_MODE,即gs_ctl将向Postmaster进程发送 SIGINT(2) 信号。