使用 RCU 保护主要用于读取的链表

RCU 最常见的用途之一是保护主要用于读取的链表(list.h 中的 struct list_head)。这种方法的一个主要优点是所有必需的内存顺序都由列表宏提供。本文档描述了几个基于列表的 RCU 用例。

当持有 rcu_read_lock() 迭代列表时,写入器可能会修改列表。读取器保证看到在获取 rcu_read_lock() 之前添加到列表中的所有元素,并且在释放 rcu_read_unlock() 时仍然在列表上。可能会看到或可能不会看到添加到列表或从列表中删除的元素。如果写入器调用 list_replace_rcu(),则读取器可能会看到旧元素或新元素;他们不会同时看到两者,也不会两者都看不到。

示例 1:主要用于读取的列表:延迟销毁

内核中 RCU 列表的一个广泛使用的用例是在系统中无锁迭代所有进程。task_struct::tasks 表示链接所有进程的列表节点。列表可以与任何列表添加或删除并行遍历。

列表的遍历是使用 for_each_process() 完成的,它由 2 个宏定义

#define next_task(p) \
        list_entry_rcu((p)->tasks.next, struct task_struct, tasks)

#define for_each_process(p) \
        for (p = &init_task ; (p = next_task(p)) != &init_task ; )

遍历所有进程列表的代码通常如下所示

rcu_read_lock();
for_each_process(p) {
        /* Do something with p */
}
rcu_read_unlock();

从任务列表中删除进程的简化和高度内联的代码是

void release_task(struct task_struct *p)
{
        write_lock(&tasklist_lock);
        list_del_rcu(&p->tasks);
        write_unlock(&tasklist_lock);
        call_rcu(&p->rcu, delayed_put_task_struct);
}

当进程退出时,release_task() 通过 tasklist_lock 写入器锁保护下的 __exit_signal() 和 __unhash_process() 调用 list_del_rcu(&p->tasks)list_del_rcu() 调用从所有任务的列表中删除该任务。tasklist_lock 阻止并发的列表添加/删除损坏列表。使用 for_each_process() 的读取器不受 tasklist_lock 的保护。为了防止读取器注意到列表指针中的更改,只有在一个或多个宽限期过去后才会释放 task_struct 对象,借助 call_rcu(),它通过 put_task_struct_rcu_user() 调用。这种延迟销毁确保了遍历列表的任何读取器都将看到有效的 p->tasks.next 指针,并且删除/释放可以与列表的遍历并行发生。这种模式也称为**存在锁**,因为 RCU 在所有现有读取器完成后才调用 delayed_put_task_struct() 回调函数,这保证了有问题的 task_struct 对象将在所有可能具有对该对象引用的 RCU 读取器完成之后才存在。

示例 2:在锁外采取读取端操作:没有就地更新

一些读取器-写入器锁定用例在持有读取端锁的同时计算一个值,但在释放该锁后继续使用该值。这些用例通常是转换为 RCU 的好候选者。一个突出的例子涉及网络数据包路由。因为数据包路由数据跟踪计算机外部设备的状态,所以有时会包含过时的数据。因此,一旦计算出路由,就没有必要在数据包传输期间保持路由表静态。毕竟,你可以随意保持路由表静态,但这不会阻止外部 Internet 发生变化,而外部 Internet 的状态才是真正重要的。此外,路由条目通常是添加或删除,而不是就地修改。这是一个光速有限和原子非零大小的罕见例子,实际上有助于使同步更轻量级。

这种类型的 RCU 用例的一个简单示例可以在系统调用审计支持中找到。例如,audit_filter_task() 的读取器-写入器锁定实现可能如下所示

static enum audit_state audit_filter_task(struct task_struct *tsk, char **key)
{
        struct audit_entry *e;
        enum audit_state   state;

        read_lock(&auditsc_lock);
        /* Note: audit_filter_mutex held by caller. */
        list_for_each_entry(e, &audit_tsklist, list) {
                if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
                        if (state == AUDIT_STATE_RECORD)
                                *key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
                        read_unlock(&auditsc_lock);
                        return state;
                }
        }
        read_unlock(&auditsc_lock);
        return AUDIT_BUILD_CONTEXT;
}

这里列表在锁下搜索,但在返回相应的值之前释放锁。在这个值被执行时,列表很可能已经被修改了。这是有道理的,因为如果你正在关闭审计,那么审计一些额外的系统调用是可以的。

这意味着 RCU 可以很容易地应用于读取端,如下所示

