RCU(Read Copy Update) -1- (Basic)

 

RCU History

  • RCU는 읽기 동작에서 블러킹 되지 않는 read/write 동기화 메커니즘
  • 2002년 커널 버전 2.5.43에서 소개됨
  • 2005년 PREEMPT_RCU가 추가됨
  • 2009년 user-level RCU도 소개됨

 

장/단점

RCU는 read-side overhead를 최소화하는데 목적이 있기 때문에 동기화 로직이 읽기 동작에 더 많은 비율로 사용되는 경우에만 사용한다. 수정 동작이 10%이상인 경우 오히려 성능이 떨어지므로 RCU 대신 다른 동기화 기법을 선택해야 한다.

  • 장점
    • read side overhead가 거의 없음. zero wait, zero overhead
      • 정말 zero는 아니고 zero에 가까울 만큼 빠름
    • deadlock 이슈 없음
    • priority inversion 이슈 없음 (priority inversion & priority inheritance)
    • unbounded latency 이슈 없음
    • 메모리 leak hazard 이슈 없음
  • 단점
    • 사용이 약간 복잡
    • 쓰기 동작에서는 다른 동기화 기법보다 조금 더 느리다.

 

RCU 구현

  • Classical RCU – a.k.a tiny RCU
    • CONFIG_TINY_RCU 커널 옵션
    • single 데이터 스트럭처
    • CPU가 많아지는 경우 성능 떨어짐
      • 현재 커널이 SMP가 아니고 non-preempt-able로 빌드된 경우 디폴트로 선택되는 RCU 옵션이다.
  • Hierarchical RCU – a.k.a tree RCU
    • CONFIG_TREE_RCU 커널 옵션
    • tree 확장된 RCU 구현
    • 현재 커널이 SMP이고 non-preempt-able로 빌드된 경우 디폴트로 선택되는 RCU 옵션이다.
  • Preemptible tree-based hierarchical RCU
    • preempt-able(CONFIG_PREEMPT_RCU 커널 옵션)을 사용하는 최근 linux kernel의 기본 RCU이다.
    • tree 확장된 RCU 구현
    • read-side critical section에서 preemption이 지원
    • SRCU
      • CONFIG_SRCU 커널 옵션을 추가하여 사용하는 경우 read-side critical section에서 sleep 가능하다.
  • URCU
    • Userspace RCU (liburcu)

 RCU 구현 선택

RCU를 선택하여 사용할 때 일반적으로 다음과 같이 나누어 선택할 수 있다.

  • CONFIG_TINY_RCU
    • CONFIG_PREEMPT=n, CONFIG_SMP=n
    • footprint가 작은 시스템에서 운영할 때 적절하다.
  • CONFIG_TREE_RCU
    • CONFIG_PREEMPT=n, CONFIG_SMP=y
    • SMP 시스템이고 서버로 운영할 때 적절하다.
  • CONFIG_TREE_PREEMPT=y
    • CONFIG_PREEMPT=y
    • 빠른 응답을 요구하는 임베디드 시스템에서 운영할 때 적절하다.

 

추가적으로 절전을 요구하는 시스템에서 다음과 같은 설정을 사용한다.

  • CONFIG_RCU_FAST_NO_HZ=y
  • CONFIG_RCU_NOCB_CPU=y

 

더 자세한 커널 파라메터 설정은 다음 문서를 참고한다.

 

RCU 성능

reader-side에서의 성능 비교 (rwlock vs RCU)

rcu2

(통계: lwn.net 참고)

리눅스 커널에서의 사용율 증가

rcu3

일반 lock 획득시의 나쁜 성능

  • lock 획득하는데 소요하는 자원이 critical section을 수행하는 것에 비해 수백배 이상 over-head가 발생한다.

rcu4

 

RCU  기본 요소

RCU는 다음과 같이 3가지의 기본 요소와 특징이 있다.

  • Reader
    • rcu_read_lock()과 rcu_read_unlock() 코드 범위의 Read-side critical section이다.
    • 접근에 대한 주의
      • 가장 주의 할 것으로 보호 받아야 할 데이터(protected rcu data)에 접근할 때 항상 이 영역내에서만 접근하여야 한다.
      • 이 영역을 벗어난 사용은 RCU 규칙을 벗어나는 것이며 존재하지 않는 자료 또는 잘못된 데이터로의 접근이될 수 있다.
      • 실제 보호 받아야 할 데이터에 접근하기 전에 메모리 배리어를 사용한 후에 참조하여야 한다.
        • rcu_dereference() 매크로 함수를 사용하면 간단하다.
    • 구현에 따른 차이
      • non-preemptable RCU 커널을 사용하는 경우에는 read_rcu_lock() 내부에서 preemption을 금지하는 코드만 적용된다. sleep하는 blocked api를 사용하면 안되고 가급적 빨리 빠져나와야 한다.
      • preemptable RCU 커널을 사용하는 경우에는 read_rcu_lock() 내부에서 로컬 카운터를 증가시키고 read_rcu_unlock() 내부에서 로컬 카운터를 감소시킨다. read-side critical section 내에서 preemption될 수 있다. 만일 SRCU 커널 옵션도 사용하는 경우 sleep 가능한 blocked api의 사용도 가능하다.
  • Updater
    • 기존의 여러 가지 락 중 하나를 사용하여 데이터를 보호하고 수정한다.
    • 실제 보호 받아야 할 데이터에 접근하기 전에 메모리 배리어를 사용한 후에 참조하여야 한다.
      • rcu_dereference() 및 rcu_assign_pointer() 매크로 함수를 사용하면 간단하다.
    • 보호 받을 자료를 복사하여 수정한 후 기존 자료의 회수(free)를 위해 동기 함수인 synchronize_rcu() 또는 비동기 함수인 call_rcu() 함수를 호출하여 아래의 reclaimer를 동작하게 하다.
    • 이 구성요소에서 Read, Copy, Update 과정을 사용한다.
  • Reclaimer
    • Update에서 최종 수정한 객체가 아닌 사용이 완료된 객체들에 대해 cleanup한다.
    • 안전하게 cleanup되도록 삭제할 데이터에 접근하는 Reader 들이 없음을 보장하기 위해 Grace period가 지난 후에 Reclaimer가 동작한다.

rcu1

 

기본 구조체 노드에서의 RCU 사용

Reader: RCU node를 사용할 때

  • Read side critical section의 시작은 rcu_read_lock()으로 시작한다.
    • non-preemptable RCU 커널을 사용하는 경우에는 이 구간에서 preemption을 금지시킨다.
      • 이 영역에서는 sleep하는 blocked api를 사용하면 안되고 가급적 빨리 빠져나와야 한다.
    • preemptable RCU 커널을 사용하는 경우에는 로컬 카운터를 증가시킨다.
      • 이 영역에서도 preemption이 가능하다. 그러나 내부에서 sleep이 가능한 block-api를 사용하면 안된다.
  • 실제 보호 받아야 할 데이터에 접근하기 전에 메모리 배리어를 사용한 후에 참조하여야 하는데 이를 쉽게 할 수 있도록 rcu_dereference()와 같은 매크로 함수를 사용하면 편리하다. 이를 사용하여 안전하게 dereference된 RCU-protected pointer 값을 얻어온다.
    • read critical section을 완료하면 더 이상 dereference된 pointer를 사용할 수 없으므로 이 포인터를 이용하여 얻어올 해당 자료의 l-value 값 들을 읽어온다.(구조체 전체를 복사하여 가져오거나…)
  • Read side critical section의 끝은 read_rcu_unlock() 함수로 완료한다.
    • non-preemptable RCU 커널을 사용하는 경우에는 preemption를 다시 허용시킨다.
    • preemptable RCU 커널을 사용하는 경우에는 로컬 카운터를 감소시킨다.

 

다음 동기화되어 보호받아야 할 데이터의 멤버 a를 얻어오는 코드를 확인한다. (preemptible rcu)

  • 글로벌 참조 카운터의 증가는 conceptual한 것이다.
  • 실제 구현에서의  참조카운터 증가는 현재 태스크 디스크립터에 저장된다. 태스크 디스크립터는 캐시 되어 있을 것이므로 매우 빠른 액세스가 보장된다.
    • (p->rcu_read_lock_nesting)

