Commit 625434da authored by Linus Torvalds's avatar Linus Torvalds
Browse files

Merge tag 'for-5.13/io_uring-2021-04-27' of git://git.kernel.dk/linux-block

Pull io_uring updates from Jens Axboe:

 - Support for multi-shot mode for POLL requests

 - More efficient reference counting. This is shamelessly stolen from
   the mm side. Even though referencing is mostly single/dual user, the
   128 count was retained to keep the code the same. Maybe this
   should/could be made generic at some point.

 - Removal of the need to have a manager thread for each ring. The
   manager threads only job was checking and creating new io-threads as
   needed, instead we handle this from the queue path.

 - Allow SQPOLL without CAP_SYS_ADMIN or CAP_SYS_NICE. Since 5.12, this
   thread is "just" a regular application thread, so no need to restrict
   use of it anymore.

 - Cleanup of how internal async poll data lifetime is managed.

 - Fix for syzbot reported crash on SQPOLL cancelation.

 - Make buffer registration more like file registrations, which includes
   flexibility in avoiding full set unregistration and re-registration.

 - Fix for io-wq affinity setting.

 - Be a bit more defensive in task->pf_io_worker setup.

 - Various SQPOLL fixes.

 - Cleanup of SQPOLL creds handling.

 - Improvements to in-flight request tracking.

 - File registration cleanups.

 - Tons of cleanups and little fixes

* tag 'for-5.13/io_uring-2021-04-27' of git://git.kernel.dk/linux-block: (156 commits)
  io_uring: maintain drain logic for multishot poll requests
  io_uring: Check current->io_uring in io_uring_cancel_sqpoll
  io_uring: fix NULL reg-buffer
  io_uring: simplify SQPOLL cancellations
  io_uring: fix work_exit sqpoll cancellations
  io_uring: Fix uninitialized variable up.resv
  io_uring: fix invalid error check after malloc
  io_uring: io_sq_thread() no longer needs to reset current->pf_io_worker
  kernel: always initialize task->pf_io_worker to NULL
  io_uring: update sq_thread_idle after ctx deleted
  io_uring: add full-fledged dynamic buffers support
  io_uring: implement fixed buffers registration similar to fixed files
  io_uring: prepare fixed rw for dynanic buffers
  io_uring: keep table of pointers to ubufs
  io_uring: add generic rsrc update with tags
  io_uring: add IORING_REGISTER_RSRC
  io_uring: enumerate dynamic resources
  io_uring: add generic path for rsrc update
  io_uring: preparation for rsrc tagging
  io_uring: decouple CQE filling from requests
  ...
parents c05a182b 7b289c38
Loading
Loading
Loading
Loading
+139 −197
Original line number Diff line number Diff line
@@ -17,7 +17,6 @@
#include <linux/cpu.h>
#include <linux/tracehook.h>

#include "../kernel/sched/sched.h"
#include "io-wq.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
@@ -68,6 +67,7 @@ struct io_worker {
struct io_wqe_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
};

@@ -108,19 +108,16 @@ struct io_wq {
	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct task_struct *manager;

	struct io_wq_hash *hash;

	refcount_t refs;
	struct completion exited;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	pid_t task_pid;
	struct task_struct *task;
};

static enum cpuhp_state io_wq_online;
@@ -133,8 +130,7 @@ struct io_cb_cancel_data {
	bool cancel_all;
};

static void io_wqe_cancel_pending_work(struct io_wqe *wqe,
				       struct io_cb_cancel_data *match);
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);

static bool io_worker_get(struct io_worker *worker)
{
@@ -147,23 +143,26 @@ static void io_worker_release(struct io_worker *worker)
		complete(&worker->ref_done);
}

static inline struct io_wqe_acct *io_get_acct(struct io_wqe *wqe, bool bound)
{
	return &wqe->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe,
						   struct io_wq_work *work)
{
	if (work->flags & IO_WQ_WORK_UNBOUND)
		return &wqe->acct[IO_WQ_ACCT_UNBOUND];

