Caching in Go - Part 2 - Concurrency Improvements

Monday, Dec 16, 2024 | 10 minute read | Updated at Monday, Dec 16, 2024


In the previous post, we discussed the basics of caching in Go and implemented a simple in-memory cache that utilizes a RWMutex to protect concurrent access to the inner map. In this post, we will explore how to further improve the handling of concurrent access to the cache through the use of a concept called “sharding.”

In Part 1 of this series, we used Go’s built-in map data structure and a RWMutex to protect concurrent access to the cache. This approach works well for a moderate number of concurrent requests, but as the request rate grows, performance can degrade: every reader and writer contends for the same lock, and that lock becomes a bottleneck.

To address this issue, we can use a technique called “sharding”. Sharding involves dividing the cache into multiple smaller caches, each protected by its own RWMutex. This allows us to distribute the load across multiple caches, reducing the lock contention on a single map and improving the overall performance of the cache.

Let’s take a look at how we can update our current cache implementation to support sharding.

Sharding the Cache

The current implementation of the cache does not need to be modified heavily to support sharding. In fact, it will be used as the data structure for each shard. What we need to add is the higher-level data structure that holds all the shards, plus the logic to distribute keys across them. Since the cache implementation is generic and does not mandate a specific key type, other than that it be a comparable type, we will let the user provide a function that determines the shard for a given key. This is commonly referred to as a “hash function.” The diagram below shows a high-level overview of the sharded cache implementation.

Sharded Cache

Let’s first start by defining the data structures. In the current implementation, we have a map that holds the cache data and a RWMutex to protect concurrent access to the map. It also stores the timer for the cache cleanup process. In the sharded implementation, however, we do not want each shard to have its own timer and cleanup process. That would add overhead and cause contention issues: the whole point of sharding is to distribute load across multiple caches in order to reduce contention on any single one. If each shard had its own timer and cleanup process, then unless we added more complex logic, the cleanups could trigger at the same time across all shards, increasing the probability of contention. To avoid this, we will create a single timer and cleanup process shared across all shards. The cache will also let the user choose how many shards are cleaned up at the same time. By cleaning up only a small number of shards at a time, we can reduce contention and improve the overall performance of the cache.

First, let’s redefine the existing Cache struct to be a shard. Make sure to update all the methods defined on the old Cache struct so that they are now defined on the renamed cacheShard struct.

type cacheShard[K comparable, V any] struct {
	mutex           sync.RWMutex
	cache           map[K]*cacheItem[V]
	cacheDuration   time.Duration
	cleanupFunc     func(V)
}

type cacheItem[V any] struct {
	value        V
	lastAccessed time.Time
}

Now that we have defined the struct for a shard, we can define the Cache struct that will hold all the shards.

type Cache[K comparable, V any] struct {
	shards           []*cacheShard[K, V]
	shardCnt         uint64
	hashFunc         func(key K) uint64
	cleanupChan      chan bool
	cleanupTicker    *time.Ticker
	cleanupInterval  time.Duration
	cleanupWorkerCnt int
}

The NewCache function will be updated to initialize all the shards and start the cleanup process. It will also take in three additional parameters: hashFunc, shardCount, and cleanupWorkerCnt. The hashFunc is used to determine which shard a given key is stored in. The shardCount is the number of shards the user wants to use. The cleanupWorkerCnt is the number of worker goroutines used to clean up the cache.

func NewCache[K comparable, V any](cacheDuration,
                                   cleanupInterval time.Duration,
                                   cleanupFunc func(V),
                                   hashFunc func(K) uint64,
                                   shardCount uint64,
                                   cleanupWorkerCnt int) *Cache[K, V] {

	var shards = make([]*cacheShard[K, V], shardCount)

	for i := uint64(0); i < shardCount; i++ {
		shards[i] = &cacheShard[K, V]{
			cache:         make(map[K]*cacheItem[V]),
			cacheDuration: cacheDuration,
			cleanupFunc:   cleanupFunc,
		}
	}

	cache := &Cache[K, V]{
		shards:           shards,
		shardCnt:         shardCount,
		hashFunc:         hashFunc,
		cleanupChan:      make(chan bool),
		cleanupTicker:    time.NewTicker(cleanupInterval),
		cleanupInterval:  cleanupInterval,
		cleanupWorkerCnt: cleanupWorkerCnt,
	}

	go cache.cleanupLoop()

	return cache
}

You may have noticed that the user provides a hash function that is responsible for determining which shard a given key will be stored in. The hash function takes in the key and returns a uint64 value. It is important to note that the hash function should be deterministic, meaning it should always return the same value for a given key. However, it does not need to be perfectly unique for each key. Our cache implementation will use the modulo operator to map the hashed key onto the number of shards the user chose; for example, with 8 shards, a key that hashes to 42 lands in shard 42 % 8 = 2.

func (c *Cache[K, V]) getShardIndex(key K) uint64 {
	return c.hashFunc(key) % c.shardCnt
}
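
As a concrete example, a hash function for string keys can be built on the standard library’s hash/fnv package. The helper below, hashStringKey, is only an illustrative sketch (the name is ours, and it assumes hash/fnv is imported); any deterministic key-to-uint64 mapping will work.

func hashStringKey(key string) uint64 {
	// FNV-1a from hash/fnv: deterministic, fast, and good enough to spread
	// keys across shards. Write on an fnv hash never returns an error.
	h := fnv.New64a()
	h.Write([]byte(key))
	return h.Sum64()
}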

The last big logic change we need to make is the cleanup loop. The logic for cleaning up an individual shard is similar to the existing cleanup loop, but we need to execute it on each shard individually. We will do this by using a variable number of goroutines, defined when the cache is initialized, that listen on a channel for the index of the shard to be cleaned up. When a shard index is received on the channel, the cleanup logic is executed on the corresponding shard. This lets the user define how many shards are cleaned up at the same time, and therefore tune the contention on the cache.

func (c *Cache[K, V]) cleanupLoop() {
	for {
		select {
		case <-c.cleanupTicker.C:
			idxChannel := make(chan int, c.shardCnt)
			wg := new(sync.WaitGroup)
			for w := 1; w <= c.cleanupWorkerCnt; w++ {
				wg.Add(1)
				go c.worker(idxChannel, wg)
			}

			for i := uint64(0); i < c.shardCnt; i++ {
				idxChannel <- int(i)
			}
			close(idxChannel)
			wg.Wait()

		case <-c.cleanupChan:
			return
		}
	}
}

func (c *Cache[K, V]) worker(idxChannel <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for idx := range idxChannel {
		c.shards[idx].Cleanup()
	}
}


func (c *cacheShard[K, V]) Cleanup() {
	// Collect the expired keys under a read lock first, so that the map is
	// never mutated while we are ranging over it.
	c.mutex.RLock()
	var expired []K
	for k, v := range c.cache {
		if time.Since(v.lastAccessed) > c.cacheDuration {
			expired = append(expired, k)
		}
	}
	c.mutex.RUnlock()

	// Delete the expired entries under the write lock, re-checking each one
	// in case it was accessed again after the scan.
	for _, k := range expired {
		c.mutex.Lock()
		if item, ok := c.cache[k]; ok && time.Since(item.lastAccessed) > c.cacheDuration {
			if c.cleanupFunc != nil {
				c.cleanupFunc(item.value)
			}
			delete(c.cache, k)
		}
		c.mutex.Unlock()
	}
}

The majority of the logic should be familiar, but let’s go over the logic regarding the cleanup workers. The worker function is responsible for listening on the channel for the index of the shard to be cleaned up; it then calls the Cleanup function on the corresponding shard. Within the cleanupLoop function, we create a channel to carry the shard indexes and a wait group to wait for all the workers to finish. We then start the number of workers defined by the user. Finally, we push all the shard indexes into the channel and close it; because the channel is buffered with a capacity equal to the shard count, pushing the indexes never blocks. Due to the defer statement within the worker function, the wait group is decremented once a worker finishes draining the closed channel.

The remaining functions are fairly straightforward, as they simply forward each call to the appropriate shard.

func (c *Cache[K, V]) Get(key K) (V, bool) {
	idx := c.getShardIndex(key)
	return c.shards[idx].Get(key)
}

func (c *Cache[K, V]) Set(key K, value V) {
	idx := c.getShardIndex(key)
	c.shards[idx].Set(key, value)
}

func (c *Cache[K, V]) Delete(key K) {
	idx := c.getShardIndex(key)
	c.shards[idx].Delete(key)
}

func (c *Cache[K, V]) Close() {
	c.cleanupTicker.Stop()
	close(c.cleanupChan)
	for _, shard := range c.shards {
		shard.Close()
	}
}
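
Putting it all together, usage could look something like the sketch below. The shard count, worker count, and durations are arbitrary illustrative values, and hashStringKey is the example hash function shown earlier (fmt and time are assumed to be imported).

cache := NewCache[string, string](
	5*time.Minute, // cacheDuration: entries expire after 5 minutes without access
	time.Minute,   // cleanupInterval: scan for expired entries every minute
	nil,           // cleanupFunc: no per-value cleanup needed here
	hashStringKey, // hash function used to pick a shard for each key
	16,            // shardCount
	4,             // cleanupWorkerCnt: shards cleaned up concurrently
)
defer cache.Close()

cache.Set("greeting", "hello")
if v, ok := cache.Get("greeting"); ok {
	fmt.Println(v) // prints "hello"
}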

Conclusion

At this point, we have a functional sharded cache implementation. It should outperform the previous implementation in highly concurrent scenarios. However, it is important to note that the sharded implementation is optimized for high concurrency: it will outperform the non-sharded implementation in those scenarios, but under low concurrency the extra hashing and indirection are pure overhead, so the non-sharded implementation will be faster there. One should utilize the implementation that best suits their use case. In a follow-up post, we will run benchmarks to see how each implementation performs under different workloads and numbers of concurrent requests.

Full Code

// The package name below is arbitrary; adjust it to fit your project.
package cache

import (
	"sync"
	"time"
)


type Cache[K comparable, V any] struct {
	shards           []*cacheShard[K, V]
	shardCnt         uint64
	hashFunc         func(key K) uint64
	cleanupChan      chan bool
	cleanupTicker    *time.Ticker
	cleanupInterval  time.Duration
	cleanupWorkerCnt int
}

type cacheShard[K comparable, V any] struct {
	mutex           sync.RWMutex
	cache           map[K]*cacheItem[V]
	cacheDuration   time.Duration
	cleanupFunc     func(V)
}

type cacheItem[V any] struct {
	value        V
	lastAccessed time.Time
}

func NewCache[K comparable, V any](cacheDuration, cleanupInterval time.Duration, cleanupFunc func(V), hashFunc func(K) uint64, shardCount uint64, cleanupWorkerCnt int) *Cache[K, V] {
	var shards = make([]*cacheShard[K, V], shardCount)

	for i := uint64(0); i < shardCount; i++ {
		shards[i] = &cacheShard[K, V]{
			cache:         make(map[K]*cacheItem[V]),
			cacheDuration: cacheDuration,
			cleanupFunc:   cleanupFunc,
		}
	}
	cache := &Cache[K, V]{
		shards:           shards,
		shardCnt:         shardCount,
		hashFunc:         hashFunc,
		cleanupChan:      make(chan bool),
		cleanupTicker:    time.NewTicker(cleanupInterval),
		cleanupInterval:  cleanupInterval,
		cleanupWorkerCnt: cleanupWorkerCnt,
	}
	go cache.cleanupLoop()
	return cache
}

func (c *Cache[K, V]) getShardIndex(key K) uint64 {
	return c.hashFunc(key) % c.shardCnt
}

func (c *Cache[K, V]) Get(key K) (V, bool) {
	idx := c.getShardIndex(key)
	return c.shards[idx].Get(key)
}

func (c *Cache[K, V]) Set(key K, value V) {
	idx := c.getShardIndex(key)
	c.shards[idx].Set(key, value)
}

func (c *Cache[K, V]) Delete(key K) {
	idx := c.getShardIndex(key)
	c.shards[idx].Delete(key)
}

func (c *Cache[K, V]) Close() {
	c.cleanupTicker.Stop()
	close(c.cleanupChan)
	for _, shard := range c.shards {
		shard.Close()
	}
}

func (c *Cache[K, V]) Size() int {
	var size int
	for _, shard := range c.shards {
		size += shard.Size()
	}
	return size
}

func (c *Cache[K, V]) cleanupLoop() {
	for {
		select {
		case <-c.cleanupTicker.C:
			idxChannel := make(chan int, c.shardCnt)
			wg := new(sync.WaitGroup)
			for w := 1; w <= c.cleanupWorkerCnt; w++ {
				wg.Add(1)
				go c.worker(idxChannel, wg)
			}

			for i := uint64(0); i < c.shardCnt; i++ {
				idxChannel <- int(i)
			}
			close(idxChannel)
			wg.Wait()
		case <-c.cleanupChan:
			return
		}
	}
}

func (c *Cache[K, V]) worker(idxChannel <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for idx := range idxChannel {
		c.shards[idx].Cleanup()
	}
}


func (c *cacheShard[K, V]) Cleanup() {
	// Collect the expired keys under a read lock first, so that the map is
	// never mutated while we are ranging over it.
	c.mutex.RLock()
	var expired []K
	for k, v := range c.cache {
		if time.Since(v.lastAccessed) > c.cacheDuration {
			expired = append(expired, k)
		}
	}
	c.mutex.RUnlock()

	// Delete the expired entries under the write lock, re-checking each one
	// in case it was accessed again after the scan.
	for _, k := range expired {
		c.mutex.Lock()
		if item, ok := c.cache[k]; ok && time.Since(item.lastAccessed) > c.cacheDuration {
			if c.cleanupFunc != nil {
				c.cleanupFunc(item.value)
			}
			delete(c.cache, k)
		}
		c.mutex.Unlock()
	}
}

func (c *cacheShard[K, V]) Get(key K) (V, bool) {
	c.mutex.RLock()
	defer c.mutex.RUnlock()

	item, ok := c.cache[key]
	if !ok {
		var i V

		return i, false
	}

	item.lastAccessed = time.Now()

	return item.value, true
}

func (c *cacheShard[K, V]) Set(key K, value V) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	if item, ok := c.cache[key]; ok {
		if c.cleanupFunc != nil {
			c.cleanupFunc(item.value)
		}
		item.value = value
		item.lastAccessed = time.Now()
	} else {
		c.cache[key] = &cacheItem[V]{
			value:        value,
			lastAccessed: time.Now(),
		}
	}
}

func (c *cacheShard[K, V]) Delete(key K) {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	// Only invoke the cleanup function if the key exists; indexing a missing
	// key would return nil and panic when dereferenced.
	if item, ok := c.cache[key]; ok {
		if c.cleanupFunc != nil {
			c.cleanupFunc(item.value)
		}
		delete(c.cache, key)
	}
}

func (c *cacheShard[K, V]) Close() {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	for key, item := range c.cache {
		if c.cleanupFunc != nil {
			c.cleanupFunc(item.value)
		}
		delete(c.cache, key)
	}
}

func (c *cacheShard[K, V]) Size() int {
	// Take a read lock so len is not read while another goroutine mutates the map.
	c.mutex.RLock()
	defer c.mutex.RUnlock()
	return len(c.cache)
}
