RCU(Read Copy Update) -1- (Basic)

<kernel v5.4>

RCU 기초

RCU History

  • RCU는 읽기 동작에서 블러킹 되지 않는 read/write 동기화 메커니즘
  • 2002년 커널 버전 2.5.43에서 소개됨
  • 2005년 PREEMPT_RCU가 추가됨
  • 2009년 user-level RCU도 소개됨

 

장/단점

RCU는 read-side overhead를 최소화하는데 목적이 있기 때문에 동기화 로직이 읽기 동작에 더 많은 비율로 사용되는 경우에만 사용한다. 수정(Update) 동작이 10%이상인 경우 오히려 성능이 떨어지므로 RCU 대신 다른 동기화 기법을 선택해야 한다.

  • 장점
    • read side overhead가 거의 없음. zero wait, zero overhead
      • rcu non-preempt 커널 모델인 경우 zero wait 이지만, rcu preemptible 커널 모델의 경우 태스크 구조체의 rcu_read_lock_nesting 카운터를 증감시켜 사용하므로 zero wait은 아니고 그래도 빠른 편이다.
    • deadlock 이슈 없음
    • priority inversion 이슈 없음 (priority inversion & priority inheritance)
    • unbounded latency 이슈 없음
    • 메모리 leak hazard 이슈 없음
  • 단점
    • 사용이 약간 복잡
    • 쓰기 동작에서는 다른 동기화 기법보다 조금 더 느리다.

 

RCU 구현 옵션

  • Classical RCU – a.k.a tiny RCU
    • CONFIG_TINY_RCU 커널 옵션
    • single 데이터 스트럭처
    • CPU가 많아지는 경우 성능 떨어짐
      • 현재 커널이 SMP가 아니고 non-preemptible로 빌드된 경우 디폴트로 선택되는 RCU 옵션이다.
  • Hierarchical RCU – a.k.a tree RCU
    • CONFIG_TREE_RCU 커널 옵션
    • tree 확장된 RCU 구현
    • 현재 커널이 SMP이고 non-preempt-able로 빌드된 경우 디폴트로 선택되는 RCU 옵션이다.
  • Preemptible tree-based hierarchical RCU
    • tree 확장된 RCU 구현이면서 preemptible(CONFIG_PREEMPT_RCU 커널 옵션)을 사용하는 최근 linux kernel의 기본 RCU이다. (코드 분석에서 이 모델을 우선)
    • read-side critical section에서 preemption이 지원된다. (주의: 그렇다고 슬립 api를 사용하면 안된다)
  • SRCU
    • CONFIG_SRCU 커널 옵션을 추가하고 RCU API 대신 별도의 SRCU API를 사용하는 경우 read-side critical section에서 sleep api를 사용할 수 있다.
      • rcu_read_lock() -> srcu_read_lock()
  • URCU
    • Userspace RCU (liburcu)

 

 RCU 구현 옵션 선택

RCU를 선택하여 사용할 때 커널 옵션은 일반적으로 다음과 같이 나누어 선택할 수 있다.

  • CONFIG_TINY_RCU
    • 조건: !PREEMPTION && !SMP
    • footprint가 작은 시스템에서 운영할 때 적절하다.
  • CONFIG_TREE_RCU
    • 조건: !PREEMPTION && SMP
    • SMP 시스템이고 서버로 운영할 때 적절하다.
  • CONFIG_PREEMPT_RCU (기존 CONFIG_TREE_PREEMPT_RCU)
    • 조건: PREEMPTION
    • 빠른 응답을 요구하는 임베디드 시스템에서 운영할 때 적절하다.
    • 커널 v5.6-rc1 부터 이 옵션이 없이 위의 CONFIG_TREE_RCU 만으로 동작하게 변경하였다.

 

추가적으로 절전을 요구하는 시스템에서 다음과 같은 설정을 사용한다.

  • CONFIG_RCU_FAST_NO_HZ=y
  • CONFIG_RCU_NOCB_CPU=y

 

더 자세한 커널 파라메터 설정은 다음 문서를 참고한다.

 

RCU 성능

reader-side에서의 성능 비교 (rwlock vs RCU)

rcu2

(통계: lwn.net 참고)

 

리눅스 커널에서의 사용율 증가

rcu3

 

일반 lock 획득시의 나쁜 성능

  • lock 획득하는데 소요하는 자원이 critical section을 수행하는 것에 비해 수백배 이상 over-head가 발생한다.

rcu4

 

RCU  기본 요소

RCU는 다음과 같이 3가지의 기본 요소와 특징이 있다.

  • Reader
    • rcu_read_lock()과 rcu_read_unlock() 코드 범위의 Read-side critical section이다.
    • 접근에 대한 주의
      • 가장 주의 할 것으로 보호 받아야 할 데이터(protected rcu data)에 접근할 때 항상 이 영역내에서만 접근하여야 한다.
      • 이 영역을 벗어난 사용은 RCU 규칙을 벗어나는 것이며 존재하지 않는 자료 또는 잘못된 데이터로의 접근이될 수 있다.
      • 실제 보호 받아야 할 데이터에 접근하기 전에 메모리 배리어를 사용한 후에 참조하여야 한다.
        • rcu_dereference() 매크로 함수를 사용하면 간단하다.
    • 구현에 따른 차이
      • non-preemptible RCU 커널을 사용하는 경우에는 read_rcu_lock() 내부에서 preemption을 금지하는 코드만 적용된다. sleep하는 blocked api를 사용하면 안되고 가급적 빨리 빠져나와야 한다.
      • preemptible RCU 커널을 사용하는 경우에는 read_rcu_lock() 내부에서 로컬 카운터를 증가시키고 read_rcu_unlock() 내부에서 로컬 카운터를 감소시킨다. read-side critical section 내에서 preemption될 수 있다. 만일 SRCU 커널 옵션도 사용하는 경우 sleep 가능한 blocked api의 사용도 가능하다.
  • Updater
    • 기존의 여러 가지 락 중 하나를 사용하여 데이터를 보호하고 수정한다.
    • 실제 보호 받아야 할 데이터에 접근하기 전에 메모리 배리어를 사용한 후에 참조하여야 한다.
      • rcu_dereference() 및 rcu_assign_pointer() 매크로 함수를 사용하면 간단하다.
    • 보호 받을 자료를 복사하여 수정한 후 기존 자료의 회수(free)를 위해 동기 함수인 synchronize_rcu() 또는 비동기 함수인 call_rcu() 함수를 호출하여 아래의 reclaimer를 동작하게 하다.
    • 이 구성요소에서 Read, Copy, Update 과정을 사용한다.
  • Reclaimer
    • Update에서 최종 수정한 객체가 아닌 사용이 완료된 객체들에 대해 cleanup한다.
    • 안전하게 cleanup되도록 삭제할 데이터에 접근하는 Reader 들이 없음을 보장하기 위해 Grace period가 지난 후에 Reclaimer가 동작한다.

rcu1


RCU 기본 API

RCU에는 5개의 기본 API가 사용된다.

  • rcu_read_lock()
    • read side critical section의 시작 부분에 사용되어야 한다.
  • rcu_read_unlock()
    • read side critical section의 끝 부분에 사용되어야 한다.
  • rcu_assign_pointer()
    • 일반 object 포인터를 rcu 보호 포인터에 지정할 때 사용한다.
    • 내부 구현에서 일반 object 포인터와 rcu 보호 포인터 사이에 메모리 베리어가 사용된다.
  • rcu_dereference()
    • rcu 보호 포인터로부터 일반 object 포인터를 알아올 때 사용한다.
    • 내부 구현에서 rcu 보호 포인터와 일반 object 포인터 사이에 메모리 베리어가 사용된다.
  • synchronize_rcu() & call_rcu()
    • synchronize_rcu()
      • sync 방식으로 GP(Grace Period)가 끝나기를 기다린다.
    • call_rcu()
      • async 방식으로 GP(Grace Period)가 끝나면 호출되도록 함수를 지정하여 사용한다.

 

RCU 기본 API만을 사용한 단순 접근 방법

Reader: RCU node를 사용할 때

  • Read side critical section의 시작은 rcu_read_lock()으로 시작한다.
    • non-preemptible RCU 커널을 사용하는 경우에는 이 구간에서 preemption을 금지시킨다.
      • 이 영역에서는 sleep하는 blocked api를 사용하면 안되고 가급적 빨리 빠져나와야 한다.
    • preemptible RCU 커널을 사용하는 경우에는 로컬 카운터를 증가시킨다.
      • 이 영역에서도 preemption이 가능하다. 그러나 내부에서 sleep이 가능한 block-api를 사용하면 안된다.
  • 실제 보호 받아야 할 데이터에 접근하기 전에 메모리 배리어를 사용한 후에 참조하여야 하는데 이를 쉽게 할 수 있도록 rcu_dereference()와 같은 매크로 함수를 사용하면 편리하다. 이를 사용하여 안전하게 dereference된 RCU-protected pointer 값을 얻어온다.
    • read critical section을 완료하면 더 이상 dereference된 pointer를 사용할 수 없으므로 이 포인터를 이용하여 얻어올 해당 자료의 l-value 값 들을 읽어온다.(구조체 전체를 복사하여 가져오거나…)
  • Read side critical section의 끝은 read_rcu_unlock() 함수로 완료한다.
    • non-preemptible RCU 커널을 사용하는 경우에는 preemption를 다시 허용시킨다.
    • preemptible RCU 커널을 사용하는 경우에는 로컬 카운터를 감소시킨다.

 

다음 동기화되어 보호받아야 할 데이터의 멤버 a를 얻어오는 코드를 확인한다. (preemptible rcu)

  • 글로벌 참조 카운터의 증가는 conceptual한 것이다.
  • 실제 구현에서의  참조카운터 증가는 현재 태스크 디스크립터에 저장된다. 태스크 디스크립터는 캐시 되어 있을 것이므로 매우 빠른 액세스가 보장된다.
    • (p->rcu_read_lock_nesting)

rcu5b

int foo_get_a(void)
{
	int retval;
	rcu_read_lock();
	retval = rcu_dereference(gbl_foo)->a;
	rcu_read_unlock();
	return retval;
}

 

Updater: RCU node의 수정

  • 기존 노드 데이터를 곧바로 수정하지 않고 복사(Read-Copy)한 후 그 복사한 자료를 수정(Update)해야 하므로 준비 과정으로 새로운 노드 데이터를 먼저 할당한다.
  • Write side critical section의 시작은 기존 lock 중 적절한 하나를 사용한다.
  • rcu_dereference_protected() 매크로 함수를 사용하여 매모리 배리어를 사용한 후 안전하게 노드를 가리키는 포인터를 얻어온다.
  • 기존 노드 데이터에 대한 직접 수정은 금지된다. 곧바로 수정하지 않고 복사한 후에 수정 작업을 진행해야 다른 Reader 들이 안전하게 기존 노드 데이터를 사용할 수 있다.
  • rcu_assign_pointer()를 사용하여 먼저 매모리 배리어를 사용한 후 안전하게 노드 포인터를 교체한다.
  • 교체가 완료되었으면 Write side critical section의 끝으로 사용한 lock을 닫는다.
  • 마지막으로 기존 노드 메모리 삭제를 위해 다른 Reader 들이 접근하지 않는 것이 보장되는 시간에 기존 노드 메모리를 삭제할 수 있도록 rcu 동기 또는 비동기 호출을 한다.
    • call_rcu:
      • rcu 비동기 호출로 non-blocking 함수이다.
      • 모든 기존 RCU read-side critical sections들이 끝나면 인수로 요청한 함수를 호출한다.
    • synchronize_rcu():
      • rcu 동기 호출로 blocking 함수이다.
      • 모든 기존 RCU read-side critical sections들이 끝날 때까지 이 함수내에서 기다리며 그 후 인수로 요청한 함수를 호출한다.

 

아래 그림과 같이 기존 노드를 안전하게 수정하기 위해 복제 후 수정하고 기존 노드는 삭제 요청한다.

rcu7b

void foo_update_a(int new_a)
{
	struct foo *new_fp;
	struct foo *old_fp;
	new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
	spin_lock(&foo_mutex);
	old_fp = rcu_dereference_protected(gbl_foo,
		 lockdep_is_held(&foo_mutex));
	*new_fp = *old_fp;
	new_fp->a = new_a;
	rcu_assign_pointer(gbl_foo, new_fp);
	spin_unlock(&foo_mutex);
	call_rcu(&old_fp->rcu, foo_reclaim); 
}

 

