Interrupts -8- (Workqueue 2)

 

워커

워커 생성

create_worker()

kernel/workqueue.c

/**
 * create_worker - create a new workqueue worker
 * @pool: pool the new worker will belong to
 *
 * Create and start a new worker which is attached to @pool.
 *
 * CONTEXT:
 * Might sleep.  Does GFP_KERNEL allocations.
 *
 * Return:
 * Pointer to the newly created worker.
 */
static struct worker *create_worker(struct worker_pool *pool)
{
        struct worker *worker = NULL;
        int id = -1;
        char id_buf[16];

        /* ID is needed to determine kthread name */
        id = ida_simple_get(&pool->worker_ida, 0, 0, GFP_KERNEL);
        if (id < 0)
                goto fail;

        worker = alloc_worker(pool->node);
        if (!worker)
                goto fail;

        worker->pool = pool;
        worker->id = id;

        if (pool->cpu >= 0)
                snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
                         pool->attrs->nice < 0  ? "H" : "");
        else
                snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);

        worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
                                              "kworker/%s", id_buf);
        if (IS_ERR(worker->task))
                goto fail;

        set_user_nice(worker->task, pool->attrs->nice);

        /* prevent userland from meddling with cpumask of workqueue workers */
        worker->task->flags |= PF_NO_SETAFFINITY;

        /* successful, attach the worker to the pool */
        worker_attach_to_pool(worker, pool);

        /* start the newly created worker */
        spin_lock_irq(&pool->lock);
        worker->pool->nr_workers++;
        worker_enter_idle(worker);
        wake_up_process(worker->task);
        spin_unlock_irq(&pool->lock);

        return worker;

fail:
        if (id >= 0)
                ida_simple_remove(&pool->worker_ida, id);
        kfree(worker);
        return NULL;
}

요청한 워커풀에 워커 스레드를 생성한다.

  • 코드 라인 20~22에서 워커풀에서 생성할 새로운 워커를 위해 id를 받아온다.
    • pool->worker_ida는 워커들의 id의 할당 관리를 위해 IDR Radix tree 기반으로 동작한다.
  • 코드 라인 24~29에서 워커 객체를 할당 받은 후 워커풀 및 id를 지정한다.
  • 코드 라인 31~35에서 워커풀의 이름을 지정한다.
    • cpu로 바운드(지정)된 워커풀은 “<cpu>:<worker id>H”과 같이 표현한다.  ‘H’는 스레드의 nice 값이 디폴트 nice보다 작을때(우선순위가 더 높다) H 문자열이 출력된다.
    • 언바운드된 워커풀은 “u<pool id>:<worker id>”와 같이 표현한다.
  • 코드 라인 37~40에서 kthread를 통해 worker_thread() 함수가 호출되는 스레드를 생성하여 워커의 task에 대입한다.
  • 코드 라인 42에서 워커 스레드의 static_prio 값을 워커풀 속성에 있는 nice 값을 우선순위로 변환한 값으로 설정한다.
  • 코드 라인 45에서 워커 스레드에 PF_NO_SETAFFINITY 플래그를 추가하여 다른 cpu로 마이그레이션되지 않도록 막는다.
  • 코드 라인 48에서 워커 스레드를 워커풀에 지정한다.
  • 코드 라인 52에서 워커풀내의 워커 수를 나타내는 nr_workers를 1 증가시킨다.
  • 코드 라인 53~54에서 워커 스레드를 idle 상태로 진입하게 한 후 다시 깨운다.

 

alloc_worker()

kernel/workqueue.c

static struct worker *alloc_worker(int node)
{
        struct worker *worker;

        worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, node);
        if (worker) {
                INIT_LIST_HEAD(&worker->entry);
                INIT_LIST_HEAD(&worker->scheduled);
                INIT_LIST_HEAD(&worker->node);
                /* on creation a worker is in !idle && prep state */
                worker->flags = WORKER_PREP;
        }
        return worker;
}

워커 구조체를 할당받고 내부에서 관리하는 리스트들을 초기화한다.

 

워커를 워커풀에 연결(attach)

worker_attach_to_pool()

kernel/workqueue.c

/**
 * worker_attach_to_pool() - attach a worker to a pool
 * @worker: worker to be attached
 * @pool: the target pool
 *
 * Attach @worker to @pool.  Once attached, the %WORKER_UNBOUND flag and
 * cpu-binding of @worker are kept coordinated with the pool across
 * cpu-[un]hotplugs.
 */
static void worker_attach_to_pool(struct worker *worker,
                                   struct worker_pool *pool)
{
        mutex_lock(&pool->attach_mutex);

        /*
         * set_cpus_allowed_ptr() will fail if the cpumask doesn't have any
         * online CPUs.  It'll be re-applied when any of the CPUs come up.
         */
        set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);

        /*
         * The pool->attach_mutex ensures %POOL_DISASSOCIATED remains
         * stable across this function.  See the comments above the
         * flag definition for details.
         */
        if (pool->flags & POOL_DISASSOCIATED)
                worker->flags |= WORKER_UNBOUND;

        list_add_tail(&worker->node, &pool->workers);

        mutex_unlock(&pool->attach_mutex);
}

워커를 워커풀에 연결한다.

  • 코드 라인 19에서 워커풀 속성에 있는 cpumask를 워커 스레드에 지정한다.
    • 바운드된 워커풀은 특정 cpu만 마스크비트가 설정되어 있다.
    • 언바운드된 워커풀은 online된 cpu들에 대해 모두 마스크비트가 설정되어 있다.
  • 코드 라인 26~27에서 워커풀이 아직 서비스하지 않는 상태인 경우 워커에 WORKER_UNBOUND 플래그를 설정한다.
  • 코드 라인 29에서 워커풀의 workers 리스트에 워커를 추가한다.

 

워커를 워커풀에서 연결 해제

worker_detach_from_pool()

kernel/workqueue.c

/**
 * worker_detach_from_pool() - detach a worker from its pool
 * @worker: worker which is attached to its pool
 * @pool: the pool @worker is attached to
 *
 * Undo the attaching which had been done in worker_attach_to_pool().  The
 * caller worker shouldn't access to the pool after detached except it has
 * other reference to the pool.
 */
static void worker_detach_from_pool(struct worker *worker,
                                    struct worker_pool *pool)
{
        struct completion *detach_completion = NULL;

        mutex_lock(&pool->attach_mutex);
        list_del(&worker->node);
        if (list_empty(&pool->workers))
                detach_completion = pool->detach_completion;
        mutex_unlock(&pool->attach_mutex);

        /* clear leftover flags without pool->lock after it is detached */
        worker->flags &= ~(WORKER_UNBOUND | WORKER_REBOUND);

        if (detach_completion)
                complete(detach_completion);
}

워커를 워커풀에서 연결해제한다.

  • 코드 라인 16에서 워커풀의 workers 리스트에서 요청한 워커를 제거한다.
  • 코드 라인 17~18에서 워커풀이 빈 경우 워커풀의 detach_completion 상태를 알아온다.
  • 코드 라인 22에서 워커의 WORKER_UNBOUND 및 WORKER_REBOUND 플래그를 제거한다.
  • 코드 라인 24~25에서 detach 작업이 완료될 때 까지 기다린다.

 

워커 Idle 진입

worker_enter_idle()

kernel/workqueue.c

/**
 * worker_enter_idle - enter idle state
 * @worker: worker which is entering idle state
 *
 * @worker is entering idle state.  Update stats and idle timer if
 * necessary.
 *
 * LOCKING:
 * spin_lock_irq(pool->lock).
 */
static void worker_enter_idle(struct worker *worker)
{
        struct worker_pool *pool = worker->pool;

        if (WARN_ON_ONCE(worker->flags & WORKER_IDLE) ||
            WARN_ON_ONCE(!list_empty(&worker->entry) &&
                         (worker->hentry.next || worker->hentry.pprev)))
                return;

        /* can't use worker_set_flags(), also called from create_worker() */
        worker->flags |= WORKER_IDLE;
        pool->nr_idle++;
        worker->last_active = jiffies;

        /* idle_list is LIFO */
        list_add(&worker->entry, &pool->idle_list);

        if (too_many_workers(pool) && !timer_pending(&pool->idle_timer))
                mod_timer(&pool->idle_timer, jiffies + IDLE_WORKER_TIMEOUT);

        /*
         * Sanity check nr_running.  Because wq_unbind_fn() releases
         * pool->lock between setting %WORKER_UNBOUND and zapping
         * nr_running, the warning may trigger spuriously.  Check iff
         * unbind is not in progress.
         */
        WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
                     pool->nr_workers == pool->nr_idle &&
                     atomic_read(&pool->nr_running));
}

