header-bg.jpg
Dart 异步支持之 Stream
发表于 2019-09-04 15:21
|
分类于 Dart
|
评论次数 0
|
阅读次数 56

QQ图片20190703111655.png

当某些事件不断地反复发生,使用 Stream 模式比 Future 更好, node.js 中也有 Stream 这个东西,所以熟悉 node.js 的开发者理解 Stream 应该非常简单。

Stream概念

Stream 的概念比较抽象,我们可以将 Stream 看作一条没有水的小溪,每当小溪的上游加一次水后,我们就可以在小溪的下游接一次水。水会经过一段时间从上游流至下游,具体是什么时候加的水,加了多少次,每次需要多久才能流至下游,咱也不知道,咱也不敢问。

我们将加一次水看作一个事件,而这个事件的时间是个不确定的因素,我们不知道事件什么时候发生、什么时候结束,但是我们可以一直在下游等待接水,等待所有的事件完成。整个过程都是异步的,这大概就是 Stream 的概念。

创建 Stream

我们可以通过以下两种方式创建 Stream

构造函数

从 Future 创建单订阅流,当 Future 完成时将触发一个 dataerror 回调函数,然后使用 Down 事件关闭这个流,例如:

void main() {
  final request = Future<String>(() {
    throw new FormatException('Expected at least 1 section');
  });

  final streamRequest = Stream.fromFuture(request);
  streamRequest.listen((data) => print(data), onError: (e) => print(e));
  print(streamRequest);// output: Instance of '_ControllerStream<String>'
  // output: FormatException: Expected at least 1 section
}

从一组 Future 创建新的单订阅流,每个 Future 完成时将触发自己的 dataerror 回调函数,当所有 Future 完成后,订阅流将会关闭,如果 Futures 为空,订阅流则会立即关闭,例如:

void main() {
  final request = Future<String>(() {
    throw new FormatException('Expected at least 1 section');
  });

  final request2 = Future<String>(() {
    throw new FormatException('Expected at least 2 section');
  });

  final streamRequest = Stream.fromFutures([request, request2]);
  streamRequest.listen((data) => print(data), onError: (e) => print(e));
  print(streamRequest);// output: Instance of '_ControllerStream<String>'
  
  /**
   * output: 
   * FormatException: Expected at least 1 section
   * FormatException: Expected at least 2 section
   */
}

创建一个单订阅流,这个流从一个可迭代对象中获取数据,例如:

void main() {
  final stream = Stream.fromIterable([1, 2, 3]);
  print(stream);// output: Instance of '_GeneratedStreamImpl<int>'
}

StreamController

StreamController 为我们提供了非常丰富的功能,我们可以在 StreamController 上发送数据、捕获错误和获取结果,例如:

import 'Dart:async';

void main() {
  final controller = StreamController();
  controller.sink.add(123);
  controller.sink.add('foo');
  print(controller);// output: Instance of '_AsyncStreamController<dynamic>'
}

上面的代码中,我们通过 add 方法,可以在流中添加任意类型的数据,如果需要限制数据类型,则可以使用泛型,例如:

import 'Dart:async';

void main() {
  StreamController<int> controller = StreamController();
  controller.sink.add(123);
  controller.sink.add(456);
  print(controller);// output: Instance of '_AsyncStreamController<int>'
}

接着,我们可以使用 listen 方法来获取结果,并使用 close 方法关闭 sink 实例,防止内存泄漏和意外行为,例如:

import 'Dart:async';

void main() {
  StreamController<int> controller = StreamController();
  controller.stream.listen((data) => print(data));
  controller.sink.add(123);
  controller.sink.add(456);
  controller.close();

  /**
   * output:
   * 123
   * 456
   */
}

Stream类型

Stream 分为两种类型:

Single subscription streams(单订阅流)

单订阅流通常用于流式传输很大的连续数据块,它在 Stream 的整个生命周期内只允许存在一个 listener,它在有 listener 之前不会生成事件,并且在 listener 取消收听后会停止发送事件,即使你仍在 sink.add 更多事件。

需要注意的是,即使 listener 取消收听,单订阅流也不允许出现第二个 listener,否则将会报错,例如:

import 'dart:async';

void main() {
  final controller = StreamController();

  controller.stream.listen(print);
  controller.stream.listen(print);

  controller.sink.add(123);
  controller.close();
  
  // Unhandled exception: Bad state: Stream has already been listened to.
}

Broadcast streams(广播流)