rcu5b

int foo_get_a(void)
{
	int retval;
	rcu_read_lock();
	retval = rcu_dereference(gbl_foo)->a;
	rcu_read_unlock();
	return retval;
}

 

Updater: RCU node의 수정

  • 기존 노드 데이터를 곧바로 수정하지 않고 복사(Read-Copy)한 후 그 복사한 자료를 수정(Update)해야 하므로 준비 과정으로 새로운 노드 데이터를 먼저 할당한다.
  • Write side critical section의 시작은 기존 lock 중 적절한 하나를 사용한다.
  • rcu_dereference_protected() 매크로 함수를 사용하여 매모리 배리어를 사용한 후 안전하게 노드를 가리키는 포인터를 얻어온다.
  • 기존 노드 데이터에 대한 직접 수정은 금지된다. 곧바로 수정하지 않고 복사한 후에 수정 작업을 진행해야 다른 Reader 들이 안전하게 기존 노드 데이터를 사용할 수 있다.
  • rcu_assign_pointer()를 사용하여 먼저 매모리 배리어를 사용한 후 안전하게 노드 포인터를 교체한다.
  • 교체가 완료되었으면 Write side critical section의 끝으로 사용한 lock을 닫는다.
  • 마지막으로 기존 노드 메모리 삭제를 위해 다른 Reader 들이 접근하지 않는 것이 보장되는 시간에 기존 노드 메모리를 삭제할 수 있도록 rcu 동기 또는 비동기 호출을 한다.
    • call_rcu:
      • rcu 비동기 호출로 non-blocking 함수이다.
      • 모든 기존 RCU read-side critical sections들이 끝나면 인수로 요청한 함수를 호출한다.
    • synchronize_rcu():
      • rcu 동기 호출로 blocking 함수이다.
      • 모든 기존 RCU read-side critical sections들이 끝날 때까지 이 함수내에서 기다리며 그 후 인수로 요청한 함수를 호출한다.

 

아래 그림과 같이 기존 노드를 안전하게 수정하기 위해 복제 후 수정하고 기존 노드는 삭제 요청한다.

rcu7b

void foo_update_a(int new_a)
{
	struct foo *new_fp;
	struct foo *old_fp;
	new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
	spin_lock(&foo_mutex);
	old_fp = rcu_dereference_protected(gbl_foo,
		 lockdep_is_held(&foo_mutex));
	*new_fp = *old_fp;
	new_fp->a = new_a;
	rcu_assign_pointer(gbl_foo, new_fp);
	spin_unlock(&foo_mutex);
	call_rcu(&old_fp->rcu, foo_reclaim); 
}

 

Reclaimer: RCU node 사용 완료 후 폐기

  • 기존 데이터의 다른 Reader 들에 의해 사용되지 않음이 확인되면 grace periods 이후에 삭제하여 메모리를 회수한다.
  • 참조 카운터가 0이되면 삭제하는 것은 conceptual한 것이다. 글로벌 참조 카운터를 atomic하게 업데이트하는 구조(spin-lock)의 동기화를 사용하지 않아야 하므로 실제 구현은 더 복잡한다.

rcu6b

void foo_reclaim(struct rcu_head *rp)
{
	struct foo *fp = container_of(rp, struct foo, rcu);
	foo_cleanup(fp->a);
	kfree(fp);
} 

 

리스트에서의 RCU 사용

Reader: RCU node를 사용할 때

rcu13

void disp_foo(void)
{
	rcu_read_lock();
	list_for_each_entry_rcu(p, head, list) {
		disp_foo();
	}
	rcu_read_unlock();
}

 

Updater: RCU node의 수정

  • list_replace_rcu()를 수행한 순간 역방향 연결은 끊어진 반면 순방향 연결은 계속 살아있다.

rcu14

void foo_update_a(int new_a)
{
	struct foo *new_fp;
	struct foo *old_fp = search(head, key);
	if (old_fp == NULL) {
	/* Take appropriate action, unlock, and return. */
	}
	new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
	*new_fp = *old_fp;
	new_fp->a = new_a;
	list_replace_rcu(&old_fp->list, &new_fp->list);
	call_rcu(&old_fp->rcu, foo_reclaim); 
}
static inline void list_replace_rcu(struct list_head *old,
                                struct list_head *new)
{
        new->next = old->next;
        new->prev = old->prev;
        rcu_assign_pointer(list_next_rcu(new->prev), new);
        new->next->prev = new;
        old->prev = LIST_POISON2;
}

 

Reclaimer: RCU node 사용 완료 후 폐기

  • Grace periods를 지난 후 삭제 대상 노드를 모두 clean-up한다.

rcu15

void foo_reclaim(struct rcu_head *rp)
{
	struct foo *fp = container_of(rp, struct foo, rcu);
	foo_cleanup(fp->a);
	kfree(fp);
}

 

GP(Grace Period)

사용자는 특정 자료의 동기화를 위해 두 가지의 관점에서 처리한다.

  • 읽기 처리만 수행하는 read-side critical section이 있고 이를 Reader로 부른다.
  • 변경 처리가 포함된 write-side critical section이 있고 이를 Updater 또는 Writer로 부른다.

 

