Programación Reactiva, uso de la librería RxJs

3
26237

La programación reactiva es un paradigma enfocado en el trabajo con flujos de datos de manera asíncrona. En este tutorial veremos la libreria RxJs, no toda, porque es muy extensa, pero sí lo que considero más importante para poder usarla de forma correcta y eficiente.

Índice de contenidos


1. Introducción

Llevo tiempo luchando contra los observables y la verdad es que hasta que no te metes y te pegas bien con algo, no consigues que te quede claro. Así que esta mañana me he levantado con ganas de solucionar esto, he empezado a leer y he pensado que a más de uno le interesaría aclarar este tema, pues venga, hagamos un tutorial para que cualquiera que tenga el mismo problema, disponga de una solución fácil al alcance.


2. Entorno

El tutorial está escrito usando el siguiente entorno:

  • Hardware: Portátil MacBook Pro Retina 15′ (2.5 Ghz Intel Core I7, 16GB DDR3).
  • Sistema Operativo: Mac OS Sierra 10.12.6


3. Programación reactiva

La programación reactiva es la programación con flujos de datos asíncronos.
Los buses de eventos (por ejemplo eventos de clicks) son un flujo de eventos asíncronos, en el que se pueden observar y reaccionar en consecuencia. Se pueden crear streams de datos de cualquier cosa.

Además de esto, disponemos de una increíble toolbox de funciones para combinar, crear y filtrar cualquiera de esos streams. Ahí es donde entra en acción la magia.
Un stream puede usarse como entrada de otro. Incluso los streams múltiples se pueden utilizar como entradas de otro stream. Puedes combinar dos streams. Puedes filtrar una secuencia para obtener otra que solo tenga los eventos que le interesen. Puedes asignar valores de datos de una secuencia a otra nueva. Como podéis ver, las posibilidades son infinitas.

Un stream es una secuencia de eventos ordenados en el tiempo. Puede emitir tres cosas diferentes: un valor (de algún tipo), un error y una señal de «completado».

Capturamos estos eventos emitidos de forma asíncrona, definiendo una función que se ejecutará cuando se emite un valor, otra función cuando se emite un error y otra se emite cuando se completa. A veces estos dos últimos se pueden omitir y nos podemos centrar en la definición de la función de los valores. La «escucha» del stream se llama suscripción. Las funciones que estamos definiendo son observadores. El stream es el sujeto (o «observable») que se observa.

Una vez claros los conceptos vamos a ver un ejemplo donde podamos ver todos los elementos explicados.

En el ejemplo podemos ver como se llama a una función que devuelve el precio de los productos, tiene un filtro para que solo nos devuelva los precios mayores de 30. Después tenemos la suscripción, que si va bien y tenemos el precio, mostramos un mensaje con los precios mayores de 30. Si por algún motivo falla, mostramos un error. Finalmente, con dispose, cerramos la conexión, y dejamos de observar el stream.

/* Get stock  */
const subscription = getAsyncStockData()
  .filter(quote => quote.price > 30)
  .map(quote => quote.price)
  .subscribe(
    price => console.log(`Prices higher than $30: ${price}`),
    err => console.log(`Something went wrong: ${err.message}`)
  );

/*acabamos */
subscription.unsubscribe();


4. RxJS

La librería reactiva RxJS es un conjunto de bibliotecas para generar programas asíncronos y basados ​​en eventos que utilizan secuencias observables y operadores de consulta.

RxJS, nos permite representan flujos de datos asíncronos con Observables, consultar los flujos de datos asíncronos utilizando los operadores y parametrizar la concurrencia en los flujos de datos asíncronos usando Schedulers. En pocas palabras, RxJS = Observables + Operadores + Programadores(schedulers).


4.1. Observables

Un observable es una colección lazy de múltiples valores.

El siguiente ejemplo es un observable que añade los valores 1, 2, 3 sincrónicamente, al subscribirse, y después de un segundo se añade el 4, finalmente completa:

let observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

Nos podemos suscribir al observable mediante el método subscribe() del observable

observable.subscribe(x => console.log(x));

La ejecución del observable es el código que se encuentra en el interior de:

Observable.create(function subscribe(observer) {...}) 

