Capítulo 3 Data Pipeline e Infraestructura de AWS

3.1 Data Pipeline

3.1.1 Deployment

Gráfica 2.Data Product Pipeline: Deployment

  1. Script en python que hace peticiones a la API de socrata usando la librería sodapy solicitando todos los registros de la base de datos en formato JSON.
  2. En la EC2 se recibe el archivo de datos y se guarda en la instancia S3. El script hace las predicciones por fecha y los archivos respuesta se van guardando en una estructura de carpetas dentro de S3. El S3 está encriptado de tal manera que solamente pueden ingresar con las credenciales asignadas.
  3. Alimentamos el esquema preprocessed en el cual se genera la misma estructura de carpetas y los archivos JSON se guardan en formato parquet.
  4. Se genera el esquema cleaned a partir del esquema preprocessed. En este esquema se eliminan las columnas que no tienen variabilidad o son en su mayoría valores nulos. Se asignan los tipos de dato a cada columna y se limpian acentos y caracteres extraños de nombres de columnas y observaciones. Se conserva el formato parquet.

Los detalles de la configuración y uso de la arquitura utilizada en AWS se encuenta en scripts aws

3.1.2 Desarrollo

Gráfica 3.Data Product Pipeline: Desarrollo y Producción

3.1.2.1 Extract

  1. Hacemos una petición a la API solicitando los datos en formato JSON por medio de un script en python. Se hará un petición al día en la que filtramos para incluir solamente los registros que se crearon el día anterior (registros nuevos). La petición actualmente se hace por número de registros. Se modificará para realizar un filtrado utilizando la variable created_date.
  2. Hacemos una segunda petición a la API solicitando los registros que se cerraron el día anterior en formato JSON. Filtramos la petición por medio de las variables closed_date (actualización de registros existentes).

3.1.2.2 Load

  1. La petición de los datos en extract nos permite obtener los archivos JSON de la API, uno por cada petición y los registros nuevos se incorporan a la estructura de carpetas en S3 con la fecha de creación del registro (futura estructura). Los registros obtenidos por la variable closed_date se guardan en otra estructura de carpatas generada a partir de las fechas.

3.1.2.3 Transform

  1. Se corre un script en python (pyspark) que hace consultas a los datos almacenados en parquet, se limpian los datos, se quitan las columnas nulas o que no tienen variabilidad y incorporan los registros al esquema cleaned de la estructura de carpetas.
  2. Se corre un segundo script en python que transforma el archivo JSON que tiene la actualización de registros existentes a parquet, se lleva al formato cleaned y es guarda en la estructura de carpetas.

Gráfica 4.Extract, Load & Transform

3.2 Infraestructura AWS

Nota: Los scripts asociados a este apartado se encuentran en el archivo conf-AWS.

3.2.1 Configuración del bastión

El bastión es una instancia pequeña t2.micro con 8gb de memoria (no es necesario tener algo con mayor capacidad en este punto). Funciona como punto de seguridad y administra a los usuarios para que puedan entrar a la arquitectura de AWS.

Este ec2 vive dentro de una subnet pública en una VPC.

Para tener la conexión al bastión, ocupamos el protocolo ssh como sigue:

ssh -o "ServerAliveInterval 60" -i /ruta/llave.pem  ubuntu@ip-ec2

Para salir, basta con poner exit en el shell de la instancia.

3.2.2 addus.sh

Este script crea los usuarios dentro del bastión y asigna la misma contraseña a cada uno de ellos, por lo que debemos estar conectados a la instancia. Para poder hacer uso es necesario darle los permisos, es decir, chmod +x addus.sh. También es importante mencionar que se utiliza un archivo txt users.txt que contiene el nombre de los usuarios que queremos agregar.

Para la selección de contraseña, utilizamos el encriptador crypt contenido en python 3.6.5 usando los siguientes comandos:

import crypt
crypt.crypt("contraseña","salt")

