使用 RCU 保护以读取为主的链表¶
RCU 最常见的用途之一是保护以读取为主的链表(list.h 中的 struct list_head
)。这种方法的一个巨大优势是,所有必需的内存排序都由列表宏提供。本文档描述了几个基于列表的 RCU 用例。
当持有 rcu_read_lock()
时迭代列表时,写入者可以修改列表。读者保证看到所有在他们获取 rcu_read_lock()
之前添加到列表中的元素,并且在他们释放 rcu_read_unlock()
时仍然在列表上。添加到列表或从列表中删除的元素可能会或可能不会被看到。如果写入者调用 list_replace_rcu()
,读者可能会看到旧元素或新元素;他们不会同时看到两者,也不会两者都看不到。
示例 1:以读取为主的列表:延迟销毁¶
内核中 RCU 列表的一个广泛使用的用例是在系统中无锁地迭代所有进程。task_struct::tasks
表示链接所有进程的列表节点。可以并行遍历列表,而无需考虑任何列表的添加或删除。
列表的遍历是使用 for_each_process()
完成的,该宏由以下 2 个宏定义
#define next_task(p) \
list_entry_rcu((p)->tasks.next, struct task_struct, tasks)
#define for_each_process(p) \
for (p = &init_task ; (p = next_task(p)) != &init_task ; )
遍历所有进程列表的代码通常如下所示
rcu_read_lock();
for_each_process(p) {
/* Do something with p */
}
rcu_read_unlock();
从任务列表中删除进程的简化和高度内联的代码是
void release_task(struct task_struct *p)
{
write_lock(&tasklist_lock);
list_del_rcu(&p->tasks);
write_unlock(&tasklist_lock);
call_rcu(&p->rcu, delayed_put_task_struct);
}
当进程退出时,release_task()
通过 __exit_signal()
和 __unhash_process()
在 tasklist_lock
写入器锁保护下调用 list_del_rcu(&p->tasks)
。list_del_rcu()
调用从所有任务列表中删除该任务。tasklist_lock
防止并发的列表添加/删除损坏列表。使用 for_each_process()
的读取器不受 tasklist_lock
的保护。为了防止读取器注意到列表指针中的更改,task_struct
对象仅在经过一个或多个宽限期后才会被释放,这要借助 call_rcu()
,该函数通过 put_task_struct_rcu_user() 调用。这种延迟销毁确保任何遍历列表的读取器都将看到有效的 p->tasks.next
指针,并且删除/释放可以与列表的遍历并行发生。这种模式也称为**存在锁**,因为 RCU 会阻止调用 delayed_put_task_struct() 回调函数,直到所有现有的读取器完成,这保证了所讨论的 task_struct
对象将一直存在,直到所有可能对该对象具有引用的 RCU 读取器完成之后。
示例 2:在锁外执行的读取端操作:无就地更新¶
一些读写锁用例会在持有读取端锁的同时计算一个值,但在释放该锁后会继续使用该值。这些用例通常是转换为 RCU 的好选择。一个突出的例子是网络数据包路由。由于数据包路由数据跟踪计算机外部的设备状态,因此有时会包含过时的数据。因此,一旦计算出路由,就不需要在数据包传输期间保持路由表静态。毕竟,您可以随意保持路由表静态,但这不会阻止外部 Internet 发生变化,而外部 Internet 的状态才是真正重要的。此外,路由条目通常是添加或删除,而不是就地修改。这是一个罕见的例子,光速的有限性和原子的非零大小实际上有助于减轻同步的重量。
在系统调用审计支持中可以找到这种 RCU 用例的直接示例。例如,audit_filter_task()
的读写锁实现可能如下所示
static enum audit_state audit_filter_task(struct task_struct *tsk, char **key)
{
struct audit_entry *e;
enum audit_state state;
read_lock(&auditsc_lock);
/* Note: audit_filter_mutex held by caller. */
list_for_each_entry(e, &audit_tsklist, list) {
if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
if (state == AUDIT_STATE_RECORD)
*key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
read_unlock(&auditsc_lock);
return state;
}
}
read_unlock(&auditsc_lock);
return AUDIT_BUILD_CONTEXT;
}
在这里,列表在锁下被搜索,但在返回相应值之前会释放锁。在处理此值时,列表很可能已被修改。这是有道理的,因为如果您正在关闭审计,则审计一些额外的系统调用是可以的。
这意味着 RCU 可以很容易地应用于读取端,如下所示
static enum audit_state audit_filter_task(struct task_struct *tsk, char **key)
{
struct audit_entry *e;
enum audit_state state;
rcu_read_lock();
/* Note: audit_filter_mutex held by caller. */
list_for_each_entry_rcu(e, &audit_tsklist, list) {
if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
if (state == AUDIT_STATE_RECORD)
*key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
rcu_read_unlock();
return state;
}
}
rcu_read_unlock();
return AUDIT_BUILD_CONTEXT;
}
read_lock() 和 read_unlock() 调用分别已变为 rcu_read_lock()
和 rcu_read_unlock()
,list_for_each_entry()
已变为 list_for_each_entry_rcu()
。**_rcu()** 列表遍历原语为在 RCU 读取端临界区之外的错误使用添加了 READ_ONCE() 和诊断检查。
更新端的更改也很简单。读写锁可以如下用于 audit_del_rule() 和 audit_add_rule() 的这些简化版本中的删除和插入
static inline int audit_del_rule(struct audit_rule *rule,
struct list_head *list)
{
struct audit_entry *e;
write_lock(&auditsc_lock);
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
list_del(&e->list);
write_unlock(&auditsc_lock);
return 0;
}
}
write_unlock(&auditsc_lock);
return -EFAULT; /* No matching rule */
}
static inline int audit_add_rule(struct audit_entry *entry,
struct list_head *list)
{
write_lock(&auditsc_lock);
if (entry->rule.flags & AUDIT_PREPEND) {
entry->rule.flags &= ~AUDIT_PREPEND;
list_add(&entry->list, list);
} else {
list_add_tail(&entry->list, list);
}
write_unlock(&auditsc_lock);
return 0;
}
以下是这两个函数的 RCU 等效项
static inline int audit_del_rule(struct audit_rule *rule,
struct list_head *list)
{
struct audit_entry *e;
/* No need to use the _rcu iterator here, since this is the only
* deletion routine. */
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
list_del_rcu(&e->list);
call_rcu(&e->rcu, audit_free_rule);
return 0;
}
}
return -EFAULT; /* No matching rule */
}
static inline int audit_add_rule(struct audit_entry *entry,
struct list_head *list)
{
if (entry->rule.flags & AUDIT_PREPEND) {
entry->rule.flags &= ~AUDIT_PREPEND;
list_add_rcu(&entry->list, list);
} else {
list_add_tail_rcu(&entry->list, list);
}
return 0;
}
通常,write_lock() 和 write_unlock() 将被 spin_lock() 和 spin_unlock() 替换。但在这种情况下,所有调用方都持有 audit_filter_mutex
,因此不需要额外的锁定。因此,可以消除 auditsc_lock,因为使用 RCU 消除了写入者排除读取者的需要。
list_del()
、list_add()
和 list_add_tail()
原语已替换为 list_del_rcu()
、list_add_rcu()
和 list_add_tail_rcu()
。**_rcu()** 列表操作原语添加了在弱排序 CPU 上所需的内存屏障。list_del_rcu()
原语省略了指针中毒调试辅助代码,否则会导致并发读取器失败。
因此,当读取器可以容忍过时的数据,并且当条目被添加或删除,而没有就地修改时,使用 RCU 非常容易!
示例 3:处理就地更新¶
系统调用审计代码不会就地更新审计规则。但是,如果这样做,则执行此操作的读写锁代码可能如下所示(假设仅更新 field_count
,否则,需要填充添加的字段)
static inline int audit_upd_rule(struct audit_rule *rule,
struct list_head *list,
__u32 newaction,
__u32 newfield_count)
{
struct audit_entry *e;
struct audit_entry *ne;
write_lock(&auditsc_lock);
/* Note: audit_filter_mutex held by caller. */
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
e->rule.action = newaction;
e->rule.field_count = newfield_count;
write_unlock(&auditsc_lock);
return 0;
}
}
write_unlock(&auditsc_lock);
return -EFAULT; /* No matching rule */
}
RCU 版本会创建一个副本,更新该副本,然后用新更新的条目替换旧条目。这种操作序列,允许并发读取,同时创建副本进行更新,正是 RCU(读取-复制更新)得名的原因。
audit_upd_rule()
的 RCU 版本如下:
static inline int audit_upd_rule(struct audit_rule *rule,
struct list_head *list,
__u32 newaction,
__u32 newfield_count)
{
struct audit_entry *e;
struct audit_entry *ne;
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
ne = kmalloc(sizeof(*entry), GFP_ATOMIC);
if (ne == NULL)
return -ENOMEM;
audit_copy_rule(&ne->rule, &e->rule);
ne->rule.action = newaction;
ne->rule.field_count = newfield_count;
list_replace_rcu(&e->list, &ne->list);
call_rcu(&e->rcu, audit_free_rule);
return 0;
}
}
return -EFAULT; /* No matching rule */
}
同样,这里假设调用者持有 audit_filter_mutex
。通常,在这种类型的代码中,写入锁会变成自旋锁。
对于那些喜欢查看真实 Linux 内核代码的人来说,update_lsm_rule()
的做法非常相似。
这种模式的另一个应用可以在 openswitch 驱动程序的连接跟踪表代码中的 ct_limit_set()
中找到。该表保存连接跟踪条目,并限制最大条目数。每个区域有一个这样的表,因此每个区域有一个限制。区域通过使用 RCU 管理的哈希链的哈希表映射到它们的限制。当设置新的限制时,会分配一个新的限制对象,并调用 ct_limit_set()
以使用 list_replace_rcu()
将旧的限制对象替换为新的限制对象。然后使用 kfree_rcu()
在宽限期后释放旧的限制对象。
示例 4:消除过时数据¶
上面的审计示例可以容忍过时的数据,就像大多数跟踪外部状态的算法一样。毕竟,考虑到从外部状态更改到 Linux 感知到更改之间存在延迟,因此如前所述,少量额外的 RCU 引起的过时通常不是问题。
但是,在许多情况下,不能容忍过时的数据。Linux 内核中的一个例子是 System V IPC(请参阅 ipc/shm.c 中的 shm_lock() 函数)。此代码在每个条目的自旋锁下检查一个deleted标志,如果设置了deleted标志,则假装该条目不存在。为了使其有用,搜索函数必须返回并持有每个条目的自旋锁,正如 shm_lock() 实际所做的那样。
- 快速测验
为了使 deleted 标志技术有用,为什么必须在从搜索函数返回时持有每个条目的锁?
如果系统调用审计模块需要拒绝过时的数据,一种实现方法是在 audit_entry
结构中添加一个 deleted
标志和一个 lock
自旋锁,并按如下方式修改 audit_filter_task()
static enum audit_state audit_filter_task(struct task_struct *tsk)
{
struct audit_entry *e;
enum audit_state state;
rcu_read_lock();
list_for_each_entry_rcu(e, &audit_tsklist, list) {
if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
spin_lock(&e->lock);
if (e->deleted) {
spin_unlock(&e->lock);
rcu_read_unlock();
return AUDIT_BUILD_CONTEXT;
}
rcu_read_unlock();
if (state == AUDIT_STATE_RECORD)
*key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
return state;
}
}
rcu_read_unlock();
return AUDIT_BUILD_CONTEXT;
}
audit_del_rule()
函数需要在自旋锁下设置 deleted
标志,如下所示:
static inline int audit_del_rule(struct audit_rule *rule,
struct list_head *list)
{
struct audit_entry *e;
/* No need to use the _rcu iterator here, since this
* is the only deletion routine. */
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
spin_lock(&e->lock);
list_del_rcu(&e->list);
e->deleted = 1;
spin_unlock(&e->lock);
call_rcu(&e->rcu, audit_free_rule);
return 0;
}
}
return -EFAULT; /* No matching rule */
}
这也假设调用者持有 audit_filter_mutex
。
请注意,此示例假设仅添加和删除条目。需要额外的机制来正确处理 audit_upd_rule()
执行的就地更新。首先,audit_upd_rule()
在执行 list_replace_rcu()
时,需要同时持有旧的 audit_entry
和其替换条目的锁。
示例 5:跳过过时对象¶
对于某些用例,可以通过在读取侧列表遍历期间跳过过时对象来提高读取器性能,其中过时对象是指在经过一个或多个宽限期后将被删除和销毁的对象。一个这样的例子可以在 timerfd 子系统中找到。当重新编程 CLOCK_REALTIME
时钟(例如,由于设置了系统时间),则所有依赖于此時钟的已编程 timerfds
都会被触发,并且等待它们的进程会在其计划过期之前被唤醒。为了方便起见,所有此类计时器在 timerfd_setup_cancel()
中设置时都会添加到 RCU 管理的 cancel_list
中。
static void timerfd_setup_cancel(struct timerfd_ctx *ctx, int flags)
{
spin_lock(&ctx->cancel_lock);
if ((ctx->clockid == CLOCK_REALTIME ||
ctx->clockid == CLOCK_REALTIME_ALARM) &&
(flags & TFD_TIMER_ABSTIME) && (flags & TFD_TIMER_CANCEL_ON_SET)) {
if (!ctx->might_cancel) {
ctx->might_cancel = true;
spin_lock(&cancel_lock);
list_add_rcu(&ctx->clist, &cancel_list);
spin_unlock(&cancel_lock);
}
} else {
__timerfd_remove_cancel(ctx);
}
spin_unlock(&ctx->cancel_lock);
}
当释放 timerfd(关闭 fd)时,会清除 timerfd 对象的 might_cancel
标志,该对象会从 cancel_list
中移除并销毁,如下面的 timerfd_release() 的简化和内联版本所示:
int timerfd_release(struct inode *inode, struct file *file)
{
struct timerfd_ctx *ctx = file->private_data;
spin_lock(&ctx->cancel_lock);
if (ctx->might_cancel) {
ctx->might_cancel = false;
spin_lock(&cancel_lock);
list_del_rcu(&ctx->clist);
spin_unlock(&cancel_lock);
}
spin_unlock(&ctx->cancel_lock);
if (isalarm(ctx))
alarm_cancel(&ctx->t.alarm);
else
hrtimer_cancel(&ctx->t.tmr);
kfree_rcu(ctx, rcu);
return 0;
}
如果设置了 CLOCK_REALTIME
时钟(例如由时间服务器设置),则 hrtimer 框架会调用 timerfd_clock_was_set()
,它会遍历 cancel_list
并唤醒等待 timerfd 的进程。在迭代 cancel_list
时,会查询 might_cancel
标志以跳过过时对象。
void timerfd_clock_was_set(void)
{
ktime_t moffs = ktime_mono_to_real(0);
struct timerfd_ctx *ctx;
unsigned long flags;
rcu_read_lock();
list_for_each_entry_rcu(ctx, &cancel_list, clist) {
if (!ctx->might_cancel)
continue;
spin_lock_irqsave(&ctx->wqh.lock, flags);
if (ctx->moffs != moffs) {
ctx->moffs = KTIME_MAX;
ctx->ticks++;
wake_up_locked_poll(&ctx->wqh, EPOLLIN);
}
spin_unlock_irqrestore(&ctx->wqh.lock, flags);
}
rcu_read_unlock();
}
关键点在于,由于 RCU 保护的 cancel_list
遍历与对象的添加和删除同时发生,因此有时遍历会访问已从列表中删除的对象。在此示例中,使用一个标志来跳过此类对象。
总结¶
主要用于读取且可以容忍过时数据的基于列表的数据结构最适合使用 RCU。最简单的情况是条目要么添加到数据结构中,要么从数据结构中删除(或原子地就地修改),但是非原子就地修改可以通过创建副本、更新副本,然后用副本替换原始数据来处理。如果不能容忍过时的数据,则可以结合使用 deleted 标志和每个条目的自旋锁,以便允许搜索函数拒绝新删除的数据。
- 快速测验的答案
为了使 deleted 标志技术有用,为什么必须在从搜索函数返回时持有每个条目的锁?
如果搜索函数在返回之前释放每个条目的锁,则调用者无论如何都会处理过时的数据。如果处理过时的数据确实可以,那么您不需要 deleted 标志。如果处理过时的数据确实是个问题,那么您需要在使用返回值的整个代码中持有每个条目的锁。