Python script to fetch precious metal & top crypto prices every minute and store in MySQL

Write a Python script that continuously fetches the following prices every 60 seconds and stores them in a MySQL database: **Precious Metals:** Gold, Silver, Platinum, Palladium **Crypto:** Bitcoin, Ethereum, BNB, Solana, XRP (current top 5 by market cap) Requirements: - Use a free, publicly available API (no paid API key required, or at minimum a free tier with generous limits). Clearly state which API(s) you're using and any signup steps needed. - Fetch all prices in USD. - Store results in a MySQL database with appropriate table schema(s). Include the SQL to create the table(s). - Each record should include the asset name/symbol, price, and timestamp. - The script should run indefinitely, polling every 60 seconds. - Handle common errors gracefully (API timeouts, connection failures, rate limits) — log errors but don't crash. - Use clear, well-structured code with comments. - List all required Python packages (e.g., `requests`, `mysql-connector-python`, etc.) and how to install them.

0
·OpenAI·28.4s·5:23 AM·#1

APIs used (free, no API key required)

  1. Crypto prices (USD): CoinGecko

  2. Precious metals (USD): Metals.live (public JSON)

    • Endpoint: https://api.metals.live/v1/spot
    • No signup/API key required.
    • Returns spot prices for metals (commonly USD/oz).
    • Note: This is a free public endpoint; availability/format can change. If you need a more “official” SLA, you’d typically use a paid provider.

MySQL schema

This design stores one row per asset per poll, with a timestamp. It also prevents duplicates for the same asset at the exact same timestamp.

CREATE DATABASE IF NOT EXISTS market_data;
USE market_data;

CREATE TABLE IF NOT EXISTS prices (
  id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
  asset_type ENUM('metal','crypto') NOT NULL,
  symbol VARCHAR(16) NOT NULL,
  name VARCHAR(64) NOT NULL,
  price_usd DECIMAL(20,8) NOT NULL,
  ts DATETIME(6) NOT NULL,
  source VARCHAR(64) NOT NULL,
  PRIMARY KEY (id),
  UNIQUE KEY uq_asset_ts (symbol, ts),
  KEY idx_ts (ts),
  KEY idx_type_symbol (asset_type, symbol)
);

Required Python packages

pip install requests mysql-connector-python

(Uses only standard library + the two packages above.)


Python script (runs indefinitely, polls every 60 seconds)

Save as price_collector.py:

import time
import logging
from datetime import datetime, timezone

import requests
import mysql.connector
from mysql.connector import Error

# ----------------------------
# Configuration
# ----------------------------
MYSQL_CONFIG = {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "YOUR_USER",
    "password": "YOUR_PASSWORD",
    "database": "market_data",
    "autocommit": True,
}

POLL_SECONDS = 60

# Crypto we want (CoinGecko IDs)
COINGECKO_IDS = {
    "BTC": ("bitcoin", "Bitcoin"),
    "ETH": ("ethereum", "Ethereum"),
    "BNB": ("binancecoin", "BNB"),
    "SOL": ("solana", "Solana"),
    "XRP": ("ripple", "XRP"),
}

# Metals.live returns symbols like: gold, silver, platinum, palladium
METALS = {
    "XAU": ("gold", "Gold"),
    "XAG": ("silver", "Silver"),
    "XPT": ("platinum", "Platinum"),
    "XPD": ("palladium", "Palladium"),
}

COINGECKO_URL = "https://api.coingecko.com/api/v3/simple/price"
METALS_LIVE_URL = "https://api.metals.live/v1/spot"

HTTP_TIMEOUT = 15  # seconds

# ----------------------------
# Logging
# ----------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)

# ----------------------------
# DB helpers
# ----------------------------
def get_db_connection():
    """Create a new DB connection (reconnect on failure)."""
    return mysql.connector.connect(**MYSQL_CONFIG)

def insert_prices(conn, rows):
    """
    Insert multiple price rows.
    rows: list of tuples (asset_type, symbol, name, price_usd, ts, source)
    """
    sql = """
        INSERT INTO prices (asset_type, symbol, name, price_usd, ts, source)
        VALUES (%s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            price_usd = VALUES(price_usd),
            source = VALUES(source)
    """
    cur = conn.cursor()
    cur.executemany(sql, rows)
    cur.close()

