Skip to main content

Command Palette

Search for a command to run...

ETL Serverless con AWS Lambda, S3 y Aurora

Transformación de Data CSV y Carga a DB con Data API

Published
6 min read
ETL Serverless con AWS Lambda, S3 y Aurora
R

Ingeniero en Telecomunicaciones con enfoque en redes, conectividad y automatización. Actualmente estudiando AWS, DevOps y CI/CD. Comparto mi camino con proyectos prácticos y artículos técnicos desde Chile 🇨🇱.

https://robertoespana.hashnode.dev/portafolio

La empresa X trabaja con archivos de facturación y se solicitó agregar una columna con los valores en USD, ya que los datos originales incluyen pesos mexicanos (MXN) y dólares canadienses (CAD). Para esto, se diseñó una arquitectura donde tenemos:

  1. Un bucket S3 configurado con un trigger que se activa con el evento s3:ObjectCreated:PUT al recibir un archivo.

  2. Este evento ejecuta una función Lambda.

  3. La Lambda extrae la data, transforma los valores de MXN y CAD a USD y los agrega en la nueva columna (bill_amount_usd) según cada fila.

  4. Por último, sube los datos del archivo modificado a la base de datos serverless (AWS Aurora Serverless) mediante la Data API.

Como ya voy más avanzado, pasaremos directamente al código. Previamente, ya tengo creada la base de datos con la estructura cargada. La columna final bill_amount_usd será donde se cargarán los datos modificados tras extraer la data del CSV y convertir las monedas MXN y CAD a dólares.

Antes de saltar directamente a la implementación con la base de datos, mostraré un SAM local para ver cómo los datos son tomados y transformados a dólares, confirmando la lógica de conversión:


Desglose del Código (Explicado Paso a Paso)

Carga de Librerías y Definición de Clientes/Conexión

Lo primero es importar las librerías necesarias (boto3, io, csv, logging) y definir las constantes de AWS y los clientes, incluyendo los ARN (Amazon Resource Names) del secreto y del clúster de la base de datos Aurora. También se define la tabla de tasas de conversión (currency_conversion_to_usd):

import boto3
import io
import csv
import logging

REGION = 'us-east-1'
DB_NAME = 'estudiodb' 

secret_store_arn = 'arn:aws:secretsmanager:us-east-1:514145637781:secret:mi-proyecto-db-secret-v1-vyvZgo'
db_cluster_arn = 'arn:aws:rds:us-east-1:514145637781:cluster:mi-proyecto-serverless-cluster'

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3', region_name=REGION)
rds_client = boto3.client('rds-data', region_name=REGION)

# Tasas de conversión (ejemplo)
currency_conversion_to_usd = {'USD': 1.0, 'CAD': 0.79, 'MXN': 0.05}

Función de Ejecución SQL (execute_statement)

Esta función es la clave para la Carga (Load), ya que permite ejecutar consultas SQL en Aurora a través de la Data API, usando el secretArn y el resourceArn para la conexión.

def execute_statement(sql_statement, sql_parameters):
    try:
        response = rds_client.execute_statement(
            secretArn=secret_store_arn,
            database=DB_NAME,
            resourceArn=db_cluster_arn,
            sql=sql_statement,
            parameters=sql_parameters
        )
        return response
    except Exception as e:
        logger.error(f'ERROR: Fallo al ejecutar SQL en Aurora: {e}')
        return None

Función de Procesamiento y Transformación (process_record)

Aquí se lleva a cabo la Transformación (Transform). Se toma el registro del CSV, se divide en atributos individuales, se convierte el bill_amount a flotante, se calcula el usd_amount y se utiliza la función execute_statement para insertar los datos en la base.