static enum audit_state audit_filter_task(struct task_struct *tsk, char **key)
{
        struct audit_entry *e;
        enum audit_state   state;

        rcu_read_lock();
        /* Note: audit_filter_mutex held by caller. */
        list_for_each_entry_rcu(e, &audit_tsklist, list) {
                if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
                        if (state == AUDIT_STATE_RECORD)
                                *key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
                        rcu_read_unlock();
                        return state;
                }
        }
        rcu_read_unlock();
        return AUDIT_BUILD_CONTEXT;
}

read_lock() 和 read_unlock() 调用分别变为 rcu_read_lock()rcu_read_unlock(),并且 list_for_each_entry() 变为 list_for_each_entry_rcu()_rcu() 列表遍历原语添加 READ_ONCE() 和诊断检查,以防止在 RCU 读取端临界区之外的错误使用。

更新端的更改也很简单。读取器-写入器锁可以如下所示用于 audit_del_rule() 和 audit_add_rule() 的简化版本中的删除和插入

static inline int audit_del_rule(struct audit_rule *rule,
                                 struct list_head *list)
{
        struct audit_entry *e;

        write_lock(&auditsc_lock);
        list_for_each_entry(e, list, list) {
                if (!audit_compare_rule(rule, &e->rule)) {
                        list_del(&e->list);
                        write_unlock(&auditsc_lock);
                        return 0;
                }
        }
        write_unlock(&auditsc_lock);
        return -EFAULT;         /* No matching rule */
}

static inline int audit_add_rule(struct audit_entry *entry,
                                 struct list_head *list)
{
        write_lock(&auditsc_lock);
        if (entry->rule.flags & AUDIT_PREPEND) {
                entry->rule.flags &= ~AUDIT_PREPEND;
                list_add(&entry->list, list);
        } else {
                list_add_tail(&entry->list, list);
        }
        write_unlock(&auditsc_lock);
        return 0;
}

以下是这两个函数的 RCU 等效项

static inline int audit_del_rule(struct audit_rule *rule,
                                 struct list_head *list)
{
        struct audit_entry *e;

        /* No need to use the _rcu iterator here, since this is the only
         * deletion routine. */
        list_for_each_entry(e, list, list) {
                if (!audit_compare_rule(rule, &e->rule)) {
                        list_del_rcu(&e->list);
                        call_rcu(&e->rcu, audit_free_rule);
                        return 0;
                }
        }
        return -EFAULT;         /* No matching rule */
}

static inline int audit_add_rule(struct audit_entry *entry,
                                 struct list_head *list)
{
        if (entry->rule.flags & AUDIT_PREPEND) {
                entry->rule.flags &= ~AUDIT_PREPEND;
                list_add_rcu(&entry->list, list);
        } else {
                list_add_tail_rcu(&entry->list, list);
        }
        return 0;
}

通常,write_lock() 和 write_unlock() 将被 spin_lock() 和 spin_unlock() 替换。但在这种情况下,所有调用者都持有 audit_filter_mutex,因此不需要额外的锁定。因此可以消除 auditsc_lock,因为使用 RCU 消除了写入器排除读取器的需要。

list_del()list_add()list_add_tail() 原语已被 list_del_rcu()list_add_rcu()list_add_tail_rcu() 替换。_rcu() 列表操作原语添加了弱序 CPU 上所需的内存屏障。list_del_rcu() 原语省略了指针中毒调试辅助代码,否则会导致并发读取器失败。

因此,当读取器可以容忍过时的数据,并且条目被添加或删除,而没有就地修改时,使用 RCU 非常容易!

示例 3:处理就地更新

系统调用审计代码不会就地更新审计规则。但是,如果它这样做了,那么这样做的读取器-写入器锁定代码可能如下所示(假设只更新了 field_count,否则,将需要填充添加的字段)

static inline int audit_upd_rule(struct audit_rule *rule,
                                 struct list_head *list,
                                 __u32 newaction,
                                 __u32 newfield_count)
{
        struct audit_entry *e;
        struct audit_entry *ne;

        write_lock(&auditsc_lock);
        /* Note: audit_filter_mutex held by caller. */
        list_for_each_entry(e, list, list) {
                if (!audit_compare_rule(rule, &e->rule)) {
                        e->rule.action = newaction;
                        e->rule.field_count = newfield_count;
                        write_unlock(&auditsc_lock);
                        return 0;
                }
        }
        write_unlock(&auditsc_lock);
        return -EFAULT;         /* No matching rule */
}