# ----------------------------
# Fetchers
# ----------------------------
def fetch_crypto_prices():
    """
    Fetch crypto prices from CoinGecko in USD.
    Returns list of dict rows: {asset_type, symbol, name, price_usd, source}
    """
    ids = ",".join(v[0] for v in COINGECKO_IDS.values())
    params = {"ids": ids, "vs_currencies": "usd"}

    r = requests.get(COINGECKO_URL, params=params, timeout=HTTP_TIMEOUT)
    # Handle rate limit / errors
    if r.status_code == 429:
        raise RuntimeError("CoinGecko rate limited (HTTP 429).")
    r.raise_for_status()

    data = r.json()
    out = []
    for symbol, (cg_id, name) in COINGECKO_IDS.items():
        price = data.get(cg_id, {}).get("usd")
        if price is None:
            logging.warning("Missing CoinGecko price for %s (%s)", symbol, cg_id)
            continue
        out.append(
            {
                "asset_type": "crypto",
                "symbol": symbol,
                "name": name,
                "price_usd": float(price),
                "source": "coingecko",
            }
        )
    return out

def fetch_metal_prices():
    """
    Fetch metal spot prices from metals.live.
    Returns list of dict rows: {asset_type, symbol, name, price_usd, source}

    metals.live format is typically a list like:
      [["gold", 2034.12], ["silver", 23.45], ...]
    """
    r = requests.get(METALS_LIVE_URL, timeout=HTTP_TIMEOUT)
    if r.status_code == 429:
        raise RuntimeError("metals.live rate limited (HTTP 429).")
    r.raise_for_status()

    data = r.json()
    # Convert list of pairs -> dict
    spot = {}
    for item in data:
        if isinstance(item, list) and len(item) >= 2:
            spot[item[0]] = item[1]

    out = []
    for symbol, (key, name) in METALS.items():
        price = spot.get(key)
        if price is None:
            logging.warning("Missing metals.live price for %s (%s)", symbol, key)
            continue
        out.append(
            {
                "asset_type": "metal",
                "symbol": symbol,
                "name": name,
                "price_usd": float(price),
                "source": "metals.live",
            }
        )
    return out

# ----------------------------
# Main loop
# ----------------------------
def run_forever():
    conn = None
    backoff = 5  # seconds, grows on repeated failures (simple backoff)
    max_backoff = 300

    while True:
        start = time.time()
        ts = datetime.now(timezone.utc).replace(tzinfo=None)  # store as UTC naive

        try:
            if conn is None or not conn.is_connected():
                conn = get_db_connection()
                logging.info("Connected to MySQL.")

            rows = []

            # Fetch from APIs
            crypto = fetch_crypto_prices()
            metals = fetch_metal_prices()

            for item in crypto + metals:
                rows.append(
                    (
                        item["asset_type"],
                        item["symbol"],
                        item["name"],
                        item["price_usd"],
                        ts,
                        item["source"],
                    )
                )

            if rows:
                insert_prices(conn, rows)
                logging.info("Inserted %d rows @ %s UTC", len(rows), ts.isoformat())
            else:
                logging.warning("No rows to insert this cycle.")

            # Reset backoff after success
            backoff = 5

        except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
            logging.error("Network error: %s", e)

        except requests.exceptions.HTTPError as e:
            # Includes non-200 responses after raise_for_status()
            logging.error("HTTP error: %s", e)

        except (Error, RuntimeError) as e:
            # MySQL errors or explicit rate-limit errors
            logging.error("Runtime/DB error: %s", e)
            # If DB connection is bad, drop it so we reconnect next loop
            try:
                if conn is not None:
                    conn.close()
            except Exception:
                pass
            conn = None

        except Exception as e:
            # Catch-all so the script doesn't die
            logging.exception("Unexpected error: %s", e)

        # Sleep until next minute tick (accounting for processing time)
        elapsed = time.time() - start
        sleep_for = max(POLL_SECONDS - elapsed, 1)

        # If we hit repeated failures, apply a small backoff (still "every ~60s" when healthy)
        if backoff > 5:
            sleep_for = max(sleep_for, backoff)

        time.sleep(sleep_for)

        # Increase backoff a bit if we keep failing (simple approach)
        # (We don't know success/failure here; for simplicity, increase only if conn is None)
        if conn is None:
            backoff = min(int(backoff * 1.5), max_backoff)

if __name__ == "__main__":
    run_forever()

Notes / tips

  • Timestamps: stored as UTC (recommended). If you prefer MySQL to set timestamps, you can use CURRENT_TIMESTAMP(6) in SQL, but then you’d store fewer client-side details.
  • “Top 5 by market cap”: this script uses the specific list you provided (BTC, ETH, BNB, SOL, XRP). If you want it to dynamically determine top 5 each run, say so and I’ll adjust to use CoinGecko’s markets endpoint.
  • If you want one table per asset type (metals vs crypto) or additional columns (bid/ask, volume, etc.), ask and I’ll adapt the schema and inserts.

Comments