<kernel v5.4>
RCU NO-CB (Offload RCU callback)
rcu의 콜백을 처리하는 유형은 큰 흐름으로 다음과 같이 두 가지로 나뉜다.
- cb 용
- 1) softirq(디폴트)에서 처리하거나, 2) cb용 콜백 처리 커널 스레드에서 처리된다.
- softirq로 처리될 때 interrupt context에서 곧장 호출되어 처리되고 만일 softirq 처리 건 수가 많아져 지연되는 경우 softirqd 커널 스레드에서 호출되어 처리된다.
- softirq로 처리되는 경우 latency가 짧아 빠른 호출이 보장된다.
- no-cb 용
- 전용 no-cb용 콜백 처리 커널 스레드에 위탁(offloaded) 시켜 처리하는 것으로 latency는 약간 저하되지만 절전과 성능을 만족시키는 옵션이다.
- nohz full cpu들을 지정하거나 no-cb cpu들을 지정하는 경우 no-cb용으로 위탁(offloaded)하여 사용한다.
- 예) “nohz_full=3-7”, 예) “rcu_nocbs=3-7”
- no-cb cpu 지정을 위해 CONFIG_RCU_NOCB_CPU 커널 옵션을 사용하고 다음 3가지 중 하나를 선택할 수 있다.
- CONFIG_RCU_NOCB_CPU_NONE
- 디폴트로 nocb 지정된 cpu는 없지만 “nocbs=” 커널 파라메터로 특정 cpu들을 nocb로 지정할 수 있다.
- 디폴트로 nocb 지정된 cpu는 없지만 “nocbs=” 커널 파라메터로 특정 cpu들을 nocb로 지정할 수 있다.
- CONFIG_RCU_NOCB_CPU_ZERO
- 디폴트로 cpu#0을 nocb 지정한다. 추가로 “nocbs=” 커널 파라메터를 사용하여 다른 cpu들을 nocb로 지정할 수 있다.
- CONFIG_RCU_NOCB_CPU_ALL
- 디폴트로 모든 cpu를 nocb로 지정한다.
- CONFIG_RCU_NOCB_CPU_NONE
bypass 콜백 리스트
커널 v5.4-rc1에서 각 cpu에 있는 seg 콜백 리스트에 3 군데 사용처(cb 호출, nocb gp 커널 스레드, nocb 커널 스레드)의 과도한 lock contention을 줄이기 위해 nocb_nobypass_lim_per_jiffy(디폴트: ms당 16개) 개 이상 유입되는 콜백을 등록하는 경우 일시적으로 bypass 콜백 리스트에 추가한 후 nocb_bypass 콜백 수가 qhimark를 초과하거나 새 틱으로 전환될 때 seg 콜백 리스트에 플러시하여 lock contention에 대한 부담을 줄여 성능을 높일 수 있도록 설계되었다.
- 참고: rcu/nocb: Add bypass callback queueing (2019, v5.4-rc1)
no-cb용 rcu 초기화
RCU NO-CB 설정
rcu_is_nocb_cpu()
kernel/rcu/tree_plugin.h
/* Is the specified CPU a no-CBs CPU? */
bool rcu_is_nocb_cpu(int cpu) { if (have_rcu_nocb_mask) return cpumask_test_cpu(cpu, rcu_nocb_mask); return false; }
요청한 cpu가 no-cb(rcu 스레드 사용)으로 설정되었는지 여부를 반환한다.
- have_rcu_nocb_mask 변수는 rcu_nocb_setup() 함수가 호출되어 rcu_nocb_mask라는 cpu 마스크가 할당된 경우 true로 설정된다.
- CONFIG_RCU_NOCB_CPU_ALL 커널 옵션을 사용하는 경우 항상 rcu 스레드에서 동작시키기 위해 true를 반환한다.
- CONFIG_RCU_NOCB_CPU_ALL 및 CONFIG_RCU_NOCB_CPU 커널 옵션 둘 다 사용하지 않는 경우 항상 callback 처리하기 위해 false를 반환한다.
rcu_nocb_setup()
kernel/rcu/tree_plugin.h
/* Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters. */
static int __init rcu_nocb_setup(char *str) { alloc_bootmem_cpumask_var(&rcu_nocb_mask); have_rcu_nocb_mask = true; cpulist_parse(str, rcu_nocb_mask); return 1; } __setup("rcu_nocbs=", rcu_nocb_setup);
커널 파라메터 “rcu_nocbs=”에 cpu 리스트를 설정한다. 이렇게 설정된 cpu들은 rcu callback 처리를 rcu 스레드에서 처리할 수 있다.
- 예) rcu_nocbs=3-6,8-10
nocb용 gp 및 cb 커널 스레드의 구성
rcu_organize_nocb_kthreads()
kernel/rcu/tree_plugin.h
/* * Initialize GP-CB relationships for all no-CBs CPU. */
static void __init rcu_organize_nocb_kthreads(void) { int cpu; bool firsttime = true; int ls = rcu_nocb_gp_stride; int nl = 0; /* Next GP kthread. */ struct rcu_data *rdp; struct rcu_data *rdp_gp = NULL; /* Suppress misguided gcc warn. */ struct rcu_data *rdp_prev = NULL; if (!cpumask_available(rcu_nocb_mask)) return; if (ls == -1) { ls = nr_cpu_ids / int_sqrt(nr_cpu_ids); rcu_nocb_gp_stride = ls; } /* * Each pass through this loop sets up one rcu_data structure. * Should the corresponding CPU come online in the future, then * we will spawn the needed set of rcu_nocb_kthread() kthreads. */ for_each_cpu(cpu, rcu_nocb_mask) { rdp = per_cpu_ptr(&rcu_data, cpu); if (rdp->cpu >= nl) { /* New GP kthread, set up for CBs & next GP. */ nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls; rdp->nocb_gp_rdp = rdp; rdp_gp = rdp; if (!firsttime && dump_tree) pr_cont("\n"); firsttime = false; pr_alert("%s: No-CB GP kthread CPU %d:", __func__, cpu); } else { /* Another CB kthread, link to previous GP kthread. */ rdp->nocb_gp_rdp = rdp_gp; rdp_prev->nocb_next_cb_rdp = rdp; pr_alert(" %d", cpu); } rdp_prev = rdp; } }
no-cb용 gp 및 cb 커널 스레드를 구성한다.
- 코드 라인 11~12에서 커널 파라미터로 no-cb 지정된 cpu가 없는 경우 함수를 빠져나간다.
- 코드 라인 13~16에서 모듈 파라미터 rcu_nocb_gp_stride (디폴트=-1)가 아직 설정되지 않은 경우 다음과 같이 산출한 후 ls와 rcu_nocb_gp_stride에 대입한다.
- = cpu 수 / srqt(cpu 수)
- 코드 라인 23~41에서 no-cb용 cpu들을 순회하며 ls로 지정된 단위마다 각 첫 번째 cpu는 gp용 커널 스레드로 지정된다.
다음 그림은 cpu 수에 따라 구성되는 no-cb용 gp 커널 스레드와 cb 커널 스레드들을 보여준다.
NO-CB용 콜백 처리 커널 스레드
기존 커널에서 no-cb용 콜백 리스트를 별도로 분리 구성하여 사용했었는데 이를 없애고 cb용 세그먼티드 콜백리스트 방식을 사용하는 것으로 바꿔 지연도 없애며 OOM 발생 확률을 줄였다.
- 참고: rcu/nocb: Use rcu_segcblist for no-CBs CPUs (2019, v5.4-rc1)
rcu_nocb_cb_kthread()
kernel/rcu/tree_plugin.h
/* * Per-rcu_data kthread, but only for no-CBs CPUs. Repeatedly invoke * nocb_cb_wait() to do the dirty work. */
static int rcu_nocb_cb_kthread(void *arg) { struct rcu_data *rdp = arg; // Each pass through this loop does one callback batch, and, // if there are no more ready callbacks, waits for them. for (;;) { nocb_cb_wait(rdp); cond_resched_tasks_rcu_qs(); } return 0; }
cpu 마다 구성되는 no-cb용 콜백 처리 커널 스레드이다. 무한 루프를 돌며 대기 중인 콜백들을 처리한다.
nocb_cb_wait()
kernel/rcu/tree_plugin.h
/* * Invoke any ready callbacks from the corresponding no-CBs CPU, * then, if there are no more, wait for more to appear. */
static void nocb_cb_wait(struct rcu_data *rdp) { unsigned long cur_gp_seq; unsigned long flags; bool needwake_gp = false; struct rcu_node *rnp = rdp->mynode; local_irq_save(flags); rcu_momentary_dyntick_idle(); local_irq_restore(flags); local_bh_disable(); rcu_do_batch(rdp); local_bh_enable(); lockdep_assert_irqs_enabled(); rcu_nocb_lock_irqsave(rdp, flags); if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) && rcu_seq_done(&rnp->gp_seq, cur_gp_seq) && raw_spin_trylock_rcu_node(rnp)) { /* irqs already disabled. */ needwake_gp = rcu_advance_cbs(rdp->mynode, rdp); raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */ } if (rcu_segcblist_ready_cbs(&rdp->cblist)) { rcu_nocb_unlock_irqrestore(rdp, flags); if (needwake_gp) rcu_gp_kthread_wake(); return; } trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep")); WRITE_ONCE(rdp->nocb_cb_sleep, true); rcu_nocb_unlock_irqrestore(rdp, flags); if (needwake_gp) rcu_gp_kthread_wake(); swait_event_interruptible_exclusive(rdp->nocb_cb_wq, !READ_ONCE(rdp->nocb_cb_sleep)); if (!smp_load_acquire(&rdp->nocb_cb_sleep)) { /* VVV */ /* ^^^ Ensure CB invocation follows _sleep test. */ return; } WARN_ON(signal_pending(current)); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty")); }
준비 완료된 콜백들을 호출하여 처리한다. 없는 경우 대기한다.
- 코드 라인 8~10에서 irq를 disable한 채로 rcu core가 긴급하게 qs 상태를 알아야 할 때 수행된다.
- 코드 라인 11~13에서 bh를 disable한 채로 준비 완료된 콜백들을 호출한다.
- 코드 라인 16~21에서 콜백리스트에 다음 gp를 기다리는 콜백들이 있고 gp 시퀀스도 완료된 경우해당 노드의 lock을 획득한 채로 콜백들을 advance(cascade) 처리한다.
- 코드 라인 22~27에서 준비 완료된 콜백들이 존재하는 경우 함수를 빠져나가며, 필요 시gp 커널 스레드를 깨운다.
- 코드 라인 30~35에서 rdp->nocb_cb_sleep에 true를 대입한 후 rdp->nocb_cb_sleep을 외부에서 false로 변경시킬 때까지 슬립한다.
- 코드 라인 36~39에서 rdp->nocb_cb_sleep에 대한 메모리 베리어 처리를 수행한다.
다음 그림은 커널 v5.4부터 no-cb용 콜백 처리 커널 스레드가 cb용 segmented 콜백 리스트를 그대로 사용하여 운영되는 모습을 보여준다.
- 추가로 nocb_bypass 콜백 리스트의 운용도 보여주고 있다.
다음 그림은 기존 커널 v5.3까지에서 no-cb용 콜백 처리 커널 스레드가 leader/follower로 구성되어 동작하는 모습이다.
no-cb용 gp 커널 스레드
기존 커널에서 leader 커널 스레드는 콜백과 gp를 관리하고, follower 커널 스레드들은 콜백만을 처리하는 구조였다. 그러나 수 많은 수의 콜백을 leader가 처리해야 하는 상황에서 많은 follower cpu들이 gp 완료까지 대기를 하느라 OOM이 발생하는 현상이 벌어져 새 커널에서는 gp 커널 스레드를 별도의 커널 스레드로 제공한다.
- 참고: rcu/nocb: Provide separate no-CBs grace-period kthreads (2019, v5.4-rc1)
rcu_nocb_gp_kthread()
kernel/rcu/tree_plugin.h
/* * No-CBs grace-period-wait kthread. There is one of these per group * of CPUs, but only once at least one CPU in that group has come online * at least once since boot. This kthread checks for newly posted * callbacks from any of the CPUs it is responsible for, waits for a * grace period, then awakens all of the rcu_nocb_cb_kthread() instances * that then have callback-invocation work to do. */
static int rcu_nocb_gp_kthread(void *arg) { struct rcu_data *rdp = arg; for (;;) { WRITE_ONCE(rdp->nocb_gp_loops, rdp->nocb_gp_loops + 1); nocb_gp_wait(rdp); cond_resched_tasks_rcu_qs(); } return 0; }
cpu들을 대상으로 cpu수의 제곱근마다 그룹을 분리하고 각 그룹의 첫 번째 cpu에 구성하는 no-cb용 gp 커널 스레드이다. 무한 루프를 돌며 no-cb용 gp를 기다린다.
nocb_gp_wait()
kernel/rcu/tree_plugin.h -1/2-
/* * No-CBs GP kthreads come here to wait for additional callbacks to show up * or for grace periods to end. */
static void nocb_gp_wait(struct rcu_data *my_rdp) { bool bypass = false; long bypass_ncbs; int __maybe_unused cpu = my_rdp->cpu; unsigned long cur_gp_seq; unsigned long flags; bool gotcbs; unsigned long j = jiffies; bool needwait_gp = false; // This prevents actual uninitialized use. bool needwake; bool needwake_gp; struct rcu_data *rdp; struct rcu_node *rnp; unsigned long wait_gp_seq = 0; // Suppress "use uninitialized" warning. /* * Each pass through the following loop checks for CBs and for the * nearest grace period (if any) to wait for next. The CB kthreads * and the global grace-period kthread are awakened if needed. */ for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) { trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check")); rcu_nocb_lock_irqsave(rdp, flags); bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); if (bypass_ncbs && (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) || bypass_ncbs > 2 * qhimark)) { // Bypass full or old, so flush it. (void)rcu_nocb_try_flush_bypass(rdp, j); bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); } else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) { rcu_nocb_unlock_irqrestore(rdp, flags); continue; /* No callbacks here, try next. */ } if (bypass_ncbs) { trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Bypass")); bypass = true; } rnp = rdp->mynode; if (bypass) { // Avoid race with first bypass CB. WRITE_ONCE(my_rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT); del_timer(&my_rdp->nocb_timer); } // Advance callbacks if helpful and low contention. needwake_gp = false; if (!rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL) || (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) && rcu_seq_done(&rnp->gp_seq, cur_gp_seq))) { raw_spin_lock_rcu_node(rnp); /* irqs disabled. */ needwake_gp = rcu_advance_cbs(rnp, rdp); raw_spin_unlock_rcu_node(rnp); /* irqs disabled. */ } // Need to wait on some grace period? WARN_ON_ONCE(!rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL)); if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq)) { if (!needwait_gp || ULONG_CMP_LT(cur_gp_seq, wait_gp_seq)) wait_gp_seq = cur_gp_seq; needwait_gp = true; trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("NeedWaitGP")); } if (rcu_segcblist_ready_cbs(&rdp->cblist)) { needwake = rdp->nocb_cb_sleep; WRITE_ONCE(rdp->nocb_cb_sleep, false); smp_mb(); /* CB invocation -after- GP end. */ } else { needwake = false; } rcu_nocb_unlock_irqrestore(rdp, flags); if (needwake) { swake_up_one(&rdp->nocb_cb_wq); gotcbs = true; } if (needwake_gp) rcu_gp_kthread_wake(); }
no-cb용 gp 커널 스레에서 호출되며 gp를 대기한다.
- 코드 라인 22~35에서 gp 커널 스레드가 관리하는 cpu들을 대상으로 순회하며 ->nocb_bypass 리스트에 콜백들이 있고, 1틱 이상 시간이 지났거나 bypass 대기 중인 콜백들이 qhimark(디폴트=10000)의 2배를 넘는 경우 이들을 flush 한다. 그렇지 않고 bypass 콜백도 없고 세그먼트 콜백 리스트도 비어 있는 경우 skip 한다.
- 코드 라인 36~46에서 여전히 bypass 콜백이 존재하는 경우 bypass를 true로 변경하고, no-cb 타이머를 제거한다.
- 코드 라인 48~56에서 NEXT 구간에 콜백들이 존재하거나 wait 구간에서 대기 중인 콜백의 gp 시퀀스가 이미 만료된 상태인 경우 콜백들을 advance(cascade) 처리한다.
- 코드 라인 60~67에서 대기 중인 콜백이 있는 경우 needwait_gp에 true를 대입한다. wait_gp_seq를 갱신하는데 wait 구간의 gp 시퀀스보다 작을 때에만 갱신한다.
- 코드 라인 68~81에서 준비 완료된 콜백들이 있는 경우 no-cb용 콜백 처리 커널 스레드를 깨우고, nocb_cb_sleep에 false를 대입하여 no-cb용 gp 커널 스레드를 깨운다. 그런 후 계속 다음 cpu를 처리하기 위해 루프를 돈다.
kernel/rcu/tree_plugin.h -2/2-
my_rdp->nocb_gp_bypass = bypass; my_rdp->nocb_gp_gp = needwait_gp; my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0; if (bypass && !rcu_nocb_poll) { // At least one child with non-empty ->nocb_bypass, so set // timer in order to avoid stranding its callbacks. raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags); mod_timer(&my_rdp->nocb_bypass_timer, j + 2); raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags); } if (rcu_nocb_poll) { /* Polling, so trace if first poll in the series. */ if (gotcbs) trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Poll")); schedule_timeout_interruptible(1); } else if (!needwait_gp) { /* Wait for callbacks to appear. */ trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep")); swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq, !READ_ONCE(my_rdp->nocb_gp_sleep)); trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("EndSleep")); } else { rnp = my_rdp->mynode; trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait")); swait_event_interruptible_exclusive( rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq) & 0x1], rcu_seq_done(&rnp->gp_seq, wait_gp_seq) || !READ_ONCE(my_rdp->nocb_gp_sleep)); trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait")); } if (!rcu_nocb_poll) { raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags); if (bypass) del_timer(&my_rdp->nocb_bypass_timer); WRITE_ONCE(my_rdp->nocb_gp_sleep, true); raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags); } my_rdp->nocb_gp_seq = -1; WARN_ON(signal_pending(current)); }
- 코드 라인 1에서 no-cb용 bypass 콜백이 지난 gp에서 스캔되었느지 여부를 대입한다.
- 코드 라인 2에서 no-cb용 다음 gp를 대기해야 하는지 요청 여부를 지정한다.
- 코드 라인 3에서 no-cb용 gp 시퀀스 번호를 대입한다. 만일 다음 gp 요청이 없는 경우 0을 대입한다.
- 코드 라인 4~10에서 bypass 콜백이 있고 no-cb용 cb 커널 스레드가 폴링을 하지 않는 경우 no-cb용 bypass 타이머를 2틱 뒤의 시각으로 설정한다.
- 코드 라인 11~15에서 no-cb용 cb 커널 스레드가 폴링을 하는 경우 1틱 슬립한다.
- 코드 라인 16~21에서 gp 요청이 없는 경우 rdp->nocb_gp_sleep이 외부에서 false를 대입할 때까지 슬립하며 대기한다.
- 코드 라인 22~30에서 그 외의 경우 대기 중인 gp 시퀀스가 종료되거나 rdp->nocb_gp_sleep이 외부에서 false를 대입할 때까지 슬립하며 대기한다.
- 코드 라인 31~37에서 no-cb용 cb 커널 스레드가 폴링을 하지 않는 경우 ->nocb_gp_sleep에 true를 대입한다. 만일 bypass 콜백이 있는 경우 bypass 타이머를 제거한다.
- 코드 라인 38에서 no-cb용 gp 시퀀스에 -1을 대입한다.
no-cb용 gp wakeup 타이머
do_nocb_deferred_wakeup()
kernel/rcu/tree_plugin.h
/* * Do a deferred wakeup of rcu_nocb_kthread() from fastpath. * This means we do an inexact common-case check. Note that if * we miss, ->nocb_timer will eventually clean things up. */
static void do_nocb_deferred_wakeup(struct rcu_data *rdp) { if (rcu_nocb_need_deferred_wakeup(rdp)) do_nocb_deferred_wakeup_common(rdp); }
no-cb용 gp 커널스레드의 deferred wakeup 요청을 처리한다.
rcu_nocb_need_deferred_wakeup()
kernel/rcu/tree_plugin.h
/* Is a deferred wakeup of rcu_nocb_kthread() required? */
static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp) { return READ_ONCE(rdp->nocb_defer_wakeup); }
no-cb용 커널스레드의 deferred wakeup이 요청되었는지 여부를 반환한다.
deferred wakeup 요청 값
kernel/rcu/tree.h”
/* Values for nocb_defer_wakeup field in struct rcu_data. */
#define RCU_NOCB_WAKE_NOT 0 #define RCU_NOCB_WAKE 1 #define RCU_NOCB_WAKE_FORCE 2
- RCU_NOCB_WAKE_NOT
- wake 요청하지 않은 상태
- RCU_NOCB_WAKE
- wake 요청한 상태
- RCU_NOCB_WAKE_FORCE
- wake 요청을 강제한 상태
deferred wakeup 타이머 핸들러
do_nocb_deferred_wakeup_timer()
kernel/rcu/tree_plugin.h
/* Do a deferred wakeup of rcu_nocb_kthread() from a timer handler. */
static void do_nocb_deferred_wakeup_timer(struct timer_list *t) { struct rcu_data *rdp = from_timer(rdp, t, nocb_timer); do_nocb_deferred_wakeup_common(rdp); }
no-cb용 커널스레드의 deferred wakeup이 요청된 경우 no-cb용 gp 커널 스레드를 깨운다.
do_nocb_deferred_wakeup_common()
kernel/rcu/tree_plugin.h
/* Do a deferred wakeup of rcu_nocb_kthread(). */
static void do_nocb_deferred_wakeup_common(struct rcu_data *rdp) { unsigned long flags; int ndw; rcu_nocb_lock_irqsave(rdp, flags); if (!rcu_nocb_need_deferred_wakeup(rdp)) { rcu_nocb_unlock_irqrestore(rdp, flags); return; } ndw = READ_ONCE(rdp->nocb_defer_wakeup); WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT); wake_nocb_gp(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake")); }
no-cb용 커널스레드의 deferred wakeup이 요청된 경우 no-cb용 gp 커널 스레드를 깨운다.
- 코드 라인 7~10에서 no-cb용 커널스레드의 wakeup 유예가 요청된 경우가 아니면 함수를 빠져나간다.
- 코드 라인 11~12용 nocb_defer_wakeup 요청을 RCU_NOCB_WAKE_NOT(0)으로 클리어하고, 기존 값은 ndw로 알아온다.
- 코드 라인 13에서 gp 커널 스레드를 깨운다.
- 기존 값이 RCU_NOCB_WAKE_FORCE(2)인 경우 wakeup을 강제한다.
no-cb용 gp 커널 스레드 깨우기
wake_nocb_gp()
kernel/rcu/tree_plugin.h
/* * Kick the GP kthread for this NOCB group. Caller holds ->nocb_lock * and this function releases it. */
static void wake_nocb_gp(struct rcu_data *rdp, bool force, unsigned long flags) __releases(rdp->nocb_lock) { bool needwake = false; struct rcu_data *rdp_gp = rdp->nocb_gp_rdp; lockdep_assert_held(&rdp->nocb_lock); if (!READ_ONCE(rdp_gp->nocb_gp_kthread)) { trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("AlreadyAwake")); rcu_nocb_unlock_irqrestore(rdp, flags); return; } del_timer(&rdp->nocb_timer); rcu_nocb_unlock_irqrestore(rdp, flags); raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags); if (force || READ_ONCE(rdp_gp->nocb_gp_sleep)) { WRITE_ONCE(rdp_gp->nocb_gp_sleep, false); needwake = true; trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DoWake")); } raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags); if (needwake) wake_up_process(rdp_gp->nocb_gp_kthread); }
no-cb용 gp 커널 스레드를 깨운다.
- 코드 라인 6에서 현재 cpu에 대응하여 gp를 관리하는 cpu의 rdp를 알아온다.
- 코드 라인 9~14에서 gp용 cpu의 no-cb용 gp 커널 스레드가 아직 준비되지 않은 경우 함수를 빠져나간다.
- 코드 라인 15에서 no-cb용 타이머를 삭제한다.
- 코드 라인 18~25에서 입력 인자 @force가 요청되었거나 rdp_gp->nocb_gp_sleep이 true로 no-cb용 gp 커널 스레드가 슬립하며 대기 중인 경우 스레드를 깨운다.
no-cb용 gp 커널 스레드 지연시켜 깨우기
wake_nocb_gp_defer()
kernel/rcu/tree_plugin.h
/* * Arrange to wake the GP kthread for this NOCB group at some future * time when it is safe to do so. */
static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype, const char *reason) { if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT) mod_timer(&rdp->nocb_timer, jiffies + 1); if (rdp->nocb_defer_wakeup < waketype) WRITE_ONCE(rdp->nocb_defer_wakeup, waketype); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason); }
no-cb용 gp 커널 스레드를 @waketype에 맞춰 deferred wakeup 요청한다.
- 코드 라인 4~5에서 기존 설정 값(rdp->nocb_defer_wakeup)이 RCU_NOCB_WAKE_NOT(0) 상태에서 요청된 경우 1틱을 지연시켜 no-cb용 gp 커널 스레드를 깨우기 위해 타이머를 변경한다.
- 코드 라인 6~7에서 rdp->nocb_defer_wakeup 보다 @wakeup이 큰 경우에 한해 갱신한다.
no-cb용 bypass 타이머
bypass 타이머 핸들러
do_nocb_bypass_wakeup_timer()
kernel/rcu/tree_plugin.h
/* Wake up the no-CBs GP kthread to flush ->nocb_bypass. */
static void do_nocb_bypass_wakeup_timer(struct timer_list *t) { unsigned long flags; struct rcu_data *rdp = from_timer(rdp, t, nocb_bypass_timer); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer")); rcu_nocb_lock_irqsave(rdp, flags); smp_mb__after_spinlock(); /* Timer expire before wakeup. */ __call_rcu_nocb_wake(rdp, true, flags); }
nocb_bypass 리스트의 콜백들을 모두 처리하기 위해 nocb용 gp kthread를 깨운다.
__call_rcu_nocb_wake()
kernel/rcu/tree_plugin.h
/* * Awaken the no-CBs grace-period kthead if needed, either due to it * legitimately being asleep or due to overload conditions. * * If warranted, also wake up the kthread servicing this CPUs queues. */
static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone, unsigned long flags) __releases(rdp->nocb_lock) { unsigned long cur_gp_seq; unsigned long j; long len; struct task_struct *t; // If we are being polled or there is no kthread, just leave. t = READ_ONCE(rdp->nocb_gp_kthread); if (rcu_nocb_poll || !t) { trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNotPoll")); rcu_nocb_unlock_irqrestore(rdp, flags); return; } // Need to actually to a wakeup. len = rcu_segcblist_n_cbs(&rdp->cblist); if (was_alldone) { rdp->qlen_last_fqs_check = len; if (!irqs_disabled_flags(flags)) { /* ... if queue was empty ... */ wake_nocb_gp(rdp, false, flags); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeEmpty")); } else { wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE, TPS("WakeEmptyIsDeferred")); rcu_nocb_unlock_irqrestore(rdp, flags); } } else if (len > rdp->qlen_last_fqs_check + qhimark) { /* ... or if many callbacks queued. */ rdp->qlen_last_fqs_check = len; j = jiffies; if (j != rdp->nocb_gp_adv_time && rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) && rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) { rcu_advance_cbs_nowake(rdp->mynode, rdp); rdp->nocb_gp_adv_time = j; } smp_mb(); /* Enqueue before timer_pending(). */ if ((rdp->nocb_cb_sleep || !rcu_segcblist_ready_cbs(&rdp->cblist)) && !timer_pending(&rdp->nocb_bypass_timer)) wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE, TPS("WakeOvfIsDeferred")); rcu_nocb_unlock_irqrestore(rdp, flags); } else { trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot")); rcu_nocb_unlock_irqrestore(rdp, flags); } return; }
nocb_bypass 리스트의 콜백들을 모두 처리하기 위해 nocb용 gp kthread를 깨운다.
- 코드 라인 11~17에서 아직 해당 cpu의 no-cb용 gp 커널 스레드가 지정되지 않은 경우 그냥 함수를 빠져나간다.
- 코드 라인 19에서 세그먼트 콜백 리스트의 전체 콜백 수를 len에 알아온다.
- 코드 라인 20~31에서 두 번째 인자 @was_alldone이 설정된 경우 nocb용 gp 커널 스레드를 깨운다. irq disable 상태에서는 1 틱 만큼 지연시켜 깨운다.
- 코드 라인 32~48에서 너무 많은 콜백이 처리를 기다리고 있는 중인 경우이다. nocb_gp_adv_time 틱이 흘러 변경되었거나 다음(wait) 구간의 gp 시퀀스가 완료된 경우 nocb_gp_adv_time 을 현재 시각으로 갱신하고 콜백을 advance(cascade) 처리한다. 만일 no-cb용 gp 커널 스레드가 슬립하여 대기중이거나 세그먼트 콜백 리스트에 완료된 콜백이 없는 경우이면서 bypass 타이머가 설정되어 있지 않은 경우 no-cb용 gp 커널 스레드를 wakeup 강제(force)한다.
- 코드 라인 49~52에서 그 외의 경우 nocb 언락한다.
bypass 리스트에 콜백 추가 시도
rcu_nocb_try_bypass()
kernel/rcu/tree_plugin.h -1/2-
/* * See whether it is appropriate to use the ->nocb_bypass list in order * to control contention on ->nocb_lock. A limited number of direct * enqueues are permitted into ->cblist per jiffy. If ->nocb_bypass * is non-empty, further callbacks must be placed into ->nocb_bypass, * otherwise rcu_barrier() breaks. Use rcu_nocb_flush_bypass() to switch * back to direct use of ->cblist. However, ->nocb_bypass should not be * used if ->cblist is empty, because otherwise callbacks can be stranded * on ->nocb_bypass because we cannot count on the current CPU ever again * invoking call_rcu(). The general rule is that if ->nocb_bypass is * non-empty, the corresponding no-CBs grace-period kthread must not be * in an indefinite sleep state. * * Finally, it is not permitted to use the bypass during early boot, * as doing so would confuse the auto-initialization code. Besides * which, there is no point in worrying about lock contention while * there is only one CPU in operation. */
static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp, bool *was_alldone, unsigned long flags) { unsigned long c; unsigned long cur_gp_seq; unsigned long j = jiffies; long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); if (!rcu_segcblist_is_offloaded(&rdp->cblist)) { *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); return false; /* Not offloaded, no bypassing. */ } lockdep_assert_irqs_disabled(); // Don't use ->nocb_bypass during early boot. if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING) { rcu_nocb_lock(rdp); WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); return false; } // If we have advanced to a new jiffy, reset counts to allow // moving back from ->nocb_bypass to ->cblist. if (j == rdp->nocb_nobypass_last) { c = rdp->nocb_nobypass_count + 1; } else { WRITE_ONCE(rdp->nocb_nobypass_last, j); c = rdp->nocb_nobypass_count - nocb_nobypass_lim_per_jiffy; if (ULONG_CMP_LT(rdp->nocb_nobypass_count, nocb_nobypass_lim_per_jiffy)) c = 0; else if (c > nocb_nobypass_lim_per_jiffy) c = nocb_nobypass_lim_per_jiffy; } WRITE_ONCE(rdp->nocb_nobypass_count, c); // If there hasn't yet been all that many ->cblist enqueues // this jiffy, tell the caller to enqueue onto ->cblist. But flush // ->nocb_bypass first. if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) { rcu_nocb_lock(rdp); *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); if (*was_alldone) trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstQ")); WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j)); WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); return false; // Caller must enqueue the callback. }
no-cb용 bypass 리스트에 콜백을 추가한다. 만일 성공하는 경우 true를 반환한다.
- 코드 라인 7에서 nocb_bypass 리스트에 존재하는 콜백들의 수를 ncbs에 알아온다.
- 코드 라인 9~12에서 오프로드되지 않은 경우 false를 반환한다. 또한 펜딩 콜백이 없는지 여부를 출력 인자 @was_alldone에 대입한다.
- 코드 라인 16~21에서 rcu 스케줄러가 아직 준비되지 않은 경우 false를 반환한다. 또한 펜딩 콜백이 없는지 여부를 출력 인자 @was_alldone에 대입한다.
- 코드 라인 25~36에서 새 틱에 진입할 때마다 틱당 nobypass 카운터 제한 수(nocb_nobypass_lim_per_jiffy) 이하로 제한한다.
- 틱 변화가 없는 경우엔 +1 증가시킨다.
- 새 틱에 진입한 경우 nocb_nobypass_count 를 0 ~ nocb_nobypass_lim_per_jiffy 값으로 제한하여 갱신한다.
- 코드 라인 41~50에서 nocb_nobypass_count 수가 틱당 제한 수 미만으로 적게 유입되는 경우 nocb_bypass 리스트에서 처리하지 않고 원래의 세그먼트 콜백 리스트에 추가하기 위해 false를 반환한다. 또한 펜딩 콜백이 없는지 여부를 출력 인자 @was_alldone에 대입한다.
- nocb_nobypass_lim_per_jiffy
- 틱 당 유입되는 콜백 수로 이 값을 초과하는 경우에만 nocb_bypass 리스트에 추가한다.
- nocb_nobypass_lim_per_jiffy
kernel/rcu/tree_plugin.h -2/2-
// If ->nocb_bypass has been used too long or is too full, // flush ->nocb_bypass to ->cblist. if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) || ncbs >= qhimark) { rcu_nocb_lock(rdp); if (!rcu_nocb_flush_bypass(rdp, rhp, j)) { *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist); if (*was_alldone) trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstQ")); WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass)); return false; // Caller must enqueue the callback. } if (j != rdp->nocb_gp_adv_time && rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) && rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) { rcu_advance_cbs_nowake(rdp->mynode, rdp); rdp->nocb_gp_adv_time = j; } rcu_nocb_unlock_irqrestore(rdp, flags); return true; // Callback already enqueued. } // We need to use the bypass. rcu_nocb_wait_contended(rdp); rcu_nocb_bypass_lock(rdp); ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass); rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */ rcu_cblist_enqueue(&rdp->nocb_bypass, rhp); if (!ncbs) { WRITE_ONCE(rdp->nocb_bypass_first, j); trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ")); } rcu_nocb_bypass_unlock(rdp); smp_mb(); /* Order enqueue before wake. */ if (ncbs) { local_irq_restore(flags); } else { // No-CBs GP kthread might be indefinitely asleep, if so, wake. rcu_nocb_lock(rdp); // Rare during call_rcu() flood. if (!rcu_segcblist_pend_cbs(&rdp->cblist)) { trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQwake")); __call_rcu_nocb_wake(rdp, true, flags); } else { trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQnoWake")); rcu_nocb_unlock_irqrestore(rdp, flags); } } return true; // Callback already enqueued. }
- 코드 라인 3~22에서 첫 콜백이 추가된 후 1 틱이 지났거나 nocb_bypas 콜백들이 qhimark(디폴트=10000) 보다 많은 경우 이들을 한꺼번에 세그먼트 콜백리스트에 플러시하고 함수를 true로 빠져나가는 것으로 매 콜백 건마다 수행되는 lock 컨텐션을 줄인다.
- 코드 라인 25~26에서 no-cb 락 contention이 없어질때까지 스핀하며 대기한 후 락을 획득한다.
- 코드 라인 27~34에서 nocb_bypass 리스트에 콜백을 하나 추가하고 언락한다. 만일 nocb_bypass 리스트에 콜백이 처음 추가된 경우 nocb_bypass_first에 현재 시각을 갱신한다.
- 코드 라인 36~50에서 nocb_bypass 리스트가 비어있는 상태에서 첫 콜백으로 추가되었고 세그먼트 콜백 리스트에서 대기중인 콜백이 없는 경우 no-cb용 gp 커널 스레드를 깨운다.
- 코드 라인 51에서 true를 반환한다.
다음 그림은 콜백을 offload 하여 운영하는 경우 과도하게 유입되는 콜백을 nocb_bypass 리스트에 추가하고, 1틱이 지났거나 10000개 이상 누적되는 경우 flush 하는 것으로 lock contention을 줄여 성능을 올리기 위해 사용하는 모습을 보여준다.
rcu_nocb_flush_bypass()
kernel/rcu/tree_plugin.h
/* * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL. * However, if there is a callback to be enqueued and if ->nocb_bypass * proves to be initially empty, just return false because the no-CB GP * kthread may need to be awakened in this case. * * Note that this function always returns true if rhp is NULL. */
static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, unsigned long j) { if (!rcu_segcblist_is_offloaded(&rdp->cblist)) return true; rcu_lockdep_assert_cblist_protected(rdp); rcu_nocb_bypass_lock(rdp); return rcu_nocb_do_flush_bypass(rdp, rhp, j); }
nocb_bypass 리스트의 콜백들을 모두 세그먼트 콜백리스트에 옮긴다.
- 코드 라인 4~5에서 콜백 오프로드되지 않은 경우 true를 반환한다.
- 코드 라인 8에서 nocb_bypass 리스트의 콜백들을 모두 세그먼트 콜백리스트에 옮긴다. 성공한 경우 true를 반환한다.
rcu_nocb_do_flush_bypass()
kernel/rcu/tree_plugin.h
/* * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL. * However, if there is a callback to be enqueued and if ->nocb_bypass * proves to be initially empty, just return false because the no-CB GP * kthread may need to be awakened in this case. * * Note that this function always returns true if rhp is NULL. */
static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp, unsigned long j) { struct rcu_cblist rcl; WARN_ON_ONCE(!rcu_segcblist_is_offloaded(&rdp->cblist)); rcu_lockdep_assert_cblist_protected(rdp); lockdep_assert_held(&rdp->nocb_bypass_lock); if (rhp && !rcu_cblist_n_cbs(&rdp->nocb_bypass)) { raw_spin_unlock(&rdp->nocb_bypass_lock); return false; } /* Note: ->cblist.len already accounts for ->nocb_bypass contents. */ if (rhp) rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */ rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp); rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl); WRITE_ONCE(rdp->nocb_bypass_first, j); rcu_nocb_bypass_unlock(rdp); return true; }
nocb_bypass 리스트의 콜백들을 모두 세그먼트 콜백리스트에 옮긴다. 성공한 경우 true를 반환한다.
- 코드 라인 9~12에서 nocb_bypass 리스트에 콜백들이 하나도 없는 경우 false를 반환한다.
- 코드 라인 14~15에서 인자로 주어진 콜백 @rhp가 있는 경우 콜백 수를 1 증가시킨다.
- 코드 라인 16에서 nocb_bypass 리스트의 콜백을 내부 임시 리스트에 옮긴다. 이 때 콜백도 하나 추가한다.
- 코드 라인 17에서 내부 임시 리스트에 옮겨진 콜백들을 세그먼트 콜백리스트의 next 구간에 추가한다.
- 코드 라인 18에서 nocb_bypass_first에 현재 시각을 갱신한다.
- 코드 라인 20에서 정상적으로 옮겼으므로 true를 반환한다.
참고
- RCU(Read Copy Update) -1- (Basic) | 문c
- RCU(Read Copy Update) -2- (Callback process) | 문c
- RCU(Read Copy Update) -3- (RCU threads) | 문c
- RCU(Read Copy Update) -4- (NOCB process) | 문c – 현재글
- RCU(Read Copy Update) -5- (Callback list) | 문c
- RCU(Read Copy Update) -6- (Expedited GP) | 문c
- RCU(Read Copy Update) -7- (Preemptible RCU) | 문c
- rcu_init() | 문c
- wait_for_completion() | 문c