# Pattern 11 — Per-IP rate limiting in 30 lines, no Redis
## The pain
You opened your free-tier API to the public. Within hours, one IP starts hammering it 50 req/sec because they're testing their integration in a tight loop. You need to slow them down without (a) blocking them entirely, (b) running Redis just for this, (c) installing a "professional" rate limiting library that requires a year of config to do the simple thing.
The simple thing is an in-memory dict with a TTL. It costs nothing to run, it's accurate enough for any free-tier rate limit, and it disappears on restart, which is exactly what you want.
## When to use it
- Single-process Python web service (Flask, FastAPI, Bottle, Starlette).
- You want per-IP throttling at the rate of "tens to hundreds of requests per minute per IP".
- You don't need cross-process or cross-host accuracy. (If you do, you're at the scale where Redis is justified anyway.)
- You're OK with the rate limit resetting on service restart.
## The code
```python
import time
from collections import deque
from threading import Lock

from flask import request, abort

# {ip: deque of timestamps within the rolling window}
_hits: dict[str, deque] = {}
_lock = Lock()
_WINDOW_SECONDS = 60
_MAX_PER_WINDOW = 60  # 60 req/min/IP

def _rate_limit():
    """Call this at the top of any rate-limited route handler.

    Aborts with 429 if the caller is over their per-window quota.
    """
    # X-Forwarded-For can hold a chain ("client, proxy1, proxy2"); take the client.
    forwarded = request.headers.get("X-Forwarded-For", "")
    ip = forwarded.split(",")[0].strip() or request.remote_addr or "unknown"
    now = time.time()
    cutoff = now - _WINDOW_SECONDS
    with _lock:
        dq = _hits.setdefault(ip, deque())
        # Drop hits outside the window
        while dq and dq[0] < cutoff:
            dq.popleft()
        if len(dq) >= _MAX_PER_WINDOW:
            retry_after = int(dq[0] + _WINDOW_SECONDS - now) + 1
            abort(429, description=(
                f"rate limit: max {_MAX_PER_WINDOW} req per "
                f"{_WINDOW_SECONDS}s, retry in {retry_after}s"
            ))
        dq.append(now)
```
Use it in a route:
```python
@app.route("/api/expensive")
def api_expensive():
    _rate_limit()
    # ... your handler ...
    return jsonify(...)
```
That's the whole pattern. ~25 lines of code, one global dict, one lock, one helper function.
## How it works
Each IP gets a `collections.deque` of timestamps. On every request:
1. Read the IP from `X-Forwarded-For` (if behind a reverse proxy) or `remote_addr`.
2. Drop any timestamps older than the window.
3. If the deque has ≥ N entries, abort with 429.
4. Otherwise, append the current timestamp.
The deque grows to at most `_MAX_PER_WINDOW` entries per IP, then stays bounded. Memory is O(unique_ips × max_per_window). For a free tier with 10k unique IPs/day and 60 hits max each, that's 600k timestamps: about 5 MB as raw doubles, closer to 20 MB once you count Python float objects and deque overhead. Either way, negligible.
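The four steps above can be exercised without Flask. This is a minimal sketch of the same sliding-window logic with timestamps passed in explicitly instead of read from `time.time()`; the `allow` helper and the tiny quota of 3 are mine, chosen to make the demo easy to follow:

```python
from collections import deque

WINDOW_SECONDS = 60
MAX_PER_WINDOW = 3  # small quota so the demo is easy to follow

_hits: dict[str, deque] = {}

def allow(ip: str, now: float) -> bool:
    """Return True if this hit is within quota, recording it if so."""
    cutoff = now - WINDOW_SECONDS
    dq = _hits.setdefault(ip, deque())
    while dq and dq[0] < cutoff:  # drop hits outside the window
        dq.popleft()
    if len(dq) >= MAX_PER_WINDOW:
        return False
    dq.append(now)
    return True

# Three hits at t=0..2 pass, the fourth is rejected...
results = [allow("1.2.3.4", t) for t in (0, 1, 2, 3)]
# ...and once the window slides past the oldest hit, the IP is allowed again.
later = allow("1.2.3.4", 61)
print(results, later)  # [True, True, True, False] True
```

Note the window is genuinely sliding: the hit at t=61 is admitted because only the t=0 entry has aged out, not because a fresh minute started.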
## When NOT to use it
- **Multi-process deployments** (e.g. gunicorn with `--workers=4`). Each worker has its own dict and the limit is per-worker, not per-host. The fix is gunicorn `--workers=1 --threads=8` (which is fine for I/O-bound services) or moving to Redis.
- **Cross-host deployments**. Same problem at a bigger scale. Use Redis with `INCR` + `EXPIRE`, or a cloud-managed rate limiter.
- **Burst protection where milliseconds matter** (DDoS mitigation). Use a CDN's rate limiter (Cloudflare free tier does this). This pattern is for rate-limiting the *legitimate users who need to be throttled*, not for blocking attacks.
- **You need rate limit headers** (`X-RateLimit-Remaining`, `X-RateLimit-Reset`) for client UX. The pattern can return them but I left them out for brevity. Add them if you ship a public API that documents them.
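For reference, the Redis `INCR` + `EXPIRE` approach mentioned above is a *fixed* window: you count hits per (IP, current minute) bucket and reject past the quota. Here's a framework-free sketch of that counting scheme with a plain dict standing in for Redis; the `allow_fixed` name and the quota of 2 are mine:

```python
WINDOW_SECONDS = 60
MAX_PER_WINDOW = 2

_counters: dict[str, int] = {}  # stands in for Redis

def allow_fixed(ip: str, now: float) -> bool:
    """Fixed-window variant: count hits per (ip, window) bucket."""
    bucket = int(now // WINDOW_SECONDS)  # which window we're in
    key = f"{ip}:{bucket}"               # a Redis key would look the same
    count = _counters.get(key, 0) + 1    # Redis: INCR key, then EXPIRE key 60
    _counters[key] = count
    return count <= MAX_PER_WINDOW

print([allow_fixed("1.2.3.4", t) for t in (0, 1, 59, 60)])
# [True, True, False, True]
```

The count resets abruptly at the window boundary, so a client can burst up to 2× the quota straddling two windows. That's the trade-off you accept for getting atomic cross-process counting from Redis.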
## Adding headers (optional)
If you want to expose the remaining quota to callers:
```python
from flask import g

def _rate_limit():
    forwarded = request.headers.get("X-Forwarded-For", "")
    ip = forwarded.split(",")[0].strip() or request.remote_addr or "unknown"
    now = time.time()
    cutoff = now - _WINDOW_SECONDS
    with _lock:
        dq = _hits.setdefault(ip, deque())
        while dq and dq[0] < cutoff:
            dq.popleft()
        remaining = _MAX_PER_WINDOW - len(dq)
        if remaining <= 0:
            retry_after = int(dq[0] + _WINDOW_SECONDS - now) + 1
            abort(429, description=f"rate limit, retry in {retry_after}s")
        dq.append(now)
    g.rate_limit_remaining = remaining - 1
    g.rate_limit_reset = int(now + _WINDOW_SECONDS)

@app.after_request
def add_rate_limit_headers(response):
    if hasattr(g, "rate_limit_remaining"):
        response.headers["X-RateLimit-Remaining"] = str(g.rate_limit_remaining)
        response.headers["X-RateLimit-Reset"] = str(g.rate_limit_reset)
    return response
```
## Memory cleanup
The pattern as shown never garbage-collects IPs that haven't called in a while. For a high-cardinality public API, add a periodic cleanup:
```python
import threading

def _gc_loop():
    while True:
        time.sleep(300)  # every 5 min
        cutoff = time.time() - _WINDOW_SECONDS
        with _lock:
            for ip in list(_hits.keys()):
                dq = _hits[ip]
                while dq and dq[0] < cutoff:
                    dq.popleft()
                if not dq:
                    del _hits[ip]

threading.Thread(target=_gc_loop, daemon=True).start()
```
## Further reading
- "Rate Limiting" on the Cloudflare blog — for the "what to do at higher scale" version: https://blog.cloudflare.com/counting-things-a-lot-of-different-things/
- The Python `collections.deque` docs — `deque` has O(1) append and popleft, which is what makes the sliding window O(1) per request: https://docs.python.org/3/library/collections.html#collections.deque
- Flask's `abort()` and error-handler docs — for customizing the 429 response body: https://flask.palletsprojects.com/en/3.0.x/errorhandling/
## The summary
You don't need Redis for rate limiting. You don't need a library. You need a dict, a deque, and a lock. The 25-line version above has been in production on my Funding Finder API for 10+ days, with rate limit thresholds of 60/600/3000 req/min for free/paid/pro tiers, and has never lost a request or caused a memory issue.