import aiohttp
import asyncio
from pytz import timezone
import random
import numpy as np
import sys
import datetime
import time
import requests
import json
from collections import deque
from datetime import timedelta
# Constants for OKX API
OKX_BASE_URL = 'https://www.okx.com'
OHLCV_ENDPOINT = '/api/v5/market/candles'
def getusdtswapsyms_list():
    """Return all USDT-margined perpetual swap instrument IDs listed on OKX.

    Queries the public OKX instruments endpoint and keeps only instruments
    whose ID ends with '-USDT-SWAP', excluding 'DMAIL-USDT-SWAP'.

    Returns:
        list[str]: Instrument IDs such as 'BTC-USDT-SWAP'.

    Raises:
        requests.RequestException: On network failure, timeout, or a non-2xx
            HTTP status from the API.
    """
    url = "https://www.okx.com/api/v5/public/instruments"
    args = {"instType": "SWAP"}
    # Timeout prevents the whole script from hanging forever on a dead link;
    # raise_for_status fails fast instead of parsing an error page as data.
    response = requests.get(url, params=args, timeout=10)
    response.raise_for_status()
    instrums = response.json()  # equivalent to json.loads(response.text)
    # Keep USDT-margined swaps only; DMAIL is excluded deliberately.
    symlist = [
        i['instId']
        for i in instrums['data']
        if i['instId'].endswith('-USDT-SWAP') and i['instId'] != 'DMAIL-USDT-SWAP'
    ]
    return symlist
# Snapshot of every OKX USDT-margined swap symbol, fetched once at import time.
# NOTE(review): this performs a blocking network request on module import —
# confirm that is acceptable for all consumers of this module.
syms_list = getusdtswapsyms_list()
# Pool of desktop-browser User-Agent strings; one is chosen at random per run
# so the public-API requests look less uniform.
# BUG FIX: the original list had no commas between the string literals, so
# Python's implicit string concatenation fused all seven entries into ONE
# giant string — random.choice() always returned that single fused string.
user_agents = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 13_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15',
]
headers = {'User-Agent': random.choice(user_agents)}
# Sliding window of the timestamps of the most recent requests; maxlen keeps
# it bounded to exactly `rate_limit` entries (oldest evicted on append).
request_times = deque(maxlen=40)
rate_limit = 40          # max requests allowed per window
rate_limit_window = 2.0  # window length in seconds
async def _wait_for_rate_limit():
    """Asynchronously wait until one more request fits inside the rolling
    rate limit, then record the request's timestamp.

    Relies on the module-level ``request_times`` deque (maxlen equal to
    ``rate_limit``): once the deque is full, the oldest entry must be older
    than ``rate_limit_window`` seconds before a new request may proceed.
    """
    current = datetime.datetime.now()
    # Fast path: the window is not yet full — record the timestamp and go.
    if len(request_times) < rate_limit:
        request_times.append(current)
        return
    # Window is full: how long ago did the oldest recorded request happen?
    elapsed = (current - request_times[0]).total_seconds()
    if elapsed < rate_limit_window:
        # Too soon — pause until the oldest entry ages out of the window.
        await asyncio.sleep(rate_limit_window - elapsed)
        current = datetime.datetime.now()
    # Appending to the full deque evicts the oldest timestamp automatically.
    request_times.append(current)
