Operadores de programación reactiva en RxJava 2

Si su aplicación de Android va a acumular esas reseñas de cinco estrellas en Google Play, entonces debe poder realizar múltiples tareas.

Como mínimo, los usuarios móviles de hoy en día esperan poder interactuar con su aplicación mientras realizan trabajos en segundo plano. Esto puede sonar sencillo, pero Android es de un solo hilo por defecto, así que si va a cumplir con las expectativas de su audiencia, tarde o temprano tendrá que crear algunos hilos adicionales..

En el artículo anterior de esta serie, obtuvimos una introducción a RxJava, una biblioteca reactiva para JVM que puede ayudarlo a crear aplicaciones de Android que reaccionan a los datos y eventos a medida que ocurren. Pero también puedes usar esta biblioteca para reaccionar ante datos y eventos. simultaneamente.

En esta publicación, te mostraré cómo puedes usar los operadores de RxJava para finalmente hacer que la concurrencia en Android sea una experiencia sin dolor. Al final de este artículo, sabrá cómo usar los operadores de RxJava para crear subprocesos adicionales, especificar el trabajo que debería ocurrir en estos subprocesos y luego publicar los resultados en el importante subproceso de la interfaz de usuario principal de Android, todo con solo pocas lineas de codigo.

Y, dado que ninguna tecnología es perfecta, también le contaré acerca de un gran escollo potencial al agregar la biblioteca RxJava a sus proyectos, antes de mostrarle cómo usar los operadores para garantizar este problema. Nunca Ocurre en tus propios proyectos de Android..

Introduciendo Operadores

RxJava tiene una enorme colección de operadores destinados principalmente a ayudarlo a modificar, filtrar, fusionar y transformar los datos que emite su Observables. Encontrará la lista completa de operadores de RxJava en los documentos oficiales, y aunque nadie espera que memorice cada operador, vale la pena dedicar un tiempo a leer esta lista, solo para tener una idea aproximada de las diferentes transformaciones de datos que puede realizar..

La lista de operadores de RxJava ya es bastante exhaustiva, pero si no puede encontrar el operador perfecto para la transformación de datos que tenía en mente, siempre puede encadenar varios operadores. Aplicando un operador a un Observable normalmente devuelve otro Observable, para que pueda seguir aplicando operadores hasta que obtenga los resultados que desea.

Hay demasiados operadores RxJava para cubrir en un solo artículo, y los documentos oficiales de RxJava ya hacen un buen trabajo al presentar todos los operadores que puede usar para las transformaciones de datos, por lo que me centraré en dos operadores que tienen la El mayor potencial para hacer tu vida como desarrollador de Android más fácil: subscribeOn () y observarOn ()

Multihilo con operadores RxJava

Si su aplicación va a proporcionar la mejor experiencia de usuario posible, debe poder realizar tareas intensivas o de larga duración y realizar múltiples tareas simultáneamente, sin bloquear el importante hilo de la interfaz de usuario principal de Android..

Por ejemplo, imagine que su aplicación necesita obtener cierta información de dos bases de datos diferentes. Si realiza ambas tareas una tras otra en el subproceso principal de Android, no solo esto llevará una cantidad de tiempo significativa, sino que la IU no responderá hasta que su aplicación haya terminado de recuperar cada pieza de información de ambas bases de datos. . No es exactamente una gran experiencia de usuario!

Una solución mucho mejor es crear dos subprocesos adicionales en los que puede realizar ambas tareas simultáneamente sin bloquear el subproceso de la interfaz de usuario principal. Este enfoque significa que el trabajo se completará el doble de rápido, y El usuario podrá continuar interactuando con la interfaz de usuario de su aplicación en todo momento. Potencialmente, es posible que sus usuarios ni siquiera sepan que su aplicación está realizando un trabajo intensivo y de larga ejecución en segundo plano: toda la información de la base de datos simplemente aparecerá en la interfaz de usuario de su aplicación, como por arte de magia.!

