Mutex for linux kernel space

이 문서에서는 리눅스 kernel space용 뮤텍스를 설명하였고, 만일 user space용 뮤텍스가 필요한 경우 이 문서가 아닌 futex 또는 pthread_mutex(POSIX 뮤텍스) 등을 참고해야 한다. 또한 “뮤텍스를 왜 사용하는가?” 또는 “뮤텍스의 사용 예” 등에 대해서는 이 문서에서 친절히 설명하지 않는다.
  • 최초 뮤텍스 구현은 Ingo Molnar에 의해 시작되었다.
  • 뮤텍스는 커널에서 학계 또는 유사한 이론 서적에서 발견된 ‘상호 배제(mutual exclusiion)’를 참고하는 것 뿐만아니라 공유 메모리에서의 직렬화(serialization)를 수행하는 특정 원시 locking도  참고한다.
  • 2006년에 sleeping 가능한 뮤텍스 락이 소개되었고 이는 바이너리 세마포어와 유사한 동작을 한다.
  • Sleeping lock은 critical section이 긴 경우(long hold times)  효과적이지만 그 특징으로 인해 인터럽트 핸들러에서 사용할 수 없다.
  • 뮤텍스를 빠른 시간 내에 얻을 수 없는 상황에서는 뮤텍스를 얻을 때 까지 현 태스크를 sleep(다른 태스크에 제어권을 넘긴다) 시킨다.
  • Generic Mutex Subsystem은 RT(Real Time) 커널에 있는 소스를 메인 스트림에 upstream 하였는데, 이 소스는 x86 계열의 샘플이고 다른 아키텍처로의 포팅이 쉽게될 수 있도록 가이드한다.
  • 뮤텍스는 실제 원래 설계 및 목적과 다르게 뮤텍스 오브젝트가 세마포어 오브젝트보다 크기가 커져서 CPU cache와 메모리 footprint에 용량이 더 소모되는 단점이 있다.
  • 2013.11월 kernel 디렉토리의 락 관련 화일이 모두 kernel/locking 디렉토리로 이전됨.
  • Kernel용 mutex는 recursive lock을 지원하지 않는다.
  • 성능을 개선하기 위해 성능순으로 3단계 패스(fastpath, midpath, slowpath)를 구성하였다.
  • Fastpath에서는 한 번의 atomic operation 으로 뮤텍스를 가장 빠르게 얻을 수 있되 락을 얻기 전에 unlock 상태의 조건에서만 사용 가능하다.
  • Optimistic spinning lock (adaptive spinning for mutex 또는 advanced MCS (queued) lock) 기술은  midpath로 non-sleepable busy wait 를 구현하였는데 SMP 시스템에서 특정 조건을 만족시키면 사용 가능하다.
  • 여러 문서들에서 뮤텍스가 sleeping lock이라고 소개되지만 실상은 조금 더 복잡하다. 실제 구현에서는 락 상태별로 몇 개의 조건을 성립하면 fastpath나 midpath를 사용하여 sleep 하지 않고 뮤텍스 락을 얻게될 수 있다. 그러나 그 조건에 부합되지 않은 경우 slowpath가 동작하며 이 때 sleep 한다.
  • Wait/Wound Deadlock Proof Mutex(ww-mutex)는 일반 뮤텍스와 달리 deadlock 회피가 가능하다.

 

사전 지식

 

뮤텍스를 분석하는 것은 매우 광범위한 리눅스 커널 지식이 요구된다. 뮤텍스 기술 역시 현재도 끊임없이 추가 기술이 적용되어 가면서 더 복잡해지고 있고  그럼에 따라 기존 도서나 인터넷 자료들에서 설명한 것과 약간씩 달라져가고 있다. 현재 버전의 뮤텍스 소스를 한 번에 모두 이해하기 힘들기 때문에 여러 번에 걸쳐 top-down 방식으로 위에서 아래 세부 루틴으로 단계별로 분석하는 것이 여러 가지로 이해하는데 두뇌 고통을 줄이는데 도움될 것이다. 동반해서 다음과 같이 사전에 알아둬야 할 항목들이 있다.
  • likely(), unlikely() 매크로
  • Atomic operation
    • ldrex, strex
  • Spin lock
    • spin_lock(), spin_unlock()
  • Preemption
    • preempt_enable(), preempt_disable()
  • RT(Real Time) Kernel
  • Interrupt
    • local_irq_enable(), local_irq_disable(), local_irq_save(), local_irq_restore()
  • Lockdep
    • lock_acquire(), lock_release()
  • Scheduler
    • preemption에 따른 priority 개념, sleep 개념정도만

 

뮤텍스 옵션

CONFIG_DEBUG_LOCKDEP

CONFIG_DEBUG_LOCK_ALLOC

  • 락뎁(LOCKDEP), 디버그 스핀락(DEBUG_SPINLOCK), 디버그 뮤텍스(DEBUG MUTEXES) 등을 선택하는 경우 락을 디버깅하기 위해 사용되는데 커널에 의해 락 메모리가 올바르게 해제(kfree(), kmem_cache_free(), free_pages(), vfree(), etc…)되지 않는 경우를 감지한다.
  • 해당 옵션이 동작하는 경우 mutex_lock() 매크로는 mutex_lock_nested() 함수를 호출하고 fastpath 과정을 건너뛴다.
#define mutex_lock(lock) mutex_lock_nested(lock, 0)
void __sched  mutex_lock_nested(struct mutex *lock, unsigned int subclass)
{
        might_sleep();
        __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE,
                            subclass, NULL, _RET_IP_, NULL, 0);
}

 

CONFIG_LOCK_STAT

  • 락뎁(LOCKDEP), 디버그 스핀락(DEBUG_SPINLOCK), 디버그 뮤텍스(DEBUG MUTEXES), 디버그 락할당(DEBUG_LOCK_ALLOC) 등을 선택하는 경우 락을 디버깅하기 위해 락 사용 통계를 제공한다.
  •  참고: Documentation/locking/lockstat.txt

CONFIG_DEBUG_MUTEXES

  • 디버깅 기능을 사용할 수 있으며 이로 인해 약간의 제한을 갖지만 락 개체에 대해 분석할 수 있다.
  • 이 옵션을 사용하는 경우 fastpath 루틴은 사용할 수 없으며 항상 slowpath(midpath) 루틴만을 호출한다.
  • 이 옵션의 사용 유무에 따른 구현 루틴 구분
    • .enable:  <linux/mutex-debug.h>, “kernel/locking/mutex-debug.h”, <asm-generic/mutex-null.h>에 있는 헤더파일이 사용됨.
    • .disable: “kernel/locking/mutex.h”, <asm/mutex.h> -> <asm-generic/mutex-dec.h>

 

CONFIG_PROVE_LOCKING

  • lock 체인을 분석하고 잘못을 디텍트해 아래와 같이 경고 메시지를 출력한다.
    • circular_lock_dependency: “[ INFO: possible circular locking dependency detected ]”
    • bad_irq_lock_dependency: “[ INFO: XXXXX-safe -> YYYYY-unsafe lock order detected ]”
    • deadlock_bug: “[ INFO: possible recursive locking detected ]”
    • usage_bug: “[ INFO: inconsistent lock state ]”
    • irq_inversion_bug: “[ INFO: possible irq lock inversion dependency detected ]”
    • nested_lock_not_held: “[ BUG: Nested lock was not taken ]”
    • unlock_imbalance_bug: “[ BUG: bad unlock balance detected! ]”
    • contention_bug: “[ BUG: bad contention detected! ]”
    • held_locks_bug: “[ BUG: %s/%d still has locks held! ]”
  • 참고: Documentation/locking/lockdep-design.txt

Recursive Lock vs Non-Recursive Lock

  • POSIX 뮤텍스는 Recursive Lock을 허락한다. 그 의미는 같은 스레드에서 같은 뮤텍스에 대해 두 번 락을 할 수 있고 dead-lock이 되지 않는다는 뜻이다. 물론 두 번 락을 한 것에 대해 두 번 언락을 해야 한다.
  • POSIX 뮤텍스를 사용하는 pthread_mutex는 PTHREAD_MUTEX_RECURSIVE 속성을 사용하여 Recursive lock을 사용할 수 있다.
  • 커널에서 사용하는 뮤텍스는 Non-Recursive lock 이다. 같은 스레드에서 두 번 락을 하게되면 두 번째 lock을 획득하는 순간 멈춘다(hold)

Wait/Wound Deadlock Proof Mutex

  • Maarten이 개발한 패치로 deadlock을 회피하여 뮤텍스 락을 처리하는 알고리즘으로 ww-mutex라고도 불린다.

mutex8a

  • 커널 코드는 lock ordering과 Lockdep의 체킹 툴로 비정상 상황을 찾아낼 수 있다. 그럼에도 불구하고 deadlock 상황이 발생하는 경우 ww-mutex를 사용하는 경우 다음과 같이 dead-lock 회피 알고리즘이 동작한다.
    • 첫 번째 스레드가 락A를 소유하고, 두 번째 스레드가 락B를 소유한 상태에서
    • 첫 번째 스레드가 락B의 소유를 시도하고, 두 번째 스레드도 락A를 소유하려고 시도할 때 패치 적용전에는 deadlock 상황에 들어간다.
    • 패치가 적용되는 경우 첫 번째 스레드가 락B의 소유를 시도하다 블럭되고
    • 두 번째 스레드가 락A를 소유하려고 시도할 때 서로 교차되는 락을 발견하여 “wounded” 된다.
    • wounded 의미: 현재 기존에 홀드하고 있는 잠금을 해제하고 처음부터 다시 시작해야 한다라고 말하다(told)
    • 두 번째 스레드가 “wound” 처리를 위해 이미 소유하고 있던 락B를 풀었다가 다시 얻으려고 시도하지만
    • 두 번째 스레드가 락B를 풀자마자 그 시점에 첫 번째 스레드가 락B를 먼저 획득한다.
    • 따라서 두 번째 스레드는 락B를 얻지 못하고 블럭된다.
    • 첫 번째 스레드하고 critical section을 지난 후 락B와 락A를 차례대로 반납한다.
    • 첫 번째 스레드가 락B를 반납하자마자 두 번째 스레드가 락B를 소유하게 되고
    • 다시 첫 번째 스레드가 락A를 반납하자마자 역시 두 번째 스레드가 락A를 소유하게 된다.
    • 두 번째 스레드가 critical section을 통과한 후 락A와 락B를 연달아 해제하는 것으로 두 스레드의 모든 락의 사용이 완료된다.
  • 이의 구현을 위해 ww_mutex, ww_class, ww_acquire_ctx라는 구조체가 사용된다.
  • 사용 시
    • mutex_lock() 대신 별도의 ww_mutex_lock()을 사용한다.
    • 헤더 화일로 mutex.h 대신 ww-mutex.h를 사용한다.

Fastpath, Midpath, Slowpath Mutex Lock

성능 관련한 접근 방법으로 뮤텍스 획득을 위해 락상태에 따라 3가지의 방법 중 하나를 가장 빠른 fastpath 부터 midpath, slowpath 까지 선택 사용한다. 처음에 fastpath를 사용하여 뮤텍스가 획득되면 midpath나 slowpath 루틴을 사용하지 않는다. fastpath를 실패한 후 midpath 루틴으로 진입한 경우 역시 뮤텍스가 획득되면 slowpath 루틴을 사용하지 않는다.
 mutex2b
  • fastpath (Atomic Operation)
    • unlock 상태에서 lock을 획득할 때 fastpath를 사용할 수 있다.
    • atomic opeation(몇 개의 인스트럭션이며 아키텍처마다 다름)의 수행만으로 락/언락을 처리하므로 속도가 빠르다.
    • count 변수를 atomic하게 감소(또는 교환) 시키는 기법으로 락을 획득하는 방법이다.
    • 사전에 count 변수가 1(unlock)인 경우만 fastpath 방법으로 lock을 획득할 수 있다.
    • fastpath의 구현은 다음과 같이 두 개로 나뉘어 있다.
      • atomic 기반의 감소(decrement)/증가(increment) 명령으로 구현
      • atomic 기반의 교환(xchg) 명령으로 구현
    • fastpath는 아키텍처에 따라 일반적으로 dec/inc 기반의 사용이 적절한 경우 먼저 선택 사용된다.
  • midpath (Optimistic spinning)
    • optimistic spinning 또는 adaptive spinning for mutex라 불림
    • 이미 락이 걸려 있는 상태에서 아래 조건이 만족하면 빠르게 락을 얻을 가능성이 있어서 midpath를 사용한다.
      • 락 owner 태스크가 러닝중
      • 리스케쥴 요청이 없는 경우 (현재 태스크보다 더 높은 우선 순위(need_resched)로 동작할 태스크가 없는)
    • 뮤텍스 스피너(spinner)는 OSQ(MCS) lock(simple spinlock)을 사용하여 wait_queue에 큐잉(queued up)되고 블러킹되며 기존 spinlock처럼 busy-wait한다. 가장 먼저 큐에 인입된 오직 하나의 스피너만이 뮤텍스를 얻기 위해 경쟁할 수 있다.
    • 실제 구현 루틴에서 fastpath 및 slowpath와 다르게 midpath라는 함수명으로 구분되어 있지 않다.
      • midpath 루틴은 __mutex_lock_slowpath() 루틴의 전반부에서 호출됨
    • CONFIG_MUTEX_SPIN_ON_OWNER 옵션을 사용하여 동작시킨다.
    • slowpath와 다르게 sleep/wake-up 사이클 없이 락을 획득한다.
    • 다음 문제도 해결 완료
      • cache line 충돌(contention) 문제 해결. (Spin on our own cache line)
      • ticket spinlock처럼 불공정(fairness) 문제를 해결.
      • 참고: Preventing overly-optimistic spinning | LWN.net
  • slowpath
    • mutex 획득의 3가지 방법 중 가장 느린 방법으로 mutex를 얻을 때까지 루프를 돌며 sleep하고 제어권을 다른 태스크에게 양도한다.
    • 디버깅 옵션(CONFIG_DEBUG_MUTEXES)을 사용하는 경우에는 항상 slowpath 방법만을 사용한다.
    • wait_lock spinlock을 사용하고, 태스크는 대기큐에 추가되고 unlock path에 의해 깨어날 때 까지 슬립된다.
    • 여러 태스크가 경쟁적으로 slowpath에서 mutex를 소유하려고 경쟁하는 경우에는 mutex_lock() 함수 접근 순서대로 처리되지 않는다. 각 태스크가 sleep 되었다가 wake-up 될 때마다 mutex를 얻을 수 있는지 조사하고 얻을 수 있을 때 mutex를 획득한다. 나머지 태스크는 다시 sleep되며 루프를 돈다.
      • sleep 기반에서 동작하는 경우에는 각 태스크가 깨어나는 타이밍이 달라서 mutex의 획득 순서에 대하여 공정하게 처리하기 위해 mutex_unlock() 함수 내부에서 대기자들 중 가장 먼저 대기했던 태스크를 먼저 깨울 수 있도록 지시하고 mutex를 release하여 최대한 해당 태스크가 빨리 mutex를 획득할 수 있도록 도와 성능을 높인다.

 

Fastpath, slowpath Mutex Unlock

성능 관련한 접근 방법으로 뮤텍스 해제를 위해 락상태에 따라 2가지의 방법 중 하나를 가장 빠른 fastpath와 slowpath를 선택 사용한다. 처음에 fastpath를 사용하여 뮤텍스가 해제되면 slowpath 루틴을 사용하지 않는다.

mutex-7b

 

Fastpath Mutex Lock 구현 방법

__mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath); 루틴은 CPU 아키텍처에 따라 3가지의 루틴 중 하나를 빌드하여 사용된다.
  • dec/inc 기반의 fastpath lock (for ARM)
  • xchg 기반의 fastpath lock
  • fastpath 없이 곧장 slowpath(with midpath) lock

1) dec/inc 기반의 fastpath lock

  • ARM 아키텍처에서 사용
  • 먼저 atomic_dec_return()으로 이루어진 뮤텍스 fastpath 루틴을 동작시켜 count 값 감소 결과 값이 적은 확률로 값이 0보다 작으면 fail_fn(__mutex_lock_slowpath 함수) 호출.
    • atomic_dec_return() 결과값이 0이거나 음수로 예상되는데 이에 따른 의미
      • 0: unlock(1) -> lock(0)으로 상태가 바뀜.
      • 음수: 기존에 뮤텍스 락이 이미 걸린 상태에서 뮤텍스 락이 또 호출되어 중첩(2번 이상)된 경우
    • unlikely() 매크로는 성능향상 즉 optimization을 목적으로 주로 false가 예상되는 경우에 사용된다.
    • atomic_dec()이 값을 감소시키기 전의 값을 리턴하는데 반하여 atomic_dec_return()은 값을 감소 시킨 후의 값을 리턴한다.
include/asm-generic/mutex-dec.h
static inline void __mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
    if (unlikely(atomic_dec_return(count) < 0))
        fail_fn(count);
}

 

2) xchg 기반의 fastpath lock

  • 첫 번째, count 값을 0(lock)으로 교체하기 전 count 값이…
    • 적은 확률로 1(unlock)이 아니면 즉, 이미 락이 있으면서 또 락이 호출된 경우 두 번째 조건 수행
    • 많은 확률로 1(unlock)인 경우 조건을 빠져나간다.
      •  fastpath로 싱글 뮤텍스 락 성립
  • 두 번째, count 값을 -1(double lock)로 교체하기 전 count 값이…
    • 많은 확률로 1(unlock)이 아니면 fail_fn(__mutex_lock_slowpath 함수) 호출 (중첩 락 성립)
    • 적은 확률로 1이면 조건을 빠져나간다.
      • fastpath로 싱글 뮤텍스 락 성립
      • 첫 번째 조건 수행 후 두 번째 조건으로 오기 전에 우선 순위가 높은 태스크가 preempt되어 unlock된 후 다시 이 태스크로 돌아와 계속 수행하게 되는 경우 이렇게 unlock 된 상태로 있을 수 있음.
      • 개개의 atomic operation은 수행 결과가 분리될 수 없지만 이렇게 개개의 atomic operation을 수행하는 경우에는 preemption 상황을 준비해야 한다.
include/asm-generic/mutex-xchg.h
static inline void  __mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
    if (unlikely(atomic_xchg(count, 0) != 1))
        /* 
         * We failed to acquire the lock, so mark it contended
         * to ensure that any waiting tasks are woken up by the
         * unlock slow path.
         */
        if (likely(atomic_xchg(count, -1) != 1))
            fail_fn(count);
}

 

3) slowpath(midpath)로 직행

  • 항상 fastpath를 생략하고 fail_fn(__mutex_lock_slowpath 함수) 호출.
include/asm-generic/mutex-null.h
#define __mutex_fastpath_lock(count, fail_fn)       fail_fn(count)

 

mutex_lock()

  • mutex_lock()의 흐름은 아래와 같다.  CONFIG_DEBUG_LOCK_ALLOC 기능을 사용하는 경우 mutex_lock_nested() 함수에서 fastpath를 생략하고 곧장 __mutex_lock_common() 함수(midpath와 slowpath 루틴)로 진입한다.
  • 옵션이 없는 경우 ARM 아키텍처인경우 atomic_dec_return()을 호출하는 __mutex_fastpath_lock() 매크로를 호출하여 fastpath를 처음부터 호출하여 사용한다.

mutex

아래 그림은 mutex를 획득하는 조건에 대해서 흐름도로 간략히 표현하였다.

mutex-9

kernel/locking/mutex.c
/**
 * mutex_lock - acquire the mutex
 * @lock: the mutex to be acquired
 *
 * Lock the mutex exclusively for this task. If the mutex is not
 * available right now, it will sleep until it can get it.
 *
 * The mutex must later on be released by the same task that
 * acquired it. Recursive locking is not allowed. The task
 * may not exit without first unlocking the mutex. Also, kernel
 * memory where the mutex resides must not be freed with
 * the mutex still locked. The mutex must first be initialized
 * (or statically defined) before it can be locked. memset()-ing
 * the mutex to 0 is not allowed.
 *
 * ( The CONFIG_DEBUG_MUTEXES .config option turns on debugging
 *   checks that will enforce the restrictions and will also do
 *   deadlock debugging. )
 *
 * This function is similar to (but not equivalent to) down().
 */
void __sched mutex_lock(struct mutex *lock)
{
        might_sleep();
        /*
         * The locking fastpath is the 1->0 transition from
         * 'unlocked' into 'locked' state.
         */
        __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
        mutex_set_owner(lock);
}
mutex_lock() 부분 주석을 간단히 설명한다.
  • 현재 태스크를 위해 베타적으로 뮤텍스를 lock한다. 만일 뮤텍스가 지금 당장 가능하지 않으면 소유할 때 까지 sleep 한다.
  • 뮤텍스는 나중에 뮤텍스를 획득한 동일한 태스크에 의해 release 되야 한다.
  • Recursive lock은 허용하지 않는다.
  • 태스크는 처음 뮤텍스의 unlocking 없이 종료(exit) 할 수 없다.
  • 또한 뮤텍스가 lock된 채로 뮤텍스가 자리한 커널 메모리가 해제(free)될 순 없다.
  • 처음 뮤텍스를 lock하기 전에 먼저 초기화(또는 정적으로 defined) 되어야 한다.
  • memset()으로 뮤텍스를 0으로 초기화 하는 것은 허락되지 않는다.
  • CONFIG_DEBUG_MUTEXES: 이 옵션을 동작시키면(turn on) 제약 사항(fastpath를 수행하지 않는)이 있으며 dead-lock 디버깅 등을 수행할 수 있다.
  • mutex_lock() 함수는 완전히 동일하진 않지만 down() 함수와 유사하다.

소스 루틴을 살펴보면…

  • might_sleep()은 리스케쥴 요청을 받았을 때 sleep하는 preemption points이다. (참고: preemption, might_sleep())
    • 이 루틴은 CONFIG_PREEMPT_VOLUNTARY 옵션을 사용하는 경우만 호출된다.
  • __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
    • fastpath를 호출하고 실패 시 slowpath(midpath)로 구현된 mutex lock 루틴을 호출한다.
    • 아키텍처마다 약간 다르지만 ARM 아키텍처에서는 atomic_dec_return()을 사용하는 fastpath 루틴이다.
    • 위에서 자세히 설명하였었지만 fastpath는 락 count 값이 1(unlock) -> 0(lock)으로 상태가 변환될 때만 성공이다.
  • 뮤텍스 락에 owner를 설정해둔다.
    • CONFIG_MUTEX_SPIN_ON_OWNER 옵션을 사용한 경우에만 owner를 설정한다.

__mutex_lock_slowpath()

kernel/locking/mutex.c
__visible void __sched __mutex_lock_slowpath(atomic_t *lock_count)
{
        struct mutex *lock = container_of(lock_count, struct mutex, count);

        __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0,
                            NULL, _RET_IP_, NULL, 0);
}
  • container_of()는 구조체의 멤버변수를 사용하여 구조체의 처음을 알아내는 함수
  • __mutex_lock_common()에는 뮤텍스 락에 대한 midpath와 slowpath 로직이 담겨있다.

 

__mutex_lock_common()

__mutex_lock_common() 함수내에서 midpath와 slowpath 루틴이 순차적으로 구현되어 있다.
kernel/locking/mutex.c
static __always_inline int __sched __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
                    struct lockdep_map *nest_lock, unsigned long ip,
                    struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
        struct task_struct *task = current;
        struct mutex_waiter waiter;
        unsigned long flags;
        int ret;

        preempt_disable();
        mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

        if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
                /* got the lock, yay! */
                preempt_enable();
                return 0;
        }
처음 시작하는 부분이 midpath(optimistics spin) 루틴으로 먼저 수행된다. midpath 루틴은 non-sleepable spin-wait(루프)으로 구현된다.
  • preempt를 disable하여 리스케쥴링하지 않도록 하므로 슬립되지 않는다.
  • mutex_acquire_nest()은 락 추척을 위한 디버깅 기능을 위해 호출한다.
  • midpath인 mutex_optimistic_spin()으로 락 획득이 성공하면 preempt를 enable 한 후 성공리에 빠져나간다.
  • 인수의 마지막에 use_ww_ctx가 있는데 보통 0이며, ww_mutex를 사용하는 경우엔 1이다.
    • mutex_lock()을 사용하지 않고 dead-lock avoidance 기능이 있는 ww_mutex_lock()을 사용한 경우에 해당
        spin_lock_mutex(&lock->wait_lock, flags);

        /*
         * Once more, try to acquire the lock. Only try-lock the mutex if
         * it is unlocked to reduce unnecessary xchg() operations.
         */
        if (!mutex_is_locked(lock) && (atomic_xchg(&lock->count, 0) == 1))
                goto skip_wait;
 midpath에서 lock 획득이 실패하였으면 slowpath 루틴에서 락을 획득한다. slowpath 루틴은 non-busy(sleepable) wait으로 구현된다.
  • spin_lock_mutex(): 인터럽트되지 않도록 하면서 뮤텍스의 wait_lock에 spinlock을 건다.
    • spin_lock을 거는 이유는 wait_list 등의 자료 구조를 보호하면서 access를 하고 동시에 두 개 이상의 태스크가 접근할 때 조금이라도 먼저 접근한 태스크에게 mutex를 획득할 수 있는 기회를 주기 위함이다.
  • 본격적으로 slowpath를 수행하기 전에 한 번 더 midpath 처럼 락을 얻을 수 있는지 시도한다.
    • !mutex_is_locked():  만일 뮤텍스가 unlock 상태이고(이 루틴까지 오면서 lock 상태가 변해 unlock(release)된 경우에 해당)
    • atomic_xchg(): lock(0)을 획득하였을 때 변경 전 count 값이 1(unlock)이면 락을 정상적으로 획득한 것이므로 skip_wait 라벨로 점프한다.
        debug_mutex_lock_common(lock, &waiter);
        debug_mutex_add_waiter(lock, &waiter, task_thread_info(task));

        /* add waiting tasks to the end of the waitqueue (FIFO): */
        list_add_tail(&waiter.list, &lock->wait_list);
        waiter.task = task;

        lock_contended(&lock->dep_map, ip);
코드는 다음과 같이 동작한다.
  • debug_mutex_lock_common(): 로컬 변수인 waiter를 초기화한다.
  • debug_mutex_add_waiter(): 현재 태스크의 blocked_on에 위에서 초기화한 waiter를 대입한다.
  • waiter(대기자)를 wait_list(FIFO wait queue)의 마지막에 추가한다.
    • wait_list(락의 대기 FIFO )의 마지막에 waiter를 추가한다.
    • waiter.task 현재 task를 대입
  • lock_contended() 함수는 CONFIG_LOCK_STAT이 설정된 상태면 “lock_contension_bug”를 디텍트해 출력할 수 있다.

아래 그림은 wait_list에 waiter가 추가되는 과정을 나타냈다.

  • 처음엔 wait_list가 비어있고 waiter(대기자)가 하나도 없는 경우는 lock owner만이 mutex를 소유한 상태이므로 count=0이다.
  • 두 번째에서 task-A가 wait_list의 뒤에 waiter(대기자)로 추가되었다.
  • 세 번째와 네 번째에서 task-B와 task-C가 순차적으로 wait_list의 뒤에waiter(대기자)로 추가되었다.

mutex5c

 

        for (;;) {
                /*
                 * Lets try to take the lock again - this is needed even if
                 * we get here for the first time (shortly after failing to
                 * acquire the lock), to make sure that we get a wakeup once
                 * it's unlocked. Later on, if we sleep, this is the
                 * operation that gives us the lock. We xchg it to -1, so
                 * that when we release the lock, we properly wake up the
                 * other waiters. We only attempt the xchg if the count is
                 * non-negative in order to avoid unnecessary xchg operations:
                 */
                if (atomic_read(&lock->count) >= 0 &&
                    (atomic_xchg(&lock->count, -1) == 1))
                        break;

                /*
                 * got a signal? (This code gets eliminated in the
                 * TASK_UNINTERRUPTIBLE case.)
                 */
                if (unlikely(signal_pending_state(state, task))) {
                        ret = -EINTR;
                        goto err;
                }

                if (use_ww_ctx && ww_ctx->acquired > 0) {
                        ret = __ww_mutex_lock_check_stamp(lock, ww_ctx);
                        if (ret)
                                goto err;
                }
               __set_task_state(task, state);

                /* didn't get the lock, go to sleep: */
                spin_unlock_mutex(&lock->wait_lock, flags);
                schedule_preempt_disabled();
                spin_lock_mutex(&lock->wait_lock, flags);
        }
