缓存系统学习

缓存系统

1. 前言

  1. 学习自:

    https://github.com/youngyangyang04/KamaCache-Go

    有些地方(bug)进行了修改,由于修改了变量的名字,所以没有给原来的项目提 pr,不过已和原作者反馈。

2. 知识点 List

  1. 使用 With 方案来对结构体初始化。
  2. 一致性 Hash。
  3. BKDR 计算 Hash 值。
  4. 掩码思想的使用。
  5. 函数的参数依旧是函数的回调写法。
  6. singleflight
  7. 使用 etcd 注册服务,包括客户端的创建、连接发起、注册服务。
  8. Context 使用:请求时间限定和变量传递。
  9. gRPC 的使用:gRPC 客户端和服务端的创建、优雅关机和健康检查。
  10. 接口型函数
  11. panic 和 error
  12. 主线程中针对其他节点的请求使用协程异步处理。
  13. 各种输出的用法:log、logrus 和 fmt 三种使用场景。
  14. 定时器以及其开关。
  15. 包以及结构体中变量大小写的规则。

3. 知识点细节

2.1 使用 With 方案来对结构体初始化

  1. 以 peers.go 中的 PeerPicker 为例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    package KamaCacheLearning

    import (
    "KamaCacheLearning/consistenthash"
    "KamaCacheLearning/registry"
    "context"
    "fmt"
    "github.com/sirupsen/logrus"
    "go.etcd.io/etcd/api/v3/mvccpb"
    clientv3 "go.etcd.io/etcd/client/v3"
    "strings"
    "sync"
    "time"
    )

    const defaultSvcName = "kama-cache-learning"

    type PeerPicker interface {
    // PickPeer 根据键选择合适的缓存节点
    // @Return peer 为节点实例、ok 为是否找到、self 为是否本地节点
    PickPeer(key string) (peer Peer, ok bool, self bool)
    Close() error
    }
    type ClientPicker struct {
    selfAddr string
    svcName string
    mutex sync.RWMutex
    etcdClient *clientv3.Client
    clients map[string]*Client // 节点地址与 Client 的映射
    consHashMap *consistenthash.Map
    ctx context.Context
    cancel context.CancelFunc
    }

    type PickerOption func(*ClientPicker)

    func WithServiceName(svcName string) PickerOption {
    return func(c *ClientPicker) {
    c.svcName = svcName
    }
    }

    func NewClientPicker(selfAddr string, opts ...PickerOption) (*ClientPicker, error) {
    // 1. 初始化 ctx 和变量
    ctx, cancel := context.WithCancel(context.Background())
    clientPicker := &ClientPicker{
    selfAddr: selfAddr,
    svcName: defaultSvcName,
    clients: make(map[string]*Client),
    ctx: ctx,
    cancel: cancel,
    consHashMap: consistenthash.NewConsistentHashMap(),
    }
    // 2. 调用特殊初始化方法
    for _, opt := range opts {
    opt(clientPicker)
    }
    // 3. etcdClient 初始化
    etcdClient, err := clientv3.New(
    clientv3.Config{
    Endpoints: registry.DefaultEtcdConfig.Endpoints,
    DialTimeout: registry.DefaultEtcdConfig.DialTimeout,
    })
    if err != nil {
    cancel()
    return nil, fmt.Errorf("fail to create etcd client: %w", err)
    }
    clientPicker.etcdClient = etcdClient
    // 4. 启动服务发现
    err = clientPicker.startSvcDiscovery()
    if err != nil {
    cancel()
    err := etcdClient.Close()
    if err != nil {
    return nil, fmt.Errorf("fail to close etcd client: %w", err)
    }
    return nil, fmt.Errorf("fail to startSvcDiscovery: %w", err)
    }
    // 5. 返回值
    return clientPicker, nil
    }
  2. 可以看到,首先命名一个使用结构体的函数,随后创建 With 开头的函数,对外开放方法以修改结构体中的变量,最后在结构体初始化中用 for 进行调用 With 开头的函数。
    其实 gRPC 创建客户端也用到了类似的手段:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    grpcConn, err := grpc.NewClient(addr,
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithDefaultCallOptions(grpc.WaitForReady(true)))
    // 来自 gRPC
    func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
    // ...
    // Apply dial options.
    disableGlobalOpts := false
    for _, opt := range opts {
    if _, ok := opt.(*disableGlobalDialOptions); ok {
    disableGlobalOpts = true
    break
    }
    }

    if !disableGlobalOpts {
    for _, opt := range globalDialOptions {
    opt.apply(&cc.dopts)
    }
    }

    for _, opt := range opts {
    opt.apply(&cc.dopts)
    }
    // ...
    }

