/*	$NetBSD: taskq.c,v 1.10.2.1 2019/08/20 11:45:35 martin Exp $	*/

/*-
 * Copyright (c) 2019 The NetBSD Foundation, Inc.
 * All rights reserved.
 *
 * This code is derived from software contributed to The NetBSD Foundation
 * by Juergen Hannken-Illjes.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS
 * BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

/* Include list reconstructed from the symbols used in this file. */
#include <sys/param.h>
#include <sys/types.h>
#include <sys/cpu.h>
#include <sys/kmem.h>
#include <sys/lwp.h>
#include <sys/mutex.h>
#include <sys/condvar.h>
#include <sys/queue.h>
#include <sys/threadpool.h>

#include <sys/taskq.h>

struct taskq_executor {
	struct threadpool_job te_job;	/* Threadpool job serving the queue. */
	taskq_t *te_self;		/* Backpointer to the queue. */
	unsigned te_running:1;		/* True if the job is running. */
};

struct taskq {
	int tq_nthreads;		/* # of threads serving queue. */
	pri_t tq_pri;			/* Scheduling priority. */
	uint_t tq_flags;		/* Saved flags from taskq_create. */
	int tq_active;			/* # of tasks (queued or running). */
	int tq_running;			/* # of jobs currently running. */
	int tq_waiting;			/* # of jobs currently idle. */
	unsigned tq_destroyed:1;	/* True if queue gets destroyed. */
	kmutex_t tq_lock;		/* Queue and job lock. */
	kcondvar_t tq_cv;		/* Queue condvar. */
	struct taskq_executor *tq_executor; /* Array of jobs. */
	struct threadpool *tq_threadpool; /* Pool backing the jobs. */
	SIMPLEQ_HEAD(, taskq_ent) tq_list; /* Queue of tasks waiting. */
};

taskq_t *system_taskq;			/* General purpose task queue. */

static specificdata_key_t taskq_lwp_key; /* Null or taskq this thread runs. */
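/*
 * For reference, a sketch of the task entry as this file uses it.  The
 * authoritative definition lives in the taskq header; the field names
 * below are taken from their uses in this file, while the exact type of
 * tqent_func and the bitfield layout are assumptions:
 *
 *	typedef struct taskq_ent {
 *		SIMPLEQ_ENTRY(taskq_ent) tqent_list;	queue linkage
 *		task_func_t *tqent_func;		function to execute
 *		void *tqent_arg;			argument passed to it
 *		unsigned tqent_dynamic:1;		free entry when done
 *		unsigned tqent_queued:1;		entry is on tq_list
 *	} taskq_ent_t;
 */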
/*
 * Threadpool job to service tasks from the task queue.
 * Runs until the task queue gets destroyed or the queue is empty for 10 secs.
 */
static void
task_executor(struct threadpool_job *job)
{
	struct taskq_executor *state = (struct taskq_executor *)job;
	taskq_t *tq = state->te_self;
	taskq_ent_t *tqe;
	bool is_dynamic;
	int error;

	lwp_setspecific(taskq_lwp_key, tq);

	mutex_enter(&tq->tq_lock);
	while (!tq->tq_destroyed) {
		if (SIMPLEQ_EMPTY(&tq->tq_list)) {
			if (ISSET(tq->tq_flags, TASKQ_DYNAMIC))
				break;
			tq->tq_waiting++;
			error = cv_timedwait(&tq->tq_cv, &tq->tq_lock,
			    mstohz(10000));
			tq->tq_waiting--;
			if (SIMPLEQ_EMPTY(&tq->tq_list)) {
				if (error)
					break;
				continue;
			}
		}
		tqe = SIMPLEQ_FIRST(&tq->tq_list);
		KASSERT(tqe != NULL);
		SIMPLEQ_REMOVE_HEAD(&tq->tq_list, tqent_list);
		is_dynamic = tqe->tqent_dynamic;
		tqe->tqent_queued = 0;
		mutex_exit(&tq->tq_lock);

		(*tqe->tqent_func)(tqe->tqent_arg);

		mutex_enter(&tq->tq_lock);
		if (is_dynamic)
			kmem_free(tqe, sizeof(*tqe));
		tq->tq_active--;
	}
	state->te_running = 0;
	tq->tq_running--;
	threadpool_job_done(job);
	mutex_exit(&tq->tq_lock);

	lwp_setspecific(taskq_lwp_key, NULL);
}

void
taskq_init(void)
{

	lwp_specific_key_create(&taskq_lwp_key, NULL);
	system_taskq = taskq_create("system_taskq", ncpu * 4, PRI_KERNEL,
	    4, 512, TASKQ_DYNAMIC | TASKQ_PREPOPULATE);
	KASSERT(system_taskq != NULL);
}

void
taskq_fini(void)
{

	taskq_destroy(system_taskq);
	lwp_specific_key_delete(taskq_lwp_key);
}

/*
 * Dispatch a task entry, creating executors as needed.
 */
static void
taskq_dispatch_common(taskq_t *tq, taskq_ent_t *tqe, uint_t flags)
{
	int i;

	KASSERT(mutex_owned(&tq->tq_lock));

	if (ISSET(flags, TQ_FRONT))
		SIMPLEQ_INSERT_HEAD(&tq->tq_list, tqe, tqent_list);
	else
		SIMPLEQ_INSERT_TAIL(&tq->tq_list, tqe, tqent_list);
	tqe->tqent_queued = 1;
	tq->tq_active++;
	if (tq->tq_waiting) {
		cv_signal(&tq->tq_cv);
		mutex_exit(&tq->tq_lock);
		return;
	}
	if (tq->tq_running < tq->tq_nthreads) {
		for (i = 0; i < tq->tq_nthreads; i++) {
			if (!tq->tq_executor[i].te_running) {
				tq->tq_executor[i].te_running = 1;
				tq->tq_running++;
				threadpool_schedule_job(tq->tq_threadpool,
				    &tq->tq_executor[i].te_job);
				break;
			}
		}
	}
	mutex_exit(&tq->tq_lock);
}

/*
 * Allocate and dispatch a task entry.
 */
taskqid_t
taskq_dispatch(taskq_t *tq, task_func_t func, void *arg, uint_t flags)
{
	taskq_ent_t *tqe;

	KASSERT(!ISSET(flags, ~(TQ_SLEEP | TQ_NOSLEEP | TQ_NOQUEUE)));
	KASSERT(ISSET(tq->tq_flags, TASKQ_DYNAMIC) ||
	    !ISSET(flags, TQ_NOQUEUE));

	if (ISSET(flags, (TQ_SLEEP | TQ_NOSLEEP)) == TQ_NOSLEEP)
		tqe = kmem_alloc(sizeof(*tqe), KM_NOSLEEP);
	else
		tqe = kmem_alloc(sizeof(*tqe), KM_SLEEP);
	if (tqe == NULL)
		return (taskqid_t) NULL;

	mutex_enter(&tq->tq_lock);
	if (ISSET(flags, TQ_NOQUEUE) && tq->tq_active == tq->tq_nthreads) {
		mutex_exit(&tq->tq_lock);
		kmem_free(tqe, sizeof(*tqe));
		return (taskqid_t) NULL;
	}
	tqe->tqent_dynamic = 1;
	tqe->tqent_queued = 0;
	tqe->tqent_func = func;
	tqe->tqent_arg = arg;
	taskq_dispatch_common(tq, tqe, flags);

	return (taskqid_t) tqe;
}

/*
 * Dispatch a preallocated task entry.
 * Assume the caller has zeroed it.
 */
void
taskq_dispatch_ent(taskq_t *tq, task_func_t func, void *arg, uint_t flags,
    taskq_ent_t *tqe)
{

	KASSERT(!ISSET(flags, ~(TQ_FRONT)));

	tqe->tqent_func = func;
	tqe->tqent_arg = arg;
	mutex_enter(&tq->tq_lock);
	taskq_dispatch_common(tq, tqe, flags);
}

/*
 * Wait until all tasks have completed.
 */
void
taskq_wait(taskq_t *tq)
{

	KASSERT(!taskq_member(tq, curlwp));

	mutex_enter(&tq->tq_lock);
	while (tq->tq_active)
		kpause("qwait", false, 1, &tq->tq_lock);
	mutex_exit(&tq->tq_lock);
}
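/*
 * Example usage (an illustrative sketch, not part of this file; my_task
 * and sc are placeholders):
 *
 *	static void
 *	my_task(void *arg)
 *	{
 *		struct my_softc *sc = arg;
 *		... do the deferred work ...
 *	}
 *
 *	tq = taskq_create("example", 4, PRI_KERNEL, 4, 512, TASKQ_DYNAMIC);
 *	if (taskq_dispatch(tq, my_task, sc, TQ_NOSLEEP) == (taskqid_t)NULL)
 *		... allocation failed, or TQ_NOQUEUE found no idle thread ...
 *	taskq_wait(tq);		wait for all queued tasks to complete
 *	taskq_destroy(tq);
 */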
/*
 * True if the current thread is an executor for this queue.
 */
int
taskq_member(taskq_t *tq, kthread_t *thread)
{

	KASSERT(thread == curlwp);

	return (lwp_getspecific(taskq_lwp_key) == tq);
}

/*
 * Create a task queue.
 * Allocation hints are ignored.
 */
taskq_t *
taskq_create(const char *name, int nthreads, pri_t pri, int minalloc,
    int maxalloc, uint_t flags)
{
	int i;
	struct threadpool *threadpool;
	taskq_t *tq;

	KASSERT(!ISSET(flags, ~(TASKQ_DYNAMIC | TASKQ_PREPOPULATE |
	    TASKQ_THREADS_CPU_PCT)));

	if (threadpool_get(&threadpool, pri) != 0)
		return NULL;

	if (ISSET(flags, TASKQ_THREADS_CPU_PCT))
		nthreads = MAX((ncpu * nthreads) / 100, 1);

	tq = kmem_zalloc(sizeof(*tq), KM_SLEEP);
	tq->tq_nthreads = nthreads;
	tq->tq_pri = pri;
	tq->tq_flags = flags;
	mutex_init(&tq->tq_lock, NULL, MUTEX_DEFAULT, IPL_NONE);
	cv_init(&tq->tq_cv, NULL, CV_DEFAULT, NULL);
	SIMPLEQ_INIT(&tq->tq_list);
	tq->tq_executor = kmem_alloc(sizeof(*tq->tq_executor) * nthreads,
	    KM_SLEEP);
	for (i = 0; i < nthreads; i++) {
		threadpool_job_init(&tq->tq_executor[i].te_job, task_executor,
		    &tq->tq_lock, "%s/%d", name, i);
		tq->tq_executor[i].te_self = tq;
		tq->tq_executor[i].te_running = 0;
	}
	tq->tq_threadpool = threadpool;

	return tq;
}

taskq_t *
taskq_create_proc(const char *name, int nthreads, pri_t pri, int minalloc,
    int maxalloc, struct proc *proc, uint_t flags)
{

	return taskq_create(name, nthreads, pri, minalloc, maxalloc, flags);
}

/*
 * Destroy a task queue.
 */
void
taskq_destroy(taskq_t *tq)
{
	int i;

	KASSERT(!taskq_member(tq, curlwp));

	/* Wait for tasks to complete. */
	taskq_wait(tq);

	/* Mark destroyed and ask running executors to quit. */
	mutex_enter(&tq->tq_lock);
	tq->tq_destroyed = 1;
	cv_broadcast(&tq->tq_cv);

	/* Wait for all executors to quit. */
	while (tq->tq_running > 0)
		kpause("tqdestroy", false, 1, &tq->tq_lock);
	mutex_exit(&tq->tq_lock);

	for (i = 0; i < tq->tq_nthreads; i++)
		threadpool_job_destroy(&tq->tq_executor[i].te_job);
	threadpool_put(tq->tq_threadpool, tq->tq_pri);
	mutex_destroy(&tq->tq_lock);
	cv_destroy(&tq->tq_cv);
	kmem_free(tq->tq_executor,
	    sizeof(*tq->tq_executor) * tq->tq_nthreads);
	kmem_free(tq, sizeof(*tq));
}
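/*
 * Note on TASKQ_THREADS_CPU_PCT sizing in taskq_create() above: nthreads
 * is interpreted as a percentage of ncpu.  For example, on an 8-CPU
 * machine a request of 150 yields MAX((8 * 150) / 100, 1) = 12 executor
 * threads, while a request of 10 on a single CPU yields MAX(0, 1) = 1.
 */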