RCU(Read Copy Update) -4- (NOCB process)

 

RCU NO-CB (Offload RCU callback)

rcu의 cb를 처리하는 유형은 큰 흐름으로 다음과 같이 두 가지로 나뉜다.

  • cb 호출 – interrupt context에서 직접 처리
    • softirq에서 처리된다. 이러한 경우 보통 interrupt context에서 곧장 호출되어 처리되고 만일 softirq 처리 건 수가 많아져 지연되는 경우 softirqd 커널 스레드에서 호출되어 처리된다.
    • 디폴트 커널 설정이며 latency가 짧아 빠른 호출이 보장된다.
  • no-cb 호출 – 전용 rcu 커널 스레드에서 처리
    • 특정 rcu 커널 스레드에서 cb를 호출할 수 있도록 한다. 절전과 성능을 만족시키는 옵션이다.
    • CONFIG_RCU_NOCB_CPU 커널 옵션을 사용하고 다음 3가지 중 하나를 선택할 수 있다.
      • CONFIG_RCU_NOCB_CPU_NONE
        • 디폴트로 nocb 지정된 cpu는 없지만 “nocbs=” 커널 파라메터로 특정 cpu들을 nocb로 지정할 수 있다.
      • CONFIG_RCU_NOCB_CPU_ZERO
        • 디폴트로 cpu#0을 nocb 지정한다. 추가로 “nocbs=” 커널 파라메터를 사용하여 다른 cpu들을 nocb로 지정할 수 있다.
      • CONFIG_RCU_NOCB_CPU_ALL
        • 디폴트로 모든 cpu를 nocb로 지정한다.

 

Leader & Follower CPU

NO-CB용 콜백을 사용하는 경우 지정된 cpu에 nocb용 커널 스레드가 구성된다. grace period 관리 및 이에 수반되는 콜백 리스트의 cascade 관리를 위해 leader cpu를 지정하고 그 leader cpu가 처리해준 작업을 수행하는 follower cpu를 구성하여 처리한다.

  • Leader cpu가 하는 일
    • gp 관리
    • leader 및 follower에 속한 콜백 리스트의 이동
    • leader 콜백 호출
  • Follower cpu가 하는 일
    • follower 콜백 호출

 

NO-CB용 콜백 추가

__call_rcu_nocb()

kernel/rcu/tree_plugin.h

/*
 * This is a helper for __call_rcu(), which invokes this when the normal
 * callback queue is inoperable.  If this is not a no-CBs CPU, this
 * function returns failure back to __call_rcu(), which can complain
 * appropriately.
 *
 * Otherwise, this function queues the callback where the corresponding
 * "rcuo" kthread can find it.
 */
static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
                            bool lazy, unsigned long flags)
{

        if (!rcu_is_nocb_cpu(rdp->cpu))
                return false;
        __call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy, flags);
        if (__is_kfree_rcu_offset((unsigned long)rhp->func))
                trace_rcu_kfree_callback(rdp->rsp->name, rhp,
                                         (unsigned long)rhp->func,
                                         -atomic_long_read(&rdp->nocb_q_count_lazy),
                                         -atomic_long_read(&rdp->nocb_q_count));
        else
                trace_rcu_callback(rdp->rsp->name, rhp,
                                   -atomic_long_read(&rdp->nocb_q_count_lazy),
                                   -atomic_long_read(&rdp->nocb_q_count));

        /*
         * If called from an extended quiescent state with interrupts
         * disabled, invoke the RCU core in order to allow the idle-entry
         * deferred-wakeup check to function.
         */
        if (irqs_disabled_flags(flags) &&
            !rcu_is_watching() &&
            cpu_online(smp_processor_id()))
                invoke_rcu_core();

        return true;
}

no-cb 지정된 cpu인 경우 rcu 커널 스레드(“rcuo”)에서 처리하도록 rcu cb를 큐잉한다.

  • 코드 라인 14~15에서 no-cb 지정된 cpu가 아니면 false를 반환한다.
  • 코드 라인 16에서 no-cb 큐에 rcu를 추가하고 rcu 커널 스레드를 호출하다.
  • 코드 라인 17~25에서 kfree용 rcu cb 및 함수 호출용 rcu cb 각각에 따른 trace 메시지 출력을 다르게 한다.
  • 코드 라인 32~35에서 irq disable이면서 확장 qs 상태에서 rcu 요청한 경우 softirq를 호출하여 rcu core를 호출한다.

 