2.2 一致性 Hash

  1. 网上也有相关的教程:

    https://developer.aliyun.com/article/1082388

    主要用于缓存节点的动态扩展和缩容问题,是一种常用的负载均衡策略。

2.3 BKDR 计算 Hash 值

  1. 一种用的很广泛的,用于计算字符串 Hash 的算法,直接用即可。

  2. 项目 lru2store.go 中使用了该方法计算 key 的 hash 以定位具体的桶:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    func (lru2Store *LRU2Store) getKeyIndex(key string) int32 {
    //log.Println(key, "index is", hashBKRD(key)&lru2Store.mask)
    return hashBKRD(key) & lru2Store.mask
    }

    // 实现了 BKDR 哈希算法,用于计算键的哈希值
    // note 这是一种用于计算字符串 hash 值的算法,用的很多很广泛
    func hashBKRD(s string) (hash int32) {
    for i := 0; i < len(s); i++ {
    hash = hash*131 + int32(s[i])
    }

    return hash
    }

2.4 掩码思想的使用

  1. 在 lru2store.go 中,将桶的数量定义为 2 的幂数,然后使用掩码结合 BKDR Hash 值来定位一个 key 应该寻找的桶。

  2. 项目中给出了掩码的计算方式:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // maskOfNextPowOf2 计算大于或等于输入值的最近 2 的幂次方减一作为掩码值
    // note 最多 11111111,8 位全 1 掩码值,因此最大个数是 2^9 次方个桶,也就是 512
    func maskOfNextPowOf2(cap uint16) uint16 {
    // note 这一行的意思是如果 cap 本身就是 2 的幂,那么返回 cap - 1(其二进制就是 0111111...)
    if cap > 0 && cap&(cap-1) == 0 {
    return cap - 1
    }

    // 通过多次右移和按位或操作,将二进制中最高的 1 位右边的所有位都填充为 1
    // note 给定一个二进制为 1xxxxxxx,可以跟着试一下。
    // note 每次右移的位数是前一次的两倍,直到右移的位数覆盖或超过最高位 1 的位置。
    cap |= cap >> 1
    cap |= cap >> 2
    cap |= cap >> 4

    return cap | (cap >> 8)
    }

2.5 函数的参数是函数

  1. lru2.go 中定义了遍历的函数:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // walk 为遍历方法,会遍历每一个节点。
    // @param walker 遍历到节点时需要执行的回调方法
    func (lru2cache *LRU2Cache) walk(walker func(key string, value Value, expireAt int64) bool) {
    // 从首节点开始遍历
    for index := *lru2cache.frontPoint; index != 0; index = lru2cache.doubleLink[index][next] {
    tempNode := lru2cache.results[index-1]
    if tempNode.expireAt > 0 && !walker(tempNode.key, tempNode.value, tempNode.expireAt) {
    return
    }
    }
    }
    可以看到函数的参数是另外一个函数。

2.6 singleflight 单飞机制

  1. 可以参照:

    https://zhuanlan.zhihu.com/p/382965636

    注意一下这种机制的问题。

  2. 这个项目的 singleflight 使用 sync.Map 来保证并发安全:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    // Package util -----------------------------
    // @author : EndlessShw
    // @time : 2025/7/29 15:54
    // @Description : singleflight 保证缓存不被击穿,阻止大量请求涌入数据库
    // -------------------------------------------
    package util

    import "sync"

    type call struct {
    mutexWG sync.WaitGroup
    val interface{}
    err error
    }

    type Group struct {
    resultMap sync.Map
    }

    func (group *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
    if result, ok := group.resultMap.Load(key); ok {
    resultCall := result.(*call)
    resultCall.mutexWG.Wait()
    return resultCall.val, resultCall.err
    }

    // note 相比与不使用 sync.Map 的普通版本,这里的两步时间有可能导致多个首次请求
    resultCall := &call{}
    resultCall.mutexWG.Add(1)
    group.resultMap.Store(key, resultCall)

    resultCall.val, resultCall.err = fn()
    resultCall.mutexWG.Done()

    go func() {
    group.resultMap.Delete(key)
    }()

    return resultCall.val, resultCall.err
    }

