数据冗余和即时修复

数据冗余和即时修复

数据冗余

上一版本的去重主要是用于避免同一个对象在系统中各处都被保存一份副本。本版本的冗余是在受到版本控制的情况下增加对象数据的稳定性。

数据丢失和数据不可用

数据丢失是指信息在存储、传输或处理的过程中由于错误或遗漏而发生损失。

数据在传输过程中的丢失通常是由于网络不稳定导致的,对数据进行校验可以有效检测出传输过程中发生的数据丢失,然后服务端就可以拒绝接收有损的数据。

数据处理过程中的丢失则可能是由于软件或人为的错误而造成的。对于软件错误,需要对其进行修复并重新部署;对于人为错误,需要制定严格的操作规范。

重点是:已经丢失的数据无法恢复

存储硬件损坏是数据在存储过程中丢失的最常见的原因,可能发生的硬件损坏从某个硬盘出现坏道到整个数据中心受灾等不一而足,使用数据备份以及灾难恢复可以在一定程度上弥补损失,但是这通常都会造成几小时到几天不等的停机时间,而且系统最后一次备份点之后加入的数据也依然是无法恢复的。

服务器的维护可能导致数据暂时的不可用,比如预先安排的服务器重启等。和永久性的数据丢失不同,数据不可用是暂时性的,当服务器重启完成后,数据就会恢复可用的状态。但是在服务器重启过程中如果恰好有用户需要对其上的对象进行访问,那么同样会表现成数据丢失。

除了硬件损坏和服务器维护以外,还有数据降解也会造成数据丢失。数据降解是由数据存储设备的非关键故障累积导致的数据逐渐损坏,即使在没有发生任何软件错误或硬件损坏的情况下,存储介质上的数据依然有可能随时间的推移而丢失。比如说,固态硬盘会由于存在缺陷的绝缘封装工艺而导致其中保存的电荷慢慢流失:磁盘上保存的比特会随时间的推移而消磁:潮湿温暖的空气会加速磁性材质的降解等。

单个数据的损坏和丢失是不可避免的。为了保护用户的数据,在计算机存储领域,依靠数据冗余来对抗数据丢失数据冗余不仅可以在一定程度上克服数据丢失,而且在发生数据丢失的时候还可以帮助对其进行修复。

数据冗余

在计算机领域,数据冗余是指在存储和传输的过程中,除了实际需要的数据,还存在一些额外数据用来纠正错误。这些额外的数据可以是一份简单的原始数据的复制,也可以是一些经过精心选择的校验数据,允许在一定程度上检测并修复损坏的数据。

比如,ECC (Error-correcting memory)内存在其每一个存储字中会额外包含存储数据的校验和,可以检测并修正一个比特的错误;RAID 1使用两个硬盘组成单个逻辑存储单元,在任何一个硬盘发生故障的情况下依然可以有效运行:Btrfs和ZFS这样的文件系统通过结合使用校验和以及数据复制这两种方式来检测并修正硬盘上的数据降解等。
在对象存储领域,我们也有很多数据冗余的策略。

对象存储系统的数据冗余策略

最显而易见的冗余策略是每个对象都保留两个或更多副本,由接口服务负责将其存储在不同的数据服务节点上。跟去重之前完全无控制的状况不同,多副本冗余是受接口服务控制的有限副本冗余,副本对象的数量有限,而不是散落在每一个数据服务节点上。多副本冗余的概念很简单,实现也很方便,任何一台数据服务节点停机都不会影响数据的可用性,因为在另外一台数据服务节点上还存在一个副本。

多副本冗余的策略胜在实现简单,但是代价也比较高,本项目将要介绍和实现的冗余策略比多副本冗余复杂,叫作 Reed Solomon 纠删码。在编码理论学中,RS 纠删码属于非二进制循环码,它的实现基于有限域上的一元多项式,并被广泛应用于CD、DVD、蓝光、QR码等消费品技术,DSL、WiMAX等数据传输技术,DVB、ATSC等广播技术以及卫星通信技术等。

