Caching in Go - Part 2 - Concurrency Improvements
Monday, Dec 16, 2024 | 10 minute read | Updated at Monday, Dec 16, 2024
In the previous post, we discussed the basics of caching in Go and implemented a simple in-memory cache that uses an RWMutex to protect concurrent access to the inner map. In this post, we will explore how to further improve the handling of concurrent access to the cache through the use of a concept called “sharding.”
In the implementation of the cache in Part 1 of this series, we used Go’s built-in map data structure and an RWMutex to protect concurrent access to the cache. This approach works well for a moderate number of concurrent requests, but as the number of requests increases, the performance of the cache can degrade. This is because contention on the single lock becomes a bottleneck as the number of concurrent requests grows.
To address this issue, we can use a technique called “sharding”. Sharding involves dividing the cache into multiple smaller caches, each protected by its own RWMutex. This allows us to distribute the load across multiple caches, reducing the lock contention on a single map and improving the overall performance of the cache.
Let’s take a look at how we can update our current cache implementation to support sharding.
Sharding the Cache
The current implementation of the cache does not need to be modified heavily to support sharding. In fact, it will be used as the data structure for each shard. What we will need to implement is the high-level data structure that holds all the shards, and the logic to distribute the keys across the shards. Considering that the cache implementation is generic and does not mandate a specific data type for the key, other than requiring that it be a comparable type, we will need to allow the user to provide a function that can be used to determine the shard for a given key. This is commonly referred to as a “hash function.” The diagram below shows a high-level overview of the sharded cache implementation.
Let’s first start by defining the data structures. In the current implementation, we have a map that holds the cache data and an RWMutex to protect concurrent access to the map. It also stores the timer for the cache cleanup process. However, in the sharded implementation, we do not want each shard to have its own timer and cleanup process. That would add overhead and cause contention issues. The point of sharding is to distribute the load across multiple caches in order to reduce the contention on a single cache. If each shard had its own timer and cleanup process, then unless we added more complex logic, the cleanups could trigger at the same time across all shards, increasing the probability of contention. To avoid this, we will create a single timer and cleanup process that is shared across all shards. The cache will also allow the user to determine how many shards are cleaned up at the same time. By only cleaning up a smaller number of shards at once, we can reduce contention and improve the overall performance of the cache.
First, let’s redefine the existing Cache struct to be a shard. Make sure to update all the existing functions that are attached to the old Cache struct to point to the renamed cacheShard struct.
type cacheShard[K comparable, V any] struct {
    mutex         sync.RWMutex
    cache         map[K]*cacheItem[V]
    cacheDuration time.Duration
    cleanupFunc   func(V)
}

type cacheItem[V any] struct {
    value        V
    lastAccessed time.Time
}
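The bodies of the renamed methods are not repeated here, since they come straight from Part 1. As a rough sketch only, assuming that logic carries over unchanged apart from the receiver type, the shard-level Get and Set would look something like the following. Note that this sketch takes the full write lock in Get because a hit also refreshes lastAccessed.

func (c *cacheShard[K, V]) Get(key K) (V, bool) {
    // Take the write lock because a cache hit also updates lastAccessed.
    c.mutex.Lock()
    defer c.mutex.Unlock()
    item, ok := c.cache[key]
    if !ok {
        var zero V
        return zero, false
    }
    item.lastAccessed = time.Now()
    return item.value, true
}

func (c *cacheShard[K, V]) Set(key K, value V) {
    c.mutex.Lock()
    defer c.mutex.Unlock()
    c.cache[key] = &cacheItem[V]{value: value, lastAccessed: time.Now()}
}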
Now that we have defined the struct for a shard, we can define the Cache struct that will hold all the shards.
type Cache[K comparable, V any] struct {
    shards           []*cacheShard[K, V]
    shardCnt         uint64
    hashFunc         func(key K) uint64
    cleanupChan      chan bool
    cleanupTicker    *time.Ticker
    cleanupInterval  time.Duration
    cleanupWorkerCnt int
}
The NewCache function will be updated to initialize all the shards and start the cleanup process. It will also take in three additional parameters: hashFunc, shardCount, and cleanupWorkerCnt. The hashFunc will be used to determine which shard a given key is stored in. The shardCount is the number of shards the user wants to use. The cleanupWorkerCnt is the number of worker goroutines used to clean up the cache.
func NewCache[K comparable, V any](cacheDuration,
    cleanupInterval time.Duration,
    cleanupFunc func(V),
    hashFunc func(K) uint64,
    shardCount uint64,
    cleanupWorkerCnt int) *Cache[K, V] {

    shards := make([]*cacheShard[K, V], shardCount)
    for i := uint64(0); i < shardCount; i++ {
        shards[i] = &cacheShard[K, V]{
            cache:         make(map[K]*cacheItem[V]),
            cacheDuration: cacheDuration,
            cleanupFunc:   cleanupFunc,
        }
    }

    cache := &Cache[K, V]{
        shards:           shards,
        shardCnt:         shardCount,
        hashFunc:         hashFunc,
        cleanupChan:      make(chan bool),
        cleanupTicker:    time.NewTicker(cleanupInterval),
        cleanupInterval:  cleanupInterval,
        cleanupWorkerCnt: cleanupWorkerCnt,
    }

    go cache.cleanupLoop()
    return cache
}
You may have noticed that the user will provide a hash function that is responsible for determining which shard a given key will be stored in. The hash function should take in the key and return a uint64 value. It is important to note that the hash function should be deterministic, meaning that it should always return the same value for a given key. However, it is not necessary for it to be perfectly unique for each key. Our cache implementation will use the modulo operator to map the hashed key to one of the shards, based on the number of shards the user decided to use.
func (c *Cache[K, V]) getShardIndex(key K) uint64 {
    return c.hashFunc(key) % c.shardCnt
}
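As an illustration (this helper is not part of the cache itself), a hash function for string keys could simply reuse the standard library’s FNV-1a implementation from the hash/fnv package. The name stringHash and the choice of algorithm are arbitrary; any deterministic function with this signature will do, and collisions are acceptable since they only mean two keys share a shard.

import "hash/fnv"

// stringHash maps a string key to a uint64 using FNV-1a.
func stringHash(key string) uint64 {
    h := fnv.New64a()
    h.Write([]byte(key))
    return h.Sum64()
}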
The last big logic change we need to make is the cleanup loop. The logic for cleaning up an individual shard is similar to the existing cleanup loop, but we will need to execute the cleanup logic on each shard individually. We will do this by using a variable number of goroutines, defined by the initializer of the cache, that listen on a channel for the index of the shard to be cleaned up. When a shard index is received on the channel, the cleanup logic is executed on the corresponding shard. This allows the user to define how many shards are cleaned up at the same time, letting them tune the amount of cleanup-related contention on the cache.
func (c *Cache[K, V]) cleanupLoop() {
    for {
        select {
        case <-c.cleanupTicker.C:
            idxChannel := make(chan int, c.shardCnt)
            wg := new(sync.WaitGroup)
            for w := 1; w <= c.cleanupWorkerCnt; w++ {
                wg.Add(1)
                go c.worker(idxChannel, wg)
            }
            for i := uint64(0); i < c.shardCnt; i++ {
                idxChannel <- int(i)
            }
            close(idxChannel)
            wg.Wait()
        case <-c.cleanupChan:
            return
        }
    }
}
func (c *Cache[K, V]) worker(idxChan <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()
    for idx := range idxChan {
        c.shards[idx].Cleanup()
    }
}
func (c *cacheShard[K, V]) Cleanup() {
    // Collect expired keys under the read lock so readers are not blocked
    // while we scan the map. Unlocking in the middle of a range loop would
    // allow a concurrent write during iteration, which is unsafe.
    c.mutex.RLock()
    expired := make([]K, 0)
    for k, v := range c.cache {
        if time.Since(v.lastAccessed) > c.cacheDuration {
            expired = append(expired, k)
        }
    }
    c.mutex.RUnlock()

    // Remove the expired entries under the write lock, re-checking the
    // expiry in case an entry was accessed between the two passes.
    c.mutex.Lock()
    defer c.mutex.Unlock()
    for _, k := range expired {
        v, ok := c.cache[k]
        if !ok || time.Since(v.lastAccessed) <= c.cacheDuration {
            continue
        }
        if c.cleanupFunc != nil {
            c.cleanupFunc(v.value)
        }
        delete(c.cache, k)
    }
}
The majority of the logic should be familiar, but let’s go over the logic regarding the cleanup workers. The worker function is responsible for listening on the channel for the index of the shard to be cleaned up. It then calls the Cleanup function on the corresponding shard. Within the cleanupLoop function, we create a channel to carry the shard indexes and a wait group to wait for all the workers to finish. We then create the number of workers defined by the user and start them. Finally, we push all the shard indexes into the channel and close the channel. Thanks to the defer statement in the worker function, each worker decrements the wait group once the channel is closed and it has finished processing.
The remaining functions are fairly straightforward, as they simply look up the correct shard and call the corresponding shard function.
func (c *Cache[K, V]) Get(key K) (V, bool) {
    idx := c.getShardIndex(key)
    return c.shards[idx].Get(key)
}

func (c *Cache[K, V]) Set(key K, value V) {
    idx := c.getShardIndex(key)
    c.shards[idx].Set(key, value)
}

func (c *Cache[K, V]) Delete(key K) {
    idx := c.getShardIndex(key)
    c.shards[idx].Delete(key)
}

func (c *Cache[K, V]) Close() {
    c.cleanupTicker.Stop()
    close(c.cleanupChan)
    for _, shard := range c.shards {
        shard.Close()
    }
}
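To tie it all together, here is a hypothetical usage example. The stringHash helper is the illustrative one sketched earlier, the parameter values are arbitrary, and the snippet assumes the fmt and time packages are imported.

func main() {
    cache := NewCache[string, string](
        5*time.Minute, // cacheDuration: entries expire after 5 minutes without access
        time.Minute,   // cleanupInterval: how often the cleanup loop runs
        nil,           // cleanupFunc: no per-item teardown needed here
        stringHash,    // hashFunc: maps a key to a uint64
        16,            // shardCount
        4,             // cleanupWorkerCnt: shards cleaned up concurrently
    )
    defer cache.Close()

    cache.Set("greeting", "hello")
    if v, ok := cache.Get("greeting"); ok {
        fmt.Println(v) // prints "hello"
    }
}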
Conclusion
At this point, we have a functional sharded cache implementation. This implementation should outperform the previous one in highly concurrent scenarios. However, it is important to note that the sharded implementation is optimized for high concurrency; it will outperform the non-sharded implementation in those scenarios, but it will not be as performant as the non-sharded implementation in low-concurrency scenarios, where the extra hashing and indirection only add overhead. One should use the implementation that best suits their use case. In a follow-up post, we will run performance benchmarks to see how each implementation performs under different workloads and numbers of concurrent requests.
Full Code