RCU(Read Copy Update) -4- (NOCB process)

<kernel v5.4>

RCU NO-CB (Offload RCU callback)

rcu의 콜백을 처리하는 유형은 큰 흐름으로 다음과 같이 두 가지로 나뉜다.

  • cb 용
    • 1) softirq(디폴트)에서 처리하거나,  2) cb용 콜백 처리 커널 스레드에서 처리된다.
    • softirq로 처리될 때 interrupt context에서 곧장 호출되어 처리되고 만일 softirq 처리 건 수가 많아져 지연되는 경우 softirqd 커널 스레드에서 호출되어 처리된다.
    • softirq로 처리되는 경우 latency가 짧아 빠른 호출이 보장된다.
  • no-cb 용
    • 전용 no-cb용 콜백 처리 커널 스레드에 위탁(offloaded) 시켜 처리하는 것으로 latency는 약간 저하되지만 절전과 성능을 만족시키는 옵션이다.
    • nohz full cpu들을 지정하거나 no-cb cpu들을 지정하는 경우 no-cb용으로 위탁(offloaded)하여 사용한다.
      • 예) “nohz_full=3-7”, 예) “rcu_nocbs=3-7”
    • no-cb cpu 지정을 위해 CONFIG_RCU_NOCB_CPU 커널 옵션을 사용하고 다음 3가지 중 하나를 선택할 수 있다.
      • CONFIG_RCU_NOCB_CPU_NONE
        • 디폴트로 nocb 지정된 cpu는 없지만 “nocbs=” 커널 파라메터로 특정 cpu들을 nocb로 지정할 수 있다.
      • CONFIG_RCU_NOCB_CPU_ZERO
        • 디폴트로 cpu#0을 nocb 지정한다. 추가로 “nocbs=” 커널 파라메터를 사용하여 다른 cpu들을 nocb로 지정할 수 있다.
      • CONFIG_RCU_NOCB_CPU_ALL
        • 디폴트로 모든 cpu를 nocb로 지정한다.

 

bypass 콜백 리스트

커널 v5.4-rc1에서 각 cpu에 있는 seg 콜백 리스트에 3 군데 사용처(cb 호출, nocb gp 커널 스레드, nocb 커널 스레드)의 과도한 lock contention을 줄이기 위해 nocb_nobypass_lim_per_jiffy(디폴트: ms당 16개) 개 이상 유입되는 콜백을 등록하는 경우 일시적으로 bypass 콜백 리스트에 추가한 후 nocb_bypass 콜백 수가 qhimark를 초과하거나 새 틱으로 전환될 때 seg 콜백 리스트에 플러시하여 lock contention에 대한 부담을 줄여 성능을 높일 수 있도록 설계되었다.

 


no-cb용 rcu 초기화

RCU NO-CB 설정

rcu_is_nocb_cpu()

kernel/rcu/tree_plugin.h

/* Is the specified CPU a no-CBs CPU? */
bool rcu_is_nocb_cpu(int cpu)
{
        if (have_rcu_nocb_mask)
                return cpumask_test_cpu(cpu, rcu_nocb_mask);
        return false;
}

요청한 cpu가 no-cb(rcu 스레드 사용)으로 설정되었는지 여부를 반환한다.

  • have_rcu_nocb_mask 변수는 rcu_nocb_setup() 함수가 호출되어 rcu_nocb_mask라는 cpu 마스크가 할당된 경우 true로 설정된다.
  • CONFIG_RCU_NOCB_CPU_ALL 커널 옵션을 사용하는 경우 항상 rcu 스레드에서 동작시키기 위해 true를 반환한다.
  • CONFIG_RCU_NOCB_CPU_ALL 및 CONFIG_RCU_NOCB_CPU 커널 옵션 둘 다 사용하지 않는 경우 항상 callback 처리하기 위해 false를 반환한다.

 

rcu_nocb_setup()

kernel/rcu/tree_plugin.h

/* Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters. */
static int __init rcu_nocb_setup(char *str)
{
        alloc_bootmem_cpumask_var(&rcu_nocb_mask);
        have_rcu_nocb_mask = true;
        cpulist_parse(str, rcu_nocb_mask);
        return 1;
}
__setup("rcu_nocbs=", rcu_nocb_setup);

커널 파라메터 “rcu_nocbs=”에 cpu 리스트를 설정한다. 이렇게 설정된 cpu들은 rcu callback 처리를 rcu 스레드에서 처리할 수 있다.

  • 예) rcu_nocbs=3-6,8-10

 

nocb용 gp 및 cb 커널 스레드의 구성

rcu_organize_nocb_kthreads()

kernel/rcu/tree_plugin.h

/*
 * Initialize GP-CB relationships for all no-CBs CPU.
 */
static void __init rcu_organize_nocb_kthreads(void)
{
        int cpu;
        bool firsttime = true;
        int ls = rcu_nocb_gp_stride;
        int nl = 0;  /* Next GP kthread. */
        struct rcu_data *rdp;
        struct rcu_data *rdp_gp = NULL;  /* Suppress misguided gcc warn. */
        struct rcu_data *rdp_prev = NULL;

        if (!cpumask_available(rcu_nocb_mask))
                return;
        if (ls == -1) {
                ls = nr_cpu_ids / int_sqrt(nr_cpu_ids);
                rcu_nocb_gp_stride = ls;
        }

        /*
         * Each pass through this loop sets up one rcu_data structure.
         * Should the corresponding CPU come online in the future, then
         * we will spawn the needed set of rcu_nocb_kthread() kthreads.
         */
        for_each_cpu(cpu, rcu_nocb_mask) {
                rdp = per_cpu_ptr(&rcu_data, cpu);
                if (rdp->cpu >= nl) {
                        /* New GP kthread, set up for CBs & next GP. */
                        nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls;
                        rdp->nocb_gp_rdp = rdp;
                        rdp_gp = rdp;
                        if (!firsttime && dump_tree)
                                pr_cont("\n");
                        firsttime = false;
                        pr_alert("%s: No-CB GP kthread CPU %d:", __func__, cpu);
                } else {
                        /* Another CB kthread, link to previous GP kthread. */
                        rdp->nocb_gp_rdp = rdp_gp;
                        rdp_prev->nocb_next_cb_rdp = rdp;
                        pr_alert(" %d", cpu);
                }
                rdp_prev = rdp;
        }
}