RS纠删码允许选择数据片和校验片的数量,本项目选择4个数据片加两个校验片,也就是说会把一个完整的对象平均分成6个分片对象,其中包括4个数据片对象,每个对象的大小是原始对象大小的25%,另外还有两个校验片,其大小和数据片一样。这6个分片对象被接口服务存储在6个不同的数据服务节点上,只需要其中任意4个就可以恢复出完整的对象。

要在对象存储系统中评价一个冗余策略的好坏,主要是衡量该策略对存储空间的要求和其抗数据损坏的能力。对存储空间的要求是指采用的冗余策略相比不使用冗余要额外支付的存储空间,以百分比表示。抗数据损坏的能力以允许损坏或丢失的对象数量来衡量。

比如说,在没有任何冗余策略的情况下,对象占用存储空间的大小就是它本身的大小,而一旦该对象损坏,就丢失了这个对象,那么它对存储空间的要求是100%,而抵御能力则是0。如果采用双副本冗余策略,当任何一个副本损坏或丢失时,都可以通过另外一个副本将其恢复出来。也就是说,这个冗余策略对存储空间要求是200%,抵御数据损坏的能力是1(可以丢失两个副本中的任意1个),而使用4+2的RS码的策略,存储空间要求是150%,抵御能力是2(可以丢失6个分片对象中的任意两个)。总的来说,对于一个M+N的RS码(M个数据片加N个校验片),其对存储空间的要求是(M+N)/M*100%,抵御能力是N。

可以看到,RS码冗余策略对存储空间的要求更低,而抵御数据损坏的能力却更强。选择RS码还有一个好处,就是它会将一个大对象拆分成多个分片对象分别存储,有助于数据服务层的负载均衡。

使用了RS 码冗余策略之后,对象存储系统单个节点的维护就不会导致整个系统的停机。只要每次维护的节点数小于N,那么任意对象的分片数依然大于M,对象就可以正确恢复。

数据冗余的实现

由于底层使用的数据冗余策略对上层的接口不产生任何影响。REST接口不发生变化。

对象PUT流程

使用RS纠删码实现冗余策略的对象PUT流程

接口服务会将客户端PUT上来的数据流切成4+2的分片,然后向6个数据服务节点上传临时对象<hash>.X,同时计算对象散列值。如果散列值一致,6个临时对象会被转成正式的分片对象<hash>.X,其内容被保存在$STORAGE_ROOT/objects/<hash>.X.<hash of shard X>文件中。其中, X的取值范围为0~5的整数,表示该分片的id,0~3是数据片,4和5则是校验片<hash of shard X>是该分片的散列值,在转正时通过计算临时对象的内容得到。

对象GET流程

使用RS冗余编码的对象GET流程

接口服务发送针对对象散列值<hash>的定位信息,含有该对象分片的数据服务共有6个,它们都会发送反馈的消息。接口服务在收到所有反馈消息后向响应的数据服务分别GET分片对象<hash>.X。数据服务读取分片X的内容,并响应接口服务的请求。然后接口服务将数据片0到3的内容组合成对象来响应客户端的请求。

接口服务在进行定位时的超时和之前一样是1s,如果在1s内收到所有6个定位反馈则定位成功:如果在1s超时后收到的定位反馈大于等于4,那么依然可以恢复出完整的对象:如果定位反馈小于等于3,则定位失败。

当数据服务的接口收到的请求对象是<hash>.X,数据服务需要自己去$STORAGE_ROOT/objects目录下寻找<hash>.X开头的文件<hash>.X.<hash of shard X>然后在返回该文件内容时还需要进行数据校验,确保其内容的散列值和<hash of shard X>一致。如果不一致,则需要删除该分片对象并返回404 Not Found。这是为了防止该分片对象被数据降解破坏。

