RCU和可卸载模块

[最初发表于 LWN 2007 年 1 月 14 日:http://lwn.net/Articles/217484/]

RCU 更新器有时使用 call_rcu() 来启动对宽限期经过的异步等待。此原语接受一个指向位于 RCU 保护的数据结构中的 rcu_head 结构体的指针,以及另一个指向稍后可能被调用以释放该结构体的函数的指针。然后,从 IRQ 上下文中删除链表中的元素 p 的代码可能如下

list_del_rcu(p);
call_rcu(&p->rcu, p_callback);

由于 call_rcu() 永远不会阻塞,因此可以在 IRQ 上下文中安全地使用此代码。函数 p_callback() 可以定义如下

static void p_callback(struct rcu_head *rp)
{
        struct pstruct *p = container_of(rp, struct pstruct, rcu);

        kfree(p);
}

卸载使用 call_rcu() 的模块

但是,如果 p_callback() 函数是在可卸载模块中定义的呢?

如果我们在某些 RCU 回调挂起时卸载模块,则执行这些回调的 CPU 将在稍后被调用时感到非常失望,如 http://lwn.net/images/ns/kernel/rcu-drop.jpg 中生动地描绘的那样。

我们可以尝试在模块退出代码路径中放置一个 synchronize_rcu(),但这并不足够。虽然 synchronize_rcu() 确实会等待宽限期经过,但它不会等待回调完成。

人们可能会试图尝试几个背靠背的 synchronize_rcu() 调用,但这仍然不能保证有效。如果 RCU 回调负载非常重,那么可能会推迟某些回调,以便允许进行其他处理。例如,在实时内核中需要这种延迟,以避免过度的调度延迟。

rcu_barrier()

这种情况可以通过 rcu_barrier() 原语来处理。 rcu_barrier() 不是等待宽限期经过,而是等待所有未完成的 RCU 回调完成。请注意,rcu_barrier() 意味着 synchronize_rcu(),特别是,如果没有 RCU 回调在任何地方排队,rcu_barrier() 有权立即返回,而不等待任何事情,更不用说宽限期了。

使用 rcu_barrier() 的伪代码如下

  1. 阻止发布任何新的 RCU 回调。

  2. 执行 rcu_barrier()

  3. 允许卸载模块。

还有一个用于 SRCU 的 srcu_barrier() 函数,当然,您必须将 srcu_barrier() 的风格与 call_srcu() 的风格相匹配。如果您的模块使用多个 srcu_struct 结构,那么在卸载该模块时,它也必须使用多个 srcu_barrier() 调用。例如,如果它在 srcu_struct_1 上使用 call_rcu()call_srcu(),并在 srcu_struct_2 上使用 call_srcu(),那么在卸载时将需要以下三行代码

1  rcu_barrier();
2  srcu_barrier(&srcu_struct_1);
3  srcu_barrier(&srcu_struct_2);

如果延迟至关重要,则可以使用工作队列并发运行这三个函数。

rcutorture 模块的旧版本在其退出函数中使用 rcu_barrier(),如下所示

 1  static void
 2  rcu_torture_cleanup(void)
 3  {
 4    int i;
 5
 6    fullstop = 1;
 7    if (shuffler_task != NULL) {
 8      VERBOSE_PRINTK_STRING("Stopping rcu_torture_shuffle task");
 9      kthread_stop(shuffler_task);
10    }
11    shuffler_task = NULL;
12
13    if (writer_task != NULL) {
14      VERBOSE_PRINTK_STRING("Stopping rcu_torture_writer task");
15      kthread_stop(writer_task);
16    }
17    writer_task = NULL;
18
19    if (reader_tasks != NULL) {
20      for (i = 0; i < nrealreaders; i++) {
21        if (reader_tasks[i] != NULL) {
22          VERBOSE_PRINTK_STRING(
23            "Stopping rcu_torture_reader task");
24          kthread_stop(reader_tasks[i]);
25        }
26        reader_tasks[i] = NULL;
27      }
28      kfree(reader_tasks);
29      reader_tasks = NULL;
30    }
31    rcu_torture_current = NULL;
32
33    if (fakewriter_tasks != NULL) {
34      for (i = 0; i < nfakewriters; i++) {
35        if (fakewriter_tasks[i] != NULL) {
36          VERBOSE_PRINTK_STRING(
37            "Stopping rcu_torture_fakewriter task");
38          kthread_stop(fakewriter_tasks[i]);
39        }
40        fakewriter_tasks[i] = NULL;
41      }
42      kfree(fakewriter_tasks);
43      fakewriter_tasks = NULL;
44    }
45
46    if (stats_task != NULL) {
47      VERBOSE_PRINTK_STRING("Stopping rcu_torture_stats task");
48      kthread_stop(stats_task);
49    }
50    stats_task = NULL;
51
52    /* Wait for all RCU callbacks to fire. */
53    rcu_barrier();
54
55    rcu_torture_stats_print(); /* -After- the stats thread is stopped! */
56
57    if (cur_ops->cleanup != NULL)
58      cur_ops->cleanup();
59    if (atomic_read(&n_rcu_torture_error))
60      rcu_torture_print_module_parms("End of test: FAILURE");
61    else
62      rcu_torture_print_module_parms("End of test: SUCCESS");
63  }

第 6 行设置了一个全局变量,以防止任何 RCU 回调重新发布自身。这在大多数情况下是不必要的,因为 RCU 回调很少包括对 call_rcu() 的调用。但是,rcutorture 模块是此规则的一个例外,因此需要设置此全局变量。

第 7-50 行停止了与 rcutorture 模块关联的所有内核任务。因此,一旦执行到达第 53 行,将不会发布更多 rcutorture RCU 回调。第 53 行上的 rcu_barrier() 调用会等待任何预先存在的回调完成。

然后,第 55-62 行打印状态并执行特定于操作的清理,然后返回,允许完成模块卸载操作。

快速测验 #1

还有其他任何情况需要 rcu_barrier() 吗?

快速测验 #1 的答案

您的模块可能存在其他复杂情况。例如,如果您的模块从定时器调用 call_rcu(),您需要首先避免发布新的定时器,取消(或等待)所有已发布的定时器,然后才能调用 rcu_barrier() 以等待任何剩余的 RCU 回调完成。

当然,如果您的模块使用 call_rcu(),您需要在卸载之前调用 rcu_barrier()。同样,如果您的模块使用 call_srcu(),您需要在卸载之前调用 srcu_barrier(),并且在同一个 srcu_struct 结构上调用。如果您的模块使用 call_rcu() call_srcu(),那么(如上所述)您需要调用 rcu_barrier() srcu_barrier()

实现 rcu_barrier()

Dipankar Sarma 对 rcu_barrier() 的实现利用了以下事实:RCU 回调一旦在每个 CPU 队列之一上排队,就永远不会重新排序。他的实现将 RCU 回调排队在每个 CPU 回调队列上,然后等待它们全部开始执行,此时,保证所有较早的 RCU 回调都已完成。

rcu_barrier() 的原始代码大致如下

 1  void rcu_barrier(void)
 2  {
 3    BUG_ON(in_interrupt());
 4    /* Take cpucontrol mutex to protect against CPU hotplug */
 5    mutex_lock(&rcu_barrier_mutex);
 6    init_completion(&rcu_barrier_completion);
 7    atomic_set(&rcu_barrier_cpu_count, 1);
 8    on_each_cpu(rcu_barrier_func, NULL, 0, 1);
 9    if (atomic_dec_and_test(&rcu_barrier_cpu_count))
10      complete(&rcu_barrier_completion);
11    wait_for_completion(&rcu_barrier_completion);
12    mutex_unlock(&rcu_barrier_mutex);
13  }

第 3 行验证调用者是否在进程上下文中,第 5 行和第 12 行使用 rcu_barrier_mutex 来确保一次只有一个 rcu_barrier() 使用全局完成和计数器,这些计数器在第 6 行和第 7 行初始化。第 8 行使每个 CPU 调用 rcu_barrier_func(),如下所示。请注意,on_each_cpu() 的参数列表中的最后一个 “1” 确保对 rcu_barrier_func() 的所有调用在 on_each_cpu() 返回之前完成。第 9 行从 rcu_barrier_cpu_count 中删除初始计数,如果此计数现在为零,则第 10 行完成完成,这会阻止第 11 行阻塞。无论哪种方式,第 11 行都会等待(如果需要)完成。

快速测验 #2

为什么第 8 行不将 rcu_barrier_cpu_count 初始化为零,从而避免对第 9 行和第 10 行的需求?

快速测验 #2 的答案

此代码在 2008 年重写,之后多次重写,但这仍然给出了总体思路。

rcu_barrier_func() 在每个 CPU 上运行,它调用 call_rcu() 以发布 RCU 回调,如下所示

 1  static void rcu_barrier_func(void *notused)
 2  {
 3    int cpu = smp_processor_id();
 4    struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
 5    struct rcu_head *head;
 6
 7    head = &rdp->barrier;
 8    atomic_inc(&rcu_barrier_cpu_count);
 9    call_rcu(head, rcu_barrier_callback);
10  }

第 3 行和第 4 行定位 RCU 的内部每个 CPU rcu_data 结构,该结构包含稍后调用 call_rcu() 所需的 struct rcu_head。第 7 行获取指向此 struct rcu_head 的指针,第 8 行递增全局计数器。此计数器稍后将由回调递减。然后,第 9 行在当前 CPU 的队列上注册 rcu_barrier_callback()。

rcu_barrier_callback() 函数只是原子地递减 rcu_barrier_cpu_count 变量,并在其达到零时完成完成,如下所示

1  static void rcu_barrier_callback(struct rcu_head *notused)
2  {
3    if (atomic_dec_and_test(&rcu_barrier_cpu_count))
4      complete(&rcu_barrier_completion);
5  }
快速测验 #3

如果 CPU 0 的 rcu_barrier_func() 立即执行(因此将 rcu_barrier_cpu_count 递增到值 1),但其他 CPU 的 rcu_barrier_func() 调用延迟了完整的宽限期会发生什么?这是否会导致 rcu_barrier() 过早返回?

快速测验 #3 的答案

当前的 rcu_barrier() 实现更加复杂,这是由于需要避免干扰空闲 CPU(尤其是在电池供电的系统上)以及需要最小程度地干扰实时系统中的非空闲 CPU。此外,还应用了大量的优化。但是,上面的代码说明了这些概念。

rcu_barrier() 摘要

rcu_barrier() 原语的使用频率相对较低,因为大多数使用 RCU 的代码都在核心内核中,而不是在模块中。但是,如果您从可卸载模块中使用 RCU,则需要使用 rcu_barrier(),以便可以安全地卸载您的模块。

快速测验的答案

快速测验 #1

还有其他任何情况需要 rcu_barrier() 吗?

答案

有趣的是,rcu_barrier() 最初不是为模块卸载而实现的。 Nikita Danilov 在文件系统中使用 RCU,这导致了文件系统卸载时的类似情况。 Dipankar Sarma 对 rcu_barrier() 进行了编码以响应,以便 Nikita 可以在文件系统卸载过程中调用它。

很久以后,我自己在实现 rcutorture 时遇到了 RCU 模块卸载问题,并发现 rcu_barrier() 也解决了这个问题。

返回快速测验 #1

快速测验 #2

为什么第 8 行不将 rcu_barrier_cpu_count 初始化为零,从而避免对第 9 行和第 10 行的需求?

答案

假设第 8 行上显示的 on_each_cpu() 函数被延迟,因此 CPU 0 的 rcu_barrier_func() 执行并且相应的宽限期经过,所有这些都在 CPU 1 的 rcu_barrier_func() 开始执行之前。这将导致 rcu_barrier_cpu_count 递减到零,因此第 11 行的 wait_for_completion() 将立即返回,而未能等待 CPU 1 的回调被调用。

请注意,当 rcu_barrier() 代码在 2005 年首次添加时,这不是一个问题。这是因为 on_each_cpu() 禁用了抢占,这充当了 RCU 读取端临界区,从而阻止了 CPU 0 的宽限期完成,直到 on_each_cpu() 处理了所有 CPU。

但是,随着 v4.20 左右的 RCU 风格整合,再次排除了这种可能性,因为整合后的 RCU 再次等待代码的非抢占区域。

但是,额外的计数仍然可能是一个好主意。依赖于这些偶然的实现可能会在实现更改时导致以后的意外错误。

返回快速测验 #2

快速测验 #3

如果 CPU 0 的 rcu_barrier_func() 立即执行(因此将 rcu_barrier_cpu_count 递增到值 1),但其他 CPU 的 rcu_barrier_func() 调用延迟了完整的宽限期会发生什么?这是否会导致 rcu_barrier() 过早返回?

答案

这不可能发生。原因是 on_each_cpu() 的最后一个参数(等待标志)设置为 “1”。此标志通过 smp_call_function() 并进一步传递到 smp_call_function_on_cpu(),导致后者旋转,直到 rcu_barrier_func() 的跨 CPU 调用完成。这本身会阻止宽限期在非 CONFIG_PREEMPTION 内核上完成,因为每个 CPU 必须经过上下文切换(或其他静止状态)才能完成宽限期。但是,这在 CONFIG_PREEMPTION 内核中没有用处。

因此,on_each_cpu() 在其调用 smp_call_function() 期间以及本地调用 rcu_barrier_func() 期间禁用抢占。由于最近的 RCU 实现将禁用抢占的代码区域视为 RCU 读取端临界区,因此这会阻止宽限期完成。这意味着所有 CPU 都已执行 rcu_barrier_func(),然后第一个 rcu_barrier_callback() 才可能执行,进而防止 rcu_barrier_cpu_count 过早达到零。

但是,如果 on_each_cpu() 曾经决定放弃禁用抢占(由于实时延迟考虑,这很可能发生),那么将 rcu_barrier_cpu_count 初始化为 1 将会挽救局面。

返回快速测验 #3