李峰峰博客

GCD 底层原理 3 - dispatch_sync

2022-11-12

一、概述

dispatch_sync 用于将一个任务同步地提交到指定的调度队列,并等待该任务完成后才返回。调用者线程会被阻塞,直到任务执行完毕。dispatch_sync 在需要确保任务在某个队列上按顺序执行,前一个任务完成之前,后续任务不会执行。

dispatch_sync 特点:

  • 不会开启新线程
    • dispatch_sync 在调用时,不会创建新的线程来执行任务。它会在指定的队列上同步地执行任务。
  • 按照顺序执行
    • 串行队列
      • 在串行队列上,dispatch_sync 提交的任务会按照队列中的顺序依次执行,确保任务之间的顺序性。
    • 并发队列
      • 同一个线程使用 dispatch_sync 往并发队列提交任务,任务会按照提交顺序执行,因为每个调用会阻塞调用线程,直到当前任务完成。
      • 不同线程使用 dispatch_sync 往同一并发队列提交任务,任务的执行顺序是不可预测的,因为并发队列会并行处理任务,具体顺序取决于系统调度。

使用示例:

1
2
3
4
dispatch_queue_t queue = dispatch_queue_create("com.lixkit.serialQueue", DISPATCH_QUEUE_SERIAL);
dispatch_sync(queue, ^{
// ......
});

二、dispatch_sync

dispatch_sync 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
void dispatch_sync(dispatch_queue_t dq, dispatch_block_t work) {
uintptr_t dc_flags = DC_FLAG_BLOCK; // 初始化标志变量 dc_flags,表示任务是一个 Block

// 检查任务是否包含私有数据
if (unlikely(_dispatch_block_has_private_data(work))) {
// 如果任务包含私有数据,调用 _dispatch_sync_block_with_privdata 处理
return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
}

// 如果任务不包含私有数据,调用 _dispatch_sync_f 同步执行任务
_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}

上述逻辑中,先判断传入的 block 是否包含私有数据,当我们通过前面的示例方式调用 dispatch_sync,传入的 block 是不包含私有数据的。当传入的 block 是通过下面方式创建的时候,block 中才会包含私有数据:

1
2
3
dispatch_block_t block = dispatch_block_create(0, ^{
// ......
});

dispatch_block_create 用于创建一个带有附加控制信息的 Block,这些附加信息可以包括优先级、屏障标志、凭证等。例如,dispatch_block_create 的第 1 个参数传 DISPATCH_BLOCK_BARRIER,可以创建一个屏障 Block。在并发队列中,屏障 Block 可以确保在其前面的任务完成后才开始执行,并在其后面的任务开始之前完成。例如:

1
2
3
4
5
6
7
8
9
dispatch_block_t block = dispatch_block_create(DISPATCH_BLOCK_BARRIER, ^{
// 执行一些需要屏障的操作
});

// 提交到队列
dispatch_async(queue, block);

// 可以在需要时取消
dispatch_block_cancel(block);

关于 dispatch_block_create 不是本次源码分析的重点,这里不再深究。也就是说,我们常规使用 dispatch_sync 执行串行、并发队列时,实际会调用 _dispatch_sync_f 函数。

三、_dispatch_sync_f

_dispatch_sync_f 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
uintptr_t dc_flags)
{
_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}

static inline void _dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags) {

// 如果队列的宽度为 1(串行队列的 dq_width 为 1)
if (likely(dq->dq_width == 1)) {
// 1、串行队列执行逻辑
// 对于串行队列,调用 _dispatch_barrier_sync_f 执行任务
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}


// 2、并发队列执行逻辑
// 检查队列的类型是否是 _DISPATCH_LANE_TYPE
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
// 如果队列类型不支持 dispatch_sync,触发崩溃
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}

// 将调度队列转换为 dispatch_lane_t 类型
dispatch_lane_t dl = upcast(dq)._dl;

// 全局并发队列和绑定到非调度线程的队列总是进入慢速路径
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
// 如果无法保留同步宽度,调用 _dispatch_sync_f_slow 处理
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
}

// 如果目标队列存在目标队列,表示递归目标队列
if (unlikely(dq->do_targetq->do_targetq)) {
// 调用 _dispatch_sync_recurse 递归处理
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}

// 开始同步调度任务的内省
_dispatch_introspection_sync_begin(dl);

// 同步调用并完成任务
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}

这里需要分成两部分看:串行队列执行逻辑、并发队列执行逻辑。

在前面分析 dispatch_queue_create 的源码时候就已经知道:

  • 创建串行队列时,dq_width = 1
  • 创建并发队列时,dq_width = 4094

所以对于串行队列,会进入 _dispatch_barrier_sync_f 函数的执行逻辑,并发队列走剩余的逻辑。

四、_dispatch_barrier_sync_f

根据前面源码可知,对于串行队列,会通过 likely(dq->dq_width == 1) 的判断,进入 _dispatch_barrier_sync_f 函数的执行逻辑。

_dispatch_barrier_sync_f 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
_dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}

static inline void _dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags) {
// 获取当前线程的线程 ID
dispatch_tid tid = _dispatch_tid_self();

// 检查队列的类型是否是 _DISPATCH_LANE_TYPE
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
// 如果队列类型不支持 dispatch_sync,触发崩溃
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}

// 将调度队列转换为 dispatch_lane_t 类型
dispatch_lane_t dl = upcast(dq)._dl;

// 尝试获取屏障锁
// 更正确的做法是将刚刚获取屏障锁的线程的 QoS 合并到队列状态中。
// 但是这对于快速路径来说太昂贵了,所以跳过它。
// 所选择的折衷是,如果较低优先级线程上的入队与此快速路径争用,
// 该线程可能会收到无用的覆盖。
//
// 全局并发队列和绑定到非调度线程的队列总是进入慢速路径,
// 参见 DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
// 获取屏障锁失败,调用 _dispatch_sync_f_slow 处理
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
DC_FLAG_BARRIER | dc_flags);
}

// 获取屏障锁成功,继续执行
// 如果目标队列也存在目标队列,表示需要递归目标队列
if (unlikely(dl->do_targetq->do_targetq)) {
// 调用 _dispatch_sync_recurse 递归处理,并返回
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}