요청한 워커 스레드를 idle 상태로 설정하고 idle 카운터를 증가시킨다. 필요 시 idle 카운터를 동작시킨다.

  • 코드 라인 15~18에서 워커가 이미 idle 상태이거나  워커가 워커풀의 idle_list에서 대기중면서 busy_hash[] 리스트에서도 존재하는 경우 경고 메시지를 1번 출력 후 함수를 빠져나간다.
  • 코드 라인 21~23에서 워커에 WORKER_IDLE 플래그를 설정하고 idle 카운터를 1 증가시킨다. last_active에 현재 시각으로 갱신한다.
  • 코드 라인 26에서 워커풀의 idle 리스트에 워커를 추가한다.
  • 코드 라인 28~29에서 워커풀에 워커들이 너무 많이 쉬고 있으면 idle 타이머를 현재 시각 기준으로  IDLE_WORKER_TIMEOUT(5분) 후에 동작하도록 설정한다.
  • 코드 라인 37~39에서 워커풀이 서비스 중이고 워커들이 모두 idle 상태인데 동작 중인 워커들이 있다고 하는 경우 경고메시지를 출력한다.
    • nr_workers(워커풀에 등록된 워커 수) = nr_running(동작중인 워커 수) + nr_idle(idle 상태인 워커 수)

 

too_many_workers()

kernel/workqueue.c

/* Do we have too many workers and should some go away? */
static bool too_many_workers(struct worker_pool *pool)
{
        bool managing = mutex_is_locked(&pool->manager_arb);
        int nr_idle = pool->nr_idle + managing; /* manager is considered idle */
        int nr_busy = pool->nr_workers - nr_idle;

        return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
}

워커풀에 idle 워커들이 busy 워커들의 일정 비율 이상 즉, idle 워커 수가 많은지 여부를 반환한다.

  • 코드 라인 4~6에서 워커풀에 등록된 워커들에서 idle 워커를 제외한 수를 알아온다.
  • 코드 라인 8에서 idle 워커에서 기본 idle 워커 수 2를 뺀 나머지 idle 워커 수가 busy 워커보다 일정 비율(기본 1/4배) 이상인 경우 놀고 있는 idle 워커 수가 많다고 판단하여 true를 반환한다.
    • 조건: (idle 워커 – 2) * MAX_IDLE_WORKERS_RATIO(4) > busy 워커
    • 이 함수가 true가 되는 조건 예)
      • idle=3, busy=0~4
      • idle=4, busy=0~8
      • idle=5, busy=0~12

 

다음 그림은 idle 워커가 너무 많은지 여부를 판단한다.

 

Idle 워커  타임아웃 -> idle 워커 수 줄이기

idle_worker_timeout()

kernel/workqueue.c

static void idle_worker_timeout(unsigned long __pool)
{
        struct worker_pool *pool = (void *)__pool;

        spin_lock_irq(&pool->lock);

        while (too_many_workers(pool)) {
                struct worker *worker;
                unsigned long expires;

                /* idle_list is kept in LIFO order, check the last one */
                worker = list_entry(pool->idle_list.prev, struct worker, entry);
                expires = worker->last_active + IDLE_WORKER_TIMEOUT;

                if (time_before(jiffies, expires)) {
                        mod_timer(&pool->idle_timer, expires);
                        break;
                }

                destroy_worker(worker);
        }

        spin_unlock_irq(&pool->lock);
}

idle 타이머가 만료 시 호출되며 busy 워커에 비해 너무 많은 idle 워커 비율(기본 1/4)가 있다고 판단하면 일정 비율만큼 idle 워커를 줄인다.

  • 코드 라인 7에서 워커풀에 idle 워커들이 busy 워커들의 일정 비율 이상 즉, idle 워커 수가 많은 경우에 한하여 순회한다.
  • 코드 라인 12~21에서 idle_list에서 하나의 워커를 가져와서 워커의 최종 사용 시각이 현재 시각에 비해 IDLE_WORKER_TIMEOUT(5분)을 지나지 않은 경우 만료 시각을 다시 재설정하고 함수를 빠져간다. 이미 지정된 시간을 초과한 경우 워커를 제거하고 계속 루프를 돈다.

 

 

워커 소멸

destroy_worker()

kernel/workqueue.c

/**
 * destroy_worker - destroy a workqueue worker
 * @worker: worker to be destroyed
 *
 * Destroy @worker and adjust @pool stats accordingly.  The worker should
 * be idle.
 *
 * CONTEXT:
 * spin_lock_irq(pool->lock).
 */
static void destroy_worker(struct worker *worker)
{
        struct worker_pool *pool = worker->pool;

        lockdep_assert_held(&pool->lock);

        /* sanity check frenzy */
        if (WARN_ON(worker->current_work) ||
            WARN_ON(!list_empty(&worker->scheduled)) ||
            WARN_ON(!(worker->flags & WORKER_IDLE)))
                return;

        pool->nr_workers--;
        pool->nr_idle--;

        list_del_init(&worker->entry);
        worker->flags |= WORKER_DIE;
        wake_up_process(worker->task);
}

워커를 소멸시킨다. 즉 워커 스레드를 소멸시킨다.

  • 코드 라인 18~21에서 요청한 워크가 동작 중이거나 이 워커에 스케줄된 워크가 있거나 idle 플래그가 없는 경우 경고 메시지를 출력하고 함수를 빠져나간다.
  • 코드 라인 23에서 워커풀에 등록된 워커 수인 nr_workers를 1 감소시킨다.
  • 코드 라인 24에서 워커풀에서 idle 중인 워커 수를 나타내는 nr_idle을 1 감소시킨다.
  • 코드 라인 26에서 워커를 워커풀의 idle_list 에서 제거한다.
  • 코드 라인 27~28에서 워커에 WORKER_DIE 플래그를 설정하고 worker에 연결된 태스크를 깨워 소멸 처리한다.
    • 워커풀에서 워커 id를 반납하고 워커풀에서 디태치하며 워커를 제거한다.
    • 제거중인 상황에서의 태스크명을 “kworker/dying”으로 변경한다.

 

워크가 동작했던 워커 찾기

find_worker_executing_work()

kernel/workqueue.c

/**
 * find_worker_executing_work - find worker which is executing a work
 * @pool: pool of interest
 * @work: work to find worker for
 *
 * Find a worker which is executing @work on @pool by searching
 * @pool->busy_hash which is keyed by the address of @work.  For a worker
 * to match, its current execution should match the address of @work and
 * its work function.  This is to avoid unwanted dependency between
 * unrelated work executions through a work item being recycled while still
 * being executed.
 *
 * This is a bit tricky.  A work item may be freed once its execution
 * starts and nothing prevents the freed area from being recycled for
 * another work item.  If the same work item address ends up being reused
 * before the original execution finishes, workqueue will identify the
 * recycled work item as currently executing and make it wait until the
 * current execution finishes, introducing an unwanted dependency.
 *
 * This function checks the work item address and work function to avoid
 * false positives.  Note that this isn't complete as one may construct a
 * work function which can introduce dependency onto itself through a
 * recycled work item.  Well, if somebody wants to shoot oneself in the
 * foot that badly, there's only so much we can do, and if such deadlock
 * actually occurs, it should be easy to locate the culprit work function.
 *
 * CONTEXT:
 * spin_lock_irq(pool->lock).
 *
 * Return:
 * Pointer to worker which is executing @work if found, %NULL
 * otherwise.
 */
static struct worker *find_worker_executing_work(struct worker_pool *pool,
                                                 struct work_struct *work)
{
        struct worker *worker;

        hash_for_each_possible(pool->busy_hash, worker, hentry,
                               (unsigned long)work)
                if (worker->current_work == work &&
                    worker->current_func == work->func)
                        return worker;

        return NULL;
}

동작중인 워커들에서 워크가 동작하는 워커를 찾아온다. 없으면 null을 반환한다.

  • busy_hash 리스트를 순회하며 요청한 워크가 현재 동작중인 워커를 찾는다.

 

