PyMongo Esencial: Simplificando la Conexión entre MongoDB y Python

Simplifica las Operaciones con MongoDB usando Python: Una Guía para Principiantes sobre Inserción, Consulta y Agregación de Datos de Forma Eficiente para Tus Proyectos Orientados a Datos

🕒 Tiempo estimado de lectura: 15 minutos

En la era del big data, tener una manera poderosa y flexible de gestionar tus operaciones de base de datos es indispensable. MongoDB es una base de datos NoSQL altamente escalable, conocida por su velocidad y facilidad de uso. Junto con el lenguaje de programación Python, tienes una combinación poderosa para proyectos dinámicos y orientados a datos. PyMongo es la biblioteca preferida para conectar MongoDB con Python. Aquí, te guiaremos por los recursos básicos, intermedios y avanzados del PyMongo, enriquecidos con ejemplos prácticos.

Ahora, vamos a sumergirnos en el mundo de MongoDB en Python con algunos ejemplos prácticos que también están disponibles en Google Colab aquí 👨‍🔬.

Introducción a PyMongo 

PyMongo es el driver oficial de Python para MongoDB, permitiéndote interactuar perfectamente con bases de datos MongoDB. Ya sea que estés trabajando en un script simple o en una aplicación compleja orientada a datos, PyMongo proporciona un conjunto de herramientas completo para todas tus operaciones de base de datos.

Antes de sumergirte en las diversas operaciones, necesitas instalar PyMongo:

pip install pymongo

Operaciones Básicas

Conectando con MongoDB

Primero, vamos a conectar a un servidor MongoDB local. En este ejemplo, estamos usando una instancia gratuita de MongoDB Atlas proporcionada por InfinitePy. Es importante notar que el nombre de usuario y la contraseña están hard-coded en este ejemplo. Por motivos de seguridad, en aplicaciones reales, estas credenciales deben ser gestionadas usando variables de entorno o gestores de secretos.

# Importa la clase MongoClient de la biblioteca pymongo.
# Esto nos permitirá conectar a una base de datos MongoDB.
from pymongo import MongoClient

# Define el nombre de usuario para la base de datos MongoDB. Esto debe ser un usuario válido con permisos adecuados.
username = 'infinitepy'

# Define la contraseña para la base de datos MongoDB. Asegúrate de que esta contraseña sea correcta.
# Nunca expongas ni hard-codes contraseñas en tu código por motivos de seguridad. Usa variables de entorno o gestores de secretos en aplicaciones reales.
password = 'ilovepyth0n'

# Crea un objeto MongoClient. Este objeto representa una conexión de cliente con la base de datos MongoDB.
# Usamos una f-string para formatear la string de conexión con el nombre de usuario y la contraseña.
# El URI de conexión incluye el nombre de usuario, contraseña e información del cluster.
client = MongoClient(f'mongodb+srv://{username}:{password}@infinitepy-cluster.s05fljv.mongodb.net/?retryWrites=true&w=majority&appName=infinitepy-cluster')
Notas sobre los parámetros del URI de conexión:
  • mongodb+srv: Especifica el protocolo a ser usado para conectar al cluster MongoDB mediante una string de conexión 'SRV'.

  • {username}:{password}: Sustituye placeholders por el nombre de usuario y la contraseña reales. ⚠️ Asegúrate de que tus credenciales estén seguras y protegidas.

  • ?retryWrites=true: Asegura que las operaciones de escritura sean repetidas una vez si fallan debido a un error de red.

  • &w=majority: Asegura que las operaciones de escritura solo retornen con éxito una vez que la mayoría de los nodos del cluster reconozcan.

  • &appName=infinitepy-cluster: Especifica un nombre personalizado para esta conexión. Útil para fines de monitoreo.

Listar todos los nombres de bases de datos existentes de un cliente MongoDB:

# Usando la instancia del cliente, obtiene la lista de todos los nombres de bases de datos.
# list_database_names() retorna una lista de strings, donde cada string es el nombre de una base de datos.
for db_name in client.list_database_names():
    # Imprime cada nombre de base de datos en la consola.
    print(db_name)

