Использование потоков в DART

Использование потоков данных

5 February 2014 г. 18:31:48

Использование потоков для частого обновления данных.

Перевод статьи Криса Бакета

 

Запуск программы в барузере является совокупностью различных HTML событий, таких как button.onClick или событий на сервере изменения в dart:io. Потоки образуют единый интерфейс ко всему что может отправлять повторяющиеся серии данных.

 

Этот раздел объясняет как использовать потоки используя единый интерфейс.

Справочные материалы: фьючерсы

Что такое потоки?

Использование потоков

Общие методы потоков

Потоки с одним значением

Обработка ошибок в потоках и фьючерсах

Отписывание от потока

Потоки являются общими

Несколько примеров реального использования потоков

Заключение

Об авторе

 

 

Справочные материалы: фьючерсы.

 

Прежде чем мы начнем, очень важно понять что потоки (Streams) являются частью библиотеки dart.async. Они связаны между собой в Dart async и  имеют общую асинхронную природу. (Вы не можете блокировать код в тот момент когда пользователь нажимает кнопку!)

Чтобы найти больше информации о фьючерсах, обратитесь к предыдущему разделу.

 

Что такое потоки?

 

Представьте что вы пишете чат. На одном клиенте вы будете получать сообщения и отображать их пользователю. Вы не сможете написать бесконечный цикл, потомучто он будет блокировать выполнение, поэтому вы должны использовать асинхронные колбэки. Этот случай идеальный для использования потоков: у вас есть одна часть кода которая помещает данные в поток, в втарая часть вашего кода слушает поток

 Основная концепция:

  • Использование потока: Данные отправляются из потока подписчику (StreamSubscriber ) или, возможно нескольким подписчикам.
  • Заполнение потока: получение данных в поток при помощи  StreamController.

 

Итак рассмотрим использование потока в этом разделе,  так как скорее всего в Dart вам понадобиться использовать потоки для существующих API. Заполнение потока будет рассмотрено в будущих статьях.

 

Использование потоков:

 

Давайте взглянем на простой код использующий потоки. Для простоты сейчас мы создадим поток из фиксированного списка используя конструктор fromIterable(), вместо динамического заполнения при помощи StreamController.

var data = [1,2,3,4,5]; // some sample data
var stream = new Stream.fromIterable(data); // create the stream

Теперь у нас есть поток, готовый отправить данные. Мы можем прослушивать этот поток чтобы получить данные.

Обычно для этого используется метод listen(), для подписки на поток. Он содержит несколько необязательный параметров и один обязательный onData - функция обработки обратного вызова:

 import 'dart:async';
 
main() {
  var data = [1,2,3,4,5]; // some sample data
  var stream = new Stream.fromIterable(data);  // create the stream
 
  // subscribe to the streams events
  stream.listen((value) {       //
    print("Received: $value");  // onData handler
  });                           //
}

Метод listen()  срабатывает каждый раз при получении данных. обработчик listen() вызавется для каждого элемента данных. поэтому при выполнении кода выше мы, как и ожидалось, получим:

Received: 1<
Received: 2
Received: 3
Received: 4
Received: 5

Есть и другие способы использования данных из потока, используя такие свойства как  first, last,length, и isEmpty.Каждое из этих свойств возвращает фьючерс. Например:

stream = new Stream.fromIterable([1,2,3,4,5]);
stream.first.then((value) => print("stream.first: $value"));  // 1
 
stream = new Stream.fromIterable([1,2,3,4,5]);
stream.last.then((value) => print("stream.last: $value"));  // 5  
 
stream = new Stream.fromIterable([1,2,3,4,5]);
stream.isEmpty.then((value) => print("stream.isEmpty: $value"));  // false
 
stream = new Stream.fromIterable([1,2,3,4,5]);
stream.length.then((value) => print("stream.length: $value"));  // 5

Бывает 2 вида потоков: с одним получателем (single) или с многими (multiple), так же его называют широковещательным (broadcast). По умолчанию, нажи потоки являются потоками с одним получателем. Это значит что если вы захотите прослушивать поток более одного раза, вы получите исключение, и исолькование любой колбэк функции или свойства-фьючерса будет считаться прослушиванием.

Вы можете конвертировать поток с одним получателем в широковещательный поток используя метод asBroadcastStream() , как показано ниже:

var data = [1,2,3,4,5];
var stream = new Stream.fromIterable(data);
var broadcastStream = stream.asBroadcastStream();
 
broadcastStream.listen((value) => print("stream.listen: $value")); 
broadcastStream.first.then((value) => print("stream.first: $value")); // 1 
broadcastStream.last.then((value) => print("stream.last: $value")); // 5
broadcastStream.isEmpty.then((value) => print("stream.isEmpty: $value")); // false
broadcastStream.length.then((value) => print("stream.length: $value")); // 5

Теперь этот поток доступен нескольким подписчикам и вы можете добавить их. Вы можете проверить является ли поток широковеательным используя свойство stream.isBroadcast.

 

