Skip to main content

Command Palette

Search for a command to run...

Serverless Data Validation Pipeline con AWS Lambda y S3

Uso de S3 como Trigger para AWS Lambda y Boto3: Un Pipeline de Control de Errores con Descarte a Bucket Secundario

Published
9 min read
Serverless Data Validation Pipeline con AWS Lambda y S3
R

Ingeniero en Telecomunicaciones con enfoque en redes, conectividad y automatización. Actualmente estudiando AWS, DevOps y CI/CD. Comparto mi camino con proyectos prácticos y artículos técnicos desde Chile 🇨🇱.

https://robertoespana.hashnode.dev/portafolio

Siguiendo con el aprendizaje, esta vez utilizaré el servicio S3 como activador de mi función Lambda, ejecutando el siguiente flujo: se cargan archivos CSV con data y, al momento de subirse, se produce un evento “s3:ObjectCreated:PUT”, activando mi función Lambda, analizando la data en búsqueda de errores y descartando los archivos a un segundo bucket de errores. Crearemos los buckets y la función, además de asignar el rol con los permisos necesarios, además de ejecutar y observar el funcionamiento analizando los logs generados por este.


Explicación Fundamental

Continuando, debemos realizar la creación de ambos buckets y recordar sus nombres al momento de decodificar el script.

Los datos tienen una estructura tabular, contando un header con atributos: índice, compañía, país, ciudad, tipo de producto, producto, fecha, tipo de moneda y monto.

Explicaré cómo obtenemos la data: recordar que se carga un archivo CSV. Utilizaré SAM para realizar esta breve demostración para que quede más claro lo fundamental del código. Recordar que, para poder tener éxito en el test local, debemos tener los archivos “event.json” y “template.yaml”. Los dejo por si alguien los quiere probar localmente.

{
    "Records": [
        {
            "s3": {
                "bucket": {
                    "name": "proyecto-test-billing", #nombre del bucket
                    "key": "billing_data_dairy_may_2023.csv" #nombre del archivo en el bucket 
                }
            }
        }
    ]
}
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31

Resources:
  BillingBucketParser:
    Type: AWS::Serverless::Function
    Properties:
      Handler: lambda_function.lambda_handler
      Runtime: python3.12
      CodeUri: .

Previamente, en el bucket tengo cargado un archivo con los datos para realizar la demostración.

Imaginemos que, al momento de ejecutar el comando SAM, se estaría realmente subiendo un archivo al bucket, activando el trigger “s3:ObjectCreated:Put”, activando la Lambda. La Lambda recibe el evento y extrae el nombre del bucket y el nombre del archivo subido. Recordar que esta respuesta siempre es un diccionario en bruto.

El siguiente paso importante es extraer la data del archivo. En este caso defino el objeto indicándole el nombre del bucket y el nombre del archivo. Esto lo almaceno y genero una consulta al bucket (object.get()) y del resultado queremos solo extraer el cuerpo del objeto, que serían los datos. Usamos la función read() para leer y recibimos los bytes del archivo. Para hacerlos legibles utilizamos la función decode('utf-8'), que convierte los bytes en texto.

El resultado hasta el momento de obtener el contenido y decodificarlo en texto sería el siguiente. Como podemos ver, contiene saltos de línea \n que entorpecen el analizar la data. Para poder obtener los saltos de línea como tal debemos hacer uso de la función splitlines().

Ejemplo del contenido obtenido:

"id,name,country\n1,Sady,CL\n2,Pedro,MX\n"

Tomando el string completo obtenido y dividiéndolo por saltos de línea quedará tal que así:

"id,name,country",
"1,Sady,CL",
"2,Pedro,MX"

Recordemos que estos son datos tabulares que se componen de header, índice, columnas y filas. Ahora, para poder trabajar de buena manera y analizar cada una de las filas debemos iterar cada una de la siguiente manera. csv.reader nos permite definir saltar el header (que es el título de cada columna y no queremos utilizarlo) y además delimitar, creando columnas dentro de las filas que ya tenemos, permitiendo realizar las comparativas para validar si nuestros datos son correctos o no.

Hasta el momento, la salida sería la siguiente:

