Sanskar's blog

GoLoadBalance v0.1









Introduction:



Hi and welcome to my first ever blog post! I am Sanskar Gauchan, and I like to build and play around with cool stuff. Lately, I have been spending some time building a load balancer with Golang. Partly because I wanted to become more proficient with Go and its standard library, and also to understand how the core of a load balancer works. After building this project, I realized that I love Golang a lot more than I thought I would. But, I’ll go more into detail about that in the later sections of this blog post.


Motivation: I guess this is more of a fun project for me than trying to build something production-ready. My motivation was to simply have fun with Golang while building something interesting.


End Goal: My objective isn’t to build a production-ready load balancer, but rather to understand its inner workings. By the end, I aim to have a version that functions correctly and can handle stress, route requests, and more.


If you want to see the code, it can be found on my GitHub.


Before we start, a crash course in Go Routines:



I will be using Go routines extensively throughout this project, so we should have an understanding of how they work.


So, let’s start with Go Routines. Go routines are the primary building blocks of Go’s concurrency mechanism. Unlike in C, where you’d use pthread_create and pthread_join, or in Rust, where you’d use thread::spawn and join to create and join threads, Go offers a more straightforward syntax. This simplicity makes it easier for developers to execute tasks concurrently. The syntax is as follows:

go doMyTask()

Now, voila! Your task is now running inside a Go routine. It’s important to note that Go routines are not individual OS-level threads. While they resemble threads in that they have their own stack (though the stack size for Go routines is smaller compared to OS-level threads, which often allocate at least 1MB), they are managed differently in terms of CPU time allocation.


The distinction lies in the management of execution. The Go runtime multiplexes Go routines onto a small set of OS-level threads; the number of threads that can run Go code at the same time (GOMAXPROCS) defaults to the number of cores on your machine. This means that if you initiate 100 Go routines, you aren’t launching 100 OS-level threads. Instead, you’re signaling to Go that you want these routines to execute concurrently. The Go runtime ensures that these routines are distributed across the available threads in the process. In essence, Go possesses an intelligent scheduler that determines which Go routines operate on which threads at any particular moment.


You might wonder, “Doesn’t the operating system already manage thread execution by giving a thread a certain amount of CPU time before context switching and transferring control to another thread?” So, why the need for a scheduler within Go’s runtime?


The answer lies in the efficiency of Go’s runtime in managing concurrent functions. It’s adept at assessing priorities and determining which function to execute when. For instance, let’s say you have 500 Go routines, and 300 of them initiate a network request. These routines will be blocked until a response is received. Since they’re waiting, it’s inefficient for them to occupy CPU time. The other 200 Go routines, which aren’t waiting on a network response, could utilize that time instead.


Go’s scheduler recognizes that these 300 routines are awaiting a network response and, thus, it preemptively switches out the currently executing routine in the thread with another that can perform its task without delay. It won’t run the blocked routines until their respective blocking operations (like waiting for a network response) conclude. This approach ensures optimal utilization of CPU time. Pretty impressive, isn’t it?
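To make the crash course a bit more concrete, here is a minimal sketch of launching several Go routines and waiting for all of them. The function name squareAll is made up for this illustration, and sync.WaitGroup is just one common way to wait for Go routines to finish.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll computes the square of every input in its own goroutine
// and waits for all of them to finish before returning.
func squareAll(inputs []int) []int {
	results := make([]int, len(inputs))
	var wg sync.WaitGroup

	for i, n := range inputs {
		wg.Add(1)
		// Pass i and n as arguments so each goroutine works on its own copy.
		go func(i, n int) {
			defer wg.Done()
			results[i] = n * n // each goroutine writes to a distinct slot
		}(i, n)
	}

	wg.Wait() // block until every goroutine has called Done
	return results
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3, 4})) // prints [1 4 9 16]
}
```

Each Go routine writes to its own slot of the slice, so no lock is needed here. As soon as several Go routines touch the same value, synchronization becomes necessary.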


Designing a loadbalancer:



My approach to this is rather straightforward. When a request reaches the port on which the load balancer is listening, a new Go routine is spawned. This Go routine identifies the appropriate server to route to and dispatches the request to it. You can see a detailed flow of this process below:


architecture picture


There are many pitfalls to this approach:


  • Goroutine Creation on Each Request: For every incoming request, a new goroutine is spawned. While this isn’t as resource-intensive as creating a new thread, it still incurs overhead. Implementing a mechanism akin to a thread pool, but tailored for goroutines, could enhance performance. This approach would prevent the initialization of a new goroutine for every request.

  • Excessive Use of Mutexes: In several parts of the code, I’ve used Mutexes extensively to prevent data races. This has, in some instances, adversely impacted performance. Channels are often a cleaner way to coordinate goroutines, since they pass data between goroutines instead of having them touch shared state directly. Admittedly, this being my first Go project, I didn’t leverage channels as much as I could have. In retrospect, I recognize their efficacy in preventing data races and reducing the dependency on Mutexes.

  • Potential for Goroutine Starvation: Related to the previous point, there’s a risk of goroutine starvation. When a goroutine attempts to acquire a lock on a resource and finds it locked, the Go scheduler marks it as non-runnable. Subsequent goroutines attempting to lock the same resource experience the same fate. Once the resource is free, a newly arriving goroutine can grab the lock ahead of goroutines that have been waiting longer, so arrival order is not strictly preserved. If numerous goroutines are vying for the same lock, it’s conceivable that one or more could experience significant delays. (Since Go 1.9, sync.Mutex has a starvation mode that hands the lock directly to a waiter that has been blocked for more than about 1ms, which bounds the worst case, but heavy contention still hurts latency.)
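The goroutine pool idea from the first point can be sketched roughly like this. It is only an illustration, not code from GoLoadBalance: handleAll and the string jobs are hypothetical stand-ins for real request handling.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// handleAll processes every job with a fixed number of worker goroutines
// instead of spawning one goroutine per job. It returns how many jobs ran.
func handleAll(workers int, jobs []string) int {
	queue := make(chan string)
	var handled int64
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker keeps pulling jobs until the queue is closed.
			for range queue {
				atomic.AddInt64(&handled, 1) // stand-in for real work
			}
		}()
	}

	for _, job := range jobs {
		queue <- job
	}
	close(queue) // tell the workers no more jobs are coming
	wg.Wait()

	return int(handled)
}

func main() {
	jobs := []string{"/a", "/b", "/c", "/d", "/e"}
	fmt.Println(handleAll(2, jobs)) // prints 5
}
```

The number of goroutines stays fixed no matter how many jobs arrive, at the cost of jobs waiting in the queue while all workers are busy.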

Things I want to build into GoLoadBalance:



We won’t be talking about all of the features I have implemented but only key features that I found interesting.


  • Can take CLI arguments for config.
  • Health checks that ping each server to see whether it’s still alive.
  • Load balancing using round robin and circular/ring buffers.
  • Can read .yaml config files.
  • Basic caching by hashing parts of the request header.
  • Max retries before a server is removed from the server pool.
  • Add/remove servers at runtime.
  • Blacklisting IPs.
  • Webhook support for when a server dies.

Setup:



To start, we need a set of servers we can load balance to. This is a perfect use case for Docker and its powerful tool, docker compose. So, I just created a dummy Flask app:

from flask import Flask
import time

app = Flask(__name__)

@app.route("/ping")
def ping():
    return "pong"

The /ping route is primarily used for health checks by our load balancer. To start the server, we can use utilities like uwsgi or gunicorn. I’ve opted for uwsgi because I’m more familiar with it.


We’re using uwsgi instead of running the Flask app on its own for a specific reason: concurrency. Flask’s built-in development server is not meant to carry production-style load, and within a single Python process the Global Interpreter Lock (GIL) prevents more than one thread from executing Python bytecode at a given moment. To handle concurrent requests in parallel, we must spread them across separate processes. uwsgi manages this process splitting, along with the coordination and routing of incoming requests to the workers.

uwsgi --http :5000 --wsgi-file server.py --callable app -b 21573 --processes 5 --threads 5

My Dockerfile for this app looks like this, but you can write your own based on your needs and the framework you are using:

FROM python:slim-bullseye

RUN apt-get update && apt-get -y install build-essential
RUN pip install Flask uwsgi
WORKDIR /code/
COPY ./server.py .

CMD uwsgi --http :5000 --wsgi-file server.py --callable app -b 21573 --processes 5 --threads 5

Now that we have a Dockerfile for all of the apps, we can write a compose file to control containers and their config. My compose.yaml file looks like this:

services:
  flask-app:
    build: .
    ports:
      - "8000-8004:5000"

Nothing too special. It just gives the containers a range of host ports they can use. Then, to start the servers up:

docker compose up --scale flask-app=5

On the first execution, Docker will build the images. For subsequent runs, Docker will utilize its cache, ensuring a faster startup. If there are modifications to your Dockerfile, Docker recognizes the changes and invalidates the cache for the affected layer and all subsequent layers. However, even with these changes, Docker won’t automatically rebuild the images. You must use the --build flag with the docker compose command to ensure the images are rebuilt to incorporate any modifications.


Once set up, this configuration will spin up 5 server instances. Port forwarding is configured from ports 8000-8004 on the host machine to port 5000 within each container. Hence, any traffic directed to one of the ports 8000-8004 on the host will be forwarded to port 5000 inside the corresponding container.


To test the setup, you can use the curl command targeting any of the ports with the endpoint /ping on localhost. For instance, curl localhost:8000/ping should return a “pong”. With the setup complete, we can delve deeper into the core functionality.


Writing GoLoadBalance:



Before anything else, we should start up an HTTP server which can listen for incoming client requests. We can do this using the net/http package provided by Go’s standard library.

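import (
  "encoding/json"
  "log"
  "net/http"
)

...

  // HandleFunc (not Handle) accepts a plain function as the handler.
  http.HandleFunc(
    "/",
    func(w http.ResponseWriter, r *http.Request) {
      w.Header().Set("Content-Type", "application/json")
      // Marshal a map so the response is the JSON object {"hello":"world"}.
      encodedJson, err := json.Marshal(map[string]string{"hello": "world"})
      if err != nil {
        log.Println(err)
        w.WriteHeader(http.StatusInternalServerError)
        return
      }
      w.Write(encodedJson)
    },
  )

...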

Once you compile and run the binary, you should be able to receive client requests on the specified port. The next step involves connecting to the servers, which is straightforward. We can obtain the server URLs from the command-line interface (CLI). Fortunately, Go’s standard library can parse command-line flags without the need for any external packages!

import "flag"

...

  // flag.String returns a *string that is filled in once flag.Parse runs.
  serverUrls := flag.String("serverUrls", "", "URLs of the servers.")
  flag.Parse()

...

Now that we have the server URLs, we can start by defining a couple of structs. A loadbalancer is an entity which has multiple servers in its server pool. So, the loadbalancer’s struct could start off looking something like this:

type LoadBalancer struct {
  Servers []*Server
}

And, our server struct could look like this:

type Server struct {
  Url   *url.URL
  Alive bool
}

Once we receive the server URLs via the CLI arguments, we can start by parsing each URL. Once parsed, we create the corresponding Server objects. These objects are then passed by reference to a Loadbalancer object responsible for managing them. Using these structs we can start our first feature! The Health Checks.
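As a rough sketch of that parsing step: the code below assumes the flag value is a comma-separated list, which is my assumption and not something the post specifies. parseServerURLs is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// parseServerURLs splits a comma-separated list (an assumed format)
// and parses each entry into a *url.URL.
func parseServerURLs(raw string) ([]*url.URL, error) {
	var parsed []*url.URL
	for _, entry := range strings.Split(raw, ",") {
		entry = strings.TrimSpace(entry)
		if entry == "" {
			continue // ignore empty entries such as a trailing comma
		}
		u, err := url.Parse(entry)
		if err != nil {
			return nil, fmt.Errorf("parsing %q: %w", entry, err)
		}
		// Reject entries we could not forward requests to.
		if u.Scheme == "" || u.Host == "" {
			return nil, fmt.Errorf("server URL %q needs a scheme and a host", entry)
		}
		parsed = append(parsed, u)
	}
	return parsed, nil
}

func main() {
	urls, err := parseServerURLs("http://localhost:8000, http://localhost:8001")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, u := range urls {
		fmt.Println(u.Scheme, u.Host)
	}
}
```

Checking for a scheme and host up front means a typo such as a missing "http://" is reported at startup instead of surfacing later as failed requests.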


Health Checks:


All we need to do is create a new function like this for the Server struct:

func (server *Server) healthCheck(duration time.Duration) {
	for {
		// ping reports whether the server answered its health check.
		isAlive := server.ping()
		server.Alive = isAlive
		time.Sleep(duration)
	}
}

Now, we can pass in durations for how often we want to do the health checks. And, when we initialize a new server, we need to call it and run it inside a go routine (because we want this to be a background task):

func InitServer(url *url.URL) *Server {
	server := Server{
		Url: url,
	}

	go server.healthCheck(time.Second * 5)
	return &server
}
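The health check needs some way to decide whether a server is alive. That helper isn’t shown in this post, so here is a minimal sketch of what it could look like, assuming a GET to the /ping route from the setup section and a 200 response meaning "alive". The standalone ping function and demoPing are illustrative names, and the test server comes from net/http/httptest.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// ping reports whether GET <baseURL>/ping answers with 200 OK.
func ping(client *http.Client, baseURL string) bool {
	res, err := client.Get(baseURL + "/ping")
	if err != nil {
		return false // connection refused, timeout, and so on
	}
	defer res.Body.Close()
	return res.StatusCode == http.StatusOK
}

// demoPing pings a throwaway local server while it is up and after it is closed.
func demoPing() (whileUp, afterClose bool) {
	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("pong"))
	}))
	// A timeout keeps a hung server from blocking the health check forever.
	client := &http.Client{Timeout: 2 * time.Second}

	whileUp = ping(client, backend.URL)
	backend.Close()
	afterClose = ping(client, backend.URL)
	return whileUp, afterClose
}

func main() {
	whileUp, afterClose := demoPing()
	fmt.Println("alive while running:", whileUp)
	fmt.Println("alive after shutdown:", afterClose)
}
```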

Handing requests to the servers:


Now that we have a couple of things working, we can start passing requests to one of the servers and see if it responds correctly. In order to do that, we need to initialize the loadbalancer with a list of servers so it can register them in its server pool. The way I did it was like this (simplified):

func InitLoadBalancer(servers []*Server) *LoadBalancer {
	return &LoadBalancer{
		Servers:  servers,
	}
}

Now that our loadbalancer holds references to a list of servers, we can actually do some load balancing. The way I implemented this was using an interface. I defined an interface called Balancer, which represents a balancing algorithm, like this:

type Balancer interface {
	GetServer(servers []*Server) *Server
}

You pass in your pool of servers and you get back a pointer to the single server the balancer chooses to route the request to. Since this is only an interface, we need to implement a concrete type that satisfies it, so I started with the simplest one, Round Robin:

func InitRoundRobin(serverPoolSize int) Balancer {
	// Initialize ring buffer so we can cycle through
	// the servers in a round robin without needing to
	// keep an index.
	ringBuffer := initRingBuffer(serverPoolSize)

	return &RoundRobin{
		ringBuffer: ringBuffer,
		mutex:      &sync.Mutex{},
	}
}

func initRingBuffer(size int) *ring.Ring {
	ringBuffer := ring.New(size)

	// Initialize the value of each ring buffer element
	// to its index.
	for i := 0; i < size; i++ {
		ringBuffer.Value = i
		ringBuffer = ringBuffer.Next()
	}

	return ringBuffer
}

From the code block above, we can observe a couple of significant components: a ring buffer and a mutex. I’ll dive deeper into the mutex shortly, explaining its purpose in this context. The other notable element is the ring buffer.


In the round robin approach, our goal is to cycle through each server, ensuring that incoming requests are distributed evenly across all servers in the pool. This is where the ring buffer, sometimes referred to as a circular buffer, comes into play. But what exactly is a ring buffer? It’s essentially a linked list where the tail connects back to the head, allowing continuous traversal. There’s no need for intricate modulus operations, making it perfectly suited for our load-balancing algorithm. This setup allows us to implement a straightforward approach like:

	serverToUse := ringBuffer.Value
	ringBuffer = ringBuffer.Next()

This gets us the next server without worrying about hitting the end of the list. Now we can make this a part of the loadbalancer struct, so our loadbalancer struct looks like this now:

type LoadBalancer struct {
  Servers  []*Server
  balancer *Balancer
}
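To see the ring buffer cycling in isolation, here is a small standalone sketch built on container/ring from the standard library. newIndexRing mirrors the initRingBuffer function above, and nextIndices is a hypothetical helper added only for this demonstration.

```go
package main

import (
	"container/ring"
	"fmt"
)

// newIndexRing builds a ring whose values are 0..size-1.
func newIndexRing(size int) *ring.Ring {
	r := ring.New(size)
	for i := 0; i < size; i++ {
		r.Value = i
		r = r.Next()
	}
	return r // after size steps we are back at the element holding 0
}

// nextIndices walks the ring n times and records the index seen at each step.
func nextIndices(r *ring.Ring, n int) []int {
	picked := make([]int, 0, n)
	for i := 0; i < n; i++ {
		picked = append(picked, r.Value.(int))
		r = r.Next() // wraps around automatically at the end
	}
	return picked
}

func main() {
	fmt.Println(nextIndices(newIndexRing(3), 7)) // prints [0 1 2 0 1 2 0]
}
```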

Now let’s implement the GetServer function so that our RoundRobin struct satisfies the Balancer interface.

func (balancer *RoundRobin) GetServer(servers []*Server) *Server {
	// Round robin distributes incoming requests evenly across the servers:
	// read the index stored at the ring's current position, advance the ring,
	// and return the server at that index.

	// Accessing the ring buffer is not go routine safe so we need a mutex
	balancer.mutex.Lock()
	defer balancer.mutex.Unlock()

	currentIndex := balancer.ringBuffer.Value
	balancer.ringBuffer = balancer.ringBuffer.Next()
	return servers[currentIndex.(int)]
}

We have several critical components in play here. Let’s break them down.


Mutex Usage: The need for a mutex arises because of a potential race condition. When multiple go routines request a server simultaneously, they can be inside GetServer at the same time. If one go routine reads the current index and is paused before it advances the ring, a second go routine can read the same index. Both requests then go to the same server, and when each of them advances the ring, the position moves two steps, so the next server in line is skipped.


RWMutex Limitation: Since every call both reads and updates the ring position, an RWMutex (Read-Write Mutex) doesn’t help here, because there are no read-only callers to let through in parallel. We’re left to work with a standard mutex. While it’s effective at preventing data races, this solution does present performance setbacks. With the mutex in place, this segment of code runs in a single-threaded manner. Having deepened my understanding of Go, I’ve recognized that a buffered channel would be a suitable alternative for this scenario, allowing go routines to determine the appropriate server for request handling. I intend to incorporate this solution in the upcoming version of GoLoadBalance.


Round Robin Implementation: By designing our RoundRobin struct to adhere to the Balancer interface, it offers versatility. It can be utilized as a balancer in various contexts. Furthermore, any subsequent load-balancing algorithms we develop can be integrated seamlessly, provided they align with this interface. This modular approach enhances our system’s flexibility which is super nice!
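The buffered channel idea mentioned above could take roughly the following shape. This is only one possible sketch, not the planned GoLoadBalance implementation: startIndexFeed is a hypothetical name, and it uses a plain counter with a modulus instead of the ring buffer.

```go
package main

import "fmt"

// startIndexFeed launches one goroutine that owns the round-robin position
// and publishes indices 0..size-1, over and over, on the returned channel.
// Closing done stops that goroutine.
func startIndexFeed(size int, done <-chan struct{}) <-chan int {
	indices := make(chan int, size) // buffered so readers rarely have to wait
	go func() {
		defer close(indices)
		for i := 0; ; i = (i + 1) % size {
			select {
			case indices <- i:
			case <-done:
				return
			}
		}
	}()
	return indices
}

func main() {
	done := make(chan struct{})
	defer close(done)

	indices := startIndexFeed(3, done)
	for n := 0; n < 5; n++ {
		fmt.Println("route to server", <-indices)
	}
}
```

Only the feeding goroutine ever touches the position, so request handlers need no lock of their own; they just receive the next index from the channel.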


Now that we have a server, it’s very easy to forward our request to it. Just reuse the request from the client, update its URL to say where it is going, and then forward it. For that we need to add one more field to our Server struct, called client.

type Server struct {
  Url    *url.URL
  Alive  bool
  client *http.Client
}

This new field stores a reference to an http.Client object. Utilizing this object allows us to send requests to other hosts in the server pool. Why add it into the struct? This design choice prevents us from continuously re-initializing the http.Client object, offering a minor optimization by maintaining a persistent object. We can then employ this object to dispatch the request:

func (server *Server) HandleRequest(responseWriter http.ResponseWriter, request *http.Request) (*http.Response, *string, error) {
	// We don't need to keep the old host intact as that host is us
	// We remove the old host and reuse the same request by changing the host to the server.
	request.URL.Host = server.Url.Host
	request.URL.Scheme = server.Url.Scheme
	// RequestURI needs to be empty for this to be a client request.
	request.RequestURI = ""

	res, err := server.client.Do(request)

	if err != nil {
		log.Println("Error: ", err)
		responseWriter.WriteHeader(http.StatusInternalServerError)
		return nil, nil, err
	}

	// Copy the relevant headers and the status code before relaying the body.
	responseWriter.Header().Set("Content-Length", res.Header.Get("Content-Length"))
	responseWriter.Header().Set("Content-Type", res.Header.Get("Content-Type"))
	// The status code is sent with WriteHeader; a "Status" header would not change it.
	responseWriter.WriteHeader(res.StatusCode)
	// Body is a stream: every read from teeReader is also written to responseWriter,
	// so ReadAll forwards the body to the client and returns a copy for caching.
	teeReader := io.TeeReader(res.Body, responseWriter)
	body, err := io.ReadAll(teeReader)

	if err != nil {
		log.Println("Error while trying to copy body for caching")
	}

	res.Body.Close()
	stringifiedBody := string(body)
	return res, &stringifiedBody, nil
}

A lot is happening in this snippet:


  • Adjusting the Request URL: As previously mentioned, we point the request at the chosen server by replacing the host and scheme of its URL.

  • Altering the Request Object: We must clear the RequestURI field before sending the request. This field cannot be defined when sending a request using the http.Client object, so we set it to an empty string.

  • Sending the Request: We then send the request using the http.Client object referenced in our server struct.

  • Processing the Response: Once we receive a response, we set essential attributes like content length, response content type, and status so that we can forward it back to the client.

  • Forwarding the Response: We write the response body to the client. We use TeeReader instead of writing the response body directly. This is because we aim to preserve the body for caching purposes, a feature we’ll be implementing later in this blog.
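The TeeReader step is easier to follow in isolation. Below is a minimal sketch where a bytes.Buffer stands in for the http.ResponseWriter; relayAndKeep is a hypothetical helper name.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// relayAndKeep streams body to a stand-in client writer and also keeps a copy.
// It returns what the client received and what was kept for caching.
func relayAndKeep(body string) (forwarded, kept string, err error) {
	var client bytes.Buffer // stands in for the http.ResponseWriter
	source := strings.NewReader(body)

	// Every Read from tee also writes the same bytes to client.
	tee := io.TeeReader(source, &client)
	copied, err := io.ReadAll(tee)
	if err != nil {
		return "", "", err
	}
	return client.String(), string(copied), nil
}

func main() {
	forwarded, kept, err := relayAndKeep("pong")
	fmt.Println(forwarded, kept, err) // prints pong pong <nil>
}
```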


Now, we have a basic setup of a loadbalancer working! Exciting! You can set up logs when a client request comes in and see which server the request is going to like this:

func (server *Server) HandleRequest(responseWriter http.ResponseWriter, request *http.Request) (*http.Response, *string, error) {
	log.Printf("Forwarding Request to path %s using host %s", request.URL.Path, server.Url.Host)

Now all you need to do is glue it together by adding a method on the loadbalancer struct to handle incoming requests:

func (loadBalancer *LoadBalancer) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
	server := (*loadBalancer.balancer).GetServer(loadBalancer.Servers)
	res, body, err := server.HandleRequest(responseWriter, request)
	...
}

And, then also attach it to the http server running:

http.Handle(
    "/",
    loadBalancer,
)

Since our loadbalancer struct satisfies the interface for http.Handler as we implement ServeHTTP, we can just directly use the struct here which is super nice! Great! Now we have it working! You should be able to see the requests going to the hosts as you ping your loadbalancer with requests:

❯ go run -race . --config=./example.yaml
2023/09/09 21:03:11 Forwarding Request to path /hello_world using host localhost:8000
2023/09/09 21:03:12 Forwarding Request to path /hello_world using host localhost:8001
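If the http.Handler trick is new to you, here is a tiny standalone sketch of it. The greeter type is made up for this example; the point is only that any type with a ServeHTTP method can be handed to http.Handle. httptest.NewRecorder lets us call the handler without opening a port.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// greeter is a hypothetical type; implementing ServeHTTP is all it takes
// to satisfy http.Handler.
type greeter struct{ message string }

func (g *greeter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/plain")
	fmt.Fprint(w, g.message)
}

// Compile-time check that *greeter satisfies http.Handler.
var _ http.Handler = (*greeter)(nil)

// serveOnce sends one fake request through the handler and returns status and body.
func serveOnce(h http.Handler, path string) (int, string) {
	recorder := httptest.NewRecorder()
	h.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, path, nil))
	return recorder.Code, recorder.Body.String()
}

func main() {
	status, body := serveOnce(&greeter{message: "pong"}, "/ping")
	fmt.Println(status, body) // prints 200 pong
}
```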

With that setup we can implement a lot of features which is the fun part!


Writing fun features for Goloadbalance:



Caching:


Caching offers a powerful means to optimize the performance of our load balancer. At the heart of its significance is the resource-intensive nature of HTTP requests. Each time a request is made to a server, a sequence of events is triggered: establishing a TCP connection (including its handshake), serializing and sending the request, and absorbing any network latency. Subsequently, the receiving server decodes this data, crafts a response, and transmits it back.


Consider a simple scenario: a user requests a page from the server and, for whatever reason, refreshes the page multiple times within just 2 seconds. In many cases, the underlying data the page pulls from remains unchanged, yet with each refresh, the full spectrum of steps mentioned earlier gets triggered. It’s both resource-intensive and unnecessary.


By integrating a caching mechanism, we can address this inefficiency. Instead of repetitively sending requests to the servers in our pool for identical or near-identical data, we can retrieve the information from the cache. For my load balancer, I’ve implemented this by hashing specific segments of the request header, allowing for rapid and efficient data retrieval.


For this we will be using the sync package from Go’s standard library. It provides us with a very nice data structure, sync.Map. It is a hashmap, but not just any hashmap: it is safe to use from multiple go routines. Multiple go routines can update and read from the map without any locking on our side, because the map handles that internally. Hence, we don’t need to worry about data races on the map itself.
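Here is a small standalone sketch of that property: many go routines store into the same sync.Map without any explicit lock. storeConcurrently is a hypothetical helper written only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// storeConcurrently writes n keys from n goroutines into one sync.Map
// and returns how many entries are readable afterwards.
func storeConcurrently(n int) int {
	var values sync.Map
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			values.Store(fmt.Sprintf("key-%d", i), i) // no explicit lock needed
		}(i)
	}
	wg.Wait()

	found := 0
	values.Range(func(key, value any) bool {
		found++
		return true // keep iterating
	})
	return found
}

func main() {
	fmt.Println(storeConcurrently(100)) // prints 100
}
```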


To start, let’s define a Cache struct and an initializer function:

type Cache struct {
	values *sync.Map
	hasher *hash.Hash
}

func InitCache() *Cache {
	hasher := sha256.New()
	cache := Cache{
		values: &sync.Map{},
		hasher: &hasher,
	}

	return &cache
}

Additionally, we require utility functions to manage values within the cache. These functions will handle the saving, checking, and removal of cached responses.

func (cache *Cache) save(request *http.Request, body *string, response *http.Response, cacheTimeoutInSeconds int) {
	responseAndBody := make(map[string]any)
	responseAndBody["body"] = body
	responseAndBody["response"] = response
	hashedKey := cache.hash(request)
	cache.values.Store(hashedKey, &responseAndBody)
	go cache.keepHashedKeyAliveFor(hashedKey, time.Second*time.Duration(cacheTimeoutInSeconds))
}

func (cache *Cache) check(request *http.Request) *map[string]any {
	if val, ok := cache.values.Load(cache.hash(request)); ok {
		return val.(*map[string]any)
	}

	return nil
}


func (cache *Cache) keepHashedKeyAliveFor(hashKey string, duration time.Duration) {
	time.Sleep(duration)
	cache.values.Delete(hashKey)
}

Now, we need a way to hash some key parts of the request, so we generate a deterministic SHA-256 hash for a request using the Method + Path + Real IP + X-Forwarded-For + Auth:

// Basic Hash function to generate a hash based on
// Method + Path + Real IP + X-Forwarded-For + Auth
func (cache *Cache) hash(request *http.Request) string {
	auth := request.Header.Get("Authorization")
	path := request.URL.Path
	method := request.Method
	realIp := request.Header.Get("X-Real-Ip")
	xForwardedFor := request.Header.Get("X-Forwarded-For")

	// The request body is deliberately left unread: consuming it here would
	// leave nothing to forward to the backend server.
	// sha256.Sum256 hashes in one call, so go routines never share hasher state.
	sum := sha256.Sum256([]byte(method + path + realIp + xForwardedFor + auth))
	return hex.EncodeToString(sum[:])
}

Now, let’s attach this struct to our loadbalancer, it should look like this now:

type LoadBalancer struct {
	Servers               []*Server
	balancer              *Balancer
	cache                 *Cache
	cacheTimeoutInSeconds int
}

We define two new fields, cache to hold a reference to a Cache object and cache timeout which the user should be able to pass in the config or through the CLI.


Now, on every request to one of the servers, we want to save the body of the response in our loadbalancer. Remember that when we implemented the HandleRequest function for the server, we used a TeeReader, so the body was both written to the client’s response and returned to us as a copy. That copy is what we use for caching. So we need to update ServeHTTP to save the response in the cache.

func (loadBalancer *LoadBalancer) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
...
	// Handle caching on a different goroutine so that we can return the response
	go loadBalancer.cache.save(request, body, res, loadBalancer.cacheTimeoutInSeconds)
...
}

Now that we have that, on each request we want to start checking the hashmap for cached results. And at any point of time if a result is in the hashmap, it means that it hasn’t expired.

func (loadBalancer *LoadBalancer) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
...
	if cachedMap := loadBalancer.cache.check(request); cachedMap != nil {
		cachedResponse := (*cachedMap)["response"].(*http.Response)
		cachedBody := (*cachedMap)["body"].(*string)

		if cachedResponse != nil {
			log.Printf("Cache Hit: Found for route %s", request.URL.Path)

			// Reset the Headers properly before relaying the response back
			responseWriter.Header().Set("Content-Length", cachedResponse.Header.Get("Content-Length"))
			responseWriter.Header().Set("Content-Type", cachedResponse.Header.Get("Content-Type"))
			// WriteHeader sends the cached status code; a "Status" header would not.
			responseWriter.WriteHeader(cachedResponse.StatusCode)
			io.WriteString(responseWriter, *cachedBody)
			cachedResponse.Body.Close()
			return
		}
	}
...
}

And there we go! We have caching implemented for our loadbalancer now!
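For comparison, the expiry part can also be written with time.AfterFunc instead of a go routine that sleeps. The sketch below is a stripped-down illustration and not the GoLoadBalance cache: ttlCache, put, get and expiryDemo are hypothetical names, and values are plain strings.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// ttlCache is a hypothetical, stripped-down cache with per-entry expiry.
type ttlCache struct{ values sync.Map }

// put stores value under key and schedules its removal after ttl.
func (c *ttlCache) put(key, value string, ttl time.Duration) {
	c.values.Store(key, value)
	time.AfterFunc(ttl, func() { c.values.Delete(key) })
}

// get returns the cached value if it has not expired yet.
func (c *ttlCache) get(key string) (string, bool) {
	v, ok := c.values.Load(key)
	if !ok {
		return "", false
	}
	return v.(string), true
}

// expiryDemo stores one entry and checks it right away and after its TTL.
func expiryDemo() (hitBefore, hitAfter bool) {
	cache := &ttlCache{}
	cache.put("GET /ping", "pong", 50*time.Millisecond)

	_, hitBefore = cache.get("GET /ping")
	time.Sleep(300 * time.Millisecond) // comfortably longer than the TTL
	_, hitAfter = cache.get("GET /ping")
	return hitBefore, hitAfter
}

func main() {
	before, after := expiryDemo()
	fmt.Println("hit before expiry:", before)
	fmt.Println("hit after expiry:", after)
}
```

One caveat applies to both approaches: if the same key is stored again before its first timer fires, the older timer will still delete the newer entry.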


Blacklisting IPs:


There might be malicious IPs that we don’t want to serve requests to. Hence, we add this feature so that the user can specify them, and requests coming from such IPs won’t be served. This feature is pretty easy.

Firstly, we should start storing our config on our loadbalancer struct, so that we can check the values passed by the developer to GoLoadBalance. Parsing and creating a config struct should be trivial, so I won’t be going over it here.

The updated loadbalancer struct should look like this:

type LoadBalancer struct {
	Servers               []*Server
	balancer              *Balancer
	config                *configuration.Config
	cache                 *Cache
	cacheTimeoutInSeconds int
}

On receiving the client request, we first check whether the client’s IP is in the list of IPs that the developer marked as blacklisted in their config, like this:

func (loadBalancer *LoadBalancer) ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) {
...
	// Don't process request from blocked ips
	// SplitHostPort also handles IPv6 addresses such as "[::1]:8080".
	host, _, err := net.SplitHostPort(request.RemoteAddr)
	if err != nil {
		host = request.RemoteAddr
	}
	clientIp := net.ParseIP(host)
	for _, ip := range loadBalancer.config.BlacklistedIps {
		if ip.Equal(clientIp) {
			responseWriter.WriteHeader(http.StatusUnauthorized)
			responseWriter.Write([]byte("Unauthorized request."))
			return
		}
	}
...
}

And voila! The blacklisted IPs don’t get their requests served now! This is an easy but nice win!
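The check itself can be pulled into a small function that is easy to test on its own. This is a sketch under a few assumptions: isBlocked is a hypothetical helper, the blocked list is given as strings for convenience, and an address that cannot be parsed is treated as not blocked.

```go
package main

import (
	"fmt"
	"net"
)

// isBlocked reports whether the host part of remoteAddr ("ip:port")
// matches one of the blocked IP strings.
func isBlocked(remoteAddr string, blocked []string) bool {
	host, _, err := net.SplitHostPort(remoteAddr)
	if err != nil {
		return false // not in host:port form, nothing to compare
	}
	clientIP := net.ParseIP(host)
	if clientIP == nil {
		return false
	}
	for _, entry := range blocked {
		// Equal treats an IPv4 address and its IPv6-mapped form as the same.
		if ip := net.ParseIP(entry); ip != nil && ip.Equal(clientIP) {
			return true
		}
	}
	return false
}

func main() {
	blocked := []string{"10.0.0.7", "::1"}
	fmt.Println(isBlocked("10.0.0.7:51234", blocked)) // prints true
	fmt.Println(isBlocked("[::1]:51234", blocked))    // prints true
	fmt.Println(isBlocked("10.0.0.8:51234", blocked)) // prints false
}
```

Whether an unparseable address should be let through or rejected is a policy choice; a stricter setup might prefer to reject it.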


Retries, webhooks, adding/removing server from the server pool and runtime metrics:


Retries in this context mean the retries for health checks. When a health check has been failing for a while we want to do 3 things:


  • Keep track of how many retries we have made to the same server.

  • Send a webhook event to the developer team’s server to notify that one of the servers in the server pool has died.

  • Remove the server. (In addition to this, we also will implement adding/removing a server at runtime.)

To keep track of the number of retries for a health check we have done for a server we can update the Server struct like this:

type Server struct {
	Url               *url.URL
	Alive             bool
	client            *http.Client
	isDeadChannel     *(chan bool)
	retryCount        int32
	ActiveConnections int32
	mutex             *sync.RWMutex
	config            *configuration.Config
}

We add 5 new fields: retryCount, ActiveConnections, mutex, isDeadChannel and config. retryCount keeps track of how many consecutive pings have failed. ActiveConnections is the number of connections currently open to the server. config is the developer config we got through a config file or the CLI. mutex is a read-write mutex we will use to avoid data races. isDeadChannel is a channel used to notify the receiver that a server has died.

Then, on each health check fail we can increase the retryCount by 1. Once it hits a certain limit defined by the developer config, it will be removed from the server pool. So we first update our healthCheck function:

func (server *Server) healthCheck(duration time.Duration, maxRetryCount int) {
	for {
		if int(server.retryCount) == maxRetryCount {
			log.Printf("Removing server at host: %s after %d retries", server.Url.Host, maxRetryCount)

			close(*server.isDeadChannel)

			return
		}
		isAlive := server.ping()
		server.mutex.Lock()
		server.Alive = isAlive
		server.mutex.Unlock()
		if !isAlive {
			atomic.AddInt32(&server.retryCount, 1)
		} else {
			atomic.StoreInt32(&server.retryCount, 0)
		}
		time.Sleep(duration)
	}
}

There are a couple of things happening here:


  • Retry Check: We first check whether we’ve reached the maximum number of retries. If we have, we close a channel to notify that the server is dead and unresponsive.
  • Concurrency Control with Mutex: We use a mutex, specifically an RWMutex, to guard the server.Alive field. This ensures that there are no race conditions. We use an RWMutex here because we read server.Alive often but only change it once in a while.
  • Atomic Operations with Int32: We employ an int32 data type to store our retryCount and ActiveConnections. We increment and set these values using the standard library’s sync/atomic module. This allows us to atomically set or increment the value of a memory address that stores an int32. When we mention updating a value stored in memory atomically, it means another thread can’t see its intermediary value. It gives the illusion that the operation is executed in a single CPU instruction. However, it’s essential to note that while some atomic operations might seem to be executed in a single step, they can, in reality, be compound atomic operations that consist of multiple instructions.
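The atomic counter from the last point can be tried out on its own. In this sketch, recordFailures is a hypothetical helper that simulates many go routines reporting a failed ping at the same time.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// recordFailures increments one shared int32 from n goroutines
// and returns the final value.
func recordFailures(n int) int32 {
	var retryCount int32
	var wg sync.WaitGroup

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// AddInt32 performs the read-modify-write as one indivisible step.
			atomic.AddInt32(&retryCount, 1)
		}()
	}
	wg.Wait()

	return atomic.LoadInt32(&retryCount)
}

func main() {
	fmt.Println(recordFailures(1000)) // prints 1000
}
```

With a plain retryCount++ instead of atomic.AddInt32, some increments could be lost and the race detector (go run -race) would flag the access.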

With the server struct now prepared for removal, our next step is to update our load balancer to stop sending request to this server and remove it from the server pool. The load balancer should listen to the isDeadChannel and then gracefully shut down the non-responsive server.

type LoadBalancer struct {
	Servers               []*Server
	balancer              *Balancer
	config                *configuration.Config
	cache                 *Cache
	cacheTimeoutInSeconds int
	mutex                 *sync.RWMutex
}

func InitLoadBalancer(servers []*Server, balancer *Balancer, config *configuration.Config) *LoadBalancer {
	loadbalancer := &LoadBalancer{
		Servers:  servers,
		balancer: balancer,
		mutex:    &sync.RWMutex{},
		config:   config,
	}

	...

	for _, server := range servers {
		// Listen for dead servers on a different go routine
		go loadbalancer.listenForDeadServer(server)
	}

	return loadbalancer
}

func (loadBalancer *LoadBalancer) listenForDeadServer(server *Server) {
	// Block until the health check closes the channel
	<-*server.isDeadChannel
	loadBalancer.gracefullyShutdownServer(server)
	// Once the server is shut down we release the go routine
}
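A quick aside on why the health check closes the channel instead of sending a value on it: a receive on a closed channel returns immediately, and it does so for every goroutine waiting on it. The sketch below shows that behavior in isolation; waitForClose and its variables are hypothetical names used only for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// waitForClose starts the given number of goroutines that all block on the
// same channel, closes the channel once, and returns how many of them woke up.
func waitForClose(listeners int) int {
	dead := make(chan struct{})
	var wg sync.WaitGroup
	var mu sync.Mutex
	woken := 0

	for i := 0; i < listeners; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-dead // blocks until the channel is closed
			mu.Lock()
			woken++
			mu.Unlock()
		}()
	}

	close(dead) // a single close releases every receiver
	wg.Wait()
	return woken
}

func main() {
	fmt.Println("listeners released:", waitForClose(3))
}
```

In the load balancer there is only one listener per server, but the same signal would still work if more goroutines needed to react to a dead server later on.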

Additionally, we need some changes to the Balancer interface before we can actually remove the server from the server pool.

type Balancer interface {
	GetServer(servers []*Server) *Server
	ServerDead()
	ServerAdd()
}

I have added the event for ServerDead and ServerAdd because we are going to be implementing both removing and adding servers at runtime. But, right now we only need ServerDead.

We also need to update our RoundRobin struct to handle these events.

func (balancer *RoundRobin) ServerDead() {
	balancer.mutex.Lock()
	defer balancer.mutex.Unlock()

	// One server died, so the ring buffer needs to shrink by one.
	balancer.ringBuffer = initRingBuffer(balancer.ringBuffer.Len() - 1)
}

func (balancer *RoundRobin) ServerAdd() {
	balancer.mutex.Lock()
	defer balancer.mutex.Unlock()

	// One server was added, so the ring buffer needs to grow by one.
	balancer.ringBuffer = initRingBuffer(balancer.ringBuffer.Len() + 1)
}

In each case, we just initialize a new ring buffer with the updated number of nodes.
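The resize can be sketched with the standard library’s container/ring package. This is an assumption on my part for illustration: newIndexRing is a hypothetical stand-in for initRingBuffer, and storing the indexes 0..n-1 in the nodes is just one way such a buffer could drive round robin.

```go
package main

import (
	"container/ring"
	"fmt"
)

// newIndexRing is a hypothetical stand-in for initRingBuffer: it builds a
// ring of n nodes whose values are the indexes 0..n-1.
func newIndexRing(n int) *ring.Ring {
	r := ring.New(n)
	for i := 0; i < n; i++ {
		r.Value = i
		r = r.Next()
	}
	return r
}

// nextIndexes walks the ring and returns the next count indexes in order.
func nextIndexes(r *ring.Ring, count int) []int {
	out := make([]int, 0, count)
	for i := 0; i < count; i++ {
		out = append(out, r.Value.(int))
		r = r.Next()
	}
	return out
}

func main() {
	r := newIndexRing(3)
	fmt.Println(nextIndexes(r, 5)) // [0 1 2 0 1]

	// One server died: rebuild the ring with one node fewer.
	r = newIndexRing(r.Len() - 1)
	fmt.Println(nextIndexes(r, 5)) // [0 1 0 1 0]
}
```

Rebuilding the ring resets the rotation to the first index, which is a simple trade-off that is acceptable when servers are added or removed rarely.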

Then we finally implement gracefully shutting down the server.

func (loadBalancer *LoadBalancer) gracefullyShutdownServer(deadServer *Server) {
	// First, we let our balancing algorithm know that a server has died and it should
	// clean up.
	(*loadBalancer.balancer).ServerDead()

	loadBalancer.mutex.Lock()
	// Second, We take it out of the server pool. For this we need to lock the resource.
	// This shouldn't affect performance as I would assume this shouldn't happen often
	for idx, server := range loadBalancer.Servers {
		// Check if mem addresses match
		if server == deadServer {
			// If the only active server dies, we quit as well.
			if len(loadBalancer.Servers) == 1 {
				log.Fatal("Died because no other servers alive.")
			}

			// Replace the current index with the mem address for the last server
			loadBalancer.Servers[idx] = loadBalancer.Servers[len(loadBalancer.Servers)-1]
			// Remove the copied last server mem address from the server pool
			loadBalancer.Servers = loadBalancer.Servers[:len(loadBalancer.Servers)-1]
			// Stop iterating now that the slice has been modified
			break
		}
	}

	// We want to unlock this right away because if we defer it, the lock would still be held during
	// the time.Sleep below.
	loadBalancer.mutex.Unlock()

	// Once taken out of the server pool, we make sure that all its active connections
	// finish before we clear out the resource and since this is already running on another
	// go routine we can just poll and wait for the active connections to go down to 0.
	for atomic.LoadInt32(&deadServer.ActiveConnections) != 0 {
		time.Sleep(time.Second)
	}

	// Send the webhook event and let this go routine go
	statuses, err := json.Marshal(loadBalancer.GetServersStatus())

	if err != nil {
		log.Printf("Error while getting server statuses while sending webhook event to notify a server died: %v", err)
		return
	}

	resp, err := http.Post(loadBalancer.config.OnServerDeadWebhook.String(), "application/json", bytes.NewReader(statuses))

	if err != nil {
		log.Printf("Error while sending a http request to server because a server died: %v", err)
	} else {
		// Close the body so the underlying connection can be reused
		resp.Body.Close()
		log.Printf("Sent webhook event to %s because a server died.", loadBalancer.config.OnServerDeadWebhook)
	}

	// Now the server should go out of scope and cleaned up by the GC.
}

Okay, this is a massive function, so we need to unravel it step by step:


  • We notify our balancing algorithm that a server has died and ask it to clean up its resources. Since our load balancer is agnostic to the specific balancing algorithm used, we don’t pass in the server that died. This might change in the future with the introduction of new balancing algorithms. So, for now, we just send that event to our balancing algorithm.

  • Next, we lock the resources inside the load balancer using the mutex we added. This is an RWMutex because we read a lot but only update the resource every once in a while.

  • We then locate the server we want to remove from the server pool array, remove it, and then release the lock on the load balancer. If it is the only server left, we kill the load balancer as well, as there are no servers we can send requests to.

  • However, doing that isn’t enough. The server might still have active connections. We don’t want to abruptly terminate them: the server might not be accepting new connections, but the older ones could still be functional. Hence, we wait for the number of active connections to reduce to 0. Only then can we actually remove the server. So, we continuously poll and sleep until the server’s active connections drop to 0.

  • Afterward, we send a webhook event with server information.

  • Finally, the server goes out of scope and is cleaned up by the Garbage collector.

I had so much fun writing this function, as it required considerable thought and proved to be a rewarding experience! Now, let’s discuss adding and removing servers at runtime. We can reuse the existing function to remove a server at runtime, but we’ll need to develop a new one to add a server to the pool.


The rationale behind this feature is that if you’re aware of a host’s failure and wish to eliminate it from the server pool, or if you aim to make updates to the server without interrupting client requests, you can simply remove that server. On the other hand, the reasoning for adding is twofold: either there’s an influx in client requests necessitating an additional server or, if a host fails, you’d want to replace it.

func (loadBalancer *LoadBalancer) AddServer(host string) error {
	parsedHostUrl, err := url.Parse(host)

	if err != nil {
		log.Printf("Can't parse host url %s", host)
		return err
	}

	_, err = loadBalancer.FindServer(parsedHostUrl)

	if err == nil {
		return &ServerExists{}
	}

	server := InitServer(parsedHostUrl, loadBalancer.config)

	loadBalancer.mutex.Lock()

	loadBalancer.Servers = append(loadBalancer.Servers, server)

	loadBalancer.mutex.Unlock()

	// Make sure we let our balancer know that we have a new server
	(*loadBalancer.balancer).ServerAdd()

	return nil
}

Adding a server is still a multi-step process:


  • First, we parse and make sure the URL the client passed us is a valid URL. If not, we error and return.

  • After that, we must check that it isn’t already in our server pool. If it is, we error again.

  • Next, we lock our resources in the load balancer and update the server pool. Once that is done, we notify our balancing algorithm that there is a new server.


Now, we just need to make sure that the endpoint for this is available in the load balancer’s HTTP Server:

	// Utility Function to add a server at runtime
	http.HandleFunc(
		"/goloadbalance/add_server",
		func(w http.ResponseWriter, r *http.Request) {
			...

			err = loadBalancer.AddServer(body.Host)

			if err != nil {
				w.WriteHeader(http.StatusBadRequest)
				w.Write([]byte(err.Error()))
				return
			}

			log.Printf("Added %s to server pool", body.Host)
		},
	)

	// Utility Handler to remove a server at runtime
	http.HandleFunc(
		"/goloadbalance/remove_server",
		func(w http.ResponseWriter, r *http.Request) {
			...

			parsedHostUrl, err := url.Parse(body.Host)

			if err != nil {
				log.Printf("Can't parse host url %s", body.Host)
				w.WriteHeader(http.StatusBadRequest)
				w.Write([]byte(err.Error()))
				return
			}

			server, err := loadBalancer.FindServer(parsedHostUrl)

			if err != nil {
				log.Print(err)
				w.WriteHeader(http.StatusBadRequest)
				w.Write([]byte(err.Error()))
				return
			}

			log.Printf("Removing %s from the server pool", body.Host)
			loadBalancer.gracefullyShutdownServer(server)
		},
	)

The structure of the request body for adding and removing servers can be tailored as the user sees fit, and I have abstracted that aspect. With this, we should have an endpoint available in the /goloadbalance path to add and remove a server!


Performance against Nginx:



I guess we all know that we are going to fail brutally against a battle-tested load balancer like Nginx. But it’s still fun to test the performance against something real and used in production. For this, I will be using a nice CLI tool called ab, which is an Apache tool for benchmarking HTTP servers. In our case, the HTTP servers are going to be our load balancers. Goloadbalance is already set up with the configuration, but we need to set up Nginx.

To start, we need the nginx image:

docker pull nginx:latest

Just start it up using:

docker run -p 8000:8000 -it --entrypoint bash nginx:latest

Please don’t forget to add the nginx container to the same docker network as your composed containers. Without it, the nginx load balancer won’t be able to find the container hosts, because docker compose creates a new network by default. First, find the network for your docker compose containers using:

docker network ls

And its output should look like this:

NETWORK_ID     NAME                         DRIVER    SCOPE
1b5f83fe9ec6   bridge                       bridge    local
75ae66f53850   flask-apps_default           bridge    local

In my case the network for the composed containers is the flask-apps_default network. You can inspect the network using docker network inspect ${network_id} and you should be able to see which hosts are in the network.

[
	{
		...
		"Containers": {
			"1e9f166739b394377fbcbea0b1fe6f6cffa13eac412067e69cfedb157bafccec": {
				"Name": "flask-apps-flask-app-1",
				"EndpointID": "451f159fb8eca0d6d6c22a8bd2f6092249859c92be348f679a6ee4cf44012859",
				"MacAddress": "02:42:ac:15:00:05",
				"IPv4Address": "172.21.0.5/16",
				"IPv6Address": ""
			},
			"636f384de8c59a32874336f751cfbbef490d82ab336d5a9f272432f430d29830": {
				"Name": "flask-apps-flask-app-5",
				"EndpointID": "f78f87cd84e0c85dfc052ab9549432993ef1be5c042136a291f5fccddd8920e7",
				"MacAddress": "02:42:ac:15:00:02",
				"IPv4Address": "172.21.0.2/16",
				"IPv6Address": ""
			},
			"9f4ce4f4f39fb8f9a89c2a0ec07e5148a0d8d0d495cd2c7e395a16507dcd8ff6": {
				"Name": "flask-apps-flask-app-2",
				"EndpointID": "bf3fcb59033ecc0445fa1082e6b1f849a38e57b19fadba520208007731454a81",
				"MacAddress": "02:42:ac:15:00:04",
				"IPv4Address": "172.21.0.4/16",
				"IPv6Address": ""
			},
			"d5e30eb97dbebc6c7561afbe3946041e3a580bfd87752753c6451f0a1f1355a7": {
				"Name": "flask-apps-flask-app-3",
				"EndpointID": "f230d22c74856caf28a1177201c22f074f3fdbd5e5e8f76efd83609c89fbf4cc",
				"MacAddress": "02:42:ac:15:00:03",
				"IPv4Address": "172.21.0.3/16",
				"IPv6Address": ""
			}
		},
		...
	}
]

In our case, we can see all the flask app containers running inside the network flask-apps_default. So we need to edit our run command to add our nginx container to that network:

docker run -p 8000:8000 -it --network=flask-apps_default --entrypoint bash nginx:latest

Now when you run docker network inspect ${network_id} again, you should see your container in there:

"Containers": {
	"1e9f166739b394377fbcbea0b1fe6f6cffa13eac412067e69cfedb157bafccec": {
		"Name": "flask-apps-flask-app-1",
		"EndpointID": "451f159fb8eca0d6d6c22a8bd2f6092249859c92be348f679a6ee4cf44012859",
		"MacAddress": "02:42:ac:15:00:05",
		"IPv4Address": "172.21.0.5/16",
		"IPv6Address": ""
	},
	"636f384de8c59a32874336f751cfbbef490d82ab336d5a9f272432f430d29830": {
		"Name": "flask-apps-flask-app-5",
		"EndpointID": "f78f87cd84e0c85dfc052ab9549432993ef1be5c042136a291f5fccddd8920e7",
		"MacAddress": "02:42:ac:15:00:02",
		"IPv4Address": "172.21.0.2/16",
		"IPv6Address": ""
	},
	"9f4ce4f4f39fb8f9a89c2a0ec07e5148a0d8d0d495cd2c7e395a16507dcd8ff6": {
		"Name": "flask-apps-flask-app-2",
		"EndpointID": "bf3fcb59033ecc0445fa1082e6b1f849a38e57b19fadba520208007731454a81",
		"MacAddress": "02:42:ac:15:00:04",
		"IPv4Address": "172.21.0.4/16",
		"IPv6Address": ""
	},
	"d5e30eb97dbebc6c7561afbe3946041e3a580bfd87752753c6451f0a1f1355a7": {
		"Name": "flask-apps-flask-app-3",
		"EndpointID": "f230d22c74856caf28a1177201c22f074f3fdbd5e5e8f76efd83609c89fbf4cc",
		"MacAddress": "02:42:ac:15:00:03",
		"IPv4Address": "172.21.0.3/16",
		"IPv6Address": ""
	},
	"dee5422db086e8a0061a0c5af7b7f3a82a2209c32295230c52b58fad9c12f6b1": {
		"Name": "jolly_pare",
		"EndpointID": "dedc4ef02516d43cb6f292a10eac0b0f1392d4e4d6b8b012954a8ed71b0ace08",
		"MacAddress": "02:42:ac:15:00:06",
		"IPv4Address": "172.21.0.6/16",
		"IPv6Address": ""
	}
}

Another way to test this is to ping another container in the same network from your newly added container.

ping 172.21.0.3

If you can’t find ping, you can install it with apt update && apt install -y iputils-ping. The output should look like this:

root@dee5422db086:/# ping 172.21.0.3
PING 172.21.0.3 (172.21.0.3) 56(84) bytes of data.
64 bytes from 172.21.0.3: icmp_seq=1 ttl=64 time=0.076 ms
64 bytes from 172.21.0.3: icmp_seq=2 ttl=64 time=0.054 ms
64 bytes from 172.21.0.3: icmp_seq=3 ttl=64 time=0.054 ms
64 bytes from 172.21.0.3: icmp_seq=4 ttl=64 time=0.061 ms
64 bytes from 172.21.0.3: icmp_seq=5 ttl=64 time=0.063 ms
64 bytes from 172.21.0.3: icmp_seq=6 ttl=64 time=0.052 ms
64 bytes from 172.21.0.3: icmp_seq=7 ttl=64 time=0.056 ms
64 bytes from 172.21.0.3: icmp_seq=8 ttl=64 time=0.051 ms
64 bytes from 172.21.0.3: icmp_seq=9 ttl=64 time=0.062 ms
64 bytes from 172.21.0.3: icmp_seq=10 ttl=64 time=0.053 ms
64 bytes from 172.21.0.3: icmp_seq=11 ttl=64 time=0.053 ms
^C
--- 172.21.0.3 ping statistics ---
11 packets transmitted, 11 received, 0% packet loss, time 10244ms
rtt min/avg/max/mdev = 0.051/0.057/0.076/0.007 ms

After all the setup and the final docker run, you should have a bash shell starting up. We do it this way instead of creating a Dockerfile because we are basically writing the config inside the container. You get an interactive shell session inside the container because of the -it flag and because the entrypoint is set to bash. We have access to our required hosts because of the network setup we did before this. Now, we just need to write the config.

Navigate to /etc/nginx. You should find some config files there. The one we care about is nginx.conf. You can start editing it using vim nginx.conf. If your container doesn’t have vim installed, run apt update && apt install -y vim. This first updates your local package index so that apt can find the vim package, and then installs it. After this, all we need to do is edit the nginx.conf file like this:

user  nginx;
worker_processes  auto;

error_log  /var/log/nginx/error.log notice;
pid        /var/run/nginx.pid;


events {
    worker_connections  1024;
}


http {
    include       /etc/nginx/mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

    access_log  /var/log/nginx/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    keepalive_timeout  65;

    #gzip  on;

    upstream backend_hosts {
		server flask-apps-flask-app-1:5000;
		server flask-apps-flask-app-2:5000;
		server flask-apps-flask-app-3:5000;
		server flask-apps-flask-app-5:5000;
    }

    server {
		listen 8000;

		location / {
			proxy_pass http://backend_hosts;
		}
    }
}

Great! Now nginx is fully set up. By default, it uses round robin as the load balancing algorithm. Just change the server hostnames to match your containers’ hostnames and you are all set to go! To start it, run nginx in your shell, and it should start up as expected.

root@dee5422db086:/etc/nginx# nginx
2023/09/09 02:19:49 [notice] 320#320: using the "epoll" event method
2023/09/09 02:19:49 [notice] 320#320: nginx/1.25.2
2023/09/09 02:19:49 [notice] 320#320: built by gcc 12.2.0 (Debian 12.2.0-14)
2023/09/09 02:19:49 [notice] 320#320: OS: Linux 5.19.0-46-generic
2023/09/09 02:19:49 [notice] 320#320: getrlimit(RLIMIT_NOFILE): 1048576:1048576
2023/09/09 02:19:49 [notice] 321#321: start worker processes
2023/09/09 02:19:49 [notice] 321#321: start worker process 322
2023/09/09 02:19:49 [notice] 321#321: start worker process 323
root@dee5422db086:/etc/nginx# 2023/09/09 02:19:49 [notice] 321#321: start worker process 324
2023/09/09 02:19:49 [notice] 321#321: start worker process 325
2023/09/09 02:19:49 [notice] 321#321: start worker process 326
2023/09/09 02:19:49 [notice] 321#321: start worker process 327
2023/09/09 02:19:49 [notice] 321#321: start worker process 328
2023/09/09 02:19:49 [notice] 321#321: start worker process 329
2023/09/09 02:19:49 [notice] 321#321: start worker process 330
2023/09/09 02:19:49 [notice] 321#321: start worker process 331
2023/09/09 02:19:49 [notice] 321#321: start worker process 332
2023/09/09 02:19:49 [notice] 321#321: start worker process 333

After this is done, send a request to your nginx server and you should get a response!

Now for the actual performance testing. We are going to test using ab and check how many requests we can serve each time. Caching is disabled for both balancers. The basic command structure is this:

ab -c ${number_of_concurrent_requests_we_want_to_send_at_one_time} -n ${number_of_requests_we_want_to_send} localhost:8000/hello_world

Requests per second:

Config                  Nginx    Goloadbalance
c = 10, n = 1000        11639    7773
c = 100, n = 10000      16284    10680
c = 1000, n = 100000    14365    10581
c = 5000, n = 100000    10235    9427
c = 7500, n = 100000    10274    8755
c = 9500, n = 100000    11567    8519

Time per request in ms (mean across all concurrent requests):

Config                  Nginx    Goloadbalance
c = 10, n = 1000        0.88     0.125
c = 100, n = 10000      0.062    0.102
c = 1000, n = 100000    0.067    0.097
c = 5000, n = 100000    0.096    0.109
c = 7500, n = 100000    0.095    0.117
c = 9500, n = 100000    0.098    0.123
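
As a side note on reading these two metrics: within a single ab run, the mean time per request across all concurrent requests is the total time divided by the number of completed requests, so it is simply 1000 divided by the requests per second. The small sketch below shows that conversion; the function name and the input value are made up for illustration and are not measured results.

```go
package main

import "fmt"

// meanTimePerRequestMs converts a requests-per-second figure into the mean
// time per request in milliseconds, across all concurrent requests.
func meanTimePerRequestMs(requestsPerSecond float64) float64 {
	return 1000.0 / requestsPerSecond
}

func main() {
	// 10000 is an illustrative round number, not a measured result.
	fmt.Printf("%.3f ms\n", meanTimePerRequestMs(10000)) // 0.100 ms
}
```

So a higher requests-per-second number and a lower time per request are two views of the same measurement.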

We can see that Goloadbalance is consistently slower than Nginx, which makes sense. Additionally, as the number of concurrent connections increases, Goloadbalance starts to bottleneck on the number of requests it can handle, whereas Nginx sustains its throughput even with 9500 concurrent requests at a single time.


I believe Nginx achieves this kind of performance by managing a thread pool and worker processes. We do implement the same asynchronous, event-driven flow as Nginx, but we are bottlenecked by how fast we can spawn Goroutines. Each time, we allocate some memory to create a stack for the Goroutine. While that is cheap, in the bigger picture, when we get hit with a lot of requests, we are spawning 8500 Goroutines. Instead, we should buffer these requests for a Goroutine pool. Even if no Goroutine can handle a request right away, the request waits on a buffered channel until a Goroutine is ready to take it off the channel. This should increase throughput, and we would then only be bottlenecked by how fast our Goroutines run. There is a further optimization we can squeeze out of this process as well: as soon as one of our Goroutines has sent a request, we give it back to the pool, and when the response comes in, any free Goroutine can take on the task of returning it.
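
Here is a minimal sketch of the Goroutine pool idea described above. It is not part of Goloadbalance yet; runPool and its parameters are hypothetical, and squaring a number stands in for proxying a request.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool processes jobs with a fixed number of worker goroutines that read
// from one buffered channel, and returns the sum of the squared inputs.
func runPool(workers, queueSize int, jobs []int) int {
	queue := make(chan int, queueSize) // requests wait here when all workers are busy
	results := make(chan int, len(jobs))
	var wg sync.WaitGroup

	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for job := range queue { // each worker is reused for many jobs
				results <- job * job // stand-in for proxying a request
			}
		}()
	}

	for _, job := range jobs {
		queue <- job // blocks only when the buffer is full
	}
	close(queue)
	wg.Wait()
	close(results)

	total := 0
	for r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(runPool(4, 16, []int{1, 2, 3, 4, 5})) // 55
}
```

The number of Goroutines stays fixed at the worker count no matter how many requests arrive, so the cost of creating stacks is paid once at startup instead of once per request.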


Conclusion:



I remember being excited the entire day at work to come back home and work on this project. This was an incredibly fun experience! I enjoyed not only learning about Golang but also honing my concurrent programming skills. Dealing with multiple data races, learning about atomic operations, and using mutexes and RWMutexes to control and fix data races across Goroutines taught me a lot about how concurrent programming works at scale. For me, this was one of the most fun personal projects I have worked on. Benchmarking and seeing how far off we were was super fun as well! Although we didn’t perform well, we didn’t exactly crash and burn, which means there is potential to reach the same throughput as Nginx if we make these small tweaks! Look out for more blog posts coming soon on how I try to make this better and on other fun projects I am working on!