# Code Examples

Production-ready client implementations with reconnection, heartbeat monitoring, and latency tracking.

{% tabs %}
{% tab title="Python" %}
Requires: `pip install websockets`

```python
import asyncio
import json
import ssl
import time
import websockets

# API key sent in the X-API-Key header of the WebSocket handshake.
API_KEY = "dsk_your_key_here"
# Stream endpoint the client connects to.
WS_URL  = "wss://cryptolisting.ws"
# Filter specific exchanges (optional):
# WS_URL = "wss://cryptolisting.ws?cex=binance,upbit"

# Longest silence (in seconds) tolerated on the socket before the client
# drops the connection and reconnects.
HEARTBEAT_TIMEOUT = 35  # seconds
# Upper bound on reconnection attempts before the client gives up.
MAX_RETRIES = 20


def on_announcement(msg: dict):
    """Print a listing announcement together with its delivery latency.

    Args:
        msg: Decoded announcement message. Must contain ``listingType``,
            ``ticker``, ``publisher``, ``title``, ``dispatchTimestampUs``
            and ``publishTimestampUs``.

    Raises:
        KeyError: If one of the keys above is missing.
    """
    # Local wall-clock time in microseconds, the unit used by the timestamps.
    received_us = int(time.time() * 1_000_000)

    def elapsed_ms(since_us):
        # Microsecond delta expressed in milliseconds.
        return (received_us - since_us) / 1000

    network_ms = elapsed_ms(msg["dispatchTimestampUs"])
    total_ms = elapsed_ms(msg["publishTimestampUs"])

    headline = "[{}] {} on {}".format(
        msg["listingType"], msg["ticker"], msg["publisher"]
    )
    print(headline)
    print("  Title:   {}".format(msg["title"]))
    print(
        "  Latency: {:.2f}ms total, {:.2f}ms network".format(total_ms, network_ms)
    )


async def connect():
    """Stream messages from ``WS_URL`` and reconnect with exponential backoff.

    The connection is treated as dead when nothing arrives within
    ``HEARTBEAT_TIMEOUT`` seconds. The coroutine returns when the server
    closes the socket with reason ``key_expired`` or ``key_revoked``, or
    after ``MAX_RETRIES`` consecutive failed attempts.
    """
    # The default context verifies the server certificate and hostname.
    # Never disable this: the API key travels in the handshake headers.
    ssl_ctx = ssl.create_default_context()

    headers = {"X-API-Key": API_KEY}

    # Counts consecutive failures; reset whenever a message is received so a
    # long-lived client does not run out of retries over its lifetime.
    attempt = 0
    while attempt < MAX_RETRIES:
        try:
            # `additional_headers` is the keyword of the default client in
            # websockets >= 14 (older releases called it `extra_headers`).
            async with websockets.connect(
                WS_URL, additional_headers=headers, ssl=ssl_ctx
            ) as ws:
                print("Connected!")

                while True:
                    try:
                        raw = await asyncio.wait_for(
                            ws.recv(), timeout=HEARTBEAT_TIMEOUT
                        )
                    except asyncio.TimeoutError:
                        print("No heartbeat -- reconnecting")
                        break

                    # Traffic is flowing: restart the backoff schedule.
                    attempt = 0

                    try:
                        msg = json.loads(raw)
                    except json.JSONDecodeError as e:
                        # One bad frame should not tear down the connection.
                        print(f"Ignoring malformed message: {e}")
                        continue
                    if not isinstance(msg, dict):
                        continue

                    msg_type = msg.get("type")

                    if msg_type == "welcome":
                        print(f"Welcome: tier={msg['tier']}, "
                              f"cex={msg['allowedCex']}")

                    elif msg_type == "announcement":
                        on_announcement(msg)

                    elif msg_type == "heartbeat":
                        pass  # Connection alive

        except websockets.ConnectionClosed as e:
            # `rcvd` is the close frame sent by the server, if any.
            if e.rcvd:
                reason = e.rcvd.reason
                if reason in ("key_expired", "key_revoked"):
                    print(f"Key is no longer valid: {reason}")
                    return
                print(f"Disconnected: {reason}")
            else:
                print("Disconnected: connection lost")

        except Exception as e:
            print(f"Connection error: {e}")

        backoff = min(2 ** attempt, 300)
        attempt += 1
        print(f"Reconnecting in {backoff}s (attempt {attempt})")
        await asyncio.sleep(backoff)

    print("Max retries exceeded")


# Run the client only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(connect())
```