Reclaimer: RCU node 사용 완료 후 폐기

  • 기존 데이터의 다른 Reader 들에 의해 사용되지 않음이 확인되면 grace periods 이후에 삭제하여 메모리를 회수한다.
  • 참조 카운터가 0이되면 삭제하는 것은 conceptual한 것이다. 글로벌 참조 카운터를 atomic하게 업데이트하는 구조(spin-lock)의 동기화를 사용하지 않아야 하므로 실제 구현은 더 복잡한다.

rcu6b

void foo_reclaim(struct rcu_head *rp)
{
	struct foo *fp = container_of(rp, struct foo, rcu);
	foo_cleanup(fp->a);
	kfree(fp);
} 

 

리스트에서의 RCU 사용

list 전용 API를 사용하여 간단히 interation을 할 수 있다. iteration 중에 방향(direction)을 바꿀 수는 없다.

 

Reader: RCU node를 사용할 때

rcu13

void disp_foo(void)
{
	rcu_read_lock();
	list_for_each_entry_rcu(p, head, list) {
		disp_foo();
	}
	rcu_read_unlock();
}

 

Updater: RCU node의 수정

  • list_replace_rcu()를 수행한 순간 역방향 연결은 끊어진 반면 순방향 연결은 계속 살아있다.

rcu14

void foo_update_a(int new_a)
{
	struct foo *new_fp;
	struct foo *old_fp = search(head, key);
	if (old_fp == NULL) {
	/* Take appropriate action, unlock, and return. */
	}
	new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
	*new_fp = *old_fp;
	new_fp->a = new_a;
	list_replace_rcu(&old_fp->list, &new_fp->list);
	call_rcu(&old_fp->rcu, foo_reclaim); 
}
static inline void list_replace_rcu(struct list_head *old,
                                struct list_head *new)
{
        new->next = old->next;
        new->prev = old->prev;
        rcu_assign_pointer(list_next_rcu(new->prev), new);
        new->next->prev = new;
        old->prev = LIST_POISON2;
}

 

Reclaimer: RCU node 사용 완료 후 폐기

  • Grace periods를 지난 후 삭제 대상 노드를 모두 clean-up한다.

rcu15

void foo_reclaim(struct rcu_head *rp)
{
	struct foo *fp = container_of(rp, struct foo, rcu);
	foo_cleanup(fp->a);
	kfree(fp);
}

 


GP(Grace Period)

사용자는 특정 자료의 동기화를 위해 두 가지의 관점에서 처리한다.

  • 읽기 처리만 수행하는 read-side critical section이 있고 이를 Reader로 부른다.
  • 변경 처리가 포함된 write-side critical section이 있고 이를 Updater 또는 Writer로 부른다.

 

