ETL Serverless con AWS Lambda, S3 y Aurora
Transformación de Data CSV y Carga a DB con Data API

Ingeniero en Telecomunicaciones con enfoque en redes, conectividad y automatización. Actualmente estudiando AWS, DevOps y CI/CD. Comparto mi camino con proyectos prácticos y artículos técnicos desde Chile 🇨🇱.
https://robertoespana.hashnode.dev/portafolio
La empresa X trabaja con archivos de facturación y se solicitó agregar una columna con los valores en USD, ya que los datos originales incluyen pesos mexicanos (MXN) y dólares canadienses (CAD). Para esto, se diseñó una arquitectura donde tenemos:
Un bucket S3 configurado con un trigger que se activa con el evento
s3:ObjectCreated:PUTal recibir un archivo.Este evento ejecuta una función Lambda.
La Lambda extrae la data, transforma los valores de
MXNyCADaUSDy los agrega en la nueva columna (bill_amount_usd) según cada fila.Por último, sube los datos del archivo modificado a la base de datos serverless (AWS Aurora Serverless) mediante la Data API.

Como ya voy más avanzado, pasaremos directamente al código. Previamente, ya tengo creada la base de datos con la estructura cargada. La columna final bill_amount_usd será donde se cargarán los datos modificados tras extraer la data del CSV y convertir las monedas MXN y CAD a dólares.

Antes de saltar directamente a la implementación con la base de datos, mostraré un SAM local para ver cómo los datos son tomados y transformados a dólares, confirmando la lógica de conversión:

Desglose del Código (Explicado Paso a Paso)
Carga de Librerías y Definición de Clientes/Conexión
Lo primero es importar las librerías necesarias (boto3, io, csv, logging) y definir las constantes de AWS y los clientes, incluyendo los ARN (Amazon Resource Names) del secreto y del clúster de la base de datos Aurora. También se define la tabla de tasas de conversión (currency_conversion_to_usd):
import boto3
import io
import csv
import logging
REGION = 'us-east-1'
DB_NAME = 'estudiodb'
secret_store_arn = 'arn:aws:secretsmanager:us-east-1:514145637781:secret:mi-proyecto-db-secret-v1-vyvZgo'
db_cluster_arn = 'arn:aws:rds:us-east-1:514145637781:cluster:mi-proyecto-serverless-cluster'
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3_client = boto3.client('s3', region_name=REGION)
rds_client = boto3.client('rds-data', region_name=REGION)
# Tasas de conversión (ejemplo)
currency_conversion_to_usd = {'USD': 1.0, 'CAD': 0.79, 'MXN': 0.05}
Función de Ejecución SQL (execute_statement)
Esta función es la clave para la Carga (Load), ya que permite ejecutar consultas SQL en Aurora a través de la Data API, usando el secretArn y el resourceArn para la conexión.
def execute_statement(sql_statement, sql_parameters):
try:
response = rds_client.execute_statement(
secretArn=secret_store_arn,
database=DB_NAME,
resourceArn=db_cluster_arn,
sql=sql_statement,
parameters=sql_parameters
)
return response
except Exception as e:
logger.error(f'ERROR: Fallo al ejecutar SQL en Aurora: {e}')
return None
Función de Procesamiento y Transformación (process_record)
Aquí se lleva a cabo la Transformación (Transform). Se toma el registro del CSV, se divide en atributos individuales, se convierte el bill_amount a flotante, se calcula el usd_amount y se utiliza la función execute_statement para insertar los datos en la base.
def process_record(record):
try:
# Asignación de atributos (Extracción)
id_rec, company_name, country, city, product_line, item, bill_date, currency, bill_amount = record
except ValueError:
logger.error("Error: El registro CSV no tiene el numero correcto de columnas.")
return
try:
bill_amount = float(bill_amount)
except ValueError:
logger.error(f"Error: bill_amount no es un numero válido: {bill_amount}")
return
# Transformación de moneda
rate = currency_conversion_to_usd.get(currency)
usd_amount = 0.0
if rate:
usd_amount = bill_amount * rate
else:
logger.info(f'No rate found for currency: {currency}.')
# Sentencia SQL con parámetros (Carga)
sql_statement = ("INSERT IGNORE INTO billing_data "
"(id, company_name, country, city, product_line, "
"item, bill_date, currency, bill_amount, bill_amount_usd) "
"VALUES (:id, :company_name, :country, :city, :product_line, "
":item, :bill_date, :currency, :bill_amount, :usd_amount)")
sql_parameters = [
{'name':'id', 'value': {'stringValue': str(id_rec)}},
{'name':'company_name', 'value': {'stringValue': str(company_name)}},
# ... (resto de parámetros)
{'name':'usd_amount', 'value': {'stringValue': str(usd_amount)}},
]
response = execute_statement(sql_statement, sql_parameters)
logger.info(f'SQL execution response: {response}')
Función Principal del Lambda (lambda_handler)
Esta es la función principal que maneja la Extracción (Extract) desde S3.
Recibe el evento, extrae el nombre del bucket y del archivo.
Obtiene el archivo CSV desde S3 usando
s3_client.get_object().Decodifica la data y usa
csv.readerpara saltar el encabezado y procesar cada fila.Cada fila (registro) se pasa a la función
process_recordpara su transformación y carga.
def lambda_handler(event, context):
try:
bucket_name = event['Records'][0]['s3']['bucket']['name']
s3_file = event['Records'][0]['s3']['object']['key']
logger.info(f"Procesando archivo {s3_file} del bucket {bucket_name}")
response = s3_client.get_object(Bucket=bucket_name, Key=s3_file)
data = response['Body'].read().decode('utf-8')
csv_reader = csv.reader(io.StringIO(data))
try:
next(csv_reader) # Saltar encabezado
except StopIteration:
logger.info("Archivo CSV vacio.")
return
for record in csv_reader:
if record:
process_record(record)
logger.info("Lambda ha terminado la ejecución correctamente.")
except Exception as e:
logger.error(f"ERROR: error inesperado en lambda_handler: {e}")
raise e
Código completo
import boto3
import io
import csv
import logging
REGION = 'us-east-1'
DB_NAME = 'estudiodb'
secret_store_arn = 'arn:aws:secretsmanager:us-east-1:514145637781:secret:mi-proyecto-db-secret-v1-vyvZgo'
db_cluster_arn = 'arn:aws:rds:us-east-1:514145637781:cluster:mi-proyecto-serverless-cluster'
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3_client = boto3.client('s3', region_name=REGION)
rds_client = boto3.client('rds-data', region_name=REGION)
currency_conversion_to_usd = {'USD': 1.0, 'CAD': 0.79, 'MXN': 0.05}
def execute_statement(sql_statement, sql_parameters):
try:
response = rds_client.execute_statement(
secretArn=secret_store_arn,
database=DB_NAME,
resourceArn=db_cluster_arn,
sql=sql_statement,
parameters=sql_parameters
)
return response
except Exception as e:
logger.error(f'ERROR: Fallo al ejecutar SQL en Aurora: {e}')
return None
def process_record(record):
try:
id_rec, company_name, country, city, product_line, item, bill_date, currency, bill_amount = record
except ValueError:
logger.error("Error: El registro CSV no tiene el numero correcto de columnas.")
return
try:
bill_amount = float(bill_amount)
except ValueError:
logger.error(f"Error: bill_amount no es un numero válido: {bill_amount}")
return
rate = currency_conversion_to_usd.get(currency)
usd_amount = 0.0
if rate:
usd_amount = bill_amount * rate
else:
logger.info(f'No rate found for currency: {currency}.')
sql_statement = ("INSERT IGNORE INTO billing_data "
"(id, company_name, country, city, product_line, "
"item, bill_date, currency, bill_amount, bill_amount_usd) "
"VALUES (:id, :company_name, :country, :city, :product_line, "
":item, :bill_date, :currency, :bill_amount, :usd_amount)")
sql_parameters = [
{'name':'id', 'value': {'stringValue': str(id_rec)}},
{'name':'company_name', 'value': {'stringValue': str(company_name)}},
{'name':'country', 'value': {'stringValue': str(country)}},
{'name':'city', 'value': {'stringValue': str(city)}},
{'name':'product_line', 'value': {'stringValue': str(product_line)}},
{'name':'item', 'value': {'stringValue': str(item)}},
{'name':'bill_date', 'value': {'stringValue': str(bill_date)}},
{'name':'currency', 'value': {'stringValue': str(currency)}},
{'name':'bill_amount', 'value': {'stringValue': str(bill_amount)}},
{'name':'usd_amount', 'value': {'stringValue': str(usd_amount)}},
]
response = execute_statement(sql_statement, sql_parameters)
logger.info(f'SQL execution response: {response}')
def lambda_handler(event, context):
try:
bucket_name = event['Records'][0]['s3']['bucket']['name']
s3_file = event['Records'][0]['s3']['object']['key']
logger.info(f"Procesando archivo {s3_file} del bucket {bucket_name}")
response = s3_client.get_object(Bucket=bucket_name, Key=s3_file)
data = response['Body'].read().decode('utf-8')
csv_reader = csv.reader(io.StringIO(data))
try:
next(csv_reader) # Saltar encabezado
except StopIteration:
logger.info("Archivo CSV vacio.")
return
for record in csv_reader:
if record:
process_record(record)
logger.info("Lambda ha terminado la ejecución correctamente.")
except Exception as e:
logger.error(f"ERROR: error inesperado en lambda_handler: {e}")
raise e
Demostración y Resultado
Pruebas Locales con SAM
Cuando ejecuto en modo local con SAM, podemos ver la siguiente salida, mostrando que la extracción, transformación (cálculo en USD) y la simulación de carga de datos (llamadas a execute_statement) fueron ejecutadas con éxito.