{% endtab %}

{% tab title="Node.js" %}
Requires: `npm install ws`

```javascript
const WebSocket = require("ws");

// API key sent in the X-API-Key header of the WebSocket handshake.
const API_KEY = "dsk_your_key_here";
// Stream endpoint the client connects to.
const WS_URL = "wss://cryptolisting.ws";
// Filter specific exchanges (optional):
// const WS_URL = "wss://cryptolisting.ws?cex=binance,upbit";

// Longest silence (in milliseconds) tolerated before the socket is
// terminated and a reconnect is scheduled.
const HEARTBEAT_TIMEOUT_MS = 35_000;
// Upper bound on reconnection attempts before the client gives up.
const MAX_RETRIES = 20;

/**
 * Print a listing announcement together with its delivery latency.
 *
 * @param {object} msg Decoded announcement carrying `listingType`, `ticker`,
 *   `publisher`, `title`, `dispatchTimestampUs` and `publishTimestampUs`.
 */
function onAnnouncement(msg) {
  const { listingType, ticker, publisher, title } = msg;

  // Date.now() is in milliseconds; the message timestamps are microseconds.
  const receivedUs = Date.now() * 1000;
  const elapsedMs = (sinceUs) => (receivedUs - sinceUs) / 1000;

  const network = elapsedMs(msg.dispatchTimestampUs).toFixed(2);
  const total = elapsedMs(msg.publishTimestampUs).toFixed(2);

  console.log(`[${listingType}] ${ticker} on ${publisher}`);
  console.log(`  Title:   ${title}`);
  console.log(`  Latency: ${total}ms total, ${network}ms network`);
}

/**
 * Open the stream and keep it alive, reconnecting with exponential backoff.
 *
 * Stops for good when the server closes with reason `key_expired` or
 * `key_revoked`, or after MAX_RETRIES consecutive failed attempts.
 *
 * @param {number} [attempt=0] Number of consecutive failed attempts so far.
 */
function connect(attempt = 0) {
  if (attempt >= MAX_RETRIES) {
    console.log("Max retries exceeded");
    return;
  }

  // TLS certificate verification is left at its secure default: the API key
  // is sent in the handshake and must not reach an unauthenticated server.
  const ws = new WebSocket(WS_URL, {
    headers: { "X-API-Key": API_KEY },
    // Abort a handshake that never completes instead of hanging forever.
    handshakeTimeout: HEARTBEAT_TIMEOUT_MS,
  });

  let heartbeatTimer = null;

  // (Re)arm the watchdog; any inbound message counts as a sign of life.
  function resetHeartbeat() {
    clearTimeout(heartbeatTimer);
    heartbeatTimer = setTimeout(() => {
      console.log("No heartbeat -- reconnecting");
      // terminate() drops the socket immediately and triggers "close".
      ws.terminate();
    }, HEARTBEAT_TIMEOUT_MS);
  }

  ws.on("open", () => {
    console.log("Connected!");
    resetHeartbeat();
  });

  ws.on("message", (data) => {
    resetHeartbeat();
    // Traffic is flowing: restart the backoff schedule so a long-lived
    // client does not exhaust MAX_RETRIES over its lifetime.
    attempt = 0;

    let msg;
    try {
      msg = JSON.parse(data);
    } catch (err) {
      // An exception thrown from an event handler would crash the process.
      console.error("Ignoring malformed message:", err.message);
      return;
    }
    if (msg === null || typeof msg !== "object") {
      return;
    }

    if (msg.type === "welcome") {
      console.log(`Welcome: tier=${msg.tier}, cex=${msg.allowedCex}`);
    } else if (msg.type === "announcement") {
      onAnnouncement(msg);
    }
    // heartbeat: no action needed
  });

  ws.on("close", (code, reason) => {
    clearTimeout(heartbeatTimer);
    const reasonStr = reason.toString();

    // The server signals an unusable key through the close reason.
    if (reasonStr === "key_expired" || reasonStr === "key_revoked") {
      console.log(`Key is no longer valid: ${reasonStr}`);
      return;
    }

    const backoff = Math.min(2 ** attempt, 300);
    console.log(`Disconnected (${code}). Reconnecting in ${backoff}s...`);
    setTimeout(() => connect(attempt + 1), backoff * 1000);
  });

  // "close" follows "error", so reconnection is handled there.
  ws.on("error", (err) => {
    console.error("WebSocket error:", err.message);
  });
}

// Start the client; `attempt` defaults to 0.
connect();
```

