# Code Examples
import asyncio
import json
import ssl
import time
import websockets
# Placeholder API key; connect() sends it in the "X-API-Key" request header.
# Replace it with a real key before running.
API_KEY = "dsk_your_key_here"
# WebSocket endpoint that connect() opens.
WS_URL = "wss://cryptolisting.ws"
# Filter specific exchanges (optional):
# WS_URL = "wss://cryptolisting.ws?cex=binance,upbit"
# Longest time connect() waits for any incoming message before it treats the
# connection as dead and reconnects.
HEARTBEAT_TIMEOUT = 35 # seconds
# Retry limit used by connect(); once it is exhausted connect() prints
# "Max retries exceeded" and returns.
MAX_RETRIES = 20
def on_announcement(msg: dict) -> None:
    """Print a summary of one announcement message, including its latency.

    Args:
        msg: Decoded announcement payload. The keys read here are
            ``dispatchTimestampUs``, ``publishTimestampUs``, ``listingType``,
            ``ticker``, ``publisher`` and ``title``.

    Raises:
        KeyError: If any of the keys listed above is missing from ``msg``.
    """
    # Local wall-clock time in microseconds, the unit the "...Us" timestamp
    # fields are compared against below.
    now_us = int(time.time() * 1_000_000)
    # Both deltas are converted from microseconds to milliseconds for display.
    # NOTE(review): they compare the local clock with timestamps taken from
    # the message, so clock skew would distort them -- confirm the hosts are
    # time-synchronised.
    network_ms = (now_us - msg["dispatchTimestampUs"]) / 1000
    total_ms = (now_us - msg["publishTimestampUs"]) / 1000
    print(f"[{msg['listingType']}] {msg['ticker']} on {msg['publisher']}")
    print(f" Title: {msg['title']}")
    print(f" Latency: {total_ms:.2f}ms total, {network_ms:.2f}ms network")
async def connect():
    """Stream messages from WS_URL, reconnecting with exponential backoff.

    Opens a WebSocket to ``WS_URL`` with ``API_KEY`` in the ``X-API-Key``
    header, then dispatches each JSON message on its ``"type"`` field:
    ``welcome`` is printed, ``announcement`` is passed to
    ``on_announcement`` and ``heartbeat`` is only used as a liveness signal.

    The connection is dropped and re-opened when no message arrives within
    ``HEARTBEAT_TIMEOUT`` seconds, when the peer closes it, or when any other
    error occurs. The wait between attempts is ``min(2 ** attempt, 300)``
    seconds.

    Returns:
        None. The coroutine returns once the server closes the connection
        with reason ``key_expired`` or ``key_revoked``, or after
        ``MAX_RETRIES`` consecutive failed attempts.
    """
    # Keep certificate and hostname verification enabled (the defaults of
    # create_default_context). The API key travels in the handshake, so an
    # unverified TLS session would hand it to anyone able to intercept the
    # connection. If the server uses a private CA, load it with
    # ssl_ctx.load_verify_locations(...) instead of disabling verification.
    ssl_ctx = ssl.create_default_context()
    headers = {"X-API-Key": API_KEY}

    # Counts consecutive failures only: it is reset once a session proves
    # healthy, so a long-lived client does not drift towards the maximum
    # backoff or hit MAX_RETRIES because of unrelated drops days apart.
    attempt = 0
    while attempt < MAX_RETRIES:
        try:
            # NOTE(review): recent websockets releases name this keyword
            # `additional_headers` -- confirm against the installed version.
            async with websockets.connect(
                WS_URL, extra_headers=headers, ssl=ssl_ctx
            ) as ws:
                print("Connected!")
                while True:
                    try:
                        raw = await asyncio.wait_for(
                            ws.recv(), timeout=HEARTBEAT_TIMEOUT
                        )
                    except asyncio.TimeoutError:
                        # Silence longer than the heartbeat window: leave the
                        # `async with` so the socket is closed, then retry.
                        print("No heartbeat -- reconnecting")
                        break

                    msg = json.loads(raw)
                    msg_type = msg["type"]
                    if msg_type == "welcome":
                        print(f"Welcome: tier={msg['tier']}, "
                              f"cex={msg['allowedCex']}")
                    elif msg_type == "announcement":
                        attempt = 0  # Session is healthy
                        on_announcement(msg)
                    elif msg_type == "heartbeat":
                        attempt = 0  # Connection alive
        except websockets.ConnectionClosed as e:
            # e.rcvd is the close frame received from the server; it is None
            # when the connection dropped without one.
            reason = e.rcvd.reason if e.rcvd else ""
            if reason in ("key_expired", "key_revoked"):
                # Retrying cannot succeed with an invalid key, so stop here.
                print(f"Key is no longer valid: {reason}")
                return
            print(f"Disconnected: {reason or 'no close frame received'}")
        except Exception as e:
            # Top-level boundary of the reconnect loop: report and retry.
            print(f"Connection error: {e}")

        backoff = min(2 ** attempt, 300)
        attempt += 1
        print(f"Reconnecting in {backoff}s (attempt {attempt})")
        await asyncio.sleep(backoff)

    print("Max retries exceeded")
# Script entry point: run the reconnecting client until connect() returns.
if __name__ == "__main__":
    asyncio.run(connect())
# Last updated
# Was this helpful?