2.7 使用 etcd 注册服务

  1. 有关 etcd 的介绍,可以详见:

    https://topgoer.com/%E6%95%B0%E6%8D%AE%E5%BA%93%E6%93%8D%E4%BD%9C/go%E6%93%8D%E4%BD%9Cetcd/etcd%E4%BB%8B%E7%BB%8D.html

    etcd 和 Redis 有相似之处,是键值对数据库。和 Redis 不同的是,它是分布式数据库(Redis 是主从)。使用 gRPC 进行通讯。

  2. 有关其基本操作和机制详解,可见:

    https://www.ztong-techhub.com/archives/Oc9dBGEO
    https://topgoer.com/%E6%95%B0%E6%8D%AE%E5%BA%93%E6%93%8D%E4%BD%9C/go%E6%93%8D%E4%BD%9Cetcd/%E6%93%8D%E4%BD%9Cetcd.html
    https://zhuanlan.zhihu.com/p/111800017

    除了基本的 etcd 的 crud,本项目还涉及了 Lease 和 Watch 机制。
    Lease 机制在本项目是为了保证服务注册的时间(Register.go):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    // Register 注册服务到etcd
    func Register(svcName, addr string, stopCh <-chan error) error {
    // note 1. etcd 客户端创建
    cli, err := clientv3.New(clientv3.Config{
    Endpoints: DefaultConfig.Endpoints,
    DialTimeout: DefaultConfig.DialTimeout,
    })
    if err != nil {
    return fmt.Errorf("failed to create etcd client: %v", err)
    }

    // note 2. 获取本地 IP 并打印(未发现本地 IP 的后续使用
    localIP, err := getLocalIP()
    if err != nil {
    cli.Close()
    return fmt.Errorf("failed to get local IP: %v", err)
    }
    if addr[0] == ':' {
    addr = fmt.Sprintf("%s%s", localIP, addr)
    }

    // note 3. 创建租约
    // point 租约在 etcd 中就是针对 kv 的生效时间
    lease, err := cli.Grant(context.Background(), 10) // 增加租约时间到10秒
    if err != nil {
    cli.Close()
    return fmt.Errorf("failed to create lease: %v", err)
    }

    // 注册服务,使用完整的key路径
    key := fmt.Sprintf("/services/%s/%s", svcName, addr)
    _, err = cli.Put(context.Background(), key, addr, clientv3.WithLease(lease.ID))
    if err != nil {
    cli.Close()
    return fmt.Errorf("failed to put key-value to etcd: %v", err)
    }

    // 保持租约
    // point KeepAlive 本质是自动续约,只要 context 没有被 cancel 或者 timeout,那么其就会被一直续租
    keepAliveCh, err := cli.KeepAlive(context.Background(), lease.ID)
    if err != nil {
    cli.Close()
    return fmt.Errorf("failed to keep lease alive: %v", err)
    }

    // 处理租约续期和服务注销
    go func() {
    defer cli.Close()
    for {
    select {
    case <-stopCh:
    // 服务注销,撤销租约
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    cli.Revoke(ctx, lease.ID)
    cancel()
    return
    case resp, ok := <-keepAliveCh:
    // note 对续约租期过程中的失败进行一个处理
    if !ok {
    logrus.Warn("keep alive channel closed")
    return
    }
    logrus.Debugf("successfully renewed lease: %d", resp.ID)
    }
    }
    }()

    logrus.Infof("Service registered: %s at %s", svcName, addr)
    return nil
    }

    Watch 机制用于监听 key 或范围的变化,本项目中,PeerPicker 用于监听其他服务节点的变化并同时进行 client 的修改(以保证对其他节点的连通性):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    // watchServiceChanges 监听服务实例变化
    // note 监听需要额外开始协程
    func (p *ClientPicker) watchServiceChanges() {
    watcher := clientv3.NewWatcher(p.etcdCli)
    watchChan := watcher.Watch(p.ctx, "/services/"+p.svcName, clientv3.WithPrefix())

    // note ClientPicker.ctx 生命周期和 ClientPicker 保持一致
    for {
    select {
    case <-p.ctx.Done():
    watcher.Close()
    return
    case resp := <-watchChan:
    // note resp 是 clientv3.WatchResponse,里面有个 Events 数组
    p.handleWatchEvents(resp.Events)
    }
    }
    }

    // handleWatchEvents 处理监听到的事件
    func (p *ClientPicker) handleWatchEvents(events []*clientv3.Event) {
    p.mu.Lock()
    defer p.mu.Unlock()

    for _, event := range events {
    // note key 的格式是:/services/svcName/addr,value 的格式是 addr
    addr := string(event.Kv.Value)
    if addr == p.selfAddr {
    continue
    }

    switch event.Type {
    // note 有 put 时就增加
    case clientv3.EventTypePut:
    if _, exists := p.clients[addr]; !exists {
    p.set(addr)
    logrus.Infof("New service discovered at %s", addr)
    }
    // note 有 Delete 时就跟着删除,注意 client 内部有 rpc 连接需要断开,所以要调用 Close
    case clientv3.EventTypeDelete:
    if client, exists := p.clients[addr]; exists {
    client.Close()
    p.remove(addr)
    logrus.Infof("Service removed at %s", addr)
    }
    }
    }
    }