{% endtab %}

{% tab title="Go" %}
Requires: `go get github.com/gorilla/websocket`

```go
package main

import (
	"crypto/tls"
	"encoding/json"
	"fmt"
	"log"
	"math"
	"net/http"
	"time"

	"github.com/gorilla/websocket"
)

const (
	// apiKey is sent in the X-API-Key header of the WebSocket handshake.
	apiKey           = "dsk_your_key_here"
	// wsURL is the stream endpoint the client dials.
	wsURL            = "wss://cryptolisting.ws"
	// heartbeatTimeout is the read deadline applied before every read; a
	// read that exceeds it fails and triggers a reconnect.
	heartbeatTimeout = 35 * time.Second
	// maxRetries bounds the number of connection attempts.
	maxRetries       = 20
)

// Message is the union of the fields carried by the JSON messages on the
// stream. Type selects which of the remaining fields are meaningful; fields
// absent from a given message keep their zero value.
type Message struct {
	// Type is the message discriminator ("welcome", "announcement", ...).
	Type string `json:"type"`

	// Fields read when handling "announcement" messages. Timestamps are in
	// microseconds, as the Us suffix indicates.
	Title               string `json:"title,omitempty"`
	Ticker              string `json:"ticker,omitempty"`
	Publisher           string `json:"publisher,omitempty"`
	ListingType         string `json:"listingType,omitempty"`
	PublishTimestampUs  uint64 `json:"publishTimestampUs,omitempty"`
	DetectedTimestampUs uint64 `json:"detectedTimestampUs,omitempty"`
	DispatchTimestampUs uint64 `json:"dispatchTimestampUs,omitempty"`

	// Tier and AllowedCex are read when handling "welcome" messages;
	// MaxConnections presumably belongs to the same message (TODO confirm).
	Tier           string `json:"tier,omitempty"`
	MaxConnections int    `json:"maxConnections,omitempty"`
	AllowedCex     string `json:"allowedCex,omitempty"`

	// Presumably carried by heartbeat messages; not read by this client
	// (TODO confirm against the protocol reference).
	TimestampNs uint64 `json:"timestampNs,omitempty"`
	TimeUtc     string `json:"timeUtc,omitempty"`
}

// connect dials wsURL and prints incoming messages, reconnecting with
// exponential backoff (capped at 300 s) when the connection drops or stays
// silent for longer than heartbeatTimeout.
//
// It returns when the server closes the connection with reason
// "key_expired" or "key_revoked", or after maxRetries consecutive failed
// attempts.
func connect() {
	dialer := websocket.Dialer{
		// Certificate verification stays enabled (the default): the API key
		// is sent in the handshake and must not reach an unauthenticated
		// server.
		TLSClientConfig: &tls.Config{MinVersion: tls.VersionTLS12},
		// Abort a handshake that never completes instead of hanging forever.
		HandshakeTimeout: heartbeatTimeout,
	}
	headers := http.Header{"X-API-Key": {apiKey}}

	// attempt counts consecutive failures; it is reset whenever a message
	// arrives so a long-lived client does not exhaust maxRetries.
	attempt := 0
	for attempt < maxRetries {
		conn, _, err := dialer.Dial(wsURL, headers)
		if err != nil {
			log.Printf("Connection failed: %v", err)
		} else {
			log.Println("Connected!")
			conn.SetReadDeadline(time.Now().Add(heartbeatTimeout))

			for {
				_, rawMsg, err := conn.ReadMessage()
				if err != nil {
					// The server signals an unusable key via the close reason.
					if closeErr, ok := err.(*websocket.CloseError); ok &&
						(closeErr.Text == "key_expired" || closeErr.Text == "key_revoked") {
						log.Printf("Key is no longer valid: %s", closeErr.Text)
						conn.Close()
						return
					}
					log.Printf("Read error: %v", err)
					break
				}

				attempt = 0
				// Any message counts as a sign of life: push the deadline out.
				conn.SetReadDeadline(time.Now().Add(heartbeatTimeout))

				var msg Message
				if err := json.Unmarshal(rawMsg, &msg); err != nil {
					// One bad frame should not be processed as an empty message.
					log.Printf("Ignoring malformed message: %v", err)
					continue
				}

				switch msg.Type {
				case "welcome":
					log.Printf("Welcome: tier=%s, cex=%s", msg.Tier, msg.AllowedCex)
				case "announcement":
					// Subtract as signed values: with clock skew the publish
					// timestamp can be ahead of the local clock, and an
					// unsigned subtraction would wrap to a huge latency.
					nowUs := time.Now().UnixMicro()
					totalMs := float64(nowUs-int64(msg.PublishTimestampUs)) / 1000
					fmt.Printf("[%s] %s on %s (%.2fms)\n",
						msg.ListingType, msg.Ticker, msg.Publisher, totalMs)
					fmt.Printf("  Title: %s\n", msg.Title)
				}
			}
			conn.Close()
		}

		backoff := math.Min(math.Pow(2, float64(attempt)), 300)
		attempt++
		log.Printf("Reconnecting in %.0fs (attempt %d)", backoff, attempt)
		time.Sleep(time.Duration(backoff) * time.Second)
	}

	log.Println("Max retries exceeded")
}

// main runs the client; it exits once connect returns.
func main() {
	connect()
}
```