Ejecutar el código anterior producirá la siguiente salida:

library
sample_db
sample_mflix
admin
local

Insertando un Documento

Una de las operaciones más básicas es insertar documentos en una colección. Aquí hay un ejemplo simple:

# Accede a la base de datos sample_db; si no existe, MongoDB la crea cuando almacenas algún dato en ella por primera vez.
# La sintaxis 'client.sample_db' es usada para acceder a la base de datos.
db = client.sample_db

# Accede a la colección llamada 'mi_coleccion' en la base de datos sample_db.
collection = db.mi_coleccion

# Crea un diccionario representando el documento a ser insertado en la colección.
# El documento contiene información sobre una persona: nombre, edad y ciudad.
documento = {"nombre": "Juan Silva", "edad": 29, "ciudad": "São Paulo"}

# Inserta el documento en la colección 'mi_coleccion'.
# El método insert_one retorna un objeto result que contiene información sobre la inserción.
result = collection.insert_one(documento)

# Imprime el ID único del documento insertado.
# El atributo inserted_id del objeto result contiene este ID.
print("ID del documento insertado: {document_id}".format(document_id=result.inserted_id))

Consultando Documentos 

Para recuperar documentos, puedes usar el método find:

# Accede a la base de datos sample_db; si no existe, MongoDB la crea cuando almacenas algún dato en ella por primera vez.
db = client.sample_db

# Accede a la colección llamada 'mi_coleccion' en la base de datos sample_db.
collection = db.mi_coleccion

# Encuentra un único documento en la colección 'mi_coleccion' donde el campo 'nombre' sea igual a "Juan Silva".
# El método 'find_one' retorna la primera coincidencia encontrada.
result = collection.find_one({"nombre": "Juan Silva"})

# Imprime el resultado en la consola.
# Si un documento con el 'nombre' "Juan Silva" es encontrado, imprimirá ese documento.
# Si ningún documento así existe, imprimirá 'None'.
print(result)

Ejecutar el código anterior producirá la siguiente salida:

{'_id': ObjectId('666a1d2d23a1c5436afa5f1a'), 'nombre': 'Juan Silva', 'edad': 29, 'ciudad': 'São Paulo'}

⚠️ El ObjectId será diferente para cada nuevo documento creado.

Insertando múltiples Documentos

Puedes automatizar la inserción de múltiples documentos a partir de una lista:

# Accede a la base de datos sample_db; MongoDB la crea si no existe.
db = client.sample_db

# Accede a la colección llamada 'mi_coleccion' dentro de la base de datos sample_db.
coleccion = db.mi_coleccion

# Define una lista de documentos para ser insertados en 'mi_coleccion'.
# Cada documento está representado como un diccionario Python con pares clave-valor.
documentos = [
    {"nombre": "Alice", "edad": 25, "ciudad": "Londres"},
    {"nombre": "Bob", "edad": 30, "ciudad": "San Francisco"},
]

# Inserta la lista de documentos en la colección.
# El método insert_many inserta múltiples documentos de una vez y retorna un objeto que contiene los inserted_ids.
resultado = coleccion.insert_many(documentos)

# Imprimir los IDs únicos de los documentos insertados.
print("Documentos insertados con IDs:", resultado.inserted_ids)

Actualizando Documentos

En aplicaciones del mundo real, frecuentemente necesitarás actualizar registros existentes. Vea cómo se puede hacer:

# Accede a la base de datos 'sample_db'; si no existe, MongoDB la crea cuando almacenas algún dato en ella.
db = client.sample_db

# Accede a la colección llamada 'mi_coleccion' en la base de datos 'sample_db'.
coleccion = db.mi_coleccion

# Ejecuta la operación de actualización. El método 'update_one' encuentra el primer documento que coincide con el filtro y actualiza ese documento.
resultado = coleccion.update_one(
    {"nombre": "Juan Silva"},  # Filtro: encuentra el documento donde el nombre es 'Juan Silva'
    {"$set": {"edad": 30}}     # Actualización: establece la edad como 30
)

