API Connections to Brokers and Data Vendors

Setting Up Your Trading Infrastructure
From The Black Book of Day Trading Strategies
API Connections to Brokers

Direct API (Application Programming Interface) connections automate trading. They link your algorithms to broker platforms. This eliminates manual order entry. It reduces latency. APIs allow programmatic access to market data, account information, and order execution.

Brokers offer various API types. REST (Representational State Transfer) APIs use HTTP requests. They are stateless: each request contains all necessary information. WebSocket APIs maintain persistent connections and push real-time data updates. FIX (Financial Information eXchange) is the industry-standard protocol for institutional and high-frequency trading. FIX messages are compact and optimized for low latency.

Choose an API based on your strategy's requirements. A low-frequency strategy might use REST. A high-frequency strategy needs FIX. Interactive Brokers (IBKR) provides a robust TWS API. It supports Java, Python, C#, and C++. Alpaca offers a REST API for commission-free stock and crypto trading. Their market data API provides historical and real-time feeds.

Authenticating with a broker API requires an API key and secret. These credentials identify your application. Protect them. Store them securely, not directly in your code. Use environment variables or a secure vault.
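One minimal way to do this in Python is to read credentials from environment variables at startup and fail fast if they are missing. The variable names below are illustrative; use whatever your deployment defines.

```python
import os

def load_credentials():
    """Fetch broker credentials from the environment; fail fast if missing."""
    key = os.environ.get("BROKER_API_KEY")
    secret = os.environ.get("BROKER_API_SECRET")
    if not key or not secret:
        raise RuntimeError("Set BROKER_API_KEY and BROKER_API_SECRET first.")
    return key, secret
```

Set the variables in your shell or service manager, never in version control.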

Connecting to IBKR's TWS API involves their Gateway or Trader Workstation. The Gateway runs in the background. It provides API access without the full TWS GUI. For example, to connect in Python:

```python
from ibapi.client import EClient
from ibapi.wrapper import EWrapper

class IBAPIApp(EClient, EWrapper):
    def __init__(self):
        EClient.__init__(self, self)
        self.nextOrderId = None

    def error(self, reqId, errorCode, errorString, advancedOrderRejectJson=""):
        print(f"Error: {reqId} {errorCode} {errorString}")

    def nextValidId(self, orderId: int):
        # Store the ID so order placement can use it later
        self.nextOrderId = orderId
        print(f"Next Valid Order ID: {orderId}")

app = IBAPIApp()
# 7497 is the default TWS paper-trading port; the Gateway defaults to 4002 (paper) / 4001 (live)
app.connect("127.0.0.1", 7497, clientId=1)
app.run()
```

This code snippet connects to the local IBKR Gateway. It prints errors and the next valid order ID. The clientId identifies your API session. Use unique client IDs for multiple concurrent applications.
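Note that app.run() blocks the calling thread while it processes messages. A common pattern (a sketch, not the only option) is to run the message loop in a daemon thread so the main thread stays free for strategy logic:

```python
import threading

def start_message_loop(app):
    """Run the blocking app.run() message loop in a daemon thread."""
    thread = threading.Thread(target=app.run, daemon=True)
    thread.start()
    return thread

# Usage with the IBAPIApp above:
# app.connect("127.0.0.1", 7497, clientId=1)
# loop_thread = start_message_loop(app)
# ...main thread continues with strategy logic...
```

A daemon thread exits automatically when the main program does, so remember to call app.disconnect() for a clean shutdown.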

Sending an order requires defining a contract and an order object. For example, buying 100 shares of SPY:

```python
from ibapi.contract import Contract
from ibapi.order import Order

# Define the contract
contract = Contract()
contract.symbol = "SPY"
contract.secType = "STK"
contract.exchange = "SMART"
contract.currency = "USD"

# Define the order
order = Order()
order.action = "BUY"
order.totalQuantity = 100
order.orderType = "MKT"  # Market order

# nextOrderId is the ID delivered by the nextValidId callback after connecting
app.placeOrder(app.nextOrderId, contract, order)
```

This places a market buy order for SPY. Ensure your account has sufficient buying power.
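Brokers reject malformed orders asynchronously through the error callback, so it is cheap to sanity-check parameters client-side before submitting. A hypothetical helper (the checks and names are illustrative, not part of the TWS API):

```python
def validate_order(action, quantity, order_type, limit_price=None):
    """Client-side sanity checks before submission.
    Illustrative only; not a substitute for broker-side validation."""
    if action not in ("BUY", "SELL"):
        raise ValueError(f"Unknown action: {action}")
    if quantity <= 0:
        raise ValueError("Quantity must be positive")
    if order_type == "LMT" and limit_price is None:
        raise ValueError("Limit orders require a limit price")
    return True
```

Catching these mistakes locally avoids a round trip to the broker and a cryptic rejection code.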

Data Vendor API Connections

Mean reversion strategies depend on accurate historical and real-time data. Data vendor APIs provide this information. They offer various data types: tick, minute, daily, fundamental, and alternative data.

Popular data vendors include Polygon.io, Quandl (now part of Nasdaq Data Link), and Refinitiv (formerly Thomson Reuters). Each offers different data granularity and coverage. Polygon.io provides real-time and historical stock, options, and forex data. Nasdaq Data Link specializes in economic and fundamental data.

Connecting to a data vendor API typically involves an API key. This key authorizes your data requests. For Polygon.io, authenticate requests by including your API key in the URL or as a header.
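Passing the key as an Authorization header is generally preferable to embedding it in the URL, since URLs tend to end up in server logs and shell history. A minimal sketch:

```python
def auth_headers(api_key):
    """Build a Bearer-token Authorization header for Polygon.io requests,
    keeping the key out of the request URL."""
    return {"Authorization": f"Bearer {api_key}"}

# Usage:
# response = requests.get(url, headers=auth_headers(API_KEY), timeout=10)
```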

To retrieve historical minute data for SPY from Polygon.io:

```python
import requests
from datetime import datetime, timedelta

API_KEY = "YOUR_POLYGON_API_KEY"
ticker = "SPY"
end_date = datetime.now()
start_date = end_date - timedelta(days=5)

url = (
    f"https://api.polygon.io/v2/aggs/ticker/{ticker}/range/1/minute/"
    f"{start_date.strftime('%Y-%m-%d')}/{end_date.strftime('%Y-%m-%d')}"
)
params = {"adjusted": "true", "sort": "asc", "limit": 50000, "apiKey": API_KEY}

response = requests.get(url, params=params, timeout=10)
data = response.json()

if data.get("status") in ("OK", "DELAYED"):  # free-tier responses may report 'DELAYED'
    for bar in data["results"]:
        timestamp = datetime.fromtimestamp(bar["t"] / 1000)
        print(f"Time: {timestamp}, Open: {bar['o']}, High: {bar['h']}, "
              f"Low: {bar['l']}, Close: {bar['c']}, Volume: {bar['v']}")
else:
    print(f"Error fetching data: {data.get('error', data)}")
```

This script fetches minute-level OHLCV data for SPY over the last five days and prints each bar's details. Adjust the limit parameter to retrieve more data. Always respect the rate limits imposed by data vendors; exceeding them leads to throttled requests or temporary bans.

Real-time data often uses WebSocket connections. Polygon.io offers a WebSocket API for live updates. This is essential for strategies requiring immediate market reactions.

```python
import json
from datetime import datetime

import websocket  # pip install websocket-client

API_KEY = "YOUR_POLYGON_API_KEY"

def on_message(ws, message):
    data = json.loads(message)
    for event in data:
        if event['ev'] == 'A':  # Aggregate minute bar ('s' is the bar start time in ms)
            timestamp = datetime.fromtimestamp(event['s'] / 1000)
            print(f"Real-time Bar: {timestamp}, Open: {event['o']}, Close: {event['c']}, Volume: {event['v']}")
        elif event['ev'] == 'T':  # Trade event ('s' is the trade size here)
            print(f"Real-time Trade: Symbol: {event['sym']}, Price: {event['p']}, Size: {event['s']}")

def on_error(ws, error):
    print(f"Error: {error}")

def on_close(ws, close_status_code, close_msg):
    print("Closed connection")

def on_open(ws):
    print("Opened connection")
    ws.send(json.dumps({"action": "auth", "params": API_KEY}))
    # Subscribe to trades and minute aggregates for SPY
    ws.send(json.dumps({"action": "subscribe", "params": "T.SPY,A.SPY"}))

websocket.enableTrace(True)  # verbose frame logging; disable in production
ws = websocket.WebSocketApp("wss://socket.polygon.io/stocks",
                            on_open=on_open,
                            on_message=on_message,
                            on_error=on_error,
                            on_close=on_close)
ws.run_forever()
```

This WebSocket client connects to Polygon.io. It authenticates and subscribes to real-time trades and minute bars for SPY. It prints incoming data.

Error Handling and Rate Limiting

API connections are not always stable. Implement robust error handling. Network issues, invalid requests, and server outages occur. Use try-except blocks in Python to catch exceptions.

For example, when making an HTTP request:

```python
import requests

try:
    response = requests.get("https://api.example.com/data", timeout=10)
    response.raise_for_status()  # Raise an HTTPError for 4xx/5xx responses
    data = response.json()
except requests.exceptions.HTTPError as errh:
    print(f"HTTP Error: {errh}")
except requests.exceptions.ConnectionError as errc:
    print(f"Error Connecting: {errc}")
except requests.exceptions.Timeout as errt:
    print(f"Timeout Error: {errt}")
except requests.exceptions.RequestException as err:
    print(f"Something Else: {err}")
```

This code catches various requests library exceptions. It distinguishes between HTTP errors, connection errors, and timeouts.

Rate limiting restricts the number of API calls within a time window. Exceeding limits results in error responses (e.g., HTTP 429 Too Many Requests). Implement exponential backoff. If a request fails due to rate limiting, wait a short period and retry. Increase the wait time with each subsequent failure.

Example of a simple retry mechanism:

```python
import time
import requests

def make_api_request(url, max_retries=5, initial_delay=1):
    delay = initial_delay
    for _ in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            if response.status_code == 429:
                print(f"Rate limit hit. Retrying in {delay} seconds...")
                time.sleep(delay)
                delay *= 2  # Exponential backoff
                continue
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Request failed: {e}. Retrying in {delay} seconds...")
            time.sleep(delay)
            delay *= 2
    raise Exception(f"Failed after {max_retries} retries.")

# Example usage:
# data = make_api_request("https://api.polygon.io/v2/aggs/ticker/SPY/range/1/day/2023-01-01/2023-01-05?apiKey=YOUR_POLYGON_API_KEY")
# print(data)
```

This function retries API calls. It waits longer after each rate limit error or connection issue. This prevents overwhelming the API.

Data Normalization and Storage

Raw data from APIs often requires normalization. Different vendors use different formats for timestamps, prices, and volumes. Convert all data to a consistent format. For example, standardize timestamps to UTC. Store prices as decimals, not integers scaled by 100.
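As a concrete sketch, a small function can map a Polygon-style aggregate bar (millisecond epoch timestamps, single-letter keys) onto a vendor-neutral record with UTC datetimes. The target field names are illustrative:

```python
from datetime import datetime, timezone

def normalize_bar(raw):
    """Map a Polygon-style aggregate bar onto a vendor-neutral record.
    Millisecond epoch timestamps become timezone-aware UTC datetimes."""
    return {
        "timestamp": datetime.fromtimestamp(raw["t"] / 1000, tz=timezone.utc),
        "open": float(raw["o"]),
        "high": float(raw["h"]),
        "low": float(raw["l"]),
        "close": float(raw["c"]),
        "volume": int(raw["v"]),
    }
```

Write one such adapter per vendor; the rest of your pipeline then sees a single format.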

After retrieving data, store it efficiently. Databases are ideal for this. PostgreSQL or MySQL for relational data. MongoDB for document-oriented data. For time-series data, consider specialized databases like InfluxDB or TimescaleDB.
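A minimal relational schema might look like the following. SQLite stands in here for PostgreSQL or TimescaleDB, and the column layout is illustrative; the composite primary key prevents duplicate bars on re-ingestion.

```python
import sqlite3

def create_bars_table(conn):
    """Create a minimal minute-bar table. The (symbol, timestamp) primary
    key rejects duplicate bars if the same range is ingested twice."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS bars (
            symbol    TEXT NOT NULL,
            timestamp TEXT NOT NULL,  -- ISO-8601, UTC
            open REAL, high REAL, low REAL, close REAL,
            volume INTEGER,
            PRIMARY KEY (symbol, timestamp)
        )
    """)
    conn.commit()
```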

Store historical data locally. This reduces reliance on external APIs for backtesting. It speeds up data access. For example, save minute data to a CSV or Parquet file.

```python
import pandas as pd

# Assume 'data' is the parsed JSON from the Polygon.io API call above.
# Each result dictionary looks like:
# {'t': 1672531200000, 'o': 380.0, 'h': 380.5, 'l': 379.8, 'c': 380.2, 'v': 1000000}

if 'results' in data and data['results']:
    df = pd.DataFrame(data['results'])
    df['t'] = pd.to_datetime(df['t'], unit='ms', utc=True)  # Convert timestamps to UTC datetimes
    df.rename(columns={'t': 'timestamp', 'o': 'open', 'h': 'high',
                       'l': 'low', 'c': 'close', 'v': 'volume'}, inplace=True)
    df.set_index('timestamp', inplace=True)
    df.to_parquet('SPY_minute_data_2023.parquet')
    print("Data saved to SPY_minute_data_2023.parquet")
else:
    print("No data to save.")
```

This code converts Polygon.io's minute data into a Pandas DataFrame. It renames columns and sets the timestamp as the index. Finally, it saves the DataFrame to a Parquet file. Parquet is an efficient columnar storage format. It supports data compression.

Automate data ingestion. Schedule scripts to download and update your local database daily. This ensures your mean reversion models always use fresh data.
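Cron or a task scheduler is the usual tool, but a standard-library-only sketch is a loop that sleeps until a fixed wall-clock time each day. The download_and_store_daily_bars routine below is hypothetical:

```python
import time
from datetime import datetime, timedelta, time as dtime

def seconds_until(run_at, now=None):
    """Seconds from `now` until the next daily occurrence of the
    wall-clock time `run_at`."""
    now = now or datetime.now()
    target = datetime.combine(now.date(), run_at)
    if target <= now:
        target += timedelta(days=1)  # already passed today; schedule tomorrow
    return (target - now).total_seconds()

# while True:
#     time.sleep(seconds_until(dtime(17, 30)))  # e.g. run after the close
#     download_and_store_daily_bars()           # hypothetical ingestion job
```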

Build a modular API client. Separate data retrieval, order placement, and error handling logic. This improves code maintainability. It simplifies debugging.
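One way to structure this separation (all class and method names here are illustrative, not a specific broker API) is to inject each concern as its own component:

```python
class TradingClient:
    """Sketch of a modular client: data access, order routing, and error
    handling are injected as separate components, so each can be unit
    tested and swapped independently."""

    def __init__(self, data_feed, order_router, error_handler):
        self.data_feed = data_feed
        self.order_router = order_router
        self.error_handler = error_handler

    def get_bars(self, symbol, timeframe):
        try:
            return self.data_feed.fetch(symbol, timeframe)
        except Exception as exc:
            self.error_handler(exc)
            return []

    def submit(self, order):
        try:
            return self.order_router.place(order)
        except Exception as exc:
            self.error_handler(exc)
            return None
```

With this shape, you can back data_feed with Polygon.io and order_router with IBKR, then swap either for a mock during backtesting without touching strategy code.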