Writer 발생되는 시점, 즉 write-side critical section 부터 시작되는 구간 3가지를 정리해보았다.

  • Removal 구간: write-side critical section 구간에서는 읽어서(Read() 사본(Copy)을 만들고 변경(Update) 작업을 한다.
  • Grace Period 구간: Removal을 수행하는 자료에 관련된 Reader들의 처리가 완료됨을 보장할 수 있도록 대기하는 구간이다.
  • Reclamation 구간: 기존 사본을 원본에 적용하는 구간이다.

 

Removal(Read-Copy-Update)이 진행되는 동안 해당 데이터에 접근하고 있는 Reader들을 보호하기 위해 Removal은 기존 Reader들이 접근하고 있는 자료는 곧바로 수정하지 않고 이를 먼저 Copy한 후 Update하여 사용한다. 결국 Reader들은 원본 자료에만 접근하므로 안전하게 데이터에 접근할 수 있다.

 

Removal이 완료된 후 작업된 사본을 원본에 반영해야 하는데 곧바로 반영하면 각 cpu에서 동시에 처리되고 있는 Reader들에 문제가 발생한다. 따라서 Removal 기간 동안 같은 자료에 접근한 Reader들의 처리가 모두 완료될 때까지 기다려야 하는데 이 구간을 GP라고 한다.

 

GP의 완료를 인지하면 사본을 원본에 반영하고 필요 없어진 자료는 폐기하는 Reclamation 구간이 시작된다. 이러한 처리들은 RCU의 후CB(Call-Back) 함수를 리스트에 보관해 두었다가 Reclamation 구간에서 호출하여 처리한다.

 

다음 그림을 보고 Writer에 의해 발생되는 3가지 구간을 확인한다. 아래의 Grace Period는 conceptual한 의미이고, Min Grace Period로 불린다. Removal 기간 중 관련된 Reader들의 끝 부분이 곧바로 인식되지 않으므로 Grace Period가 끝나는 지점을 감지(detection) 하는 방법을 사용한다.

 

 

Grace Period의 종료 감지

커널이 Min Grace Period가 완료됨을 곧바로 인식하지 못하므로 실제 GP가 완료된 것을 감지하는 것은 특수한 기법이 필요하다. Reader(read-side critical section) 구간에서 성능을 저하시키는 추가 작업을 하지 않는 것이 핵심이므로 이 구간이 완료됨을 알아내기 위해 모든 cpu들이 QS(Quiescent State) 상태를 지나간 경우 GP가 완료된 것으로 판정한다.

  • preemptible RCU를 사용하는 시스템의 경우 GP 종료는 아래 그림의 모든 QS의 완료이외에도 추가로 read-side critical section내에서 preemption된 태스크의 복귀 상태도 확인한다.

 


QS(Quiescent State)

QS(Quiescent State)는 Reader 간의 사이 시간 상태로 GP 구간 전 이미 진행되고 있었던 Reader의 구간이 끝난 것을 인지할 목적으로 사용한다. QS 상태가 패스되었는지 확인하는 방법은 여러 가지가 사용되며 특히 preemption 커널 모델에 따라 조금씩 상이하다. 더 자세한 것은 다음 글의 “Quiscent State 체크 & 리포트”를 참조한다.

 

RCU non-preemptible 커널

  • context switch
    • context switch 발생 시 해당 cpu를 q.s로 체크한다.
  • 유저 모드 또는 idle에서 스케줄 틱
    • 유저 태스크 수행 중이거나 idle 중인 경우 해당 cpu를 q.s로 체크한다.
  • softirqd 실행 중
    • 현재 cpu에서 동작 중인 태스크가 softirqd인 경우 q.s로 체크한다.
  • voluntry 커널에서 cond_resched() 사용
    • preemption pointer 중 하나인 cond_resched() 사용 시 해당 cpu를 q.s로 체크한다.
  • voluntry 커널에서 cond_resched_tasks_rcu_qs() 사용
    • cb용 gp 커널 스레드 또는 nocb용 콜백 처리 루틴의 long loop에서 이 함수를 사용 시 해당 cpu를 q.s로 체크한다.

 

다음 그림과 같이 conceptual 수준에서의 QS는 Reader들 간의 시간으로 각 QS는 기존 Reader의 처리가 완료되었음을 보장하는 기간이다.

Candidate Quiescent State & Observed Quiescent State

다음 그림은 Min GP 이후 detect된 Q.S가 검정색 원으로 표시되었다. 각 cpu의 Q.S가 검출되면 실제 GP가 완료되었음을 알 수 있다.

 

조금 더 conceptual 수준을 구현 레벨로 약간 확장해보면 Write가 끝나면 GP가 시작하는데 이 때 각 cpu의 QS 상태를 모두 클리어한다. 그 후 모든 cpu의 QS가 한 번 이상 패스되면 GP가 끝나고 Reclamation이 진행된다.

 

다음 그림은 커널에서 실제 구현 레벨에서 QS 인지를 하는 과정을 보여준다. RCU Reader 이후 곧바로 cpu가 Quiescent State로 변경되는 것이 아니라 context switch 등이 일어났음을 알 수 있는 포인트에서 Quiescent State가 시작한다.

  • 구현 레벨이 상당히 복잡하므로 아래 그림만 보아서는 쉽게 이해가 가지 않을 수 있다.
  • cpu#0번의 경우를 먼저 살펴보자. conceptual 레벨과 다르게 실제 구현에서는 reader가 완료되자마자 q.s를 인식하지 못한다. 유저 태스크에 진입하면 q.s가 시작하지만 GP가 시작됨과 동시에 q.s가 모두 클리어되었었음을 기억해야 한다. 때문에 다른 태스크의 전환을 수행하는 context switch를 통해 q.s를 인지하게 된다.
  • cpu#3번을 보면 역시 reader가 끝난 후 유저 태스크에 진입하는 것으로 q.s가 시작됨을 알 수 있다. 이 또한 GP가 시작됨과 동시에 클리어되고 스케줄 틱이 발생하여 q.s를 인지하게된다.
  • cpu#5번의 경우 q.s를 인지하였고 모든 cpu의 q.s가 설정되었음을 알게되었다. 이 때 Reclamation 구간이 시작된다.

 

Force Quiescent State

GP가 시작된 후 각 cpu의 qs를 대기하는데 너무 오랫동안 gp가 끝나지 않으면 강제로 qs를 패스상태로 만든다.

 

Brute-Force RCU GP

synchronize_rcu() 함수를 호출하여 gp가 완료되어 동기화를 요청하면 gp가 완료할 때까지 기다린 후 콜백들을 처리한다. 만일 수천 개의 cpu가 가동되는 시스템에서 이러한 요청이 있으면 매우 오랜 시간을 대기해야 하는데 cpu를 그룹별로 묶어 파티셔닝하여 처리하는 구현 방법으로 빠른 처리를 사용하는 방법이 있다. 이 기능은 preemptible rcu 커널 옵션을 사용할 때 synchronize_rcu() 함수의 사용에 대한 급행처리를 할 수 있게하는 기능이다.

 

Deferrable QS

preemptible RCU의 경우 rcu_read_unlock() 시 해당 cpu의 interrupt, bh(softirq), preemption이 하나라도 disable되어 있는 경우 deferred qs로 보고한다.

 


RCU 콜백 처리

3가지 rcu_state

rcu는 스케줄 틱마다 콜백을 처리하기 위해 설계되었고, 한동안 각각의 gp를 관리하는 3개의 state에서 관리하였다가 최근에는 1개의 state로 통합되었다.

통합전
  •  rcu_sched
    • rcu read-side critical section 내에서 preemption되는 것을 막아야 하는 상황에서 사용되며, rcu 콜백은 rcu_sched용 gp가 끝난 후에 처리된다.
  • rcu_bh
    • DDOS attack 상황에서 OOM(Out Of Memory)이 발생하는 것을 방지하기 위해 softirq를 사용하는 네트워크 처리 시 rcu_bh 상태를 추가하여 별도로 관리하는 gp가 끝난 후에 처리한다.
  • rcu_preempt
    • preemptible 커널에서 read side critical 섹션 내에서 preemption을 허용하는 방식을 지원하기 위해 별도록 관리하는 gp가 끝난 후에 처리한다.
통합후
  • rcu_preempt
    • preemptible 커널을 사용하는 경우 read side critical 섹션 내에서 preemption을 허용한다.

 

RCU 콜백

synchronize_rcu() & call_rcu()

  • RCU updater가 콜백 요청을 하는데 모든 RCU read-side critical sections들이 끝난 후에 즉, gp가 끝난 후에 콜백이 처리되기를 기다린다.
  • read-side critical section 사용시 규칙: 모든 read-side critical section 즉 rcu_read_lock() 과 rcu_read_unlock()사이에는 block or sleep되면 안된다. (SRCU는 가능하다)
  • 모든 CPU의 context switch가 완료되면 RCU read-side critical section period는 완전히 끝난것으로 보장할 수 있게 되므로 모든 read-side critical section들이 안전하게 끝난것을 의미한다.
  • 삭제될 old 데이터들은 다른 Reader가 참조하고 있는 동안(Grace Period) 삭제 보류된 채로 있다가 old data를 참조하는 마지막 Reader의 사용이 끝나면 Reclamation을 진행하게 된다.

 

 read-side critical section period 보장

  • qs는 위에서 언급하였듯이 context switch이외에도 user 모드 또는 idle 시 스케줄 틱이 호출된 경우도 qs가 패스된다.
  • 모든 cpu의 qs가 패스되면 gp가 완료되어 reclmation이 진행된다.
    • preemptible rcu 구현이 추가되어 조금 더 복잡해졌다.
    • 최대 두 번의 gp가 완료되어야 reclamation이  진행될 수도 있다.

 

RCU NO-CB

절전 기능을 위해 콜백들을 인터럽트 context에서 호출하지 않고 별도의 cpu에 전용 no-cb 처리용 커널 스레드를 생성하여 운영하는 방법이다.

 


기존 rwlock 구현 소스를 rcu 구현 소스로 변경 예제

struct el {
	struct list_head list;
	long key;
	spinlock_t mutex;
	int data;
};
spinlock_t 	listmutex;                
struct 	el 	head;

 

search()

int search(long key, int *result) 
{
	struct list_head *lp;
	struct el *p;
	read_lock();
	list_for_each_entry(p, head, lp) {
		if (p->key == key) {
			*result = p->data;
			read_unlock();
			return 1;
		}
	}
	read_unlock();
	return 0;
}
int search(long key, int *result) 
{
	struct list_head *lp;
	struct el *p;
	rcu_read_lock();
	list_for_each_entry_rcu(p, head, lp) {
		if (p->key == key) {
			*result = p->data;
			rcu_read_unlock();
			return 1;
		}
	}
	rcu_read_unlock();
	return 0;
}

 

delete()

int delete(long key)
{
	struct el *p;
	write_lock(&listmutex);
	list_for_each_entry(p, head, lp) {
		if (p->key == key) {
			list_del(&p->list);
			write_unlock(&listmutex);
			kfree(p);
			return 1;
		}
	}
	write_unlock(&listmutex);
	return 0;
}
int delete(long key)
{
	struct el *p;
	spin_lock(&listmutex);
	list_for_each_entry(p, head, lp) {
		if (p->key == key) {
			list_del_rcu(&p->list);
			spin_unlock(&listmutex);
			synchronize_rcu();
			kfree(p);
			return 1;
		}
	}
	spin_unlock(&listmutex);
	return 0;
}

 


RCU API 목록

RCU list traversal:

  • list_entry_rcu
  • list_first_entry_rcu
  • list_next_rcu
  • list_for_each_entry_rcu
  • list_for_each_entry_continue_rcu
  • hlist_first_rcu
  • hlist_next_rcu
  • hlist_pprev_rcu
  • hlist_for_each_entry_rcu
  • hlist_for_each_entry_rcu_bh
  • hlist_for_each_entry_continue_rcu
  • hlist_for_each_entry_continue_rcu_bh
  • hlist_nulls_first_rcu
  • hlist_nulls_for_each_entry_rcu
  • hlist_bl_first_rcu
  • hlist_bl_for_each_entry_rcu

RCU pointer/list update:

  • rcu_assign_pointer
  • list_add_rcu
  • list_add_tail_rcu
  • list_del_rcu
  • list_replace_rcu
  • hlist_add_behind_rcu
  • hlist_add_before_rcu
  • hlist_add_head_rcu
  • hlist_del_rcu
  • hlist_del_init_rcu
  • hlist_replace_rcu
  • list_splice_init_rcu
  • hlist_nulls_del_init_rcu
  • hlist_nulls_del_rcu
  • hlist_nulls_add_head_rcu
  • hlist_bl_add_head_rcu
  • hlist_bl_del_init_rcu
  • hlist_bl_del_rcu
  • hlist_bl_set_first_rcu

RCU:

  • rcu_read_lock
  • synchronize_net
  • rcu_barrier
  • rcu_read_unlock
  • synchronize_rcu
  • rcu_dereference
  • synchronize_rcu_expedited
  • rcu_read_lock_held
  • call_rcu
  • rcu_dereference_check
  • kfree_rcu
  • rcu_dereference_protected

bh:

  • rcu_read_lock_bh
  • rcu_read_unlock_bh
  • rcu_dereference_bh
  • rcu_dereference_bh_check
  • rcu_read_lock_bh_held
  • call_rcu_bh
  • rcu_barrier_bh
  • synchronize_rcu_bh
  • synchronize_rcu_bh_expedited
  • rcu_dereference_bh_protected

sched:

  • rcu_read_lock_sched
  • rcu_read_unlock_sched
  • rcu_read_lock_sched_notrace
  • rcu_read_unlock_sched_notrace
  • rcu_dereference_sched
  • rcu_dereference_sched_check
  • rcu_read_lock_sched_held
  • synchronize_sched
  • rcu_barrier_sched
  • call_rcu_sched
  • synchronize_sched_expedited
  • rcu_dereference_sched_protected

SRCU:

  • srcu_read_lock
  • synchronize_srcu
  • srcu_barrier
  • srcu_read_unlock
  • call_srcu
  • srcu_dereference
  • synchronize_srcu_expedited
  • srcu_dereference_check
  • srcu_read_lock_held
  • init_srcu_struct
  • cleanup_srcu_struct

All: lockdep-checked RCU-protected pointer access

  • rcu_access_pointer
  • rcu_dereference_raw
  • RCU_LOCKDEP_WARN
  • rcu_sleep_check
  • RCU_NONIDLE

 


Core RCU API

rcu_read_lock()

  • read-side critical section 시작

  • preemptible RCU의 경우 current task의 rcu_read_lock_nesting++ 명령만을 포함한다.
  • 참고로 non-preempt 커널에서는 preempt_disable()은 빈 함수이다.

 

rcu_read_unlock()

 

rcu_assign_pointer()

  • RCU로 보호되는 포인터(RCU-protecte pointer)에 새로운 값을 할당하기 위해 사용

 

rcu_dereference()

  • rcu_dereference()는 안전하게 참조(dereference)된 RCU-protected pointer 값을 얻어온다.
  • weakly ordered CPU(out of order execution)를 위해 메모리 접근에 대한 order를 적절히 관리해야 하는데 rcu_dereference()를 사용하면 이를 완벽히 수행할 수 있다.

 

RCU_INIT_POINTER() 매크로

include/linux/rcupdate.h

/**
 * RCU_INIT_POINTER() - initialize an RCU protected pointer
 * @p: The pointer to be initialized.
 * @v: The value to initialized the pointer to.
 *
 * Initialize an RCU-protected pointer in special cases where readers
 * do not need ordering constraints on the CPU or the compiler.  These
 * special cases are:
 *
 * 1.   This use of RCU_INIT_POINTER() is NULLing out the pointer *or*
 * 2.   The caller has taken whatever steps are required to prevent
 *      RCU readers from concurrently accessing this pointer *or*
 * 3.   The referenced data structure has already been exposed to
 *      readers either at compile time or via rcu_assign_pointer() *and*
 *
 *      a.      You have not made *any* reader-visible changes to
 *              this structure since then *or*
 *      b.      It is OK for readers accessing this structure from its
 *              new location to see the old state of the structure.  (For
 *              example, the changes were to statistical counters or to
 *              other state where exact synchronization is not required.)
 *
 * Failure to follow these rules governing use of RCU_INIT_POINTER() will
 * result in impossible-to-diagnose memory corruption.  As in the structures
 * will look OK in crash dumps, but any concurrent RCU readers might
 * see pre-initialized values of the referenced data structure.  So
 * please be very careful how you use RCU_INIT_POINTER()!!!
 *
 * If you are creating an RCU-protected linked structure that is accessed
 * by a single external-to-structure RCU-protected pointer, then you may
 * use RCU_INIT_POINTER() to initialize the internal RCU-protected
 * pointers, but you must use rcu_assign_pointer() to initialize the
 * external-to-structure pointer *after* you have completely initialized
 * the reader-accessible portions of the linked structure.
 *
 * Note that unlike rcu_assign_pointer(), RCU_INIT_POINTER() provides no
 * ordering guarantees for either the CPU or the compiler.
 */
#define RCU_INIT_POINTER(p, v) \
        do { \
                rcu_check_sparse(p, __rcu); \
                WRITE_ONCE(p, RCU_INITIALIZER(v)); \
        } while (0)

RCU 포인터영역을 Sparse 체크하고 RCU 변수 v를 초기화한다.

  •  sparse:
    • 리눅스 커널의 문제를 찾아주는 툴
    • 스파스는 정적 분석 도구이고, 설치된 후 gcc extension으로 동작
    • 지원 속성으로 noderef, address_space, lock(acquires, releases), …
  • __rcu:
    • __attribute__((noderef, address_space(4)))
    • noderef:
      • 포인터 변수를 사용하여 직접 참조할 수 없다.
      • & 연산자를 사용해서 직접 참조해야 한다
    • address_space:
      • 커널에는 몇개의 주소공간이 있다.
      • 0 : kernel, 1: user, 2: iomem, 3: percpu, 4: __rcu 공간

 

참고

 

BKL(Big Kernel Lock)

  • 커널 버전 2.0에서 SMP가 소개됨
  • BKL은 giant-lock, big-lock 또는 kernel-lock이라고 알려졌었다.
  • 2.0 커널에서는 한 번에 하나의 스레드만이 커널 모드에서 동작하기 위해 lock을 획득하여야 커널 모드로 진입이 되었고, 나머지 CPU는 lock을 획득하기 위해 대기하였다.
  • 성능 및 리얼 타임 application에 대한 latency 이슈로 BKL은 spin-lock, mutex, RCU등으로 대체되기 시작함.
  • 리눅스 초창기에 SMP를 위해 구현된 BKL은 커널 2.6에서 일부 VFS와 몇개의 file system에만 남아있고 거의 대부분 제거되었다.
  • 2011년 커널 2.6.39에서 마지막 BKL 구현이 제거되었다.

BKL Functions

  • lock_kernel(): Acquires the BKL
  • unlock_kernel(): Releases the BKL
  • kernel_locked(): Returns nonzero if the lock is held and zero otherwise (UP always returns nonzero)

Synchronization

  • BKL은 CPU가 동시에 커널에  진입을 하는 것을 막아 동기화 문제를 해결한다.

bkl

Spinlock

<kernel v5.10>

Spin-Lock

critical section에 동시 접근한 스레드들 중 먼저 접근 요청한 스레드만이 critical section을 실행하는 동안 lock을 소유하고, 그 외의 스레드는 spin wait 한다. 다음으로 요청한 스레드들은 먼저 lock을 점유한 스레드의 점유 기간이 아주 짧다는 보장하에 lock 대기 시간 동안 spin wait 한다.

예) 간단한 대표 spin-lock API 사용 예

  • 병렬 프로그래밍을 위해 여러 cpu 또는 여러 context 에서 아래 safe_foo() 함수를 동시에 호출하더라도 <critical section>이 동시에 실행되지 않고, 순서대로(시리얼하게) 실행되는 것을 보장한다.
static DEFINE_SPINLOCK(foo);

void safe_foo()
{
        spin_lock(&foo);

        <critical section>

        spin_unlock(&foo);
}

 

critical section 구간에서 preemption

2009 Kernel summit에서 결정된 사항으로 RT 커널을 지원하면서 spinlock이 preemption이 가능해졌다. spin-lock API는 일반 커널의 경우 critical section 구간에서 preemption을 금지 시키지만, RT 커널의 경우 preemption을 허용한다. 따라서 협의하에 다음과 같이 두 개의 API 구성으로 변경하였다.

  • spin_lock()
    • preemption 될 수 있는 spin lock이다. (RT 커널에서 preemption 된다)
  • raw_spin_lock()
    • preemption 될 수 없는 spin lock이다. (RT 커널에서도 preemption 되지 않는다)
      • critical section 내에서 오랜 시간동안 머무르면 다른 스레드 역시 이 critical section 내부를 접근하지 못하고 장시간 대기하므로 최소한의 시간내에 사용을 마치고 lock을 풀어줘야 한다.

 

다음 그림은 높은 우선 순위의 태스크가 preemption 요청을 해온 상황이다.

 

위 그림의 preemption 요청에 대한 두 가지 커널에서 처리하는 방법이다.

 

