How to Add JWT Authentication in FastAPI – A Practical Guide

FastAPI is a modern, high-performance web framework for building APIs with Python, based on standard Python type hints. It has become popular for its automatic API documentation, built-in validation and serialization, dependency injection, and strong performance.

One common requirement when building web applications and APIs is authentication – ensuring only authorized users can access certain routes and perform actions. JSON Web Tokens (JWTs) are a compact, signed token format that is widely used to handle authentication in FastAPI and many other web frameworks.

In this practical guide, we’ll walk through the process of adding JWT authentication to a FastAPI application step by step. You’ll learn how to:

  • Hash and verify passwords for secure storage
  • Generate JWT access and refresh tokens
  • Create routes for signup, login, and getting the current user
  • Protect routes by requiring a valid access token
  • Handle refresh tokens and token invalidation
  • Deploy your application with authentication

Whether you’re a beginner or an experienced Python developer, by the end of this guide you’ll have a solid understanding of authentication in FastAPI and be able to implement it in your own projects. Let’s get started!

Project Setup

First, create a new directory for the project and set up a virtual environment:

mkdir fastapi-jwt-auth
cd fastapi-jwt-auth
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

Install the required packages:

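pip install fastapi uvicorn "python-jose[cryptography]" "passlib[bcrypt]" python-multipart email-validator

Here’s what each library does:

  • fastapi – The FastAPI web framework
  • uvicorn – An ASGI server for running FastAPI
  • python-jose – For generating and verifying JWTs (the cryptography extra installs its recommended backend)
  • passlib – Password hashing utilities (the bcrypt extra installs the bcrypt backend used below)
  • python-multipart – Parsing form data for the login route
  • email-validator – Required by Pydantic’s EmailStr type used in the user models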

Now create a file main.py with a basic FastAPI app:

from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def root():
    return {"message": "Hello World!"}

Start the server with:

uvicorn main:app --reload

Visit http://localhost:8000 in your browser. You should see the JSON response {"message": "Hello World!"}. We now have a working FastAPI app to build upon.

Password Hashing

When users sign up and log in with a password, never store the password in plaintext. Instead, store a hash produced by a deliberately slow, salted one-way function. We’ll use the bcrypt algorithm via passlib.

Create a file auth.py with the following:

from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def get_password_hash(password):
    return pwd_context.hash(password)

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

The pwd_context uses the bcrypt hashing algorithm. The get_password_hash function takes a plain password string and returns a hash that can safely be stored in a database. verify_password compares a plain password to a hash and returns True if they match.
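To see what a salted one-way hash buys you without installing anything, here is a minimal sketch that uses PBKDF2 from the standard library as a stand-in for bcrypt. The names hash_password, check_password, and ITERATIONS are made up for this illustration, and the iteration count is an arbitrary example value; in the actual app, keep using the passlib functions above.

```python
import hashlib
import hmac
import os

ITERATIONS = 100_000  # illustrative work factor, not a tuned recommendation


def hash_password(password: str) -> str:
    """Return 'salt$digest' (both hex) for the given password."""
    salt = os.urandom(16)  # a fresh random salt for every password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def check_password(password: str, stored: str) -> bool:
    """Re-derive the digest with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))


if __name__ == "__main__":
    first = hash_password("secret")
    second = hash_password("secret")
    print(first != second)                  # same password, different stored values
    print(check_password("secret", first))  # the right password still verifies
```

Because each hash carries its own random salt, two users with the same password end up with different stored values, which is also how bcrypt hashes behave.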

Generating JWTs

Next, let’s add functions for generating JWT access and refresh tokens. These identify the user in the token’s payload (the login route later stores the username in the sub claim).

Add the following to auth.py:

import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from jose import JWTError, jwt
from pydantic import BaseModel

# Read the signing key from the environment. The fallback is a placeholder
# for local development only and must not be used in production.
SECRET_KEY = os.environ.get("SECRET_KEY", "your-secret-key")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7  # 7 days

class TokenData(BaseModel):
    username: str

def _create_token(data: dict, expires_delta: timedelta) -> str:
    # Copy the payload so the caller's dict is not mutated, then add the expiry claim.
    to_encode = data.copy()
    to_encode.update({"exp": datetime.now(timezone.utc) + expires_delta})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    # Fall back to the default access-token lifetime when none is given.
    return _create_token(data, expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))

def create_refresh_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    # Fall back to the default refresh-token lifetime when none is given.
    return _create_token(data, expires_delta or timedelta(minutes=REFRESH_TOKEN_EXPIRE_MINUTES))

SECRET_KEY signs the JWTs: anyone who knows it can forge valid tokens, so keep it secret and treat "your-secret-key" as a placeholder only. The access token expires after 30 minutes, while the refresh token lasts for 7 days.

The create_access_token and create_refresh_token functions take a dictionary payload and optional expiration time, add the expiration to the payload, and return the encoded JWT.
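If you’re curious what jwt.encode produces, the sketch below rebuilds an HS256 token with only the standard library. It is meant as an illustration of the format (base64url header, payload, and HMAC-SHA256 signature joined by dots), not a replacement for python-jose; the function names sign_hs256 and verify_hs256 are invented for this example.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as the JWT format requires."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the padding that JWTs strip."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> str:
    mac = hmac.new(secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256)
    return b64url(mac.digest())


def sign_hs256(payload: dict, secret: str) -> str:
    """Build 'header.payload.signature' for the given claims."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    return f"{signing_input}.{_signature(signing_input, secret)}"


def verify_hs256(token: str, secret: str, now: Optional[float] = None) -> dict:
    """Return the claims if the signature matches and the token has not expired."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    signing_input = f"{parts[0]}.{parts[1]}"
    expected = _signature(signing_input, secret)
    if not hmac.compare_digest(expected.encode("utf-8"), parts[2].encode("utf-8")):
        raise ValueError("bad signature")
    claims = json.loads(b64url_decode(parts[1]))
    current = time.time() if now is None else now
    if "exp" in claims and current >= claims["exp"]:
        raise ValueError("token expired")
    return claims
```

The payload is only encoded, not encrypted, so anyone holding a token can read its claims. The signature is what prevents tampering, which is why the secret key matters so much.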

User Registration and Login

With password hashing and JWT generation in place, we can implement the routes for user registration and login.

Define Pydantic models for the request and response data:

from pydantic import BaseModel, EmailStr

class UserCreate(BaseModel):
    username: str
    email: EmailStr
    password: str

class User(BaseModel):
    id: int
    username: str
    email: EmailStr

class UserInDB(User):
    hashed_password: str

class Token(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str

Add a fake "database" for demo purposes:

fake_users_db = {
    "alice": {
        "id": 1,
        "username": "alice",
        "email": "alice@example.com",  # placeholder address
        "hashed_password": "$2b$12$UNUiNWcgpLwrsivjxRzX9.fhExTqHNAd1s1pu1rXC6nPfThTVIHlK",
    }
}

The hashed_password value is the bcrypt hash of alice’s password, "secret".

Now add the signup and login routes:

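from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt

# Settings and helpers defined earlier in auth.py
from auth import ALGORITHM, SECRET_KEY, TokenData
from auth import create_access_token, create_refresh_token, get_password_hash, verify_password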

app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def authenticate_user(db, username: str, password: str):
    user = get_user(db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

@app.post("/signup", response_model=User)
def create_user(user: UserCreate):
    if user.username in fake_users_db:
        raise HTTPException(status_code=400, detail="Username already registered")
    hashed_password = get_password_hash(user.password)
    db_user = UserInDB(
        id=len(fake_users_db) + 1,
        username=user.username,
        email=user.email,
        hashed_password=hashed_password
    )
    fake_users_db[user.username] = db_user.dict()
    return User(**db_user.dict())

@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token = create_access_token(data={"sub": user.username})
    refresh_token = create_refresh_token(data={"sub": user.username})
    return {"access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer"}

The /signup route takes a UserCreate object (username, email, password), hashes the password, saves the user to the "database", and returns a User response.

The /token route is the login endpoint. It authenticates the username and password from the OAuth2PasswordRequestForm, raises an error if invalid, and returns a new access and refresh token if valid.
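To try the login route from a script, remember that OAuth2PasswordRequestForm expects form-encoded fields rather than JSON. Here is a small client sketch using only the standard library. It assumes the server is running locally at http://localhost:8000, and build_login_request and login are helper names invented for this example.

```python
import json
import urllib.parse
import urllib.request


def build_login_request(base_url: str, username: str, password: str) -> urllib.request.Request:
    """Prepare a form-encoded POST to /token, as OAuth2PasswordRequestForm expects."""
    body = urllib.parse.urlencode({"username": username, "password": password})
    return urllib.request.Request(
        f"{base_url}/token",
        data=body.encode("ascii"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


def login(base_url: str, username: str, password: str) -> dict:
    """Send the request and return the parsed JSON token response."""
    with urllib.request.urlopen(build_login_request(base_url, username, password)) as response:
        return json.load(response)


# Example usage (requires the server to be running):
# tokens = login("http://localhost:8000", "alice", "secret")
# print(tokens["token_type"])
```

You can also exercise both routes from the interactive docs that FastAPI serves at /docs.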

Protecting Routes

To protect a route so that only requests carrying a valid access token can reach it, build a dependency on top of oauth2_scheme, which reads the bearer token from the Authorization header:

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

The get_current_user dependency function decodes and validates the JWT in the Authorization header of the request. If valid, it returns the user from the database.

Depending on get_current_user in the /users/me path operation means the route is protected – only requests with a valid access token can call it.
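It can help to see what happens before get_current_user ever receives the token. The sketch below approximates the header parsing that oauth2_scheme performs for us: split the Authorization header into a scheme and a value, and accept it only if the scheme is Bearer. The function extract_bearer_token is a simplified stand-in written for this guide, not FastAPI’s actual implementation.

```python
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' header value, else None."""
    if not authorization:
        return None
    # Split once on the first space: "Bearer abc" -> ("Bearer", " ", "abc")
    scheme, _, token = authorization.partition(" ")
    token = token.strip()
    # The scheme name is compared case-insensitively
    if scheme.lower() != "bearer" or not token:
        return None
    return token


if __name__ == "__main__":
    print(extract_bearer_token("Bearer abc.def.ghi"))  # the token part
    print(extract_bearer_token("Basic dXNlcjpwdw=="))  # wrong scheme, so None
```

In the real app, a missing or malformed header leads to a 401 response before our own code runs, so get_current_user only has to worry about whether the token itself is valid.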

Refresh Tokens

Access tokens are short-lived for security – typically 15-60 minutes. This means the client needs to re-authenticate frequently. Refresh tokens allow the client to get a new access token without entering their credentials each time.

To implement refresh tokens, we’ll add a new /refresh endpoint that takes a refresh token and returns a new access token if the refresh token is valid and hasn’t expired. Update main.py:

from fastapi import Body  # add to the existing fastapi imports

@app.post("/refresh", response_model=Token)
async def refresh_access_token(refresh_token: str = Body(..., embed=True)):
    # Read the token from the JSON body ({"refresh_token": "..."}) rather than
    # the query string, where it could end up in server logs.
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # Decoding also checks the signature and the exp claim.
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    user = get_user(fake_users_db, username=username)
    if user is None:
        raise credentials_exception
    new_access_token = create_access_token(data={"sub": user.username})
    return {"access_token": new_access_token, "refresh_token": refresh_token, "token_type": "bearer"}

The flow is:

  1. Client includes refresh token in request to /refresh
  2. Validate refresh token and extract username
  3. Look up user and generate new access token
  4. Return new access token to client

Rotating refresh tokens, where a new refresh token is issued each time, is a good security practice but beyond the scope of this guide.
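One gap worth knowing about: the access and refresh tokens in this guide are signed with the same key and carry the same claims, so a refresh token would also pass the check in get_current_user, and an access token would be accepted by /refresh. A common mitigation is to tag each token with its purpose and reject the wrong kind. The sketch below shows the idea on plain dictionaries. The claim name "type" and the helper names are assumptions made for this example, not something python-jose or FastAPI defines.

```python
from typing import Any, Dict

ACCESS = "access"
REFRESH = "refresh"


def with_token_type(claims: Dict[str, Any], token_type: str) -> Dict[str, Any]:
    """Return a copy of the claims tagged with the token's purpose."""
    tagged = dict(claims)
    tagged["type"] = token_type  # hypothetical claim name chosen for this sketch
    return tagged


def require_token_type(payload: Dict[str, Any], expected: str) -> str:
    """Return the subject if the decoded payload has the expected type, else raise."""
    if payload.get("type") != expected:
        raise ValueError(f"expected a {expected} token")
    subject = payload.get("sub")
    if not subject:
        raise ValueError("token has no subject")
    return subject


if __name__ == "__main__":
    refresh_claims = with_token_type({"sub": "alice"}, REFRESH)
    print(require_token_type(refresh_claims, REFRESH))  # subject of the refresh token
    try:
        require_token_type(refresh_claims, ACCESS)
    except ValueError as error:
        print(error)  # a refresh token is rejected where an access token is required
```

In the app, you would pass with_token_type(...) output to the token creation functions, then call require_token_type on the decoded payload in get_current_user (expecting ACCESS) and in the /refresh route (expecting REFRESH).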

Deployment

To deploy the FastAPI app, first make sure SECRET_KEY comes from an environment variable (or a secrets manager) and is set to a long random value. Never hardcode secrets!
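A simple way to enforce this is to fail at startup when the key is missing or too short, rather than silently running with a weak value. The following is a minimal sketch; load_secret_key is a hypothetical helper, and the 32-character minimum is an arbitrary threshold chosen for this example.

```python
import os
from typing import Mapping


def load_secret_key(env: Mapping[str, str] = os.environ, min_length: int = 32) -> str:
    """Return SECRET_KEY from the environment, or raise if it is missing or too short."""
    key = env.get("SECRET_KEY", "")
    if len(key) < min_length:
        raise RuntimeError(
            f"SECRET_KEY must be set to at least {min_length} characters"
        )
    return key


# Example usage at application startup:
# SECRET_KEY = load_secret_key()
```

You can generate a suitable value with python -c "import secrets; print(secrets.token_hex(32))" and export it in the environment where the server runs.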

A common production setup runs FastAPI under Gunicorn, which manages the worker processes, with Uvicorn’s worker class handling ASGI:

pip install gunicorn
gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app

This runs 4 worker processes using Uvicorn. See the FastAPI docs for more on deploying to production.

Conclusion

Congratulations! You now have a basic understanding of implementing JWT authentication and protected routes in a FastAPI application. We covered all the key components:

  • Hashing passwords with bcrypt
  • Generating JWTs that encode a user’s identity
  • Endpoints for registering, logging in, and getting the current user
  • Protecting certain routes by requiring a valid access token
  • Refreshing access tokens with a longer-lived refresh token
  • Securely deploying the application

There are many more advanced concepts to learn, such as OAuth 2.0 flows, refresh token rotation, social login, permission-based authorization, and multi-factor authentication. But with the foundation from this guide, you’re well on your way to securing your FastAPI apps and APIs.

The full code is available on GitHub. Thanks for reading!
