40 min · Lesson 33 of 34
Final Projects
Project: REST API with FastAPI
Project: REST API with FastAPI
You'll build a fully-featured REST API for a task management application — complete with authentication, database persistence, input validation, pagination, and proper error handling. This is production-quality code you can deploy and show employers.
What You'll Build
POST /auth/register → Create account
POST /auth/login → Get JWT token
GET /tasks → List tasks (with filters & pagination)
POST /tasks → Create task
GET /tasks/{id} → Get single task
PUT /tasks/{id} → Update task
DELETE /tasks/{id} → Delete task
GET /tasks/stats → Completion stats
GET /health → Health check
Project Structure
task_api/
├── main.py # App entry point
├── database.py # SQLAlchemy setup
├── models.py # Database models
├── schemas.py # Pydantic schemas
├── auth.py # JWT authentication
├── routers/
│ ├── auth.py
│ └── tasks.py
├── requirements.txt
└── .env
Step 1: Setup & Dependencies
# requirements.txt
fastapi==0.115.0
uvicorn[standard]==0.32.0
sqlalchemy==2.0.36
pydantic[email]==2.9.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
bcrypt==4.0.1  # pin: passlib 1.7.4 is not compatible with newer bcrypt releases
python-dotenv==1.0.1
python-multipart==0.0.12
# .env
DATABASE_URL=sqlite:///./tasks.db
SECRET_KEY=your-super-secret-key-change-in-production
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
Step 2: Database Setup
# database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os
from dotenv import load_dotenv
# Pull settings from a local .env file into the process environment before
# anything below reads them.
load_dotenv()

# Connection string; falls back to a SQLite file in the working directory.
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./tasks.db")

# "check_same_thread" is a SQLite-driver-only option. Decide by URL scheme
# (prefix) rather than by substring, so a non-SQLite URL that merely contains
# the text "sqlite" (e.g. in a database name) is not handed an argument its
# driver does not understand.
_connect_args = (
    {"check_same_thread": False} if DATABASE_URL.startswith("sqlite") else {}
)

engine = create_engine(DATABASE_URL, connect_args=_connect_args)

# Session factory bound to the engine; commits and flushes are explicit.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Declarative base class that the ORM models inherit from.
Base = declarative_base()
def get_db():
    """Yield a database session and close it when the consumer is done.

    Written as a generator so it can be plugged into ``Depends(get_db)``:
    the session is handed out at the ``yield`` and closed afterwards, even
    if the consumer raised.
    """
    # The session's context manager closes it on exit, success or failure.
    with SessionLocal() as session:
        yield session
Step 3: Database Models
# models.py
import enum
from datetime import datetime, timezone

from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Enum
from sqlalchemy.orm import relationship

from database import Base
class Priority(str, enum.Enum):
    """Task priority levels.

    Mixing in ``str`` makes every member also a plain string equal to its
    value, so members compare equal to "low" / "medium" / "high".
    """

    # Declaration order is the iteration order: low -> medium -> high.
    low = "low"
    medium = "medium"
    high = "high"
class User(Base):
    """ORM model for a registered account, stored in the ``users`` table."""

    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    # Email and username are each unique and indexed.
    email = Column(String, unique=True, index=True, nullable=False)
    username = Column(String, unique=True, index=True, nullable=False)
    # Holds the password hash, never the plain-text password.
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)
    # Naive UTC timestamp set on insert. Produces the same value that
    # datetime.utcnow() did, without calling the API deprecated in Python 3.12.
    created_at = Column(
        DateTime,
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
    )
    # One-to-many link to Task; the cascade removes a user's tasks together
    # with the user.
    tasks = relationship("Task", back_populates="owner", cascade="all, delete-orphan")