['1,Lone Star Lactose,US,San Antonio,Dairy,Artisan Cheese,2023-05-01,USD,3500.00']
['2,Bluebonnet Butter,US,Dallas,Dairy,Grass-Fed Butter,2023-05-02,USD,3600.00']
['3,Houston Creamery,US,Houston,Dairy,Gourmet Yogurt,2023-05-03,USD,3700.00']
['4,Chicago Cheesemonger,US,Chicago,Dairy,Blue Cheese,2023-05-04,USD,3800.00']
['5,Manhattan Milk,US,New York,Dairy,Organic Milk,2023-05-05,USD,3900.00']
['6,Cheese of the North,CA,Vancouver,Dairy,Cheddar Cheese,2023-05-06,CAD,11000.00']
['7,Toronto Lacto,CA,Toronto,Dairy,Probiotic Kefir,2023-05-07,CAD,11200.00']
['8,Montreal Milkman,CA,Montreal,Dairy,Gouda Cheese,2023-05-08,CAD,11400.00']
['9,Lechería del Distrito,MX,Mexico City,Dairy,Monterey Jack Cheese,2023-05-09,MXN,128000.00']
['10,Lácteos Guadalajara,MX,Guadalajara,Dairy,Cream Cheese,2023-05-10,MXN,130000.00']
['11,Pacific Probiotics,US,San Diego,Dairy,Greek Yogurt,2023/05/11,USD,4000.00']
['12,Desert Dairy,US,Phoenix,Dairy,Cottage Cheese,2023-05-12,USD,4100.00']
['13,Mountain Milk,US,Denver,Dairy,Goat Cheese,2023-05-13,USD,4200.00']
['14,Seattle Creamery,US,Seattle,Dairy,Camembert Cheese,2023-05-14,USD,4300.00']
['15,Las Vegas Lacto,US,Las Vegas,Dairy,Burrata Coffee,2023-05-15,USD,4400.00']

La salida total al ejecutar el script de manera local con SAM sería el siguiente.


Desglose del Código (Explicado Paso a Paso)

Carga de Librerías y Recepción del Evento

Una vez explicada la lógica fundamental, ahora desglosaré el código completo de forma clara.

Primero, cargamos las librerías necesarias, las cuales permiten trabajar con AWS (boto3), CSV, fechas y el procesamiento del evento.
Dentro de la función principal Lambda, recibimos el evento. Desde esta misma variable extraemos dos datos clave:

  • el nombre del bucket,

  • y el nombre del archivo subido.

import json
import boto3
import csv
from datetime import datetime

def lambda_handler(event, context):
    s3 = boto3.resource('s3')

    billing_bucket = event['Records'][0]['s3']['bucket']['name']
    csv_file = event['Records'][0]['s3']['object']['key']

Definición del Bucket de Errores

Seguidamente, creamos otra variable que contiene el nombre del bucket de errores, el cual será el encargado de almacenar los archivos que presenten fallos durante la validación.

    error_bucket = 'proyecto-test-billing-errors'

Obtención y Procesamiento del Archivo

Aquí viene la parte donde obtenemos la data del archivo, proceso que ya explique anteriormente:

  • leemos el archivo desde S3,

  • lo decodificamos desde bytes a texto,

  • limpiamos los saltos de línea,

  • y lo dividimos para poder procesarlo correctamente.

    object = s3.Object(billing_bucket, csv_file)
    data = object.get()['Body'].read().decode('utf-8').splitlines()

Variable de Control de Errores

Luego definimos una variable booleana llamada error_found, que inicialmente está en False.
Esta variable cambiará a True en caso de encontrar algún error durante el análisis del archivo.

    error_found = False

Listas de Valores Válidos

También definimos dos listas adicionales.
Estas contienen los valores válidos para comparar con los datos del CSV y así verificar si cada elemento está correcto o no.

    valid_product_lines = ['Bakery', 'Meat', 'Dairy']
    valid_currencies = ['USD', 'MXN', 'CAD']

Iteración y Validación de Filas

Aquí comienza la iteración fila por fila.

Para cada fila, definimos los elementos que vamos a analizar:

  • fecha → elemento número 6,

  • tipo de producto,

  • moneda,

  • monto, etc.

La lógica de validación se repite para cada campo, pero explicaré solo la primera porque todas siguen el mismo formato.

Lógica general de validación

Si un elemento no se encuentra en la lista válida, entonces:

  • error_found cambia a True.

  • Se imprime el tipo de error detectado.

  • Se muestra el índice donde ocurrió.

  • Se imprime el valor incorrecto.

  • Se detiene el ciclo con break.

Este comportamiento se repite para cada campo en cada fila del archivo.

    for row in csv.reader(data[1:], delimiter=','):
        date = row[6]
        product_line = row[4]
        currency = row[7]
        bill_amount = float(row[8])

        if product_line not in valid_product_lines:
            error_found=True
            print(f"Error in record {row[0]}: unrecognized product line {product_line}.")
            break

        if currency not in valid_currencies:
            error_found=True
            print(f"Error in record {row[0]}: unrecognized currencies {currency}.")
            break

        if bill_amount < 0:
            error_found=True
            print(f"Error in record {row[0]}: bill amount cannot be negative ({bill_amount}).")

        try:
            datetime.strptime(date,'%Y-%m-%d')
        except ValueError:
            error_found = True
            print(f"Error in record {row[0]}: incorrect date format {date}.")
            break

Mover el Archivo Erróneo al Bucket de Errores

Luego pasamos a la parte donde movemos los archivos con fallos hacia el bucket de errores.