Cuando un observador se suscribe, se ejecuta el código y produce múltiples valores a lo largo del tiempo, ya sea de forma síncrona o asíncrona.

Hay tres tipos de valores que una ejecución observable puede ofrecer:

  • «Next»: envía un valor como un número, una cadena, un objeto, etc.
  • «Error»: envía un error o excepción de JavaScript.
  • «Complete»: finaliza (no envía un valor).


4.2. Observer

Un «observer» es un consumidor de valores entregados por un Observable. Para usar el Observador, utilizamos el método ‘subscribe’ de un Observable:

observable.subscribe(observer);


4.3. Subscription

Una «subscription» es un objeto que representa un recurso disponible, normalmente la ejecución de un observable. Tiene un importante método, unsubscribe, que libera el recurso o cancela la ejecución del observable.


4.4. Subject

Un «subject» de RxJS es un tipo especial de observable que permite que los valores sean difundidos a muchos observadores. Mientras que los observables simples son unicast (cada observador suscrito posee una ejecución independiente del observable), los sujetos son multicast.

En el ejemplo siguiente, tenemos dos Observadores adjuntos a un Asunto, y alimentamos algunos valores al Asunto:

let subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

subject.next(1);
subject.next(2);

Con la siguiente salida en la consola:

observerA: 1
observerB: 1
observerA: 2
observerB: 2


4.5. Observables Multicast

Un «observable multicast» pasa notificaciones a través de un «subject» que puede tener muchos suscriptores, mientras que un simple «unicast observable» sólo envía notificaciones a un solo Observador.

El operador multicast en realidad lo que hace es que los observadores se suscriben a un sujeto subyacente, y el sujeto se suscribe a la fuente Observable.

El siguiente ejemplo es similar al ejemplo anterior donde usamos observable.subscribe(subject):

const source = Rx.Observable.from([1, 2, 3]);
const subject = new Rx.Subject();
const multicasted = source.multicast(subject);

// esto, internamente sería: `subject.subscribe({...})`:
multicasted.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
  next: (v) => console.log('observerB: ' + v)
});

// esto, internamente sería: `source.subscribe(subject)`:
multicasted.connect();


Multicast devuelve un Observable que se parece a un Observable normal, pero funciona como un Suject cuando se trata de suscribirse.


4.6. Operadores

Los operadores son las piezas esenciales que permiten que el código asincrónico sea compuesto fácilmente de forma declarativa.

Existen infinidad de operadores, clasificados en grupos:

  • Operadores de creación
  • Operadores de transformación
  • Operadores de filtrado
  • Operadores de combinación
  • Operadores de multidifusión
  • Operadores de manejo de errores
  • Operadores de utilidad
  • Operadores de condicionales y booleanos
  • Operadores de matematicos

A continuación veremos los más utilizados.


4.6.1. Operadores de creación

create()

Convierte una función onSubscription en un Observable.

const observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});
observable.subscribe(
  value => console.log(value),
  err => {},
  () => console.log('this is the end')
);

// Logs
// 1
// 2
// 3
// "this is the end"

of()

Simplemente convierte una lista de argumentos en una secuencia observable.

from()

Crea una secuencia Observable de una matriz o un objeto que se puede iterar.


4.6.2. Operadores de transformación

map()

Transforma cada elemento de la secuencia Observable. Se puede considerar similar a la función de mapa de Array.


4.6.3. Operadores de filtrado

filter()

Filtra los elementos de la colección y solo nos obtendremos los que cumplan la condición de filtro.


4.6.4. Operadores de combinación

merge()

Merge se suscribe a cada entrada dada y simplemente reenvía (sin hacer ninguna transformación) todos los valores de todos los observables de entrada a la salida. La salida observable sólo se completa una vez que se han completado todas las observables de entrada. Cualquier error entregado por un observable será emitido inmediatamente en la salida del observable.


4.6.5. Operadores de multidifusión

do()

Intercepta cada emisión en la fuente y ejecuta una función, pero devuelve una salida que es idéntica a la fuente.

Ejemplo: Mapear cada click a la posición clientX de ese click, mientras también se registra el evento de clic

const clicks = Rx.Observable.fromEvent(document, 'click');
const positions = clicks
  .do(ev => console.log(ev))
  .map(ev => ev.clientX);