no-cb용 gp 및 cb 커널 스레드를 구성한다.

  • 코드 라인 11~12에서 커널 파라미터로 no-cb 지정된 cpu가 없는 경우 함수를 빠져나간다.
  • 코드 라인 13~16에서 모듈 파라미터 rcu_nocb_gp_stride (디폴트=-1)가 아직 설정되지 않은 경우 다음과 같이 산출한 후 ls와 rcu_nocb_gp_stride에 대입한다.
    • = cpu 수 / srqt(cpu 수)
  • 코드 라인 23~41에서 no-cb용 cpu들을 순회하며 ls로 지정된 단위마다 각 첫 번째 cpu는 gp용 커널 스레드로 지정된다.

 

다음 그림은 cpu 수에 따라 구성되는 no-cb용 gp 커널 스레드와 cb 커널 스레드들을 보여준다.

 


NO-CB용 콜백 처리 커널 스레드

기존 커널에서 no-cb용 콜백 리스트를 별도로 분리 구성하여 사용했었는데 이를 없애고 cb용 세그먼티드 콜백리스트 방식을 사용하는 것으로 바꿔 지연도 없애며 OOM 발생 확률을 줄였다.

 

rcu_nocb_cb_kthread()

kernel/rcu/tree_plugin.h

/*
 * Per-rcu_data kthread, but only for no-CBs CPUs.  Repeatedly invoke
 * nocb_cb_wait() to do the dirty work.
 */
static int rcu_nocb_cb_kthread(void *arg)
{
        struct rcu_data *rdp = arg;

        // Each pass through this loop does one callback batch, and,
        // if there are no more ready callbacks, waits for them.
        for (;;) {
                nocb_cb_wait(rdp);
                cond_resched_tasks_rcu_qs();
        }
        return 0;
}

cpu 마다 구성되는 no-cb용 콜백 처리 커널 스레드이다. 무한 루프를 돌며 대기 중인 콜백들을 처리한다.

 

nocb_cb_wait()

kernel/rcu/tree_plugin.h

/*
 * Invoke any ready callbacks from the corresponding no-CBs CPU,
 * then, if there are no more, wait for more to appear.
 */
static void nocb_cb_wait(struct rcu_data *rdp)
{
        unsigned long cur_gp_seq;
        unsigned long flags;
        bool needwake_gp = false;
        struct rcu_node *rnp = rdp->mynode;

        local_irq_save(flags);
        rcu_momentary_dyntick_idle();
        local_irq_restore(flags);
        local_bh_disable();
        rcu_do_batch(rdp);
        local_bh_enable();
        lockdep_assert_irqs_enabled();
        rcu_nocb_lock_irqsave(rdp, flags);
        if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
            rcu_seq_done(&rnp->gp_seq, cur_gp_seq) &&
            raw_spin_trylock_rcu_node(rnp)) { /* irqs already disabled. */
                needwake_gp = rcu_advance_cbs(rdp->mynode, rdp);
                raw_spin_unlock_rcu_node(rnp); /* irqs remain disabled. */
        }
        if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
                rcu_nocb_unlock_irqrestore(rdp, flags);
                if (needwake_gp)
                        rcu_gp_kthread_wake();
                return;
        }

        trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("CBSleep"));
        WRITE_ONCE(rdp->nocb_cb_sleep, true);
        rcu_nocb_unlock_irqrestore(rdp, flags);
        if (needwake_gp)
                rcu_gp_kthread_wake();
        swait_event_interruptible_exclusive(rdp->nocb_cb_wq,
                                 !READ_ONCE(rdp->nocb_cb_sleep));
        if (!smp_load_acquire(&rdp->nocb_cb_sleep)) { /* VVV */
                /* ^^^ Ensure CB invocation follows _sleep test. */
                return;
        }
        WARN_ON(signal_pending(current));
        trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WokeEmpty"));
}

준비 완료된 콜백들을 호출하여 처리한다. 없는 경우 대기한다.

  • 코드 라인 8~10에서 irq를 disable한 채로 rcu core가 긴급하게 qs 상태를 알아야 할 때 수행된다.
  • 코드 라인 11~13에서 bh를 disable한 채로 준비 완료된 콜백들을 호출한다.
  • 코드 라인 16~21에서 콜백리스트에 다음 gp를 기다리는 콜백들이 있고 gp 시퀀스도 완료된 경우해당 노드의 lock을 획득한 채로 콜백들을 advance(cascade) 처리한다.
  • 코드 라인 22~27에서 준비 완료된 콜백들이 존재하는 경우 함수를 빠져나가며, 필요 시gp 커널 스레드를 깨운다.
  • 코드 라인 30~35에서 rdp->nocb_cb_sleep에 true를 대입한 후 rdp->nocb_cb_sleep을 외부에서 false로 변경시킬 때까지 슬립한다.
  • 코드 라인 36~39에서 rdp->nocb_cb_sleep에 대한 메모리 베리어 처리를 수행한다.

 

다음 그림은 커널 v5.4부터 no-cb용 콜백 처리 커널 스레드가 cb용 segmented 콜백 리스트를 그대로 사용하여 운영되는 모습을 보여준다.

  • 추가로 nocb_bypass 콜백 리스트의 운용도 보여주고 있다.

 