// 开始同步调度任务的内省,这一步主要是用于记录日志和调试
_dispatch_introspection_sync_begin(dl);

// 执行任务,并重置 dq_state、唤醒线程
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}

上述核心逻辑如下:

  • 获取屏障锁
    • 屏障锁用于确保在执行任务时,队列中的其他任务不会同时执行。
    • 获取屏障锁成功:意味着当前队列空闲,可以执行任务。
    • 获取屏障锁失败:意味着当前队列非空闲,正在执行其他任务。
      • 此时会执行 _dispatch_sync_f_slow 函数。
  • 递归目标队列(如果有必要)
  • 执行任务,并重置 dq_state、唤醒线程

接下来,通过源码详细看下各部分逻辑。

1、获取屏障锁

获取屏障锁逻辑如下:

1
2
3
4
5
6
// 尝试获取屏障锁
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
// 如果无法获取屏障锁,表示队列非空闲,调用 _dispatch_sync_f_slow 处理,并返回
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
DC_FLAG_BARRIER | dc_flags);
}

获取屏障锁调用的是 _dispatch_queue_try_acquire_barrier_sync 函数:

1
2
3
4
5
static inline bool
_dispatch_queue_try_acquire_barrier_sync(dispatch_queue_class_t dq, uint32_t tid)
{
return _dispatch_queue_try_acquire_barrier_sync_and_suspend(dq._dl, tid, 0);
}

其内部调用的 _dispatch_queue_try_acquire_barrier_sync_and_suspend 函数,内部涉及到一系列宏定义,将该函数完全展开后逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
uint32_t tid, uint64_t suspend_count)
{
// 计算队列的初始状态值,基于队列的宽度
uint64_t init = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);

// 计算新的状态值,组合了多个标志位和信息
uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
_dispatch_lock_value_from_tid(tid) | // 从线程 ID 生成锁值
DISPATCH_QUEUE_UNCONTENDED_SYNC | // 表示同步操作未被争用
(suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL); // 根据挂起计数计算状态增量

uint64_t old_state, new_state;

// 使用原子操作循环尝试更新队列状态
bool _result = false;
// 定义 _p 为指向 dq_state 字段的指针
__typeof__(&(dq)->dq_state) _p = &(dq)->dq_state;
// 使用原子加载操作读取当前字段的值,并保存到 old_state 中
old_state = os_atomic_load(_p, relaxed);

do {

// 从 dq_state 中取出 role
uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
// 检查当前状态是否等于初始状态加上角色信息
if (old_state != (init | role)) {
// 队列非空闲状态,放弃更新并退出 do while 循环。
// 函数返回 false,表示队列是非空闲状态
os_atomic_rmw_loop_give_up(break);
}

// 队列是空闲状态
// 将 role 信息保存到 value 上,生成新的 dq_state
new_state = value | role;

/**
* 尝试将当前字段的值从 old_state 更新为 new_state,如果更新成功,返回 true
* _p 指向 dq_state
*
* 比较:
* 比较 _p 所指向的共享变量的当前值(dq_state)与 old_state 是否相等。
*
* 交换:
* 如果相等,则将 _p 所指向的共享变量的值(dq_state)更新为 new_state,交换操作成功。
* 如果不相等,则将 _p 的当前值(dq_state)写入 old_state,交换操作失败。
*
* 返回结果:
* 返回一个布尔值,表示交换是否成功。
* 如果成功,返回 true,代表队列进入非空闲状态
* 否则,返回 false,并在下一次循环中使用更新后的 old_state 重新尝试。
**/
_result = os_atomic_cmpxchgvw(_p, old_state, new_state, &old_state, acquire);

} while (unlikely(!_result)); // 如果交换未成功,继续循环尝试

return _result; // 返回结果,表示交换是否成功
}

从上述逻辑可知,是利用 dq_state 实现获取屏障锁逻辑,在上一篇 dispatch_queue 源码分析的文章里,已经知道串行队列:

1
2
// DISPATCH_QUEUE_ROLE_BASE_WLH 是 role
dq_state = (DISPATCH_QUEUE_STATE_INIT_VALUE(dq_width) | DISPATCH_QUEUE_ROLE_BASE_WLH) = 0x001ffe2000000000;

所以首次执行 _dispatch_queue_try_acquire_barrier_sync_and_suspend 函数时,一定满足下面条件:

1
old_state == (init | role)

所以会继续执行 dq_state 的赋值逻辑:

1
new_state = value | role;

赋值完成后,_result 值为 true,并且 _dispatch_queue_try_acquire_barrier_sync_and_suspend 函数返回 true,获取屏障锁成功。

这里,我们可以通过一个 Demo 验证下这部分逻辑:

根据上述 Demo 打印结果也可以看出利用 dq_state 实现获取屏障锁逻辑:

  • dq_state 初始值满足 dq_state == (init | role),允许获取屏障锁。
  • 获取屏障锁后,dq_state 被修改(dq_state = value | role),dq_state 不再满 dq_state == (init | role),后续提交的任务将无法再获取屏障锁。
  • 任务执行结束后,dq_state 会重置回初始值,下个任务可以获取屏障锁。

2、递归目标队列

上一步屏障锁如果获取成功,则会继续执行递归目标队列的逻辑,该逻辑如下:

1
2
3
4
5
6
// 如果目标队列也存在目标队列,表示需要递归目标队列
if (unlikely(dl->do_targetq->do_targetq)) {
// 调用 _dispatch_sync_recurse 递归处理,并返回
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}

上述主要逻辑是检查 do_targetq 是否也存在 do_targetq,如果存在,则需要递归队列。

那么,什么情况下 do_targetq 也存在 do_targetq 呢?

比较常见的一个场景是利用 dispatch_set_target_queue 函数设置目标队列,dispatch_set_target_queue 的主要目的是将一个 queue 的任务调度到另一个 queue。当你设置一个 queue 的目标 queue 后,原 queue 中的任务将在目标 queue 中执行,这样可以确保任务的优先级和调度策略与目标 queue 一致。

dispatch_set_target_queue 使用示例如下:

上述示例中,虽然任务是通过 dispatch_async 提交到 concurrentQueue 队列上的,由于设置了 concurrentQueue 的目标队列是 serialQueue,所以任务会在 serialQueue 上执行,最终任务串行执行。

这里调用 _dispatch_sync_recurse 函数内部与后续正常执行任务的逻辑基本一致,这里就不再单独分析。

3、执行任务,并重置 dq_state、唤醒线程

在上一步获取屏障锁成功后,最终会执行 _dispatch_lane_barrier_sync_invoke_and_complete 函数执行任务并重置 dq_state

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
static void
_dispatch_lane_barrier_sync_invoke_and_complete(dispatch_lane_t dq,
void *ctxt, dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
// 执行任务
_dispatch_sync_function_invoke_inline(dq, ctxt, func);

// 完成跟踪项,标记当前任务或事件的完成,主要用来日志记录、调试
_dispatch_trace_item_complete(dc);

// 检测当前队列状态,如果队列末尾存在任务或宽度大于1,则说明需要进行额外处理
if (unlikely(dq->dq_items_tail || dq->dq_width > 1)) {
// 调用 barrier 完成函数以处理当前的队列状态
return _dispatch_lane_barrier_complete(dq, 0, 0);
}

// 常见的状态检查,具体是检查旧状态中是否有需要处理的标志位
// 指定某些状态需要进一步处理,只有 _dispatch_*_barrier_complete() 能够妥善处理这些状态
const uint64_t fail_unlock_mask = DISPATCH_QUEUE_SUSPEND_BITS_MASK |
DISPATCH_QUEUE_ENQUEUED | DISPATCH_QUEUE_DIRTY |
DISPATCH_QUEUE_RECEIVED_OVERRIDE |
DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;

// 用于存储队列的旧状态和新状态
uint64_t old_state, new_state;
// 用于唤醒的标志位
dispatch_wakeup_flags_t flags = 0;

// 使用原子操作来更新队列状态,确保在多线程环境下的安全性
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
// 更新新状态,表示从旧状态中去掉串行排水拥有的信息
new_state = old_state - DISPATCH_QUEUE_SERIAL_DRAIN_OWNED;
// 清除解除锁定的标志位
new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
// 清除最大优先级的标志位
new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;

// 检查旧状态是否存在需要特殊处理的标志位
if (unlikely(old_state & fail_unlock_mask)) {
// 如果存在这样的标志,则放弃原子操作并调用完成函数,进行后期处理
os_atomic_rmw_loop_give_up({
return _dispatch_lane_barrier_complete(dq, 0, flags);
});
}
});

// 检查队列的状态是基于工作环的状态,确保事件循环没有被当前线程持有
if (_dq_state_is_base_wlh(old_state)) {
_dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
}
}

执行任务调用的是 _dispatch_sync_function_invoke_inline 函数:

1
2
3
4
5
6
7
8
9
10
11
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_thread_frame_s dtf;
_dispatch_thread_frame_push(&dtf, dq);
// 执行任务
_dispatch_client_callout(ctxt, func);
_dispatch_perfmon_workitem_inc();
_dispatch_thread_frame_pop(&dtf);
}

上述执行任务主要是通过执行 _dispatch_client_callout 函数,关于 _dispatch_client_callout 函数,在另一篇文章《GCD 底层原理 1 - dispatch_once》 中有总结,其核心逻辑就是执行具体任务。

_dispatch_sync_function_invoke_inline 中其他的逻辑是线程调度相关,这部分不是本次分析重点,暂时忽略这部分逻辑。

_dispatch_lane_barrier_sync_invoke_and_complete 除了任务的执行外,还有这两部分逻辑:

  • 重置 dq_state
    • 前面已经提到,任务执行完成后,会重置 dq_state,使后续往队列新提交的任务可以成功获取屏障锁。
  • 唤醒线程
    • 如果 dispatch_sync 往串行队列中提交的任务还没有执行完成,再次使用 dispatch_sync 往相同队列提交新任务时,会获取屏障锁失败走 _dispatch_sync_f_slow 的逻辑,_dispatch_sync_f_slow 中会使线程阻塞等待上个任务的完成。当上个任务执行完成后,会唤醒阻塞的线程继续执行任务。

在分析唤醒线程的逻辑前,我们先分析下 _dispatch_sync_f_slow 中阻塞线程的逻辑。

五、_dispatch_sync_f_slow

在前面获取屏障锁失败后,会执行 _dispatch_sync_f_slow 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
// 获取顶层和当前调度队列
dispatch_queue_t top_dq = top_dqu._dq;
dispatch_queue_t dq = dqu._dq;

// 如果当前队列没有目标队列,直接调用同步函数执行
if (unlikely(!dq->do_targetq)) {
return _dispatch_sync_function_invoke(dq, ctxt, func);
}

// 获取当前线程的优先级
pthread_priority_t pp = _dispatch_get_priority();

// 初始化同步上下文结构体
struct dispatch_sync_context_s dsc = {
.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags, // 设置标志位,表示当前是同步等待者
.dc_func = _dispatch_async_and_wait_invoke, // 设置要调用的函数
.dc_ctxt = &dsc, // 将自身的上下文传递
.dc_other = top_dq, // 保存顶层队列
.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG, // 设置优先级
.dc_voucher = _voucher_get(), // 获取当前的凭证
.dsc_func = func, // 设置传入的函数
.dsc_ctxt = ctxt, // 设置传入的上下文
.dsc_waiter = _dispatch_tid_self(), // 获取当前线程的 ID
};

// 用于日志、调试
_dispatch_trace_item_push(top_dq, &dsc);

// 阻塞线程,等待队列的所有权
__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

// 检查 dsc_func 是否被清空,如果为 NULL,表示块在其他线程上运行
if (dsc.dsc_func == NULL) {
// dsc_func 被清空意味着这个块在其他线程上运行
dispatch_queue_t stop_dq = dsc.dc_other; // 获取停止队列
return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags); // 完成同步操作
}

// 开始同步的内省过程(用于调试)
_dispatch_introspection_sync_begin(top_dq);

// 用于日志、调试
_dispatch_trace_item_pop(top_dq, &dsc);

// 执行任务,并重置 dq_state、唤醒线程
_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func, top_dc_flags
DISPATCH_TRACE_ARG(&dsc));
}

