当某些事件不断地反复发生,使用 Stream
模式比 Future
更好, node.js 中也有 Stream
这个东西,所以熟悉 node.js 的开发者理解 Stream
应该非常简单。
Stream概念
Stream
的概念比较抽象,我们可以将 Stream
看作一条没有水的小溪,每当小溪的上游加一次水后,我们就可以在小溪的下游接一次水。水会经过一段时间从上游流至下游,具体是什么时候加的水,加了多少次,每次需要多久才能流至下游,咱也不知道,咱也不敢问。
我们将加一次水看作一个事件,而这个事件的时间是个不确定的因素,我们不知道事件什么时候发生、什么时候结束,但是我们可以一直在下游等待接水,等待所有的事件完成。整个过程都是异步的,这大概就是 Stream
的概念。
创建 Stream
我们可以通过以下两种方式创建 Stream
:
构造函数
fromFuture
从 Future 创建单订阅流,当 Future 完成时将触发一个 data
或 error
回调函数,然后使用 Down
事件关闭这个流,例如:
void main() {
final request = Future<String>(() {
throw new FormatException('Expected at least 1 section');
});
final streamRequest = Stream.fromFuture(request);
streamRequest.listen((data) => print(data), onError: (e) => print(e));
print(streamRequest);// output: Instance of '_ControllerStream<String>'
// output: FormatException: Expected at least 1 section
}
fromFutures
从一组 Future 创建新的单订阅流,每个 Future 完成时将触发自己的 data
或 error
回调函数,当所有 Future 完成后,订阅流将会关闭,如果 Futures 为空,订阅流则会立即关闭,例如:
void main() {
final request = Future<String>(() {
throw new FormatException('Expected at least 1 section');
});
final request2 = Future<String>(() {
throw new FormatException('Expected at least 2 section');
});
final streamRequest = Stream.fromFutures([request, request2]);
streamRequest.listen((data) => print(data), onError: (e) => print(e));
print(streamRequest);// output: Instance of '_ControllerStream<String>'
/**
* output:
* FormatException: Expected at least 1 section
* FormatException: Expected at least 2 section
*/
}
fromIterable
创建一个单订阅流,这个流从一个可迭代对象中获取数据,例如:
void main() {
final stream = Stream.fromIterable([1, 2, 3]);
print(stream);// output: Instance of '_GeneratedStreamImpl<int>'
}
StreamController
StreamController
为我们提供了非常丰富的功能,我们可以在 StreamController
上发送数据、捕获错误和获取结果,例如:
import 'Dart:async';
void main() {
final controller = StreamController();
controller.sink.add(123);
controller.sink.add('foo');
print(controller);// output: Instance of '_AsyncStreamController<dynamic>'
}
上面的代码中,我们通过 add
方法,可以在流中添加任意类型的数据,如果需要限制数据类型,则可以使用泛型,例如:
import 'Dart:async';
void main() {
StreamController<int> controller = StreamController();
controller.sink.add(123);
controller.sink.add(456);
print(controller);// output: Instance of '_AsyncStreamController<int>'
}
接着,我们可以使用 listen
方法来获取结果,并使用 close
方法关闭 sink
实例,防止内存泄漏和意外行为,例如:
import 'Dart:async';
void main() {
StreamController<int> controller = StreamController();
controller.stream.listen((data) => print(data));
controller.sink.add(123);
controller.sink.add(456);
controller.close();
/**
* output:
* 123
* 456
*/
}
Stream类型
Stream
分为两种类型:
Single subscription streams(单订阅流)
单订阅流通常用于流式传输很大的连续数据块,它在 Stream
的整个生命周期内只允许存在一个 listener
,它在有 listener
之前不会生成事件,并且在 listener
取消收听后会停止发送事件,即使你仍在 sink.add
更多事件。
需要注意的是,即使 listener
取消收听,单订阅流也不允许出现第二个 listener
,否则将会报错,例如:
import 'dart:async';
void main() {
final controller = StreamController();
controller.stream.listen(print);
controller.stream.listen(print);
controller.sink.add(123);
controller.close();
// Unhandled exception: Bad state: Stream has already been listened to.
}
Broadcast streams(广播流)
一般的流都是单订阅流,从 Stream
继承的广播流必须重写 isBroadcast
才会返回 true
,广播流允许存在任意数量的 listener
,并且无论是否存在 listener
,它都能产生事件,所以中途加入的 listener
不会侦听到已发生的事件。
如果在触发事件的同时加入 listener
,则此 listener
不会接收到本次事件,如果 listener
取消收听,则此 listener
会立即停止接收事件。
我们可以通过 asBroadcastStream
在单订阅流之上创建广播流,例如:
import 'dart:async';
void main() {
final controller = StreamController();
final stream = controller.stream.asBroadcastStream();
stream.listen(print);
stream.listen(print);
controller.sink.add(123);
controller.close();
/**
* output:
* 123
* 123
*/
}
Stream转换
Dart 提供了 map
、where
、expand
和 take
方法将已存在的 Stream
转换为新的 Stream
。
map
我们可以使用 map
方法对流的数据进行遍历,和 listen
的效果类似,例如:
import 'dart:async';
void main() {
StreamController<int> controller = StreamController();
controller.stream.map((data) => data += 1).listen((data) => print(data));
controller.sink.add(123);
controller.sink.add(456);
controller.close();
/**
* output:
* 124
* 457
*/
}
上面的代码中,我们向 controller
中添加了 2 个 数值,map
方法使这些数值每次都会自增,所以最后 listen
方法侦听到的结果是 124
和 457
。
where
我们可以使用 where
对流的数据进行筛选,where
方法接受一个匿名函数作为参数,函数的参数是我们向 sink
中添加的数据,函数的返回值为 true
时,数据才会允许通过,例如:
import 'dart:async';
void main() {
StreamController<int> controller = StreamController();
final whereStream = controller.stream.where((data) => data == 123);
whereStream.listen((data) => print(data));
controller.sink.add(123);
controller.sink.add(456);
controller.close();
// output: 123
}
上面的代码中,where
条件中定义了数据必须与 123
相同才会被 listen
侦听到,所以向流中添加了 2 条数据,而打印的结果只有 123
。
expand
我们可以使用 expand
扩展现有的流,此方法接受一个方法作为参数,返回值必须是一个 Iterable
类型的数据,例如:
import 'dart:async';
void main() {
StreamController<int> controller = StreamController();
controller.stream
.expand((data) => [data, data.toDouble()])
.listen((data) => print(data));
controller.sink.add(123);
controller.sink.add(456);
controller.close();
/**
* output:
* 123
* 123.0
* 456
* 456.0
*/
}
上面的代码中,expand
的回调函数每次返回 list
,该 list
包含一个整数及其浮点数,所以最后打印的结果是 4 个数值。
take
我们可以通过 take
方法控制 Stream
中的元素数量,take
方法接受一个数值作为参数,例如:
import 'dart:async';
void main() {
StreamController<int> controller = StreamController();
controller.stream.take(1).listen((data) => print(data));
controller.sink.add(123);
controller.sink.add(456);
controller.close();
// output: 123
}
上面的代码中,我们向 controller
中添加了 2 个 数值,通过 take
方法截取后,最后 listen
方法只侦听到第一个数值。
transform
当我们需要处理更复杂的 Stream
时,我们可以使用 transform
方法,该方法接受一个 StreamTransformer
参数,当我们每次向 sink
中添加数据时,数据会先经过 transform
的处理,例如:
import 'dart:async';
void main() {
StreamController<int> controller = StreamController<int>();
final transformer = StreamTransformer<int, String>
.fromHandlers(handleData: (value, sink) {
value == 123 ? sink.add('test success') : sink.addError('test error');
});
controller.stream
.transform(transformer)
.listen((data) => print(data), onError: (err) => print(err));
controller.sink.add(123);// output: test success
controller.sink.add(456);// output: test error
controller.close();
}
上面的代码中,每次向 sink
中添加数据时, StreamTransformer<S,T>
的 fromHandlers
方法就会返回一条新的 Stream
,其中 S
代表我们所添加的数据类型,在这里我们添加的是 int
类型,T
代表 handleData
返回的类型,每次 handleData
方法返回的是 String
类型。
handleData
方法有两个形参,它们分别是 value
与 sink
,value
代表我们向 sink
中添加的数据,而 sink
是暴露出来供我们使用的,上面的例子中,我们使用了 sink
的 add
方法以及 addError
方法。