Writer 발생되는 시점, 즉 write-side critical section 부터 시작되는 구간 3가지를 정리해보았다.

  • Removal 구간: write-side critical section 구간에서는 읽어서(Read() 사본(Copy)을 만들고 변경(Update) 작업을 한다.
  • Grace Period 구간: Removal을 수행하는 자료에 관련된 Reader들의 처리가 완료됨을 보장할 수 있도록 대기하는 구간이다.
  • Reclamation 구간: 기존 사본을 원본에 적용하는 구간이다.

 

Removal(Read-Copy-Update)이 진행되는 동안 해당 데이터에 접근하고 있는 Reader들을 보호하기 위해 Removal은 기존 Reader들이 접근하고 있는 자료는 곧바로 수정하지 않고 이를 먼저 Copy한 후 Update하여 사용한다. 결국 Reader들은 원본 자료에만 접근하므로 안전하게 데이터에 접근할 수 있다.

 

Removal이 완료된 후 작업된 사본을 원본에 반영해야 하는데 곧바로 반영하면 각 cpu에서 동시에 처리되고 있는 Reader들에 문제가 발생한다. 따라서 Removal 기간 동안 같은 자료에 접근한 Reader들의 처리가 모두 완료될 때까지 기다려야 하는데 이 구간을 GP라고 한다.

 

GP의 완료를 인지하면 사본을 원본에 반영하고 필요 없어진 자료는 폐기하는 Reclamation 구간이 시작된다. 이러한 처리들은 RCU의 후CB(Call-Back) 함수를 리스트에 보관해 두었다가 Reclamation 구간에서 호출하여 처리한다.

 

다음 그림을 보고 Writer에 의해 발생되는 3가지 구간을 확인한다. 아래의 Grace Period는 conceptual한 의미이고, Min Grace Period로 불린다. Removal 기간 중 관련된 Reader들의 끝 부분이 곧바로 인식되지 않으므로 Grace Period가 끝나는 지점을 감지(detection) 하는 방법을 사용한다.

 

 

Grace Period의 종료 감지

커널이 Min Grace Period가 완료됨을 곧바로 인식하지 못하므로 실제 GP가 완료된 것을 감지하는 것은 특수한 기법이 필요하다. Reader(read-side critical section) 구간에서 성능을 저하시키는 추가 작업을 하지 않는 것이 핵심이므로 이 구간이 완료됨을 알아내기 위해 모든 cpu들이 QS(Quiescent State) 상태를 지나간 경우 GP가 완료된 것으로 판정한다.

 

 

QS(Quiescent State)

QS(Quiescent State)는 Reader 간의 사이 시간 상태로 GP 구간 전 이미 진행되고 있었던 Reader의 구간이 끝난 것을 인지할 목적으로 사용한다. QS 상태가 패스되었는지 확인하는 방법은 다음과 같은 몇 가지 방법을 사용한다.

각 cpu에서 태스크와 태스크의 사이에 호출되는 스케줄 틱을 이용하여 이 인터럽트 구간을 이용하는 rcu_sched의 3가지 방법과, softirq를 사용하는 rcu_bh의 1가지 방법을 알아본다.  non-preemptable rcu에서는 context switch 진입만으로도 Q.S 상태가 지난 것으로 간주되지만 preemtable rcu에서는 nested 카운터도 읽어 0이어야 Q.S가 지난 것으로 간주한다.

    • rcu_sched
      • Context Switch
        • __schedule() 함수내에서 해당 cpu는 q.s 상태로 전환되어 해당 cpu에서 Reader의 완료를 보장한다.
      • 유저 모드 실행 후 스케줄 틱 진입
        • 커널에서 실행되는 Reader가 유저 태스크를 수행했었다는 것은 이미 context switch를 완료했다는 의미이므로 해당 cpu에서 Reader의 완료를 보장한다.
        • rcu_check_callbacks()
      • Dynticks or idle
        • 커널에서 실행되는 Reader가 Idle 태스크를 수행한다는 것은 이미 context switch를 완료했다는 의미이므로 해당 cpu에서 Reader의 완료를 보장한다.
    • rcu_bh
      • irq enable 상태이고 softirq 외부 코드

 

다음 그림과 같이 conceptual 수준에서의 QS는 Reader들 간의 시간으로 각 QS는 기존 Reader의 처리가 완료되었음을 보장하는 기간이다.

Candidate Quiescent State & Observed Quiescent State

다음 그림은 Min GP 이후 detect된 Q.S가 검정색 원으로 표시되었다. 각 cpu의 Q.S가 검출되면 실제 GP가 완료되었음을 알 수 있다.

 

조금 더 conceptual 수준을 구현 레벨로 약간 확장해보면 Write가 끝나면 GP가 시작하는데 이 때 각 cpu의 QS 상태를 모두 클리어한다. 그 후 모든 cpu의 QS가 한 번 이상 패스되면 GP가 끝나고 Reclamation이 진행된다.

 

다음 그림은 커널에서 실제 구현 레벨에서 QS 인지를 하는 과정을 보여준다. RCU Reader 이후 곧바로 cpu가 Quiescent State로 변경되는 것이 아니라 context switch 등이 일어났음을 알 수 있는 포인트에서 Quiescent State가 시작한다.

  • 구현 레벨이 상당히 복잡하므로 아래 그림만 보아서는 쉽게 이해가 가지 않을 수 있다.
  • cpu#0번의 경우를 먼저 살펴보자. conceptual 레벨과 다르게 실제 구현에서는 reader가 완료되자마자 q.s를 인식하지 못한다. 유저 태스크에 진입하면 q.s가 시작하지만 GP가 시작됨과 동시에 q.s가 모두 클리어되었었음을 기억해야 한다. 때문에 다른 태스크의 전환을 수행하는 context switch를 통해 q.s를 인지하게 된다.
  • cpu#3번을 보면 역시 reader가 끝난 후 유저 태스크에 진입하는 것으로 q.s가 시작됨을 알 수 있다. 이 또한 GP가 시작됨과 동시에 클리어되고 스케줄 틱이 발생하여 q.s를 인지하게된다.
  • cpu#5번의 경우 q.s를 인지하였고 모든 cpu의 q.s가 설정되었음을 알게되었다. 이 때 Reclamation 구간이 시작된다.

 

Force Quiescent State

GP가 시작된 후 각 cpu의 qs를 대기하는데 너무 오랫동안 gp가 끝나지 않으면 강제로 qs를 패스상태로 만든다.

 

Brute-Force RCU GP

synchronize_rcu() 함수를 호출하여 gp가 완료되어 동기화를 요청하면 gp가 완료할 때까지 기다린 후 콜백들을 처리한다. 만일 수천 개의 cpu가 가동되는 시스템에서 이러한 요청이 있으면 매우 오랜 시간을 대기해야 하는데 cpu를 그룹별로 묶어 파티셔닝하여 처리하는 구현 방법으로 빠른 처리를 사용하는 방법이 있다. 이 기능은 preemptible rcu 커널 옵션을 사용할 때 synchronize_rcu() 함수의 사용에 대한 급행처리를 할 수 있게하는 기능이다.

 

3가지 rcu_state

rcu는 스케줄 틱마다 콜백을 처리하기 위해 설계되었지만 현재는 최대 3개의 gp에서 별개로 관리하기 위해 다음과 같이 나누어 사용한다.

  • rcu_sched
    • 일반적인 rcu 콜백은 rcu_sched용 gp가 끝난 후에 처리된다.
  • rcu_bh
    • DDOS attack 상황에서 OOM(Out Of Memory)이 발생하는 것을 방지하기 위해 softirq를 사용하는 네트워크 처리 시 rcu_bh 상태를 추가하여 별도로 관리하는 gp가 끝난 후에 처리한다.
  • rcu_preempt
    • preemptible 커널에서 read side critical 섹션 내에서 preemption을 허용하는 방식을 지원하기 위해 별도록 관리하는 gp가 끝난 후에 처리한다.

 

 

RCU 콜백 처리

synchronize_rcu() & call_rcu()

  • RCU updater가 콜백 요청을 하는데 모든 RCU read-side critical sections들이 끝난 후에 즉, gp가 끝난 후에 콜백이 처리되기를 기다린다.
  • read-side critical section 사용시 규칙: 모든 read-side critical section 즉 rcu_read_lock() 과 rcu_read_unlock()사이에는 block or sleep되면 안된다. (SRCU는 가능하다)
  • 모든 CPU의 context switch가 완료되면 RCU read-side critical section period는 완전히 끝난것으로 보장할 수 있게 되므로 모든 read-side critical section들이 안전하게 끝난것을 의미한다.
  • 삭제될 old 데이터들은 다른 Reader가 참조하고 있는 동안(Grace Period) 삭제 보류된 채로 있다가 old data를 참조하는 마지막 Reader의 사용이 끝나면 Reclamation을 진행하게 된다.

 

 read-side critical section period 보장

  • qs는 위에서 언급하였듯이 context switch이외에도 user 모드 또는 idle 시 스케줄 틱이 호출된 경우도 qs가 패스된다.
  • 모든 cpu의 qs가 패스되면 gp가 완료되어 reclmation이 진행된다.
    • preemptible rcu 구현이 추가되어 조금 더 복잡해졌다.
    • 최대 두 번의 gp가 완료되어야 reclamation이  진행될 수도 있다.

 

 

RCU NO-CB

절전 기능을 위해 콜백들을 인터럽트 context에서 호출하지 않고 별도의 cpu에 전용 no-cb 처리용 커널 스레드를 생성하여 운영하는 방법이다.

 

기존 rwlock 구현 소스를 rcu 구현 소스로 변경 예제

struct el {
	struct list_head list;
	long key;
	spinlock_t mutex;
	int data;
};
spinlock_t 	listmutex;                
struct 	el 	head;

 

search()

int search(long key, int *result) 
{
	struct list_head *lp;
	struct el *p;
	read_lock();
	list_for_each_entry(p, head, lp) {
		if (p->key == key) {
			*result = p->data;
			read_unlock();
			return 1;
		}
	}
	read_unlock();
	return 0;
}
int search(long key, int *result) 
{
	struct list_head *lp;
	struct el *p;
	rcu_read_lock();
	list_for_each_entry_rcu(p, head, lp) {
		if (p->key == key) {
			*result = p->data;
			rcu_read_unlock();
			return 1;
		}
	}
	rcu_read_unlock();
	return 0;
}

 

delete()

int delete(long key)
{
	struct el *p;
	write_lock(&listmutex);
	list_for_each_entry(p, head, lp) {
		if (p->key == key) {
			list_del(&p->list);
			write_unlock(&listmutex);
			kfree(p);
			return 1;
		}
	}
	write_unlock(&listmutex);
	return 0;
}
int delete(long key)
{
	struct el *p;
	spin_lock(&listmutex);
	list_for_each_entry(p, head, lp) {
		if (p->key == key) {
			list_del_rcu(&p->list);
			spin_unlock(&listmutex);
			synchronize_rcu();
			kfree(p);
			return 1;
		}
	}
	spin_unlock(&listmutex);
	return 0;
}

 

RCU API 목록

RCU list traversal:

  • list_entry_rcu
  • list_first_entry_rcu
  • list_next_rcu
  • list_for_each_entry_rcu
  • list_for_each_entry_continue_rcu
  • hlist_first_rcu
  • hlist_next_rcu
  • hlist_pprev_rcu
  • hlist_for_each_entry_rcu
  • hlist_for_each_entry_rcu_bh
  • hlist_for_each_entry_continue_rcu
  • hlist_for_each_entry_continue_rcu_bh
  • hlist_nulls_first_rcu
  • hlist_nulls_for_each_entry_rcu
  • hlist_bl_first_rcu
  • hlist_bl_for_each_entry_rcu

RCU pointer/list update:

  • rcu_assign_pointer
  • list_add_rcu
  • list_add_tail_rcu
  • list_del_rcu
  • list_replace_rcu
  • hlist_add_behind_rcu
  • hlist_add_before_rcu
  • hlist_add_head_rcu
  • hlist_del_rcu
  • hlist_del_init_rcu
  • hlist_replace_rcu
  • list_splice_init_rcu()
  • hlist_nulls_del_init_rcu
  • hlist_nulls_del_rcu
  • hlist_nulls_add_head_rcu
  • hlist_bl_add_head_rcu
  • hlist_bl_del_init_rcu
  • hlist_bl_del_rcu
  • hlist_bl_set_first_rcu

RCU:

  • rcu_read_lock
  • synchronize_net
  • rcu_barrier
  • rcu_read_unlock
  • synchronize_rcu
  • rcu_dereference
  • synchronize_rcu_expedited
  • rcu_read_lock_held
  • call_rcu
  • rcu_dereference_check
  • kfree_rcu
  • rcu_dereference_protected

bh:

  • rcu_read_lock_bh
  • call_rcu_bh
  • rcu_barrier_bh
  • rcu_read_unlock_bh
  • synchronize_rcu_bh
  • rcu_dereference_bh
  • synchronize_rcu_bh_expedited
  • rcu_dereference_bh_check
  • rcu_dereference_bh_protected
  • rcu_read_lock_bh_held

sched:

  • rcu_read_lock_sched
  • synchronize_sched
  • rcu_barrier_sched
  • rcu_read_unlock_sched
  • call_rcu_sched
  • synchronize_sched_expedited
  • rcu_read_lock_sched_notrace
  • rcu_read_unlock_sched_notrace
  • rcu_dereference_sched
  • rcu_dereference_sched_check
  • rcu_dereference_sched_protected
  • rcu_read_lock_sched_held

SRCU:

  • srcu_read_lock
  • synchronize_srcu
  • srcu_barrier
  • srcu_read_unlock
  • call_srcu
  • srcu_dereference
  • synchronize_srcu_expedited
  • srcu_dereference_check
  • srcu_read_lock_held
  • Initialization/cleanup
  • init_srcu_struct
  • cleanup_srcu_struct

All: lockdep-checked RCU-protected pointer access

  • rcu_access_pointer
  • rcu_dereference_raw
  • RCU_LOCKDEP_WARN
  • rcu_sleep_check
  • RCU_NONIDLE

 

소스 분석

rcu_dereference()

  • rcu_dereference()는 안전하게 참조(dereference)된 RCU-protected pointer 값을 얻어온다.
  • weakly ordered CPU(out of order execution)를 위해 메모리 접근에 대한 order를 적절히 관리해야 하는데 rcu_dereference()를 사용하면 이를 완벽히 수행할 수 있다.

rcu10

 

rcu_read_lock()

  • read-side critical section 시작

rcu11

 

rcu_assign_pointer()

  • RCU로 보호되는 포인터(RCU-protecte pointer)에 새로운 값을 할당하기 위해 사용

rcu12

 

RCU_INIT_POINTER() 매크로

  • RCU 포인터영역을 Sparse 체크하고 RCU 변수 v를 초기화한다.
#define RCU_INIT_POINTER(p, v) \
        do { \
                rcu_dereference_sparse(p, __rcu); \
                p = RCU_INITIALIZER(v); \
        } while (0)
  • sparse:
    • 리눅스 커널의 문제를 찾아주는 툴
    • 스파스는 정적 분석 도구이고, 설치된 후 gcc extension으로 동작
    • 지원 속성으로 noderef, address_space, lock(acquires, releases), …
  • __rcu:
    • __attribute__((noderef, address_space(4)))
    • noderef:
      • 포인터 변수를 사용하여 직접 참조할 수 없다.
      • & 연산자를 사용해서 직접 참조해야 한다
    • address_space:
      • 커널에는 몇개의 주소공간이 있다.
      • 0 : kernel, 1: user, 2: iomem, 3: percpu, 4: __rcu 공간

 

참고

 

BKL(Big Kernel Lock)

  • 커널 버전 2.0에서 SMP가 소개됨
  • BKL은 giant-lock, big-lock 또는 kernel-lock이라고 알려졌었다.
  • 2.0 커널에서는 한 번에 하나의 스레드만이 커널 모드에서 동작하기 위해 lock을 획득하여야 커널 모드로 진입이 되었고, 나머지 CPU는 lock을 획득하기 위해 대기하였다.
  • 성능 및 리얼 타임 application에 대한 latency 이슈로 BKL은 spin-lock, mutex, RCU등으로 대체되기 시작함.
  • 리눅스 초창기에 SMP를 위해 구현된 BKL은 커널 2.6에서 일부 VFS와 몇개의 file system에만 남아있고 거의 대부분 제거되었다.
  • 2011년 커널 2.6.39에서 마지막 BKL 구현이 제거되었다.

BKL Functions

  • lock_kernel(): Acquires the BKL
  • unlock_kernel(): Releases the BKL
  • kernel_locked(): Returns nonzero if the lock is held and zero otherwise (UP always returns nonzero)

Synchronization

  • BKL은 CPU가 동시에 커널에  진입을 하는 것을 막아 동기화 문제를 해결한다.

bkl

Spinlock

특징

  • critical section에 각 프로세스의 동시 접근을 못하게 금지 하는 수단.
    • 최근 구현에 따른 차이를 말하면 critical section에서는 어떠한 구현 방법도 동시 접근을 못하게 금지 한다.
      • preemption도 금지 시키고 cpu 끼리도 동시에 진입못하게 하여 동기화를 구현한다.
      • spinlock을 소유한 cpu는 critical section에서 preemption이 정지되면 스케쥴링이 일어나지 않아 스레드에 대한 문맥 교환이 일어나지 않는다
        • critical section 내에서 오랜 시간동안 머무르면 다른 스레드 역시 이 critical section 내부를 접근하지 못하고 대기되며 비효율적이다.
        • 따라서 최소한의 시간내에 사용을 마치고 lock을 풀어줘야 한다.
    • critical section 구간과는 다르게 lock을 얻지 못해 spin을 하는 구간에서는 preemption 여부가 구현 방법에 따라 다른데 최근 3가지 구현 방법은 다음과 같다.
      • UP 시스템에서는 preemption을 무조건 금지 시킨다.
        • 초창기 kernel preemption이 전혀 구현되지 않았을 때에는 UP 시스템에서는 당연히 아무것도 하지 않았었다.
      • SMP + LHP(Lock-holder Preemption) 방식에서는 spin 하는 동안에는 preemption을 막지 않는다.
      • SMP + ticket 방식에서는 preemption을 막는다. (최신 구현)
    • spin_lock() 확장 명령 중에는 인터럽트까지도 막는 명령도 있다.
      • spin_lock_irq() or spin_lock_irqsave()
  • lock을 획득하고 critical section에 진입이 가능해질 때까지 lock 카운터 변수를 확인하면서 루프를 돌며 lock이 풀릴 때까지 기다린다.
    • 예전 구현에서는 32비트 락 카운트 변수가 0이 될때까지 기다린다.
    • 현재는 ticket 방식을 도입하여(LHP 구현에서도) 32비트 락 카운터를 16비트씩 둘로 나누어 ticket.next(lock 카운터)와 ticket.owner(unlock 카운터)를 서로 증가시켜 둘이 동일한(unlock) 상태가 되는 때까지 기다린다.
  • hardware bus locking 사용
    • ARMv6부터 lock 대기시간 시 전력을 줄이고자 lock 대기 시 ARM 이벤트 명령(wfe)을 사용하여 이벤트를 기다린다.
    • unlock을 할 때에는 lock 카운터도 감소 시키면서 ARMv6 이상에서 이벤트 전송 명령(sev)을 보내 lock을 종료시킨다.
  • spinlock은 interrupt context 뿐 아니라 어디서나 사용할 수 있는 lock.
    • mutex, semaphore등 조건에 따라 sleep 상태로 전환(sleepable wait)될 수 있으므로 이러한 lock은 interrupt context에서는 사용할 수 없다.

 

 Spinning, busy-waitting, busy-looping

  • CPU가 쉬지 않고(Non-sleep-able) 특정 컨디션이 될 때 까지 루프를 도는 일을 spinning, busy-waiting 또는 busy-looping 이라 불린다.
    루프를 탈출 할 수 있는 컨디션은 주로 다른 CPU에서 전달(조작)하는 특정 변수 값(카운터나 플래그) 또는 시그널로 판단을 한다.
  • CPU가 Non-sleep 한다는 말은 다른 태스크로 전환되지 않도록 preemption 되지 않는다는 의미다.
  • UP와는 관련 없고 SMP와 관련이 있다.

 

Spinner

  • spinner를 굳이 표현하자면 위와 같이 쉬지 않고 도는actor(CPU)를 의미한다.
  • spinner가 spinning을 하는 동안 다른 CPU가 빠르게 시그널(카운터 값 등)을 설정하지 않으면 spinner는 그 시간만큼 계속 루프를 돌아야 한다.
  • 보통 일반 적인 루프들은 조건에 부합되지 않으면 기다리는 동안 자신을 sleep시켜 다른 태스크로 CPU 자원을 양보(yield)를 하는데 그러한 양보로 문맥교환(context switch)이 일어나는 비용이 비싸다고 판단되는 경우에 사용하는 기법이다. 다시 말하면 루프는 문맥교환(context switch)보다 더 효율적으로 또는 더 빠르게 루프를 빠져 나가게 설계된다.

 

Spin 탈출을 위한 컨디션

  • 사용 목적에 따라 preempt_disable() 뿐만 irq_disable()까지 사용할 수도 있다.
  • non-preemption spin lock에 대해서는 아래 그림처럼 spin 하는 동안 preemption 되는 것을 방지하기 위해 critical section의 앞뒤로 preempt_disable()과 preempt_enable() 명령어로 보호를 받고있다.

spinlock3

 

CONFIG_GENERIC_LOCKBREAK

  • 이 커널 옵션은 spin lock이 lock을 얻지 못한 상태에서 spinning 상태인지를 알아내기 위해 다음과 같이 구현되었다.
    • raw_spinlock 구조체에 break_lock이라는 변수를 추가하여 spinning 상태 여부를 나타낸다.
    • SMP + LHP 방식에 break_lock을 설정/해제 하도록 구현되었다.
    • 단점으로 int 하나면 구현되는 raw_spinlock 구조체가 두 배로 커지는 문제가 있다.
  • 현재 커널은 어떠한 아키텍처도 이 옵션을 사용하지 않는다.
    • SMP + ticket에 대한 루틴이 구현되면서 spin_is_contended()라는 함수가 만들어졌다.
      • lock이 spin중인지를 break_lock 변수 없이 ticket.owner와 ticket.next의 차이가 1을 초과하는 경우 spinning 상태인지 알아낼 수 있게 되었다.
    • 2013년 12월 마지막으로 사용했었던 arm64 아키텍처 코드에서도 삭제되면서 이제는 필요 없어진 옵션이다.
    • 참고:

 

Spinlock의 명명 체계

  • 2009 Kernel summit에서 결정된 사항으로 RT 커널을 지원하면서 spinlock이 preemption이 가능해졌다. 그러나 호환을 위해 기존 non-preemptible spinlock에 대한 명명이 필요해서 non-premptible lock을 raw_spin_lock이라는 이름을 사용하기로 하였다. 아울러 일부 preemption이 되면 안되는 spin_lock(raw_spin_lock으로 이전됨)을 제외하고 많은 spin_lock들이 preemption이 가능한 새로운 mutex 기반으로 이전되고 있다.
  • spinlock에 대한 명명은 다음 3단계로 이루어진다.
  • 1) spinlock:
    • the weakest one, which might sleep in RT
  • 2) raw_spinlock:
    • spinlock which always spins even on RT
    • raw_spin_lock -> _raw_spin_lock -> __raw_spin_lock 으로 구현되었다.
  • 3) arch_spinlock:
    • the hardware level architecture dependent implementation

 