该函数核心逻辑如下:

  • 阻塞线程,等待上个任务执行结束
    • 执行 __DISPATCH_WAIT_FOR_QUEUE__ 函数
  • 解除阻塞后,执行当前任务,并重置 dq_state、唤醒线程
    • 执行 _dispatch_sync_invoke_and_complete_recurse 函数

1、阻塞线程

_dispatch_sync_f_slow 中,阻塞线程是通过调用 __DISPATCH_WAIT_FOR_QUEUE__ 函数实现的,该函数源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
// 获取当前队列的状态
// dq_state = dq_state | DISPATCH_QUEUE_RECEIVED_SYNC_WAIT;
uint64_t dq_state = _dispatch_wait_prepare(dq);

// 检查当前队列是否已被调用线程拥有
if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}

// 提交到主线程的块必须在主线程上运行,
// dispatch_async_and_wait 也在远程上下文中执行,而不是当前线程。
// 对于这两种情况,我们需要保存帧链接,以便于 _dispatch_async_and_wait_invoke 使用
_dispatch_thread_frame_save_state(&dsc->dsc_dtf);

if (_dq_state_is_suspended(dq_state) ||
_dq_state_is_base_anon(dq_state)) {
dsc->dc_data = DISPATCH_WLH_ANON;
} else if (_dq_state_is_base_wlh(dq_state)) {
dsc->dc_data = (dispatch_wlh_t)dq;
} else {
_dispatch_wait_compute_wlh(upcast(dq)._dl, dsc);
}

if (dsc->dc_data == DISPATCH_WLH_ANON) {
dsc->dsc_override_qos_floor = dsc->dsc_override_qos =
(uint8_t)_dispatch_get_basepri_override_qos_floor(); // 获取基础优先级
_dispatch_thread_event_init(&dsc->dsc_event); // 初始化线程事件
}

// 设置当前的同步上下文
_dispatch_set_current_dsc((void *) dsc);

// 将当前上下文推入队列,设置优先级
dx_push(dq, dsc, _dispatch_qos_from_pp(dsc->dc_priority));

// 记录运行时事件
_dispatch_trace_runtime_event(sync_wait, dq, 0);

if (dsc->dc_data == DISPATCH_WLH_ANON) {
// 阻塞线程
_dispatch_thread_event_wait(&dsc->dsc_event);
} else if (!dsc->dsc_wlh_self_wakeup) {
// 阻塞线程
_dispatch_event_loop_wait_for_ownership(dsc);
}

// 清除当前的同步上下文
_dispatch_clear_current_dsc();

if (dsc->dc_data == DISPATCH_WLH_ANON) {
_dispatch_thread_event_destroy(&dsc->dsc_event);
// 如果 _dispatch_sync_waiter_wake() 给了当前线程一个优先级覆盖,
// 确保根队列看到它。
if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) {
_dispatch_set_basepri_override_qos(dsc->dsc_override_qos);
}
}
}

上述逻辑中,先看这个判断:

1
2
3
4
5
6
7
8
if (_dq_state_is_suspended(dq_state) ||
_dq_state_is_base_anon(dq_state)) {
dsc->dc_data = DISPATCH_WLH_ANON;
} else if (_dq_state_is_base_wlh(dq_state)) {
dsc->dc_data = (dispatch_wlh_t)dq;
} else {
_dispatch_wait_compute_wlh(upcast(dq)._dl, dsc);
}

上述判断条件涉及的函数源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static inline bool
_dq_state_is_suspended(uint64_t dq_state)
{
return dq_state & DISPATCH_QUEUE_SUSPEND_BITS_MASK;
}

static inline bool
_dq_state_is_base_anon(uint64_t dq_state)
{
return dq_state & DISPATCH_QUEUE_ROLE_BASE_ANON;
}

static inline bool
_dq_state_is_base_wlh(uint64_t dq_state)
{
return dq_state & DISPATCH_QUEUE_ROLE_BASE_WLH;
}

根据前面的源码分析可以知道,dq_state 是不包含 DISPATCH_QUEUE_SUSPEND_BITS_MASK 信息的,所以 _dq_state_is_suspended 的判断返回 false

再根据另一篇文章 《GCD 底层原理 2 - dispatch_queue》 中分析得到的结论:

  • 串行队列,role = DISPATCH_QUEUE_ROLE_BASE_WLH;
  • 并发队列时,role = DISPATCH_QUEUE_ROLE_BASE_ANON;

所以对于串行队列,会走进第二个 if 分支里。

所以对于后面的这个判断:

1
2
3
4
5
6
7
if (dsc->dc_data == DISPATCH_WLH_ANON) {
// 阻塞线程
_dispatch_thread_event_wait(&dsc->dsc_event);
} else if (!dsc->dsc_wlh_self_wakeup) {
// 阻塞线程
_dispatch_event_loop_wait_for_ownership(dsc);
}