다음 그림은 기존 커널 v5.3까지에서 no-cb용 콜백 처리 커널 스레드가 leader/follower로 구성되어 동작하는 모습이다.

 


no-cb용 gp 커널 스레드

기존 커널에서 leader 커널 스레드는 콜백과 gp를 관리하고, follower 커널 스레드들은 콜백만을 처리하는 구조였다. 그러나 수 많은 수의 콜백을 leader가 처리해야 하는 상황에서 많은 follower cpu들이 gp 완료까지 대기를 하느라 OOM이 발생하는 현상이 벌어져 새 커널에서는 gp 커널 스레드를 별도의 커널 스레드로 제공한다.

 

rcu_nocb_gp_kthread()

kernel/rcu/tree_plugin.h

/*
 * No-CBs grace-period-wait kthread.  There is one of these per group
 * of CPUs, but only once at least one CPU in that group has come online
 * at least once since boot.  This kthread checks for newly posted
 * callbacks from any of the CPUs it is responsible for, waits for a
 * grace period, then awakens all of the rcu_nocb_cb_kthread() instances
 * that then have callback-invocation work to do.
 */
static int rcu_nocb_gp_kthread(void *arg)
{
        struct rcu_data *rdp = arg;

        for (;;) {
                WRITE_ONCE(rdp->nocb_gp_loops, rdp->nocb_gp_loops + 1);
                nocb_gp_wait(rdp);
                cond_resched_tasks_rcu_qs();
        }
        return 0;
}

cpu들을 대상으로 cpu수의 제곱근마다 그룹을 분리하고 각 그룹의 첫 번째 cpu에 구성하는 no-cb용 gp 커널 스레드이다. 무한 루프를 돌며 no-cb용 gp를 기다린다.

 

nocb_gp_wait()

kernel/rcu/tree_plugin.h -1/2-

/*
 * No-CBs GP kthreads come here to wait for additional callbacks to show up
 * or for grace periods to end.
 */
static void nocb_gp_wait(struct rcu_data *my_rdp)
{
        bool bypass = false;
        long bypass_ncbs;
        int __maybe_unused cpu = my_rdp->cpu;
        unsigned long cur_gp_seq;
        unsigned long flags;
        bool gotcbs;
        unsigned long j = jiffies;
        bool needwait_gp = false; // This prevents actual uninitialized use.
        bool needwake;
        bool needwake_gp;
        struct rcu_data *rdp;
        struct rcu_node *rnp;
        unsigned long wait_gp_seq = 0; // Suppress "use uninitialized" warning.

        /*
         * Each pass through the following loop checks for CBs and for the
         * nearest grace period (if any) to wait for next.  The CB kthreads
         * and the global grace-period kthread are awakened if needed.
         */
        for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_cb_rdp) {
                trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Check"));
                rcu_nocb_lock_irqsave(rdp, flags);
                bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
                if (bypass_ncbs &&
                    (time_after(j, READ_ONCE(rdp->nocb_bypass_first) + 1) ||
                     bypass_ncbs > 2 * qhimark)) {
                        // Bypass full or old, so flush it.
                        (void)rcu_nocb_try_flush_bypass(rdp, j);
                        bypass_ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
                } else if (!bypass_ncbs && rcu_segcblist_empty(&rdp->cblist)) {
                        rcu_nocb_unlock_irqrestore(rdp, flags);
                        continue; /* No callbacks here, try next. */
                }
                if (bypass_ncbs) {
                        trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
                                            TPS("Bypass"));
                        bypass = true;
                }
                rnp = rdp->mynode;
                if (bypass) {  // Avoid race with first bypass CB.
                        WRITE_ONCE(my_rdp->nocb_defer_wakeup,
                                   RCU_NOCB_WAKE_NOT);
                        del_timer(&my_rdp->nocb_timer);
                }
                // Advance callbacks if helpful and low contention.
                needwake_gp = false;
                if (!rcu_segcblist_restempty(&rdp->cblist,
                                             RCU_NEXT_READY_TAIL) ||
                    (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
                     rcu_seq_done(&rnp->gp_seq, cur_gp_seq))) {
                        raw_spin_lock_rcu_node(rnp); /* irqs disabled. */
                        needwake_gp = rcu_advance_cbs(rnp, rdp);
                        raw_spin_unlock_rcu_node(rnp); /* irqs disabled. */
                }
                // Need to wait on some grace period?
                WARN_ON_ONCE(!rcu_segcblist_restempty(&rdp->cblist,
                                                      RCU_NEXT_READY_TAIL));
                if (rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq)) {
                        if (!needwait_gp ||
                            ULONG_CMP_LT(cur_gp_seq, wait_gp_seq))
                                wait_gp_seq = cur_gp_seq;
                        needwait_gp = true;
                        trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
                                            TPS("NeedWaitGP"));
                }
                if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
                        needwake = rdp->nocb_cb_sleep;
                        WRITE_ONCE(rdp->nocb_cb_sleep, false);
                        smp_mb(); /* CB invocation -after- GP end. */
                } else {
                        needwake = false;
                }
                rcu_nocb_unlock_irqrestore(rdp, flags);
                if (needwake) {
                        swake_up_one(&rdp->nocb_cb_wq);
                        gotcbs = true;
                }
                if (needwake_gp)
                        rcu_gp_kthread_wake();
        }