接口服务可能由于分片定位失败或分片散列值不一致等原因无法获取某个分片的数据。如果失败的分片数小于等于2,接口服务会根据成功获取的分片重塑数据,并将新的分片写入随机的数据服务;如果失败的分片数大于等于3,对这样的数据丢失无能为力,只能向客户端返回404 Not Found。

具体实现

为实现RS码,接口服务的locate、heartbeat和objects包需要做相应变化。

接口服务的locate包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func Locate(name string) (locateInfo map[int]string) {
q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
q.Publish("dataServers", name)
c := q.Consume()
go func() {
// 设置1秒超时,无论当前收到多少条反馈消息都会立即返回
time.Sleep(time.Second)
q.Close()
}()
locateInfo = make(map[int]string)
// 使用for循环获取6条消息,每条消息包含拥有某个分片的数据服务节点的地址和分片的id
// rs.ALL_SHARDS为rs包中的常数6,代表一共有4+2个分片
for i := 0; i < rs.ALL_SHARDS; i++ {
msg := <-c
if len(msg.Body) == 0 {
return
}
var info types.LocateMessage
json.Unmarshal(msg.Body, &info)
// 获取到的分片信息放到输出参数locateInfo中
locateInfo[info.Id] = info.Addr
}
return
}

// 判断收到的反馈消息数量是否大于等于4
func Exist(name string) bool {
// 大于等于4则说明对象存在,否则说明对象不存在(或者存在也无法读取)
return len(Locate(name)) >= rs.DATA_SHARDS
}
接口服务的heartbeat包

接口服务的heartbeat包也需要进行改动,将原来的返回一个随机数据服务节点的ChooseRandomDataServer函数改为能够返回多个随机数据服务节点ChooseRandomDataServers函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 用于获取上传复原分片的随机数据服务节点。目前已有的分片所在的数据服务节点需要被排除
func ChooseRandomDataServers(n int, exclude map[int]string) (ds []string) {
// 输入参数n表示需要多少个随机数服务节点
// 输入参数exclude表明要求返回的数据服务节点不能包含哪些节点。
// 当定位完成后,实际收到的反馈消息可能不足6个,此时需要进行数据恢复,即根据目前已有的分片将丢失的分片复原出来并再次上传到数据服务
candidates := make([]string, 0)
reverseExcludeMap := make(map[string]int)
for id, addr := range exclude {
reverseExcludeMap[addr] = id
}
servers := GetDataServers()
for i := range servers {
s := servers[i]
_, excluded := reverseExcludeMap[s]
if !excluded {
// 不需要被排除的加入到condaidate数组
candidates = append(candidates, s)
}
}
length := len(candidates)
if length < n {
// 无法满足要求的n个数据服务节点,返回一个空数组
return
}
// 将0-length-1的所有整数乱序排列返回一个数组
p := rand.Perm(length)
// 取前n个作为candicate数组的下标取数据节点地址返回
for i := 0; i < n; i++ {
ds = append(ds, candidates[p[i]])
}
return
}
接口服务的objects包
PUT相关函数

PUT对象时需要用到的putStream函数,调用新的heartbeat.ChooseRandomDataServers函数获取随机数据服务节点地址并调用rs.NewRSPutStream来生成一个数据流:

1
2
3
4
5
6
7
8
9
func putStream(hash string, size int64) (*rs.RSPutStream, error) {
servers := heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS, nil)
if len(servers) != rs.ALL_SHARDS {
return nil, fmt.Errorf("cannot find enough dataServer")
}

// 返回一个指向rs.RSPutStream结构体的指针
return rs.NewRSPutStream(servrs, hash, size)
}

创建rs.RSPutStream:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
const (
DATA_SHARDS = 4
PARITY_SHARDS = 2
ALL_SHARDS = DATA_SHARDS + PARITY_SHARDS
BLOCK_PER_SHARD = 8000
BLOCK_SIZE = BLOCK_PER_SHARD * DATA_SHARDS
)

