Write a Python script that continuously fetches the following prices every 60 seconds and stores them in a MySQL database:

**Precious Metals:** Gold, Silver, Platinum, Palladium
**Crypto:** Bitcoin, Ethereum, BNB, Solana, XRP (current top 5 by market cap)

Requirements:
- Use a free, publicly available API (no paid API key required, or at minimum a free tier with generous limits). Clearly state which API(s) you're using and any signup steps needed.
- Fetch all prices in USD.
- Store results in a MySQL database with appropriate table schema(s). Include the SQL to create the table(s).
- Each record should include the asset name/symbol, price, and timestamp.
- The script should run indefinitely, polling every 60 seconds.
- Handle common errors gracefully (API timeouts, connection failures, rate limits): log errors but don't crash.
- Use clear, well-structured code with comments.
- List all required Python packages (e.g., `requests`, `mysql-connector-python`, etc.) and how to install them.
APIs used (free, no API key required)
- Crypto prices (USD): CoinGecko
  - Endpoint: https://api.coingecko.com/api/v3/simple/price
  - No signup/API key required (public/free).
  - Docs: https://www.coingecko.com/en/api/documentation
- Precious metals (USD): Metals.live (public JSON)
  - Endpoint: https://api.metals.live/v1/spot
  - No signup/API key required.
  - Returns spot prices for metals (commonly USD/oz).
  - Note: This is a free public endpoint; availability/format can change. If you need a more “official” SLA, you’d typically use a paid provider.
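Since the metals feed's format can change, one way to hedge is a tolerant parser. The sketch below is only an illustration: `normalize_spot` is a hypothetical helper, the list-of-pairs shape is the one the script assumes, and the list-of-objects shape is an assumed alternative rather than a documented format. The sample values are placeholders, not real prices.

```python
def normalize_spot(data):
    """Return {metal_name: price} from either assumed payload shape.

    Shape A (assumed by the script): [["gold", 100.0], ["silver", 2.5]]
    Shape B (assumed alternative):   [{"gold": "100.0"}, {"silver": 2.5}]
    """
    spot = {}
    if not isinstance(data, list):
        return spot  # unexpected top-level type: report nothing
    for item in data:
        if isinstance(item, (list, tuple)) and len(item) >= 2:
            pairs = [(item[0], item[1])]
        elif isinstance(item, dict):
            pairs = list(item.items())
        else:
            continue
        for name, price in pairs:
            try:
                spot[str(name).lower()] = float(price)
            except (TypeError, ValueError):
                continue  # skip values that are not numeric
    return spot


if __name__ == "__main__":
    print(normalize_spot([["gold", 100.0], ["silver", "2.5"]]))
    print(normalize_spot([{"gold": "100.0"}, {"silver": 2.5}]))
```

Extra keys that happen to be numeric end up in the result too, which is harmless here because the script only looks up the four metal names it cares about.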
MySQL schema
This design stores one row per asset per poll, with a timestamp. It also prevents duplicates for the same asset at the exact same timestamp.
CREATE DATABASE IF NOT EXISTS market_data;
USE market_data;

CREATE TABLE IF NOT EXISTS prices (
    id         BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
    asset_type ENUM('metal','crypto') NOT NULL,
    symbol     VARCHAR(16) NOT NULL,
    name       VARCHAR(64) NOT NULL,
    price_usd  DECIMAL(20,8) NOT NULL,
    ts         DATETIME(6) NOT NULL,
    source     VARCHAR(64) NOT NULL,
    PRIMARY KEY (id),
    UNIQUE KEY uq_asset_ts (symbol, ts),
    KEY idx_ts (ts),
    KEY idx_type_symbol (asset_type, symbol)
);
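Because `price_usd` is `DECIMAL(20,8)`, prices can be rounded to eight decimal places on the client before they are inserted. The following is a minimal sketch, not part of the script: `to_price` and `EIGHT_PLACES` are hypothetical names, and it assumes the value arrives as a JSON number or numeric string. Going through `str()` avoids carrying binary floating-point noise into the `Decimal`.

```python
from decimal import Decimal, ROUND_HALF_UP

# Matches the scale of DECIMAL(20,8) in the schema above.
EIGHT_PLACES = Decimal("0.00000001")


def to_price(value):
    """Convert an API number or numeric string to a Decimal with 8 decimal places."""
    return Decimal(str(value)).quantize(EIGHT_PLACES, rounding=ROUND_HALF_UP)


if __name__ == "__main__":
    print(to_price(0.1))               # placeholder input, not a real price
    print(to_price("2034.123456789"))  # rounded to 8 decimal places
```

If this were adopted, the fetchers would store `to_price(price)` instead of `float(price)`; as far as I know `mysql-connector-python` accepts `Decimal` parameters directly, but treat that as something to verify against your installed version.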
Required Python packages
pip install requests mysql-connector-python
(Uses only standard library + the two packages above.)
Python script (runs indefinitely, polls every 60 seconds)
Save as price_collector.py:
import time
import logging
from datetime import datetime, timezone
import requests
import mysql.connector
from mysql.connector import Error
# ----------------------------
# Configuration
# ----------------------------
MYSQL_CONFIG = {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "YOUR_USER",
    "password": "YOUR_PASSWORD",
    "database": "market_data",
    "autocommit": True,
}

POLL_SECONDS = 60

# Crypto we want: symbol -> (CoinGecko ID, display name)
COINGECKO_IDS = {
    "BTC": ("bitcoin", "Bitcoin"),
    "ETH": ("ethereum", "Ethereum"),
    "BNB": ("binancecoin", "BNB"),
    "SOL": ("solana", "Solana"),
    "XRP": ("ripple", "XRP"),
}

# Metals.live returns symbols like: gold, silver, platinum, palladium
# symbol -> (metals.live key, display name)
METALS = {
    "XAU": ("gold", "Gold"),
    "XAG": ("silver", "Silver"),
    "XPT": ("platinum", "Platinum"),
    "XPD": ("palladium", "Palladium"),
}

COINGECKO_URL = "https://api.coingecko.com/api/v3/simple/price"
METALS_LIVE_URL = "https://api.metals.live/v1/spot"

HTTP_TIMEOUT = 15  # seconds
# ----------------------------
# Logging
# ----------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
# ----------------------------
# DB helpers
# ----------------------------
def get_db_connection():
    """Create a new DB connection; the main loop calls this again after a failure."""
    return mysql.connector.connect(**MYSQL_CONFIG)


def insert_prices(conn, rows):
    """
    Insert multiple price rows.

    rows: list of tuples (asset_type, symbol, name, price_usd, ts, source)
    """
    sql = """
        INSERT INTO prices (asset_type, symbol, name, price_usd, ts, source)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            price_usd = VALUES(price_usd),
            source = VALUES(source)
    """
    cur = conn.cursor()
    try:
        cur.executemany(sql, rows)
    finally:
        # Release the cursor even if the insert raises
        cur.close()
# ----------------------------
# Fetchers
# ----------------------------
def fetch_crypto_prices():
    """
    Fetch crypto prices from CoinGecko in USD.

    Returns list of dict rows: {asset_type, symbol, name, price_usd, source}
    """
    ids = ",".join(v[0] for v in COINGECKO_IDS.values())
    params = {"ids": ids, "vs_currencies": "usd"}

    r = requests.get(COINGECKO_URL, params=params, timeout=HTTP_TIMEOUT)

    # Handle rate limit / errors
    if r.status_code == 429:
        raise RuntimeError("CoinGecko rate limited (HTTP 429).")
    r.raise_for_status()

    data = r.json()

    out = []
    for symbol, (cg_id, name) in COINGECKO_IDS.items():
        price = data.get(cg_id, {}).get("usd")
        if price is None:
            logging.warning("Missing CoinGecko price for %s (%s)", symbol, cg_id)
            continue
        out.append(
            {
                "asset_type": "crypto",
                "symbol": symbol,
                "name": name,
                "price_usd": float(price),
                "source": "coingecko",
            }
        )
    return out

def fetch_metal_prices():
    """
    Fetch metal spot prices from metals.live.

    Returns list of dict rows: {asset_type, symbol, name, price_usd, source}

    metals.live format is typically a list like:
        [["gold", 2034.12], ["silver", 23.45], ...]
    """
    r = requests.get(METALS_LIVE_URL, timeout=HTTP_TIMEOUT)

    if r.status_code == 429:
        raise RuntimeError("metals.live rate limited (HTTP 429).")
    r.raise_for_status()

    data = r.json()

    # Convert list of pairs -> dict
    spot = {}
    for item in data:
        if isinstance(item, list) and len(item) >= 2:
            spot[item[0]] = item[1]

    out = []
    for symbol, (key, name) in METALS.items():
        price = spot.get(key)
        if price is None:
            logging.warning("Missing metals.live price for %s (%s)", symbol, key)
            continue
        out.append(
            {
                "asset_type": "metal",
                "symbol": symbol,
                "name": name,
                "price_usd": float(price),
                "source": "metals.live",
            }
        )
    return out
# ----------------------------
# Main loop
# ----------------------------
def run_forever():
    conn = None
    backoff = 5  # seconds, grows on repeated failures (simple backoff)
    max_backoff = 300

    while True:
        start = time.time()
        ts = datetime.now(timezone.utc).replace(tzinfo=None)  # store as naive UTC
        success = False

        try:
            if conn is None or not conn.is_connected():
                conn = get_db_connection()
                logging.info("Connected to MySQL.")

            # Fetch from APIs
            crypto = fetch_crypto_prices()
            metals = fetch_metal_prices()

            rows = [
                (
                    item["asset_type"],
                    item["symbol"],
                    item["name"],
                    item["price_usd"],
                    ts,
                    item["source"],
                )
                for item in crypto + metals
            ]

            if rows:
                insert_prices(conn, rows)
                logging.info("Inserted %d rows @ %s UTC", len(rows), ts.isoformat())
            else:
                logging.warning("No rows to insert this cycle.")

            success = True

        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            logging.error("Network error: %s", e)
        except requests.exceptions.HTTPError as e:
            # Non-2xx responses surfaced by raise_for_status()
            logging.error("HTTP error: %s", e)
        except RuntimeError as e:
            # Explicit rate-limit errors raised by the fetchers
            logging.error("Rate limit: %s", e)
        except Error as e:
            logging.error("DB error: %s", e)
            # The connection may be bad; drop it so we reconnect next loop
            try:
                if conn is not None:
                    conn.close()
            except Exception:
                pass
            conn = None
        except Exception as e:
            # Catch-all so the script doesn't die
            logging.exception("Unexpected error: %s", e)

        # Reset backoff after a successful cycle, grow it after any failure
        if success:
            backoff = 5
        else:
            backoff = min(int(backoff * 1.5), max_backoff)

        # Sleep until the next tick (accounting for processing time)
        elapsed = time.time() - start
        sleep_for = max(POLL_SECONDS - elapsed, 1)

        # After repeated failures, wait at least `backoff` seconds
        # (still "every ~60s" when healthy)
        if backoff > 5:
            sleep_for = max(sleep_for, backoff)

        time.sleep(sleep_for)


if __name__ == "__main__":
    run_forever()
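The timing rules in the main loop (sleep out the rest of the 60-second interval, wait longer after repeated failures) can be pulled into pure functions so they can be checked without actually waiting. This is a sketch under assumptions: `next_backoff` and `next_sleep` are hypothetical helpers, and `success` is a flag the loop would set at the end of a cycle that completed without an exception.

```python
POLL_SECONDS = 60
BASE_BACKOFF = 5     # seconds; the "healthy" value
MAX_BACKOFF = 300    # cap on the extra wait


def next_backoff(backoff, success):
    """Reset after a good cycle, otherwise grow by 1.5x up to the cap."""
    if success:
        return BASE_BACKOFF
    return min(int(backoff * 1.5), MAX_BACKOFF)


def next_sleep(elapsed, backoff):
    """Seconds to sleep after a cycle that took `elapsed` seconds."""
    sleep_for = max(POLL_SECONDS - elapsed, 1)
    if backoff > BASE_BACKOFF:
        # While failing, never wait less than the current backoff
        sleep_for = max(sleep_for, backoff)
    return sleep_for


if __name__ == "__main__":
    backoff = BASE_BACKOFF
    for attempt in range(10):
        backoff = next_backoff(backoff, success=False)
        print(attempt, backoff, next_sleep(2.0, backoff))
```

With these constants the backoff only exceeds the normal 60-second interval after several consecutive failures, so short outages do not change the polling cadence.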
Notes / tips
- Timestamps: stored as UTC (recommended). If you prefer MySQL to set timestamps, you can use CURRENT_TIMESTAMP(6) in SQL, but then each row records when the server inserted it rather than when the client fetched the price.
- “Top 5 by market cap”: this script uses the specific list you provided (BTC, ETH, BNB, SOL, XRP). If you want it to dynamically determine top 5 each run, say so and I’ll adjust to use CoinGecko’s markets endpoint.
- If you want one table per asset type (metals vs crypto) or additional columns (bid/ask, volume, etc.), ask and I’ll adapt the schema and inserts.
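For reference, here is a rough sketch of what the dynamic top-5 variant could look like with CoinGecko's `/coins/markets` endpoint. `MARKETS_PARAMS` and `parse_markets` are hypothetical names, and the sample payload uses placeholder values only to show the fields being read. Note that a ranking by market cap can include stablecoins such as Tether, so the result may differ from the fixed list above.

```python
MARKETS_URL = "https://api.coingecko.com/api/v3/coins/markets"
MARKETS_PARAMS = {
    "vs_currency": "usd",
    "order": "market_cap_desc",
    "per_page": 5,
    "page": 1,
}


def parse_markets(payload):
    """Turn a /coins/markets response (a list of coin objects) into price rows."""
    out = []
    for coin in payload:
        price = coin.get("current_price")
        if price is None:
            continue  # skip coins without a quoted price
        out.append(
            {
                "asset_type": "crypto",
                "symbol": str(coin["symbol"]).upper(),
                "name": coin["name"],
                "price_usd": float(price),
                "source": "coingecko",
            }
        )
    return out


if __name__ == "__main__":
    # Placeholder payload with made-up prices, shaped like the API response
    sample = [
        {"id": "bitcoin", "symbol": "btc", "name": "Bitcoin", "current_price": 100.0},
        {"id": "ethereum", "symbol": "eth", "name": "Ethereum", "current_price": 10.0},
    ]
    for row in parse_markets(sample):
        print(row)
```

In the script, the payload would come from `requests.get(MARKETS_URL, params=MARKETS_PARAMS, timeout=HTTP_TIMEOUT).json()`, with the same 429 and `raise_for_status()` handling as `fetch_crypto_prices()`.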