# Fetch historical OHLCV data for a symbol with pagination
# Fetch historical OHLCV data for a symbol with pagination
async def fetch(symbol, session, tf, limit):
    """
    Fetch historical OHLCV data for a given symbol, paginating backwards in time.

    Makes repeated requests to the OKX candles endpoint (max 300 candles per
    request), throttled through the module-level rate limiter, until `limit`
    candles have been collected or the API runs out of history.

    Args:
        symbol (str): Trading symbol, e.g. "BTC-USDT-SWAP".
        session (aiohttp.ClientSession): Shared aiohttp session.
        tf (str): Timeframe, e.g. "1m" or "1H".
        limit (int): Total number of candles to fetch.

    Returns:
        numpy.ndarray: Array of candle rows, newest first. Column 0 is a naive
        datetime (converted from the exchange's millisecond timestamp), the
        last column is an int, and the columns in between are floats.
    """
    ohlcv_data = []
    max_entries_per_request = 300  # API limit per request
    endpoint = f"{OKX_BASE_URL}{OHLCV_ENDPOINT}"
    # Pagination control
    remaining_candles = limit
    after = None  # Pagination cursor (None means start from the latest data)
    while remaining_candles > 0:
        # Set up request parameters
        params = {
            'instId': symbol,
            'bar': tf,  # Timeframe, e.g. 1m, 1H, etc.
            'limit': min(remaining_candles, max_entries_per_request),  # bars this request
        }
        if after:
            # Ask for data strictly older than this millisecond timestamp.
            params['after'] = after
        await _wait_for_rate_limit()  # throttle before every GET
        async with session.get(endpoint, params=params, headers=headers) as response:
            if response.status == 429:  # Too Many Requests
                # Honor the server's back-off hint (default 1s), then retry
                # the same page without advancing the cursor.
                retry_after = int(response.headers.get('Retry-After', 1))
                print(f"Rate limit hit for {symbol}. Retrying after {retry_after} seconds.")
                await asyncio.sleep(retry_after)
                continue
            if response.status != 200:
                # Any other error ends pagination; return what we have so far.
                print(f"Error fetching data for {symbol}: {response.status}, {await response.text()}")
                break
            data = await response.json()
            fetched_candles = data.get('data', [])
            # Stop the loop if no new candles are fetched (end of history).
            if not fetched_candles:
                break
            # Append the fetched candles to our dataset (API returns newest first).
            ohlcv_data.extend(fetched_candles)
            # Advance the cursor: next page must be older than the last
            # (i.e. oldest) candle we just received.
            last_timestamp = fetched_candles[-1][0]
            # One millisecond BEFORE the last timestamp, so the boundary
            # candle is not fetched twice.
            after = str(int(last_timestamp) - 1)
            # Decrease the number of remaining candles by the number fetched
            remaining_candles -= len(fetched_candles)
    print(f"Completed fetching {len(ohlcv_data)} candles for {symbol}.")
    # Convert each row in place: timestamp -> naive local datetime, prices ->
    # float, final field -> int.
    for sublist in ohlcv_data:
        # Convert the ms epoch to US/Eastern, then to Europe/Lisbon, then drop
        # tzinfo. NOTE(review): the intermediate US/Eastern hop looks redundant
        # (astimezone from a local-time datetime should land on the same
        # instant) — presumably a DST-handling workaround; confirm intent.
        local_time = datetime.datetime.fromtimestamp(int(sublist[0]) / 1000).astimezone(timezone('US/Eastern'))
        sublist[0] = local_time.astimezone(timezone('Europe/Lisbon')).replace(tzinfo=None)
        sublist[-1] = int(sublist[-1])  # last field (confirm flag) to int
        for i in range(1, len(sublist) - 1):
            sublist[i] = float(sublist[i])  # OHLC/volume fields to float
    # Mixed datetime/float columns -> object-dtype ndarray.
    return np.array(ohlcv_data)
# Main function: Fetch OHLCV data for multiple symbols concurrently
async def fetchall(tf, limit):
"""
Fetches historical OHLCV for multiple symbols concurrently.
Args:
symbols (list of str): List of trading symbols (e.g., BTC-USDT-SWAP).
tf (str): Timeframe, e.g., "1m" or "1h".
limit (int): Total number of candles to fetch.
Returns:
dict: A dictionary where keys are symbols and values are their OHLCV data.
"""
async with aiohttp.ClientSession() as session:
tasks = [fetch(symbol,session, tf, limit) for symbol in syms_list]
results = await asyncio.gather(*tasks)
candle_dict = {syms_list[i]: results[i] for i in range(len(syms_list))}
for key, value in candle_dict.items():
if isinstance(value, np.ndarray) and value.size == 0:
print(f'{key} has empty data.')
return candle_dict
# Example usage
async def main():
timeframe = "1H" # Define your timeframe
howmanyklines = 1440 # Total K-lines you want to fetch
st = datetime.datetime.now()
candles = await fetchall(timeframe, howmanyklines)
print("download costed time: ",datetime.datetime.now()-st)
# print(candles)
print(len(candles))
# Guard the entry point so importing this module does not immediately kick
# off a full download of every symbol's history.
if __name__ == "__main__":
    asyncio.run(main())
# Click Run or press shift + ENTER to run code