need_more_worker()

kernel/workqueue.c

/*
 * Need to wake up a worker?  Called from anything but currently
 * running workers.
 *
 * Note that, because unbound workers never contribute to nr_running, this
 * function will always return %true for unbound pools as long as the
 * worklist isn't empty.
 */
static bool need_more_worker(struct worker_pool *pool)
{
        return !list_empty(&pool->worklist) && __need_more_worker(pool);
}

워커가 더 필요한지 여부를 알아온다.

  • 워커풀에 작업이 대기되어 있고 워커가 더 필요한 경우 true를 반환한다.

 

__need_more_worker()

kernel/workqueue.c

/*
 * Policy functions.  These define the policies on how the global worker
 * pools are managed.  Unless noted otherwise, these functions assume that
 * they're being called with pool->lock held.
 */

static bool __need_more_worker(struct worker_pool *pool)
{
        return !atomic_read(&pool->nr_running);
}

워커풀에서 동작 중인 워커가 없는지 여부를 반환한다. (1=현재 동작중인 워커가 없다.)

 

wake_up_worker()

kernel/workqueue.c

/**
 * wake_up_worker - wake up an idle worker
 * @pool: worker pool to wake worker from
 *
 * Wake up the first idle worker of @pool.
 *
 * CONTEXT:
 * spin_lock_irq(pool->lock).
 */
static void wake_up_worker(struct worker_pool *pool)
{
        struct worker *worker = first_idle_worker(pool);

        if (likely(worker))
                wake_up_process(worker->task);
}

요청한 워커풀의 첫 번째  idle 워커를 깨운다.

 

first_idle_worker()

kernel/workqueue.c

/* Return the first idle worker.  Safe with preemption disabled */
static struct worker *first_idle_worker(struct worker_pool *pool)
{
        if (unlikely(list_empty(&pool->idle_list)))
                return NULL;

        return list_first_entry(&pool->idle_list, struct worker, entry);
}

요청한 워커풀에서 첫 idle 워커를 반환한다.

  • idle_list에서 있는 첫 번째 워커를 반환한다.

 

워커 스레드 동작

worker_thread()

kernel/workqueue.c

/**
 * worker_thread - the worker thread function
 * @__worker: self
 *
 * The worker thread function.  All workers belong to a worker_pool -
 * either a per-cpu one or dynamic unbound one.  These workers process all
 * work items regardless of their specific target workqueue.  The only
 * exception is work items which belong to workqueues with a rescuer which
 * will be explained in rescuer_thread().
 *
 * Return: 0
 */
static int worker_thread(void *__worker)
{
        struct worker *worker = __worker;
        struct worker_pool *pool = worker->pool;

        /* tell the scheduler that this is a workqueue worker */
        worker->task->flags |= PF_WQ_WORKER;
woke_up:
        spin_lock_irq(&pool->lock);

        /* am I supposed to die? */
        if (unlikely(worker->flags & WORKER_DIE)) {
                spin_unlock_irq(&pool->lock);
                WARN_ON_ONCE(!list_empty(&worker->entry));
                worker->task->flags &= ~PF_WQ_WORKER;

                set_task_comm(worker->task, "kworker/dying");
                ida_simple_remove(&pool->worker_ida, worker->id);
                worker_detach_from_pool(worker, pool);
                kfree(worker);
                return 0;
        }

        worker_leave_idle(worker);
recheck:
        /* no more worker necessary? */
        if (!need_more_worker(pool))
                goto sleep;

        /* do we need to manage? */
        if (unlikely(!may_start_working(pool)) && manage_workers(worker))
                goto recheck;

        /*
         * ->scheduled list can only be filled while a worker is
         * preparing to process a work or actually processing it.
         * Make sure nobody diddled with it while I was sleeping.
         */
        WARN_ON_ONCE(!list_empty(&worker->scheduled));
  • 코드 라인 19에서 스케줄러로 하여금 이 태스크가 워커 스레드인 것을 알 수 있게 PF_WQ_WORKER 플래그를 설정한다.
  • 코드 라인 24~34에서 워커 스레드의 종료 요청 플래그가 설정된 경우 워커를 워커풀에서 제거하고 할당 해제 후 종료한다.
  • 코드 라인 36에서 워커 스레드를 idle 상태로 둔다.
  • 코드 라인 39~40에서 추가 적인 워커가 필요하지 않으면 sleep으로 이동한다.
    • 동작중인 워커가 하나도 없으면 더 이상 필요한 워커가 없다고 판단한다.
  • 코드 라인 43~44에서 워커풀에 idle 워커가 없으면 워커를 생성한다. 만일 워커를 생성하기 위한 락 획득 시도가 실패하는 경우 다시 recheck 레이블로 이동하여 다시 시도한다.

 

        /*
         * Finish PREP stage.  We're guaranteed to have at least one idle
         * worker or that someone else has already assumed the manager
         * role.  This is where @worker starts participating in concurrency
         * management if applicable and concurrency management is restored
         * after being rebound.  See rebind_workers() for details.
         */
        worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);

        do {
                struct work_struct *work =
                        list_first_entry(&pool->worklist,
                                         struct work_struct, entry);

                if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
                        /* optimization path, not strictly necessary */
                        process_one_work(worker, work);
                        if (unlikely(!list_empty(&worker->scheduled)))
                                process_scheduled_works(worker);
                } else {
                        move_linked_works(work, &worker->scheduled, NULL);
                        process_scheduled_works(worker);
                }
        } while (keep_working(pool));

        worker_set_flags(worker, WORKER_PREP);
sleep:
        /*
         * pool->lock is held and there's no work to process and no need to
         * manage, sleep.  Workers are woken up only while holding
         * pool->lock or from local cpu, so setting the current state
         * before releasing pool->lock is enough to prevent losing any
         * event.
         */
        worker_enter_idle(worker);
        __set_current_state(TASK_INTERRUPTIBLE);
        spin_unlock_irq(&pool->lock);
        schedule();
        goto woke_up;
}
  • 코드 라인 8에서 워커에서 WORKER_PREP 및 WORKER_REBOUND  플래그를 제거한다.
  • 코드 라인 10~13에서 워커풀에 있는 워크리스트에서 처리할 워크를 가져온다.
  • 코드 라인 15~19에서 높은 확률로 워크에서 linked라는 플래그가 없는 경우 하나의 워크를 처리한다. 그 후 낮은 확률로 워커의 scheduled리스트에 있는 워크도 모두 처리한다.
  • 코드 라인 20~23에서 lined라는 플래그가 있는 경우 다음 연결된 워크를 모두 처리한다.
  • 코드 라인 24에서 워커풀의 워크리스트가 다 처리되어 empty될 때까지 루프를 반복한다.
  • 코드 라인 26에서 워커를 다시 prep 상태로 변경한다.
  • 코드 라인 35~36에서 워커를 idle 상태로 설정하고 태스크 상태는 슬립 중 wakeup할 수 있도록 인터럽터블로 변경한다.
  • 코드 라인 38~39에서 스케줄 함수를 통해 슬립한다. 그 후 깨어나면 다시 처음 루틴부터 시작하도록 woke_up 레이블로 이동한다.

 

다음 그림은 워커 스레드가 워크를 처리하는 과정을 보여준다.

 

manage_workers()

kernel/workqueue.c

/**
 * manage_workers - manage worker pool
 * @worker: self
 *
 * Assume the manager role and manage the worker pool @worker belongs
 * to.  At any given time, there can be only zero or one manager per
 * pool.  The exclusion is handled automatically by this function.
 *
 * The caller can safely start processing works on false return.  On
 * true return, it's guaranteed that need_to_create_worker() is false
 * and may_start_working() is true.
 *
 * CONTEXT:             
 * spin_lock_irq(pool->lock) which may be released and regrabbed
 * multiple times.  Does GFP_KERNEL allocations.
 *
 * Return:
 * %false if the pool doesn't need management and the caller can safely
 * start processing works, %true if management function was performed and
 * the conditions that the caller verified before calling the function may
 * no longer be true.
 */