다음 그림은 9개의 cpu가 nocb를 사용하는 모습을 보여준다.

 

RCU NO-CB 설정

rcu_is_nocb_cpu()

kernel/rcu/tree_plugin.h

/* Is the specified CPU a no-CBs CPU? */
bool rcu_is_nocb_cpu(int cpu)
{
        if (have_rcu_nocb_mask)
                return cpumask_test_cpu(cpu, rcu_nocb_mask);
        return false;
}

요청한 cpu가 no-cb(rcu 스레드 사용)으로 설정되었는지 여부를 반환한다.

  • have_rcu_nocb_mask 변수는 rcu_nocb_setup() 함수가 호출되어 rcu_nocb_mask라는 cpu 마스크가 할당된 경우 true로 설정된다.
  • CONFIG_RCU_NOCB_CPU_ALL 커널 옵션을 사용하는 경우 항상 rcu 스레드에서 동작시키기 위해 true를 반환한다.
  • CONFIG_RCU_NOCB_CPU_ALL 및 CONFIG_RCU_NOCB_CPU 커널 옵션 둘 다 사용하지 않는 경우 항상 callback 처리하기 위해 false를 반환한다.

 

rcu_nocb_setup()

kernel/rcu/tree_plugin.h

/* Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters. */
static int __init rcu_nocb_setup(char *str)
{
        alloc_bootmem_cpumask_var(&rcu_nocb_mask);
        have_rcu_nocb_mask = true;
        cpulist_parse(str, rcu_nocb_mask);
        return 1;
}
__setup("rcu_nocbs=", rcu_nocb_setup);

커널 파라메터 “rcu_nocbs=”에 cpu 리스트를 설정한다. 이렇게 설정된 cpu들은 rcu callback 처리를 rcu 스레드에서 처리할 수 있다.

  • 예) rcu_nocbs=3-6,8-10

 

Leader cpu와 Follower cpu 구성

rcu_organize_nocb_kthreads()

kernel/rcu/tree_plugin.h

/*
 * Initialize leader-follower relationships for all no-CBs CPU.
 */
static void __init rcu_organize_nocb_kthreads(struct rcu_state *rsp)
{
        int cpu;
        int ls = rcu_nocb_leader_stride;
        int nl = 0;  /* Next leader. */
        struct rcu_data *rdp;
        struct rcu_data *rdp_leader = NULL;  /* Suppress misguided gcc warn. */
        struct rcu_data *rdp_prev = NULL;

        if (!have_rcu_nocb_mask)
                return;
        if (ls == -1) {
                ls = int_sqrt(nr_cpu_ids);
                rcu_nocb_leader_stride = ls;
        }

        /*
         * Each pass through this loop sets up one rcu_data structure and
         * spawns one rcu_nocb_kthread().
         */
        for_each_cpu(cpu, rcu_nocb_mask) {
                rdp = per_cpu_ptr(rsp->rda, cpu);
                if (rdp->cpu >= nl) {
                        /* New leader, set up for followers & next leader. */
                        nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls;
                        rdp->nocb_leader = rdp;
                        rdp_leader = rdp;
                } else {
                        /* Another follower, link to previous leader. */
                        rdp->nocb_leader = rdp_leader;
                        rdp_prev->nocb_next_follower = rdp;
                }
                rdp_prev = rdp;
        }
}