// 内嵌一个encoder结构体,RPutStream的使用者可以像访问RPutStream的方法或成员一样访问*encoder的方法或成员
type RSPutStream struct {
*encoder
}

func NewRSPutStream(dataServers []string, hash string, size int64) (*RSPutStream, error) {
// dataServers长度不为6,返回错误
if len(dataServers) != ALL_SHARDS {
return nil, fmt.Errorf("dataServers number mismatch")
}
// 根据size计算出每个分片的大小perShard,也就是size/4再向上取整
perShard := (size + DATA_SHARDS - 1) / DATA_SHARDS
// 创建长度为6的io.Writers数组
writers := make([]io.Writer, ALL_SHARDS)
var e error
for i := range writers {
// writers数组中的每个元素都是一个objectstream.TempPutSterm,用于上传一个分片对象
writers[i], e = objectstream.NewTempPutStream(dataServers[i],
fmt.Sprintf("%s.%d", hash, i), perShard)
if e != nil {
return nil, e
}
}
// 调用NewEncoder函数创建一个encoder结构体指针enc
enc := NewEncoder(writers)
// 将enc作为RSPutSterm的内嵌结构体返回
return &RSPutStream{enc}, nil
}

type encoder struct {
writers []io.Writer
enc reedsolomon.Encoder // reedsolomon.Encoder接口
cache []byte //用于做输入数据缓存的字节数组
}

// 调用reeddolomon.New生成具有4个数据片加两个校验片的RS码编码器enc
func NewEncoder(writers []io.Writer) *encoder {
enc, _ := reedsolomon.New(DATA_SHARDS, PARITY_SHARDS)
// 将输入参数writers和enc作为生成的encoder结构体的成员返回
return &encoder{writers, enc, nil}
}

reedsolomon包是一个RS编解码的开源库,需要用以下命令下载该包:
go get github.com/klauspost/reedsolomon

在对象PUT过程中,写入对象数据的流调用的是rs.RSPutStream,实际背后调用的Write方法也不一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// RSPutStream本身没有实现Write方法,实现的时候函数会直接调用其内嵌结构体encoder的Write方法
func (e *encoder) Write(p []byte) (n int, err error) {
length := len(p)
current := 0
// 将p中待写入的数据以块的形式放入缓存
for length != 0 {
next := BLOCK_SIZE - len(e.cache)
if next > length {
next = length
}
e.cache = append(e.cache, p[current:current+next]...)
// 如果缓存已满,调用Flush方法将缓存实际写入writers
// 缓存的上限是每个数据片8000字节,4个数据片共32000字节。
// 如果缓存里剩余的数据不满32000字节就暂不刷新,等待Write方法下一次调用
if len(e.cache) == BLOCK_SIZE {
e.Flush()
}
current += next
length -= next
}
return len(p), nil
}

func (e *encoder) Flush() {
if len(e.cache) == 0 {
return
}
// 调用Split方法将缓存的数据切成4个数据片
shards, _ := e.enc.Split(e.cache)
// 调用Encode方法生成两个校验片
e.enc.Encode(shards)
// 循环将6个片的数据依次写入wtiters并清空缓存
for i := range shards {
e.writers[i].Write(shards[i])
}
e.cache = []byte{}
}

用户上传的对象数据经过散列值校验后,RSPutStrem需要一个Commit方法用来将临时对象转正或删除。

