Getting crypto prices update from Bybit exchange

bybit golang websocket

Introduction

In this article I’m going to connect to Bybit API and we are going to connect to it using websocket. The documentation for the API can be found here.

This could be quite handy in case you want to keep track of the prices of a particular crypto asset. In our case we are going to keep track only of TONUSDT pair. Depending on the goals you have you can later use this price result as it might more convenient for you.

As in our previous articles we are going to use Golang for performing this task, so our requirements would be.

Requirements

  1. Golang installed
  2. Bybit account in order to request an API Key

Code with comments

As always I’ll try to be brief. Due to the size of this example, I’ll put only the basics and you can check the full code in this repository

Project structure

The structure of our project will looks like this


$ tree .

.
├── client
│   ├── client.go
│   ├── request.go
│   └── response.go
├── go.mod
├── go.sum
└── main.go

1 directory, 6 files

Main package

This is how will looks like in the main package

package main

import (
	"context"
	"practice/bybit/websocket/bybit"
)

func main() {
	clt, err := bybit.NewClient()
	if err != nil {
		panic(err)
	}

	ctx := context.Background()

	tickerSubsciption := bybit.Request{
		Op: "subscribe",
		Args: []interface{}{
			bybit.TickersTONUSDTTopic,
		},
	}
	subscriptions := []bybit.Request{
		tickerSubsciption,
	}

	clt.Run(ctx, subscriptions)
}

Client package

This package contains the implementation for the client.

file: client/client.go


package client

import (
	"context"
	"encoding/json"
	"fmt"
	"net/url"
	"os"
	"os/signal"
	"time"

	"github.com/gorilla/websocket"
)

// Bybit organize the topics to which
// we can subscribe into Public and Private topics.
// Basically topics to which you can subscribe without authentication
// and the ones you can only subscribe with Authentication.
type ChannelType string

// There are several types of trades on bybit
// we are going to use only spot.
type TradesType string

const (
	PublicChannel  ChannelType = "public"
	PrivateChannel ChannelType = "private"
	Spot           TradesType  = "spot"

	// topic for getting latest prices of
	// TONUSDT pair in spot trading.
	TickersTONUSDTTopic = "tickers.TONUSDT"

	APIVersion           = "v5"
	ByBitWebsocketDomain = "stream.bybit.com"

	// In order to comply with bybit documentation is necessary to
	// send every 20 seconds a ping command to avoid disconnection.
	// A basic heartbeat mechanism.
	PingTimeout = 20

	// Even providing the heartbeat you will suffer disconnections
	// simply because that's how programming works, you should be prepared
	// for every kind of errors.
	MaxRetrialConnections = 10
)

// Client represents connection with ByBit Websocket API.
type Client struct {
	conn *websocket.Conn
}

func NewClient() (*Client, error) {
	conn, err := connect()
	if err != nil {
		return nil, err
	}

	return &Client{conn: conn}, nil
}

func path(channelType ChannelType, operation TradesType) string {
	return fmt.Sprintf("/%s/%s/%s", APIVersion, channelType, operation)
}

// connect to bybit websocket stream.
func connect() (*websocket.Conn, error) {
	spotPath := path(PublicChannel, Spot)

	u := url.URL{Scheme: "wss", Host: ByBitWebsocketDomain, Path: spotPath}
	fmt.Printf("connecting to %s\n", u.String())

	conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)

	return conn, err
}

func (c *Client) sendPing() error {
	pingReq := Request{
		ReqID: "100001",
		Op:    "ping",
	}

	return c.conn.WriteJSON(&pingReq)
}

// replace connection with a new one
func (c *Client) reconnect() error {
	conn, err := connect()
	if err != nil {
		return err
	}

	c.conn = conn

	return nil
}

// Run connect to bybit websocket, general idea of what it does.
// 1. Subscribe to tickers
// 2. Read message from websocket.
// 3. Send every 20 seconds a ping(heartbeat), to avoid disconnections.
// 4. In case of abnormal close of connection, performs a reconnection.
// 5. In case the reconnection exceed the max allowed, shut the program.
// 6. Also listen to Ctr+C commands to shutdown gratefully.
func (c *Client) Run(
	ctx context.Context,
	subscriptions []Request,	
) error {
	// interupt signal necessary for catching Ctr+C
	// and shut down gratefully
	interrupt := make(chan os.Signal, 1)
	signal.Notify(interrupt, os.Interrupt)

	// ticker for sending ping command every 20 seconds
	// for the heartbeat mechanism.
	pingTicker := time.NewTicker(PingTimeout * time.Second)
	// we are going to report our errors in this channel
	errChn := make(chan error)
	done := make(chan struct{})
	var connections int = 0

	// in case we are going to have a disconnection we are going
	// back to this tag to reconnect.
CONNECTION: 
	if connections > 0 {
		err := c.reconnect()
		if err != nil {
			return err
		}
	}
	
	defer func() {
		pingTicker.Stop()
		close(errChn)
		c.conn.Close()
	}()

	// let's increase our connection counter
	// to keep a record of our max retries
	connections++

	// this gourutine will process every message incomming
	// related to the subscriptions passed.
	go func() {
		defer close(done)
		err := c.processRead(ctx, done, subscriptions)
		if err != nil {
			errChn <- err

			return
		}
	}()

	// wait for possible errors and interrupt signals.
	// send ping command every PingTimeout(20) seconds.
	for {
		select {
		case err := <-errChn:
			// not all the errors are retriable
			if waitTime, ok := retriableError(err, connections); ok {
				if connections > MaxRetrialConnections {
					return err
				}

				fmt.Println("RECONNECTING.....")
				fmt.Printf("Sleeping %v milliseconds\n", waitTime)
				time.Sleep(waitTime)
				// go to tag defined above
				goto CONNECTION
			}

			return err

		// in case of Ctr+C let's handle the connection
		case <-interrupt:
			return c.handleInterruptSignal(done)

		// when ticker is triggered send a ping command
		case <-pingTicker.C:
			err := c.sendPing()
			if err != nil {
				return err
			}
		}
	}
}