positions.subscribe(x => console.log(x));

multicast()

Devuelve un Observable que emite los resultados de la invocación de un selector en los elementos emitidos por un ConnectableObservable(clase que extiende de Observable) que comparte una sola suscripción al stream subyacente.


4.6.6. Operadores de manejo de errores

catch()

Detecta errores en el observable para ser manejado devolviendo un nuevo observable o lanzando un error.


4.6.7. Operadores de utilidad

toPromise()

Convierte una secuencia Observable en una promesa compatible con ES2015.

 
let source = Rx.Observable
  .just(42)
  .toPromise();

source.then((value) => console.log('Value: %s', value));
// => Value: 42

// Rejected Promise
// Using normal ES2015
let source = Rx.Observable
  .throw(new Error('woops'))
  .toPromise();

source
  .then((value) => console.log('Value: %s', value))
  .catch((err) => console.log('Error: %s', err));
// => Error: Error: woops

toArray()

Convierte una secuencia Observable en un array.


4.6.8. Operadores condicionales

find()

Emite sólo el primer valor emitido por la fuente Observable que cumple con alguna condición.

Ejemplo: buscar y emitir el primer click que sucede en un elemento ‘DIV’

const clicks = Rx.Observable.fromEvent(document, 'click');
const result = clicks.find(ev => ev.target.tagName === 'DIV');
result.subscribe(x => console.log(x));


4.6.9. Operadores matematicos

count()

Cuenta el número de emisiones en la fuente y emite ese número cuando se completa la fuente.

Ejemplo: contar cuántos números impares hay entre 1 y 100

const numbers = Rx.Observable.range(1, 100);
const result = numbers.count(i => i % 2 === 1);
result.subscribe(x => console.log(x));


4.7. Scheduler

Un scheduler controla cuándo se inicia una suscripción y cuándo se entregan las notificaciones. Consta de tres componentes.

Estructura de datos.

Sabe cómo almacenar y poner en cola tareas basadas en la prioridad u otros criterios.

Contexto de ejecución.

Denota dónde y cuándo se ejecuta la tarea. Un ejemplo sería que se ejecutara inmediatamente, también podría ejecutarse en otro mecanismo de devolución de llamada como setTimeout.

Reloj (virtual)

Proporciona una noción de «tiempo» para ello se usa el método now(). Las tareas que se programan en un programador en particular se adhieren sólo al tiempo indicado por ese reloj.

const observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
})
.observeOn(Rx.Scheduler.async);

console.log('just before subscribe');
observable.subscribe({
  next: x => console.log('got value ' + x),
  error: err => console.error('something wrong occurred: ' + err),
  complete: () => console.log('done'),
});
console.log('just after subscribe');

salida:
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done


5. ¿Qué pasa con las promesas?

Para terminar, solo comentar que las promesas son buenas para resolver operaciones asíncronas, donde el comportamiento esperado es un valor. Las extensiones reactivas para JavaScript unifican tanto el mundo de las promesas como las devoluciones de llamadas, así como datos de eventos. Con los stream observables podemos reaccionar ante un único valor o un flujo de valores.


6. Conclusiones

Un vez estudiado el tema de los observables, quedo sorprendido con lo útil y versátil que es la librería Rx. Podemos hacer observables de lo que queramos y manejarlos a nuestro antojo. Un gran avance en Angular al incorporar los observables y un gran acierto haberlos estudiado a fondo.

Un saludo

Hasta la proxima…


7. Referencias

3 COMENTARIOS

  1. Brother te escapaste con el tutorial, antes de empezar este tuttorial no tenia ni la mas remota idea de lo que copiaba a otros desarrolladores y despues de leer este tuto ya seo capaz de utilizar rxJS mas a fondo, GRACIAS y continua haciendo estos tutoriales

DEJA UNA RESPUESTA

Por favor ingrese su comentario!

He leído y acepto la política de privacidad

Por favor ingrese su nombre aquí

Información básica acerca de la protección de datos

  • Responsable:
  • Finalidad:
  • Legitimación:
  • Destinatarios:
  • Derechos:
  • Más información: Puedes ampliar información acerca de la protección de datos en el siguiente enlace:política de privacidad