1
2
3
4
5
6
7
8
func (s *RSPutStream) Commit(success bool) {
// 调用RSPutStream的内嵌结构体encoder的Flush方法将缓存中最后的数据写入
s.Flush()
// 对调用encoder的成员数组writers中的元素调用Commit方法将6个临时对象依次转正或删除
for i := range s.writers {
s.writers[i].(*objectstream.TempPutStream).Commit(success)
}
}
GET相关函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func get(w http.ResponseWriter, r *http.Request) {
name := strings.Split(r.URL.EscapedPath(), "/")[2]
versionId := r.URL.Query()["version"]
version := 0
var e error
if len(versionId) != 0 {
version, e = strconv.Atoi(versionId[0])
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusBadRequest)
return
}
}
meta, e := es.GetMetadata(name, version)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
if meta.Hash == "" {
w.WriteHeader(http.StatusNotFound)
return
}
hash := url.PathEscape(meta.Hash)

stream, e := GetStream(hash, meta.Size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
// 调用io.Copy将对象数据流写入HTTP响应
_, e = io.Copy(w, stream)
// 如果有错误,说明对象数据在RS解码过程中发生了错误,意味着该对象已经无法被读取
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
stream.Close()
}

// 提供给外部包使用。增加size参数是由于RS码的实现要求每一个数据片的长度完全一样
// 在编码如果对象长度不能被4整除,函数会队最后一个数片进行填充。
// 解码时必须提供对象的准确长度,防止填充数据被当成元数对象数据返回。
func GetStream(hash string, size int64) (*rs.RSGetStream, error) {
// 根据对象散列值hash定位对象
locateInfo := locate.Locate(hash)
// 反馈的定位结果数组长度小于4,返回错误
if len(locateInfo) < rs.DATA_SHARDS {
return nil, fmt.Errorf("object %s locate fail, result %v", hash, locateInfo)
}
dataServers := make([]string, 0)
// 长度不为6,说明对象有部分分片丢失
if len(locateInfo) != rs.ALL_SHARDS {
// 调用heartbeat.ChooseRandomDataServers随机选择用于接收恢复分片的数据服务节点
dataServers = heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS-len(locateInfo), locateInfo)
}
// 调用rs.NewRSGetStream函数创建rs.RSGetStream
return rs.NewRSGetStream(locateInfo, dataServers, hash, size)
}

创建Re.RSGetStream:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
func NewRSGetStream(locateInfo map[int]string, dataServers []string, hash string, size int64) (*RSGetStream, error) {
// 检查是否满足4+2 的RS码需求
// 不满足,返回错误
if len(locateInfo)+len(dataServers) != ALL_SHARDS {
return nil, fmt.Errorf("dataServers number mismatch")
}

// 创建长度为6的io.Reader数组readers,用于读取6个分片的数据
readers := make([]io.Reader, ALL_SHARDS)
// 遍历6个分片的id
for i := 0; i < ALL_SHARDS; i++ {
// 在locateinfo中查找分片所在的数据服务节点地址
server := locateInfo[i]
// 如果某个分片id相对的数据服务节点地址为空,说明该分片丢失
// 取一个随机数据服务节点补上
if server == "" {
locateInfo[i] = dataServers[0]
dataServers = dataServers[1:]
continue
}
// 数据服务节点存在,调用objectstream.NewGetStream打开一个对象读取流用于读取该分片数据
reader, e := objectstream.NewGetStream(server, fmt.Sprintf("%s.%d", hash, i))
// 打开的流保存在readers数组相应的元素中
if e == nil {
readers[i] = reader
}
}

writers := make([]io.Writer, ALL_SHARDS)
perShard := (size + DATA_SHARDS - 1) / DATA_SHARDS
var e error
// 遍历readers
for i := range readers {
// 第一个次遍历出现nil的情况
// 1.该分片数据服务节点地址为空
// 2.数据服务节点存在但打开流失败
if readers[i] == nil {
// 某个元素为nil,调用NewTempPutStream创建相应的临时对象写入流用于恢复分片
// 打开的流保存到writers数组相应元素中
writers[i], e = objectstream.NewTempPutStream(locateInfo[i], fmt.Sprintf("%s.%d", hash, i), perShard)
if e != nil {
return nil, e
}
}
}
// 处理完成,readers和writers数组形成互补关系
// 对于某个分片id,要么在readers中存在相应的读取流,要么在writers中存在相应的写入流
// 将两个数组以及对象的大小size作为参数调用NewDecoder生成decoder结构体的指针dec
dec := NewDecoder(readers, writers, size)
// 将dec作为RSGetStream的内嵌结构体返回
return &RSGetStream{dec}, nil
}