RCU 版本创建一个副本,更新副本,然后用新更新的条目替换旧条目。这个动作序列,允许并发读取,同时创建一个副本以执行更新,这就是 RCU (读取-复制更新) 名称的由来。

audit_upd_rule() 的 RCU 版本如下

static inline int audit_upd_rule(struct audit_rule *rule,
                                 struct list_head *list,
                                 __u32 newaction,
                                 __u32 newfield_count)
{
        struct audit_entry *e;
        struct audit_entry *ne;

        list_for_each_entry(e, list, list) {
                if (!audit_compare_rule(rule, &e->rule)) {
                        ne = kmalloc(sizeof(*entry), GFP_ATOMIC);
                        if (ne == NULL)
                                return -ENOMEM;
                        audit_copy_rule(&ne->rule, &e->rule);
                        ne->rule.action = newaction;
                        ne->rule.field_count = newfield_count;
                        list_replace_rcu(&e->list, &ne->list);
                        call_rcu(&e->rcu, audit_free_rule);
                        return 0;
                }
        }
        return -EFAULT;         /* No matching rule */
}

同样,这假设调用者持有 audit_filter_mutex。通常,在这种代码中,写入器锁会变成自旋锁。

update_lsm_rule() 做的事情非常相似,对于那些希望查看真正的 Linux 内核代码的人来说。

这种模式的另一个用途可以在 openswitch 驱动程序的 连接跟踪表 代码中的 ct_limit_set() 中找到。该表保存连接跟踪条目,并且对最大条目数有限制。每个区域都有一个这样的表,因此每个区域都有一个 限制。这些区域通过使用 RCU 管理的 hlist 的哈希表映射到它们的限制。当设置新的限制时,将分配一个新的限制对象,并调用 ct_limit_set() 以使用 list_replace_rcu() 替换旧的限制对象。然后使用 kfree_rcu() 在宽限期后释放旧的限制对象。

示例 4:消除过时的数据

上面的审计示例容忍过时的数据,就像大多数跟踪外部状态的算法一样。毕竟,考虑到从外部状态更改到 Linux 意识到更改之间存在延迟,因此如前所述,少量额外的 RCU 引起的过时通常不是问题。

但是,在许多示例中不能容忍过时的数据。Linux 内核中的一个例子是 System V IPC(参见 ipc/shm.c 中的 shm_lock() 函数)。此代码在每个条目的自旋锁下检查 deleted 标志,如果设置了 deleted 标志,则假装该条目不存在。为了使其有用,搜索函数必须返回持有每个条目的自旋锁,正如 shm_lock() 实际上所做的那样。

快速测验

为了使 deleted 标志技术有用,为什么有必要在从搜索函数返回时持有每个条目的锁?

快速测验的答案

如果系统调用审计模块需要拒绝过时的数据,一种方法是在 audit_entry 结构中添加一个 deleted 标志和一个 lock 自旋锁,并按如下方式修改 audit_filter_task()

static struct audit_entry *audit_filter_task(struct task_struct *tsk, char **key)
{
        struct audit_entry *e;
        enum audit_state   state;

        rcu_read_lock();
        list_for_each_entry_rcu(e, &audit_tsklist, list) {
                if (audit_filter_rules(tsk, &e->rule, NULL, &state)) {
                        spin_lock(&e->lock);
                        if (e->deleted) {
                                spin_unlock(&e->lock);
                                rcu_read_unlock();
                                return NULL;
                        }
                        rcu_read_unlock();
                        if (state == AUDIT_STATE_RECORD)
                                *key = kstrdup(e->rule.filterkey, GFP_ATOMIC);
                        /* As long as e->lock is held, e is valid and
                         * its value is not stale */
                        return e;
                }
        }
        rcu_read_unlock();
        return NULL;
}

audit_del_rule() 函数需要在自旋锁下设置 deleted 标志,如下所示

static inline int audit_del_rule(struct audit_rule *rule,
                                 struct list_head *list)
{
        struct audit_entry *e;

        /* No need to use the _rcu iterator here, since this
         * is the only deletion routine. */
        list_for_each_entry(e, list, list) {
                if (!audit_compare_rule(rule, &e->rule)) {
                        spin_lock(&e->lock);
                        list_del_rcu(&e->list);
                        e->deleted = 1;
                        spin_unlock(&e->lock);
                        call_rcu(&e->rcu, audit_free_rule);
                        return 0;
                }
        }
        return -EFAULT;         /* No matching rule */
}

这也假设调用者持有 audit_filter_mutex

