Mutex for linux kernel space

이 문서에서는 리눅스 kernel space용 뮤텍스를 설명하였고, 만일 user space용 뮤텍스가 필요한 경우 이 문서가 아닌 futex 또는 pthread_mutex(POSIX 뮤텍스) 등을 참고해야 한다. 또한 “뮤텍스를 왜 사용하는가?” 또는 “뮤텍스의 사용 예” 등에 대해서는 이 문서에서 친절히 설명하지 않는다.
  • 최초 뮤텍스 구현은 Ingo Molnar에 의해 시작되었다.
  • 뮤텍스는 커널에서 학계 또는 유사한 이론 서적에서 발견된 ‘상호 배제(mutual exclusiion)’를 참고하는 것 뿐만아니라 공유 메모리에서의 직렬화(serialization)를 수행하는 특정 원시 locking도  참고한다.
  • 2006년에 sleeping 가능한 뮤텍스 락이 소개되었고 이는 바이너리 세마포어와 유사한 동작을 한다.
  • Sleeping lock은 critical section이 긴 경우(long hold times)  효과적이지만 그 특징으로 인해 인터럽트 핸들러에서 사용할 수 없다.
  • 뮤텍스를 빠른 시간 내에 얻을 수 없는 상황에서는 뮤텍스를 얻을 때 까지 현 태스크를 sleep(다른 태스크에 제어권을 넘긴다) 시킨다.
  • Generic Mutex Subsystem은 RT(Real Time) 커널에 있는 소스를 메인 스트림에 upstream 하였는데, 이 소스는 x86 계열의 샘플이고 다른 아키텍처로의 포팅이 쉽게될 수 있도록 가이드한다.
  • 뮤텍스는 실제 원래 설계 및 목적과 다르게 뮤텍스 오브젝트가 세마포어 오브젝트보다 크기가 커져서 CPU cache와 메모리 footprint에 용량이 더 소모되는 단점이 있다.
  • 2013.11월 kernel 디렉토리의 락 관련 화일이 모두 kernel/locking 디렉토리로 이전됨.
  • Kernel용 mutex는 recursive lock을 지원하지 않는다.
  • 성능을 개선하기 위해 성능순으로 3단계 패스(fastpath, midpath, slowpath)를 구성하였다.
  • Fastpath에서는 한 번의 atomic operation 으로 뮤텍스를 가장 빠르게 얻을 수 있되 락을 얻기 전에 unlock 상태의 조건에서만 사용 가능하다.
  • Optimistic spinning lock (adaptive spinning for mutex 또는 advanced MCS (queued) lock) 기술은  midpath로 non-sleepable busy wait 를 구현하였는데 SMP 시스템에서 특정 조건을 만족시키면 사용 가능하다.
  • 여러 문서들에서 뮤텍스가 sleeping lock이라고 소개되지만 실상은 조금 더 복잡하다. 실제 구현에서는 락 상태별로 몇 개의 조건을 성립하면 fastpath나 midpath를 사용하여 sleep 하지 않고 뮤텍스 락을 얻게될 수 있다. 그러나 그 조건에 부합되지 않은 경우 slowpath가 동작하며 이 때 sleep 한다.
  • Wait/Wound Deadlock Proof Mutex(ww-mutex)는 일반 뮤텍스와 달리 deadlock 회피가 가능하다.

 

사전 지식

 

뮤텍스를 분석하는 것은 매우 광범위한 리눅스 커널 지식이 요구된다. 뮤텍스 기술 역시 현재도 끊임없이 추가 기술이 적용되어 가면서 더 복잡해지고 있고  그럼에 따라 기존 도서나 인터넷 자료들에서 설명한 것과 약간씩 달라져가고 있다. 현재 버전의 뮤텍스 소스를 한 번에 모두 이해하기 힘들기 때문에 여러 번에 걸쳐 top-down 방식으로 위에서 아래 세부 루틴으로 단계별로 분석하는 것이 여러 가지로 이해하는데 두뇌 고통을 줄이는데 도움될 것이다. 동반해서 다음과 같이 사전에 알아둬야 할 항목들이 있다.
  • likely(), unlikely() 매크로
  • Atomic operation
    • ldrex, strex
  • Spin lock
    • spin_lock(), spin_unlock()
  • Preemption
    • preempt_enable(), preempt_disable()
  • RT(Real Time) Kernel
  • Interrupt
    • local_irq_enable(), local_irq_disable(), local_irq_save(), local_irq_restore()
  • Lockdep
    • lock_acquire(), lock_release()
  • Scheduler
    • preemption에 따른 priority 개념, sleep 개념정도만

 

뮤텍스 옵션

CONFIG_DEBUG_LOCKDEP

CONFIG_DEBUG_LOCK_ALLOC

  • 락뎁(LOCKDEP), 디버그 스핀락(DEBUG_SPINLOCK), 디버그 뮤텍스(DEBUG MUTEXES) 등을 선택하는 경우 락을 디버깅하기 위해 사용되는데 커널에 의해 락 메모리가 올바르게 해제(kfree(), kmem_cache_free(), free_pages(), vfree(), etc…)되지 않는 경우를 감지한다.
  • 해당 옵션이 동작하는 경우 mutex_lock() 매크로는 mutex_lock_nested() 함수를 호출하고 fastpath 과정을 건너뛴다.
#define mutex_lock(lock) mutex_lock_nested(lock, 0)
void __sched  mutex_lock_nested(struct mutex *lock, unsigned int subclass)
{
        might_sleep();
        __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE,
                            subclass, NULL, _RET_IP_, NULL, 0);
}

 

CONFIG_LOCK_STAT

  • 락뎁(LOCKDEP), 디버그 스핀락(DEBUG_SPINLOCK), 디버그 뮤텍스(DEBUG MUTEXES), 디버그 락할당(DEBUG_LOCK_ALLOC) 등을 선택하는 경우 락을 디버깅하기 위해 락 사용 통계를 제공한다.
  •  참고: Documentation/locking/lockstat.txt

CONFIG_DEBUG_MUTEXES

  • 디버깅 기능을 사용할 수 있으며 이로 인해 약간의 제한을 갖지만 락 개체에 대해 분석할 수 있다.
  • 이 옵션을 사용하는 경우 fastpath 루틴은 사용할 수 없으며 항상 slowpath(midpath) 루틴만을 호출한다.
  • 이 옵션의 사용 유무에 따른 구현 루틴 구분
    • .enable:  <linux/mutex-debug.h>, “kernel/locking/mutex-debug.h”, <asm-generic/mutex-null.h>에 있는 헤더파일이 사용됨.
    • .disable: “kernel/locking/mutex.h”, <asm/mutex.h> -> <asm-generic/mutex-dec.h>

 

CONFIG_PROVE_LOCKING

  • lock 체인을 분석하고 잘못을 디텍트해 아래와 같이 경고 메시지를 출력한다.
    • circular_lock_dependency: “[ INFO: possible circular locking dependency detected ]”
    • bad_irq_lock_dependency: “[ INFO: XXXXX-safe -> YYYYY-unsafe lock order detected ]”
    • deadlock_bug: “[ INFO: possible recursive locking detected ]”
    • usage_bug: “[ INFO: inconsistent lock state ]”
    • irq_inversion_bug: “[ INFO: possible irq lock inversion dependency detected ]”
    • nested_lock_not_held: “[ BUG: Nested lock was not taken ]”
    • unlock_imbalance_bug: “[ BUG: bad unlock balance detected! ]”
    • contention_bug: “[ BUG: bad contention detected! ]”
    • held_locks_bug: “[ BUG: %s/%d still has locks held! ]”
  • 참고: Documentation/locking/lockdep-design.txt

Recursive Lock vs Non-Recursive Lock

  • POSIX 뮤텍스는 Recursive Lock을 허락한다. 그 의미는 같은 스레드에서 같은 뮤텍스에 대해 두 번 락을 할 수 있고 dead-lock이 되지 않는다는 뜻이다. 물론 두 번 락을 한 것에 대해 두 번 언락을 해야 한다.
  • POSIX 뮤텍스를 사용하는 pthread_mutex는 PTHREAD_MUTEX_RECURSIVE 속성을 사용하여 Recursive lock을 사용할 수 있다.
  • 커널에서 사용하는 뮤텍스는 Non-Recursive lock 이다. 같은 스레드에서 두 번 락을 하게되면 두 번째 lock을 획득하는 순간 멈춘다(hold)

Wait/Wound Deadlock Proof Mutex

  • Maarten이 개발한 패치로 deadlock을 회피하여 뮤텍스 락을 처리하는 알고리즘으로 ww-mutex라고도 불린다.

mutex8a

  • 커널 코드는 lock ordering과 Lockdep의 체킹 툴로 비정상 상황을 찾아낼 수 있다. 그럼에도 불구하고 deadlock 상황이 발생하는 경우 ww-mutex를 사용하는 경우 다음과 같이 dead-lock 회피 알고리즘이 동작한다.
    • 첫 번째 스레드가 락A를 소유하고, 두 번째 스레드가 락B를 소유한 상태에서
    • 첫 번째 스레드가 락B의 소유를 시도하고, 두 번째 스레드도 락A를 소유하려고 시도할 때 패치 적용전에는 deadlock 상황에 들어간다.
    • 패치가 적용되는 경우 첫 번째 스레드가 락B의 소유를 시도하다 블럭되고
    • 두 번째 스레드가 락A를 소유하려고 시도할 때 서로 교차되는 락을 발견하여 “wounded” 된다.
    • wounded 의미: 현재 기존에 홀드하고 있는 잠금을 해제하고 처음부터 다시 시작해야 한다라고 말하다(told)
    • 두 번째 스레드가 “wound” 처리를 위해 이미 소유하고 있던 락B를 풀었다가 다시 얻으려고 시도하지만
    • 두 번째 스레드가 락B를 풀자마자 그 시점에 첫 번째 스레드가 락B를 먼저 획득한다.
    • 따라서 두 번째 스레드는 락B를 얻지 못하고 블럭된다.
    • 첫 번째 스레드하고 critical section을 지난 후 락B와 락A를 차례대로 반납한다.
    • 첫 번째 스레드가 락B를 반납하자마자 두 번째 스레드가 락B를 소유하게 되고
    • 다시 첫 번째 스레드가 락A를 반납하자마자 역시 두 번째 스레드가 락A를 소유하게 된다.
    • 두 번째 스레드가 critical section을 통과한 후 락A와 락B를 연달아 해제하는 것으로 두 스레드의 모든 락의 사용이 완료된다.
  • 이의 구현을 위해 ww_mutex, ww_class, ww_acquire_ctx라는 구조체가 사용된다.
  • 사용 시
    • mutex_lock() 대신 별도의 ww_mutex_lock()을 사용한다.
    • 헤더 화일로 mutex.h 대신 ww-mutex.h를 사용한다.

