Code Examples

Production-ready client implementations with reconnection, heartbeat monitoring, and latency tracking.

Requires: pip install websockets

import asyncio
import json
import ssl
import time
import websockets

API_KEY = "dsk_your_key_here"
WS_URL  = "wss://cryptolisting.ws"
# Filter specific exchanges (optional):
# WS_URL = "wss://cryptolisting.ws?cex=binance,upbit"

HEARTBEAT_TIMEOUT = 35  # seconds
MAX_RETRIES = 20


def on_announcement(msg: dict):
    now_us = int(time.time() * 1_000_000)
    network_ms = (now_us - msg["dispatchTimestampUs"]) / 1000
    total_ms   = (now_us - msg["publishTimestampUs"]) / 1000

    print(f"[{msg['listingType']}] {msg['ticker']} on {msg['publisher']}")
    print(f"  Title:   {msg['title']}")
    print(f"  Latency: {total_ms:.2f}ms total, {network_ms:.2f}ms network")


async def connect():
    ssl_ctx = ssl.create_default_context()
    ssl_ctx.check_hostname = False
    ssl_ctx.verify_mode = ssl.CERT_NONE

    headers = {"X-API-Key": API_KEY}

    for attempt in range(MAX_RETRIES):
        try:
            async with websockets.connect(
                WS_URL, extra_headers=headers, ssl=ssl_ctx
            ) as ws:
                print("Connected!")

                while True:
                    try:
                        raw = await asyncio.wait_for(
                            ws.recv(), timeout=HEARTBEAT_TIMEOUT
                        )
                    except asyncio.TimeoutError:
                        print("No heartbeat -- reconnecting")
                        break

                    msg = json.loads(raw)

                    if msg["type"] == "welcome":
                        print(f"Welcome: tier={msg['tier']}, "
                              f"cex={msg['allowedCex']}")

                    elif msg["type"] == "announcement":
                        on_announcement(msg)

                    elif msg["type"] == "heartbeat":
                        pass  # Connection alive

        except websockets.ConnectionClosed as e:
            if e.rcvd:
                reason = e.rcvd.reason
                if reason in ("key_expired", "key_revoked"):
                    print(f"Key is no longer valid: {reason}")
                    return
                print(f"Disconnected: {reason}")

        except Exception as e:
            print(f"Connection error: {e}")

        backoff = min(2 ** attempt, 300)
        print(f"Reconnecting in {backoff}s (attempt {attempt + 1})")
        await asyncio.sleep(backoff)

    print("Max retries exceeded")


if __name__ == "__main__":
    asyncio.run(connect())

Last updated

Was this helpful?