slowpath  루프 처음마다 락 획득을 시도하고 락 획득이 성공할 때 까지  대기 상태로 들어가는 것을 반복한다.
  • SMP의 여러 코어가 경쟁적으로 atomic_xchg()를 서로 호출하는 경우 각 코어의 캐시에서 update를 하면서 cache chherent 알고리즘을 통해 많은 부하가 생긴다. 이 때 발생하는 cache bouncing 문제를 해결 하기 위해 atomic_xchg()를 호출하기 전에 atomic_read()를 하도록 루틴이 추가되었다.
    • count가 0보다 같거나 크다는 말은 기존에 걸려 있던 락이 풀려 더 이상의 대기자가 없다(락이 하나만 있거나 unlock된 상태)는 뜻이다.
    • atomic_xchg()의 리턴값이 1이면(atomic_xchg가 수행되기 전의 count 값) unlock 상태였다는 뜻이다. 이렇게 lock을 획득하면 루프를 빠져나온다.
  • 작은 확률로 시그널 상태에 따라 시그널이 펜딩된 상태라면 락 획득 실패되며 에러 처리 루틴으로 점프
    • TASK_INTERRUPTIBLE 이거나 TASK_WAKE_KILL 인 상태에서 시그널이 펜딩된경우 true
    • TASK_UNINTERRUPTIBLE 케이스인 경우는 코드를 무시하면 됨.
  • ww_mutex를 사용하면서 ww_ctx->acquired 값이 0보다 크면
    • __ww_mutex_lock_check_stamp()를 호출하여 데드락 상황을 체크하여 true면 에러 처리 루틴으로 점프
  • __set_task_state(task, state)
    • 태스크 상태를 인수로 받은 상태로 변경한다.
      • 인수로 받는 상태는 다음과 같다.
        • mutex_lock(), ww_mutex_lock() -> TASK_UNINTERUPTIBLE
        • mutex_lock_interruptable(), ww_mutex_interruptable() -> TASK_INTERRUPTIBLE
        • mutex_lock_killable() -> TAKS_KILLABLE
  • spin lock을 풀고 슬립한다. 슬립이 깨면 다시 spin lock을 얻은 후 루프 처음으로 되돌아가 다시 뮤텍스 획득을 시도한다.
        __set_task_state(task, TASK_RUNNING);

        mutex_remove_waiter(lock, &waiter, current_thread_info());
        /* set it to 0 if there are no waiters left: */
        if (likely(list_empty(&lock->wait_list)))
                atomic_set(&lock->count, 0);
        debug_mutex_free_waiter(&waiter);
정상적으로 뮤텍스를 획득하여 루프에서 벗어난 경우 (여전히 spin lock은 걸려있는 상태)
  • 태스크 상태를 TASK_RUNNING으로 바꾼다.
    • slowpath에서 sleep 한 후 깨어나 lock을 얻을 때까지 문제없이 잘 동작하려면 명시적으로 태스크를 TASK_RUNNING 상태로 설정해야 한다.
  • mutex_remove_waiter()
    • 현재 태스크의 blocked_on을 null로 초기화 한다. (초기화전에 waiter가 지정)
    • waiter들끼리 연결이 되어 있는 구조에서 자신의 waiter 노드만 제거하고 초기화 한다.
    • waiter->task를 null로 초기화 한다.
  • 많은 확률로 락의 wait_list(대기큐)가 비어 있으면(대기자들이 하나도 없으면) atomic하게 count를 0(single lock, 대기자 없는 락 상태)으로 바꾼다.
  • debug_mutex_free_waiter()
    • waiter 구조체 정보를 깨끗이 지운다.
skip_wait:
        /* got the lock - cleanup and rejoice! */
        lock_acquired(&lock->dep_map, ip);
        mutex_set_owner(lock);

        if (use_ww_ctx) {
                struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);
                ww_mutex_set_context_slowpath(ww, ww_ctx);
        }

        spin_unlock_mutex(&lock->wait_lock, flags);
        preempt_enable();
        return 0;

err:
        mutex_remove_waiter(lock, &waiter, task_thread_info(task));
        spin_unlock_mutex(&lock->wait_lock, flags);
        debug_mutex_free_waiter(&waiter);
        mutex_release(&lock->dep_map, 1, ip);
        preempt_enable();
        return ret;
}
fastpath를 제외하고 midpath나 slowpath에서 정상적으로 락을 획득한 경우 skip_wait: 루틴에 도착한다.
  • lock_acquired():  lockdep 디버그 코드로 트레이스 목적이며, lock_contention_bug도 디텍트하여 출력한다.
  • 락 owner를 현재 태스크로 설정
  • ww_mutex를 동작시킨 경우
    • container_of()는 구조체의 멤버변수를 사용하여 구조체의 처음을 알아내는 함수
      • container_of 매크로 | TickTalk
      • ww_mutex_set_context_slowpath()는 deadlock 회피용 slowpath 락으로 분석 생략
  • spin_unlock_mutex(): spin lock을 release 한다.
    • 락 디버그 옵션에서는 리스케쥴 필요한 경우 슬립도 한다.
  • 마지막으로 preempt_enable()을 하고 종료
  • err: 레이블은 뮤텍스 획득 실패  후 처리되는 루틴으로 분석 생략

아래 그림은 wait_list에 waiter가 삭제되는 과정을 나타냈다.

  • 처음엔 wait_list에 waiter(대기자)가 3개가 있는 상태이고 이 때 mutex lock count=-3이다.
    • lock count가 음수인 경우 대기자가 있다는 뜻인데 이 숫자를 파악하여 대기자의 수를 알아낼 수는 없다.
    • 정확한 대기자의 수는 wait_list에 존재하는 waiter 들의 수이다.
  • 두 번째엔 task-A가 wait_list의 선두에서 삭제되고 mutex를 얻게된다.
  • 세 번쨰엔 task-B가 wait_list의 선두에서 삭제되고 mutex를 얻게된다.
  • 네 번째에 최종 task-C가 wait_list에서 빠져나가면서 mutex를 얻게되고 그 때에 mutex lock count=0이다.

mutex-6d

 

midpath (optimistic spinning)

 midpath로 CONFIG_MUTEX_SPIN_ON_OWNER 옵션이 설정된 경우 midpath (adaptive spinning 또는 optimistic spinning) 루틴이 동작한다.
  • owner가 다른 CPU에서 러닝(running) 중이면서 현재 태스크가 리스케쥴할 필요가 없을 때 현재 태스크에서 스핀하며 락을 획득하려 시도한다.(busy(non-sleepable ) wait)
  • owner가 러닝중이라면 그것은 곧 락을 release 할 것이라는 근거이다.
  • 이러한 뮤텍스 구현 방식은 lock field에서 owner를 atomic하게 추적하지 않고 non-atomic하게 추적한다.
    • 당연히 fastpath 및 midpath에서는 빠른 속도를 내는 것이 목적이므로 락에 대한 추적을 하지 않는다. (락 추적 목적으로 관련 기록  루틴을 호출하지 않는다)
  • Optimistic spinning에서는 모든것을 직렬화(serialize)하는 wait_lock에 기반하기 때문에 DBUG_MUTEXES 옵션 즉 디버깅을 위해 잘 동작하지 않는다.
  • 뮤텍스 스피너들은 OSQ(MCS lock)에 큐 up된다. 그래서 오직 하나의 spinner만 뮤텍스에 대해 경쟁한다. 반면에 뮤텍스가 스피닝하지 않을 것 같으면 lock/unlock 오버헤드를 통과하므로 아무 문제 없다.
  • 락을 획득하게 되면 true를 리턴하고, 그렇지 않은 경우 false를 리턴하며 slowpath 로 가서 sleep 할 필요가 있다. 
  • Optimistic spinning은 MCS(Mellor-Crummey and Scott)가 제안한 FIFO queue 구조의 ticket spin lock과 유사한 구조로 구현되었다. (MCS like)

mutex_optimistic_spin()

SMP를 위해 다음의 조건이 성립되면 빠르게 락을 얻을 수 있다고 판단하여 이 함수(midpath)에서 슬립하지 않고 스핀(루프)을 하며 락을 얻기 위해 시도한다.
  • 락 owner가 다른 CPU에서 현재 러닝 중
    • 락 owner가 다른 CPU에서 돌고 있다는 뜻은 곧 critical section이 끝나고 락이 release 될 가능성이 높다는 뜻
  • 현재 태스크의 preemption 요청이 없음
    • 리스케쥴 요청이 없다는 뜻으로 결국 락 오너가 락을 release 하자 마자 현재의 태스크에서 곧 락을 얻을 수 있음을 의미
kernel/locking/mutex.c
/*
* Optimistic spinning.
*
* We try to spin for acquisition when we find that the lock owner
* is currently running on a (different) CPU and while we don't
* need to reschedule. The rationale is that if the lock owner is
* running, it is likely to release the lock soon.
*
* Since this needs the lock owner, and this mutex implementation
* doesn't track the owner atomically in the lock field, we need to
* track it non-atomically.
*
* We can't do this for DEBUG_MUTEXES because that relies on wait_lock
* to serialize everything.
*
* The mutex spinners are queued up using MCS lock so that only one
* spinner can compete for the mutex. However, if mutex spinning isn't
* going to happen, there is no point in going through the lock/unlock
* overhead.
*
* Returns true when the lock was taken, otherwise false, indicating
* that we need to jump to the slowpath and sleep.
*/
static bool mutex_optimistic_spin(struct mutex *lock,
                                  struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
        struct task_struct *task = current;

        if (!mutex_can_spin_on_owner(lock))
                goto done;

        /*
         * In order to avoid a stampede of mutex spinners trying to
         * acquire the mutex all at once, the spinners need to take a
         * MCS (queued) lock first before spinning on the owner field.
         */
        if (!osq_lock(&lock->osq))
                goto done;
소스를 설명하자면
  • !mutex_can_spin_on_owner()
    • 리스케쥴 요청이 있거나 lock의 owner가 러닝중이 아니면 midpath 불성립
  • osq_lock을 획득하지 못하면 midpath 불성립
    • 동시에 midpath에 진입하는 cpu들이 서로 lock을 획득하기 위해 쇄도(반복적인 시도로 인해)하는 것을 방지하기 위해서 serialization을 하여 가장 먼저 OSQ(MCS) lock을 획득한 cpu가 성공적으로 리턴된다.
    • OSQ(MCS) lock은 FIFO에 의한 큐방식으로 가장 처음에 진입한 spinner만이 뮤텍스에 대해 경쟁을 할 권한을 가지고 있다.
    • 따라서 midpath에서도 뮤텍스 획득에 대한 공정성에 접근하였다.
    • FIFO 큐에서 대기하고 있는 cpu 들은 대기하면서 만일 리스케쥴 요청이 있는 경우는 실패로 리턴된다.
    • 물론 OSQ(MCS) lock을 획득한 cpu가 당장 mutex lock을 얻게되는 것이 아니라 이미 mutex lock을 가진 owner가 mutex unlock될 때 까지 spin된 후에 얻게될 것이다.

mutex-3a

        while (true) {
                struct task_struct *owner;

                if (use_ww_ctx && ww_ctx->acquired > 0) {
                        struct ww_mutex *ww;

                        ww = container_of(lock, struct ww_mutex, base);
                        /*
                         * If ww->ctx is set the contents are undefined, only
                         * by acquiring wait_lock there is a guarantee that
                         * they are not invalid when reading.
                         *
                         * As such, when deadlock detection needs to be
                         * performed the optimistic spinning cannot be done.
                         */
                        if (ACCESS_ONCE(ww->ctx))
                                break;
                }

                /*
                 * If there's an owner, wait for it to either
                 * release the lock or go to sleep.
                 */
                owner = ACCESS_ONCE(lock->owner);
                if (owner && !mutex_spin_on_owner(lock, owner))
                        break;

                /* Try to acquire the mutex if it is unlocked. */
                if (mutex_try_to_acquire(lock)) {
                        lock_acquired(&lock->dep_map, ip);

                        if (use_ww_ctx) {
                                struct ww_mutex *ww;
                                ww = container_of(lock, struct ww_mutex, base);

                                ww_mutex_set_context_fastpath(ww, ww_ctx);
                        }

                       mutex_set_owner(lock);
                        osq_unlock(&lock->osq);
                        return true;
                }

                /*
                 * When there's no owner, we might have preempted between the
                 * owner acquiring the lock and setting the owner field. If
                 * we're an RT task that will live-lock because we won't let
                 * the owner complete.
                 */
                if (!owner && (need_resched() || rt_task(task)))
                        break;

                /*
                 * The cpu_relax() call is a compiler barrier which forces
                 * everything in this loop to be re-loaded. We don't need
                 * memory barriers as we'll eventually observe the right
                 * values at the cost of a few extra spins.
                 */
                cpu_relax_lowlatency();
        }
spinning하며(busy wait, sleep하지 않고 루프를 돌며) 락을 획득할 수 있는 조건을 얻으면 리턴한다. 루프를 벗어나면 midpath 실패 처리 루틴으로 빠져나간다.
  • usw_ww_mutex: ww_mutex 를 사용하는 경우이면서 deadlock 디텍션이 된경우(ww->ctx가 true) 루프를 벗어나 에러 처리를 한다.
  • owner 값을 가져와서(volatile 방식으로 메모리 접근) owner가 존재하면서 owner가 스핀하지 않으면 루프를 벗어난다. (midpath 실패)
  • 뮤텍스 획득을 시도하여 성공하면 다음과 같이 처리하고 리턴한다.
    • lock_acquired():  lockdep 디버그 코드로 트레이스 목적이며, lock_contention_bug도 디텍트하여 출력한다.
    • ww_mutex_ctx: ww_mutex를 사용하는 경우 ww_mutex_set_context_fastpath()를 호출한다. (분석 생략)
    • 뮤텍스 오너(owner)를 설정
    • osq(optimistic spinner queue)에서 언락(MCS queue에서 노드 제거)
  • owner가 없으면서 리스케쥴링 요청이 있거나 현재 태스크가 RT task 인 경우 루프를 벗어난다.
    • 락은 걸려있는 상태인데 owner가 없다는 뜻은 owner가 lock을 획득하고 owner필드를 설정하려고 하는 사이 즉 아직 owner field의 설정을 다 못한 사이인 경우라면 현재 태스크가 선점했을 수도 있다고 판단하여 루프를 벗어난다.(midpath 실패)
  • cpu_relax_lowlatency(): 다시 스핀하기 전에 아키텍처에 따라 메모리 배리어를 동작시킨다.
    • cpu_relax()
      • 일반적인 ARMv7 SMP 및 UP 시스템은 컴파일러 배리어인 barrier() 함수만 호출한다.
      • ARMv7 SMP 코어중 Cortex-A9 r2p0 이전 리비전 및 ARMv6 SMP 아키텍처는 자동 store buffer drain 기능이 없어 dmb_ish 명령을 수행하여야 한다.
        osq_unlock(&lock->osq);
done:
        /*
         * If we fell out of the spin path because of need_resched(),
         * reschedule now, before we try-lock the mutex. This avoids getting
         * scheduled out right after we obtained the mutex.
         */
        if (need_resched()) {
                /*
                 * We _should_ have TASK_RUNNING here, but just in case
                 * we do not, make it so, otherwise we might get stuck.
                 */
                __set_current_state(TASK_RUNNING);
                schedule_preempt_disabled();
        }

        return false;
}
루프를 벗어나면 midpath 처리가 실패한 것으로 처리되어 false를 리턴한다.
  • osq(optimistic spinner queue)에서 언락(MCS queue에서 노드 제거)
  • 리스케쥴 요청이 있는 경우 태스크 상태를 러닝으로 바꾸고 preemption되어 리스케쥴링이 되지 않도록 막는다.

 

mutex_can_spin_on_owner()

/*
* Initial check for entering the mutex spinning loop
*/
static inline int mutex_can_spin_on_owner(struct mutex *lock)
{
        struct task_struct *owner;
        int retval = 1;

        if (need_resched())
                return 0;

        rcu_read_lock();
        owner = ACCESS_ONCE(lock->owner);
        if (owner)
                retval = owner->on_cpu;
        rcu_read_unlock();
        /*
         * if lock->owner is not set, the mutex owner may have just acquired
         * it and not set the owner yet or the mutex has been released.
         */
        return retval;
}
리스케쥴 요청이 없으면서 lock의 owner가 러닝중이면(또는 owner가 설정되지 않은 경우) true 리턴
  • 리스케쥴 요청 플래그가 설정되어 있으면 false(0) 리턴
  • owner가 설정되어 있지 않으면 true(1) 리턴
  • owner가 설정되어 있으면 owner->on_cpu 값을 리턴
    • on_cpu: 1=task running, 0=task not running

 

mutex_spin_on_owner()

아래 코드는 2015년 2~4월에 걸쳐 함수가 일부분 변경된 최신 코드이다.(버그 처리와 리팩토링)
락의 owner 태스크가 동작(슬립하거나 다른 태스크로 전환되지 않는)하는 동안 spinning을 한다. midpath의 바람직한 목적은 true로 리턴하여 이 함수를 호출한 루틴에서 락을 획득하려고 시도하는 것이다. 당연히 false로 리턴하면 midpath가 성립되지 않아 slowpath로 넘어갈 것이다.
/*
 * Look out! "owner" is an entirely speculative pointer
 * access and not reliable.
 */
static noinline bool mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner)
{
    bool ret = true;
    rcu_read_lock();
    while (lock->owner == owner) {
        /*
         * Ensure we emit the owner->on_cpu, dereference _after_
         * checking lock->owner still matches owner. If that fails,
         * owner might point to freed memory. If it still matches,
         * the rcu_read_lock() ensures the memory stays valid.
         */
        barrier();
        if (!owner->on_cpu || need_resched()) {
            ret = false;
            break;
        }
        cpu_relax_lowlatency();
    }
    rcu_read_unlock();
    return ret;
}

루프 전체가 rcu_read_lock()에 들어있으므로 현재 태스크가 슬립되지 않도록 하였고 빠르게 처리가 될 것을 가정하고 작성된 코드이다.

  • 락 owner 태스크의 critical section 내부에서 슬립하지 않은 채 많은 시간 소모를 하는 경우 문제가 된다. (락 owner 태스크에서 preemption 되면 차라리 실패가되어 이 루프를 벗어날 수 있다)
  • lock의 owner와 인수 owner가  같은 경우에만 루프를 돈다. 다른 경우 락 획득 준비가 끝났기 때문에 ture를 리턴한다.
    • 락 owner 태스크에서 critical section을 다 통과한 후 락을 release하면 락의 owner에 null이 들어가는 것을 이용하여 락 사용완료를 기다리는 스핀(루프)이다.
    • lock->owner를 null하고 비교하지 않는 이유는 lock->owner에 다른 태스크가 들어갈 확률도 있기 때문이다. (다른 태스크에서 lock을 획득한 경우)
  • owner의 on_cpu(태스크 가동 여부 플래그)가 false 이거나 리스케쥴 요청이 있는 경우 루프를 벗어나고 false를 리턴한다.

 

아래 두 개의 함수 코드는 변경되었으므로 상단 부분의 mutex_spin_on_owner() 함수 코드를 참고해야 한다.

static noinline  int mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner)
{
        rcu_read_lock();
        while (owner_running(lock, owner)) {
                if (need_resched())
                        break;

                cpu_relax_lowlatency();
        }
        rcu_read_unlock();

        /*
         * We break out the loop above on need_resched() and when the
         * owner changed, which is a sign for heavy contention. Return
         * success only when lock->owner is NULL.
         */
        return lock->owner == NULL;
}

static inline bool owner_running(struct mutex *lock, struct task_struct *owner)
{
        if (lock->owner != owner)
                return false;

        /*
         * Ensure we emit the owner->on_cpu, dereference _after_ checking
         * lock->owner still matches owner, if that fails, owner might
         * point to free()d memory, if it still matches, the rcu_read_lock()
         * ensures the memory stays valid.
         */
        barrier();

        return owner->on_cpu;
}

 

osq_lock()

kernel/locking/osq_lock.c

bool osq_lock(struct optimistic_spin_queue *lock)
{
        struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
        struct optimistic_spin_node *prev, *next;
        int curr = encode_cpu(smp_processor_id());
        int old;

        node->locked = 0;
        node->next = NULL;
        node->cpu = curr;

        old = atomic_xchg(&lock->tail, curr);
        if (old == OSQ_UNLOCKED_VAL)
                return true;

        prev = decode_cpu(old);
        node->prev = prev;
        ACCESS_ONCE(prev->next) = node;

        /*  
         * Normally @prev is untouchable after the above store; because at that
         * moment unlock can proceed and wipe the node element from stack.
         *
         * However, since our nodes are static per-cpu storage, we're
         * guaranteed their existence -- this allows us to apply
         * cmpxchg in an attempt to undo our queueing.
         */

        while (!ACCESS_ONCE(node->locked)) {
                /*
                 * If we need to reschedule bail... so we can block.
                 */
                if (need_resched())
                        goto unqueue;

                cpu_relax_lowlatency();
        }
        return true;
  • struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
    • per-cpu 데이터인 즉 현재 cpu에 매치되는 osq_node 구조체 포인터를 알아온다.
  • int curr = encode_cpu(smp_processor_id());
    • OSQ(MCS) lock에서 사용하는 cpu 번호는 1번 부터 시작하므로 0번 부터 시작하는 현재 CPU 번호에 1을 더한다.
    • curr = 현재 cpu + 1
  • OSQ(MCS) lock에 하나의 엔트리도 없는 경우 가장 빨리 OSQ(MCS) lock을 획득한다.
    • old = atomic_xchg(&lock->tail, curr);
      • &lock->tail에 curr 값을 대입하고 old에 대입 전 lock->tail 값을 가져온다.
    • if (old == OSQ_UNLOCKED_VAL)
      • 가져온 cpu 값이 0인경우는 OSQ(MCS) lock이 unlock 상태이므로 가장 빠르게 OSQ(MCS) lock을 획득하게 되어 루틴을 빠져나간다.
  • 노드를 뒤에 추가한다.
    • prev = decode_cpu(old);
      • per-cpu 데이터를 사용하기 위해 old 값에서 1을 뺀 후 &osq_node 구조체 포인터 값을 알아온다.
    • node->prev = prev;
      • 현재 노드의 앞에 이전 노드 구조체 정보를 대입하여 연결한다.
    • ACCESS_ONCE(prev->next) = node;
      • 이전 노드의 뒤에 현재노드를 연결한다.
  • spin 하며 리스케쥴 요청이 없는 동안 OSQ(MCS) lock을 획득할 때까지 기다린다.
    • while (!ACCESS_ONCE(node->locked)) {
      • node->locked가 0(lock status)인 경우 spin 한다.
        • 0=locked status, 1=unlocked status
    • if (need_resched())
      • 현재 태스크에 리스케쥴 요청이 있느지 확인하여 있는 경우 OSQ(MCS) lock 획득을 포기하기 위해 unqueue 루틴으로 이동한다.
    • cpu_relax_lowlatency();
      • rpi2: ARMv7 SMP인 Cortex-A7은 컴파일러 배리어인 barrier() 함수를 호출한다.
unqueue:
        /*
         * Step - A  -- stabilize @prev
         *
         * Undo our @prev->next assignment; this will make @prev's
         * unlock()/unqueue() wait for a next pointer since @lock points to us
         * (or later).
         */

        for (;;) {
                if (prev->next == node &&
                    cmpxchg(&prev->next, node, NULL) == node)
                        break;

                /*
                 * We can only fail the cmpxchg() racing against an unlock(),
                 * in which case we should observe @node->locked becomming
                 * true.
                 */
                if (smp_load_acquire(&node->locked))
                        return true;

                cpu_relax_lowlatency();

                /*
                 * Or we race against a concurrent unqueue()'s step-B, in which
                 * case its step-C will write us a new @node->prev pointer.
                 */
                prev = ACCESS_ONCE(node->prev);
        }
  •  unqueue 루틴으로 오는 케이스는 lock->owner가 sleep되거나 현재 태스크가 리스케쥴링 요청 받아 midpath를 포기해야 하는 경우 발생한다.
    • OSQ(MCS) lock을 포기하기 위해 OSQ에서 spin되고 있는 현재 cpu에 대한 노드의 연결을 제거해야한다.
    • lock->owner가 sleep되는 경우 OSQ에서 spin되고 있는 모든 cpu에 대한 노드의 연결을 제거하기 위해 경쟁하는 상황이 벌어지므로 이에 대해 문제 없이 동작하도록 2 단계에 대해 나누어 atomic operation을 사용한다.
  • 1단계 prev 노드에서 현재 노드에 대한 연결을 제거될 때까지 루프를 돌며 반복한다. 루프를 반복하는 이유는 여러 cpu가 unqueue를 동시에 진행해야 하는 상황에서 안정적으로 하나씩 처리할 수 있도록 루틴 처리가 되야 한다. 하는 상황에서 내노드에 연결된 이전 노드의 정보를 변경하여 내 연결정보를 없애려고 하는데 이전 노드의 처리가 더 빠르게 처리되는 경우 실시간으로 node->prev 값이 바뀔 수도 있다.
    • if (prev->next == node &&
      • 먼저 현재 노드가 prev->next인지 확인
    • cmpxchg(&prev->next, node, NULL) == node)
      • atomic operation에 의해 prev 노드에 현재 노드가 연결되어 있는 경우 현재 노드를 제거한다.  제거가 성공되면 loop를 빠져나간다.
    • if (smp_load_acquire(&node->locked))
      • node->locked가 1로 설정되어 완전히 종료됨을 알게 되면 루틴을 빠져나간다.
    • cpu_relax_lowlatency();
      • rpi2: 컴파일러 배리어인 barrier()를 호출한다.
    • prev = ACCESS_ONCE(node->prev);
      • 다시 한번 내 노드와 연결되어 있는 이전(prev) 노드를 알아온다.
        /*
         * Step - B -- stabilize @next
         *
         * Similar to unlock(), wait for @node->next or move @lock from @node
         * back to @prev.
         */

        next = osq_wait_next(lock, node, prev);
        if (!next)
                return false;

        /*
         * Step - C -- unlink
         *
         * @prev is stable because its still waiting for a new @prev->next
         * pointer, @next is stable because our @node->next pointer is NULL and
         * it will wait in Step-A.
         */

        ACCESS_ONCE(next->prev) = prev;
        ACCESS_ONCE(prev->next) = next;

        return false;
}
  • 아래의 그림은 4개의 task가 각각의 cpu에서 동작한다고 가정하고 1개의 task는 mutex를 소유한 상태이고 3개의 task는 midpath에 진입하여 spin하고 있는 상태이다.
  • FIFO 구조의 OSQ(MCS) lock에서 가장 선두에 위치한 task 홀로 mutex 소유자가 release되면 곧바로 mutex를 취득할 수 있도록 spin하고 있다.
  • 나머지 2개의 task는 순번대로 연결되어 있다.

osq_lock-2

  • 아래 그림은 3개의 task가 midpath에서 spin하고 있는데 두 번째 task가 리스쥴링 요청을 받아 중간에 unqueue되는 것을 보여준다.
  • 단순히 현재 노드를 제거하고 prev 노드와 next 노드를 연결하면 되는데 실제 코드에서는 prev->next와 node->next에 null을 대입하여 먼저 연결을 끊는다.
    • midpath에서 대기하는 task 들 중 일부 또는 전부가 동시에 unquque되는 상황이라면 각 cpu에서 동시에 이 구조체에 접근하게 되는데 함부로 연결을 끊고 이어 붙이면 서로 중복 처리되어 정상적인 연결이 되지 않기 때문에 atomic operation을 이용하여 null을 집어넣어 먼저 연결을 끊어 주어야 한다.
    • prev->next와 node->next에 null을 집어 넣은 후에는 경쟁(racing) 상황에서 벗어났기 때문에 prev와 next 노드를 연결해준다.

osq_lock-3a

  • 아래 그림은 2개의 task가 경쟁적으로 unqueue되어 빠져나가는 상황을 나타내었는데 두 번째 노드와 세 번째 노드가 거의 동시에 step-A까지 동작하였고 그 다음 step-B 동작이 미세하게 두 번째 노드가 먼저 진입한 상태이다.
    • 1) 두 번째 노드가 step-A: prev->next에 null을 대입하여 이전 노드로의 연결을 끊는다.
    • 2) 세 번째 노드가 step-A: prev->next에 null을 대입하여 이전 노드로의 연결을 끊는다.
    • 3) 두 번째 노드가 step-B: node->next가 null인 관계로 null이 아닐 때 까지 spin
    • 4) 세 번째 노드가 step-B: node->next가 null이 아니므로 null을 대입하여 다음 노드로의 연결을 끊는다.
    • 5, 6) 세 번째 노드가 step-C: 이전 노드와 다음 노드를 서로 연결한다. (자신의 노드는 앞 뒤 노드로 부터의 연결이 unlink 된다.)
    • 7) 두 번째 노드가 spin 하고 있다가 step-B: node->next가 null이 아니게 되자마자 null을 대입하여 다음 노드로의 연결을 끊는다.
    • 8, 9) 두 번째 노드가 step-C: 이전 노드와 다음 노드를 서로 연결한다. (자신의 노드는 앞 뒤 노드로 부터의 연결이 unlink 된다.)

