이 문서에서는 리눅스 kernel space용 뮤텍스를 설명하였고, 만일 user space용 뮤텍스가 필요한 경우 이 문서가 아닌 futex 또는 pthread_mutex(POSIX 뮤텍스) 등을 참고해야 한다. 또한 “뮤텍스를 왜 사용하는가?” 또는 “뮤텍스의 사용 예” 등에 대해서는 이 문서에서 친절히 설명하지 않는다.
- 최초 뮤텍스 구현은 Ingo Molnar에 의해 시작되었다.
- 뮤텍스는 커널에서 학계 또는 유사한 이론 서적에서 발견된 ‘상호 배제(mutual exclusiion)’를 참고하는 것 뿐만아니라 공유 메모리에서의 직렬화(serialization)를 수행하는 특정 원시 locking도 참고한다.
- 2006년에 sleeping 가능한 뮤텍스 락이 소개되었고 이는 바이너리 세마포어와 유사한 동작을 한다.
- Sleeping lock은 critical section이 긴 경우(long hold times) 효과적이지만 그 특징으로 인해 인터럽트 핸들러에서 사용할 수 없다.
- 뮤텍스를 빠른 시간 내에 얻을 수 없는 상황에서는 뮤텍스를 얻을 때 까지 현 태스크를 sleep(다른 태스크에 제어권을 넘긴다) 시킨다.
- Generic Mutex Subsystem은 RT(Real Time) 커널에 있는 소스를 메인 스트림에 upstream 하였는데, 이 소스는 x86 계열의 샘플이고 다른 아키텍처로의 포팅이 쉽게될 수 있도록 가이드한다.
- 뮤텍스는 실제 원래 설계 및 목적과 다르게 뮤텍스 오브젝트가 세마포어 오브젝트보다 크기가 커져서 CPU cache와 메모리 footprint에 용량이 더 소모되는 단점이 있다.
- 2013.11월 kernel 디렉토리의 락 관련 화일이 모두 kernel/locking 디렉토리로 이전됨.
- Kernel용 mutex는 recursive lock을 지원하지 않는다.
- 성능을 개선하기 위해 성능순으로 3단계 패스(fastpath, midpath, slowpath)를 구성하였다.
- Fastpath에서는 한 번의 atomic operation 으로 뮤텍스를 가장 빠르게 얻을 수 있되 락을 얻기 전에 unlock 상태의 조건에서만 사용 가능하다.
- Optimistic spinning lock (adaptive spinning for mutex 또는 advanced MCS (queued) lock) 기술은 midpath로 non-sleepable busy wait 를 구현하였는데 SMP 시스템에서 특정 조건을 만족시키면 사용 가능하다.
- 여러 문서들에서 뮤텍스가 sleeping lock이라고 소개되지만 실상은 조금 더 복잡하다. 실제 구현에서는 락 상태별로 몇 개의 조건을 성립하면 fastpath나 midpath를 사용하여 sleep 하지 않고 뮤텍스 락을 얻게될 수 있다. 그러나 그 조건에 부합되지 않은 경우 slowpath가 동작하며 이 때 sleep 한다.
- Wait/Wound Deadlock Proof Mutex(ww-mutex)는 일반 뮤텍스와 달리 deadlock 회피가 가능하다.
사전 지식
뮤텍스를 분석하는 것은 매우 광범위한 리눅스 커널 지식이 요구된다. 뮤텍스 기술 역시 현재도 끊임없이 추가 기술이 적용되어 가면서 더 복잡해지고 있고 그럼에 따라 기존 도서나 인터넷 자료들에서 설명한 것과 약간씩 달라져가고 있다. 현재 버전의 뮤텍스 소스를 한 번에 모두 이해하기 힘들기 때문에 여러 번에 걸쳐 top-down 방식으로 위에서 아래 세부 루틴으로 단계별로 분석하는 것이 여러 가지로 이해하는데 두뇌 고통을 줄이는데 도움될 것이다. 동반해서 다음과 같이 사전에 알아둬야 할 항목들이 있다.
- likely(), unlikely() 매크로
- Atomic operation
- Spin lock
- spin_lock(), spin_unlock()
- Preemption
- preempt_enable(), preempt_disable()
- RT(Real Time) Kernel
- Interrupt
- local_irq_enable(), local_irq_disable(), local_irq_save(), local_irq_restore()
- Lockdep
- lock_acquire(), lock_release()
- Scheduler
- preemption에 따른 priority 개념, sleep 개념정도만
뮤텍스 옵션
CONFIG_DEBUG_LOCKDEP
CONFIG_DEBUG_LOCK_ALLOC
- 락뎁(LOCKDEP), 디버그 스핀락(DEBUG_SPINLOCK), 디버그 뮤텍스(DEBUG MUTEXES) 등을 선택하는 경우 락을 디버깅하기 위해 사용되는데 커널에 의해 락 메모리가 올바르게 해제(kfree(), kmem_cache_free(), free_pages(), vfree(), etc…)되지 않는 경우를 감지한다.
- 해당 옵션이 동작하는 경우 mutex_lock() 매크로는 mutex_lock_nested() 함수를 호출하고 fastpath 과정을 건너뛴다.
#define mutex_lock(lock) mutex_lock_nested(lock, 0)
void __sched mutex_lock_nested(struct mutex *lock, unsigned int subclass)
{
might_sleep();
__mutex_lock_common(lock, TASK_UNINTERRUPTIBLE,
subclass, NULL, _RET_IP_, NULL, 0);
}
CONFIG_LOCK_STAT
- 락뎁(LOCKDEP), 디버그 스핀락(DEBUG_SPINLOCK), 디버그 뮤텍스(DEBUG MUTEXES), 디버그 락할당(DEBUG_LOCK_ALLOC) 등을 선택하는 경우 락을 디버깅하기 위해 락 사용 통계를 제공한다.
- 참고: Documentation/locking/lockstat.txt
CONFIG_DEBUG_MUTEXES
- 디버깅 기능을 사용할 수 있으며 이로 인해 약간의 제한을 갖지만 락 개체에 대해 분석할 수 있다.
- 이 옵션을 사용하는 경우 fastpath 루틴은 사용할 수 없으며 항상 slowpath(midpath) 루틴만을 호출한다.
- 이 옵션의 사용 유무에 따른 구현 루틴 구분
- .enable: <linux/mutex-debug.h>, “kernel/locking/mutex-debug.h”, <asm-generic/mutex-null.h>에 있는 헤더파일이 사용됨.
- .disable: “kernel/locking/mutex.h”, <asm/mutex.h> -> <asm-generic/mutex-dec.h>
CONFIG_PROVE_LOCKING
- lock 체인을 분석하고 잘못을 디텍트해 아래와 같이 경고 메시지를 출력한다.
- circular_lock_dependency: “[ INFO: possible circular locking dependency detected ]”
- bad_irq_lock_dependency: “[ INFO: XXXXX-safe -> YYYYY-unsafe lock order detected ]”
- deadlock_bug: “[ INFO: possible recursive locking detected ]”
- usage_bug: “[ INFO: inconsistent lock state ]”
- irq_inversion_bug: “[ INFO: possible irq lock inversion dependency detected ]”
- nested_lock_not_held: “[ BUG: Nested lock was not taken ]”
- unlock_imbalance_bug: “[ BUG: bad unlock balance detected! ]”
- contention_bug: “[ BUG: bad contention detected! ]”
- held_locks_bug: “[ BUG: %s/%d still has locks held! ]”
- 참고: Documentation/locking/lockdep-design.txt
Recursive Lock vs Non-Recursive Lock
- POSIX 뮤텍스는 Recursive Lock을 허락한다. 그 의미는 같은 스레드에서 같은 뮤텍스에 대해 두 번 락을 할 수 있고 dead-lock이 되지 않는다는 뜻이다. 물론 두 번 락을 한 것에 대해 두 번 언락을 해야 한다.
- POSIX 뮤텍스를 사용하는 pthread_mutex는 PTHREAD_MUTEX_RECURSIVE 속성을 사용하여 Recursive lock을 사용할 수 있다.
- 커널에서 사용하는 뮤텍스는 Non-Recursive lock 이다. 같은 스레드에서 두 번 락을 하게되면 두 번째 lock을 획득하는 순간 멈춘다(hold)
Wait/Wound Deadlock Proof Mutex
- Maarten이 개발한 패치로 deadlock을 회피하여 뮤텍스 락을 처리하는 알고리즘으로 ww-mutex라고도 불린다.
- 커널 코드는 lock ordering과 Lockdep의 체킹 툴로 비정상 상황을 찾아낼 수 있다. 그럼에도 불구하고 deadlock 상황이 발생하는 경우 ww-mutex를 사용하는 경우 다음과 같이 dead-lock 회피 알고리즘이 동작한다.
- 첫 번째 스레드가 락A를 소유하고, 두 번째 스레드가 락B를 소유한 상태에서
- 첫 번째 스레드가 락B의 소유를 시도하고, 두 번째 스레드도 락A를 소유하려고 시도할 때 패치 적용전에는 deadlock 상황에 들어간다.
- 패치가 적용되는 경우 첫 번째 스레드가 락B의 소유를 시도하다 블럭되고
- 두 번째 스레드가 락A를 소유하려고 시도할 때 서로 교차되는 락을 발견하여 “wounded” 된다.
- wounded 의미: 현재 기존에 홀드하고 있는 잠금을 해제하고 처음부터 다시 시작해야 한다라고 말하다(told)
- 두 번째 스레드가 “wound” 처리를 위해 이미 소유하고 있던 락B를 풀었다가 다시 얻으려고 시도하지만
- 두 번째 스레드가 락B를 풀자마자 그 시점에 첫 번째 스레드가 락B를 먼저 획득한다.
- 따라서 두 번째 스레드는 락B를 얻지 못하고 블럭된다.
- 첫 번째 스레드하고 critical section을 지난 후 락B와 락A를 차례대로 반납한다.
- 첫 번째 스레드가 락B를 반납하자마자 두 번째 스레드가 락B를 소유하게 되고
- 다시 첫 번째 스레드가 락A를 반납하자마자 역시 두 번째 스레드가 락A를 소유하게 된다.
- 두 번째 스레드가 critical section을 통과한 후 락A와 락B를 연달아 해제하는 것으로 두 스레드의 모든 락의 사용이 완료된다.
- 이의 구현을 위해 ww_mutex, ww_class, ww_acquire_ctx라는 구조체가 사용된다.
- 사용 시
- mutex_lock() 대신 별도의 ww_mutex_lock()을 사용한다.
- 헤더 화일로 mutex.h 대신 ww-mutex.h를 사용한다.
Fastpath, Midpath, Slowpath Mutex Lock
성능 관련한 접근 방법으로 뮤텍스 획득을 위해 락상태에 따라 3가지의 방법 중 하나를 가장 빠른 fastpath 부터 midpath, slowpath 까지 선택 사용한다. 처음에 fastpath를 사용하여 뮤텍스가 획득되면 midpath나 slowpath 루틴을 사용하지 않는다. fastpath를 실패한 후 midpath 루틴으로 진입한 경우 역시 뮤텍스가 획득되면 slowpath 루틴을 사용하지 않는다.
- fastpath (Atomic Operation)
- unlock 상태에서 lock을 획득할 때 fastpath를 사용할 수 있다.
- atomic opeation(몇 개의 인스트럭션이며 아키텍처마다 다름)의 수행만으로 락/언락을 처리하므로 속도가 빠르다.
- count 변수를 atomic하게 감소(또는 교환) 시키는 기법으로 락을 획득하는 방법이다.
- 사전에 count 변수가 1(unlock)인 경우만 fastpath 방법으로 lock을 획득할 수 있다.
- fastpath의 구현은 다음과 같이 두 개로 나뉘어 있다.
- atomic 기반의 감소(decrement)/증가(increment) 명령으로 구현
- atomic 기반의 교환(xchg) 명령으로 구현
- fastpath는 아키텍처에 따라 일반적으로 dec/inc 기반의 사용이 적절한 경우 먼저 선택 사용된다.
- midpath (Optimistic spinning)
- optimistic spinning 또는 adaptive spinning for mutex라 불림
- 이미 락이 걸려 있는 상태에서 아래 조건이 만족하면 빠르게 락을 얻을 가능성이 있어서 midpath를 사용한다.
- 락 owner 태스크가 러닝중
- 리스케쥴 요청이 없는 경우 (현재 태스크보다 더 높은 우선 순위(need_resched)로 동작할 태스크가 없는)
- 뮤텍스 스피너(spinner)는 OSQ(MCS) lock(simple spinlock)을 사용하여 wait_queue에 큐잉(queued up)되고 블러킹되며 기존 spinlock처럼 busy-wait한다. 가장 먼저 큐에 인입된 오직 하나의 스피너만이 뮤텍스를 얻기 위해 경쟁할 수 있다.
- 실제 구현 루틴에서 fastpath 및 slowpath와 다르게 midpath라는 함수명으로 구분되어 있지 않다.
- midpath 루틴은 __mutex_lock_slowpath() 루틴의 전반부에서 호출됨
- CONFIG_MUTEX_SPIN_ON_OWNER 옵션을 사용하여 동작시킨다.
- slowpath와 다르게 sleep/wake-up 사이클 없이 락을 획득한다.
- 다음 문제도 해결 완료
- slowpath
- mutex 획득의 3가지 방법 중 가장 느린 방법으로 mutex를 얻을 때까지 루프를 돌며 sleep하고 제어권을 다른 태스크에게 양도한다.
- 디버깅 옵션(CONFIG_DEBUG_MUTEXES)을 사용하는 경우에는 항상 slowpath 방법만을 사용한다.
- wait_lock spinlock을 사용하고, 태스크는 대기큐에 추가되고 unlock path에 의해 깨어날 때 까지 슬립된다.
- 여러 태스크가 경쟁적으로 slowpath에서 mutex를 소유하려고 경쟁하는 경우에는 mutex_lock() 함수 접근 순서대로 처리되지 않는다. 각 태스크가 sleep 되었다가 wake-up 될 때마다 mutex를 얻을 수 있는지 조사하고 얻을 수 있을 때 mutex를 획득한다. 나머지 태스크는 다시 sleep되며 루프를 돈다.
- sleep 기반에서 동작하는 경우에는 각 태스크가 깨어나는 타이밍이 달라서 mutex의 획득 순서에 대하여 공정하게 처리하기 위해 mutex_unlock() 함수 내부에서 대기자들 중 가장 먼저 대기했던 태스크를 먼저 깨울 수 있도록 지시하고 mutex를 release하여 최대한 해당 태스크가 빨리 mutex를 획득할 수 있도록 도와 성능을 높인다.
Fastpath, slowpath Mutex Unlock
성능 관련한 접근 방법으로 뮤텍스 해제를 위해 락상태에 따라 2가지의 방법 중 하나를 가장 빠른 fastpath와 slowpath를 선택 사용한다. 처음에 fastpath를 사용하여 뮤텍스가 해제되면 slowpath 루틴을 사용하지 않는다.
Fastpath Mutex Lock 구현 방법
__mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath); 루틴은 CPU 아키텍처에 따라 3가지의 루틴 중 하나를 빌드하여 사용된다.
- dec/inc 기반의 fastpath lock (for ARM)
- xchg 기반의 fastpath lock
- fastpath 없이 곧장 slowpath(with midpath) lock
1) dec/inc 기반의 fastpath lock
- ARM 아키텍처에서 사용
- 먼저 atomic_dec_return()으로 이루어진 뮤텍스 fastpath 루틴을 동작시켜 count 값 감소 결과 값이 적은 확률로 값이 0보다 작으면 fail_fn(__mutex_lock_slowpath 함수) 호출.
- atomic_dec_return() 결과값이 0이거나 음수로 예상되는데 이에 따른 의미
- 0: unlock(1) -> lock(0)으로 상태가 바뀜.
- 음수: 기존에 뮤텍스 락이 이미 걸린 상태에서 뮤텍스 락이 또 호출되어 중첩(2번 이상)된 경우
- unlikely() 매크로는 성능향상 즉 optimization을 목적으로 주로 false가 예상되는 경우에 사용된다.
- atomic_dec()이 값을 감소시키기 전의 값을 리턴하는데 반하여 atomic_dec_return()은 값을 감소 시킨 후의 값을 리턴한다.
include/asm-generic/mutex-dec.h
static inline void __mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
if (unlikely(atomic_dec_return(count) < 0))
fail_fn(count);
}
2) xchg 기반의 fastpath lock
- 첫 번째, count 값을 0(lock)으로 교체하기 전 count 값이…
- 적은 확률로 1(unlock)이 아니면 즉, 이미 락이 있으면서 또 락이 호출된 경우 두 번째 조건 수행
- 많은 확률로 1(unlock)인 경우 조건을 빠져나간다.
- 두 번째, count 값을 -1(double lock)로 교체하기 전 count 값이…
- 많은 확률로 1(unlock)이 아니면 fail_fn(__mutex_lock_slowpath 함수) 호출 (중첩 락 성립)
- 적은 확률로 1이면 조건을 빠져나간다.
- fastpath로 싱글 뮤텍스 락 성립
- 첫 번째 조건 수행 후 두 번째 조건으로 오기 전에 우선 순위가 높은 태스크가 preempt되어 unlock된 후 다시 이 태스크로 돌아와 계속 수행하게 되는 경우 이렇게 unlock 된 상태로 있을 수 있음.
- 개개의 atomic operation은 수행 결과가 분리될 수 없지만 이렇게 개개의 atomic operation을 수행하는 경우에는 preemption 상황을 준비해야 한다.
include/asm-generic/mutex-xchg.h
static inline void __mutex_fastpath_lock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
if (unlikely(atomic_xchg(count, 0) != 1))
/*
* We failed to acquire the lock, so mark it contended
* to ensure that any waiting tasks are woken up by the
* unlock slow path.
*/
if (likely(atomic_xchg(count, -1) != 1))
fail_fn(count);
}
3) slowpath(midpath)로 직행
- 항상 fastpath를 생략하고 fail_fn(__mutex_lock_slowpath 함수) 호출.
include/asm-generic/mutex-null.h
#define __mutex_fastpath_lock(count, fail_fn) fail_fn(count)
mutex_lock()
- mutex_lock()의 흐름은 아래와 같다. CONFIG_DEBUG_LOCK_ALLOC 기능을 사용하는 경우 mutex_lock_nested() 함수에서 fastpath를 생략하고 곧장 __mutex_lock_common() 함수(midpath와 slowpath 루틴)로 진입한다.
- 옵션이 없는 경우 ARM 아키텍처인경우 atomic_dec_return()을 호출하는 __mutex_fastpath_lock() 매크로를 호출하여 fastpath를 처음부터 호출하여 사용한다.
아래 그림은 mutex를 획득하는 조건에 대해서 흐름도로 간략히 표현하였다.
kernel/locking/mutex.c
/**
* mutex_lock - acquire the mutex
* @lock: the mutex to be acquired
*
* Lock the mutex exclusively for this task. If the mutex is not
* available right now, it will sleep until it can get it.
*
* The mutex must later on be released by the same task that
* acquired it. Recursive locking is not allowed. The task
* may not exit without first unlocking the mutex. Also, kernel
* memory where the mutex resides must not be freed with
* the mutex still locked. The mutex must first be initialized
* (or statically defined) before it can be locked. memset()-ing
* the mutex to 0 is not allowed.
*
* ( The CONFIG_DEBUG_MUTEXES .config option turns on debugging
* checks that will enforce the restrictions and will also do
* deadlock debugging. )
*
* This function is similar to (but not equivalent to) down().
*/
void __sched mutex_lock(struct mutex *lock)
{
might_sleep();
/*
* The locking fastpath is the 1->0 transition from
* 'unlocked' into 'locked' state.
*/
__mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
mutex_set_owner(lock);
}
mutex_lock() 부분 주석을 간단히 설명한다.
- 현재 태스크를 위해 베타적으로 뮤텍스를 lock한다. 만일 뮤텍스가 지금 당장 가능하지 않으면 소유할 때 까지 sleep 한다.
- 뮤텍스는 나중에 뮤텍스를 획득한 동일한 태스크에 의해 release 되야 한다.
- Recursive lock은 허용하지 않는다.
- 태스크는 처음 뮤텍스의 unlocking 없이 종료(exit) 할 수 없다.
- 또한 뮤텍스가 lock된 채로 뮤텍스가 자리한 커널 메모리가 해제(free)될 순 없다.
- 처음 뮤텍스를 lock하기 전에 먼저 초기화(또는 정적으로 defined) 되어야 한다.
- memset()으로 뮤텍스를 0으로 초기화 하는 것은 허락되지 않는다.
- CONFIG_DEBUG_MUTEXES: 이 옵션을 동작시키면(turn on) 제약 사항(fastpath를 수행하지 않는)이 있으며 dead-lock 디버깅 등을 수행할 수 있다.
- mutex_lock() 함수는 완전히 동일하진 않지만 down() 함수와 유사하다.
소스 루틴을 살펴보면…
- might_sleep()은 리스케쥴 요청을 받았을 때 sleep하는 preemption points이다. (참고: preemption, might_sleep())
- 이 루틴은 CONFIG_PREEMPT_VOLUNTARY 옵션을 사용하는 경우만 호출된다.
- __mutex_fastpath_lock(&lock->count, __mutex_lock_slowpath);
- fastpath를 호출하고 실패 시 slowpath(midpath)로 구현된 mutex lock 루틴을 호출한다.
- 아키텍처마다 약간 다르지만 ARM 아키텍처에서는 atomic_dec_return()을 사용하는 fastpath 루틴이다.
- 위에서 자세히 설명하였었지만 fastpath는 락 count 값이 1(unlock) -> 0(lock)으로 상태가 변환될 때만 성공이다.
- 뮤텍스 락에 owner를 설정해둔다.
- CONFIG_MUTEX_SPIN_ON_OWNER 옵션을 사용한 경우에만 owner를 설정한다.
__mutex_lock_slowpath()
kernel/locking/mutex.c
__visible void __sched __mutex_lock_slowpath(atomic_t *lock_count)
{
struct mutex *lock = container_of(lock_count, struct mutex, count);
__mutex_lock_common(lock, TASK_UNINTERRUPTIBLE, 0,
NULL, _RET_IP_, NULL, 0);
}
- container_of()는 구조체의 멤버변수를 사용하여 구조체의 처음을 알아내는 함수
- __mutex_lock_common()에는 뮤텍스 락에 대한 midpath와 slowpath 로직이 담겨있다.
__mutex_lock_common()
__mutex_lock_common() 함수내에서 midpath와 slowpath 루틴이 순차적으로 구현되어 있다.
kernel/locking/mutex.c
static __always_inline int __sched __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
struct lockdep_map *nest_lock, unsigned long ip,
struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
struct task_struct *task = current;
struct mutex_waiter waiter;
unsigned long flags;
int ret;
preempt_disable();
mutex_acquire_nest(&lock->dep_map, subclass, 0, nest_lock, ip);
if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
/* got the lock, yay! */
preempt_enable();
return 0;
}
처음 시작하는 부분이 midpath(optimistics spin) 루틴으로 먼저 수행된다. midpath 루틴은 non-sleepable spin-wait(루프)으로 구현된다.
- preempt를 disable하여 리스케쥴링하지 않도록 하므로 슬립되지 않는다.
- mutex_acquire_nest()은 락 추척을 위한 디버깅 기능을 위해 호출한다.
- midpath인 mutex_optimistic_spin()으로 락 획득이 성공하면 preempt를 enable 한 후 성공리에 빠져나간다.
- 인수의 마지막에 use_ww_ctx가 있는데 보통 0이며, ww_mutex를 사용하는 경우엔 1이다.
- mutex_lock()을 사용하지 않고 dead-lock avoidance 기능이 있는 ww_mutex_lock()을 사용한 경우에 해당
spin_lock_mutex(&lock->wait_lock, flags);
/*
* Once more, try to acquire the lock. Only try-lock the mutex if
* it is unlocked to reduce unnecessary xchg() operations.
*/
if (!mutex_is_locked(lock) && (atomic_xchg(&lock->count, 0) == 1))
goto skip_wait;
midpath에서 lock 획득이 실패하였으면 slowpath 루틴에서 락을 획득한다. slowpath 루틴은 non-busy(sleepable) wait으로 구현된다.
- spin_lock_mutex(): 인터럽트되지 않도록 하면서 뮤텍스의 wait_lock에 spinlock을 건다.
- spin_lock을 거는 이유는 wait_list 등의 자료 구조를 보호하면서 access를 하고 동시에 두 개 이상의 태스크가 접근할 때 조금이라도 먼저 접근한 태스크에게 mutex를 획득할 수 있는 기회를 주기 위함이다.
- 본격적으로 slowpath를 수행하기 전에 한 번 더 midpath 처럼 락을 얻을 수 있는지 시도한다.
- !mutex_is_locked(): 만일 뮤텍스가 unlock 상태이고(이 루틴까지 오면서 lock 상태가 변해 unlock(release)된 경우에 해당)
- atomic_xchg(): lock(0)을 획득하였을 때 변경 전 count 값이 1(unlock)이면 락을 정상적으로 획득한 것이므로 skip_wait 라벨로 점프한다.
debug_mutex_lock_common(lock, &waiter);
debug_mutex_add_waiter(lock, &waiter, task_thread_info(task));
/* add waiting tasks to the end of the waitqueue (FIFO): */
list_add_tail(&waiter.list, &lock->wait_list);
waiter.task = task;
lock_contended(&lock->dep_map, ip);
코드는 다음과 같이 동작한다.
- debug_mutex_lock_common(): 로컬 변수인 waiter를 초기화한다.
- debug_mutex_add_waiter(): 현재 태스크의 blocked_on에 위에서 초기화한 waiter를 대입한다.
- waiter(대기자)를 wait_list(FIFO wait queue)의 마지막에 추가한다.
- wait_list(락의 대기 FIFO )의 마지막에 waiter를 추가한다.
- waiter.task 현재 task를 대입
- lock_contended() 함수는 CONFIG_LOCK_STAT이 설정된 상태면 “lock_contension_bug”를 디텍트해 출력할 수 있다.
아래 그림은 wait_list에 waiter가 추가되는 과정을 나타냈다.
- 처음엔 wait_list가 비어있고 waiter(대기자)가 하나도 없는 경우는 lock owner만이 mutex를 소유한 상태이므로 count=0이다.
- 두 번째에서 task-A가 wait_list의 뒤에 waiter(대기자)로 추가되었다.
- 세 번째와 네 번째에서 task-B와 task-C가 순차적으로 wait_list의 뒤에waiter(대기자)로 추가되었다.
for (;;) {
/*
* Lets try to take the lock again - this is needed even if
* we get here for the first time (shortly after failing to
* acquire the lock), to make sure that we get a wakeup once
* it's unlocked. Later on, if we sleep, this is the
* operation that gives us the lock. We xchg it to -1, so
* that when we release the lock, we properly wake up the
* other waiters. We only attempt the xchg if the count is
* non-negative in order to avoid unnecessary xchg operations:
*/
if (atomic_read(&lock->count) >= 0 &&
(atomic_xchg(&lock->count, -1) == 1))
break;
/*
* got a signal? (This code gets eliminated in the
* TASK_UNINTERRUPTIBLE case.)
*/
if (unlikely(signal_pending_state(state, task))) {
ret = -EINTR;
goto err;
}
if (use_ww_ctx && ww_ctx->acquired > 0) {
ret = __ww_mutex_lock_check_stamp(lock, ww_ctx);
if (ret)
goto err;
}
__set_task_state(task, state);
/* didn't get the lock, go to sleep: */
spin_unlock_mutex(&lock->wait_lock, flags);
schedule_preempt_disabled();
spin_lock_mutex(&lock->wait_lock, flags);
}
slowpath 루프 처음마다 락 획득을 시도하고 락 획득이 성공할 때 까지 대기 상태로 들어가는 것을 반복한다.
- SMP의 여러 코어가 경쟁적으로 atomic_xchg()를 서로 호출하는 경우 각 코어의 캐시에서 update를 하면서 cache chherent 알고리즘을 통해 많은 부하가 생긴다. 이 때 발생하는 cache bouncing 문제를 해결 하기 위해 atomic_xchg()를 호출하기 전에 atomic_read()를 하도록 루틴이 추가되었다.
- count가 0보다 같거나 크다는 말은 기존에 걸려 있던 락이 풀려 더 이상의 대기자가 없다(락이 하나만 있거나 unlock된 상태)는 뜻이다.
- atomic_xchg()의 리턴값이 1이면(atomic_xchg가 수행되기 전의 count 값) unlock 상태였다는 뜻이다. 이렇게 lock을 획득하면 루프를 빠져나온다.
- 작은 확률로 시그널 상태에 따라 시그널이 펜딩된 상태라면 락 획득 실패되며 에러 처리 루틴으로 점프
- TASK_INTERRUPTIBLE 이거나 TASK_WAKE_KILL 인 상태에서 시그널이 펜딩된경우 true
- TASK_UNINTERRUPTIBLE 케이스인 경우는 코드를 무시하면 됨.
- ww_mutex를 사용하면서 ww_ctx->acquired 값이 0보다 크면
- __ww_mutex_lock_check_stamp()를 호출하여 데드락 상황을 체크하여 true면 에러 처리 루틴으로 점프
- __set_task_state(task, state)
- 태스크 상태를 인수로 받은 상태로 변경한다.
- 인수로 받는 상태는 다음과 같다.
- mutex_lock(), ww_mutex_lock() -> TASK_UNINTERUPTIBLE
- mutex_lock_interruptable(), ww_mutex_interruptable() -> TASK_INTERRUPTIBLE
- mutex_lock_killable() -> TAKS_KILLABLE
- spin lock을 풀고 슬립한다. 슬립이 깨면 다시 spin lock을 얻은 후 루프 처음으로 되돌아가 다시 뮤텍스 획득을 시도한다.
__set_task_state(task, TASK_RUNNING);
mutex_remove_waiter(lock, &waiter, current_thread_info());
/* set it to 0 if there are no waiters left: */
if (likely(list_empty(&lock->wait_list)))
atomic_set(&lock->count, 0);
debug_mutex_free_waiter(&waiter);
정상적으로 뮤텍스를 획득하여 루프에서 벗어난 경우 (여전히 spin lock은 걸려있는 상태)
- 태스크 상태를 TASK_RUNNING으로 바꾼다.
- slowpath에서 sleep 한 후 깨어나 lock을 얻을 때까지 문제없이 잘 동작하려면 명시적으로 태스크를 TASK_RUNNING 상태로 설정해야 한다.
- mutex_remove_waiter()
- 현재 태스크의 blocked_on을 null로 초기화 한다. (초기화전에 waiter가 지정)
- waiter들끼리 연결이 되어 있는 구조에서 자신의 waiter 노드만 제거하고 초기화 한다.
- waiter->task를 null로 초기화 한다.
- 많은 확률로 락의 wait_list(대기큐)가 비어 있으면(대기자들이 하나도 없으면) atomic하게 count를 0(single lock, 대기자 없는 락 상태)으로 바꾼다.
- debug_mutex_free_waiter()
skip_wait:
/* got the lock - cleanup and rejoice! */
lock_acquired(&lock->dep_map, ip);
mutex_set_owner(lock);
if (use_ww_ctx) {
struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);
ww_mutex_set_context_slowpath(ww, ww_ctx);
}
spin_unlock_mutex(&lock->wait_lock, flags);
preempt_enable();
return 0;
err:
mutex_remove_waiter(lock, &waiter, task_thread_info(task));
spin_unlock_mutex(&lock->wait_lock, flags);
debug_mutex_free_waiter(&waiter);
mutex_release(&lock->dep_map, 1, ip);
preempt_enable();
return ret;
}
fastpath를 제외하고 midpath나 slowpath에서 정상적으로 락을 획득한 경우 skip_wait: 루틴에 도착한다.
- lock_acquired(): lockdep 디버그 코드로 트레이스 목적이며, lock_contention_bug도 디텍트하여 출력한다.
- 락 owner를 현재 태스크로 설정
- ww_mutex를 동작시킨 경우
- container_of()는 구조체의 멤버변수를 사용하여 구조체의 처음을 알아내는 함수
- container_of 매크로 | TickTalk
- ww_mutex_set_context_slowpath()는 deadlock 회피용 slowpath 락으로 분석 생략
- spin_unlock_mutex(): spin lock을 release 한다.
- 락 디버그 옵션에서는 리스케쥴 필요한 경우 슬립도 한다.
- 마지막으로 preempt_enable()을 하고 종료
- err: 레이블은 뮤텍스 획득 실패 후 처리되는 루틴으로 분석 생략
아래 그림은 wait_list에 waiter가 삭제되는 과정을 나타냈다.
- 처음엔 wait_list에 waiter(대기자)가 3개가 있는 상태이고 이 때 mutex lock count=-3이다.
- lock count가 음수인 경우 대기자가 있다는 뜻인데 이 숫자를 파악하여 대기자의 수를 알아낼 수는 없다.
- 정확한 대기자의 수는 wait_list에 존재하는 waiter 들의 수이다.
- 두 번째엔 task-A가 wait_list의 선두에서 삭제되고 mutex를 얻게된다.
- 세 번쨰엔 task-B가 wait_list의 선두에서 삭제되고 mutex를 얻게된다.
- 네 번째에 최종 task-C가 wait_list에서 빠져나가면서 mutex를 얻게되고 그 때에 mutex lock count=0이다.
midpath (optimistic spinning)
midpath로 CONFIG_MUTEX_SPIN_ON_OWNER 옵션이 설정된 경우 midpath (adaptive spinning 또는 optimistic spinning) 루틴이 동작한다.
- owner가 다른 CPU에서 러닝(running) 중이면서 현재 태스크가 리스케쥴할 필요가 없을 때 현재 태스크에서 스핀하며 락을 획득하려 시도한다.(busy(non-sleepable ) wait)
- owner가 러닝중이라면 그것은 곧 락을 release 할 것이라는 근거이다.
- 이러한 뮤텍스 구현 방식은 lock field에서 owner를 atomic하게 추적하지 않고 non-atomic하게 추적한다.
- 당연히 fastpath 및 midpath에서는 빠른 속도를 내는 것이 목적이므로 락에 대한 추적을 하지 않는다. (락 추적 목적으로 관련 기록 루틴을 호출하지 않는다)
- Optimistic spinning에서는 모든것을 직렬화(serialize)하는 wait_lock에 기반하기 때문에 DBUG_MUTEXES 옵션 즉 디버깅을 위해 잘 동작하지 않는다.
- 뮤텍스 스피너들은 OSQ(MCS lock)에 큐 up된다. 그래서 오직 하나의 spinner만 뮤텍스에 대해 경쟁한다. 반면에 뮤텍스가 스피닝하지 않을 것 같으면 lock/unlock 오버헤드를 통과하므로 아무 문제 없다.
- 락을 획득하게 되면 true를 리턴하고, 그렇지 않은 경우 false를 리턴하며 slowpath 로 가서 sleep 할 필요가 있다.
- Optimistic spinning은 MCS(Mellor-Crummey and Scott)가 제안한 FIFO queue 구조의 ticket spin lock과 유사한 구조로 구현되었다. (MCS like)
mutex_optimistic_spin()
SMP를 위해 다음의 조건이 성립되면 빠르게 락을 얻을 수 있다고 판단하여 이 함수(midpath)에서 슬립하지 않고 스핀(루프)을 하며 락을 얻기 위해 시도한다.
- 락 owner가 다른 CPU에서 현재 러닝 중
- 락 owner가 다른 CPU에서 돌고 있다는 뜻은 곧 critical section이 끝나고 락이 release 될 가능성이 높다는 뜻
- 현재 태스크의 preemption 요청이 없음
- 리스케쥴 요청이 없다는 뜻으로 결국 락 오너가 락을 release 하자 마자 현재의 태스크에서 곧 락을 얻을 수 있음을 의미
kernel/locking/mutex.c
/*
* Optimistic spinning.
*
* We try to spin for acquisition when we find that the lock owner
* is currently running on a (different) CPU and while we don't
* need to reschedule. The rationale is that if the lock owner is
* running, it is likely to release the lock soon.
*
* Since this needs the lock owner, and this mutex implementation
* doesn't track the owner atomically in the lock field, we need to
* track it non-atomically.
*
* We can't do this for DEBUG_MUTEXES because that relies on wait_lock
* to serialize everything.
*
* The mutex spinners are queued up using MCS lock so that only one
* spinner can compete for the mutex. However, if mutex spinning isn't
* going to happen, there is no point in going through the lock/unlock
* overhead.
*
* Returns true when the lock was taken, otherwise false, indicating
* that we need to jump to the slowpath and sleep.
*/
static bool mutex_optimistic_spin(struct mutex *lock,
struct ww_acquire_ctx *ww_ctx, const bool use_ww_ctx)
{
struct task_struct *task = current;
if (!mutex_can_spin_on_owner(lock))
goto done;
/*
* In order to avoid a stampede of mutex spinners trying to
* acquire the mutex all at once, the spinners need to take a
* MCS (queued) lock first before spinning on the owner field.
*/
if (!osq_lock(&lock->osq))
goto done;
소스를 설명하자면
- !mutex_can_spin_on_owner()
- 리스케쥴 요청이 있거나 lock의 owner가 러닝중이 아니면 midpath 불성립
- osq_lock을 획득하지 못하면 midpath 불성립
- 동시에 midpath에 진입하는 cpu들이 서로 lock을 획득하기 위해 쇄도(반복적인 시도로 인해)하는 것을 방지하기 위해서 serialization을 하여 가장 먼저 OSQ(MCS) lock을 획득한 cpu가 성공적으로 리턴된다.
- OSQ(MCS) lock은 FIFO에 의한 큐방식으로 가장 처음에 진입한 spinner만이 뮤텍스에 대해 경쟁을 할 권한을 가지고 있다.
- 따라서 midpath에서도 뮤텍스 획득에 대한 공정성에 접근하였다.
- FIFO 큐에서 대기하고 있는 cpu 들은 대기하면서 만일 리스케쥴 요청이 있는 경우는 실패로 리턴된다.
- 물론 OSQ(MCS) lock을 획득한 cpu가 당장 mutex lock을 얻게되는 것이 아니라 이미 mutex lock을 가진 owner가 mutex unlock될 때 까지 spin된 후에 얻게될 것이다.
while (true) {
struct task_struct *owner;
if (use_ww_ctx && ww_ctx->acquired > 0) {
struct ww_mutex *ww;
ww = container_of(lock, struct ww_mutex, base);
/*
* If ww->ctx is set the contents are undefined, only
* by acquiring wait_lock there is a guarantee that
* they are not invalid when reading.
*
* As such, when deadlock detection needs to be
* performed the optimistic spinning cannot be done.
*/
if (ACCESS_ONCE(ww->ctx))
break;
}
/*
* If there's an owner, wait for it to either
* release the lock or go to sleep.
*/
owner = ACCESS_ONCE(lock->owner);
if (owner && !mutex_spin_on_owner(lock, owner))
break;
/* Try to acquire the mutex if it is unlocked. */
if (mutex_try_to_acquire(lock)) {
lock_acquired(&lock->dep_map, ip);
if (use_ww_ctx) {
struct ww_mutex *ww;
ww = container_of(lock, struct ww_mutex, base);
ww_mutex_set_context_fastpath(ww, ww_ctx);
}
mutex_set_owner(lock);
osq_unlock(&lock->osq);
return true;
}
/*
* When there's no owner, we might have preempted between the
* owner acquiring the lock and setting the owner field. If
* we're an RT task that will live-lock because we won't let
* the owner complete.
*/
if (!owner && (need_resched() || rt_task(task)))
break;
/*
* The cpu_relax() call is a compiler barrier which forces
* everything in this loop to be re-loaded. We don't need
* memory barriers as we'll eventually observe the right
* values at the cost of a few extra spins.
*/
cpu_relax_lowlatency();
}
spinning하며(busy wait, sleep하지 않고 루프를 돌며) 락을 획득할 수 있는 조건을 얻으면 리턴한다. 루프를 벗어나면 midpath 실패 처리 루틴으로 빠져나간다.
- usw_ww_mutex: ww_mutex 를 사용하는 경우이면서 deadlock 디텍션이 된경우(ww->ctx가 true) 루프를 벗어나 에러 처리를 한다.
- owner 값을 가져와서(volatile 방식으로 메모리 접근) owner가 존재하면서 owner가 스핀하지 않으면 루프를 벗어난다. (midpath 실패)
- 뮤텍스 획득을 시도하여 성공하면 다음과 같이 처리하고 리턴한다.
- lock_acquired(): lockdep 디버그 코드로 트레이스 목적이며, lock_contention_bug도 디텍트하여 출력한다.
- ww_mutex_ctx: ww_mutex를 사용하는 경우 ww_mutex_set_context_fastpath()를 호출한다. (분석 생략)
- 뮤텍스 오너(owner)를 설정
- osq(optimistic spinner queue)에서 언락(MCS queue에서 노드 제거)
- owner가 없으면서 리스케쥴링 요청이 있거나 현재 태스크가 RT task 인 경우 루프를 벗어난다.
- 락은 걸려있는 상태인데 owner가 없다는 뜻은 owner가 lock을 획득하고 owner필드를 설정하려고 하는 사이 즉 아직 owner field의 설정을 다 못한 사이인 경우라면 현재 태스크가 선점했을 수도 있다고 판단하여 루프를 벗어난다.(midpath 실패)
- cpu_relax_lowlatency(): 다시 스핀하기 전에 아키텍처에 따라 메모리 배리어를 동작시킨다.
- cpu_relax()
- 일반적인 ARMv7 SMP 및 UP 시스템은 컴파일러 배리어인 barrier() 함수만 호출한다.
- ARMv7 SMP 코어중 Cortex-A9 r2p0 이전 리비전 및 ARMv6 SMP 아키텍처는 자동 store buffer drain 기능이 없어 dmb_ish 명령을 수행하여야 한다.
osq_unlock(&lock->osq);
done:
/*
* If we fell out of the spin path because of need_resched(),
* reschedule now, before we try-lock the mutex. This avoids getting
* scheduled out right after we obtained the mutex.
*/
if (need_resched()) {
/*
* We _should_ have TASK_RUNNING here, but just in case
* we do not, make it so, otherwise we might get stuck.
*/
__set_current_state(TASK_RUNNING);
schedule_preempt_disabled();
}
return false;
}
루프를 벗어나면 midpath 처리가 실패한 것으로 처리되어 false를 리턴한다.
- osq(optimistic spinner queue)에서 언락(MCS queue에서 노드 제거)
- 리스케쥴 요청이 있는 경우 태스크 상태를 러닝으로 바꾸고 preemption되어 리스케쥴링이 되지 않도록 막는다.
mutex_can_spin_on_owner()
/*
* Initial check for entering the mutex spinning loop
*/
static inline int mutex_can_spin_on_owner(struct mutex *lock)
{
struct task_struct *owner;
int retval = 1;
if (need_resched())
return 0;
rcu_read_lock();
owner = ACCESS_ONCE(lock->owner);
if (owner)
retval = owner->on_cpu;
rcu_read_unlock();
/*
* if lock->owner is not set, the mutex owner may have just acquired
* it and not set the owner yet or the mutex has been released.
*/
return retval;
}
리스케쥴 요청이 없으면서 lock의 owner가 러닝중이면(또는 owner가 설정되지 않은 경우) true 리턴
- 리스케쥴 요청 플래그가 설정되어 있으면 false(0) 리턴
- owner가 설정되어 있지 않으면 true(1) 리턴
- owner가 설정되어 있으면 owner->on_cpu 값을 리턴
- on_cpu: 1=task running, 0=task not running
mutex_spin_on_owner()
아래 코드는 2015년 2~4월에 걸쳐 함수가 일부분 변경된 최신 코드이다.(버그 처리와 리팩토링)
락의 owner 태스크가 동작(슬립하거나 다른 태스크로 전환되지 않는)하는 동안 spinning을 한다. midpath의 바람직한 목적은 true로 리턴하여 이 함수를 호출한 루틴에서 락을 획득하려고 시도하는 것이다. 당연히 false로 리턴하면 midpath가 성립되지 않아 slowpath로 넘어갈 것이다.
/*
* Look out! "owner" is an entirely speculative pointer
* access and not reliable.
*/
static noinline bool mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner)
{
bool ret = true;
rcu_read_lock();
while (lock->owner == owner) {
/*
* Ensure we emit the owner->on_cpu, dereference _after_
* checking lock->owner still matches owner. If that fails,
* owner might point to freed memory. If it still matches,
* the rcu_read_lock() ensures the memory stays valid.
*/
barrier();
if (!owner->on_cpu || need_resched()) {
ret = false;
break;
}
cpu_relax_lowlatency();
}
rcu_read_unlock();
return ret;
}
루프 전체가 rcu_read_lock()에 들어있으므로 현재 태스크가 슬립되지 않도록 하였고 빠르게 처리가 될 것을 가정하고 작성된 코드이다.
- 락 owner 태스크의 critical section 내부에서 슬립하지 않은 채 많은 시간 소모를 하는 경우 문제가 된다. (락 owner 태스크에서 preemption 되면 차라리 실패가되어 이 루프를 벗어날 수 있다)
- lock의 owner와 인수 owner가 같은 경우에만 루프를 돈다. 다른 경우 락 획득 준비가 끝났기 때문에 ture를 리턴한다.
- 락 owner 태스크에서 critical section을 다 통과한 후 락을 release하면 락의 owner에 null이 들어가는 것을 이용하여 락 사용완료를 기다리는 스핀(루프)이다.
- lock->owner를 null하고 비교하지 않는 이유는 lock->owner에 다른 태스크가 들어갈 확률도 있기 때문이다. (다른 태스크에서 lock을 획득한 경우)
- owner의 on_cpu(태스크 가동 여부 플래그)가 false 이거나 리스케쥴 요청이 있는 경우 루프를 벗어나고 false를 리턴한다.
아래 두 개의 함수 코드는 변경되었으므로 상단 부분의 mutex_spin_on_owner() 함수 코드를 참고해야 한다.
static noinline int mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner)
{
rcu_read_lock();
while (owner_running(lock, owner)) {
if (need_resched())
break;
cpu_relax_lowlatency();
}
rcu_read_unlock();
/*
* We break out the loop above on need_resched() and when the
* owner changed, which is a sign for heavy contention. Return
* success only when lock->owner is NULL.
*/
return lock->owner == NULL;
}
static inline bool owner_running(struct mutex *lock, struct task_struct *owner)
{
if (lock->owner != owner)
return false;
/*
* Ensure we emit the owner->on_cpu, dereference _after_ checking
* lock->owner still matches owner, if that fails, owner might
* point to free()d memory, if it still matches, the rcu_read_lock()
* ensures the memory stays valid.
*/
barrier();
return owner->on_cpu;
}
osq_lock()
kernel/locking/osq_lock.c
bool osq_lock(struct optimistic_spin_queue *lock)
{
struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
struct optimistic_spin_node *prev, *next;
int curr = encode_cpu(smp_processor_id());
int old;
node->locked = 0;
node->next = NULL;
node->cpu = curr;
old = atomic_xchg(&lock->tail, curr);
if (old == OSQ_UNLOCKED_VAL)
return true;
prev = decode_cpu(old);
node->prev = prev;
ACCESS_ONCE(prev->next) = node;
/*
* Normally @prev is untouchable after the above store; because at that
* moment unlock can proceed and wipe the node element from stack.
*
* However, since our nodes are static per-cpu storage, we're
* guaranteed their existence -- this allows us to apply
* cmpxchg in an attempt to undo our queueing.
*/
while (!ACCESS_ONCE(node->locked)) {
/*
* If we need to reschedule bail... so we can block.
*/
if (need_resched())
goto unqueue;
cpu_relax_lowlatency();
}
return true;
- struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
- per-cpu 데이터인 즉 현재 cpu에 매치되는 osq_node 구조체 포인터를 알아온다.
- int curr = encode_cpu(smp_processor_id());
- OSQ(MCS) lock에서 사용하는 cpu 번호는 1번 부터 시작하므로 0번 부터 시작하는 현재 CPU 번호에 1을 더한다.
- curr = 현재 cpu + 1
- OSQ(MCS) lock에 하나의 엔트리도 없는 경우 가장 빨리 OSQ(MCS) lock을 획득한다.
- old = atomic_xchg(&lock->tail, curr);
- &lock->tail에 curr 값을 대입하고 old에 대입 전 lock->tail 값을 가져온다.
- if (old == OSQ_UNLOCKED_VAL)
- 가져온 cpu 값이 0인경우는 OSQ(MCS) lock이 unlock 상태이므로 가장 빠르게 OSQ(MCS) lock을 획득하게 되어 루틴을 빠져나간다.
- 노드를 뒤에 추가한다.
- prev = decode_cpu(old);
- per-cpu 데이터를 사용하기 위해 old 값에서 1을 뺀 후 &osq_node 구조체 포인터 값을 알아온다.
- node->prev = prev;
- 현재 노드의 앞에 이전 노드 구조체 정보를 대입하여 연결한다.
- ACCESS_ONCE(prev->next) = node;
- spin 하며 리스케쥴 요청이 없는 동안 OSQ(MCS) lock을 획득할 때까지 기다린다.
- while (!ACCESS_ONCE(node->locked)) {
- node->locked가 0(lock status)인 경우 spin 한다.
- 0=locked status, 1=unlocked status
- if (need_resched())
- 현재 태스크에 리스케쥴 요청이 있느지 확인하여 있는 경우 OSQ(MCS) lock 획득을 포기하기 위해 unqueue 루틴으로 이동한다.
- cpu_relax_lowlatency();
- rpi2: ARMv7 SMP인 Cortex-A7은 컴파일러 배리어인 barrier() 함수를 호출한다.
unqueue:
/*
* Step - A -- stabilize @prev
*
* Undo our @prev->next assignment; this will make @prev's
* unlock()/unqueue() wait for a next pointer since @lock points to us
* (or later).
*/
for (;;) {
if (prev->next == node &&
cmpxchg(&prev->next, node, NULL) == node)
break;
/*
* We can only fail the cmpxchg() racing against an unlock(),
* in which case we should observe @node->locked becomming
* true.
*/
if (smp_load_acquire(&node->locked))
return true;
cpu_relax_lowlatency();
/*
* Or we race against a concurrent unqueue()'s step-B, in which
* case its step-C will write us a new @node->prev pointer.
*/
prev = ACCESS_ONCE(node->prev);
}
- unqueue 루틴으로 오는 케이스는 lock->owner가 sleep되거나 현재 태스크가 리스케쥴링 요청 받아 midpath를 포기해야 하는 경우 발생한다.
- OSQ(MCS) lock을 포기하기 위해 OSQ에서 spin되고 있는 현재 cpu에 대한 노드의 연결을 제거해야한다.
- lock->owner가 sleep되는 경우 OSQ에서 spin되고 있는 모든 cpu에 대한 노드의 연결을 제거하기 위해 경쟁하는 상황이 벌어지므로 이에 대해 문제 없이 동작하도록 2 단계에 대해 나누어 atomic operation을 사용한다.
- 1단계 prev 노드에서 현재 노드에 대한 연결을 제거될 때까지 루프를 돌며 반복한다. 루프를 반복하는 이유는 여러 cpu가 unqueue를 동시에 진행해야 하는 상황에서 안정적으로 하나씩 처리할 수 있도록 루틴 처리가 되야 한다. 하는 상황에서 내노드에 연결된 이전 노드의 정보를 변경하여 내 연결정보를 없애려고 하는데 이전 노드의 처리가 더 빠르게 처리되는 경우 실시간으로 node->prev 값이 바뀔 수도 있다.
- if (prev->next == node &&
- 먼저 현재 노드가 prev->next인지 확인
- cmpxchg(&prev->next, node, NULL) == node)
- atomic operation에 의해 prev 노드에 현재 노드가 연결되어 있는 경우 현재 노드를 제거한다. 제거가 성공되면 loop를 빠져나간다.
- if (smp_load_acquire(&node->locked))
- node->locked가 1로 설정되어 완전히 종료됨을 알게 되면 루틴을 빠져나간다.
- cpu_relax_lowlatency();
- rpi2: 컴파일러 배리어인 barrier()를 호출한다.
- prev = ACCESS_ONCE(node->prev);
- 다시 한번 내 노드와 연결되어 있는 이전(prev) 노드를 알아온다.
/*
* Step - B -- stabilize @next
*
* Similar to unlock(), wait for @node->next or move @lock from @node
* back to @prev.
*/
next = osq_wait_next(lock, node, prev);
if (!next)
return false;
/*
* Step - C -- unlink
*
* @prev is stable because its still waiting for a new @prev->next
* pointer, @next is stable because our @node->next pointer is NULL and
* it will wait in Step-A.
*/
ACCESS_ONCE(next->prev) = prev;
ACCESS_ONCE(prev->next) = next;
return false;
}
- 아래의 그림은 4개의 task가 각각의 cpu에서 동작한다고 가정하고 1개의 task는 mutex를 소유한 상태이고 3개의 task는 midpath에 진입하여 spin하고 있는 상태이다.
- FIFO 구조의 OSQ(MCS) lock에서 가장 선두에 위치한 task 홀로 mutex 소유자가 release되면 곧바로 mutex를 취득할 수 있도록 spin하고 있다.
- 나머지 2개의 task는 순번대로 연결되어 있다.
- 아래 그림은 3개의 task가 midpath에서 spin하고 있는데 두 번째 task가 리스쥴링 요청을 받아 중간에 unqueue되는 것을 보여준다.
- 단순히 현재 노드를 제거하고 prev 노드와 next 노드를 연결하면 되는데 실제 코드에서는 prev->next와 node->next에 null을 대입하여 먼저 연결을 끊는다.
- midpath에서 대기하는 task 들 중 일부 또는 전부가 동시에 unquque되는 상황이라면 각 cpu에서 동시에 이 구조체에 접근하게 되는데 함부로 연결을 끊고 이어 붙이면 서로 중복 처리되어 정상적인 연결이 되지 않기 때문에 atomic operation을 이용하여 null을 집어넣어 먼저 연결을 끊어 주어야 한다.
- prev->next와 node->next에 null을 집어 넣은 후에는 경쟁(racing) 상황에서 벗어났기 때문에 prev와 next 노드를 연결해준다.
- 아래 그림은 2개의 task가 경쟁적으로 unqueue되어 빠져나가는 상황을 나타내었는데 두 번째 노드와 세 번째 노드가 거의 동시에 step-A까지 동작하였고 그 다음 step-B 동작이 미세하게 두 번째 노드가 먼저 진입한 상태이다.
- 1) 두 번째 노드가 step-A: prev->next에 null을 대입하여 이전 노드로의 연결을 끊는다.
- 2) 세 번째 노드가 step-A: prev->next에 null을 대입하여 이전 노드로의 연결을 끊는다.
- 3) 두 번째 노드가 step-B: node->next가 null인 관계로 null이 아닐 때 까지 spin
- 4) 세 번째 노드가 step-B: node->next가 null이 아니므로 null을 대입하여 다음 노드로의 연결을 끊는다.
- 5, 6) 세 번째 노드가 step-C: 이전 노드와 다음 노드를 서로 연결한다. (자신의 노드는 앞 뒤 노드로 부터의 연결이 unlink 된다.)
- 7) 두 번째 노드가 spin 하고 있다가 step-B: node->next가 null이 아니게 되자마자 null을 대입하여 다음 노드로의 연결을 끊는다.
- 8, 9) 두 번째 노드가 step-C: 이전 노드와 다음 노드를 서로 연결한다. (자신의 노드는 앞 뒤 노드로 부터의 연결이 unlink 된다.)
- 아래 그림은 2개의 task가 경쟁적으로 unqueue되어 빠져나가는 상황을 나타내었는데 두 번째 노드가 조금 먼저 진입하여 이미 step-B까지 먼저 진행된 상태에서 세 번째 노드가 step-A에 진입한 경우이다.
- 1) 두 번째 노드가 step-A: prev->next에 null을 대입하여 이전 노드로의 연결을 끊는다.
- 2) 두 번째 노드가 step-B: node->next에 null을 대입하여 다음 노드로의 연결을 끊는다.
- 3) 세 번째 노드가 step-A: prev->next가 null인 관계로 null이 아닐 때 까지 spin 한다.
- 4, 5) 두 번째 노드가 step-C: 이전 노드와 다음 노드를 서로 연결한다. (자신의 노드는 앞 뒤 노드로 부터의 연결이 unlink 된다.)
- 6) 세 번째 노드가 spin 하고 있다가 step-A: prev->next가 null이 아니게 되자마자 null을 대입하여 다음 노드로의 연결을 끊는다.
- 7) 세 번째 노드가 step-B: node->next에 null을 대입하여 다음 노드로의 연결을 끊는다.
- 8, 9) 세 번째 노드가 step-C: 이전 노드와 다음 노드를 서로 연결한다. (자신의 노드는 앞 뒤 노드로 부터의 연결이 unlink 된다.)
osq_unlock()
kernel/locking/osq_lock.c
void osq_unlock(struct optimistic_spin_queue *lock)
{
struct optimistic_spin_node *node, *next;
int curr = encode_cpu(smp_processor_id());
/*
* Fast path for the uncontended case.
*/
if (likely(atomic_cmpxchg(&lock->tail, curr, OSQ_UNLOCKED_VAL) == curr))
return;
/*
* Second most likely case.
*/
node = this_cpu_ptr(&osq_node);
next = xchg(&node->next, NULL);
if (next) {
ACCESS_ONCE(next->locked) = 1;
return;
}
next = osq_wait_next(lock, node, NULL);
if (next)
ACCESS_ONCE(next->locked) = 1;
}
- curr는 현재 cpu(0부터 시작) + 1
- osq_unlock()의 구현도 처리 성능별 3단계로 구현되어 있다.
- fastpath
- OSQ의 마지막 노드가 현재 cpu 노드인 경우
- if (atomic_cmpxchg(&lock->tail, curr, OSQ_UNLOCKED_VAL) == curr))
- &lock->tail이 curr와 같은 경우 OSQ_UNLOCKED_VAL(0)로 대입한다.
- 높은 확률로 atomic_cmpxchg() 함수 결과가 true이면 return 한다.
- atomic_cmpxchg()의 결과가 curr 라는 것은 FIFO 구조의 OSQ(MCS)의 tail이 curr라는 것과 동일하다.
- 참고: Atomic Operation | 문c
- midpath
- OSQ에서 내 뒤에 노드가 존재하는 경우 다음 노드를 osq spin 상태에서 벗어나도록 locked flag를 1로 설정한다.
- node = this_cpu_ptr(&osq_node);
- per-cpu 데이터 구조의 osq_node 구조체 포인터를 알아온다.
- next = xchg(&node->next, NULL);
- xchg()
- #define xchg(ptr,x) ((__typeof__(*(ptr)))__xchg((unsigned long)(x),(ptr),sizeof(*(ptr))))
- &node->next <- NULL을 대입하는데 결과로 대입 이전 값을 리턴한다.
- 참고: Atomic Operation | 문c
- if (next)
- 기존에 다음 노드가 존재하였었다면 next->locked에 1(unlocked)을 대입하고 함수를 빠져나온다.
- 다음 노드가 OSQ(MCS) lock에서 가장 선두 노드이므로 그 노드가 osq spin 상태에서 벗어나도록 지시한다.
- slowpath
- 두 번째 노드가 분명 있지만 연결이 되어 있지 않은 경우 연결될 때까지 기다린 후 다음 노드를 알아와서 그 노드의 locked에 1로 설정하여 osq spin 상태에서 벗어나도록 지시한다.
- next = osq_wait_next(lock, node, NULL)
- 임시적으로 링크가 아직 연결되지 않아 안정되지 않은 상태인 경우 링크가 연결될 때까지 기다려서 다음 노드를 알아온 다음 next->locked에 1(unlocked)을 대입하고 함수를 빠져나온다.
- 역시 다음 노드가 OSQ(MCS) lock에서 가장 선두 노드이므로 그 노드가 osq spin 상태에서 벗어나도록 지시한다.
아래 그림은 midpath로 mutex 요청이 여러 개가 동시에 인입되어 osq_lock() 함수가 호출되어 OSQ(MCS) lock을 얻게되는 과정을 나타내었다.
- 적색 locked=1은 osq_unlock() 호출될 때 OSQ 노드 중 두 번째 노드를 locked=1로 표시하여 두 번째 노드가 osq_lock spin 에서 빠져나올 수 있도록 한다.
osq_wait_next()
kernel/locking/osq_lock.c
/*
* Get a stable @node->next pointer, either for unlock() or unqueue() purposes.
* Can return NULL in case we were the last queued and we updated @lock instead.
*/
static inline struct optimistic_spin_node *
osq_wait_next(struct optimistic_spin_queue *lock,
struct optimistic_spin_node *node,
struct optimistic_spin_node *prev)
{
struct optimistic_spin_node *next = NULL;
int curr = encode_cpu(smp_processor_id());
int old;
/*
* If there is a prev node in queue, then the 'old' value will be
* the prev node's CPU #, else it's set to OSQ_UNLOCKED_VAL since if
* we're currently last in queue, then the queue will then become empty.
*/
old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;
for (;;) {
if (atomic_read(&lock->tail) == curr &&
atomic_cmpxchg(&lock->tail, curr, old) == curr) {
/*
* We were the last queued, we moved @lock back. @prev
* will now observe @lock and will complete its
* unlock()/unqueue().
*/
break;
}
/*
* We must xchg() the @node->next value, because if we were to
* leave it in, a concurrent unlock()/unqueue() from
* @node->next might complete Step-A and think its @prev is
* still valid.
*
* If the concurrent unlock()/unqueue() wins the race, we'll
* wait for either @lock to point to us, through its Step-B, or
* wait for a new @node->next from its Step-C.
*/
if (node->next) {
next = xchg(&node->next, NULL);
if (next)
break;
}
cpu_relax_lowlatency();
}
return next;
}
- 내 노드의 다음에 연결된 노드를 알아오는데 다른 cpu가 unque되면서 연결이 잠시 끊어질 때가 있다 그런 경우는 기다렸다가 연결된 후 즉 안정상태가 된 후 알아와야 한다.
- int curr = encode_cpu(smp_processor_id());
- old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;
- prev 노드가 존재하는 경우 prev노드의 cpu 번호(1부터 시작하는)를 알아오고 없으면 0을 대입한다.
- 0을 대입하는 것은 내 노드가 마지막인데 prev가 없는 경우 empty 큐가 되기 때문이다.
- if (atomic_read(&lock->tail) == curr &&
- lock의 tail이 현재 노드이면서 (내 노드가 마지막 노드이면서)
- atomic_cmpxchg(&lock->tail, curr, old) == curr) {
- atomic operation으로 lock->tail이 curr인 경우 old로 변경시킨다. 변경 시키기 전의 tail 값이 curr인 경우 루프를 빠져나간다.
- 마지막 노드 표식(lock->tail)에 prev->cpu 값을 집어 넣어 이전 노드를 마지막 노드로 만든다.
- if (node->next) {
- 누군가 실시간으로 내 노드의 next 값에 null을 넣어 연결을 끊지 않았으면
- next = xchg(&node->next, NULL);
- 내 노드의 next에 null 값을 집어 넣고 변경되기 전 next 값을 알아와서 next 노드가 존재하는 경우 리턴한다.
- cpu_relax_lowlatency();
- 아직 node->next 값이 설정되지 않았으므로 배리어를 동작시킨 후 다시 루프를 돈다.
mutex_unlock()
kernel/locking/mutex.c
/**
* mutex_unlock - release the mutex
* @lock: the mutex to be released
*
* Unlock a mutex that has been locked by this task previously.
*
* This function must not be used in interrupt context. Unlocking
* of a not locked mutex is not allowed.
*
* This function is similar to (but not equivalent to) up().
*/
void __sched mutex_unlock(struct mutex *lock)
{
/*
* The unlocking fastpath is the 0->1 transition from 'locked'
* into 'unlocked' state:
*/
#ifndef CONFIG_DEBUG_MUTEXES
/*
* When debugging is enabled we must not clear the owner before time,
* the slow path will always be taken, and that clears the owner field
* after verifying that it was indeed current.
*/
mutex_clear_owner(lock);
#endif
__mutex_fastpath_unlock(&lock->count, __mutex_unlock_slowpath);
}
EXPORT_SYMBOL(mutex_unlock);
- fastpath mutex unlock 처리를 위해 __mutex_fastpath_unlock() 함수를 호출한다.
- mutex_lock()과 동일하게 fastpath는 atomic operation으로 구현된다.
- fastpath가 실패하는 경우 __mutex_unlock_slowpath() 함수로 이동한다.
__mutex_fastpath_unlock()
include/asm-generic/mutex-dec.h
/**
* __mutex_fastpath_unlock - try to promote the count from 0 to 1
* @count: pointer of type atomic_t
* @fail_fn: function to call if the original value was not 0
*
* Try to promote the count from 0 to 1. If it wasn't 0, call <fail_fn>.
* In the failure case, this function is allowed to either set the value to
* 1, or to set it to a value lower than 1.
*
* If the implementation sets it to a value of lower than 1, then the
* __mutex_slowpath_needs_to_unlock() macro needs to return 1, it needs
* to return 0 otherwise.
*/
static inline void
__mutex_fastpath_unlock(atomic_t *count, void (*fail_fn)(atomic_t *))
{
if (unlikely(atomic_inc_return(count) <= 0))
fail_fn(count);
}
–
- fastpath는 atomic operation lock count를 증가시켜 적은 확률로 0이하이면(lock 상태) __mutex_unlock_slowpath() 함수를 호출한다.
__mutex_unlock_slowpath()
kernel/locking/mutex.c
/*
* Release the lock, slowpath:
*/
__visible void
__mutex_unlock_slowpath(atomic_t *lock_count)
{
struct mutex *lock = container_of(lock_count, struct mutex, count);
__mutex_unlock_common_slowpath(lock, 1);
}
- __mutex_unlock_common_slowpath() 루틴을 호출한다.
__mutex_unlock_common_slowpath()
kernel/locking/mutex.c
/*
* Release the lock, slowpath:
*/
static inline void
__mutex_unlock_common_slowpath(struct mutex *lock, int nested)
{
unsigned long flags;
/*
* As a performance measurement, release the lock before doing other
* wakeup related duties to follow. This allows other tasks to acquire
* the lock sooner, while still handling cleanups in past unlock calls.
* This can be done as we do not enforce strict equivalence between the
* mutex counter and wait_list.
*
*
* Some architectures leave the lock unlocked in the fastpath failure
* case, others need to leave it locked. In the later case we have to
* unlock it here - as the lock counter is currently 0 or negative.
*/
if (__mutex_slowpath_needs_to_unlock())
atomic_set(&lock->count, 1);
spin_lock_mutex(&lock->wait_lock, flags);
mutex_release(&lock->dep_map, nested, _RET_IP_);
debug_mutex_unlock(lock);
if (!list_empty(&lock->wait_list)) {
/* get the first entry from the wait-list: */
struct mutex_waiter *waiter =
list_entry(lock->wait_list.next,
struct mutex_waiter, list);
debug_mutex_wake_waiter(lock, waiter);
wake_up_process(waiter->task);
}
spin_unlock_mutex(&lock->wait_lock, flags);
}
- __mutex_slowpath_nees_to_unlock()
- 아키텍처마다 다르지만 ARM의 경우 1로 설정된다.
- ARM에서 구현된 midpath를 위한 조건을 동작시키기 위해 1로 설정된다.
- atomic_set(&lock->count, 1)
- midpath에서 lock을 획득하기 위해 spin하고 있는 태스크가 있는 경우 mutex를 획득할 수 있는 기회를 slowpath 보다 먼저 주기위해 lock->count를 1(unlock)로 설정한다.
- if (!list_empty(&lock->wait_list)) {
- struct mutex_waiter *waiter = list_entry(lock->wait_list.next, struct mutex_waiter, list);
- wake_up_process(waiter->task);
다음 그림을 보고 slowpath에서 잠들어 있는 태스크를 깨우는 동작을 확인한다.
- CPU 1에 있는 mutex는 task가 preemption요청을 받아 slowpath로 동작하여 sleep되어 있다가 CPU2가 unlock될 때 첫 대기자인 CPU1에서 구동되고 있는 태스크를 미리 깨워서 mutex를 얻을 수 있도록 기회를 준다.
기타 함수
spin_lock_mutex()
spin_lock_mutex() 함수는 디버깅용과 non-디버깅용 두 개가 정의 되어 있다.
kernel/locking/mutex-debug.h
#define spin_lock_mutex(lock, flags) \
do { \
struct mutex *l = container_of(lock, struct mutex, wait_lock); \
DEBUG_LOCKS_WARN_ON(in_interrupt()); \
local_irq_save(flags); \
arch_spin_lock(&(lock)->rlock.raw_lock);\
DEBUG_LOCKS_WARN_ON(l->magic != l); \
} while (0)
kernel/locking/mutex.h
#define spin_lock_mutex(lock, flags) \
do { spin_lock(lock); (void)(flags); } while (0)
debug_mutex_lock_common()
kernel/locking/mutex-debug.c
void debug_mutex_lock_common(struct mutex *lock, struct mutex_waiter *waiter)
{
memset(waiter, MUTEX_DEBUG_INIT, sizeof(*waiter));
waiter->magic = waiter;
INIT_LIST_HEAD(&waiter->list);
}
debug_mutex_add_waiter()
kernel/locking/mutex-debug.c
void debug_mutex_add_waiter(struct mutex *lock, struct mutex_waiter *waiter,
struct thread_info *ti)
{
SMP_DEBUG_LOCKS_WARN_ON(!spin_is_locked(&lock->wait_lock));
/* Mark the current thread as blocked on the lock: */
ti->task->blocked_on = waiter;
}
- 인수로 전달된 현재 context에 연결된 태스크 구조체의 blocked_on에 waiter를 추가한다. (대기 태스크들이 연결된다.)
__ww_mutex_lock_check_stamp()
kernel/locking/mutex.c
static inline int __sched __ww_mutex_lock_check_stamp(struct mutex *lock, struct ww_acquire_ctx *ctx)
{
struct ww_mutex *ww = container_of(lock, struct ww_mutex, base);
struct ww_acquire_ctx *hold_ctx = ACCESS_ONCE(ww->ctx);
if (!hold_ctx)
return 0;
if (unlikely(ctx == hold_ctx))
return -EALREADY;
if (ctx->stamp - hold_ctx->stamp <= LONG_MAX &&
(ctx->stamp != hold_ctx->stamp || ctx > hold_ctx)) {
#ifdef CONFIG_DEBUG_MUTEXES
DEBUG_LOCKS_WARN_ON(ctx->contending_lock);
ctx->contending_lock = ww;
#endif
return -EDEADLK;
}
return 0;
}
구조체
struct mutex
include/linux/mutex.h
/*
* Simple, straightforward mutexes with strict semantics:
*
* - only one task can hold the mutex at a time
* - only the owner can unlock the mutex
* - multiple unlocks are not permitted
* - recursive locking is not permitted
* - a mutex object must be initialized via the API
* - a mutex object must not be initialized via memset or copying
* - task may not exit with mutex held
* - memory areas where held locks reside must not be freed
* - held mutexes must not be reinitialized
* - mutexes may not be used in hardware or software interrupt
* contexts such as tasklets and timers
*
* These semantics are fully enforced when DEBUG_MUTEXES is
* enabled. Furthermore, besides enforcing the above rules, the mutex
* debugging code also implements a number of additional features
* that make lock debugging easier and faster:
*
* - uses symbolic names of mutexes, whenever they are printed in debug output
* - point-of-acquire tracking, symbolic lookup of function names
* - list of all locks held in the system, printout of them
* - owner tracking
* - detects self-recursing locks and prints out all relevant info
* - detects multi-task circular deadlocks and prints out all affected
* locks and tasks (and only those tasks)
*/
struct mutex {
/* 1: unlocked, 0: locked, negative: locked, possible waiters */
atomic_t count;
spinlock_t wait_lock;
struct list_head wait_list;
#if defined(CONFIG_DEBUG_MUTEXES) || defined(CONFIG_MUTEX_SPIN_ON_OWNER)
struct task_struct *owner;
#endif
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
struct optimistic_spin_queue osq; /* Spinner MCS lock */
#endif
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
struct lockdep_map dep_map;
#endif
};
간단하고도 엄격한 의미의 뮤텍스
- 한 개의 태스크는 한 번에 하나의 뮤텍스를 hold(lock이 걸려 대기하는 상태)할 수 있다.
- owner만이 뮤텍스를 unlock 할 수 있다.
- 다중 unlock은 허가되지 않는다.
- 재귀(recursive) locking은 허가되지 않는다.
- 뮤텍스 오브젝트는 반드시 API를 이용하여 초기화 하여야 한다.
- 뮤텍스 오브젝트는 절대 memset나 memcpy에 의해 초기화되어 사용될 수 없다.
- 태스크는 뮤텍스가 held된 채로 종료할 수 없다.
- held된 락이 위치한 메모리 영역은 절대 free 되면 안된다.
- held된 락은 절대 재 초기화될 수 없다.
- 뮤텍스는 tasklets와 타이머같은 하드웨어 또는 소프트웨어 인터럽트 컨텍스트를 사용할 수 없다.
- CONFIG_DEBUG_MUTEXES 옵션을 사용한 경우
- 디버그 출력 시 뮤텍스의 심볼명 사용
- 락획득지점(point-of-acquire) 추적, 펑션명으로 심볼 검색(lookup)
- 시스템에서 held된 모든 락 목록을 출력
- 소유자 추적
- 재귀(self-recursing) 락의 검출과 관련 정보 출력
- 멀티 태스크 dead-lock의 검출과 영향을 끼친 락과 태스크의 출력
- 구조체 멤버 변수
- counter: 3가지의 뮤텍스 상태를 나타내는데 초기화 후 unlock(1) 상태로 시작한다.
- 1: unlock
- 0: 대기자 없는 lock
- 음수: 대기자 있는 lock
- wait_lock: slowpath 구현에 사용되는 spin lock으로 spin_lock_mutex()의 인수로 사용
- wait_list: 대기자 리스트 (ticket 구현)
- ownner: 락 소유한 태스크의 포인터, CONFIG_MUTEX_SPIN_ON_OWNER 옵션 또는 락 디버깅에서 사용
- osq: optimisitc spin queue(spinner MCS lock), CONFIG_MUTEX_SPIN_ON_OWNER 옵션에서 사용
- magic: 락 디버깅에서 사용
- dep_map: CONFIG_DEBUG_LOCK_ALLOC 옵션에서 사용
struct mutex_waiter
include/linux/mutex.h
/*
* This is the control structure for tasks blocked on mutex,
* which resides on the blocked task's kernel stack:
*/
struct mutex_waiter {
struct list_head list;
struct task_struct *task;
#ifdef CONFIG_DEBUG_MUTEXES
void *magic;
#endif
};
- list
- wait_list의 뒤에서 각 mutex_waiter들이 연결되는데 사용된다.
- task
struct optimistic_spin_node
include/linux/osq_lock.h
/*
* An MCS like lock especially tailored for optimistic spinning for sleeping
* lock implementations (mutex, rwsem, etc).
*/
struct optimistic_spin_node {
struct optimistic_spin_node *next, *prev;
int locked; /* 1 if lock acquired */
int cpu; /* encoded CPU # + 1 value */
};
- optimistic_spin_node 구조체는 per-cpu 데이터인 전역 변수 osq_node에서 사용된다.
- next 및 prev에 null이 있는 경우 OSQ(MCS) unlock 상태이고 FIFO 구조로 나중에 추가된 노드들은 꼬리 부분에 추가된다.
- locked
- 보통 0상태에 있고 midpath에서 mutex를 얻기 위해 경쟁을 하는 cpu가 osq_unlock()되어 나갈 때 OSQ에서 후 순위로 대기하고 cpu가 osq_lock()에서 빠져나올 수 있도록 하게 한다.
- cpu
struct optimistic_spin_queue
include/linux/osq_lock.h
struct optimistic_spin_queue {
/*
* Stores an encoded value of the CPU # of the tail node in the queue.
* If the queue is empty, then it's set to OSQ_UNLOCKED_VAL.
*/
atomic_t tail;
};
- OSQ(MCS)에서 가장 마지막에 위치한 노드로 1번 부터 시작하는 cpu 번호이다. 0이 들어있는 경우 대기자가 없는 경우이다.
뮤텍스 초기화
뮤텍스의 초기화는 두 가지 방법을 사용할 수 있다.
- DEFINE_MUTEX() 매크로를 사용하여 정적 선언으로 뮤텍스를 초기화
- mutex_init() 매크로 함수를 호출하여 다이나믹하게 뮤텍스를 초기화
1) DEFINE_MUTEX()
include/linux/mutex.h
#define DEFINE_MUTEX(mutexname) \
struct mutex mutexname = __MUTEX_INITIALIZER(mutexname)
include/linux/mutex-debug.h
#ifdef CONFIG_DEBUG_MUTEXES
#define __DEBUG_MUTEX_INITIALIZER(lockname) \
, .magic = &lockname
#else
# define __DEBUG_MUTEX_INITIALIZER(lockname)
#endif
#ifdef CONFIG_DEBUG_LOCK_ALLOC
# define __DEP_MAP_MUTEX_INITIALIZER(lockname) \
, .dep_map = { .name = #lockname }
#else
# define __DEP_MAP_MUTEX_INITIALIZER(lockname)
#endif
#define __MUTEX_INITIALIZER(lockname) \
{ .count = ATOMIC_INIT(1) \
, .wait_lock = __SPIN_LOCK_UNLOCKED(lockname.wait_lock) \
, .wait_list = LIST_HEAD_INIT(lockname.wait_list) \
__DEBUG_MUTEX_INITIALIZER(lockname) \
__DEP_MAP_MUTEX_INITIALIZER(lockname) }
- CONFIG_DEBUG_MUTEXES 옵션이 있는 경우 mutex 구조체의 magic 멤버 변수에 lockname의 주소를 대입한다.
- CONFIG_DEBUG_LOCK_ALLOC 옵션이 있는 경우 mutex 구조체의 dep_map 멤버 구조체 변수중 name에 lockname을 직접 대입한다.
2) mutex_init()
include/linux/mutex.h
/**
* mutex_init - initialize the mutex
* @mutex: the mutex to be initialized
*
* Initialize the mutex to unlocked state.
*
* It is not allowed to initialize an already locked mutex.
*/
# define mutex_init(mutex) \
do { \
static struct lock_class_key __key; \
__mutex_init((mutex), #mutex, &__key); \
} while (0)
- 이미 lock된 뮤텍스는 초기화를 허락하지 않는다.
kernel/locking/mutex.c
void __mutex_init(struct mutex *lock, const char *name, struct lock_class_key *key)
{
atomic_set(&lock->count, 1);
spin_lock_init(&lock->wait_lock);
INIT_LIST_HEAD(&lock->wait_list);
mutex_clear_owner(lock);
#ifdef CONFIG_MUTEX_SPIN_ON_OWNER
osq_lock_init(&lock->osq);
#endif
debug_mutex_init(lock, name, key);
}
- 뮤텍스 내에서 사용하는 spin_lock은 wait_list라는 링크드 리스트 구조체를 관리하기 위해 필요하다.
- wait_list를 초기화한다. (리스트의 prev와 next 포인터들은 null이 아니라 wait_list 구조체 자기 자신을 가리킨다)
- owner는 null로 초기화된다.
- CONFIG_MUTEX_SPIN_ON_OWNER 옵션을 사용하는 경우 optimistic spinning queue 락을 초기화한다.
- debug_mutex_init()함수는 디버그 기능에서 사용하며 설명은 생략
Mutex 인터페이스
- 뮤텍스 획득, 언인터럽터블
- void mutex_lock(struct mutex *lock);
- void mutex_lock_nested(struct mutex *lock, unsigned int subclass);
- int mutex_trylock(struct mutex *lock);
- 뮤텍스 획득, 인터럽터블
- int mutex_lock_interruptible_nested(struct mutex *lock, int subclass);
- int mutex_lock_interruptible(struct mutex *lock);
- 뮤텍스 획득, 인터럽터블, 감소시켜서 0되면
- int atomic_dec_and_mutex_lock(atomic_t *cnt, struct mutex *lock);
- 뮤텍스 언락
- void mutex_unlock(struct mutex *lock);
- 뮤텍스 락 획득 여부 체크
- int mutex_is_locked(struct mutex *lock);
참고