static bool manage_workers(struct worker *worker)
{
        struct worker_pool *pool = worker->pool;

        /*
         * Anyone who successfully grabs manager_arb wins the arbitration
         * and becomes the manager.  mutex_trylock() on pool->manager_arb
         * failure while holding pool->lock reliably indicates that someone
         * else is managing the pool and the worker which failed trylock
         * can proceed to executing work items.  This means that anyone
         * grabbing manager_arb is responsible for actually performing
         * manager duties.  If manager_arb is grabbed and released without
         * actual management, the pool may stall indefinitely.
         */
        if (!mutex_trylock(&pool->manager_arb))
                return false;

        maybe_create_worker(pool);

        mutex_unlock(&pool->manager_arb);
        return true;
}

필요한 만큼 요청한 워커풀에 워커를 생성한다. 만일 워커를 생성하기 위해 락 획득 시도가 실패하는 경우 false를 반환한다.

 

maybe_create_worker()

kernel/workqueue.c

/**
 * maybe_create_worker - create a new worker if necessary
 * @pool: pool to create a new worker for
 *
 * Create a new worker for @pool if necessary.  @pool is guaranteed to
 * have at least one idle worker on return from this function.  If
 * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is
 * sent to all rescuers with works scheduled on @pool to resolve
 * possible allocation deadlock.
 *
 * On return, need_to_create_worker() is guaranteed to be %false and
 * may_start_working() %true.
 *
 * LOCKING:
 * spin_lock_irq(pool->lock) which may be released and regrabbed
 * multiple times.  Does GFP_KERNEL allocations.  Called only from
 * manager.
 */
static void maybe_create_worker(struct worker_pool *pool)
__releases(&pool->lock)
__acquires(&pool->lock)
{
restart:
        spin_unlock_irq(&pool->lock);

        /* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */
        mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);

        while (true) {
                if (create_worker(pool) || !need_to_create_worker(pool))
                        break;

                schedule_timeout_interruptible(CREATE_COOLDOWN);

                if (!need_to_create_worker(pool))
                        break;
        }

        del_timer_sync(&pool->mayday_timer);
        spin_lock_irq(&pool->lock);
        /*
         * This is necessary even after a new worker was just successfully
         * created as @pool->lock was dropped and the new worker might have
         * already become busy.
         */
        if (need_to_create_worker(pool))
                goto restart;
}

필요한 만큼 요청한 워커풀에 워커를 생성한다.

  • 코드 라인 27에서 mayday 타이머를 현재 시각 기준으로 만료 시각을 다시 조정한다.
  • 코드 라인 29~37에서 반복하며 필요한 수 만큼 워커를 생성한다.
  • 코드 라인 39에서 mayday 타이머를 제거한다.
  • 코드 라인 46~47에서 다시 한 번 워커를 추가해야 하는지 확인하여 재시도할지 결정한다.

 

need_to_create_worker()

kernel/workqueue.c

/* Do we need a new worker?  Called from manager. */
static bool need_to_create_worker(struct worker_pool *pool)
{
        return need_more_worker(pool) && !may_start_working(pool);
}

요청한 워커풀에 워커가 더 필요하지만 준비된 idle 워커가 없어서 곧바로 동작할 수 없는 경우 워커를 만들 필요가 있는지 여부를 반환한다.

 

연결된(스케줄된) 워크들 처리

move_linked_works()

kernel/workqueue.c

/**                               
 * move_linked_works - move linked works to a list
 * @work: start of series of works to be scheduled
 * @head: target list to append @work to
 * @nextp: out paramter for nested worklist walking
 *
 * Schedule linked works starting from @work to @head.  Work series to
 * be scheduled starts at @work and includes any consecutive work with
 * WORK_STRUCT_LINKED set in its predecessor.
 *      
 * If @nextp is not NULL, it's updated to point to the next work of
 * the last scheduled work.  This allows move_linked_works() to be
 * nested inside outer list_for_each_entry_safe().
 *
 * CONTEXT:
 * spin_lock_irq(pool->lock).
 */
static void move_linked_works(struct work_struct *work, struct list_head *head,
                              struct work_struct **nextp)
{       
        struct work_struct *n;

        /*
         * Linked worklist will always end before the end of the list,
         * use NULL for list head.
         */
        list_for_each_entry_safe_from(work, n, NULL, entry) {
                list_move_tail(&work->entry, head);
                if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
                        break;
        }

        /*
         * If we're already inside safe list traversal and have moved
         * multiple works to the scheduled queue, the next position
         * needs to be updated.
         */
        if (nextp)
                *nextp = n;
}

연결된 워크들을 두 번째 head 리스트로 옮긴다.

  • 코드 라인 27~31에서 워크들을 순회하며 head로 옮긴다. 만일 워크에 linked 속성 플래그가 없는 경우 루프를 벗어난다.
  • 코드 라인 38~39에서 출력 인수 nextp에 처리된 워크를 반환한다.

 

process_scheduled_works()

kernel/workqueue.c

/**
 * process_scheduled_works - process scheduled works
 * @worker: self
 *
 * Process all scheduled works.  Please note that the scheduled list
 * may change while processing a work, so this function repeatedly
 * fetches a work from the top and executes it.
 *
 * CONTEXT:
 * spin_lock_irq(pool->lock) which may be released and regrabbed
 * multiple times.
 */
static void process_scheduled_works(struct worker *worker)
{
        while (!list_empty(&worker->scheduled)) {
                struct work_struct *work = list_first_entry(&worker->scheduled,
                                                struct work_struct, entry);
                process_one_work(worker, work);
        }
}

모든 스케줄된 워크를 처리한다.

  • 코드 라인 15~17에서 워커의 스케줄드 리스트에 있는 워크들 수 만큼 순회하며 첫 워크를 가져온다.
  • 코드 라인 18에서 가져온 워크를 처리하낟.

 

하나의 워크 처리

process_one_work()

워커에서 워크를 하나 처리한다.

kernel/workqueue.c – 1/4

/**
 * process_one_work - process single work
 * @worker: self
 * @work: work to process
 *
 * Process @work.  This function contains all the logics necessary to
 * process a single work including synchronization against and
 * interaction with other workers on the same cpu, queueing and
 * flushing.  As long as context requirement is met, any worker can
 * call this function to process a work.
 *
 * CONTEXT:
 * spin_lock_irq(pool->lock) which is released and regrabbed.
 */
static void process_one_work(struct worker *worker, struct work_struct *work)
__releases(&pool->lock)
__acquires(&pool->lock)
{
        struct pool_workqueue *pwq = get_work_pwq(work);
        struct worker_pool *pool = worker->pool;
        bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;
        int work_color;
        struct worker *collision;
#ifdef CONFIG_LOCKDEP
        /*
         * It is permissible to free the struct work_struct from
         * inside the function that is called from it, this we need to
         * take into account for lockdep too.  To avoid bogus "held
         * lock freed" warnings as well as problems when looking into
         * work->lockdep_map, make a copy and use that here.
         */
        struct lockdep_map lockdep_map;

        lockdep_copy_map(&lockdep_map, &work->lockdep_map);
#endif
        /* ensure we're on the correct CPU */
        WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
                     raw_smp_processor_id() != pool->cpu);
  • 코드 라인 21에서 워크큐에 cpu intensive 플래그가 사용되었는지 여부를 알아온다.

 

kernel/workqueue.c – 2/4