osq_lock-4a

  • 아래 그림은 2개의 task가 경쟁적으로 unqueue되어 빠져나가는 상황을 나타내었는데 두 번째 노드가 조금 먼저 진입하여 이미 step-B까지 먼저 진행된 상태에서 세 번째 노드가 step-A에 진입한 경우이다.
    • 1) 두 번째 노드가 step-A: prev->next에 null을 대입하여 이전 노드로의 연결을 끊는다.
    • 2) 두 번째 노드가 step-B: node->next에 null을 대입하여 다음 노드로의 연결을 끊는다.
    • 3) 세 번째 노드가 step-A: prev->next가 null인 관계로 null이 아닐 때 까지 spin 한다.
    • 4, 5) 두 번째 노드가 step-C: 이전 노드와 다음 노드를 서로 연결한다. (자신의 노드는 앞 뒤 노드로 부터의 연결이 unlink 된다.)
    • 6) 세 번째 노드가 spin 하고 있다가 step-A: prev->next가 null이 아니게 되자마자 null을 대입하여 다음 노드로의 연결을 끊는다.
    • 7) 세 번째 노드가 step-B: node->next에 null을 대입하여 다음 노드로의 연결을 끊는다.
    • 8, 9) 세 번째 노드가 step-C: 이전 노드와 다음 노드를 서로 연결한다. (자신의 노드는 앞 뒤 노드로 부터의 연결이 unlink 된다.)

osq_lock-5

 

osq_unlock()

kernel/locking/osq_lock.c

void osq_unlock(struct optimistic_spin_queue *lock)
{
        struct optimistic_spin_node *node, *next;
        int curr = encode_cpu(smp_processor_id());

        /*
         * Fast path for the uncontended case.
         */
        if (likely(atomic_cmpxchg(&lock->tail, curr, OSQ_UNLOCKED_VAL) == curr))
                return;

        /*
         * Second most likely case.
         */
        node = this_cpu_ptr(&osq_node);
        next = xchg(&node->next, NULL);
        if (next) {
                ACCESS_ONCE(next->locked) = 1;
                return;
        }

        next = osq_wait_next(lock, node, NULL);
        if (next)
                ACCESS_ONCE(next->locked) = 1;
}
  • curr는 현재 cpu(0부터 시작) + 1
  • osq_unlock()의 구현도 처리 성능별 3단계로 구현되어 있다.
    • fastpath
      • OSQ의 마지막 노드가 현재 cpu 노드인 경우
      • if (atomic_cmpxchg(&lock->tail, curr, OSQ_UNLOCKED_VAL) == curr))
        • &lock->tail이 curr와 같은 경우 OSQ_UNLOCKED_VAL(0)로 대입한다.
        • 높은 확률로 atomic_cmpxchg() 함수 결과가 true이면 return 한다.
        • atomic_cmpxchg()의 결과가 curr 라는 것은 FIFO 구조의 OSQ(MCS)의 tail이 curr라는 것과 동일하다.
        • 참고: Atomic Operation | 문c
    • midpath
      • OSQ에서 내 뒤에 노드가 존재하는 경우 다음 노드를 osq spin 상태에서 벗어나도록 locked flag를 1로 설정한다.
      • node = this_cpu_ptr(&osq_node);
        • per-cpu 데이터 구조의 osq_node 구조체 포인터를 알아온다.
      • next = xchg(&node->next, NULL);
        • xchg()
          • #define xchg(ptr,x)      ((__typeof__(*(ptr)))__xchg((unsigned long)(x),(ptr),sizeof(*(ptr))))
        • &node->next <- NULL을 대입하는데 결과로 대입 이전 값을 리턴한다.
        • 참고: Atomic Operation | 문c
      • if (next)
        • 기존에 다음 노드가 존재하였었다면 next->locked에 1(unlocked)을 대입하고 함수를 빠져나온다.
        • 다음 노드가 OSQ(MCS) lock에서 가장 선두 노드이므로 그 노드가 osq spin 상태에서 벗어나도록 지시한다.
    • slowpath
      • 두 번째 노드가 분명 있지만 연결이 되어 있지 않은 경우 연결될 때까지 기다린 후 다음 노드를 알아와서 그 노드의 locked에 1로 설정하여 osq spin 상태에서 벗어나도록 지시한다.
      • next = osq_wait_next(lock, node, NULL)
        • 임시적으로 링크가 아직 연결되지 않아 안정되지 않은 상태인 경우 링크가 연결될 때까지 기다려서 다음 노드를 알아온 다음 next->locked에 1(unlocked)을 대입하고 함수를 빠져나온다.
        • 역시 다음 노드가 OSQ(MCS) lock에서 가장 선두 노드이므로 그 노드가 osq spin 상태에서 벗어나도록 지시한다.

 

아래 그림은 midpath로 mutex 요청이 여러 개가 동시에 인입되어 osq_lock() 함수가 호출되어 OSQ(MCS) lock을 얻게되는 과정을 나타내었다.

  • 적색 locked=1은 osq_unlock() 호출될 때 OSQ 노드 중 두 번째 노드를 locked=1로 표시하여 두 번째 노드가 osq_lock spin 에서 빠져나올 수 있도록 한다.

osq_lock-1b

 

osq_wait_next()

kernel/locking/osq_lock.c

/*
 * Get a stable @node->next pointer, either for unlock() or unqueue() purposes.
 * Can return NULL in case we were the last queued and we updated @lock instead.
 */
static inline struct optimistic_spin_node *
osq_wait_next(struct optimistic_spin_queue *lock,
              struct optimistic_spin_node *node,
              struct optimistic_spin_node *prev)
{
        struct optimistic_spin_node *next = NULL;
        int curr = encode_cpu(smp_processor_id());
        int old;

        /*
         * If there is a prev node in queue, then the 'old' value will be
         * the prev node's CPU #, else it's set to OSQ_UNLOCKED_VAL since if
         * we're currently last in queue, then the queue will then become empty.
         */
        old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;

        for (;;) {
                if (atomic_read(&lock->tail) == curr &&
                    atomic_cmpxchg(&lock->tail, curr, old) == curr) {
                        /*
                         * We were the last queued, we moved @lock back. @prev
                         * will now observe @lock and will complete its
                         * unlock()/unqueue().
                         */
                        break;
                }

                /*
                 * We must xchg() the @node->next value, because if we were to
                 * leave it in, a concurrent unlock()/unqueue() from
                 * @node->next might complete Step-A and think its @prev is
                 * still valid.
                 *
                 * If the concurrent unlock()/unqueue() wins the race, we'll
                 * wait for either @lock to point to us, through its Step-B, or
                 * wait for a new @node->next from its Step-C.
                 */
                if (node->next) {
                        next = xchg(&node->next, NULL);
                        if (next)
                                break;
                }

                cpu_relax_lowlatency();
        }

        return next;
}
  • 내 노드의 다음에 연결된 노드를 알아오는데 다른 cpu가 unque되면서 연결이 잠시 끊어질 때가 있다 그런 경우는 기다렸다가 연결된 후 즉 안정상태가 된 후 알아와야 한다.
  • int curr = encode_cpu(smp_processor_id());
    • curr = cpu 번호 + 1
  • old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;
    • prev 노드가 존재하는 경우 prev노드의 cpu 번호(1부터 시작하는)를 알아오고 없으면 0을 대입한다.
    • 0을 대입하는 것은 내 노드가 마지막인데 prev가 없는 경우 empty 큐가 되기 때문이다.
  • if (atomic_read(&lock->tail) == curr &&
    • lock의 tail이 현재 노드이면서 (내 노드가 마지막 노드이면서)
  • atomic_cmpxchg(&lock->tail, curr, old) == curr) {
    • atomic operation으로 lock->tail이 curr인 경우 old로 변경시킨다. 변경 시키기 전의 tail 값이 curr인 경우 루프를 빠져나간다.
    • 마지막 노드 표식(lock->tail)에 prev->cpu 값을 집어 넣어 이전 노드를 마지막 노드로 만든다.
  • if (node->next) {
    • 누군가 실시간으로 내 노드의 next 값에 null을 넣어 연결을 끊지 않았으면
  • next = xchg(&node->next, NULL);
    • 내 노드의 next에 null 값을 집어 넣고 변경되기 전 next 값을 알아와서 next 노드가 존재하는 경우 리턴한다.
  • cpu_relax_lowlatency();
    • 아직 node->next 값이 설정되지 않았으므로 배리어를 동작시킨 후 다시 루프를 돈다.

 

mutex_unlock()

kernel/locking/mutex.c

/**
 * mutex_unlock - release the mutex
 * @lock: the mutex to be released
 *
 * Unlock a mutex that has been locked by this task previously.
 *
 * This function must not be used in interrupt context. Unlocking
 * of a not locked mutex is not allowed.
 *
 * This function is similar to (but not equivalent to) up().
 */
void __sched mutex_unlock(struct mutex *lock)
{
        /*   
         * The unlocking fastpath is the 0->1 transition from 'locked'
         * into 'unlocked' state:
         */
#ifndef CONFIG_DEBUG_MUTEXES
        /*   
         * When debugging is enabled we must not clear the owner before time,
         * the slow path will always be taken, and that clears the owner field
         * after verifying that it was indeed current.
         */
        mutex_clear_owner(lock);
#endif
        __mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);
}

EXPORT_SYMBOL(mutex_unlock);
  • fastpath mutex unlock 처리를 위해 __mutex_fastpath_unlock() 함수를 호출한다.
    • mutex_lock()과 동일하게 fastpath는 atomic operation으로 구현된다.
    • fastpath가 실패하는 경우 __mutex_unlock_slowpath() 함수로 이동한다.

 

__mutex_fastpath_unlock()

include/asm-generic/mutex-dec.h

/**
 *  __mutex_fastpath_unlock - try to promote the count from 0 to 1
 *  @count: pointer of type atomic_t
 *  @fail_fn: function to call if the original value was not 0
 *      
 * Try to promote the count from 0 to 1. If it wasn't 0, call <fail_fn>.
 * In the failure case, this function is allowed to either set the value to
 * 1, or to set it to a value lower than 1.
 *
 * If the implementation sets it to a value of lower than 1, then the
 * __mutex_slowpath_needs_to_unlock() macro needs to return 1, it needs
 * to return 0 otherwise.
 */
static inline void
__mutex_fastpath_unlock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
        if (unlikely(atomic_inc_return(count) <= 0))
                fail_fn(count);
}

  • fastpath는 atomic operation lock count를 증가시켜 적은 확률로 0이하이면(lock 상태) __mutex_unlock_slowpath() 함수를 호출한다.

 

__mutex_unlock_slowpath()

kernel/locking/mutex.c

/*
 * Release the lock, slowpath:
 */
__visible void 
__mutex_unlock_slowpath(atomic_t *lock_count)
{
        struct mutex *lock = container_of(lock_count, struct mutex, count);

        __mutex_unlock_common_slowpath(lock, 1);
}
  • __mutex_unlock_common_slowpath() 루틴을 호출한다.

 

__mutex_unlock_common_slowpath()

kernel/locking/mutex.c

/*
 * Release the lock, slowpath:
 */
static inline void
__mutex_unlock_common_slowpath(struct mutex *lock, int nested)
{
        unsigned long flags;

        /*
         * As a performance measurement, release the lock before doing other
         * wakeup related duties to follow. This allows other tasks to acquire
         * the lock sooner, while still handling cleanups in past unlock calls.
         * This can be done as we do not enforce strict equivalence between the
         * mutex counter and wait_list.
         *
         *
         * Some architectures leave the lock unlocked in the fastpath failure
         * case, others need to leave it locked. In the later case we have to
         * unlock it here - as the lock counter is currently 0 or negative.
         */
        if (__mutex_slowpath_needs_to_unlock())
                atomic_set(&lock->count, 1);

        spin_lock_mutex(&lock->wait_lock, flags);
        mutex_release(&lock->dep_map, nested, _RET_IP_);
        debug_mutex_unlock(lock);

        if (!list_empty(&lock->wait_list)) {
                /* get the first entry from the wait-list: */
                struct mutex_waiter *waiter =
                                list_entry(lock->wait_list.next,
                                           struct mutex_waiter, list);

                debug_mutex_wake_waiter(lock, waiter);

                wake_up_process(waiter->task);
        }

        spin_unlock_mutex(&lock->wait_lock, flags);
}
  • __mutex_slowpath_nees_to_unlock()
    • 아키텍처마다 다르지만 ARM의 경우 1로 설정된다.
      • ARM에서 구현된 midpath를 위한 조건을 동작시키기 위해 1로 설정된다.
  • atomic_set(&lock->count, 1)
    • midpath에서 lock을 획득하기 위해 spin하고 있는 태스크가 있는 경우 mutex를 획득할 수 있는 기회를 slowpath 보다 먼저 주기위해 lock->count를 1(unlock)로 설정한다.
  • if (!list_empty(&lock->wait_list)) {
    • 대기자가 있는 경우
  • struct mutex_waiter *waiter = list_entry(lock->wait_list.next, struct mutex_waiter, list);
    • 선두에 있는 대기자를 알아온다.
  • wake_up_process(waiter->task);
    • 알아온 대기자를 깨어나게 요청한다.

다음 그림을 보고 slowpath에서 잠들어 있는 태스크를 깨우는 동작을 확인한다.

  • CPU 1에 있는 mutex는 task가 preemption요청을 받아 slowpath로 동작하여 sleep되어 있다가 CPU2가 unlock될 때 첫 대기자인 CPU1에서 구동되고 있는 태스크를 미리 깨워서 mutex를 얻을 수 있도록 기회를 준다.

mutex-4a

 

기타 함수

spin_lock_mutex()

spin_lock_mutex() 함수는 디버깅용과 non-디버깅용 두 개가 정의 되어 있다.
kernel/locking/mutex-debug.h
#define spin_lock_mutex(lock, flags)            \
    do {                        \
        struct mutex *l = container_of(lock, struct mutex, wait_lock); \
        DEBUG_LOCKS_WARN_ON(in_interrupt());    \
        local_irq_save(flags);          \
        arch_spin_lock(&(lock)->rlock.raw_lock);\
        DEBUG_LOCKS_WARN_ON(l->magic != l); \
    } while (0)

 

kernel/locking/mutex.h

#define spin_lock_mutex(lock, flags) \
        do { spin_lock(lock); (void)(flags); } while (0)

 

debug_mutex_lock_common()

 kernel/locking/mutex-debug.c
void debug_mutex_lock_common(struct mutex *lock, struct mutex_waiter *waiter)
{
        memset(waiter, MUTEX_DEBUG_INIT, sizeof(*waiter));
        waiter->magic = waiter;
        INIT_LIST_HEAD(&waiter->list);
}
  •  waiter 구조체를 초기화한다.

debug_mutex_add_waiter()

kernel/locking/mutex-debug.c

void debug_mutex_add_waiter(struct mutex *lock, struct mutex_waiter *waiter,
                            struct thread_info *ti)
{
        SMP_DEBUG_LOCKS_WARN_ON(!spin_is_locked(&lock->wait_lock));

        /* Mark the current thread as blocked on the lock: */
        ti->task->blocked_on = waiter;
}
  • 인수로 전달된 현재 context에 연결된 태스크 구조체의 blocked_on에 waiter를 추가한다. (대기 태스크들이 연결된다.)

 

__ww_mutex_lock_check_stamp()

 kernel/locking/mutex.c
static inline int __sched __ww_mutex_lock_check_stamp(struct mutex *lock, struct ww_acquire_ctx *ctx)
{
        struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);
        struct ww_acquire_ctx *hold_ctx = ACCESS_ONCE(ww->ctx);

        if (!hold_ctx)
                return 0;

        if (unlikely(ctx == hold_ctx))
                return -EALREADY;

        if (ctx->stamp - hold_ctx->stamp <= LONG_MAX &&
            (ctx->stamp != hold_ctx->stamp || ctx > hold_ctx)) {
#ifdef CONFIG_DEBUG_MUTEXES
                DEBUG_LOCKS_WARN_ON(ctx->contending_lock);
                ctx->contending_lock = ww;
#endif
                return -EDEADLK;
        }

        return 0;
}
  • ww_mutex 사용 시에 사용되는 함수로 deadlock 상황을 디텍트하여 _EDEADLK를 리턴한다.

 

구조체

struct mutex

include/linux/mutex.h
/*
 * Simple, straightforward mutexes with strict semantics:
 *
 * - only one task can hold the mutex at a time
 * - only the owner can unlock the mutex
 * - multiple unlocks are not permitted
 * - recursive locking is not permitted
 * - a mutex object must be initialized via the API
 * - a mutex object must not be initialized via memset or copying
 * - task may not exit with mutex held
 * - memory areas where held locks reside must not be freed
 * - held mutexes must not be reinitialized
 * - mutexes may not be used in hardware or software interrupt
 *   contexts such as tasklets and timers
 *
 * These semantics are fully enforced when DEBUG_MUTEXES is
 * enabled. Furthermore, besides enforcing the above rules, the mutex
 * debugging code also implements a number of additional features
 * that make lock debugging easier and faster:
 *
 * - uses symbolic names of mutexes, whenever they are printed in debug output
 * - point-of-acquire tracking, symbolic lookup of function names
 * - list of all locks held in the system, printout of them
 * - owner tracking
 * - detects self-recursing locks and prints out all relevant info
 * - detects multi-task circular deadlocks and prints out all affected
 *   locks and tasks (and only those tasks)
 */
struct mutex {
        /* 1: unlocked, 0: locked, negative: locked, possible waiters */
        atomic_t                count;
        spinlock_t              wait_lock;
        struct list_head        wait_list;
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
        struct task_struct      *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
#ifdef CONFIG_DEBUG_MUTEXES
        void                    *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
        struct lockdep_map      dep_map;
#endif
};

간단하고도 엄격한 의미의 뮤텍스

  • 한 개의 태스크는 한 번에 하나의 뮤텍스를 hold(lock이 걸려 대기하는 상태)할 수 있다.
  • owner만이 뮤텍스를 unlock 할 수 있다.
  • 다중 unlock은 허가되지 않는다.
  • 재귀(recursive) locking은 허가되지 않는다.
  • 뮤텍스 오브젝트는 반드시 API를 이용하여 초기화 하여야 한다.
  • 뮤텍스 오브젝트는 절대 memset나 memcpy에 의해 초기화되어 사용될 수 없다.
  • 태스크는 뮤텍스가 held된 채로 종료할 수 없다.
  • held된 락이 위치한 메모리 영역은 절대 free 되면 안된다.
  • held된 락은 절대 재 초기화될 수 없다.
  • 뮤텍스는 tasklets와 타이머같은 하드웨어 또는 소프트웨어 인터럽트 컨텍스트를 사용할 수 없다.
  • CONFIG_DEBUG_MUTEXES 옵션을 사용한 경우
    • 디버그 출력 시 뮤텍스의 심볼명 사용
    • 락획득지점(point-of-acquire) 추적, 펑션명으로 심볼 검색(lookup)
    • 시스템에서 held된 모든 락 목록을 출력
    • 소유자 추적
    • 재귀(self-recursing) 락의 검출과 관련 정보 출력
    • 멀티 태스크 dead-lock의 검출과 영향을 끼친 락과 태스크의 출력
  • 구조체 멤버 변수
    • counter: 3가지의 뮤텍스 상태를 나타내는데 초기화 후 unlock(1) 상태로 시작한다.
      • 1: unlock
      • 0: 대기자 없는 lock
      • 음수: 대기자 있는 lock
    • wait_lock: slowpath 구현에 사용되는 spin lock으로 spin_lock_mutex()의 인수로 사용
    • wait_list: 대기자 리스트 (ticket 구현)
    • ownner: 락 소유한 태스크의 포인터, CONFIG_MUTEX_SPIN_ON_OWNER 옵션 또는 락 디버깅에서 사용
    • osq: optimisitc spin queue(spinner MCS lock), CONFIG_MUTEX_SPIN_ON_OWNER 옵션에서 사용
    • magic: 락 디버깅에서 사용
    • dep_map:  CONFIG_DEBUG_LOCK_ALLOC 옵션에서 사용

 

struct mutex_waiter

include/linux/mutex.h

/*
 * This is the control structure for tasks blocked on mutex,
 * which resides on the blocked task's kernel stack:
 */
struct mutex_waiter {
        struct list_head        list;
        struct task_struct      *task;
#ifdef CONFIG_DEBUG_MUTEXES
        void                    *magic;
#endif
};
  • list
    • wait_list의 뒤에서 각 mutex_waiter들이 연결되는데 사용된다.
  • task
    • 대기하고 있는 태스크

 

struct optimistic_spin_node

include/linux/osq_lock.h

/*
 * An MCS like lock especially tailored for optimistic spinning for sleeping
 * lock implementations (mutex, rwsem, etc).
 */
struct optimistic_spin_node {
        struct optimistic_spin_node *next, *prev;
        int locked; /* 1 if lock acquired */
        int cpu; /* encoded CPU # + 1 value */
};
  • optimistic_spin_node 구조체는 per-cpu 데이터인 전역 변수 osq_node에서 사용된다.
  • next 및 prev에 null이 있는 경우 OSQ(MCS) unlock 상태이고 FIFO 구조로 나중에 추가된 노드들은 꼬리 부분에 추가된다.
  • locked
    • 보통 0상태에 있고 midpath에서 mutex를 얻기 위해 경쟁을 하는 cpu가 osq_unlock()되어 나갈 때 OSQ에서 후 순위로 대기하고 cpu가 osq_lock()에서 빠져나올 수 있도록 하게 한다.
  • cpu
    • 1번 부터 사직되는 CPU 번호

 

struct optimistic_spin_queue

include/linux/osq_lock.h

struct optimistic_spin_queue {
        /*
         * Stores an encoded value of the CPU # of the tail node in the queue.
         * If the queue is empty, then it's set to OSQ_UNLOCKED_VAL.
         */
        atomic_t tail;
};
  • OSQ(MCS)에서 가장 마지막에 위치한 노드로 1번 부터 시작하는 cpu 번호이다. 0이 들어있는 경우 대기자가 없는 경우이다.

 

뮤텍스 초기화

 
뮤텍스의 초기화는 두 가지 방법을 사용할 수 있다.
  • DEFINE_MUTEX() 매크로를 사용하여 정적 선언으로 뮤텍스를 초기화
  • mutex_init() 매크로 함수를 호출하여 다이나믹하게 뮤텍스를 초기화

1) DEFINE_MUTEX()

include/linux/mutex.h
#define DEFINE_MUTEX(mutexname) \
        struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)

 

include/linux/mutex-debug.h

#ifdef CONFIG_DEBUG_MUTEXES
#define __DEBUG_MUTEX_INITIALIZER(lockname)                             \
        , .magic = &lockname
#else
# define __DEBUG_MUTEX_INITIALIZER(lockname)
#endif

#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define __DEP_MAP_MUTEX_INITIALIZER(lockname) \
                , .dep_map = { .name = #lockname }
#else
# define __DEP_MAP_MUTEX_INITIALIZER(lockname)
#endif

#define __MUTEX_INITIALIZER(lockname) \
                { .count = ATOMIC_INIT(1) \
                , .wait_lock = __SPIN_LOCK_UNLOCKED(lockname.wait_lock) \
                , .wait_list = LIST_HEAD_INIT(lockname.wait_list) \
                __DEBUG_MUTEX_INITIALIZER(lockname) \
                __DEP_MAP_MUTEX_INITIALIZER(lockname) }
  • CONFIG_DEBUG_MUTEXES 옵션이 있는 경우 mutex 구조체의 magic 멤버 변수에 lockname의 주소를 대입한다.
  • CONFIG_DEBUG_LOCK_ALLOC 옵션이 있는 경우 mutex 구조체의 dep_map 멤버 구조체 변수중 name에 lockname을 직접 대입한다.

 

2) mutex_init()

include/linux/mutex.h
/**
 * mutex_init - initialize the mutex
 * @mutex: the mutex to be initialized
 *
 * Initialize the mutex to unlocked state.
 *
 * It is not allowed to initialize an already locked mutex.
 */
# define mutex_init(mutex) \
do {                                                    \
        static struct lock_class_key __key;             \
        __mutex_init((mutex), #mutex, &__key);          \
} while (0)
  • 이미 lock된 뮤텍스는 초기화를 허락하지 않는다.

 

kernel/locking/mutex.c

void  __mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
        atomic_set(&lock->count, 1);
        spin_lock_init(&lock->wait_lock);
        INIT_LIST_HEAD(&lock->wait_list);
        mutex_clear_owner(lock);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        osq_lock_init(&lock->osq);
#endif

        debug_mutex_init(lock, name, key);
}
  • 뮤텍스 내에서 사용하는 spin_lock은 wait_list라는 링크드 리스트 구조체를 관리하기 위해 필요하다.
  • wait_list를 초기화한다. (리스트의 prev와 next 포인터들은 null이 아니라 wait_list 구조체 자기 자신을 가리킨다)
  • owner는 null로 초기화된다.
  • CONFIG_MUTEX_SPIN_ON_OWNER 옵션을 사용하는 경우 optimistic spinning queue 락을 초기화한다.
  • debug_mutex_init()함수는 디버그 기능에서 사용하며 설명은 생략

 

Mutex 인터페이스

  • 뮤텍스 획득, 언인터럽터블
    • void mutex_lock(struct mutex *lock);
    • void mutex_lock_nested(struct mutex *lock, unsigned int subclass);
    • int  mutex_trylock(struct mutex *lock);
  • 뮤텍스 획득, 인터럽터블
    • int mutex_lock_interruptible_nested(struct mutex *lock, int subclass);
    • int mutex_lock_interruptible(struct mutex *lock);
  • 뮤텍스 획득, 인터럽터블, 감소시켜서 0되면
    • int atomic_dec_and_mutex_lock(atomic_t *cnt, struct mutex *lock);
  • 뮤텍스 언락
    • void mutex_unlock(struct mutex *lock);
  • 뮤텍스 락 획득 여부 체크
    • int mutex_is_locked(struct mutex *lock);

 

참고

 

might_sleep()