Fuera de la caja, Android proporciona algunas herramientas que puede utilizar para crear subprocesos adicionales, incluidos Servicios y Servicio de IntenciónSin embargo, estas soluciones son difíciles de implementar y pueden resultar rápidamente en un código complejo y detallado. Además, si no implementas multihilo correctamente, puedes encontrarte con una aplicación que está perdiendo memoria y generando todo tipo de errores..

Para hacer que el multihilo en Android sea aún más doloroso, el hilo principal de la interfaz de usuario de Android es el único que puede actualizar la interfaz de usuario de su aplicación. Si desea actualizar la interfaz de usuario de su aplicación con el resultado del trabajo realizado en cualquier otro hilo, entonces normalmente necesitarás crear una Entrenador de animales en el hilo principal de la interfaz de usuario, y luego use este Entrenador de animales para transferir datos de su hilo de fondo al hilo principal. Esto significa más código, más complejidad y más oportunidades para que se introduzcan errores en su proyecto..

Pero RxJava cuenta con dos operadores que pueden ayudarlo a evitar gran parte de esta complejidad y potencial de error..

Tenga en cuenta que utiliza estos operadores junto con Programadores, que son esencialmente componentes que le permiten especificar trapos. Por ahora, solo piensa en programador como sinónimo de la palabra hilo.

  • subscribeOn (Scheduler): Por defecto, una Observable emite sus datos en el subproceso donde se declaró la suscripción, es decir, donde llamó a .suscribir método. En Android, este es generalmente el hilo principal de la interfaz de usuario. Puedes usar el subscribeOn () operador para definir un diferente Programador donde el Observable Debe ejecutar y emitir sus datos..
  • observeOn (Programador): Puede utilizar este operador para redirigir su ObservableEmisiones a un diferente. Programador, cambiando efectivamente el hilo donde el ObservableSe envían las notificaciones y, por extensión, el subproceso donde se consumen sus datos..

RxJava viene con una serie de programadores que puede utilizar para crear diferentes subprocesos, incluyendo:

  • Schedulers.io (): Diseñado para ser utilizado para tareas relacionadas con IO. 
  • Schedulers.computation (): Diseñado para ser utilizado para tareas computacionales. De forma predeterminada, la cantidad de subprocesos en el programador de cómputo está limitada a la cantidad de CPU disponibles en su dispositivo.
  • Schedulers.newThread (): Crea un nuevo hilo.

Ahora que tiene una visión general de todas las partes móviles, veamos algunos ejemplos de cómo subscribeOn () y observarOn () Se utilizan, y ver algunos programadores en acción..

subscribeOn ()

En Android, normalmente utilizarás subscribeOn () y un acompañante Programador para cambiar el hilo en el que se realiza algún trabajo de larga duración o trabajo intensivo, de modo que no haya riesgo de bloquear el hilo de la interfaz de usuario principal. Por ejemplo, puede decidir importar una gran cantidad de datos en el io () programador o realizar algunos cálculos en el cálculo() programador.

En el siguiente código, estamos creando un nuevo hilo donde el Observable Ejecutará sus operaciones y emitirá los valores. 1, 2, y 3.

Observable.just (1, 2, 3) .subscribeOn (Schedulers.newThread ()) .subscribe (Observer); 

Si bien esto es todo lo que necesita para crear un hilo y comenzar a emitir datos en ese hilo, es posible que desee una confirmación de que este observable realmente está operando en un nuevo hilo. Un método es imprimir el nombre del hilo que su aplicación está usando actualmente, en Android StudioMonitor Logcat.

Convenientemente, en la publicación anterior, Comenzando con RxJava, creamos una aplicación que envía mensajes a Logcat Monitor en varias etapas durante el ciclo de vida del Observable, por lo que podemos reutilizar gran parte de este código..

