5 of 30 patterns. The other 25 are $19 — see the landing page for the full pack.
★ = free pattern
Pattern 01 · FREE
You wrote a Flask (or FastAPI, or Bottle, or anything-WSGI) app. It runs locally with python app.py. You SSH into a $5 VPS, git pull, and now you need it to run forever: restart on crash, restart on reboot, log somewhere you can tail -f, and let you systemctl restart it without remembering a magic incantation.
Half the internet will tell you to use Docker for this. Docker on a $5 VPS, for one process, with no horizontal scaling, is overkill. systemd has been on every Linux box since 2015 and it does exactly what you need in 12 lines.
Logs go to the journal, so journalctl gives you tail -f for free. Create /etc/systemd/system/myapp.service:
[Unit]
Description=My Flask app
After=network.target
[Service]
Type=simple
User=root
WorkingDirectory=/root/myapp
ExecStart=/usr/bin/python3 /root/myapp/app.py
Restart=on-failure
RestartSec=3
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
Then:
systemctl daemon-reload
systemctl enable --now myapp.service
# Check status
systemctl status myapp.service
# View logs (live)
journalctl -u myapp.service -f
# Restart after deploying new code
systemctl restart myapp.service
That's it. There's nothing else you need.
This is a single-host pattern. If you need zero-downtime deploys, you'll need more machinery (systemd socket activation with two services taking turns, or a real load balancer). If you have ≥3 needs like that, look at Docker Swarm or Kubernetes. If you have 0 or 1, stay with this pattern. You'll save 100+ hours of yak-shaving.
A note on User=root: running as root is fine on a $5 personal VPS where you're the only user. It's NOT fine if multiple humans share the box, or if the service exposes file uploads, process exec, or SQL injection surfaces. For those cases, create a dedicated user:
useradd -r -s /usr/sbin/nologin myapp
# /root is mode 0700, so the new user can't traverse into /root/myapp; move the app out first
mv /root/myapp /srv/myapp
chown -R myapp:myapp /srv/myapp
# Then in the unit:
# User=myapp
# WorkingDirectory=/srv/myapp
# ExecStart=/usr/bin/python3 /srv/myapp/app.py
The rest of the unit file is unchanged.
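If you'd rather not manage a Unix user by hand, systemd can allocate one for you. A sketch of the relevant sandboxing directives (the directive names are real systemd options; the ReadWritePaths value is a placeholder for wherever your app actually writes):

```ini
[Service]
# systemd creates a transient unprivileged user for the lifetime of the service
DynamicUser=yes
# mount /usr, /boot, /etc read-only for this service
ProtectSystem=strict
# hide /home and /root from the service
ProtectHome=yes
# the service and its children can never gain new privileges
NoNewPrivileges=yes
# carve out one writable directory (placeholder path)
ReadWritePaths=/var/lib/myapp
```

Note that ProtectHome=yes means the app can no longer live under /root; move it to /srv or /var/lib first.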
man systemd.service — the canonical reference, surprisingly readable
man systemd.unit — the [Unit] section options, including Requires=, After=, Wants=
This is the actual unit file running my Funding Finder API service on a $5 Hetzner VPS (truncated from the full file). Resident memory: ~25 MB. Restart count over 30 days: 4 (all clean exits during deploys).
[Unit]
Description=Funding Finder API
After=network.target
[Service]
Type=simple
User=root
WorkingDirectory=/root/project_30d/artifacts/funding_finder
ExecStart=/usr/bin/python3 /root/project_30d/artifacts/funding_finder/api.py
Restart=on-failure
RestartSec=5
Environment=PORT=8083
[Install]
WantedBy=multi-user.target
12 lines of config. Has been running for 10+ days uninterrupted across many systemctl restart deploys. Zero Docker. Zero Kubernetes. Zero regrets.
Pattern 06 · FREE
You picked SQLite because you didn't want to run a separate database process on your $5 VPS. Then you started writing concurrently from a Flask request handler and a background collector, and SQLite locked up. Or you noticed every INSERT was taking 10+ ms even though the disk is fast. Or both.
The default SQLite mode is rollback journal + synchronous=FULL, which is correct for embedded use cases (mobile, desktop) but wrong for a server-class workload with multiple concurrent writers and an SSD. The fix is two PRAGMAs.
In your DB connection helper, set both PRAGMAs immediately after connecting:
import sqlite3
from contextlib import contextmanager
DB_PATH = "/var/data/myapp.db"
@contextmanager
def conn():
c = sqlite3.connect(DB_PATH)
c.row_factory = sqlite3.Row
c.execute("PRAGMA journal_mode=WAL")
c.execute("PRAGMA synchronous=NORMAL")
try:
yield c
c.commit()
finally:
c.close()
That's it. Two lines. They're idempotent — running them on every connection is fine.
journal_mode=WAL switches SQLite from rollback-journal mode to write-ahead-log mode. The implications:
- Readers no longer block writers, and writers no longer block readers: each reader sees a consistent snapshot while a write is in progress.
- Writes go to a <dbname>-wal sidecar file and are checkpointed back to the main DB file periodically (default every 1000 pages).
synchronous=NORMAL tells SQLite (in WAL mode) to call fsync() on the WAL file at checkpoints rather than at every commit. The implications:
- On power loss, a transaction committed in the final moments may be rolled back (its fsync() was still in flight when the power went out). The DB itself never corrupts: only that most recent transaction is lost.
- synchronous=FULL fsyncs on every commit, which is paranoia for SSD-class storage and was tuned for spinning rust, where the fsync() cost was small relative to seek cost. If losing even the very last committed transaction is unacceptable (payments, ledgers), use synchronous=FULL instead.
For the other 99% of use cases, set both PRAGMAs and move on.
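One subtlety worth knowing: journal_mode=WAL is written into the database file itself and survives reconnects, while synchronous is per-connection state that resets to the library default (FULL) on every new connection. That's why the helper sets both on every connect. A quick self-contained check of that asymmetry:

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "demo.db")

c = sqlite3.connect(path)
c.execute("PRAGMA journal_mode=WAL")
c.execute("PRAGMA synchronous=NORMAL")
c.execute("CREATE TABLE t (x)")
c.commit()
c.close()

# Fresh connection with no PRAGMAs set
c = sqlite3.connect(path)
mode = c.execute("PRAGMA journal_mode").fetchone()[0]
sync = c.execute("PRAGMA synchronous").fetchone()[0]
c.close()

print(mode)  # 'wal' — persisted in the DB file itself
print(sync)  # 2 (FULL) — per-connection, so it reset to the default
```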
I run a collector that does ~6,800 INSERT OR REPLACE operations every 5 minutes (one per perpetual futures contract across 20 exchanges), with WAL + synchronous=NORMAL, on a Hetzner $5 VPS with NVMe storage.
With the default rollback-journal mode, the API server would intermittently return 500s when the collector was committing because of lock contention. With WAL, that's gone.
synchronous PRAGMA spec — https://www.sqlite.org/pragma.html#pragma_synchronous
Two lines. Set them once. Forget about SQLite concurrency for the rest of your life:
c.execute("PRAGMA journal_mode=WAL")
c.execute("PRAGMA synchronous=NORMAL")
Pattern 11 · FREE
You opened your free-tier API to the public. Within hours, one IP starts hammering it 50 req/sec because they're testing their integration in a tight loop. You need to slow them down without (a) blocking them entirely, (b) running Redis just for this, (c) installing a "professional" rate limiting library that requires a year of config to do the simple thing.
The simple thing is an in-memory dict with a TTL. It costs nothing to run, it's accurate enough for any free-tier rate limit, and it disappears on restart, which is exactly what you want.
import time
from collections import deque
from threading import Lock
from flask import request, abort
# {ip: deque of timestamps within the rolling window}
_hits: dict[str, deque] = {}
_lock = Lock()
_WINDOW_SECONDS = 60
_MAX_PER_WINDOW = 60 # 60 req/min/IP
def _rate_limit():
"""Call this at the top of any rate-limited route handler.
Aborts with 429 if the caller is over their per-window quota.
"""
    # X-Forwarded-For can be a comma-separated chain; the client is the first hop
    ip = (request.headers.get("X-Forwarded-For") or request.remote_addr or "unknown").split(",")[0].strip()
now = time.time()
cutoff = now - _WINDOW_SECONDS
with _lock:
dq = _hits.setdefault(ip, deque())
# Drop hits outside the window
while dq and dq[0] < cutoff:
dq.popleft()
if len(dq) >= _MAX_PER_WINDOW:
retry_after = int(dq[0] + _WINDOW_SECONDS - now) + 1
abort(429, description=f"rate limit: max {_MAX_PER_WINDOW} req per {_WINDOW_SECONDS}s, retry in {retry_after}s")
dq.append(now)
Use it in a route:
@app.route("/api/expensive")
def api_expensive():
_rate_limit()
# ... your handler ...
return jsonify(...)
That's the whole pattern. ~25 lines of code, one global dict, one lock, one helper function.
Each IP gets a collections.deque of timestamps. On every request: 1. Read the IP from X-Forwarded-For (if behind a reverse proxy) or remote_addr. 2. Drop any timestamps older than the window. 3. If the deque has ≥ N entries, abort with 429. 4. Otherwise, append the current timestamp.
The deque grows at most to _MAX_PER_WINDOW entries per IP, then stays bounded. Memory is O(unique_ips × max_per_window). For a free tier with 10k unique IPs/day and 60 req max each, that's 600k float entries, or about 5 MB. Negligible.
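The window logic is easy to sanity-check without Flask. Here's the same drop-then-count-then-append loop run against a fake clock (constants match the ones above):

```python
from collections import deque

WINDOW_SECONDS = 60
MAX_PER_WINDOW = 60

def allowed(dq: deque, now: float) -> bool:
    """One sliding-window decision: evict stale hits, reject if full, else record."""
    cutoff = now - WINDOW_SECONDS
    while dq and dq[0] < cutoff:
        dq.popleft()
    if len(dq) >= MAX_PER_WINDOW:
        return False
    dq.append(now)
    return True

dq: deque = deque()
t0 = 1_000.0
verdicts = [allowed(dq, t0 + i * 0.1) for i in range(61)]  # 61 hits in ~6 seconds
print(verdicts.count(True))    # 60 — exactly the quota
print(verdicts[-1])            # False — hit 61 is rejected
print(allowed(dq, t0 + 61.0))  # True — a minute later the window has slid
```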
Caveats, and when to upgrade:
- The dict lives in one process's memory. If you run gunicorn with multiple workers, each worker counts separately; run --workers=1 --threads=8 (which is fine for I/O-bound services) or move to Redis.
- Multiple hosts can't share an in-memory dict at all: use Redis INCR + EXPIRE, or a cloud-managed rate limiter.
- Public APIs usually document the standard headers (X-RateLimit-Remaining, X-RateLimit-Reset) for client UX. The pattern can return them but I left them out for brevity. Add them if you ship a public API that documents them.
If you want to expose the remaining quota to callers:
from flask import g
def _rate_limit():
    # Same X-Forwarded-For handling as before: take the first hop of the chain
    ip = (request.headers.get("X-Forwarded-For") or request.remote_addr or "unknown").split(",")[0].strip()
now = time.time()
cutoff = now - _WINDOW_SECONDS
with _lock:
dq = _hits.setdefault(ip, deque())
while dq and dq[0] < cutoff:
dq.popleft()
remaining = _MAX_PER_WINDOW - len(dq)
if remaining <= 0:
retry_after = int(dq[0] + _WINDOW_SECONDS - now) + 1
abort(429, description=f"rate limit, retry in {retry_after}s")
dq.append(now)
g.rate_limit_remaining = remaining - 1
g.rate_limit_reset = int(now + _WINDOW_SECONDS)
@app.after_request
def add_rate_limit_headers(response):
if hasattr(g, "rate_limit_remaining"):
response.headers["X-RateLimit-Remaining"] = str(g.rate_limit_remaining)
response.headers["X-RateLimit-Reset"] = str(g.rate_limit_reset)
return response
The pattern as shown never garbage-collects IPs that haven't called in a while. For a high-cardinality public API, add a periodic cleanup:
import threading
def _gc_loop():
while True:
time.sleep(300) # every 5 min
cutoff = time.time() - _WINDOW_SECONDS
with _lock:
for ip in list(_hits.keys()):
dq = _hits[ip]
while dq and dq[0] < cutoff:
dq.popleft()
if not dq:
del _hits[ip]
threading.Thread(target=_gc_loop, daemon=True).start()
collections.deque docs — deque has O(1) append and popleft, which is what makes the sliding window O(1) per request: https://docs.python.org/3/library/collections.html#collections.deque
Flask's abort() and error handler docs — for customizing the 429 response body: https://flask.palletsprojects.com/en/3.0.x/errorhandling/
You don't need Redis for rate limiting. You don't need a library. You need a dict, a deque, and a lock. The 25-line version above has been in production on my Funding Finder API for 10+ days, with rate limit thresholds of 60/600/3000 req/min for free/paid/pro tiers, and has never lost a request or caused a memory issue.
Pattern 17 · FREE
You need to call an external API 285 times to fetch funding rates for every perpetual contract on OKX. The exchange rate-limits public endpoints to 10 requests/second. You can either:
1. Sequential: 285 calls × 100 ms each = 28.5 seconds. Way too slow for a 5-minute refresh cycle that has 19 other exchanges to hit.
2. Async with aiohttp + asyncio.gather: fast, but now you've pulled in aiohttp, you've rewritten your code in async def, your test suite needs pytest-asyncio, your stack traces are 80% framework noise, and you're debugging "RuntimeError: This event loop is already running" during interactive REPL sessions.
3. concurrent.futures.ThreadPoolExecutor with requests: 8 worker threads, 8 seconds end-to-end, zero new dependencies, your code stays synchronous, your tests stay synchronous, your stack traces stay readable.
For I/O-bound workloads with a known concurrency cap, option 3 is almost always the right call. Async is for when you have thousands of concurrent connections per process and need fine-grained scheduling. For "fan out 285 HTTP calls under a 10 req/s ceiling", threads are the boring, working answer.
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
session = requests.Session()
session.headers.update({"User-Agent": "myapp/1.0"})
def fetch_one(inst_id: str) -> dict | None:
"""Fetch one instrument's funding rate. Returns None on any failure."""
try:
r = session.get(
"https://www.okx.com/api/v5/public/funding-rate",
params={"instId": inst_id},
timeout=10,
)
if r.status_code != 200:
return None
rows = r.json().get("data", [])
return rows[0] if rows else None
except Exception:
return None
def fetch_all(inst_ids: list[str], workers: int = 8) -> list[dict]:
"""Fan out fetches with `workers` threads. Returns successful results in
completion order (not input order)."""
results = []
with ThreadPoolExecutor(max_workers=workers) as pool:
futures = {pool.submit(fetch_one, inst): inst for inst in inst_ids}
for fut in as_completed(futures):
r = fut.result()
if r is not None:
results.append(r)
return results
That's it. Pass a list of N work items, get back a list of results, with up to workers calls in flight at any moment.
The right number is roughly the rate limit multiplied by the per-call latency: that product is how many requests are in flight when you're saturating the limit.
The general rule: start with workers = (rate_limit_per_sec × p95_latency_seconds) × 2 and round up to a small number like 4 or 8. Then verify with the API's response headers (most exchanges expose X-RateLimit-Remaining).
If you go too high and start getting 429s, just lower the worker count. There's no clever solution — just fewer threads.
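As arithmetic, the heuristic looks like this (the power-of-two rounding and the floor of 4 are my own concrete reading of "round up to a small number like 4 or 8"):

```python
import math

def suggest_workers(rate_limit_per_sec: float, p95_latency_s: float) -> int:
    """workers = (rate limit × p95 latency) × 2, rounded up to a power of two,
    with a floor of 4. The ×2 is headroom for latency variance."""
    raw = rate_limit_per_sec * p95_latency_s * 2
    return max(4, 2 ** math.ceil(math.log2(raw)))

# 10 req/s ceiling, ~300 ms p95 → ~6 in-flight requests → round up to 8 workers
print(suggest_workers(10, 0.3))  # 8
# A faster API (100 ms p95) needs fewer; the floor of 4 kicks in
print(suggest_workers(10, 0.1))  # 4
```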
When threads are the wrong tool:
- CPU-bound work: the GIL serializes pure-Python compute, so use ProcessPoolExecutor or just call NumPy.
- Thousands of concurrent connections, or intricate coordination between tasks: use asyncio or a message queue. Threads don't compose well past the simplest patterns.
The naive version above swallows all errors inside fetch_one and returns None, which is fine for the "best effort, drop failures" use case. For more visibility:
from collections import Counter
def fetch_all_with_stats(inst_ids: list[str], workers: int = 8) -> tuple[list[dict], Counter]:
results = []
failures = Counter()
with ThreadPoolExecutor(max_workers=workers) as pool:
futures = {pool.submit(fetch_one, inst): inst for inst in inst_ids}
for fut in as_completed(futures):
inst = futures[fut]
try:
r = fut.result()
except Exception as e:
failures[type(e).__name__] += 1
continue
if r is None:
failures["empty_response"] += 1
else:
results.append(r)
return results, failures
Now you can log len(results) vs dict(failures) and notice if the failure rate creeps above some threshold.
The OKX fetcher runs exactly this pattern in production: 285 instruments, 8 worker threads, ~8 seconds end-to-end, every 5 minutes.
The async version of this would be ~30% faster end-to-end (5 sec vs 8 sec) but would require: aiohttp, an async test harness, asyncio.run() boilerplate, and an async def rewrite of every fetcher in the file. Not worth the 3 seconds.
concurrent.futures documentation — https://docs.python.org/3/library/concurrent.futures.htmlFor "fan out 100-500 HTTP calls under a rate limit", ThreadPoolExecutor is the boring answer. 8 lines of code, zero new deps, your codebase stays synchronous, your tests stay synchronous, and your stack traces stay readable. Async is for when you need 10,000+ concurrent connections per process. Most projects never get there. Don't pre-optimize.
Pattern 22 · FREE
Your service is in production. Something will eventually go wrong: an exchange API will return garbage, a disk will fill up, a cron job will silently stop running. You need someone (you) to find out within minutes, not hours, not the next morning when a customer emails you.
PagerDuty exists for this. It costs €19/user/month, has a configuration UI deeper than IKEA assembly instructions, and is wildly overkill if you're a solo dev with one production service.
The boring answer: Telegram bot. Free. The Bot API has been stable since 2015. You create a bot in 60 seconds via @BotFather, get a token, and from then on you can curl your bot a message and your phone buzzes within ~2 seconds. There is nothing else to configure.
1. Open Telegram, search for @BotFather, send /newbot. Follow the prompts. Get a token like 1234567890:ABCdEFghIJklMNOpqrSTUvwxyz. Save it as TELEGRAM_BOT_TOKEN.
2. Send any message to your new bot (you have to message it first, before it can message you).
3. Get your chat_id by visiting https://api.telegram.org/bot<TOKEN>/getUpdates in a browser. Look for "chat":{"id":12345678} in the response. Save that as TELEGRAM_CHAT_ID.
That's it. You're done.
A 10-line helper module that any other module can import:
import os
import requests
TOKEN = os.environ.get("TELEGRAM_BOT_TOKEN")
CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID")
def send_alert(message: str, parse_mode: str = "Markdown") -> bool:
"""Send a message to the configured Telegram chat. Returns True on success.
Silent no-op if creds are missing (so dev environments don't spam).
"""
if not (TOKEN and CHAT_ID):
return False
try:
r = requests.post(
f"https://api.telegram.org/bot{TOKEN}/sendMessage",
data={"chat_id": CHAT_ID, "text": message, "parse_mode": parse_mode},
timeout=10,
)
return r.json().get("ok", False)
except Exception:
return False
Use it from anywhere:
from alerts import send_alert
if disk_usage > 0.9:
send_alert(f"⚠️ disk at {disk_usage:.0%} on `{hostname}`")
if collector_age > 600:
send_alert(f"❌ collector stale: last fetch {collector_age}s ago")
# Or for deploys:
send_alert(f"✅ deploy of `{commit_sha[:7]}` complete")
Your phone buzzes within 2 seconds. No SaaS account, no SDK, no webhooks UI, no pager rotation.
The same pattern works for:
- channels: pass the channel's @username as the chat_id (no token needed).
- multiple recipients: loop over several chat_id env vars.
PagerDuty-class tools earn their price when you need on-call rotations, escalation policies, and acknowledgement tracking across a team. For the solo-dev / small-team / "I just want my phone to buzz when X happens" case, none of these apply. Telegram wins.
Markdown formatting. Use parse_mode=Markdown to get bold (*text*), italic (_text_), inline code (` text `), and code blocks (triple backticks). It makes alerts much easier to scan.
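One gotcha with parse_mode=Markdown: an unbalanced _ or * makes the API reject the entire message ("can't parse entities"), so an alert containing a raw identifier like collector_age can silently fail to send. A small escaping helper (the function name is mine; the escape set is for legacy Markdown, MarkdownV2 requires escaping more characters):

```python
def md_escape(text: str) -> str:
    """Escape the characters that legacy-Markdown parse mode treats as formatting."""
    for ch in ("_", "*", "`", "["):
        text = text.replace(ch, "\\" + ch)
    return text

print(md_escape("collector stale: last_fetch_age > 600"))
# collector stale: last\_fetch\_age > 600
```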
Don't include sensitive secrets. Treat the bot like a webhook with no auth: anyone who has the token can read every message you've ever sent through it. Don't include API keys, customer data, or session tokens in alerts.
Rate-limit your own alerts. If your service catches an exception in a loop, you don't want it to spam Telegram 10 times/sec. Add a deduplication window:
import time
_last_alert: dict[str, float] = {}
def send_alert_once(key: str, message: str, cooldown: int = 600) -> bool:
"""Send an alert at most once per `cooldown` seconds per `key`."""
now = time.time()
if key in _last_alert and now - _last_alert[key] < cooldown:
return False
_last_alert[key] = now
return send_alert(message)
Now you can call send_alert_once("disk_full", "...") from a hot path without flooding.
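The cooldown behaviour is easy to verify offline with a stubbed send_alert and an explicit clock (the now parameter is added here purely for the demo):

```python
_last_alert: dict[str, float] = {}
sent: list[str] = []

def send_alert(message: str) -> bool:
    # Stub: record the message instead of POSTing to Telegram
    sent.append(message)
    return True

def send_alert_once(key: str, message: str, now: float, cooldown: int = 600) -> bool:
    """Same dedup logic as above, with the clock passed in for testability."""
    if key in _last_alert and now - _last_alert[key] < cooldown:
        return False
    _last_alert[key] = now
    return send_alert(message)

print(send_alert_once("disk_full", "disk at 91%", now=0.0))    # True: first alert fires
print(send_alert_once("disk_full", "disk at 92%", now=300.0))  # False: inside the cooldown
print(send_alert_once("disk_full", "disk at 95%", now=700.0))  # True: cooldown expired
print(sent)  # ['disk at 91%', 'disk at 95%']
```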
The Telegram Bot API is free. Forever. No "freemium" rug-pull, no surprise tiers, no token costs after $X. The bot has worked the same way since 2015 and has not become a product they're trying to monetize. It's the closest thing in 2026 to "infrastructure that just works for free, indefinitely."
@BotFather documentation — https://core.telegram.org/bots/features#botfather
You don't need PagerDuty for a side project. You need a Telegram bot, a 10-line helper, and two env vars. Your phone will buzz within 2 seconds when something breaks, and you'll have spent €0 to set it up. I've used this pattern for every service I've shipped in the last three years.