기능

  • CONFIG_PREEMPT_VOLUNTARY 옵션이 동작할 때에만 동작하는 Preemption 포인트
  • 현재 태스크보다 더 높은 우선 순위의 태스크를 빨리 처리할 수 있도록 중간에 리스케쥴링을 허용한다.
  • 리스케쥴링하는 경우 현재 태스크가 preemption되며 이 때 sleep이 일어나게 된다.
  • Preemption Points
    • 리눅스커널은 이렇게 preemption 포인트를 커널의 여러 곳에 뿌려(?) 놓았다.
    • preemption 포인트의 도움을 받아 급한 태스크의 가동에 필요한 latency가 100us 이내로 줄어드는 성과가 있었다.
    • preemption 포인트는 보통 1ms(100us) 이상 소요되는 루틴에는 보통 추가하여 놓는다.

소스 분석

might_sleep()

might_sleep

아래 소스에서 CONFIG_DEBUG_ATOMIC_SLEEP 옵션이 있는 경우에만 디버그를 위해 __might_sleep()루틴을 호출한다.

#ifdef CONFIG_DEBUG_ATOMIC_SLEEP
/**
* might_sleep - annotation for functions that can sleep
*
* this macro will print a stack trace if it is executed in an atomic
* context (spinlock, irq-handler, ...).
*
* This is a useful debugging help to be able to catch problems early and not
* be bitten later when the calling function happens to sleep when it is not
* supposed to.
*/
# define might_sleep() \
        do { __might_sleep(__FILE__, __LINE__, 0); might_resched(); } while (0)
#else
# define might_sleep() do { might_resched(); } while (0)
#endif

might_resched()는 CONFIG_PREEMPT_VOLUNTARY 옵션이 있는 경우에만 _cond_resched()루틴을 호출하고 없는 경우 아무것도 하지 않는다.

include/linux/kernel.h

#ifdef CONFIG_PREEMPT_VOLUNTARY
# define might_resched() _cond_resched()
#else
# define might_resched() do { } while (0)
#endif
_cond_resched() 함수에서는 preemption이 필요한 상황인 경우 preemption 되며 sleep 한다.
  • should_resched()함수는 preemption 가능한 상태(preempt count가 0)이면서 현재 태스크보다 더 높은 우선 순위를 갖은 태스크가 있는 경우에 true
  • preempt_schedule_commoon() 함수는 태스크의 스케쥴링을 다시 한다.

include/linux/kernel.h

int __sched _cond_resched(void)
{
        if (should_resched()) {
                preempt_schedule_common();
                return 1;
        }
        return 0;
}

should_resched()

should_resched 함수는 리스케쥴이 필요한 상황에서 true로 리턴한다.
  • preemption이 enable(preempt_count()가 0) 이면서
  • 리스케쥴 요청이 있는 경우(tif_need_resched()가 true)

include/asm-generic/preempt.h

/*
 * Returns true when we need to resched and can (barring IRQ state).
 */
static __always_inline bool should_resched(void)
{
        return unlikely(!preempt_count() && tif_need_resched());
}
preempt_count()는 현재 스레드 정보(thread_info 구조체)에서 preempt_count를 리턴.
  • preempt_count 값은 preempt_enable() 호출 시 감소되며 preempt_disable() 호출 시 증가된다.
  • preempt_count 값이 0인 경우 preemption이 가능한 상태가 된다.

include/asm-generic/preempt.h

static __always_inline int preempt_count(void)
{
        return current_thread_info()->preempt_count;
}
tif_need_resched() 매크로는 -> test_thread_flag() 매크로를 호출하고 -> 다시 test_ti_thread_flag() 함수를 호출한다.
  • test_bit() 함수는 비트 operation에 사용되는 함수로 해당 비트가 설정되어 있는지를 체크하여 리턴한다.
  • 스레드 플래그
    • 여러 개의 많은 비트로 구성되어 있는데 그 중 TIF_NEED_RESCHED 비트는 리스케쥴이 필요한 경우 세트된다.

include/linux/thread_info.h

static inline int test_ti_thread_flag(struct thread_info *ti, int flag)
{
        return test_bit(flag, (unsigned long *)&ti->flags);
}

#define test_thread_flag(flag) \
        test_ti_thread_flag(current_thread_info(), flag)

#define tif_need_resched() test_thread_flag(TIF_NEED_RESCHED)

preempt_schedule_common()

preempt_schedule_common() 함수는 실제 리스케쥴을 수행하는 함수이므로 preemption되는 경우 sleep이 일어나는 장소다.
이 함수는 리눅스 커널의 scheduler에 해당하는 주요 함수이므로 scheduler 부분을 분석해야 하므로 이 파트에서는 자세한 분석은 생략한다.
kernel/sched/core.c
static void __sched notrace preempt_schedule_common(void)
{
        do {
                __preempt_count_add(PREEMPT_ACTIVE);
                __schedule();
                __preempt_count_sub(PREEMPT_ACTIVE);

                /*
                 * Check again in case we missed a preemption opportunity
                 * between schedule and now.
                 */
                barrier();
        } while (need_resched());
}

참고

likely() & unlikely()

커널에서 사용되는 매크로로 다음과 같다.

include/linux/compiler.h

#define likely(x)       __builtin_expect((x),1)
#define unlikely(x)     __builtin_expect((x),0)
  • likely()는 true가 될 확률이 높은 조건문에서 사용하여 성능을 높이고자 사용한다.
  • unlikely()는 false가 될 확률이 높은 조건문에서 사용하여 성능을 높이고자 사용한다.

__builtin_expect()

Built-in Function: long __builtin_expect (long exp, long c)
  • gcc 내장 함수로서 컴파일러에게 branch prediction 정보를 주고자하는 함수이다.
  • 자주 사용되지 않는 문장을 함수의 뒷 부분에 배치하여 메모리 캐시나 brach prediction cache에 영향을 주어 성능을 최적화 시키려할 때 사용한다.
  • likely/unlikely와 다르게 userspace에서도 사용할 수 있다.

디버그 추적용

디버그 추적 옵션이 동작하는 경우 사용되는 likely() 및 unlikely() 매크로의 소스는 다음과 같다.

include/linux/compiler.h

#if defined(CONFIG_TRACE_BRANCH_PROFILING) \
    && !defined(DISABLE_BRANCH_PROFILING) && !defined(__CHECKER__)
void ftrace_likely_update(struct ftrace_branch_data *f, int val, int expect);

#define likely_notrace(x)       __builtin_expect(!!(x), 1)
#define unlikely_notrace(x)     __builtin_expect(!!(x), 0)

#define __branch_check__(x, expect) ({                                  \
                        int ______r;                                    \
                        static struct ftrace_branch_data                \
                                __attribute__((__aligned__(4)))         \
                                __attribute__((section("_ftrace_annotated_branch"))) \
                                ______f = {                             \
                                .func = __func__,                       \
                                .file = __FILE__,                       \
                                .line = __LINE__,                       \
                        };                                              \
                        ______r = likely_notrace(x);                    \
                        ftrace_likely_update(&______f, ______r, expect); \
                        ______r;                                        \
                })

/*
 * Using __builtin_constant_p(x) to ignore cases where the return
 * value is always the same.  This idea is taken from a similar patch
 * written by Daniel Walker.
 */
# ifndef likely
#  define likely(x)     (__builtin_constant_p(x) ? !!(x) : __branch_check__(x, 1))
# endif
# ifndef unlikely
#  define unlikely(x)   (__builtin_constant_p(x) ? !!(x) : __branch_check__(x, 0))
# endif

__builtin_constant_p()

__builtin_constant_p는 인수가 컴파일 시 상수로 알려져 있으면 정수 1을 반환하고 그렇지 않으면 0을 반환합니다.

참고

 

Atomic Operation

<kernel v5.10>

Atomic Operation

Atomic Operation은 공유 자원에 대해 멀티 프로세서 및 멀티 스레드가 동시에 경쟁적으로 요청 시 조작(Operation)이 한 번에 하나씩 동작하여 그 중간에 끼어 들 수 없도록 한다.

 

아래 그림은 2개의 멀티 프로세서를 사용하는 시스템에서 Atomic Operation 적용 유무에 따른 결과를 보여준다. (처음 A 값은 100)

  • Atomic Operation을 사용하지 않은 경우
    • 각 CPU에서 명령들을 순서대로 처리하지 않고 거의 동시에 실행하면 공유된 자원의 결과 값이 예측되지 않는다.
  • Atomic Operation을 사용하는 경우
    • 아키텍처마다 많은 방법이 있는데 CAS(Compare-and-Swap) 방식 처럼 hardware가 어느 한 쪽을 지연시켜 동기화를 하거나 또는 LL/SC(Load-Linked and Conditional-Store) 방식처럼 hardware의 도움(exclusive monitor로 태그 사용)을 받아 동시 접근을 인지하여 어느 한 쪽을 실패시키 원복시키며, 다시 재시도하는 방법 등으로 atomic 함수들이 올바른 결과를 얻을 수 있게 작성되어 있다.

 

아키텍처별 Atomic Operation 구현 분류

Atomic Operation은 아키텍처마다 처리하는 명령과 방법이 조금씩 다르므로 정확한 처리 방법은 해당 아키텍처 소스를 참고하길 바라며, 이 글에서는 ARM 및 ARM64를 우선하여 설명한다.

  • 시스템 메모리에 있는 공유 변수를 증/감시키기 위해서는 보통 한 개의 명령으로 처리되지 않아 LL/SC 또는 CAS(LSE 포함) 방식을 사용하여 여러 개의 명령이 순차적으로 수행되어야 하는데 이를 보장하는 방법을 Atomic Operation이라고 하며 시스템에서 사용할 수 있는 가장 작은 단위의 동기화 기법이기도 하다.
  • critical section을 보호하는 각종 lock을 사용하지 않고 atomic operation을 사용하는 이유는 dead-lock 등이 없고 동기화 기법 중 가장 빠른 성능을 가지고 있다.

 

아키텍처들이 사용하는 큰 두 가지 유형

아키텍처들의 atomic 구현 방법은 세부적으로 모두 다르지만 크게 다음 두 가지 방법을 사용한다.

  • LL/SC(Load-Linked and Conditional-Store)
    • load 표식을 한 후 연산 후 동시에 다른 프로세스나 디바이스가 이 영역에 접근 여부를 hardware가 감지하여 없는 경우에만 store한다. 접근이 감지되면 다시 성공할 때까지 반복하는 방법을 사용한다. 단점으로 ARM의 경우 동시 접근한 atomic operation의 처리 순서는 가장 마지막에 접근한 atomic operation을 먼저 성공시킨다.
    • 사용 패턴
      • 1) load from memory
      • 2) 연산(inc/dec, …)
      • 3) store to memory
      • 4) 실패 시 반복
    • 적용 아키텍처
      • ARM
      • 68k
      • Alpha
      • ARM (ARMv6 ~ ARMv8)
        • ARMv8의 경우 Load-Exclusive/Store-Exclusive로도 불리운다. (ldxr/stxr)
      • MIPS
      • POWERPC
    • 참고: Exclusive loads and store | 문c
  • CAS(Compare-and-Swap)
    • hardware lock을 잡고 add 또는 exchange 후 unlock을 하는 방식이다. 언제나 한 번에 성공하므로 반복하지 않는다.
    • 사용 패턴
      • 1) lock
      • 2) 연산(add/exchange)
      • 3) unlock
    • 적용 아키텍처
      • IA64
      • SPARC
      • x64
      • x86
  • LSE(Large System Extensions)
    • ARMv8.1 아키텍처의 경우 LL/SC 방식 이외에도 LSE를 추가적으로 지원한다.
      • LSE를 지원하는 커널은 시스템이 LSE feature를 인식하면 커널 부팅 과정에서 모든 atomic 명령을 LSE 방식으로 명령을 교체하여 동작한다.
    • LSE atomic은 LL/SC 방식 이외에도 hardware lock을 지원하는 LA/SR(Load-Acquire/Store-Release) atomic과 CAS atomic을 동시에 지원한다.
      • 대표 명령: ldar/stlr 명령과 cas 명령
    • 이전 글에서 LSE atomic을 CAS atomic으로 분류하였으나, ARM 매뉴얼은 항상 LSE atomic으로 표기하므로 별도로 분류하였다.

 


ARM 아키텍처에서의 구현

ARM & ARM64 시스템에 구현된 방법은 다음과 같다.

  • ARMv5 이하
    • 모두 UP(Uni Processor) 시스템용으로만 구현되어 있어 인터럽트를 막는 것으로 구현
  • ARMv6, ARMv7
    • UP로 커널을 빌드하여 사용 시 인터럽트를 막는 것으로 구현
    • SMP로 커널을 빌드하여 사용 시 LL/SC 방식으로 구현
  • ARMv8
    • UP/SMP 가리지 않고 LL/SC 방식으로 구현
  • ARMv8.1~
    • UP/SMP 가리지 않고 LSE 방식으로 구현

 

1) UP 시스템에서의 구현 – 인터럽트 disable 구현 방식
  • 해당 atomic operation 수행 중 태스크의 문맥교환이 일어나거나(태스크가 preemption) 인터럽트 루틴에서 해당 메모리에 대해 동시 처리되지 않도록 아예 인터럽트를 막아서 해결한다.
  • UP에서는 인터럽트만 막아도 태스크의 문맥교환이 일어나지 않으므로  인터럽트를 enable하기 전까지 atomic opeation을 보장하게 된다.

 

2) SMP 시스템에서의 구현 – LL/SC 구현 방식
  • UP 이외에도 SMP 시스템까지 동시에 처리 될 수 있도록 LL/SC(Load-Link/Store-Conditional)라는 테크닉을 사용하였다.
  • CPU에서 atomic operation을 수행하는 도중 인터럽트 또는 preemption 되는 것을 막지 않는다.
  • 여러 CPU에서 atomic operation이 동시에 수행되는 것도 막지 않는다.
  • 공유 메모리의 접근이 서로 중복되는 경우가 발생하면 해당 atomic operation을 취소하고 중복이 발생되지 않을 때 까지 retry하는 기법으로 atomic operation이 한 번에 하나만 수행되는 것을 보장한다.
  • 해당 공유 메모리의 접근이 중복되는 것을 알아채는 것은 하드웨어의 도움을 받아 처리한다.
    • ARMv6, ARMv7에서는 ldrex와 strex 명령 쌍으로 처리할 수 있다.
    • ARMv8에서는 ldxr과 stxr 명령 쌍으로 처리할 수 있다. 그 외에 성능 극대화를 위해 배리어의 적용을 없앨 수 있게, 4 가지 타입의 오더링 명령셋이 추가되었다.
      • ARMv8: Cortex A53, A57, A72

 

다음 그림은 LL/SC 방식에서 동시에 요청된 atomic operation들이 처리되는 과정을 보여준다.

 

3) SMP 시스템에서의 구현 – LSE 구현 방식
  • ARMv8.1 이상에서는 LL/SC와는 다른 x86 진영에서 사용하는 방식인 CAS와 유사한 LSE(Large System Extension) atomic 명령이 채용되었다.
    • ARM 진영에서는 ARMv8.0 아키텍처까지 LL/SC 방식을 사용하였고, CPU가 많아지면서 LL/SC의 retry 부담이 커졌다. 따라서 x86 진영에서 사용하는 hardware lock을 사용한 CAS 명령과 비슷하게 동작하는 새로운 ARMv8.1 LSE(Large System Extension) 기능을 지원하는 것으로 변경되었다.
    • 리눅스가  EL2에서 시작하고 성능을 높이기 위해 동작도 EL2에서 수행하려면 ARMv8.1의 VHE(Virtualization Host Extension)와 ARMv8.1의 LSE 기능 둘 다 필요하다. 이 기능이 없으면 EL2에서 부팅하여 stub code만 남기고, EL1으로 전환하여 커널을 운영한다.
    • ARMv8.2: Cortex A75, A76

 

아키텍처별 헤더 파일

Atomic operation은 기본 구현 헤더와 asm-generic 헤더 및 아키텍처에 따라 전용 구현이 준비되어 있다.
  • 기본 구현 헤더
    • include/linux/atomic.h
    • include/linux/atomic-fallback.h
    • include/linux/atomic-arch-fallback.h
    • include/asm-generic/atomic.h
    • include/asm-generic/atomic64.h
    • include/asm-generic/atomic-long.h
  • 아키텍처와 연결을 도와주기 위한 헤더
    •  include/asm-generic/atomic-instrumented.h
  • 아키텍처별 헤더 (asm-generic 보다 아키텍처 전용 코드 사용을 우선)
    •  arch/arm/include/asm/atomic.h
      • 32bit ARM 시스템 전용 코드로 구성
      • asm-generic 보다 아키텍처 전용 코드 사용을 우선한다.
    • arch/arm64/include/asm/atomic.h (ARMv8.x 공통)
      • LL/SC 방식을 사용하는 64bit ARM 시스템 전용 코드로 구성
    • arch/arm64/include/asm/atomic_ll_sc.h (ARMv8)
      • LL/SC 방식을 사용하는 ARMv8 아키텍처 전용 코드로 구성
    • arch/arm64/include/asm/atomic_lse.h (ARMv8.1~)
      • LSE atomic 방식을 사용하는 ARMv8.1~ 아키텍처 전용 코드로 구성

 


SW atomic operation vs HW atomic operation 지원

1) S/W 접근 방법 (ARMv5 까지 사용)

  • 인터럽트를 막음으로 atomic operation을 수행 중 다른 태스크가 preemption 되지 않게 한다.
  • ARMv5까지는 UP(Uni Processor) 시스템이므로 현재 CPU의 인터럽트만 막아도 atomic operation이 성립한다.
  • 참고로 SDRAM에 존재하는 한 변수를 증감하려 할 때 cpu clock은 캐시 상태(hit/miss)에 따라 수 cycle ~ 수십 cycle이 소요된다.
  • 매크로로 만들어진 아래 함수를 설명하면…
    • 현재 CPU의 인터럽트 상태를 보관 하고 disable하여 인터럽트 호출을 막는다.
    • atomic operation에 필요한 add/sub 또는 xchg 연산등을 수행한다.
    • 다시 인터럽트 상태를 원래대로 돌려놓는다.
static inline void atomic_add(int i, atomic_t *v)
{
        unsigned long flags;                
        raw_local_irq_save(flags);
        v->counter += i;
        raw_local_irq_restore(flags);
}

 

2) LL/SC – H/W 접근 방법 (ARMv6 ~ ARMv8까지 사용)

atomic_add() – ARM32

아래는 실제 매크로 코드들을 풀어썻다.

arch/arm/include/asm/atomic.h

static inline void atomic_add(int i, atomic_t *v)
{
        unsigned long tmp;
        int result;

        prefetchw(&v->counter);
        __asm__ __volatile__(
"1:     ldrex   %0, [%3]\n"
"       add     %0, %0, %4\n"
"       strex   %1, %0, [%3]\n"
"       teq     %1, #0\n"
"       bne     1b"
        : "=&r" (result), "=&r" (tmp), "+Qo" (v->counter)
        : "r" (&v->counter), "Ir" (i)
        : "cc");                           /* clobbers */
}
  • ldrex/strex 사용: UP에서의 멀티스레드에서의 preemption 이외에도 SMP 시스템에서는 메모리 access가 여러 개의 CPU에서 동시 처리될 수 있기 때문에 데이터에 대해 Atomic operation(load – operation – store, 읽고 연산하고 기록) 수행 시 다른 CPU가 끼어들면 실패를 감지할 수 있어야 한다. 따라서 ARM 아키텍처에서는 이를 위해 ldr과 str 대신 별도로 ldrex와 strex 명령을 사용하여 처리하게 하였다.
  • pld(pldw) 사용:
    • ldrex와 strex 사이에 처리되어야 하는 메모리가 캐시 미스되어 ldrex 부터 strex 까지의 처리에 CPU clock의 지연을 일으키므로 이를 막기 위해 사용하려는 메모리를 ldrex로 로드하기 전에 미리 캐시에 사전 로드하는 방법을 사용하기 위해 ARM 아키텍처 전용 명령인 pld(pldw) 명령을 사용하여 지정된 메모리의 데이터가 캐시에 없는 경우 이를 캐시에 미리로드(linefill)하라고 지시한다.
    • 처음 사용 시 critical section을 최소화 시키는 것과 유사한 효과를 보이므로 strex에서 실패할 확률을 줄여준다.
  • Out of order execution 및 Out of order access memory 기능을 가지고 있는 ARM 아키텍처를 위해 atomic_xxx_return() 및 atomic_cmpxchg_relaxed() 명령의 경우 smp_mb() 베리어를 추가 사용한다.
  • “Qo” memory constraints

 

atomic_add() – ARM64 (ARMv8.0)

아래는 실제 매크로 코드들을 풀어썻다.

arch/arm64/include/asm/atomic_ll_sc.h

static inline void atomic_add(int i, atomic_t *v)
{
        unsigned long tmp;
        int result;

        asm volatile(
"       prfm    pstl1strm, %2\n"
"1:     ldxr    %w0, %2\n"
"       add     %w0, %w0, %w3\n"
"       stxr    %w1, %w0, %2\n"
"       cbnz    %w1, 1b\n"
        : "=&r" (result), "=&r" (tmp), "+Q" (v->counter)
        : __stringify(constraint) "r" (i));
}

 

3) LSE – H/W 접근 방법 (ARMv8.1 LSE)

아래는 실제 매크로 코드들을 풀어썻다.

arch/arm64/include/asm/atomic_lse.h

static inline void atomic_add(int i, atomic_t *v)
{
        asm volatile(
"       .arch_extension lse\n"
"       stadd %w[i], %w[i], %[v]\n"
        : [i] "+r" (i), [v] "+Q" (v->counter)
        : "r" (v)
        : cl);
}
  • ll/sc 방식과는 다르게 prefetch 및 반복문이 없이 하나의 명령문만을 사용한 것을 확인할 수 있다.
  • stadd 명령은 atomic add 명령으로 32비트의 w레지스터 또는 64비트의 x레지스터를 다룬다. 또한 메모리 오더링을 사용하지 않는다.

 


Atomic operation 명령

리눅스 커널이 제공하는 Atomic Operation  명령들을 알아보자. (v=32bit 카운터 값을 가진 atomic_t 구조체 주소)

 

RMW vs non-RMW

atomic API들은 크게 다음과 같이 분류할 수 있다.

  • RMW(Read-Modify-Write)
    • 종류
      • 산술연산(add, sub, inc, dec)
      • 비트조작(xor, andnot, and)
      • 교체(xchg, …)
      • 기타(add_unless, …)
    • 특징
      • 결과 값이 없는 RMW 명령들은 weaked(out-of) 오더를 가질 수 있다.
        • 예) atomic_add()
      • 결과 값이 있는 RMW 명령들은 strong(in, full) 오더를 가진다.
        • 예) atomic_add_return()
  • non-RWM
    • 종류
      • read, set
    • 특징
      • non-RMW 명령들의 특징은 weaked(out-of) 메모리 오더를 가질 수 있다.

 

1) 기본 명령

커널 v4.3-rc1부터 atomic_or() 명령이외에 atomic_xor(), atomic_and() 명령들도 추가되었다.

 

  • atomic_add(i, &v)
    • v값을 읽은 후 i만큼 증가시키고 저장한다.
      • *v += i
  • atomic_sub(i, &v)
    • v값을 읽은 후 i만큼 감소시키고 저장한다.
      • *v -= i
  • atomic_and(i, &v)
    • v값을 읽은 후 i값을 and 연산한 후 저장한다.
      • *v &= i
  • atomic_andnot(i, &v)
    • v값을 읽은 후 i값을 andnot 연산한 후 저장한다.
      • *v &= ~i
  • atomic_or(i, &v)
    • v값을 읽은 후 i값을 or 연산한 후 저장한다.
      • *v |= i
  • atomic_xor(i, &v)
    • v값을 읽은 후 i값을 xor 연산한 후 저장한다.
      • *v ^= i

 

아래 그림은 단순하게 atomic_t 변수 100에 10을 더해 110이 되어 저장하는 과정을 보여준다.

 

2) Return 추가 명령

  • atomic_add_return(i, &v)
    • v값을 읽은 후 i만큼 증가시키고 저장한다. 또한 증가시켜 저장한 값을 반환한다.
      • *v += i
  • atomic_sub_return(i, &v)
    • v값을 읽은 후 i만큼 감소시키고 저장한다. 또한 감소시켜 저장한 값을 반환한다.
      • *v -= i

 

아래 그림은 atomic_t 변수 100에 10을 더해 110이 되어 저장하는 것과, 연산 후의 값을 결과 값으로 반환하는 것을 보여준다.

 

3) Fetch 추가 명령

커널 v4.3-rc1부터 _set, _clear 명령대신 andnot 명령을 사용하게 하였고, 커널 v4.8-rc에서 삭제하였다.

 

  • atomic_fetch_add(i, &v)
    • v값을 읽은 후 i만큼 증가시키고 저장한다. 그리고 연산 전의 v 값을 반환한다.
      • *v += i
  • atomic_fetch_sub(i, &v)
    • v값을 읽은 후 i만큼 증가시키고 저장한다. 그리고 연산 전의 v 값을 반환한다.
      • *v -= i
  • atomic_fetch_and(i, &v)
    • v값을 읽은 후 i값을 and 연산한 후 저장한다. 그리고 연산 전의 v 값을 반환한다.
      • *v &= i
  • atomic_fetch_andnot(i, &v)
    • v값을 읽은 후 i값을 andnot 연산한 후 저장한다. 그리고 연산 전의 v 값을 반환한다.
      • *v &= ~i
  • atomic_fetch_or(i, &v)
  • atomic_fetch_xor(i, &v)
    • v값을 읽은 후 i값을 xor 연산한 후 저장한다. 그리고 연산 전의 v 값을 반환한다.
      • *v ^= i

 

아래 그림은 atomic_t 변수 100에 10을 더해 110이 되어 저장하는 것과, 연산 전의 값을 결과 값으로 반환하는 것을 보여준다.

 

4) Exchange 명령

  • atomic_xchg(&v, new)
    • v값을 읽은 후 new 값으로 교체한 후 다시 저장한다. 그리고 교체 전의 v 값을 반환한다.
      • *v <- new
  • atomic_cmpxchg(&v, old, new)
    • v값을 읽어 old와 같은 경우에만 new 값으로 교체한 후 다시 저장한다. 그리고 교체 전의 v 값을 반환한다.
      • *v <- new (if v == old)

 

아래 그림은 atomic_t 변수를 10으로 교체하여 저장하는 것과, 교체 전 값을 결과 값으로 반환하는 것을 보여준다.

 

아래 그림은 atomic_t 변수가 지정된 값(100)과 같은 경우에만 20으로 교체하여 저장하는 것과, 교체 전 값을 결과 값으로 반환하는 것을 보여준다.

 

  • atomic_try_cmpxchg(&v, &val, new)

 

5) read & set 명령

non-RMW 타입의 atomic 명령들은 다음과 같다.

  • atomic_read(&v)
    • v 주소 값을 읽어온다.
  • atomic_set(&v, i)
    • v 주소 값에 i를 저장한다.
      • *v = i

 

arch/arm/include/asm/atomic.h – for ARM

/*
 * On ARM, ordinary assignment (str instruction) doesn't clear the local
 * strex/ldrex monitor on some implementations. The reason we can use it for
 * atomic_set() is the clrex or dummy strex done on every exception return.
 */
#define atomic_read(v)  READ_ONCE((v)->counter)
#define atomic_set(v,i) WRITE_ONCE(((v)->counter), (i))

READ_ONCE()와 WRITE_ONCE() 매크로 함수는 컴파일러 메모리 배리어 및 아키텍처 메모리 배리어를 사용하여 atomic read/write를 수행한다.

  • ARM 및 ARM64 시스템에서는 atomic_read() 구현 시 1개의 atomic instruction으로 처리가능하므로 아키텍처 배리어는 사용하지 않는다.

 