Fastpath, Midpath, Slowpath Mutex Lock

성능 관련한 접근 방법으로 뮤텍스 획득을 위해 락상태에 따라 3가지의 방법 중 하나를 가장 빠른 fastpath 부터 midpath, slowpath 까지 선택 사용한다. 처음에 fastpath를 사용하여 뮤텍스가 획득되면 midpath나 slowpath 루틴을 사용하지 않는다. fastpath를 실패한 후 midpath 루틴으로 진입한 경우 역시 뮤텍스가 획득되면 slowpath 루틴을 사용하지 않는다.
 mutex2b
  • fastpath (Atomic Operation)
    • unlock 상태에서 lock을 획득할 때 fastpath를 사용할 수 있다.
    • atomic opeation(몇 개의 인스트럭션이며 아키텍처마다 다름)의 수행만으로 락/언락을 처리하므로 속도가 빠르다.
    • count 변수를 atomic하게 감소(또는 교환) 시키는 기법으로 락을 획득하는 방법이다.
    • 사전에 count 변수가 1(unlock)인 경우만 fastpath 방법으로 lock을 획득할 수 있다.
    • fastpath의 구현은 다음과 같이 두 개로 나뉘어 있다.
      • atomic 기반의 감소(decrement)/증가(increment) 명령으로 구현
      • atomic 기반의 교환(xchg) 명령으로 구현
    • fastpath는 아키텍처에 따라 일반적으로 dec/inc 기반의 사용이 적절한 경우 먼저 선택 사용된다.
  • midpath (Optimistic spinning)
    • optimistic spinning 또는 adaptive spinning for mutex라 불림
    • 이미 락이 걸려 있는 상태에서 아래 조건이 만족하면 빠르게 락을 얻을 가능성이 있어서 midpath를 사용한다.
      • 락 owner 태스크가 러닝중
      • 리스케쥴 요청이 없는 경우 (현재 태스크보다 더 높은 우선 순위(need_resched)로 동작할 태스크가 없는)
    • 뮤텍스 스피너(spinner)는 OSQ(MCS) lock(simple spinlock)을 사용하여 wait_queue에 큐잉(queued up)되고 블러킹되며 기존 spinlock처럼 busy-wait한다. 가장 먼저 큐에 인입된 오직 하나의 스피너만이 뮤텍스를 얻기 위해 경쟁할 수 있다.
    • 실제 구현 루틴에서 fastpath 및 slowpath와 다르게 midpath라는 함수명으로 구분되어 있지 않다.
      • midpath 루틴은 __mutex_lock_slowpath() 루틴의 전반부에서 호출됨
    • CONFIG_MUTEX_SPIN_ON_OWNER 옵션을 사용하여 동작시킨다.
    • slowpath와 다르게 sleep/wake-up 사이클 없이 락을 획득한다.
    • 다음 문제도 해결 완료
      • cache line 충돌(contention) 문제 해결. (Spin on our own cache line)
      • ticket spinlock처럼 불공정(fairness) 문제를 해결.
      • 참고: Preventing overly-optimistic spinning | LWN.net
  • slowpath
    • mutex 획득의 3가지 방법 중 가장 느린 방법으로 mutex를 얻을 때까지 루프를 돌며 sleep하고 제어권을 다른 태스크에게 양도한다.
    • 디버깅 옵션(CONFIG_DEBUG_MUTEXES)을 사용하는 경우에는 항상 slowpath 방법만을 사용한다.
    • wait_lock spinlock을 사용하고, 태스크는 대기큐에 추가되고 unlock path에 의해 깨어날 때 까지 슬립된다.
    • 여러 태스크가 경쟁적으로 slowpath에서 mutex를 소유하려고 경쟁하는 경우에는 mutex_lock() 함수 접근 순서대로 처리되지 않는다. 각 태스크가 sleep 되었다가 wake-up 될 때마다 mutex를 얻을 수 있는지 조사하고 얻을 수 있을 때 mutex를 획득한다. 나머지 태스크는 다시 sleep되며 루프를 돈다.
      • sleep 기반에서 동작하는 경우에는 각 태스크가 깨어나는 타이밍이 달라서 mutex의 획득 순서에 대하여 공정하게 처리하기 위해 mutex_unlock() 함수 내부에서 대기자들 중 가장 먼저 대기했던 태스크를 먼저 깨울 수 있도록 지시하고 mutex를 release하여 최대한 해당 태스크가 빨리 mutex를 획득할 수 있도록 도와 성능을 높인다.

 

Fastpath, slowpath Mutex Unlock

성능 관련한 접근 방법으로 뮤텍스 해제를 위해 락상태에 따라 2가지의 방법 중 하나를 가장 빠른 fastpath와 slowpath를 선택 사용한다. 처음에 fastpath를 사용하여 뮤텍스가 해제되면 slowpath 루틴을 사용하지 않는다.

mutex-7b

 

Fastpath Mutex Lock 구현 방법

__mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath); 루틴은 CPU 아키텍처에 따라 3가지의 루틴 중 하나를 빌드하여 사용된다.
  • dec/inc 기반의 fastpath lock (for ARM)
  • xchg 기반의 fastpath lock
  • fastpath 없이 곧장 slowpath(with midpath) lock

1) dec/inc 기반의 fastpath lock

  • ARM 아키텍처에서 사용
  • 먼저 atomic_dec_return()으로 이루어진 뮤텍스 fastpath 루틴을 동작시켜 count 값 감소 결과 값이 적은 확률로 값이 0보다 작으면 fail_fn(__mutex_lock_slowpath 함수) 호출.
    • atomic_dec_return() 결과값이 0이거나 음수로 예상되는데 이에 따른 의미
      • 0: unlock(1) -> lock(0)으로 상태가 바뀜.
      • 음수: 기존에 뮤텍스 락이 이미 걸린 상태에서 뮤텍스 락이 또 호출되어 중첩(2번 이상)된 경우
    • unlikely() 매크로는 성능향상 즉 optimization을 목적으로 주로 false가 예상되는 경우에 사용된다.
    • atomic_dec()이 값을 감소시키기 전의 값을 리턴하는데 반하여 atomic_dec_return()은 값을 감소 시킨 후의 값을 리턴한다.
include/asm-generic/mutex-dec.h
static inline void __mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
    if (unlikely(atomic_dec_return(count) < 0))
        fail_fn(count);
}

 

2) xchg 기반의 fastpath lock

  • 첫 번째, count 값을 0(lock)으로 교체하기 전 count 값이…
    • 적은 확률로 1(unlock)이 아니면 즉, 이미 락이 있으면서 또 락이 호출된 경우 두 번째 조건 수행
    • 많은 확률로 1(unlock)인 경우 조건을 빠져나간다.
      •  fastpath로 싱글 뮤텍스 락 성립
  • 두 번째, count 값을 -1(double lock)로 교체하기 전 count 값이…
    • 많은 확률로 1(unlock)이 아니면 fail_fn(__mutex_lock_slowpath 함수) 호출 (중첩 락 성립)
    • 적은 확률로 1이면 조건을 빠져나간다.
      • fastpath로 싱글 뮤텍스 락 성립
      • 첫 번째 조건 수행 후 두 번째 조건으로 오기 전에 우선 순위가 높은 태스크가 preempt되어 unlock된 후 다시 이 태스크로 돌아와 계속 수행하게 되는 경우 이렇게 unlock 된 상태로 있을 수 있음.
      • 개개의 atomic operation은 수행 결과가 분리될 수 없지만 이렇게 개개의 atomic operation을 수행하는 경우에는 preemption 상황을 준비해야 한다.
include/asm-generic/mutex-xchg.h
static inline void  __mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
    if (unlikely(atomic_xchg(count, 0) != 1))
        /* 
         * We failed to acquire the lock, so mark it contended
         * to ensure that any waiting tasks are woken up by the
         * unlock slow path.
         */
        if (likely(atomic_xchg(count, -1) != 1))
            fail_fn(count);
}

 

3) slowpath(midpath)로 직행

  • 항상 fastpath를 생략하고 fail_fn(__mutex_lock_slowpath 함수) 호출.
include/asm-generic/mutex-null.h
#define __mutex_fastpath_lock(count, fail_fn)       fail_fn(count)

 

mutex_lock()

  • mutex_lock()의 흐름은 아래와 같다.  CONFIG_DEBUG_LOCK_ALLOC 기능을 사용하는 경우 mutex_lock_nested() 함수에서 fastpath를 생략하고 곧장 __mutex_lock_common() 함수(midpath와 slowpath 루틴)로 진입한다.
  • 옵션이 없는 경우 ARM 아키텍처인경우 atomic_dec_return()을 호출하는 __mutex_fastpath_lock() 매크로를 호출하여 fastpath를 처음부터 호출하여 사용한다.

mutex

아래 그림은 mutex를 획득하는 조건에 대해서 흐름도로 간략히 표현하였다.

mutex-9

kernel/locking/mutex.c
/**
 * mutex_lock - acquire the mutex
 * @lock: the mutex to be acquired
 *
 * Lock the mutex exclusively for this task. If the mutex is not
 * available right now, it will sleep until it can get it.
 *
 * The mutex must later on be released by the same task that
 * acquired it. Recursive locking is not allowed. The task
 * may not exit without first unlocking the mutex. Also, kernel
 * memory where the mutex resides must not be freed with
 * the mutex still locked. The mutex must first be initialized
 * (or statically defined) before it can be locked. memset()-ing
 * the mutex to 0 is not allowed.
 *
 * ( The CONFIG_DEBUG_MUTEXES .config option turns on debugging
 *   checks that will enforce the restrictions and will also do
 *   deadlock debugging. )
 *
 * This function is similar to (but not equivalent to) down().
 */
void __sched mutex_lock(struct mutex *lock)
{
        might_sleep();
        /*
         * The locking fastpath is the 1->0 transition from
         * 'unlocked' into 'locked' state.
         */
        __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
        mutex_set_owner(lock);
}
mutex_lock() 부분 주석을 간단히 설명한다.
  • 현재 태스크를 위해 베타적으로 뮤텍스를 lock한다. 만일 뮤텍스가 지금 당장 가능하지 않으면 소유할 때 까지 sleep 한다.
  • 뮤텍스는 나중에 뮤텍스를 획득한 동일한 태스크에 의해 release 되야 한다.
  • Recursive lock은 허용하지 않는다.
  • 태스크는 처음 뮤텍스의 unlocking 없이 종료(exit) 할 수 없다.
  • 또한 뮤텍스가 lock된 채로 뮤텍스가 자리한 커널 메모리가 해제(free)될 순 없다.
  • 처음 뮤텍스를 lock하기 전에 먼저 초기화(또는 정적으로 defined) 되어야 한다.
  • memset()으로 뮤텍스를 0으로 초기화 하는 것은 허락되지 않는다.
  • CONFIG_DEBUG_MUTEXES: 이 옵션을 동작시키면(turn on) 제약 사항(fastpath를 수행하지 않는)이 있으며 dead-lock 디버깅 등을 수행할 수 있다.
  • mutex_lock() 함수는 완전히 동일하진 않지만 down() 함수와 유사하다.