no-cb용 gp 커널 스레에서 호출되며 gp를 대기한다.

  • 코드 라인 22~35에서 gp 커널 스레드가 관리하는 cpu들을 대상으로 순회하며 ->nocb_bypass 리스트에 콜백들이 있고, 1틱 이상 시간이 지났거나 bypass 대기 중인 콜백들이 qhimark(디폴트=10000)의 2배를 넘는 경우 이들을 flush 한다. 그렇지 않고 bypass 콜백도 없고 세그먼트 콜백 리스트도 비어 있는 경우 skip 한다.
  • 코드 라인 36~46에서 여전히 bypass 콜백이 존재하는 경우 bypass를 true로 변경하고, no-cb 타이머를 제거한다.
  • 코드 라인 48~56에서 NEXT 구간에 콜백들이 존재하거나 wait 구간에서 대기 중인 콜백의 gp 시퀀스가 이미 만료된 상태인 경우 콜백들을 advance(cascade) 처리한다.
  • 코드 라인 60~67에서 대기 중인 콜백이 있는 경우 needwait_gp에 true를 대입한다. wait_gp_seq를 갱신하는데 wait 구간의 gp 시퀀스보다 작을 때에만 갱신한다.
  • 코드 라인 68~81에서 준비 완료된 콜백들이 있는 경우 no-cb용 콜백 처리 커널 스레드를 깨우고, nocb_cb_sleep에 false를 대입하여 no-cb용 gp 커널 스레드를 깨운다. 그런 후 계속 다음 cpu를 처리하기 위해 루프를 돈다.

 

kernel/rcu/tree_plugin.h -2/2-

        my_rdp->nocb_gp_bypass = bypass;
        my_rdp->nocb_gp_gp = needwait_gp;
        my_rdp->nocb_gp_seq = needwait_gp ? wait_gp_seq : 0;
        if (bypass && !rcu_nocb_poll) {
                // At least one child with non-empty ->nocb_bypass, so set
                // timer in order to avoid stranding its callbacks.
                raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags);
                mod_timer(&my_rdp->nocb_bypass_timer, j + 2);
                raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags);
        }
        if (rcu_nocb_poll) {
                /* Polling, so trace if first poll in the series. */
                if (gotcbs)
                        trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Poll"));
                schedule_timeout_interruptible(1);
        } else if (!needwait_gp) {
                /* Wait for callbacks to appear. */
                trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("Sleep"));
                swait_event_interruptible_exclusive(my_rdp->nocb_gp_wq,
                                !READ_ONCE(my_rdp->nocb_gp_sleep));
                trace_rcu_nocb_wake(rcu_state.name, cpu, TPS("EndSleep"));
        } else {
                rnp = my_rdp->mynode;
                trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("StartWait"));
                swait_event_interruptible_exclusive(
                        rnp->nocb_gp_wq[rcu_seq_ctr(wait_gp_seq) & 0x1],
                        rcu_seq_done(&rnp->gp_seq, wait_gp_seq) ||
                        !READ_ONCE(my_rdp->nocb_gp_sleep));
                trace_rcu_this_gp(rnp, my_rdp, wait_gp_seq, TPS("EndWait"));
        }
        if (!rcu_nocb_poll) {
                raw_spin_lock_irqsave(&my_rdp->nocb_gp_lock, flags);
                if (bypass)
                        del_timer(&my_rdp->nocb_bypass_timer);
                WRITE_ONCE(my_rdp->nocb_gp_sleep, true);
                raw_spin_unlock_irqrestore(&my_rdp->nocb_gp_lock, flags);
        }
        my_rdp->nocb_gp_seq = -1;
        WARN_ON(signal_pending(current));
}
  • 코드 라인 1에서 no-cb용 bypass 콜백이 지난 gp에서 스캔되었느지 여부를 대입한다.
  • 코드 라인 2에서 no-cb용 다음 gp를 대기해야 하는지 요청 여부를 지정한다.
  • 코드 라인 3에서 no-cb용 gp 시퀀스 번호를 대입한다. 만일 다음 gp 요청이 없는 경우 0을 대입한다.
  • 코드 라인 4~10에서 bypass 콜백이 있고 no-cb용 cb 커널 스레드가 폴링을 하지 않는 경우 no-cb용 bypass 타이머를 2틱 뒤의 시각으로 설정한다.
  • 코드 라인 11~15에서 no-cb용 cb 커널 스레드가 폴링을 하는 경우 1틱 슬립한다.
  • 코드 라인 16~21에서 gp 요청이 없는 경우 rdp->nocb_gp_sleep이 외부에서 false를 대입할 때까지 슬립하며 대기한다.
  • 코드 라인 22~30에서 그 외의 경우 대기 중인 gp 시퀀스가 종료되거나 rdp->nocb_gp_sleep이 외부에서 false를 대입할 때까지 슬립하며 대기한다.
  • 코드 라인 31~37에서 no-cb용 cb 커널 스레드가 폴링을 하지 않는 경우 ->nocb_gp_sleep에 true를 대입한다. 만일 bypass 콜백이 있는 경우 bypass 타이머를 제거한다.
  • 코드 라인 38에서 no-cb용 gp 시퀀스에 -1을 대입한다.

 


no-cb용 gp wakeup 타이머

do_nocb_deferred_wakeup()

kernel/rcu/tree_plugin.h

/*
 * Do a deferred wakeup of rcu_nocb_kthread() from fastpath.
 * This means we do an inexact common-case check.  Note that if
 * we miss, ->nocb_timer will eventually clean things up.
 */
static void do_nocb_deferred_wakeup(struct rcu_data *rdp)
{
        if (rcu_nocb_need_deferred_wakeup(rdp))
                do_nocb_deferred_wakeup_common(rdp);
}

no-cb용 gp 커널스레드의 deferred wakeup 요청을 처리한다.

 

rcu_nocb_need_deferred_wakeup()

kernel/rcu/tree_plugin.h

/* Is a deferred wakeup of rcu_nocb_kthread() required? */
static int rcu_nocb_need_deferred_wakeup(struct rcu_data *rdp)
{
        return READ_ONCE(rdp->nocb_defer_wakeup);
}

