sunxin's Studio.

Kotlin协程实现分析

字数统计: 1.6k阅读时长: 7 min
2020/12/11 Share

Kotlin协程实现分析

  • 协程的创建与启动
  • 协程调度器都有哪些,分别表示什么含义
  • delay既然是IO异步任务,是怎么做到延迟协程的代码向下执行的
  • suspend关键字的作用
  • 并发流程控制 async await
  • 协程挂起与恢复原理逆向剖析
  • 如何让一个普通函数成为一个挂起函数,即让调用方以一个同步的方式拿到调用结果

协程的创建与启动

1
2
3
4
5
6
7
8
GlobalScope.launch(Dispatchers.Unconfined) {
Log.e(TAG, "coroutine is running")
val result1 = request1()
val result2 = request2(result1)
val result3 = request3(result2)

updateUI(result3)
}

启动一个协程可以使用launchasync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public fun CoroutineScope.launch(
// 协程调度器 Dispatchers.Unconfined 当前线程
context: CoroutineContext = EmptyCoroutineContext,
// 协程的启动模式
start: CoroutineStart = CoroutineStart.DEFAULT,
// 传入的闭包代码块
block: suspend CoroutineScope.() -> Unit
): Job {
// 包装一层
val newContext = newCoroutineContext(context)
// 判断启动模式
val coroutine = if (start.isLazy)
// 懒加载
LazyStandaloneCoroutine(newContext, block) else
// 非懒加载
StandaloneCoroutine(newContext, active = true)
// 根据启动模式,执行
coroutine.start(start, coroutine, block)
return coroutine
}

// AbstractCoroutine.kt
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
// 会执行到 CoroutineStart 的invoke 方法
start(block, receiver, this)
}

//CoroutineStart.kt
// block: 待执行代码块 completion: 编译器生成的回调
public operator fun <T> invoke(block: suspend () -> T, receiver: R, completion: Continuation<T>) =
// 判断协程启动模式
when (this) {
// 默认 ,可随时取消的
CoroutineStart.DEFAULT ->block.startCoroutineCancellable(receiver, completion)
// 自动
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
// 不指定
CoroutineStart.UNDISPATCHED ->block.startCoroutineUndispatched(receiver, completion)
// 延迟启动
CoroutineStart.LAZY -> Unit // will start lazily
}

// Cancellable.kt
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellable(Unit)
}

// Dispatched.kt
internal fun <T> Continuation<T>.resumeCancellable(value: T) = when (this) {
is DispatchedContinuation -> resumeCancellable(value)
else -> resume(value)
}

// Continuation 的扩展函数,恢复线程使用,最终会回调执行到resumeWith,然后执行到invokeSuspend
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))

CoroutineContext

可以理解为协程的上下文,是一种key-value数据结构

CoroutineStart

协程启动模式

模式 说明
CoroutineStart.DEFAULT 默认模式,创建即启动,可随时取消
CoroutineStart.LAZY 延迟启动模式,只有调用start方法才能启动
CoroutineStart.ATOMIC 自动模式,创建即启动,启动前不可取消

协程调度器

image.png

模式 说明
Dispatcher.IO 指定协程运行的线程为IO线程
Dispatcher.Main 指定协程运行的线程为主线程
Dispatcher.Default 默认的,启动协程时会启动一个线程
Dispatcher.Undefined 不指定,就是在当前线程执行,协程恢复后运行的线程取决于挂起时的线程

协程挂起与恢复原理

挂起就是这个协程从当前执行他的线程上脱离。

  • 协程挂起的本质: 就是方法的挂起return
  • 协程恢复的本质: callback的回调(Continution)

挂起函数

1
2
3
4
5
suspend fun request1(): String {
delay(2 * 1000) //不会暂停线程,但会暂停当前所在的协程
Log.e(TAG, "request1 work on ${Thread.currentThread().name}")
return "result from request1"
}

将协程suspend方法通过反编译成Java代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
//1. 被标记为suspend的函数会被编译器自动添加 Continuation 参数,实现类是ContinuationImpl
public final Object request1(@NotNull Continuation $completion) {
Object $continuation;

// 这里将 $completion 传入
$continuation = new ContinuationImpl($completion) {
// $FF: synthetic field
Object result;
int label; // 默认 0
Object L$0;

// 2. 当协程恢复的会执行invokeSuspend
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
// 3. 再递归调用自己
return CoroutineScene.this.request1(this);
}
};
}

Object $result = ((<undefinedtype>)$continuation).result;

// 4. 挂起函数标记 CoroutineSingletons.COROUTINE_SUSPENDED
Object var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();