Permisos del Rol de Lambda (IAM)
Eliminados los datos de la base de datos (para una práctica limpia), se asignan los permisos necesarios para que Lambda pueda interactuar con la Data API SQL, los Secrets Manager y el bucket S3. Para esto, creé policies definiendo el ARN del clúster y el secret de mi base de datos, lo que resulta en 4 permisos asociados al rol.



Carga y Resultado Final
Verificación de DB Vacía: Hacemos una consulta a la base de datos, devolviendo solo una tabla vacía, corroborando que no hay datos.

Activación del Trigger: Configurado previamente el trigger en S3 (evento PUT), procedo a cargar un archivo dentro del bucket.
Ejecución Autónoma: Al finalizar la carga, de manera automática se ejecuta Lambda, procesando el archivo como lo expliqué.

Verificación en DB: Vamos a la base de datos, hacemos la consulta y podemos ver la tabla con todos los datos del archivo subido, además del nuevo atributo
bill_amount_usdcon los valores correctamente convertidos a dólares.

Observabilidad (CloudWatch Logs)
Por último, observamos los logs en CloudWatch, ya que integramos observabilidad a través de logging, además de los logs automáticos que envía Lambda.
Como podemos ver, se procesa cada una de las filas del archivo, ejecutando el script anterior y cargando/modificando los parámetros definidos de manera exitosa.

Con esto doy por finalizado mi quinto proyecto utilizando Boto3 , entrando a lo que ya es ETL: Extraer data, Transformarla y Cargarla, proceso que se utiliza ampliamente hoy en día.
Espero que este proyecto te sea tan útil como a mí.
Saludos.



