Tutorial: Consolidación y Subida de Datos a Firestore usando Python

Jaime Hernández
4 min readJul 11, 2024

--

Introducción

En este tutorial, aprenderemos cómo consolidar múltiples tablas en archivos JSON en un único archivo, y luego subir esos datos a Firestore usando un script en Python. Esto es útil para integrar datos dispersos en una base de datos centralizada y realizar operaciones de datos más eficientes.

Requisitos Previos

Antes de comenzar, asegúrate de tener lo siguiente instalado y configurado:

  1. Python 3.x
  2. Google Cloud SDK (incluyendo gcloud y gsutil)
  3. Credenciales de una cuenta de servicio de Google Cloud

Paso 1: Configurar el Entorno Virtual de Python

Primero, vamos a crear y activar un entorno virtual para gestionar nuestras dependencias de Python.

python3 -m venv myenv
source myenv/bin/activate

Paso 2: Instalar las Dependencias Necesarias

Instalaremos la biblioteca google-cloud-firestore que nos permitirá interactuar con Firestore.

pip install google-cloud-firestore

Paso 3: Obtener Dumps de las Tablas desde PostgreSQL

Vamos a crear un script Bash para obtener dumps de las tablas desde una base de datos PostgreSQL y convertir esos dumps en archivos JSON.

Script Bash para Obtener Dumps y Convertirlos a JSON

Guarda el siguiente script en un archivo llamado export_to_json.sh.

#!/bin/bash

# Configuración
HOST="localhost"
PORT="5432"
USER="tu_usuario"
DB="tu_base_de_datos"
DIRECTORY="RIM_FIRESTORE"

# Crear el directorio si no existe
mkdir -p $DIRECTORY

# Listar todas las tablas
TABLES=$(psql -h $HOST -p $PORT -U $USER -d $DB -t -c "SELECT tablename FROM pg_tables WHERE schemaname='public';")

# Exportar cada tabla a JSON
for TABLE in $TABLES; do
echo "Exportando tabla: $TABLE"
psql -h $HOST -p $PORT -U $USER -d $DB -c "\copy (SELECT row_to_json(t) FROM (SELECT * FROM $TABLE) t) TO '$DIRECTORY/$TABLE.json'"
done

Ejecutar el Script Bash

Haz que el script sea ejecutable y ejecútalo:

chmod +x export_to_json.sh
./export_to_json.sh

Paso 4: Crear el Script Python para Consolidar y Subir los Datos a Firestore

Vamos a crear un script Python llamado consolidate_upload.py. Este script consolidará los archivos JSON y subirá los datos a Firestore.

Código del Script

Guarda el siguiente código en un archivo llamado consolidate_upload.py.

import os
import json
import sys
from collections import defaultdict
from google.cloud import firestore
from google.oauth2 import service_account

FILES_TO_CONSOLIDATE = [
"1.json",
"2.json",
"3.json",
"4.json",
]

def consolidate_files(directory):
consolidated_data = []
all_fields = set()

# Leer todos los archivos y recolectar todos los campos posibles
for filename in FILES_TO_CONSOLIDATE:
file_path = os.path.join(directory, filename)
if not os.path.exists(file_path):
print(f"Archivo {filename} no encontrado en el directorio {directory}, se omitirá.")
continue

table_name = os.path.splitext(filename)[0] # Remover la extensión .json
with open(file_path, 'r') as f:
for line in f:
try:
document = json.loads(line.strip())
document["source_table"] = table_name # Añadir campo con el nombre de la tabla sin .json
all_fields.update(document.keys())
consolidated_data.append(document)
except json.JSONDecodeError as e:
print(f"Error al decodificar JSON en {file_path}: {e}")

# Homologar la estructura de todos los documentos
homologated_data = []
for document in consolidated_data:
homologated_document = {field: document.get(field, None) for field in all_fields}
homologated_document["source_table"] = document["source_table"] # Asegurar que el campo 'source_table' esté presente
homologated_data.append(homologated_document)