将执行 _dispatch_event_loop_wait_for_ownership 函数实现阻塞逻辑,该函数的完整源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
void _dispatch_event_loop_wait_for_ownership(dispatch_sync_context_t dsc) {
#if DISPATCH_USE_KEVENT_WORKLOOP
// 从同步上下文中获取工作循环句柄
dispatch_wlh_t wlh = dsc->dc_data;
// 定义事件结构数组,用于存储待处理事件
dispatch_kevent_s ke[2];
// 设置事件标志
// 注意,这里设置了 KEVENT_FLAG_IMMEDIATE
uint32_t kev_flags = KEVENT_FLAG_IMMEDIATE | KEVENT_FLAG_ERROR_EVENTS;
uint64_t dq_state; // 队列状态变量
int i, n = 0; // 计数器变量

// 原子加载队列的状态
dq_state = os_atomic_load2o((dispatch_queue_t)wlh, dq_state, relaxed);

/**
检查队列状态,并创建 dispatch_kevent_t

_dq_state_drain_locked 函数:
是否满足 dq_state & DLOCK_OWNER_MASK != 0

_dq_state_is_enqueued_on_target 函数:
是否满足 dq_state & DISPATCH_QUEUE_ENQUEUED = true
*/
if (!_dq_state_drain_locked(dq_state) && _dq_state_is_enqueued_on_target(dq_state)) {
// 此时,如果入队操作与服务线程正在处理同一项目发生竞争,可能导致队列被错误标记
// 为已入队而实际是空队列。这是一个潜在的优先级反转情况。
//
// 如果线程请求被选择来处理事件,但未能进入用户空间进行锁定,
// 因此,每个同步等待者都必须等待服务线程消耗事件请求。
//
// 为避免优先级反转,这里需要驱动一次以确保处理事件。
_dispatch_kq_fill_workloop_event(&ke[n++], DISPATCH_WORKLOOP_ASYNC, wlh, dq_state);
} else if (_dq_state_received_sync_wait(dq_state)) { // dq_state & DISPATCH_QUEUE_RECEIVED_SYNC_WAIT ?
// 如果队列处于同步等待状态,则填充同步等待事件
_dispatch_kq_fill_workloop_sync_event(&ke[n++], DISPATCH_WORKLOOP_SYNC_DISCOVER, wlh, dq_state, _dq_state_drain_owner(dq_state));
}

again:
// 填充同步等待事件,并准备进行等待
_dispatch_kq_fill_workloop_sync_event(&ke[n++], DISPATCH_WORKLOOP_SYNC_WAIT, wlh, dq_state, dsc->dsc_waiter);
// 查询处理事件
n = _dispatch_kq_poll(wlh, ke, n, ke, n, NULL, NULL, kev_flags);

// 处理返回的事件
for (i = 0; i < n; i++) {
long flags = 0;
if (ke[i].fflags & NOTE_WL_SYNC_WAIT) {
flags = DISPATCH_KEVENT_WORKLOOP_ALLOW_EINTR; // 如果是同步等待事件,允许中断
}
_dispatch_kevent_workloop_drain_error(&ke[i], flags); // 检查并处理错误
}

// 如果有事件被处理
if (n) {
dispatch_assert(n == 1 && (ke[0].fflags & NOTE_WL_SYNC_WAIT)); // 确保只有一个待处理事件
_dispatch_kevent_wlh_debug("restarting", &ke[0]); // 调试用,记录重启情况
dq_state = ke[0].ext[EV_EXTIDX_WL_VALUE]; // 更新队列状态
n = 0;
goto again;
}
#endif

// 检查需要取消的状态
if (dsc->dsc_waiter_needs_cancel) {
_dispatch_event_loop_cancel_waiter(dsc); // 取消等待者
dsc->dsc_waiter_needs_cancel = false; // 重置状态
}
// 如果有存储资源需要释放
if (dsc->dsc_release_storage) {
_dispatch_queue_release_storage(dsc->dc_data); // 释放队列存储
}
}

根据源码逻辑中各个判断条件,去除执行不到的逻辑分支,仅保留核心逻辑,精简后的 _dispatch_event_loop_wait_for_ownership 函数逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void _dispatch_event_loop_wait_for_ownership(dispatch_sync_context_t dsc) {

// ......

dispatch_kevent_s ke[2];
// 设置事件标志
// 注意,这里设置了 KEVENT_FLAG_IMMEDIATE
uint32_t kev_flags = KEVENT_FLAG_IMMEDIATE | KEVENT_FLAG_ERROR_EVENTS;

// ...

again:
// 填充同步等待事件,并准备进行等待
_dispatch_kq_fill_workloop_sync_event(&ke[n++], DISPATCH_WORKLOOP_SYNC_WAIT, wlh, dq_state, dsc->dsc_waiter);
// 查询处理事件
n = _dispatch_kq_poll(wlh, ke, n, ke, n, NULL, NULL, kev_flags);


// 如果有事件被处理
if (n) {
// ......
n = 0;
goto again;
}

}

乍一看上去,线程的阻塞是通过不停的 goto again 实现的,其实不然。先进去看下 _dispatch_kq_poll 函数的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
static int
_dispatch_kq_poll(dispatch_wlh_t wlh, dispatch_kevent_t ke, int n,
dispatch_kevent_t ke_out, int n_out, void *buf, size_t *avail,
uint32_t flags)
{
// 用于标识 kqueue 是否已初始化
bool kq_initialized = false;
int r = 0;

// 使用 dispatch_once_f 确保 _dispatch_kq_init 仅被执行一次,用于初始化 kqueue
dispatch_once_f(&_dispatch_kq_poll_pred, &kq_initialized, _dispatch_kq_init);
if (unlikely(kq_initialized)) {
// 如果当前线程是进行初始化的线程,事件循环需要内存压力源和调试通道,但创建这些会递归调用 _dispatch_kq_poll(),因此不能在 dispatch once 内进行初始化
_dispatch_memorypressure_init();
_voucher_activity_debug_channel_init();
}

#if !DISPATCH_USE_KEVENT_QOS
if (flags & KEVENT_FLAG_ERROR_EVENTS) {
// 如果不使用 kevent_qos 并且标志包含 KEVENT_FLAG_ERROR_EVENTS
// 模拟 KEVENT_FLAG_ERROR_EVENTS
for (r = 0; r < n; r++) {
ke[r].flags |= EV_RECEIPT; // 添加 EV_RECEIPT 标志用于接收错误事件的回执
}
n_out = n; // 设置输出事件数量
}
#endif

retry:
if (unlikely(wlh == NULL)) {
// 如果工作循环句柄 wlh 为 NULL,触发内部崩溃,因为这是无效的
DISPATCH_INTERNAL_CRASH(wlh, "Invalid wlh");
} else if (wlh == DISPATCH_WLH_ANON) {
// 获取 kqueue 文件描述符
int kqfd = _dispatch_kq_fd();
#if DISPATCH_USE_KEVENT_QOS
if (_dispatch_kevent_workqueue_enabled) {
flags |= KEVENT_FLAG_WORKQ; // 如果启用了工作队列,添加 KEVENT_FLAG_WORKQ 标志
}
// 调用 kevent_qos 进行事件轮询,处理 QoS(服务质量)相关的事件
r = kevent_qos(kqfd, ke, n, ke_out, n_out, buf, avail, flags);
#else
(void)buf;
(void)avail;
const struct timespec timeout_immediately = {}, *timeout = NULL;
if (flags & KEVENT_FLAG_IMMEDIATE) timeout = &timeout_immediately;
// 调用 kevent 进行事件轮询
r = kevent(kqfd, ke, n, ke_out, n_out, timeout);
#endif
#if DISPATCH_USE_KEVENT_WORKLOOP
} else {
// 如果使用工作循环
flags |= KEVENT_FLAG_WORKLOOP; // 添加 KEVENT_FLAG_WORKLOOP 标志
if (!(flags & KEVENT_FLAG_ERROR_EVENTS)) {
flags |= KEVENT_FLAG_DYNAMIC_KQ_MUST_EXIST; // 如果未设置错误事件标志,添加动态 kqueue 必须存在标志
}
// 使用 kevent_id 进行工作循环事件的轮询
r = kevent_id((uintptr_t)wlh, ke, n, ke_out, n_out, buf, avail, flags);
#endif // DISPATCH_USE_KEVENT_WORKLOOP
}

// kevent、kevent_id 一般在发生错误的时候返回 -1
if (unlikely(r == -1)) {
// 如果 kevent 调用返回 -1,则根据 errno 进行错误处理
int err = errno;
switch (err) {
case ENOMEM:
_dispatch_temporary_resource_shortage(); // 处理临时资源短缺
/* FALLTHROUGH */
case EINTR:
goto retry; // 如果被信号中断,重试操作
case EBADF:
DISPATCH_CLIENT_CRASH(err, "Do not close random Unix descriptors"); // 文件描述符错误,导致客户端崩溃
#if DISPATCH_USE_KEVENT_WORKLOOP
case ENOENT:
if ((flags & KEVENT_FLAG_ERROR_EVENTS) &&
(flags & KEVENT_FLAG_DYNAMIC_KQ_MUST_EXIST)) {
return 0; // 在某些情况下,忽略错误并返回 0
}
/* FALLTHROUGH */
#endif // DISPATCH_USE_KEVENT_WORKLOOP
default:
DISPATCH_CLIENT_CRASH(err, "Unexpected error from kevent"); // 其他错误导致客户端崩溃
}
}
return r; // 返回处理的事件数量或状态
}