raw_spin_lock의 spin 구간에서 처리 방법

  • critical section 구간과는 다르게 lock을 얻지 못해 spin을 하는 구간에서는 preemption 여부가 구현 방법에 따라 다른데 최근 3가지 구현 방법은 다음과 같다.
    • UP 시스템에서는 preemption을 무조건 금지 시킨다.
    • SMP + LHP(Lock-holder Preemption) 방식에서는 spin 하는 동안에는 preemption을 허용한다.
    • SMP + ticket 또는 SMP + queued 방식에서는 순서대로 lock을 획득하는 것이 보장된다. (default)
  • hardware bus locking 사용
    • ARMv6부터 lock 대기시간 시 전력을 줄이고자 lock 대기 시 ARM 이벤트 명령(wfe)을 사용하여 이벤트를 기다린다.
    • unlock을 할 때에는 lock 카운터도 감소 시키면서 ARMv6 이상에서 이벤트 전송 명령(sev)을 보내 lock을 종료시킨다.

 

 Spinning, busy-waitting, busy-looping

CPU가 쉬지 않고(Non-sleep-able) 특정 컨디션이 될 때 까지 루프를 도는 일을 spinning, busy-waiting 또는 busy-looping 이라 불린다.
루프를 탈출 할 수 있는 컨디션은 주로 다른 CPU에서 전달(조작)하는 특정 변수 값(카운터나 플래그) 또는 시그널로 판단을 한다.

  • CPU가 Non-sleep 한다는 말은 다른 태스크로 전환되지 않도록 preemption 되지 않는다는 의미다.
  • ARM32/64의 경우 절전을 위해 wfe(wait for event) 명령을 사용하여 대기하는 코드를 사용하였다.

 

Spinner

spinner를 굳이 표현하자면 위와 같이 쉬지 않고 도는actor(CPU)를 의미한다.

  • spinner가 spinning을 하는 동안 다른 CPU가 빠르게 시그널(카운터 값 등)을 설정하지 않으면 spinner는 그 시간만큼 계속 루프를 돌아야 한다.
  • 보통 일반 적인 루프들은 조건에 부합되지 않으면 기다리는 동안 자신을 sleep시켜 다른 태스크로 CPU 자원을 양보(yield)를 하는데 그러한 양보로 문맥교환(context switch)이 일어나는 비용이 비싸다고 판단되는 경우에 사용하는 기법이다. 다시 말하면 더 빠르게 루프를 빠져 나갈때 spin을 사용하여 설계된다.

 

Spin 탈출을 위한 컨디션

  • 사용 목적에 따라 preempt_disable() 뿐만 irq_disable()까지 사용할 수도 있다.
  • non-preemption spin lock에 대해서는 아래 그림처럼 spin 하는 동안 preemption 되는 것을 방지하기 위해 critical section의 앞뒤로 preempt_disable()과 preempt_enable() 명령어로 보호를 받고있다.

spinlock3

 

CONFIG_GENERIC_LOCKBREAK

  • 이 커널 옵션은 spin lock이 lock을 얻지 못한 상태에서 spinning 상태인지를 알아내기 위해 다음과 같이 구현되었다.
    • raw_spinlock 구조체에 break_lock이라는 변수를 추가하여 spinning 상태 여부를 나타낸다.
    • SMP + LHP 방식에 break_lock을 설정/해제 하도록 구현되었다.
    • 단점으로 int 하나면 구현되는 raw_spinlock 구조체가 두 배로 커지는 문제가 있다.
  • 현재 커널은 어떠한 아키텍처도 이 옵션을 사용하지 않는다.
    • SMP + ticket에 대한 루틴이 구현되면서 spin_is_contended()라는 함수가 만들어졌다.
      • lock이 spin중인지를 break_lock 변수 없이 ticket.owner와 ticket.next의 차이가 1을 초과하는 경우 spinning 상태인지 알아낼 수 있게 되었다.
    • 2013년 12월 마지막으로 사용했었던 arm64 아키텍처 코드에서도 삭제되면서 이제는 필요 없어진 옵션이다.
    • 참고:

 

Spinlock의 명명 체계

spinlock에 대한 명명은 다음 3단계로 이루어진다.

  • 1) spin_lock:
    • RT 커널에서는 preemptible spinlock으로 동작하지만, 일반 커널에서는 raw_spin_lock을 호출하여 non-preemptible spinlock으로 동작한다.
  • 2) raw_spin_lock:
    • RT 커널이든 일반 커널이든 non-preemptible spinlock으로 동작한다.
    • raw_spin_lock -> _raw_spin_lock -> __raw_spin_lock 으로 구현되었다.
    • UP 방식과 SMP 방식을 나누었다.
      • UP 방식에서는 아키텍처별 전용 코드를 사용하지 않고, generic 코드로만 구성했다.
      • SMP 방식에서는 높은 성능 구현을 위해 아키텍처별 전용 코드를 사용한다.
        • LHP 방식에서는 대부분의 코드가 generic 코드이지만 Queued or Ticket 방식에서 사용하는 arch_spin_trylock()를 필요로한다.
        • Ticket 및 Queue 방식은 아래의 아키텍처 전용 코드를 사용한다.
  • 3) arch_spin_lock:
    • 하드웨어 레벨의 아키텍처별 구현
    • UP를 위한 하드웨어 레벨의 코드는 없다.
    • SMP인 경우
      • ARM32에서 ticket based spin lock 구현 (ARM64도 v4.1까지 사용)
      • ARM64에서 queued spin lock 구현(v4.2 부터)

 

3가지 구현 방법: UP vs LHP vs Ticket or Queue

UP

  • 1개의 cpu 만을 사용하는 UP 시스템의 경우 두 개 이상의 태스크에서 동시에 critical section을 진입하는 것을 방지하려면 preemption을 disable하는 것만으로도 다른 태스크로의 전환이 불가능해진다. 따라서 spin_lock() 함수는 내부에서 preempt_disable() 함수만을 사용한다.
  • 다만 preemption을 disable 하여 다른 태스크로의 전환을 금지하여도 irq 또는 nmi가 발생하여 동작하는 루틴에서 critical section을 보호해야 하는 경우 preemption disable 만으로는 불가능해진다. 이러한 경우에는 spin_lock_irq() 또는 spin_lock_irqsave()를 사용해야 한다.

 

LHP(Lock-Holder Preemption)

  • lock 획득 후 critical section에서 preemption을 차단하지만, lock 획득 전에 spin wait을 하는 구간에서 preemption을 허용하는 기능이다.
  • 어떤 알고리즘에서 사용한 spin lock이 오랜 시간 spin wait 될 수 밖에 없을 때에 preemption이 가능하게 해주는 구조로 preemption latency를 줄일 수 있어 뛰어난 real time application을 지원하기 위해 효과적이다.

 

Ticket base 또는 Queued spin-lock

 

다음 그림은 spin_lock() 호출 후 3가지 구현 방법에 대한 호출 과정을 보여준다.

  • ARM64의 경우 C. SMP & Ticket(or Queue)을 사용한다.
  • 주의: PREEMPT_RT 커널 옵션은 RT 커널용이다.

 

다음 그림은 LHP 구현의 경우 spin wait 중에 preemption을 허용하는 모습을 보여준다.

 


spin_lock API

다음은 RT Linux용 spin lock과 일반 Linux용 spin lock이다.

1) RT Linux용 spin_lock()

  • preemption이 가능한 down_mutex를 호출함.
static inline void spin_lock(spinlock_t *lock)
static void __spin_lock(spinlock_t *lock, unsigned long eip)
{
	SAVE_BKL(_down_mutex(&lock->lock, eip));
}

2) 일반 Linux 용 spin_lock()

  • 일반 리눅스 커널은 spin lock에서 preemption이 지원되지 않는다. 그러므로 spin_lock()은 raw_spin_lock()을 호출한다.
    • raw_spin_lock() 내부에서 preempt_disable()을 동작시키므로 결국 spinlock은 preemption 되지 않음을 알 수 있다.

includelinux/spinlock.h

/*
 * Define the various spin_lock methods.  Note we define these
 * regardless of whether CONFIG_SMP or CONFIG_PREEMPT are set. The
 * various methods are defined as nops in the case they are not
 * required.
 */
#define raw_spin_lock(lock)     _raw_spin_lock(lock)

 


raw_spin_lock API

  • raw_spin_lock()은 preemption이 되지 않는 것으로 규정되어 있다.
  • raw_spin_lock()의 명명 체계는 다음과 같다.
    • raw_spin_lock() -> _raw_spin_lock() -> __raw_spin_lock()
  • 3가지 구현 방법은 다음과 같다.

 

1) raw_spin_lock() – UP

include/linux/spinlock_api_up.h

#define _raw_spin_lock(lock)                    __LOCK(lock)
  • UP 시스템에서는 단순히 __LOCK()을 호출한다.

 

__LOCK()

include/linux/spinlock_api_up.h

#define __LOCK(lock) \
  do { preempt_disable(); ___LOCK(lock); } while (0)
  • preempt_disable() 한 후 ___LOCK() 함수에서는 Sparse 정적 코드 분석 툴을 위한 매크로를 호출한다.

 

___LOCK()

include/linux/spinlock_api_up.h

/*
 * In the UP-nondebug case there's no real locking going on, so the
 * only thing we have to do is to keep the preempt counts and irq
 * flags straight, to suppress compiler warnings of unused lock
 * variables, and to add the proper checker annotations:
 */
#define ___LOCK(lock) \
  do { __acquire(lock); (void)(lock); } while (0)
  • __acquire() 매크로는 Sparse 정적 코드 분석툴을 사용하여 lock에 대한 적절한 체크를 수행한다.
    • #define __acquire(x)   __context__(x,1)

 

2) raw_spin_lock() – SMP

 

_raw_spin_trylock()

kernel/locking/spinlock.c

int __lockfunc _raw_spin_trylock(raw_spinlock_t *lock)
{
        return __raw_spin_trylock(lock);
}
EXPORT_SYMBOL(_raw_spin_trylock);

 

_raw_spin_lock()

kernel/locking/spinlock.c

void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
{
        __raw_spin_lock(lock);
}
EXPORT_SYMBOL(_raw_spin_lock);

 

_raw_spin_unlock()

kernel/locking/spinlock.c

void __lockfunc _raw_spin_unlock(raw_spinlock_t *lock)
{
        __raw_spin_unlock(lock);
}
EXPORT_SYMBOL(_raw_spin_unlock);

 

__raw_spin_trylock()

include/linux/spinlock_api_smp.h

static inline int __raw_spin_trylock(raw_spinlock_t *lock)
{
        preempt_disable();
        if (do_raw_spin_trylock(lock)) {
                spin_acquire(&lock->dep_map, 0, 1, _RET_IP_);
                return 1;
        }
        preempt_enable();
        return 0;
}

 

__raw_spin_lock()

LHP용과 그 외(Queued or Ticket) 구현 방식의 코드가 각각이므로 이 부분만 아래로 옮겼다.

 

__raw_spin_unlock()

include/linux/spinlock_api_smp.h

static inline void __raw_spin_unlock(raw_spinlock_t *lock)
{
        spin_release(&lock->dep_map, _RET_IP_);
        do_raw_spin_unlock(lock);
        preempt_enable();
}

 

2-1) SMP for LHP

아래 그림과 같이 critical section에서는 preemption이 불가능하지만 루프를 돌며 spin하는 동안은 preemption이 가능한 구조이다.

  • LHP에서 spin 하는 cpu들간의 우선 순위별 진입은 지원하지 않는다.

 

__raw_spin_lock() – for LHP

__raw_spin_lock() 함수는 아래 BUILD_LOCK_OPS() 매크로 함수를 사용하여 만들어진다.

 

kernel/locking/spinlock.c