Общие методы потоков

 

В классе Stram доступно большое количество методов. В следующей часте я опишу некоторые из них наибоее общие. Посмотрите полный список методов доступен в документации 

 

Подмножества данных потока

 

 У потоко весть несколько полезных методов для извлечения части данных, которая была отправлена из потока. Методы  take(), skip(), takeWhile(), skipWhile() и  where() позволят вам получить подмножество данных, как показано в примере ниже. Каждый вывод использует свой собственный поток, который Вы можете прослушивать.

broadcastStream
    .where((value) => value % 2 == 0) // divisible by 2
    .listen((value) => print("where: $value")); // where: 2
                                                // where: 4
 
broadcastStream
    .take(3) // takes only the first three elements
    .listen((value) => print("take: $value")); // take: 1
                                               // take: 2
                                               // take: 3
 
broadcastStream
    .skip(3)  // skips the first three elements
    .listen((value) => print("skip: $value")); // skip: 4
                                               // skip: 5
 
broadcastStream
    .takeWhile((value) => value < 3) // take while true
    .listen((value) => print("takeWhile: $value")); // takeWhile: 1
                                                    // takeWhile: 2
 
broadcastStream
    .skipWhile((value) => value < 3) // skip while true
    .listen((value) => print("skipWhile: $value")); // skipWhile: 4
                                                    // skipWhile: 5

 

Преобразование потока данных

 

Еще один полезный метод это transform(), который принимает экземпляр StreamTransformer. Это позволит вам изменять содержание потока. Конструктор StreamTransformer принимает функцию handleData , которая вызывается для каждого значения передаваемое в поток. Вы можете изменять эти значения как Вам угодно, и отправит из назад в StreamSink, что приведет к изменению значений котораые будут выведены методом transform(). Пример ниже принимает данные  [1,2,3,4,5] и преобазует каждый элемент в две новые строки  "Message n" и "Body n". каждая из этих строк помещается в новый поток.

 // Определим преобразование функции
var transformer = new StreamTransformer(handleData: (value, sink) {
  // create two new values from the original value
  sink.add("Message: $value");
  sink.add("Body: $value");
});
  
// Преобразуем поток и прослушаем его
stream.transform(transformer).listen((value) => print("listen: $value"));

Выполнение этой программы выведет следующее:

listen: Message: 1
listen: Body: 1
listen: Message: 2
listen: Body: 2
listen: Message: 3
listen: Body: 3
listen: Message: 4
listen: Body: 4
listen: Message: 5
listen: Body: 5

 Возможно большинство общих преобразований используется для преобразования списка в строку используя UTF8.decoder из библиотеки dart:convert, например когда происходит чтение данных из файла или HTTP запроска, как на примере ниже:

File file = new File("some_file.txt");
file.openRead()
    .transform(UTF8.decoder) // use a UTF8.decoder
    .listen((String data) => print(data), // output the data
        onError: (error) => print("Error, could not open file"),
        onDone: () => print("Finished reading data"));

 

Валидация потока данных

 

Иногда, возникает необходимость поверить данные полученные из потока по определенным условиям. Следующие функции any(), every(), contains() возвращают фьючерс   Future<bool>.

 broadcastStream
    .any((value) => value < 5)
    .then((result) => print("Any less than 5?: $result")); // true
  
broadcastStream
    .every((value) => value < 5)
    .then((result) => print("All less than 5?: $result")); // false
  
broadcastStream
    .then((result) => print("Contains 4?: $result")); // true

 

Потоки с одним значением

 

Иногда поток проектируется таким образом, чтобы возвращать только одно значение и вы хотите быть уверенным в том что вы получите только одно значение из потока. Геттер single и метод singleWhere() оба возвращают фьючерс, который содержит только одно значние, или вызывают исключение в обратном случае. Например, наш набор данных содержит 5 элементов [1,2,3,4,5]. Следущий код вернет только элемент 1:

 broadcastStream
    .singleWhere((value) => value < 2)  // there is only one value less than 2
    .then((value) => print("single value: $value")); 
    // outputs: single value: 1

Таким образом, следующий код вызовет ошибку и прервет выполнение приложения (потому что ошибка не обработана):

 broadcastStream
    .single  // will fail - there is more than one value in the stream
    .then((value) => print("single value: $value"));

Это приводит нас к…

 

Обработка ошибок в потоках и фьючерсах

 

Вот великолепная статья про обработку ошибок фьючерсов основываясь на API , поэтому я не буду повторять его. Полезно отметить что мы можем переписать предыдущий снипет включив обработчик ошибки, то есть мы можем определить что вызов single  завершился ошибкой. Фьючерс then() возвращает фьючерс которым мы можем обработать при помощи catchError(). Обработчик catchError будет перехватывать все ошибки которые возникнут в then():

broadcastStream
    .single  // will fail - there is more than one value in the stream
    .then((value) => print("single value: $value")) 
    .catchError((err) => print("Expected Error: $err")); // catch any error in the then()
    // output: Bad State: More than one element