def process_record(record):

    try:
        # Asignación de atributos (Extracción)
        id_rec, company_name, country, city, product_line, item, bill_date, currency, bill_amount = record
    except ValueError:
        logger.error("Error: El registro CSV no tiene el numero correcto de columnas.")
        return

    try:
        bill_amount = float(bill_amount)
    except ValueError:
        logger.error(f"Error: bill_amount no es un numero válido: {bill_amount}")
        return

    # Transformación de moneda
    rate = currency_conversion_to_usd.get(currency)
    usd_amount = 0.0

    if rate:
        usd_amount = bill_amount * rate
    else:
        logger.info(f'No rate found for currency: {currency}.')

    # Sentencia SQL con parámetros (Carga)
    sql_statement = ("INSERT IGNORE INTO billing_data "
                     "(id, company_name, country, city, product_line, "
                     "item, bill_date, currency, bill_amount, bill_amount_usd) "
                     "VALUES (:id, :company_name, :country, :city, :product_line, "
                     ":item, :bill_date, :currency, :bill_amount, :usd_amount)")

    sql_parameters = [
        {'name':'id',           'value': {'stringValue': str(id_rec)}},
        {'name':'company_name', 'value': {'stringValue': str(company_name)}},
        # ... (resto de parámetros)
        {'name':'usd_amount',   'value': {'stringValue': str(usd_amount)}},
    ]

    response = execute_statement(sql_statement, sql_parameters)
    logger.info(f'SQL execution response: {response}')

Función Principal del Lambda (lambda_handler)

Esta es la función principal que maneja la Extracción (Extract) desde S3.

  1. Recibe el evento, extrae el nombre del bucket y del archivo.

  2. Obtiene el archivo CSV desde S3 usando s3_client.get_object().

  3. Decodifica la data y usa csv.reader para saltar el encabezado y procesar cada fila.

  4. Cada fila (registro) se pasa a la función process_record para su transformación y carga.

def lambda_handler(event, context):
    try: 

        bucket_name = event['Records'][0]['s3']['bucket']['name']
        s3_file = event['Records'][0]['s3']['object']['key']

        logger.info(f"Procesando archivo {s3_file} del bucket {bucket_name}")

        response = s3_client.get_object(Bucket=bucket_name, Key=s3_file)
        data = response['Body'].read().decode('utf-8')

        csv_reader = csv.reader(io.StringIO(data))

        try:
            next(csv_reader) # Saltar encabezado
        except StopIteration:
            logger.info("Archivo CSV vacio.")
            return

        for record in csv_reader:
            if record:
                process_record(record)

        logger.info("Lambda ha terminado la ejecución correctamente.")

    except Exception as e:
        logger.error(f"ERROR: error inesperado en lambda_handler: {e}")
        raise e

Código completo

import boto3
import io
import csv
import logging

REGION = 'us-east-1'
DB_NAME = 'estudiodb' 

secret_store_arn = 'arn:aws:secretsmanager:us-east-1:514145637781:secret:mi-proyecto-db-secret-v1-vyvZgo'
db_cluster_arn = 'arn:aws:rds:us-east-1:514145637781:cluster:mi-proyecto-serverless-cluster'

logger = logging.getLogger()
logger.setLevel(logging.INFO)

s3_client = boto3.client('s3', region_name=REGION)
rds_client = boto3.client('rds-data', region_name=REGION)


currency_conversion_to_usd = {'USD': 1.0, 'CAD': 0.79, 'MXN': 0.05}

def execute_statement(sql_statement, sql_parameters):
    try:
        response = rds_client.execute_statement(
            secretArn=secret_store_arn,
            database=DB_NAME,
            resourceArn=db_cluster_arn,
            sql=sql_statement,
            parameters=sql_parameters
        )
        return response
    except Exception as e:
        logger.error(f'ERROR: Fallo al ejecutar SQL en Aurora: {e}')
        return None

