Expert guidance for building production-ready microservices e-commerce platforms with Python, FastAPI, and modern DevOps practices. Implements clean architecture, event-driven patterns, and comprehensive observability.
This skill provides expert guidance for developing production-ready microservices-based e-commerce platforms using Python, FastAPI, and modern cloud-native patterns.
Helps you build, maintain, and extend a complete microservices e-commerce system with:
**User Service (Port 8001)**
**Product Service (Port 8002)**
**Order Service (Port 8003)**
**Notification Service (Port 8004)**
**Follow PEP 8 with these conventions:**
```python
class UserRepository(BaseRepository[User]):
pass
def get_user_by_username(username: str) -> Optional[User]:
pass
MAX_RETRY_ATTEMPTS = 3
def _validate_token(self, token: str) -> bool:
pass
```
**Import organization:**
1. Standard library imports
2. Third-party imports
3. Local application imports
**Always use:**
```
service-name/
├── app/
│ ├── api/ # FastAPI routes
│ ├── config/ # Settings and configuration
│ ├── database/ # Database setup
│ ├── models/ # SQLAlchemy models
│ ├── repositories/ # Repository pattern implementation
│ ├── schemas/ # Pydantic schemas
│ ├── services/ # Business logic layer
│ └── utils/ # Utilities (security, clients)
├── alembic/ # Database migrations
├── start.sh # Auto-migration startup script
├── Dockerfile
└── requirements.txt
```
**Always create a base repository:**
```python
from typing import Generic, TypeVar, Type, Optional, List
from sqlalchemy.orm import Session
T = TypeVar('T')
class BaseRepository(Generic[T]):
"""Generic base repository with CRUD operations."""
def __init__(self, db: Session, model: Type[T]):
self.db = db
self.model = model
def create(self, obj_data: dict) -> T:
"""Create new object."""
db_obj = self.model(**obj_data)
self.db.add(db_obj)
self.db.commit()
self.db.refresh(db_obj)
return db_obj
def get_by_id(self, id: int) -> Optional[T]:
"""Get object by ID."""
return self.db.query(self.model).filter(
self.model.id == id
).first()
def get_all(self, skip: int = 0, limit: int = 100) -> List[T]:
"""Get all objects with pagination."""
return self.db.query(self.model).offset(skip).limit(limit).all()
```
**Extend for specific models:**
```python
class UserRepository(BaseRepository[User]):
"""Repository for User model."""
def __init__(self, db: Session):
super().__init__(db, User)
def get_by_username(self, username: str) -> Optional[User]:
"""Get user by username."""
return self.db.query(User).filter(
User.username == username,
User.is_active == True
).first()
```
**Implement business logic in service classes:**
```python
class UserService:
"""Service class for user business logic."""
def __init__(self, db: Session):
self.db = db
self.user_repository = UserRepository(db)
def register_user(self, user_data: UserCreate) -> User:
"""
Register new user with validation.
Args:
user_data: User registration data
Returns:
Created user
Raises:
ValueError: If username already exists
"""
if self.user_repository.exists_by_username(user_data.username):
raise ValueError(f"Username '{user_data.username}' already exists")
hashed_password = get_password_hash(user_data.password)
user_dict = {
"username": user_data.username,
"hashed_password": hashed_password,
"is_active": True
}
return self.user_repository.create(user_dict)
```
**Use comprehensive route documentation:**
```python
@router.post(
"/users",
response_model=UserResponse,
status_code=status.HTTP_201_CREATED,
summary="Create new user",
description="Register a new user account with username and password",
tags=["Users"]
)
async def create_user(
user_data: UserCreate,
db: Session = Depends(get_db)
) -> UserResponse:
"""
Create a new user account.
Args:
user_data: User registration data
db: Database session
Returns:
Created user information
Raises:
HTTPException 400: If username already exists
HTTPException 422: If validation fails
"""
service = UserService(db)
try:
user = service.register_user(user_data)
return UserResponse.model_validate(user)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
```
**Create layered schemas:**
```python
from pydantic import BaseModel, Field, field_validator
from datetime import datetime
class UserBase(BaseModel):
"""Base schema with common attributes."""
username: str = Field(..., min_length=3, max_length=50)
class UserCreate(UserBase):
"""Schema for user creation."""
password: str = Field(..., min_length=6)
@field_validator('password')
def validate_password(cls, v):
if not any(c.isupper() for c in v):
raise ValueError('Password must contain uppercase letter')
return v
class UserResponse(UserBase):
"""Schema for user response (no password)."""
id: int
is_active: bool
created_at: datetime
model_config = {"from_attributes": True}
```
```python
from sqlalchemy import Column, Integer, String, Boolean, DateTime, func
from app.database import Base
class User(Base):
"""User model for authentication."""
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String(50), unique=True, nullable=False, index=True)
hashed_password = Column(String(255), nullable=False)
is_active = Column(Boolean, default=True, nullable=False)
created_at = Column(DateTime, server_default=func.now(), nullable=False)
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())
def __repr__(self):
return f"<User(id={self.id}, username='{self.username}')>"
```
**Password hashing with bcrypt:**
```python
import bcrypt
def get_password_hash(password: str) -> str:
"""Hash password using bcrypt."""
hashed = bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
return hashed.decode('utf-8')
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify password against hash."""
return bcrypt.checkpw(
plain_password.encode('utf-8'),
hashed_password.encode('utf-8')
)
```
**Configuration with environment variables:**
```python
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
"""Application settings from environment variables."""
DATABASE_URL: str
SECRET_KEY: str
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
USER_SERVICE_URL: str = "http://user-service:8001"
class Config:
env_file = ".env"
case_sensitive = True
settings = Settings()
```
**REST API client for token validation:**
```python
import httpx
from typing import Optional
class AuthClient:
"""Client for User Service authentication."""
def __init__(self, base_url: str):
self.base_url = base_url
async def validate_token(self, token: str) -> Optional[dict]:
"""
Validate JWT token via User Service.
Args:
token: JWT token to validate
Returns:
User info if valid, None if invalid
"""
async with httpx.AsyncClient() as client:
try:
response = await client.post(
f"{self.base_url}/validate-token",
json={"token": token},
timeout=5.0
)
if response.status_code == 200:
data = response.json()
if data.get("valid"):
return {
"username": data.get("username"),
"user_id": data.get("user_id")
}
return None
except httpx.RequestError as e:
logger.error(f"Error validating token: {e}")
return None
```
**RabbitMQ event publisher:**
```python
import aio_pika
import json
class EventPublisher:
"""RabbitMQ event publisher."""
async def publish_order_created(self, order_data: dict):
"""Publish order.created event."""
connection = await aio_pika.connect_robust(
f"amqp://{settings.RABBITMQ_USER}:{settings.RABBITMQ_PASSWORD}"
f"@{settings.RABBITMQ_HOST}:{settings.RABBITMQ_PORT}/"
)
async with connection:
channel = await connection.channel()
exchange = await channel.declare_exchange(
"order_events",
aio_pika.ExchangeType.FANOUT
)
message = aio_pika.Message(
body=json.dumps(order_data).encode(),
content_type="application/json"
)
await exchange.publish(message, routing_key="")
```
**RabbitMQ event consumer:**
```python
class EventConsumer:
"""RabbitMQ event consumer."""
async def consume_order_notifications(self):
"""Consume order notification events."""
connection = await aio_pika.connect_robust(
f"amqp://{settings.RABBITMQ_USER}:{settings.RABBITMQ_PASSWORD}"
f"@{settings.RABBITMQ_HOST}:{settings.RABBITMQ_PORT}/"
)
channel = await connection.channel()
queue = await channel.declare_queue("order_notifications", durable=True)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
data = json.loads(message.body.decode())
await self.process_notification(data)
```
```python
import redis.asyncio as redis
import json
class CacheService:
"""Redis cache service."""
def __init__(self):
self.redis = redis.from_url(settings.REDIS_URL)
async def get_product(self, product_id: int) -> Optional[dict]:
"""Get product from cache."""
data = await self.redis.get(f"product:{product_id}")
return json.loads(data) if data else None
async def set_product(self, product_id: int, product_data: dict, ttl: int = 300):
"""Set product in cache with TTL."""
await self.redis.setex(
f"product:{product_id}",
ttl,
json.dumps(product_data)
)
async def invalidate_product(self, product_id: int):
"""Invalidate product cache."""
await self.redis.delete(f"product:{product_id}")
```
```python
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session = Depends(get_db)
) -> int:
"""Get current authenticated user ID from JWT token."""
auth_client = AuthClient(settings.USER_SERVICE_URL)
user_info = await auth_client.validate_token(token)
if not user_info:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials"
)
return user_info["user_id"]
@router.get("/me")
async def get_current_user_info(
user_id: int = Depends(get_current_user),
db: Session = Depends(get_db)
):
service = UserService(db)
return service.get_user_by_id(user_id)
```
1. **Database per Service**: Each microservice has its own PostgreSQL database
2. **Authentication Delegation**: Services validate tokens via User Service REST API (never share JWT secrets)
3. **Cache-Aside Pattern**: Read from cache first, fetch from DB on miss, update cache
4. **Event-Driven**: Use RabbitMQ for asynchronous communication between services
5. **Repository Pattern**: Separate data access from business logic
6. **Service Layer**: Encapsulate business rules and validation
7. **Clean Architecture**: Clear separation of concerns (API → Service → Repository → Model)
Leave a review
No reviews yet. Be the first to review this skill!
# Download SKILL.md from killerskills.ai/api/skills/python-microservices-e-commerce-development/raw