2.8 Context 的使用

  1. 上下文的使用可以参考这篇文章:

    https://segmentfault.com/a/1190000040917752#item-3-4

  2. 本项目主要是超时控制用的比较多,只要发起非本机的请求基本都会用到,例如:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    // fetchAllServices 获取当前服务相关的所有实例
    func (p *ClientPicker) fetchAllServices() error {
    ctx, cancel := context.WithTimeout(p.ctx, 3*time.Second)
    defer cancel()

    // note 通过 etcd 与服务打交道
    // note clientv3.WithPrefix 意思是匹配所有带上前缀的 key
    resp, err := p.etcdCli.Get(ctx, "/services/"+p.svcName, clientv3.WithPrefix())
    if err != nil {
    return fmt.Errorf("failed to get all services: %v", err)
    }

    p.mu.Lock()
    defer p.mu.Unlock()

    for _, kv := range resp.Kvs {
    addr := string(kv.Value)
    // todo
    if addr != "" && addr != p.selfAddr {
    p.set(addr)
    logrus.Infof("Discovered service at %s", addr)
    }
    }
    return nil
    }

2.9 gRPC

  1. 本项目使用 gRPC 而不是 HTTP 来完成分布式节点间的数据获取与同步。每个节点的 server 端都有一个 gRPC 服务器,同时 client 端又有 gRPC 客户端。双方约定的方法通过 protobuf 定义。例如本项目 server 端就实现了 protobuf 中的 GetSetDelete 方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    syntax = "proto3";
    package pb;

    option go_package = "./";

    message Request {
    string group = 1;
    string key = 2;
    bytes value = 3;
    }

    message ResponseForGet {
    bytes value = 1;
    }

    message ResponseForDelete {
    bool value = 1;
    }

    service KamaCache {
    rpc Get(Request) returns (ResponseForGet);
    rpc Set(Request) returns (ResponseForGet);
    rpc Delete(Request) returns(ResponseForDelete);
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    // note 接口的实现按照 protobuf 文件来

    // Get 实现Cache服务的Get方法
    func (s *Server) Get(ctx context.Context, req *pb.Request) (*pb.ResponseForGet, error) {
    // note 1. 获取 group
    group := GetGroup(req.Group)
    if group == nil {
    return nil, fmt.Errorf("group %s not found", req.Group)
    }

    // note 2. 调用 group.Get,同时将结果用 protobuf 中定义的返回类包裹
    view, err := group.Get(ctx, req.Key)
    if err != nil {
    return nil, err
    }

    return &pb.ResponseForGet{Value: view.ByteSLice()}, nil
    }

    // Set 实现Cache服务的Set方法
    func (s *Server) Set(ctx context.Context, req *pb.Request) (*pb.ResponseForGet, error) {
    group := GetGroup(req.Group)
    if group == nil {
    return nil, fmt.Errorf("group %s not found", req.Group)
    }

    // note 当时写 group.Set 时涉及到节点同步的问题,这里就是针对同步问题定义 context。
    // note 下文的 Delete 同理
    // 从 context 中获取标记,如果没有则创建新的 context
    fromPeer := ctx.Value("from_peer")
    if fromPeer == nil {
    ctx = context.WithValue(ctx, "from_peer", true)
    }

    if err := group.Set(ctx, req.Key, req.Value); err != nil {
    return nil, err
    }

    return &pb.ResponseForGet{Value: req.Value}, nil
    }

    // Delete 实现Cache服务的Delete方法
    func (s *Server) Delete(ctx context.Context, req *pb.Request) (*pb.ResponseForDelete, error) {
    group := GetGroup(req.Group)
    if group == nil {
    return nil, fmt.Errorf("group %s not found", req.Group)
    }

    err := group.Delete(ctx, req.Key)
    return &pb.ResponseForDelete{Value: err == nil}, err
    }
  2. 有关 gRPC 的文章可以参考:

    old:https://geektutu.com/post/quick-go-rpc.html#4-RPC-%E6%9C%8D%E5%8A%A1%E4%B8%8E%E8%B0%83%E7%94%A8
    https://grpc.org.cn/docs/languages/go/basics/#simple-rpc
    身份验证:https://grpc.org.cn/docs/guides/auth/#authentication-api
    https://www.lixueduan.com/posts/grpc/04-encryption-tls/
    健康检查:https://blog.csdn.net/luduoyuan/article/details/129250405
    优雅关机:https://grpc.org.cn/docs/guides/server-graceful-stop/

    本项目的身份验证采用半 SSL(Server.go):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    // NewServer 创建新的服务器实例
    func NewServer(addr, svcName string, opts ...ServerOption) (*Server, error) {
    ...

    if options.TLS {
    creds, err := loadTLSCredentials(options.CertFile, options.KeyFile)
    if err != nil {
    return nil, fmt.Errorf("failed to load TLS credentials: %v", err)
    }
    serverOpts = append(serverOpts, grpc.Creds(creds))
    }

    // note 4. 服务端创建
    srv := &Server{
    addr: addr,
    svcName: svcName,
    groups: &sync.Map{},
    grpcServer: grpc.NewServer(serverOpts...),
    etcdCli: etcdCli,
    stopCh: make(chan error),
    opts: options,
    }

    // note 5. gRPC 服务器注册
    // 注册服务
    pb.RegisterKamaCacheServer(srv.grpcServer, srv)

    // note 6. gRPC 服务健康检查
    // 注册健康检查服务
    healthServer := health.NewServer()
    healthpb.RegisterHealthServer(srv.grpcServer, healthServer)
    healthServer.SetServingStatus(svcName, healthpb.HealthCheckResponse_SERVING)

    return srv, nil
    }
    // loadTLSCredentials 加载TLS证书
    // note 原本创建 ServerSide-TLS 可以通过:grpc.NewServer(grpc.Creds(creds)),这里就是将 creds 的创建抽离出来
    // note 参考网址:https://www.lixueduan.com/posts/grpc/04-encryption-tls/ 他这里依旧是 ServerSide TLS,client 中用的是 insecure。
    func loadTLSCredentials(certFile, keyFile string) (credentials.TransportCredentials, error) {
    cert, err := tls.LoadX509KeyPair(certFile, keyFile)
    if err != nil {
    return nil, err
    }
    return credentials.NewTLS(&tls.Config{
    Certificates: []tls.Certificate{cert},
    }), nil
    }

2.10 接口型函数

  1. 详情可以见:

    https://geektutu.com/post/7days-golang-q1.html

  2. 简单来说就是一个函数的参数是接口时,这个接口还可以是接口型函数。根据业务的复杂程度,上层可以传复杂的接口,也可以是简单的接口型函数。

2.11 panicerror

  1. 当出现程序员无法预知的严重异常(不可恢复的),考虑使用 panic 而不是 error。

  2. 可以参考:

    https://zhuanlan.zhihu.com/p/87345297
    https://www.yingnd.com/golang/214113.html

2.12 主线程中针对其他节点的请求使用协程异步处理。

  1. 这个就是为了防止主线程阻塞而建议的方案,例如 group.go 中,使用异步请求来完成分布节点间数据的同步:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    // Set 设置缓存值
    func (g *Group) Set(ctx context.Context, key string, value []byte) error {
    // note 1. 健壮性检查
    // 检查组是否已关闭
    if atomic.LoadInt32(&g.closed) == 1 {
    return ErrGroupClosed
    }

    if key == "" {
    return ErrKeyRequired
    }
    if len(value) == 0 {
    return ErrValueRequired
    }

    // note 2. 创建缓存视图并设置到本地缓存
    // 创建缓存视图
    view := ByteView{b: cloneBytes(value)}
    // 设置到本地缓存
    if g.expiration > 0 {
    g.mainCache.AddWithExpiration(key, view, time.Now().Add(g.expiration))
    } else {
    g.mainCache.Add(key, view)
    }

    // note 3. 判断是否是同步请求,如果不是同步请求且是分布式系统,那还要对一致性 Hash 上的节点进行同步
    // point 注意同步是异步请求操作,开一个协程来处理
    // 检查是否是从其他节点同步过来的请求
    isPeerRequest := ctx.Value("from_peer") != nil
    // 如果不是从其他节点同步过来的请求,且启用了分布式模式,同步到其他节点
    if !isPeerRequest && g.peers != nil {
    go g.syncToPeers(ctx, "set", key, value)
    }

    return nil
    }

2.13 各种输出的用法:log、logrus 和 fmt 三种使用场景。

  1. 项目中大量涉及错误处理和日志使用。百度一下没有明显的帖子进行区分。
  2. 就项目而言,对于底层 error 的处理,常常使用 fmt。而对于上层的应用,则更多的使用 logrus,其中 logrus 是日志框架,对 log 进行集成。

2.14 定时器

  1. 详见:

    https://developer.aliyun.com/article/1426970

  2. 其中涉及到使用 chan 来关闭定时器,

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    func (lruCache *LRUCache) cleanupLoop() {
    // point 定时器的使用
    for {
    select {
    case <-lruCache.cleanupTicker.C:
    lruCache.evict()
    case <-lruCache.closeChan:
    return
    }
    }
    }

    同时在项目中遇到:
    stopCh chan error。这里就联动一下,即 chan 来关闭定时器和协程。
    定时器用的是 chan struct{}。有关协程的关闭,可以看:

    https://zhuanlan.zhihu.com/p/596200504

    其中使用 chan error 来获取协程的报错。

2.15 包以及结构体中变量大小写的规则。

  1. 当时遇到的一些 bug,主要还是小的知识点。

  2. 结构体中变量的大小写影响结构体成员是否可以被不同的包访问:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    // Package KamaCacheLearning -----------------------------
    // @author : EndlessShw
    // @time : 2025/7/18 17:25
    // @Description : ByteView 用于返回缓存结果,是 Value 的只读视图(可以认为是 Value 的子类,因为实现了 Len())
    // -------------------------------------------
    package KamaCacheLearning

    type ByteView struct {
    // point 考虑到只读原则,因此内部的变量小写(同一个包下才能直接调用),同时 cloneBytes 也是小写,变成内部方法
    b []byte
    }

    // point 这里不是指针接收者 pointer receiver。详见文章:https://zhuanlan.zhihu.com/p/76384820 和 https://golang.design/go-questions/interface/receiver/
    // point 选择原则:https://zhuanlan.zhihu.com/p/667384821
    func (byteView ByteView) Len() int { return len(byteView.b) }
    func (byteView ByteView) ByteSlice() []byte { return cloneBytes(byteView.b) }
    func (byteView ByteView) String() string { return string(byteView.b) }
    func cloneBytes(b []byte) []byte {
    c := make([]byte, len(b))
    copy(c, b)
    return c
    }

    这里还涉及到一个指针接收者的问题。改为指针接收者的话,那么只有指针对象才能调用这些方法。

todo:还有一些疑问已询问作者,待反馈。


缓存系统学习
https://endlessshw.top/Golang/KamaCache/缓存系统/
作者
EndlessShw
发布于
2025年8月17日
许可协议