no-cb용 커널스레드의 deferred wakeup이 요청되었는지 여부를 반환한다.

 

deferred wakeup 요청 값

kernel/rcu/tree.h”

/* Values for nocb_defer_wakeup field in struct rcu_data. */
#define RCU_NOCB_WAKE_NOT       0
#define RCU_NOCB_WAKE           1
#define RCU_NOCB_WAKE_FORCE     2
  • RCU_NOCB_WAKE_NOT
    • wake 요청하지 않은 상태
  • RCU_NOCB_WAKE
    • wake 요청한 상태
  • RCU_NOCB_WAKE_FORCE
    • wake 요청을 강제한 상태

 

deferred wakeup 타이머 핸들러

do_nocb_deferred_wakeup_timer()

kernel/rcu/tree_plugin.h

/* Do a deferred wakeup of rcu_nocb_kthread() from a timer handler. */
static void do_nocb_deferred_wakeup_timer(struct timer_list *t)
{
        struct rcu_data *rdp = from_timer(rdp, t, nocb_timer);

        do_nocb_deferred_wakeup_common(rdp);
}

no-cb용 커널스레드의 deferred wakeup이 요청된 경우 no-cb용 gp 커널 스레드를 깨운다.

 

do_nocb_deferred_wakeup_common()

kernel/rcu/tree_plugin.h

/* Do a deferred wakeup of rcu_nocb_kthread(). */
static void do_nocb_deferred_wakeup_common(struct rcu_data *rdp)
{
        unsigned long flags;
        int ndw;

        rcu_nocb_lock_irqsave(rdp, flags);
        if (!rcu_nocb_need_deferred_wakeup(rdp)) {
                rcu_nocb_unlock_irqrestore(rdp, flags);
                return;
        }
        ndw = READ_ONCE(rdp->nocb_defer_wakeup);
        WRITE_ONCE(rdp->nocb_defer_wakeup, RCU_NOCB_WAKE_NOT);
        wake_nocb_gp(rdp, ndw == RCU_NOCB_WAKE_FORCE, flags);
        trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DeferredWake"));
}

no-cb용 커널스레드의 deferred wakeup이 요청된 경우 no-cb용 gp 커널 스레드를 깨운다.

  • 코드 라인 7~10에서 no-cb용 커널스레드의 wakeup 유예가 요청된 경우가 아니면 함수를 빠져나간다.
  • 코드 라인 11~12용 nocb_defer_wakeup 요청을 RCU_NOCB_WAKE_NOT(0)으로 클리어하고, 기존 값은 ndw로 알아온다.
  • 코드 라인 13에서 gp 커널 스레드를 깨운다.
    • 기존 값이 RCU_NOCB_WAKE_FORCE(2)인 경우 wakeup을 강제한다.

 

no-cb용 gp 커널 스레드 깨우기

wake_nocb_gp()

kernel/rcu/tree_plugin.h

/*
 * Kick the GP kthread for this NOCB group.  Caller holds ->nocb_lock
 * and this function releases it.
 */
static void wake_nocb_gp(struct rcu_data *rdp, bool force,
                           unsigned long flags)
        __releases(rdp->nocb_lock)
{
        bool needwake = false;
        struct rcu_data *rdp_gp = rdp->nocb_gp_rdp;

        lockdep_assert_held(&rdp->nocb_lock);
        if (!READ_ONCE(rdp_gp->nocb_gp_kthread)) {
                trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
                                    TPS("AlreadyAwake"));
                rcu_nocb_unlock_irqrestore(rdp, flags);
                return;
        }
        del_timer(&rdp->nocb_timer);
        rcu_nocb_unlock_irqrestore(rdp, flags);
        raw_spin_lock_irqsave(&rdp_gp->nocb_gp_lock, flags);
        if (force || READ_ONCE(rdp_gp->nocb_gp_sleep)) {
                WRITE_ONCE(rdp_gp->nocb_gp_sleep, false);
                needwake = true;
                trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("DoWake"));
        }
        raw_spin_unlock_irqrestore(&rdp_gp->nocb_gp_lock, flags);
        if (needwake)
                wake_up_process(rdp_gp->nocb_gp_kthread);
}

no-cb용 gp 커널 스레드를 깨운다.

  • 코드 라인 6에서 현재 cpu에 대응하여 gp를 관리하는 cpu의 rdp를 알아온다.
  • 코드 라인 9~14에서 gp용 cpu의 no-cb용 gp 커널 스레드가 아직 준비되지 않은 경우 함수를 빠져나간다.
  • 코드 라인 15에서 no-cb용 타이머를 삭제한다.
  • 코드 라인 18~25에서 입력 인자 @force가 요청되었거나 rdp_gp->nocb_gp_sleep이 true로 no-cb용 gp 커널 스레드가 슬립하며 대기 중인 경우 스레드를 깨운다.

 

no-cb용 gp 커널 스레드 지연시켜 깨우기

wake_nocb_gp_defer()

kernel/rcu/tree_plugin.h

/*
 * Arrange to wake the GP kthread for this NOCB group at some future
 * time when it is safe to do so.
 */
static void wake_nocb_gp_defer(struct rcu_data *rdp, int waketype,
                               const char *reason)
{
        if (rdp->nocb_defer_wakeup == RCU_NOCB_WAKE_NOT)
                mod_timer(&rdp->nocb_timer, jiffies + 1);
        if (rdp->nocb_defer_wakeup < waketype)
                WRITE_ONCE(rdp->nocb_defer_wakeup, waketype);
        trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, reason);
}

