import aiohttp
import asyncio
from pytz import timezone
import random
import numpy as np
import sys
import datetime
import time
import requests
import json
from collections import deque
from datetime import timedelta

# Constants for OKX API
OKX_BASE_URL = 'https://www.okx.com'
OHLCV_ENDPOINT = '/api/v5/market/candles'

def getusdtswapsyms_list():
    # Define the URL and the arguments
    url = "https://www.okx.com/api/v5/public/instruments"
    args = {"instType": "SWAP"}

    # Make the GET request and store the response
    response = requests.get(url, params=args)

    # Parse the JSON response body into a dictionary
    instrums = json.loads(response.text)

    # Keep only USDT-margined perpetual swaps (DMAIL-USDT-SWAP is explicitly excluded)
    symlist = [i['instId'] for i in instrums['data'] if i['instId'].endswith('-USDT-SWAP') and i['instId'] != 'DMAIL-USDT-SWAP']

    return symlist

syms_list = getusdtswapsyms_list()
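
# Hypothetical example of the result (actual contents depend on OKX's live instrument
# list at request time):
#   syms_list[:3]  ->  ['BTC-USDT-SWAP', 'ETH-USDT-SWAP', 'LTC-USDT-SWAP']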

# Note: each entry must end with a comma; without them Python concatenates the
# adjacent string literals into one giant User-Agent string.
user_agents = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36',
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 13_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15',
]
# One User-Agent is picked at import time and reused for every request
headers = {'User-Agent': random.choice(user_agents)}

rate_limit = 40           # maximum number of requests per window
rate_limit_window = 2.0   # window length in seconds
request_times = deque(maxlen=rate_limit)  # timestamps of the most recent requests

async def _wait_for_rate_limit():
    """Ensure we don't exceed rate limit."""
    
    while True:  # Re-check until it is safe to record a new request
        now = datetime.datetime.now()

        if len(request_times) < rate_limit:
            # Below the limit: record the request time and proceed
            request_times.append(now)
            return

        # The deque is full (rate_limit entries): inspect the oldest recorded request
        time_diff = now - request_times[0]

        if time_diff.total_seconds() < rate_limit_window:
            # Too fast: wait until the oldest request falls outside the window
            await asyncio.sleep(rate_limit_window - time_diff.total_seconds())
            now = datetime.datetime.now()

        # Record the new request; the deque's maxlen drops the oldest entry automatically
        request_times.append(now)
        return
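
# Worked illustration of the sliding window (hypothetical timing): with rate_limit = 40
# and rate_limit_window = 2.0, the first 40 calls to _wait_for_rate_limit() return
# immediately; the 41st call sleeps until the timestamp of the 1st call is more than
# 2 seconds old, then records itself while maxlen evicts that oldest entry.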
        
# Fetch historical OHLCV data for a symbol with pagination
async def fetch(symbol, session, tf, limit):
    """
    Fetches historical OHLCV data for a given symbol with pagination.
    Makes multiple requests via OKX API to handle large datasets.

    Args:
        symbol (str): Trading symbol, e.g., "BTC-USDT-SWAP".
        session (aiohttp.ClientSession): Shared aiohttp session.
        tf (str): Timeframe, e.g., "1m" or "1H".
        limit (int): Total number of candles to fetch.

    Returns:
        list: List of OHLCV data, where each entry is [timestamp, open, high, low, close, volume].
    """
        
    ohlcv_data = []
    max_entries_per_request = 300  # OKX returns at most 300 candles per request on this endpoint
    endpoint = f"{OKX_BASE_URL}{OHLCV_ENDPOINT}"
    
    # Pagination control
    remaining_candles = limit
    after = None  # Timestamp for pagination (None means fetch latest data)

    while remaining_candles > 0:
        # Set up request parameters
        params = {
            'instId': symbol,
            'bar': tf,  # Timeframe, e.g., 1m, 1h, etc.
            'limit': min(remaining_candles, max_entries_per_request),  # How many bars to fetch
        }
        if after:
            params['after'] = after  # Progressively fetch data older than this timestamp

        await _wait_for_rate_limit()  # <----RATE LIMIT CALLED, before every get
        
        async with session.get(endpoint, params=params, headers=headers) as response:
            if response.status == 429:  # Too Many Requests
                retry_after = int(response.headers.get('Retry-After', 1))
                print(f"Rate limit hit for {symbol}. Retrying after {retry_after} seconds.")
                await asyncio.sleep(retry_after)
                continue
            
            if response.status != 200:
                print(f"Error fetching data for {symbol}: {response.status}, {await response.text()}")
                break

            data = await response.json()
            fetched_candles = data.get('data', [])

            # Stop the loop if no new candles are fetched
            if not fetched_candles:
                # print("No more candles to fetch. Stopping.")
                break

            # Append the fetched candles to our dataset
            ohlcv_data.extend(fetched_candles)
            
            # Update 'after' for the next page: OKX returns candles newest-first,
            # so the last entry holds the oldest timestamp in this batch
            last_timestamp = fetched_candles[-1][0]
            after = str(int(last_timestamp) - 1)  # request records strictly older than this candle

            # Decrease the number of remaining candles by the number fetched
            remaining_candles -= len(fetched_candles)

    print(f"Completed fetching {len(ohlcv_data)} candles for {symbol}.")
       
    # Convert each candle: epoch-ms timestamp -> naive Europe/Lisbon local datetime
    # (astimezone handles DST transitions), then cast the remaining fields to numbers
    for sublist in ohlcv_data:
        local_time = datetime.datetime.fromtimestamp(int(sublist[0]) / 1000).astimezone(timezone('Europe/Lisbon'))
        sublist[0] = local_time.replace(tzinfo=None)
        sublist[-1] = int(sublist[-1])  # last field (candle 'confirm' flag) -> int
        for i in range(1, len(sublist) - 1):
            sublist[i] = float(sublist[i])  # OHLC and volume fields -> float

    # return ohlcv_data
    return np.array(ohlcv_data)
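
# Minimal single-symbol usage sketch (kept commented out so it does not run on import;
# assumes a plain-script environment where asyncio.run() is allowed; _demo is purely
# illustrative and not part of the module above):
#
# async def _demo():
#     async with aiohttp.ClientSession() as session:
#         btc = await fetch("BTC-USDT-SWAP", session, "1m", 600)
#         print(btc.shape)  # roughly (600, n_fields); OKX returns several volume fields per candle
#
# asyncio.run(_demo())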

# Main function: Fetch OHLCV data for multiple symbols concurrently
async def fetchall(tf, limit):
    """
    Fetches historical OHLCV data concurrently for every symbol in the global syms_list.

    Args:
        tf (str): Timeframe, e.g., "1m" or "1H".
        limit (int): Total number of candles to fetch per symbol.

    Returns:
        dict: A dictionary mapping each symbol to its OHLCV data as a numpy array.
    """
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(symbol, session, tf, limit) for symbol in syms_list]
        results = await asyncio.gather(*tasks)

    candle_dict = dict(zip(syms_list, results))
    for key, value in candle_dict.items():
        if isinstance(value, np.ndarray) and value.size == 0:
            print(f'{key} has empty data.')
    return candle_dict

# Example usage
async def main():
    timeframe = "1H"  # Define your timeframe
    howmanyklines = 1440  # Total K-lines you want to fetch
    st = datetime.datetime.now()
    candles = await fetchall(timeframe, howmanyklines)
    print("download costed time: ",datetime.datetime.now()-st)
    # print(candles)
    print(len(candles))
    
asyncio.run(main())

# Observed error output:
#     line 193, in <module>
#       asyncio.run(main())
#     File "/lib/python312.zip/asyncio/runners.py", line 190, in run
#       raise RuntimeError(
#   RuntimeError: asyncio.run() cannot be called from a running event loop
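
# asyncio.run() raises this RuntimeError when an event loop is already running in the
# current thread, which is typical of Jupyter/IPython notebooks and some IDE consoles;
# run as a plain `python script.py`, the call above works unchanged. A minimal
# workaround sketch, assuming a notebook-style environment is the cause here:
#
# try:
#     asyncio.get_running_loop()          # raises RuntimeError if no loop is running
# except RuntimeError:
#     asyncio.run(main())                 # plain script: start a fresh event loop
# else:
#     # A loop is already running (e.g. a notebook cell). The simplest fix there is
#     # to `await main()` directly in the cell; programmatically, schedule a task:
#     asyncio.get_running_loop().create_task(main())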