.       /*
         * A single work shouldn't be executed concurrently by
         * multiple workers on a single cpu.  Check whether anyone is
         * already processing the work.  If so, defer the work to the
         * currently executing one.
         */
        collision = find_worker_executing_work(pool, work);
        if (unlikely(collision)) {
                move_linked_works(work, &collision->scheduled, NULL);
                return;
        }

        /* claim and dequeue */
        debug_work_deactivate(work);
        hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);
        worker->current_work = work;
        worker->current_func = work->func;
        worker->current_pwq = pwq;
        work_color = get_work_color(work);

        list_del_init(&work->entry);

        /*
         * CPU intensive works don't participate in concurrency management.
         * They're the scheduler's responsibility.  This takes @worker out
         * of concurrency management and the next code block will chain
         * execution of the pending work items.
         */
        if (unlikely(cpu_intensive))
                worker_set_flags(worker, WORKER_CPU_INTENSIVE);

        /*
         * Wake up another worker if necessary.  The condition is always
         * false for normal per-cpu workers since nr_running would always
         * be >= 1 at this point.  This is used to chain execution of the
         * pending work items for WORKER_NOT_RUNNING workers such as the
         * UNBOUND and CPU_INTENSIVE ones.
         */
        if (need_more_worker(pool))
                wake_up_worker(pool);
  • 코드 라인 7~11에서 워터풀내에서 요청한 워크가 이미 처리중인 경우 처리중인 워커의 스케줄드 리스트에 추가하고 함수를 빠져나간다.
    • 같은 워크 요청은 하나의 cpu에서 여러 개의 워커에서 동작못하게 막는다.
  • 코드 라인 15~18에서 busy_hash에 work를 키로 인덱스를 결정하고 워커를 추가한다. 추가할 때 워커에 처리중인 함수와 워크 및 풀워크큐를 지정한다.
  • 코드 라인 19에서 현재 작업의 워크 컬러를 알아온다.
  • 코드 라인 21에서 워크를 기존 리스트에서 제거한다.
  • 코드 라인 29~30에서 cpu intensive 워크큐를 이용하는 경우 워커에 cpu intensive 플래그를 설정한다.
  • 코드 라인 40~41에서 워커가 더 필요한 경우 대기중인 첫 번째 idle 워커를 깨운다.

 

kernel/workqueue.c – 3/4

.       /*
         * Record the last pool and clear PENDING which should be the last
         * update to @work.  Also, do this inside @pool->lock so that
         * PENDING and queued state changes happen together while IRQ is
         * disabled.
         */
        set_work_pool_and_clear_pending(work, pool->id);

        spin_unlock_irq(&pool->lock);

        lock_map_acquire_read(&pwq->wq->lockdep_map);
        lock_map_acquire(&lockdep_map);
        trace_workqueue_execute_start(work);
        worker->current_func(work);
        /*
         * While we must be careful to not use "work" after this, the trace
         * point will only record its address.
         */
        trace_workqueue_execute_end(work);
        lock_map_release(&lockdep_map);
        lock_map_release(&pwq->wq->lockdep_map);

        if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
                pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
                       "     last function: %pf\n",
                       current->comm, preempt_count(), task_pid_nr(current),
                       worker->current_func);
                debug_show_held_locks(current);
                dump_stack();
        }
  • 코드 라인 7에서 워크에 워커풀 id를 기록하고 pending 비트를 포함한 나머지 플래그들도 모두 지운다.
  • 코드 라인 14에서 워커에 지정된 처리 함수를 호출한다. (워크에 지정된 함수)
  • 코드 라인 23~30에서 PREEMPT_ACTIVE를 뺀 preempt 카운터가 0이 아니면 즉, preempt가 disable된 경우 에러 메시지를 출력하고 스택을 덤프한다.

 

kernel/workqueue.c – 4/4

        /*
         * The following prevents a kworker from hogging CPU on !PREEMPT
         * kernels, where a requeueing work item waiting for something to
         * happen could deadlock with stop_machine as such work item could
         * indefinitely requeue itself while all other CPUs are trapped in
         * stop_machine. At the same time, report a quiescent RCU state so
         * the same condition doesn't freeze RCU.
         */
        cond_resched_rcu_qs();

        spin_lock_irq(&pool->lock);

        /* clear cpu intensive status */
        if (unlikely(cpu_intensive))
                worker_clr_flags(worker, WORKER_CPU_INTENSIVE);

        /* we're done with it, release */
        hash_del(&worker->hentry);
        worker->current_work = NULL;
        worker->current_func = NULL;
        worker->current_pwq = NULL;
        worker->desc_valid = false;
        pwq_dec_nr_in_flight(pwq, work_color);
}
  • 코드 라인 9에서 리스케줄 요청이 있는 경우 스케줄을 진행하고 rcu quiesecent state를 등록한다.
  • 코드 라인 14에서 워커에서 cpu intensive 플래그를 제거한다.
  • 코드 라인 17에서 busy_hash에서 워커를 제거한다.
  • 코드 라인 18~21에서 워커에 설정한 워크 정보를 초기화한다.
  • 코드 라인 22에서 워크 컬러에 해당하는 현재 처리 중인 워크 수를 1 감소시킨다.

 

워크

static 워크 생성

컴파일 타임 워크 생성 및 초기화 매크로

  • DECLARE_WORK()

 

컴파일 타임 기타 선언 및 초기 매크로

  • DECLARE_DELAYED_WORK()
  • DECLARE_DEFERABLE_WORK()
  • INIT_WORK_ONSTACK()
  • INIT_DELAYED_WORK()
  • INIT_DELAYED_WORK_ONSTACK()
  • INIT_DEFERRABLE_WORK()
  • INIT_DEFERRABLE_WORK_ONSTACK()

 

DECLARE_WORK()

include/linux/workqueue.h

#define DECLARE_WORK(n, f)                                              \
        struct work_struct n = __WORK_INITIALIZER(n, f)

컴파일 타임에 워크를 static 하게 선언하고 워크 함수를 지정한다.

 

 

#define __WORK_INITIALIZER(n, f) {                                      \
        .data = WORK_DATA_STATIC_INIT(),                                \
        .entry  = { &(n).entry, &(n).entry },                           \
        .func = (f),                                                    \
        __WORK_INIT_LOCKDEP_MAP(#n, &(n))                               \
        }

워크 구조체의 주요 멤버를 설정하고 초기화한다.

  • data 멤버의 초기 설정 값으로 아직 워크풀이 지정되지 않았고 컴파일 타임에 static하게 생성되었음을 알린다.
  • entry 멤버는 아직 워크풀에 등록되지 않았으므로 자기 자신을 가리키게 초기화한다.
  • func 멤버는 실행될 함수를 지정한다.

 

#define WORK_DATA_STATIC_INIT() \
        ATOMIC_LONG_INIT(WORK_STRUCT_NO_POOL | WORK_STRUCT_STATIC)

워크의 data 멤버의 초기 설정 값으로 아직 워크풀이 지정되지 않았고 컴파일 타임에 static하게 생성되었음을 알린다.

  • WORK_STRUCT_NO_POOL 값은 플래그들이 있는 lsb 몇 비트를 제외하고 pool id에 해당하는 모든 비트가 1로 설정된다.
  • WORK_STRUCT_STATIC 플래그는 CONFIG_DEBUG_OBJECT 커널 옵션을 사용하는 경우에만 bit4에 해당하는 플래그가 추가되고 1로 설정된다.

 

dynamic 워크 생성

동적으로 사용되며 기존 워크 구조체를 초기화한다.

  • INIT_WORK()

 

INIT_WORK()

include/linux/workqueue.h

#define INIT_WORK(_work, _func)                                         \
        __INIT_WORK((_work), (_func), 0)

요청한 워크에 워크 함수를 지정하고 초기화한다.  (런타임에 사용)

 

 

#define __INIT_WORK(_work, _func, _onstack)                             \
        do {                                                            \
                __init_work((_work), _onstack);                         \
                (_work)->data = (atomic_long_t) WORK_DATA_INIT();       \
                INIT_LIST_HEAD(&(_work)->entry);                        \
                (_work)->func = (_func);                                \
        } while (0)
#endif

런타임에 요청한 워크에 함수를 지정하고 초기화한다.

  • data 멤버에는초기 설정 값으로 아직 워크풀이 지정되지 않았음을 알린다.
  • entry 멤버는 아직 워크풀에 등록되지 않았으므로 자기 자신을 가리키게 초기화한다.
  • func 멤버는 실행될 함수를 지정한다.

 

#define WORK_DATA_INIT()        ATOMIC_LONG_INIT(WORK_STRUCT_NO_POOL)

워크의 data 멤버 초기값으로 pool id 필드를 WORK_STRUCT_NO_POOL로 설정하여 아직 워크풀이 지정되지 않았음을 알린다.

  • WORK_STRUCT_NO_POOL 값은 플래그들이 있는 lsb 몇 비트를 제외하고 pool id에 해당하는 모든 비트가 1로 설정된다.

 

워크를 워크큐에 엔큐

queue_work()

include/linux/workqueue.h

/**
 * queue_work - queue work on a workqueue
 * @wq: workqueue to use
 * @work: work to queue
 *
 * Returns %false if @work was already on a queue, %true otherwise.
 *
 * We queue the work to the CPU on which it was submitted, but if the CPU dies
 * it can be processed by another CPU.
 */
