Capítulo 3 Data Pipeline e Infraestructura de AWS
3.1 Data Pipeline
3.1.1 Deployment
Gráfica 2.Data Product Pipeline: Deployment- Script en python que hace peticiones a la API de socrata usando la librería sodapy solicitando todos los registros de la base de datos en formato JSON.
- En la EC2 se recibe el archivo de datos y se guarda en la instancia S3. El script hace las predicciones por fecha y los archivos respuesta se van guardando en una estructura de carpetas dentro de S3. El S3 está encriptado de tal manera que solamente pueden ingresar con las credenciales asignadas.
- Alimentamos el esquema preprocessed en el cual se genera la misma estructura de carpetas y los archivos JSON se guardan en formato parquet.
- Se genera el esquema cleaned a partir del esquema preprocessed. En este esquema se eliminan las columnas que no tienen variabilidad o son en su mayoría valores nulos. Se asignan los tipos de dato a cada columna y se limpian acentos y caracteres extraños de nombres de columnas y observaciones. Se conserva el formato parquet.
Los detalles de la configuración y uso de la arquitura utilizada en AWS se encuenta en scripts aws
3.1.2 Desarrollo
Gráfica 3.Data Product Pipeline: Desarrollo y Producción
3.1.2.1 Extract
- Hacemos una petición a la API solicitando los datos en formato JSON por medio de un script en python. Se hará un petición al día en la que filtramos para incluir solamente los registros que se crearon el día anterior (registros nuevos). La petición actualmente se hace por número de registros. Se modificará para realizar un filtrado utilizando la variable created_date.
- Hacemos una segunda petición a la API solicitando los registros que se cerraron el día anterior en formato JSON. Filtramos la petición por medio de las variables closed_date (actualización de registros existentes).
3.1.2.2 Load
- La petición de los datos en extract nos permite obtener los archivos JSON de la API, uno por cada petición y los registros nuevos se incorporan a la estructura de carpetas en S3 con la fecha de creación del registro (futura estructura). Los registros obtenidos por la variable closed_date se guardan en otra estructura de carpatas generada a partir de las fechas.
3.1.2.3 Transform
- Se corre un script en python (pyspark) que hace consultas a los datos almacenados en parquet, se limpian los datos, se quitan las columnas nulas o que no tienen variabilidad y incorporan los registros al esquema cleaned de la estructura de carpetas.
- Se corre un segundo script en python que transforma el archivo JSON que tiene la actualización de registros existentes a parquet, se lleva al formato cleaned y es guarda en la estructura de carpetas.
Gráfica 4.Extract, Load & Transform
3.2 Infraestructura AWS
Nota: Los scripts asociados a este apartado se encuentran en el archivo conf-AWS.
3.2.1 Configuración del bastión
El bastión es una instancia pequeña t2.micro
con 8gb
de memoria (no es necesario tener algo con mayor capacidad en este punto). Funciona como punto de seguridad y administra a los usuarios para que puedan entrar a la arquitectura de AWS.
Este ec2
vive dentro de una subnet pública en una VPC.
Para tener la conexión al bastión, ocupamos el protocolo ssh
como sigue:
ssh -o "ServerAliveInterval 60" -i /ruta/llave.pem ubuntu@ip-ec2
Para salir, basta con poner exit
en el shell de la instancia.
3.2.2 addus.sh
Este script crea los usuarios dentro del bastión y asigna la misma contraseña a cada uno de ellos, por lo que debemos estar conectados a la instancia. Para poder hacer uso es necesario darle los permisos, es decir, chmod +x addus.sh
. También es importante mencionar que se utiliza un archivo txt users.txt
que contiene el nombre de los usuarios que queremos agregar.
Para la selección de contraseña, utilizamos el encriptador crypt
contenido en python 3.6.5 usando los siguientes comandos:
import crypt
crypt.crypt("contraseña","salt")
La salida de estos comandos debe verse como: 'sa1O7Z1pCJzK.'
; esta debe agregarse al script addus.ssh
en la parte indicada. Para correr el archivo solo es necesario correr el siguiente comando:
./addus.sh
En este punto ya tenemos a los usuarios agregados: todos con la misma contraseña y permisos de super usuario.
3.2.3 Agregar llaves para conexión
Una vez creados los usuarios, copiaremos las llaves públicas de cada uno de los usuarios a su usuario correspondiente.
- Es necesario modificar el archivo
sudo nano /etc/ssh/sshd_config
y cambiar los parámetros a la siguiente configuración:
PubkeyAuthentication yes
PubkeyAuthentication yes
Ahora solo es necesario reiniciar el servicio sshd con el comando:
sudo service sshd restart
- Realizar el copiado de las llaves al bastión, de forma segura:
ssh-copy-id -f -i /ruta/llave/id_llave.pub usuario@ip-ec2
- Finalmente, nos reconectamos al bastión y cambiamos la configuración del archivo
sudo nano /etc/ssh/sshd_config
dejando el siguiente parámetro:
PasswordAuthentication no
- Reiniciamos el servicio
sudo service sshd restart
Para conectarnos usando el protocolo ssh
y la llave privada, utilizamos el siguiente comando:
ssh -o "ServerAliveInterval 60" -i /ruta/llave_privada usuario@ip-ec2
3.2.4 EC2 procesamiento
Esta máquina es para el procesamiento de la información y es donde viven las tareas que realiza luigi
; cabe mencionar que esta instancia es de mayor capacidad. Aquí también debe ser instalado docker
para la reproducibilidad del proyecto.
Esta maquina vive dentro de una VPC con una subnet privada. Para tener acceso a la misma tenemos que hacer un secure copy de la llave.pem al bastion:
ssh -i /tu/ruta/llave-bastion.pem /tu/ruta/llave/ec2/llave.pem ubuntu@ip-ec2:/home/ubuntu/.ssh
Nos conectamos al bastión y en la ruta /home/ubuntu/.ssh
tendremos guardada la llave.pem
. En este punto solo tendremos que hacer la conexión usando el protocolo ssh
.
Para asegurar el correcto funcionamiento del host de luigi se debe habilitar el puerto 8082
. La conexión se realiza usando el siguiente comando en la EC2
luigid
Esto activa el puerto 8082
y permite que nos conectemos usando el navegador de internet en la siguiente liga
ip_ec2:8082
Nota: si falla la conexión a la EC2 y por alguna razón no permite entrar a la liga debemos usar el siguiente comando sudo lsof -t -i tcp:8082 | xargs kill -9
esto termina la conexión previa y así podemos reiniciar.
3.2.5 Para levantar cluster
A continuación se presentan los pasos a seguir para configurar y levantar el cluster, conexión y trabajo dentro del mismo. Para el óptimo desarrollo se tiene como consideración que los datos se encuentran guardados en una S3
.
Conectado en el servicio EMR
de AWS
, damos click en el botón crear cluster
. Nombramos el cluster; en configuración de software escogemos emr-5.29.0
con aplicaciones Spark: Spark 2.4.4 on Hadoop 2.8.5 YARN with Ganglia 3.7.2 and Zeppelin 0.8.2
y finalmente seleccionamos crear cluster
Para la conexión al cluster es recomendable utilizar el explorador chrome
. En el explorador abrimos la siguiente liga para instalar el complemento FoxyProxy Standard
, dentro de este complemento en options > import/export
subimos el archivo foxyproxy-settings.xml
, por último en proxy mode
seleccionamos la opción Use proxies based on their pre-defined patterns and priorities
- Abrimos el tunel
ssh
desde la linea de comandos de la siguiente forma.
ssh -i ~/ruta/llave.pem -ND 8157 hadoop@dns_ip_aws
- Desde
chrome
nos conectamos a la siguiente ruta para utilizarZeppelin
.
dns_ip_aws:8890