switch(((<undefinedtype>)$continuation).label) {
// 判断label为0
case 0:
ResultKt.throwOnFailure($result);
((<undefinedtype>)$continuation).L$0 = this;
// label赋值为1
((<undefinedtype>)$continuation).label = 1;
// 判断delay的返回值 == var6 也即是CoroutineSingletons.COROUTINE_SUSPENDED 的时候return,执行挂起
// 当delay执行完成之后,会回调continuation的invokeSuspend方法,再次调用
if (DelayKt.delay(2000L, (Continuation)$continuation) == var6) {
// 返回挂起标记,直接return了,这里表示挂起了,下面不再执行
return var6;
}
break;
case 1:
CoroutineScene var7 = (CoroutineScene)((<undefinedtype>)$continuation).L$0;
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}

String var2 = "after delay";
boolean var3 = false;
System.out.println(var2);
return "result from request1";
}

看DelayKt.delay 的返回值得时候,需要找到DelayKt的class文件并反编译成java源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// DelayKt.java
// 传入 Continuation
public static final Object delay(long timeMillis, @NotNull Continuation $completion) {
if (timeMillis <= 0L) {
return Unit.INSTANCE;
} else {
int $i$f$suspendCancellableCoroutine = false;
int var5 = false;
CancellableContinuationImpl cancellable$iv = new CancellableContinuationImpl(IntrinsicsKt.intercepted($completion), 1);
CancellableContinuation cont = (CancellableContinuation)cancellable$iv;
int var8 = false;
getDelay(cont.getContext()).scheduleResumeAfterDelay(timeMillis, cont);
// 看下getResult方法
Object var10000 = cancellable$iv.getResult();
if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
DebugProbesKt.probeCoroutineSuspended($completion);
}

return var10000;
}
}

看下协程时怎么改变方法的状态的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
internal fun getResult(): Any? {
installParentCancellationHandler()
// 如果trySuspend返回true,就返回协程挂起标记
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state
if (state is CompletedExceptionally) throw recoverStackTrace(state.cause, this)
// if the parent job was already cancelled, then throw the corresponding cancellation exception
// otherwise, there is a race is suspendCancellableCoroutine { cont -> ... } does cont.resume(...)
// before the block returns. This getResult would return a result as opposed to cancellation
// exception that should have happened if the continuation is dispatched for execution later.
if (resumeMode == MODE_CANCELLABLE) {
val job = context[Job]
if (job != null && !job.isActive) {
val cause = job.getCancellationException()
cancelResult(state, cause)
throw recoverStackTrace(cause, this)
}
}
return getSuccessfulResult(state)
}

// 方法的状态
private const val UNDECIDED = 0
private const val SUSPENDED = 1
private const val RESUMED = 2

private val _decision = atomic(UNDECIDED)

private fun trySuspend(): Boolean {
// 轮询方法的状态
_decision.loop { decision ->
when (decision) {
//没有决定,第一次肯定返回true
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
RESUMED -> return false
else -> error("Already suspended")
}
}
}

image.png

图示

image.png

协程的应用

如何让一个普通函数成为一个挂起函数,即让调用方以一个同步的方式拿到调用结果

读取assets目录下的文件

以同步的形式拿到返回值

suspendCancellableCoroutine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
object CoroutineScene3 {
suspend fun parseAssetsFile(assetManager: AssetManager, fileName: String): String {
// suspendCoroutine<> { }
return suspendCancellableCoroutine { continuation ->
Thread(Runnable {
val inputStream = assetManager.open(fileName)
val br = BufferedReader(InputStreamReader(inputStream))
var line: String?
var stringBuilder = StringBuilder()

// while ((line=br.readLine())!=null){
//
// }

do {
line = br.readLine()
if (line != null) stringBuilder.append(line) else break
} while (true)

inputStream.close()
br.close()

Thread.sleep(2000)

Log.e("coroutine", "parseassetsfile completed")
continuation.resumeWith(Result.success(stringBuilder.toString()))
}).start()
}
}
}

以同步的方式调用

1
val content = CoroutineScene3.parseAssetsFile(assets, "config.json")

小结

  1. kotlin协程的核心是线程的挂起和恢复,挂起和恢复的核心是 return - callback
  2. suspend本质就是一个提醒,在反编译成java代码后会添加一个回调Continuation
  3. 协程的非阻塞挂起实质上是以非阻塞式的形式写出了阻塞式的代码
  4. 让一个普通函数成为一个挂起函数
CATALOG
  1. 1. Kotlin协程实现分析
  2. 2. 协程的创建与启动
    1. 2.1. CoroutineContext
    2. 2.2. CoroutineStart
  3. 3. 协程调度器
  4. 4. 协程挂起与恢复原理
    1. 4.1. 挂起函数
  5. 5. 协程的应用
    1. 5.1. 读取assets目录下的文件
      1. 5.1.1. suspendCancellableCoroutine
  6. 6. 小结