static inline bool queue_work(struct workqueue_struct *wq,
                              struct work_struct *work)
{
        return queue_work_on(WORK_CPU_UNBOUND, wq, work);
}

워크큐에 워크를 엔큐한다. 가능하면 현재 cpu에 작업을 시키도록 요청한다. 이미 워크큐에 등록되어 있는 상태이면 실패를 반환한다.

 

queue_work_on()

kernel/workqueue.c

/**
 * queue_work_on - queue work on specific cpu
 * @cpu: CPU number to execute work on
 * @wq: workqueue to use
 * @work: work to queue
 *
 * We queue the work to a specific CPU, the caller must ensure it
 * can't go away.
 *
 * Return: %false if @work was already on a queue, %true otherwise.
 */
bool queue_work_on(int cpu, struct workqueue_struct *wq,
                   struct work_struct *work)
{
        bool ret = false;
        unsigned long flags;

        local_irq_save(flags);

        if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
                __queue_work(cpu, wq, work);
                ret = true;   
        }
        
        local_irq_restore(flags);
        return ret;
}
EXPORT_SYMBOL(queue_work_on);

워크가 이미 등록된 상태가 아니면 워크큐에 워크를 엔큐한다. 가능하면 현재 cpu에 작업을 시키도록 요청한다. 이미 워크큐에 등록되어 있는 상태이면 실패를 반환한다.

  • 워크에 pending 플래그를 보고 워크가 이미 워크큐에 엔큐되어 있다는 것을 알 수 있다.

 

__queue_work()

워크를 워크큐에 엔큐한다.

kernel/workqueue.c – 1/2

static void __queue_work(int cpu, struct workqueue_struct *wq,
                         struct work_struct *work)
{
        struct pool_workqueue *pwq;
        struct worker_pool *last_pool;
        struct list_head *worklist;
        unsigned int work_flags;
        unsigned int req_cpu = cpu;

        /*
         * While a work item is PENDING && off queue, a task trying to
         * steal the PENDING will busy-loop waiting for it to either get
         * queued or lose PENDING.  Grabbing PENDING and queueing should
         * happen with IRQ disabled.
         */
        WARN_ON_ONCE(!irqs_disabled());

        debug_work_activate(work);

        /* if draining, only works from the same workqueue are allowed */
        if (unlikely(wq->flags & __WQ_DRAINING) &&
            WARN_ON_ONCE(!is_chained_work(wq)))
                return;
retry:
        if (req_cpu == WORK_CPU_UNBOUND)
                cpu = raw_smp_processor_id();

        /* pwq which will be used unless @work is executing elsewhere */
        if (!(wq->flags & WQ_UNBOUND))
                pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
        else
                pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));

        /*
         * If @work was previously on a different pool, it might still be
         * running there, in which case the work needs to be queued on that
         * pool to guarantee non-reentrancy.
         */
        last_pool = get_work_pool(work);
        if (last_pool && last_pool != pwq->pool) {
                struct worker *worker;

                spin_lock(&last_pool->lock);

                worker = find_worker_executing_work(last_pool, work);

                if (worker && worker->current_pwq->wq == wq) {
                        pwq = worker->current_pwq;
                } else {
                        /* meh... not running there, queue here */
                        spin_unlock(&last_pool->lock);
                        spin_lock(&pwq->pool->lock);
                }
        } else {
                spin_lock(&pwq->pool->lock);
        }
  • 코드 라인 21~23에서 draining 플래그가 있는 워크큐이면서 워크큐의 워커가 현재 실행중이 아니면 경고 메시지를 출력하고 함수를 빠져나간다.
  • 코드 라인 25~26에서 WORK_CPU_UNBOUND cpu로 요청한 경우 일단 현재 cpu로 설정한다.
  • 코드 라인 29~32에서 unbound 워크큐가 아닌 경우 요청 cpu에 대한 풀워크큐를 알아온다. unbound 워크큐인 경우 현재 cpu 노드의 풀워크큐를 알아온다.
    • wq->cpu_pwqs:
      • cpu 바운드 워크큐인 경우 cpu별로 사용
    •  wq->numa_pwq_tbl[node]:
      • unbound 워크큐이면 노드별로 사용
  • 코드 라인 39~45에서 워크가 마지막으로 동작했었던 워커풀이 조금전에 알아온 풀워크큐가 가리키는 워커풀과 다른 경우 워크가 동작했었던 마지막 풀에서 요청한 워크가 현재 동작중인 워커를 알아온다.
  • 코드 라인 47~53에서 워커가 현재 동작중인 풀워크큐의 워커인 경우 워커에서 현재 동작하는 풀워크큐를 그대로 사용한다.
    • 워크의 재진입을 허용하지 않게 하는 것을 보장하기 위해 기존 워크가 수행되었던 워커의 풀워크큐를 그대로 사용한다.
    • 워크가 이미 동작 중인 경우 다른 cpu를 사용하는 워커풀로 전달되지 않도록 현재 동작 중인 워커가 있는 워커풀로 워크를 보낸다. 동일 워크들은 절대 동시 처리되지 않게 한다.

 

kernel/workqueue.c – 2/2

        /*
         * pwq is determined and locked.  For unbound pools, we could have
         * raced with pwq release and it could already be dead.  If its
         * refcnt is zero, repeat pwq selection.  Note that pwqs never die
         * without another pwq replacing it in the numa_pwq_tbl or while
         * work items are executing on it, so the retrying is guaranteed to
         * make forward-progress.
         */
        if (unlikely(!pwq->refcnt)) {
                if (wq->flags & WQ_UNBOUND) {
                        spin_unlock(&pwq->pool->lock);
                        cpu_relax();
                        goto retry;
                }
                /* oops */
                WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",
                          wq->name, cpu);
        }

        /* pwq determined, queue */
        trace_workqueue_queue_work(req_cpu, pwq, work);

        if (WARN_ON(!list_empty(&work->entry))) {
                spin_unlock(&pwq->pool->lock);
                return;
        }

        pwq->nr_in_flight[pwq->work_color]++;
        work_flags = work_color_to_flags(pwq->work_color);

        if (likely(pwq->nr_active < pwq->max_active)) {
                trace_workqueue_activate_work(work);
                pwq->nr_active++;
                worklist = &pwq->pool->worklist;
        } else {
                work_flags |= WORK_STRUCT_DELAYED;
                worklist = &pwq->delayed_works;
        }

        insert_work(pwq, work, worklist, work_flags);

        spin_unlock(&pwq->pool->lock);
}
  • 코드 라인 9~18에서 풀워크큐의 참조 카운터가 0인 경우 경고 메시지를 출력한다. 단 unbound 워크큐인경우 retry 한다.
    • 풀워크큐가 race 상황에서 release되었을 수도 있다. 그런 경우 다시 풀워크큐를 알아온다.
  • 코드 라인 23~26에서 워크가 아무 곳에도 등록되지 않은 경우 함수를 빠져나간다.
  • 코드 라인 28~29에서 워크 컬러에 해당하는 처리중인 워크 수를 증가시키고 워크 컬러 값을 플래그 값으로 변환한다.
  • 코드 라인 31~40에서 풀워크큐에 등록된 워크의 수가 최대 제한에 걸리지 않았으면 워커 수 카운터인 nr_active를 1 증가시키고 풀워크큐가 가리키는 워커풀의 워크리스트에 워크를 추가한다.  만일 최대 제한을 초과한 경우 워크에 delay 플래그를 추가한 후 풀워크큐의 delayed_works 리스트에 추가한다.
    • 최대 제한 수는 512이며 unbound 워크큐인 경우 512와 cpu*4 중 큰 수를 사용한다.

 

insert_work()

kernel/workqueue.c

/**
 * insert_work - insert a work into a pool
 * @pwq: pwq @work belongs to
 * @work: work to insert
 * @head: insertion point
 * @extra_flags: extra WORK_STRUCT_* flags to set
 *
 * Insert @work which belongs to @pwq after @head.  @extra_flags is or'd to
 * work_struct flags.
 *
 * CONTEXT:
 * spin_lock_irq(pool->lock).
 */
static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
                        struct list_head *head, unsigned int extra_flags)
{
        struct worker_pool *pool = pwq->pool;

        /* we own @work, set data and link */
        set_work_pwq(work, pwq, extra_flags);
        list_add_tail(&work->entry, head);
        get_pwq(pwq);