6) 기타 명령

  • atomic_inc(&v)
    • v값을 읽은 후 1 증가시키고 저장한다.
      • *v++
  • atomic_dec(&v)
    • v값을 읽은 후 1 감소시키고 저장한다.
      • *v–
  • atomic_inc_and_test(&v)
    • v값을 읽은 후 1 증가시키고 저장한다. 또한 증가시킨 값이 0이면 true(1)를 반환하고, 그 외의 값이면 false(0)를 반환한다.
      • *v–
  • atomic_dec_and_test(&v)
    • v값을 읽은 후 1 감소시키고 저장한다. 또한 감소시킨 값이 0이면 true(1)를 반환한고, 그 외의 값이면 false(0)를 반환한다.
      • *v–
  • atomic_inc_return(&v)
    • v값을 읽은 후 1 증가시키고 저장한다. 또한 증가시켜 저장한 값을 반환한다.
      • *v++
  • atomic_dec_return(&v)
    • v값을 읽은 후 1 감소시키고 저장한다. 또한 감소시켜 저장한 값을 반환한다.
      • *v–
  • atomic_add_negative(&v)
    • v 값을 읽은 후 i만큼 증가시키고 저장한다. 또한 증가시켜 저장한 값이 음수(-)이면 true(1)를 반환한고, 그 외의 값이면 false(0)을 반환한다.
      • *v += i
  • atomic_add_unless(&v, i, u)
    • v 값을 읽은 후 u 값과 다른 경우에 한해 i 값을 더해 저장한다. 결과 값이 u 값과 다른 경우 true(1)를 반환한다.

 

아래 그림은 atomic v값이 비교 값(140)이 아닌 경우에만 20을 더하여 저장하는 것을 보여준다. 결과 값이 비교 값(140)과 다른 경우 1을 반환한다.

 

  •  atomic_inc_not_zero(&v)
    • v 값을 읽은 후 0이 아니면 1을 증가시키고 저장한다. 결과 값이 0이면 0을 반환하고, 그 외 1을 반환한다.
  • atomic_inc_not_zero_hint(&v, hint)
    • hint 값에 따라 가 다음과 같은 동작을 한다.
      • hint = 0인 경우
        • atomic_inc_not_zero()와 같이 동작하고,
      • hint가 1인 경우
        • 0이 아닌 경우 exchange
    • 네트워크 스택에서 atomic_inc_not_zero()를 많이 사용하는데, 이 함수는 read/modify/write를 단계를 거친다. 이러한 단계를 거치지 않고 atomic하게 exchange를 할 수 있는 아키텍처를 위해 hint를 제공하는 API를 추가하였다.
      • hint가 제공되는 경우 CAS 방식의 atomic operation을 지원하는 아키텍처의 경우 캐시 코히런스 프로토콜(예: x86의 MESI, arm의 MOESI)의 공유 상태 진입 없이 처리가능하다.
      • 아키텍처 specific한 함수를 include/linux/atomic.h으로 만들었는데 아직 호응은 별로 없다.
        • 현재 /net/atm/pppoatm.c에만 적용되어 있다.
      • 참고: atomic: add atomic_inc_not_zero_hint()
    • 드디어 아키텍처를 가리는 이 함수가 커널 v4.19-rc1에서 제거되었다.
  • atomic_dec_if_positive(&v)

 


Barrier 포함 atomic 명령

성능 향상을 위해 대부분의 아키텍처들이 메모리에 대한 접근 순서를 요구한 순서대로 하지 않고, 때에 따라서는 변경하여 처리한다. atomic_add_return() 타입같은 경우 명령 자체가 순차 처리를 강제하게 되어 있다. 더 극대화된 성능 처리를 위해 이러한 오더링 타입을 변경하도록 atomic operation들도 이에 대응하는 명령들을 아래와 같이 지원한다.

 

relaxed, …

커널 v4.3-rc부터Atomic 오퍼레이션 동작 시 메모리 오더링에 대한 옵션을 사용할 수 있도록 3가지 접미사(_acquire, _release, _relaxed)를 가진 명령들을 포함하였다. 현재 대부분의 아키텍처들은 모든 명령들이 full 오더링 개런티로 동작되고있다. 그러나 일부 아키텍처들은 성능 향상을 위해 일부 함수에 대해 오더링 옵션들을 아래 의도대로 정확히 적용시키고 있다.

  • 무접미사
    • Atomic 오퍼레이션시 full ordering 개런티를 한다. (최저 성능)
    • 순차 처리하도록 강제한다.
      • ARMv8: dmb ish 추가 수행, stxr 대신 xtlxr 사용
  •  *_acquire
    • acquire 이후의 읽기 및 쓰기 조작 전에 이 atomic operation이 완료되는 것을 보장하게 강제한다.
      • ARMv8: ldxr 대신 ldaxr 사용
    • 사용 사례:
      • queued_read_lock() – atomic_add_return_acquire()
      • queued_write_lock() – atomic_cmpxchg_acquire()
      • queued_spin_lock() – atomic_cmpxchg_acquire()
      • osq_lock() – osq_wait_next() – atomic_cmpxchg_acquire()
  • *_release
    • release 이전의 모든 읽기 및 쓰기 작업이 완료된 후 이 atomic operation이 시작하는 것을 보장하게 강제한다.
      • ARMv8: stxr 대신 stlxr 사용
    • 사용 사례:
      • queued_read_unlock()- atomic_sub_return_release()
      • queued_write_unlock()- atomic_cmpxchg_release()
      • queued_spin_unlock() – atomic_cmpxchg_release()
      • osq_unlock() – atomic_cmpxchg_release()
  • *_relaxed
    • Atomic 오퍼레이션 시 ordering 개런티를 하지 않아도 될 때 사용한다. (최고 성능)
      • 강제하지 않기 때문에 아키텍처가 ordering을 하든 안하든 관여하지 않는다.
    • 사용 사례:
      • queued_spin_lock_slowpath() – atomic_cmpxchg_relaxed()

 

아래 그림은 atomic 함수와 배리어 관련 접미사가 붙었을 때 배리어가 동작하는 과정을 보여준다.

  • barrier cost가 높기 때문에 성능을 극대화하려면 *_relaxed를 사용하여 atomic operation 위 아래로 barrier 없이 동작할 수 있는 코드를 사용할 수 있는지 여부를 판단한 후에 적용해야 한다.

 

before | after atomic

여러 개의 atomic opeartion 전/후로 smp_mb()를 수행하는 API들을 없애고, 두 개의 API로 단축하여 사용한다.

 

  • smp_mb__before_atomic()
    • Atomic operation 전에 smp_mb()를 수행한다.
  • smp_mb__after_atomic()
    • Atomic operation 후에 smp_mb()를 수행한다.

 

read/set

  • atomic_read_acquire(&v)
    • 단방향 배리어와 함께 v 주소 값을 읽어온다.
    • 이 매크로 함수는 smp_load_acquire() 배리어 함수를 호출한다.
    • 거의 대부분의 코드들은 atomic_read_acquire() atomic 함수를 사용하지 않고 smp_load_acquire() 배리어 함수를 직접 사용한다.
    • atomic_read_acquire() 사용 사례:
      • include/linux/qrwlock.h – rspin_until_writer_unlock()
  • atomic_set_release(&v, i)
    • 단방향 배리어와 함께 v 주소 값에 i 값을 대입한다.
    • smp_store_release() 배리어 함수를 호출한다.
    • 거의 대부분의 코드들은 atomic_set_release() atomic 함수를 사용하지 않고 smp_store_release() 배리어 함수를 직접 사용한다.
    • atomic_set_release() 사용 사례:
      • kernel/jump_label.c – static_key_slow_inc_cpuslocked()

 


Atomic 함수 분석

산술/비트 연산 관련 – for ARM32 (LL/SC atomic)

atomic_add() 등 6개 함수

아래 ATOMIC_OPS() 및 ATOMIC_OP() 매크로 함수를 통해 다음 함수들을 생성한다.

  • atomic_add()
    • atomic_add_return_relaxed()
    • atomic_fetch_add_relaxed()
  • atomic_sub()
    • atomic_sub_return_relaxed()
    • atomic_fetch_sub_relaxed()

 

arch/arm/include/asm/atomic.h

#define ATOMIC_OPS(op, c_op, asm_op)                                    \
        ATOMIC_OP(op, c_op, asm_op)                                     \
        ATOMIC_OP_RETURN(op, c_op, asm_op)                              \
        ATOMIC_FETCH_OP(op, c_op, asm_op)

ATOMIC_OPS(add, +=, add)
ATOMIC_OPS(sub, -=, sub)

 

atomic_and() 등 12개 함수

아래 ATOMIC_OPS() 및 ATOMIC_OP() 매크로 함수를 통해 다음 함수들을 생성한다.

  • atomic_and()
    • atomic_and_return_relaxed()
    • atomic_fetch_and_relaxed()
  • atomic_andnot()
    • atomic_andnot_return_relaxed()
    • atomic_fetch_andnot_relaxed()
  • atomic_or()
    • atomic_or_return_relaxed()
    • atomic_fetch_or_relaxed()
  • atomic_xor()
    • atomic_xor_return_relaxed()
    • atomic_fetch_xor_relaxed()

 

arch/arm/include/asm/atomic.h

#undef ATOMIC_OPS
#define ATOMIC_OPS(op, c_op, asm_op)                                    \
        ATOMIC_OP(op, c_op, asm_op)                                     \
        ATOMIC_FETCH_OP(op, c_op, asm_op)

ATOMIC_OPS(and, &=, and)
ATOMIC_OPS(andnot, &= ~, bic)
ATOMIC_OPS(or,  |=, orr)
ATOMIC_OPS(xor, ^=, eor)
ATOMIC_OP()

arch/arm/include/asm/atomic.h

/*
 * ARMv6 UP and SMP safe atomic ops.  We use load exclusive and
 * store exclusive to ensure that these are atomic.  We may loop
 * to ensure that the update happens.
 */
#define ATOMIC_OP(op, c_op, asm_op)                                     \
static inline void atomic_##op(int i, atomic_t *v)                      \
{                                                                       \
        unsigned long tmp;                                              \
        int result;                                                     \
                                                                        \
        prefetchw(&v->counter);                                         \
        __asm__ __volatile__("@ atomic_" #op "\n"                       \
"1:     ldrex   %0, [%3]\n"                                             \
"       " #asm_op "     %0, %0, %4\n"                                   \
"       strex   %1, %0, [%3]\n"                                         \
"       teq     %1, #0\n"                                               \
"       bne     1b"                                                     \
        : "=&r" (result), "=&r" (tmp), "+Qo" (v->counter)               \
        : "r" (&v->counter), "Ir" (i)                                   \
        : "cc");                                                        \
}

v 주소의 32비트 값을 읽은 후 i 값을 연산(add, sub, and, andnot, or 및 xor)시켜 저장한다. 이를 atomic 하게 처리한다.

  • atomic operation이 실패하여 재반복 하는 case
    • 현재 프로세스가 v 주소에 접근하는 중에 다른 프로세스가 v 라인값이 저장된 캐시 라인에 접근하는 경우 실패

 

ATOMIC_OP_RETURN()

arch/arm/include/asm/atomic.h

#define ATOMIC_OP_RETURN(op, c_op, asm_op)                              \
static inline int atomic_##op##_return_relaxed(int i, atomic_t *v)      \
{                                                                       \
        unsigned long tmp;                                              \
        int result;                                                     \
                                                                        \
        prefetchw(&v->counter);                                         \
                                                                        \
        __asm__ __volatile__("@ atomic_" #op "_return\n"                \
"1:     ldrex   %0, [%3]\n"                                             \
"       " #asm_op "     %0, %0, %4\n"                                   \
"       strex   %1, %0, [%3]\n"                                         \
"       teq     %1, #0\n"                                               \
"       bne     1b"                                                     \
        : "=&r" (result), "=&r" (tmp), "+Qo" (v->counter)               \
        : "r" (&v->counter), "Ir" (i)                                   \
        : "cc");                                                        \
                                                                        \
        smp_mb();                                                       \
                                                                        \
        return result;                                                  \
}

v 주소의 32비트값을 읽은 후 i 값을 연산(add, sub, and, andnot, or 및 xor)시킨 후 저장한다. 이를 atomic 하게 처리하는 것은 동일하지만 결과 값으로 연산시켜 저장한 값을 그대로 반환한다.

 

ATOMIC_FETCH_OP()

arch/arm/include/asm/atomic.h

#define ATOMIC_FETCH_OP(op, c_op, asm_op)                               \
static inline int atomic_fetch_##op##_relaxed(int i, atomic_t *v)       \
{                                                                       \
        unsigned long tmp;                                              \
        int result, val;                                                \
                                                                        \
        prefetchw(&v->counter);                                         \
                                                                        \
        __asm__ __volatile__("@ atomic_fetch_" #op "\n"                 \
"1:     ldrex   %0, [%4]\n"                                             \
"       " #asm_op "     %1, %0, %5\n"                                   \
"       strex   %2, %1, [%4]\n"                                         \
"       teq     %2, #0\n"                                               \
"       bne     1b"                                                     \
        : "=&r" (result), "=&r" (val), "=&r" (tmp), "+Qo" (v->counter)  \
        : "r" (&v->counter), "Ir" (i)                                   \
        : "cc");                                                        \
                                                                        \
        return result;                                                  \
}

v 주소의 32비트값을 읽은 후 i 값을 연산(add, sub, and, andnot, or 및 xor)시키고 저장한다. 이를 atomic 하게 처리하는 것은 동일하지만 결과 값으로 연산전에 fetch한 v값을 반환한다.

 


산술/비트 연산 관련 – ARM64 (LL/SC & LSE atomic 공통)

atomic_add() 등 9개 함수

다음 함수들이 메모리 ordering 옵션에 따라 준비되어 있다.

  • atomic_add()
    • atomic_add_return()
      • atomic_add_return_acquire()
      • atomic_add_return_release()
      • atomic_add_return_relaxed()
    • atomic_fetch_add()
      • atomic_fetch_add_acquire()
      • atomic_fetch_add_release()
      • atomic_fetch_add_relaxed()

 

include/asm-generic/atomic-instrumented.h

static __always_inline void
atomic_add(int i, atomic_t *v)
{
        instrument_atomic_read_write(v, sizeof(*v));
        arch_atomic_add(i, v);
}
#define atomic_add atomic_add

#if !defined(arch_atomic_add_return_relaxed) || defined(arch_atomic_add_return)
static __always_inline int
atomic_add_return(int i, atomic_t *v)
{
        instrument_atomic_read_write(v, sizeof(*v));
        return arch_atomic_add_return(i, v);
}
#define atomic_add_return atomic_add_return
#endif

#if defined(arch_atomic_add_return_acquire)
static __always_inline int
atomic_add_return_acquire(int i, atomic_t *v)
{
        instrument_atomic_read_write(v, sizeof(*v));
        return arch_atomic_add_return_acquire(i, v);
}
#define atomic_add_return_acquire atomic_add_return_acquire
#endif

#if defined(arch_atomic_add_return_release)
static __always_inline int
atomic_add_return_release(int i, atomic_t *v)
{
        instrument_atomic_read_write(v, sizeof(*v));
        return arch_atomic_add_return_release(i, v);
}
#define atomic_add_return_release atomic_add_return_release
#endif

#if defined(arch_atomic_add_return_relaxed)
static __always_inline int
atomic_add_return_relaxed(int i, atomic_t *v)
{
        instrument_atomic_read_write(v, sizeof(*v));
        return arch_atomic_add_return_relaxed(i, v);
}
#define atomic_add_return_relaxed atomic_add_return_relaxed
#endif

 

atomic_sub() 등 9개 함수

다음 함수들이 메모리 ordering 옵션에 따라 준비되어 있다. (코드 생략)

  • atomic_sub()
    • atomic_sub_return()
      • atomic_sub_return_acquire()
      • atomic_sub_return_release()
      • atomic_sub_return_relaxed()
    • atomic_fetch_sub()
      • atomic_fetch_sub_acquire()
      • atomic_fetch_sub_release()
      • atomic_fetch_sub_relaxed()

 

atomic_inc() 등 9개 함수

다음 함수들이 메모리 ordering 옵션에 따라 준비되어 있다. (코드 생략)

  • atomic_inc()
    • atomic_inc_return()
      • atomic_inc_return_acquire()
      • atomic_inc_return_release()
      • atomic_inc_return_relaxed()
    • atomic_fetch_inc()
      • atomic_fetch_inc_acquire()
      • atomic_fetch_inc_release()
      • atomic_fetch_inc_relaxed()

 

atomic_dec() 등 9개 함수

다음 함수들이 메모리 ordering 옵션에 따라 준비되어 있다. (코드 생략)

  • atomic_dec()
    • atomic_dec_return()
      • atomic_dec_return_acquire()
      • atomic_dec_return_release()
      • atomic_dec_return_relaxed()
    • atomic_fetch_dec()
      • atomic_fetch_dec_acquire()
      • atomic_fetch_dec_release()
      • atomic_fetch_dec_relaxed()

 

atomic_and() 등 5개 함수

다음 함수들이 메모리 ordering 옵션에 따라 준비되어 있다. (코드 생략)

  • atomic_and()
    • atomic_fetch_and()
      • atomic_fetch_and_acquire()
      • atomic_fetch_and_release()
      • atomic_fetch_and_relaxed()

 

atomic_andnot() 등 5개 함수

다음 함수들이 메모리 ordering 옵션에 따라 준비되어 있다. (코드 생략)

  • atomic_andnot()
    • atomic_fetch_andnot()
      • atomic_fetch_andnot_acquire()
      • atomic_fetch_andnot_release()
      • atomic_fetch_andnot_relaxed()

 

atomic_or() 등 5개 함수

다음 함수들이 메모리 ordering 옵션에 따라 준비되어 있다. (코드 생략)

  • atomic_or()
    • atomic_fetch_or()
      • atomic_fetch_or_acquire()
      • atomic_fetch_or_release()
      • atomic_fetch_or_relaxed()

 

atomic_xor() 등 5개 함수

다음 함수들이 메모리 ordering 옵션에 따라 준비되어 있다. (코드 생략)

  • atomic_xor()
    • atomic_fetch_xor()
      • atomic_fetch_xor_acquire()
      • atomic_fetch_xor_release()
      • atomic_fetch_xor_relaxed()

 

arch_atomic_add() 등 6개 함수

아래 ATOMIC_OP() 매크로 함수를 통해 다음 함수들을 생성한다.

  • arch_atomic_andnot()
  • arch_atomic_or()
  • arch_atomic_xor()
  • arch_atomic_add()
  • arch_atomic_and()
  • arch_atomic_sub()

 

arch/arm64/include/asm/atomic.h – for ARM64

#define ATOMIC_OP(op)                                                   \
static inline void arch_##op(int i, atomic_t *v)                        \
{                                                                       \
        __lse_ll_sc_body(op, i, v);                                     \
}

ATOMIC_OP(atomic_andnot)
ATOMIC_OP(atomic_or)
ATOMIC_OP(atomic_xor)
ATOMIC_OP(atomic_add)
ATOMIC_OP(atomic_and)
ATOMIC_OP(atomic_sub)

 

arch_atomic_fetch_add() 등 32개 함수

아래 ATOMIC_FETCH_OPS() 및 ATOMIC_FETCH_OP() 매크로 함수를 통해 다음 함수들이 생성된된다.

  • arch_atomic_fetch_andnot()
    • arch_atomic_fetch_andnot_relaxed()
    • arch_atomic_fetch_andnot_acquire()
    • arch_atomic_fetch_andnot_release()
  • arch_atomic_fetch_or()
    • arch_atomic_fetch_or_relaxed()
    • arch_atomic_fetch_or_acquire()
    • arch_atomic_fetch_or_release()
  • arch_atomic_fetch_xor()
    • arch_atomic_fetch_xor_relaxed()
    • arch_atomic_fetch_xor_acquire()
    • arch_atomic_fetch_xor_release()
  • arch_atomic_fetch_add()
    • arch_atomic_fetch_add_relaxed()
    • arch_atomic_fetch_add_acquire()
    • arch_atomic_fetch_add_release()
  • arch_atomic_fetch_and()
    • arch_atomic_fetch_and_relaxed()
    • arch_atomic_fetch_and_acquire()
    • arch_atomic_fetch_and_release()
  • arch_atomic_fetch_sub()
    • arch_atomic_fetch_sub_relaxed()
    • arch_atomic_fetch_sub_acquire()
    • arch_atomic_fetch_sub_release()
  • arch_atomic_add_return()
    • arch_atomic_add_return_relaxed()
    • arch_atomic_add_return_acquire()
    • arch_atomic_add_return_release()
  • arch_atomic_sub_return()
    • arch_atomic_sub_return_relaxed()
    • arch_atomic_sub_return_acquire()
    • arch_atomic_sub_return_release()

 

arch/arm64/include/asm/atomic.h – for ARM64

