Video 1: Data Modeling with Pydantic
In the first video, I dive deep into data modeling using Pydantic v2, showing how to create robust, type-safe models with built-in validation. The key insight is using model inheritance to separate concerns between what clients send and what the database returns.
from datetime import datetime
from decimal import Decimal
from enum import Enum
from typing import Annotated, Optional
from pydantic import BaseModel, ConfigDict, Field

class ProductStatus(str, Enum):  # assumed values; the series defines its own
    ACTIVE = "active"
    INACTIVE = "inactive"

class Product(BaseModel):
    """Fields that are intrinsic to a product."""
    name: Annotated[str, Field(min_length=1, max_length=255)]
    # Pydantic v2 expects the default outside Annotated, not inside Field()
    description: Annotated[Optional[str], Field(max_length=1000)] = None
    category: Annotated[str, Field(min_length=1, max_length=100)]
    price: Annotated[Decimal, Field(gt=0, decimal_places=2)]
    sku: Annotated[str, Field(min_length=1, max_length=50, pattern=r"^[A-Z0-9-]+$")]
    quantity: Annotated[int, Field(ge=0)] = 0
    status: ProductStatus = ProductStatus.ACTIVE
    model_config = ConfigDict(extra="forbid", str_strip_whitespace=True)

class ProductResponse(Product):
    """All product fields plus system-generated fields."""
    id: str
    etag: str = Field(alias="_etag")  # Cosmos DB concurrency control token
    last_updated: datetime
    # Cosmos items include system properties (_rid, _ts, ...), so relax the
    # inherited extra="forbid" when validating raw results from the database
    model_config = ConfigDict(extra="ignore", populate_by_name=True)
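To make the separation concrete, here is a minimal sketch of how a client payload behaves against the model; the field values are illustrative:

from pydantic import ValidationError

payload = {"name": " Espresso Machine ", "category": "kitchen",
           "price": "199.99", "sku": "KIT-ESP-01"}

product = Product(**payload)
print(product.name)      # "Espresso Machine" -- whitespace stripped by config
print(product.quantity)  # 0 -- default applied

try:
    Product(**payload, id="abc123")  # clients may not send system fields
except ValidationError as e:
    print(e.errors()[0]["type"])  # "extra_forbidden"

Because id, etag, and last_updated live only on ProductResponse, a client can never smuggle system fields into a create or update request.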
Video 2: Client Configuration and Connection Management
The second video covers setting up a production-ready Azure Cosmos DB client with proper authentication, connection pooling, and singleton pattern implementation. I show how to use Azure’s managed identity for secure, credential-free authentication.
import logging
from azure.cosmos.aio import CosmosClient
from azure.identity.aio import DefaultAzureCredential  # async client needs the async credential

logger = logging.getLogger(__name__)

# COSMOSDB_ENDPOINT is read from application configuration elsewhere in the module
_client: CosmosClient | None = None
_credential: DefaultAzureCredential | None = None

async def _ensure_client() -> CosmosClient:
    """
    Ensures a single CosmosClient instance exists for the lifetime of the application.
    """
    global _client, _credential
    if _client is None:
        try:
            logger.info("Initializing Cosmos DB client with DefaultAzureCredential")
            _credential = DefaultAzureCredential()
            client_options = {
                "connection_timeout": 60,
            }
            _client = CosmosClient(COSMOSDB_ENDPOINT, _credential, **client_options)
            logger.info(
                "Cosmos DB client initialized successfully",
                extra={
                    "endpoint": COSMOSDB_ENDPOINT,
                    "auth_method": "DefaultAzureCredential",
                },
            )
        except Exception as e:
            logger.error(f"Failed to initialize Cosmos DB client: {e}")
            raise
    return _client
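A natural way to consume the singleton is through a FastAPI dependency. The sketch below assumes placeholder database and container names ("retail" and "products"):

from azure.cosmos.aio import ContainerProxy
from fastapi import Depends, FastAPI

app = FastAPI()

async def get_container() -> ContainerProxy:
    # Reuses the cached client; get_database_client/get_container_client
    # return lightweight proxies and do not make network calls.
    client = await _ensure_client()
    return client.get_database_client("retail").get_container_client("products")

@app.get("/healthz")
async def healthz(container: ContainerProxy = Depends(get_container)) -> dict:
    await container.read()  # round trip to verify connectivity
    return {"status": "ok"}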
Video 3: Async Operations Done Right
In the third video, I demonstrate how to leverage FastAPI’s async capabilities with Azure Cosmos DB’s async SDK. The combination of async/await patterns with dependency injection creates clean, testable code.
from datetime import datetime, timezone
from azure.core import MatchConditions
from azure.cosmos.aio import ContainerProxy

async def update_product(
    container: ContainerProxy,
    identifier: VersionedProductIdentifier,
    updates: ProductUpdate,
) -> ProductResponse:
    """Update an existing product with optimistic concurrency control."""
    update_dict = updates.model_dump(exclude_unset=True)
    normalized_category = normalize_category(identifier.category)
    update_dict["last_updated"] = datetime.now(timezone.utc).isoformat()

    # Create patch operations for partial updates
    patch_operations = [
        {"op": "set", "path": f"/{key}", "value": value}
        for key, value in update_dict.items()
        if key not in ("id", "category", "_etag")
    ]

    try:
        result = await container.patch_item(
            item=identifier.id,
            partition_key=normalized_category,
            patch_operations=patch_operations,
            etag=identifier.etag,  # ETag for optimistic concurrency control
            match_condition=MatchConditions.IfNotModified,
        )
        return ProductResponse.model_validate(result)
    except Exception as e:
        # handle_cosmos_error re-raises as an application-specific exception
        handle_cosmos_error(e, operation="update", product_id=identifier.id)
        raise  # defensive re-raise in case the handler returns
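Wired into a route, update_product composes cleanly with the get_container dependency from the previous section. The URL layout and If-Match handling below are illustrative, and VersionedProductIdentifier's fields are assumed from its usage above:

from fastapi import Header

@app.patch("/products/{category}/{product_id}", response_model=ProductResponse)
async def patch_product(
    category: str,
    product_id: str,
    updates: ProductUpdate,
    if_match: str = Header(...),  # FastAPI maps if_match to the "if-match" header
    container: ContainerProxy = Depends(get_container),
) -> ProductResponse:
    # The client echoes back the etag it last read; a stale etag yields a 412
    identifier = VersionedProductIdentifier(
        id=product_id, category=category, etag=if_match
    )
    return await update_product(container, identifier, updates)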
Video 4: Batch Operations for Performance
The fourth video explores batch operations, showing how to process multiple items efficiently using Azure Cosmos DB’s batch API and Python’s asyncio for concurrent execution across partitions.
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any, TypeVar

T = TypeVar("T")
R = TypeVar("R")

async def _execute_batch_by_category(
    items_by_category: dict[str, list[T]],
    process_func: Callable[[str, list[T]], Awaitable[Any]],
    result_extractor: Callable[[Any], list[R]] | None = None,
) -> list[R] | list[Any]:
    """
    Generic function to execute batch operations grouped by category
    with concurrent processing.
    """
    # Schedule tasks to run concurrently for each category
    tasks = [
        asyncio.create_task(process_func(category, items))
        for category, items in items_by_category.items()
    ]
    # Wait for all tasks to complete and gather results
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Check for exceptions and collect successful results
    all_results: list[Any] = []
    exceptions: list[BaseException] = []
    for result in results:
        if isinstance(result, BaseException):
            exceptions.append(result)
        elif result_extractor is not None:
            all_results.extend(result_extractor(result))
        elif result is not None:
            all_results.append(result)  # no extractor: keep the raw results
    if exceptions:
        raise exceptions[0]  # surface the first failure
    return all_results
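A matching process_func can hand each category's items to Cosmos DB's transactional batch API, since a batch must target a single logical partition. The sketch below reuses get_container from earlier and assumes items are plain dicts; the operation-tuple shape follows the azure-cosmos execute_item_batch convention:

async def _upsert_category_batch(category: str, items: list[dict]) -> list[dict]:
    # All operations in a single call share one partition key, which is
    # what makes the batch all-or-nothing in Cosmos DB.
    container = await get_container()
    operations = [("upsert", (item,)) for item in items]
    return await container.execute_item_batch(
        batch_operations=operations,
        partition_key=category,
    )

# Usage with the generic helper:
# results = await _execute_batch_by_category(items_by_category, _upsert_category_batch)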
Video 5: Centralized Error Handling and Logging
The final video demonstrates how to implement centralized error handling that transforms low-level database errors into meaningful application exceptions, complete with structured logging for observability.
from typing import Any
from azure.cosmos.exceptions import CosmosBatchOperationError

def handle_cosmos_error(e: Exception, operation: str, **context: Any) -> None:
    """Convert Cosmos DB exceptions to application-specific exceptions."""
    if isinstance(e, CosmosBatchOperationError):
        status_code = getattr(e, "status_code", None)
        if status_code == 409:
            error_message = str(e).lower()
            if "unique index constraint" in error_message and "/sku" in error_message:
                sku = context.get("sku", "unknown")
                raise ProductDuplicateSKUError(
                    f"Product with SKU '{sku}' already exists"
                ) from e
            else:
                # Handle ID conflicts - prefer showing SKU if available
                sku = context.get("sku")
                if sku:
                    raise ProductAlreadyExistsError(
                        f"Product with SKU '{sku}' already exists"
                    ) from e
import logging
from fastapi import Request
from fastapi.responses import JSONResponse

@app.exception_handler(ApplicationError)
async def application_error_handler(request: Request, exc: ApplicationError) -> JSONResponse:
    """Handle application-specific errors with structured logging."""
    # The scope may lack a route (e.g., unmatched paths), so fall back gracefully
    route = request.scope.get("route")
    logger = logging.getLogger(route.endpoint.__module__ if route else __name__)
    # Log at the appropriate level based on status code
    log_level = "warning" if exc.status_code < 500 else "error"
    extra = {"error_type": type(exc).__name__}
    getattr(logger, log_level)(str(exc), extra=extra, exc_info=exc)
    return JSONResponse(status_code=exc.status_code, content={"detail": str(exc)})
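For the handler above to work, the application exceptions need to carry an HTTP status code. A minimal sketch, assuming the hierarchy looks roughly like this:

class ApplicationError(Exception):
    """Base class for errors that map directly to an HTTP response."""
    status_code: int = 500

class ProductAlreadyExistsError(ApplicationError):
    status_code = 409

class ProductDuplicateSKUError(ApplicationError):
    status_code = 409

With this shape, handle_cosmos_error translates raw SDK failures once, and the FastAPI handler turns every ApplicationError into a consistent, logged HTTP response.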
Conclusion
Across the five videos, the series distills into a handful of practices:
- Leveraging Pydantic for strong typing and validation
- Implementing proper async patterns for optimal performance
- Using batch operations to minimize round trips to the database
- Building comprehensive error handling for production reliability
- Maintaining clean architecture with clear separation of concerns
Leave a review
Tell us about your Azure Cosmos DB experience! Leave a review on PeerSpot and we’ll gift you $50. Get started here.
About Azure Cosmos DB
Azure Cosmos DB is a fully managed and serverless NoSQL and vector database for modern app development, including AI applications. With its SLA-backed speed and availability as well as instant dynamic scalability, it is ideal for real-time NoSQL and MongoDB applications that require high performance and distributed computing over massive volumes of NoSQL and vector data.
To stay in the loop on Azure Cosmos DB updates, follow us on X, YouTube, and LinkedIn.
To quickly build your first database, watch our Get Started videos on YouTube and explore ways to dev/test free.