소스 루틴을 살펴보면…

  • might_sleep()은 리스케쥴 요청을 받았을 때 sleep하는 preemption points이다. (참고: preemption, might_sleep())
    • 이 루틴은 CONFIG_PREEMPT_VOLUNTARY 옵션을 사용하는 경우만 호출된다.
  • __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
    • fastpath를 호출하고 실패 시 slowpath(midpath)로 구현된 mutex lock 루틴을 호출한다.
    • 아키텍처마다 약간 다르지만 ARM 아키텍처에서는 atomic_dec_return()을 사용하는 fastpath 루틴이다.
    • 위에서 자세히 설명하였었지만 fastpath는 락 count 값이 1(unlock) -> 0(lock)으로 상태가 변환될 때만 성공이다.
  • 뮤텍스 락에 owner를 설정해둔다.
    • CONFIG_MUTEX_SPIN_ON_OWNER 옵션을 사용한 경우에만 owner를 설정한다.

__mutex_lock_slowpath()

kernel/locking/mutex.c
__visible void __sched __mutex_lock_slowpath(atomic_t *lock_count)
{
        struct mutex *lock = container_of(lock_count, struct mutex, count);

        __mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0,
                            NULL, _RET_IP_, NULL, 0);
}
  • container_of()는 구조체의 멤버변수를 사용하여 구조체의 처음을 알아내는 함수
  • __mutex_lock_common()에는 뮤텍스 락에 대한 midpath와 slowpath 로직이 담겨있다.

 

__mutex_lock_common()

__mutex_lock_common() 함수내에서 midpath와 slowpath 루틴이 순차적으로 구현되어 있다.
kernel/locking/mutex.c
static __always_inline int __sched __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
                    struct lockdep_map *nest_lock, unsigned long ip,
                    struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
        struct task_struct *task = current;
        struct mutex_waiter waiter;
        unsigned long flags;
        int ret;

        preempt_disable();
        mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);

        if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
                /* got the lock, yay! */
                preempt_enable();
                return 0;
        }
처음 시작하는 부분이 midpath(optimistics spin) 루틴으로 먼저 수행된다. midpath 루틴은 non-sleepable spin-wait(루프)으로 구현된다.
  • preempt를 disable하여 리스케쥴링하지 않도록 하므로 슬립되지 않는다.
  • mutex_acquire_nest()은 락 추척을 위한 디버깅 기능을 위해 호출한다.
  • midpath인 mutex_optimistic_spin()으로 락 획득이 성공하면 preempt를 enable 한 후 성공리에 빠져나간다.
  • 인수의 마지막에 use_ww_ctx가 있는데 보통 0이며, ww_mutex를 사용하는 경우엔 1이다.
    • mutex_lock()을 사용하지 않고 dead-lock avoidance 기능이 있는 ww_mutex_lock()을 사용한 경우에 해당
        spin_lock_mutex(&lock->wait_lock, flags);

        /*
         * Once more, try to acquire the lock. Only try-lock the mutex if
         * it is unlocked to reduce unnecessary xchg() operations.
         */
        if (!mutex_is_locked(lock) && (atomic_xchg(&lock->count, 0) == 1))
                goto skip_wait;
 midpath에서 lock 획득이 실패하였으면 slowpath 루틴에서 락을 획득한다. slowpath 루틴은 non-busy(sleepable) wait으로 구현된다.
  • spin_lock_mutex(): 인터럽트되지 않도록 하면서 뮤텍스의 wait_lock에 spinlock을 건다.
    • spin_lock을 거는 이유는 wait_list 등의 자료 구조를 보호하면서 access를 하고 동시에 두 개 이상의 태스크가 접근할 때 조금이라도 먼저 접근한 태스크에게 mutex를 획득할 수 있는 기회를 주기 위함이다.
  • 본격적으로 slowpath를 수행하기 전에 한 번 더 midpath 처럼 락을 얻을 수 있는지 시도한다.
    • !mutex_is_locked():  만일 뮤텍스가 unlock 상태이고(이 루틴까지 오면서 lock 상태가 변해 unlock(release)된 경우에 해당)
    • atomic_xchg(): lock(0)을 획득하였을 때 변경 전 count 값이 1(unlock)이면 락을 정상적으로 획득한 것이므로 skip_wait 라벨로 점프한다.
        debug_mutex_lock_common(lock, &waiter);
        debug_mutex_add_waiter(lock, &waiter, task_thread_info(task));

        /* add waiting tasks to the end of the waitqueue (FIFO): */
        list_add_tail(&waiter.list, &lock->wait_list);
        waiter.task = task;

        lock_contended(&lock->dep_map, ip);
코드는 다음과 같이 동작한다.
  • debug_mutex_lock_common(): 로컬 변수인 waiter를 초기화한다.
  • debug_mutex_add_waiter(): 현재 태스크의 blocked_on에 위에서 초기화한 waiter를 대입한다.
  • waiter(대기자)를 wait_list(FIFO wait queue)의 마지막에 추가한다.
    • wait_list(락의 대기 FIFO )의 마지막에 waiter를 추가한다.
    • waiter.task 현재 task를 대입
  • lock_contended() 함수는 CONFIG_LOCK_STAT이 설정된 상태면 “lock_contension_bug”를 디텍트해 출력할 수 있다.

아래 그림은 wait_list에 waiter가 추가되는 과정을 나타냈다.

  • 처음엔 wait_list가 비어있고 waiter(대기자)가 하나도 없는 경우는 lock owner만이 mutex를 소유한 상태이므로 count=0이다.
  • 두 번째에서 task-A가 wait_list의 뒤에 waiter(대기자)로 추가되었다.
  • 세 번째와 네 번째에서 task-B와 task-C가 순차적으로 wait_list의 뒤에waiter(대기자)로 추가되었다.

mutex5c

 

        for (;;) {
                /*
                 * Lets try to take the lock again - this is needed even if
                 * we get here for the first time (shortly after failing to
                 * acquire the lock), to make sure that we get a wakeup once
                 * it's unlocked. Later on, if we sleep, this is the
                 * operation that gives us the lock. We xchg it to -1, so
                 * that when we release the lock, we properly wake up the
                 * other waiters. We only attempt the xchg if the count is
                 * non-negative in order to avoid unnecessary xchg operations:
                 */
                if (atomic_read(&lock->count) >= 0 &&
                    (atomic_xchg(&lock->count, -1) == 1))
                        break;

                /*
                 * got a signal? (This code gets eliminated in the
                 * TASK_UNINTERRUPTIBLE case.)
                 */
                if (unlikely(signal_pending_state(state, task))) {
                        ret = -EINTR;
                        goto err;
                }

                if (use_ww_ctx && ww_ctx->acquired > 0) {
                        ret = __ww_mutex_lock_check_stamp(lock, ww_ctx);
                        if (ret)
                                goto err;
                }
               __set_task_state(task, state);

                /* didn't get the lock, go to sleep: */
                spin_unlock_mutex(&lock->wait_lock, flags);
                schedule_preempt_disabled();
                spin_lock_mutex(&lock->wait_lock, flags);
        }
slowpath  루프 처음마다 락 획득을 시도하고 락 획득이 성공할 때 까지  대기 상태로 들어가는 것을 반복한다.
  • SMP의 여러 코어가 경쟁적으로 atomic_xchg()를 서로 호출하는 경우 각 코어의 캐시에서 update를 하면서 cache chherent 알고리즘을 통해 많은 부하가 생긴다. 이 때 발생하는 cache bouncing 문제를 해결 하기 위해 atomic_xchg()를 호출하기 전에 atomic_read()를 하도록 루틴이 추가되었다.
    • count가 0보다 같거나 크다는 말은 기존에 걸려 있던 락이 풀려 더 이상의 대기자가 없다(락이 하나만 있거나 unlock된 상태)는 뜻이다.
    • atomic_xchg()의 리턴값이 1이면(atomic_xchg가 수행되기 전의 count 값) unlock 상태였다는 뜻이다. 이렇게 lock을 획득하면 루프를 빠져나온다.
  • 작은 확률로 시그널 상태에 따라 시그널이 펜딩된 상태라면 락 획득 실패되며 에러 처리 루틴으로 점프
    • TASK_INTERRUPTIBLE 이거나 TASK_WAKE_KILL 인 상태에서 시그널이 펜딩된경우 true
    • TASK_UNINTERRUPTIBLE 케이스인 경우는 코드를 무시하면 됨.
  • ww_mutex를 사용하면서 ww_ctx->acquired 값이 0보다 크면
    • __ww_mutex_lock_check_stamp()를 호출하여 데드락 상황을 체크하여 true면 에러 처리 루틴으로 점프
  • __set_task_state(task, state)
    • 태스크 상태를 인수로 받은 상태로 변경한다.
      • 인수로 받는 상태는 다음과 같다.
        • mutex_lock(), ww_mutex_lock() -> TASK_UNINTERUPTIBLE
        • mutex_lock_interruptable(), ww_mutex_interruptable() -> TASK_INTERRUPTIBLE
        • mutex_lock_killable() -> TAKS_KILLABLE
  • spin lock을 풀고 슬립한다. 슬립이 깨면 다시 spin lock을 얻은 후 루프 처음으로 되돌아가 다시 뮤텍스 획득을 시도한다.
        __set_task_state(task, TASK_RUNNING);

        mutex_remove_waiter(lock, &waiter, current_thread_info());
        /* set it to 0 if there are no waiters left: */
        if (likely(list_empty(&lock->wait_list)))
                atomic_set(&lock->count, 0);
        debug_mutex_free_waiter(&waiter);
정상적으로 뮤텍스를 획득하여 루프에서 벗어난 경우 (여전히 spin lock은 걸려있는 상태)
  • 태스크 상태를 TASK_RUNNING으로 바꾼다.
    • slowpath에서 sleep 한 후 깨어나 lock을 얻을 때까지 문제없이 잘 동작하려면 명시적으로 태스크를 TASK_RUNNING 상태로 설정해야 한다.
  • mutex_remove_waiter()
    • 현재 태스크의 blocked_on을 null로 초기화 한다. (초기화전에 waiter가 지정)
    • waiter들끼리 연결이 되어 있는 구조에서 자신의 waiter 노드만 제거하고 초기화 한다.
    • waiter->task를 null로 초기화 한다.
  • 많은 확률로 락의 wait_list(대기큐)가 비어 있으면(대기자들이 하나도 없으면) atomic하게 count를 0(single lock, 대기자 없는 락 상태)으로 바꾼다.
  • debug_mutex_free_waiter()
    • waiter 구조체 정보를 깨끗이 지운다.