#define ATOMIC_FETCH_OP(name, op)                                       \
static inline int arch_##op##name(int i, atomic_t *v)                   \
{                                                                       \
        return __lse_ll_sc_body(op##name, i, v);                        \
}

#define ATOMIC_FETCH_OPS(op)                                            \
        ATOMIC_FETCH_OP(_relaxed, op)                                   \
        ATOMIC_FETCH_OP(_acquire, op)                                   \
        ATOMIC_FETCH_OP(_release, op)                                   \
        ATOMIC_FETCH_OP(        , op)

ATOMIC_FETCH_OPS(atomic_fetch_andnot)
ATOMIC_FETCH_OPS(atomic_fetch_or)
ATOMIC_FETCH_OPS(atomic_fetch_xor)
ATOMIC_FETCH_OPS(atomic_fetch_add)
ATOMIC_FETCH_OPS(atomic_fetch_and)
ATOMIC_FETCH_OPS(atomic_fetch_sub)
ATOMIC_FETCH_OPS(atomic_add_return)
ATOMIC_FETCH_OPS(atomic_sub_return)

 

CONFIG_ARM64_LSE_ATOMICS

CONFIG_ARM64_LSE_ATOMICS 컴파일러 옵션은 gas 툴이 lse 명령을 지원하는지 여부에 따라 결정된다. 만일 지원하지 않는 경우 LL/SC atomic 명령을 사용한다. gas 툴이 lse 명령을 지원하는 경우 런타임에 ARMv8.1 이상의 아키텍처에 구현된 LSE 기능 지원 여부에 따라 Static branch를 사용하여 자동으로 분기하는 방식으로 LL/SC atomic 명령과 LSE atomic 명령을 선택하여 사용할 수 있도록 한다.

 

__lse_ll_sc_body() – LL/SC 전용
#define __lse_ll_sc_body(op, ...)               __ll_sc_##op(__VA_ARGS__)

LL/SC atomic 명령을 사용하도록 __ll_sc_ 접두사를 사용하는 함수를 사용하도록 한다.

 

__lse_ll_sc_body() – LL/SC & LSE
#define __lse_ll_sc_body(op, ...)                                       \
({                                                                      \
        system_uses_lse_atomics() ?                                     \
                __lse_##op(__VA_ARGS__) :                               \
                __ll_sc_##op(__VA_ARGS__);                              \
})

시스템의 LSE atomic 지원 여부에 따라 분기한다.

  • LSE를 지원하는 경우
    • LSE atomic 명령을 사용하도록 __lse_ 접두사를 사용하는 함수를 호출하도록 한다.
  • LSE를 지원하지 않는 경우
    • LL/SC atomic 명령을 사용하도록 __ll_sc_ 접두사를 사용하는 함수를 호출하도록 한다.

 


for ARM64 (LL/SC atomic)

64bit ARMv8 아케턱처에서도 32비트에서 atomic assembly 명령인 ldrex/strex 명령이 사용하는 LL/SC(Load excLusive/Store excLusive) 방식과 동일한 방식을 사용한다. ARMv8에서는 ldxr과 stxr 명령을 사용한다.

  • ARMv8 아키텍처에서는 return 및 fetch를 포함하는 atomic operation들은 모두 4가지 타입의 배리어 관련 명령들도 제공한다.

 

__ll_sc_atomic_add() 등 18개 함수

아래 ATOMIC_FETCH_OPS(), ATOMIC_OP(), ATOMIC_OP_RETURN() 및 ATOMIC_FETCH_OP() 매크로 함수를 통해 다음 함수들이 생성된다.

  • __ll_sc_atomic_add()
    • __ll_sc_atomic_add_return()
      • __ll_sc_atomic_add_return_relaxed()
      • __ll_sc_atomic_add_return_acquire()
      • __ll_sc_atomic_add_return_release()
    • __ll_sc_atomic_fetch_add()
      • __ll_sc_atomic_fetch_add_relaxed()
      • __ll_sc_atomic_fetch_add_acquire()
      • __ll_sc_atomic_fetch_add_release()
  • __ll_sc_atomic_sub()
    • __ll_sc_atomic_sub_return()
      • __ll_sc_atomic_sub_return_relaxed()
      • __ll_sc_atomic_sub_return_acquire()
      • __ll_sc_atomic_sub_return_release()
    • __ll_sc_atomic_fetch_sub()
      • __ll_sc_atomic_fetch_sub_relaxed()
      • __ll_sc_atomic_fetch_sub_acquire()
      • __ll_sc_atomic_fetch_sub_release()

 

arch/arm64/include/asm/atomic_ll_sc.h

#define ATOMIC_OPS(...)                                                 \
        ATOMIC_OP(__VA_ARGS__)                                          \
        ATOMIC_OP_RETURN(        , dmb ish,  , l, "memory", __VA_ARGS__)\
        ATOMIC_OP_RETURN(_relaxed,        ,  ,  ,         , __VA_ARGS__)\
        ATOMIC_OP_RETURN(_acquire,        , a,  , "memory", __VA_ARGS__)\
        ATOMIC_OP_RETURN(_release,        ,  , l, "memory", __VA_ARGS__)\
        ATOMIC_FETCH_OP (        , dmb ish,  , l, "memory", __VA_ARGS__)\
        ATOMIC_FETCH_OP (_relaxed,        ,  ,  ,         , __VA_ARGS__)\
        ATOMIC_FETCH_OP (_acquire,        , a,  , "memory", __VA_ARGS__)\
        ATOMIC_FETCH_OP (_release,        ,  , l, "memory", __VA_ARGS__)

ATOMIC_OPS(add, add)
ATOMIC_OPS(sub, sub)

 

__ll_sc_atomic_and() 등 20개 함수

아래 ATOMIC_FETCH_OPS(), ATOMIC_OP() 및 ATOMIC_FETCH_OP() 매크로 함수를 통해 다음 함수들이 생성된다.

  • __ll_sc_atomic_and()
    • __ll_sc_atomic_fetch_and()
      • __ll_sc_atomic_fetch_and_relaxed()
      • __ll_sc_atomic_fetch_and_acquire()
      • __ll_sc_atomic_fetch_and_release()
  • __ll_sc_atomic_andnot()
    • __ll_sc_atomic_fetch_andnot()
      • __ll_sc_atomic_fetch_andnot_relaxed()
      • __ll_sc_atomic_fetch_andnot_acquire()
      • __ll_sc_atomic_fetch_andnot_release()
  • __ll_sc_atomic_or()
    • __ll_sc_atomic_fetch_or()
      • __ll_sc_atomic_fetch_or_relaxed()
      • __ll_sc_atomic_fetch_or_acquire()
      • __ll_sc_atomic_fetch_or_release()
  • __ll_sc_atomic_xor()
    • __ll_sc_atomic_fetch_xor()
      • __ll_sc_atomic_fetch_xor_relaxed()
      • __ll_sc_atomic_fetch_xor_acquire()
      • __ll_sc_atomic_fetch_xor_release()

 

arch/arm64/include/asm/atomic_ll_sc.h

#undef ATOMIC_OPS
#define ATOMIC_OPS(...)                                                 \
        ATOMIC_OP(__VA_ARGS__)                                          \
        ATOMIC_FETCH_OP (        , dmb ish,  , l, "memory", __VA_ARGS__)\
        ATOMIC_FETCH_OP (_relaxed,        ,  ,  ,         , __VA_ARGS__)\
        ATOMIC_FETCH_OP (_acquire,        , a,  , "memory", __VA_ARGS__)\
        ATOMIC_FETCH_OP (_release,        ,  , l, "memory", __VA_ARGS__)

ATOMIC_OPS(and, and)
ATOMIC_OPS(andnot, bic)
ATOMIC_OPS(or, orr)
ATOMIC_OPS(xor, eor)

 

ATOMIC_OP()

arch/arm64/include/asm/atomic_ll_sc.h

/*
 * AArch64 UP and SMP safe atomic ops.  We use load exclusive and
 * store exclusive to ensure that these are atomic.  We may loop
 * to ensure that the update happens.
 */
#define ATOMIC_OP(op, asm_op, constraint)                               \
static inline void                                                      \
__ll_sc_atomic_##op(int i, atomic_t *v)                                 \
{                                                                       \
        unsigned long tmp;                                              \
        int result;                                                     \
                                                                        \
        asm volatile("// atomic_" #op "\n"                              \
        __LL_SC_FALLBACK(                                               \
"       prfm    pstl1strm, %2\n"                                        \
"1:     ldxr    %w0, %2\n"                                              \
"       " #asm_op "     %w0, %w0, %w3\n"                                \
"       stxr    %w1, %w0, %2\n"                                         \
"       cbnz    %w1, 1b\n")                                             \
        : "=&r" (result), "=&r" (tmp), "+Q" (v->counter)                \
        : __stringify(constraint) "r" (i));                             \
}

ldxr 및 stxr 명령을 사용하여 성공리에 변경이 된 경우에만 함수를 빠져나간다.

  • prefetch destination word를 위해 prfm 명령을 사용한다.
  • ldxr/stxr 명령은 load/store exclusive register 명령이다.
    • 뒤에 w레지스터가 오는 경우 32bit general 레지스터를 처리
    • 뒤에 x레지스터가 오는 경우 64bit general 레지스터를 처리

 

ATOMIC_OP_RETURN()

arch/arm64/include/asm/atomic_ll_sc.h

#define ATOMIC_OP_RETURN(name, mb, acq, rel, cl, op, asm_op, constraint)\
static inline int                                                       \
__ll_sc_atomic_##op##_return##name(int i, atomic_t *v)                  \
{                                                                       \
        unsigned long tmp;                                              \
        int result;                                                     \
                                                                        \
        asm volatile("// atomic_" #op "_return" #name "\n"              \
        __LL_SC_FALLBACK(                                               \
"       prfm    pstl1strm, %2\n"                                        \
"1:     ld" #acq "xr    %w0, %2\n"                                      \
"       " #asm_op "     %w0, %w0, %w3\n"                                \
"       st" #rel "xr    %w1, %w0, %2\n"                                 \
"       cbnz    %w1, 1b\n"                                              \
"       " #mb )                                                         \
        : "=&r" (result), "=&r" (tmp), "+Q" (v->counter)                \
        : __stringify(constraint) "r" (i)                               \
        : cl);                                                          \
                                                                        \
        return result;                                                  \
}

ldxr 및 stxr 명령을 사용하여 성공리에 변경이 된 경우에만 함수를 빠져나간다. 결과 값으로 변경 후의 값을 반환한다.

  • _acquire 접미사를 사용하는 경우 배리어 적용을 위해 “memory” 클로버를 사용하고, ldxr 대신 ldaxr 명령을 사용한다.
    • ldaxr은 load-acquire exclusive register 명령이다.
  • _release 접미사를 사용하는 경우 배리어 적용을 위해 “memory” 클로버를 사용하고, stxr 대신 stlxr 명령을 사용한다.
    • stlxr은 store-release exclusive register 명령이다.
  • 접미사가 없는 경우 위의 두 배리어를 적용한다.
  • _relaxed 접미사를 사용하는 경우 “memory” 클로버도 사용하지 않고, 어떠한 배리어도 적용하지 않는다.

 

ATOMIC_FETCH_OP()

arch/arm64/include/asm/atomic_ll_sc.h

#define ATOMIC_FETCH_OP(name, mb, acq, rel, cl, op, asm_op, constraint) \
static inline int                                                       \
__ll_sc_atomic_fetch_##op##name(int i, atomic_t *v)                     \
{                                                                       \
        unsigned long tmp;                                              \
        int val, result;                                                \
                                                                        \
        asm volatile("// atomic_fetch_" #op #name "\n"                  \
        __LL_SC_FALLBACK(                                               \
"       prfm    pstl1strm, %3\n"                                        \
"1:     ld" #acq "xr    %w0, %3\n"                                      \
"       " #asm_op "     %w1, %w0, %w4\n"                                \
"       st" #rel "xr    %w2, %w1, %3\n"                                 \
"       cbnz    %w2, 1b\n"                                              \
"       " #mb )                                                         \
        : "=&r" (result), "=&r" (val), "=&r" (tmp), "+Q" (v->counter)   \
        : __stringify(constraint) "r" (i)                               \
        : cl);                                                          \
                                                                        \
        return result;                                                  \
}
  • ldxr 및 stxr 명령을 사용하여 성공리에 변경이 된 경우에만 함수를 빠져나간다. 결과 값으로 변경 전 값을 반환한다.
  • 위의 return 함수들과 동일한 방식으로 4가지 타입의 배리어 명령을 지원한다.

 


for ARM64 (LSE atomic)

__lse_atomic_add()등 4개 함수

아래 ATOMIC_OP() 매크로 함수를 통해 다음 함수들이 생성된다. (ordering을 하지 않는 명령들이다)

  • __lse_atomic_andnot()
  • __lse_atomic_or()
  • __lse_atomic_xor()
  • __lse_atomic_add()

 

arch/arm64/include/asm/atomic_lse.h

#define ATOMIC_OP(op, asm_op)                                           \
static inline void __lse_atomic_##op(int i, atomic_t *v)                \
{                                                                       \
        asm volatile(                                                   \
        __LSE_PREAMBLE                                                  \
"       " #asm_op "     %w[i], %[v]\n"                                  \
        : [i] "+r" (i), [v] "+Q" (v->counter)                           \
        : "r" (v));                                                     \
}

ATOMIC_OP(andnot, stclr)
ATOMIC_OP(or, stset)
ATOMIC_OP(xor, steor)
ATOMIC_OP(add, stadd)

 

__lse_atomic_fetch_add()등 16개 함수

아래 ATOMIC_FETCH_OPS() 및 ATOMIC_FETCH_OP() 매크로 함수를 통해 다음 함수들이 생성된다. (ordering 선택이 필요한 명령들이다)

  • __lse_atomic_andnot()
    • __lse_atomic_andnot_relaxed()
    • __lse_atomic_andnot_acquire()
    • __lse_atomic_andnot_release()
  • __lse_atomic_or()
    • __lse_atomic_or_relaxed()
    • __lse_atomic_or_acquire()
    • __lse_atomic_or_release()
  • __lse_atomic_xor()
    • __lse_atomic_xor_relaxed()
    • __lse_atomic_xor_acquire()
    • __lse_atomic_xor_release()
  • __lse_atomic_add()
    • __lse_atomic_add_relaxed()
    • __lse_atomic_add_acquire()
    • __lse_atomic_add_release()

 

arch/arm64/include/asm/atomic_lse.h

#define ATOMIC_FETCH_OP(name, mb, op, asm_op, cl...)                    \
static inline int __lse_atomic_fetch_##op##name(int i, atomic_t *v)     \
{                                                                       \
        asm volatile(                                                   \
        __LSE_PREAMBLE                                                  \
"       " #asm_op #mb " %w[i], %w[i], %[v]"                             \
        : [i] "+r" (i), [v] "+Q" (v->counter)                           \
        : "r" (v)                                                       \
        : cl);                                                          \
                                                                        \
        return i;                                                       \
}

#define ATOMIC_FETCH_OPS(op, asm_op)                                    \
        ATOMIC_FETCH_OP(_relaxed,   , op, asm_op)                       \
        ATOMIC_FETCH_OP(_acquire,  a, op, asm_op, "memory")             \
        ATOMIC_FETCH_OP(_release,  l, op, asm_op, "memory")             \
        ATOMIC_FETCH_OP(        , al, op, asm_op, "memory")

ATOMIC_FETCH_OPS(andnot, ldclr)
ATOMIC_FETCH_OPS(or, ldset)
ATOMIC_FETCH_OPS(xor, ldeor)
ATOMIC_FETCH_OPS(add, ldadd)

사용하는 lse atomic은 4가지 메모리 오더링 명령을 지원한다.

  • andnot
    • ldclr, ldclra, ldclrl, ldclral
  • or
    • ldset, ldseta, ldsetl, ldsetal
  • xor
    • ldeor, ldeora, ldeorl, ldeoral
  • add
    • ldadd, ldadda, ldaddl, ldaddal

 

__lse_atomic_add_return()등 4개 함수

아래 ATOMIC_OP_ADD_RETURN() 매크로 함수를 통해 다음 함수들이 생성된다. (ordering 선택이 필요한 명령들이다)

  • __lse_atomic_add_return()
    • __lse_atomic_add_return_relaxed()
    • __lse_atomic_add_return_acquire()
    • __lse_atomic_add_return_release()

 

arch/arm64/include/asm/atomic_lse.h

#define ATOMIC_OP_ADD_RETURN(name, mb, cl...)                           \
static inline int __lse_atomic_add_return##name(int i, atomic_t *v)     \
{                                                                       \
        u32 tmp;                                                        \
                                                                        \
        asm volatile(                                                   \
        __LSE_PREAMBLE                                                  \
        "       ldadd" #mb "    %w[i], %w[tmp], %[v]\n"                 \
        "       add     %w[i], %w[i], %w[tmp]"                          \
        : [i] "+r" (i), [v] "+Q" (v->counter), [tmp] "=&r" (tmp)        \
        : "r" (v)                                                       \
        : cl);                                                          \
                                                                        \
        return i;                                                       \
}

ATOMIC_OP_ADD_RETURN(_relaxed,   )
ATOMIC_OP_ADD_RETURN(_acquire,  a, "memory")
ATOMIC_OP_ADD_RETURN(_release,  l, "memory")
ATOMIC_OP_ADD_RETURN(        , al, "memory")

사용하는 lse atomic은 4가지 메모리 오더링 명령을 지원한다.

  • add
    • ldadd, ldadda, ldaddl, ldaddal

 

__lse_atomic_and()

arch/arm64/include/asm/atomic_lse.h

static inline void __lse_atomic_and(int i, atomic_t *v)
{
        asm volatile(
        __LSE_PREAMBLE
        "       mvn     %w[i], %w[i]\n"
        "       stclr   %w[i], %[v]"
        : [i] "+&r" (i), [v] "+Q" (v->counter)
        : "r" (v));
}

and 연산 atomic 함수는 매크로를 사용하지 않고 별도로 작성되어 있다.

 

__lse_atomic_fetch_and()등 4개 함수

아래 ATOMIC_FETCH_OP_AND() 매크로 함수를 통해 다음 함수들이 생성된다. (ordering 선택이 필요한 명령들이다)

  • __lse_atomic_fetch_add()
    • __lse_atomic_fetch_add_relaxed()
    • __lse_atomic_fetch_add_acquire()
    • __lse_atomic_fetch_add_release()

 

arch/arm64/include/asm/atomic_lse.h

#define ATOMIC_FETCH_OP_AND(name, mb, cl...)                            \
static inline int __lse_atomic_fetch_and##name(int i, atomic_t *v)      \
{                                                                       \
        asm volatile(                                                   \
        __LSE_PREAMBLE                                                  \
        "       mvn     %w[i], %w[i]\n"                                 \
        "       ldclr" #mb "    %w[i], %w[i], %[v]"                     \
        : [i] "+&r" (i), [v] "+Q" (v->counter)                          \
        : "r" (v)                                                       \
        : cl);                                                          \
                                                                        \
        return i;                                                       \
}

ATOMIC_FETCH_OP_AND(_relaxed,   )
ATOMIC_FETCH_OP_AND(_acquire,  a, "memory")
ATOMIC_FETCH_OP_AND(_release,  l, "memory")
ATOMIC_FETCH_OP_AND(        , al, "memory")

 

__lse_atomic_sub()

arch/arm64/include/asm/atomic_lse.h

static inline void __lse_atomic_sub(int i, atomic_t *v)
{
        asm volatile(
        __LSE_PREAMBLE
        "       neg     %w[i], %w[i]\n"
        "       stadd   %w[i], %[v]"
        : [i] "+&r" (i), [v] "+Q" (v->counter)
        : "r" (v));
}

sub 연산 atomic 함수는 매크로를 사용하지 않고 별도로 작성되어 있다.

 

__lse_atomic_sub_return()등 4개 함수

아래 ATOMIC_OP_SUB_RETURN() 매크로 함수를 통해 다음 함수들이 생성된다. (ordering 선택이 필요한 명령들이다)

  • __lse_atomic_sub_return()
    • __lse_atomic_sub_return_relaxed()
    • __lse_atomic_sub_return_acquire()
    • __lse_atomic_sub_return_release()

 

arch/arm64/include/asm/atomic_lse.h

#define ATOMIC_OP_SUB_RETURN(name, mb, cl...)                           \
static inline int __lse_atomic_sub_return##name(int i, atomic_t *v)     \
{                                                                       \
        u32 tmp;                                                        \
                                                                        \
        asm volatile(                                                   \
        __LSE_PREAMBLE                                                  \
        "       neg     %w[i], %w[i]\n"                                 \
        "       ldadd" #mb "    %w[i], %w[tmp], %[v]\n"                 \
        "       add     %w[i], %w[i], %w[tmp]"                          \
        : [i] "+&r" (i), [v] "+Q" (v->counter), [tmp] "=&r" (tmp)       \
        : "r" (v)                                                       \
        : cl);                                                  \
                                                                        \
        return i;                                                       \
}

ATOMIC_OP_SUB_RETURN(_relaxed,   )
ATOMIC_OP_SUB_RETURN(_acquire,  a, "memory")
ATOMIC_OP_SUB_RETURN(_release,  l, "memory")
ATOMIC_OP_SUB_RETURN(        , al, "memory")

사용하는 lse atomic은 add 명령을 사용하여 sub를 구현하였고, 4가지 메모리 오더링 명령을 지원한다.

  • ldadd
    • ldadd, ldadda, ldaddl, ldaddal

 

__lse_atomic_fetch_sub()등 4개 함수

아래 ATOMIC_FETCH_OP_SUB() 매크로 함수를 통해 다음 함수들이 생성된다. (ordering 선택이 필요한 명령들이다)

  • __lse_atomic_fetch_sub()
    • __lse_atomic_fetch_sub_relaxed()
    • __lse_atomic_fetch_sub_acquire()
    • __lse_atomic_fetch_sub_release()

 

arch/arm64/include/asm/atomic_lse.h

#define ATOMIC_FETCH_OP_SUB(name, mb, cl...)                            \
static inline int __lse_atomic_fetch_sub##name(int i, atomic_t *v)      \
{                                                                       \
        asm volatile(                                                   \
        __LSE_PREAMBLE                                                  \
        "       neg     %w[i], %w[i]\n"                                 \
        "       ldadd" #mb "    %w[i], %w[i], %[v]"                     \
        : [i] "+&r" (i), [v] "+Q" (v->counter)                          \
        : "r" (v)                                                       \
        : cl);                                                          \
                                                                        \
        return i;                                                       \
}

ATOMIC_FETCH_OP_SUB(_relaxed,   )
ATOMIC_FETCH_OP_SUB(_acquire,  a, "memory")
ATOMIC_FETCH_OP_SUB(_release,  l, "memory")
ATOMIC_FETCH_OP_SUB(        , al, "memory")

 


Exchange atomic – for ARM32 (LL/SC atomic)

atomic_xchg()

arch/arm/include/asm/atomic.h

#define atomic_xchg(v, new) (xchg(&((v)->counter), new))
  • v값을 읽은 후 new 값으로 교체한 후 다시 저장한다. 그리고 교체 전의 v 값을 반환한다.
    • *v <- new

 

xchg()

arch/arm/include/asm/cmpxchg.h

#define xchg(ptr, x) ({                                                 \
        ((__typeof__(*(ptr)))                                           \
                __xchg((unsigned long)(x), (ptr), sizeof(*(ptr))));     \
})

 

__xchg()

arch/arm/include/asm/cmpxchg.h

static inline
unsigned long __xchg(unsigned long x, volatile void *ptr, int size)
{
        unsigned long ret, flags;

        switch (size) {
        case 1:
#ifdef __xchg_u8
                return __xchg_u8(x, ptr);
#else
                local_irq_save(flags);
                ret = *(volatile u8 *)ptr;
                *(volatile u8 *)ptr = x;
                local_irq_restore(flags);
                return ret;
#endif /* __xchg_u8 */

        case 2:
#ifdef __xchg_u16
                return __xchg_u16(x, ptr);
#else
                local_irq_save(flags);
                ret = *(volatile u16 *)ptr;
                *(volatile u16 *)ptr = x;
                local_irq_restore(flags);
                return ret;
#endif /* __xchg_u16 */

        case 4:
#ifdef __xchg_u32
                return __xchg_u32(x, ptr);
#else
                local_irq_save(flags);
                ret = *(volatile u32 *)ptr;
                *(volatile u32 *)ptr = x;
                local_irq_restore(flags);
                return ret;
#endif /* __xchg_u32 */

#ifdef CONFIG_64BIT
        case 8:
#ifdef __xchg_u64
                return __xchg_u64(x, ptr);
#else
                local_irq_save(flags);
                ret = *(volatile u64 *)ptr;
                *(volatile u64 *)ptr = x;
                local_irq_restore(flags);
                return ret;
#endif /* __xchg_u64 */
#endif /* CONFIG_64BIT */

        default:
                __xchg_called_with_bad_pointer();
                return x;
        }
}

 

atomic_cmpxchg_relaxed()

mutex(optimistic spin lock), futex, qrwlock 등에서 사용하는 함수이다.

아래 소스도 ARMv6 이상 SMP 시스템용 코드이다.

  • 주요 어셈블리 코드는 다음과 같다.
    • ldrex   oldval <- [&ptr->counter]
    • mov     res, #0
    • teq     oldval, old
    • strexeq res, new, [&ptr->counter]
  • ptr->counter 값이 old와 같은 경우에만 new 값을 기록한다.
  • smp_mb()
    • 동기화를 위해 결과 값이 store buffer를 통해 메모리에 기록되는데 완전히 기록이 완료될 때까지 기다리고 oldval 값을 리턴한다.

arch/arm/include/asm/atomic.h

static inline int atomic_cmpxchg_relaxed(atomic_t *ptr, int old, int new)
{
        int oldval;
        unsigned long res;

        prefetchw(&ptr->counter);

        do {
                __asm__ __volatile__("@ atomic_cmpxchg\n"
                "ldrex  %1, [%3]\n"
                "mov    %0, #0\n"
                "teq    %1, %4\n"
                "strexeq %0, %5, [%3]\n"
                    : "=&r" (res), "=&r" (oldval), "+Qo" (ptr->counter)
                    : "r" (&ptr->counter), "Ir" (old), "r" (new)
                    : "cc");
        } while (res);

        return oldval;
}

ptr값을 읽어 old와 같은 경우에만 new 값으로 교체한 후 다시 저장한다. 그리고 교체 전의 ptr 값을 반환한다.

  • *ptr <- new (if *ptr == old)

 


Exchange atomic – for ARM64 (LL/SC & LSE atomic 공통)

atomic_xchg() 함수 추적 경로

ARM64 시스템에서 atomic_xchg() 함수를 추적해보면 다음과 같다.

  • atomic_xchg()
    • arch_atomic_xchg()
      • arch_xchg()
        • __xchg_wrapper()
          • __xchg()
            • __xchg_case_32()

 

atomic_xchg()

다음 함수들이 메모리 ordering 옵션에 따라 준비되어 있다.

  • atomic_xchg()
    • atomic_xchg_acquire()
    • atomic_xchg_release()
    • atomic_xchg_relaxed()

 

include/asm-generic/atomic-instrumented.h

#if !defined(arch_atomic_xchg_relaxed) || defined(arch_atomic_xchg)
static __always_inline int
atomic_xchg(atomic_t *v, int i)
{
        instrument_atomic_read_write(v, sizeof(*v));
        return arch_atomic_xchg(v, i);
}
#define atomic_xchg atomic_xchg
#endif

#if defined(arch_atomic_xchg_acquire)
static __always_inline int
atomic_xchg_acquire(atomic_t *v, int i)
{
        instrument_atomic_read_write(v, sizeof(*v));
        return arch_atomic_xchg_acquire(v, i);
}
#define atomic_xchg_acquire atomic_xchg_acquire
#endif

#if defined(arch_atomic_xchg_release)
static __always_inline int
atomic_xchg_release(atomic_t *v, int i)
{
        instrument_atomic_read_write(v, sizeof(*v));
        return arch_atomic_xchg_release(v, i);
}
#define atomic_xchg_release atomic_xchg_release
#endif

#if defined(arch_atomic_xchg_relaxed)
static __always_inline int
atomic_xchg_relaxed(atomic_t *v, int i)
{
        instrument_atomic_read_write(v, sizeof(*v));
        return arch_atomic_xchg_relaxed(v, i);
}
#define atomic_xchg_relaxed atomic_xchg_relaxed
#endif

@ptr 주소에 @v 값으로 atomic 하게 치환한다.

 

arch_atomic_xchg()

다음 함수들이 메모리 ordering 옵션에 따라 준비되어 있다.

  • arch_atomic_xchg()
    • arch_atomic_xchg_acquire()
    • arch_atomic_xchg_release()
    • arch_atomic_xchg_relaxed()

 

arch/arm64/include/asm/atomic.h

#define arch_atomic_xchg_relaxed(v, new) \
        arch_xchg_relaxed(&((v)->counter), (new))
#define arch_atomic_xchg_acquire(v, new) \
        arch_xchg_acquire(&((v)->counter), (new))
#define arch_atomic_xchg_release(v, new) \
        arch_xchg_release(&((v)->counter), (new))
#define arch_atomic_xchg(v, new) \
        arch_xchg(&((v)->counter), (new))

 

arch_xchg() 등 4개 함수

다음 함수들이 메모리 ordering 옵션에 따라 준비되어 있다.

  • arch_xchg()
    • arch_xchg_acquire()
    • arch_xchg_release()
    • arch_xchg_relaxed()

 

arch/arm64/include/asm/cmpxchg.h

#define arch_xchg_relaxed(...)  __xchg_wrapper(    , __VA_ARGS__)
#define arch_xchg_acquire(...)  __xchg_wrapper(_acq, __VA_ARGS__)
#define arch_xchg_release(...)  __xchg_wrapper(_rel, __VA_ARGS__)
#define arch_xchg(...)          __xchg_wrapper( _mb, __VA_ARGS__)

 

__xchg_wrapper()

arch/arm64/include/asm/cmpxchg.h

#define __xchg_wrapper(sfx, ptr, x)                                     \
({                                                                      \
        __typeof__(*(ptr)) __ret;                                       \
        __ret = (__typeof__(*(ptr)))                                    \
                __xchg##sfx((unsigned long)(x), (ptr), sizeof(*(ptr))); \
        __ret;                                                          \
})

 

__xchg() 등 4개 함수

아래 __XCHG_GEN() 매크로 함수를 통해 메모리 ordering에 따라 다음 함수들이 준비되었다.

  • __xchg()
    • __xchg_acq()
    • __xchg_rel()
    • __xchg_mb()

 

arch/arm64/include/asm/cmpxchg.h

#define __XCHG_GEN(sfx)                                                 \
static __always_inline  unsigned long __xchg##sfx(unsigned long x,      \
                                        volatile void *ptr,             \
                                        int size)                       \
{                                                                       \
        switch (size) {                                                 \
        case 1:                                                         \
                return __xchg_case##sfx##_8(x, ptr);                    \
        case 2:                                                         \
                return __xchg_case##sfx##_16(x, ptr);                   \
        case 4:                                                         \
                return __xchg_case##sfx##_32(x, ptr);                   \
        case 8:                                                         \
                return __xchg_case##sfx##_64(x, ptr);                   \
        default:                                                        \
                BUILD_BUG();                                            \
        }                                                               \
                                                                        \
        unreachable();                                                  \
}

__XCHG_GEN()
__XCHG_GEN(_acq)
__XCHG_GEN(_rel)
__XCHG_GEN(_mb)

 

__xchg_case_8() 등 16개 함수

아래 __XCHG_CASE() 매크로 함수를 통해 메모리 ordering 및 데이터 사이즈에 따라 다음 함수들이 준비되었다.

  • __xchg_case_8()
    • __xchg_case_acq_8()
    • __xchg_case_rel_8()
    • __xchg_case_mb_8()
  • __xchg_case_16()
    • __xchg_case_acq_16()
    • __xchg_case_rel_16()
    • __xchg_case_mb_16()
  • __xchg_case_32()
    • __xchg_case_acq_32()
    • __xchg_case_rel_32()
    • __xchg_case_mb_32()
  • __xchg_case_64()
    • __xchg_case_acq_64()
    • __xchg_case_rel_64()
    • __xchg_case_mb_64()

 

arch/arm64/include/asm/cmpxchg.h

/*
 * We need separate acquire parameters for ll/sc and lse, since the full
 * barrier case is generated as release+dmb for the former and
 * acquire+release for the latter.
 */