Abra el proyecto que creó en esa publicación y modifique su código para que use el código anterior. Observable como su fuente Observable. Luego agrega el subscribeOn () Operador y especifique que los mensajes que se envíen a Logcat deben incluir el nombre del hilo actual.

Su proyecto terminado debe verse algo como esto:

importar android.support.v7.app.AppCompatActivity; importar android.os.Bundle; import android.util.Log; importar io.reactivex.Observable; importar io.reactivex.Observer; importar io.reactivex.disposables.Disposable; importar io.reactivex.schedulers.Schedulers; La clase pública MainActivity extiende AppCompatActivity public static final String TAG = "MainActivity"; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Observable.just (1, 2, 3) .subscribeOn (Schedulers.newThread ()) .subscribe (Observer);  Observador Observer = new Observer() @Override public void onSubscribe (Disposable d) Log.e (TAG, "onSubscribe" + Thread.currentThread (). GetName ());  @ Anular pública void onNext (valor Integer) Log.e (TAG, "onNext:" + value + Thread.currentThread (). GetName ());  @ Anular el vacío público onError (Throwable e) Log.e (TAG, "onError:");  @Override public void onComplete () Log.e (TAG, "onComplete: All Done!" + Thread.currentThread (). GetName ()); ; 

Asegúrese de que el Logcat Monitor de Android Studio esté abierto (seleccionando Monitor de android pestaña, seguido de Logcat) y luego ejecute su proyecto en un dispositivo físico Android o en un AVD. Debería ver el siguiente resultado en el Monitor Logcat:

Aquí puedes ver que .suscribir se está llamando en el subproceso de la interfaz de usuario principal, pero el observable está operando en un subproceso completamente diferente.

los subscribeOn () el operador tendrá el mismo efecto sin importar dónde lo coloque en la cadena observable; sin embargo, no puedes usar múltiples subscribeOn () Operadores en la misma cadena. Si incluye más de uno subscribeOn (), entonces su cadena lo hará solamente utilizar el subscribeOn () Eso es lo más cercano a la fuente observable..

observarOn ()

diferente a subscribeOn (), donde colocas observarOn () en tu cadena hace importa, ya que este operador solo cambia el hilo que usan los observables que aparecen río abajo

Por ejemplo, si insertó lo siguiente en su cadena, cada observable que aparezca en la cadena a partir de este punto utilizará el nuevo hilo..

.observeOn (Schedulers.newThread ())

Esta cadena continuará ejecutándose en el nuevo hilo hasta que encuentre otro observarOn () operador, en cuyo punto cambiará al hilo especificado por ese operador. Puede controlar el hilo donde observables específicos envían sus notificaciones insertando múltiples observarOn () operadores en su cadena.

Al desarrollar aplicaciones de Android, generalmente usarás observarOn () para enviar el resultado del trabajo realizado en subprocesos de fondo al subproceso principal de la interfaz de usuario de Android. La forma más fácil de redireccionar las emisiones al hilo principal de la interfaz de usuario de Android es usar el AndroidSchedulers.mainThread Scheduler, que se incluye como parte de la biblioteca RxAndroid, en lugar de la biblioteca RxJava. 

La biblioteca RxAndroid incluye enlaces específicos de Android para RxJava 2, por lo que es un recurso adicional valioso para los desarrolladores de Android (y algo que veremos con mucho más detalle en la próxima publicación de esta serie).

Para agregar RxAndroid a su proyecto, abra su nivel de módulo construir.gradle Archivo y agregue la última versión de la biblioteca a la sección de dependencias. En el momento de escribir este artículo, la última versión de RxAndroid era 2.0.1, así que estoy agregando lo siguiente:

dependencias … compile 'io.reactivex.rxjava2: rxandroid: 2.0.1'

Después de agregar esta biblioteca a su proyecto, puede especificar que los resultados de un observable deben enviarse al hilo principal de la interfaz de usuario de su aplicación, utilizando una sola línea de código:

.observeOn (AndroidSchedulers.mainThread ())

