RCU NO-CB (Offload RCU callback)
rcu의 cb를 처리하는 유형은 큰 흐름으로 다음과 같이 두 가지로 나뉜다.
- cb 호출 – interrupt context에서 직접 처리
- softirq에서 처리된다. 이러한 경우 보통 interrupt context에서 곧장 호출되어 처리되고 만일 softirq 처리 건 수가 많아져 지연되는 경우 softirqd 커널 스레드에서 호출되어 처리된다.
- 디폴트 커널 설정이며 latency가 짧아 빠른 호출이 보장된다.
- no-cb 호출 – 전용 rcu 커널 스레드에서 처리
- 특정 rcu 커널 스레드에서 cb를 호출할 수 있도록 한다. 절전과 성능을 만족시키는 옵션이다.
- CONFIG_RCU_NOCB_CPU 커널 옵션을 사용하고 다음 3가지 중 하나를 선택할 수 있다.
- CONFIG_RCU_NOCB_CPU_NONE
- 디폴트로 nocb 지정된 cpu는 없지만 “nocbs=” 커널 파라메터로 특정 cpu들을 nocb로 지정할 수 있다.
- CONFIG_RCU_NOCB_CPU_ZERO
- 디폴트로 cpu#0을 nocb 지정한다. 추가로 “nocbs=” 커널 파라메터를 사용하여 다른 cpu들을 nocb로 지정할 수 있다.
- CONFIG_RCU_NOCB_CPU_ALL
- 디폴트로 모든 cpu를 nocb로 지정한다.
- CONFIG_RCU_NOCB_CPU_NONE
Leader & Follower CPU
NO-CB용 콜백을 사용하는 경우 지정된 cpu에 nocb용 커널 스레드가 구성된다. grace period 관리 및 이에 수반되는 콜백 리스트의 cascade 관리를 위해 leader cpu를 지정하고 그 leader cpu가 처리해준 작업을 수행하는 follower cpu를 구성하여 처리한다.
- Leader cpu가 하는 일
- gp 관리
- leader 및 follower에 속한 콜백 리스트의 이동
- leader 콜백 호출
- Follower cpu가 하는 일
- follower 콜백 호출
NO-CB용 콜백 추가
__call_rcu_nocb()
kernel/rcu/tree_plugin.h
/* * This is a helper for __call_rcu(), which invokes this when the normal * callback queue is inoperable. If this is not a no-CBs CPU, this * function returns failure back to __call_rcu(), which can complain * appropriately. * * Otherwise, this function queues the callback where the corresponding * "rcuo" kthread can find it. */ static bool __call_rcu_nocb(struct rcu_data *rdp, struct rcu_head *rhp, bool lazy, unsigned long flags) { if (!rcu_is_nocb_cpu(rdp->cpu)) return false; __call_rcu_nocb_enqueue(rdp, rhp, &rhp->next, 1, lazy, flags); if (__is_kfree_rcu_offset((unsigned long)rhp->func)) trace_rcu_kfree_callback(rdp->rsp->name, rhp, (unsigned long)rhp->func, -atomic_long_read(&rdp->nocb_q_count_lazy), -atomic_long_read(&rdp->nocb_q_count)); else trace_rcu_callback(rdp->rsp->name, rhp, -atomic_long_read(&rdp->nocb_q_count_lazy), -atomic_long_read(&rdp->nocb_q_count)); /* * If called from an extended quiescent state with interrupts * disabled, invoke the RCU core in order to allow the idle-entry * deferred-wakeup check to function. */ if (irqs_disabled_flags(flags) && !rcu_is_watching() && cpu_online(smp_processor_id())) invoke_rcu_core(); return true; }
no-cb 지정된 cpu인 경우 rcu 커널 스레드(“rcuo”)에서 처리하도록 rcu cb를 큐잉한다.
- 코드 라인 14~15에서 no-cb 지정된 cpu가 아니면 false를 반환한다.
- 코드 라인 16에서 no-cb 큐에 rcu를 추가하고 rcu 커널 스레드를 호출하다.
- 코드 라인 17~25에서 kfree용 rcu cb 및 함수 호출용 rcu cb 각각에 따른 trace 메시지 출력을 다르게 한다.
- 코드 라인 32~35에서 irq disable이면서 확장 qs 상태에서 rcu 요청한 경우 softirq를 호출하여 rcu core를 호출한다.
다음 그림은 9개의 cpu가 nocb를 사용하는 모습을 보여준다.
RCU NO-CB 설정
rcu_is_nocb_cpu()
kernel/rcu/tree_plugin.h
/* Is the specified CPU a no-CBs CPU? */ bool rcu_is_nocb_cpu(int cpu) { if (have_rcu_nocb_mask) return cpumask_test_cpu(cpu, rcu_nocb_mask); return false; }
요청한 cpu가 no-cb(rcu 스레드 사용)으로 설정되었는지 여부를 반환한다.
- have_rcu_nocb_mask 변수는 rcu_nocb_setup() 함수가 호출되어 rcu_nocb_mask라는 cpu 마스크가 할당된 경우 true로 설정된다.
- CONFIG_RCU_NOCB_CPU_ALL 커널 옵션을 사용하는 경우 항상 rcu 스레드에서 동작시키기 위해 true를 반환한다.
- CONFIG_RCU_NOCB_CPU_ALL 및 CONFIG_RCU_NOCB_CPU 커널 옵션 둘 다 사용하지 않는 경우 항상 callback 처리하기 위해 false를 반환한다.
rcu_nocb_setup()
kernel/rcu/tree_plugin.h
/* Parse the boot-time rcu_nocb_mask CPU list from the kernel parameters. */ static int __init rcu_nocb_setup(char *str) { alloc_bootmem_cpumask_var(&rcu_nocb_mask); have_rcu_nocb_mask = true; cpulist_parse(str, rcu_nocb_mask); return 1; } __setup("rcu_nocbs=", rcu_nocb_setup);
커널 파라메터 “rcu_nocbs=”에 cpu 리스트를 설정한다. 이렇게 설정된 cpu들은 rcu callback 처리를 rcu 스레드에서 처리할 수 있다.
- 예) rcu_nocbs=3-6,8-10
Leader cpu와 Follower cpu 구성
rcu_organize_nocb_kthreads()
kernel/rcu/tree_plugin.h
/* * Initialize leader-follower relationships for all no-CBs CPU. */ static void __init rcu_organize_nocb_kthreads(struct rcu_state *rsp) { int cpu; int ls = rcu_nocb_leader_stride; int nl = 0; /* Next leader. */ struct rcu_data *rdp; struct rcu_data *rdp_leader = NULL; /* Suppress misguided gcc warn. */ struct rcu_data *rdp_prev = NULL; if (!have_rcu_nocb_mask) return; if (ls == -1) { ls = int_sqrt(nr_cpu_ids); rcu_nocb_leader_stride = ls; } /* * Each pass through this loop sets up one rcu_data structure and * spawns one rcu_nocb_kthread(). */ for_each_cpu(cpu, rcu_nocb_mask) { rdp = per_cpu_ptr(rsp->rda, cpu); if (rdp->cpu >= nl) { /* New leader, set up for followers & next leader. */ nl = DIV_ROUND_UP(rdp->cpu + 1, ls) * ls; rdp->nocb_leader = rdp; rdp_leader = rdp; } else { /* Another follower, link to previous leader. */ rdp->nocb_leader = rdp_leader; rdp_prev->nocb_next_follower = rdp; } rdp_prev = rdp; } }
nocb 지정된 cpu에서 동작하는 스레드들에 대해 leader와 follower cpu로 나누어 구성한다.
- 코드 라인 13~14에서 nocb 설정된 cpu가 없으면 함수를 빠져나간다.
- 코드 라인 15~18에서 nocb로 구성된 cpu에 대해 leader cpu를 포함하여 follower cpu를 몇 개로 구성할지 여부를 알아온다. 만일 -1로 설정된 경우 sqrt(online cpu수) 결과값으로 사용한다.
- “rcu_nocb_leader_stride=4″로 설정하는 경우 4개의 cpu마다 그룹을 나누어 첫번째는 leader cpu로 구성하고 나머지는 3개는 follower cpu로 구성한다.
- y = int_sqrt(x) 함수에서 x 값의 rough한 루트 근사치를 구해온다.
- x=1~3 y=1
- x=4~8 y=2
- x=9~15 y=3
- x=16~24 y=4
- x=25~35 y=5
- x=36~48 y=6
- x=49~63 y=7
- x=64~80 y=8
- x=81~99 y=9
- x=100~120 y=10
- x=121~143 y=11
- x=144~168 y=12
- x=169~195 y=13
- x=196~224 y=14
- x=225~255 y=15
- 코드 라인 24~37에서 leader 및 follower cpu를 구성한다.
- 예) “rcu_nocb_leader_stride=4”, “rcu_nocbs=6-11, 16-19″인 경우
- 6=leader, 7=follower
- 8=leader, 9, 10, 11=follower
- 16=leader, 17, 18, 19=follower
- 예) “rcu_nocb_leader_stride=4”, “rcu_nocbs=6-11, 16-19″인 경우
RCU NO-CB 처리용 커널 스레드
rcu_nocb_kthread()
kernel/rcu/tree_plugin.h
/* * Per-rcu_data kthread, but only for no-CBs CPUs. Each kthread invokes * callbacks queued by the corresponding no-CBs CPU, however, there is * an optional leader-follower relationship so that the grace-period * kthreads don't have to do quite so many wakeups. */ static int rcu_nocb_kthread(void *arg) { int c, cl; struct rcu_head *list; struct rcu_head *next; struct rcu_head **tail; struct rcu_data *rdp = arg; /* Each pass through this loop invokes one batch of callbacks */ for (;;) { /* Wait for callbacks. */ if (rdp->nocb_leader == rdp) nocb_leader_wait(rdp); else nocb_follower_wait(rdp); /* Pull the ready-to-invoke callbacks onto local list. */ list = ACCESS_ONCE(rdp->nocb_follower_head); BUG_ON(!list); trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "WokeNonEmpty"); ACCESS_ONCE(rdp->nocb_follower_head) = NULL; tail = xchg(&rdp->nocb_follower_tail, &rdp->nocb_follower_head); /* Each pass through the following loop invokes a callback. */ trace_rcu_batch_start(rdp->rsp->name, atomic_long_read(&rdp->nocb_q_count_lazy), atomic_long_read(&rdp->nocb_q_count), -1); c = cl = 0; while (list) { next = list->next; /* Wait for enqueuing to complete, if needed. */ while (next == NULL && &list->next != tail) { trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("WaitQueue")); schedule_timeout_interruptible(1); trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, TPS("WokeQueue")); next = list->next; } debug_rcu_head_unqueue(list); local_bh_disable(); if (__rcu_reclaim(rdp->rsp->name, list)) cl++; c++; local_bh_enable(); list = next; } trace_rcu_batch_end(rdp->rsp->name, c, !!list, 0, 0, 1); smp_mb__before_atomic(); /* _add after CB invocation. */ atomic_long_add(-c, &rdp->nocb_q_count); atomic_long_add(-cl, &rdp->nocb_q_count_lazy); rdp->n_nocbs_invoked += c; } return 0; }
no-cb용 커널 스레드로 무한 루프를 돌며 gp를 대기한다. gp가 완료되고 대기하던 콜백들이 최종적으로 follower 리스트로 이동하면 이를 모두 호출하여 처리한다.
- 코드 라인 16~21에서 무한 루프를 돌며 현재 스레드가 nocb_leader로서 동작하는 경우와 nocb_follower로 동작하는 것을 구분하여 콜백을 처리할 준비를 하기 위해 대기한다.
- 코드 라인 24~28에서 콜백들을 호출하기 위해 follower 리스트에 담긴 콜백들을 모두 비워 로컬에 있는 list로 옮긴다.
- 코드 라인 34~53에서 로컬 리스트로 가져온 콜백들을 모두 호출하여 수행한다.
- 코드 라인 55~57에서 처리한 콜백 수 만큼 nocb_q_count 및 nocb_q_count_lazy를 갱신한다.
- 코드 라인 48에서 콜백 호출한 횟 수인 n_nocbs_invoked 카운터를 갱신한다.
nocb_leader_wait()
kernel/rcu/tree_plugin.h
/* * Leaders come here to wait for additional callbacks to show up. * This function does not return until callbacks appear. */ static void nocb_leader_wait(struct rcu_data *my_rdp) { bool firsttime = true; bool gotcbs; struct rcu_data *rdp; struct rcu_head **tail; wait_again: /* Wait for callbacks to appear. */ if (!rcu_nocb_poll) { trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, "Sleep"); wait_event_interruptible(my_rdp->nocb_wq, !ACCESS_ONCE(my_rdp->nocb_leader_sleep)); /* Memory barrier handled by smp_mb() calls below and repoll. */ } else if (firsttime) { firsttime = false; /* Don't drown trace log with "Poll"! */ trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, "Poll"); } /* * Each pass through the following loop checks a follower for CBs. * We are our own first follower. Any CBs found are moved to * nocb_gp_head, where they await a grace period. */ gotcbs = false; for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) { rdp->nocb_gp_head = ACCESS_ONCE(rdp->nocb_head); if (!rdp->nocb_gp_head) continue; /* No CBs here, try next follower. */ /* Move callbacks to wait-for-GP list, which is empty. */ ACCESS_ONCE(rdp->nocb_head) = NULL; rdp->nocb_gp_tail = xchg(&rdp->nocb_tail, &rdp->nocb_head); gotcbs = true; } /* * If there were no callbacks, sleep a bit, rescan after a * memory barrier, and go retry. */ if (unlikely(!gotcbs)) { if (!rcu_nocb_poll) trace_rcu_nocb_wake(my_rdp->rsp->name, my_rdp->cpu, "WokeEmpty"); WARN_ON(signal_pending(current)); schedule_timeout_interruptible(1); /* Rescan in case we were a victim of memory ordering. */ my_rdp->nocb_leader_sleep = true; smp_mb(); /* Ensure _sleep true before scan. */ for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) if (ACCESS_ONCE(rdp->nocb_head)) { /* Found CB, so short-circuit next wait. */ my_rdp->nocb_leader_sleep = false; break; } goto wait_again; }
leader 및 follower에 등록된 신규 콜백들을 follower_gp 리스트로 이동시킨다. gp가 만료된 후 follower_gp 리스트에 있는 콜백들을 follower 리스트로 옮기고 콜백들을 호출하도록 해당 follower의 nocb용 커널 스레드를 깨운다.
- 코드 라인 15~23에서 “rcu_nocb_poll” 커널 파라메터 설정이 없는 경우 이 라인에서 대기한다.
- 코드 라인 30~40에서 follower cpu들에 대해 nocb용 리스트를 nocb_gp 리스트로 옮긴다. 만일 등록된 콜백이 없으면 skip 한다.
- 코드 라인 46~63에서 leader에 대응하는 follower cpu들 모두에 등록된 콜백이 없으면 leader는 1 tick 만큼 슬립한 후 leader에 등록한 콜백을 검사해서 여전히 없으면 이 함수 처음 wait_agail 레이블로 다시 이동한다.
/* Wait for one grace period. */ rcu_nocb_wait_gp(my_rdp); /* * We left ->nocb_leader_sleep unset to reduce cache thrashing. * We set it now, but recheck for new callbacks while * traversing our follower list. */ my_rdp->nocb_leader_sleep = true; smp_mb(); /* Ensure _sleep true before scan of ->nocb_head. */ /* Each pass through the following loop wakes a follower, if needed. */ for (rdp = my_rdp; rdp; rdp = rdp->nocb_next_follower) { if (ACCESS_ONCE(rdp->nocb_head)) my_rdp->nocb_leader_sleep = false;/* No need to sleep.*/ if (!rdp->nocb_gp_head) continue; /* No CBs, so no need to wake follower. */ /* Append callbacks to follower's "done" list. */ tail = xchg(&rdp->nocb_follower_tail, rdp->nocb_gp_tail); *tail = rdp->nocb_gp_head; smp_mb__after_atomic(); /* Store *tail before wakeup. */ if (rdp != my_rdp && tail == &rdp->nocb_follower_head) { /* * List was empty, wake up the follower. * Memory barriers supplied by atomic_long_add(). */ wake_up(&rdp->nocb_wq); } } /* If we (the leader) don't have CBs, go wait some more. */ if (!my_rdp->nocb_follower_head) goto wait_again; }
- 코드 라인 2에서 gp의 완료까지 대기한다.
- 코드 라인 13~30에서 leader에 속한 follower cpu들을 순회하며 nocb_gp 리스트에 있는 콜백들을 nocb_follower 리스트로 옮기고 등록된 콜백들을 호출하라고 해당 nocb 커널 스레드를 깨운다.
- 코드 라인 33~34에서 leader에 등록된 콜백이 없으면 함수 처음으로 돌아간다.
rcu_nocb_wait_gp()
kernel/rcu/tree_plugin.h
/* * If necessary, kick off a new grace period, and either way wait * for a subsequent grace period to complete. */ static void rcu_nocb_wait_gp(struct rcu_data *rdp) { unsigned long c; bool d; unsigned long flags; bool needwake; struct rcu_node *rnp = rdp->mynode; raw_spin_lock_irqsave(&rnp->lock, flags); smp_mb__after_unlock_lock(); needwake = rcu_start_future_gp(rnp, rdp, &c); raw_spin_unlock_irqrestore(&rnp->lock, flags); if (needwake) rcu_gp_kthread_wake(rdp->rsp); /* * Wait for the grace period. Do so interruptibly to avoid messing * up the load average. */ trace_rcu_future_gp(rnp, rdp, c, TPS("StartWait")); for (;;) { wait_event_interruptible( rnp->nocb_gp_wq[c & 0x1], (d = ULONG_CMP_GE(ACCESS_ONCE(rnp->completed), c))); if (likely(d)) break; WARN_ON(signal_pending(current)); trace_rcu_future_gp(rnp, rdp, c, TPS("ResumeWait")); } trace_rcu_future_gp(rnp, rdp, c, TPS("EndWait")); smp_mb(); /* Ensure that CB invocation happens after GP end. */ }
새 gp가 완료되기를 기다린다.
- 코드 라인 13~18에서 노드락을 걸고 새 gp의 completed 번호를 발급받아온다. 결과 값에 다라 gp 커널 스레드의 wakeup을 요청한다.
- 코드 라인 25~33에서 발급받은 completed 번호로 홀짝 구분한 대기큐에서 요청 노드의 completed가 발급 받은 번호 이상이될 때까지 대기한다.
nocb_follower_wait()
kernel/rcu/tree_plugin.h
/* * Followers come here to wait for additional callbacks to show up. * This function does not return until callbacks appear. */ static void nocb_follower_wait(struct rcu_data *rdp) { bool firsttime = true; for (;;) { if (!rcu_nocb_poll) { trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "FollowerSleep"); wait_event_interruptible(rdp->nocb_wq, ACCESS_ONCE(rdp->nocb_follower_head)); } else if (firsttime) { /* Don't drown trace log with "Poll"! */ firsttime = false; trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "Poll"); } if (smp_load_acquire(&rdp->nocb_follower_head)) { /* ^^^ Ensure CB invocation follows _head test. */ return; } if (!rcu_nocb_poll) trace_rcu_nocb_wake(rdp->rsp->name, rdp->cpu, "WokeEmpty"); WARN_ON(signal_pending(current)); schedule_timeout_interruptible(1); } }
follower cpu는 이 곳에서 대기한다. leader가 gp 만료되면 깨워준다.
- “rcu_nocb_poll” 커널 파라메터가 설정된 경우 1 tick 마다 폴링한다.
참고
- RCU(Read Copy Update) -1- (Basic) | 문c
- RCU(Read Copy Update) -2- (Callback process) | 문c
- RCU(Read Copy Update) -3- (RCU threads) | 문c
- RCU(Read Copy Update) -4- (NOCB process) | 문c – 현재글
- rcu_init() | 문c
- wait_for_completion() | 문c