type decoder struct {
readers []io.Reader
writers []io.Writer
enc reedsolomon.Encoder // 接口,用于RS解码
size int64 // 对象的大小
cache []byte // cache用于缓存读取的数据
cacheSize int // cachSize
total int64 // 当前读了多少字节
}

func NewDecoder(readers []io.Reader, writers []io.Writer, size int64) *decoder {
// 调用reedsolomon.New创建4+2 RS码的解码器enc
enc, _ := reedsolomon.New(DATA_SHARDS, PARITY_SHARDS)
// 设置decoder结构体中相应属性后返回
return &decoder{readers, writers, enc, size, nil, 0, 0}
}

对象GET过程中读取对象数据的流调用的是rs.RSGetStream,实际调用的Read方法也发生了变化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
func (d *decoder) Read(p []byte) (n int, err error) {
// 当cache中没有更多数据时会调用getData方法获取数据
if d.cacheSize == 0 {
// getData返回的e不为nil,说明没能获取更多数据
e := d.getData()
if e != nil {
// 返回0和e通知调用方
return 0, e
}
}
// 函数参数p的数组长度
length := len(p)
if d.cacheSize < length {
// length超出当前缓冲区的数据大小,令length等于缓存的数据大小
length = d.cacheSize
}
// 将缓存中length长度的数据复制给输入参数p,然后调整缓存,仅保留剩下的部分
d.cacheSize -= length
copy(p, d.cache[:length])
d.cache = d.cache[length:]
// 返回length,通知调用方本次读取一共有多少数据被复制到p中
return length, nil
}

func (d *decoder) getData() error {
// 先判断当前解码的数据大小是否等于对象原始大小
// 如果已经相等,说明所有数都已经被读取,返回io.EOF
if d.total == d.size {
return io.EOF
}
// 如果还有数需要读取,创建一个长度为6的数组
// 每个元素都是一个字节数组,用于保存相应分片中读取的数据
shards := make([][]byte, ALL_SHARDS)
repairIds := make([]int, 0)
// 遍历6个shards
for i := range shards {
// 若某个分片对应的reader为nil,说明该分片已经丢失
if d.readers[i] == nil {
// 在repairIds中添加该分片的id
repairIds = append(repairIds, i)
} else {
// 对应的reader不为nil,那么对应的shards需要被初始化为一个长度为8000的字节数组
shards[i] = make([]byte, BLOCK_PER_SHARD)
// 调用io.ReadFull从reader中完整读取8000字节的数据保存在shards中。
n, e := io.ReadFull(d.readers[i], shards[i])
// 如果发生了非EOF失败,该shards被置为nil
if e != nil && e != io.EOF && e != io.ErrUnexpectedEOF {
shards[i] = nil
// 读取数据长度n不到8000字节,将shards实际的长度缩减为n
} else if n != BLOCK_PER_SHARD {
shards[i] = shards[i][:n]
}
}
}
// 调用成员enc的Reconstruct方法尝试将被置为nil的shards恢复出来
e := d.enc.Reconstruct(shards)
// 若发生错误,说明对象遭到不可恢复的破坏,返回错误给上层
if e != nil {
return e
}
// 恢复成功,6个shards中都保存了对应分片的正确数据
// 遍历repairIds,将需要恢复的分片数据写入到相应的writer
for i := range repairIds {
id := repairIds[i]
d.writers[id].Write(shards[id])
}
// 遍历4个数据分片
for i := 0; i < DATA_SHARDS; i++ {
shardSize := int64(len(shards[i]))
if d.total+shardSize > d.size {
shardSize -= d.total + shardSize - d.size
}
// 将每个分片中的数据添加到缓存cache中
d.cache = append(d.cache, shards[i][:shardSize]...)
// 修改缓存当前的大小cacheSize
d.cacheSize += int(shardSize)
// 修改当前已经读取的全部数据的大小total
d.total += shardSize
}
return nil
}
数据服务