3가지 구현 방법: UP vs LHP vs Ticket

  • UP인 경우 __LOCK()을 호출하여 preemption만 disable하고 lock 카운터만 증가시킨다.
  • LHP(Lock-Holder Preemption) 방식은 lock 획득이 안되면 획득이 가능해질 때까지  preempt_enable() 한다. 즉 대기 시간 동안 preemption 가능하다.
  • LHP는 실제로 critical section 구간에서 preemption되는 것은 아니고 lock이 spin 시 가능하다는 뜻이다. 즉 어떤 알고리즘에서 사용한 spin lock이 오랜 시간 spin wait 될 수 밖에 없을 때에 preemption이 가능하게 해주는 구조로 preemption latency를 줄일 수 있어 뛰어난 real time application을 지원하기 위해 효과적이다.
  • 최근 구현은 SMP + Ticket을 사용한다. (rpi2)

spinlock2b

spin_lock API

다음은 RT Linux용 spin lock과 일반 Linux용 spin lock이다.

1) RT Linux용 spin_lock()

  • preemption이 가능한 down_mutex를 호출함.
static inline void spin_lock(spinlock_t *lock)
static void __spin_lock(spinlock_t *lock, unsigned long eip)
{
	SAVE_BKL(_down_mutex(&lock->lock, eip));
}