Creamos una condición que verifica si error_found es True.
Si es así:

  1. Obtenemos la ubicación del archivo.

  2. Intentamos copiarlo desde el bucket original hacia el bucket de errores.

  3. Mostramos un mensaje indicando que fue movido exitosamente.

  4. Procedemos a eliminarlo del bucket original.

  5. Mostramos un mensaje indicando que fue eliminado.

  6. En caso de error, mostramos la excepción capturada.

    if error_found:
        copy_source = {
            'Bucket': 'proyecto-test-billing',
            'Key': csv_file
        }
        try:
            s3.meta.client.copy(copy_source, error_bucket, csv_file)
            print(f"Move erroneous file to: {error_bucket}.")
            s3.Object(billing_bucket, csv_file).delete()
            print('Deleted original file from bucket.')
        except Exception as e:
            print(f"Error while moving the file: {str(e)}.")

Si No Hubo Errores

Finalmente, si error_found nunca cambió a True (es decir, no hubo errores), simplemente retornamos un status 200, indicando:

“El archivo no contiene errores.”

    else:
        return {
            'statusCode': 200,
            'body': json.dumps('No errors found in the CSV file.')
        }

Código completo

import json
import boto3
import csv
from datetime import datetime


def lambda_handler(event, context):
    s3 = boto3.resource('s3')

    billing_bucket = event['Records'][0]['s3']['bucket']['name']
    csv_file = event['Records'][0]['s3']['object']['key']


    error_bucket = 'proyecto-test-billing-errors'

    object = s3.Object(billing_bucket, csv_file)
    data = object.get()['Body'].read().decode('utf-8').splitlines()

    error_found = False


    valid_product_lines = ['Bakery', 'Meat', 'Dairy']
    valid_currencies = ['USD', 'MXN', 'CAD']


    for row in csv.reader(data[1:], delimiter=','):
        date = row[6]
        product_line = row[4]
        currency = row[7]
        bill_amount = float(row[8])


        if product_line not in valid_product_lines:
            error_found=True
            print(f"Error in record {row[0]}: unrecognized product line {product_line}.")
            break
        if currency not in valid_currencies:
            error_found=True
            print(f"Error in record {row[0]}: unrecognized currencies {currency}.")
            break
        if bill_amount < 0:
            error_found=True
            print(f"Error in record {row[0]}: bill amount cannot be negative ({bill_amount}).")

        try:
            datetime.strptime(date,'%Y-%m-%d')
        except ValueError:
            error_found = True
            print(f"Error in record {row[0]}: incorrect date format {date}.")
            #identifica al n11 pq tiene / ya que deberia tener -
            break

    if error_found:
        copy_source = {
            'Bucket': 'proyecto-test-billing',
            'Key': csv_file
            }
        try:
            s3.meta.client.copy(copy_source, error_bucket, csv_file)
            print(f"Move erroneous file to: {error_bucket}.")
            s3.Object(billing_bucket, csv_file).delete()
            print('Deleted original file from bucket.')
        except Exception as e:
            print(f"Error while moving the file: {str(e)}.")


    else:
        return {
            'statusCode': 200,
            'body': json.dumps('No errors found in the CSV file.')
        }

Configuración del Trigger

Cuando alguien suba el archivo a S3, de manera automática se ejecutará el script en búsqueda de que la moneda, montos y productos coincidan con los definidos por la empresa, además de que la fecha esté formateada correctamente. Si una de estas cosas presenta error, automáticamente el archivo es subido a un segundo bucket de errores y eliminado del bucket original.

Pasos importantes:

  1. Actualizar la función Lambda desde el IDE: clic derecho → cargar archivo a Lambda.

  1. Asignar permisos al rol. (En la práctica de ejemplo se puede dar acceso total; en producción usar mínimo privilegio).

  1. Configurar el trigger S3:

    • Servicio: S3

    • Bucket: (el bucket que generará la alerta)

    • Evento: PUT (es decir, s3:ObjectCreated:Put)

    • Agregar el trigger.

Al final, el trigger quedará configurado tal que así y activará la Lambda cuando se suban objetos.


Demostración

Ahora solo queda probar: iremos al bucket en cuestión y cargaremos 3 archivos con los datos de la empresa y observaremos el comportamiento.

Subo los archivos y, inmediatamente, se genera el trigger y los archivos son procesados por la función.

Al recargar los objetos podemos ver que solo hay un archivo disponible en el bucket principal → ese archivo no contiene errores.

En el bucket de errores podemos ver los archivos movidos.

Para saber en qué índice y qué error fueron encontrados en esos archivos, vamos a CloudWatch Logs y listamos los logs generados por la función.

En este caso:

  • El primer archivo se trasladó y eliminó por formato incorrecto de la fecha (registro detectado en logs).

  • El siguiente archivo se trasladó y eliminó por dos cosas:

    • monto negativo

    • producto desconocido (“candy”).

Con esto doy por finalizado mi cuarto proyecto utilizando Boto3. Una pincelada de cómo podemos automatizar procesos, ganar tiempo, reducir errores y ser más eficientes.

Espero que este proyecto te sea tan útil como a mí.

Saludos.


Referencias y Enlaces