# Verifica si algún documento fue modificado.
if resultado.modified_count > 0:
    print("Documento actualizado con éxito.")
else:
    print("Ningún documento fue actualizado.")

Borrando Documentos

Eliminar documentos también es algo simple:

# Accede a la base de datos sample_db; MongoDB la crea si no existe.
db = client.sample_db

# Accede a la colección llamada 'mi_coleccion' dentro de la base de datos sample_db.
coleccion = db.mi_coleccion

# Intenta borrar un único documento de la colección 'mi_coleccion' donde el campo 'nombre' es "Juan Silva".
# El método `delete_one` borra el primer documento que corresponde a los criterios del filtro.
resultado = coleccion.delete_one({"nombre": "Juan Silva"})

# Verifica si un documento fue realmente borrado examinando el atributo 'deleted_count'.
# Este atributo retorna el número de documentos que fueron borrados.
if resultado.deleted_count > 0:
    # Si deleted_count es mayor que 0, significa que al menos un documento fue borrado.
    print("Documento borrado con éxito.")
else:
    # Si deleted_count es 0, significa que ningún documento correspondió a los criterios del filtro.
    print("Ningún documento fue borrado.")

Operaciones Avanzadas

Framework de Agregación

El Framework de Agregación de MongoDB es una poderosa herramienta para procesar datos y transformar documentos dentro de una colección. En lugar de usar código imperativo para procesar datos, puedes definir un pipeline de operaciones que el MongoDB ejecutará. Cada etapa en el pipeline transforma los documentos a medida que pasan. El framework de agregación es particularmente útil para tareas como filtrado, agrupamiento, cálculo de promedios y otros tipos de procesamiento de datos.

Conceptos y Etapas Principales

  • Pipeline: Una secuencia de etapas. Los documentos entran en el pipeline y son procesados secuencialmente por cada etapa.

  • Etapa: Cada etapa opera en los documentos y puede realizar operaciones como filtrado, agrupamiento, proyección, ordenamiento y más.

  • Etapas Comunes:

  • $match: Filtra documentos por una condición especificada. Similar a la cláusula WHERE en SQL.

  • $group: Agrupa documentos por una clave especificada y puede calcular valores agregados como sumas, promedios y contajes.

  • $project: Remodela documentos, incluyendo o excluyendo campos o calculando nuevos campos.

  • $sort: Ordena documentos basados en uno o más campos.

  • $limit y $skip: Controlan el número de documentos a retornar, permitiendo paginación.

En el ejemplo de código proporcionado:

  • Paso 1 ($match): El pipeline comienza con el filtrado de documentos. En este caso, solo los documentos en los que el campo edad es mayor o igual a 25 son pasados a la siguiente etapa.

  • Paso 2 ($group): Los documentos filtrados se agrupan por el campo ciudad (_id se define como $city). Para cada ciudad, la edad promedio se calcula utilizando el operador acumulador $avg y el resultado se almacena en el campo average_age.

Este pipeline de dos etapas calcula efectivamente la edad promedio de todas las personas con 25 años o más para cada ciudad en la colección.

# Accede a la base de datos sample_db; MongoDB la crea si no existe.
db = client.sample_db

# Accede a la colección llamada 'mi_coleccion' dentro de la base de datos sample_db.
coleccion = db.mi_coleccion

# Define un pipeline de agregación para procesar documentos en la colección.
# El pipeline es una lista de etapas, donde cada etapa transforma los documentos a medida que pasan.
# Las etapas son procesadas en orden, de la primera a la última.
pipeline = [
    # La etapa $match filtra los documentos por la condición especificada.
    # Aquí, estamos interesados solo en documentos donde el campo edad sea mayor o igual a 25.
    {"$match": {"edad": {"$gte": 25}}},

    # La etapa $group agrupa los documentos por un identificador especificado.
    # Aquí, estamos agrupando documentos por el campo 'ciudad'.
    # El campo '_id' es usado como el identificador del grupo.
    # El campo 'media_edad' es calculado como el valor promedio del campo 'edad' para cada grupo.
    {"$group": {"_id": "$ciudad", "media_edad": {"$avg": "$edad"}}},
]