2) 일반 Linux 용 spin_lock()

  • 아직 리눅스 mainline은 PREEMPT_RT가 지원되지 않으므로 spin_lock()은 preemption이 되지 않는 기존의 코드가 옮겨간 raw_spin_lock()을 호출한다.
    • raw_spin_lock() 내부에서 preempt_disable()을 동작시키므로 결국 spinlock은 preemption 되지 않음을 알 수 있다.

includelinux/spinlock.h

/*
 * Define the various spin_lock methods.  Note we define these
 * regardless of whether CONFIG_SMP or CONFIG_PREEMPT are set. The
 * various methods are defined as nops in the case they are not
 * required.
 */
#define raw_spin_lock(lock)     _raw_spin_lock(lock)

 

raw_spin_lock API

  • raw_spin_lock()은 preemption이 되지 않는 것으로 규정되어 있다.
  • raw_spin_lock()의 명명 체계는 다음과 같다.
    • raw_spin_lock() -> _raw_spin_lock() -> __raw_spin_lock()
  • 3가지 구현 방법은 다음과 같다.

 

1) raw_spin_lock() – (UP)

include/linux/spinlock_api_up.h

#define _raw_spin_lock(lock)                    __LOCK(lock)
  • UP 시스템에서는 단순히 __LOCK()을 호출한다.

 

#define __LOCK(lock) \
  do { preempt_disable(); ___LOCK(lock); } while (0)
  • preempt_disable() 한 후 ___LOCK() 함수에서는 Sparse 정적 코드 분석 툴을 위한 매크로를 호출한다.

 

/*
 * In the UP-nondebug case there's no real locking going on, so the
 * only thing we have to do is to keep the preempt counts and irq
 * flags straight, to suppress compiler warnings of unused lock
 * variables, and to add the proper checker annotations:
 */
#define ___LOCK(lock) \
  do { __acquire(lock); (void)(lock); } while (0)
  • __acquire() 매크로는 Sparse 정적 코드 분석툴을 사용하여 lock에 대한 적절한 체크를 수행한다.
    • define __acquire(x)   __context__(x,1)

 