#define __XCHG_CASE(w, sfx, name, sz, mb, nop_lse, acq, acq_lse, rel, cl)       \
static inline u##sz __xchg_case_##name##sz(u##sz x, volatile void *ptr)         \
{                                                                               \
        u##sz ret;                                                              \
        unsigned long tmp;                                                      \
                                                                                \
        asm volatile(ARM64_LSE_ATOMIC_INSN(                                     \
        /* LL/SC */                                                             \
        "       prfm    pstl1strm, %2\n"                                        \
        "1:     ld" #acq "xr" #sfx "\t%" #w "0, %2\n"                           \
        "       st" #rel "xr" #sfx "\t%w1, %" #w "3, %2\n"                      \
        "       cbnz    %w1, 1b\n"                                              \
        "       " #mb,                                                          \
        /* LSE atomics */                                                       \
        "       swp" #acq_lse #rel #sfx "\t%" #w "3, %" #w "0, %2\n"            \
                __nops(3)                                                       \
        "       " #nop_lse)                                                     \
        : "=&r" (ret), "=&r" (tmp), "+Q" (*(u##sz *)ptr)                        \
        : "r" (x)                                                               \
        : cl);                                                                  \
                                                                                \
        return ret;                                                             \
}

__XCHG_CASE(w, b,     ,  8,        ,    ,  ,  ,  ,         )
__XCHG_CASE(w, h,     , 16,        ,    ,  ,  ,  ,         )
__XCHG_CASE(w,  ,     , 32,        ,    ,  ,  ,  ,         )
__XCHG_CASE( ,  ,     , 64,        ,    ,  ,  ,  ,         )
__XCHG_CASE(w, b, acq_,  8,        ,    , a, a,  , "memory")
__XCHG_CASE(w, h, acq_, 16,        ,    , a, a,  , "memory")
__XCHG_CASE(w,  , acq_, 32,        ,    , a, a,  , "memory")
__XCHG_CASE( ,  , acq_, 64,        ,    , a, a,  , "memory")
__XCHG_CASE(w, b, rel_,  8,        ,    ,  ,  , l, "memory")
__XCHG_CASE(w, h, rel_, 16,        ,    ,  ,  , l, "memory")
__XCHG_CASE(w,  , rel_, 32,        ,    ,  ,  , l, "memory")
__XCHG_CASE( ,  , rel_, 64,        ,    ,  ,  , l, "memory")
__XCHG_CASE(w, b,  mb_,  8, dmb ish, nop,  , a, l, "memory")
__XCHG_CASE(w, h,  mb_, 16, dmb ish, nop,  , a, l, "memory")
__XCHG_CASE(w,  ,  mb_, 32, dmb ish, nop,  , a, l, "memory")
__XCHG_CASE( ,  ,  mb_, 64, dmb ish, nop,  , a, l, "memory")

atomic xchg 명령을 수행하는데 LSE 기능이 있는 시스템의 경우 lse 방식, 그렇지 않은 경우 ll_sc 방식으로 치환한다.

  • ll/sc의 경우 양방향 베리어에 cost가 높은 dmb ish를 사용한다.

 


atomic_cmpxchg() 함수 추적 경로

ARM64 시스템에서 atomic_cmpxchg() 함수를 추적해보면 다음과 같다.

  • atomic_cmpxchg()
    • arch_atomic_cmpxchg()
      • arch_cmpxchg()
        • __cmpxchg_wrapper()
          • __cmpxchg()
            • __cmpxchg_case_32()
              • __ll_sc__cmpxchg_case_32() 또는 __lse__cmpxchg_case_32()

atomic_cmpxchg() 등 4개 함수

다음 함수들이 메모리 ordering 옵션에 따라 준비되어 있다.

  • atomic_cmpxchg()
    • atomic_cmpxchg_acquire()
    • atomic_cmpxchg_release()
    • atomic_cmpxchg_relaxed()

 

include/asm-generic/atomic-instrumented.h

#if !defined(arch_atomic_cmpxchg_relaxed) || defined(arch_atomic_cmpxchg)
static __always_inline int
atomic_cmpxchg(atomic_t *v, int old, int new)
{
        instrument_atomic_read_write(v, sizeof(*v));
        return arch_atomic_cmpxchg(v, old, new);
}
#define atomic_cmpxchg atomic_cmpxchg
#endif
#if defined(arch_atomic_cmpxchg_acquire)
static __always_inline int
atomic_cmpxchg_acquire(atomic_t *v, int old, int new)
{
        instrument_atomic_read_write(v, sizeof(*v));
        return arch_atomic_cmpxchg_acquire(v, old, new);
}
#define atomic_cmpxchg_acquire atomic_cmpxchg_acquire
#endif
#if defined(arch_atomic_cmpxchg_release)
static __always_inline int
atomic_cmpxchg_release(atomic_t *v, int old, int new)
{
        instrument_atomic_read_write(v, sizeof(*v));
        return arch_atomic_cmpxchg_release(v, old, new);
}
#define atomic_cmpxchg_release atomic_cmpxchg_release
#endif
#if defined(arch_atomic_cmpxchg_relaxed)
static __always_inline int
atomic_cmpxchg_relaxed(atomic_t *v, int old, int new)
{
        instrument_atomic_read_write(v, sizeof(*v));
        return arch_atomic_cmpxchg_relaxed(v, old, new);
}
#define atomic_cmpxchg_relaxed atomic_cmpxchg_relaxed
#endif

@ptr 주소에 @old 값이 있는 경우 @v 값으로 atomic 하게 치환한다.

 

arch_atomic_cmpxchg() 등 4개 매크로 함수

다음 함수들이 메모리 ordering 옵션에 따라 준비되어 있다.

  • arch_atomic_cmpxchg()
    • arch_atomic_cmpxchg_acquire()
    • arch_atomic_cmpxchg_release()
    • arch_atomic_cmpxchg_relaxed()

 

arch/arm64/include/asm/atomic.h

#define arch_atomic_cmpxchg_relaxed(v, old, new) \
        arch_cmpxchg_relaxed(&((v)->counter), (old), (new))
#define arch_atomic_cmpxchg_acquire(v, old, new) \
        arch_cmpxchg_acquire(&((v)->counter), (old), (new))
#define arch_atomic_cmpxchg_release(v, old, new) \
        arch_cmpxchg_release(&((v)->counter), (old), (new))
#define arch_atomic_cmpxchg(v, old, new) \
        arch_cmpxchg(&((v)->counter), (old), (new))

 

arch_cmpxchg() 등 4개 매크로 함수

다음 함수들이 메모리 ordering 옵션에 따라 준비되어 있다.

  • arch_cmpxchg()
    • arch_cmpxchg_acquire()
    • arch_cmpxchg_release()
    • arch_cmpxchg_relaxed()

 

arch/arm64/include/asm/cmpxchg.h

/* cmpxchg */
#define arch_cmpxchg_relaxed(...)       __cmpxchg_wrapper(    , __VA_ARGS__)
#define arch_cmpxchg_acquire(...)       __cmpxchg_wrapper(_acq, __VA_ARGS__)
#define arch_cmpxchg_release(...)       __cmpxchg_wrapper(_rel, __VA_ARGS__)
#define arch_cmpxchg(...)               __cmpxchg_wrapper( _mb, __VA_ARGS__)

 

__cmpxchg_wrapper()

arch/arm64/include/asm/cmpxchg.h

#define __cmpxchg_wrapper(sfx, ptr, o, n)                               \
({                                                                      \
        __typeof__(*(ptr)) __ret;                                       \
        __ret = (__typeof__(*(ptr)))                                    \
                __cmpxchg##sfx((ptr), (unsigned long)(o),               \
                                (unsigned long)(n), sizeof(*(ptr)));    \
        __ret;                                                          \
})

 

__cmpxchg() 등 4개 함수

아래 __CMPXCHG_GEN() 매크로 함수를 통해 메모리 ordering에 따라 다음 함수들이 준비되었다.

  • __cmpxchg()
    • __cmpxchg_acq()
    • __cmpxchg_rel()
    • __cmpxchg_mb()

 

arch/arm64/include/asm/cmpxchg.h

#define __CMPXCHG_GEN(sfx)                                              \
static __always_inline unsigned long __cmpxchg##sfx(volatile void *ptr, \
                                           unsigned long old,           \
                                           unsigned long new,           \
                                           int size)                    \
{                                                                       \
        switch (size) {                                                 \
        case 1:                                                         \
                return __cmpxchg_case##sfx##_8(ptr, old, new);          \
        case 2:                                                         \
                return __cmpxchg_case##sfx##_16(ptr, old, new);         \
        case 4:                                                         \
                return __cmpxchg_case##sfx##_32(ptr, old, new);         \
        case 8:                                                         \
                return __cmpxchg_case##sfx##_64(ptr, old, new);         \
        default:                                                        \
                BUILD_BUG();                                            \
        }                                                               \
                                                                        \
        unreachable();                                                  \
}

__CMPXCHG_GEN()
__CMPXCHG_GEN(_acq)
__CMPXCHG_GEN(_rel)
__CMPXCHG_GEN(_mb)

 

__cmpxchg_case_8() 등 16개 함수

아래 __CMPXCHG_CASE() 매크로 함수를 통해 메모리 ordering 및 데이터 사이즈에 따라 다음 함수들이 준비되었다.

  • __cmpxchg_case_8()
    • __cmpxchg_case_acq_8()
    • __cmpxchg_case_rel_8()
    • __cmpxchg_case_mb_8()
  • __cmpxchg_case_16()
    • __cmpxchg_case_acq_16()
    • __cmpxchg_case_rel_16()
    • __cmpxchg_case_mb_16()
  • __cmpxchg_case_32()
    • __cmpxchg_case_acq_32()
    • __cmpxchg_case_rel_32()
    • __cmpxchg_case_mb_32()
  • __cmpxchg_case_64()
    • __cmpxchg_case_acq_64()
    • __cmpxchg_case_rel_64()
    • __cmpxchg_case_mb_64()

 

arch/arm64/include/asm/cmpxchg.h

#define __CMPXCHG_CASE(name, sz)                        \
static inline u##sz __cmpxchg_case_##name##sz(volatile void *ptr,       \
                                              u##sz old,                \
                                              u##sz new)                \
{                                                                       \
        return __lse_ll_sc_body(_cmpxchg_case_##name##sz,               \
                                ptr, old, new);                         \
}

__CMPXCHG_CASE(    ,  8)
__CMPXCHG_CASE(    , 16)
__CMPXCHG_CASE(    , 32)
__CMPXCHG_CASE(    , 64)
__CMPXCHG_CASE(acq_,  8)
__CMPXCHG_CASE(acq_, 16)
__CMPXCHG_CASE(acq_, 32)
__CMPXCHG_CASE(acq_, 64)
__CMPXCHG_CASE(rel_,  8)
__CMPXCHG_CASE(rel_, 16)
__CMPXCHG_CASE(rel_, 32)
__CMPXCHG_CASE(rel_, 64)
__CMPXCHG_CASE(mb_,  8)
__CMPXCHG_CASE(mb_, 16)
__CMPXCHG_CASE(mb_, 32)
__CMPXCHG_CASE(mb_, 64)

 

__ll_sc__cmpxchg_case_8() 등 16개 함수 – for LL/SC atomic

아래 __CMPXCHG_CASE() 매크로 함수를 통해 메모리 ordering 및 데이터 사이즈에 따라 다음 함수들이 준비되었다.

  • __ll_sc__cmpxchg_case_8()
    • __ll_sc__cmpxchg_case_acq_8()
    • __ll_sc__cmpxchg_case_rel_8()
    • __ll_sc__cmpxchg_case_mb_8()
  • __ll_sc__cmpxchg_case_16()
    • __ll_sc__cmpxchg_case_acq_16()
    • __ll_sc__cmpxchg_case_rel_16()
    • __ll_sc__cmpxchg_case_mb_16()
  • __ll_sc__cmpxchg_case_32()
    • __ll_sc__cmpxchg_case_acq_32()
    • __ll_sc__cmpxchg_case_rel_32()
    • __ll_sc__cmpxchg_case_mb_32()
  • __ll_sc__cmpxchg_case_64()
    • __ll_sc__cmpxchg_case_acq_64()
    • __ll_sc__cmpxchg_case_rel_64()
    • __ll_sc__cmpxchg_case_mb_64()

 

arch/arm64/include/asm/atomic_ll_sc.h

#define __CMPXCHG_CASE(w, sfx, name, sz, mb, acq, rel, cl, constraint)  \
static inline u##sz                                                     \
__ll_sc__cmpxchg_case_##name##sz(volatile void *ptr,                    \
                                         unsigned long old,             \
                                         u##sz new)                     \
{                                                                       \
        unsigned long tmp;                                              \
        u##sz oldval;                                                   \
                                                                        \
        /*                                                              \
         * Sub-word sizes require explicit casting so that the compare  \
         * part of the cmpxchg doesn't end up interpreting non-zero     \
         * upper bits of the register containing "old".                 \
         */                                                             \
        if (sz < 32)                                                    \
                old = (u##sz)old;                                       \
                                                                        \
        asm volatile(                                                   \
        __LL_SC_FALLBACK(                                               \
        "       prfm    pstl1strm, %[v]\n"                              \
        "1:     ld" #acq "xr" #sfx "\t%" #w "[oldval], %[v]\n"          \
        "       eor     %" #w "[tmp], %" #w "[oldval], %" #w "[old]\n"  \
        "       cbnz    %" #w "[tmp], 2f\n"                             \
        "       st" #rel "xr" #sfx "\t%w[tmp], %" #w "[new], %[v]\n"    \
        "       cbnz    %w[tmp], 1b\n"                                  \
        "       " #mb "\n"                                              \
        "2:")                                                           \
        : [tmp] "=&r" (tmp), [oldval] "=&r" (oldval),                   \
          [v] "+Q" (*(u##sz *)ptr)                                      \
        : [old] __stringify(constraint) "r" (old), [new] "r" (new)      \
        : cl);                                                          \
                                                                        \
        return oldval;                                                  \
}

/*
 * Earlier versions of GCC (no later than 8.1.0) appear to incorrectly
 * handle the 'K' constraint for the value 4294967295 - thus we use no
 * constraint for 32 bit operations.
 */
__CMPXCHG_CASE(w, b,     ,  8,        ,  ,  ,         , K)
__CMPXCHG_CASE(w, h,     , 16,        ,  ,  ,         , K)
__CMPXCHG_CASE(w,  ,     , 32,        ,  ,  ,         , K)
__CMPXCHG_CASE( ,  ,     , 64,        ,  ,  ,         , L)
__CMPXCHG_CASE(w, b, acq_,  8,        , a,  , "memory", K)
__CMPXCHG_CASE(w, h, acq_, 16,        , a,  , "memory", K)
__CMPXCHG_CASE(w,  , acq_, 32,        , a,  , "memory", K)
__CMPXCHG_CASE( ,  , acq_, 64,        , a,  , "memory", L)
__CMPXCHG_CASE(w, b, rel_,  8,        ,  , l, "memory", K)
__CMPXCHG_CASE(w, h, rel_, 16,        ,  , l, "memory", K)
__CMPXCHG_CASE(w,  , rel_, 32,        ,  , l, "memory", K)
__CMPXCHG_CASE( ,  , rel_, 64,        ,  , l, "memory", L)
__CMPXCHG_CASE(w, b,  mb_,  8, dmb ish,  , l, "memory", K)
__CMPXCHG_CASE(w, h,  mb_, 16, dmb ish,  , l, "memory", K)
__CMPXCHG_CASE(w,  ,  mb_, 32, dmb ish,  , l, "memory", K)
__CMPXCHG_CASE( ,  ,  mb_, 64, dmb ish,  , l, "memory", L)

@ptr 주소에 담긴 값을 읽어온 값이 입출력 인자 @old와 동일한 경우에만 입력 인자 @new 값을 기록한다. 또한 기록 여부와 상관 없이 입출력 인자 @old에 기록 전의 값을 알아온다.

  • 코드 라인 15~16에서 8비트 또는 16비트 사이즈 타입인 경우 해당 타입에 맞게 old 값을 변환한다.
  • 코드 라인 20에서 입력 인자 @ptr 주소에 담긴 값을 캐시에 미리 로드해둔다.
    • exclusive 모니터에 의해 해당 캐시라인이 atomic 경쟁이 감지되면 실패하여 다시 시도하는데 이 retry latency를 짧게 하기 위해 캐시에 미리 로드한다.
  • 코드 라인 21에서 입력 인자 @ptr 주소에 담긴 값을 임시 변수 oldval에 로드해온다.
  • 코드 라인 22~23에서 읽어온 oldval 값에 입출력 인자 @old 값과 비교하여 다른 경우 입력 인자 @new 값을 기록하지 않기 위해 레이블 2로 이동한다.
  • 코드 라인 24~25에서 입력 인자 @new 값을 @ptr 주소에 기록하는데, 만일 @ptr 주소가 담긴 캐시라인에 외부에서 atomic 접근이 감지되면 실패로 인식하여 재시도 하기 위해 레이블 1로 이동한다.
  • 코드 라인 26에서 양방향 베리어가 필요한 atomic 명령인 경우 dmb ish를 수행한다.

 

__lse__cmpxchg_case_8() 등 16개 함수 – for LSE atomic

아래 __CMPXCHG_CASE() 매크로 함수를 통해 메모리 ordering 및 데이터 사이즈에 따라 다음 함수들이 준비되었다.

  • __lse__cmpxchg_case_8()
    • __lse__cmpxchg_case_acq_8()
    • __lse__cmpxchg_case_rel_8()
    • __lse__cmpxchg_case_mb_8()
  • __lse__cmpxchg_case_16()
    • __lse__cmpxchg_case_acq_16()
    • __lse__cmpxchg_case_rel_16()
    • __lse__cmpxchg_case_mb_16()
  • __lse__cmpxchg_case_32()
    • __lse__cmpxchg_case_acq_32()
    • __lse__cmpxchg_case_rel_32()
    • __lse__cmpxchg_case_mb_32()
  • __lse__cmpxchg_case_64()
    • __lse__cmpxchg_case_acq_64()
    • __lse__cmpxchg_case_rel_64()
    • __lse__cmpxchg_case_mb_64()

 

arch/arm64/include/asm/atomic_lse.h

#define __CMPXCHG_CASE(w, sfx, name, sz, mb, cl...)                     \
static __always_inline u##sz                                            \
__lse__cmpxchg_case_##name##sz(volatile void *ptr,                      \
                                              u##sz old,                \
                                              u##sz new)                \
{                                                                       \
        register unsigned long x0 asm ("x0") = (unsigned long)ptr;      \
        register u##sz x1 asm ("x1") = old;                             \
        register u##sz x2 asm ("x2") = new;                             \
        unsigned long tmp;                                              \
                                                                        \
        asm volatile(                                                   \
        __LSE_PREAMBLE                                                  \
        "       mov     %" #w "[tmp], %" #w "[old]\n"                   \
        "       cas" #mb #sfx "\t%" #w "[tmp], %" #w "[new], %[v]\n"    \
        "       mov     %" #w "[ret], %" #w "[tmp]"                     \
        : [ret] "+r" (x0), [v] "+Q" (*(unsigned long *)ptr),            \
          [tmp] "=&r" (tmp)                                             \
        : [old] "r" (x1), [new] "r" (x2)                                \
        : cl);                                                          \
                                                                        \
        return x0;                                                      \
}

__CMPXCHG_CASE(w, b,     ,  8,   )
__CMPXCHG_CASE(w, h,     , 16,   )
__CMPXCHG_CASE(w,  ,     , 32,   )
__CMPXCHG_CASE(x,  ,     , 64,   )
__CMPXCHG_CASE(w, b, acq_,  8,  a, "memory")
__CMPXCHG_CASE(w, h, acq_, 16,  a, "memory")
__CMPXCHG_CASE(w,  , acq_, 32,  a, "memory")
__CMPXCHG_CASE(x,  , acq_, 64,  a, "memory")
__CMPXCHG_CASE(w, b, rel_,  8,  l, "memory")
__CMPXCHG_CASE(w, h, rel_, 16,  l, "memory")
__CMPXCHG_CASE(w,  , rel_, 32,  l, "memory")
__CMPXCHG_CASE(x,  , rel_, 64,  l, "memory")
__CMPXCHG_CASE(w, b,  mb_,  8, al, "memory")
__CMPXCHG_CASE(w, h,  mb_, 16, al, "memory")
__CMPXCHG_CASE(w,  ,  mb_, 32, al, "memory")
__CMPXCHG_CASE(x,  ,  mb_, 64, al, "memory")

@ptr 주소에 담긴 값을 읽어온 값이 입출력 인자 @old와 동일한 경우에만 입력 인자 @new 값을 기록한다. 또한 기록 여부와 상관 없이 입출력 인자 @old에 기록 전의 값을 알아온다.

  • 코드 라인 14에서 임시 변수 레지스터 tmp에 @old 값을 대입해둔다.
  • 코드 라인 15~16에서 cas 명령을 사용하여 @ptr 주소에 담긴 값이 tmp와 같은 경우 @new 값을 기록한다. 기록 여부와 상관 없이 tmp에 @ptr 주소에 담겼었던 값을 알아온 후 결과 값으로 반환한다.

 


기타 Read/Write(Set) 관련 – ARM64

atomic_read() 및 atomic_set() 등 4개 함수

다음 함수들이 메모리 ordering 옵션에 따라 준비되어 있다.

  • atomic_read()
    • atomic_read_acquire()
  • atomic_set()
    • atomic_set_release()

 

static __always_inline int
atomic_read(const atomic_t *v)
{
        instrument_atomic_read(v, sizeof(*v));
        return arch_atomic_read(v);
}
#define atomic_read atomic_read

#if defined(arch_atomic_read_acquire)
static __always_inline int
atomic_read_acquire(const atomic_t *v)
{
        instrument_atomic_read(v, sizeof(*v));
        return arch_atomic_read_acquire(v);
}
#define atomic_read_acquire atomic_read_acquire
#endif

static __always_inline void
atomic_set(atomic_t *v, int i)
{
        instrument_atomic_write(v, sizeof(*v));
        arch_atomic_set(v, i);
}
#define atomic_set atomic_set

#if defined(arch_atomic_set_release)
static __always_inline void
atomic_set_release(atomic_t *v, int i)
{
        instrument_atomic_write(v, sizeof(*v));
        arch_atomic_set_release(v, i);
}
#define atomic_set_release atomic_set_release
#endif

 

arch_atomic_read() 등 4개 함수

다음 함수들이 메모리 ordering 옵션에 따라 준비되어 있다.

  • arch_atomic_read()
    • arch_atomic_read_acquire()
  • arch_atomic_set()
    • arch_atomic_set_release()

 

arch/arm64/include/asm/atomic.h

#define arch_atomic_read(v)                     __READ_ONCE((v)->counter)
#define arch_atomic_set(v, i)                   __WRITE_ONCE(((v)->counter), (i))
  • arch_atomic_read()
    • atomic_t 주소 @v가 가리키는 값을 한 번에(atomically) 읽어온다.
  • arch_atomic_set()
    • atomic_t 주소 @v에 @i 값을 한 번에(atomically) 기록한다.

 

아래 두 개 함수는 arm64 아키텍처에 정의되지 않아 fallback 함수를 사용한다.

include/linux/atomic-arch-fallback.h

#ifndef arch_atomic_read_acquire
static __always_inline int
arch_atomic_read_acquire(const atomic_t *v)
{
        return smp_load_acquire(&(v)->counter);
}
#define arch_atomic_read_acquire arch_atomic_read_acquire
#endif

#ifndef arch_atomic_set_release
static __always_inline void
arch_atomic_set_release(atomic_t *v, int i)
{
        smp_store_release(&(v)->counter, i);
}
#define arch_atomic_set_release arch_atomic_set_release
#endif

smp_load_acquire() 및 smp_store_release() 단방향 베리어 함수를 호출하여 사용한다.

  • smp_load_acquire() 및 smp_store_release() 함수는 다음 글을 참고

 

조건부 Read Atomic

atomic_cond_read_acquire()

include/linux/atomic.h

#define atomic_cond_read_acquire(v, c) smp_cond_load_acquire(&(v)->counter, (c))

주소 v의 데이터를 읽은 후 조건 c가 true인 경우 읽은 데이터를 반환한다. 데이터를 읽을 때 단방향 acquire 베리어를 사용한다. 만일 조건을 만족하지 못하는 경우 반복하며 시도한다.

  • ARM64의 경우 절전을 위해 spin wait 시 wfe(wait for event)와 sev를 사용한다.
  • smp_cond_load_acquire() 함수는 다음 글을 참고

 

atomic_cond_read_relaxed()

include/linux/atomic.h

#define atomic_cond_read_relaxed(v, c) smp_cond_load_relaxed(&(v)->counter, (c))

주소 v의 데이터를 읽은 후 조건 c가 true인 경우 읽은 데이터를 반환한다. 만일 조건을 만족하지 못하는 경우 반복하며 시도한다.

  • ARM64의 경우 절전을 위해 spin wait 시 wfe(wait for event)와 sev를 사용한다.

 

위의 64비트 액세스 API는 다음과 같다.

  • atomic64_cond_read_acquire()
  • atomic64_cond_read_relaxed()

 


atomic 구조체 타입

atomic_t

include/linux/types.h

typedef struct {
        int counter;
} atomic_t;
  • 32bit counter 변수 하나로만 구성되어 있다.

 

atomic64_t

include/linux/types.h

#ifdef CONFIG_64BIT
typedef struct {
        long counter;
} atomic64_t;
#endif
  • 64bit counter 변수 하나로만 구성되어 있다.

 

참고

 

RCU(Read Copy Update) -1- (Basic)

<kernel v5.4>

RCU 기초

RCU History

  • RCU는 읽기 동작에서 블러킹 되지 않는 read/write 동기화 메커니즘
  • 2002년 커널 버전 2.5.43에서 소개됨
  • 2005년 PREEMPT_RCU가 추가됨
  • 2009년 user-level RCU도 소개됨

 

장/단점

RCU는 read-side overhead를 최소화하는데 목적이 있기 때문에 동기화 로직이 읽기 동작에 더 많은 비율로 사용되는 경우에만 사용한다. 수정(Update) 동작이 10%이상인 경우 오히려 성능이 떨어지므로 RCU 대신 다른 동기화 기법을 선택해야 한다.

  • 장점
    • read side overhead가 거의 없음. zero wait, zero overhead
      • rcu non-preempt 커널 모델인 경우 zero wait 이지만, rcu preemptible 커널 모델의 경우 태스크 구조체의 rcu_read_lock_nesting 카운터를 증감시켜 사용하므로 zero wait은 아니고 그래도 빠른 편이다.
    • deadlock 이슈 없음
    • priority inversion 이슈 없음 (priority inversion & priority inheritance)
    • unbounded latency 이슈 없음
    • 메모리 leak hazard 이슈 없음
  • 단점
    • 사용이 약간 복잡
    • 쓰기 동작에서는 다른 동기화 기법보다 조금 더 느리다.

 

RCU 구현 옵션

  • Classical RCU – a.k.a tiny RCU
    • CONFIG_TINY_RCU 커널 옵션
    • single 데이터 스트럭처
    • CPU가 많아지는 경우 성능 떨어짐
      • 현재 커널이 SMP가 아니고 non-preemptible로 빌드된 경우 디폴트로 선택되는 RCU 옵션이다.
  • Hierarchical RCU – a.k.a tree RCU
    • CONFIG_TREE_RCU 커널 옵션
    • tree 확장된 RCU 구현
    • 현재 커널이 SMP이고 non-preempt-able로 빌드된 경우 디폴트로 선택되는 RCU 옵션이다.
  • Preemptible tree-based hierarchical RCU
    • tree 확장된 RCU 구현이면서 preemptible(CONFIG_PREEMPT_RCU 커널 옵션)을 사용하는 최근 linux kernel의 기본 RCU이다. (코드 분석에서 이 모델을 우선)
    • read-side critical section에서 preemption이 지원된다. (주의: 그렇다고 슬립 api를 사용하면 안된다)
  • SRCU
    • CONFIG_SRCU 커널 옵션을 추가하고 RCU API 대신 별도의 SRCU API를 사용하는 경우 read-side critical section에서 sleep api를 사용할 수 있다.
      • rcu_read_lock() -> srcu_read_lock()
  • URCU
    • Userspace RCU (liburcu)

 

 RCU 구현 옵션 선택

RCU를 선택하여 사용할 때 커널 옵션은 일반적으로 다음과 같이 나누어 선택할 수 있다.

  • CONFIG_TINY_RCU
    • 조건: !PREEMPTION && !SMP
    • footprint가 작은 시스템에서 운영할 때 적절하다.
  • CONFIG_TREE_RCU
    • 조건: !PREEMPTION && SMP
    • SMP 시스템이고 서버로 운영할 때 적절하다.
  • CONFIG_PREEMPT_RCU (기존 CONFIG_TREE_PREEMPT_RCU)
    • 조건: PREEMPTION
    • 빠른 응답을 요구하는 임베디드 시스템에서 운영할 때 적절하다.
    • 커널 v5.6-rc1 부터 이 옵션이 없이 위의 CONFIG_TREE_RCU 만으로 동작하게 변경하였다.

 

추가적으로 절전을 요구하는 시스템에서 다음과 같은 설정을 사용한다.

  • CONFIG_RCU_FAST_NO_HZ=y
  • CONFIG_RCU_NOCB_CPU=y

 

더 자세한 커널 파라메터 설정은 다음 문서를 참고한다.

 

RCU 성능

reader-side에서의 성능 비교 (rwlock vs RCU)

rcu2

(통계: lwn.net 참고)

 

리눅스 커널에서의 사용율 증가

rcu3

 

일반 lock 획득시의 나쁜 성능

  • lock 획득하는데 소요하는 자원이 critical section을 수행하는 것에 비해 수백배 이상 over-head가 발생한다.

rcu4

 

RCU  기본 요소

RCU는 다음과 같이 3가지의 기본 요소와 특징이 있다.

  • Reader
    • rcu_read_lock()과 rcu_read_unlock() 코드 범위의 Read-side critical section이다.
    • 접근에 대한 주의
      • 가장 주의 할 것으로 보호 받아야 할 데이터(protected rcu data)에 접근할 때 항상 이 영역내에서만 접근하여야 한다.
      • 이 영역을 벗어난 사용은 RCU 규칙을 벗어나는 것이며 존재하지 않는 자료 또는 잘못된 데이터로의 접근이될 수 있다.
      • 실제 보호 받아야 할 데이터에 접근하기 전에 메모리 배리어를 사용한 후에 참조하여야 한다.
        • rcu_dereference() 매크로 함수를 사용하면 간단하다.
    • 구현에 따른 차이
      • non-preemptible RCU 커널을 사용하는 경우에는 read_rcu_lock() 내부에서 preemption을 금지하는 코드만 적용된다. sleep하는 blocked api를 사용하면 안되고 가급적 빨리 빠져나와야 한다.
      • preemptible RCU 커널을 사용하는 경우에는 read_rcu_lock() 내부에서 로컬 카운터를 증가시키고 read_rcu_unlock() 내부에서 로컬 카운터를 감소시킨다. read-side critical section 내에서 preemption될 수 있다. 만일 SRCU 커널 옵션도 사용하는 경우 sleep 가능한 blocked api의 사용도 가능하다.
  • Updater
    • 기존의 여러 가지 락 중 하나를 사용하여 데이터를 보호하고 수정한다.
    • 실제 보호 받아야 할 데이터에 접근하기 전에 메모리 배리어를 사용한 후에 참조하여야 한다.
      • rcu_dereference() 및 rcu_assign_pointer() 매크로 함수를 사용하면 간단하다.
    • 보호 받을 자료를 복사하여 수정한 후 기존 자료의 회수(free)를 위해 동기 함수인 synchronize_rcu() 또는 비동기 함수인 call_rcu() 함수를 호출하여 아래의 reclaimer를 동작하게 하다.
    • 이 구성요소에서 Read, Copy, Update 과정을 사용한다.
  • Reclaimer
    • Update에서 최종 수정한 객체가 아닌 사용이 완료된 객체들에 대해 cleanup한다.
    • 안전하게 cleanup되도록 삭제할 데이터에 접근하는 Reader 들이 없음을 보장하기 위해 Grace period가 지난 후에 Reclaimer가 동작한다.

rcu1


RCU 기본 API

RCU에는 5개의 기본 API가 사용된다.

  • rcu_read_lock()
    • read side critical section의 시작 부분에 사용되어야 한다.
  • rcu_read_unlock()
    • read side critical section의 끝 부분에 사용되어야 한다.
  • rcu_assign_pointer()
    • 일반 object 포인터를 rcu 보호 포인터에 지정할 때 사용한다.
    • 내부 구현에서 일반 object 포인터와 rcu 보호 포인터 사이에 메모리 베리어가 사용된다.
  • rcu_dereference()
    • rcu 보호 포인터로부터 일반 object 포인터를 알아올 때 사용한다.
    • 내부 구현에서 rcu 보호 포인터와 일반 object 포인터 사이에 메모리 베리어가 사용된다.
  • synchronize_rcu() & call_rcu()
    • synchronize_rcu()
      • sync 방식으로 GP(Grace Period)가 끝나기를 기다린다.
    • call_rcu()
      • async 방식으로 GP(Grace Period)가 끝나면 호출되도록 함수를 지정하여 사용한다.

 

RCU 기본 API만을 사용한 단순 접근 방법

Reader: RCU node를 사용할 때

  • Read side critical section의 시작은 rcu_read_lock()으로 시작한다.
    • non-preemptible RCU 커널을 사용하는 경우에는 이 구간에서 preemption을 금지시킨다.
      • 이 영역에서는 sleep하는 blocked api를 사용하면 안되고 가급적 빨리 빠져나와야 한다.
    • preemptible RCU 커널을 사용하는 경우에는 로컬 카운터를 증가시킨다.
      • 이 영역에서도 preemption이 가능하다. 그러나 내부에서 sleep이 가능한 block-api를 사용하면 안된다.
  • 실제 보호 받아야 할 데이터에 접근하기 전에 메모리 배리어를 사용한 후에 참조하여야 하는데 이를 쉽게 할 수 있도록 rcu_dereference()와 같은 매크로 함수를 사용하면 편리하다. 이를 사용하여 안전하게 dereference된 RCU-protected pointer 값을 얻어온다.
    • read critical section을 완료하면 더 이상 dereference된 pointer를 사용할 수 없으므로 이 포인터를 이용하여 얻어올 해당 자료의 l-value 값 들을 읽어온다.(구조체 전체를 복사하여 가져오거나…)
  • Read side critical section의 끝은 read_rcu_unlock() 함수로 완료한다.
    • non-preemptible RCU 커널을 사용하는 경우에는 preemption를 다시 허용시킨다.
    • preemptible RCU 커널을 사용하는 경우에는 로컬 카운터를 감소시킨다.

 

다음 동기화되어 보호받아야 할 데이터의 멤버 a를 얻어오는 코드를 확인한다. (preemptible rcu)

  • 글로벌 참조 카운터의 증가는 conceptual한 것이다.
  • 실제 구현에서의  참조카운터 증가는 현재 태스크 디스크립터에 저장된다. 태스크 디스크립터는 캐시 되어 있을 것이므로 매우 빠른 액세스가 보장된다.
    • (p->rcu_read_lock_nesting)

rcu5b

int foo_get_a(void)
{
	int retval;
	rcu_read_lock();
	retval = rcu_dereference(gbl_foo)->a;
	rcu_read_unlock();
	return retval;
}

 

Updater: RCU node의 수정

  • 기존 노드 데이터를 곧바로 수정하지 않고 복사(Read-Copy)한 후 그 복사한 자료를 수정(Update)해야 하므로 준비 과정으로 새로운 노드 데이터를 먼저 할당한다.
  • Write side critical section의 시작은 기존 lock 중 적절한 하나를 사용한다.
  • rcu_dereference_protected() 매크로 함수를 사용하여 매모리 배리어를 사용한 후 안전하게 노드를 가리키는 포인터를 얻어온다.
  • 기존 노드 데이터에 대한 직접 수정은 금지된다. 곧바로 수정하지 않고 복사한 후에 수정 작업을 진행해야 다른 Reader 들이 안전하게 기존 노드 데이터를 사용할 수 있다.
  • rcu_assign_pointer()를 사용하여 먼저 매모리 배리어를 사용한 후 안전하게 노드 포인터를 교체한다.
  • 교체가 완료되었으면 Write side critical section의 끝으로 사용한 lock을 닫는다.
  • 마지막으로 기존 노드 메모리 삭제를 위해 다른 Reader 들이 접근하지 않는 것이 보장되는 시간에 기존 노드 메모리를 삭제할 수 있도록 rcu 동기 또는 비동기 호출을 한다.
    • call_rcu:
      • rcu 비동기 호출로 non-blocking 함수이다.
      • 모든 기존 RCU read-side critical sections들이 끝나면 인수로 요청한 함수를 호출한다.
    • synchronize_rcu():
      • rcu 동기 호출로 blocking 함수이다.
      • 모든 기존 RCU read-side critical sections들이 끝날 때까지 이 함수내에서 기다리며 그 후 인수로 요청한 함수를 호출한다.

 

아래 그림과 같이 기존 노드를 안전하게 수정하기 위해 복제 후 수정하고 기존 노드는 삭제 요청한다.

rcu7b

void foo_update_a(int new_a)
{
	struct foo *new_fp;
	struct foo *old_fp;
	new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
	spin_lock(&foo_mutex);
	old_fp = rcu_dereference_protected(gbl_foo,
		 lockdep_is_held(&foo_mutex));
	*new_fp = *old_fp;
	new_fp->a = new_a;
	rcu_assign_pointer(gbl_foo, new_fp);
	spin_unlock(&foo_mutex);
	call_rcu(&old_fp->rcu, foo_reclaim); 
}

 

Reclaimer: RCU node 사용 완료 후 폐기

  • 기존 데이터의 다른 Reader 들에 의해 사용되지 않음이 확인되면 grace periods 이후에 삭제하여 메모리를 회수한다.
  • 참조 카운터가 0이되면 삭제하는 것은 conceptual한 것이다. 글로벌 참조 카운터를 atomic하게 업데이트하는 구조(spin-lock)의 동기화를 사용하지 않아야 하므로 실제 구현은 더 복잡한다.

rcu6b

void foo_reclaim(struct rcu_head *rp)
{
	struct foo *fp = container_of(rp, struct foo, rcu);
	foo_cleanup(fp->a);
	kfree(fp);
} 

 

리스트에서의 RCU 사용

list 전용 API를 사용하여 간단히 interation을 할 수 있다. iteration 중에 방향(direction)을 바꿀 수는 없다.

 

Reader: RCU node를 사용할 때

rcu13

void disp_foo(void)
{
	rcu_read_lock();
	list_for_each_entry_rcu(p, head, list) {
		disp_foo();
	}
	rcu_read_unlock();
}

 

Updater: RCU node의 수정

  • list_replace_rcu()를 수행한 순간 역방향 연결은 끊어진 반면 순방향 연결은 계속 살아있다.

rcu14

void foo_update_a(int new_a)
{
	struct foo *new_fp;
	struct foo *old_fp = search(head, key);
	if (old_fp == NULL) {
	/* Take appropriate action, unlock, and return. */
	}
	new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);
	*new_fp = *old_fp;
	new_fp->a = new_a;
	list_replace_rcu(&old_fp->list, &new_fp->list);
	call_rcu(&old_fp->rcu, foo_reclaim); 
}
static inline void list_replace_rcu(struct list_head *old,
                                struct list_head *new)
{
        new->next = old->next;
        new->prev = old->prev;
        rcu_assign_pointer(list_next_rcu(new->prev), new);
        new->next->prev = new;
        old->prev = LIST_POISON2;
}

 

Reclaimer: RCU node 사용 완료 후 폐기

  • Grace periods를 지난 후 삭제 대상 노드를 모두 clean-up한다.

rcu15

void foo_reclaim(struct rcu_head *rp)
{
	struct foo *fp = container_of(rp, struct foo, rcu);
	foo_cleanup(fp->a);
	kfree(fp);
}

 


GP(Grace Period)

사용자는 특정 자료의 동기화를 위해 두 가지의 관점에서 처리한다.

  • 읽기 처리만 수행하는 read-side critical section이 있고 이를 Reader로 부른다.
  • 변경 처리가 포함된 write-side critical section이 있고 이를 Updater 또는 Writer로 부른다.

 

Writer 발생되는 시점, 즉 write-side critical section 부터 시작되는 구간 3가지를 정리해보았다.

  • Removal 구간: write-side critical section 구간에서는 읽어서(Read() 사본(Copy)을 만들고 변경(Update) 작업을 한다.
  • Grace Period 구간: Removal을 수행하는 자료에 관련된 Reader들의 처리가 완료됨을 보장할 수 있도록 대기하는 구간이다.
  • Reclamation 구간: 기존 사본을 원본에 적용하는 구간이다.

 

Removal(Read-Copy-Update)이 진행되는 동안 해당 데이터에 접근하고 있는 Reader들을 보호하기 위해 Removal은 기존 Reader들이 접근하고 있는 자료는 곧바로 수정하지 않고 이를 먼저 Copy한 후 Update하여 사용한다. 결국 Reader들은 원본 자료에만 접근하므로 안전하게 데이터에 접근할 수 있다.

 

Removal이 완료된 후 작업된 사본을 원본에 반영해야 하는데 곧바로 반영하면 각 cpu에서 동시에 처리되고 있는 Reader들에 문제가 발생한다. 따라서 Removal 기간 동안 같은 자료에 접근한 Reader들의 처리가 모두 완료될 때까지 기다려야 하는데 이 구간을 GP라고 한다.

 

GP의 완료를 인지하면 사본을 원본에 반영하고 필요 없어진 자료는 폐기하는 Reclamation 구간이 시작된다. 이러한 처리들은 RCU의 후CB(Call-Back) 함수를 리스트에 보관해 두었다가 Reclamation 구간에서 호출하여 처리한다.

 

다음 그림을 보고 Writer에 의해 발생되는 3가지 구간을 확인한다. 아래의 Grace Period는 conceptual한 의미이고, Min Grace Period로 불린다. Removal 기간 중 관련된 Reader들의 끝 부분이 곧바로 인식되지 않으므로 Grace Period가 끝나는 지점을 감지(detection) 하는 방법을 사용한다.

 

 

Grace Period의 종료 감지

커널이 Min Grace Period가 완료됨을 곧바로 인식하지 못하므로 실제 GP가 완료된 것을 감지하는 것은 특수한 기법이 필요하다. Reader(read-side critical section) 구간에서 성능을 저하시키는 추가 작업을 하지 않는 것이 핵심이므로 이 구간이 완료됨을 알아내기 위해 모든 cpu들이 QS(Quiescent State) 상태를 지나간 경우 GP가 완료된 것으로 판정한다.

  • preemptible RCU를 사용하는 시스템의 경우 GP 종료는 아래 그림의 모든 QS의 완료이외에도 추가로 read-side critical section내에서 preemption된 태스크의 복귀 상태도 확인한다.

 


QS(Quiescent State)

QS(Quiescent State)는 Reader 간의 사이 시간 상태로 GP 구간 전 이미 진행되고 있었던 Reader의 구간이 끝난 것을 인지할 목적으로 사용한다. QS 상태가 패스되었는지 확인하는 방법은 여러 가지가 사용되며 특히 preemption 커널 모델에 따라 조금씩 상이하다. 더 자세한 것은 다음 글의 “Quiscent State 체크 & 리포트”를 참조한다.

 

RCU non-preemptible 커널

  • context switch
    • context switch 발생 시 해당 cpu를 q.s로 체크한다.
  • 유저 모드 또는 idle에서 스케줄 틱
    • 유저 태스크 수행 중이거나 idle 중인 경우 해당 cpu를 q.s로 체크한다.
  • softirqd 실행 중
    • 현재 cpu에서 동작 중인 태스크가 softirqd인 경우 q.s로 체크한다.
  • voluntry 커널에서 cond_resched() 사용
    • preemption pointer 중 하나인 cond_resched() 사용 시 해당 cpu를 q.s로 체크한다.
  • voluntry 커널에서 cond_resched_tasks_rcu_qs() 사용
    • cb용 gp 커널 스레드 또는 nocb용 콜백 처리 루틴의 long loop에서 이 함수를 사용 시 해당 cpu를 q.s로 체크한다.

 

다음 그림과 같이 conceptual 수준에서의 QS는 Reader들 간의 시간으로 각 QS는 기존 Reader의 처리가 완료되었음을 보장하는 기간이다.

Candidate Quiescent State & Observed Quiescent State

다음 그림은 Min GP 이후 detect된 Q.S가 검정색 원으로 표시되었다. 각 cpu의 Q.S가 검출되면 실제 GP가 완료되었음을 알 수 있다.

 

조금 더 conceptual 수준을 구현 레벨로 약간 확장해보면 Write가 끝나면 GP가 시작하는데 이 때 각 cpu의 QS 상태를 모두 클리어한다. 그 후 모든 cpu의 QS가 한 번 이상 패스되면 GP가 끝나고 Reclamation이 진행된다.

 

다음 그림은 커널에서 실제 구현 레벨에서 QS 인지를 하는 과정을 보여준다. RCU Reader 이후 곧바로 cpu가 Quiescent State로 변경되는 것이 아니라 context switch 등이 일어났음을 알 수 있는 포인트에서 Quiescent State가 시작한다.

  • 구현 레벨이 상당히 복잡하므로 아래 그림만 보아서는 쉽게 이해가 가지 않을 수 있다.
  • cpu#0번의 경우를 먼저 살펴보자. conceptual 레벨과 다르게 실제 구현에서는 reader가 완료되자마자 q.s를 인식하지 못한다. 유저 태스크에 진입하면 q.s가 시작하지만 GP가 시작됨과 동시에 q.s가 모두 클리어되었었음을 기억해야 한다. 때문에 다른 태스크의 전환을 수행하는 context switch를 통해 q.s를 인지하게 된다.
  • cpu#3번을 보면 역시 reader가 끝난 후 유저 태스크에 진입하는 것으로 q.s가 시작됨을 알 수 있다. 이 또한 GP가 시작됨과 동시에 클리어되고 스케줄 틱이 발생하여 q.s를 인지하게된다.
  • cpu#5번의 경우 q.s를 인지하였고 모든 cpu의 q.s가 설정되었음을 알게되었다. 이 때 Reclamation 구간이 시작된다.

 

Force Quiescent State

GP가 시작된 후 각 cpu의 qs를 대기하는데 너무 오랫동안 gp가 끝나지 않으면 강제로 qs를 패스상태로 만든다.

 

Brute-Force RCU GP

synchronize_rcu() 함수를 호출하여 gp가 완료되어 동기화를 요청하면 gp가 완료할 때까지 기다린 후 콜백들을 처리한다. 만일 수천 개의 cpu가 가동되는 시스템에서 이러한 요청이 있으면 매우 오랜 시간을 대기해야 하는데 cpu를 그룹별로 묶어 파티셔닝하여 처리하는 구현 방법으로 빠른 처리를 사용하는 방법이 있다. 이 기능은 preemptible rcu 커널 옵션을 사용할 때 synchronize_rcu() 함수의 사용에 대한 급행처리를 할 수 있게하는 기능이다.

 

Deferrable QS

preemptible RCU의 경우 rcu_read_unlock() 시 해당 cpu의 interrupt, bh(softirq), preemption이 하나라도 disable되어 있는 경우 deferred qs로 보고한다.

 


RCU 콜백 처리

3가지 rcu_state

rcu는 스케줄 틱마다 콜백을 처리하기 위해 설계되었고, 한동안 각각의 gp를 관리하는 3개의 state에서 관리하였다가 최근에는 1개의 state로 통합되었다.

통합전
  •  rcu_sched
    • rcu read-side critical section 내에서 preemption되는 것을 막아야 하는 상황에서 사용되며, rcu 콜백은 rcu_sched용 gp가 끝난 후에 처리된다.
  • rcu_bh
    • DDOS attack 상황에서 OOM(Out Of Memory)이 발생하는 것을 방지하기 위해 softirq를 사용하는 네트워크 처리 시 rcu_bh 상태를 추가하여 별도로 관리하는 gp가 끝난 후에 처리한다.
  • rcu_preempt
    • preemptible 커널에서 read side critical 섹션 내에서 preemption을 허용하는 방식을 지원하기 위해 별도록 관리하는 gp가 끝난 후에 처리한다.
통합후
  • rcu_preempt
    • preemptible 커널을 사용하는 경우 read side critical 섹션 내에서 preemption을 허용한다.

 

RCU 콜백

synchronize_rcu() & call_rcu()

  • RCU updater가 콜백 요청을 하는데 모든 RCU read-side critical sections들이 끝난 후에 즉, gp가 끝난 후에 콜백이 처리되기를 기다린다.
  • read-side critical section 사용시 규칙: 모든 read-side critical section 즉 rcu_read_lock() 과 rcu_read_unlock()사이에는 block or sleep되면 안된다. (SRCU는 가능하다)
  • 모든 CPU의 context switch가 완료되면 RCU read-side critical section period는 완전히 끝난것으로 보장할 수 있게 되므로 모든 read-side critical section들이 안전하게 끝난것을 의미한다.
  • 삭제될 old 데이터들은 다른 Reader가 참조하고 있는 동안(Grace Period) 삭제 보류된 채로 있다가 old data를 참조하는 마지막 Reader의 사용이 끝나면 Reclamation을 진행하게 된다.

 

 read-side critical section period 보장

  • qs는 위에서 언급하였듯이 context switch이외에도 user 모드 또는 idle 시 스케줄 틱이 호출된 경우도 qs가 패스된다.
  • 모든 cpu의 qs가 패스되면 gp가 완료되어 reclmation이 진행된다.
    • preemptible rcu 구현이 추가되어 조금 더 복잡해졌다.
    • 최대 두 번의 gp가 완료되어야 reclamation이  진행될 수도 있다.

 

RCU NO-CB

절전 기능을 위해 콜백들을 인터럽트 context에서 호출하지 않고 별도의 cpu에 전용 no-cb 처리용 커널 스레드를 생성하여 운영하는 방법이다.

 


기존 rwlock 구현 소스를 rcu 구현 소스로 변경 예제

struct el {
	struct list_head list;
	long key;
	spinlock_t mutex;
	int data;
};
spinlock_t 	listmutex;                
struct 	el 	head;

 

search()

int search(long key, int *result) 
{
	struct list_head *lp;
	struct el *p;
	read_lock();
	list_for_each_entry(p, head, lp) {
		if (p->key == key) {
			*result = p->data;
			read_unlock();
			return 1;
		}
	}
	read_unlock();
	return 0;
}
int search(long key, int *result) 
{
	struct list_head *lp;
	struct el *p;
	rcu_read_lock();
	list_for_each_entry_rcu(p, head, lp) {
		if (p->key == key) {
			*result = p->data;
			rcu_read_unlock();
			return 1;
		}
	}
	rcu_read_unlock();
	return 0;
}

 

delete()

int delete(long key)
{
	struct el *p;
	write_lock(&listmutex);
	list_for_each_entry(p, head, lp) {
		if (p->key == key) {
			list_del(&p->list);
			write_unlock(&listmutex);
			kfree(p);
			return 1;
		}
	}
	write_unlock(&listmutex);
	return 0;
}
int delete(long key)
{
	struct el *p;
	spin_lock(&listmutex);
	list_for_each_entry(p, head, lp) {
		if (p->key == key) {
			list_del_rcu(&p->list);
			spin_unlock(&listmutex);
			synchronize_rcu();
			kfree(p);
			return 1;
		}
	}
	spin_unlock(&listmutex);
	return 0;
}

 


RCU API 목록

RCU list traversal:

  • list_entry_rcu
  • list_first_entry_rcu
  • list_next_rcu
  • list_for_each_entry_rcu
  • list_for_each_entry_continue_rcu
  • hlist_first_rcu
  • hlist_next_rcu
  • hlist_pprev_rcu
  • hlist_for_each_entry_rcu
  • hlist_for_each_entry_rcu_bh
  • hlist_for_each_entry_continue_rcu
  • hlist_for_each_entry_continue_rcu_bh
  • hlist_nulls_first_rcu
  • hlist_nulls_for_each_entry_rcu
  • hlist_bl_first_rcu
  • hlist_bl_for_each_entry_rcu

RCU pointer/list update:

  • rcu_assign_pointer
  • list_add_rcu
  • list_add_tail_rcu
  • list_del_rcu
  • list_replace_rcu
  • hlist_add_behind_rcu
  • hlist_add_before_rcu
  • hlist_add_head_rcu
  • hlist_del_rcu
  • hlist_del_init_rcu
  • hlist_replace_rcu
  • list_splice_init_rcu
  • hlist_nulls_del_init_rcu
  • hlist_nulls_del_rcu
  • hlist_nulls_add_head_rcu
  • hlist_bl_add_head_rcu
  • hlist_bl_del_init_rcu
  • hlist_bl_del_rcu
  • hlist_bl_set_first_rcu

RCU:

  • rcu_read_lock
  • synchronize_net
  • rcu_barrier
  • rcu_read_unlock
  • synchronize_rcu
  • rcu_dereference
  • synchronize_rcu_expedited
  • rcu_read_lock_held
  • call_rcu
  • rcu_dereference_check
  • kfree_rcu
  • rcu_dereference_protected

bh:

  • rcu_read_lock_bh
  • rcu_read_unlock_bh
  • rcu_dereference_bh
  • rcu_dereference_bh_check
  • rcu_read_lock_bh_held
  • call_rcu_bh
  • rcu_barrier_bh
  • synchronize_rcu_bh
  • synchronize_rcu_bh_expedited
  • rcu_dereference_bh_protected

sched:

  • rcu_read_lock_sched
  • rcu_read_unlock_sched
  • rcu_read_lock_sched_notrace
  • rcu_read_unlock_sched_notrace
  • rcu_dereference_sched
  • rcu_dereference_sched_check
  • rcu_read_lock_sched_held
  • synchronize_sched
  • rcu_barrier_sched
  • call_rcu_sched
  • synchronize_sched_expedited
  • rcu_dereference_sched_protected

SRCU:

  • srcu_read_lock
  • synchronize_srcu
  • srcu_barrier
  • srcu_read_unlock
  • call_srcu
  • srcu_dereference
  • synchronize_srcu_expedited
  • srcu_dereference_check
  • srcu_read_lock_held
  • init_srcu_struct
  • cleanup_srcu_struct

All: lockdep-checked RCU-protected pointer access

  • rcu_access_pointer
  • rcu_dereference_raw
  • RCU_LOCKDEP_WARN
  • rcu_sleep_check
  • RCU_NONIDLE

 


Core RCU API

rcu_read_lock()

  • read-side critical section 시작

  • preemptible RCU의 경우 current task의 rcu_read_lock_nesting++ 명령만을 포함한다.
  • 참고로 non-preempt 커널에서는 preempt_disable()은 빈 함수이다.

 

rcu_read_unlock()

 

rcu_assign_pointer()

  • RCU로 보호되는 포인터(RCU-protecte pointer)에 새로운 값을 할당하기 위해 사용

 

rcu_dereference()

  • rcu_dereference()는 안전하게 참조(dereference)된 RCU-protected pointer 값을 얻어온다.
  • weakly ordered CPU(out of order execution)를 위해 메모리 접근에 대한 order를 적절히 관리해야 하는데 rcu_dereference()를 사용하면 이를 완벽히 수행할 수 있다.

 

RCU_INIT_POINTER() 매크로

include/linux/rcupdate.h

/**
 * RCU_INIT_POINTER() - initialize an RCU protected pointer
 * @p: The pointer to be initialized.
 * @v: The value to initialized the pointer to.
 *
 * Initialize an RCU-protected pointer in special cases where readers
 * do not need ordering constraints on the CPU or the compiler.  These
 * special cases are:
 *
 * 1.   This use of RCU_INIT_POINTER() is NULLing out the pointer *or*
 * 2.   The caller has taken whatever steps are required to prevent
 *      RCU readers from concurrently accessing this pointer *or*
 * 3.   The referenced data structure has already been exposed to
 *      readers either at compile time or via rcu_assign_pointer() *and*
 *
 *      a.      You have not made *any* reader-visible changes to
 *              this structure since then *or*
 *      b.      It is OK for readers accessing this structure from its
 *              new location to see the old state of the structure.  (For
 *              example, the changes were to statistical counters or to
 *              other state where exact synchronization is not required.)
 *
 * Failure to follow these rules governing use of RCU_INIT_POINTER() will
 * result in impossible-to-diagnose memory corruption.  As in the structures
 * will look OK in crash dumps, but any concurrent RCU readers might
 * see pre-initialized values of the referenced data structure.  So
 * please be very careful how you use RCU_INIT_POINTER()!!!
 *
 * If you are creating an RCU-protected linked structure that is accessed
 * by a single external-to-structure RCU-protected pointer, then you may
 * use RCU_INIT_POINTER() to initialize the internal RCU-protected
 * pointers, but you must use rcu_assign_pointer() to initialize the
 * external-to-structure pointer *after* you have completely initialized
 * the reader-accessible portions of the linked structure.
 *
 * Note that unlike rcu_assign_pointer(), RCU_INIT_POINTER() provides no
 * ordering guarantees for either the CPU or the compiler.
 */
#define RCU_INIT_POINTER(p, v) \
        do { \
                rcu_check_sparse(p, __rcu); \
                WRITE_ONCE(p, RCU_INITIALIZER(v)); \
        } while (0)

RCU 포인터영역을 Sparse 체크하고 RCU 변수 v를 초기화한다.

  •  sparse:
    • 리눅스 커널의 문제를 찾아주는 툴
    • 스파스는 정적 분석 도구이고, 설치된 후 gcc extension으로 동작
    • 지원 속성으로 noderef, address_space, lock(acquires, releases), …
  • __rcu:
    • __attribute__((noderef, address_space(4)))
    • noderef:
      • 포인터 변수를 사용하여 직접 참조할 수 없다.
      • & 연산자를 사용해서 직접 참조해야 한다
    • address_space:
      • 커널에는 몇개의 주소공간이 있다.
      • 0 : kernel, 1: user, 2: iomem, 3: percpu, 4: __rcu 공간

 

참고