skip_wait:
        /* got the lock - cleanup and rejoice! */
        lock_acquired(&lock->dep_map, ip);
        mutex_set_owner(lock);

        if (use_ww_ctx) {
                struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);
                ww_mutex_set_context_slowpath(ww, ww_ctx);
        }

        spin_unlock_mutex(&lock->wait_lock, flags);
        preempt_enable();
        return 0;

err:
        mutex_remove_waiter(lock, &waiter, task_thread_info(task));
        spin_unlock_mutex(&lock->wait_lock, flags);
        debug_mutex_free_waiter(&waiter);
        mutex_release(&lock->dep_map, 1, ip);
        preempt_enable();
        return ret;
}
fastpath를 제외하고 midpath나 slowpath에서 정상적으로 락을 획득한 경우 skip_wait: 루틴에 도착한다.
  • lock_acquired():  lockdep 디버그 코드로 트레이스 목적이며, lock_contention_bug도 디텍트하여 출력한다.
  • 락 owner를 현재 태스크로 설정
  • ww_mutex를 동작시킨 경우
    • container_of()는 구조체의 멤버변수를 사용하여 구조체의 처음을 알아내는 함수
      • container_of 매크로 | TickTalk
      • ww_mutex_set_context_slowpath()는 deadlock 회피용 slowpath 락으로 분석 생략
  • spin_unlock_mutex(): spin lock을 release 한다.
    • 락 디버그 옵션에서는 리스케쥴 필요한 경우 슬립도 한다.
  • 마지막으로 preempt_enable()을 하고 종료
  • err: 레이블은 뮤텍스 획득 실패  후 처리되는 루틴으로 분석 생략

아래 그림은 wait_list에 waiter가 삭제되는 과정을 나타냈다.

  • 처음엔 wait_list에 waiter(대기자)가 3개가 있는 상태이고 이 때 mutex lock count=-3이다.
    • lock count가 음수인 경우 대기자가 있다는 뜻인데 이 숫자를 파악하여 대기자의 수를 알아낼 수는 없다.
    • 정확한 대기자의 수는 wait_list에 존재하는 waiter 들의 수이다.
  • 두 번째엔 task-A가 wait_list의 선두에서 삭제되고 mutex를 얻게된다.
  • 세 번쨰엔 task-B가 wait_list의 선두에서 삭제되고 mutex를 얻게된다.
  • 네 번째에 최종 task-C가 wait_list에서 빠져나가면서 mutex를 얻게되고 그 때에 mutex lock count=0이다.

mutex-6d

 

midpath (optimistic spinning)

 midpath로 CONFIG_MUTEX_SPIN_ON_OWNER 옵션이 설정된 경우 midpath (adaptive spinning 또는 optimistic spinning) 루틴이 동작한다.
  • owner가 다른 CPU에서 러닝(running) 중이면서 현재 태스크가 리스케쥴할 필요가 없을 때 현재 태스크에서 스핀하며 락을 획득하려 시도한다.(busy(non-sleepable ) wait)
  • owner가 러닝중이라면 그것은 곧 락을 release 할 것이라는 근거이다.
  • 이러한 뮤텍스 구현 방식은 lock field에서 owner를 atomic하게 추적하지 않고 non-atomic하게 추적한다.
    • 당연히 fastpath 및 midpath에서는 빠른 속도를 내는 것이 목적이므로 락에 대한 추적을 하지 않는다. (락 추적 목적으로 관련 기록  루틴을 호출하지 않는다)
  • Optimistic spinning에서는 모든것을 직렬화(serialize)하는 wait_lock에 기반하기 때문에 DBUG_MUTEXES 옵션 즉 디버깅을 위해 잘 동작하지 않는다.
  • 뮤텍스 스피너들은 OSQ(MCS lock)에 큐 up된다. 그래서 오직 하나의 spinner만 뮤텍스에 대해 경쟁한다. 반면에 뮤텍스가 스피닝하지 않을 것 같으면 lock/unlock 오버헤드를 통과하므로 아무 문제 없다.
  • 락을 획득하게 되면 true를 리턴하고, 그렇지 않은 경우 false를 리턴하며 slowpath 로 가서 sleep 할 필요가 있다. 
  • Optimistic spinning은 MCS(Mellor-Crummey and Scott)가 제안한 FIFO queue 구조의 ticket spin lock과 유사한 구조로 구현되었다. (MCS like)

mutex_optimistic_spin()

SMP를 위해 다음의 조건이 성립되면 빠르게 락을 얻을 수 있다고 판단하여 이 함수(midpath)에서 슬립하지 않고 스핀(루프)을 하며 락을 얻기 위해 시도한다.
  • 락 owner가 다른 CPU에서 현재 러닝 중
    • 락 owner가 다른 CPU에서 돌고 있다는 뜻은 곧 critical section이 끝나고 락이 release 될 가능성이 높다는 뜻
  • 현재 태스크의 preemption 요청이 없음
    • 리스케쥴 요청이 없다는 뜻으로 결국 락 오너가 락을 release 하자 마자 현재의 태스크에서 곧 락을 얻을 수 있음을 의미
kernel/locking/mutex.c
/*
* Optimistic spinning.
*
* We try to spin for acquisition when we find that the lock owner
* is currently running on a (different) CPU and while we don't
* need to reschedule. The rationale is that if the lock owner is
* running, it is likely to release the lock soon.
*
* Since this needs the lock owner, and this mutex implementation
* doesn't track the owner atomically in the lock field, we need to
* track it non-atomically.
*
* We can't do this for DEBUG_MUTEXES because that relies on wait_lock
* to serialize everything.
*
* The mutex spinners are queued up using MCS lock so that only one
* spinner can compete for the mutex. However, if mutex spinning isn't
* going to happen, there is no point in going through the lock/unlock
* overhead.
*
* Returns true when the lock was taken, otherwise false, indicating
* that we need to jump to the slowpath and sleep.
*/
static bool mutex_optimistic_spin(struct mutex *lock,
                                  struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
        struct task_struct *task = current;

        if (!mutex_can_spin_on_owner(lock))
                goto done;

        /*
         * In order to avoid a stampede of mutex spinners trying to
         * acquire the mutex all at once, the spinners need to take a
         * MCS (queued) lock first before spinning on the owner field.
         */
        if (!osq_lock(&lock->osq))
                goto done;
소스를 설명하자면
  • !mutex_can_spin_on_owner()
    • 리스케쥴 요청이 있거나 lock의 owner가 러닝중이 아니면 midpath 불성립
  • osq_lock을 획득하지 못하면 midpath 불성립
    • 동시에 midpath에 진입하는 cpu들이 서로 lock을 획득하기 위해 쇄도(반복적인 시도로 인해)하는 것을 방지하기 위해서 serialization을 하여 가장 먼저 OSQ(MCS) lock을 획득한 cpu가 성공적으로 리턴된다.
    • OSQ(MCS) lock은 FIFO에 의한 큐방식으로 가장 처음에 진입한 spinner만이 뮤텍스에 대해 경쟁을 할 권한을 가지고 있다.
    • 따라서 midpath에서도 뮤텍스 획득에 대한 공정성에 접근하였다.
    • FIFO 큐에서 대기하고 있는 cpu 들은 대기하면서 만일 리스케쥴 요청이 있는 경우는 실패로 리턴된다.
    • 물론 OSQ(MCS) lock을 획득한 cpu가 당장 mutex lock을 얻게되는 것이 아니라 이미 mutex lock을 가진 owner가 mutex unlock될 때 까지 spin된 후에 얻게될 것이다.

mutex-3a

        while (true) {
                struct task_struct *owner;

                if (use_ww_ctx && ww_ctx->acquired > 0) {
                        struct ww_mutex *ww;

                        ww = container_of(lock, struct ww_mutex, base);
                        /*
                         * If ww->ctx is set the contents are undefined, only
                         * by acquiring wait_lock there is a guarantee that
                         * they are not invalid when reading.
                         *
                         * As such, when deadlock detection needs to be
                         * performed the optimistic spinning cannot be done.
                         */
                        if (ACCESS_ONCE(ww->ctx))
                                break;
                }

                /*
                 * If there's an owner, wait for it to either
                 * release the lock or go to sleep.
                 */
                owner = ACCESS_ONCE(lock->owner);
                if (owner && !mutex_spin_on_owner(lock, owner))
                        break;

                /* Try to acquire the mutex if it is unlocked. */
                if (mutex_try_to_acquire(lock)) {
                        lock_acquired(&lock->dep_map, ip);

                        if (use_ww_ctx) {
                                struct ww_mutex *ww;
                                ww = container_of(lock, struct ww_mutex, base);

                                ww_mutex_set_context_fastpath(ww, ww_ctx);
                        }

                       mutex_set_owner(lock);
                        osq_unlock(&lock->osq);
                        return true;
                }

                /*
                 * When there's no owner, we might have preempted between the
                 * owner acquiring the lock and setting the owner field. If
                 * we're an RT task that will live-lock because we won't let
                 * the owner complete.
                 */
                if (!owner && (need_resched() || rt_task(task)))
                        break;

                /*
                 * The cpu_relax() call is a compiler barrier which forces
                 * everything in this loop to be re-loaded. We don't need
                 * memory barriers as we'll eventually observe the right
                 * values at the cost of a few extra spins.
                 */
                cpu_relax_lowlatency();
        }
spinning하며(busy wait, sleep하지 않고 루프를 돌며) 락을 획득할 수 있는 조건을 얻으면 리턴한다. 루프를 벗어나면 midpath 실패 처리 루틴으로 빠져나간다.
  • usw_ww_mutex: ww_mutex 를 사용하는 경우이면서 deadlock 디텍션이 된경우(ww->ctx가 true) 루프를 벗어나 에러 처리를 한다.
  • owner 값을 가져와서(volatile 방식으로 메모리 접근) owner가 존재하면서 owner가 스핀하지 않으면 루프를 벗어난다. (midpath 실패)
  • 뮤텍스 획득을 시도하여 성공하면 다음과 같이 처리하고 리턴한다.
    • lock_acquired():  lockdep 디버그 코드로 트레이스 목적이며, lock_contention_bug도 디텍트하여 출력한다.
    • ww_mutex_ctx: ww_mutex를 사용하는 경우 ww_mutex_set_context_fastpath()를 호출한다. (분석 생략)
    • 뮤텍스 오너(owner)를 설정
    • osq(optimistic spinner queue)에서 언락(MCS queue에서 노드 제거)
  • owner가 없으면서 리스케쥴링 요청이 있거나 현재 태스크가 RT task 인 경우 루프를 벗어난다.
    • 락은 걸려있는 상태인데 owner가 없다는 뜻은 owner가 lock을 획득하고 owner필드를 설정하려고 하는 사이 즉 아직 owner field의 설정을 다 못한 사이인 경우라면 현재 태스크가 선점했을 수도 있다고 판단하여 루프를 벗어난다.(midpath 실패)
  • cpu_relax_lowlatency(): 다시 스핀하기 전에 아키텍처에 따라 메모리 배리어를 동작시킨다.
    • cpu_relax()
      • 일반적인 ARMv7 SMP 및 UP 시스템은 컴파일러 배리어인 barrier() 함수만 호출한다.
      • ARMv7 SMP 코어중 Cortex-A9 r2p0 이전 리비전 및 ARMv6 SMP 아키텍처는 자동 store buffer drain 기능이 없어 dmb_ish 명령을 수행하여야 한다.
        osq_unlock(&lock->osq);
done:
        /*
         * If we fell out of the spin path because of need_resched(),
         * reschedule now, before we try-lock the mutex. This avoids getting
         * scheduled out right after we obtained the mutex.
         */
        if (need_resched()) {
                /*
                 * We _should_ have TASK_RUNNING here, but just in case
                 * we do not, make it so, otherwise we might get stuck.
                 */
                __set_current_state(TASK_RUNNING);
                schedule_preempt_disabled();
        }

        return false;
}
루프를 벗어나면 midpath 처리가 실패한 것으로 처리되어 false를 리턴한다.
  • osq(optimistic spinner queue)에서 언락(MCS queue에서 노드 제거)
  • 리스케쥴 요청이 있는 경우 태스크 상태를 러닝으로 바꾸고 preemption되어 리스케쥴링이 되지 않도록 막는다.

 