	return &wqe->acct[IO_WQ_ACCT_BOUND];
	return io_get_acct(wqe, !(work->flags & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wqe_acct *io_wqe_get_acct(struct io_worker *worker)
{
	struct io_wqe *wqe = worker->wqe;

	if (worker->flags & IO_WORKER_F_BOUND)
		return &wqe->acct[IO_WQ_ACCT_BOUND];
	return io_get_acct(worker->wqe, worker->flags & IO_WORKER_F_BOUND);
}

	return &wqe->acct[IO_WQ_ACCT_UNBOUND];
static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

static void io_worker_exit(struct io_worker *worker)
@@ -193,8 +192,7 @@ static void io_worker_exit(struct io_worker *worker)
	raw_spin_unlock_irq(&wqe->lock);

	kfree_rcu(worker, rcu);
	if (atomic_dec_and_test(&wqe->wq->worker_refs))
		complete(&wqe->wq->worker_done);
	io_worker_ref_put(wqe->wq);
	do_exit(0);
}

@@ -209,7 +207,7 @@ static inline bool io_wqe_run_queue(struct io_wqe *wqe)

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must wake up the wq manager to create one.
 * caller must create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
@@ -233,7 +231,7 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, wake up the manager to create one.
 * below the max number of workers, create one.
 */
static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
@@ -249,8 +247,11 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret && acct->nr_workers < acct->max_workers)
		wake_up_process(wqe->wq->manager);
	if (!ret && acct->nr_workers < acct->max_workers) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		create_io_worker(wqe->wq, wqe, acct->index);
	}
}

static void io_wqe_inc_running(struct io_worker *worker)
@@ -260,14 +261,61 @@ static void io_wqe_inc_running(struct io_worker *worker)
	atomic_inc(&acct->nr_running);
}

struct create_worker_data {
	struct callback_head work;
	struct io_wqe *wqe;
	int index;
};

static void create_worker_cb(struct callback_head *cb)
{
	struct create_worker_data *cwd;
	struct io_wq *wq;

	cwd = container_of(cb, struct create_worker_data, work);
	wq = cwd->wqe->wq;
	create_io_worker(wq, cwd->wqe, cwd->index);
	kfree(cwd);
}

static void io_queue_worker_create(struct io_wqe *wqe, struct io_wqe_acct *acct)
{
	struct create_worker_data *cwd;
	struct io_wq *wq = wqe->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;

	cwd = kmalloc(sizeof(*cwd), GFP_ATOMIC);
	if (cwd) {
		init_task_work(&cwd->work, create_worker_cb);
		cwd->wqe = wqe;
		cwd->index = acct->index;
		if (!task_work_add(wq->task, &cwd->work, TWA_SIGNAL))
			return;

		kfree(cwd);
	}
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
}

static void io_wqe_dec_running(struct io_worker *worker)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
	struct io_wqe *wqe = worker->wqe;

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe))
		io_wqe_wake_worker(wqe, acct);
	if (!(worker->flags & IO_WORKER_F_UP))
		return;

	if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) {
		atomic_inc(&acct->nr_running);
		atomic_inc(&wqe->wq->worker_refs);
		io_queue_worker_create(wqe, acct);
	}
}

/*
@@ -280,6 +328,8 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
{
	bool worker_bound, work_bound;

	BUILD_BUG_ON((IO_WQ_ACCT_UNBOUND ^ IO_WQ_ACCT_BOUND) != 1);

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
@@ -292,16 +342,11 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
	worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0;
	if (worker_bound != work_bound) {
		int index = work_bound ? IO_WQ_ACCT_UNBOUND : IO_WQ_ACCT_BOUND;
		io_wqe_dec_running(worker);
		if (work_bound) {
			worker->flags |= IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++;
		} else {
			worker->flags &= ~IO_WORKER_F_BOUND;
			wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++;
			wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--;
		}
		worker->flags ^= IO_WORKER_F_BOUND;
		wqe->acct[index].nr_workers--;
		wqe->acct[index ^ 1].nr_workers++;
		io_wqe_inc_running(worker);
	 }
}
@@ -486,9 +531,8 @@ static int io_wqe_worker(void *data)
	char buf[TASK_COMM_LEN];

	worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
	io_wqe_inc_running(worker);

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task_pid);
	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
@@ -552,8 +596,7 @@ void io_wq_worker_running(struct task_struct *tsk)

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or have the manager
 * set one up.
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
@@ -573,7 +616,7 @@ void io_wq_worker_sleeping(struct task_struct *tsk)
	raw_spin_unlock_irq(&worker->wqe->lock);
}

static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
{
	struct io_wqe_acct *acct = &wqe->acct[index];
	struct io_worker *worker;
@@ -583,7 +626,7 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)

	worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, wqe->node);
	if (!worker)
		return false;
		goto fail;

	refcount_set(&worker->ref, 1);
	worker->nulls_node.pprev = NULL;
@@ -591,14 +634,13 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
	spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	atomic_inc(&wq->worker_refs);

	tsk = create_io_thread(io_wqe_worker, worker, wqe->node);
	if (IS_ERR(tsk)) {
		if (atomic_dec_and_test(&wq->worker_refs))
			complete(&wq->worker_done);
		kfree(worker);
		return false;
fail:
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
		return;
	}

	tsk->pf_io_worker = worker;
@@ -617,20 +659,6 @@ static bool create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
	acct->nr_workers++;
	raw_spin_unlock_irq(&wqe->lock);
	wake_up_new_task(tsk);
	return true;
}

static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *acct = &wqe->acct[index];

	if (acct->nr_workers && test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state))
		return false;
	/* if we have available workers or no work, no need */
	if (!hlist_nulls_empty(&wqe->free_list) || !io_wqe_run_queue(wqe))
		return false;
	return acct->nr_workers < acct->max_workers;
}