#define BUILD_LOCK_OPS(op, locktype)                                    \
void __lockfunc __raw_##op##_lock(locktype##_t *lock)                   \
{                                                                       \
        for (;;) {                                                      \
                preempt_disable();                                      \
                if (likely(do_raw_##op##_trylock(lock)))                \
                        break;                                          \
                preempt_enable();                                       \
                                                                        \
                if (!(lock)->break_lock)                                \
                        (lock)->break_lock = 1;                         \
                while (!raw_##op##_can_lock(lock) && (lock)->break_lock)\
                        arch_##op##_relax(&lock->raw_lock);             \
        }                                                               \
        (lock)->break_lock = 0;                                         \
}                                                                       \
BUILD_LOCK_OPS(spin, raw_spinlock);
BUILD_LOCK_OPS(read, rwlock);
BUILD_LOCK_OPS(write, rwlock);

LHP 방식의 spin lock을 획득한다. spin 하는 동안 preemption을 잠깐씩 활성화하여 우선 순위가 높은 태스크가 있는지 확인하고, 스케줄링할 준비를 한다.

  • 코드 라인 4~8에서 lock을 획득할 수 있는지 시도한다. 그 동안 preemption을 끈다. 일반적으로 lock contension 상황이 아닌 경우가 대부분이므로 높은 확률로 do_raw_spin_trylock() 함수가 true가 될 수 있다고 판단하여 likely 함수를 사용하였다.
  • 코드 라인 10~11에서 spin하는 동안은 항상 1로 설정된다.
    • raw_spin_in_contended() 함수를 통해 spin 중인지 알아내기 위한 플래그로 사용된다.
  • 코드 라인 12~14에서 lock을 획득할 수 있는 상태가 될 때까지 spin하며 내부 루프를 돈다. lock 획득 가능 상태가 되면 다시 외부 루프를 돈다.
  • 코드 라인 15에서 spin에서 빠져나왔으므로 spin 하지 않는다고 플래그를 설정한다.

 

BUILD_LOCK_OPS() 매크로를 사용하여 다음 함수들이 만들어진다. (단 irqsave, irq, bh 접미사로 끝나는 함수 코드는 생략)

  • __raw_spin_lock()
  • __raw_spin_lock_irqsave()
  • __raw_spin_lock_irq()
  • __raw_spin_lock_bh()
  • __raw_read_lock()
  • __raw_read_lock_irqsave()
  • __raw_read_lock_irq()
  • __raw_read_lock_bh()
  • __raw_write_lock()
  • __raw_write_lock_irqsave()
  • __raw_write_lock_irq()
  • __raw_write_lock_bh()

 

raw_spin_can_lock()

kernel/locking/spinlock.c

/**
 * raw_spin_can_lock - would raw_spin_trylock() succeed?
 * @lock: the spinlock in question.
 */
#define raw_spin_can_lock(lock) (!raw_spin_is_locked(lock))

lock을 얻을 수 있는지 여부를 판단한다.

 

raw_spin_is_locked()

include/linux/spinlock.h

#define raw_spin_is_locked(lock)        arch_spin_is_locked(&(lock)->raw_lock)

lock이 걸려 있는지 여부를 판단을 한다.

 

2-2) SMP for Queued or Ticket

  • Queue / Ticket 기능을 구현하여 다음과 같은 장점을 갖게되었다.
    • 공정성
      • 초기 spin lock은 lock 획득 순서가 공정하지 않았었는데 커널 2.6.25 부터 ticket을 부여받아 차례 대로 획득 가능해졌다.
    • cache bouncing 문제 제거
      • cache coherent 기능에 의해 두 개 이상의 CPU가 lock을 획득하기 위해 spin 하는 동안 strex 명령을 반복하여 사용하므로 spin 하는 CPU들에서 cache line의 로드와 invalidate(강제적인 eviction)를 반복하면서 성능이 저하된다. 이를 막기 위해 lock 값을 둘로 나누어 둘 값을 비교하면서 자기 차례가 아닌 경우에는 write 즉 strex 동작을 하지 않도록 하여 이 문제를 해결하였다.
      • cache bouncing 문제도 심각하게 lock contention을 야기하고 lock contention은 성능을 떨어뜨리는 큰 원인이된다.

 

__raw_spin_lock() – for Queued or Ticket

include/linux/spinlock_api_smp.h

static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
        preempt_disable();
        spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
        LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

Queued 또는 Ticket 방식의 spin lock을 획득한다.

  • 코드 라인 3에서 preemption을 비활성화한다.
  • 코드 라인 4에서 Lockdep 디버깅용 코드를 수행한다.
  • 코드 라인 5에서 실제 spin lock을 얻기 위해 spin 한다.
    • 내부에서는 do_raw_spin_try_lock() 함수를 먼저 이용해보고 안되면 do_raw_spin_lock() 함수를 호출하여 spin한다.

 

LOCK_CONTENDED()

include/linux/lockdep.h

#define LOCK_CONTENDED(_lock, try, lock)                        \
do {                                                            \
        if (!try(_lock)) {                                      \
                lock_contended(&(_lock)->dep_map, _RET_IP_);    \
                lock(_lock);                                    \
        }                                                       \
        lock_acquired(&(_lock)->dep_map, _RET_IP_);                     \
} while (0)

@try 함수를 수행시켜 lock 획득을 시도하고, 실패하는 경우 lock contension 표시를 한 후 @lock 함수를 실행시켜 lock을 획득한다.

  • lock_contended() 함수는 lock contension 설정을 하여 lockdep 디버깅에서 추적을 위해 수행하는 함수이다.
  • lock_acquired() 함수는 lock 획득 설정을 하여 lockdep 디버깅에서 오류 추적을 위해 수행하는 함수이다.

 


arch_spinlock API

SMP 시스템을 위해 Queued 방식과 Ticket 방식의 두 가지 하드웨어 레벨 구현이 준비되어 있다.

 

Queued 방식 spin-lock – Generic

Queued spin-lock은 현재 다음 아키텍처에 구현되어 사용가능하다. 그 외의 아키텍처는 앞으로 구현되어야 한다.

  • arm64
  • mips
  • openrisc
  • sparc
  • x86
  • xtensa

 

include/asm-generic/qspinlock.h

/*
 * Remapping spinlock architecture specific functions to the corresponding
 * queued spinlock functions.
 */
#define arch_spin_is_locked(l)          queued_spin_is_locked(l)
#define arch_spin_is_contended(l)       queued_spin_is_contended(l)
#define arch_spin_value_unlocked(l)     queued_spin_value_unlocked(l)
#define arch_spin_lock(l)               queued_spin_lock(l)
#define arch_spin_trylock(l)            queued_spin_trylock(l)
#define arch_spin_unlock(l)             queued_spin_unlock(l)

Queued 방식 spin lock 구현을 위해 arch_* 함수들이 queued_spin_* 함수들로 매핑되었다.

 

다음 그림은 queued spin-lock에서 사용하는 arch_spinlock_t 타입과 멤버 및 관련 매크로 상수들을 보여준다.

  • tail_cpu
    • 0 값을 사용할 수 없다. 따라서 cpu 번호 + 1로 인코딩 값을 사용한다.
  • tail_idx
    • spin lock은 각 cpu 마다 최대 4번 nest될 수 있다. 이를 구분하기 위해 4개의 노드를 나누어 처리한다.
    • 최대 4번 nest된 경우 다음과 같은 순서로 tail_idx가 증가된다.
      • task, softirq, hardirq, nmi

 

다음과 같이 spin-lock 값은 간단히 (x,y,z)로 표현할 수 있다.

 

다음 그림과 같이 lock owner를 제외한 나머지 spin-lock들은 모두 대기하며, 세 번째 요청부터는 별도의 mcs queue를 구성하여 대기한다.

 

queued_spin_trylock()

include/asm-generic/qspinlock.h

/**
 * queued_spin_trylock - try to acquire the queued spinlock
 * @lock : Pointer to queued spinlock structure
 * Return: 1 if lock acquired, 0 if failed
 */
static __always_inline int queued_spin_trylock(struct qspinlock *lock)
{
        u32 val = atomic_read(&lock->val);

        if (unlikely(val))
                return 0;

        return likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL));
}

queued spin-lock을 획득 시도한다. 만일 획득한 경우 1을 반환하고, 실패한 경우 0을 반환한다.

  • 코드 라인 3~6에서 lock 값을 읽어와서 낮은 확률로 이미 lock이 걸려 있는 상태라면 0을 반환한다.
  • 코드 라인 8에서 lock 값을 1로 변경 시도한다. 성공한 경우 1을 반환하고, 실패한 경우 0을 반환한다.

 

queued_spin_lock()

include/asm-generic/qspinlock.h

/**
 * queued_spin_lock - acquire a queued spinlock
 * @lock: Pointer to queued spinlock structure
 */
static __always_inline void queued_spin_lock(struct qspinlock *lock)
{
        u32 val = 0;

        if (likely(atomic_try_cmpxchg_acquire(&lock->val, &val, _Q_LOCKED_VAL)))
                return;

        queued_spin_lock_slowpath(lock, val);
}

queued spin-lock을 획득한다.

첫 번째 cpu의 lock 획득(lock owner)
  • 코드 라인 5~6에서 fast-path queued spin-lock을 얻는다.
    • lock 값이 0인 경우, 즉 lock 경합이 없는 첫 cpu인 경우 손쉽게 lock을 획득할 수 있다. (uncontended)
      • (0,0,0) -> (0,0,1)
  • 코드 라인 8에서 slow-path queued spin-lock으로 전환한다.

 

queued_spin_unlock()

include/asm-generic/qspinlock.h

/**
 * queued_spin_unlock - release a queued spinlock
 * @lock : Pointer to queued spinlock structure
 */
static __always_inline void queued_spin_unlock(struct qspinlock *lock)
{
        /*
         * unlock() needs release semantics:
         */
        smp_store_release(&lock->locked, 0);
}

queued spin-lock을 획득 해제한다.

Lock Contension

queued spin lock의 경우 lock을 획득할 때 다음과 같이 4개의 lock contension 상황을 구별하였다.

  • uncontended
    • 경쟁하는 스레드가 없이 한 번에 lock owner가 된 경우이다.
    • (0,0,0) –> (0,0,1)
  • pending
    • 이미 먼저 처리 중인 lock owner가 있고, 그 후에 첫 번째로 spin wait 중에 lock owner가 된 경우이다.
    • (0,1,1) –> (0,1,0) –> (0,0,1)
  • uncontended queue
    • queue의 head에서 대기 중이고, 먼저 처리 중인 lock owner와 pending 중인 스레드의 lock 처리가 모두 완료되어 lock owner가 된 경우이다.
    • (n,x,y) –> (n,0,0) –> (0,0,1)
  • contended queue
    • queue에서 대기 중이고, head가 아니지만 먼저 처리 중인 lock owner와 pending 중인 스레드 그리고 queue의 내 앞에서 대기중인 스레드들의 lock 처리가 모두 완료되어 lock owner가 된 경우이다.
    • (*,x,y) –> (*,0,0) –> (0,0,1)

 

다음 그림은 파란 박스의 lock 요청자가 lock을 소유할 때 각각의 lock contension 상태를 보여준다.

  • contended queue 상태에 있는 cpu는 mcs 노드의 locked가 1로 풀릴 때 까지 spin 한다.

 

queued_spin_lock_slowpath()

kernel/locking/qspinlock.c -1/4-

/**
 * queued_spin_lock_slowpath - acquire the queued spinlock
 * @lock: Pointer to queued spinlock structure
 * @val: Current value of the queued spinlock 32-bit word
 *
 * (queue tail, pending bit, lock value)
 *
 *              fast     :    slow                                  :    unlock
 *                       :                                          :
 * uncontended  (0,0,0) -:--> (0,0,1) ------------------------------:--> (*,*,0)
 *                       :       | ^--------.------.             /  :
 *                       :       v           \      \            |  :
 * pending               :    (0,1,1) +--> (0,1,0)   \           |  :
 *                       :       | ^--'              |           |  :
 *                       :       v                   |           |  :
 * uncontended           :    (n,x,y) +--> (n,0,0) --'           |  :
 *   queue               :       | ^--'                          |  :
 *                       :       v                               |  :
 * contended             :    (*,x,y) +--> (*,0,0) ---> (*,0,1) -'  :
 *   queue               :         ^--'                             :
 */