mutex_can_spin_on_owner()

/*
* Initial check for entering the mutex spinning loop
*/
static inline int mutex_can_spin_on_owner(struct mutex *lock)
{
        struct task_struct *owner;
        int retval = 1;

        if (need_resched())
                return 0;

        rcu_read_lock();
        owner = ACCESS_ONCE(lock->owner);
        if (owner)
                retval = owner->on_cpu;
        rcu_read_unlock();
        /*
         * if lock->owner is not set, the mutex owner may have just acquired
         * it and not set the owner yet or the mutex has been released.
         */
        return retval;
}
리스케쥴 요청이 없으면서 lock의 owner가 러닝중이면(또는 owner가 설정되지 않은 경우) true 리턴
  • 리스케쥴 요청 플래그가 설정되어 있으면 false(0) 리턴
  • owner가 설정되어 있지 않으면 true(1) 리턴
  • owner가 설정되어 있으면 owner->on_cpu 값을 리턴
    • on_cpu: 1=task running, 0=task not running

 

mutex_spin_on_owner()

아래 코드는 2015년 2~4월에 걸쳐 함수가 일부분 변경된 최신 코드이다.(버그 처리와 리팩토링)
락의 owner 태스크가 동작(슬립하거나 다른 태스크로 전환되지 않는)하는 동안 spinning을 한다. midpath의 바람직한 목적은 true로 리턴하여 이 함수를 호출한 루틴에서 락을 획득하려고 시도하는 것이다. 당연히 false로 리턴하면 midpath가 성립되지 않아 slowpath로 넘어갈 것이다.
/*
 * Look out! "owner" is an entirely speculative pointer
 * access and not reliable.
 */
static noinline bool mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner)
{
    bool ret = true;
    rcu_read_lock();
    while (lock->owner == owner) {
        /*
         * Ensure we emit the owner->on_cpu, dereference _after_
         * checking lock->owner still matches owner. If that fails,
         * owner might point to freed memory. If it still matches,
         * the rcu_read_lock() ensures the memory stays valid.
         */
        barrier();
        if (!owner->on_cpu || need_resched()) {
            ret = false;
            break;
        }
        cpu_relax_lowlatency();
    }
    rcu_read_unlock();
    return ret;
}

루프 전체가 rcu_read_lock()에 들어있으므로 현재 태스크가 슬립되지 않도록 하였고 빠르게 처리가 될 것을 가정하고 작성된 코드이다.

  • 락 owner 태스크의 critical section 내부에서 슬립하지 않은 채 많은 시간 소모를 하는 경우 문제가 된다. (락 owner 태스크에서 preemption 되면 차라리 실패가되어 이 루프를 벗어날 수 있다)
  • lock의 owner와 인수 owner가  같은 경우에만 루프를 돈다. 다른 경우 락 획득 준비가 끝났기 때문에 ture를 리턴한다.
    • 락 owner 태스크에서 critical section을 다 통과한 후 락을 release하면 락의 owner에 null이 들어가는 것을 이용하여 락 사용완료를 기다리는 스핀(루프)이다.
    • lock->owner를 null하고 비교하지 않는 이유는 lock->owner에 다른 태스크가 들어갈 확률도 있기 때문이다. (다른 태스크에서 lock을 획득한 경우)
  • owner의 on_cpu(태스크 가동 여부 플래그)가 false 이거나 리스케쥴 요청이 있는 경우 루프를 벗어나고 false를 리턴한다.

 

아래 두 개의 함수 코드는 변경되었으므로 상단 부분의 mutex_spin_on_owner() 함수 코드를 참고해야 한다.

static noinline  int mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner)
{
        rcu_read_lock();
        while (owner_running(lock, owner)) {
                if (need_resched())
                        break;

                cpu_relax_lowlatency();
        }
        rcu_read_unlock();

        /*
         * We break out the loop above on need_resched() and when the
         * owner changed, which is a sign for heavy contention. Return
         * success only when lock->owner is NULL.
         */
        return lock->owner == NULL;
}

static inline bool owner_running(struct mutex *lock, struct task_struct *owner)
{
        if (lock->owner != owner)
                return false;

        /*
         * Ensure we emit the owner->on_cpu, dereference _after_ checking
         * lock->owner still matches owner, if that fails, owner might
         * point to free()d memory, if it still matches, the rcu_read_lock()
         * ensures the memory stays valid.
         */
        barrier();

        return owner->on_cpu;
}

 

osq_lock()

kernel/locking/osq_lock.c