# Ejecuta el pipeline de agregación en la colección.
# El método aggregate() retorna un iterable con los resultados del pipeline.
resultado = coleccion.aggregate(pipeline)

# Itera sobre el resultado y imprime cada documento.
# Cada documento en el resultado representa una ciudad y su respectiva edad promedio.
for doc in resultado:
    print(doc)

Ejecutar el código anterior producirá la siguiente salida:

{'_id': 'São Paulo', 'media_edad': 29.0}
{'_id': 'Londres', 'media_edad': 25.0}
{'_id': 'San Francisco', 'media_edad': 30.0}

Ejemplo Práctico

Creando un pipeline de agregación complejo para encontrar la edad máxima por ciudad:

# Accede a la base de datos sample_db; MongoDB la crea si no existe.
db = client.sample_db

# Accede a la colección llamada 'mi_coleccion' dentro de la base de datos sample_db.
coleccion = db.mi_coleccion

# Define un pipeline de agregación para procesar los datos en la colección.
# El pipeline contiene una secuencia de etapas, cada una representada por un documento.
pipeline = [
    # La etapa $group agrupa documentos por el campo 'ciudad'.
    # Crea un nuevo documento para cada grupo donde '_id' es el nombre de la ciudad.
    # También calcula la edad máxima para cada grupo y almacena en 'edad_max'.
    {"$group": {"_id": "$ciudad", "edad_max": {"$max": "$edad"}}},

    # La etapa $sort ordena los documentos agrupados por el campo 'edad_max' en orden decreciente (-1).
    {"$sort": {"edad_max": -1}}
]

# Ejecuta el pipeline de agregación en la colección.
# 'coleccion.aggregate(pipeline)' retorna un cursor para el resultado de la agregación.
resultado = coleccion.aggregate(pipeline)

# Itera sobre los documentos en el cursor de resultado.
# Cada documento en 'resultado' es un diccionario que contiene '_id' (el nombre de la ciudad) y 'edad_max'.
for doc in resultado:
    # Imprime el nombre de la ciudad y la edad máxima para cada grupo.
    print(f"Ciudad: {doc['_id']}, Edad Máxima: {doc['edad_max']}")

Ejecutar el código anterior producirá la siguiente salida:

Ciudad: San Francisco, Edad Máxima: 30
Ciudad: São Paulo, Edad Máxima: 29
Ciudad: Londres, Edad Máxima: 25

Proyecto en Acción: De la Teoría a la Práctica 

Ahora que hemos cubierto lo básico, vamos a poner nuestro conocimiento en práctica creando una aplicación simple para gestionar una biblioteca de libros. Este proyecto demostrará operaciones CRUD (Crear, Leer, Actualizar, Borrar) fundamentales usando MongoDB con la ayuda de pymongo, mostrando estos conceptos en un escenario del mundo real. Además, implementaremos técnicas de logging en Python discutidas en nuestro último texto aquí.

import logging

# Crear un objeto logger con el nombre 'mongo_logger'
mongo_logger = logging.getLogger('mongo_logger')

# Definir el nivel de registro como DEBUG. Esto significa que todos los mensajes con nivel DEBUG y superiores serán registrados
mongo_logger.setLevel(logging.DEBUG)

# Crear manejadores que imprimen mensajes de log en la consola (salida estándar) y escriben mensajes de log en un archivo llamado 'mongo_logger.log'
console_handler = logging.StreamHandler()
file_handler = logging.FileHandler('mongo_logger.log')

# Definir un formateador que especifica el formato de los mensajes de log
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Definir el formateador para el manejador de la consola y para el manejador de archivos
console_handler.setFormatter(formatter)
file_handler.setFormatter(formatter)