一般的流都是单订阅流,从 Stream 继承的广播流必须重写 isBroadcast 才会返回 true,广播流允许存在任意数量的 listener,并且无论是否存在 listener,它都能产生事件,所以中途加入的 listener 不会侦听到已发生的事件。

如果在触发事件的同时加入 listener,则此 listener 不会接收到本次事件,如果 listener 取消收听,则此 listener 会立即停止接收事件。

我们可以通过 asBroadcastStream 在单订阅流之上创建广播流,例如:

import 'dart:async';

void main() {
  final controller = StreamController();

  final stream = controller.stream.asBroadcastStream();

  stream.listen(print);
  stream.listen(print);

  controller.sink.add(123);
  controller.close();
  
  /**
   * output:
   * 123
   * 123
   */
}

Stream转换

Dart 提供了 mapwhereexpandtake 方法将已存在的 Stream 转换为新的 Stream

map

我们可以使用 map 方法对流的数据进行遍历,和 listen 的效果类似,例如:

import 'dart:async';

void main() {
  StreamController<int> controller = StreamController();

  controller.stream.map((data) => data += 1).listen((data) => print(data));
  controller.sink.add(123);
  controller.sink.add(456);
  controller.close();
  /**
   * output:
   * 124
   * 457
   */
}

上面的代码中,我们向 controller 中添加了 2 个 数值,map 方法使这些数值每次都会自增,所以最后 listen 方法侦听到的结果是 124457

where

我们可以使用 where 对流的数据进行筛选,where 方法接受一个匿名函数作为参数,函数的参数是我们向 sink 中添加的数据,函数的返回值为 true 时,数据才会允许通过,例如:

import 'dart:async';

void main() {
  StreamController<int> controller = StreamController();

  final whereStream = controller.stream.where((data) => data == 123);
  whereStream.listen((data) => print(data));

  controller.sink.add(123);
  controller.sink.add(456);
  controller.close();
  // output: 123
}

上面的代码中,where 条件中定义了数据必须与 123 相同才会被 listen 侦听到,所以向流中添加了 2 条数据,而打印的结果只有 123

expand

我们可以使用 expand 扩展现有的流,此方法接受一个方法作为参数,返回值必须是一个 Iterable 类型的数据,例如:

import 'dart:async';

void main() {
  StreamController<int> controller = StreamController();

  controller.stream
      .expand((data) => [data, data.toDouble()])
      .listen((data) => print(data));
  controller.sink.add(123);
  controller.sink.add(456);
  controller.close();
  /**
   * output:
   * 123
   * 123.0
   * 456
   * 456.0
   */
}

上面的代码中,expand 的回调函数每次返回 list,该 list 包含一个整数及其浮点数,所以最后打印的结果是 4 个数值。

take

我们可以通过 take 方法控制 Stream 中的元素数量,take 方法接受一个数值作为参数,例如:

import 'dart:async';

void main() {
  StreamController<int> controller = StreamController();

  controller.stream.take(1).listen((data) => print(data));
  controller.sink.add(123);
  controller.sink.add(456);
  controller.close();
  // output: 123
}

上面的代码中,我们向 controller 中添加了 2 个 数值,通过 take 方法截取后,最后 listen 方法只侦听到第一个数值。

transform

当我们需要处理更复杂的 Stream 时,我们可以使用 transform 方法,该方法接受一个 StreamTransformer 参数,当我们每次向 sink 中添加数据时,数据会先经过 transform 的处理,例如:

import 'dart:async';

void main() {
  StreamController<int> controller = StreamController<int>();

  final transformer = StreamTransformer<int, String>
      .fromHandlers(handleData: (value, sink) {
        value == 123 ? sink.add('test success') : sink.addError('test error');
      });

  controller.stream
      .transform(transformer)
      .listen((data) => print(data), onError: (err) => print(err));

  controller.sink.add(123);// output: test success
  controller.sink.add(456);// output: test error
  controller.close();
}

上面的代码中,每次向 sink 中添加数据时, StreamTransformer<S,T>fromHandlers 方法就会返回一条新的 Stream,其中 S 代表我们所添加的数据类型,在这里我们添加的是 int 类型,T 代表 handleData 返回的类型,每次 handleData 方法返回的是 String 类型。

handleData 方法有两个形参,它们分别是 valuesinkvalue 代表我们向 sink 中添加的数据,而 sink 是暴露出来供我们使用的,上面的例子中,我们使用了 sinkadd 方法以及 addError 方法。

发布评论
还没有评论,快来抢沙发吧!