Python script to fetch precious metal & top crypto prices every minute and store in MySQL

Write a Python script that continuously fetches the following prices every 60 seconds and stores them in a MySQL database:

**Precious Metals:** Gold, Silver, Platinum, Palladium
**Crypto:** Bitcoin, Ethereum, BNB, Solana, XRP (current top 5 by market cap)

Requirements:
- Use a free, publicly available API (no paid API key required, or at minimum a free tier with generous limits). Clearly state which API(s) you're using and any signup steps needed.
- Fetch all prices in USD.
- Store results in a MySQL database with appropriate table schema(s). Include the SQL to create the table(s).
- Each record should include the asset name/symbol, price, and timestamp.
- The script should run indefinitely, polling every 60 seconds.
- Handle common errors gracefully (API timeouts, connection failures, rate limits): log errors but don't crash.
- Use clear, well-structured code with comments.
- List all required Python packages (e.g., `requests`, `mysql-connector-python`, etc.) and how to install them.


Precious Metals & Crypto Price Tracker

APIs Used

| Data | API | Signup Required? |
|---|---|---|
| Crypto | CoinGecko API v3 (free, no key needed) | No (10-30 calls/min free) |
| Precious Metals | Metals.dev API (free tier, key required) | Yes (free signup, 50 req/day on free tier) |

Why these? CoinGecko's /simple/price endpoint can be called without an API key. For metals, truly free no-key APIs are essentially extinct as of 2024-2025; Metals.dev offers one of the more generous free tiers, and GoldAPI.io (free tier, 300 req/month) is an alternative. Each API has its own fetch function in the script, so you can swap providers easily.
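
To illustrate what swapping providers could look like, here is a minimal sketch. It assumes each metals provider can be wrapped in a function that returns a `{symbol: price}` dict. The names `MetalFetcher`, `fetch_with_fallback`, `broken_provider`, and `stub_provider` are hypothetical, and the stubs make no network calls.

```python
from decimal import Decimal
from typing import Callable, Dict, List

# A provider is any callable that returns {symbol: price_in_usd}.
MetalFetcher = Callable[[], Dict[str, Decimal]]


def fetch_with_fallback(providers: List[MetalFetcher]) -> Dict[str, Decimal]:
    """Try each provider in order and return the first non-empty result."""
    for provider in providers:
        try:
            prices = provider()
        except Exception:
            # A failing provider should not stop the next one from being tried.
            continue
        if prices:
            return prices
    return {}


# Hypothetical stand-ins for real API wrappers (no network access here).
def broken_provider() -> Dict[str, Decimal]:
    raise ConnectionError("simulated outage")


def stub_provider() -> Dict[str, Decimal]:
    return {"XAU": Decimal("2345.67"), "XAG": Decimal("28.12")}


prices = fetch_with_fallback([broken_provider, stub_provider])
print(prices["XAU"])  # 2345.67
```

In the real script, the stubs would be replaced by thin wrappers around the Metals.dev or GoldAPI.io calls, listed in order of preference.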


Required Packages

pip install requests mysql-connector-python python-dotenv

| Package | Purpose |
|---|---|
| requests | HTTP calls to APIs |
| mysql-connector-python | MySQL database connector |
| python-dotenv | Load DB credentials from .env file |

MySQL Table Schema

-- Run this in MySQL before starting the script

CREATE DATABASE IF NOT EXISTS price_tracker
  CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

USE price_tracker;

CREATE TABLE IF NOT EXISTS asset_prices (
    id          BIGINT       AUTO_INCREMENT PRIMARY KEY,
    asset_name  VARCHAR(50)  NOT NULL COMMENT 'Human-readable name (e.g. Gold, Bitcoin)',
    symbol      VARCHAR(10)  NOT NULL COMMENT 'Ticker symbol (e.g. XAU, BTC)',
    asset_type  ENUM('metal', 'crypto') NOT NULL,
    price_usd   DECIMAL(18, 8) NOT NULL COMMENT 'Price in USD',
    fetched_at  DATETIME     NOT NULL COMMENT 'Timestamp when price was fetched',
    created_at  DATETIME     DEFAULT CURRENT_TIMESTAMP,

    INDEX idx_symbol_time (symbol, fetched_at),
    INDEX idx_type_time   (asset_type, fetched_at)
) ENGINE=InnoDB;

.env File

Create a .env file in the same directory as the script:

MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=your_password_here
MYSQL_DATABASE=price_tracker

# Sign up free at https://metals.dev — paste your API key here
METALS_API_KEY=your_metals_dev_api_key_here
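
A startup check can catch a .env file that still contains the placeholder values above. The sketch below is one possible approach, not part of the script itself. The helper name `unset_keys` and the choice of required keys are assumptions. It treats any value starting with `your_` as an unfilled placeholder.

```python
from typing import List, Mapping

# Assumed set of keys the script cannot run usefully without.
REQUIRED_KEYS = ("MYSQL_USER", "MYSQL_PASSWORD", "MYSQL_DATABASE", "METALS_API_KEY")


def unset_keys(env: Mapping[str, str]) -> List[str]:
    """Return required keys that are missing, empty, or still placeholders."""
    problems = []
    for key in REQUIRED_KEYS:
        value = env.get(key, "").strip()
        if not value or value.startswith("your_"):
            problems.append(key)
    return problems


# Example with a partially filled configuration.
example_env = {
    "MYSQL_USER": "root",
    "MYSQL_PASSWORD": "your_password_here",
    "MYSQL_DATABASE": "price_tracker",
}
print(unset_keys(example_env))  # ['MYSQL_PASSWORD', 'METALS_API_KEY']
```

Passing `os.environ` after `load_dotenv()` would run the same check against the real environment.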

The Python Script

#!/usr/bin/env python3
"""
price_tracker.py
~~~~~~~~~~~~~~~~
Continuously fetches precious metal & top crypto prices every 60 seconds
and stores them in a MySQL database.

APIs:
  - Crypto:  CoinGecko (free, no key)
  - Metals:  Metals.dev (free tier, key required — sign up at https://metals.dev)

Usage:
  1. pip install requests mysql-connector-python python-dotenv
  2. Create MySQL database using the schema in the README
  3. Fill in .env with your credentials
  4. python price_tracker.py
"""

import os
import sys
import time
import logging
from datetime import datetime, timezone
from decimal import Decimal

import requests
import mysql.connector
from mysql.connector import Error as MySQLError
from dotenv import load_dotenv

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

load_dotenv()  # Load .env file

# Polling interval in seconds
POLL_INTERVAL = 60

# MySQL connection parameters
DB_CONFIG = {
    "host":     os.getenv("MYSQL_HOST", "localhost"),
    "port":     int(os.getenv("MYSQL_PORT", 3306)),
    "user":     os.getenv("MYSQL_USER", "root"),
    "password": os.getenv("MYSQL_PASSWORD", ""),
    "database": os.getenv("MYSQL_DATABASE", "price_tracker"),
}

# Metals.dev API key (free tier: 50 requests/day)
METALS_API_KEY = os.getenv("METALS_API_KEY", "")

# CoinGecko endpoint — no key needed
COINGECKO_URL = "https://api.coingecko.com/api/v3/simple/price"

# Metals.dev endpoint
METALS_URL = "https://api.metals.dev/v1/latest"

# Mapping: CoinGecko IDs → (display_name, symbol)
CRYPTO_ASSETS = {
    "bitcoin":  ("Bitcoin",  "BTC"),
    "ethereum": ("Ethereum", "ETH"),
    "binancecoin": ("BNB",   "BNB"),
    "solana":   ("Solana",   "SOL"),
    "ripple":   ("XRP",      "XRP"),
}

# Mapping: metals.dev keys → (display_name, symbol)
# metals.dev returns keys like "gold", "silver", "platinum", "palladium"
METAL_ASSETS = {
    "gold":      ("Gold",      "XAU"),
    "silver":    ("Silver",    "XAG"),
    "platinum":  ("Platinum",  "XPT"),
    "palladium": ("Palladium", "XPD"),
}

# Request timeout (seconds)
REQUEST_TIMEOUT = 15

# ---------------------------------------------------------------------------
# Logging setup
# ---------------------------------------------------------------------------

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler("price_tracker.log", encoding="utf-8"),
    ],
)
logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Database helpers
# ---------------------------------------------------------------------------

def get_db_connection():
    """Create and return a new MySQL connection."""
    return mysql.connector.connect(**DB_CONFIG)


def ensure_table_exists(conn):
    """Create the asset_prices table if it doesn't exist."""
    ddl = """
    CREATE TABLE IF NOT EXISTS asset_prices (
        id          BIGINT       AUTO_INCREMENT PRIMARY KEY,
        asset_name  VARCHAR(50)  NOT NULL,
        symbol      VARCHAR(10)  NOT NULL,
        asset_type  ENUM('metal', 'crypto') NOT NULL,
        price_usd   DECIMAL(18, 8) NOT NULL,
        fetched_at  DATETIME     NOT NULL,
        created_at  DATETIME     DEFAULT CURRENT_TIMESTAMP,
        INDEX idx_symbol_time (symbol, fetched_at),
        INDEX idx_type_time   (asset_type, fetched_at)
    ) ENGINE=InnoDB;
    """
    with conn.cursor() as cur:
        cur.execute(ddl)
    conn.commit()
    logger.info("Ensured asset_prices table exists.")


def insert_prices(conn, records: list[dict]):
    """
    Insert a batch of price records into the database.

    Each record dict: {asset_name, symbol, asset_type, price_usd, fetched_at}
    """
    if not records:
        return

    sql = """
        INSERT INTO asset_prices (asset_name, symbol, asset_type, price_usd, fetched_at)
        VALUES (%(asset_name)s, %(symbol)s, %(asset_type)s, %(price_usd)s, %(fetched_at)s)
    """
    with conn.cursor() as cur:
        cur.executemany(sql, records)
    conn.commit()
    logger.info("Inserted %d price record(s) into database.", len(records))

# ---------------------------------------------------------------------------
# API fetch functions
# ---------------------------------------------------------------------------

def fetch_crypto_prices() -> list[dict]:
    """
    Fetch crypto prices from CoinGecko (free, no API key).

    Returns a list of record dicts ready for DB insertion.
    """
    ids = ",".join(CRYPTO_ASSETS.keys())
    params = {
        "ids": ids,
        "vs_currencies": "usd",
    }
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    response = requests.get(
        COINGECKO_URL,
        params=params,
        timeout=REQUEST_TIMEOUT,
        headers={"Accept": "application/json"},
    )
    response.raise_for_status()
    data = response.json()

    records = []
    for cg_id, (name, symbol) in CRYPTO_ASSETS.items():
        if cg_id in data and "usd" in data[cg_id]:
            price = Decimal(str(data[cg_id]["usd"]))
            records.append({
                "asset_name": name,
                "symbol":     symbol,
                "asset_type": "crypto",
                "price_usd":  price,
                "fetched_at": now,
            })
            logger.info("  %s (%s): $%s", name, symbol, price)
        else:
            logger.warning("  Missing data for %s in CoinGecko response.", cg_id)

    return records


def fetch_metal_prices() -> list[dict]:
    """
    Fetch precious metal prices from Metals.dev (free tier, API key required).

    Returns a list of record dicts ready for DB insertion.
    """
    if not METALS_API_KEY:
        logger.warning("METALS_API_KEY not set — skipping precious metals fetch. "
                       "Sign up free at https://metals.dev")
        return []

    params = {
        "api_key":  METALS_API_KEY,
        "currency": "USD",
        "unit":     "toz",  # troy ounce
    }
    now = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    response = requests.get(
        METALS_URL,
        params=params,
        timeout=REQUEST_TIMEOUT,
        headers={"Accept": "application/json"},
    )
    response.raise_for_status()
    data = response.json()

    # metals.dev returns: {"metals": {"gold": 2345.67, "silver": 28.12, ...}}
    metals_data = data.get("metals", {})

    records = []
    for metal_key, (name, symbol) in METAL_ASSETS.items():
        if metal_key in metals_data:
            price = Decimal(str(metals_data[metal_key]))
            records.append({
                "asset_name": name,
                "symbol":     symbol,
                "asset_type": "metal",
                "price_usd":  price,
                "fetched_at": now,
            })
            logger.info("  %s (%s): $%s / troy oz", name, symbol, price)
        else:
            logger.warning("  Missing data for %s in metals.dev response.", metal_key)

    return records

# ---------------------------------------------------------------------------
# Main loop
# ---------------------------------------------------------------------------

def run_once(conn) -> int:
    """
    Perform a single fetch-and-store cycle.
    Returns the number of records inserted.
    """
    all_records = []

    # --- Fetch crypto prices ---
    logger.info("Fetching crypto prices from CoinGecko...")
    try:
        crypto_records = fetch_crypto_prices()
        all_records.extend(crypto_records)
    except requests.exceptions.Timeout:
        logger.error("CoinGecko request timed out.")
    except requests.exceptions.HTTPError as e:
        # Handle rate limiting specifically
        if e.response is not None and e.response.status_code == 429:
            logger.warning("CoinGecko rate limit hit (429). Will retry next cycle.")
        else:
            logger.error("CoinGecko HTTP error: %s", e)
    except requests.exceptions.ConnectionError:
        logger.error("CoinGecko connection failed — check your network.")
    except (ValueError, KeyError) as e:
        logger.error("Failed to parse CoinGecko response: %s", e)

    # --- Fetch metal prices ---
    logger.info("Fetching precious metal prices from Metals.dev...")
    try:
        metal_records = fetch_metal_prices()
        all_records.extend(metal_records)
    except requests.exceptions.Timeout:
        logger.error("Metals.dev request timed out.")
    except requests.exceptions.HTTPError as e:
        if e.response is not None and e.response.status_code == 429:
            logger.warning("Metals.dev rate limit hit (429). Will retry next cycle.")
        elif e.response is not None and e.response.status_code == 403:
            logger.error("Metals.dev returned 403 — check your API key.")
        else:
            logger.error("Metals.dev HTTP error: %s", e)
    except requests.exceptions.ConnectionError:
        logger.error("Metals.dev connection failed — check your network.")
    except (ValueError, KeyError) as e:
        logger.error("Failed to parse Metals.dev response: %s", e)

    # --- Store in DB ---
    if all_records:
        try:
            insert_prices(conn, all_records)
        except MySQLError as e:
            logger.error("MySQL insert failed: %s", e)
            # Attempt to reconnect on the next cycle
            raise

    return len(all_records)


def main():
    """Entry point — connects to MySQL and runs the polling loop forever."""
    logger.info("=" * 60)
    logger.info("Price Tracker starting up...")
    logger.info("Polling interval: %d seconds", POLL_INTERVAL)
    logger.info("=" * 60)

    # Initial DB connection
    try:
        conn = get_db_connection()
        ensure_table_exists(conn)
        logger.info("Connected to MySQL at %s:%s/%s",
                     DB_CONFIG["host"], DB_CONFIG["port"], DB_CONFIG["database"])
    except MySQLError as e:
        logger.critical("Cannot connect to MySQL: %s", e)
        logger.critical("Exiting. Fix your database config and retry.")
        sys.exit(1)

    cycle = 0
    while True:
        cycle += 1
        logger.info("--- Cycle %d ---", cycle)

        try:
            # Ensure connection is still alive; reconnect if needed
            if not conn.is_connected():
                logger.warning("MySQL connection lost — reconnecting...")
                conn = get_db_connection()
                logger.info("Reconnected to MySQL.")

            count = run_once(conn)
            logger.info("Cycle %d complete: %d records stored.", cycle, count)

        except MySQLError as e:
            logger.error("Database error in cycle %d: %s", cycle, e)
            # Try to get a fresh connection next cycle
            try:
                conn.close()
            except Exception:
                pass
            try:
                conn = get_db_connection()
                logger.info("Re-established MySQL connection.")
            except MySQLError:
                logger.error("Could not reconnect to MySQL. Will retry next cycle.")

        except Exception as e:
            # Catch-all so the loop never dies
            logger.exception("Unexpected error in cycle %d: %s", cycle, e)

        # Sleep until next cycle
        logger.info("Sleeping %d seconds...\n", POLL_INTERVAL)
        try:
            time.sleep(POLL_INTERVAL)
        except KeyboardInterrupt:
            logger.info("KeyboardInterrupt received — shutting down gracefully.")
            try:
                conn.close()
            except Exception:
                pass
            sys.exit(0)


if __name__ == "__main__":
    main()

How It Works — Architecture Overview

┌─────────────┐  every 60s   ┌──────────────────┐
│  main loop  │──────────────▶│   run_once()     │
└─────────────┘               │                  │
                              │  ┌─────────────┐ │     GET /simple/price
                              │  │fetch_crypto │─┼────▶ CoinGecko (free)
                              │  └─────────────┘ │
                              │  ┌─────────────┐ │     GET /v1/latest
                              │  │fetch_metals │─┼────▶ Metals.dev (free tier)
                              │  └─────────────┘ │
                              │         │         │
                              │    insert_prices()│
                              └─────────┼─────────┘
                                        │
                                        ▼
                              ┌──────────────────┐
                              │  MySQL           │
                              │  asset_prices    │
                              └──────────────────┘
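
The isolation shown in the diagram (one failing API does not block the other) can also be expressed with a small wrapper instead of two near-identical try/except blocks. This is a simplified sketch: `safe_fetch`, `collect`, and the two fake fetchers are hypothetical names, and it catches every exception broadly rather than distinguishing timeouts, HTTP errors, and rate limits as the script does.

```python
import logging
from typing import Callable, Dict, List

logger = logging.getLogger("price_tracker_sketch")

Fetcher = Callable[[], List[Dict]]


def safe_fetch(label: str, fetch_fn: Fetcher) -> List[Dict]:
    """Run one fetcher; log any failure and return an empty list instead of raising."""
    try:
        return fetch_fn()
    except Exception as exc:  # broad on purpose: one bad API must not stop the cycle
        logger.error("%s fetch failed: %s", label, exc)
        return []


def collect(fetchers: Dict[str, Fetcher]) -> List[Dict]:
    """Gather records from every fetcher, skipping the ones that fail."""
    records: List[Dict] = []
    for label, fetch_fn in fetchers.items():
        records.extend(safe_fetch(label, fetch_fn))
    return records


# Hypothetical stand-ins for fetch_crypto_prices / fetch_metal_prices.
def fake_crypto() -> List[Dict]:
    return [{"symbol": "BTC", "price_usd": "99432"}]


def fake_metals() -> List[Dict]:
    raise TimeoutError("simulated timeout")


records = collect({"crypto": fake_crypto, "metals": fake_metals})
print(len(records))  # 1
```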

Sample Output

2025-01-15 14:30:00 [INFO] --- Cycle 1 ---
2025-01-15 14:30:00 [INFO] Fetching crypto prices from CoinGecko...
2025-01-15 14:30:01 [INFO]   Bitcoin (BTC): $99432.00000000
2025-01-15 14:30:01 [INFO]   Ethereum (ETH): $3285.42000000
2025-01-15 14:30:01 [INFO]   BNB (BNB): $695.30000000
2025-01-15 14:30:01 [INFO]   Solana (SOL): $185.67000000
2025-01-15 14:30:01 [INFO]   XRP (XRP): $2.34000000
2025-01-15 14:30:01 [INFO] Fetching precious metal prices from Metals.dev...
2025-01-15 14:30:01 [INFO]   Gold (XAU): $2678.50000000 / troy oz
2025-01-15 14:30:01 [INFO]   Silver (XAG): $30.45000000 / troy oz
2025-01-15 14:30:01 [INFO]   Platinum (XPT): $952.00000000 / troy oz
2025-01-15 14:30:01 [INFO]   Palladium (XPD): $978.25000000 / troy oz
2025-01-15 14:30:01 [INFO] Inserted 9 price record(s) into database.
2025-01-15 14:30:01 [INFO] Cycle 1 complete: 9 records stored.
2025-01-15 14:30:01 [INFO] Sleeping 60 seconds...

Query Examples

-- Latest price for each asset
SELECT p1.asset_name, p1.symbol, p1.price_usd, p1.fetched_at
FROM asset_prices p1
INNER JOIN (
    SELECT symbol, MAX(fetched_at) AS max_ts
    FROM asset_prices
    GROUP BY symbol
) p2 ON p1.symbol = p2.symbol AND p1.fetched_at = p2.max_ts;

-- Gold price history for the last 24 hours
SELECT price_usd, fetched_at
FROM asset_prices
WHERE symbol = 'XAU'
  AND fetched_at >= UTC_TIMESTAMP() - INTERVAL 24 HOUR  -- fetched_at is stored in UTC
ORDER BY fetched_at;

-- Average Bitcoin price per hour today (UTC day, matching how fetched_at is stored)
SELECT DATE_FORMAT(fetched_at, '%Y-%m-%d %H:00') AS hour,
       AVG(price_usd) AS avg_price,
       MIN(price_usd) AS low,
       MAX(price_usd) AS high
FROM asset_prices
WHERE symbol = 'BTC' AND DATE(fetched_at) = UTC_DATE()
GROUP BY hour
ORDER BY hour;
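
The "latest price for each asset" join can be tried without a MySQL server. The sketch below uses Python's built-in sqlite3 module as a stand-in (an assumption made purely so the example runs anywhere), with a trimmed-down table and made-up sample rows. The join logic is the same: find the newest timestamp per symbol, then join back to get that row.

```python
import sqlite3

# In-memory SQLite database standing in for MySQL; the schema is simplified.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE asset_prices (symbol TEXT, price_usd TEXT, fetched_at TEXT)"
)
conn.executemany(
    "INSERT INTO asset_prices VALUES (?, ?, ?)",
    [
        ("BTC", "99000", "2025-01-15 14:29:00"),   # older BTC row
        ("BTC", "99432", "2025-01-15 14:30:00"),   # newest BTC row
        ("XAU", "2678.5", "2025-01-15 14:30:00"),  # only XAU row
    ],
)

latest_sql = """
    SELECT p1.symbol, p1.price_usd, p1.fetched_at
    FROM asset_prices p1
    INNER JOIN (
        SELECT symbol, MAX(fetched_at) AS max_ts
        FROM asset_prices
        GROUP BY symbol
    ) p2 ON p1.symbol = p2.symbol AND p1.fetched_at = p2.max_ts
    ORDER BY p1.symbol
"""
latest = conn.execute(latest_sql).fetchall()
print(latest)
```

Every selected column is qualified with `p1.` because `symbol` exists in both the table and the subquery.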

Key Design Decisions

| Decision | Rationale |
|---|---|
| Decimal for prices | Avoids floating-point rounding errors inherent to float |
| DECIMAL(18,8) in MySQL | Handles BTC at $100k+ and XRP at $0.50 with equal precision |
| Separate fetch functions | Failure in one API doesn't block the other |
| Reconnection logic | Long-running scripts commonly lose DB connections (MySQL wait_timeout) |
| executemany batch insert | Single round-trip for all 9 records per cycle |
| UTC timestamps | Avoids timezone ambiguity; convert to local time at query time |
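
The Decimal rationale is easy to see in a few lines. This small illustration uses only the standard library. The `quantize` step is an optional extra, shown on the assumption that you want Python-side values to match the 8 decimal places of the DECIMAL(18, 8) column.

```python
from decimal import Decimal

# Binary floats cannot represent most decimal fractions exactly.
float_sum = 0.1 + 0.2
print(float_sum == 0.3)  # False

# Converting through str() keeps the short decimal digits, as the script does.
decimal_sum = Decimal(str(0.1)) + Decimal(str(0.2))
print(decimal_sum == Decimal("0.3"))  # True

# Quantize to the 8 decimal places that DECIMAL(18, 8) stores.
price = Decimal(str(2.34)).quantize(Decimal("0.00000001"))
print(price)  # 2.34000000
```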

Rate Limit Budget

| API | Free Limit | Our Usage | Headroom |
|---|---|---|---|
| CoinGecko | ~10-30 req/min | 1 req/min | Plenty |
| Metals.dev | 50 req/day | 1,440 req/day at 1/min | Over limit! |

⚠️ Important: At 1 request/minute, you'll exceed Metals.dev's 50/day free tier. Solutions:

  1. Poll metals less frequently: change the metals fetch to every 30 minutes (48 requests/day; metals don't move as fast as crypto). Add a simple cycle % 30 == 1 check so the first cycle still fetches.
  2. Use GoldAPI.io instead: 300 requests/month free (about 10 per day, enough for polling roughly every 2.5 hours).
  3. Upgrade to Metals.dev paid tier ($9/mo for 5,000 req/day).
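
The quota arithmetic behind these options can be checked with a short calculation. The helper names `requests_per_day` and `min_interval_minutes` are made up for this sketch. Both round up, so the estimates err on the safe side.

```python
import math

MINUTES_PER_DAY = 24 * 60  # 1,440


def requests_per_day(interval_minutes: int) -> int:
    """Upper bound on requests made in a day when polling at this interval."""
    return math.ceil(MINUTES_PER_DAY / interval_minutes)


def min_interval_minutes(daily_quota: int) -> int:
    """Shortest whole-minute polling interval that stays within a daily quota."""
    return math.ceil(MINUTES_PER_DAY / daily_quota)


print(requests_per_day(1))             # 1440 -> far over a 50/day quota
print(requests_per_day(30))            # 48   -> fits under 50/day
print(min_interval_minutes(50))        # 29   -> tightest interval for 50/day
print(min_interval_minutes(300 // 30)) # 144  -> 300/month spread over 30 days
```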

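To poll metals every 30 minutes instead, pass the cycle counter from main() into run_once() (for example run_once(conn, cycle)) and guard the metals fetch:

# Only fetch metals every 30 cycles (about 30 minutes, 48 requests/day)
if cycle % 30 == 1:
    logger.info("Fetching precious metal prices from Metals.dev...")
    # ... fetch metals ...
else:
    # The next fetch happens when cycle % 30 == 1 again
    logger.info("Skipping metals fetch (next in %d min).", (1 - cycle) % 30)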
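
Counting cycles assumes every cycle lasts exactly one minute, which drifts when a fetch is slow. An alternative is to gate on elapsed time. The `Throttle` class below is a hypothetical helper, not part of the script. It takes an injectable clock so the demonstration runs instantly with a fake one.

```python
import time
from typing import Callable, Optional


class Throttle:
    """Allow an action at most once per `min_interval` seconds."""

    def __init__(self, min_interval: float,
                 clock: Callable[[], float] = time.monotonic):
        self.min_interval = min_interval
        self.clock = clock
        self._last: Optional[float] = None  # time of the last allowed action

    def ready(self) -> bool:
        """Return True (and record the time) if enough time has passed."""
        now = self.clock()
        if self._last is None or now - self._last >= self.min_interval:
            self._last = now
            return True
        return False


# Demonstration with a fake clock: checks at 0, 15, 30, 45, and 60 minutes.
fake_now = [0.0]
metals_throttle = Throttle(30 * 60, clock=lambda: fake_now[0])

results = []
for minute in range(0, 61, 15):
    fake_now[0] = minute * 60.0
    results.append(metals_throttle.ready())
print(results)  # [True, False, True, False, True]
```

In the polling loop, the metals fetch would run only when `metals_throttle.ready()` returns True, using the default `time.monotonic` clock.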
