RCU(Read Copy Update) -2-


RCU 동기 Update



 * synchronize_rcu - wait until a grace period has elapsed.
 * Control will return to the caller some time after a full grace
 * period has elapsed, in other words after all currently executing RCU
 * read-side critical sections have completed.  Note, however, that
 * upon return from synchronize_rcu(), the caller might well be executing
 * concurrently with new RCU read-side critical sections that began while
 * synchronize_rcu() was waiting.  RCU read-side critical sections are
 * delimited by rcu_read_lock() and rcu_read_unlock(), and may be nested.
 * See the description of synchronize_sched() for more detailed information
 * on memory ordering guarantees.
void synchronize_rcu(void)
        rcu_lockdep_assert(!lock_is_held(&rcu_bh_lock_map) &&
                           !lock_is_held(&rcu_lock_map) &&
                           "Illegal synchronize_rcu() in RCU read-side critical section");
        if (!rcu_scheduler_active)
        if (rcu_expedited)

grace period가 지날때까지 기다린다.

  • if (!rcu_scheduler_active) return;
    • 아직 rcu 스케쥴러가 동작하지 않는 경우 처리를 포기하고 함수를 빠져나온다.
  • if (rcu_expedited) synchronize_rcu_expedited(); else  wait_rcu_gp(call_rcu);
    • /sys/kernel/rcu_expedited 값이 설정된 경우 일반 RCU grace period 방법을 사용하는 대신 더 신속한 Brute-force RCU grace period 방법을 사용한다.
    • 그렇지 않은 경우 일반 RCU grace period 방법을 사용한다.


다음 그림은 동기화 rcu 요청 함수인 synchronize_rcu() 및 비동기 rcu 요청 함수인 call_rcu() 함수 두 가지에 대해 각각 호출 경로를 보여준다. 단 Brute-force RCU grace period를 사용하는 synchronize_rcu_expedited() 함수 호출 경로는 생략하였고, 일반 gp wait을 이용하는 wait_rcu_gp() 함수 호출 경로를 사용하였다.




 * synchronize_rcu_expedited - Brute-force RCU grace period
 * Wait for an RCU-preempt grace period, but expedite it.  The basic
 * idea is to invoke synchronize_sched_expedited() to push all the tasks to
 * the ->blkd_tasks lists and wait for this list to drain.  This consumes
 * significant time on all CPUs and is unfriendly to real-time workloads,
 * so is thus not recommended for any sort of common-case code.
 * In fact, if you are using synchronize_rcu_expedited() in a loop,
 * please restructure your code to batch your updates, and then Use a
 * single synchronize_rcu() instead.
