缓存系统学习
缓存系统
1. 前言
学习自:
有些地方(bug)进行了修改,由于修改了变量的名字,所以没有给原来的项目提 pr,不过已和原作者反馈。
2. 知识点 List
- 使用
With
方案来对结构体初始化。 - 一致性 Hash。
- BKDR 计算 Hash 值。
- 掩码思想的使用。
- 函数的参数依旧是函数的回调写法。
- singleflight
- 使用 etcd 注册服务,包括客户端的创建、连接发起、注册服务。
- Context 使用:请求时间限定和变量传递。
- gRPC 的使用:gRPC 客户端和服务端的创建、优雅关机和健康检查。
- 接口型函数
- panic 和 error
- 主线程中针对其他节点的请求使用协程异步处理。
- 各种输出的用法:log、logrus 和 fmt 三种使用场景。
- 定时器以及其开关。
- 包以及结构体中变量大小写的规则。
3. 知识点细节
2.1 使用 With
方案来对结构体初始化
以 peers.go 中的
PeerPicker
为例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81package KamaCacheLearning
import (
"KamaCacheLearning/consistenthash"
"KamaCacheLearning/registry"
"context"
"fmt"
"github.com/sirupsen/logrus"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"strings"
"sync"
"time"
)
const defaultSvcName = "kama-cache-learning"
type PeerPicker interface {
// PickPeer 根据键选择合适的缓存节点
// @Return peer 为节点实例、ok 为是否找到、self 为是否本地节点
PickPeer(key string) (peer Peer, ok bool, self bool)
Close() error
}
type ClientPicker struct {
selfAddr string
svcName string
mutex sync.RWMutex
etcdClient *clientv3.Client
clients map[string]*Client // 节点地址与 Client 的映射
consHashMap *consistenthash.Map
ctx context.Context
cancel context.CancelFunc
}
type PickerOption func(*ClientPicker)
func WithServiceName(svcName string) PickerOption {
return func(c *ClientPicker) {
c.svcName = svcName
}
}
func NewClientPicker(selfAddr string, opts ...PickerOption) (*ClientPicker, error) {
// 1. 初始化 ctx 和变量
ctx, cancel := context.WithCancel(context.Background())
clientPicker := &ClientPicker{
selfAddr: selfAddr,
svcName: defaultSvcName,
clients: make(map[string]*Client),
ctx: ctx,
cancel: cancel,
consHashMap: consistenthash.NewConsistentHashMap(),
}
// 2. 调用特殊初始化方法
for _, opt := range opts {
opt(clientPicker)
}
// 3. etcdClient 初始化
etcdClient, err := clientv3.New(
clientv3.Config{
Endpoints: registry.DefaultEtcdConfig.Endpoints,
DialTimeout: registry.DefaultEtcdConfig.DialTimeout,
})
if err != nil {
cancel()
return nil, fmt.Errorf("fail to create etcd client: %w", err)
}
clientPicker.etcdClient = etcdClient
// 4. 启动服务发现
err = clientPicker.startSvcDiscovery()
if err != nil {
cancel()
err := etcdClient.Close()
if err != nil {
return nil, fmt.Errorf("fail to close etcd client: %w", err)
}
return nil, fmt.Errorf("fail to startSvcDiscovery: %w", err)
}
// 5. 返回值
return clientPicker, nil
}可以看到,首先命名一个使用结构体的函数,随后创建
With
开头的函数,对外开放方法以修改结构体中的变量,最后在结构体初始化中用for
进行调用With
开头的函数。
其实 gRPC 创建客户端也用到了类似的手段:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26grpcConn, err := grpc.NewClient(addr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.WaitForReady(true)))
// 来自 gRPC
func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error) {
// ...
// Apply dial options.
disableGlobalOpts := false
for _, opt := range opts {
if _, ok := opt.(*disableGlobalDialOptions); ok {
disableGlobalOpts = true
break
}
}
if !disableGlobalOpts {
for _, opt := range globalDialOptions {
opt.apply(&cc.dopts)
}
}
for _, opt := range opts {
opt.apply(&cc.dopts)
}
// ...
}
2.2 一致性 Hash
网上也有相关的教程:
主要用于缓存节点的动态扩展和缩容问题,是一种常用的负载均衡策略。
2.3 BKDR 计算 Hash 值
一种用的很广泛的,用于计算字符串 Hash 的算法,直接用即可。
项目 lru2store.go 中使用了该方法计算 key 的 hash 以定位具体的桶:
1
2
3
4
5
6
7
8
9
10
11
12
13
14func (lru2Store *LRU2Store) getKeyIndex(key string) int32 {
//log.Println(key, "index is", hashBKRD(key)&lru2Store.mask)
return hashBKRD(key) & lru2Store.mask
}
// 实现了 BKDR 哈希算法,用于计算键的哈希值
// note 这是一种用于计算字符串 hash 值的算法,用的很多很广泛
func hashBKRD(s string) (hash int32) {
for i := 0; i < len(s); i++ {
hash = hash*131 + int32(s[i])
}
return hash
}
2.4 掩码思想的使用
在 lru2store.go 中,将桶的数量定义为 2 的幂数,然后使用掩码结合 BKDR Hash 值来定位一个 key 应该寻找的桶。
项目中给出了掩码的计算方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17// maskOfNextPowOf2 计算大于或等于输入值的最近 2 的幂次方减一作为掩码值
// note 最多 11111111,8 位全 1 掩码值,因此最大个数是 2^9 次方个桶,也就是 512
func maskOfNextPowOf2(cap uint16) uint16 {
// note 这一行的意思是如果 cap 本身就是 2 的幂,那么返回 cap - 1(其二进制就是 0111111...)
if cap > 0 && cap&(cap-1) == 0 {
return cap - 1
}
// 通过多次右移和按位或操作,将二进制中最高的 1 位右边的所有位都填充为 1
// note 给定一个二进制为 1xxxxxxx,可以跟着试一下。
// note 每次右移的位数是前一次的两倍,直到右移的位数覆盖或超过最高位 1 的位置。
cap |= cap >> 1
cap |= cap >> 2
cap |= cap >> 4
return cap | (cap >> 8)
}
2.5 函数的参数是函数
- lru2.go 中定义了遍历的函数: 可以看到函数的参数是另外一个函数。
1
2
3
4
5
6
7
8
9
10
11// walk 为遍历方法,会遍历每一个节点。
// @param walker 遍历到节点时需要执行的回调方法
func (lru2cache *LRU2Cache) walk(walker func(key string, value Value, expireAt int64) bool) {
// 从首节点开始遍历
for index := *lru2cache.frontPoint; index != 0; index = lru2cache.doubleLink[index][next] {
tempNode := lru2cache.results[index-1]
if tempNode.expireAt > 0 && !walker(tempNode.key, tempNode.value, tempNode.expireAt) {
return
}
}
}
2.6 singleflight 单飞机制
可以参照:
注意一下这种机制的问题。
这个项目的 singleflight 使用
sync.Map
来保证并发安全:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40// Package util -----------------------------
// @author : EndlessShw
// @time : 2025/7/29 15:54
// @Description : singleflight 保证缓存不被击穿,阻止大量请求涌入数据库
// -------------------------------------------
package util
import "sync"
type call struct {
mutexWG sync.WaitGroup
val interface{}
err error
}
type Group struct {
resultMap sync.Map
}
func (group *Group) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
if result, ok := group.resultMap.Load(key); ok {
resultCall := result.(*call)
resultCall.mutexWG.Wait()
return resultCall.val, resultCall.err
}
// note 相比与不使用 sync.Map 的普通版本,这里的两步时间有可能导致多个首次请求
resultCall := &call{}
resultCall.mutexWG.Add(1)
group.resultMap.Store(key, resultCall)
resultCall.val, resultCall.err = fn()
resultCall.mutexWG.Done()
go func() {
group.resultMap.Delete(key)
}()
return resultCall.val, resultCall.err
}
2.7 使用 etcd 注册服务
有关 etcd 的介绍,可以详见:
etcd 和 Redis 有相似之处,是键值对数据库。和 Redis 不同的是,它是分布式数据库(Redis 是主从)。使用 gRPC 进行通讯。
有关其基本操作和机制详解,可见:
https://www.ztong-techhub.com/archives/Oc9dBGEO
https://topgoer.com/%E6%95%B0%E6%8D%AE%E5%BA%93%E6%93%8D%E4%BD%9C/go%E6%93%8D%E4%BD%9Cetcd/%E6%93%8D%E4%BD%9Cetcd.html
https://zhuanlan.zhihu.com/p/111800017除了基本的 etcd 的 crud,本项目还涉及了 Lease 和 Watch 机制。
Lease 机制在本项目是为了保证服务注册的时间(Register.go):1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70// Register 注册服务到etcd
func Register(svcName, addr string, stopCh <-chan error) error {
// note 1. etcd 客户端创建
cli, err := clientv3.New(clientv3.Config{
Endpoints: DefaultConfig.Endpoints,
DialTimeout: DefaultConfig.DialTimeout,
})
if err != nil {
return fmt.Errorf("failed to create etcd client: %v", err)
}
// note 2. 获取本地 IP 并打印(未发现本地 IP 的后续使用
localIP, err := getLocalIP()
if err != nil {
cli.Close()
return fmt.Errorf("failed to get local IP: %v", err)
}
if addr[0] == ':' {
addr = fmt.Sprintf("%s%s", localIP, addr)
}
// note 3. 创建租约
// point 租约在 etcd 中就是针对 kv 的生效时间
lease, err := cli.Grant(context.Background(), 10) // 增加租约时间到10秒
if err != nil {
cli.Close()
return fmt.Errorf("failed to create lease: %v", err)
}
// 注册服务,使用完整的key路径
key := fmt.Sprintf("/services/%s/%s", svcName, addr)
_, err = cli.Put(context.Background(), key, addr, clientv3.WithLease(lease.ID))
if err != nil {
cli.Close()
return fmt.Errorf("failed to put key-value to etcd: %v", err)
}
// 保持租约
// point KeepAlive 本质是自动续约,只要 context 没有被 cancel 或者 timeout,那么其就会被一直续租
keepAliveCh, err := cli.KeepAlive(context.Background(), lease.ID)
if err != nil {
cli.Close()
return fmt.Errorf("failed to keep lease alive: %v", err)
}
// 处理租约续期和服务注销
go func() {
defer cli.Close()
for {
select {
case <-stopCh:
// 服务注销,撤销租约
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
cli.Revoke(ctx, lease.ID)
cancel()
return
case resp, ok := <-keepAliveCh:
// note 对续约租期过程中的失败进行一个处理
if !ok {
logrus.Warn("keep alive channel closed")
return
}
logrus.Debugf("successfully renewed lease: %d", resp.ID)
}
}
}()
logrus.Infof("Service registered: %s at %s", svcName, addr)
return nil
}Watch 机制用于监听 key 或范围的变化,本项目中,
PeerPicker
用于监听其他服务节点的变化并同时进行 client 的修改(以保证对其他节点的连通性):1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48// watchServiceChanges 监听服务实例变化
// note 监听需要额外开始协程
func (p *ClientPicker) watchServiceChanges() {
watcher := clientv3.NewWatcher(p.etcdCli)
watchChan := watcher.Watch(p.ctx, "/services/"+p.svcName, clientv3.WithPrefix())
// note ClientPicker.ctx 生命周期和 ClientPicker 保持一致
for {
select {
case <-p.ctx.Done():
watcher.Close()
return
case resp := <-watchChan:
// note resp 是 clientv3.WatchResponse,里面有个 Events 数组
p.handleWatchEvents(resp.Events)
}
}
}
// handleWatchEvents 处理监听到的事件
func (p *ClientPicker) handleWatchEvents(events []*clientv3.Event) {
p.mu.Lock()
defer p.mu.Unlock()
for _, event := range events {
// note key 的格式是:/services/svcName/addr,value 的格式是 addr
addr := string(event.Kv.Value)
if addr == p.selfAddr {
continue
}
switch event.Type {
// note 有 put 时就增加
case clientv3.EventTypePut:
if _, exists := p.clients[addr]; !exists {
p.set(addr)
logrus.Infof("New service discovered at %s", addr)
}
// note 有 Delete 时就跟着删除,注意 client 内部有 rpc 连接需要断开,所以要调用 Close
case clientv3.EventTypeDelete:
if client, exists := p.clients[addr]; exists {
client.Close()
p.remove(addr)
logrus.Infof("Service removed at %s", addr)
}
}
}
}
2.8 Context
的使用
上下文的使用可以参考这篇文章:
本项目主要是超时控制用的比较多,只要发起非本机的请求基本都会用到,例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25// fetchAllServices 获取当前服务相关的所有实例
func (p *ClientPicker) fetchAllServices() error {
ctx, cancel := context.WithTimeout(p.ctx, 3*time.Second)
defer cancel()
// note 通过 etcd 与服务打交道
// note clientv3.WithPrefix 意思是匹配所有带上前缀的 key
resp, err := p.etcdCli.Get(ctx, "/services/"+p.svcName, clientv3.WithPrefix())
if err != nil {
return fmt.Errorf("failed to get all services: %v", err)
}
p.mu.Lock()
defer p.mu.Unlock()
for _, kv := range resp.Kvs {
addr := string(kv.Value)
// todo
if addr != "" && addr != p.selfAddr {
p.set(addr)
logrus.Infof("Discovered service at %s", addr)
}
}
return nil
}
2.9 gRPC
本项目使用 gRPC 而不是 HTTP 来完成分布式节点间的数据获取与同步。每个节点的 server 端都有一个 gRPC 服务器,同时 client 端又有 gRPC 客户端。双方约定的方法通过 protobuf 定义。例如本项目 server 端就实现了 protobuf 中的
Get
、Set
和Delete
方法:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24syntax = "proto3";
package pb;
option go_package = "./";
message Request {
string group = 1;
string key = 2;
bytes value = 3;
}
message ResponseForGet {
bytes value = 1;
}
message ResponseForDelete {
bool value = 1;
}
service KamaCache {
rpc Get(Request) returns (ResponseForGet);
rpc Set(Request) returns (ResponseForGet);
rpc Delete(Request) returns(ResponseForDelete);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51// note 接口的实现按照 protobuf 文件来
// Get 实现Cache服务的Get方法
func (s *Server) Get(ctx context.Context, req *pb.Request) (*pb.ResponseForGet, error) {
// note 1. 获取 group
group := GetGroup(req.Group)
if group == nil {
return nil, fmt.Errorf("group %s not found", req.Group)
}
// note 2. 调用 group.Get,同时将结果用 protobuf 中定义的返回类包裹
view, err := group.Get(ctx, req.Key)
if err != nil {
return nil, err
}
return &pb.ResponseForGet{Value: view.ByteSLice()}, nil
}
// Set 实现Cache服务的Set方法
func (s *Server) Set(ctx context.Context, req *pb.Request) (*pb.ResponseForGet, error) {
group := GetGroup(req.Group)
if group == nil {
return nil, fmt.Errorf("group %s not found", req.Group)
}
// note 当时写 group.Set 时涉及到节点同步的问题,这里就是针对同步问题定义 context。
// note 下文的 Delete 同理
// 从 context 中获取标记,如果没有则创建新的 context
fromPeer := ctx.Value("from_peer")
if fromPeer == nil {
ctx = context.WithValue(ctx, "from_peer", true)
}
if err := group.Set(ctx, req.Key, req.Value); err != nil {
return nil, err
}
return &pb.ResponseForGet{Value: req.Value}, nil
}
// Delete 实现Cache服务的Delete方法
func (s *Server) Delete(ctx context.Context, req *pb.Request) (*pb.ResponseForDelete, error) {
group := GetGroup(req.Group)
if group == nil {
return nil, fmt.Errorf("group %s not found", req.Group)
}
err := group.Delete(ctx, req.Key)
return &pb.ResponseForDelete{Value: err == nil}, err
}有关 gRPC 的文章可以参考:
old:https://geektutu.com/post/quick-go-rpc.html#4-RPC-%E6%9C%8D%E5%8A%A1%E4%B8%8E%E8%B0%83%E7%94%A8
https://grpc.org.cn/docs/languages/go/basics/#simple-rpc
身份验证:https://grpc.org.cn/docs/guides/auth/#authentication-api
https://www.lixueduan.com/posts/grpc/04-encryption-tls/
健康检查:https://blog.csdn.net/luduoyuan/article/details/129250405
优雅关机:https://grpc.org.cn/docs/guides/server-graceful-stop/本项目的身份验证采用半 SSL(Server.go):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47// NewServer 创建新的服务器实例
func NewServer(addr, svcName string, opts ...ServerOption) (*Server, error) {
...
if options.TLS {
creds, err := loadTLSCredentials(options.CertFile, options.KeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load TLS credentials: %v", err)
}
serverOpts = append(serverOpts, grpc.Creds(creds))
}
// note 4. 服务端创建
srv := &Server{
addr: addr,
svcName: svcName,
groups: &sync.Map{},
grpcServer: grpc.NewServer(serverOpts...),
etcdCli: etcdCli,
stopCh: make(chan error),
opts: options,
}
// note 5. gRPC 服务器注册
// 注册服务
pb.RegisterKamaCacheServer(srv.grpcServer, srv)
// note 6. gRPC 服务健康检查
// 注册健康检查服务
healthServer := health.NewServer()
healthpb.RegisterHealthServer(srv.grpcServer, healthServer)
healthServer.SetServingStatus(svcName, healthpb.HealthCheckResponse_SERVING)
return srv, nil
}
// loadTLSCredentials 加载TLS证书
// note 原本创建 ServerSide-TLS 可以通过:grpc.NewServer(grpc.Creds(creds)),这里就是将 creds 的创建抽离出来
// note 参考网址:https://www.lixueduan.com/posts/grpc/04-encryption-tls/ 他这里依旧是 ServerSide TLS,client 中用的是 insecure。
func loadTLSCredentials(certFile, keyFile string) (credentials.TransportCredentials, error) {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
return nil, err
}
return credentials.NewTLS(&tls.Config{
Certificates: []tls.Certificate{cert},
}), nil
}
2.10 接口型函数
详情可以见:
简单来说就是一个函数的参数是接口时,这个接口还可以是接口型函数。根据业务的复杂程度,上层可以传复杂的接口,也可以是简单的接口型函数。
2.11 panic
和 error
当出现程序员无法预知的严重异常(不可恢复的),考虑使用 panic 而不是 error。
可以参考:
https://zhuanlan.zhihu.com/p/87345297
https://www.yingnd.com/golang/214113.html
2.12 主线程中针对其他节点的请求使用协程异步处理。
- 这个就是为了防止主线程阻塞而建议的方案,例如 group.go 中,使用异步请求来完成分布节点间数据的同步:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36// Set 设置缓存值
func (g *Group) Set(ctx context.Context, key string, value []byte) error {
// note 1. 健壮性检查
// 检查组是否已关闭
if atomic.LoadInt32(&g.closed) == 1 {
return ErrGroupClosed
}
if key == "" {
return ErrKeyRequired
}
if len(value) == 0 {
return ErrValueRequired
}
// note 2. 创建缓存视图并设置到本地缓存
// 创建缓存视图
view := ByteView{b: cloneBytes(value)}
// 设置到本地缓存
if g.expiration > 0 {
g.mainCache.AddWithExpiration(key, view, time.Now().Add(g.expiration))
} else {
g.mainCache.Add(key, view)
}
// note 3. 判断是否是同步请求,如果不是同步请求且是分布式系统,那还要对一致性 Hash 上的节点进行同步
// point 注意同步是异步请求操作,开一个协程来处理
// 检查是否是从其他节点同步过来的请求
isPeerRequest := ctx.Value("from_peer") != nil
// 如果不是从其他节点同步过来的请求,且启用了分布式模式,同步到其他节点
if !isPeerRequest && g.peers != nil {
go g.syncToPeers(ctx, "set", key, value)
}
return nil
}
2.13 各种输出的用法:log、logrus 和 fmt 三种使用场景。
- 项目中大量涉及错误处理和日志使用。百度一下没有明显的帖子进行区分。
- 就项目而言,对于底层 error 的处理,常常使用 fmt。而对于上层的应用,则更多的使用 logrus,其中 logrus 是日志框架,对 log 进行集成。
2.14 定时器
详见:
其中涉及到使用
chan
来关闭定时器,1
2
3
4
5
6
7
8
9
10
11func (lruCache *LRUCache) cleanupLoop() {
// point 定时器的使用
for {
select {
case <-lruCache.cleanupTicker.C:
lruCache.evict()
case <-lruCache.closeChan:
return
}
}
}同时在项目中遇到:
stopCh chan error
。这里就联动一下,即chan
来关闭定时器和协程。
定时器用的是chan struct{}
。有关协程的关闭,可以看:其中使用
chan error
来获取协程的报错。
2.15 包以及结构体中变量大小写的规则。
当时遇到的一些 bug,主要还是小的知识点。
结构体中变量的大小写影响结构体成员是否可以被不同的包访问:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22// Package KamaCacheLearning -----------------------------
// @author : EndlessShw
// @time : 2025/7/18 17:25
// @Description : ByteView 用于返回缓存结果,是 Value 的只读视图(可以认为是 Value 的子类,因为实现了 Len())
// -------------------------------------------
package KamaCacheLearning
type ByteView struct {
// point 考虑到只读原则,因此内部的变量小写(同一个包下才能直接调用),同时 cloneBytes 也是小写,变成内部方法
b []byte
}
// point 这里不是指针接收者 pointer receiver。详见文章:https://zhuanlan.zhihu.com/p/76384820 和 https://golang.design/go-questions/interface/receiver/
// point 选择原则:https://zhuanlan.zhihu.com/p/667384821
func (byteView ByteView) Len() int { return len(byteView.b) }
func (byteView ByteView) ByteSlice() []byte { return cloneBytes(byteView.b) }
func (byteView ByteView) String() string { return string(byteView.b) }
func cloneBytes(b []byte) []byte {
c := make([]byte, len(b))
copy(c, b)
return c
}这里还涉及到一个指针接收者的问题。改为指针接收者的话,那么只有指针对象才能调用这些方法。
todo:还有一些疑问已询问作者,待反馈。