nocb 지정된 cpu에서 동작하는 스레드들에 대해 leader와 follower cpu로 나누어 구성한다.

  • 코드 라인 13~14에서 nocb 설정된 cpu가 없으면 함수를 빠져나간다.
  • 코드 라인 15~18에서 nocb로 구성된 cpu에 대해 leader cpu를 포함하여 follower cpu를 몇 개로 구성할지 여부를 알아온다. 만일 -1로 설정된 경우 sqrt(online cpu수) 결과값으로 사용한다.
    • “rcu_nocb_leader_stride=4″로 설정하는 경우 4개의 cpu마다 그룹을 나누어 첫번째는 leader cpu로 구성하고 나머지는 3개는 follower cpu로 구성한다.
    • y = int_sqrt(x) 함수에서 x 값의 rough한 루트 근사치를 구해온다.
      • x=1~3    y=1
      • x=4~8   y=2
      • x=9~15   y=3
      • x=16~24   y=4
      • x=25~35   y=5
      • x=36~48   y=6
      • x=49~63   y=7
      • x=64~80   y=8
      • x=81~99   y=9
      • x=100~120   y=10
      • x=121~143   y=11
      • x=144~168   y=12
      • x=169~195   y=13
      • x=196~224   y=14
      • x=225~255   y=15
  • 코드 라인 24~37에서 leader 및 follower cpu를 구성한다.
    • 예) “rcu_nocb_leader_stride=4”, “rcu_nocbs=6-11,  16-19″인 경우
      • 6=leader,     7=follower
      • 8=leader,     9, 10, 11=follower
      • 16=leader,    17, 18, 19=follower

 

RCU NO-CB 처리용 커널 스레드

rcu_nocb_kthread()

kernel/rcu/tree_plugin.h

/*
 * Per-rcu_data kthread, but only for no-CBs CPUs.  Each kthread invokes
 * callbacks queued by the corresponding no-CBs CPU, however, there is
 * an optional leader-follower relationship so that the grace-period
 * kthreads don't have to do quite so many wakeups.
 */
static int rcu_nocb_kthread(void *arg)
{
        int c, cl;
        struct rcu_head *list;
        struct rcu_head *next;
        struct rcu_head **tail;
        struct rcu_data *rdp = arg;

        /* Each pass through this loop invokes one batch of callbacks */
        for (;;) {
                /* Wait for callbacks. */
                if (rdp->nocb_leader == rdp)
                        nocb_leader_wait(rdp);
                else
                        nocb_follower_wait(rdp);

                /* Pull the ready-to-invoke callbacks onto local list. */
                list = ACCESS_ONCE(rdp->nocb_follower_head);
                BUG_ON(!list);
                trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "WokeNonEmpty");
                ACCESS_ONCE(rdp->nocb_follower_head) = NULL;
                tail = xchg(&rdp->nocb_follower_tail, &rdp->nocb_follower_head);

                /* Each pass through the following loop invokes a callback. */
                trace_rcu_batch_start(rdp->rsp->name,
                                      atomic_long_read(&rdp->nocb_q_count_lazy),
                                      atomic_long_read(&rdp->nocb_q_count), -1);
                c = cl = 0;
                while (list) {
                        next = list->next;
                        /* Wait for enqueuing to complete, if needed. */
                        while (next == NULL && &list->next != tail) {
                                trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
                                                    TPS("WaitQueue"));
                                schedule_timeout_interruptible(1);
                                trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
                                                    TPS("WokeQueue"));
                                next = list->next;
                        }
                        debug_rcu_head_unqueue(list);
                        local_bh_disable();
                        if (__rcu_reclaim(rdp->rsp->name, list))
                                cl++;
                        c++;
                        local_bh_enable();
                        list = next;
                }
                trace_rcu_batch_end(rdp->rsp->name, c, !!list, 0, 0, 1);
                smp_mb__before_atomic();  /* _add after CB invocation. */
                atomic_long_add(-c, &rdp->nocb_q_count);
                atomic_long_add(-cl, &rdp->nocb_q_count_lazy);
                rdp->n_nocbs_invoked += c;
        }
        return 0;
}

no-cb용 커널 스레드로 무한 루프를 돌며 gp를 대기한다. gp가 완료되고 대기하던 콜백들이 최종적으로 follower 리스트로 이동하면 이를 모두 호출하여 처리한다.

  • 코드 라인 16~21에서 무한 루프를 돌며 현재 스레드가 nocb_leader로서 동작하는 경우와 nocb_follower로 동작하는 것을 구분하여 콜백을 처리할 준비를 하기 위해 대기한다.
  • 코드 라인 24~28에서 콜백들을 호출하기 위해 follower 리스트에 담긴 콜백들을 모두 비워 로컬에 있는 list로 옮긴다.
  • 코드 라인 34~53에서 로컬 리스트로 가져온 콜백들을 모두 호출하여 수행한다.
  • 코드 라인 55~57에서 처리한 콜백 수 만큼 nocb_q_count 및 nocb_q_count_lazy를 갱신한다.
  • 코드 라인 48에서 콜백 호출한 횟 수인 n_nocbs_invoked 카운터를 갱신한다.

 