def process_record(record):

    try:
        id_rec, company_name, country, city, product_line, item, bill_date, currency, bill_amount = record
    except ValueError:
        logger.error("Error: El registro CSV no tiene el numero correcto de columnas.")
        return

    try:
        bill_amount = float(bill_amount)
    except ValueError:
        logger.error(f"Error: bill_amount no es un numero válido: {bill_amount}")
        return

    rate = currency_conversion_to_usd.get(currency)
    usd_amount = 0.0

    if rate:
        usd_amount = bill_amount * rate
    else:
        logger.info(f'No rate found for currency: {currency}.')

    sql_statement = ("INSERT IGNORE INTO billing_data "
                     "(id, company_name, country, city, product_line, "
                     "item, bill_date, currency, bill_amount, bill_amount_usd) "
                     "VALUES (:id, :company_name, :country, :city, :product_line, "
                     ":item, :bill_date, :currency, :bill_amount, :usd_amount)")

    sql_parameters = [
        {'name':'id',           'value': {'stringValue': str(id_rec)}},
        {'name':'company_name', 'value': {'stringValue': str(company_name)}},
        {'name':'country',      'value': {'stringValue': str(country)}},
        {'name':'city',         'value': {'stringValue': str(city)}},
        {'name':'product_line', 'value': {'stringValue': str(product_line)}},
        {'name':'item',         'value': {'stringValue': str(item)}},
        {'name':'bill_date',    'value': {'stringValue': str(bill_date)}},
        {'name':'currency',     'value': {'stringValue': str(currency)}},
        {'name':'bill_amount',  'value': {'stringValue': str(bill_amount)}},
        {'name':'usd_amount',   'value': {'stringValue': str(usd_amount)}},
    ]

    response = execute_statement(sql_statement, sql_parameters)
    logger.info(f'SQL execution response: {response}')

def lambda_handler(event, context):
    try: 

        bucket_name = event['Records'][0]['s3']['bucket']['name']
        s3_file = event['Records'][0]['s3']['object']['key']

        logger.info(f"Procesando archivo {s3_file} del bucket {bucket_name}")

        response = s3_client.get_object(Bucket=bucket_name, Key=s3_file)
        data = response['Body'].read().decode('utf-8')

        csv_reader = csv.reader(io.StringIO(data))

        try:
            next(csv_reader) # Saltar encabezado
        except StopIteration:
            logger.info("Archivo CSV vacio.")
            return

        for record in csv_reader:
            if record:
                process_record(record)

        logger.info("Lambda ha terminado la ejecución correctamente.")

    except Exception as e:
        logger.error(f"ERROR: error inesperado en lambda_handler: {e}")
        raise e

Demostración y Resultado

Pruebas Locales con SAM

Cuando ejecuto en modo local con SAM, podemos ver la siguiente salida, mostrando que la extracción, transformación (cálculo en USD) y la simulación de carga de datos (llamadas a execute_statement) fueron ejecutadas con éxito.

Permisos del Rol de Lambda (IAM)

Eliminados los datos de la base de datos (para una práctica limpia), se asignan los permisos necesarios para que Lambda pueda interactuar con la Data API SQL, los Secrets Manager y el bucket S3. Para esto, creé policies definiendo el ARN del clúster y el secret de mi base de datos, lo que resulta en 4 permisos asociados al rol.

Carga y Resultado Final

  1. Verificación de DB Vacía: Hacemos una consulta a la base de datos, devolviendo solo una tabla vacía, corroborando que no hay datos.

  2. Activación del Trigger: Configurado previamente el trigger en S3 (evento PUT), procedo a cargar un archivo dentro del bucket.

    Ejecución Autónoma: Al finalizar la carga, de manera automática se ejecuta Lambda, procesando el archivo como lo expliqué.

  3. Verificación en DB: Vamos a la base de datos, hacemos la consulta y podemos ver la tabla con todos los datos del archivo subido, además del nuevo atributo bill_amount_usd con los valores correctamente convertidos a dólares.

Observabilidad (CloudWatch Logs)

Por último, observamos los logs en CloudWatch, ya que integramos observabilidad a través de logging, además de los logs automáticos que envía Lambda.

Como podemos ver, se procesa cada una de las filas del archivo, ejecutando el script anterior y cargando/modificando los parámetros definidos de manera exitosa.


Con esto doy por finalizado mi quinto proyecto utilizando Boto3 , entrando a lo que ya es ETL: Extraer data, Transformarla y Cargarla, proceso que se utiliza ampliamente hoy en día.

Espero que este proyecto te sea tan útil como a mí.

Saludos.


Referencias y Enlaces