作者简介
胡哲宁,西安邮电大学计算机科学与技术专业大二学生。
由于协程本身对操作系统的不可见性,协程中出现的 BUG 往往不能通过一些已有的工具去排查。在谷歌内部有一套闭源的用户态任务调度框架 SwitchTo
, 这个框架可以为谷歌提供延迟敏感的服务,对运行的内容进行细粒度的用户空间控制/调度,它可以让内核来实现上下文的切换,同时将任务何时切换,何时恢复的工作交给了用户态的程序来做,这样既可以实现在任务间协作式切换的功能,又可以不丧失内核对于任务的控制和观察能力。谷歌去年恢复尝试将其 SwitchTo
API 上游引入 Linux。相关补丁见:[1],[2],[3],[4].
1pid_t switchto_wait(timespec *timeout)
2/* Enter an 'unscheduled state', until our control is re-initiated by another thread or external event (signal). */
3void switchto_resume(pid_t tid)
4/* Resume regular execution of tid */
5pid_t switchto_switch(pid_t tid)
6/* Synchronously transfer control to target sibling thread, leaving the current thread unscheduled.Analogous to:Atomically { Resume(t1); Wait(NULL); }
7*/
这是使用 SwitchTo
和使用其他线程间切换的组件的上下文切换性能对比:
Benchmark
Time(ns)
CPU(ns)
Iterations
BM_Futex
2905
1958
1000000
BM_GoogleMutex
3102
2326
1000000
BM_SwitchTo
179
178
3917412
BM_SwitchResume
2734
1554
1000000
可以看到在使用 SwitchTo
后切换的性能比其他组件提高了一个数量级别。
SwitchTo
是如何做到在切换性能上大幅度领先的呢?我们暂时可能无法看到它们,但让我们来看看 Peter Oskolkov
向 LKML(Linux Kernel Mail List) 提出的补丁中有关 futex_swap()
的实现。可以确定的是,SwitchTo
构建在这个内核函数之上。
futex
全称 fast user-space locking
,快速用户空间互斥锁,作为内核中一种基本的同步原语,它提供了非常快速的无竞争锁获取和释放,用于构建复杂的同步结构:互斥锁、条件变量、信号量等。由于 futex
的一些机制和使用过于复杂,glibc
没有为 futex
提供包装器,但我们仍然可以使用 syscall
来调用这个 极其 hack 的系统调用。
1static int futex(uint32_t *uaddr, int futex_op, uint32_t val,
2 const struct timespec *timeout, uint32_t *uaddr2,
3 uint32_t val3)
4 {
5 return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
6}
uaddr
: 一个四字节的用户空间地址。多个任务间可以通过 *uaddr
的值的变化来控制阻塞或者运行。
futex_op
: 用于控制 futex
执行的命令 如 FUTEX_WAIT
,FUTEX_WAKE
,FUTEX_LOCK_PI
,FUTEX_UNLOCK_PI
…
val
: 在不同的 futex_op
具有不同的含义,如在 futex(uaddr, FUTEX_WAKE)
中作为唤醒等待在该 futex
上所有任务的数量。
timeout
: 作为等待(如 FUTEX_WAIT
)的超时时间。
uaddr2
: uaddr2
参数是一个四字节的用户空间地址 在需要的场景使用(如后文的 FUTEX_SWAP
)。
val3
: 整数参数val3
的解释取决于在操作上。
由于用户模式和内核模式之间的上下文切换很昂贵,futex
实现的同步结构会尽可能多地留在用户空间,这意味着它们只需要执行更少的系统调用。futex
的状态存储在用户空间变量中,futex
可以通过一些原子操作在没有竞争的情况下更改 futex
的状态,而无需系统调用的开销。
在看 futex_swap()
之前让我们先看看 内核中 与 futex
最重要的两个内核函数:
1static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val, ktime_t *abs_time, u32 bitset);
简单来说 对于 futex_wait()
有用的参数就只有 uaddr
,val
,abs_time
,就像 futex_wait(uaddr,val,abs_time)
。其含义是当这个用户空间地址 uaddr
的值等于传入的参数 val
的时候睡眠,即 if (*uaddr == val) wait()
. futex_wake()
可以将它唤醒,另外还可以通过指定超时时间来超时唤醒。
1static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
2 ktime_t *abs_time, u32 bitset)
3{
4 struct hrtimer_sleeper timeout, *to;
5 struct restart_block *restart;
6 struct futex_hash_bucket *hb;
7 struct futex_q q = futex_q_init;
8 int ret;
9
10 if (!bitset)
11 return -EINVAL;
12 q.bitset = bitset;
13 /* 设置定时器 */
14 to = futex_setup_timer(abs_time, &timeout, flags,
15 current->timer_slack_ns);
16retry:
17 /*
18 * Prepare to wait on uaddr. On success, holds hb lock and increments
19 * q.key refs.
20 */
21 /* 获取哈希桶自旋锁 如果 *uaddr == val return -EWOULDBLOCK 否则返回 0 */
22 ret = futex_wait_setup(uaddr, val, flags, &q, &hb);
23 if (ret)
24 goto out;
25
26 /* queue_me and wait for wakeup, timeout, or a signal. */
27 /*将当前任务状态改为TASK_INTERRUPTIBLE,并将当前任务插入到futex等待队列,释放哈希桶自旋锁,然后重新调度*/
28 futex_wait_queue_me(hb, &q, to);
29
30 /* If we were woken (and unqueued), we succeeded, whatever. */
31 ret = 0;
32 /* unqueue_me() drops q.key ref */
33 /* 如果 unqueue_me 返回 0 表示已经被删除则是正常唤醒跳到 out 否则则是超时触发 */
34 if (!unqueue_me(&q))
35 goto out;
36 ret = -ETIMEDOUT;
37 if (to && !to->task)
38 goto out;
39
40 /*
41 * We expect signal_pending(current), but we might be the
42 * victim of a spurious wakeup as well.
43 */
44 if (!signal_pending(current))
45 goto retry;
46
47 ret = -ERESTARTSYS;
48 if (!abs_time)
49 goto out;
50
51 restart = ¤t->restart_block;
52 restart->futex.uaddr = uaddr;
53 restart->futex.val = val;
54 restart->futex.time = *abs_time;
55 restart->futex.bitset = bitset;
56 restart->futex.flags = flags | FLAGS_HAS_TIMEOUT;
57
58 ret = set_restart_fn(restart, futex_wait_restart);
59
60out:
61 if (to) {
62 /* 即将结束,取消定时任务 */
63 hrtimer_cancel(&to->timer);
64 destroy_hrtimer_on_stack(&to->timer);
65 }
66 return ret;
67}
futex
内部采用了哈希表的数据结构来保存那些需要睡眠的任务。通过用户空间地址 uaddr
,flag
,以及 futex
的读写状态可以计算出相同的 key
值,将需要睡眠的任务的 task_struct
放到对应的哈希桶上的优先链表的节点中。
futex_wait()
流程:
寻找 futex
对应的 key
,获取 key
对应的哈希桶。
获取哈希桶自旋锁,如果 *uaddr == val
返回错误给用户态。
否则将当前任务状态改为 TASK_INTERRUPTIBLE
,并将当前任务插入到 futex
等待队列,释放哈希桶自旋锁,然后调度器重新调度。
从睡眠中苏醒,进行超时和唤醒两种情况的相应处理,返回用户态。
1static int futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset);
futex_wake()
的参数稍微简单一些,最重要的只有一个用户地址 uaddr
,以及触发唤醒的任务数最大值 nr_wake
。
1/*
2 * Wake up waiters matching bitset queued on this futex (uaddr).
3 */
4static int
5futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
6{
7 struct futex_hash_bucket *hb;
8 struct futex_q *this, *next;
9 union futex_key key = FUTEX_KEY_INIT;
10 int ret;
11 DEFINE_WAKE_Q(wake_q);
12
13 if (!bitset)
14 return -EINVAL;
15
16 /* 寻找 futex 对应的 key */
17 ret = get_futex_key(uaddr, flags & FLAGS_SHARED, &key, FUTEX_READ);
18 if (unlikely(ret != 0))
19 return ret;
20
21 /* 获取 key 对应的哈希桶 */
22 hb = hash_futex(&key);
23
24 /* Make sure we really have tasks to wakeup */
25 /* 如果哈希桶桑没有等待者...那么谁也不需要被唤醒 */
26 if (!hb_waiters_pending(hb))
27 return ret;
28
29 /* 获取当前哈希桶的自旋锁 */
30 spin_lock(&hb->lock);
31
32 /* 遍历这个哈系桶上的优先链表 */
33 plist_for_each_entry_safe(this, next, &hb->chain, list) {
34 /* 如果 this 项的 key 与 futex 对应的 key 相同 说明该项在等待 futex */
35 if (match_futex (&this->key, &key)) {
36 if (this->pi_state || this->rt_waiter) {
37 ret = -EINVAL;
38 break;
39 }
40
41 /* Check if one of the bits is set in both bitsets */
42 if (!(this->bitset & bitset))
43 continue;
44
45 /* 将 this 添加到唤醒队列 wake_q 中 */
46 mark_wake_futex(&wake_q, this);
47 /* ret此时为0 递增至 nr_wake 最大唤醒任务数量则退出循环 */
48 if (++ret >= nr_wake)
49 break;
50 }
51 }
52
53 spin_unlock(&hb->lock);
54 wake_up_q(&wake_q);
55 return ret;
56}
futex_wake()
流程:
寻找 futex
对应的 key
,获取 key
对应的哈希桶。
获取哈希桶的自旋锁,遍历这个哈系桶上的优先链表,如果当前任务的 key
与 futex
对应的 key
相同,说明该任务在等待 futex,将当前任务添加到唤醒队列 wake_q
中,如果达到了 nr_wake
个,则退出循环。
释放哈希桶自旋锁,唤醒队列 wake_q
中每一个任务。
因此,通过 futex_wait()
和 futex_wake()
,我们可以实现任务的等待和唤醒,见 [5] man 手册中的小 demo。
基于以上的了解,现在我们来看 Peter Oskolkov
向内核提交的 FUTEX_SWAP
补丁系列。首先看 [4] 中有关 FUTEX_SWAP
的相关测试补丁,其中关键的功能函数 futex_swap_op()
引起了我们的注意:
1void futex_swap_op(int mode, futex_t *futex_this, futex_t *futex_that)
2{
3 int ret;
4
5 switch (mode) {
6 case SWAP_WAKE_WAIT:
7 futex_set(futex_this, FUTEX_WAITING);
8 futex_set(futex_that, FUTEX_WAKEUP);
9 futex_wake(futex_that, 1, FUTEX_PRIVATE_FLAG);
10 futex_wait(futex_this, FUTEX_WAITING, NULL, FUTEX_PRIVATE_FLAG);
11 if (*futex_this != FUTEX_WAKEUP) {
12 fprintf(stderr, "unexpected futex_this value on wakeup\n");
13 exit(1);
14 }
15 break;
16
17 case SWAP_SWAP:
18 futex_set(futex_this, FUTEX_WAITING);
19 futex_set(futex_that, FUTEX_WAKEUP);
20 ret = futex_swap(futex_this, FUTEX_WAITING, NULL,
21 futex_that, FUTEX_PRIVATE_FLAG);
22 if (ret < 0 && errno == ENOSYS) {
23 /* futex_swap not implemented */
24 perror("futex_swap");
25 exit(1);
26 }
27 if (*futex_this != FUTEX_WAKEUP) {
28 fprintf(stderr, "unexpected futex_this value on wakeup\n");
29 exit(1);
30 }
31 break;
32
33 default:
34 fprintf(stderr, "unknown mode in %s\n", __func__);
35 exit(1);
36 }
37}
其中比较了使用 FUTEX_WAIT
, FUTEX_WAKE
实现线程切换以及使用 FUTEX_SWAP
实现线程切换的两种方式。
其中 通过调用参数含有 futex_this
和 futex_that
的 futex_swap()
,
1ret = futex_swap(futex_this, FUTEX_WAITING, NULL,
2 futex_that, FUTEX_PRIVATE_FLAG);
代替了下面 futex_wake()
和 futex_wait()
的两步操作,
1futex_wake(futex_that, 1, FUTEX_PRIVATE_FLAG);
2futex_wait(futex_this, FUTEX_WAITING, NULL, FUTEX_PRIVATE_FLAG);
实现了让当前线程睡眠,并切换到指定线程的作用。
1$ ./futex_swap -i 100000
2
3
4------- running SWAP_WAKE_WAIT -----------
5
6
7completed 100000 swap and back iterations in 820683263 ns: 4103 ns per swap
8PASS
9
10
11------- running SWAP_SWAP -----------
12
13
14completed 100000 swap and back iterations in 124034476 ns: 620 ns per swap
15PASS
可见在 100k 级别的任务切换批处理上,使用新接口 futex_swap() 的上下文切换性能要比之前好很多。
1/*
2- * Wake up waiters matching bitset queued on this futex (uaddr).
3+ * Prepare wake queue matching bitset queued on this futex (uaddr).
4 */
5 static int
6-futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
7+prepare_wake_q(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset,
8+ struct wake_q_head *wake_q)
9 {
10 struct futex_hash_bucket *hb;
11 struct futex_q *this, *next;
12 union futex_key key = FUTEX_KEY_INIT;
13 int ret;
14- DEFINE_WAKE_Q(wake_q);
15
16 if (!bitset)
17 return -EINVAL;
18@@ -1629,20 +1629,34 @@ futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
19 if (!(this->bitset & bitset))
20 continue;
21
22- mark_wake_futex(&wake_q, this);
23+ mark_wake_futex(wake_q, this);
24 if (++ret >= nr_wake)
25 break;
26 }
27 }
28
29 spin_unlock(&hb->lock);
30- wake_up_q(&wake_q);
31 out_put_key:
32 put_futex_key(&key);
33 out:
34 return ret;
35 }
36
37+/*
38+ * Wake up waiters matching bitset queued on this futex (uaddr).
39+ */
40+static int
41+futex_wake(u32 __user *uaddr, unsigned int flags, int nr_wake, u32 bitset)
42+{
43+ int ret;
44+ DEFINE_WAKE_Q(wake_q);
45+
46+ ret = prepare_wake_q(uaddr, flags, nr_wake, bitset, &wake_q);
47+ wake_up_q(&wake_q);
48+
49+ return ret;
50+}
51+
52 s
首先是通过从 futex_wake() 中抽出一个 prepare_wake_q()
获得 nr_wake
个等待在 futex
的任务并填入到传入的唤醒队列 wake_q
中。
1+static int futex_swap(u32 __user *uaddr, unsigned int flags, u32 val,
2+ ktime_t *abs_time, u32 __user *uaddr2)
3+{
4+ u32 bitset = FUTEX_BITSET_MATCH_ANY;
5+ struct task_struct *next = NULL;
6+ DEFINE_WAKE_Q(wake_q);
7+ int ret;
8+
9+ ret = prepare_wake_q(uaddr2, flags, 1, bitset, &wake_q);
10+ if (!wake_q_empty(&wake_q)) {
11+ /* Pull the first wakee out of the queue to swap into. */
12+ next = container_of(wake_q.first, struct task_struct, wake_q);
13+ wake_q.first = wake_q.first->next;
14+ next->wake_q.next = NULL;
15+ /*
16+ * Note that wake_up_q does not touch wake_q.last, so we
17+ * do not bother with it here.
18+ */
19+ wake_up_q(&wake_q);
20+ }
21+ if (ret < 0)
22+ return ret;
23+
24+ return futex_wait(uaddr, flags, val, abs_time, bitset, next);
25+}
futex_swap()
流程:
获得等待在 uaddr2
上的预备唤醒队列,记录队列第一个任务为 next
,对其他任务则执行唤醒。
对 uaddr1
执行 futex_wait()
,传入 next
。
我们看看 futex_wait()
上发生了哪些更改:
1@@ -2600,9 +2614,12 @@ static int fixup_owner(u32 __user *uaddr, struct futex_q *q, int locked)
2 * @hb: the futex hash bucket, must be locked by the caller
3 * @q: the futex_q to queue up on
4 * @timeout: the prepared hrtimer_sleeper, or null for no timeout
5+ * @next: if present, wake next and hint to the scheduler that we'd
6+ * prefer to execute it locally.
7 */
8 static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
9- struct hrtimer_sleeper *timeout)
10+ struct hrtimer_sleeper *timeout,
11+ struct task_struct *next)
12 {
13 /*
14 * The task state is guaranteed to be set before another task can
15@@ -2627,10 +2644,27 @@ static void futex_wait_queue_me(struct futex_hash_bucket *hb, struct futex_q *q,
16 * flagged for rescheduling. Only call schedule if there
17 * is no timeout, or if it has yet to expire.
18 */
19- if (!timeout || timeout->task)
20+ if (!timeout || timeout->task) {
21+ if (next) {
22+ /*
23+ * wake_up_process() below will be replaced
24+ * in the next patch with
25+ * wake_up_process_prefer_current_cpu().
26+ */
27+ wake_up_process(next);
28+ put_task_struct(next);
29+ next = NULL;
30+ }
31 freezable_schedule();
32+ }
33 }
34 __set_current_state(TASK_RUNNING);
35+
36+ if (next) {
37+ /* Maybe call wake_up_process_prefer_current_cpu()? */
38+ wake_up_process(next);
39+ put_task_struct(next);
40+ }
41 }
42
43@@ -2710,7 +2744,7 @@ static int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags,
44 }
45
46 static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
47- ktime_t *abs_time, u32 bitset)
48+ ktime_t *abs_time, u32 bitset, struct task_struct *next)
49 {
50 struct hrtimer_sleeper timeout, *to;
51 struct restart_block *restart;
52@@ -2734,7 +2768,8 @@ static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
53 goto out;
54
55 /* queue_me and wait for wakeup, timeout, or a signal. */
56- futex_wait_queue_me(hb, &q, to);
57+ futex_wait_queue_me(hb, &q, to, next);
58+ next = NULL;
59
60 /* If we were woken (and unqueued), we succeeded, whatever. */
61 ret = 0;
62@@ -2767,6 +2802,10 @@ static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
63 ret = -ERESTART_RESTARTBLOCK;
64
65 out:
66+ if (next) {
67+ wake_up_process(next);
68+ put_task_struct(next);
69+ }
70 if (to) {
71 hrtimer_cancel(&to->timer);
72 destroy_hrtimer_on_stack(&to->timer);
73@@ -2774,7 +2813,6 @@ static int futex_wait(u32 __user *uaddr, unsigned int flags, u32 val,
74 return ret;
75 }
我们可以看到 futex_wait()
传入的 next
任务在两种情况下会被唤醒:
当前任务从对 uaddr
的等待中苏醒,接着在futex_wait
结束的时候执行 wake_up_process()
切换到 next
任务。
在 futex_wait_queue_me()
中等待超时(也代表着当前任务从对锁的等待中结束),执行 wake_up_process()
切换到 next
任务。
通过对 futex
的魔改, 我们仿佛有了在用户态使用 switch_to()
指定任务切换的能力,这真是让人感到兴奋!这就是用户模式线程的用途:极低的切换开销,意味着我们操作系统可以支持的数以千计的线程可以提高到 10 倍以上甚至百万级别!
不过很可惜,该补丁似乎被 Linux 内核社区遗弃。如今补丁作者 Peter Oskolkov
正在试图向 Linux 内核引入另外一套 Google 的用户态任务调度框架 Fiber
[7],来支持 Linux 世界中 c 系程序员对协作式任务切换的需求。
[1] https://lore.kernel.org/lkml/414e292195d720c780fab2781c749df3be6566aa.camel@posk.io/
[2] https://lore.kernel.org/lkml/48058b850de10f949f96b4f311adb649b1fb3ff2.camel@posk.io/
[3] https://lore.kernel.org/lkml/d5cf58486a6a5e41581bed9183e8a831908ede0b.camel@posk.io/
[4] https://lore.kernel.org/lkml/a06a25f1380e0da48946b1bb958e1745e5fac964.camel@posk.io/
[5] https://man7.org/linux/man-pages/man2/futex.2.html
[6] https://lwn.net/Articles/360699/
[7] https://lore.kernel.org/lkml/20210520183614.1227046-1-posk@google.com/
本文来源于 陈莉君老师 “Linux内核之旅”公众号。