Ristretto 源码分析
什么是ristetto?
ristetto是golang写的高性能的基于内存的缓存库。
- ristretto 在并发方面的设计,使用了BP-Wrapper的并发技巧,引入了环形缓冲区。数据一致性方面采用的是弱一致. 最终一致.更新缓存后立马查询不保证是最新的数据.
- 基于LFU淘汰策略的
- ristretto 使用的内存分配技术。构建可以选择使用 jemalloc 来分配内存。http://jemalloc.net/
- key 的存储限制基于成本cost
算法介绍
在介绍 ristetto 我先给大家简单介绍以下几种相关算法。
缓存淘汰算法 LRU 和 LFU
LRU,淘汰最近最少使用的数据。按时间排序淘汰。把数据加入一个链表中,按访问时间排序,发生淘汰的时候,把访问时间最旧的淘汰掉。
比如有数据 1,2,1,3,2
此时缓存中已有(1,2)
当3加入的时候,得把后面的2淘汰,变成(3,1)LFU,最近不经常使用,把数据加入到链表中,按频次排序,一个数据被访问过,把它的频次+1,发生淘汰的时候,把频次低的淘汰掉。
比如有数据 1,1,1,2,2,3
缓存中有(1(3次),2(2次))
当3加入的时候,得把后面的2淘汰,变成(1(3次),3(1次))
区别:LRU 是得把 1 淘汰。
频率评估算法 Count-Min Sketch
- Count-Min Sketch算法
选定d个hash函数,开一个 dxm 的二维整数数组作为哈希表
对于每个元素,分别使用d个hash函数计算相应的哈希值,并对m取余,然后在对应的位置上增1,二维数组中的每个整数称为sketch
要查询某个元素的频率时,只需要取出d个sketch, 返回最小的那一个(其实d个sketch都是该元素的近似频率,返回任意一个都可以,该算法选择最小的那个)
ristetto的实现
介绍 Ristretto 主要通过以下三个地方分析
- 怎么存储数据的
- 为什么具高吞吐量
- 淘汰策略是什么样的
怎么存储数据的
缓存的核心是散列图,其中包含有关进出的规则。如果哈希表的性能不佳,则整个缓存将受到影响。Go 没有无锁的并发哈希图。相反,Go 中的线程安全是通过显式获取互斥锁来实现的。然而sync.Map对于读取繁重的工作负载表现良好,但对于写入工作负载却表现不佳。在Ristretto中采用分片互斥包装的 Go 映射的方式来提高整体性能。特别是,这里使用了256 个分片以确保即使在 64 核服务器上也能很好地执行。
使用基于分片的方法,Ristretto使用uint64密钥 来计算密钥应该放入哪个分片,而不是存储整个密钥。这样做的理由是,不用对长密钥消耗过多内存的担忧,而且Ristretto在多个位置进行键的散列,并且在入口处执行一次即可使我们重新使用该散列,从而避免了更多的计算。
为了生成快速哈希Key,Ristretto 使用 Go Runtime 的runtime.memhash 的算法。 这个函数使用汇编代码快速生成哈希。 但是我们可以自定义方法。
func KeyToHash(key interface{}) (uint64, uint64) {
if key == nil {
return 0, 0
}
switch k := key.(type) {
case uint64:
return k, 0
case string:
return MemHashString(k), xxhash.Sum64String(k)
case []byte:
return MemHash(k), xxhash.Sum64(k)
case byte:
return uint64(k), 0
case int:
return uint64(k), 0
case int32:
return uint64(k), 0
case uint32:
return uint64(k), 0
case int64:
return uint64(k), 0
default:
panic("Key type not supported")
}
}
分片存储代码:
const numShards uint64 = 256
type shardedMap struct {
shards []*lockedMap
expiryMap *expirationMap
}
func newShardedMap() *shardedMap {
sm := &shardedMap{
shards: make([]*lockedMap, int(numShards)),
expiryMap: newExpirationMap(),
}
for i := range sm.shards {
sm.shards[i] = newLockedMap(sm.expiryMap)
}
return sm
}
为什么具高吞吐量
要实现高命中率,需要管理有关缓存中存在的内容以及缓存中应存在的内容的元数据。在跨 goroutines 平衡缓存的性能和可伸缩性时,这是一件非常困难的事情。这里 Ristretto 借用了 BP- 中仅使用批处理这种方式。
与其为每个元数据突变获取互斥锁,不如在获取互斥锁并处理突变之前等待环形缓冲区填满。这几乎没有任何开销地降低了竞争。
Wrapper论文中的技巧,这论文介绍了一个系统框架,该框架使得任何替换算法几乎都可以无争用地锁定。其中介绍了两种缓解争用的方法:预取和批处理。Ristretto
我们应用这一切的方法Gets,并Set到缓存中。
- Gets
在LRU 缓存中,通常将密钥放在链接列表的开头。在基于 LFU 的缓存中,需要增加项目的点击计数器。这两个操作都需要对缓存全局结构进行线程安全访问。BP-Wrapper建议使用批处理来处理命中计数器的增量。Ristretto 中设计了一种巧妙的方法sync.Pool
实现带状,有损环形缓冲区,这些缓冲区性能出色,数据丢失很少。
池中存储的任何项目都可以随时自动删除,删除的时候不会通知。这就引入了一种有损行为。 池中的每个项目实际上都是一批密钥。批次填满后,将其推送到某个渠道。故意将通道大小保持较小,以避免消耗太多的 CPU 周期来处理它。如果通道已满,则删除该批次。 这引入了有损行为的第二级。一个 goroutine 从内部通道中提取此批次并处理密钥,从而更新其命中计数器。
实现在ristretto的ring文件中。拿一些代码看一下:
// key加入 有损环形缓冲区中
func (b *ringBuffer) Push(item uint64) {
// Reuse or create a new stripe.
stripe := b.pool.Get().(*ringStripe)
stripe.Push(item)
b.pool.Put(stripe)
}
// 一旦缓冲区填满,push到通道:
func (s *ringStripe) Push(item uint64) {
s.data = append(s.data, item)
// Decide if the ring buffer should be drained.
if len(s.data) >= s.capa {
// Send elements to consumer and create a new ring stripe.
if s.cons.Push(s.data) {
s.data = make([]uint64, 0, s.capa)
} else {
s.data = s.data[:0]
}
}
}
func (p *defaultPolicy) Push(keys []uint64) bool {
if p.isClosed {
return false
}
if len(keys) == 0 {
return true
}
select {
case p.itemsCh <- keys:
p.metrics.add(keepGets, keys[0], uint64(len(keys)))
return true
default:
p.metrics.add(dropGets, keys[0], uint64(len(keys)))
return false
}
}
// 更新key计数器
func (p *tinyLFU) Push(keys []uint64) {
for _, key := range keys {
p.Increment(key)
}
}
- Set
Set 缓冲区的要求与 Get 稍有不同。在 Gets 中,我们对键进行缓冲,仅在缓冲区填满后才对其进行处理。在集合中,我们希望尽快处理密钥。因此,使用一个通道来捕获 Set,如果通道已满,则将它们放在地板上以避免竞争。几个后台 goroutine 从通道中选择集并处理该集。
与 Gets 一样,该方法旨在优化竞争阻力。
func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration) bool {
if c == nil || c.isClosed || key == nil {
return false
}
var expiration int64
switch {
case ttl == 0:
// No expiration.
break
case ttl < 0:
// Treat this a a no-op.
return false
default:
expiration = time.Now().Add(ttl).Unix()
}
keyHash, conflictHash := c.keyToHash(key)
i := &Item{
flag: itemNew,
Key: keyHash,
Conflict: conflictHash,
Value: value,
Cost: cost,
Expiration: expiration,
}
// cost is eventually updated. The expiration must also be immediately updated
// to prevent items from being prematurely removed from the map.
if prev, ok := c.store.Update(i); ok {
c.onExit(prev)
i.flag = itemUpdate
}
// Attempt to send item to policy.
select {
case c.setBuf <- i:
return true
default:
if i.flag == itemUpdate {
// Return true if this was an update operation since we've already
// updated the store. For all the other operations (set/delete), we
// return false which means the item was not inserted.
return true
}
c.Metrics.add(dropSets, keyHash, 1)
return false
}
}
淘汰策略是什么样的
- 关于成本
无限大的缓存实际上是不可能的。高速缓存必须有大小限制。许多缓存库会将缓存大小视为元素数。ristretto将用成本来限制大小。大多数key存储的是具有可变大小的值。一个值可能要花几个字节,另一个值要花几千字节,而另一个值要花几兆字节。将它们视为具有相同的内存成本是不对的。
在Ristretto 中,他将成本附加到每个键值。用户在 Set 时指定该费用。然后他将该成本与缓存的 MaxCost 相比较。当缓存以最大容量运行时,比较重要的点击率比较高的将会取代点击率比较低的。
- 通过 TinyLFU 的set政策
Ristretto 中set策略使用的是TinyLFU中的算法。主要思想是仅在新项目的估计值高于被逐出的项目的估计值时才允许使用。Ristretto使用Count-Min Sketch 在Ristretto 中实现了 TinyLFU 。它使用 4 位计数器来近似项(ɛ)的访问频率。与使用普通键映射到频率映射相比,每个键的这种小成本使我们能够跟踪更大范围的全局键空间样本。
TinyLFU 还通过Reset功能保持键访问的新近性。N 键递增后,计数器减半。因此,一段时间未看到的键会将其计数器重置为零;为最近出现的密钥铺平道路。
- 通过采样 LFU 驱逐政策
当缓存达到容量时,每个传入密钥都应替换缓存中存在的一个或多个密钥。不仅如此,传入密钥的 should 应该比被逐出的密钥的 higher 高。要查找低 key 的密钥,Ristretto使用了 Go map 迭代提供的自然 随机性来挑选一个密钥样本,并在它们上循环查找最低ɛ的密钥。
然后,将此键的 against 与传入键进行比较。如果输入的密钥具有较高的ɛ,则此密钥将被逐出(逐出策略)。否则,输入密钥将被拒绝(准入策略)。重复此机制,直到可以将传入密钥的成本放入高速缓存中为止。因此,单个输入密钥可以移动一个以上的密钥。
使用
配置介绍
type Config struct {
// 可以简单理解成key的数量,他是用来保存key被点击次数的,但实际数量并不是这个设置的值,而是最靠近并且大于或等于该值的2的n次方值减1
// 比如
// 设置成 1,2^0=1,2^1=2,这时候2^0次方等于1,所以最终的值就是2^0-1=0
// 设置成 2,2^0=1,2^1=2,这时候2^1次方等于2,所以最终的值就是2^1-1=1
// 设置成 3,2^0=1,2^1=2,2^2=4,这时候2^2次方大于等于3,所以最终的值就是2^2-1=3
// 设置成 6,2^0=1,2^1=2,2^2=4,2^3=8,这时候2^3次方大于等于7,所以最终的值就是2^3-1=7
// 设置成 20,2^0=1,2^1=2,2^2=4,2^3=8,...,2^4=16,2^5=32,这时候2^5次方大于等于7,所以最终的值就是2^5-1=31
// 官方建议设置成你想要设置key数量的10倍,因为这样会具有更高的准确性
// 根据这个值,可以知道计数器要用的内存 NumCounters / 1024 / 1024 * 4 MB
NumCounters int64
// 单位是可以随意的,例如你想限制内存最大为100MB,你可以把MaxCost设置为100,000,000,那么每个key的成本cost就是bytes
MaxCost int64
// BufferItems决定获取缓冲区的大小。除非您有一个罕见的用例,否则使用'64'作为BufferItems值可以
获得良好的性能。
// BufferItems 决定了Get缓冲区的大小,在有损环形缓冲区ringBuffer中,当数据这个值,就会去对这批key进行点击数量统计
BufferItems int64
// 设置为true,就会统计操作类型的次数,设置为true会消耗成本,建议在测试的时候才开启
Metrics bool
// 当cache被清除的时候调用,过期清除 还有 策略清除
OnEvict func(item *Item)
// 设置一个key失败的时候调用,失败的条件一般有,已经存在key,再次add,或者cost不足
OnReject func(item *Item)
// 删除一个值的时候调用,可以用做手动回收内存。
OnExit func(val interface{})
// 计算key的hash函数
KeyToHash func(key interface{}) (uint64, uint64)
// 计算成本的函数,没有设置成本的时候用这个来计算成本
Cost func(value interface{}) int64
//set(k,v,)
// 设置为true的时候 不计内部结构的成本,默认是计算的,用于存储key-value 结构
// type storeItem struct {
// key uint64
// conflict uint64
// value interface{}
// expiration int64
// }
IgnoreInternalCost bool
}