2) raw_spin_lock() – (LHP)

  • 아래 그림과 같이 critical section에서는 preemption이 불가능하지만 루프를 돌며 spin하는 동안은 preemption이 가능한 구조이다.
  • LHP에서 ticket 카운터인 ticket.next와 ticket.owner 필드는 여기에서도 사용한다. 다만 공평하게 먼저 진입한 lock owner를 판별하는 기능은 사용하지 않는다.

lhp-1

kernel/locking/spinlock.c

#define BUILD_LOCK_OPS(op, locktype)                                    \
void __lockfunc __raw_##op##_lock(locktype##_t *lock)                   \
{                                                                       \
        for (;;) {                                                      \
                preempt_disable();                                      \
                if (likely(do_raw_##op##_trylock(lock)))                \
                        break;                                          \
                preempt_enable();                                       \
                                                                        \
                if (!(lock)->break_lock)                                \
                        (lock)->break_lock = 1;                         \
                while (!raw_##op##_can_lock(lock) && (lock)->break_lock)\
                        arch_##op##_relax(&lock->raw_lock);             \
        }                                                               \
        (lock)->break_lock = 0;                                         \
}                                                                       \
                                                                        \
unsigned long __lockfunc __raw_##op##_lock_irqsave(locktype##_t *lock)  \
{                                                                       \
        unsigned long flags;                                            \
                                                                        \
        for (;;) {                                                      \
                preempt_disable();                                      \
                local_irq_save(flags);                                  \
                if (likely(do_raw_##op##_trylock(lock)))                \
                        break;                                          \
                local_irq_restore(flags);                               \
                preempt_enable();                                       \
                                                                        \
                if (!(lock)->break_lock)                                \
                        (lock)->break_lock = 1;                         \
                while (!raw_##op##_can_lock(lock) && (lock)->break_lock)\
                        arch_##op##_relax(&lock->raw_lock);             \
        }                                                               \
        (lock)->break_lock = 0;                                         \
        return flags;                                                   \
}                                                                       \
                                                                        \
void __lockfunc __raw_##op##_lock_irq(locktype##_t *lock)               \
{                                                                       \
        _raw_##op##_lock_irqsave(lock);                                 \
}                                                                       \
                                                                        \
void __lockfunc __raw_##op##_lock_bh(locktype##_t *lock)                \
{                                                                       \
        unsigned long flags;                                            \
                                                                        \
        /*                                                      */      \
        /* Careful: we must exclude softirqs too, hence the     */      \
        /* irq-disabling. We use the generic preemption-aware   */      \
        /* function:                                            */      \
        /**/                                                            \
        flags = _raw_##op##_lock_irqsave(lock);                         \
        local_bh_disable();                                             \
        local_irq_restore(flags);                                       \
}                                                                       \
BUILD_LOCK_OPS(spin, raw_spinlock);
BUILD_LOCK_OPS(read, rwlock);
BUILD_LOCK_OPS(write, rwlock);
  • 위의 마지막 3줄 BUILD_LOCK_OPS() 매크로를 사용하여 다음 12개의 함수들이 만들어진다.
    • __raw_spin_lock()
    • __raw_spin_lock_irqsave()
    • __raw_spin_lock_irq()
    • __raw_spin_lock_bh()
    • __raw_read_lock()
    • __raw_read_lock_irqsave()
    • __raw_read_lock_irq()
    • __raw_read_lock_bh()
    • __raw_write_lock()
    • __raw_write_lock_irqsave()
    • __raw_write_lock_irq()
    • __raw_write_lock_bh()
  • likely(do_raw_##op##_trylock(lock)))
    • 높은 확률로 do_raw_spin_trylock() 함수가 true가 될 수 있다고 판단하여 likely 함수를 사용하였다.
    • 성능이 요구되는 함수들은 캐시와 관련되어 여러 가지 optimization 방법을 사용한다.
  • if (!(lock)->break_lock)
    • spin하는 동안은 항상 1로 설정된다.
    • raw_spin_in_contended() 함수를 통해 spin 중인지 알아내기 위한 플래그로 사용된다.
  • !raw_spin_can_lock()
    • rock이 이미 걸려 있는 경우 루프를 돈다.
    • !raw_spin_is_locked() 매크로 호출
      • arch_spin_is_locked() 호출
        • !arch_spin_value_unlocked() 호출
          • lock.tickets.owner == lock.tickets.next를 비교 (unlock 상태인지)
  • arch_spin_relax()
    • cpu_relax() 매크로 호출
      • barrier() 매크로 호출
  • break_lock = 0
    • spin에서 빠져나왔으므로 spin 하지 않는다고 플래그를 설정한다.

 

raw_spin_can_lock()

/**
 * raw_spin_can_lock - would raw_spin_trylock() succeed?
 * @lock: the spinlock in question.
 */
#define raw_spin_can_lock(lock) (!raw_spin_is_locked(lock))
  • lock을 얻을 수 있는지 여부를 판단한다.
#define raw_spin_is_locked(lock)        arch_spin_is_locked(&(lock)->raw_lock)
  • raw_ -> arch_ 명명 형태로 마지막 아키텍처 쪽에서 lock이 걸려 있는지 여부를 판단을 한다.
static inline int arch_spin_is_locked(arch_spinlock_t *lock)
{
        return !arch_spin_value_unlocked(READ_ONCE(*lock));
}
  • unlock 상태를 판별하여 반대로 리턴한다.
  • READ_ONCE()
    • 인수의 사이즈에 따라 volatile 방식으로 읽어온다.
static inline int arch_spin_value_unlocked(arch_spinlock_t lock)
{
        return lock.tickets.owner == lock.tickets.next;
}
  • 락 카운터인 ticket.owner와 ticket.next가 같은 경우가 unlock 상태이다.
    • 기존 spinlock 구현 방식에서는 lock과 unlock시 lock 변수의 증/감 상태로 lock/unlock 상태를 알았었는데 ticket based spinlock이 구현되면서 lock/unlock 상태 여부는 tickets.owner와 tickets.next 값의 동일 여부로 확인할 수 있게 바뀌었다.

 

3) raw_spin_lock() – (Ticket)

  • Ticket 기능을 구현하여 다음과 같은 장점을 갖게되었다.
    • 공정성
      • 초기 spin lock은 lock 획득 순서가 공정하지 않았었는데 커널 2.6.25 부터 ticket을 부여받아 차례 대로 획득 가능해졌다.
    • cache bouncing 문제 제거
      • cache coherent 기능에 의해 두 개 이상의 CPU가 lock을 획득하기 위해 spin 하는 동안 strex 명령을 반복하여 사용하므로 spin 하는 CPU들에서 cache line의 로드와 invalidate(강제적인 eviction)를 반복하면서 성능이 저하된다. 이를 막기 위해 lock 값을 둘로 나누어 둘 값을 비교하면서 자기 차례가 아닌 경우에는 write 즉 strex 동작을 하지 않도록 하여 이 문제를 해결하였다.
      • cache bouncing 문제도 심각하게 lock contention을 야기하고 lock contention은 성능을 떨어뜨리는 큰 원인이된다.

spinlock

include/linux/spinlock_api_smp.h

static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
        preempt_disable();
        spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
        LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}
  • spin_acquire()
    • Lockdep 디버깅용 코드
  • LOCK_CONTENDED()
    • 실제 spin lock을 얻기 위해 spin 한다.
    • 내부에서는 do_raw_spin_try_lock() 함수를 먼저 이용해보고 안되면 do_raw_spin_lock() 함수를 호출하여 spin한다.
static inline void __raw_spin_lock_irq(raw_spinlock_t *lock)
{
        local_irq_disable();
        preempt_disable();
        spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
        LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}
  • __raw_spin_lock()과 같으나 local_irq_disable()이 먼저 동작한다.
static inline unsigned long __raw_spin_lock_irqsave(raw_spinlock_t *lock)
{
        unsigned long flags;

        local_irq_save(flags);
        preempt_disable();
        spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
        /*
         * On lockdep we dont want the hand-coded irq-enable of
         * do_raw_spin_lock_flags() code, because lockdep assumes
         * that interrupts are not re-enabled during lock-acquire:
         */
#ifdef CONFIG_LOCKDEP
        LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
#else
        do_raw_spin_lock_flags(lock, &flags);
#endif
        return flags;
}
  • __raw_spin_lock_irq()가 irq를 disable하는 것에 반해 이 함수에서는 irq 상태를 복구하기 위해 먼저 상태를 저장하고 irq를 disable한다.
static inline void __raw_spin_unlock_irqrestore(raw_spinlock_t *lock,
                                            unsigned long flags)
{
        spin_release(&lock->dep_map, 1, _RET_IP_);
        do_raw_spin_unlock(lock);
        local_irq_restore(flags);
        preempt_enable();
}
  • spin lock이 해제된 후 __raw_spin_unlock_irqsave() 함수에서 저장한 플래그를 사용하여 local irq 상태를 복구한다.
static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
        spin_release(&lock->dep_map, 1, _RET_IP_);
        do_raw_spin_unlock(lock);
        preempt_enable();
}

arch_spinlock API

arch_spin_lock()

  • 이 함수는 SMP + Ticket 방식에서 사용한다.

arch/arm/include/asm/spinlock.h

static inline void arch_spin_lock(arch_spinlock_t *lock)
{
        unsigned long tmp;
        u32 newval;
        arch_spinlock_t lockval;
        
        prefetchw(&lock->slock);
        __asm__ __volatile__(
"1:     ldrex   %0, [%3]\n"
"       add     %1, %0, %4\n"
"       strex   %2, %1, [%3]\n"
"       teq     %2, #0\n"
"       bne     1b"
        : "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
        : "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
        : "cc"); 

        while (lockval.tickets.next != lockval.tickets.owner) { 
                wfe(); 
                lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner);
        }

        smp_mb();
}
  • prefetchw
    • pldw 명령을 호출하여 해당 lock 변수를 미리 캐시에 로드한다.
    • 이렇게 미리 로드를 하는 이유는 ldrex 부터 strex 까지의 atomic operation 격으로 동작하는 critical section 영역의 코드를 동작시키는 동안 cpu clock을 적게 소모하게 하여 확률적으로 strex의 실패가 적어지게 유도한다
  • 어셈블리 문장을 좀 더 로직화하여 편하게 보기위해 바꿔보았다.
    • prefetch  &lock->slock
    • do {
    •         lockval = [&lock->slock]
    •         newval = lockval.next + 1
    •        [&lock->slock] = newval (strex에 대한 결과값은 tmp에 저장)
    • while (tmp)
  • 어셈블리 문장은 atomic operation으로 lock->tickets.next++ 를 수행한것이다.
  • while()
    • 값을 증가시키기 전의 lockval.tickets.next와 lockval.tickets.owner가 다른 경우는 이 루틴을 들어오기 전에 이미 lock이 걸려 있었다는 경우로 루프를 돌며 대기 상태로 빠진다.
    • 대기 상태를 빠지는 방법은 다른 CPU에서 arch_spin_unlock()을 호출할 때 sev 명령을 수행하는데 이 이벤트를 수신하여 wfe(wait for event) 함수를 탈출한다.
    • wfe()를 탈출한 후에 lockval.tickets.owner를 갱신 받아 다시 while()문의 조건이 부합될 때까지 루프를 돌며 기다린다.
      • 자기 순번이 올 때까지 (다른 CPU에서 arch_spin_unlock()을 호출할 때 owner 값을 증가시켜 내가 가진 next 값과 동일할 때까지) 루프를 탈출할 수 없다.

arch_spin_trylock()

  • 이 함수는 SMP 시스템에서 Ticket 방식과 LHP 방식의 구현에서 모두에서 사용된다.
static inline int arch_spin_trylock(arch_spinlock_t *lock)
{
        unsigned long contended, res;
        u32 slock;

        prefetchw(&lock->slock);
        do {
                __asm__ __volatile__(
                "       ldrex   %0, [%3]\n"
                "       mov     %2, #0\n"
                "       subs    %1, %0, %0, ror #16\n"
                "       addeq   %0, %0, %4\n"
                "       strexeq %2, %0, [%3]"
                : "=&r" (slock), "=&r" (contended), "=&r" (res)
                : "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
                : "cc");
        } while (res);

        if (!contended) {
                smp_mb();
                return 1;
        } else {
                return 0;
        }
}
  • 어셈블러 문장을 좀 더 로직화하여 편하게 보기위해 바꿔보았다.
    • prefetch  &lock->slock
    • do {
    •         slock =  [&lock->slock]
    •         res   = #0
    •         contended = (tickets.owner != tickets.next)
    •         if (contended == 0)  {
    •                 tickets.next++
    •                 [&lock->slock] = slock  (결과값은 res에 저장)
    •         }
    • } while (res)
  • do while()
    • strex의 실패가 발생 시 다시 재시도하여 atomic operation을 동작
  • if (contended == 0)
    • tickets.next 와 tickets.owner가 같으면 아무도 spin_lock을 획득하지 않은 상태(contended == 0)이므로 성공리에 spin_lock을 획득하는 조건이된다.
  • spin lock 획득이 성공하면 tickets.next를 1 증가시킨 후 lock 변수에 저장한다.
  • res가 0이 아닌 경우는 strex 명령으로 저장을 시도 했을 때 실패한 경우이므로 atomic operation을 완료하기 위해 다시 재시도한다.
  • tickets.owner와 tickets.next의 증감 규칙
    • spin_lock을 누군가 획득하는 경우 tickets.next가 1 증가된다.
    • spin_lock을 해제하는 경우 tickets.owner를 1 증가한다.
  • tickets 비트 규칙
    • next: msb 16bits – lock에서 증가(Asm에서 사용), overflow 시 onwer에 영향 없음)
    • owner: lsb 16bits – unlock에서 증가(C에서 사용)

arch_spin_unlock()

static inline void arch_spin_unlock(arch_spinlock_t *lock)
{
        smp_mb();
        lock->tickets.owner++;
        dsb_sev();
}
  • lock->tickets.owner++
    • 여기서 락 변수를 atomic operation을 이용하지 않고 대범(?)하게 증가 시킨 이유
      • 락 획득 시에는 CPU들 끼리 경쟁을 하므로 atomic inc가 중요하지만 락을 헤제 시에는 락 오너만이 해제하므로(경쟁을 하지 않음) atomic inc를 해야 할 이유가 없다.
  • dsb_sev()
    • SMP 머신에서만 유효한 명령으로 spin 되고 있는 다른 CPU에 시그널을 보내어 wfe(wait for event) 상태에서 빠져나오게 한다.

 

Ticket based spinlock 에서 ticket 값 추적

  • 3개의 CPU에서 중첩이 되어 2 개의 CPU에서 spinning을 하는 과정에서 lock 값이 변화되는 것을 보였다.
    • global lock 변수는 메모리에 위치한 lock 값
    • local lock 변수는 arch_spin_lock() 루틴에서 임시로 lock을 획득할 때까지만 사용하는 레지스터이다.
  • 1) 초기 lock 값을 next와 owner 100부터 시작하였다.
    • 100 번 spin_lock()과 spin_unlock()을 반복한 것과 동일하다.
  • 2) CPU-A가 lock이 없는 상태에서 lock 획득을 시도한다. 이 때 성공하면 ticket.next를 증가시키고 critical section에 진입한다.
  • 3) CPU-B가 arch_spin_try_lock()을 시도했다가 실패 한 후 arch_spin_lock()에 진입하여 local lock 변수에 자기 순번을 의미하는 global lock 변수의 ticket.next(101) 값을 받아오고 global lock 변수의 tickets.next는 102로 증가시킨 후 spinning(wfe를 포함하여)한다.
  • 4) CPU-C도 arch_spin_try_lock()을 시도했다가 실패 한 후 arch_spin_lock()에 진입하여 local lock 변수에 자기 순번을 의미하는 global lock 변수의 ticket.next(102) 값을 받아오고 global lock 변수의 tickets.next는 103으로 증가시킨 후 spinning(wfe를 포함하여)한다.
  • 5) CPU-A가 unlock하면서 global tickets.owner를 101로 증가시키고 sev를 호출한다. 이 때 CPU-B는 sev 명령에 의해 wfe 명령에서 깨어나고 global tickets.owner(101)가 자기 순번인 local tickets.next(101)가 동일하기 때문에 spin 루프를 빠져나가면서 critical section에 진입하게 된다.
  • 6) CPU-B도 unlock하면서 global tickets.owner를 102로 증가시키고 sev를 호출한다. 이 때 CPU-C는 sev 명령에 의해 wfe 명령에서 깨어나고 global tickets.owner(102)가 자기 순번인 local tickets.next(102)가 동일하기 때문에 spin 루프를 빠져나가면서 critical section에 진입하게 된다.
  • 7) CPU-C가 unlock하면서 global tickets.owner를 103으로 증가시키고  sev를 호출한다. 하지만 wfe에서 대기하고 있는 CPU가 없어서 깨어날 CPU가 없어서 무시된다.
    • tickets.next와 tickets.owner는 동일하게 103인 상태가 되며 이는 unlock 상태임을 의미한다.
  • 녹색 박스는 critical section을 의미하며 CPU-A, B, C 간에 서로 중첩되지 않음을 확인할 수 있다.

 