根据上述源码可知,_dispatch_kq_poll 内部核心逻辑是对 kevent/kevent_id 的调用。根据内部判断条件可以看出对于串行队列,会调用 kevent_id 函数。_dispatch_kq_poll 返回值实际上就是 kevent/kevent_id 函数的返回值。

kevent 和 kevent_id 是什么呢?
kevent 通常与 kqueue 结合使用,它们提供了一种高效的方式来监控事件的发生。

  • kqueue
    • 事件通知接口,它允许应用程序监控多个事件源,并在事件发生时获取通知。
    • 用内核级的事件通知机制,减少了用户空间和内核空间之间的上下文切换,提高了性能。
  • kevent
    • 是一个系统调用,通过 kevent() 系统调用,将事件注册到 kqueue 中。
    • 使用 kevent() 系统调用,可以等待事件发生并获取已触发的事件列表。

kevent 函数定义如下:

1
2
3
4
int kevent(int kq,
const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents,
const struct timespec *timeout);

各参数含义如下:

  • kq
    • kqueue 描述符,可以使用 kqueue() 系统调用获取 kq,用于标识一个事件队列。
  • changelist
    • 指定要添加、修改或删除的事件。
  • nchanges
    • changelist 数组中的元素数量。
  • eventlist
    • 用于接收已经发生的事件。调用成功返回后,这个数组包含了触发的事件信息。
  • nevents
    • eventlist 数组的大小,指定可以返回的最大事件数量。
  • timeout
    • 指定等待事件发生的超时时间。如果为 NULLkevent() 系统调用将阻塞线程直到监听的事件发生。

kevent 返回一个正整数,表示已发生的事件的数量。kevent_idkevent 核心功能是一样的,kevent_idkevent 的变体,提供了更细粒度的事件管理。可以在 Apple 开源的 XNU 源码中看到 kevent/kevent_id 实现逻辑,链接:apple-oss-distributions/xnu

总结来说,kevent/kevent_id 是一个系统调用,可以用来监听注册的事件的发生,其返回了已发生的事件的数量。

但是,从 _dispatch_event_loop_wait_for_ownershipkev_flags 的配置:

1
2
3
// 设置事件标志
// 注意,这里设置了 KEVENT_FLAG_IMMEDIATE
uint32_t kev_flags = KEVENT_FLAG_IMMEDIATE | KEVENT_FLAG_ERROR_EVENTS;

这里有两个配置:

  • KEVENT_FLAG_IMMEDIATE
    • 使 kevent_id/kevent 是非阻塞的,即调用立刻返回,不用等到事件发生时再返回,所以增加了该配置后会使调用不阻塞线程。
  • KEVENT_FLAG_ERROR_EVENTS
    • 即使发生错误也要返回事件,如果在处理 changelist 时遇到错误,会将错误信息作为一个事件返回给用户,而不是直接返回错误码。

所以,从表面上看,调用 kevent_id 函数的时候,是非阻塞的,同时也说明了 _dispatch_kq_poll 函数是非阻塞的。kevent_id 返回了本次调用监听到的发生的事件的数量(含错误事件数量),由于 kevent_id 函数的调用是非阻塞的,所以根本没有机会等到监听的事件的发生,也就是说 _dispatch_kq_poll 正常情况下返回都是 0。

_dispatch_event_loop_wait_for_ownership 函数的实现中,只有 _dispatch_kq_poll 返回值大于 0 时,才会 goto again 逻辑。所以,_dispatch_event_loop_wait_for_ownership 对线程的阻塞,不是靠不停 goto again 实现的。

那么问题就来了,既然不会不停地 goto again,看起来,_dispatch_event_loop_wait_for_ownership 对线程的阻塞,只能是靠 kevent_id 函数了。但是 kevent_id 不是配置了 KEVENT_FLAG_IMMEDIATE 之后,就变成了非阻塞的了吗?

kevent_id 到底会不会阻塞线程呢?
忽略其他参数配置,单纯从 kevent_id 传入 KEVENT_FLAG_IMMEDIATE 这个参数看,kevent_id 确实不会阻塞线程。但是,需要再看下传入的 dispatch_kevent_s 信息:

1
_dispatch_kq_fill_workloop_sync_event(&ke[n++], DISPATCH_WORKLOOP_SYNC_WAIT, wlh, dq_state, dsc->dsc_waiter);

这里 _dispatch_kq_fill_workloop_sync_event 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
static void
_dispatch_kq_fill_workloop_sync_event(dispatch_kevent_t ke, int which,
dispatch_wlh_t wlh, uint64_t dq_state, dispatch_tid tid)
{
dispatch_queue_t dq = (dispatch_queue_t)wlh;
pthread_priority_t pp = 0;
uint32_t fflags = 0;
uint64_t mask = 0;
uint16_t action = 0;

switch (which) {
case DISPATCH_WORKLOOP_SYNC_DISCOVER:
dispatch_assert(_dq_state_received_sync_wait(dq_state));
dispatch_assert(_dq_state_in_uncontended_sync(dq_state));
action = EV_ADD | EV_DISABLE;
fflags = NOTE_WL_SYNC_WAKE | NOTE_WL_DISCOVER_OWNER |
NOTE_WL_IGNORE_ESTALE;
mask = DISPATCH_QUEUE_ROLE_MASK | DISPATCH_QUEUE_RECEIVED_SYNC_WAIT |
DISPATCH_QUEUE_UNCONTENDED_SYNC;
break;

case DISPATCH_WORKLOOP_SYNC_WAIT:
action = EV_ADD | EV_DISABLE;
fflags = NOTE_WL_SYNC_WAIT;
pp = _dispatch_get_priority();
if (_dispatch_qos_from_pp(pp) == 0) {
pp = _dispatch_qos_to_pp(DISPATCH_QOS_DEFAULT);
}
break;

case DISPATCH_WORKLOOP_SYNC_FAKE:
action = EV_ADD | EV_DISABLE;
fflags = NOTE_WL_SYNC_WAKE;
break;

case DISPATCH_WORKLOOP_SYNC_WAKE:
dispatch_assert(_dq_state_drain_locked_by(dq_state, tid));
action = EV_ADD | EV_DISABLE;
fflags = NOTE_WL_SYNC_WAKE | NOTE_WL_DISCOVER_OWNER;
break;

case DISPATCH_WORKLOOP_SYNC_END:
action = EV_DELETE | EV_ENABLE;
fflags = NOTE_WL_SYNC_WAKE | NOTE_WL_END_OWNERSHIP;
break;

default:
DISPATCH_INTERNAL_CRASH(which, "Invalid transition");
}

*ke = (dispatch_kevent_s){
.ident = tid,
.filter = EVFILT_WORKLOOP,
.flags = action,
.fflags = fflags,
.udata = (uintptr_t)wlh,
.qos = (__typeof__(ke->qos))pp,

.ext[EV_EXTIDX_WL_MASK] = mask,
.ext[EV_EXTIDX_WL_VALUE] = dq_state,
};
if (fflags & NOTE_WL_DISCOVER_OWNER) {
ke->ext[EV_EXTIDX_WL_ADDR] = (uintptr_t)&dq->dq_state;
}
_dispatch_kevent_wlh_debug(_dispatch_workloop_actions[which], ke);
}

由于传入的 which 参数是 DISPATCH_WORKLOOP_SYNC_WAIT,所以 kefflags 的值是:

1
fflags = NOTE_WL_SYNC_WAIT

NOTE_WL_SYNC_WAIT 不是在 libdispatch 定义的,是在 XNU 中定义的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
* data/hint fflags for EVFILT_WORKLOOP, shared with userspace
*
* The ident for thread requests should be the dynamic ID of the workloop
* The ident for each sync waiter must be unique to that waiter [for this workloop]
*
*
* Commands:
*
* @const NOTE_WL_SYNC_WAIT [in/out]
* This bit is set when the caller is waiting to become the owner of a workloop.
* If the NOTE_WL_SYNC_WAKE bit is already set then the caller is not blocked,
* else it blocks until it is set.
*
* The QoS field of the knote is used to push on other owners or servicers.
*
* @const NOTE_WL_SYNC_WAKE [in/out]
* Marks the waiter knote as being eligible to become an owner
* This bit can only be set once, trying it again will fail with EALREADY.
*
* This modifier doesn't affect NOTE_WL_END_OWNERSHIP.
*/
#define NOTE_WL_SYNC_WAIT 0x00000004
#define NOTE_WL_SYNC_WAKE 0x00000008

NOTE_WL_SYNC_WAIT 对应的,还有个 NOTE_WL_SYNC_WAKE,两者作用如下:

  • NOTE_WL_SYNC_WAIT
    • 这个标志表示调用者正在等待成为工作循环的所有者。
    • 如果设置了这个标志,调用线程会阻塞,直到它被唤醒成为所有者。
    • 可以用来实现线程间的同步,让一个线程等待获得工作循环的所有权。
  • NOTE_WL_SYNC_WAKE
    • 这个标志用于标记等待的 knote 可以成为所有者了。
    • 设置这个标志会唤醒之前因 NOTE_WL_SYNC_WAIT 而阻塞的线程

所以,由于传入的 ke(dispatch_kevent_s) 信息配置了 NOTE_WL_SYNC_WAIT,所以及时调用 kevent_id 时配置了 KEVENT_FLAG_IMMEDIATEkevent_id 仍然会阻塞线程,直到使用 NOTE_WL_SYNC_WAKE 再次唤醒被阻塞的线程。

2、唤醒阻塞的线程

经过上一步的分析,线程的阻塞、唤醒逻辑就比较清晰了。

再回头看下 _dispatch_sync_f_slow 实现逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
// ......

// 阻塞线程,等待队列的所有权
__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

// ......

// 执行任务,并重置 dq_state、唤醒线程
_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func, top_dc_flags
DISPATCH_TRACE_ARG(&dsc)); // 传递追踪参数
}

_dispatch_sync_invoke_and_complete_recurse 函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static void
_dispatch_sync_invoke_and_complete_recurse(dispatch_queue_class_t dq,
void *ctxt, dispatch_function_t func, uintptr_t dc_flags
DISPATCH_TRACE_ARG(void *dc))
{
// 执行任务
_dispatch_sync_function_invoke_inline(dq, ctxt, func);
_dispatch_trace_item_complete(dc);
// 重置 dq_state、唤醒线程
_dispatch_sync_complete_recurse(dq._dq, NULL, dc_flags);
}