        /*
         * Ensure either wq_worker_sleeping() sees the above
         * list_add_tail() or we see zero nr_running to avoid workers lying
         * around lazily while there are works to be processed.
         */
        smp_mb();

        if (__need_more_worker(pool))
                wake_up_worker(pool);
}

요청한 리스트에 워크를 추가한다.

  • 코드 라인 20에서 워크 데이터가 풀워크큐를 가리키게 하고 요청한 플래그들을 추가한다.
  • 코드 라인 21에서 요청한 리스트의 후미에 워크를 추가한다.
  • 코드 라인 22에서 풀워크큐의 참조카운터를 1 증가시킨다.
  • 코드 라인 31~32에서 워커풀에서 동작중인 워커가 없는 경우 idle 워커를 깨운다.

 

set_work_pwq()

kernel/workqueue.c

static void set_work_pwq(struct work_struct *work, struct pool_workqueue *pwq,
                         unsigned long extra_flags)
{
        set_work_data(work, (unsigned long)pwq,
                      WORK_STRUCT_PENDING | WORK_STRUCT_PWQ | extra_flags);
}

워크에 풀워크큐를 지정하고 요청한 플래그 이외에도 기본적으로 WORK_STRUCT_PENDING 및 WORK_STRUCT_PWQ 를 추가한다.

 

set_work_data()

kernel/workqueue.c

/*
 * While queued, %WORK_STRUCT_PWQ is set and non flag bits of a work's data
 * contain the pointer to the queued pwq.  Once execution starts, the flag
 * is cleared and the high bits contain OFFQ flags and pool ID.
 *
 * set_work_pwq(), set_work_pool_and_clear_pending(), mark_work_canceling()
 * and clear_work_data() can be used to set the pwq, pool or clear
 * work->data.  These functions should only be called while the work is
 * owned - ie. while the PENDING bit is set.
 *
 * get_work_pool() and get_work_pwq() can be used to obtain the pool or pwq
 * corresponding to a work.  Pool is available once the work has been
 * queued anywhere after initialization until it is sync canceled.  pwq is
 * available only while the work item is queued.
 *
 * %WORK_OFFQ_CANCELING is used to mark a work item which is being
 * canceled.  While being canceled, a work item may have its PENDING set
 * but stay off timer and worklist for arbitrarily long and nobody should
 * try to steal the PENDING bit.
 */
static inline void set_work_data(struct work_struct *work, unsigned long data,
                                 unsigned long flags)
{
        WARN_ON_ONCE(!work_pending(work));
        atomic_long_set(&work->data, data | flags | work_static(work));
}

워크에 data(풀워크큐 또는 pool id)와 플래그를 더해 설정한다.

 

구조체

workqueue_struct 구조체

kernel/workqueue.c

/*
 * The externally visible workqueue.  It relays the issued work items to
 * the appropriate worker_pool through its pool_workqueues.
 */
struct workqueue_struct {
        struct list_head        pwqs;           /* WR: all pwqs of this wq */
        struct list_head        list;           /* PL: list of all workqueues */

        struct mutex            mutex;          /* protects this wq */
        int                     work_color;     /* WQ: current work color */
        int                     flush_color;    /* WQ: current flush color */
        atomic_t                nr_pwqs_to_flush; /* flush in progress */
        struct wq_flusher       *first_flusher; /* WQ: first flusher */
        struct list_head        flusher_queue;  /* WQ: flush waiters */
        struct list_head        flusher_overflow; /* WQ: flush overflow list */

        struct list_head        maydays;        /* MD: pwqs requesting rescue */
        struct worker           *rescuer;       /* I: rescue worker */

        int                     nr_drainers;    /* WQ: drain in progress */
        int                     saved_max_active; /* WQ: saved pwq max_active */

        struct workqueue_attrs  *unbound_attrs; /* WQ: only for unbound wqs */
        struct pool_workqueue   *dfl_pwq;       /* WQ: only for unbound wqs */

#ifdef CONFIG_SYSFS
        struct wq_device        *wq_dev;        /* I: for sysfs interface */
#endif
#ifdef CONFIG_LOCKDEP
        struct lockdep_map      lockdep_map;
#endif
        char                    name[WQ_NAME_LEN]; /* I: workqueue name */

        /* hot fields used during command issue, aligned to cacheline */
        unsigned int            flags ____cacheline_aligned; /* WQ: WQ_* flags */
        struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
        struct pool_workqueue __rcu *numa_pwq_tbl[]; /* FR: unbound pwqs indexed by node */
};
  • pwqs
    • 워크큐에 소속된 풀워크큐들
  • list
    • 모든 워크큐가 전역 workqueues 리스트에 연결될 떄 사용하는 list 노드이다.
  • work_color
    • 현재 워크 컬러
  • flush_color
    • 현재 플러시 컬러
  • nr_pwqs_to_flush
    • 플러시될 풀워크큐 수로 플러시가 진행중일 때 사용된다.
  • *first_flusher
    • 처음 플러셔(플러시 요청)
  • flusher_queue
    • 플러셔 리스트에 플러시 요청이 쌓이며 하나씩 처리하기 위해 first_flusher로 옮긴다.
    • 플러시가 완료되면 first_flusher는 null이 되고 flusher_queue 리스트도 비게된다.
  • flusher_overflow
    • 플러시 오버플로 리스트로 플러시 컬러 공간이 부족할 때 플러시 요청을 이 리스트에 추가한다.
  • maydays
    • 구조 요청한 풀워크큐 리스트
  • nr_drainers
    • 진행중인 drainer 수로 워크큐를 비워달라고 요청받으면 1 증가되고 드레이닝이 시작하고 완료되면 1 감소된다.
  • saved_max_active
    • 등록할 수 있는 최대 active 워크 수로 이 수를 초과하는 워크의 경우 워커풀로 배포하지 않고 풀워크큐의 delayed_works 리스트에서 대기하게 한다.
  • *unbound_attrs
    • 언바운드 워크큐 속성
  • *dfl_pwq
    • 디폴트 언바운드 풀워크큐를 가리키며 cpu on/off 시 numa_pwq_tbl[]의 이용이 힘들 때 잠시 fall-back용으로 사용한다.
  • wq_dev
    • sysfs 인터페이스
  • name[]
    • 워크큐명
  • flags
    • 플래그들
  • *cpu_pwqs
    • per-cpu 풀워크큐들
  • *numa_pwq_tbl[]
    • 노드별 언바운드 풀워크큐

 

workqueue_attrs 구조체

include/linux/workqueue.h

/*      
 * A struct for workqueue attributes.  This can be used to change
 * attributes of an unbound workqueue.
 *      
 * Unlike other fields, ->no_numa isn't a property of a worker_pool.  It
 * only modifies how apply_workqueue_attrs() select pools and thus doesn't
 * participate in pool hash calculations or equality comparisons.
 */
struct workqueue_attrs {
        int                     nice;           /* nice level */
        cpumask_var_t           cpumask;        /* allowed CPUs */
        bool                    no_numa;        /* disable NUMA affinity */
};
  • nice
    • nice 우선순위
  • cpumask
    • 허락된 cpu들
  • no_numa
    • NUMA 노드 정보를 disable

 

wq_flusher 구조체

kernel/workqueue.c

/*
 * Structure used to wait for workqueue flush.
 */
struct wq_flusher {
        struct list_head        list;           /* WQ: list of flushers */
        int                     flush_color;    /* WQ: flush color waiting for */
        struct completion       done;           /* flush completion */
};
  • list
    • 플러셔(플러시 요청)를 플러셔 리스트에 추가할 때 사용되는 연결 노드
  • flush_color
    • 플러시 컬러
  • done
    • 플러시 완료(completion) 대기를 위해 사용

 

pool_workqueue 구조체

kernel/workqueue.c

/*
 * The per-pool workqueue.  While queued, the lower WORK_STRUCT_FLAG_BITS
 * of work_struct->data are used for flags and the remaining high bits
 * point to the pwq; thus, pwqs need to be aligned at two's power of the
 * number of flag bits.
 */
