RCU 和可卸载模块

[最初发表于 LWN 2007 年 1 月 14 日: http://lwn.net/Articles/217484/]

RCU 更新器有时会使用 call_rcu() 来启动一个异步等待,等待宽限期过去。这个原语接受一个指向 RCU 保护数据结构内的 rcu_head 结构的指针,以及另一个指向一个函数的指针,该函数可能会稍后被调用来释放该结构。从 IRQ 上下文中删除链表中的元素 p 的代码可能如下所示

list_del_rcu(p);
call_rcu(&p->rcu, p_callback);

由于 call_rcu() 永远不会阻塞,所以这段代码可以在 IRQ 上下文中安全使用。函数 p_callback() 的定义可能如下所示

static void p_callback(struct rcu_head *rp)
{
        struct pstruct *p = container_of(rp, struct pstruct, rcu);

        kfree(p);
}

卸载使用 call_rcu() 的模块

但是,如果 p_callback() 函数是在一个可卸载模块中定义的呢?

如果我们在某些 RCU 回调仍在等待时卸载了模块,那么执行这些回调的 CPU 稍后在调用时将会非常失望,正如 http://lwn.net/images/ns/kernel/rcu-drop.jpg 中形象地描绘的那样。

我们可以尝试在模块退出代码路径中放置一个 synchronize_rcu(),但这还不够。虽然 synchronize_rcu() 会等待宽限期过去,但它不会等待回调完成。

人们可能会想尝试几个背靠背的 synchronize_rcu() 调用,但这仍然不能保证工作。如果 RCU 回调负载非常重,那么一些回调可能会被延迟,以便允许其他处理继续进行。仅举一个例子,实时内核中需要这种延迟,以避免过度的调度延迟。

rcu_barrier()

这种情况可以通过 rcu_barrier() 原语来处理。 rcu_barrier() 不是等待宽限期过去,而是等待所有未完成的 RCU 回调完成。请注意,rcu_barrier() **不** 意味着 synchronize_rcu(),特别是,如果没有在任何地方排队 RCU 回调,rcu_barrier() 有权立即返回,而无需等待任何东西,更不用说等待宽限期。

使用 rcu_barrier() 的伪代码如下

  1. 阻止发布任何新的 RCU 回调。

  2. 执行 rcu_barrier()

  3. 允许卸载模块。

还有一个用于 SRCU 的 srcu_barrier() 函数,您当然必须将 srcu_barrier() 的风格与 call_srcu() 的风格相匹配。如果您的模块使用多个 srcu_struct 结构,那么在卸载该模块时,它还必须使用 srcu_barrier() 的多次调用。例如,如果它在 srcu_struct_1 上使用 call_rcu(),在 srcu_struct_1 上使用 call_srcu(),并在 srcu_struct_2 上使用 call_srcu(),那么在卸载时将需要以下三行代码

1  rcu_barrier();
2  srcu_barrier(&srcu_struct_1);
3  srcu_barrier(&srcu_struct_2);

如果延迟至关重要,则可以使用工作队列并发运行这三个函数。

rcutorture 模块的旧版本在其退出函数中使用了 rcu_barrier(),如下所示

 1  static void
 2  rcu_torture_cleanup(void)
 3  {
 4    int i;
 5
 6    fullstop = 1;
 7    if (shuffler_task != NULL) {
 8      VERBOSE_PRINTK_STRING("Stopping rcu_torture_shuffle task");
 9      kthread_stop(shuffler_task);
10    }
11    shuffler_task = NULL;
12
13    if (writer_task != NULL) {
14      VERBOSE_PRINTK_STRING("Stopping rcu_torture_writer task");
15      kthread_stop(writer_task);
16    }
17    writer_task = NULL;
18
19    if (reader_tasks != NULL) {
20      for (i = 0; i < nrealreaders; i++) {
21        if (reader_tasks[i] != NULL) {
22          VERBOSE_PRINTK_STRING(
23            "Stopping rcu_torture_reader task");
24          kthread_stop(reader_tasks[i]);
25        }
26        reader_tasks[i] = NULL;
27      }
28      kfree(reader_tasks);
29      reader_tasks = NULL;
30    }
31    rcu_torture_current = NULL;
32
33    if (fakewriter_tasks != NULL) {
34      for (i = 0; i < nfakewriters; i++) {
35        if (fakewriter_tasks[i] != NULL) {
36          VERBOSE_PRINTK_STRING(
37            "Stopping rcu_torture_fakewriter task");
38          kthread_stop(fakewriter_tasks[i]);
39        }
40        fakewriter_tasks[i] = NULL;
41      }
42      kfree(fakewriter_tasks);
43      fakewriter_tasks = NULL;
44    }
45
46    if (stats_task != NULL) {
47      VERBOSE_PRINTK_STRING("Stopping rcu_torture_stats task");
48      kthread_stop(stats_task);
49    }
50    stats_task = NULL;
51
52    /* Wait for all RCU callbacks to fire. */
53    rcu_barrier();
54
55    rcu_torture_stats_print(); /* -After- the stats thread is stopped! */
56
57    if (cur_ops->cleanup != NULL)
58      cur_ops->cleanup();
59    if (atomic_read(&n_rcu_torture_error))
60      rcu_torture_print_module_parms("End of test: FAILURE");
61    else
62      rcu_torture_print_module_parms("End of test: SUCCESS");
63  }

第 6 行设置了一个全局变量,以防止任何 RCU 回调重新发布它们自己。在大多数情况下,这没有必要,因为 RCU 回调很少包括对 call_rcu() 的调用。但是,rcutorture 模块是此规则的例外,因此需要设置此全局变量。

第 7-50 行停止了与 rcutorture 模块关联的所有内核任务。因此,一旦执行到达第 53 行,就不会再发布任何 rcutorture RCU 回调。第 53 行的 rcu_barrier() 调用等待任何预先存在的回调完成。

然后,第 55-62 行打印状态并执行特定于操作的清理,然后返回,允许完成模块卸载操作。

快速测验 #1

还有其他需要 rcu_barrier() 的情况吗?

快速测验 #1 的答案

您的模块可能有其他复杂情况。例如,如果您的模块从计时器调用 call_rcu(),则需要首先避免发布新的计时器,取消(或等待)所有已发布的计时器,然后才调用 rcu_barrier() 以等待任何剩余的 RCU 回调完成。

当然,如果您的模块使用 call_rcu(),则需要在卸载之前调用 rcu_barrier()。同样,如果您的模块使用 call_srcu(),则需要在卸载之前在同一个 srcu_struct 结构上调用 srcu_barrier()。如果您的模块使用 call_rcu() **和** call_srcu(),那么(如上所述)您将需要调用 rcu_barrier() **和** srcu_barrier()

实现 rcu_barrier()

Dipankar Sarma 对 rcu_barrier() 的实现利用了 RCU 回调在排队到每个 CPU 队列之一后永远不会被重新排序的事实。他的实现将 RCU 回调排队到每个 CPU 回调队列上,然后等待直到它们全部开始执行,此时,保证所有较早的 RCU 回调都已完成。

rcu_barrier() 的原始代码大致如下

 1  void rcu_barrier(void)
 2  {
 3    BUG_ON(in_interrupt());
 4    /* Take cpucontrol mutex to protect against CPU hotplug */
 5    mutex_lock(&rcu_barrier_mutex);
 6    init_completion(&rcu_barrier_completion);
 7    atomic_set(&rcu_barrier_cpu_count, 1);
 8    on_each_cpu(rcu_barrier_func, NULL, 0, 1);
 9    if (atomic_dec_and_test(&rcu_barrier_cpu_count))
10      complete(&rcu_barrier_completion);
11    wait_for_completion(&rcu_barrier_completion);
12    mutex_unlock(&rcu_barrier_mutex);
13  }

第 3 行验证调用者是否在进程上下文中,第 5 行和第 12 行使用 rcu_barrier_mutex 来确保一次只有一个 rcu_barrier() 正在使用全局完成和计数器,它们在第 6 行和第 7 行初始化。第 8 行导致每个 CPU 调用 rcu_barrier_func(),如下所示。请注意,on_each_cpu() 的参数列表中的最后一个“1”确保在 on_each_cpu() 返回之前,对 rcu_barrier_func() 的所有调用都将完成。第 9 行从 rcu_barrier_cpu_count 中删除初始计数,如果此计数现在为零,则第 10 行将完成最终化,从而防止第 11 行阻塞。无论如何,第 11 行都会等待(如果需要)完成。

快速测验 #2

为什么第 8 行不将 rcu_barrier_cpu_count 初始化为零,从而避免了第 9 行和第 10 行的需要?

快速测验 #2 的答案

此代码在 2008 年进行了重写,此后又进行了多次重写,但这仍然给出了总体思路。

rcu_barrier_func() 在每个 CPU 上运行,它会在其中调用 call_rcu() 以发布 RCU 回调,如下所示

 1  static void rcu_barrier_func(void *notused)
 2  {
 3    int cpu = smp_processor_id();
 4    struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
 5    struct rcu_head *head;
 6
 7    head = &rdp->barrier;
 8    atomic_inc(&rcu_barrier_cpu_count);
 9    call_rcu(head, rcu_barrier_callback);
10  }

第 3 行和第 4 行定位 RCU 内部每个 CPU 的 rcu_data 结构,其中包含 struct rcu_head,该结构是稍后调用 call_rcu() 所必需的。第 7 行获取指向此 struct rcu_head 的指针,第 8 行递增全局计数器。此计数器稍后将由回调递减。然后,第 9 行在当前 CPU 的队列上注册 rcu_barrier_callback()。

rcu_barrier_callback() 函数只是原子地递减 rcu_barrier_cpu_count 变量,并在其达到零时完成最终处理,如下所示

1  static void rcu_barrier_callback(struct rcu_head *notused)
2  {
3    if (atomic_dec_and_test(&rcu_barrier_cpu_count))
4      complete(&rcu_barrier_completion);
5  }
快速测验 #3

如果 CPU 0 的 rcu_barrier_func() 立即执行(因此将 rcu_barrier_cpu_count 递增为 1),但其他 CPU 的 rcu_barrier_func() 调用延迟了一个完整的宽限期,会发生什么情况?这会不会导致 rcu_barrier() 过早返回?

快速测验 #3 的答案

当前的 rcu_barrier() 实现更加复杂,这是由于需要避免干扰空闲 CPU(尤其是在电池供电的系统上)以及需要尽可能少地干扰实时系统中的非空闲 CPU。此外,还应用了许多优化。但是,上面的代码说明了这些概念。

rcu_barrier() 总结

rcu_barrier() 原语的使用频率相对较低,因为大多数使用 RCU 的代码都在核心内核中,而不是在模块中。但是,如果您是从可卸载模块中使用 RCU,则需要使用 rcu_barrier(),以便可以安全地卸载您的模块。

快速测验的答案

快速测验 #1

还有其他需要 rcu_barrier() 的情况吗?

答案

有趣的是,rcu_barrier() 最初不是为模块卸载而实现的。Nikita Danilov 在一个文件系统中使用了 RCU,这导致了文件系统卸载时出现类似的情况。Dipankar Sarma 编写了 rcu_barrier() 作为响应,以便 Nikita 可以在文件系统卸载过程中调用它。

后来,我自己在实现 rcutorture 时遇到了 RCU 模块卸载问题,并发现 rcu_barrier() 也解决了这个问题。

回到快速测验 #1

快速测验 #2

为什么第 8 行不将 rcu_barrier_cpu_count 初始化为零,从而避免了第 9 行和第 10 行的需要?

答案

假设第 8 行显示的 on_each_cpu() 函数被延迟,导致 CPU 0 的 rcu_barrier_func() 执行并且相应的宽限期经过,所有这些都发生在 CPU 1 的 rcu_barrier_func() 开始执行之前。这将导致 rcu_barrier_cpu_count 递减为零,因此第 11 行的 wait_for_completion() 会立即返回,未能等待 CPU 1 的回调被调用。

请注意,当 rcu_barrier() 代码在 2005 年首次添加时,这并不是问题。这是因为 on_each_cpu() 禁用了抢占,这充当了 RCU 读取侧的临界区,从而防止 CPU 0 的宽限期在 on_each_cpu() 处理完所有 CPU 之前完成。但是,随着可抢占 RCU 的出现,rcu_barrier() 不再等待可抢占内核中不可抢占的代码区域,这是新函数 rcu_barrier_sched() 的工作。

但是,随着围绕 v4.20 的 RCU 风格整合,这种可能性再次被排除,因为整合后的 RCU 再次等待不可抢占的代码区域。

尽管如此,额外的计数可能仍然是一个好主意。依赖这些偶然的实现可能会在实现更改时导致以后出现令人惊讶的错误。

回到快速测验 #2

快速测验 #3

如果 CPU 0 的 rcu_barrier_func() 立即执行(因此将 rcu_barrier_cpu_count 递增为 1),但其他 CPU 的 rcu_barrier_func() 调用延迟了一个完整的宽限期,会发生什么情况?这会不会导致 rcu_barrier() 过早返回?

答案

这不可能发生。原因是 on_each_cpu() 的最后一个参数,即等待标志,设置为 “1”。此标志被传递给 smp_call_function() 并进一步传递给 smp_call_function_on_cpu(),导致后者一直自旋直到 rcu_barrier_func() 的跨 CPU 调用完成。这本身将阻止在非 CONFIG_PREEMPTION 内核上完成宽限期,因为每个 CPU 必须经历上下文切换(或其他静止状态)才能完成宽限期。但是,这在 CONFIG_PREEMPTION 内核中是没用的。

因此,on_each_cpu() 在其调用 smp_call_function() 时以及本地调用 rcu_barrier_func() 时禁用抢占。由于最近的 RCU 实现将禁用抢占的代码区域视为 RCU 读取侧的临界区,因此这可以防止宽限期完成。这意味着所有 CPU 都已执行 rcu_barrier_func(),然后第一个 rcu_barrier_callback() 才可能执行,这反过来又防止 rcu_barrier_cpu_count 过早达到零。

但是,如果 on_each_cpu() 决定放弃禁用抢占,这很可能由于实时延迟的考虑而发生,那么将 rcu_barrier_cpu_count 初始化为 1 将会解决问题。

回到快速测验 #3