使用 RCU 保护主要用于读取的链表¶
RCU 最常见的用途之一是保护主要用于读取的链表(list.h 中的 struct list_head
)。这种方法的一个主要优点是所有必需的内存顺序都由列表宏提供。本文档描述了几个基于列表的 RCU 用例。
当持有 rcu_read_lock()
迭代列表时,写入器可能会修改列表。读取器保证看到在获取 rcu_read_lock()
之前添加到列表中的所有元素,并且在释放 rcu_read_unlock()
时仍然在列表上。可能会看到或可能不会看到添加到列表或从列表中删除的元素。如果写入器调用 list_replace_rcu()
,则读取器可能会看到旧元素或新元素;他们不会同时看到两者,也不会两者都看不到。
示例 1:主要用于读取的列表:延迟销毁¶
内核中 RCU 列表的一个广泛使用的用例是在系统中无锁迭代所有进程。task_struct::tasks
表示链接所有进程的列表节点。列表可以与任何列表添加或删除并行遍历。
列表的遍历是使用 for_each_process()
完成的,它由 2 个宏定义
#define next_task(p) \
list_entry_rcu((p)->tasks.next, struct task_struct, tasks)
#define for_each_process(p) \
for (p = &init_task ; (p = next_task(p)) != &init_task ; )
遍历所有进程列表的代码通常如下所示
rcu_read_lock();
for_each_process(p) {
/* Do something with p */
}
rcu_read_unlock();
从任务列表中删除进程的简化和高度内联的代码是
void release_task(struct task_struct *p)
{
write_lock(&tasklist_lock);
list_del_rcu(&p->tasks);
write_unlock(&tasklist_lock);
call_rcu(&p->rcu, delayed_put_task_struct);
}
当进程退出时,release_task()
通过 tasklist_lock
写入器锁保护下的 __exit_signal() 和 __unhash_process() 调用 list_del_rcu(&p->tasks)
。list_del_rcu()
调用从所有任务的列表中删除该任务。tasklist_lock
阻止并发的列表添加/删除损坏列表。使用 for_each_process()
的读取器不受 tasklist_lock
的保护。为了防止读取器注意到列表指针中的更改,只有在一个或多个宽限期过去后才会释放 task_struct
对象,借助 call_rcu()
,它通过 put_task_struct_rcu_user() 调用。这种延迟销毁确保了遍历列表的任何读取器都将看到有效的 p->tasks.next
指针,并且删除/释放可以与列表的遍历并行发生。这种模式也称为**存在锁**,因为 RCU 在所有现有读取器完成后才调用 delayed_put_task_struct() 回调函数,这保证了有问题的 task_struct
对象将在所有可能具有对该对象引用的 RCU 读取器完成之后才存在。
示例 2:在锁外采取读取端操作:没有就地更新¶
一些读取器-写入器锁定用例在持有读取端锁的同时计算一个值,但在释放该锁后继续使用该值。这些用例通常是转换为 RCU 的好候选者。一个突出的例子涉及网络数据包路由。因为数据包路由数据跟踪计算机外部设备的状态,所以有时会包含过时的数据。因此,一旦计算出路由,就没有必要在数据包传输期间保持路由表静态。毕竟,你可以随意保持路由表静态,但这不会阻止外部 Internet 发生变化,而外部 Internet 的状态才是真正重要的。此外,路由条目通常是添加或删除,而不是就地修改。这是一个光速有限和原子非零大小的罕见例子,实际上有助于使同步更轻量级。
这种类型的 RCU 用例的一个简单示例可以在系统调用审计支持中找到。例如,audit_filter_task()
的读取器-写入器锁定实现可能如下所示
static enum audit_state audit_filter_task(struct task_struct *tsk, char **key)
{
struct audit_entry *e;
enum audit_state state;
read_lock(&auditsc_lock);
/* Note: audit_filter_mutex held by caller. */
list_for_each_entry(e, &audit_tsklist, list) {
if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
if (state == AUDIT_STATE_RECORD)
*key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
read_unlock(&auditsc_lock);
return state;
}
}
read_unlock(&auditsc_lock);
return AUDIT_BUILD_CONTEXT;
}
这里列表在锁下搜索,但在返回相应的值之前释放锁。在这个值被执行时,列表很可能已经被修改了。这是有道理的,因为如果你正在关闭审计,那么审计一些额外的系统调用是可以的。
这意味着 RCU 可以很容易地应用于读取端,如下所示
static enum audit_state audit_filter_task(struct task_struct *tsk, char **key)
{
struct audit_entry *e;
enum audit_state state;
rcu_read_lock();
/* Note: audit_filter_mutex held by caller. */
list_for_each_entry_rcu(e, &audit_tsklist, list) {
if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
if (state == AUDIT_STATE_RECORD)
*key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
rcu_read_unlock();
return state;
}
}
rcu_read_unlock();
return AUDIT_BUILD_CONTEXT;
}
read_lock() 和 read_unlock() 调用分别变为 rcu_read_lock()
和 rcu_read_unlock()
,并且 list_for_each_entry()
变为 list_for_each_entry_rcu()
。_rcu() 列表遍历原语添加 READ_ONCE() 和诊断检查,以防止在 RCU 读取端临界区之外的错误使用。
更新端的更改也很简单。读取器-写入器锁可以如下所示用于 audit_del_rule() 和 audit_add_rule() 的简化版本中的删除和插入
static inline int audit_del_rule(struct audit_rule *rule,
struct list_head *list)
{
struct audit_entry *e;
write_lock(&auditsc_lock);
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
list_del(&e->list);
write_unlock(&auditsc_lock);
return 0;
}
}
write_unlock(&auditsc_lock);
return -EFAULT; /* No matching rule */
}
static inline int audit_add_rule(struct audit_entry *entry,
struct list_head *list)
{
write_lock(&auditsc_lock);
if (entry->rule.flags & AUDIT_PREPEND) {
entry->rule.flags &= ~AUDIT_PREPEND;
list_add(&entry->list, list);
} else {
list_add_tail(&entry->list, list);
}
write_unlock(&auditsc_lock);
return 0;
}
以下是这两个函数的 RCU 等效项
static inline int audit_del_rule(struct audit_rule *rule,
struct list_head *list)
{
struct audit_entry *e;
/* No need to use the _rcu iterator here, since this is the only
* deletion routine. */
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
list_del_rcu(&e->list);
call_rcu(&e->rcu, audit_free_rule);
return 0;
}
}
return -EFAULT; /* No matching rule */
}
static inline int audit_add_rule(struct audit_entry *entry,
struct list_head *list)
{
if (entry->rule.flags & AUDIT_PREPEND) {
entry->rule.flags &= ~AUDIT_PREPEND;
list_add_rcu(&entry->list, list);
} else {
list_add_tail_rcu(&entry->list, list);
}
return 0;
}
通常,write_lock() 和 write_unlock() 将被 spin_lock() 和 spin_unlock() 替换。但在这种情况下,所有调用者都持有 audit_filter_mutex
,因此不需要额外的锁定。因此可以消除 auditsc_lock,因为使用 RCU 消除了写入器排除读取器的需要。
list_del()
、list_add()
和 list_add_tail()
原语已被 list_del_rcu()
、list_add_rcu()
和 list_add_tail_rcu()
替换。_rcu() 列表操作原语添加了弱序 CPU 上所需的内存屏障。list_del_rcu()
原语省略了指针中毒调试辅助代码,否则会导致并发读取器失败。
因此,当读取器可以容忍过时的数据,并且条目被添加或删除,而没有就地修改时,使用 RCU 非常容易!
示例 3:处理就地更新¶
系统调用审计代码不会就地更新审计规则。但是,如果它这样做了,那么这样做的读取器-写入器锁定代码可能如下所示(假设只更新了 field_count
,否则,将需要填充添加的字段)
static inline int audit_upd_rule(struct audit_rule *rule,
struct list_head *list,
__u32 newaction,
__u32 newfield_count)
{
struct audit_entry *e;
struct audit_entry *ne;
write_lock(&auditsc_lock);
/* Note: audit_filter_mutex held by caller. */
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
e->rule.action = newaction;
e->rule.field_count = newfield_count;
write_unlock(&auditsc_lock);
return 0;
}
}
write_unlock(&auditsc_lock);
return -EFAULT; /* No matching rule */
}
RCU 版本创建一个副本,更新副本,然后用新更新的条目替换旧条目。这个动作序列,允许并发读取,同时创建一个副本以执行更新,这就是 RCU (读取-复制更新) 名称的由来。
audit_upd_rule() 的 RCU 版本如下
static inline int audit_upd_rule(struct audit_rule *rule,
struct list_head *list,
__u32 newaction,
__u32 newfield_count)
{
struct audit_entry *e;
struct audit_entry *ne;
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
ne = kmalloc(sizeof(*entry), GFP_ATOMIC);
if (ne == NULL)
return -ENOMEM;
audit_copy_rule(&ne->rule, &e->rule);
ne->rule.action = newaction;
ne->rule.field_count = newfield_count;
list_replace_rcu(&e->list, &ne->list);
call_rcu(&e->rcu, audit_free_rule);
return 0;
}
}
return -EFAULT; /* No matching rule */
}
同样,这假设调用者持有 audit_filter_mutex
。通常,在这种代码中,写入器锁会变成自旋锁。
update_lsm_rule() 做的事情非常相似,对于那些希望查看真正的 Linux 内核代码的人来说。
这种模式的另一个用途可以在 openswitch 驱动程序的 连接跟踪表 代码中的 ct_limit_set()
中找到。该表保存连接跟踪条目,并且对最大条目数有限制。每个区域都有一个这样的表,因此每个区域都有一个 限制。这些区域通过使用 RCU 管理的 hlist 的哈希表映射到它们的限制。当设置新的限制时,将分配一个新的限制对象,并调用 ct_limit_set()
以使用 list_replace_rcu()
替换旧的限制对象。然后使用 kfree_rcu()
在宽限期后释放旧的限制对象。
示例 4:消除过时的数据¶
上面的审计示例容忍过时的数据,就像大多数跟踪外部状态的算法一样。毕竟,考虑到从外部状态更改到 Linux 意识到更改之间存在延迟,因此如前所述,少量额外的 RCU 引起的过时通常不是问题。
但是,在许多示例中不能容忍过时的数据。Linux 内核中的一个例子是 System V IPC(参见 ipc/shm.c 中的 shm_lock() 函数)。此代码在每个条目的自旋锁下检查 deleted 标志,如果设置了 deleted 标志,则假装该条目不存在。为了使其有用,搜索函数必须返回持有每个条目的自旋锁,正如 shm_lock() 实际上所做的那样。
- 快速测验
为了使 deleted 标志技术有用,为什么有必要在从搜索函数返回时持有每个条目的锁?
如果系统调用审计模块需要拒绝过时的数据,一种方法是在 audit_entry
结构中添加一个 deleted
标志和一个 lock
自旋锁,并按如下方式修改 audit_filter_task()
static struct audit_entry *audit_filter_task(struct task_struct *tsk, char **key)
{
struct audit_entry *e;
enum audit_state state;
rcu_read_lock();
list_for_each_entry_rcu(e, &audit_tsklist, list) {
if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
spin_lock(&e->lock);
if (e->deleted) {
spin_unlock(&e->lock);
rcu_read_unlock();
return NULL;
}
rcu_read_unlock();
if (state == AUDIT_STATE_RECORD)
*key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
/* As long as e->lock is held, e is valid and
* its value is not stale */
return e;
}
}
rcu_read_unlock();
return NULL;
}
audit_del_rule()
函数需要在自旋锁下设置 deleted
标志,如下所示
static inline int audit_del_rule(struct audit_rule *rule,
struct list_head *list)
{
struct audit_entry *e;
/* No need to use the _rcu iterator here, since this
* is the only deletion routine. */
list_for_each_entry(e, list, list) {
if (!audit_compare_rule(rule, &e->rule)) {
spin_lock(&e->lock);
list_del_rcu(&e->list);
e->deleted = 1;
spin_unlock(&e->lock);
call_rcu(&e->rcu, audit_free_rule);
return 0;
}
}
return -EFAULT; /* No matching rule */
}
这也假设调用者持有 audit_filter_mutex
。
请注意,此示例假设条目仅添加和删除。需要额外的机制来正确处理 audit_upd_rule() 执行的就地更新。例如,audit_upd_rule() 需要在执行 list_replace_rcu()
时持有旧的 audit_entry
及其替换项的锁。
示例 5:跳过过时的对象¶
对于某些用例,可以通过在读取端列表遍历期间跳过过时的对象来提高读取器性能,其中过时的对象是指将在一个或多个宽限期后删除和销毁的对象。一个这样的例子可以在 timerfd 子系统中找到。当重新编程 CLOCK_REALTIME
时钟(例如,由于设置系统时间)时,所有已编程的依赖于该时钟的 timerfds
都会被触发,并且等待它们的进程会在其计划到期之前被唤醒。为了方便这一点,当在 timerfd_setup_cancel()
中设置时,所有这些计时器都会被添加到 RCU 管理的 cancel_list
中
static void timerfd_setup_cancel(struct timerfd_ctx *ctx, int flags)
{
spin_lock(&ctx->cancel_lock);
if ((ctx->clockid == CLOCK_REALTIME ||
ctx->clockid == CLOCK_REALTIME_ALARM) &&
(flags & TFD_TIMER_ABSTIME) && (flags & TFD_TIMER_CANCEL_ON_SET)) {
if (!ctx->might_cancel) {
ctx->might_cancel = true;
spin_lock(&cancel_lock);
list_add_rcu(&ctx->clist, &cancel_list);
spin_unlock(&cancel_lock);
}
} else {
__timerfd_remove_cancel(ctx);
}
spin_unlock(&ctx->cancel_lock);
}
当释放 timerfd(关闭 fd)时,timerfd 对象的 might_cancel
标志会被清除,对象从 cancel_list
中删除并销毁,如 timerfd_release() 的这个简化和内联版本所示
int timerfd_release(struct inode *inode, struct file *file)
{
struct timerfd_ctx *ctx = file->private_data;
spin_lock(&ctx->cancel_lock);
if (ctx->might_cancel) {
ctx->might_cancel = false;
spin_lock(&cancel_lock);
list_del_rcu(&ctx->clist);
spin_unlock(&cancel_lock);
}
spin_unlock(&ctx->cancel_lock);
if (isalarm(ctx))
alarm_cancel(&ctx->t.alarm);
else
hrtimer_cancel(&ctx->t.tmr);
kfree_rcu(ctx, rcu);
return 0;
}
如果设置了 CLOCK_REALTIME
时钟,例如通过时间服务器,则 hrtimer 框架会调用 timerfd_clock_was_set()
,它会遍历 cancel_list
并唤醒等待 timerfd 的进程。在迭代 cancel_list
时,会查询 might_cancel
标志以跳过过时的对象
void timerfd_clock_was_set(void)
{
ktime_t moffs = ktime_mono_to_real(0);
struct timerfd_ctx *ctx;
unsigned long flags;
rcu_read_lock();
list_for_each_entry_rcu(ctx, &cancel_list, clist) {
if (!ctx->might_cancel)
continue;
spin_lock_irqsave(&ctx->wqh.lock, flags);
if (ctx->moffs != moffs) {
ctx->moffs = KTIME_MAX;
ctx->ticks++;
wake_up_locked_poll(&ctx->wqh, EPOLLIN);
}
spin_unlock_irqrestore(&ctx->wqh.lock, flags);
}
rcu_read_unlock();
}
关键点是,因为 RCU 保护的 cancel_list
遍历与对象添加和删除同时发生,所以有时遍历可以访问已从列表中删除的对象。在这个例子中,使用一个标志来跳过这些对象。
总结¶
主要用于读取的基于列表的数据结构,可以容忍过时的数据最适合使用 RCU。最简单的情况是从数据结构中添加或删除条目(或原子地就地修改),但是非原子就地修改可以通过制作副本,更新副本,然后用副本替换原始副本来处理。如果不能容忍过时的数据,那么可以使用 deleted 标志与每个条目的自旋锁结合使用,以便允许搜索函数拒绝新删除的数据。
- 快速测验的答案
为了使 deleted 标志技术有用,为什么有必要在从搜索函数返回时持有每个条目的锁?
如果搜索函数在返回之前释放每个条目的锁,那么无论如何,调用者都将处理过时的数据。如果处理过时的数据确实可以,那么你不需要 deleted 标志。如果处理过时的数据确实是一个问题,那么你需要跨越所有使用返回值的代码持有每个条目的锁。