请注意,此示例假设条目仅添加和删除。需要额外的机制来正确处理 audit_upd_rule() 执行的就地更新。例如,audit_upd_rule() 需要在执行 list_replace_rcu() 时持有旧的 audit_entry 及其替换项的锁。

示例 5:跳过过时的对象

对于某些用例,可以通过在读取端列表遍历期间跳过过时的对象来提高读取器性能,其中过时的对象是指将在一个或多个宽限期后删除和销毁的对象。一个这样的例子可以在 timerfd 子系统中找到。当重新编程 CLOCK_REALTIME 时钟(例如,由于设置系统时间)时,所有已编程的依赖于该时钟的 timerfds 都会被触发,并且等待它们的进程会在其计划到期之前被唤醒。为了方便这一点,当在 timerfd_setup_cancel() 中设置时,所有这些计时器都会被添加到 RCU 管理的 cancel_list

static void timerfd_setup_cancel(struct timerfd_ctx *ctx, int flags)
{
        spin_lock(&ctx->cancel_lock);
        if ((ctx->clockid == CLOCK_REALTIME ||
             ctx->clockid == CLOCK_REALTIME_ALARM) &&
            (flags & TFD_TIMER_ABSTIME) && (flags & TFD_TIMER_CANCEL_ON_SET)) {
                if (!ctx->might_cancel) {
                        ctx->might_cancel = true;
                        spin_lock(&cancel_lock);
                        list_add_rcu(&ctx->clist, &cancel_list);
                        spin_unlock(&cancel_lock);
                }
        } else {
                __timerfd_remove_cancel(ctx);
        }
        spin_unlock(&ctx->cancel_lock);
}

当释放 timerfd(关闭 fd)时,timerfd 对象的 might_cancel 标志会被清除,对象从 cancel_list 中删除并销毁,如 timerfd_release() 的这个简化和内联版本所示

int timerfd_release(struct inode *inode, struct file *file)
{
        struct timerfd_ctx *ctx = file->private_data;

        spin_lock(&ctx->cancel_lock);
        if (ctx->might_cancel) {
                ctx->might_cancel = false;
                spin_lock(&cancel_lock);
                list_del_rcu(&ctx->clist);
                spin_unlock(&cancel_lock);
        }
        spin_unlock(&ctx->cancel_lock);

        if (isalarm(ctx))
                alarm_cancel(&ctx->t.alarm);
        else
                hrtimer_cancel(&ctx->t.tmr);
        kfree_rcu(ctx, rcu);
        return 0;
}

如果设置了 CLOCK_REALTIME 时钟,例如通过时间服务器,则 hrtimer 框架会调用 timerfd_clock_was_set(),它会遍历 cancel_list 并唤醒等待 timerfd 的进程。在迭代 cancel_list 时,会查询 might_cancel 标志以跳过过时的对象

void timerfd_clock_was_set(void)
{
        ktime_t moffs = ktime_mono_to_real(0);
        struct timerfd_ctx *ctx;
        unsigned long flags;

        rcu_read_lock();
        list_for_each_entry_rcu(ctx, &cancel_list, clist) {
                if (!ctx->might_cancel)
                        continue;
                spin_lock_irqsave(&ctx->wqh.lock, flags);
                if (ctx->moffs != moffs) {
                        ctx->moffs = KTIME_MAX;
                        ctx->ticks++;
                        wake_up_locked_poll(&ctx->wqh, EPOLLIN);
                }
                spin_unlock_irqrestore(&ctx->wqh.lock, flags);
        }
        rcu_read_unlock();
}

关键点是,因为 RCU 保护的 cancel_list 遍历与对象添加和删除同时发生,所以有时遍历可以访问已从列表中删除的对象。在这个例子中,使用一个标志来跳过这些对象。

总结

主要用于读取的基于列表的数据结构,可以容忍过时的数据最适合使用 RCU。最简单的情况是从数据结构中添加或删除条目(或原子地就地修改),但是非原子就地修改可以通过制作副本,更新副本,然后用副本替换原始副本来处理。如果不能容忍过时的数据,那么可以使用 deleted 标志与每个条目的自旋锁结合使用,以便允许搜索函数拒绝新删除的数据。

快速测验的答案

为了使 deleted 标志技术有用,为什么有必要在从搜索函数返回时持有每个条目的锁?

如果搜索函数在返回之前释放每个条目的锁,那么无论如何,调用者都将处理过时的数据。如果处理过时的数据确实可以,那么你不需要 deleted 标志。如果处理过时的数据确实是一个问题,那么你需要跨越所有使用返回值的代码持有每个条目的锁。

返回快速测验