/*
@@ -665,93 +693,11 @@ static bool io_wq_worker_wake(struct io_worker *worker, void *data)
	return false;
}

static void io_wq_check_workers(struct io_wq *wq)
{
	int node;

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
		bool fork_worker[2] = { false, false };

		if (!node_online(node))
			continue;

		raw_spin_lock_irq(&wqe->lock);
		if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND))
			fork_worker[IO_WQ_ACCT_BOUND] = true;
		if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND))
			fork_worker[IO_WQ_ACCT_UNBOUND] = true;
		raw_spin_unlock_irq(&wqe->lock);
		if (fork_worker[IO_WQ_ACCT_BOUND])
			create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND);
		if (fork_worker[IO_WQ_ACCT_UNBOUND])
			create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND);
	}
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static void io_wq_cancel_pending(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_all,
		.cancel_all	= true,
	};
	int node;

	for_each_node(node)
		io_wqe_cancel_pending_work(wq->wqes[node], &match);
}

/*
 * Manager thread. Tasked with creating new workers, if we need them.
 */
static int io_wq_manager(void *data)
{
	struct io_wq *wq = data;
	char buf[TASK_COMM_LEN];
	int node;

	snprintf(buf, sizeof(buf), "iou-mgr-%d", wq->task_pid);
	set_task_comm(current, buf);

	do {
		set_current_state(TASK_INTERRUPTIBLE);
		io_wq_check_workers(wq);
		schedule_timeout(HZ);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			set_bit(IO_WQ_BIT_EXIT, &wq->state);
		}
	} while (!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_check_workers(wq);

	rcu_read_lock();
	for_each_node(node)
		io_wq_for_each_worker(wq->wqes[node], io_wq_worker_wake, NULL);
	rcu_read_unlock();

	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
	wait_for_completion(&wq->worker_done);

	spin_lock_irq(&wq->hash->wait.lock);
	for_each_node(node)
		list_del_init(&wq->wqes[node]->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	io_wq_cancel_pending(wq);
	complete(&wq->exited);
	do_exit(0);
}

static void io_run_cancel(struct io_wq_work *work, struct io_wqe *wqe)
{
	struct io_wq *wq = wqe->wq;
@@ -783,39 +729,13 @@ static void io_wqe_insert_work(struct io_wqe *wqe, struct io_wq_work *work)
	wq_list_add_after(&work->list, &tail->list, &wqe->work_list);
}

static int io_wq_fork_manager(struct io_wq *wq)
{
	struct task_struct *tsk;

	if (wq->manager)
		return 0;

	WARN_ON_ONCE(test_bit(IO_WQ_BIT_EXIT, &wq->state));

	init_completion(&wq->worker_done);
	atomic_set(&wq->worker_refs, 1);
	tsk = create_io_thread(io_wq_manager, wq, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		wq->manager = get_task_struct(tsk);
		wake_up_new_task(tsk);
		return 0;
	}

	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);

	return PTR_ERR(tsk);
}

static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work)
{
	struct io_wqe_acct *acct = io_work_get_acct(wqe, work);
	int work_flags;
	unsigned long flags;

	/* Can only happen if manager creation fails after exec */
	if (io_wq_fork_manager(wqe->wq) ||
	    test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
	if (test_bit(IO_WQ_BIT_EXIT, &wqe->wq->state)) {
		io_run_cancel(work, wqe);
		return;
	}
@@ -970,17 +890,12 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			    int sync, void *key)
{
	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
	int ret;

	list_del_init(&wait->entry);

	rcu_read_lock();
	ret = io_wqe_activate_free_worker(wqe);
	io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();

	if (!ret)
		wake_up_process(wqe->wq->manager);

	return 1;
}

@@ -1021,6 +936,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
			goto err;
		wq->wqes[node] = wqe;
		wqe->node = alloc_node;
		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
		wqe->acct[IO_WQ_ACCT_UNBOUND].index = IO_WQ_ACCT_UNBOUND;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
@@ -1035,12 +952,10 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
		INIT_LIST_HEAD(&wqe->all_list);
	}

	wq->task_pid = current->pid;
	init_completion(&wq->exited);
	wq->task = get_task_struct(data->task);
	refcount_set(&wq->refs, 1);

	ret = io_wq_fork_manager(wq);
	if (!ret)
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	return wq;
err:
	io_wq_put_hash(data->hash);