def _utcnow_naive():
    """Return the current UTC time as a naive datetime.

    Drop-in replacement for ``datetime.utcnow()``, which is deprecated since
    Python 3.12; the tzinfo is stripped so stored values keep the same shape.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class Task(Base):
    """ORM model for a single to-do item, stored in the ``tasks`` table."""

    __tablename__ = "tasks"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False)
    description = Column(String(2000))
    completed = Column(Boolean, default=False)
    priority = Column(Enum(Priority), default=Priority.medium)
    due_date = Column(DateTime, nullable=True)
    # Naive UTC timestamps: created_at is set on insert, updated_at on insert
    # and again on every update.
    created_at = Column(DateTime, default=_utcnow_naive)
    updated_at = Column(DateTime, default=_utcnow_naive, onupdate=_utcnow_naive)
    # Every task belongs to exactly one user.
    owner_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    owner = relationship("User", back_populates="tasks")
Step 4: Pydantic Schemas
# schemas.py
from pydantic import BaseModel, EmailStr, field_validator
from typing import Optional
from datetime import datetime
from models import Priority
# --- User schemas ---
class UserCreate(BaseModel):
    """Registration payload: email, username and plain-text password."""

    email: EmailStr
    username: str
    password: str

    @field_validator('username')
    @classmethod
    def username_alphanumeric(cls, v):
        """Accept usernames of 3+ characters made of letters, digits and underscores."""
        # Underscores are allowed, so take them out before the alphanumeric test.
        without_underscores = v.replace('_', '')
        if not without_underscores.isalnum():
            raise ValueError('Username must be alphanumeric (underscores allowed)')
        # The length check deliberately comes second and counts underscores.
        if len(v) < 3:
            raise ValueError('Username must be at least 3 characters')
        return v

    @field_validator('password')
    @classmethod
    def password_strength(cls, v):
        """Accept passwords that are at least 8 characters long."""
        if len(v) >= 8:
            return v
        raise ValueError('Password must be at least 8 characters')
class UserResponse(BaseModel):
    """Public view of a user; it has no password or hash field."""

    # Lets the schema be populated from object attributes (e.g. an ORM row)
    # rather than only from dicts.
    model_config = {"from_attributes": True}

    id: int
    email: str
    username: str
    is_active: bool
    created_at: datetime
# --- Task schemas ---
class TaskCreate(BaseModel):
    """Payload for creating a task.

    The length limits mirror the ``String(200)`` / ``String(2000)`` columns of
    the Task model, so an over-long value is rejected as a validation error
    instead of reaching the database.
    """

    title: str
    description: Optional[str] = None
    priority: Priority = Priority.medium
    due_date: Optional[datetime] = None

    @field_validator('title')
    @classmethod
    def title_not_empty(cls, v):
        """Strip surrounding whitespace; reject blank or over-long titles."""
        title = v.strip()
        if not title:
            raise ValueError('Title cannot be empty')
        if len(title) > 200:
            raise ValueError('Title must be at most 200 characters')
        return title

    @field_validator('description')
    @classmethod
    def description_length(cls, v):
        """Reject descriptions longer than the column allows; ``None`` is fine."""
        if v is not None and len(v) > 2000:
            raise ValueError('Description must be at most 2000 characters')
        return v
class TaskUpdate(BaseModel):
    """Partial-update payload for a task.

    Every field is optional; a field that is left out of the request is not
    touched. ``description`` and ``due_date`` may be sent as ``null`` to clear
    them. ``title``, ``completed`` and ``priority`` may be omitted but not set
    to ``null``, because the response schema requires real values for them.
    """

    title: Optional[str] = None
    description: Optional[str] = None
    completed: Optional[bool] = None
    priority: Optional[Priority] = None
    due_date: Optional[datetime] = None

    @field_validator('title')
    @classmethod
    def title_valid(cls, v):
        """Apply the same title rules as on creation, and reject explicit null."""
        if v is None:
            raise ValueError('Title cannot be null')
        title = v.strip()
        if not title:
            raise ValueError('Title cannot be empty')
        if len(title) > 200:
            raise ValueError('Title must be at most 200 characters')
        return title

    @field_validator('description')
    @classmethod
    def description_length(cls, v):
        """Reject descriptions longer than the column allows; ``None`` clears it."""
        if v is not None and len(v) > 2000:
            raise ValueError('Description must be at most 2000 characters')
        return v

    @field_validator('completed', 'priority')
    @classmethod
    def not_null(cls, v):
        """Reject an explicit null; validators only run for fields that were sent."""
        if v is None:
            raise ValueError('Value cannot be null; omit the field to leave it unchanged')
        return v
class TaskResponse(BaseModel):
    """Task as returned to API clients."""

    # Lets the schema be populated from object attributes (e.g. an ORM row)
    # rather than only from dicts.
    model_config = {"from_attributes": True}

    id: int
    title: str
    # Required key, but its value may be null.
    description: Optional[str]
    completed: bool
    priority: Priority
    # Required key, but its value may be null.
    due_date: Optional[datetime]
    created_at: datetime
    updated_at: datetime
    owner_id: int
class PaginatedTasks(BaseModel):
    """One page of tasks plus the paging metadata that goes with it."""

    # Tasks on the current page.
    items: list[TaskResponse]
    # Number of tasks matching the query across all pages.
    total: int
    # Page number requested and page size used.
    page: int
    per_page: int
    # Total number of pages.
    pages: int
# --- Auth schemas ---
class Token(BaseModel):
    """Body returned by the login endpoint."""

    # The encoded JWT.
    access_token: str
    # Scheme the client should use to send the token (login returns "bearer").
    token_type: str
class TokenData(BaseModel):
    """Identity extracted from a decoded access token."""

    # Numeric id of the user the token was issued to; None when absent.
    user_id: Optional[int] = None
Step 5: Authentication
# auth.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
import os
from database import get_db
import models
from schemas import TokenData
import warnings

# Password hashing context (bcrypt).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Reads the bearer token from the Authorization header; tokenUrl points the
# generated docs at the login endpoint.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")

# Signing settings come from the environment. The fallback key exists only so
# the app starts in local development: it is public knowledge, so anyone could
# forge tokens signed with it. Warn instead of failing silently.
_DEFAULT_SECRET_KEY = "changeme"
SECRET_KEY = os.getenv("SECRET_KEY", _DEFAULT_SECRET_KEY)
if SECRET_KEY == _DEFAULT_SECRET_KEY:
    warnings.warn(
        "SECRET_KEY is not set (or is the default); tokens are being signed "
        "with an insecure, publicly known key. Set SECRET_KEY before deploying.",
        RuntimeWarning,
    )
ALGORITHM = os.getenv("ALGORITHM", "HS256")
ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "30"))
def hash_password(password: str) -> str:
    """Hash a plain-text password with the module's password context."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain: str, hashed: str) -> bool:
    """Check whether ``plain`` matches the stored hash ``hashed``."""
    is_match = pwd_context.verify(plain, hashed)
    return is_match
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Encode ``data`` as a signed JWT with an expiry claim.

    Args:
        data: Claims to embed. The dict is copied, not mutated.
        expires_delta: Token lifetime. When ``None``, the configured
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` is used.

    Returns:
        The encoded token string.
    """
    # Function-scope import keeps this fix self-contained.
    from datetime import timezone

    # Compare against None explicitly: timedelta(0) is falsy, so the previous
    # ``expires_delta or default`` silently replaced a zero lifetime with the
    # default one.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    payload = data.copy()
    # Timezone-aware "now" instead of the deprecated datetime.utcnow().
    payload["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session = Depends(get_db)
) -> models.User:
    """Resolve the bearer token to an active user.

    Args:
        token: Bearer token supplied by the OAuth2 scheme dependency.
        db: Database session.

    Returns:
        The user whose id is stored in the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no usable
            ``sub`` claim, or refers to a missing or inactive user.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        subject = payload.get("sub")
        if subject is None:
            raise credentials_exception
        token_data = TokenData(user_id=int(subject))
    except (JWTError, ValueError):
        # ValueError: "sub" was present but not an integer. Without catching
        # it, such a token produced a 500 instead of a 401.
        raise credentials_exception from None

    user = db.query(models.User).filter(models.User.id == token_data.user_id).first()
    if user is None or not user.is_active:
        raise credentials_exception
    return user
Step 6: Auth Router
# routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from database import get_db
import models
from schemas import UserCreate, UserResponse, Token
from auth import hash_password, verify_password, create_access_token
router = APIRouter(prefix="/auth", tags=["Authentication"])
@router.post("/register", response_model=UserResponse, status_code=201)
def register(user_data: UserCreate, db: Session = Depends(get_db)):
    """Create a new account.

    Args:
        user_data: Validated registration payload.
        db: Database session.

    Returns:
        The newly created user.

    Raises:
        HTTPException: 400 when the email or username is already in use.
    """
    # Function-scope import keeps this fix self-contained.
    from sqlalchemy.exc import IntegrityError

    # Friendly, specific errors for the common case.
    if db.query(models.User).filter(models.User.email == user_data.email).first():
        raise HTTPException(400, detail="Email already registered")
    if db.query(models.User).filter(models.User.username == user_data.username).first():
        raise HTTPException(400, detail="Username already taken")

    user = models.User(
        email=user_data.email,
        username=user_data.username,
        hashed_password=hash_password(user_data.password)
    )
    db.add(user)
    try:
        db.commit()
    except IntegrityError:
        # Two concurrent requests can both pass the checks above; the unique
        # constraints then reject the second insert. Report that as a client
        # error instead of letting it surface as a 500.
        db.rollback()
        raise HTTPException(400, detail="Email or username already registered") from None
    db.refresh(user)
    return user
@router.post("/login", response_model=Token)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Exchange a username and password for a bearer token.

    Args:
        form_data: Form-encoded ``username`` and ``password``.
        db: Database session.

    Returns:
        A dict with ``access_token`` and ``token_type``.

    Raises:
        HTTPException: 401 when the credentials are wrong or the account is
            inactive.
    """
    user = db.query(models.User).filter(models.User.username == form_data.username).first()
    # Inactive accounts are refused here as well: get_current_user rejects
    # them on every request, so a token issued to one would never work.
    if (
        not user
        or not verify_password(form_data.password, user.hashed_password)
        or not user.is_active
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"}
        )
    # The user id travels in the "sub" claim as a string.
    token = create_access_token({"sub": str(user.id)})
    return {"access_token": token, "token_type": "bearer"}
# This import must come BEFORE the route below. Default argument values are
# evaluated when the ``def`` statement runs, so importing the name afterwards
# raised NameError while this module was being imported.
from auth import get_current_user as get_current_user_from_auth


@router.get("/me", response_model=UserResponse)
def get_me(current_user: models.User = Depends(get_current_user_from_auth)):
    """Return the profile of the authenticated user."""
    return current_user
Step 7: Tasks Router
# routers/tasks.py
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from sqlalchemy import func
import math
from database import get_db
import models
from schemas import TaskCreate, TaskUpdate, TaskResponse, PaginatedTasks
from auth import get_current_user
router = APIRouter(prefix="/tasks", tags=["Tasks"])
@router.get("", response_model=PaginatedTasks)
def list_tasks(
    page: int = Query(1, ge=1),
    per_page: int = Query(20, ge=1, le=100),
    completed: bool | None = None,
    priority: str | None = None,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """List the current user's tasks, optionally filtered, one page at a time.

    Args:
        page: 1-based page number.
        per_page: Page size, between 1 and 100.
        completed: When given, only tasks with this completion state.
        priority: When given and non-empty, only tasks with this priority.
        db: Database session.
        current_user: Authenticated user; only their tasks are visible.

    Returns:
        A dict with the page ``items`` and the paging metadata
        (``total``, ``page``, ``per_page``, ``pages``).
    """
    query = db.query(models.Task).filter(models.Task.owner_id == current_user.id)
    if completed is not None:
        query = query.filter(models.Task.completed == completed)
    if priority:
        query = query.filter(models.Task.priority == priority)

    total = query.count()
    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so rows could
    # repeat or go missing between pages. Order by primary key for stable paging.
    tasks = (
        query.order_by(models.Task.id)
        .offset((page - 1) * per_page)
        .limit(per_page)
        .all()
    )
    return {
        "items": tasks,
        "total": total,
        "page": page,
        "per_page": per_page,
        "pages": math.ceil(total / per_page)
    }
@router.post("", response_model=TaskResponse, status_code=201)
def create_task(
    task_data: TaskCreate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Create a task owned by the authenticated user and return it."""
    fields = task_data.model_dump()
    new_task = models.Task(owner_id=current_user.id, **fields)
    db.add(new_task)
    db.commit()
    # Reload the row after the commit before returning it.
    db.refresh(new_task)
    return new_task
@router.get("/stats")
def get_stats(
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Return completion statistics for the current user's tasks.

    Returns:
        A dict with ``total``, ``completed``, ``pending``, ``completion_rate``
        (a percentage rounded to one decimal, 0 when there are no tasks) and
        ``by_priority`` (task count per priority value).
    """
    owned = db.query(models.Task).filter(models.Task.owner_id == current_user.id)
    total = owned.count()
    completed = owned.filter(models.Task.completed.is_(True)).count()
    # Iterate the Priority enum rather than a hard-coded list, so the stats
    # cannot drift out of sync when a priority level is added or renamed.
    by_priority = {
        level.value: owned.filter(models.Task.priority == level).count()
        for level in models.Priority
    }
    return {
        "total": total,
        "completed": completed,
        "pending": total - completed,
        "completion_rate": round(completed / total * 100, 1) if total > 0 else 0,
        "by_priority": by_priority
    }
@router.get("/{task_id}", response_model=TaskResponse)
def get_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Return one of the current user's tasks, or 404 if there is no such task."""
    # Filtering on the owner as well means another user's task is reported
    # as not found.
    owned_task = (
        db.query(models.Task)
        .filter(models.Task.id == task_id)
        .filter(models.Task.owner_id == current_user.id)
        .first()
    )
    if owned_task is None:
        raise HTTPException(404, detail="Task not found")
    return owned_task
@router.put("/{task_id}", response_model=TaskResponse)
def update_task(
    task_id: int,
    task_data: TaskUpdate,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Apply the supplied fields to one of the current user's tasks.

    Responds with 404 when the task does not exist or belongs to someone else.
    """
    owned_task = (
        db.query(models.Task)
        .filter(models.Task.id == task_id)
        .filter(models.Task.owner_id == current_user.id)
        .first()
    )
    if owned_task is None:
        raise HTTPException(404, detail="Task not found")

    # exclude_unset keeps only the fields the client actually sent, so
    # anything omitted from the request stays as it is.
    changes = task_data.model_dump(exclude_unset=True)
    for attr_name, new_value in changes.items():
        setattr(owned_task, attr_name, new_value)

    db.commit()
    db.refresh(owned_task)
    return owned_task
@router.delete("/{task_id}", status_code=204)
def delete_task(
    task_id: int,
    db: Session = Depends(get_db),
    current_user: models.User = Depends(get_current_user)
):
    """Delete one of the current user's tasks; 404 if there is no such task."""
    owned_task = (
        db.query(models.Task)
        .filter(models.Task.id == task_id)
        .filter(models.Task.owner_id == current_user.id)
        .first()
    )
    if owned_task is None:
        raise HTTPException(404, detail="Task not found")
    db.delete(owned_task)
    db.commit()
Step 8: Main App
# main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from database import engine
import models
from routers import auth, tasks
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: create the database tables on startup."""
    # Everything before the yield runs at startup; nothing runs at shutdown.
    metadata = models.Base.metadata
    metadata.create_all(bind=engine)
    yield
# Application object; the lifespan hook runs at startup.
app = FastAPI(
    lifespan=lifespan,
    title="Task Manager API",
    description="A production-quality task management REST API",
    version="1.0.0",
)

# CORS: a single allowed origin, with credentials, any method and any header.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=["http://localhost:3000"],
    allow_headers=["*"],
    allow_methods=["*"],
)

# Mount the routers: authentication first, then tasks.
app.include_router(auth.router)
app.include_router(tasks.router)
@app.get("/health")
def health_check():
    """Liveness probe: report that the service is up, and which version."""
    # Read the version from the app object rather than repeating the literal,
    # so the two values cannot drift apart.
    return {"status": "ok", "version": app.version}
Running & Testing
# Run the API with auto-reload (run from the directory that contains main.py)
uvicorn main:app --reload
# Visit the auto-generated docs (`open` is the macOS command; on Linux use `xdg-open`)
open http://localhost:8000/docs
# Register a user (JSON body)
curl -X POST "http://localhost:8000/auth/register" \
-H "Content-Type: application/json" \
-d '{"email":"user@example.com","username":"testuser","password":"password123"}'
# Log in and capture the token. The login endpoint takes form-encoded fields,
# not JSON; python3 pulls access_token out of the JSON response.
TOKEN=$(curl -s -X POST "http://localhost:8000/auth/login" \
-d "username=testuser&password=password123" | python3 -c "import sys,json;print(json.load(sys.stdin)['access_token'])")
# Create a task (sends the bearer token)
curl -X POST "http://localhost:8000/tasks" \
-H "Authorization: Bearer $TOKEN" \
-H "Content-Type: application/json" \
-d '{"title":"Buy groceries","priority":"high"}'
# List tasks: first page, 10 per page
curl "http://localhost:8000/tasks?page=1&per_page=10" \
-H "Authorization: Bearer $TOKEN"
This project demonstrates the complete FastAPI stack that powers real production APIs — authentication, database ORM, input validation, pagination, and clean router organization. The same patterns scale to thousands of users.
Next project: AI Writing Assistant — combining everything with the OpenAI and Anthropic APIs.
📱
Get Notes Free → Get this course's notes on Telegram!
Free cheat sheets, summaries & practice exercises