struct pool_workqueue {
        struct worker_pool      *pool;          /* I: the associated pool */
        struct workqueue_struct *wq;            /* I: the owning workqueue */
        int                     work_color;     /* L: current color */
        int                     flush_color;    /* L: flushing color */
        int                     refcnt;         /* L: reference count */
        int                     nr_in_flight[WORK_NR_COLORS];
                                                /* L: nr of in_flight works */
        int                     nr_active;      /* L: nr of active works */
        int                     max_active;     /* L: max active works */
        struct list_head        delayed_works;  /* L: delayed works */
        struct list_head        pwqs_node;      /* WR: node on wq->pwqs */
        struct list_head        mayday_node;    /* MD: node on wq->maydays */

        /*
         * Release of unbound pwq is punted to system_wq.  See put_pwq()
         * and pwq_unbound_release_workfn() for details.  pool_workqueue
         * itself is also sched-RCU protected so that the first pwq can be
         * determined without grabbing wq->mutex.
         */
        struct work_struct      unbound_release_work;
        struct rcu_head         rcu;
} __aligned(1 << WORK_STRUCT_FLAG_BITS);
  • *pool
    • 연결된 워커풀
  • *wq
    • 워크큐
  • work_color
    • 워크 컬러로 새 워크마다 이 컬러의 워크 컬러를 사용한다.
    • 플러시 요청이 발생한 경우 다음 워크 컬러를 선택한다. (0 ~ 14이내에서 순환 증가)
  • flush_color
    • 플러시 컬러로 플러시 요청이 있는 경우 현재 사용한 워크 컬러를 지정한다. 그리고 이 플러시 컬러에 해당하는 워크들의 작업이 끝날때까지 기다린다. (플러시)
  • refcnt
    • 참조카운터
  • nr_in_flight[]
    • 컬러별 현재 워커가 처리중인 워크 수를 담는다.
  • nr_active
    • active 워크 수
    • 지연된 워크는 포함되지 않는다.
  • max_active
    • 동시 처리 제한(최대 active 워크 수)
  • delayed_works
    • 동시 처리 제한(max_active)을 초과한 워크 요청이 대기하는 리스트이다.
    • suspend PM 기능이 동작하는 경우 모든 인입되는 워크도 이 곳에서 대기된다.
  • pwqs_node
    • 노드별 풀워크큐
  • mayday_node
    • 워크큐의 maydays 리스트에 추가될 때 연결되는 리스트 노드
  • unbound_release_work
    • 풀워크큐가 내장하여 사용하는 워크이다. 언바운드 워커풀을 제거할 때 사용한다.
    • pwq_unbound_release_workfn() 함수를 호출하여 워커풀을 제거하는데 모든 워커풀이 제거된 워크큐도 함께 제거된다.
  • rcu
    • 풀워크큐 구조체를 rcu 방식으로 풀워크큐 slab 캐시로 free할 때 사용하는 rcu 노드이다.

 

worker_pool 구조체

kernel/workqueue.c

/*
 * Structure fields follow one of the following exclusion rules.
 *
 * I: Modifiable by initialization/destruction paths and read-only for
 *    everyone else.
 *
 * P: Preemption protected.  Disabling preemption is enough and should
 *    only be modified and accessed from the local cpu.
 *
 * L: pool->lock protected.  Access with pool->lock held.
 *
 * X: During normal operation, modification requires pool->lock and should
 *    be done only from local cpu.  Either disabling preemption on local
 *    cpu or grabbing pool->lock is enough for read access.  If
 *    POOL_DISASSOCIATED is set, it's identical to L.
 *
 * A: pool->attach_mutex protected.
 *
 * PL: wq_pool_mutex protected.
 *
 * PR: wq_pool_mutex protected for writes.  Sched-RCU protected for reads.
 *
 * WQ: wq->mutex protected.
 *
 * WR: wq->mutex protected for writes.  Sched-RCU protected for reads.
 *
 * MD: wq_mayday_lock protected.
 */

/* struct worker is defined in workqueue_internal.h */

 

struct worker_pool {
        spinlock_t              lock;           /* the pool lock */
        int                     cpu;            /* I: the associated cpu */
        int                     node;           /* I: the associated node ID */
        int                     id;             /* I: pool ID */
        unsigned int            flags;          /* X: flags */

        struct list_head        worklist;       /* L: list of pending works */
        int                     nr_workers;     /* L: total number of workers */

        /* nr_idle includes the ones off idle_list for rebinding */
        int                     nr_idle;        /* L: currently idle ones */

        struct list_head        idle_list;      /* X: list of idle workers */
        struct timer_list       idle_timer;     /* L: worker idle timeout */
        struct timer_list       mayday_timer;   /* L: SOS timer for workers */

        /* a workers is either on busy_hash or idle_list, or the manager */
        DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);
                                                /* L: hash of busy workers */

        /* see manage_workers() for details on the two manager mutexes */
        struct mutex            manager_arb;    /* manager arbitration */
        struct mutex            attach_mutex;   /* attach/detach exclusion */
        struct list_head        workers;        /* A: attached workers */
        struct completion       *detach_completion; /* all workers detached */

        struct ida              worker_ida;     /* worker IDs for task name */

        struct workqueue_attrs  *attrs;         /* I: worker attributes */
        struct hlist_node       hash_node;      /* PL: unbound_pool_hash node */
        int                     refcnt;         /* PL: refcnt for unbound pools */

        /*
         * The current concurrency level.  As it's likely to be accessed
         * from other CPUs during try_to_wake_up(), put it in a separate
         * cacheline.
         */
        atomic_t                nr_running ____cacheline_aligned_in_smp;

        /*
         * Destruction of pool is sched-RCU protected to allow dereferences
         * from get_work_pool().
         */
        struct rcu_head         rcu;
} ____cacheline_aligned_in_smp;
  • cpu
    • cpu 번호
  • node
    • 노드번호
  • id
    • 워커풀 id
  • flags
    • 워커풀의 플래그
  • worklist
    • 처리할 워크가 담기는 리스트
  • nr_workers
    • 워커풀에 등록된 워커 수
  • nr_idle
    • 워커풀에 대기중인 워커 수
  • idle_list
    • idle 워커 리스트
  • idle_timer
    • idle 타이머
    • 5분 간격으로 너무 많이 대기중인 워커들을 소멸시킨다.
  • mayday_timer
    • mayday 타이머
    • 0.1초 간격으로 워크들의 데드락 상황을 파악하여 rescuer_thread를 깨운다.
  • busy_hash[]
    • busy 워커들이 있는 해시 리스트
  • workers
    • 연결된 워커들 리스트
  • *detach_completion
    • 모든 워커들을 detach할 때 사용
  • worker_ida
    • 워커들 id를 발급하는 IDA 트리
  • attrs
    • 워크큐 속성(nice 및 cpumask)
    • 언바운드 워크큐는 같은 속성을 사용하는 경우 워커풀을 공유하여 사용한다.
  • hash_node
    • 언바운드 풀 해시 노드
  • refcount
    • 참조 카운터
  • nr_running
    • 워크를 처리중인 워커 수
    • 다른 cpu들에서 try_to_wake_up() 함수를 통해 atomic하게 접근되는 변수로 이용되고 캐시 라인 바이트 수만큼 정렬되어 사용된다.
  • rcu
    • 워커풀을 소멸시킬 때 rcu를 사용하여 수행한다.

 

work_struct 구조체

include/linux/workqueue.h

struct work_struct {
        atomic_long_t data;
        struct list_head entry;
        work_func_t func;
#ifdef CONFIG_LOCKDEP
        struct lockdep_map lockdep_map;
#endif
};
  • data
    • 풀워크큐 주소 또는 pool id가 저장되고 하위 플래그들이 구성된다. (본문참고)
  • entry
    • 리스트에 연결될 때 사용하는 리스트 노드
  • func
    • 작업 호출 함수

 

delayed_work 구조체

include/linux/workqueue.h

struct delayed_work {
        struct work_struct work;
        struct timer_list timer;

        /* target workqueue and CPU ->timer uses to queue ->work */
        struct workqueue_struct *wq; 
        int cpu;                    
};
  • work
    • 워크 구조체가 포함된다.
  • timer
    • 지연 타이머
  • *wq
    • 타이머가 만료되어 워크가 등록될 워크큐를 가리킨다.
  • cpu
    • 워크가 동작할 cpu 지정

 

참고

 

답글 남기기

이메일은 공개되지 않습니다. 필수 입력창은 * 로 표시되어 있습니다.