@@ -1054,14 +969,49 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
	return ERR_PTR(ret);
}

static void io_wq_destroy_manager(struct io_wq *wq)
static bool io_task_work_match(struct callback_head *cb, void *data)
{
	if (wq->manager) {
		wake_up_process(wq->manager);
		wait_for_completion(&wq->exited);
		put_task_struct(wq->manager);
		wq->manager = NULL;
	struct create_worker_data *cwd;

	if (cb->func != create_worker_cb)
		return false;
	cwd = container_of(cb, struct create_worker_data, work);
	return cwd->wqe->wq == data;
}

static void io_wq_exit_workers(struct io_wq *wq)
{
	struct callback_head *cb;
	int node;

	set_bit(IO_WQ_BIT_EXIT, &wq->state);

	if (!wq->task)
		return;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct create_worker_data *cwd;

		cwd = container_of(cb, struct create_worker_data, work);
		atomic_dec(&cwd->wqe->acct[cwd->index].nr_running);
		io_worker_ref_put(wq);
		kfree(cwd);
	}

	rcu_read_lock();
	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];

		io_wq_for_each_worker(wqe, io_wq_worker_wake, NULL);
		spin_lock_irq(&wq->hash->wait.lock);
		list_del_init(&wq->wqes[node]->wait.entry);
		spin_unlock_irq(&wq->hash->wait.lock);
	}
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);
	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
@@ -1070,8 +1020,7 @@ static void io_wq_destroy(struct io_wq *wq)

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);

	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	io_wq_destroy_manager(wq);
	io_wq_exit_workers(wq);

	for_each_node(node) {
		struct io_wqe *wqe = wq->wqes[node];
@@ -1095,21 +1044,14 @@ void io_wq_put(struct io_wq *wq)

void io_wq_put_and_exit(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
	io_wq_destroy_manager(wq);
	io_wq_exit_workers(wq);
	io_wq_put(wq);
}

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct task_struct *task = worker->task;
	struct rq_flags rf;
	struct rq *rq;

	rq = task_rq_lock(task, &rf);
	do_set_cpus_allowed(task, cpumask_of_node(worker->wqe->node));
	task->flags |= PF_NO_SETAFFINITY;
	task_rq_unlock(rq, task, &rf);
	set_cpus_allowed_ptr(worker->task, cpumask_of_node(worker->wqe->node));

	return false;
}

+1 −0
Original line number Diff line number Diff line
@@ -116,6 +116,7 @@ static inline void io_wq_put_hash(struct io_wq_hash *hash)

struct io_wq_data {
	struct io_wq_hash *hash;
	struct task_struct *task;
	io_wq_work_fn *do_work;
	free_work_fn *free_work;
};
+1375 −1237

File changed.

Preview size limit exceeded, changes collapsed.

+5 −7
Original line number Diff line number Diff line
@@ -7,19 +7,17 @@

#if defined(CONFIG_IO_URING)
struct sock *io_uring_get_socket(struct file *file);
void __io_uring_task_cancel(void);
void __io_uring_files_cancel(struct files_struct *files);
void __io_uring_cancel(struct files_struct *files);
void __io_uring_free(struct task_struct *tsk);

static inline void io_uring_task_cancel(void)
static inline void io_uring_files_cancel(struct files_struct *files)
{
	if (current->io_uring)
		__io_uring_task_cancel();
		__io_uring_cancel(files);
}
static inline void io_uring_files_cancel(struct files_struct *files)
static inline void io_uring_task_cancel(void)
{
	if (current->io_uring)
		__io_uring_files_cancel(files);
	return io_uring_files_cancel(NULL);
}
static inline void io_uring_free(struct task_struct *tsk)
{
+2 −0
Original line number Diff line number Diff line
@@ -22,6 +22,8 @@ enum task_work_notify_mode {
int task_work_add(struct task_struct *task, struct callback_head *twork,
			enum task_work_notify_mode mode);

struct callback_head *task_work_cancel_match(struct task_struct *task,
	bool (*match)(struct callback_head *, void *data), void *data);
struct callback_head *task_work_cancel(struct task_struct *, task_work_func_t);
void task_work_run(void);

Loading