void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
{
        struct mcs_spinlock *prev, *next, *node;
        u32 old, tail;
        int idx;

        BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));

        if (pv_enabled())
                goto pv_queue;

        if (virt_spin_lock(lock))
                return;

        /*
         * Wait for in-progress pending->locked hand-overs with a bounded
         * number of spins so that we guarantee forward progress.
         *
         * 0,1,0 -> 0,0,1
         */
        if (val == _Q_PENDING_VAL) {
                int cnt = _Q_PENDING_LOOPS;
                val = atomic_cond_read_relaxed(&lock->val,
                                               (VAL != _Q_PENDING_VAL) || !cnt--);
        }

        /*
         * If we observe any contention; queue.
         */
        if (val & ~_Q_LOCKED_MASK)
                goto queue;

        /*
         * trylock || pending
         *
         * 0,0,* -> 0,1,* -> 0,0,1 pending, trylock
         */
        val = queued_fetch_set_pending_acquire(lock);

        /*
         * If we observe contention, there is a concurrent locker.
         *
         * Undo and queue; our setting of PENDING might have made the
         * n,0,0 -> 0,0,0 transition fail and it will now be waiting
         * on @next to become !NULL.
         */
        if (unlikely(val & ~_Q_LOCKED_MASK)) {

                /* Undo PENDING if we set it. */
                if (!(val & _Q_PENDING_MASK))
                        clear_pending(lock);

                goto queue;
        }

        /*
         * We're pending, wait for the owner to go away.
         *
         * 0,1,1 -> 0,1,0
         *
         * this wait loop must be a load-acquire such that we match the
         * store-release that clears the locked bit and create lock
         * sequentiality; this is because not all
         * clear_pending_set_locked() implementations imply full
         * barriers.
         */
        if (val & _Q_LOCKED_MASK)
                atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK));

        /*
         * take ownership and clear the pending bit.
         *
         * 0,1,0 -> 0,0,1
         */
        clear_pending_set_locked(lock);
        lockevent_inc(lock_pending);
        return;

queued spin-lock을 slow-path 방법으로 획득한다.

  • 코드 라인 9~10에서 커널 설정이 para-virtual 스핀락을 지원하는 경우 pv_queue로 이동한다.
  • 코드 라인 12~13에서 virtual 스핀락을 지원하는 경우 그냥 함수를 빠져나간다.
    • 현재 x86 아키텍처에서만 지원할 수 있다.
두 번째 cpu가 lock owner로 핸드오버되는 순간이다. 그 동안 세 번째 cpu는 잠시 대기
  • 코드 라인 21~25에서 lock owner인 첫 번째 cpu가 lock을 이미 해제하였고, 두 번째 cpu가 pending 상태에서 lock owner로 아직 전환되지 않은 짧은 순간이다. 이런 경우 세 번째 cpu는 잠시 spin 한다. (펏 번째 lock owner가 없어졌으므로, 세 번째 cpu -> 두 번째 cpu로 포지션 변경)
    • 0,1,0 -> 0,0,1로 변경될 때까지 대기한다.
세 번째 cpu 이상은 mcs queue로 이동
  • 코드 라인 30~31에서 이미 두 번째 cpu가 lock을 획득하기 위해 대기하는 중이다. 세 번째 cpu 부터는 queue로 이동한다.
두 번째 cpu는 pending 상태로 대기
  • 코드 라인 38에서 lock->val 값을 val 변수로 읽어오고, lock->val에는 pending 비트를 설정한다.
    • 0,0,* -> *,1,*
  • 코드 라인 47~54에서 낮은 확률로 lock contension 상황이되어 두 번째 cpu가 세 번째 이상으로 밀려난 경우이다. 즉 위 코드에서 pending 비트를 설정하기 전부터 이미 두 번째 cpu 이상이 끼어들어 대기중인 경우(조금 전에 읽어온 val 값이 tail 또는 pending 설정) queue로 이동한다. 이 때 기존에 읽은 값에서 pending 설정이 없었으면 원래대로 돌리기 위해 lock->pending 비트를 제거한다.
  • 코드 라인 67~68에서 val 값에 lock 설정된 경우 lock 값이 0이될 때까지 spin 하며 대기한다. 즉 두 번째 cpu는 여기에서 lock이 풀릴때까지 대기한다.
  • 코드 라인 76~79에서 두 번째 cpu는 lock owner가 된다. 즉 lock->pending을 클리어하고 lock->locked를 설정한다. 그리고 lock_pending 통계를 증가시킨 후 함수를 빠져나간다.
    • (0,1,0) -> (0,0,1)

 

다음 그림은 2개의 spin-lock 요청을 처리할 때에는 mcs queue(3개 이상 요청 시 사용)를 사용할 필요없이 단순하게 처리되는 모습을 보여준다.

  • spin-lock 요청은 tail cpu id와 tail index id에 대해서 정확히 표시해야 하지만, 시각적으로 단순한게 표시하기 위해 CPU A, B, C, … 로 처리하였다.

 

kernel/locking/qspinlock.c -2/4-

        /*
         * End of pending bit optimistic spinning and beginning of MCS
         * queuing.
         */
queue:
        lockevent_inc(lock_slowpath);
pv_queue:
        node = this_cpu_ptr(&qnodes[0].mcs);
        idx = node->count++;
        tail = encode_tail(smp_processor_id(), idx);

        /*
         * 4 nodes are allocated based on the assumption that there will
         * not be nested NMIs taking spinlocks. That may not be true in
         * some architectures even though the chance of needing more than
         * 4 nodes will still be extremely unlikely. When that happens,
         * we fall back to spinning on the lock directly without using
         * any MCS node. This is not the most elegant solution, but is
         * simple enough.
         */
        if (unlikely(idx >= MAX_NODES)) {
                lockevent_inc(lock_no_node);
                while (!queued_spin_trylock(lock))
                        cpu_relax();
                goto release;
        }

        node = grab_mcs_node(node, idx);

        /*
         * Keep counts of non-zero index values:
         */
        lockevent_cond_inc(lock_use_node2 + idx - 1, idx);

        /*
         * Ensure that we increment the head node->count before initialising
         * the actual node. If the compiler is kind enough to reorder these
         * stores, then an IRQ could overwrite our assignments.
         */
        barrier();

        node->locked = 0;
        node->next = NULL;
        pv_init_node(node);

        /*
         * We touched a (possibly) cold cacheline in the per-cpu queue node;
         * attempt the trylock once more in the hope someone let go while we
         * weren't watching.
         */
        if (queued_spin_trylock(lock))
                goto release;

        /*
         * Ensure that the initialisation of @node is complete before we
         * publish the updated tail via xchg_tail() and potentially link
         * @node into the waitqueue via WRITE_ONCE(prev->next, node) below.
         */
        smp_wmb();
세 번째 cpu부터 처리하는 mcs queue
  • 코드 라인 5에서 queue: 레이블이다. 3 개 이상의 cpu가 spin lock 경합을 할 때 이곳에서 mcs queue 기반의 처리를 수행한다.
  • 코드 라인 6에서 lock_slowpath 카운터를 증가시킨다.
  • 코드 라인 7에서 pv_queue: 레이블이다.
  • 코드 라인 8~10에서 현재 cpu에 대해 spin lock의 tail 값을 구한다.
    • 참고로 같은 cpu로 spin_lock이 호출되는 경우는 최대 4번이며 처음 idx 값은 0부터 사용되므로 최대 idx 값은 3이다.
  • 코드 라인 21~26에서 예외 처리 로직이다. 가능성은 없지만 혹시라도 idx 값이 4 이상인 경우 lock_no_node 카운터를 증가시키고, try lock 방법으로만 락을 반복하여 획득 시도를 하고, 획득 후에는 release 레이블을 통해 함수를 빠져나간다.
  • 코드 라인 28에서 현재 cpu의 idx에 대한 mcs 노드를 가져온다.
  • 코드 라인 33에서 idx가 0이 아닌 경우에만 다음과 같은 카운터를 증가시킨다.
    • idx가 1인 경우 lock_use_node2
    • idx가 2인 경우 lock_use_node3
    • idx가 3인 경우 lock_use_node4
  • 코드 라인 40에서 mcs 노드를 초기화 전에 node->count의 기록이 확실히 먼저 수행되도록 컴파일러 베리어를 사용했다.
  • 코드 라인 42~44에서 mcs 노드를 초기화한다. (node->count 값은 초기화하지 않는다)
    • para-virtual qspin lock을 사용하는 경우 노드 초기화를 위해 kernel/locking/qspinlock_paravirt.h 파일에 있는 pv_init_node()를 사용한다. 그리고 cpu_running 상태로 시작한다.
  • 코드 라인 51~52에서 혹시나 앞선 두 cpu들의 lock이 모두 release되어 lock을 획득 시도하여 성공한 경우 release 레이블을 통해 함수를 빠져나간다.
  • 코드 라인 59에서 노드의 tail을 갱신하기 전에 앞서 초기화된 노드를 먼저 publish하기 위해 smp 베리어를 수행한다.

 

다음 그림은 동일한 cpu에서 spin-lock이 호출될 때 최대 4번까지 nest하여 호출되므로, mcs queue에서 대기할 cpu 정보를 표현하는 per-cpu mcs 노드가 최대 4개까지 사용됨을 보여준다.

 

다음 그림은 nest된 spin-lock의 자료 상태를 보여준다.

 

Spin-lock Revisit

동일 cpu에서 spin_lock() 함수 호출이 두 번 발생할 수 있는 케이스가 있다.

 

아래 그림과 같은 상황을 고려해보자.

 

위와 같은 사례가 발생하면 irq context의 spin_lock() 함수를 호출하자 마자 spin wait 상태를 반복하여 누가 풀어줄 방법도 없이 정지하게 된다.

  • 이를 방지하기 위해 task 및 irq context에서 spin_lock(&foo) 함수를 사용하는 대신 spin_lock_irq(&foo) 함수를 사용해여 irq가 진입하지 못하도록 원천적으로 막아야 한다.
  • bh(bottom-half) context에서도 spin-lock의 재진입을 막기 위해 spin_lock_bh() 함수를 사용한다.

 

kernel/locking/qspinlock.c -3/4-

.       /*
         * Publish the updated tail.
         * We have already touched the queueing cacheline; don't bother with
         * pending stuff.
         *
         * p,*,* -> n,*,*
         */
        old = xchg_tail(lock, tail);
        next = NULL;

        /*
         * if there was a previous node; link it and wait until reaching the
         * head of the waitqueue.
         */
        if (old & _Q_TAIL_MASK) {
                prev = decode_tail(old);

                /* Link @node into the waitqueue. */
                WRITE_ONCE(prev->next, node);

                pv_wait_node(node, prev);
                arch_mcs_spin_lock_contended(&node->locked);

                /*
                 * While waiting for the MCS lock, the next pointer may have
                 * been set by another lock waiter. We optimistically load
                 * the next pointer & prefetch the cacheline for writing
                 * to reduce latency in the upcoming MCS unlock operation.
                 */
                next = READ_ONCE(node->next);
                if (next)
                        prefetchw(next);
        }

        /*
         * we're at the head of the waitqueue, wait for the owner & pending to
         * go away.
         *
         * *,x,y -> *,0,0
         *
         * this wait loop must use a load-acquire such that we match the
         * store-release that clears the locked bit and create lock
         * sequentiality; this is because the set_locked() function below
         * does not imply a full barrier.
         *
         * The PV pv_wait_head_or_lock function, if active, will acquire
         * the lock and return a non-zero value. So we have to skip the
         * atomic_cond_read_acquire() call. As the next PV queue head hasn't
         * been designated yet, there is no way for the locked value to become
         * _Q_SLOW_VAL. So both the set_locked() and the
         * atomic_cmpxchg_relaxed() calls will be safe.
         *
         * If PV isn't active, 0 will be returned instead.
         *
         */
        if ((val = pv_wait_head_or_lock(lock, node)))
                goto locked;

        val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK));
세 번째 cpu부터 mcs 큐에 노드 추가
  • 코드 라인 8에서 lock->tail이 새롭게 추가할 노드를 가리키도록 변경한다.
    • (p,*.*) -> (n,*,*)
  • 코드 라인 9에서 일단 next에 null을 대입한다.
차순위 노드들의 대기
  • 코드 라인 15~16에서 tail에 이미 기존 노드가 존재하는 경우이다. tail에 연결된 노드의 앞에 있는 노드를 prev에 알아온다.
  • 코드 라인 19에서 대기 큐의 가장 마지막에 연결한다.
  • 코드 라인 21에서 pv_wait_node()의 경우도 para-virtual spin lock 에 대한 추가 코드로 대기하는 동안 vcpu를 끈다. vcpu_halted 상태에서 대기하다가 깨어난 후 vcpu_running 상태로 변경된다.
  • 코드 라인 22에서 mcs queue에서 내 노드가 가장 선두가 될 때까지 스핀하며 대기한다.
    • mcs 큐에서 나보다 이전에 진입한 mcs 노드가 있기 때문에 내 노드의 locked가 0인동안 spin 하면서 대기한다.
    • 앞선 노드가 락 owner가 되어 큐에서 빠져 나가는 순간 내 노드의 locked를 1로 변경해준다. 이 때 내 mcs 노드는 mcs 큐에서 가장 선두가 됨을 의미한다.
  • 코드 라인 30~32에서 현재 추가하는 노드의 다음에 값이 있는 경우 추가된 노드를 먼저 캐시에 로드한다.