Обработка ошибок с помощью StreamSubscription

Когда Вы используете функцию listen() чтобы прослушивать значения приходящие из потока, вы опционально можете добавить обработчик ошибок. Функция listen  создает экземляр StreamSubscription .

StreamSubscription  содердит несколько обработчиков: onData, onError и onDone. кадлый из них может быть определен в фунцие listen() или позже используя полученый объект StreamSubscription. Отметим что обработчик onError, который вы можете использывать для перехватывания ошибок потока:

 // setup the handlers through the subscription's handler methods
var subscription = stream.listen(null);
subscription.onData((value) => print("listen: $value"));
subscription.onError((err) => print("error: $err"));
subscription.onDone(() => print("done"));

и:

// setup the handlers as arguments to the listen() function
var subscription = stream.listen(
    (value) => print("listen: $value"),
    onError: (err) => print("error: $err"),
    onDone: () => print("done"));

Обе программы выведут одно и то же:

listen: 1
listen: 2
listen: 3
listen: 4
listen: 5
done

Одно из преимуществ раздельного использования var subscription = stream.listen(null) и последующей настройки обработчка onData  означает что ввы можете использовать объект subscription  непосредственно в обработчике.

Обработчик onDone  будет вызван когда данных больше нет и базовый поток закрыт.

 

Отписывание от потока

 

Вы можете использовать объект StreamSubscription чтобы отписаться от потока используя метод cancel(). Например слушатель из следуюещего кода отписывается от потока после получения значения 2, поэтому никогда не получит сообщение из onDone():

 var subscription = stream.listen(null);
subscription.onData((value) {
  print("listen: $value");
  if (value == 2) subscription.cancel(); // cancel the subscription
});
subscription.onError((err) => print("error: $err"));
subscription.onDone(() => print("done"));

 

Потоки являются общими

 

Все классы потоков являются обобщенными, это значит что в обработчике вы можете  получить строго типизированные данные.  Например, если вы создадите поток Stream<String>, то все последующие функции обработки будут ожидать строки, что показано в коде ниже:

var data = [1,2,3,4,5]; // ints, valid
// var data = ["1","2","3","4","5"]; // strings, not valid
var stream = new Stream<int>.fromIterable(data); // Stream<int>
stream.listen((value) { // value must be an int
  print("listen: $value");
});

 

Несколько примеров реального использования потоков

 

Вы видели как использовать данные из потока, а теперь давайте взглянем на несколько примеров из реального мира: обработка нажатий кнопок, и чтение данных из файла.

 

Нажатие кнопок в dart:html

 

Кнопки имеют несколько событий onSomeEvent  определенных как поток. Поток onClick определен как Stream<MouseEvent>. Этот тип означает что данные, которые Вы получаюте когда прослушиваете поток onClick будут иметь тип MouseEvents.

Код ниже настраивает кнопки и несколько обработчиков событий. Один обработчик событий остается зарегистрированным, а оставшееся отменяет регистрацию после третьего нажатия.

 import 'dart:html';
 
void main() {
  var button = new ButtonElement();
  document.body.children.add(button);
  
  button.text = "Foo"; 
  var clickCount = 0;
  button.onClick.listen((mouseEvent) {
    print("clicked"); // remain subscribed for all clicks
  });
  
  var subscription = button.onClick.listen(null);<
  subscription.onData((mouseEvent) {
    print("copy that");
    clickCount++;
    window.alert("Clicked");
    if (clickCount == 3) {
      subscription.cancel(); // unsubscribe after the third click
    }
  });  
}

Когда кнопка нажата, обработчик нажатий наращивает счетчик. После третьего нажатия второе события отписывает само себя.

 

Чтение файлов в dart:io

 

Второй пример из реального мира демонстрирует как читать какой нибудь файл из файловой системы. file.openRead() возвращает поток, в котором находится содеражние файла. Поток ( который содержит список List<int>) декодируется при помощи класса UTF8.decoder из dart:convert предназначенного для преобразования UTF-8.

 

import 'dart:io';
 
main() {
  File file = new File("some_file.txt");
  file.openRead()
      .transform(UTF8.decoder) // use a UTF8.decoder
      .listen((String data) => print(data), // output the data
        onError: (error) => print("Error, could not open file"),
        onDone: () => print("Finished reading data"));
}

 

Заключение

 

Потоки являются универсальным асинхронным API во всем Dart, предоставляя мощный инструмент для работы с данными независимо от того являются данные событиями, байтами или вашими собственными классами.

 

Об авторе

 

Chris Buckett is a Technical Manager for Entity Group Ltd, responsible for building and delivering enterprise client-server webapps, mostly with GWT, Java and .Net. He runs the dartwatch.com blog, and has written the book Dart in Action, which is available at manning.com.

 


Оставьте свой комментарий

comments powered by Disqus
Меню

Cult of digits 2014 Яндекс.Метрика