bool osq_lock(struct optimistic_spin_queue *lock)
{
        struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
        struct optimistic_spin_node *prev, *next;
        int curr = encode_cpu(smp_processor_id());
        int old;

        node->locked = 0;
        node->next = NULL;
        node->cpu = curr;

        old = atomic_xchg(&lock->tail, curr);
        if (old == OSQ_UNLOCKED_VAL)
                return true;

        prev = decode_cpu(old);
        node->prev = prev;
        ACCESS_ONCE(prev->next) = node;

        /*  
         * Normally @prev is untouchable after the above store; because at that
         * moment unlock can proceed and wipe the node element from stack.
         *
         * However, since our nodes are static per-cpu storage, we're
         * guaranteed their existence -- this allows us to apply
         * cmpxchg in an attempt to undo our queueing.
         */

        while (!ACCESS_ONCE(node->locked)) {
                /*
                 * If we need to reschedule bail... so we can block.
                 */
                if (need_resched())
                        goto unqueue;

                cpu_relax_lowlatency();
        }
        return true;
  • struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
    • per-cpu 데이터인 즉 현재 cpu에 매치되는 osq_node 구조체 포인터를 알아온다.
  • int curr = encode_cpu(smp_processor_id());
    • OSQ(MCS) lock에서 사용하는 cpu 번호는 1번 부터 시작하므로 0번 부터 시작하는 현재 CPU 번호에 1을 더한다.
    • curr = 현재 cpu + 1
  • OSQ(MCS) lock에 하나의 엔트리도 없는 경우 가장 빨리 OSQ(MCS) lock을 획득한다.
    • old = atomic_xchg(&lock->tail, curr);
      • &lock->tail에 curr 값을 대입하고 old에 대입 전 lock->tail 값을 가져온다.
    • if (old == OSQ_UNLOCKED_VAL)
      • 가져온 cpu 값이 0인경우는 OSQ(MCS) lock이 unlock 상태이므로 가장 빠르게 OSQ(MCS) lock을 획득하게 되어 루틴을 빠져나간다.
  • 노드를 뒤에 추가한다.
    • prev = decode_cpu(old);
      • per-cpu 데이터를 사용하기 위해 old 값에서 1을 뺀 후 &osq_node 구조체 포인터 값을 알아온다.
    • node->prev = prev;
      • 현재 노드의 앞에 이전 노드 구조체 정보를 대입하여 연결한다.
    • ACCESS_ONCE(prev->next) = node;
      • 이전 노드의 뒤에 현재노드를 연결한다.
  • spin 하며 리스케쥴 요청이 없는 동안 OSQ(MCS) lock을 획득할 때까지 기다린다.
    • while (!ACCESS_ONCE(node->locked)) {
      • node->locked가 0(lock status)인 경우 spin 한다.
        • 0=locked status, 1=unlocked status
    • if (need_resched())
      • 현재 태스크에 리스케쥴 요청이 있느지 확인하여 있는 경우 OSQ(MCS) lock 획득을 포기하기 위해 unqueue 루틴으로 이동한다.
    • cpu_relax_lowlatency();
      • rpi2: ARMv7 SMP인 Cortex-A7은 컴파일러 배리어인 barrier() 함수를 호출한다.
unqueue:
        /*
         * Step - A  -- stabilize @prev
         *
         * Undo our @prev->next assignment; this will make @prev's
         * unlock()/unqueue() wait for a next pointer since @lock points to us
         * (or later).
         */

        for (;;) {
                if (prev->next == node &&
                    cmpxchg(&prev->next, node, NULL) == node)
                        break;

                /*
                 * We can only fail the cmpxchg() racing against an unlock(),
                 * in which case we should observe @node->locked becomming
                 * true.
                 */
                if (smp_load_acquire(&node->locked))
                        return true;

                cpu_relax_lowlatency();

                /*
                 * Or we race against a concurrent unqueue()'s step-B, in which
                 * case its step-C will write us a new @node->prev pointer.
                 */
                prev = ACCESS_ONCE(node->prev);
        }
  •  unqueue 루틴으로 오는 케이스는 lock->owner가 sleep되거나 현재 태스크가 리스케쥴링 요청 받아 midpath를 포기해야 하는 경우 발생한다.
    • OSQ(MCS) lock을 포기하기 위해 OSQ에서 spin되고 있는 현재 cpu에 대한 노드의 연결을 제거해야한다.
    • lock->owner가 sleep되는 경우 OSQ에서 spin되고 있는 모든 cpu에 대한 노드의 연결을 제거하기 위해 경쟁하는 상황이 벌어지므로 이에 대해 문제 없이 동작하도록 2 단계에 대해 나누어 atomic operation을 사용한다.
  • 1단계 prev 노드에서 현재 노드에 대한 연결을 제거될 때까지 루프를 돌며 반복한다. 루프를 반복하는 이유는 여러 cpu가 unqueue를 동시에 진행해야 하는 상황에서 안정적으로 하나씩 처리할 수 있도록 루틴 처리가 되야 한다. 하는 상황에서 내노드에 연결된 이전 노드의 정보를 변경하여 내 연결정보를 없애려고 하는데 이전 노드의 처리가 더 빠르게 처리되는 경우 실시간으로 node->prev 값이 바뀔 수도 있다.
    • if (prev->next == node &&
      • 먼저 현재 노드가 prev->next인지 확인
    • cmpxchg(&prev->next, node, NULL) == node)
      • atomic operation에 의해 prev 노드에 현재 노드가 연결되어 있는 경우 현재 노드를 제거한다.  제거가 성공되면 loop를 빠져나간다.
    • if (smp_load_acquire(&node->locked))
      • node->locked가 1로 설정되어 완전히 종료됨을 알게 되면 루틴을 빠져나간다.
    • cpu_relax_lowlatency();
      • rpi2: 컴파일러 배리어인 barrier()를 호출한다.
    • prev = ACCESS_ONCE(node->prev);
      • 다시 한번 내 노드와 연결되어 있는 이전(prev) 노드를 알아온다.
        /*
         * Step - B -- stabilize @next
         *
         * Similar to unlock(), wait for @node->next or move @lock from @node
         * back to @prev.
         */

        next = osq_wait_next(lock, node, prev);
        if (!next)
                return false;

        /*
         * Step - C -- unlink
         *
         * @prev is stable because its still waiting for a new @prev->next
         * pointer, @next is stable because our @node->next pointer is NULL and
         * it will wait in Step-A.
         */

        ACCESS_ONCE(next->prev) = prev;
        ACCESS_ONCE(prev->next) = next;

        return false;
}
  • 아래의 그림은 4개의 task가 각각의 cpu에서 동작한다고 가정하고 1개의 task는 mutex를 소유한 상태이고 3개의 task는 midpath에 진입하여 spin하고 있는 상태이다.
  • FIFO 구조의 OSQ(MCS) lock에서 가장 선두에 위치한 task 홀로 mutex 소유자가 release되면 곧바로 mutex를 취득할 수 있도록 spin하고 있다.
  • 나머지 2개의 task는 순번대로 연결되어 있다.

osq_lock-2

  • 아래 그림은 3개의 task가 midpath에서 spin하고 있는데 두 번째 task가 리스쥴링 요청을 받아 중간에 unqueue되는 것을 보여준다.
  • 단순히 현재 노드를 제거하고 prev 노드와 next 노드를 연결하면 되는데 실제 코드에서는 prev->next와 node->next에 null을 대입하여 먼저 연결을 끊는다.
    • midpath에서 대기하는 task 들 중 일부 또는 전부가 동시에 unquque되는 상황이라면 각 cpu에서 동시에 이 구조체에 접근하게 되는데 함부로 연결을 끊고 이어 붙이면 서로 중복 처리되어 정상적인 연결이 되지 않기 때문에 atomic operation을 이용하여 null을 집어넣어 먼저 연결을 끊어 주어야 한다.
    • prev->next와 node->next에 null을 집어 넣은 후에는 경쟁(racing) 상황에서 벗어났기 때문에 prev와 next 노드를 연결해준다.

osq_lock-3a

  • 아래 그림은 2개의 task가 경쟁적으로 unqueue되어 빠져나가는 상황을 나타내었는데 두 번째 노드와 세 번째 노드가 거의 동시에 step-A까지 동작하였고 그 다음 step-B 동작이 미세하게 두 번째 노드가 먼저 진입한 상태이다.
    • 1) 두 번째 노드가 step-A: prev->next에 null을 대입하여 이전 노드로의 연결을 끊는다.
    • 2) 세 번째 노드가 step-A: prev->next에 null을 대입하여 이전 노드로의 연결을 끊는다.
    • 3) 두 번째 노드가 step-B: node->next가 null인 관계로 null이 아닐 때 까지 spin
    • 4) 세 번째 노드가 step-B: node->next가 null이 아니므로 null을 대입하여 다음 노드로의 연결을 끊는다.
    • 5, 6) 세 번째 노드가 step-C: 이전 노드와 다음 노드를 서로 연결한다. (자신의 노드는 앞 뒤 노드로 부터의 연결이 unlink 된다.)
    • 7) 두 번째 노드가 spin 하고 있다가 step-B: node->next가 null이 아니게 되자마자 null을 대입하여 다음 노드로의 연결을 끊는다.
    • 8, 9) 두 번째 노드가 step-C: 이전 노드와 다음 노드를 서로 연결한다. (자신의 노드는 앞 뒤 노드로 부터의 연결이 unlink 된다.)

osq_lock-4a

  • 아래 그림은 2개의 task가 경쟁적으로 unqueue되어 빠져나가는 상황을 나타내었는데 두 번째 노드가 조금 먼저 진입하여 이미 step-B까지 먼저 진행된 상태에서 세 번째 노드가 step-A에 진입한 경우이다.
    • 1) 두 번째 노드가 step-A: prev->next에 null을 대입하여 이전 노드로의 연결을 끊는다.
    • 2) 두 번째 노드가 step-B: node->next에 null을 대입하여 다음 노드로의 연결을 끊는다.
    • 3) 세 번째 노드가 step-A: prev->next가 null인 관계로 null이 아닐 때 까지 spin 한다.
    • 4, 5) 두 번째 노드가 step-C: 이전 노드와 다음 노드를 서로 연결한다. (자신의 노드는 앞 뒤 노드로 부터의 연결이 unlink 된다.)
    • 6) 세 번째 노드가 spin 하고 있다가 step-A: prev->next가 null이 아니게 되자마자 null을 대입하여 다음 노드로의 연결을 끊는다.
    • 7) 세 번째 노드가 step-B: node->next에 null을 대입하여 다음 노드로의 연결을 끊는다.
    • 8, 9) 세 번째 노드가 step-C: 이전 노드와 다음 노드를 서로 연결한다. (자신의 노드는 앞 뒤 노드로 부터의 연결이 unlink 된다.)

osq_lock-5

 

osq_unlock()

kernel/locking/osq_lock.c

void osq_unlock(struct optimistic_spin_queue *lock)
{
        struct optimistic_spin_node *node, *next;
        int curr = encode_cpu(smp_processor_id());

        /*
         * Fast path for the uncontended case.
         */
        if (likely(atomic_cmpxchg(&lock->tail, curr, OSQ_UNLOCKED_VAL) == curr))
                return;

        /*
         * Second most likely case.
         */
        node = this_cpu_ptr(&osq_node);
        next = xchg(&node->next, NULL);
        if (next) {
                ACCESS_ONCE(next->locked) = 1;
                return;
        }

        next = osq_wait_next(lock, node, NULL);
        if (next)
                ACCESS_ONCE(next->locked) = 1;
}
  • curr는 현재 cpu(0부터 시작) + 1
  • osq_unlock()의 구현도 처리 성능별 3단계로 구현되어 있다.
    • fastpath
      • OSQ의 마지막 노드가 현재 cpu 노드인 경우
      • if (atomic_cmpxchg(&lock->tail, curr, OSQ_UNLOCKED_VAL) == curr))
        • &lock->tail이 curr와 같은 경우 OSQ_UNLOCKED_VAL(0)로 대입한다.
        • 높은 확률로 atomic_cmpxchg() 함수 결과가 true이면 return 한다.
        • atomic_cmpxchg()의 결과가 curr 라는 것은 FIFO 구조의 OSQ(MCS)의 tail이 curr라는 것과 동일하다.
        • 참고: Atomic Operation | 문c
    • midpath
      • OSQ에서 내 뒤에 노드가 존재하는 경우 다음 노드를 osq spin 상태에서 벗어나도록 locked flag를 1로 설정한다.
      • node = this_cpu_ptr(&osq_node);
        • per-cpu 데이터 구조의 osq_node 구조체 포인터를 알아온다.
      • next = xchg(&node->next, NULL);
        • xchg()
          • #define xchg(ptr,x)      ((__typeof__(*(ptr)))__xchg((unsigned long)(x),(ptr),sizeof(*(ptr))))
        • &node->next <- NULL을 대입하는데 결과로 대입 이전 값을 리턴한다.
        • 참고: Atomic Operation | 문c
      • if (next)
        • 기존에 다음 노드가 존재하였었다면 next->locked에 1(unlocked)을 대입하고 함수를 빠져나온다.
        • 다음 노드가 OSQ(MCS) lock에서 가장 선두 노드이므로 그 노드가 osq spin 상태에서 벗어나도록 지시한다.
    • slowpath
      • 두 번째 노드가 분명 있지만 연결이 되어 있지 않은 경우 연결될 때까지 기다린 후 다음 노드를 알아와서 그 노드의 locked에 1로 설정하여 osq spin 상태에서 벗어나도록 지시한다.
      • next = osq_wait_next(lock, node, NULL)
        • 임시적으로 링크가 아직 연결되지 않아 안정되지 않은 상태인 경우 링크가 연결될 때까지 기다려서 다음 노드를 알아온 다음 next->locked에 1(unlocked)을 대입하고 함수를 빠져나온다.
        • 역시 다음 노드가 OSQ(MCS) lock에서 가장 선두 노드이므로 그 노드가 osq spin 상태에서 벗어나도록 지시한다.

 

아래 그림은 midpath로 mutex 요청이 여러 개가 동시에 인입되어 osq_lock() 함수가 호출되어 OSQ(MCS) lock을 얻게되는 과정을 나타내었다.

  • 적색 locked=1은 osq_unlock() 호출될 때 OSQ 노드 중 두 번째 노드를 locked=1로 표시하여 두 번째 노드가 osq_lock spin 에서 빠져나올 수 있도록 한다.

osq_lock-1b

 

osq_wait_next()

kernel/locking/osq_lock.c

/*
 * Get a stable @node->next pointer, either for unlock() or unqueue() purposes.
 * Can return NULL in case we were the last queued and we updated @lock instead.
 */
static inline struct optimistic_spin_node *
osq_wait_next(struct optimistic_spin_queue *lock,
              struct optimistic_spin_node *node,
              struct optimistic_spin_node *prev)
{
        struct optimistic_spin_node *next = NULL;
        int curr = encode_cpu(smp_processor_id());
        int old;