헤드 노드의 대기
  • 코드 라인 56~57에서 mcs queue에서 내 노드가 선두에서 대기하기 위해 vcpu를 끄기 위한 para-virtual spin lock에 대한 추가 코드이다.
  • 코드 라인 59에서 mcs 큐의 선두에서 lock을 획득할 때 까지 대기한다.
    • lock->lock_pending=0 이어야 lock 획득이 가능하다
    • (*,x,y) -> (n,0,0)

 

다음 그림은 세 번째 spin-lock 요청할 때 mcs queue를 이용하여 대기 큐에 추가되는 모습을 보여준다.

  • val->tail은 mcs queue에 가장 마지막에 진입한 cpu를 가리킨다.

 

다음 그림은 head 노드(CPU C)가 lock을 획득하여 빠져나갈 때 다음 노드를 head 노드로 만드는 과정을 보여준다.

  • 다음 노드의 locked 값을 0으로 만들어 spin wait 중인 차순위 노드를 깨워 head를 만들고 lock->locked_pending에서 다시 spin wait하게 한다.

 

kernel/locking/qspinlock.c -4/4-

locked:
        /*
         * claim the lock:
         *
         * n,0,0 -> 0,0,1 : lock, uncontended
         * *,*,0 -> *,*,1 : lock, contended
         *
         * If the queue head is the only one in the queue (lock value == tail)
         * and nobody is pending, clear the tail code and grab the lock.
         * Otherwise, we only need to grab the lock.
         */

        /*
         * In the PV case we might already have _Q_LOCKED_VAL set, because
         * of lock stealing; therefore we must also allow:
         *
         * n,0,1 -> 0,0,1
         *
         * Note: at this point: (val & _Q_PENDING_MASK) == 0, because of the
         *       above wait condition, therefore any concurrent setting of
         *       PENDING will make the uncontended transition fail.
         */
        if ((val & _Q_TAIL_MASK) == tail) {
                if (atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
                        goto release; /* No contention */
        }

        /*
         * Either somebody is queued behind us or _Q_PENDING_VAL got set
         * which will then detect the remaining tail and queue behind us
         * ensuring we'll see a @next.
         */
        set_locked(lock);

        /*
         * contended path; wait for next if not observed yet, release.
         */
        if (!next)
                next = smp_cond_load_relaxed(&node->next, (VAL));

        arch_mcs_spin_unlock_contended(&next->locked);
        pv_kick_node(lock, next);

release:
        /*
         * release the node
         */
        __this_cpu_dec(qnodes[0].mcs.count);
}
EXPORT_SYMBOL(queued_spin_lock_slowpath);
헤드 노드의 lock 획득
  • 코드 라인 1에서 locked: 레이블이다. 큐의 헤드 노드가 lock을 획득하였다.
  • 코드 라인 23~26에서 노드가 큐의 가장 마지막인 경우이고, lock contension 없이 lock을 획득한 경우(tail=0, pending=0, lock=1) release 레이블을 통해 함수를 빠져나간다.
    • (n,0,1) -> (0,0,1)
  • 코드 라인 33에서 lock contension 상태이다. 이러한 경우 tail=0, pending=0, lock=1로 기록한다.
    • (p,0,1) -> (0,0,1)
  • 코드 라인 38~39에서 현재 노드의 현재 추가한 노드 뒤로 새로운 노드가 끼어들어왔는지 알기 위해 next 노드를 다시 읽어온다.
  • 코드 라인 41에서 헤드의 다음 노드를 locked=1로 변경하여 다음 노드가 spin에서 깨어나 헤드가 될 수 있게 해준다.
  • 코드 라인 42에서 vcpu_halted -> vcpu_hashed 상태로 변경하는 para-virtual spin-lock 대한 추가 코드이다.
  • 코드 라인 44에서 release: 레이블이다.
  • 코드 라인 48에서 lock을 획득했으므로 mcs 큐에서 대기중인 cpu 수를 감소시킨다.

 

다음 그림은 mcs queue에서 대기하던 cpu들이 spin-lock을 획득하는 순서를 보여준다.

  • mcs queue에서 가장 처음 진입하여 대기하고 있던 cpu는 spin-lock을 획득하기 위해 val->pending과 val->locked가 0이되는 것을 감시한다.

 

락 이벤트 카운터

LOCK_EVENT_COUNTS 커널 옵션을 사용하여 다음 락들에 대해 디버깅 목적의 락 이벤트 카운터를 관리한다. 이들을 보려면 CONFIG_DEBUG_FS 커널 옵션이 준비되어 있을 때 debugfs(디폴트 마운트: /sys/kernel/debug)를 마운트하여 사용하는데 lock_event_counts 디렉토리에서 각 이벤트들을 읽거나 기록할 수 있다.

  • queued spin lock
    • lock_pending
      • pending 카운터
    • lock_slowpath
      • slowpath 진입 카운터
    • lock_use_node2
      • 동일한 cpu에서 context 전환하여 nest 하여 두 번째 spin-lock 요청한 경우
    • lock_use_node3
      • 동일한 cpu에서 context 전환하여 nest 하여 세 번째 spin-lock 요청한 경우
    • lock_use_node4
      • 동일한 cpu에서 context 전환하여 nest 하여 네 번째 spin-lock 요청한 경우
    • lock_no_node
      • 동일한 cpu에서 context 전환하여 nest 하여 다섯 번째 이상에서 spin-lock 요청한 경우 (노드 오버플로우 오류)
  • para-virtual queued spin lock
    • 생략
  • rw semaphore
    • 생략

 


Ticket 방식 spin-lock

다음 그림은 Ticket spin-lock에서 사용하는 arch_spinlock_t 타입과 멤버를 보여준다.

 

arch_spin_trylock() – ARM32

arch/arm/include/asm/spinlock.h

static inline int arch_spin_trylock(arch_spinlock_t *lock)
{
        unsigned long contended, res;
        u32 slock;

        prefetchw(&lock->slock);
        do {
                __asm__ __volatile__(
                "       ldrex   %0, [%3]\n"
                "       mov     %2, #0\n"
                "       subs    %1, %0, %0, ror #16\n"
                "       addeq   %0, %0, %4\n"
                "       strexeq %2, %0, [%3]"
                : "=&r" (slock), "=&r" (contended), "=&r" (res)
                : "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
                : "cc");
        } while (res);

        if (!contended) {
                smp_mb();
                return 1;
        } else {
                return 0;
        }
}

Ticket 방식의 spin lock 획득을 시도한다. lock contension 이 없는 경우 1을 반환하고, lock contension이 있으면 0을 반환한다.

  • 코드 라인 6에서 lock->slock을 미리 캐시에 로드해둔다.
    • pldw 명령을 호출하여 해당 lock 변수를 미리 캐시에 로드한다.
    • 이렇게 미리 로드를 하는 이유는 ldrex 부터 strex 까지의 atomic operation 격으로 동작하는 critical section 영역의 코드를 동작시키는 동안 cpu clock을 적게 소모하게 하여 확률적으로 strex의 실패가 적어지게 유도한다
  • 코드 라인 7~17에서 lock->slock에 ticket 값을 증가시켜 atomic 기록한다.
    • 한 번이라도 실패하는 경우 contended가 1로 설정된다.
  • 코드 라인 19~24에서 lock contension 이 없는 경우 1을 반환하고, lock contension이 있으면 0을 반환한다.

 

티켓 기록 규칙
  • tickets.next 와 tickets.owner가 같으면 아무도 spin_lock을 획득하지 않은 상태(contended == 0)이므로 성공리에 spin_lock을 획득하는 조건이된다.
  • spin lock 획득이 성공하면 tickets.next를 1 증가시킨 후 lock 변수에 저장한다.
  • res가 0이 아닌 경우는 strex 명령으로 저장을 시도 했을 때 실패한 경우이므로 atomic operation을 완료하기 위해 다시 재시도한다.
  • tickets.owner와 tickets.next의 증감 규칙
    • spin_lock을 누군가 획득하는 경우 tickets.next가 1 증가된다.
    • spin_lock을 해제하는 경우 tickets.owner를 1 증가한다.
  • tickets 비트 규칙
    • next: msb 16bits – lock에서 증가(Asm에서 사용), overflow 시 onwer에 영향 없음)
    • owner: lsb 16bits – unlock에서 증가(C에서 사용)

 

어셈블러 문장을 좀 더 로직화하여 편하게 보기위해 바꿔보았다.

prefetch  &lock->slock
do {
        slock =  [&lock->slock]
        res   = #0
        contended = (tickets.owner != tickets.next)
        if (contended == 0)  {
                tickets.next++
                [&lock->slock] = slock  (결과값은 res에 저장)
        }
} while (res)

 

arch_spin_lock() – ARM32

arch/arm/include/asm/spinlock.h

/*
 * ARMv6 ticket-based spin-locking.
 *
 * A memory barrier is required after we get a lock, and before we
 * release it, because V6 CPUs are assumed to have weakly ordered
 * memory.
 */
static inline void arch_spin_lock(arch_spinlock_t *lock)
{
        unsigned long tmp;
        u32 newval;
        arch_spinlock_t lockval;
        
        prefetchw(&lock->slock);
        __asm__ __volatile__(
"1:     ldrex   %0, [%3]\n"
"       add     %1, %0, %4\n"
"       strex   %2, %1, [%3]\n"
"       teq     %2, #0\n"
"       bne     1b"
        : "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
        : "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
        : "cc"); 

        while (lockval.tickets.next != lockval.tickets.owner) { 
                wfe(); 
                lockval.tickets.owner = READ_ONCE(lock->tickets.owner);
        }

        smp_mb();
}

Ticket 방식의 spin lock을 획득한다. 다른 스레드들이 spin lock을 획득한 경우 자기 차례가 올 때까지 spin 한다.

  • 코드 라인 7에서 lock->slock을 미리 캐시에 로드해둔다
  • 코드 라인 8~16에서 인라인 어셈블리 문장은 atomic operation으로 lock->tickets.next++ 를 수행한것이다.
  • 코드 라인 18~21에서 lock을 획득할 때까지 spin 한다.
    • 값을 증가시키기 전의 lockval.tickets.next와 lockval.tickets.owner가 다른 경우는 이 루틴을 들어오기 전에 이미 lock이 걸려 있었다는 경우로 루프를 돌며 대기 상태로 빠진다.
    • 대기 상태를 빠지는 방법은 다른 CPU에서 arch_spin_unlock()을 호출할 때 sev 명령을 수행하는데 이 이벤트를 수신하여 wfe(wait for event) 함수를 탈출한다.
    • wfe()를 탈출한 후에 lockval.tickets.owner를 갱신 받아 다시 while()문의 조건이 부합될 때까지 루프를 돌며 기다린다.
      • 자기 순번이 올 때까지 (다른 CPU에서 arch_spin_unlock()을 호출할 때 owner 값을 증가시켜 내가 가진 next 값과 동일할 때까지) 루프를 탈출할 수 없다.
  • 코드 라인 23에서 lock->slock의 기록 순서를 보호하기 위해 메모리 베리어를 수행한다.

 

어셈블러 문장을 좀 더 로직화하여 편하게 보기위해 바꿔보았다.