La salida de estos comandos debe verse como: 'sa1O7Z1pCJzK.'; esta debe agregarse al script addus.ssh en la parte indicada. Para correr el archivo solo es necesario correr el siguiente comando:

./addus.sh

En este punto ya tenemos a los usuarios agregados: todos con la misma contraseña y permisos de super usuario.

3.2.3 Agregar llaves para conexión

Una vez creados los usuarios, copiaremos las llaves públicas de cada uno de los usuarios a su usuario correspondiente.

  1. Es necesario modificar el archivo sudo nano /etc/ssh/sshd_config y cambiar los parámetros a la siguiente configuración:
PubkeyAuthentication yes
PubkeyAuthentication yes

Ahora solo es necesario reiniciar el servicio sshd con el comando:

sudo service sshd restart
  1. Realizar el copiado de las llaves al bastión, de forma segura:
ssh-copy-id -f -i /ruta/llave/id_llave.pub usuario@ip-ec2
  1. Finalmente, nos reconectamos al bastión y cambiamos la configuración del archivo sudo nano /etc/ssh/sshd_config dejando el siguiente parámetro:
PasswordAuthentication no
  1. Reiniciamos el servicio sudo service sshd restart

Para conectarnos usando el protocolo ssh y la llave privada, utilizamos el siguiente comando:

ssh -o "ServerAliveInterval 60" -i /ruta/llave_privada  usuario@ip-ec2

3.2.4 EC2 procesamiento

Esta máquina es para el procesamiento de la información y es donde viven las tareas que realiza luigi; cabe mencionar que esta instancia es de mayor capacidad. Aquí también debe ser instalado docker para la reproducibilidad del proyecto.

Esta maquina vive dentro de una VPC con una subnet privada. Para tener acceso a la misma tenemos que hacer un secure copy de la llave.pem al bastion:

ssh -i  /tu/ruta/llave-bastion.pem /tu/ruta/llave/ec2/llave.pem ubuntu@ip-ec2:/home/ubuntu/.ssh

Nos conectamos al bastión y en la ruta /home/ubuntu/.ssh tendremos guardada la llave.pem. En este punto solo tendremos que hacer la conexión usando el protocolo ssh.

Para asegurar el correcto funcionamiento del host de luigi se debe habilitar el puerto 8082. La conexión se realiza usando el siguiente comando en la EC2

luigid

Esto activa el puerto 8082 y permite que nos conectemos usando el navegador de internet en la siguiente liga

ip_ec2:8082

Nota: si falla la conexión a la EC2 y por alguna razón no permite entrar a la liga debemos usar el siguiente comando sudo lsof -t -i tcp:8082 | xargs kill -9 esto termina la conexión previa y así podemos reiniciar.

3.2.5 Para levantar cluster

A continuación se presentan los pasos a seguir para configurar y levantar el cluster, conexión y trabajo dentro del mismo. Para el óptimo desarrollo se tiene como consideración que los datos se encuentran guardados en una S3.

Conectado en el servicio EMR de AWS, damos click en el botón crear cluster. Nombramos el cluster; en configuración de software escogemos emr-5.29.0 con aplicaciones Spark: Spark 2.4.4 on Hadoop 2.8.5 YARN with Ganglia 3.7.2 and Zeppelin 0.8.2 y finalmente seleccionamos crear cluster

Para la conexión al cluster es recomendable utilizar el explorador chrome. En el explorador abrimos la siguiente liga para instalar el complemento FoxyProxy Standard, dentro de este complemento en options > import/export subimos el archivo foxyproxy-settings.xml, por último en proxy mode seleccionamos la opción Use proxies based on their pre-defined patterns and priorities

  • Abrimos el tunel ssh desde la linea de comandos de la siguiente forma.
ssh -i ~/ruta/llave.pem -ND 8157 hadoop@dns_ip_aws
  • Desde chrome nos conectamos a la siguiente ruta para utilizar Zeppelin.
dns_ip_aws:8890