# Añadir el manejador de la consola al logger. Esto significa que los mensajes de log serán mostrados en la consola
mongo_logger.addHandler(console_handler)
# Añadir el manejador de archivos al logger. Esto significa que los mensajes de log serán escritos en el archivo 'mongo_logger.log'
mongo_logger.addHandler(file_handler)

# Función para obtener una base de datos específica
def get_database(db_name):
    # Registrar que estamos accediendo a la base de datos especificada
    mongo_logger.info(f"Accediendo a la base de datos '{db_name}'.")
    # Retornar el objeto de la base de datos del cliente
    return client[db_name]

# Función para obtener una colección específica de una base de datos
def get_collection(db, collection_name):
    # Registrar que estamos accediendo a la colección especificada dentro de la base de datos proporcionada
    mongo_logger.info(f"Accediendo a la colección '{collection_name}' dentro de la base de datos '{db.name}'.")
    # Retornar el objeto de la colección de la base de datos
    return db[collection_name]

# Función para insertar múltiples documentos de libros en una colección
def insert_books(collection, books):
    # Registrar que estamos insertando nuevos documentos de libros
    mongo_logger.info("Insertando nuevos documentos de libros en la colección.")
    # Insertar la lista de libros en la colección usando insert_many
    result = collection.insert_many(books)
    # Registrar los IDs de los documentos insertados
    mongo_logger.info(f'IDs de los libros insertados: {result.inserted_ids}')
    # Retornar el resultado de la operación de inserción
    return result

# Función para encontrar libros por un autor específico en una colección
def find_books_by_author(collection, author_name):
    # Registrar que estamos consultando por libros del autor especificado
    mongo_logger.info(f"Consultando la colección por todos los libros del autor {author_name}.")
    # Consultar la colección por libros con el nombre del autor proporcionado
    books = collection.find({"author": author_name})
    # Registrar el número de libros encontrados (convertir el cursor a lista para contar)
    mongo_logger.info(f'Encontrados {len(list(books))} libros del autor {author_name}.')
    # Retornar el cursor para la lista de libros encontrados
    return books

# Función para actualizar el género de un libro por su título
def update_book_genre(collection, title, new_genre):
    # Registrar que estamos actualizando el género del libro especificado
    mongo_logger.info(f"Actualizando el género de '{title}' a '{new_genre}'.")
    # Actualizar el género del libro con el título proporcionado
    update_result = collection.update_one(
        {"title": title},
        {"$set": {"genre": new_genre}}
    )
    # Registrar el número de documentos correspondientes y modificados
    mongo_logger.info(f'Modificados {update_result.matched_count} documento(s) y {update_result.modified_count} documento(s).')
    # Retornar el resultado de la operación de actualización
    return update_result

# Función para borrar un libro por su título
def delete_book(collection, title):
    # Registrar que estamos borrando el libro con el título especificado
    mongo_logger.info(f"Borrando el libro con el título '{title}'.")
    # Borrar el documento del libro con el título proporcionado
    delete_result = collection.delete_one({"title": title})
    # Registrar el número de documentos borrad

Conclusión 

Integrando PyMongo en tus proyectos Python, puedes aprovechar el poder de MongoDB de forma sencilla. Desde inserciones básicas y consultas hasta agregaciones avanzadas e indexación, PyMongo ofrece un conjunto robusto de herramientas para simplificar tus operaciones de base de datos. Seas un desarrollador Python principiante, intermedio o avanzado, esta guía proporciona una base exhaustiva para aprovechar todo el potencial de PyMongo.

No dudes en responder a este boletín con cualquier pregunta o tema que te gustaría que abordáramos en el futuro.

Si te gustó este boletín, no olvides suscribirte para recibir actualizaciones periódicas. Comparte con tus amigos y colegas interesados en Python y vamos a crecer juntos en nuestra comunidad de programadores.

InfinitePy Newsletter - Tu fuente para aprendizaje e inspiración en Python.