Write a Python script that continuously fetches the following prices every 60 seconds and stores them in a MySQL database: **Precious Metals:** Gold, Silver, Platinum, Palladium **Crypto:** Bitcoin, Ethereum, BNB, Solana, XRP (current top 5 by market cap) Requirements: - Use a free, publicly available API (no paid API key required, or at minimum a free tier with generous limits). Clearly state which API(s) you're using and any signup steps needed. - Fetch all prices in USD. - Store results in a MySQL database with appropriate table schema(s). Include the SQL to create the table(s). - Each record should include the asset name/symbol, price, and timestamp. - The script should run indefinitely, polling every 60 seconds. - Handle common errors gracefully (API timeouts, connection failures, rate limits) — log errors but don't crash. - Use clear, well-structured code with comments. - List all required Python packages (e.g., `requests`, `mysql-connector-python`, etc.) and how to install them.
Python Price Tracker for Precious Metals & Cryptocurrencies
Overview
This script fetches precious metal and cryptocurrency prices every 60 seconds and stores them in MySQL. It uses two free APIs:
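- Metals: the script below calls the metals.live spot endpoint (https://www.metals.live/api/v1/spot) without an API key. metals-api.com and goldapi.io are key-based alternatives, but their free tiers (quoted here as 100 requests/month each) are far too small for 60-second polling, which needs roughly 43,000 polls per month. Treat them as fallbacks only; a goldapi.io variant is shown near the end.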
- CoinGecko API: coingecko.com/api - No signup required, generous free tier (10-50 calls/minute)
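Note: the script uses CoinGecko for crypto only. CoinGecko does list gold-backed tokens such as PAX Gold (PAXG), but those are token prices rather than spot metal prices, so the metals are fetched from a separate metals endpoint.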
Required Python Packages
pip install requests mysql-connector-python python-dotenv
MySQL Database Schema
-- Database that holds every polled price sample
CREATE DATABASE IF NOT EXISTS price_tracker;
USE price_tracker;

-- One row per asset per polling cycle
CREATE TABLE IF NOT EXISTS asset_prices (
    id           INT AUTO_INCREMENT PRIMARY KEY,   -- surrogate row id
    asset_symbol VARCHAR(10) NOT NULL,             -- ticker-style code, e.g. BTC or XAU
    asset_name   VARCHAR(50) NOT NULL,             -- human-readable name, e.g. Bitcoin
    asset_type   ENUM('metal', 'crypto') NOT NULL, -- which asset family the row belongs to
    price_usd    DECIMAL(20, 8) NOT NULL,          -- price in US dollars
    timestamp    DATETIME NOT NULL,                -- time assigned by the script when the price was fetched
    INDEX idx_symbol_timestamp (asset_symbol, timestamp), -- history of a single asset
    INDEX idx_timestamp (timestamp)                       -- time-range scans across all assets
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Python Script
import requests
import mysql.connector
from mysql.connector import Error
import time
import logging
from datetime import datetime
from typing import Dict, List, Optional
import os
from dotenv import load_dotenv
# Read settings from a local .env file into the process environment
load_dotenv()

# Send every log record (INFO and above) to both a log file and the console
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[logging.FileHandler('price_tracker.log'), logging.StreamHandler()],
)
logger = logging.getLogger(__name__)
# MySQL connection settings; each value can be overridden via an environment
# variable (DB_HOST, DB_USER, DB_PASSWORD, DB_NAME) and otherwise falls back
# to the default shown here.
DB_CONFIG = dict(
    host=os.getenv('DB_HOST', 'localhost'),
    user=os.getenv('DB_USER', 'root'),
    password=os.getenv('DB_PASSWORD', 'your_password'),
    database=os.getenv('DB_NAME', 'price_tracker'),
)
# Tracked assets, keyed by the identifier used in the API request/response
# and mapped to the symbol stored in the database.
CRYPTO_ASSETS = dict(
    bitcoin='BTC',
    ethereum='ETH',
    binancecoin='BNB',
    solana='SOL',
    ripple='XRP',
)
METAL_ASSETS = dict(
    gold='XAU',
    silver='XAG',
    platinum='XPT',
    palladium='XPD',
)

# Root URL for all CoinGecko v3 requests
COINGECKO_BASE_URL = 'https://api.coingecko.com/api/v3'
class PriceTracker:
    """Poll crypto and precious-metal prices and persist them to MySQL.

    Construction opens the database connection (and re-raises if that fails);
    ``run()`` then loops forever, fetching prices from the HTTP APIs and
    inserting one row per asset into ``asset_prices``.
    """

    # NOTE(review): this endpoint and the ``{metal: {'price': ...}}`` response
    # shape parsed below come from the original script and have not been
    # verified against the live service - confirm before relying on metal data.
    METALS_SPOT_URL = 'https://www.metals.live/api/v1/spot'

    # Seconds to wait for any single HTTP request before giving up
    REQUEST_TIMEOUT = 10

    def __init__(self):
        self.connection = None
        self.connect_to_database()

    def connect_to_database(self) -> None:
        """Open a MySQL connection using ``DB_CONFIG``.

        Raises:
            mysql.connector.Error: if the connection cannot be established
                (the error is logged first).
        """
        try:
            self.connection = mysql.connector.connect(**DB_CONFIG)
            if self.connection.is_connected():
                logger.info("Successfully connected to MySQL database")
        except Error as e:
            logger.error(f"Error connecting to MySQL: {e}")
            raise

    def ensure_connection(self) -> None:
        """Reconnect if the database connection is missing or dead.

        A failed reconnect is logged, not raised, so callers must re-check
        the connection before using it.
        """
        try:
            if self.connection is None or not self.connection.is_connected():
                logger.warning("Database connection lost, reconnecting...")
                self.connect_to_database()
        except Error as e:
            logger.error(f"Error ensuring connection: {e}")

    @staticmethod
    def _extract_prices(payload, assets, asset_type, price_field, timestamp) -> List[Dict]:
        """Build price records from a ``{api_id: {price_field: value}}`` payload.

        Entries that are missing, not a mapping, lack ``price_field``, or hold
        a value that cannot be read as a number are skipped individually, so
        one bad entry never discards the rest of the batch.

        Args:
            payload: Decoded JSON response body.
            assets: Mapping of API identifier -> stored symbol.
            asset_type: Value for the record's ``type`` field.
            price_field: Key inside each entry that holds the price.
            timestamp: Timestamp stamped on every record.

        Returns:
            A list of record dicts, in the iteration order of ``assets``.
        """
        if not isinstance(payload, dict):
            return []

        records = []
        for api_id, symbol in assets.items():
            entry = payload.get(api_id)
            if not isinstance(entry, dict) or price_field not in entry:
                continue
            raw_price = entry[price_field]
            try:
                # Validation only: the original value is what gets stored.
                float(raw_price)
            except (TypeError, ValueError):
                continue
            records.append({
                'symbol': symbol,
                'name': api_id.capitalize(),
                'type': asset_type,
                'price': raw_price,
                'timestamp': timestamp
            })
        return records

    @staticmethod
    def _log_fetch_result(label: str, prices: List[Dict], assets: Dict) -> None:
        """Log how many prices were fetched and warn about any missing assets."""
        logger.info(f"Successfully fetched {len(prices)} {label} prices")
        fetched_symbols = {record['symbol'] for record in prices}
        missing = [symbol for symbol in assets.values() if symbol not in fetched_symbols]
        if missing:
            logger.warning(
                f"No usable {label} price in API response for: {', '.join(missing)}"
            )

    def fetch_crypto_prices(self) -> List[Dict]:
        """Fetch USD prices for ``CRYPTO_ASSETS`` from CoinGecko.

        Returns:
            Price records, or an empty list if the request or parsing fails
            (the error is logged; nothing is raised).
        """
        try:
            response = requests.get(
                f'{COINGECKO_BASE_URL}/simple/price',
                params={'ids': ','.join(CRYPTO_ASSETS.keys()), 'vs_currencies': 'usd'},
                timeout=self.REQUEST_TIMEOUT,
            )
            # Non-2xx responses, including HTTP 429 rate limits, raise here
            response.raise_for_status()
            prices = self._extract_prices(
                response.json(), CRYPTO_ASSETS, 'crypto', 'usd', datetime.now()
            )
        except requests.exceptions.RequestException as e:
            logger.error(f"Error fetching crypto prices: {e}")
            return []
        except Exception as e:
            logger.error(f"Unexpected error fetching crypto prices: {e}")
            return []

        self._log_fetch_result('crypto', prices, CRYPTO_ASSETS)
        return prices

    def fetch_metal_prices(self) -> List[Dict]:
        """Fetch USD spot prices for ``METAL_ASSETS`` from ``METALS_SPOT_URL``.

        Returns:
            Price records, or an empty list if the request or parsing fails
            (the error is logged; nothing is raised).
        """
        try:
            response = requests.get(self.METALS_SPOT_URL, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            prices = self._extract_prices(
                response.json(), METAL_ASSETS, 'metal', 'price', datetime.now()
            )
        except requests.exceptions.RequestException as e:
            logger.error(f"Error fetching metal prices: {e}")
            return []
        except Exception as e:
            logger.error(f"Unexpected error fetching metal prices: {e}")
            return []

        self._log_fetch_result('metal', prices, METAL_ASSETS)
        return prices

    def store_prices(self, prices: List[Dict]) -> int:
        """Insert price records into ``asset_prices`` in a single batch.

        Args:
            prices: Records with ``symbol``, ``name``, ``type``, ``price`` and
                ``timestamp`` keys.

        Returns:
            The number of records committed; 0 if there was nothing to store,
            no connection was available, or the insert failed.
        """
        if not prices:
            logger.warning("No prices to store")
            return 0

        self.ensure_connection()
        # ensure_connection() only logs a failed reconnect, so verify here
        if self.connection is None or not self.connection.is_connected():
            logger.error("Error storing prices in database: no database connection available")
            return 0

        insert_query = """
            INSERT INTO asset_prices
            (asset_symbol, asset_name, asset_type, price_usd, timestamp)
            VALUES (%s, %s, %s, %s, %s)
        """
        records = [
            (
                price['symbol'],
                price['name'],
                price['type'],
                price['price'],
                price['timestamp']
            )
            for price in prices
        ]

        cursor = None
        try:
            cursor = self.connection.cursor()
            cursor.executemany(insert_query, records)
            self.connection.commit()
            logger.info(f"Successfully stored {cursor.rowcount} price records")
            return len(records)
        except Error as e:
            logger.error(f"Error storing prices in database: {e}")
            try:
                self.connection.rollback()
            except Error as rollback_error:
                # The connection itself may be the thing that failed
                logger.error(f"Error rolling back transaction: {rollback_error}")
            return 0
        finally:
            # Always release the cursor, including on the error path
            if cursor is not None:
                try:
                    cursor.close()
                except Error as close_error:
                    logger.warning(f"Error closing cursor: {close_error}")

    def run(self, poll_interval: float = 60) -> None:
        """Fetch and store prices in a loop until interrupted.

        Args:
            poll_interval: Target number of seconds between the start of one
                cycle and the start of the next.

        Errors inside a cycle are logged and the loop continues. Ctrl-C at any
        point, including while sleeping, stops the loop, and the database
        connection is closed on the way out.
        """
        logger.info("Starting price tracker...")
        try:
            while True:
                cycle_started = time.monotonic()
                try:
                    all_prices = self.fetch_crypto_prices() + self.fetch_metal_prices()
                    if all_prices:
                        stored = self.store_prices(all_prices)
                        logger.info(
                            f"Cycle complete. Stored {stored} of {len(all_prices)} prices."
                        )
                    else:
                        logger.warning("No prices fetched in this cycle")
                except Exception as e:
                    logger.error(f"Unexpected error in main loop: {e}")

                # Subtract the time the cycle took so polls stay on schedule
                elapsed = time.monotonic() - cycle_started
                remaining = max(0.0, poll_interval - elapsed)
                logger.info(f"Waiting {remaining:.0f} seconds until next fetch...")
                time.sleep(remaining)
        except KeyboardInterrupt:
            logger.info("Received shutdown signal. Stopping...")
        finally:
            if self.connection is not None and self.connection.is_connected():
                self.connection.close()
                logger.info("Database connection closed")
def main():
    """Build a PriceTracker and hand control to its polling loop."""
    PriceTracker().run()


# Only start polling when executed as a script, not when imported
if __name__ == "__main__":
    main()
Environment Configuration (.env file)
Create a .env file in the same directory:
DB_HOST=localhost
DB_USER=root
DB_PASSWORD=your_mysql_password
DB_NAME=price_tracker
Alternative: Using goldapi.io for Metals
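If you prefer a dedicated metals API, here is a replacement metal-fetching method. Add it to the PriceTracker class, set GOLDAPI_KEY in your .env file, and call it from run() in place of fetch_metal_prices(). Note that it makes four requests per cycle (one per metal):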
def fetch_metal_prices_goldapi(self) -> List[Dict]:
    """Fetch precious metal prices from goldapi.io (requires free API key).

    The key is read from the ``GOLDAPI_KEY`` environment variable. Each metal
    is requested separately, so a failure for one metal is logged and skipped
    without discarding the prices already fetched.

    Returns:
        Price records for the metals that were fetched successfully; an empty
        list if no API key is configured or every request fails.
    """
    api_key = os.getenv('GOLDAPI_KEY')
    if not api_key:
        # Avoid sending requests that are certain to be rejected
        logger.error("Error fetching metal prices: GOLDAPI_KEY is not set")
        return []

    headers = {'x-access-token': api_key}
    timestamp = datetime.now()
    prices = []

    for index, metal in enumerate(('XAU', 'XAG', 'XPT', 'XPD')):
        if index:
            time.sleep(0.5)  # Rate limiting between requests, not after the last one
        try:
            response = requests.get(
                f'https://www.goldapi.io/api/{metal}/USD',
                headers=headers,
                timeout=10,
            )
            response.raise_for_status()
            data = response.json()
            prices.append({
                'symbol': metal,
                'name': data.get('metal', metal),
                'type': 'metal',
                'price': data['price'],
                'timestamp': timestamp
            })
        except Exception as e:
            logger.error(f"Error fetching metal prices ({metal}): {e}")

    return prices
Usage Instructions
- Install MySQL and create the database using the SQL schema above.
- Install the Python packages: pip install requests mysql-connector-python python-dotenv
- Create a .env file with your MySQL credentials.
- Save the script as price_tracker.py and run it: python price_tracker.py
Notes
- API Limitations: Free tiers have limits. CoinGecko allows 10-50 calls/min. For metals, consider rotating between free APIs or using cached data.
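- Error Handling: API and database errors inside the polling loop are logged and the loop keeps running. The one exception is startup: if the initial MySQL connection fails, the error is logged and re-raised, so the script exits.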
- Database Growth: Consider adding a cleanup job to archive old data
- Production: Add monitoring, alerts, and consider using connection pooling
The script will run indefinitely, logging all activity to both console and price_tracker.log file.
Comments