no-cb용 gp 커널 스레드를 @waketype에 맞춰 deferred wakeup 요청한다.

  • 코드 라인 4~5에서 기존 설정 값(rdp->nocb_defer_wakeup)이 RCU_NOCB_WAKE_NOT(0) 상태에서 요청된 경우 1틱을 지연시켜 no-cb용 gp 커널 스레드를 깨우기 위해 타이머를 변경한다.
  • 코드 라인 6~7에서 rdp->nocb_defer_wakeup 보다 @wakeup이 큰 경우에 한해 갱신한다.

 


no-cb용 bypass 타이머

 

bypass 타이머 핸들러

do_nocb_bypass_wakeup_timer()

kernel/rcu/tree_plugin.h

/* Wake up the no-CBs GP kthread to flush ->nocb_bypass. */
static void do_nocb_bypass_wakeup_timer(struct timer_list *t)
{
        unsigned long flags;
        struct rcu_data *rdp = from_timer(rdp, t, nocb_bypass_timer);

        trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("Timer"));
        rcu_nocb_lock_irqsave(rdp, flags);
        smp_mb__after_spinlock(); /* Timer expire before wakeup. */
        __call_rcu_nocb_wake(rdp, true, flags);
}

nocb_bypass 리스트의 콜백들을 모두 처리하기 위해 nocb용 gp kthread를 깨운다.

 

__call_rcu_nocb_wake()

kernel/rcu/tree_plugin.h

/*
 * Awaken the no-CBs grace-period kthead if needed, either due to it
 * legitimately being asleep or due to overload conditions.
 *
 * If warranted, also wake up the kthread servicing this CPUs queues.
 */
static void __call_rcu_nocb_wake(struct rcu_data *rdp, bool was_alldone,
                                 unsigned long flags)
                                 __releases(rdp->nocb_lock)
{
        unsigned long cur_gp_seq;
        unsigned long j;
        long len;
        struct task_struct *t;

        // If we are being polled or there is no kthread, just leave.
        t = READ_ONCE(rdp->nocb_gp_kthread);
        if (rcu_nocb_poll || !t) {
                trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
                                    TPS("WakeNotPoll"));
                rcu_nocb_unlock_irqrestore(rdp, flags);
                return;
        }
        // Need to actually to a wakeup.
        len = rcu_segcblist_n_cbs(&rdp->cblist);
        if (was_alldone) {
                rdp->qlen_last_fqs_check = len;
                if (!irqs_disabled_flags(flags)) {
                        /* ... if queue was empty ... */
                        wake_nocb_gp(rdp, false, flags);
                        trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
                                            TPS("WakeEmpty"));
                } else {
                        wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE,
                                           TPS("WakeEmptyIsDeferred"));
                        rcu_nocb_unlock_irqrestore(rdp, flags);
                }
        } else if (len > rdp->qlen_last_fqs_check + qhimark) {
                /* ... or if many callbacks queued. */
                rdp->qlen_last_fqs_check = len;
                j = jiffies;
                if (j != rdp->nocb_gp_adv_time &&
                    rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
                    rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
                        rcu_advance_cbs_nowake(rdp->mynode, rdp);
                        rdp->nocb_gp_adv_time = j;
                }
                smp_mb(); /* Enqueue before timer_pending(). */
                if ((rdp->nocb_cb_sleep ||
                     !rcu_segcblist_ready_cbs(&rdp->cblist)) &&
                    !timer_pending(&rdp->nocb_bypass_timer))
                        wake_nocb_gp_defer(rdp, RCU_NOCB_WAKE_FORCE,
                                           TPS("WakeOvfIsDeferred"));
                rcu_nocb_unlock_irqrestore(rdp, flags);
        } else {
                trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("WakeNot"));
                rcu_nocb_unlock_irqrestore(rdp, flags);
        }
        return;
}

nocb_bypass 리스트의 콜백들을 모두 처리하기 위해 nocb용 gp kthread를 깨운다.

  • 코드 라인 11~17에서 아직 해당 cpu의 no-cb용 gp 커널 스레드가 지정되지 않은 경우 그냥 함수를 빠져나간다.
  • 코드 라인 19에서 세그먼트 콜백 리스트의 전체 콜백 수를 len에 알아온다.
  • 코드 라인 20~31에서 두 번째 인자 @was_alldone이 설정된 경우 nocb용 gp 커널 스레드를 깨운다. irq disable 상태에서는 1 틱 만큼 지연시켜 깨운다.
  • 코드 라인 32~48에서 너무 많은 콜백이 처리를 기다리고 있는 중인 경우이다. nocb_gp_adv_time 틱이 흘러 변경되었거나 다음(wait) 구간의 gp 시퀀스가 완료된 경우 nocb_gp_adv_time 을 현재 시각으로 갱신하고 콜백을 advance(cascade) 처리한다. 만일 no-cb용 gp 커널 스레드가 슬립하여 대기중이거나 세그먼트 콜백 리스트에 완료된 콜백이 없는 경우이면서 bypass 타이머가 설정되어 있지 않은 경우 no-cb용 gp 커널 스레드를 wakeup 강제(force)한다.
  • 코드 라인 49~52에서 그 외의 경우 nocb 언락한다.

 


bypass 리스트에 콜백 추가 시도

rcu_nocb_try_bypass()

kernel/rcu/tree_plugin.h -1/2-

/*
 * See whether it is appropriate to use the ->nocb_bypass list in order
 * to control contention on ->nocb_lock.  A limited number of direct
 * enqueues are permitted into ->cblist per jiffy.  If ->nocb_bypass
 * is non-empty, further callbacks must be placed into ->nocb_bypass,
 * otherwise rcu_barrier() breaks.  Use rcu_nocb_flush_bypass() to switch
 * back to direct use of ->cblist.  However, ->nocb_bypass should not be
 * used if ->cblist is empty, because otherwise callbacks can be stranded
 * on ->nocb_bypass because we cannot count on the current CPU ever again
 * invoking call_rcu().  The general rule is that if ->nocb_bypass is
 * non-empty, the corresponding no-CBs grace-period kthread must not be
 * in an indefinite sleep state.
 *
 * Finally, it is not permitted to use the bypass during early boot,
 * as doing so would confuse the auto-initialization code.  Besides
 * which, there is no point in worrying about lock contention while
 * there is only one CPU in operation.
 */
