Implementando tu propio Writable en Hadoop

0
12374

Implementando tu propio Writable en Hadoop

0. Índice de contenidos.


1. Introducción.

En el anterior tutorial de primeros pasos con MapReduce vimos la forma de implementar un algoritmo MapReduce con Hadoop para calcular el valor medio de los niveles de monóxido de carbono (CO) de cada una de las nueve provincias de Castilla y León. El algoritmo era muy sencillo, nos bastó con emitir en el mapper como clave la provincia y como valor la medida del nivel de CO. El reducer recibía para cada provincia una lista de todos sus niveles y se encargaba de calcular la media.

En este tutorial vamos a extraer algo más de información del dataset, por ejemplo vamos a sacar por cada año la provincia donde se ha registrado el nivel más alto de una sustancia de las que vienen recogidas en el fichero csv. Como en este caso necesitamos tres valores de cada registro, año, provincia y valor de la medida necesitamos crear un registro compuesto para componer la clave que contendrá provincia y año. Para ello vamos a necesitar implementar nuestro propio Writable.

Hadoop por defecto ya nos proporciona unos Writables para los tipos básicos:

  • Text para serializar String
  • IntWritable para serializar Integer
  • FloatWritable (Float), LongWritable (Long), ByteWritable (Byte), DoubleWritable (Double)
  • NullWritable para emitir nulos.

Para casos sencillos como vamos a ver ahora, únicamente recogemos 3 valores, implementar nuestro propio Writable nos soluciona el problema. Si por el contrario necesitamos recoger muchos más campos tendríamos que recurrir a otras estrategias de serialización utilizando librerías como Avro o Thrift de Apache.

Puedes descargarte el código del tutorial desde mi repositorio de github pinchando aquí.

2. Entorno.

El tutorial se ha realizado con el siguiente entorno:

  • Ubuntu 12.04 64 bits
  • Oracle Java SDK 1.6.0_27
  • Apache Hadoop 2.2.0
  • Apache Maven 3.1.1

3. El Writable

Vamos a partir del proyecto que creamos en el tutorial de primeros pasos con MapReduce. En este caso vamos a implementar nuestro propio Writable que utilizaremos para almacenar la clave compuesta por el año y la provincia donde se tomó la medida. Para crear nuestro Writable customizado vamos a crear una clase que implemente el interfaz WritableComparable.

Este interfaz del API de Hadoop es una subinterfaz de la interfaz Writable de Hadoop y la interfaz Comparable de toda la vida. Todo Writable debe tener un constructor por defecto para que el framework MapReduce pueda instanciarlo. Debemos implementar los métodos write utilizado por Hadoop para serializar los valores del objeto a la salida del map y el método readFields de donde los leerá posteriormente para pasárselos a la tarea reduce. Es recomendable también implementar los métodos hashCode e equals.

Al implementar también de Comparable debemos implementar el método compareTo utilizado para ordenar las claves en la fase de suffle and sort.

Por eficiencia es muy recomendable implementar adicionalmente un comparador que pueda comparar los registros de nuestro Writable sin necesidad de deserializarlos en objetos Java.

El bloque estático se encarga de registrar el comparador implementado para ser usado por defecto en la clase MeasureWritable. Es muy eficiente este comparador ya que funciona a nivel de byte.

4. Mapper

Una vez que tenemos nuestro propio Writable encargado de almacenar el año y la provincia vamos a usarlo desde nuestro mapper.

La salida de nuestro mapper será la compuesta por la tupla [MeasureWritable, FloatWritable] que emitirá los valores de nuestro dataset registrando el año y la provincia de la medida como clave y el valor de la medida como valor que emitiremos al reduce. Para poder aprovechar mucho mejor el dataset con los valores de diferentes medidas de la contaminación, la entrada al mapper es parametrizable, es decir cogerá el valor que se le indique por la entrada estándar. Estos valores se corresponden con las posiciones de los datos presentes en el registro.

El método setup se llamará una única vez antes de ejecutar las tareas map y es muy utilizado para inicializar algún recurso. En este caso lo he utilizado para recoger el parámetro indicado para el tipo de medida a consultar. Este parámetro será añadido en el método run cuando se crea el Job.

Para que resulte un poco más sencillo se han guardado en un enumerado asociando el tipo de medida con su posición en la línea del fichero.

5. Reducer

El reducer es muy sencillo, recibirá como clave el writable que contiene por cada año y provincia la lista de medidas tomadas y se encargará de encontrar la mayor de todas. Una vez encontrada emitirá la salida.

El Driver encargado de la ejecución del MapReduce es el típico para una clase configurada con ToolRunner.

Para pasarle el argumento de la entrada que indica el tipo de medida lo hacemos a través de la clase Configuration que recibe el Job cuando es creado.

Ahora que ya tenemos todo el código de nuestro algoritmo lo ejecutamos indicando el tipo de medida que vamos a utilizar, por ejemplo CO. La salida una vez ejecutado el algoritmo queda de la siguiente manera:

Como se puede observar, nos muestra un registro por cada año, indicando la provincia que tuvo el mayor índice registrado de monóxido de carbono y su nivel.

Vamos a probar a sacar los datos de otras medidas que contiene el dataset.

Dióxido de azufre (SO2)

Dióxido de azufre (NO2)

Sin ser muy entendido en la materia a simple vista se puede sacar una conclusión muy importante después de analizar varias medidas tomadas durante varios años y es que los índices de contaminación en general van descendiendo año tras año. Una muy buena noticia!!!

6. Conclusiones.

En esta ocasión hemos aprovechado un poquito más el potencial de Hadoop para analizar un fichero con datos estructurados que nos han ayudado a extraer información importante. Es un pequeño ejemplo de lo mucho que se puede hacer con este framework ya que resulta sencillo programar algoritmos MapReduce y lo más importante que puedan escalar si fuera necesario.

Puedes descargarte el código del tutorial desde mi repositorio de github pinchando aquí.

Espero que te haya sido de ayuda.

Un saludo.

Juan

Dejar respuesta

Please enter your comment!
Please enter your name here