{% endtab %}

{% tab title="Rust" %}
Requires in `Cargo.toml`:

```toml
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = { version = "0.21", features = ["native-tls"] }
futures-util = "0.3"
http = "1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
native-tls = "0.2"
```

```rust
use futures_util::StreamExt;
use serde::Deserialize;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio_tungstenite::{connect_async_tls_with_config, Connector};

/// API key sent in the `X-API-Key` header of the WebSocket handshake.
const API_KEY: &str = "dsk_your_key_here";
/// Stream endpoint the client connects to.
const WS_URL: &str = "wss://cryptolisting.ws";

/// Union of the fields carried by the JSON messages on the stream.
///
/// JSON keys are camelCase (`listingType`, `publishTimestampUs`, ...). Every
/// field except `type` falls back to its default value when the key is
/// absent, so one struct can decode all message kinds.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct Message {
    /// Message discriminator, e.g. `"welcome"` or `"announcement"`. Required.
    r#type: String,

    #[serde(default)]
    title: String,

    #[serde(default)]
    ticker: String,

    #[serde(default)]
    publisher: String,

    #[serde(default)]
    listing_type: String,

    /// Microsecond timestamp, as the `_us` suffix indicates.
    #[serde(default)]
    publish_timestamp_us: u64,

    /// Microsecond timestamp, as the `_us` suffix indicates.
    #[serde(default)]
    dispatch_timestamp_us: u64,

    #[serde(default)]
    tier: String,

    #[serde(default)]
    allowed_cex: String,
}

#[tokio::main]
async fn main() {
    let tls = native_tls::TlsConnector::builder()
        .danger_accept_invalid_certs(true)
        .build()
        .unwrap();

    let request = http::Request::builder()
        .uri(WS_URL)
        .header("X-API-Key", API_KEY)
        .header("Connection", "Upgrade")
        .header("Upgrade", "websocket")
        .header("Sec-WebSocket-Version", "13")
        .header("Sec-WebSocket-Key", tokio_tungstenite::tungstenite::handshake::client::generate_key())
        .body(())
        .unwrap();

    let (ws, _) = connect_async_tls_with_config(
        request, None, false, Some(Connector::NativeTls(tls))
    ).await.expect("Failed to connect");

    println!("Connected!");

    let (_, mut read) = ws.split();

    while let Some(Ok(frame)) = read.next().await {
        if let Ok(text) = frame.to_text() {
            if let Ok(msg) = serde_json::from_str::<Message>(text) {
                match msg.r#type.as_str() {
                    "welcome" => println!("Welcome: tier={}, cex={}", msg.tier, msg.allowed_cex),
                    "announcement" => {
                        let now_us = SystemTime::now()
                            .duration_since(UNIX_EPOCH).unwrap()
                            .as_micros() as u64;
                        let total_ms = (now_us - msg.publish_timestamp_us) as f64 / 1000.0;
                        println!("[{}] {} on {} ({:.2}ms)",
                            msg.listing_type, msg.ticker, msg.publisher, total_ms);
                    }
                    _ => {}
                }
            }
        }
    }
}
```

{% endtab %}
{% endtabs %}