static bool rcu_nocb_try_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
                                bool *was_alldone, unsigned long flags)
{
        unsigned long c;
        unsigned long cur_gp_seq;
        unsigned long j = jiffies;
        long ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);

        if (!rcu_segcblist_is_offloaded(&rdp->cblist)) {
                *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
                return false; /* Not offloaded, no bypassing. */
        }
        lockdep_assert_irqs_disabled();

        // Don't use ->nocb_bypass during early boot.
        if (rcu_scheduler_active != RCU_SCHEDULER_RUNNING) {
                rcu_nocb_lock(rdp);
                WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
                *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
                return false;
        }

        // If we have advanced to a new jiffy, reset counts to allow
        // moving back from ->nocb_bypass to ->cblist.
        if (j == rdp->nocb_nobypass_last) {
                c = rdp->nocb_nobypass_count + 1;
        } else {
                WRITE_ONCE(rdp->nocb_nobypass_last, j);
                c = rdp->nocb_nobypass_count - nocb_nobypass_lim_per_jiffy;
                if (ULONG_CMP_LT(rdp->nocb_nobypass_count,
                                 nocb_nobypass_lim_per_jiffy))
                        c = 0;
                else if (c > nocb_nobypass_lim_per_jiffy)
                        c = nocb_nobypass_lim_per_jiffy;
        }
        WRITE_ONCE(rdp->nocb_nobypass_count, c);

        // If there hasn't yet been all that many ->cblist enqueues
        // this jiffy, tell the caller to enqueue onto ->cblist.  But flush
        // ->nocb_bypass first.
        if (rdp->nocb_nobypass_count < nocb_nobypass_lim_per_jiffy) {
                rcu_nocb_lock(rdp);
                *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
                if (*was_alldone)
                        trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
                                            TPS("FirstQ"));
                WARN_ON_ONCE(!rcu_nocb_flush_bypass(rdp, NULL, j));
                WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
                return false; // Caller must enqueue the callback.
        }

no-cb용 bypass 리스트에 콜백을 추가한다. 만일 성공하는 경우 true를 반환한다.

  • 코드 라인 7에서 nocb_bypass 리스트에 존재하는 콜백들의 수를 ncbs에 알아온다.
  • 코드 라인 9~12에서 오프로드되지 않은 경우 false를 반환한다. 또한 펜딩 콜백이 없는지 여부를 출력 인자 @was_alldone에 대입한다.
  • 코드 라인 16~21에서 rcu 스케줄러가 아직 준비되지 않은 경우 false를 반환한다. 또한 펜딩 콜백이 없는지 여부를 출력 인자 @was_alldone에 대입한다.
  • 코드 라인 25~36에서 새 틱에 진입할 때마다 틱당 nobypass 카운터 제한 수(nocb_nobypass_lim_per_jiffy) 이하로 제한한다.
    • 틱 변화가 없는 경우엔 +1 증가시킨다.
    • 새 틱에 진입한 경우 nocb_nobypass_count 를 0 ~ nocb_nobypass_lim_per_jiffy 값으로 제한하여 갱신한다.
  • 코드 라인 41~50에서 nocb_nobypass_count 수가 틱당 제한 수 미만으로 적게 유입되는 경우 nocb_bypass 리스트에서 처리하지 않고 원래의 세그먼트 콜백 리스트에 추가하기 위해 false를 반환한다. 또한 펜딩 콜백이 없는지 여부를 출력 인자 @was_alldone에 대입한다.
    • nocb_nobypass_lim_per_jiffy
      • 틱 당 유입되는 콜백 수로 이 값을 초과하는 경우에만 nocb_bypass 리스트에 추가한다.

 