static void
_dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq,
uintptr_t dc_flags)
{
bool barrier = (dc_flags & DC_FLAG_BARRIER);
do {
if (dq == stop_dq) return;
if (barrier) {
dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
} else {
_dispatch_lane_non_barrier_complete(upcast(dq)._dl, 0);
}
dq = dq->do_targetq;
barrier = (dq->dq_width == 1);
} while (unlikely(dq->do_targetq));
}

到这里,再回头看下调用 _dispatch_sync_f_slow 函数地方:

1
2
3
4
5
6
// 尝试获取屏障锁
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
// 如果无法获取屏障锁,调用 _dispatch_sync_f_slow 处理
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
DC_FLAG_BARRIER | dc_flags);
}

传入的 dc_flags 实际为:

1
dc_flags = DC_FLAG_BARRIER | dc_flags;

所以,_dispatch_sync_complete_recursebarriertrue,因此会执行:

1
dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);

dx_wakeup 是个宏:

1
2
#define dx_wakeup(x, y, z) dx_vtable(x)->dq_wakeup(x, y, z)
#define dx_vtable(x) (&(x)->do_vtable->_os_obj_vtable)

将宏完全展开:

1
&(dq)->do_vtable->_os_obj_vtable->dq_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE)

到这里,就需要再回头看下另篇文章《GCD 底层原理 2 - dispatch_queue》里的关于串行队列虚表 vtable 的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
const struct dispatch_lane_vtable_s _dispatch_queue_serial_vtable = {
._os_obj_xref_dispose = _dispatch_xref_dispose,
._os_obj_dispose = _dispatch_dispose,
._os_obj_vtable = {
.do_kind = "queue_serial",
.do_type = DISPATCH_QUEUE_SERIAL_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,

.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_push,
},
};

所以,前面 dx_wakeup 的调用,实际调用的是 _dispatch_lane_wakeup 函数,该函数实现源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void
_dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags)
{
dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;

if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
return _dispatch_lane_barrier_complete(dqu, qos, flags);
}
if (_dispatch_queue_class_probe(dqu)) {
target = DISPATCH_QUEUE_WAKEUP_TARGET;
}
return _dispatch_queue_wakeup(dqu, qos, flags, target);
}

经过下面函数执行路径:

1
2
3
4
5
_dispatch_queue_wakeup
⬇️
_dispatch_workloop_barrier_complete
⬇️
_dispatch_event_loop_end_ownership

会调用到 _dispatch_event_loop_end_ownership 函数,_dispatch_event_loop_end_ownership 函数核心逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void
_dispatch_event_loop_end_ownership(dispatch_wlh_t wlh, uint64_t old_state,
uint64_t new_state, uint32_t flags)
{
uint32_t kev_flags = KEVENT_FLAG_IMMEDIATE | KEVENT_FLAG_ERROR_EVENTS;
dispatch_kevent_s ke[2];
int n = 0;

// ......

if (!_dq_state_in_uncontended_sync(old_state)) {
dispatch_tid tid = _dispatch_tid_self();
_dispatch_kq_fill_workloop_sync_event(&ke[n++],
DISPATCH_WORKLOOP_SYNC_END, wlh, new_state, tid);
}

if (_dispatch_kq_poll(wlh, ke, n, ke, n, NULL, NULL, kev_flags)) {
_dispatch_kevent_workloop_drain_error(&ke[0], 0);
__builtin_unreachable();
}

// ......
}

这里又看到了熟悉的逻辑:

  • 调用 _dispatch_kq_fill_workloop_sync_event 创建 ke
  • 调用 _dispatch_kq_poll

这里调用 _dispatch_kq_fill_workloop_sync_event 时,传入的是 DISPATCH_WORKLOOP_SYNC_END,结合前面提到的 _dispatch_kq_fill_workloop_sync_event 源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static void
_dispatch_kq_fill_workloop_sync_event(dispatch_kevent_t ke, int which,
dispatch_wlh_t wlh, uint64_t dq_state, dispatch_tid tid)
{
// ......

switch (which) {

// ......

case DISPATCH_WORKLOOP_SYNC_END:
action = EV_DELETE | EV_ENABLE;
fflags = NOTE_WL_SYNC_WAKE | NOTE_WL_END_OWNERSHIP;
break;

default:
DISPATCH_INTERNAL_CRASH(which, "Invalid transition");
}

*ke = (dispatch_kevent_s){
.ident = tid,
.filter = EVFILT_WORKLOOP,
.flags = action,
.fflags = fflags,
.udata = (uintptr_t)wlh,
.qos = (__typeof__(ke->qos))pp,

.ext[EV_EXTIDX_WL_MASK] = mask,
.ext[EV_EXTIDX_WL_VALUE] = dq_state,
};

// ......
}

可以知道,这里 fflags 配置的是:

1
fflags = NOTE_WL_SYNC_WAKE | NOTE_WL_END_OWNERSHIP;

结合前面 NOTE_WL_SYNC_WAITNOTE_WL_SYNC_WAKE 相关内容可以知道,配置了 NOTE_WL_SYNC_WAKE 之后,在 _dispatch_kq_poll 内部调用的 kevent_id,会使当前唤醒当前被阻塞的线程,开始执行当前任务。

除此之外,在前面获取屏障锁成功之后的逻辑,所调用的 _dispatch_lane_barrier_sync_invoke_and_complete 函数,内部也是基本相同的唤起阻塞线程的逻辑。

可以使用一个简单的 Demo 验证这个流程:

添加一个 Symbolic 断点,当调用到 kevent_id 函数时候,打印出调用堆栈:

编译运行,打印信息如下:

可以看到,打印的堆栈信息,和前面分析得出的结论一致。

六、总结

前面总结了使用 dispatch_sync 往串行队列提交任务的源码执行逻辑,对于 dispatch_sync 往并发队列提交任务的处理流程,将在后续分析 dispatch_async 源码时一起分析。

可以用下图表示上述的源码逻辑:

Tags: 底层