Write a Python script that continuously fetches the following prices every 60 seconds and stores them in a MySQL database:

**Precious Metals:** Gold, Silver, Platinum, Palladium
**Crypto:** Bitcoin, Ethereum, BNB, Solana, XRP (current top 5 by market cap)

Requirements:
- Use a free, publicly available API (no paid API key required, or at minimum a free tier with generous limits). Clearly state which API(s) you're using and any signup steps needed.
- Fetch all prices in USD.
- Store results in a MySQL database with appropriate table schema(s). Include the SQL to create the table(s).
- Each record should include the asset name/symbol, price, and timestamp.
- The script should run indefinitely, polling every 60 seconds.
- Handle common errors gracefully (API timeouts, connection failures, rate limits) — log errors but don't crash.
- Use clear, well-structured code with comments.
- List all required Python packages (e.g., `requests`, `mysql-connector-python`, etc.) and how to install them.

Precious Metals & Crypto Price Tracker
APIs Used
| Data | API | Signup Required? |
|---|---|---|
| Crypto | CoinGecko API v3 (free, no key needed) | No — 10-30 calls/min free |
| Precious Metals | Metals.dev API (free tier, key required) | Yes — free signup, 50 req/day on free tier |
Why these? CoinGecko's /simple/price endpoint requires no API key. For metals, truly free no-key APIs are essentially extinct in 2024-2025. Metals.dev offers the most generous free tier. Alternative: GoldAPI.io (free tier, 300 req/month). The metals fetch lives in its own function, so swapping providers only means replacing that one function.
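The provider swap mentioned above can be sketched as a list of fetch callables tried in order. This is a minimal illustration and not part of the script below: `fetch_with_fallback` and the two stub providers are hypothetical names, and the stub price is a placeholder, not a real quote.

```python
from decimal import Decimal
from typing import Callable

# A provider is any zero-argument callable returning {symbol: price}.
Provider = Callable[[], dict[str, Decimal]]


def fetch_with_fallback(providers: list[tuple[str, Provider]]) -> tuple[str, dict[str, Decimal]]:
    """Try each provider in order and return the first non-empty result."""
    errors = []
    for name, provider in providers:
        try:
            prices = provider()
            if prices:
                return name, prices
            errors.append(f"{name}: empty response")
        except Exception as exc:  # a real script would catch narrower errors
            errors.append(f"{name}: {exc}")
    raise RuntimeError("all providers failed: " + "; ".join(errors))


# Hypothetical stand-ins for real HTTP calls.
def primary_provider() -> dict[str, Decimal]:
    raise TimeoutError("simulated timeout")


def backup_provider() -> dict[str, Decimal]:
    return {"XAU": Decimal("2000.00")}  # placeholder value, not a real quote


source, prices = fetch_with_fallback([
    ("primary", primary_provider),
    ("backup", backup_provider),
])
print(source, prices)
```

In the real script, each provider would wrap one HTTP call and map that API's response keys onto the shared symbols (XAU, XAG, XPT, XPD).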
Required Packages
pip install requests mysql-connector-python python-dotenv
| Package | Purpose |
|---|---|
| requests | HTTP calls to APIs |
| mysql-connector-python | MySQL database connector |
| python-dotenv | Load DB credentials from .env file |
MySQL Table Schema
-- Run this in MySQL before starting the script
CREATE DATABASE IF NOT EXISTS price_tracker
    CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

USE price_tracker;

CREATE TABLE IF NOT EXISTS asset_prices (
    id          BIGINT AUTO_INCREMENT PRIMARY KEY,
    asset_name  VARCHAR(50)    NOT NULL COMMENT 'Human-readable name (e.g. Gold, Bitcoin)',
    symbol      VARCHAR(10)    NOT NULL COMMENT 'Ticker symbol (e.g. XAU, BTC)',
    asset_type  ENUM('metal', 'crypto') NOT NULL,
    price_usd   DECIMAL(18, 8) NOT NULL COMMENT 'Price in USD',
    fetched_at  DATETIME       NOT NULL COMMENT 'Timestamp when price was fetched',
    created_at  DATETIME       DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_symbol_time (symbol, fetched_at),
    INDEX idx_type_time (asset_type, fetched_at)
) ENGINE=InnoDB;
.env File
Create a .env file in the same directory as the script:
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=your_password_here
MYSQL_DATABASE=price_tracker
# Sign up free at https://metals.dev — paste your API key here
METALS_API_KEY=your_metals_dev_api_key_here
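A quick startup check can catch a .env file that still contains the template values. The sketch below is an optional addition, not part of the script: `unset_settings`, the `REQUIRED`/`OPTIONAL` split, and the example dictionary are all assumptions made for illustration.

```python
# Values copied from the .env template that must be replaced before running.
PLACEHOLDERS = {"your_password_here", "your_metals_dev_api_key_here"}

# Hypothetical split: settings the script cannot run without vs. optional ones.
REQUIRED = ("MYSQL_HOST", "MYSQL_USER", "MYSQL_DATABASE")
OPTIONAL = ("MYSQL_PASSWORD", "METALS_API_KEY")


def unset_settings(env: dict, names: tuple) -> list:
    """Return names that are missing, empty, or still a template placeholder."""
    return [n for n in names if not env.get(n) or env[n] in PLACEHOLDERS]


# Example input standing in for os.environ after load_dotenv().
example_env = {
    "MYSQL_HOST": "localhost",
    "MYSQL_USER": "root",
    "MYSQL_PASSWORD": "your_password_here",
    "MYSQL_DATABASE": "price_tracker",
}

missing_required = unset_settings(example_env, REQUIRED)
missing_optional = unset_settings(example_env, OPTIONAL)
print("required:", missing_required)
print("optional:", missing_optional)
```

In the real script, `os.environ` would be passed in after `load_dotenv()` has run; a non-empty required list is a reason to exit early, while a non-empty optional list only deserves a warning.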
The Python Script
#!/usr/bin/env python3
"""
price_tracker.py
~~~~~~~~~~~~~~~~
Continuously fetches precious metal & top crypto prices every 60 seconds
and stores them in a MySQL database.

APIs:
    - Crypto: CoinGecko (free, no key)
    - Metals: Metals.dev (free tier, key required — sign up at https://metals.dev)

Usage:
    1. pip install requests mysql-connector-python python-dotenv
    2. Create MySQL database using the schema in the README
    3. Fill in .env with your credentials
    4. python price_tracker.py
"""
import os
import sys
import time
import logging
from datetime import datetime, timezone
from decimal import Decimal
import requests
import mysql.connector
from mysql.connector import Error as MySQLError
from dotenv import load_dotenv
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
load_dotenv()  # Load .env file

# Polling interval in seconds
POLL_INTERVAL = 60

# MySQL connection parameters
DB_CONFIG = {
    "host": os.getenv("MYSQL_HOST", "localhost"),
    "port": int(os.getenv("MYSQL_PORT", 3306)),
    "user": os.getenv("MYSQL_USER", "root"),
    "password": os.getenv("MYSQL_PASSWORD", ""),
    "database": os.getenv("MYSQL_DATABASE", "price_tracker"),
}

# Metals.dev API key (free tier: 50 requests/day)
METALS_API_KEY = os.getenv("METALS_API_KEY", "")

# CoinGecko endpoint — no key needed
COINGECKO_URL = "https://api.coingecko.com/api/v3/simple/price"

# Metals.dev endpoint
METALS_URL = "https://api.metals.dev/v1/latest"

# Mapping: CoinGecko IDs → (display_name, symbol)
CRYPTO_ASSETS = {
    "bitcoin": ("Bitcoin", "BTC"),
    "ethereum": ("Ethereum", "ETH"),
    "binancecoin": ("BNB", "BNB"),
    "solana": ("Solana", "SOL"),
    "ripple": ("XRP", "XRP"),
}

# Mapping: metals.dev keys → (display_name, symbol)
# metals.dev returns keys like "gold", "silver", "platinum", "palladium"
METAL_ASSETS = {
    "gold": ("Gold", "XAU"),
    "silver": ("Silver", "XAG"),
    "platinum": ("Platinum", "XPT"),
    "palladium": ("Palladium", "XPD"),
}

# Request timeout (seconds)
REQUEST_TIMEOUT = 15

# ---------------------------------------------------------------------------
# Logging setup
# ---------------------------------------------------------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("price_tracker.log", encoding="utf-8"),
    ],
)
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------
def get_db_connection():
    """Create and return a new MySQL connection."""
    return mysql.connector.connect(**DB_CONFIG)


def ensure_table_exists(conn):
    """Create the asset_prices table if it doesn't exist."""
    ddl = """
        CREATE TABLE IF NOT EXISTS asset_prices (
            id          BIGINT AUTO_INCREMENT PRIMARY KEY,
            asset_name  VARCHAR(50)    NOT NULL,
            symbol      VARCHAR(10)    NOT NULL,
            asset_type  ENUM('metal', 'crypto') NOT NULL,
            price_usd   DECIMAL(18, 8) NOT NULL,
            fetched_at  DATETIME       NOT NULL,
            created_at  DATETIME       DEFAULT CURRENT_TIMESTAMP,
            INDEX idx_symbol_time (symbol, fetched_at),
            INDEX idx_type_time (asset_type, fetched_at)
        ) ENGINE=InnoDB;
    """
    with conn.cursor() as cur:
        cur.execute(ddl)
    conn.commit()
    logger.info("Ensured asset_prices table exists.")


def insert_prices(conn, records: list[dict]):
    """
    Insert a batch of price records into the database.
    Each record dict: {asset_name, symbol, asset_type, price_usd, fetched_at}
    """
    if not records:
        return
    sql = """
        INSERT INTO asset_prices (asset_name, symbol, asset_type, price_usd, fetched_at)
        VALUES (%(asset_name)s, %(symbol)s, %(asset_type)s, %(price_usd)s, %(fetched_at)s)
    """
    with conn.cursor() as cur:
        cur.executemany(sql, records)
    conn.commit()
    logger.info("Inserted %d price record(s) into database.", len(records))

# ---------------------------------------------------------------------------
# API fetch functions
# ---------------------------------------------------------------------------
def fetch_crypto_prices() -> list[dict]:
    """
    Fetch crypto prices from CoinGecko (free, no API key).
    Returns a list of record dicts ready for DB insertion.
    """
    ids = ",".join(CRYPTO_ASSETS.keys())
    params = {
        "ids": ids,
        "vs_currencies": "usd",
    }
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    response = requests.get(
        COINGECKO_URL,
        params=params,
        timeout=REQUEST_TIMEOUT,
        headers={"Accept": "application/json"},
    )
    response.raise_for_status()
    data = response.json()

    records = []
    for cg_id, (name, symbol) in CRYPTO_ASSETS.items():
        if cg_id in data and "usd" in data[cg_id]:
            price = Decimal(str(data[cg_id]["usd"]))
            records.append({
                "asset_name": name,
                "symbol": symbol,
                "asset_type": "crypto",
                "price_usd": price,
                "fetched_at": now,
            })
            logger.info(" %s (%s): $%s", name, symbol, price)
        else:
            logger.warning(" Missing data for %s in CoinGecko response.", cg_id)
    return records


def fetch_metal_prices() -> list[dict]:
    """
    Fetch precious metal prices from Metals.dev (free tier, API key required).
    Returns a list of record dicts ready for DB insertion.
    """
    if not METALS_API_KEY:
        logger.warning("METALS_API_KEY not set — skipping precious metals fetch. "
                       "Sign up free at https://metals.dev")
        return []
    params = {
        "api_key": METALS_API_KEY,
        "currency": "USD",
        "unit": "toz",  # troy ounce
    }
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    response = requests.get(
        METALS_URL,
        params=params,
        timeout=REQUEST_TIMEOUT,
        headers={"Accept": "application/json"},
    )
    response.raise_for_status()
    data = response.json()

    # metals.dev returns: {"metals": {"gold": 2345.67, "silver": 28.12, ...}}
    metals_data = data.get("metals", {})
    records = []
    for metal_key, (name, symbol) in METAL_ASSETS.items():
        if metal_key in metals_data:
            price = Decimal(str(metals_data[metal_key]))
            records.append({
                "asset_name": name,
                "symbol": symbol,
                "asset_type": "metal",
                "price_usd": price,
                "fetched_at": now,
            })
            logger.info(" %s (%s): $%s / troy oz", name, symbol, price)
        else:
            logger.warning(" Missing data for %s in metals.dev response.", metal_key)
    return records

# ---------------------------------------------------------------------------
# Main loop
# ---------------------------------------------------------------------------
def run_once(conn) -> int:
    """
    Perform a single fetch-and-store cycle.
    Returns the number of records inserted.
    """
    all_records = []

    # --- Fetch crypto prices ---
    logger.info("Fetching crypto prices from CoinGecko...")
    try:
        crypto_records = fetch_crypto_prices()
        all_records.extend(crypto_records)
    except requests.exceptions.Timeout:
        logger.error("CoinGecko request timed out.")
    except requests.exceptions.HTTPError as e:
        # Handle rate limiting specifically
        if e.response is not None and e.response.status_code == 429:
            logger.warning("CoinGecko rate limit hit (429). Will retry next cycle.")
        else:
            logger.error("CoinGecko HTTP error: %s", e)
    except requests.exceptions.ConnectionError:
        logger.error("CoinGecko connection failed — check your network.")
    except (ValueError, KeyError) as e:
        logger.error("Failed to parse CoinGecko response: %s", e)

    # --- Fetch metal prices ---
    logger.info("Fetching precious metal prices from Metals.dev...")
    try:
        metal_records = fetch_metal_prices()
        all_records.extend(metal_records)
    except requests.exceptions.Timeout:
        logger.error("Metals.dev request timed out.")
    except requests.exceptions.HTTPError as e:
        if e.response is not None and e.response.status_code == 429:
            logger.warning("Metals.dev rate limit hit (429). Will retry next cycle.")
        elif e.response is not None and e.response.status_code == 403:
            logger.error("Metals.dev returned 403 — check your API key.")
        else:
            logger.error("Metals.dev HTTP error: %s", e)
    except requests.exceptions.ConnectionError:
        logger.error("Metals.dev connection failed — check your network.")
    except (ValueError, KeyError) as e:
        logger.error("Failed to parse Metals.dev response: %s", e)

    # --- Store in DB ---
    if all_records:
        try:
            insert_prices(conn, all_records)
        except MySQLError as e:
            logger.error("MySQL insert failed: %s", e)
            # Attempt to reconnect on the next cycle
            raise
    return len(all_records)

def main():
    """Entry point — connects to MySQL and runs the polling loop forever."""
    logger.info("=" * 60)
    logger.info("Price Tracker starting up...")
    logger.info("Polling interval: %d seconds", POLL_INTERVAL)
    logger.info("=" * 60)

    # Initial DB connection
    try:
        conn = get_db_connection()
        ensure_table_exists(conn)
        logger.info("Connected to MySQL at %s:%s/%s",
                    DB_CONFIG["host"], DB_CONFIG["port"], DB_CONFIG["database"])
    except MySQLError as e:
        logger.critical("Cannot connect to MySQL: %s", e)
        logger.critical("Exiting. Fix your database config and retry.")
        sys.exit(1)

    cycle = 0
    while True:
        cycle += 1
        logger.info("--- Cycle %d ---", cycle)
        try:
            # Ensure connection is still alive; reconnect if needed
            if not conn.is_connected():
                logger.warning("MySQL connection lost — reconnecting...")
                conn = get_db_connection()
                logger.info("Reconnected to MySQL.")
            count = run_once(conn)
            logger.info("Cycle %d complete: %d records stored.", cycle, count)
        except MySQLError as e:
            logger.error("Database error in cycle %d: %s", cycle, e)
            # Try to get a fresh connection next cycle
            try:
                conn.close()
            except Exception:
                pass
            try:
                conn = get_db_connection()
                logger.info("Re-established MySQL connection.")
            except MySQLError:
                logger.error("Could not reconnect to MySQL. Will retry next cycle.")
        except Exception as e:
            # Catch-all so the loop never dies
            logger.exception("Unexpected error in cycle %d: %s", cycle, e)

        # Sleep until next cycle
        logger.info("Sleeping %d seconds...\n", POLL_INTERVAL)
        try:
            time.sleep(POLL_INTERVAL)
        except KeyboardInterrupt:
            logger.info("KeyboardInterrupt received — shutting down gracefully.")
            try:
                conn.close()
            except Exception:
                pass
            sys.exit(0)


if __name__ == "__main__":
    main()
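The script above responds to a 429 by simply waiting for the next 60-second cycle. If a shorter retry within the same cycle were wanted, one common approach is exponential backoff that defers to the standard HTTP Retry-After header when the server sends one. The helper below is a hypothetical sketch (`retry_delay` is not part of the script), and whether CoinGecko or Metals.dev actually send Retry-After is an assumption to verify.

```python
from typing import Optional


def retry_delay(attempt: int, retry_after: Optional[str] = None,
                base: float = 2.0, cap: float = 60.0) -> float:
    """Seconds to wait before retry number `attempt` (0-based).

    Prefers a numeric Retry-After header value; otherwise falls back to
    exponential backoff: base, 2*base, 4*base, ... limited to `cap`.
    """
    if retry_after is not None:
        try:
            return max(0.0, min(float(retry_after), cap))
        except ValueError:
            pass  # header held an HTTP date or junk; use backoff instead
    return min(base * (2 ** attempt), cap)


print(retry_delay(0))                       # first retry, no header
print(retry_delay(3))                       # fourth retry, no header
print(retry_delay(10))                      # capped
print(retry_delay(0, retry_after="30"))     # server-specified delay
```

With `requests`, the header value would come from `e.response.headers.get("Retry-After")` inside the existing `HTTPError` handler.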
How It Works — Architecture Overview
┌─────────────┐   every 60s   ┌───────────────────┐
│  main loop  │──────────────▶│    run_once()     │
└─────────────┘               │                   │
                              │  ┌─────────────┐  │  GET /simple/price
                              │  │fetch_crypto │──┼────▶ CoinGecko (free)
                              │  └─────────────┘  │
                              │  ┌─────────────┐  │  GET /v1/latest
                              │  │fetch_metals │──┼────▶ Metals.dev (free tier)
                              │  └─────────────┘  │
                              │         │         │
                              │  insert_prices()  │
                              └─────────┼─────────┘
                                        │
                                        ▼
                              ┌───────────────────┐
                              │       MySQL       │
                              │   asset_prices    │
                              └───────────────────┘
Sample Output
2025-01-15 14:30:00 [INFO] --- Cycle 1 ---
2025-01-15 14:30:00 [INFO] Fetching crypto prices from CoinGecko...
2025-01-15 14:30:01 [INFO] Bitcoin (BTC): $99432.00000000
2025-01-15 14:30:01 [INFO] Ethereum (ETH): $3285.42000000
2025-01-15 14:30:01 [INFO] BNB (BNB): $695.30000000
2025-01-15 14:30:01 [INFO] Solana (SOL): $185.67000000
2025-01-15 14:30:01 [INFO] XRP (XRP): $2.34000000
2025-01-15 14:30:01 [INFO] Fetching precious metal prices from Metals.dev...
2025-01-15 14:30:01 [INFO] Gold (XAU): $2678.50000000 / troy oz
2025-01-15 14:30:01 [INFO] Silver (XAG): $30.45000000 / troy oz
2025-01-15 14:30:01 [INFO] Platinum (XPT): $952.00000000 / troy oz
2025-01-15 14:30:01 [INFO] Palladium (XPD): $978.25000000 / troy oz
2025-01-15 14:30:01 [INFO] Inserted 9 price record(s) into database.
2025-01-15 14:30:01 [INFO] Cycle 1 complete: 9 records stored.
2025-01-15 14:30:01 [INFO] Sleeping 60 seconds...
Query Examples
-- Latest price for each asset
-- (columns are qualified with p1 because symbol exists in both p1 and p2)
SELECT p1.asset_name, p1.symbol, p1.price_usd, p1.fetched_at
FROM asset_prices p1
INNER JOIN (
    SELECT symbol, MAX(fetched_at) AS max_ts
    FROM asset_prices
    GROUP BY symbol
) p2 ON p1.symbol = p2.symbol AND p1.fetched_at = p2.max_ts;

-- Gold price history for the last 24 hours
-- (fetched_at is stored in UTC, so compare against UTC_TIMESTAMP(), not NOW())
SELECT price_usd, fetched_at
FROM asset_prices
WHERE symbol = 'XAU'
  AND fetched_at >= UTC_TIMESTAMP() - INTERVAL 24 HOUR
ORDER BY fetched_at;

-- Average Bitcoin price per hour today (UTC day, matching the stored timestamps)
SELECT DATE_FORMAT(fetched_at, '%Y-%m-%d %H:00') AS hour,
       AVG(price_usd) AS avg_price,
       MIN(price_usd) AS low,
       MAX(price_usd) AS high
FROM asset_prices
WHERE symbol = 'BTC' AND DATE(fetched_at) = UTC_DATE()
GROUP BY hour
ORDER BY hour;
Key Design Decisions
| Decision | Rationale |
|---|---|
| Decimal for prices | Avoids floating-point rounding errors inherent to float |
| DECIMAL(18,8) in MySQL | Handles both large prices such as BTC and sub-dollar prices such as 0.50 with equal precision |
| Separate fetch functions | Failure in one API doesn't block the other |
| Reconnection logic | Long-running scripts commonly lose DB connections (MySQL wait_timeout) |
| executemany batch insert | Single round-trip for all 9 records per cycle |
| UTC timestamps | Avoids timezone ambiguity — convert to local time at query time |
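The Decimal rationale in the table can be checked in a few lines. This sketch uses made-up values; it shows why the script converts API numbers with `Decimal(str(...))` rather than passing the float to `Decimal` directly.

```python
from decimal import Decimal

# Binary floats cannot represent 0.1 or 0.2 exactly, so the sum drifts.
float_sum = 0.1 + 0.2
decimal_sum = Decimal("0.1") + Decimal("0.2")
print(float_sum == 0.3)               # False
print(decimal_sum == Decimal("0.3"))  # True

# Going through str() keeps the short decimal form the API sent;
# Decimal(2.34) would instead capture the float's full binary expansion.
via_str = Decimal(str(2.34))
via_float = Decimal(2.34)
print(via_str)                # 2.34
print(via_str == via_float)   # False

# Padding to 8 decimal places mirrors what a DECIMAL(18, 8) column stores.
stored = via_str.quantize(Decimal("0.00000001"))
print(stored)                 # 2.34000000
```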
Rate Limit Budget
| API | Free Limit | Our Usage | Headroom |
|---|---|---|---|
| CoinGecko | ~10-30 req/min | 1 req/min | Plenty |
| Metals.dev | 50 req/day | 1,440 req/day at 1/min | Over limit! |
⚠️ Important: At 1 request/minute, you'll exceed Metals.dev's 50/day free tier. Solutions:
- Poll metals less frequently: change the metals fetch to every 30 minutes (metals don't move as fast as crypto), which comes to 48 requests/day. Add a simple cycle % 30 == 1 check.
- Use GoldAPI.io instead: 300 requests/month free (enough for roughly one request every 2.5 hours).
- Upgrade to Metals.dev paid tier ($9/mo for 5,000 req/day).
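The budget arithmetic behind these options is easy to reproduce. The sketch below uses the limits quoted in this answer (50 requests/day and 300 requests/month); the function names are illustrative, and the 30-day month is an assumption.

```python
import math

SECONDS_PER_DAY = 24 * 60 * 60


def requests_per_day(interval_seconds: int) -> int:
    """How many requests one poll every `interval_seconds` makes per day."""
    return SECONDS_PER_DAY // interval_seconds


def min_interval_seconds(daily_limit: int) -> int:
    """Shortest polling interval that stays within `daily_limit` requests/day."""
    return math.ceil(SECONDS_PER_DAY / daily_limit)


print(requests_per_day(60))        # 1440: one request per minute
print(requests_per_day(30 * 60))   # 48: one request every 30 minutes
print(min_interval_seconds(50))    # 1728 seconds, about 28.8 minutes

# A 300 requests/month budget, assuming a 30-day month.
hours_between_requests = (30 * SECONDS_PER_DAY / 300) / 3600
print(hours_between_requests)      # about 2.4 hours
```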
To poll metals every 30 minutes instead, pass the cycle counter into run_once() (for example run_once(conn, cycle)) and wrap the metals fetch in a check:
# Fetch metals on cycles 1, 31, 61, ... (every 30 minutes)
if cycle % 30 == 1:
    logger.info("Fetching precious metal prices from Metals.dev...")
    # ... fetch metals ...
else:
    logger.info("Skipping metals fetch (next in %d min).", (1 - cycle) % 30)
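The cycle gate can be pulled into two small helpers so it is easy to verify in isolation. This is a sketch under the assumption that the first metals fetch should happen on cycle 1; `should_fetch_metals` and `cycles_until_next_fetch` are hypothetical names, not part of the script.

```python
METALS_EVERY_N_CYCLES = 30  # one metals request per 30 one-minute cycles


def should_fetch_metals(cycle: int, every: int = METALS_EVERY_N_CYCLES) -> bool:
    """True on cycles 1, 1 + every, 1 + 2*every, ..."""
    return cycle % every == 1


def cycles_until_next_fetch(cycle: int, every: int = METALS_EVERY_N_CYCLES) -> int:
    """Cycles remaining until the next fetch cycle (0 on a fetch cycle)."""
    # Python's % always returns a non-negative result for a positive modulus.
    return (1 - cycle) % every


for cycle in (1, 2, 30, 31):
    print(cycle, should_fetch_metals(cycle), cycles_until_next_fetch(cycle))

# Over a full day of 1,440 cycles this gate fires 48 times.
fetches_per_day = sum(should_fetch_metals(c) for c in range(1, 1441))
print(fetches_per_day)
```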