数据服务的locate包:由于在磁盘上保存的对象文件名格式发生了变化,locate的实现也有相应变化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
var objects = make(map[string]int)
var mutex sync.Mutex

// 告知某个对象是否存在,同时告知本节点保存的是该对象哪个分片
func Locate(hash string) int {
mutex.Lock()
id, ok := objects[hash]
mutex.Unlock()
if !ok {
return -1
}
return id
}

// 将对象及其分片id加入缓存
func Add(hash string, id int) {
mutex.Lock()
objects[hash] = id
mutex.Unlock()
}

func Del(hash string) {
mutex.Lock()
delete(objects, hash)
mutex.Unlock()
}

func StartLocate() {
q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
defer q.Close()
q.Bind("dataServers")
c := q.Consume()
for msg := range c {
// 读取来自接口服务需要定位的对象散列值hash
hash, e := strconv.Unquote(string(msg.Body))
if e != nil {
panic(e)
}
// 调用Locate获得分片id
id := Locate(hash)
if id != -1 {
// id不为-1,将自身的节点监听地址和id打包成一个types.Locate Message结构体作为反馈消息发送
q.Send(msg.ReplyTo, types.LocateMessage{Addr: os.Getenv("LISTEN_ADDRESS"), Id: id})
}
}
}

func CollectObjects() {
// 获取存储目录下的所有文件
files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/*")
for i := range files {
// 以'.'分割其基本文件名,获得对象的散列值hash以及分片id
file := strings.Split(filepath.Base(files[i]), ".")
if len(file) != 3 {
panic(files[i])
}
hash := file[0]
id, e := strconv.Atoi(file[1])
if e != nil {
panic(e)
}
// 将对象的散列值hash以及分片id加入定位缓存
objects[hash] = id
}
}

数据服务的temp包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (t *tempInfo) hash() string {
s := strings.Split(t.Name, ".")
return s[0]
}

func (t *tempInfo) id() int {
s := strings.Split(t.Name, ".")
id, _ := strconv.Atoi(s[1])
return id
}

func commitTempObject(datFile string, tempinfo *tempInfo) {
// 读取临时对象的数据并计算散列值<hash of shard X>
f, _ := os.Open(datFile)
d := url.PathEscape(utils.CalculateHash(f))
f.Close()
os.Rename(datFile, os.Getenv("STORAGE_ROOT")+"/objects/"+tempinfo.Name+"."+d)
// 调用locate.Add,以<hash>为键,分片的id为值添加进定位缓存
locate.Add(tempinfo.hash(), tempinfo.id())
}

数据服务的objects包:

getFile函数发生改变:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func getFile(name string) string {
// 在$STORAGE_ROOT/objects/目录下查找所有以<hash>.X开头的文件
files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/" + name + ".*")
// 找不到,返回空字符串
if len(files) != 1 {
return ""
}
// 找到之后计算散列值
file := files[0]
h := sha256.New()
sendFile(h, file)
d := url.PathEscape(base64.StdEncoding.EncodeToString(h.Sum(nil)))
hash := strings.Split(file, ".")[2]
// 如果与<hash of shard X>的值不匹配则删除该对象并返回空字符串
if d != hash {
log.Println("object hash mismatch, remove", file)
locate.Del(hash)
os.Remove(file)
return ""
}
// 否则返回对象的文件名
return file
}

测试

本版本所有代码及测试用例可见添加数据冗余和修复的分布式对象存储系统

参考

《分布式对象存储—原理、架构及Go语言实现》