Lock 디버깅을 위한 Lockdep 코드 관련

lock_acquire() & lock_release()

kernel/locking/lockdep.c

void lock_acquire(struct lockdep_map *lock, unsigned int subclass,
                          int trylock, int read, int check,
                          struct lockdep_map *nest_lock, unsigned long ip)
{
        unsigned long flags;

        if (unlikely(current->lockdep_recursion))
                return;

        raw_local_irq_save(flags);
        check_flags(flags);

        current->lockdep_recursion = 1;
        trace_lock_acquire(lock, subclass, trylock, read, check, nest_lock, ip);
        __lock_acquire(lock, subclass, trylock, read, check,
                       irqs_disabled_flags(flags), nest_lock, ip, 0);
        current->lockdep_recursion = 0;
        raw_local_irq_restore(flags);
}
EXPORT_SYMBOL_GPL(lock_acquire);

void lock_release(struct lockdep_map *lock, int nested,
                          unsigned long ip)
{
        unsigned long flags;

        if (unlikely(current->lockdep_recursion))
                return;

        raw_local_irq_save(flags);
        check_flags(flags);
        current->lockdep_recursion = 1;
        trace_lock_release(lock, ip);
        __lock_release(lock, nested, ip);
        current->lockdep_recursion = 0;
        raw_local_irq_restore(flags);
}

