Kotlin之Flow学习

官方:kotlinx.coroutines.flow - kotlinx-coroutines-core

流到底是什么?流是可以实现断断续续的收集数据。

冷流 :只有订阅者订阅时,才开始执行发射数据流的代码。并且冷流和订阅者只能是一对一的关系,当有多个不同的订阅者时,消息是重新完整发送的。也就是说对冷流而言,有多个订阅者的时候,他们各自的事件是独立的。

热流:无论有没有订阅者订阅,事件始终都会发生。当 热流有多个订阅者时,热流与订阅者们的关系是一对多的关系,可以与多个订阅者共享信息。

创建Flow:

1
var f = flow<Int> { emit(1) }

single() : 这个流只能返回一个结果

1
2
3
4
var result = f.map {"笑了"+ it + "下"}
.map {"小王"+ it}
.single()
// 打印结果:result = "小王笑了1下"

collect: 打印每次的emit结果:

1
2
3
4
5
6
var f = flow<Int> {emit(1)emit(2)}
f.map {"笑了"+ it + "下"}
.collect{result->
Log.i("zjc","${result}")
}
// 答应结果: 笑了1下 笑了2下

flowOn: 指定当前流在哪个线程执行,他不会影响下一个流的执行线程:

1
2
3
4
5
6
7
lifecycleScope.launch {
f.map {Log.i("zjc","${Thread.currentThread().name}")}// 子线程
.flowOn(Dispatchers.IO)
.map {Log.i("zjc","${Thread.currentThread().name}")}// 主线程
.flowOn(Dispatchers.Main).collect {
}
}

filter操作符 我们可以对结果集添加过滤条件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

fun loadData1() = flow {
println("进入加载数据的方法")
for (i in 1..3) {
delay(1000)
emit(i)
}
}

runBlocking {
loadData1().filter {
it > 1
}.collect {
println(it)
}
}
// 结果:2 3

buffer操作符,尽可能减少发送数据和处理事件的时间 Kotlin Flow 看这一篇 带你入门~_黄林晴-CSDN博客

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
var startTime: Long = 0
var endTime: Long = 0


fun loadData1() = flow {
startTime = System.currentTimeMillis() / 1000
for (i in 1..3) {
delay(1000)
emit(i)
}
}


fun main() {
runBlocking {
loadData1().collect { value ->
delay(2000)
println("$value")
}
endTime = System.currentTimeMillis() / 1000
println("处理时间:${endTime - startTime}s")
}
}
/**
*1 2 3
*处理时间 9s
*/

现在改成buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
fun main() {
runBlocking {
loadData1().buffer().collect { value ->
delay(2000)
println("$value")
}
endTime = System.currentTimeMillis() / 1000
println("处理时间:${endTime - startTime}s")
}
}
/**
*1 2 3
*处理时间 8s
*/