kernel/rcu/tree_plugin.h -2/2-

        // If ->nocb_bypass has been used too long or is too full,
        // flush ->nocb_bypass to ->cblist.
        if ((ncbs && j != READ_ONCE(rdp->nocb_bypass_first)) ||
            ncbs >= qhimark) {
                rcu_nocb_lock(rdp);
                if (!rcu_nocb_flush_bypass(rdp, rhp, j)) {
                        *was_alldone = !rcu_segcblist_pend_cbs(&rdp->cblist);
                        if (*was_alldone)
                                trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
                                                    TPS("FirstQ"));
                        WARN_ON_ONCE(rcu_cblist_n_cbs(&rdp->nocb_bypass));
                        return false; // Caller must enqueue the callback.
                }
                if (j != rdp->nocb_gp_adv_time &&
                    rcu_segcblist_nextgp(&rdp->cblist, &cur_gp_seq) &&
                    rcu_seq_done(&rdp->mynode->gp_seq, cur_gp_seq)) {
                        rcu_advance_cbs_nowake(rdp->mynode, rdp);
                        rdp->nocb_gp_adv_time = j;
                }
                rcu_nocb_unlock_irqrestore(rdp, flags);
                return true; // Callback already enqueued.
        }

        // We need to use the bypass.
        rcu_nocb_wait_contended(rdp);
        rcu_nocb_bypass_lock(rdp);
        ncbs = rcu_cblist_n_cbs(&rdp->nocb_bypass);
        rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
        rcu_cblist_enqueue(&rdp->nocb_bypass, rhp);
        if (!ncbs) {
                WRITE_ONCE(rdp->nocb_bypass_first, j);
                trace_rcu_nocb_wake(rcu_state.name, rdp->cpu, TPS("FirstBQ"));
        }
        rcu_nocb_bypass_unlock(rdp);
        smp_mb(); /* Order enqueue before wake. */
        if (ncbs) {
                local_irq_restore(flags);
        } else {
                // No-CBs GP kthread might be indefinitely asleep, if so, wake.
                rcu_nocb_lock(rdp); // Rare during call_rcu() flood.
                if (!rcu_segcblist_pend_cbs(&rdp->cblist)) {
                        trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
                                            TPS("FirstBQwake"));
                        __call_rcu_nocb_wake(rdp, true, flags);
                } else {
                        trace_rcu_nocb_wake(rcu_state.name, rdp->cpu,
                                            TPS("FirstBQnoWake"));
                        rcu_nocb_unlock_irqrestore(rdp, flags);
                }
        }
        return true; // Callback already enqueued.
}
  • 코드 라인 3~22에서 첫 콜백이 추가된 후 1 틱이 지났거나 nocb_bypas 콜백들이 qhimark(디폴트=10000) 보다 많은 경우 이들을 한꺼번에 세그먼트 콜백리스트에 플러시하고 함수를 true로 빠져나가는 것으로 매 콜백 건마다 수행되는 lock 컨텐션을 줄인다.
  • 코드 라인 25~26에서 no-cb 락 contention이 없어질때까지 스핀하며 대기한 후 락을 획득한다.
  • 코드 라인 27~34에서 nocb_bypass 리스트에 콜백을 하나 추가하고 언락한다. 만일 nocb_bypass 리스트에 콜백이 처음 추가된 경우 nocb_bypass_first에 현재 시각을 갱신한다.
  • 코드 라인 36~50에서 nocb_bypass 리스트가 비어있는 상태에서 첫 콜백으로 추가되었고 세그먼트 콜백 리스트에서 대기중인 콜백이 없는 경우 no-cb용 gp 커널 스레드를 깨운다.
  • 코드 라인 51에서 true를 반환한다.

 

다음 그림은 콜백을 offload 하여 운영하는 경우 과도하게 유입되는 콜백을 nocb_bypass 리스트에 추가하고, 1틱이 지났거나 10000개 이상 누적되는 경우 flush 하는 것으로 lock contention을 줄여 성능을 올리기 위해 사용하는 모습을 보여준다.

 

rcu_nocb_flush_bypass()

kernel/rcu/tree_plugin.h

/*
 * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
 * However, if there is a callback to be enqueued and if ->nocb_bypass
 * proves to be initially empty, just return false because the no-CB GP
 * kthread may need to be awakened in this case.
 *
 * Note that this function always returns true if rhp is NULL.
 */
static bool rcu_nocb_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
                                  unsigned long j)
{
        if (!rcu_segcblist_is_offloaded(&rdp->cblist))
                return true;
        rcu_lockdep_assert_cblist_protected(rdp);
        rcu_nocb_bypass_lock(rdp);
        return rcu_nocb_do_flush_bypass(rdp, rhp, j);
}

nocb_bypass 리스트의 콜백들을 모두 세그먼트 콜백리스트에 옮긴다.

  • 코드 라인 4~5에서 콜백 오프로드되지 않은 경우 true를 반환한다.
  • 코드 라인 8에서 nocb_bypass 리스트의 콜백들을 모두 세그먼트 콜백리스트에 옮긴다. 성공한 경우 true를 반환한다.

 

rcu_nocb_do_flush_bypass()

kernel/rcu/tree_plugin.h

/*
 * Flush the ->nocb_bypass queue into ->cblist, enqueuing rhp if non-NULL.
 * However, if there is a callback to be enqueued and if ->nocb_bypass
 * proves to be initially empty, just return false because the no-CB GP
 * kthread may need to be awakened in this case.
 *
 * Note that this function always returns true if rhp is NULL.
 */
static bool rcu_nocb_do_flush_bypass(struct rcu_data *rdp, struct rcu_head *rhp,
                                     unsigned long j)
{
        struct rcu_cblist rcl;

        WARN_ON_ONCE(!rcu_segcblist_is_offloaded(&rdp->cblist));
        rcu_lockdep_assert_cblist_protected(rdp);
        lockdep_assert_held(&rdp->nocb_bypass_lock);
        if (rhp && !rcu_cblist_n_cbs(&rdp->nocb_bypass)) {
                raw_spin_unlock(&rdp->nocb_bypass_lock);
                return false;
        }
        /* Note: ->cblist.len already accounts for ->nocb_bypass contents. */
        if (rhp)
                rcu_segcblist_inc_len(&rdp->cblist); /* Must precede enqueue. */
        rcu_cblist_flush_enqueue(&rcl, &rdp->nocb_bypass, rhp);
        rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rcl);
        WRITE_ONCE(rdp->nocb_bypass_first, j);
        rcu_nocb_bypass_unlock(rdp);
        return true;
}

nocb_bypass 리스트의 콜백들을 모두 세그먼트 콜백리스트에 옮긴다. 성공한 경우 true를 반환한다.

  • 코드 라인 9~12에서 nocb_bypass 리스트에 콜백들이 하나도 없는 경우 false를 반환한다.
  • 코드 라인 14~15에서 인자로 주어진 콜백 @rhp가 있는 경우 콜백 수를 1 증가시킨다.
  • 코드 라인 16에서 nocb_bypass 리스트의 콜백을 내부 임시 리스트에 옮긴다. 이 때 콜백도 하나 추가한다.
  • 코드 라인 17에서 내부 임시 리스트에 옮겨진 콜백들을 세그먼트 콜백리스트의 next 구간에 추가한다.
  • 코드 라인 18에서 nocb_bypass_first에 현재 시각을 갱신한다.
  • 코드 라인 20에서 정상적으로 옮겼으므로 true를 반환한다.

 

참고

 

댓글 남기기