数据冗余和即时修复 数据冗余 上一版本的去重主要是用于避免同一个对象在系统中各处都被保存一份副本。本版本的冗余是在受到版本控制的情况下增加对象数据的稳定性。
数据丢失和数据不可用 数据丢失是指信息在存储、传输或处理的过程中由于错误或遗漏而发生损失。
数据在传输过程中的丢失通常是由于网络不稳定导致的 ,对数据进行校验 可以有效检测出传输过程中发生的数据丢失,然后服务端就可以拒绝接收有损的数据。
数据处理过程中的丢失则可能是由于软件或人为的错误而造成的 。对于软件错误,需要对其进行修复并重新部署;对于人为错误,需要制定严格的操作规范。
重点是:已经丢失的数据无法恢复 。
存储硬件损坏是数据在存储过程中丢失的最常见的原因,可能发生的硬件损坏从某个硬盘出现坏道到整个数据中心受灾等不一而足,使用数据备份以及灾难恢复可以在一定程度上弥补损失,但是这通常都会造成几小时到几天不等的停机时间,而且系统最后一次备份点之后加入的数据也依然是无法恢复的。
服务器的维护可能导致数据暂时的不可用 ,比如预先安排的服务器重启等。和永久性的数据丢失不同,数据不可用是暂时性的,当服务器重启完成后,数据就会恢复可用的状态。但是在服务器重启过程中如果恰好有用户需要对其上的对象进行访问,那么同样会表现成数据丢失。
除了硬件损坏和服务器维护以外,还有数据降解也会造成数据丢失。数据降解是由数据存储设备的非关键故障累积导致的数据逐渐损坏,即使在没有发生任何软件错误或硬件损坏的情况下,存储介质上的数据依然有可能随时间的推移而丢失。 比如说,固态硬盘会由于存在缺陷的绝缘封装工艺而导致其中保存的电荷慢慢流失:磁盘上保存的比特会随时间的推移而消磁:潮湿温暖的空气会加速磁性材质的降解等。
单个数据的损坏和丢失是不可避免的。为了保护用户的数据,在计算机存储领域,依靠数据冗余来对抗数据丢失 。数据冗余不仅可以在一定程度上克服数据丢失,而且在发生数据丢失的时候还可以帮助对其进行修复。
数据冗余 在计算机领域,数据冗余是指在存储和传输的过程中,除了实际需要的数据,还存在一些额外数据用来纠正错误。这些额外的数据可以是一份简单的原始数据的复制,也可以是一些经过精心选择的校验数据,允许在一定程度上检测并修复损坏的数据。
比如,ECC (Error-correcting memory)内存在其每一个存储字中会额外包含存储数据的校验和,可以检测并修正一个比特的错误;RAID 1使用两个硬盘组成单个逻辑存储单元,在任何一个硬盘发生故障的情况下依然可以有效运行:Btrfs和ZFS这样的文件系统通过结合使用校验和以及数据复制这两种方式来检测并修正硬盘上的数据降解等。 在对象存储领域,我们也有很多数据冗余的策略。
对象存储系统的数据冗余策略 最显而易见的冗余策略是每个对象都保留两个或更多副本,由接口服务负责将其存储在不同的数据服务节点上。跟去重之前完全无控制的状况不同,多副本冗余是受接口服务控制的有限副本冗余,副本对象的数量有限,而不是散落在每一个数据服务节点上。多副本冗余的概念很简单,实现也很方便,任何一台数据服务节点停机都不会影响数据的可用性,因为在另外一台数据服务节点上还存在一个副本。
多副本冗余的策略胜在实现简单,但是代价也比较高,本项目将要介绍和实现的冗余策略比多副本冗余复杂,叫作 Reed Solomon 纠删码。在编码理论学中,RS 纠删码属于非二进制循环码,它的实现基于有限域上的一元多项式,并被广泛应用于CD、DVD、蓝光、QR码等消费品技术,DSL、WiMAX等数据传输技术,DVB、ATSC等广播技术以及卫星通信技术等。
RS纠删码允许选择数据片和校验片的数量,本项目选择4个数据片加两个校验片,也就是说会把一个完整的对象平均分成6个分片对象,其中包括4个数据片对象,每个对象的大小是原始对象大小的25%,另外还有两个校验片,其大小和数据片一样。这6个分片对象被接口服务存储在6个不同的数据服务节点上,只需要其中任意4个就可以恢复出完整的对象。
要在对象存储系统中评价一个冗余策略的好坏,主要是衡量该策略对存储空间的要求和其抗数据损坏的能力 。对存储空间的要求是指采用的冗余策略相比不使用冗余要额外支付的存储空间,以百分比表示。抗数据损坏的能力以允许损坏或丢失的对象数量来衡量。
比如说,在没有任何冗余策略的情况下,对象占用存储空间的大小就是它本身的大小,而一旦该对象损坏,就丢失了这个对象,那么它对存储空间的要求是100%,而抵御能力则是0。如果采用双副本冗余策略,当任何一个副本损坏或丢失时,都可以通过另外一个副本将其恢复出来。也就是说,这个冗余策略对存储空间要求是200%,抵御数据损坏的能力是1(可以丢失两个副本中的任意1个),而使用4+2的RS码的策略,存储空间要求是150%,抵御能力是2(可以丢失6个分片对象中的任意两个)。总的来说,对于一个M+N的RS码(M个数据片加N个校验片),其对存储空间的要求是(M+N)/M*100%,抵御能力是N。
可以看到,RS码冗余策略对存储空间的要求更低,而抵御数据损坏的能力却更强。选择RS码还有一个好处,就是它会将一个大对象拆分成多个分片对象分别存储,有助于数据服务层的负载均衡。
使用了RS 码冗余策略之后,对象存储系统单个节点的维护就不会导致整个系统的停机。只要每次维护的节点数小于N,那么任意对象的分片数依然大于M,对象就可以正确恢复。
数据冗余的实现 由于底层使用的数据冗余策略对上层的接口不产生任何影响。REST接口不发生变化。
对象PUT流程
接口服务会将客户端PUT上来的数据流切成4+2的分片,然后向6个数据服务节点上传临时对象<hash>.X
,同时计算对象散列值。如果散列值一致,6个临时对象会被转成正式的分片对象<hash>.X
,其内容被保存在$STORAGE_ROOT/objects/<hash>.X.<hash of shard X>
文件中。其中, X的取值范围为0~5
的整数,表示该分片的id,0~3
是数据片,4和5则是校验片。<hash of shard X>
是该分片的散列值,在转正时通过计算临时对象的内容得到。
对象GET流程
接口服务发送针对对象散列值<hash>
的定位信息,含有该对象分片的数据服务共有6个,它们都会发送反馈的消息。接口服务在收到所有反馈消息后向响应的数据服务分别GET分片对象<hash>.X
。数据服务读取分片X的内容,并响应接口服务的请求。然后接口服务将数据片0到3的内容组合成对象来响应客户端的请求。
接口服务在进行定位时的超时和之前一样是1s,如果在1s内收到所有6个定位反馈则定位成功:如果在1s超时后收到的定位反馈大于等于4,那么依然可以恢复出完整的对象:如果定位反馈小于等于3,则定位失败。
当数据服务的接口收到的请求对象是<hash>.X
,数据服务需要自己去$STORAGE_ROOT/objects
目录下寻找<hash>.X
开头的文件<hash>.X.<hash of shard X>
,然后在返回该文件内容时还需要进行数据校验,确保其内容的散列值和<hash of shard X>
一致。如果不一致,则需要删除该分片对象并返回404 Not Found。这是为了防止该分片对象被数据降解破坏。
接口服务可能由于分片定位失败或分片散列值不一致等原因无法获取某个分片的数据。如果失败的分片数小于等于2,接口服务会根据成功获取的分片重塑数据,并将新的分片写入随机的数据服务;如果失败的分片数大于等于3,对这样的数据丢失无能为力,只能向客户端返回404 Not Found。
具体实现 为实现RS码,接口服务的locate、heartbeat和objects包需要做相应变化。
接口服务的locate包 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func Locate (name string ) (locateInfo map [int ]string ) { q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER" )) q.Publish("dataServers" , name) c := q.Consume() go func () { time.Sleep(time.Second) q.Close() }() locateInfo = make (map [int ]string ) for i := 0 ; i < rs.ALL_SHARDS; i++ { msg := <-c if len (msg.Body) == 0 { return } var info types.LocateMessage json.Unmarshal(msg.Body, &info) locateInfo[info.Id] = info.Addr } return } func Exist (name string ) bool { return len (Locate(name)) >= rs.DATA_SHARDS }
接口服务的heartbeat包 接口服务的heartbeat包也需要进行改动,将原来的返回一个随机数据服务节点的ChooseRandomDataServer
函数改为能够返回多个随机数据服务节点 的ChooseRandomDataServers
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 func ChooseRandomDataServers (n int , exclude map [int ]string ) (ds []string ) { candidates := make ([]string , 0 ) reverseExcludeMap := make (map [string ]int ) for id, addr := range exclude { reverseExcludeMap[addr] = id } servers := GetDataServers() for i := range servers { s := servers[i] _, excluded := reverseExcludeMap[s] if !excluded { candidates = append (candidates, s) } } length := len (candidates) if length < n { return } p := rand.Perm(length) for i := 0 ; i < n; i++ { ds = append (ds, candidates[p[i]]) } return }
接口服务的objects包 PUT相关函数 PUT对象时需要用到的putStream函数,调用新的heartbeat.ChooseRandomDataServers
函数获取随机数据服务节点地址并调用rs.NewRSPutStream
来生成一个数据流:
1 2 3 4 5 6 7 8 9 func putStream (hash string , size int64 ) (*rs.RSPutStream, error) { servers := heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS, nil ) if len (servers) != rs.ALL_SHARDS { return nil , fmt.Errorf("cannot find enough dataServer" ) } return rs.NewRSPutStream(servrs, hash, size) }
创建rs.RSPutStream:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 const ( DATA_SHARDS = 4 PARITY_SHARDS = 2 ALL_SHARDS = DATA_SHARDS + PARITY_SHARDS BLOCK_PER_SHARD = 8000 BLOCK_SIZE = BLOCK_PER_SHARD * DATA_SHARDS ) type RSPutStream struct { *encoder } func NewRSPutStream (dataServers []string , hash string , size int64 ) (*RSPutStream, error) { if len (dataServers) != ALL_SHARDS { return nil , fmt.Errorf("dataServers number mismatch" ) } perShard := (size + DATA_SHARDS - 1 ) / DATA_SHARDS writers := make ([]io.Writer, ALL_SHARDS) var e error for i := range writers { writers[i], e = objectstream.NewTempPutStream(dataServers[i], fmt.Sprintf("%s.%d" , hash, i), perShard) if e != nil { return nil , e } } enc := NewEncoder(writers) return &RSPutStream{enc}, nil } type encoder struct { writers []io.Writer enc reedsolomon.Encoder cache []byte } func NewEncoder (writers []io.Writer) *encoder { enc, _ := reedsolomon.New(DATA_SHARDS, PARITY_SHARDS) return &encoder{writers, enc, nil } }
reedsolomon包是一个RS编解码的开源库,需要用以下命令下载该包:go get github.com/klauspost/reedsolomon
在对象PUT过程中,写入对象数据的流调用的是rs.RSPutStream,实际背后调用的Write方法也不一样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 func (e *encoder) Write (p []byte ) (n int , err error) { length := len (p) current := 0 for length != 0 { next := BLOCK_SIZE - len (e.cache) if next > length { next = length } e.cache = append (e.cache, p[current:current+next]...) if len (e.cache) == BLOCK_SIZE { e.Flush() } current += next length -= next } return len (p), nil } func (e *encoder) Flush () { if len (e.cache) == 0 { return } shards, _ := e.enc.Split(e.cache) e.enc.Encode(shards) for i := range shards { e.writers[i].Write(shards[i]) } e.cache = []byte {} }
用户上传的对象数据经过散列值校验后,RSPutStrem需要一个Commit方法用来将临时对象转正或删除。
1 2 3 4 5 6 7 8 func (s *RSPutStream) Commit (success bool ) { s.Flush() for i := range s.writers { s.writers[i].(*objectstream.TempPutStream).Commit(success) } }
GET相关函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 func get (w http.ResponseWriter, r *http.Request) { name := strings.Split(r.URL.EscapedPath(), "/" )[2 ] versionId := r.URL.Query()["version" ] version := 0 var e error if len (versionId) != 0 { version, e = strconv.Atoi(versionId[0 ]) if e != nil { log.Println(e) w.WriteHeader(http.StatusBadRequest) return } } meta, e := es.GetMetadata(name, version) if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) return } if meta.Hash == "" { w.WriteHeader(http.StatusNotFound) return } hash := url.PathEscape(meta.Hash) stream, e := GetStream(hash, meta.Size) if e != nil { log.Println(e) w.WriteHeader(http.StatusNotFound) return } _, e = io.Copy(w, stream) if e != nil { log.Println(e) w.WriteHeader(http.StatusNotFound) return } stream.Close() } func GetStream (hash string , size int64 ) (*rs.RSGetStream, error) { locateInfo := locate.Locate(hash) if len (locateInfo) < rs.DATA_SHARDS { return nil , fmt.Errorf("object %s locate fail, result %v" , hash, locateInfo) } dataServers := make ([]string , 0 ) if len (locateInfo) != rs.ALL_SHARDS { dataServers = heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS-len (locateInfo), locateInfo) } return rs.NewRSGetStream(locateInfo, dataServers, hash, size) }
创建Re.RSGetStream:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 func NewRSGetStream (locateInfo map [int ]string , dataServers []string , hash string , size int64 ) (*RSGetStream, error) { if len (locateInfo)+len (dataServers) != ALL_SHARDS { return nil , fmt.Errorf("dataServers number mismatch" ) } readers := make ([]io.Reader, ALL_SHARDS) for i := 0 ; i < ALL_SHARDS; i++ { server := locateInfo[i] if server == "" { locateInfo[i] = dataServers[0 ] dataServers = dataServers[1 :] continue } reader, e := objectstream.NewGetStream(server, fmt.Sprintf("%s.%d" , hash, i)) if e == nil { readers[i] = reader } } writers := make ([]io.Writer, ALL_SHARDS) perShard := (size + DATA_SHARDS - 1 ) / DATA_SHARDS var e error for i := range readers { if readers[i] == nil { writers[i], e = objectstream.NewTempPutStream(locateInfo[i], fmt.Sprintf("%s.%d" , hash, i), perShard) if e != nil { return nil , e } } } dec := NewDecoder(readers, writers, size) return &RSGetStream{dec}, nil } type decoder struct { readers []io.Reader writers []io.Writer enc reedsolomon.Encoder size int64 cache []byte cacheSize int total int64 } func NewDecoder (readers []io.Reader, writers []io.Writer, size int64 ) *decoder { enc, _ := reedsolomon.New(DATA_SHARDS, PARITY_SHARDS) return &decoder{readers, writers, enc, size, nil , 0 , 0 } }
对象GET过程中读取对象数据的流调用的是rs.RSGetStream,实际调用的Read方法也发生了变化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 func (d *decoder) Read (p []byte ) (n int , err error) { if d.cacheSize == 0 { e := d.getData() if e != nil { return 0 , e } } length := len (p) if d.cacheSize < length { length = d.cacheSize } d.cacheSize -= length copy (p, d.cache[:length]) d.cache = d.cache[length:] return length, nil } func (d *decoder) getData () error { if d.total == d.size { return io.EOF } shards := make ([][]byte , ALL_SHARDS) repairIds := make ([]int , 0 ) for i := range shards { if d.readers[i] == nil { repairIds = append (repairIds, i) } else { shards[i] = make ([]byte , BLOCK_PER_SHARD) n, e := io.ReadFull(d.readers[i], shards[i]) if e != nil && e != io.EOF && e != io.ErrUnexpectedEOF { shards[i] = nil } else if n != BLOCK_PER_SHARD { shards[i] = shards[i][:n] } } } e := d.enc.Reconstruct(shards) if e != nil { return e } for i := range repairIds { id := repairIds[i] d.writers[id].Write(shards[id]) } for i := 0 ; i < DATA_SHARDS; i++ { shardSize := int64 (len (shards[i])) if d.total+shardSize > d.size { shardSize -= d.total + shardSize - d.size } d.cache = append (d.cache, shards[i][:shardSize]...) d.cacheSize += int (shardSize) d.total += shardSize } return nil }
数据服务 数据服务的locate包:由于在磁盘上保存的对象文件名格式发生了变化,locate的实现也有相应变化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 var objects = make (map [string ]int )var mutex sync.Mutexfunc Locate (hash string ) int { mutex.Lock() id, ok := objects[hash] mutex.Unlock() if !ok { return -1 } return id } func Add (hash string , id int ) { mutex.Lock() objects[hash] = id mutex.Unlock() } func Del (hash string ) { mutex.Lock() delete (objects, hash) mutex.Unlock() } func StartLocate () { q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER" )) defer q.Close() q.Bind("dataServers" ) c := q.Consume() for msg := range c { hash, e := strconv.Unquote(string (msg.Body)) if e != nil { panic (e) } id := Locate(hash) if id != -1 { q.Send(msg.ReplyTo, types.LocateMessage{Addr: os.Getenv("LISTEN_ADDRESS" ), Id: id}) } } } func CollectObjects () { files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT" ) + "/objects/*" ) for i := range files { file := strings.Split(filepath.Base(files[i]), "." ) if len (file) != 3 { panic (files[i]) } hash := file[0 ] id, e := strconv.Atoi(file[1 ]) if e != nil { panic (e) } objects[hash] = id } }
数据服务的temp包:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func (t *tempInfo) hash () string { s := strings.Split(t.Name, "." ) return s[0 ] } func (t *tempInfo) id () int { s := strings.Split(t.Name, "." ) id, _ := strconv.Atoi(s[1 ]) return id } func commitTempObject (datFile string , tempinfo *tempInfo) { f, _ := os.Open(datFile) d := url.PathEscape(utils.CalculateHash(f)) f.Close() os.Rename(datFile, os.Getenv("STORAGE_ROOT" )+"/objects/" +tempinfo.Name+"." +d) locate.Add(tempinfo.hash(), tempinfo.id()) }
数据服务的objects包:
getFile函数发生改变:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 func getFile (name string ) string { files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT" ) + "/objects/" + name + ".*" ) if len (files) != 1 { return "" } file := files[0 ] h := sha256.New() sendFile(h, file) d := url.PathEscape(base64.StdEncoding.EncodeToString(h.Sum(nil ))) hash := strings.Split(file, "." )[2 ] if d != hash { log.Println("object hash mismatch, remove" , file) locate.Del(hash) os.Remove(file) return "" } return file }
测试 本版本所有代码及测试用例可见添加数据冗余和修复的分布式对象存储系统
参考 《分布式对象存储—原理、架构及Go语言实现》