void synchronize_rcu_expedited(void)
        unsigned long flags;
        struct rcu_node *rnp;
        struct rcu_state *rsp = &rcu_preempt_state;
        unsigned long snap;
        int trycount = 0;

        smp_mb(); /* Caller's modifications seen first by other CPUs. */
        snap = ACCESS_ONCE(sync_rcu_preempt_exp_count) + 1;
        smp_mb(); /* Above access cannot bleed into critical section. */

         * Block CPU-hotplug operations.  This means that any CPU-hotplug
         * operation that finds an rcu_node structure with tasks in the
         * process of being boosted will know that all tasks blocking
         * this expedited grace period will already be in the process of
         * being boosted.  This simplifies the process of moving tasks
         * from leaf to root rcu_node structures.
        if (!try_get_online_cpus()) {
                /* CPU-hotplug operation in flight, fall back to normal GP. */

         * Acquire lock, falling back to synchronize_rcu() if too many
         * lock-acquisition failures.  Of course, if someone does the
         * expedited grace period for us, just leave.
        while (!mutex_trylock(&sync_rcu_preempt_exp_mutex)) {
                if (ULONG_CMP_LT(snap,
                    ACCESS_ONCE(sync_rcu_preempt_exp_count))) {
                        goto mb_ret; /* Others did our work for us. */
                if (trycount++ < 10) {
                        udelay(trycount * num_online_cpus());
                } else {
        if (ULONG_CMP_LT(snap, ACCESS_ONCE(sync_rcu_preempt_exp_count))) {
                goto unlock_mb_ret; /* Others did our work for us. */
  • struct rcu_state *rsp = &rcu_preempt_state;
    • 전역 rcu_preempt_state 객체를 통해 rcu 트리 노드등을 사용할 목적으로 rsp 포인터에 대입한다.
  • snap = ACCESS_ONCE(sync_rcu_preempt_exp_count) + 1;
    • sync_rcu_preempt_exp_count 값을 읽어 1을 추가하여 snap에 대입한다.
  • if (!try_get_online_cpus()) { wait_rcu_gp(call_rcu); return; }
    • cpu hotplug용 mutex lock을 획득하지 못한 경우 일반 RCU grace period 방식을 사용한다.
    • 참고: Optimizing CPU hotplug locking | LWN.net
  • while (!mutex_trylock(&sync_rcu_preempt_exp_mutex)) {
    • contension 상황이라 아직 rcu용 mutex 락을 획득하지 못한 경우 루프를 돈다.
  • if (ULONG_CMP_LT(snap, ACCESS_ONCE(sync_rcu_preempt_exp_count))) { put_online_cpus(); goto mb_ret; }
    • snap 값이 sync_rcu_preempt_exp_count 값보다 작은 경우 함수를 빠져나간다.
  • if (trycount++ < 10) { udelay(trycount * num_online_cpus()); } else { put_online_cpus(); wait_rcu_gp(call_rcu); return; }
    • 10번 이내 시도 시 시도 횟 수만큼 비례하여 약간의 딜레이를 갖는다. 시도가 10번 이상인 경우 일반 RCU grace period 방식을 사용하여 대기를 한 후 함수를 빠져나간다.
  • if (ULONG_CMP_LT(snap, ACCESS_ONCE(sync_rcu_preempt_exp_count))) {  put_online_cpus(); goto unlock_mb_ret; }
    • mutex 락 획득 후 snap 값이 sync_rcu_preempt_exp_count 보다 작은 경우 함수를 빠져나간다.


        /* force all RCU readers onto ->blkd_tasks lists. */

        /* Initialize ->expmask for all non-leaf rcu_node structures. */
        rcu_for_each_nonleaf_node_breadth_first(rsp, rnp) {
                raw_spin_lock_irqsave(&rnp->lock, flags);
                rnp->expmask = rnp->qsmaskinit;
                raw_spin_unlock_irqrestore(&rnp->lock, flags);

        /* Snapshot current state of ->blkd_tasks lists. */
        rcu_for_each_leaf_node(rsp, rnp)
                sync_rcu_preempt_exp_init(rsp, rnp);
        if (NUM_RCU_NODES > 1)
                sync_rcu_preempt_exp_init(rsp, rcu_get_root(rsp));


        /* Wait for snapshotted ->blkd_tasks lists to drain. */
        rnp = rcu_get_root(rsp);

        /* Clean up and exit. */
        smp_mb(); /* ensure expedited GP seen before counter increment. */
        ACCESS_ONCE(sync_rcu_preempt_exp_count) =
                                        sync_rcu_preempt_exp_count + 1;
        smp_mb(); /* ensure subsequent action seen after grace period. */
  • synchronize_sched_expedited();
  • rcu_for_each_nonleaf_node_breadth_first(rsp, rnp) { raw_spin_lock_irqsave(&rnp->lock, flags); smp_mb__after_unlock_lock(); rnp->expmask = rnp->qsmaskinit; raw_spin_unlock_irqrestore(&rnp->lock, flags); }
    • leaf 노드가 아닌 rcu 노드에 대해 루프를 돌며 rnp->expmask에 rnp->qsmaskinit을 대입하여 초기화한다.
  • rcu_for_each_leaf_node(rsp, rnp) sync_rcu_preempt_exp_init(rsp, rnp);
    • leaf 노드에 대해 루프를 돌며
  • if (NUM_RCU_NODES > 1) sync_rcu_preempt_exp_init(rsp, rcu_get_root(rsp));
    • RCU 노드가 2단계 레벨 이상 구성된 경우
  • rnp = rcu_get_root(rsp); wait_event(sync_rcu_preempt_exp_wq, sync_rcu_preempt_exp_done(rnp));
    • sync_rcu_preempt_exp_wq에 이벤트가 도착하고 루트노드에서 RCU expedited grace period가 진행중이지 않을 때까지 기다린다.
  • ACCESS_ONCE(sync_rcu_preempt_exp_count) = sync_rcu_preempt_exp_count + 1
    • sync_rcu_preempt_exp_count를 1 증가시킨다.




void wait_rcu_gp(call_rcu_func_t crf)
        struct rcu_synchronize rcu;

        /* Will wake me after RCU finished. */
        crf(&rcu.head, wakeme_after_rcu);
        /* Wait for it. */

Grace Period가 완료될 때까지 대기한다. 대기를 하기 위해 wakeme_after_rcu라는 함수를 RCU callback으로 하여 비동기 rcu 요청한다. GP가 끝난 후에 해당 callback 함수가 호출되면 자연스럽게 이 함수가 완료된다.

  • init_rcu_head_on_stack(&rcu.head);
    • rcu_head를 stack에서 초기화한다.
  • init_completion(&rcu.completion);
    • completion 구조체를 초기화한다.
  • crf(&rcu.head, wakeme_after_rcu);
    • 인수 crf에는 call_rcu_sched()  또는 call_rcu_bh()함수가 담겨 비동기 rcu callback을 요청한다.
    • gp가 완료되면 wakeme_after_rcu()함수가 호출된다.
    • wakeme_after_rcu() 함수는 대기 중인 wait_for_completion() 함수의 완료시킨다.
  • wait_for_completion(&rcu.completion);
    • 작업이 끝날 때까지 대기한다.
  • destroy_rcu_head_on_stack(&rcu.head);
    • 스택에 위치한 rcu.head를 제거한다.




 * Awaken the corresponding synchronize_rcu() instance now that a
 * grace period has elapsed.
static void wakeme_after_rcu(struct rcu_head  *head)
        struct rcu_synchronize *rcu;

        rcu = container_of(head, struct rcu_synchronize, head);

작업 완료를 기다리고 있는 rcu에 complete 처리를 한다.


RCU 비동기 Update



 * Queue a preemptible-RCU callback for invocation after a grace period.
void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
        __call_rcu(head, func, &rcu_preempt_state, -1, 0);

grace period가 지난 후에 preemtible-RCU 콜백을 호출할 수 있도록 큐에 추가한다.




 * Helper function for call_rcu() and friends.  The cpu argument will
 * normally be -1, indicating "currently running CPU".  It may specify
 * a CPU only if that CPU is a no-CBs CPU.  Currently, only _rcu_barrier()
 * is expected to specify a CPU.
static void
__call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu),
           struct rcu_state *rsp, int cpu, bool lazy)
        unsigned long flags;
        struct rcu_data *rdp;

        WARN_ON_ONCE((unsigned long)head & 0x1); /* Misaligned rcu_head! */
        if (debug_rcu_head_queue(head)) {
                /* Probable double call_rcu(), so leak the callback. */
                ACCESS_ONCE(head->func) = rcu_leak_callback;
                WARN_ONCE(1, "__call_rcu(): Leaked duplicate callback\n");
        head->func = func;
        head->next = NULL;

         * Opportunistically note grace-period endings and beginnings.
         * Note that we might see a beginning right after we see an
         * end, but never vice versa, since this CPU has to pass through
         * a quiescent state betweentimes.
        rdp = this_cpu_ptr(rsp->rda);

        /* Add the callback to our list. */
        if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL) || cpu != -1) {
                int offline;

                if (cpu != -1)
                        rdp = per_cpu_ptr(rsp->rda, cpu);
                offline = !__call_rcu_nocb(rdp, head, lazy, flags);
                /* _call_rcu() is illegal on offline CPU; leak the callback. */
        ACCESS_ONCE(rdp->qlen) = rdp->qlen + 1;
        if (lazy)
        smp_mb();  /* Count before adding callback for rcu_barrier(). */
        *rdp->nxttail[RCU_NEXT_TAIL] = head;
        rdp->nxttail[RCU_NEXT_TAIL] = &head->next;

        if (__is_kfree_rcu_offset((unsigned long)func))
                trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func,
                                         rdp->qlen_lazy, rdp->qlen);
                trace_rcu_callback(rsp->name, head, rdp->qlen_lazy, rdp->qlen);

        /* Go handle any RCU core processing required. */
        __call_rcu_core(rsp, rdp, head, flags);

call_rcu의 헬퍼 함수로 grace period가 지난 후에 preemtible-RCU 콜백을 호출할 수 있도록 큐에 추가한다. 인수 cpu는 일반적으로 -1이 입력되는 경우 현재 러닝 중인 cpu를 의미한다.

  • 코드 라인 15~20에서 CONFIG_DEBUG_OBJECTS_RCU_HEAD 커널 옵션을 사용하여 디버깅을 하는 동안 __call_rcu() 함수가 이중 호출되는 경우 경고 메시지를 출력하고 함수를 빠져나간다.
  • 코드 라인 21~22에서 rcu 인스턴스에 인수로 전달받은 콜백 함수를 대입한다. rcu는 리스트로 연결되는데 가장 마지막에 추가되므로 마지막을 의미하는 null을 대입한다.
  • 코드 라인 31에서 현재 cpu에 대한 rcu 데이터를 알아온다.
  • 코드 라인 34~44에서 낮은 확률로 rcu cb를 추가할 포인터가 지정되지 않아 cb를 추가할 수 없는 적절치 못한 상황이거나 cpu가 지정된 경우 즉각적인 cb 처리를 위해 no-cb용 커널 스레드에서 처리한다.
  • 코드 라인 45~47에서 rcu 요청되어 큐에 들어올 때 마다 qlen이 1씩 증가되고 처리가 완료되면 처리된 수 만큼 감소한다. kfree용 rcu인 경우에는 qlen 및 qlen_lazy가 같이 증가되고 처리가 완료되면 처리된 수 만큼 같이 감소한다.
  • 코드 라인 51~52에서 nxttail[RCU_NEXT_TAIL]이 가리키는 곳의 다음에 cb를 추가한다. 보통 rdp->nxtlist의 가장 마지막에 cb가 추가된다. nxttail[RCU_NEXT_TAIL]은 항상 마지막에 추가된 cb를 가리킨다.
  • 코드 라인 54~58에서 kfree 타입 또는 일반 타입의 rcu 요청에 맞게 trace 로그를 출력한다.
  • 코드 라인 61에서 rcu core 처리를 수행한다.


RCU 콜백 리스트

RCU 콜백들이 추가되는 리스트에 대해 알아보자. 하나의 리스트를 사용하였고, 4개의 구간으로 나누어 관리하기 위해 4개의 포인터 배열을 사용한다.

  • 1개의 리스트: rdp->nxtlist
  • 4개의 포인터 배열: rdp->nxttail[4]
    • 1 번째 구간:
      • *nxtlist ~ nxttail[0]
      • nxtlist에 연결된 다음 콜백 위치부터 nxttail[RCU_DONE_TAIL] 까지
      • g.p가 완료하여 처리 가능한 구간이다. blimit 제한으로 인해 다 처리되지 못한 콜백들이 이 구간에 남아 있을 수 있다.
      • 이 구간의 콜백들은 아무때나 처리 가능하다.
    • 2 번째 구간
      • *nxttail[0] ~ nxttail[1]
      • nxttail[RCU_DONE_TAIL]에 연결된 다음 콜백 위치부터 nxttail[RCU_WAIT_TAIL] 까지
      • current gp 이전에 추가된 콜백으로 current gp가 끝난 후에 처리될 예정인 콜백들이 대기중인 구간이다.
      • 이 구간의 콜백들은 current gp가 지난 후에 처리되어야 한다.
    • 3 번째 구간
      • *nxttail[1] ~ nxttail[2]
      • nxttail[RCU_WAIT_TAIL]에 연결된 다음 콜백 위치부터 nxttail[RCU_NEXT_READY_TAIL] 까지
      • current gp 진행 중에 추가된 콜백들이 추가된 구간이다. 이 콜백들은 current gp가 완료되어도 곧바로 처리되면 안되고, 다음 gp가 완료된 후에 처리될 예정인 콜백들이 있는 구간이다.
      • 이 구간의 콜백들은 current gp가 완료되고 다음 gp가 완료된 후에 처리되어야 한다.
    • 4 번째 구간
      • *nxttail[2] ~ nxttail[3]
      • nxttail[RCU_NEXT_READY_TAIL]에 연결된 다음 콜백 위치부터 nxttail[RCU_NEXT_TAIL] 까지
      • 새 콜백이 추가되면 이 구간의 마지막에 추가되고, nxttail[RCU_NEXT_TAIL]이 추가된 마지막 콜백을 항상 가리키게 해야한다.




 * Handle any core-RCU processing required by a call_rcu() invocation.
static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
                            struct rcu_head *head, unsigned long flags)
        bool needwake;

         * If called from an extended quiescent state, invoke the RCU
         * core in order to force a re-evaluation of RCU's idleness.
        if (!rcu_is_watching() && cpu_online(smp_processor_id()))

        /* If interrupts were disabled or CPU offline, don't invoke RCU core. */
        if (irqs_disabled_flags(flags) || cpu_is_offline(smp_processor_id()))

         * Force the grace period if too many callbacks or too long waiting.
         * Enforce hysteresis, and don't invoke force_quiescent_state()
         * if some other CPU has recently done so.  Also, don't bother
         * invoking force_quiescent_state() if the newly enqueued callback
         * is the only one waiting for a grace period to complete.
        if (unlikely(rdp->qlen > rdp->qlen_last_fqs_check + qhimark)) {

                /* Are we ignoring a completed grace period? */
                note_gp_changes(rsp, rdp);

                /* Start a new grace period if one not already started. */
                if (!rcu_gp_in_progress(rsp)) {
                        struct rcu_node *rnp_root = rcu_get_root(rsp);

                        needwake = rcu_start_gp(rsp);
                        if (needwake)
                } else {
                        /* Give the grace period a kick. */
                        rdp->blimit = LONG_MAX;
                        if (rsp->n_force_qs == rdp->n_force_qs_snap &&
                            *rdp->nxttail[RCU_DONE_TAIL] != head)
                        rdp->n_force_qs_snap = rsp->n_force_qs;
                        rdp->qlen_last_fqs_check = rdp->qlen;
  • 코드 라인 13~14에서 cpu가 idle 상태로 진입하지 않았으면서 online 상태이면 rcu 콜백 처리를 위해 softirq를 호출한다.
  • 코드 라인 17~18에서 인터럽트가 disable된 상태인 경우이거나 offline 상태이면 함수를 빠져나간다.
  • 코드 라인 27에서 rcu 콜백이 너무 많이 대기 중이면
  • 코드 라인 33~41에서 gp가 진행 중이 아니면 새로운 gp를 시작시키기 위해 gp 스레드를 깨운다.
  • 코드 라인 42~49에서 gp가 진행 중이면 blimit 제한을 풀고 fqs를 진행한다.





 * Do RCU core processing for the current CPU.
static void rcu_process_callbacks(struct softirq_action *unused)
        struct rcu_state *rsp;

        if (cpu_is_offline(smp_processor_id()))
        trace_rcu_utilization(TPS("Start RCU core"));
        trace_rcu_utilization(TPS("End RCU core"));

rcu flavor 리스트에 등록된 rcu state는 rcu_sched_state, rcu_bh_state, rcu_preempt_state가 있다. 이들에 등록되어 있는 콜백을 단계별로 처리한다.




 * This does the RCU core processing work for the specified rcu_state
 * and rcu_data structures.  This may be called only from the CPU to
 * whom the rdp belongs.
static void
__rcu_process_callbacks(struct rcu_state *rsp)
        unsigned long flags;
        bool needwake;
        struct rcu_data *rdp = raw_cpu_ptr(rsp->rda);
        WARN_ON_ONCE(rdp->beenonline == 0);     

        /* Update RCU state based on any recent quiescent states. */
        rcu_check_quiescent_state(rsp, rdp);
        /* Does this CPU require a not-yet-started grace period? */
        if (cpu_needs_another_gp(rsp, rdp)) {   
                raw_spin_lock(&rcu_get_root(rsp)->lock); /* irqs disabled. */
                needwake = rcu_start_gp(rsp);   
                raw_spin_unlock_irqrestore(&rcu_get_root(rsp)->lock, flags);
                if (needwake)
        } else {
        /* If there are callbacks ready, invoke them. */
        if (cpu_has_callbacks_ready_to_invoke(rdp))
                invoke_rcu_callbacks(rsp, rdp);
        /* Do any needed deferred wakeups of rcuo kthreads. */

rcu 콜백을 처리한다.

  • 코드 라인 16에서 qs가 지났는지 체크한다.
  • 코드 라인 19~28에서 irq를 disable한 상태에서 새로운 gp가 필요하면 gp를 시작한다. 필요에 의해 gp 스레드가 필요하면 깨운다.
  • 코드 라인 31~32에서 rcu 콜백을 처리할 수 있는 상태이면 호출하여 단계별로 처리한다.
  • 코드 라인 35에서 rcu no-callback을 위한 처리를 한다.


RCU gpnum & completed 번호 관계

각각의 구조체 포인터 표현은 다음과 같다.

  • rsp -> rcu_state 구조체 포인터 (글로벌 락 사용)
  • rnp -> rcu_node 구조체 포인터 (노드에 포함된 cpu만 영향을 받는 노드락 사용)
  • rdp -> rcu_data 구조체 포인터 (per-cpu 로컬이므로 락 없이 사용)


위 3가지 구조체에 담긴 gpnum과 completed가 담겨 있다. rcu 관리에서 최대한 락 사용을 억제하기 위해 per-cpu를 사용한 rcu_data는 로컬 cpu 데이터를 취급한다. 로컬 데이터는 해당 cpu가 필요한 시점에서만 갱신한다. 따라서 이로 인해 로컬 gpnum과 completed 번호는 글로벌 gpnum과 completed 번호에 비해  최대 1 만큼 늦어질 수도 있다. (모든 값의 차이는 최대 1을 초과할 수 없다.) gp 번호가 담긴 gpnum은 gp가 시작할 때 증가된다. gp가 완료된 후 완료된 글로벌 gpnum 번호로 글로벌의 completed 번호를 갱신한다. gp가 완료되었으므로 completed 번호까지는 cb들을 처리할 수 있게된다.




 * Check to see if there is a new grace period of which this CPU
 * is not yet aware, and if so, set up local rcu_data state for it.
 * Otherwise, see if this CPU has just passed through its first
 * quiescent state for this grace period, and record that fact if so.
static void
rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
        /* Check for grace-period ends and beginnings. */
        note_gp_changes(rsp, rdp);

         * Does this CPU still need to do its part for current grace period?
         * If no, return and let the other CPUs do their part as well.
        if (!rdp->qs_pending)

         * Was there a quiescent state since the beginning of the grace
         * period? If no, then exit and wait for the next call.
        if (!rdp->passed_quiesce &&
            rdp->rcu_qs_ctr_snap == __this_cpu_read(rcu_qs_ctr))

         * Tell RCU we are done (but rcu_report_qs_rdp() will be the
         * judge of that).
        rcu_report_qs_rdp(rdp->cpu, rsp, rdp);





static void note_gp_changes(struct rcu_state *rsp, struct rcu_data *rdp)
        unsigned long flags;
        bool needwake;
        struct rcu_node *rnp;

        rnp = rdp->mynode;
        if ((rdp->gpnum == ACCESS_ONCE(rnp->gpnum) &&
             rdp->completed == ACCESS_ONCE(rnp->completed) &&
             !unlikely(ACCESS_ONCE(rdp->gpwrap))) || /* w/out lock. */
            !raw_spin_trylock(&rnp->lock)) { /* irqs already off, so later. */
        needwake = __note_gp_changes(rsp, rnp, rdp);
        raw_spin_unlock_irqrestore(&rnp->lock, flags);
        if (needwake)

next 리스트에 등록된 콜백들에 대해 completed 번호 식별을 한다. gp 상태에 변화가 있는지를 체크한다.  gp kthread를 깨운다.

  • 코드 라인 9~15에서 로컬 gpnum과 completed가 이미 갱신되었고 gpwrap이 0이거나 노드 락을 잡을 수 없는 경우 다음 기회로 미룬다.
  • 코드 라인 17에서 next 리스트에 등록된 콜백들에 대해 completed 번호 식별을 한다.
  • 코드 라인 19~20에서 gp 상태에 변화가 있는지를 체크한다.  gp kthread를 깨운다.




 * Update CPU-local rcu_data state to record the beginnings and ends of
 * grace periods.  The caller must hold the ->lock of the leaf rcu_node
 * structure corresponding to the current CPU, and must have irqs disabled.
 * Returns true if the grace-period kthread needs to be awakened.
static bool __note_gp_changes(struct rcu_state *rsp, struct rcu_node *rnp,
                              struct rcu_data *rdp)
        bool ret;

        /* Handle the ends of any preceding grace periods first. */
        if (rdp->completed == rnp->completed &&
            !unlikely(ACCESS_ONCE(rdp->gpwrap))) {

                /* No grace period end, so just accelerate recent callbacks. */
                ret = rcu_accelerate_cbs(rsp, rnp, rdp);

        } else {

                /* Advance callbacks. */
                ret = rcu_advance_cbs(rsp, rnp, rdp);

                /* Remember that we saw this grace-period completion. */
                rdp->completed = rnp->completed;
                trace_rcu_grace_period(rsp->name, rdp->gpnum, TPS("cpuend"));

        if (rdp->gpnum != rnp->gpnum || unlikely(ACCESS_ONCE(rdp->gpwrap))) {
                 * If the current grace period is waiting for this CPU,
                 * set up to detect a quiescent state, otherwise don't
                 * go looking for one.
                rdp->gpnum = rnp->gpnum;
                trace_rcu_grace_period(rsp->name, rdp->gpnum, TPS("cpustart"));
                rdp->passed_quiesce = 0;
                rdp->rcu_qs_ctr_snap = __this_cpu_read(rcu_qs_ctr);
                rdp->qs_pending = !!(rnp->qsmask & rdp->grpmask);
                ACCESS_ONCE(rdp->gpwrap) = false;
        return ret;

next 리스트에 등록된 콜백들에 대해 completed 번호 식별을 한다. 그런 후 gp 상태에 변화가 있는지를 체크한다.  gp kthread를 깨울 수 있도록 gp 상태가 변경된 경우 true를 반환한다.

  • 코드 라인 13~17에서 gp가 완료되지 않은 상태이다. 이러한 경우 새롭게 진입한 콜백들의 completed 번호 식별을 수행한다. gp 상태에 변화가 있는 경우 true를 반환한다.
  • 코드 라인 19~27에서 gp가 완료된 상태이다. 모든 기존 콜백 및 추가된 콜백들에 대해 식별 처리 후 done 구간으로 옮긴다. 그런 후 로컬completed를 갱신하고 gp 상태에 변화가 있는 경우 true를 반환한다.
  • 코드 라인 29~에서 로컬 gpnum이 갱신되지 않았거나 gp가 진행 중에 있는 경우 로컬 gpnum을 갱신하고 로컬 qs 상태를 0으로 한다. 로컬 gpwrap을 false로 갱신한다.




 * Move any callbacks whose grace period has completed to the
 * RCU_DONE_TAIL sublist, then compact the remaining sublists and
 * assign ->completed numbers to any callbacks in the RCU_NEXT_TAIL
 * sublist.  This function is idempotent, so it does not hurt to
 * invoke it repeatedly.  As long as it is not invoked -too- often...
 * Returns true if the RCU grace-period kthread needs to be awakened.
 * The caller must hold rnp->lock with interrupts disabled.
static bool rcu_advance_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
                            struct rcu_data *rdp)
        int i, j;

        /* If the CPU has no callbacks, nothing to do. */
        if (!rdp->nxttail[RCU_NEXT_TAIL] || !*rdp->nxttail[RCU_DONE_TAIL])
                return false;

         * Find all callbacks whose ->completed numbers indicate that they
         * are ready to invoke, and put them into the RCU_DONE_TAIL sublist.
        for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++) {
                if (ULONG_CMP_LT(rnp->completed, rdp->nxtcompleted[i]))
                rdp->nxttail[RCU_DONE_TAIL] = rdp->nxttail[i];
        /* Clean up any sublist tail pointers that were misordered above. */
        for (j = RCU_WAIT_TAIL; j < i; j++)
                rdp->nxttail[j] = rdp->nxttail[RCU_DONE_TAIL];

        /* Copy down callbacks to fill in empty sublists. */
        for (j = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++, j++) {
                if (rdp->nxttail[j] == rdp->nxttail[RCU_NEXT_TAIL])
                rdp->nxttail[j] = rdp->nxttail[i];
                rdp->nxtcompleted[j] = rdp->nxtcompleted[i];

        /* Classify any remaining callbacks. */
        return rcu_accelerate_cbs(rsp, rnp, rdp);

gp가 만료된 콜백들을 done 구간으로 옮긴다. gp kthread를 깨워야 하는 경우 true를 반환한다.

  • 코드 라인 17~18에서 처리할 콜백이 없으면 false를 반환한다.
  • 코드 라인 24~28에서 wait 구간과 next 구간에 이미 만료된 completed 번호의 콜백을 done 구간으로 옮긴다.
  • 코드 라인 30~31에서 위에서 wait 구간 또는 next 구간에 콜백들이 있었던 경우 wait tail 또는 next ready tail이 done tail 보다 앞서 있을 수 있다. 따라서 해당 구간을 일단 done tail과 동일하게 조정한다.
  • 코드 라인 34~39에서 next ready 구간에 있었던 콜백들을 wait 구간으로 옮긴다. 그리고 next 구간에 있었던 콜백들을 next ready 구간으로 옮긴다.
  • 코드 라인 42에서 done 구역이외의 남은 콜백들에 대한 식별 작업을 하고 done 구간으로 옮긴다.


다음 그림은 rnp->completed=101이 되는 순간 101번까지의 콜백들이 done 구간으로 옮겨지고, 다음 구간들이 한 단계씩 앞으로 이동하게되는 것을 알 수 있다. 이어서 rcu_accelerate_cbs() 함수를 호출하는데 그 과정은 해당 코드와 그림에서 표현한다.




kernel/rcu/tree.c -1/2-

 * If there is room, assign a ->completed number to any callbacks on
 * this CPU that have not already been assigned.  Also accelerate any
 * callbacks that were previously assigned a ->completed number that has
 * since proven to be too conservative, which can happen if callbacks get
 * assigned a ->completed number while RCU is idle, but with reference to
 * a non-root rcu_node structure.  This function is idempotent, so it does
 * not hurt to call it repeatedly.  Returns an flag saying that we should
 * awaken the RCU grace-period kthread.
 * The caller must hold rnp->lock with interrupts disabled.
static bool rcu_accelerate_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
                               struct rcu_data *rdp)
        unsigned long c;
        int i;
        bool ret;

        /* If the CPU has no callbacks, nothing to do. */
        if (!rdp->nxttail[RCU_NEXT_TAIL] || !*rdp->nxttail[RCU_DONE_TAIL])
                return false;

         * Starting from the sublist containing the callbacks most
         * recently assigned a ->completed number and working down, find the
         * first sublist that is not assignable to an upcoming grace period.
         * Such a sublist has something in it (first two tests) and has
         * a ->completed number assigned that will complete sooner than
         * the ->completed number for newly arrived callbacks (last test).
         * The key point is that any later sublist can be assigned the
         * same ->completed number as the newly arrived callbacks, which
         * means that the callbacks in any of these later sublist can be
         * grouped into a single sublist, whether or not they have already
         * been assigned a ->completed number.
        c = rcu_cbs_completed(rsp, rnp);
        for (i = RCU_NEXT_TAIL - 1; i > RCU_DONE_TAIL; i--)
                if (rdp->nxttail[i] != rdp->nxttail[i - 1] &&
                    !ULONG_CMP_GE(rdp->nxtcompleted[i], c))

이 함수에서는 새로 요청된 rcu 콜백들을 모두 모아 새로운 completed 번호를 부여하고 모두 done에 위치시킨다. rcu gp kthread를 깨워햐 하는 경우 true를 반환한다.

  • 코드 라인 21~22에서 처리할 콜백이 하나도 없는 경우 false를 반환한다.
    • 로컬 cpu가 no-cb로 지정된 경우 콜백 대기 리스트(nxtlist)를 사용하지 않는다.
    • done 구간 뒤로 콜백이 하나도 없으면 *rdp->nxttail[RCU_DONE_TAIL] == null이다.
  • 코드 라인 38에서 completed 번호를 부여하지 않은 각 구간을 위해 새 completed 번호로 사용하기 위해 c를 구해온다.
    • 이 구간들에 존재하는 콜백들은 completed 번호와 동일한 gpnum이 완료된 후 처리된다.
    • 각 구간에 속한 콜백들의 completed 번호는 동일하고 4개 엔트리로 구성된 nxtcompleted[] 배열에 각 completed 번호가 저장된다.
  • 코드 라인 39~42에서 NEXT_READY(2) 구간과 WAIT(1) 구간에 대해 역순회한다. 먼저 NEXT_READY(2) 구간에 콜백들이 존재하고 이 구간에 부여된 completed 번호가 새로 사용할 번호 c보다 작은 경우 i=NEXT_READY(2)인 채로 break 한다. 동일한 방법으로 WAIT(1) 구간도 처리한다. 조건을 만족하지 못하는 경우 i=DONE(0)으로 루프를 벗어난다.


kernel/rcu/tree.c -2/2-

         * If there are no sublist for unassigned callbacks, leave.
         * At the same time, advance "i" one sublist, so that "i" will
         * index into the sublist where all the remaining callbacks should
         * be grouped into.
        if (++i >= RCU_NEXT_TAIL)
                return false;

         * Assign all subsequent callbacks' ->completed number to the next
         * full grace period and group them all in the sublist initially
         * indexed by "i".
        for (; i <= RCU_NEXT_TAIL; i++) {
                rdp->nxttail[i] = rdp->nxttail[RCU_NEXT_TAIL];
                rdp->nxtcompleted[i] = c;
        /* Record any needed additional grace periods. */
        ret = rcu_start_future_gp(rnp, rdp, NULL);

        /* Trace depending on how much we were able to accelerate. */
        if (!*rdp->nxttail[RCU_WAIT_TAIL])
                trace_rcu_grace_period(rsp->name, rdp->gpnum, TPS("AccWaitCB"));
                trace_rcu_grace_period(rsp->name, rdp->gpnum, TPS("AccReadyCB"));
        return ret;
  • 코드 라인 7~8에서 NEXT_READY(2) 구간에 콜백들이 존재하면 false를 반환한다.
  • 코드 라인 15~18에서 WAIT(1) ~ NEXT(2) 구간까지 순회하며 nxttail[i]을 마지막 콜백 위치를 가리키게 한다. 그리고 각 구간의 nextcompleted[i]에 새로 사용할 completed 번호로 알아온 c를 대입한다.
    • WAIT(1) ~ NEXT(2) 구간의 completerd 번호를 새로 부여하고, 이 구간내의 모든 콜백들이 DONE(0) 구간으로 변경되는 것이다.
  • 코드 라인 20에서 다음 gp를 시작해야 하는지 알아온다.




 * Determine the value that ->completed will have at the end of the
 * next subsequent grace period.  This is used to tag callbacks so that
 * a CPU can invoke callbacks in a timely fashion even if that CPU has
 * been dyntick-idle for an extended period with callbacks under the
 * influence of RCU_FAST_NO_HZ.
 * The caller must hold rnp->lock with interrupts disabled.
static unsigned long rcu_cbs_completed(struct rcu_state *rsp,
                                       struct rcu_node *rnp)
         * If RCU is idle, we just wait for the next grace period.
         * But we can only be sure that RCU is idle if we are looking
         * at the root rcu_node structure -- otherwise, a new grace
         * period might have started, but just not yet gotten around
         * to initializing the current non-root rcu_node structure.
        if (rcu_get_root(rsp) == rnp && rnp->gpnum == rnp->completed)
                return rnp->completed + 1;

         * Otherwise, wait for a possible partial grace period and
         * then the subsequent full grace period.
        return rnp->completed + 2;

콜백들의 completed 번호 발급을 위해 다음 gp 번호 값을 반환한다.

  • 코드 라인 20~21에서 rcu idling 중, 즉 다음 gp가 시작되지 않은 상태이면 completed 번호에 1을 추가한 값을 반환한다.
    • gp가 시작되지 않았고 rcu idling이라는 것을 노드 정보로만 빠르게 확인할 수 있는 조건은 다음과 같다.
      • 현재 cpu가 루트 rcu 노드에 포함된 경우만 더 빠른 파악이 가능하다.
      • 그리고 진행 중인 노드의 gpnum이 노드의 completed와 동일한 경우이면 gp가 완료된 후 새로운 gp가 시작되지 않았음을 의미한다. (= rcu idle)
    • 예) 로컬 cpu가 루트 노드에 포함되었고, 노드의 complete=100, 노드의 gpnum=100인 경우
      • 다음 추가되는 콜백들을 위한 completed 값으로 다음에 시작될 101번 gp가 완료된 후 처리가 되도록 101번을 부여한다.
  • 코드 라인 27에서 루트 노드를 통해 rcu idling이 확인되지 않거나 그 밖의 경우 gp가 진행중인 것으로 가정한다. 이 때에는 노드의 completed 번호에 +2를 하여 반환한다.
    • 현재 노드 gp(partial gp)가 진행 중에 있다. 이러한 경우 콜백들에 부여할 complete 번호로 노드 completed에 2를 더한 값을 반환한다.
    • 예) 노드 complete=100, 노드 gpnum=101
      • 다음 추가되는 콜백들을 위한 completed 값으로 다음에 시작될 102번 gp가 완료된 후 처리가 되도록 102번을 부여한다.


다음 그림은 해당 cpu가 루트 노드에 포함되었다고 가정한 모습을 보여준다. 아래 하늘색 구간에서 콜백들이 추가되면 101번 gp가 끝나고 처리되도록 completed 번호로 101번을 발급한다.




 * Does the current CPU require a not-yet-started grace period?
 * The caller must have disabled interrupts to prevent races with
 * normal callback registry.
static int
cpu_needs_another_gp(struct rcu_state *rsp, struct rcu_data *rdp)
        int i;

        if (rcu_gp_in_progress(rsp))
                return 0;  /* No, a grace period is already in progress. */
        if (rcu_future_needs_gp(rsp))
                return 1;  /* Yes, a no-CBs CPU needs one. */
        if (!rdp->nxttail[RCU_NEXT_TAIL])
                return 0;  /* No, this is a no-CBs (or offline) CPU. */
        if (*rdp->nxttail[RCU_NEXT_READY_TAIL])
                return 1;  /* Yes, this CPU has newly registered callbacks. */
        for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++)
                if (rdp->nxttail[i - 1] != rdp->nxttail[i] &&
                        return 1;  /* Yes, CBs for future grace period. */
        return 0; /* No grace period needed. */

새로운 gp가 시작되어야 하는지 체크한다. (true=new gp 필요)

  • 코드 라인 11~12에서 아직 gp가 진행 중인 경우 새 gp를 시작할 필요가 없으므로 0을 반환한다.
  • 코드 라인 13~14에서 gp가 필요한 경우라면 1을 반환한다.
  • 코드 라인 15~16에서 no-cb cpu이거나 cpu가 offline인 경우 0을 반환한다.
    • NEXT_TAIL 포인터가 null로 설정된 경우는 no-cb cpu인 경우이므로 이 곳에 콜백들을 추가하지 않는다.
    • 아래 그림에서 case A)
  • 코드 라인 17~18에서 NEXT_READY_TAIL 구간 다음에 처리할 콜백이 있는 경우 1을 반환한다.
    • 아래 그림에서 case B)와 case C) 비교
  • 코드 라인 19~23에서 new gp 또는 new+1 gp에서 처리할 콜백들이 있고, current GP가 완료된 경우 새 gp를 시작해도 되므로 1을 반환한다.
    • current gp가 완료되었음은 콜백이 전부 또는 일부가 처리되어 rdp->nxtcompleted가 증가됨으로 rsp->completed 보다 커지게 되었음을 확인함으로 알 수 있다.
    • 아래 그림에서 case D)와 case E)의 경우이다.




RCU NO-CB (Offload RCU callback)

rcu의 cb를 처리하는 유형은 큰 흐름으로 다음과 같이 두 가지로 나뉜다.

  • cb 호출 – interrupt context에서 직접 처리
    • softirq에서 처리된다. 이러한 경우 보통 interrupt context에서 곧장 호출되어 처리되고 만일 softirq 처리 건 수가 많아져 지연되는 경우 softirqd 커널 스레드에서 호출되어 처리된다.
    • 디폴트 커널 설정이며 latency가 짧아 빠른 호출이 보장된다.
  • no-cb 호출 – 전용 rcu 커널 스레드에서 처리
    • 특정 rcu 커널 스레드에서 cb를 호출할 수 있도록 한다. 절전과 성능을 만족시키는 옵션이다.
    • CONFIG_RCU_NOCB_CPU 커널 옵션을 사용하고 다음 3가지 중 하나를 선택할 수 있다.
        • 디폴트로 nocb 지정된 cpu는 없지만 “nocbs=” 커널 파라메터로 특정 cpu들을 nocb로 지정할 수 있다.
        • 디폴트로 cpu#0을 nocb 지정한다. 추가로 “nocbs=” 커널 파라메터를 사용하여 다른 cpu들을 nocb로 지정할 수 있다.
        • 디폴트로 모든 cpu를 nocb로 지정한다.




 * This is a helper for __call_rcu(), which invokes this when the normal
 * callback queue is inoperable.  If this is not a no-CBs CPU, this
 * function returns failure back to __call_rcu(), which can complain
 * appropriately.
 * Otherwise, this function queues the callback where the corresponding
 * "rcuo" kthread can find it.
static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp,
                            bool lazy, unsigned long flags)

        if (!rcu_is_nocb_cpu(rdp->cpu))
                return false;
        __call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy, flags);
        if (__is_kfree_rcu_offset((unsigned long)rhp->func))
                trace_rcu_kfree_callback(rdp->rsp->name, rhp,
                                         (unsigned long)rhp->func,
                trace_rcu_callback(rdp->rsp->name, rhp,

         * If called from an extended quiescent state with interrupts
         * disabled, invoke the RCU core in order to allow the idle-entry
         * deferred-wakeup check to function.
        if (irqs_disabled_flags(flags) &&
            !rcu_is_watching() &&

        return true;

no-cb 지정된 cpu인 경우 rcu 커널 스레드(“rcuo”)에서 처리하도록 rcu cb를 큐잉한다.

  • 코드 라인 14~15에서 no-cb 지정된 cpu가 아니면 false를 반환한다.
  • 코드 라인 16에서 no-cb 큐에 rcu를 추가하고 rcu 커널 스레드를 호출하다.
  • 코드 라인 17~25에서 kfree용 rcu cb 및 함수 호출용 rcu cb 각각에 따른 trace 메시지 출력을 다르게 한다.
  • 코드 라인 32~35에서 irq disable이면서 확장 qs 상태에서 rcu 요청한 경우 softirq를 호출하여 wakeup 유예 체크를 한다.





/* Is the specified CPU a no-CBs CPU? */
bool rcu_is_nocb_cpu(int cpu)
        if (have_rcu_nocb_mask)
                return cpumask_test_cpu(cpu, rcu_nocb_mask);
        return false;

요청한 cpu가 no-cb(rcu 스레드 사용)으로 설정되었는지 여부를 반환한다.

  • have_rcu_nocb_mask 변수는 rcu_nocb_setup() 함수가 호출되어 rcu_nocb_mask라는 cpu 마스크가 할당된 경우 true로 설정된다.
  • CONFIG_RCU_NOCB_CPU_ALL 커널 옵션을 사용하는 경우 항상 rcu 스레드에서 동작시키기 위해 true를 반환한다.
  • CONFIG_RCU_NOCB_CPU_ALL 및 CONFIG_RCU_NOCB_CPU 커널 옵션 둘 다 사용하지 않는 경우 항상 callback 처리하기 위해 false를 반환한다.




/* Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters. */
static int __init rcu_nocb_setup(char *str)
        have_rcu_nocb_mask = true;
        cpulist_parse(str, rcu_nocb_mask);
        return 1;
__setup("rcu_nocbs=", rcu_nocb_setup);

커널 파라메터 “rcu_nocbs=”에 cpu 리스트를 설정한다. 이렇게 설정된 cpu들은 rcu callback 처리를 rcu 스레드에서 처리할 수 있다.

  • 예) rcu_nocbs=3-6,8-10



답글 남기기

이메일은 공개되지 않습니다. 필수 입력창은 * 로 표시되어 있습니다.