nocb_leader_wait()

kernel/rcu/tree_plugin.h

/*
 * Leaders come here to wait for additional callbacks to show up.
 * This function does not return until callbacks appear.
 */
static void nocb_leader_wait(struct rcu_data *my_rdp)
{
        bool firsttime = true;
        bool gotcbs;
        struct rcu_data *rdp;
        struct rcu_head **tail;

wait_again:

        /* Wait for callbacks to appear. */
        if (!rcu_nocb_poll) {
                trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, "Sleep");
                wait_event_interruptible(my_rdp->nocb_wq,
                                !ACCESS_ONCE(my_rdp->nocb_leader_sleep));
                /* Memory barrier handled by smp_mb() calls below and repoll. */
        } else if (firsttime) {
                firsttime = false; /* Don't drown trace log with "Poll"! */
                trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, "Poll");
        }

        /*
         * Each pass through the following loop checks a follower for CBs.
         * We are our own first follower.  Any CBs found are moved to
         * nocb_gp_head, where they await a grace period.
         */
        gotcbs = false;
        for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
                rdp->nocb_gp_head = ACCESS_ONCE(rdp->nocb_head);
                if (!rdp->nocb_gp_head)
                        continue;  /* No CBs here, try next follower. */

                /* Move callbacks to wait-for-GP list, which is empty. */
                ACCESS_ONCE(rdp->nocb_head) = NULL;
                rdp->nocb_gp_tail = xchg(&rdp->nocb_tail, &rdp->nocb_head);
                gotcbs = true;
        }

        /*
         * If there were no callbacks, sleep a bit, rescan after a
         * memory barrier, and go retry.
         */
        if (unlikely(!gotcbs)) {
                if (!rcu_nocb_poll)
                        trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu,
                                            "WokeEmpty");
                WARN_ON(signal_pending(current));
                schedule_timeout_interruptible(1);

                /* Rescan in case we were a victim of memory ordering. */
                my_rdp->nocb_leader_sleep = true;
                smp_mb();  /* Ensure _sleep true before scan. */
                for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower)
                        if (ACCESS_ONCE(rdp->nocb_head)) {
                                /* Found CB, so short-circuit next wait. */
                                my_rdp->nocb_leader_sleep = false;
                                break;
                        }
                goto wait_again;
        }

leader 및 follower에 등록된 신규 콜백들을 follower_gp 리스트로 이동시킨다. gp가 만료된 후 follower_gp 리스트에 있는 콜백들을 follower 리스트로 옮기고 콜백들을 호출하도록 해당 follower의 nocb용 커널 스레드를 깨운다.

  • 코드 라인 15~23에서 “rcu_nocb_poll” 커널 파라메터 설정이 없는 경우 이 라인에서 대기한다.
  • 코드 라인 30~40에서 follower cpu들에 대해 nocb용 리스트를 nocb_gp 리스트로 옮긴다. 만일 등록된 콜백이 없으면 skip 한다.
  • 코드 라인 46~63에서 leader에 대응하는 follower cpu들 모두에 등록된 콜백이 없으면 leader는 1 tick 만큼 슬립한 후 leader에 등록한 콜백을 검사해서 여전히 없으면 이 함수 처음 wait_agail 레이블로 다시 이동한다.

 

        /* Wait for one grace period. */
        rcu_nocb_wait_gp(my_rdp);

        /*
         * We left ->nocb_leader_sleep unset to reduce cache thrashing.
         * We set it now, but recheck for new callbacks while
         * traversing our follower list.
         */
        my_rdp->nocb_leader_sleep = true;
        smp_mb(); /* Ensure _sleep true before scan of ->nocb_head. */

        /* Each pass through the following loop wakes a follower, if needed. */
        for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) {
                if (ACCESS_ONCE(rdp->nocb_head))
                        my_rdp->nocb_leader_sleep = false;/* No need to sleep.*/
                if (!rdp->nocb_gp_head)
                        continue; /* No CBs, so no need to wake follower. */

                /* Append callbacks to follower's "done" list. */
                tail = xchg(&rdp->nocb_follower_tail, rdp->nocb_gp_tail);
                *tail = rdp->nocb_gp_head;
                smp_mb__after_atomic(); /* Store *tail before wakeup. */
                if (rdp != my_rdp && tail == &rdp->nocb_follower_head) {
                        /*
                         * List was empty, wake up the follower.
                         * Memory barriers supplied by atomic_long_add().
                         */
                        wake_up(&rdp->nocb_wq);
                }
        }

        /* If we (the leader) don't have CBs, go wait some more. */
        if (!my_rdp->nocb_follower_head)
                goto wait_again;
}
  • 코드 라인 2에서 gp의 완료까지 대기한다.
  • 코드 라인 13~30에서 leader에 속한 follower cpu들을 순회하며 nocb_gp 리스트에 있는 콜백들을 nocb_follower 리스트로 옮기고 등록된 콜백들을 호출하라고 해당 nocb 커널 스레드를 깨운다.
  • 코드 라인 33~34에서 leader에 등록된 콜백이 없으면 함수 처음으로 돌아간다.

 