        /*
         * If there is a prev node in queue, then the 'old' value will be
         * the prev node's CPU #, else it's set to OSQ_UNLOCKED_VAL since if
         * we're currently last in queue, then the queue will then become empty.
         */
        old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;

        for (;;) {
                if (atomic_read(&lock->tail) == curr &&
                    atomic_cmpxchg(&lock->tail, curr, old) == curr) {
                        /*
                         * We were the last queued, we moved @lock back. @prev
                         * will now observe @lock and will complete its
                         * unlock()/unqueue().
                         */
                        break;
                }

                /*
                 * We must xchg() the @node->next value, because if we were to
                 * leave it in, a concurrent unlock()/unqueue() from
                 * @node->next might complete Step-A and think its @prev is
                 * still valid.
                 *
                 * If the concurrent unlock()/unqueue() wins the race, we'll
                 * wait for either @lock to point to us, through its Step-B, or
                 * wait for a new @node->next from its Step-C.
                 */
                if (node->next) {
                        next = xchg(&node->next, NULL);
                        if (next)
                                break;
                }

                cpu_relax_lowlatency();
        }

        return next;
}
  • 내 노드의 다음에 연결된 노드를 알아오는데 다른 cpu가 unque되면서 연결이 잠시 끊어질 때가 있다 그런 경우는 기다렸다가 연결된 후 즉 안정상태가 된 후 알아와야 한다.
  • int curr = encode_cpu(smp_processor_id());
    • curr = cpu 번호 + 1
  • old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;
    • prev 노드가 존재하는 경우 prev노드의 cpu 번호(1부터 시작하는)를 알아오고 없으면 0을 대입한다.
    • 0을 대입하는 것은 내 노드가 마지막인데 prev가 없는 경우 empty 큐가 되기 때문이다.
  • if (atomic_read(&lock->tail) == curr &&
    • lock의 tail이 현재 노드이면서 (내 노드가 마지막 노드이면서)
  • atomic_cmpxchg(&lock->tail, curr, old) == curr) {
    • atomic operation으로 lock->tail이 curr인 경우 old로 변경시킨다. 변경 시키기 전의 tail 값이 curr인 경우 루프를 빠져나간다.
    • 마지막 노드 표식(lock->tail)에 prev->cpu 값을 집어 넣어 이전 노드를 마지막 노드로 만든다.
  • if (node->next) {
    • 누군가 실시간으로 내 노드의 next 값에 null을 넣어 연결을 끊지 않았으면
  • next = xchg(&node->next, NULL);
    • 내 노드의 next에 null 값을 집어 넣고 변경되기 전 next 값을 알아와서 next 노드가 존재하는 경우 리턴한다.
  • cpu_relax_lowlatency();
    • 아직 node->next 값이 설정되지 않았으므로 배리어를 동작시킨 후 다시 루프를 돈다.

 

mutex_unlock()

kernel/locking/mutex.c

/**
 * mutex_unlock - release the mutex
 * @lock: the mutex to be released
 *
 * Unlock a mutex that has been locked by this task previously.
 *
 * This function must not be used in interrupt context. Unlocking
 * of a not locked mutex is not allowed.
 *
 * This function is similar to (but not equivalent to) up().
 */
void __sched mutex_unlock(struct mutex *lock)
{
        /*   
         * The unlocking fastpath is the 0->1 transition from 'locked'
         * into 'unlocked' state:
         */
#ifndef CONFIG_DEBUG_MUTEXES
        /*   
         * When debugging is enabled we must not clear the owner before time,
         * the slow path will always be taken, and that clears the owner field
         * after verifying that it was indeed current.
         */
        mutex_clear_owner(lock);
#endif
        __mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);
}

EXPORT_SYMBOL(mutex_unlock);
  • fastpath mutex unlock 처리를 위해 __mutex_fastpath_unlock() 함수를 호출한다.
    • mutex_lock()과 동일하게 fastpath는 atomic operation으로 구현된다.
    • fastpath가 실패하는 경우 __mutex_unlock_slowpath() 함수로 이동한다.

 

__mutex_fastpath_unlock()

include/asm-generic/mutex-dec.h

/**
 *  __mutex_fastpath_unlock - try to promote the count from 0 to 1
 *  @count: pointer of type atomic_t
 *  @fail_fn: function to call if the original value was not 0
 *      
 * Try to promote the count from 0 to 1. If it wasn't 0, call <fail_fn>.
 * In the failure case, this function is allowed to either set the value to
 * 1, or to set it to a value lower than 1.
 *
 * If the implementation sets it to a value of lower than 1, then the
 * __mutex_slowpath_needs_to_unlock() macro needs to return 1, it needs
 * to return 0 otherwise.
 */
static inline void
__mutex_fastpath_unlock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
        if (unlikely(atomic_inc_return(count) <= 0))
                fail_fn(count);
}

  • fastpath는 atomic operation lock count를 증가시켜 적은 확률로 0이하이면(lock 상태) __mutex_unlock_slowpath() 함수를 호출한다.

 

__mutex_unlock_slowpath()

kernel/locking/mutex.c

/*
 * Release the lock, slowpath:
 */
__visible void 
__mutex_unlock_slowpath(atomic_t *lock_count)
{
        struct mutex *lock = container_of(lock_count, struct mutex, count);

        __mutex_unlock_common_slowpath(lock, 1);
}
  • __mutex_unlock_common_slowpath() 루틴을 호출한다.

 

__mutex_unlock_common_slowpath()

kernel/locking/mutex.c

/*
 * Release the lock, slowpath:
 */
static inline void
__mutex_unlock_common_slowpath(struct mutex *lock, int nested)
{
        unsigned long flags;

        /*
         * As a performance measurement, release the lock before doing other
         * wakeup related duties to follow. This allows other tasks to acquire
         * the lock sooner, while still handling cleanups in past unlock calls.
         * This can be done as we do not enforce strict equivalence between the
         * mutex counter and wait_list.
         *
         *
         * Some architectures leave the lock unlocked in the fastpath failure
         * case, others need to leave it locked. In the later case we have to
         * unlock it here - as the lock counter is currently 0 or negative.
         */
        if (__mutex_slowpath_needs_to_unlock())
                atomic_set(&lock->count, 1);

        spin_lock_mutex(&lock->wait_lock, flags);
        mutex_release(&lock->dep_map, nested, _RET_IP_);
        debug_mutex_unlock(lock);

        if (!list_empty(&lock->wait_list)) {
                /* get the first entry from the wait-list: */
                struct mutex_waiter *waiter =
                                list_entry(lock->wait_list.next,
                                           struct mutex_waiter, list);

                debug_mutex_wake_waiter(lock, waiter);

                wake_up_process(waiter->task);
        }

        spin_unlock_mutex(&lock->wait_lock, flags);
}
  • __mutex_slowpath_nees_to_unlock()
    • 아키텍처마다 다르지만 ARM의 경우 1로 설정된다.
      • ARM에서 구현된 midpath를 위한 조건을 동작시키기 위해 1로 설정된다.
  • atomic_set(&lock->count, 1)
    • midpath에서 lock을 획득하기 위해 spin하고 있는 태스크가 있는 경우 mutex를 획득할 수 있는 기회를 slowpath 보다 먼저 주기위해 lock->count를 1(unlock)로 설정한다.
  • if (!list_empty(&lock->wait_list)) {
    • 대기자가 있는 경우
  • struct mutex_waiter *waiter = list_entry(lock->wait_list.next, struct mutex_waiter, list);
    • 선두에 있는 대기자를 알아온다.
  • wake_up_process(waiter->task);
    • 알아온 대기자를 깨어나게 요청한다.

다음 그림을 보고 slowpath에서 잠들어 있는 태스크를 깨우는 동작을 확인한다.

  • CPU 1에 있는 mutex는 task가 preemption요청을 받아 slowpath로 동작하여 sleep되어 있다가 CPU2가 unlock될 때 첫 대기자인 CPU1에서 구동되고 있는 태스크를 미리 깨워서 mutex를 얻을 수 있도록 기회를 준다.

mutex-4a

 

기타 함수

spin_lock_mutex()

spin_lock_mutex() 함수는 디버깅용과 non-디버깅용 두 개가 정의 되어 있다.
kernel/locking/mutex-debug.h
#define spin_lock_mutex(lock, flags)            \
    do {                        \
        struct mutex *l = container_of(lock, struct mutex, wait_lock); \
        DEBUG_LOCKS_WARN_ON(in_interrupt());    \
        local_irq_save(flags);          \
        arch_spin_lock(&(lock)->rlock.raw_lock);\
        DEBUG_LOCKS_WARN_ON(l->magic != l); \
    } while (0)

 

kernel/locking/mutex.h

#define spin_lock_mutex(lock, flags) \
        do { spin_lock(lock); (void)(flags); } while (0)

 

debug_mutex_lock_common()

 kernel/locking/mutex-debug.c
void debug_mutex_lock_common(struct mutex *lock, struct mutex_waiter *waiter)
{
        memset(waiter, MUTEX_DEBUG_INIT, sizeof(*waiter));
        waiter->magic = waiter;
        INIT_LIST_HEAD(&waiter->list);
}
  •  waiter 구조체를 초기화한다.

debug_mutex_add_waiter()

kernel/locking/mutex-debug.c

void debug_mutex_add_waiter(struct mutex *lock, struct mutex_waiter *waiter,
                            struct thread_info *ti)
{
        SMP_DEBUG_LOCKS_WARN_ON(!spin_is_locked(&lock->wait_lock));

        /* Mark the current thread as blocked on the lock: */
        ti->task->blocked_on = waiter;
}
  • 인수로 전달된 현재 context에 연결된 태스크 구조체의 blocked_on에 waiter를 추가한다. (대기 태스크들이 연결된다.)

 

__ww_mutex_lock_check_stamp()

 kernel/locking/mutex.c
static inline int __sched __ww_mutex_lock_check_stamp(struct mutex *lock, struct ww_acquire_ctx *ctx)
{
        struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);
        struct ww_acquire_ctx *hold_ctx = ACCESS_ONCE(ww->ctx);

        if (!hold_ctx)
                return 0;

        if (unlikely(ctx == hold_ctx))
                return -EALREADY;

        if (ctx->stamp - hold_ctx->stamp <= LONG_MAX &&
            (ctx->stamp != hold_ctx->stamp || ctx > hold_ctx)) {
#ifdef CONFIG_DEBUG_MUTEXES
                DEBUG_LOCKS_WARN_ON(ctx->contending_lock);
                ctx->contending_lock = ww;
#endif
                return -EDEADLK;
        }

        return 0;
}
  • ww_mutex 사용 시에 사용되는 함수로 deadlock 상황을 디텍트하여 _EDEADLK를 리턴한다.

 