return homologated_data, all_fields

def save_consolidated_data(data, output_file):
with open(output_file, 'w') as f:
json.dump(data, f, indent=4)
print(f"Datos consolidados guardados en {output_file}")

def preview_data(data, fields, num_records_per_table=5):
print("Previsualización de los datos consolidados:")
print(f"Total de registros: {len(data)}")
print(f"Campos homologados: {', '.join(fields)}")

# Previsualizar los registros por tabla de origen
tables = defaultdict(list)
for record in data:
tables[record["source_table"]].append(record)

for table_name, records in tables.items():
print(f"\nTabla: {table_name}")
for record in records[:num_records_per_table]:
print(record)
if len(records) > num_records_per_table:
print(f"...y {len(records) - num_records_per_table} registros más.")

def upload_to_firestore(data, credentials_path, collection_name="consolidate_mpn"):
# Configurar las credenciales del servicio
credentials = service_account.Credentials.from_service_account_file(credentials_path)
db = firestore.Client(credentials=credentials)

total_docs = len(data)
collection_ref = db.collection(collection_name)
for i, document in enumerate(data):
doc_id = str(document.get('id', ''))
if doc_id:
doc_ref = collection_ref.document(doc_id)
doc = doc_ref.get()
if doc.exists:
source_table = document["source_table"]
print(f"Documento con ID {doc_id} de la tabla {source_table} ya existe en la colección {collection_name}. Se eliminará y se volverá a crear.")
doc_ref.delete()

# Insertar documento en Firestore
doc_ref.set(document)
else:
collection_ref.add(document)

# Mostrar el progreso
print(f"Progreso: {i + 1}/{total_docs} documentos subidos ({(i + 1) / total_docs * 100:.2f}%)")

print("Datos consolidados subidos a Firestore.")

if __name__ == "__main__":
if len(sys.argv) != 3:
print("Uso: python consolidate_upload.py <directorio> <ruta_a_credenciales_json>")
sys.exit(1)

directory = sys.argv[1]
credentials_path = sys.argv[2]
output_file = "consolidated_data.json"

# Consolidar archivos
consolidated_data, all_fields = consolidate_files(directory)

# Guardar datos consolidados en un archivo JSON
save_consolidated_data(consolidated_data, output_file)

# Previsualizar datos consolidados
preview_data(consolidated_data, all_fields)

# Preguntar al usuario si desea subir los datos a Firestore
user_input = input("¿Desea subir los datos consolidados a Firestore? (s/n): ")
if user_input.lower() == 's':
upload_to_firestore(consolidated_data, credentials_path)
else:
print("Subida cancelada.")

Descripción del Script

Consolidar Archivos JSON:

  • Lee varios archivos JSON de un directorio.
  • Añade un campo source_table para indicar el archivo de origen de cada registro.
  • Homologa la estructura de los documentos para asegurarse de que todos los registros tienen los mismos campos.

Guardar Datos Consolidados:

  • Guarda los datos consolidados en un archivo JSON.

Previsualizar los Datos:

  • Muestra una previsualización de los datos consolidados en la consola.

Subir Datos a Firestore:

  • Sube los datos consolidados a Firestore en la colección consolidate_mpn.
  • Muestra el progreso de la subida en la consola.
  • Si un documento ya existe, lo elimina y lo vuelve a crear.

Paso 4: Ejecutar el Script

Ejecuta el script pasando el directorio que contiene los archivos JSON y la ruta al archivo de credenciales:

python consolidate_upload.py RIM_FIRESTORE /ruta/a/tus/credenciales.json

Estructura de Directorios y Archivos

Asegúrate de que tu estructura de directorios se vea así:

proyecto/

├── myenv/ (entorno virtual)
├── consolidate_upload.py
├── /ruta/a/tus/credenciales.json
└── ARCHIVOS_DUMP/
├── 1.json
├── 2.json
├── 3.json
├── 4.json
├── ...

--

--