rcu_nocb_wait_gp()

kernel/rcu/tree_plugin.h

/*
 * If necessary, kick off a new grace period, and either way wait
 * for a subsequent grace period to complete.
 */
static void rcu_nocb_wait_gp(struct rcu_data *rdp)
{
        unsigned long c;
        bool d;
        unsigned long flags;
        bool needwake;
        struct rcu_node *rnp = rdp->mynode;

        raw_spin_lock_irqsave(&rnp->lock, flags);
        smp_mb__after_unlock_lock();
        needwake = rcu_start_future_gp(rnp, rdp, &c);
        raw_spin_unlock_irqrestore(&rnp->lock, flags);
        if (needwake)
                rcu_gp_kthread_wake(rdp->rsp);

        /*
         * Wait for the grace period.  Do so interruptibly to avoid messing
         * up the load average.
         */
        trace_rcu_future_gp(rnp, rdp, c, TPS("StartWait"));
        for (;;) {
                wait_event_interruptible(
                        rnp->nocb_gp_wq[c & 0x1],
                        (d = ULONG_CMP_GE(ACCESS_ONCE(rnp->completed), c)));
                if (likely(d))
                        break;
                WARN_ON(signal_pending(current));
                trace_rcu_future_gp(rnp, rdp, c, TPS("ResumeWait"));
        }
        trace_rcu_future_gp(rnp, rdp, c, TPS("EndWait"));
        smp_mb(); /* Ensure that CB invocation happens after GP end. */
}

새 gp가 완료되기를 기다린다.

  • 코드 라인 13~18에서 노드락을 걸고 새 gp의 completed 번호를 발급받아온다. 결과 값에 다라 gp 커널 스레드의 wakeup을 요청한다.
  • 코드 라인 25~33에서 발급받은 completed 번호로 홀짝 구분한 대기큐에서 요청 노드의 completed가 발급 받은 번호 이상이될 때까지 대기한다.

 

nocb_follower_wait()

kernel/rcu/tree_plugin.h

/*
 * Followers come here to wait for additional callbacks to show up.
 * This function does not return until callbacks appear.
 */
static void nocb_follower_wait(struct rcu_data *rdp)
{
        bool firsttime = true;

        for (;;) {
                if (!rcu_nocb_poll) {
                        trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
                                            "FollowerSleep");
                        wait_event_interruptible(rdp->nocb_wq,
                                                 ACCESS_ONCE(rdp->nocb_follower_head));
                } else if (firsttime) {
                        /* Don't drown trace log with "Poll"! */
                        firsttime = false;
                        trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "Poll");
                }
                if (smp_load_acquire(&rdp->nocb_follower_head)) {
                        /* ^^^ Ensure CB invocation follows _head test. */
                        return;
                }
                if (!rcu_nocb_poll)
                        trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu,
                                            "WokeEmpty");
                WARN_ON(signal_pending(current));
                schedule_timeout_interruptible(1);
        }
}

follower cpu는 이 곳에서 대기한다. leader가 gp 만료되면 깨워준다.

  • “rcu_nocb_poll” 커널 파라메터가 설정된 경우 1 tick 마다 폴링한다.

 

참고

 

답글 남기기

이메일은 공개되지 않습니다. 필수 입력창은 * 로 표시되어 있습니다