구조체

struct mutex

include/linux/mutex.h
/*
 * Simple, straightforward mutexes with strict semantics:
 *
 * - only one task can hold the mutex at a time
 * - only the owner can unlock the mutex
 * - multiple unlocks are not permitted
 * - recursive locking is not permitted
 * - a mutex object must be initialized via the API
 * - a mutex object must not be initialized via memset or copying
 * - task may not exit with mutex held
 * - memory areas where held locks reside must not be freed
 * - held mutexes must not be reinitialized
 * - mutexes may not be used in hardware or software interrupt
 *   contexts such as tasklets and timers
 *
 * These semantics are fully enforced when DEBUG_MUTEXES is
 * enabled. Furthermore, besides enforcing the above rules, the mutex
 * debugging code also implements a number of additional features
 * that make lock debugging easier and faster:
 *
 * - uses symbolic names of mutexes, whenever they are printed in debug output
 * - point-of-acquire tracking, symbolic lookup of function names
 * - list of all locks held in the system, printout of them
 * - owner tracking
 * - detects self-recursing locks and prints out all relevant info
 * - detects multi-task circular deadlocks and prints out all affected
 *   locks and tasks (and only those tasks)
 */
struct mutex {
        /* 1: unlocked, 0: locked, negative: locked, possible waiters */
        atomic_t                count;
        spinlock_t              wait_lock;
        struct list_head        wait_list;
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
        struct task_struct      *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
#ifdef CONFIG_DEBUG_MUTEXES
        void                    *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
        struct lockdep_map      dep_map;
#endif
};

간단하고도 엄격한 의미의 뮤텍스

  • 한 개의 태스크는 한 번에 하나의 뮤텍스를 hold(lock이 걸려 대기하는 상태)할 수 있다.
  • owner만이 뮤텍스를 unlock 할 수 있다.
  • 다중 unlock은 허가되지 않는다.
  • 재귀(recursive) locking은 허가되지 않는다.
  • 뮤텍스 오브젝트는 반드시 API를 이용하여 초기화 하여야 한다.
  • 뮤텍스 오브젝트는 절대 memset나 memcpy에 의해 초기화되어 사용될 수 없다.
  • 태스크는 뮤텍스가 held된 채로 종료할 수 없다.
  • held된 락이 위치한 메모리 영역은 절대 free 되면 안된다.
  • held된 락은 절대 재 초기화될 수 없다.
  • 뮤텍스는 tasklets와 타이머같은 하드웨어 또는 소프트웨어 인터럽트 컨텍스트를 사용할 수 없다.
  • CONFIG_DEBUG_MUTEXES 옵션을 사용한 경우
    • 디버그 출력 시 뮤텍스의 심볼명 사용
    • 락획득지점(point-of-acquire) 추적, 펑션명으로 심볼 검색(lookup)
    • 시스템에서 held된 모든 락 목록을 출력
    • 소유자 추적
    • 재귀(self-recursing) 락의 검출과 관련 정보 출력
    • 멀티 태스크 dead-lock의 검출과 영향을 끼친 락과 태스크의 출력
  • 구조체 멤버 변수
    • counter: 3가지의 뮤텍스 상태를 나타내는데 초기화 후 unlock(1) 상태로 시작한다.
      • 1: unlock
      • 0: 대기자 없는 lock
      • 음수: 대기자 있는 lock
    • wait_lock: slowpath 구현에 사용되는 spin lock으로 spin_lock_mutex()의 인수로 사용
    • wait_list: 대기자 리스트 (ticket 구현)
    • ownner: 락 소유한 태스크의 포인터, CONFIG_MUTEX_SPIN_ON_OWNER 옵션 또는 락 디버깅에서 사용
    • osq: optimisitc spin queue(spinner MCS lock), CONFIG_MUTEX_SPIN_ON_OWNER 옵션에서 사용
    • magic: 락 디버깅에서 사용
    • dep_map:  CONFIG_DEBUG_LOCK_ALLOC 옵션에서 사용

 

struct mutex_waiter

include/linux/mutex.h

/*
 * This is the control structure for tasks blocked on mutex,
 * which resides on the blocked task's kernel stack:
 */
struct mutex_waiter {
        struct list_head        list;
        struct task_struct      *task;
#ifdef CONFIG_DEBUG_MUTEXES
        void                    *magic;
#endif
};
  • list
    • wait_list의 뒤에서 각 mutex_waiter들이 연결되는데 사용된다.
  • task
    • 대기하고 있는 태스크

 

struct optimistic_spin_node

include/linux/osq_lock.h

/*
 * An MCS like lock especially tailored for optimistic spinning for sleeping
 * lock implementations (mutex, rwsem, etc).
 */
struct optimistic_spin_node {
        struct optimistic_spin_node *next, *prev;
        int locked; /* 1 if lock acquired */
        int cpu; /* encoded CPU # + 1 value */
};
  • optimistic_spin_node 구조체는 per-cpu 데이터인 전역 변수 osq_node에서 사용된다.
  • next 및 prev에 null이 있는 경우 OSQ(MCS) unlock 상태이고 FIFO 구조로 나중에 추가된 노드들은 꼬리 부분에 추가된다.
  • locked
    • 보통 0상태에 있고 midpath에서 mutex를 얻기 위해 경쟁을 하는 cpu가 osq_unlock()되어 나갈 때 OSQ에서 후 순위로 대기하고 cpu가 osq_lock()에서 빠져나올 수 있도록 하게 한다.
  • cpu
    • 1번 부터 사직되는 CPU 번호

 

struct optimistic_spin_queue

include/linux/osq_lock.h

struct optimistic_spin_queue {
        /*
         * Stores an encoded value of the CPU # of the tail node in the queue.
         * If the queue is empty, then it's set to OSQ_UNLOCKED_VAL.
         */
        atomic_t tail;
};
  • OSQ(MCS)에서 가장 마지막에 위치한 노드로 1번 부터 시작하는 cpu 번호이다. 0이 들어있는 경우 대기자가 없는 경우이다.

 

뮤텍스 초기화

 
뮤텍스의 초기화는 두 가지 방법을 사용할 수 있다.
  • DEFINE_MUTEX() 매크로를 사용하여 정적 선언으로 뮤텍스를 초기화
  • mutex_init() 매크로 함수를 호출하여 다이나믹하게 뮤텍스를 초기화

1) DEFINE_MUTEX()

include/linux/mutex.h
#define DEFINE_MUTEX(mutexname) \
        struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)

 

include/linux/mutex-debug.h

#ifdef CONFIG_DEBUG_MUTEXES
#define __DEBUG_MUTEX_INITIALIZER(lockname)                             \
        , .magic = &lockname
#else
# define __DEBUG_MUTEX_INITIALIZER(lockname)
#endif

#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define __DEP_MAP_MUTEX_INITIALIZER(lockname) \
                , .dep_map = { .name = #lockname }
#else
# define __DEP_MAP_MUTEX_INITIALIZER(lockname)
#endif

#define __MUTEX_INITIALIZER(lockname) \
                { .count = ATOMIC_INIT(1) \
                , .wait_lock = __SPIN_LOCK_UNLOCKED(lockname.wait_lock) \
                , .wait_list = LIST_HEAD_INIT(lockname.wait_list) \
                __DEBUG_MUTEX_INITIALIZER(lockname) \
                __DEP_MAP_MUTEX_INITIALIZER(lockname) }
  • CONFIG_DEBUG_MUTEXES 옵션이 있는 경우 mutex 구조체의 magic 멤버 변수에 lockname의 주소를 대입한다.
  • CONFIG_DEBUG_LOCK_ALLOC 옵션이 있는 경우 mutex 구조체의 dep_map 멤버 구조체 변수중 name에 lockname을 직접 대입한다.

 

2) mutex_init()

include/linux/mutex.h
/**
 * mutex_init - initialize the mutex
 * @mutex: the mutex to be initialized
 *
 * Initialize the mutex to unlocked state.
 *
 * It is not allowed to initialize an already locked mutex.
 */
# define mutex_init(mutex) \
do {                                                    \
        static struct lock_class_key __key;             \
        __mutex_init((mutex), #mutex, &__key);          \
} while (0)
  • 이미 lock된 뮤텍스는 초기화를 허락하지 않는다.

 

kernel/locking/mutex.c

void  __mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
        atomic_set(&lock->count, 1);
        spin_lock_init(&lock->wait_lock);
        INIT_LIST_HEAD(&lock->wait_list);
        mutex_clear_owner(lock);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
        osq_lock_init(&lock->osq);
#endif

        debug_mutex_init(lock, name, key);
}
  • 뮤텍스 내에서 사용하는 spin_lock은 wait_list라는 링크드 리스트 구조체를 관리하기 위해 필요하다.
  • wait_list를 초기화한다. (리스트의 prev와 next 포인터들은 null이 아니라 wait_list 구조체 자기 자신을 가리킨다)
  • owner는 null로 초기화된다.
  • CONFIG_MUTEX_SPIN_ON_OWNER 옵션을 사용하는 경우 optimistic spinning queue 락을 초기화한다.
  • debug_mutex_init()함수는 디버그 기능에서 사용하며 설명은 생략

 

Mutex 인터페이스

  • 뮤텍스 획득, 언인터럽터블
    • void mutex_lock(struct mutex *lock);
    • void mutex_lock_nested(struct mutex *lock, unsigned int subclass);
    • int  mutex_trylock(struct mutex *lock);
  • 뮤텍스 획득, 인터럽터블
    • int mutex_lock_interruptible_nested(struct mutex *lock, int subclass);
    • int mutex_lock_interruptible(struct mutex *lock);
  • 뮤텍스 획득, 인터럽터블, 감소시켜서 0되면
    • int atomic_dec_and_mutex_lock(atomic_t *cnt, struct mutex *lock);
  • 뮤텍스 언락
    • void mutex_unlock(struct mutex *lock);
  • 뮤텍스 락 획득 여부 체크
    • int mutex_is_locked(struct mutex *lock);

 

참고

 

3 thoughts to “Mutex for linux kernel space”

  1. 와 대단하십니다!!
    도대체 얼마나 어떻게 공부를 하시면 이럴수가 있는거죠??
    ㅜㅜ

  2. 네트웍 및 시스템 프로그래밍등을 잘하시려면 해당 API를 잘 사용하는 것 이외에도,
    리눅스 커널에대한 이해가 바탕이되는 것이 좋습니다. 감사합니다. ^^

댓글 남기기

이메일은 공개되지 않습니다. 필수 입력창은 * 로 표시되어 있습니다