func (c *Client) processMsg(ctx context.Context, data []byte) error {
	var msg PublicResponse

	err := json.Unmarshal(data, &msg)
	if err != nil {
		return err
	}

	fmt.Printf("MSG: %s\n", string(data))
	return nil	
}

func (c *Client) processRead(
	ctx context.Context,
	done chan struct{},	
	subscriptions []Request,	
) error {
	// first ping to send.
	c.sendPing()

	for _, subscription := range subscriptions {
		err := c.conn.WriteJSON(subscription)
		if err != nil {
			return fmt.Errorf("sending subscription %w", err)
		}
	}

	for {
		select {
		case <-done:
			return nil
		default:
			_, message, err := c.conn.ReadMessage()
			if err != nil {
				return fmt.Errorf("reading %w", err)
			}

			err = c.processMsg(ctx, message)
			if err != nil {
				fmt.Println("ERR PROCESSING READ: ", err.Error())
			}
		}
	}
}

func (c *Client) handleInterruptSignal(
	done chan struct{},	
) error {
	fmt.Println("Closing connection...it might take a few seconds until the next tickers comes")
	done <- struct{}{}
	// Cleanly close the connection by sending a close message and then
	// waiting (with timeout) for the server to close the connection.
	err := c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
	if err != nil {
		return err
	}

	select {
	case <-time.After(time.Second):
	}

	return nil
}


func retriableError(err error, connections int) (time.Duration, bool) {
	waitTime := time.Duration(connections) * 500 * time.Millisecond
	// 1006(CloseAbnormalClosure) checking if was an abnormal closure.
	isAbnormalClosure := websocket.IsUnexpectedCloseError(err, websocket.CloseAbnormalClosure)
	// 1001(CloseGoingAway) indicates that an endpoint is "going away", such as a server
	// going down or a browser having navigated away from a page.
	isGoingAway := websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway)

	isRetriable := isAbnormalClosure || isGoingAway

	// 1013(CloseTryAgainLater) not defined in rfc6455 but can be faced as well.
	if websocket.IsUnexpectedCloseError(err, websocket.CloseTryAgainLater) {
		isRetriable = true
		waitTime = time.Duration(connections) * time.Second
	}

	return waitTime, isRetriable
}

Running output

Running this code should produce an output as this one:

connecting to wss://stream.bybit.com/v5/public/spot
MSG: {"success":true,"ret_msg":"pong","conn_id":"d0822166-72b0-44f7-b6d9-101e5c0c98ad","req_id":"100001","op":"ping"}
MSG: {"success":true,"ret_msg":"subscribe","conn_id":"d0822166-72b0-44f7-b6d9-101e5c0c98ad","op":"subscribe"}
MSG: {"topic":"tickers.TONUSDT","ts":1684322899614,"type":"snapshot","cs":9133729124,"data":{"symbol":"TONUSDT","lastPrice":"1.9916","highPrice24h":"2.0063","lowPrice24h":"1.9694","prevPrice24h":"1.9729","volume24h":"115096.79","turnover24h":"228228.375398","price24hPcnt":"0.0095","usdIndexPrice":""}}
MSG: {"topic":"tickers.TONUSDT","ts":1684322975467,"type":"snapshot","cs":9133777693,"data":{"symbol":"TONUSDT","lastPrice":"1.9916","highPrice24h":"2.0063","lowPrice24h":"1.9694","prevPrice24h":"1.9729","volume24h":"115096.79","turnover24h":"228228.375398","price24hPcnt":"0.0095","usdIndexPrice":""}}
MSG: {"success":true,"ret_msg":"pong","conn_id":"d0822166-72b0-44f7-b6d9-101e5c0c98ad","req_id":"100001","op":"ping"}
MSG: {"success":true,"ret_msg":"pong","conn_id":"d0822166-72b0-44f7-b6d9-101e5c0c98ad","req_id":"100001","op":"ping"}
MSG: {"success":true,"ret_msg":"pong","conn_id":"d0822166-72b0-44f7-b6d9-101e5c0c98ad","req_id":"100001","op":"ping"}
MSG: {"topic":"tickers.TONUSDT","ts":1684323035468,"type":"snapshot","cs":9133813935,"data":{"symbol":"TONUSDT","lastPrice":"1.9916","highPrice24h":"2.0063","lowPrice24h":"1.9694","prevPrice24h":"1.9736","volume24h":"115096.79","turnover24h":"228228.375398","price24hPcnt":"0.0091","usdIndexPrice":""}}
MSG: {"success":true,"ret_msg":"pong","conn_id":"d0822166-72b0-44f7-b6d9-101e5c0c98ad","req_id":"100001","op":"ping"}
MSG: {"success":true,"ret_msg":"pong","conn_id":"d0822166-72b0-44f7-b6d9-101e5c0c98ad","req_id":"100001","op":"ping"}  

Notes

If you will need to use some private subscriptions, is mandatory to sign up in Bybit exchange and ask for API KEY. Is always recommended to consult the documentation of the API.