Teniendo en cuenta que la comunicación con el subproceso de la interfaz de usuario principal de su aplicación ocupa una página completa de los documentos oficiales de Android, esta es una gran mejora que potencialmente podría ahorrarle mucho tiempo al crear aplicaciones de Android de multiproceso..

El mayor inconveniente de RxJava

Si bien RxJava tiene mucho que ofrecer a los desarrolladores de Android, ninguna tecnología es perfecta, y RxJava tiene un gran escollo que tiene el potencial de bloquear su aplicación..

De forma predeterminada, RxJava opera un flujo de trabajo basado en empuje: los datos se producen en sentido ascendente por un Observable, y luego se empuja hacia abajo a la asignada Observador. El principal problema con un flujo de trabajo basado en empuje es lo fácil que es para el productor (en este caso, el Observable) para emitir artículos demasiado rápido para el consumidor (Observador) para procesar.

Un hablador Observable y un lento Observador puede dar como resultado rápidamente una acumulación de elementos no consumidos, lo que va a engullir los recursos del sistema e incluso puede resultar en una OutOfMemoryException. Este problema se conoce como contrapresión.

Si sospecha que se está produciendo una contrapresión en su aplicación, existen algunas soluciones posibles, incluido el uso de un operador para reducir la cantidad de elementos que se producen..

Creación de períodos de muestreo con muestra() y primero ()

Si una Observable está emitiendo una gran cantidad de elementos, entonces puede que no sea necesario para el Observador para recibir cada solo uno de esos artículos.

Si puede ignorar con seguridad algunos de los ObservableEn las emisiones, hay algunos operadores que puede usar para crear períodos de muestreo y luego seleccionar los valores específicos que se emiten durante estos períodos:

  • los muestra() El operador verifica la salida del Observable en los intervalos especificados por usted, y luego toma el elemento más reciente que se emitió durante ese período de muestreo. Por ejemplo, si incluyes .muestra (5, SEGUNDOS) en su proyecto, el observador recibirá el último valor que se emitió durante cada intervalo de cinco segundos. 
  • los acelerador primero () El operador toma el primer valor que se emitió durante el período de muestreo. Por ejemplo, si incluyes .primer acelerador (5, SEGUNDOS) entonces el observador recibirá el primer valor que se emite durante cada intervalo de cinco segundos.  

Emisiones por lotes con buffer()

Si no puede omitir las emisiones de manera segura, es posible que todavía pueda aliviar la presión de un luchador. Observador agrupando las emisiones en lotes y luego enviando en masa. El procesamiento de las emisiones por lotes suele ser más eficiente que el procesamiento de múltiples emisiones por separado, por lo que este enfoque debería mejorar la tasa de consumo.

Puede crear emisiones por lotes utilizando el buffer() operador. Aquí estamos usando buffer() para unir todos los artículos emitidos durante un período de tres segundos:

Observable.range (0, 10) .buffer (3, SEGUNDOS) .subscribe (System.out :: println);

Alternativamente, puedes usar buffer() para crear un lote que consiste en un número específico de emisiones. Por ejemplo, aquí estamos diciendo buffer() para agrupar las emisiones en grupos de cuatro:

Observable.range (0, 10) .buffer (4) .subscribe (System.out :: println);

Reemplazo de Observables con Flowables

Un método alternativo para reducir el número de emisiones es reemplazar el Observable eso te está causando problemas con una Fluido.

En RxJava 2, el equipo de RxJava decidió dividir el estándar Observable en dos tipos: el tipo regular que hemos estado viendo a lo largo de esta serie, y Fluidos.

Fluidos funciona de la misma manera que Observables, pero con una gran diferencia: FluidoSolo enviamos tantos artículos como el observador solicite. Si tienes un Observable que está emitiendo más elementos de los que su observador asignado puede consumir, entonces puede considerar cambiar a un Fluido en lugar.

Antes de que puedas comenzar a usar FluidoEn sus proyectos, debe agregar la siguiente declaración de importación:

importar io.reactivex.Flowable;

A continuación, puede crear Fluidos utilizando exactamente las mismas técnicas utilizadas para crear Observables. Por ejemplo, cada uno de los siguientes fragmentos de código creará un Fluido que es capaz de emitir datos:

Fluido flowable = Flowable.fromArray (nueva Cadena [] "south", "north", "west", "east"); ... flowable.subscribe ()
Fluido flowable = Flowable.range (0, 20);… flowable.subscribe ()

En este punto, puede que se pregunte: ¿por qué usaría Observables cuando puedo usar Fluido¿Y no tienes que preocuparte por la contrapresión? La respuesta es que un Fluido incurre más de una sobrecarga que un regular Observable, por lo tanto, en el interés de crear una aplicación de alto rendimiento, debe seguir con Observables a menos que sospeche que su aplicación está teniendo problemas con la contrapresión.

Individual

UNA Fluido no es la única variación en Observable que encontrarás en RxJava, ya que la biblioteca también incluye el Soltero clase.

Individual Son útiles cuando solo necesitas emitir un valor. En estos escenarios, creando un Observable puede sentirse como una exageración, pero un Soltero está diseñado para simplemente emitir un solo valor y luego completarse, ya sea llamando al:

  • onSuccess (): Los Soltero emite su único valor.  
  • onError (): Si el Soltero no puede emitir su elemento, luego pasará este método el resultado Tirable.

UNA Soltero llamará a uno de estos métodos solamente, y luego terminará inmediatamente.

Veamos un ejemplo de una Soltero en acción-nuevamente, para ahorrar tiempo estamos reutilizando el código:

importar android.os.Bundle; importar android.support.v7.app.AppCompatActivity; import android.util.Log; importar io.reactivex.Single; importar io.reactivex.SingleObserver; importar io.reactivex.disposables.Disposable; La clase pública MainActivity extiende AppCompatActivity public static final String TAG = "MainActivity"; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Single.just ("Hello World") .subscribe (getSingleObserver ());  SingleObserver privado getSingleObserver () return new SingleObserver() @Override public void onSubscribe (Disposable d) Log.e (TAG, "onSubscribe");  @Override public void onSuccess (valor de cadena) Log.e (TAG, "onSuccess:" + valor);  @ Anular el vacío público onError (Throwable e) Log.e (TAG, "onError:"); ; 

Ejecute su proyecto en un AVD o dispositivo físico de Android, y verá la siguiente salida en el Monitor Logcat de Android Studio:

Si cambias de opinión y quieres convertir un Soltero en una Observable en cualquier momento, una vez más, RxJava tiene todos los operadores que necesita, incluidos:

  • fusionarse con(): Fusiona multiples Individual en una sola Observable
  • concatWith (): Cadena los elementos emitidos por multiples. Individual juntos, para formar un Observable emisión. 
  • toObservable (): Convierte a Soltero en una Observable que emite el elemento que fue originalmente emitido por el Single y luego se completa.

Resumen

En esta publicación exploramos algunos operadores de RxJava que puede usar para crear y administrar múltiples subprocesos, sin la complejidad y el potencial de error que tradicionalmente se acompaña de multithreading en Android. También vimos cómo puede utilizar la biblioteca RxAndroid para comunicarse con el importante subproceso de la interfaz de usuario principal de Android mediante una única línea de código, y cómo garantizar que la contrapresión no se convierta en un problema en su aplicación.

Hemos tocado la biblioteca RxAndroid varias veces a lo largo de esta serie, pero esta biblioteca está repleta de enlaces RxJava específicos de Android que pueden ser muy valiosos cuando se trabaja con RxJava en la plataforma Android, por lo que en la publicación final de esta serie estar mirando la biblioteca RxAndroid con mucho más detalle.

Hasta entonces, echa un vistazo a algunas de nuestras otras publicaciones sobre codificación para Android!