prefetch  &lock->slock
do {
        lockval = [&lock->slock]
        newval = lockval.next + 1
        [&lock->slock] = newval (strex에 대한 결과값은 tmp에 저장)
while (tmp)

 

arch_spin_unlock() – ARM32

arch/arm/include/asm/spinlock.h

static inline void arch_spin_unlock(arch_spinlock_t *lock)
{
        smp_mb();
        lock->tickets.owner++;
        dsb_sev();
}

Ticket 방식의 spin lock을 획득 해제한다.

  • 코드 라인 3에서 lock 변수의 로드 순서를 후 순위 보장하기 위해 메모리 베리어를 수행한다.
  • 코드 라인 4에서 owner 티켓을 증가시킨다.
    • 여기서 락 변수를 atomic operation을 이용하지 않고 대범(?)하게 증가 시킨 이유
      • 락 획득 시에는 CPU들 끼리 경쟁을 하므로 atomic inc가 중요하지만 락을 헤제 시에는 락 오너만이 해제하므로(경쟁을 하지 않음) atomic inc를 해야 할 이유가 없다.
  • 코드 라인 5에서 혹시 wfe 명령을 사용하여 대기 중인 spinner가 있으면 wfe(wait for event) 상태에서 빠져나오게 한다.

 

arch_spin_is_locked() – ARM32

arch/arm/include/asm/spinlock.h

static inline int arch_spin_is_locked(arch_spinlock_t *lock)
{
        return !arch_spin_value_unlocked(READ_ONCE(*lock));
}

unlock 상태를 판별하여 반대로 리턴한다.

  • READ_ONCE()
    • 인수의 사이즈에 따라 volatile 방식으로 읽어온다.

 

arch_spin_value_unlocked() – ARM32

arch/arm/include/asm/spinlock.h

static inline int arch_spin_value_unlocked(arch_spinlock_t lock)
{
        return lock.tickets.owner == lock.tickets.next;
}

락 카운터인 ticket.owner와 ticket.next가 같은 경우가 unlock 상태이다.

  • 기존 spinlock 구현 방식에서는 lock과 unlock시 lock 변수의 증/감 상태로 lock/unlock 상태를 알았었는데 ticket based spinlock이 구현되면서 lock/unlock 상태 여부는 tickets.owner와 tickets.next 값의 동일 여부로 확인할 수 있게 바뀌었다.

 

Ticket based spinlock 에서 ticket 값 추적

  • 3개의 CPU에서 중첩이 되어 2 개의 CPU에서 spinning을 하는 과정에서 lock 값이 변화되는 것을 보였다.
    • global lock 변수는 메모리에 위치한 lock 값
    • local lock 변수는 arch_spin_lock() 루틴에서 임시로 lock을 획득할 때까지만 사용하는 레지스터이다.
  • 1) 초기 lock 값을 next와 owner 100부터 시작하였다.
    • 100 번 spin_lock()과 spin_unlock()을 반복한 것과 동일하다.
  • 2) CPU-A가 lock이 없는 상태에서 lock 획득을 시도한다. 이 때 성공하면 ticket.next를 증가시키고 critical section에 진입한다.
  • 3) CPU-B가 arch_spin_try_lock()을 시도했다가 실패 한 후 arch_spin_lock()에 진입하여 local lock 변수에 자기 순번을 의미하는 global lock 변수의 ticket.next(101) 값을 받아오고 global lock 변수의 tickets.next는 102로 증가시킨 후 spinning(wfe를 포함하여)한다.
  • 4) CPU-C도 arch_spin_try_lock()을 시도했다가 실패 한 후 arch_spin_lock()에 진입하여 local lock 변수에 자기 순번을 의미하는 global lock 변수의 ticket.next(102) 값을 받아오고 global lock 변수의 tickets.next는 103으로 증가시킨 후 spinning(wfe를 포함하여)한다.
  • 5) CPU-A가 unlock하면서 global tickets.owner를 101로 증가시키고 sev를 호출한다. 이 때 CPU-B는 sev 명령에 의해 wfe 명령에서 깨어나고 global tickets.owner(101)가 자기 순번인 local tickets.next(101)가 동일하기 때문에 spin 루프를 빠져나가면서 critical section에 진입하게 된다.
  • 6) CPU-B도 unlock하면서 global tickets.owner를 102로 증가시키고 sev를 호출한다. 이 때 CPU-C는 sev 명령에 의해 wfe 명령에서 깨어나고 global tickets.owner(102)가 자기 순번인 local tickets.next(102)가 동일하기 때문에 spin 루프를 빠져나가면서 critical section에 진입하게 된다.
  • 7) CPU-C가 unlock하면서 global tickets.owner를 103으로 증가시키고  sev를 호출한다. 하지만 wfe에서 대기하고 있는 CPU가 없어서 깨어날 CPU가 없어서 무시된다.
    • tickets.next와 tickets.owner는 동일하게 103인 상태가 되며 이는 unlock 상태임을 의미한다.
  • 녹색 박스는 critical section을 의미하며 CPU-A, B, C 간에 서로 중첩되지 않음을 확인할 수 있다.

 


Lock 디버깅을 위한 Lockdep 코드 관련

lock_acquire()

kernel/locking/lockdep.c

void lock_acquire(struct lockdep_map *lock, unsigned int subclass,
                          int trylock, int read, int check,
                          struct lockdep_map *nest_lock, unsigned long ip)
{
        unsigned long flags;

        if (unlikely(current->lockdep_recursion))
                return;

        raw_local_irq_save(flags);
        check_flags(flags);

        current->lockdep_recursion = 1;
        trace_lock_acquire(lock, subclass, trylock, read, check, nest_lock, ip);
        __lock_acquire(lock, subclass, trylock, read, check,
                       irqs_disabled_flags(flags), nest_lock, ip, 0);
        current->lockdep_recursion = 0;
        raw_local_irq_restore(flags);
}
EXPORT_SYMBOL_GPL(lock_acquire);

 

lock_release()

kernel/locking/lockdep.c

void lock_release(struct lockdep_map *lock, int nested,
                          unsigned long ip)
{
        unsigned long flags;

        if (unlikely(current->lockdep_recursion))
                return;

        raw_local_irq_save(flags);
        check_flags(flags);
        current->lockdep_recursion = 1;
        trace_lock_release(lock, ip);
        __lock_release(lock, nested, ip);
        current->lockdep_recursion = 0;
        raw_local_irq_restore(flags);
}

 


Spin-lock 변형

 

Interrupt context에서의 spin-lock

dead-lock 이외에도 lock을 소유한 상태에서 interrupt 되고 동일한 lock을 호출하는 루틴으로 들어가는 경우 spin되어 빠져나오지 못하게 된다. 따라서 spinlock을 사용 시에는 최우선적으로 local cpu에 대해 interrupt를 disable할 수 있는 함수가 유용하다.

  • spin_trylock_irq()
  • spin_lock_irq()
  • spin_unlock_irq()
  • spin_lock_irqsave()
  • spin_unlock_irqrestore()

 

bottom-half context에서의 spin-lock

  • spin_trylock_bh()
  • spin_lock_bh()
  • spin_unlock_bh()

 


기타 매크로

define 문

arch/arm/include/asm/spinlock.h

#define WFE(cond)   __ALT_SMP_ASM("wfe" cond, "nop")
#define SEV     __ALT_SMP_ASM(WASM(sev), WASM(nop))

부트 타임에 SMP 시스템인 경우 wfe 및 sev 명령을 사용하게 하고, 그렇지 않은 경우 아무 일도 하지 않는 nop 명령을 수행하게 한다.

 

#define isb(option) __asm__ __volatile__ ("isb " #option : : : "memory")
#define dsb(option) __asm__ __volatile__ ("dsb " #option : : : "memory")
#define dmb(option) __asm__ __volatile__ ("dmb " #option : : : "memory")

 

__ALT_SMP_ASM()

arch/arm/include/asm/processor.h

#define __ALT_SMP_ASM(smp, up)                      	\
    "9998:  " smp "\n"                      			\
    "   .pushsection \".alt.smp.init\", \"a\"\n"      \
    "   .long   9998b\n"                    			\
    "   " up "\n"                       			\
    "   .popsection\n"

 


구조체 타입

spinlock_t

include/linux/spinlock_types.h

typedef struct spinlock {
        union {
                struct raw_spinlock rlock;

#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
                struct {
                        u8 __padding[LOCK_PADSIZE];
                        struct lockdep_map dep_map;
                };
#endif
        };
} spinlock_t;

lock 디버깅용 커널이 아니면  rlock 멤버 하나만 사용한다.

 

raw_spinlock_t

include/linux/spinlock_types.h

typedef struct raw_spinlock {
        arch_spinlock_t raw_lock;
#ifdef CONFIG_DEBUG_SPINLOCK
        unsigned int magic, owner_cpu;
        void *owner;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
        struct lockdep_map dep_map;
#endif
} raw_spinlock_t;

디버그 용도이외에는 raw_lock 멤버 하나만 사용한다.

 

arch_spinlock_t – GENERIC(ARM64 포함)

include/asm-generic/qspinlock_types.h

typedef struct qspinlock {
        union {
                atomic_t val;

                /*
                 * By using the whole 2nd least significant byte for the
                 * pending bit, we can allow better optimization of the lock
                 * acquisition for the pending bit holder.
                 */
#ifdef __LITTLE_ENDIAN
                struct {
                        u8      locked;
                        u8      pending;
                };
                struct {
                        u16     locked_pending;
                        u16     tail;
                };
#else
                struct {
                        u16     tail;
                        u16     locked_pending;
                };
                struct {
                        u8      reserved[2];
                        u8      pending;
                        u8      locked;
                };
#endif
        };
} arch_spinlock_t;

queued spin-lock 구현에 사용한 arch_spnlock_t 타입이다.

  • val
    • 32bit spin-lock 값 (유니온 타입으로 아래 값들을 모두 포함한 값이다)
  • locked
    • 락 여부로 lock=1, unlock=0
  • pending
    • 두 번째 spin-lock 요청자가 대기 중인지 여부를 나타낸다. 1=pending, 0=no pending
  • locked_pending
    • 위의 locked 비트들과 pending 비트들이 같이 16바이트로 구성된 값이다.
  • tail
    • mcs queue에 대기하는 가장 마지막 cpu와 인덱스(tail_idx) 값이 담겨있다.
    • 단 cpu 번호는 1부터 시작하므로 cpu 0번의 경우 1 값이 사용된다. (based 1)

 

arch_spinlock_t – ARM32

arch/arm/include/asm-generic/qspinlock_types.h

typedef struct {
        union {
                u32 slock;
                struct __raw_tickets {
#ifdef __ARMEB__
                        u16 next;
                        u16 owner;
#else
                        u16 owner;
                        u16 next;
#endif
                } tickets;
        };
} arch_spinlock_t;

ticket spin-lock 구현에 사용한 arch_spnlock_t 타입이다.

  • 32bit slock과 tickets.next(msb-16bits)  + tickets.owner(lsb-16bits)가 union으로 묶여 있다.
  • 기존 slock에 대해 ticket을 구현하기 위해 slock을 둘로 나누어 사용하였다.
    • next: lock 획득 시 증가
    • owner: unlock 시 증가
    • lock을 여러 CPU가 요청한 경우 각 CPU들은 자신의 lock 값(lock 획득 당시의 ticket.next 증가 전 값)이 다른데 owner 값과 비교하여 같은 경우 자기 차례가 되어 lock을 획득 할 수 있다.

 

mcs_spinlock 구조체

kernel/locking/mcs_spinlock.h

struct mcs_spinlock {
        struct mcs_spinlock *next;
        int locked; /* 1 if lock acquired */
        int count;  /* nesting count, see qspinlock.c */
};

queued spin-lock 구현에 사용되는 mcs 노드에 대한 구조체이다.

  • *next
    • 다음 mcs spinlock 노드를 가리킨다.
    • locked
      • 1=lock, 0=unlock
    • count
      • 현재 cpu의 nest count로 최대 4

 

qnode 구조체

kernel/locking/qspinlock.c

/*
 * On 64-bit architectures, the mcs_spinlock structure will be 16 bytes in
 * size and four of them will fit nicely in one 64-byte cacheline. For
 * pvqspinlock, however, we need more space for extra data. To accommodate
 * that, we insert two more long words to pad it up to 32 bytes. IOW, only
 * two of them can fit in a cacheline in this case. That is OK as it is rare
 * to have more than 2 levels of slowpath nesting in actual use. We don't
 * want to penalize pvqspinlocks to optimize for a rare case in native
 * qspinlocks.
 */
struct qnode {
        struct mcs_spinlock mcs;
#ifdef CONFIG_PARAVIRT_SPINLOCKS
        long reserved[2];
#endif
};

queued spin-lock 구현에 사용되는 mcs 노드에 대하 구조체이다.

 

참고