spin_lock 사용 시 interrupt

  • dead-lock 이외에도 lock을 소유한 상태에서 interrupt 되고 동일한 lock을 호출하는 루틴으로 들어가는 경우 spin되어 빠져나오지 못하게 된다. 따라서 spinlock을 사용 시에는 최우선적으로 local cpu에 대해 interrupt를 disable할 수 있는 함수가 유용하다.
    • .raw_spin_lock_irq()
    • .raw_spin_unlock_irq()
    • .raw_spin_lock_irqsave()
    • .raw_spin_unlock_irqrestore()

 

기타 매크로

define 문

#define WFE(cond)   __ALT_SMP_ASM("wfe" cond, "nop")
#define SEV     __ALT_SMP_ASM(WASM(sev), WASM(nop))

 

#define isb(option) __asm__ __volatile__ ("isb " #option : : : "memory")
#define dsb(option) __asm__ __volatile__ ("dsb " #option : : : "memory")
#define dmb(option) __asm__ __volatile__ ("dmb " #option : : : "memory")

 

#define __ALT_SMP_ASM(smp, up)                      	\
    "9998:  " smp "\n"                      			\
    "   .pushsection \".alt.smp.init\", \"a\"\n"      \
    "   .long   9998b\n"                    			\
    "   " up "\n"                       			\
    "   .popsection\n"

 

#define __lockfunc __attribute__((section(".spinlock.text")))

 

ACCESS_ONCE()

/*
 * Prevent the compiler from merging or refetching accesses.  The compiler
 * is also forbidden from reordering successive instances of ACCESS_ONCE(),
 * but only when the compiler is aware of some particular ordering.  One way
 * to make the compiler aware of ordering is to put the two invocations of
 * ACCESS_ONCE() in different C statements.
 *
 * ACCESS_ONCE will only work on scalar types. For union types, ACCESS_ONCE
 * on a union member will work as long as the size of the member matches the
 * size of the union and the size is smaller than word size.
 *
 * The major use cases of ACCESS_ONCE used to be (1) Mediating communication
 * between process-level code and irq/NMI handlers, all running on the same CPU, * and (2) Ensuring that the compiler does not  fold, spindle, or otherwise
 * mutilate accesses that either do not require ordering or that interact
 * with an explicit memory barrier or atomic instruction that provides the
 * required ordering.
 *
 * If possible use READ_ONCE/ASSIGN_ONCE instead.
 */
#define __ACCESS_ONCE(x) ({ \
         __maybe_unused typeof(x) __var = (__force typeof(x)) 0; \
        (volatile typeof(x) *)&(x); })
#define ACCESS_ONCE(x) (*__ACCESS_ONCE(x))
  • volatile을 사용하여 인수로 지정된 주소를 직접 읽어들이게 컴파일러에게 optimization을 하지 않게 지시한다.
    • compiler optimization이 동작하는 경우 해당 주소를 레지스터로 한 번 읽어 온 후 다시 반복하여 읽어 들일 때 값에 변동이 없다고 판단하는 경우 해당 주소(메모리 주소 또는 I/O 주소)를 access하는 과정을 생략하고 기존에 한 번 불러와서 사용했던 레지스터를 사용하게 되면서 문제가 발생될 수 있는 소지가 있을 때에 사용된다.
  • non-scalar type이 들어오면 gcc의 volatile에서 문제가 발생하여 아예 non-sclar type이 들어오는 경우 error가 발생하게 루틴을 추가하였다.
    • non-sclar type(struct, union 등)을 사용해야 하는 경우 READ_ONCE() 또는 ASSIGN_ONCE()등을 사용한다.
    • 참고: ACCESS_ONCE() and compiler bugs

 

구조체 타입

spinlock_t

  • lock 디버깅용 커널이 아니면  raw_spinlock 하나만 사용한다.
typedef struct spinlock {
        union {
                struct raw_spinlock rlock;

#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
                struct {
                        u8 __padding[LOCK_PADSIZE];
                        struct lockdep_map dep_map;
                };
#endif
        };
} spinlock_t;

 

raw_spinlock_t

  • CONFIG_GENERIC_LOCKBREAK
    • 정의하는 아키텍처가 있고 그렇지 않은 아키텍처가 있으나 ARM 아키텍처에서는 사용되지 않는다.
  • 디버그 용도이외에는 arch_spinlock_t 하나만 사용한다.
typedef struct raw_spinlock {
        arch_spinlock_t raw_lock;

#ifdef CONFIG_GENERIC_LOCKBREAK
        unsigned int break_lock;
#endif
#ifdef CONFIG_DEBUG_SPINLOCK
        unsigned int magic, owner_cpu;
        void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
        struct lockdep_map dep_map;
#endif
} raw_spinlock_t;

 

arch_spin_lock_t

  • 32bit slock과 tickets.next(msb-16bits)  + tickets.owner(lsb-16bits)가 union으로 묶여 있다.
  • 기존 slock에 대해 ticket을 구현하기 위해 slock을 둘로 나누어 사용하였다.
    • next: lock 획득 시 증가
    • owner: unlock 시 증가
    • lock을 여러 CPU가 요청한 경우 각 CPU들은 자신의 lock 값(lock 획득 당시의 ticket.next 증가 전 값)이 다른데 owner 값과 비교하여 같은 경우 자기 차례가 되어 lock을 획득 할 수 있다.
typedef struct {
        union {
                u32 slock;
                struct __raw_tickets {
#ifdef __ARMEB__
                        u16 next;
                        u16 owner;
#else
                        u16 owner;
                        u16 next;
#endif
                } tickets;
        };
} arch_spinlock_t;

 

참고