数据校验和去重 上一个版本加入了对象存储服务的元数据服务。有了元数据服务,可以在不实际删除数据的情况下实现对象的删除功能,可以实现对象的版本控制,可以确保数据对象的一致性和GET方法的幂等性 。这些都是因为元数据服务可以保存对象的元数据。
另外,上一个版本的接口服务要求客户端提供对象的散列值作为全局唯一的标识符,也就是数据服务存储的对象名,但实现过程中并没有对这个散列值进行校验,用户提供的对象散列值和数据有可能是不一致的, 产生不一致的原因有很多,本版本需要解决该问题。
去重基本概念 去重是一种消除重复数据多余副本的数据压缩技术 。对于一个对象存储系统来说,通常都会有来自不同(或相同)用户的大量重复数据。如果没有去重,每一份重复的数据都会占据存储空间。去重能够让重复数据在系统中只保留一个实体 ,是一个极好的节省存储空间、提升存储利用率的技术。
一个很常见的去重例子是邮件的转发。假设某个邮件内含一个大小为1MB的附件,如果该邮件被转发了100次,那么邮件服务器上就保存了100个一模一样的附件,总共占用100MB的空间。每次管理员对该邮件服务器进行云备份,都会上传100个一模一样的附件对象到对象存储系统。如果这个对象存储系统使用了数据去重技术,那么无论这个管理员备份多少次,在对象存储系统中,这个附件所代表的对象就只有一份。
本项目的去重基于对象的全局唯一标识符,也就是通过对该对象的散列值进行单例检查(Single Instance Storage,SIS)来实现 。具体来说,每次当接口服务节点接收到对象的PUT请求之后,都会进行一次定位,如果PUT对象的散列值已经存在于数据服务中,就会跳过之后的数据服务PUT请求,直接生成该对象的新版本插入元数据服务;如果PUT对象的散列值不存在于数据服务中,说明这是一个全新的对象。接口服务会读取PUT请求的正文,写入数据服务。
但是在实现去重之前,还有一个步骤要做,就是数据校验 。
数据校验的原因 一般来说,客户端上传的数据不一致可能由以下几种情况导致。
客户端是一个恶意客户端,故意上传不一致的数据给服务器。
客户端有bug,计算出来的数据是错误的。
客户端计算的数据正确,但是传输过程中发生了错误。
对象存储是一个服务,如果全盘接收来自客户端的数据,而不对这个散列值进行校验,那么恶意客户端就可以通过随意编造散列值的方式上传大量内容和散列值不符的对象来污染数据;且即使是善意的客户端也难免因为软件错误或上传的数据损坏而导致对象数据和散列值不符。如果不对数据进行校验,允许错误的对象数据被保存在系统中,那么当另一个用户上传的数据的散列值恰好跟错误数据的相同时,就会因为SIS检查而导致其数据并没有被真正上传。然后当这个用户需要下载自己的对象时,下载到的就会是那个错误的数据。
为了防止这种情况发生,必须进行数据校验,验证客户端提供的散列值和服务器根据对象数据计算出来的散列值是否一致。有了数据校验,才能确保数据服务中保存的对象数据和散列值一致,然后放心对后续上传的对象根据散列值进行去重。
现在的主要问题:之前的版本一直都是以数据流的形式处理来自客户端的请求,接口服务调用io.Copy 从对象PUT请求的正文中直接读取对象数据并写入数据服务 。这是因为客户端上传的对象大小可能超出接口服务节点的内存,不能把整个对象读入内存后再进行处理 。而现在必须等整个对象都上传完以后才能算出散列值,然后才能决定是否要存进数据服务。这就形成了一个悖论:在客户端的对象完全上传完毕之前,不知道要不要把这个对象写入数据服务;但是等客户端的对象上传完毕之后再开始写入又做不到,因为对象可能太大,内存里根本放不下。
最简单的解决办法只需要在数据服务节点进行数据校验,将校验一致的对象保留,不一致的删除不就可以了吗? 这样的设计在版本是没问题的。在数据服务节点上进行数据校验的前提是数据服务节点上的数据和用户上传的数据完全相同 ,本版本的设计满足这个前提。但是在后续版本中会看到,随着对象存储系统的不断完善,最终保存在数据服务节点上的对象数据和用户上传的对象数据可能截然不同 。那时就无法在数据服务节点上进行数据校验。数据校验这一步骤必须在接口服务节点完成。
实现数据校验的方法 为了真正解决上述矛盾,需要在数据服务上提供对象的缓存功能,接口服务不需要将用户上传的对象缓存在自身节点的内存里,而是传输到某个数据服务节点的一个临时对象里,并在传输数据的同时计算其散列值。当整个数据传输完毕以后,散列值计算也同步完成,如果一致,接口节点需要将临时对象转成正式对象:如果不一致,则将临时对象删除。
为数据服务加入缓存功能 本版本接口服务功能没有发生变化,数据服务删除了objects接口的PUT方法并新添加了temp接口的POST,PATCH,PUT,DELETE4种方法。
数据服务的REST接口
请求头部
响应正文
接口服务以PATCH方法访问数据服务节点上的临时对象,HTTP请求的正文会被写入该临时对象。
接口服务数据校验一致,调用PUT方法将该临时文件转正。
接口服务数据校验不一致 ,调用DELETE方法将临时文件删除 。
对象PUT流程
客户端在PUT对象时需要提供对象的散列值和大小。接口服务首先在数据服务层定位散列值,如果已经存在,则直接添加元数据;如果不存在,则用POST方法访问数据服务节点的temp接口,提供对象的散列值和大小。数据服务节点返回一个uuid。然后接口服务用PATCH方法将客户端的数据上传给数据服务,同时计算数据的散列值 。客户端数据上传完毕后核对计算出的散列值和客户端提供的散列值是否一致,如果一致则用PUT方法将临时对象转正;否则用DELETE方法删除临时对象。临时对象的内容首先被保存在数据服务本地磁盘的$STORAGE_ROOT/temp/<uuid>.dat
文件,转正后会被重命名为$STORAGE_ROOTlobjects/<hash>
文件。
具体实现 接口服务 接口服务主要修改了objects.put部分的相关函数。
objects.put相关函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 func put (w http.ResponseWriter, r *http.Request) { hash := utils.GetHashFromHeader(r.Header) if hash == "" { log.Println("missing object hash in digest header" ) w.WriteHeader(http.StatusBadRequest) return } size := utils.GetSizeFromHeader(r.Header) c, e := storeObject(r.Body, hash, size) if e != nil { log.Println(e) w.WriteHeader(c) return } if c != http.StatusOK { w.WriteHeader(c) return } name := strings.Split(r.URL.EscapedPath(), "/" )[2 ] e = es.AddVersion(name, hash, size) if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) } } func storeObject (r io.Reader, hash string , size int64 ) (int , error) { if locate.Exist(url.PathEscape(hash)) { return http.StatusOK, nil } stream, e := putStream(url.PathEscape(hash), size) if e != nil { return http.StatusInternalServerError, e } reader := io.TeeReader(r, stream) d := utils.CalculateHash(reader) if d != hash { stream.Commit(false ) return http.StatusBadRequest, fmt.Errorf("object hash mismatch, calculated=%s, requested=%s" , d, hash) } stream.Commit(true ) return http.StatusOK, nil } func CalculateHash (r io.Reader) string { h := sha256.New() io.Copy(h, r) return base64.StdEncoding.EncodeToString(h.Sum(nil )) } func putStream (hash string , size int64 ) (*objectstream.TempPutStream, error) { server := heartbeat.ChooseRandomDataServer() if server == "" { return nil , fmt.Errorf("cannot find any dataServer" ) } return objectstream.NewTempPutStream(server, hash, size) }
objectstream.TempPutStream相关代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 type TempPutStream struct { Server string Uuid string } func NewTempPutStream (server, object string , size int64 ) (*TempPutStream, error) { request, e := http.NewRequest("POST" , "http://" +server+"/temp/" +object, nil ) if e != nil { return nil , e } request.Header.Set("size" , fmt.Sprintf("%d" , size)) client := http.Client{} response, e := client.Do(request) if e != nil { return nil , e } uuid, e := ioutil.ReadAll(response.Body) if e != nil { return nil , e } return &TempPutStream{server, string (uuid)}, nil } func (w *TempPutStream) Write (p []byte ) (n int , err error) { request, e := http.NewRequest("PATCH" , "http://" +w.Server+"/temp/" +w.Uuid, strings.NewReader(string (p))) if e != nil { return 0 , e } client := http.Client{} r, e := client.Do(request) if e != nil { return 0 , e } if r.StatusCode != http.StatusOK { return 0 , fmt.Errorf("dataServer return http code %d" , r.StatusCode) } return len (p), nil } func (w *TempPutStream) Commit (good bool ) { method := "DELETE" if good { method = "PUT" } request, _ := http.NewRequest(method, "http://" +w.Server+"/temp/" +w.Uuid, nil ) client := http.Client{} client.Do(request) }
数据服务 main函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 func main () { locate.CollectObjects() go heartbeat.StartHeartbeat() go locate.StartLocate() http.HandleFunc("/objects/" , objects.Handler) http.HandleFunc("/temp/" , temp.Handler) log.Fatal(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS" ), nil )) }
数据服务的locate包 具体实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 var objects = make (map [string ]int )var mutex sync.Mutexfunc Locate (hash string ) bool { mutex.Lock() _, ok := objects[hash] mutex.Unlock() return ok } func Add (hash string ) { mutex.Lock() objects[hash] = 1 mutex.Unlock() } func Del (hash string ) { mutex.Lock() delete (objects, hash) mutex.Unlock() } func StartLocate () { q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER" )) defer q.Close() q.Bind("dataServers" ) c := q.Consume() for msg := range c { hash, e := strconv.Unquote(string (msg.Body)) if e != nil { panic (e) } exist := Locate(hash) if exist { q.Send(msg.ReplyTo, os.Getenv("LISTEN_ADDRESS" )) } } } func CollectObjects () { files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT" ) + "/objects/*" ) for i := range files { hash := filepath.Base(files[i]) objects[hash] = 1 } }
数据服务的temp包 Handler函数针对访问temp接口的HTTP方法分别调用相应的处理函数put,patch,post和del:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func Handler (w http.ResponseWriter, r *http.Request) { m := r.Method if m == http.MethodPut { put(w, r) return } if m == http.MethodPatch { patch(w, r) return } if m == http.MethodPost { post(w, r) return } if m == http.MethodDelete { del(w, r) return } w.WriteHeader(http.StatusMethodNotAllowed) }
temp包的post相关函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 type tempInfo struct { Uuid string Name string Size int64 } func post (w http.ResponseWriter, r *http.Request) { output, _ := exec.Command("uuidgen" ).Output() uuid := strings.TrimSuffix(string (output), "\n" ) name := strings.Split(r.URL.EscapedPath(), "/" )[2 ] size, e := strconv.ParseInt(r.Header.Get("size" ), 0 , 64 ) if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) return } t := tempInfo{uuid, name, size} e = t.writeToFile() if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) return } os.Create(os.Getenv("STORAGE_ROOT" ) + "/temp/" + t.Uuid + ".dat" ) w.Write([]byte (uuid)) } func (t *tempInfo) writeToFile () error { f, e := os.Create(os.Getenv("STORAGE_ROOT" ) + "/temp/" + t.Uuid) if e != nil { return e } defer f.Close() b, _ := json.Marshal(t) f.Write(b) return nil }
接口服务调用POST方法之后会从数据服务获得一个uuid,这意味着数据服务已经为这个临时对象做好了准备。之后接口服务还需要继续调用PATCH方法将数据上传。
patch方法的相关函数实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 func patch (w http.ResponseWriter, r *http.Request) { uuid := strings.Split(r.URL.EscapedPath(), "/" )[2 ] tempinfo, e := readFromFile(uuid) if e != nil { log.Println(e) w.WriteHeader(http.StatusNotFound) return } infoFile := os.Getenv("STORAGE_ROOT" ) + "/temp/" + uuid datFile := infoFile + ".dat" f, e := os.OpenFile(datFile, os.O_WRONLY|os.O_APPEND, 0 ) if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) return } defer f.Close() _, e = io.Copy(f, r.Body) if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) return } info, e := f.Stat() if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) return } actual := info.Size() if actual > tempinfo.Size { os.Remove(datFile) os.Remove(infoFile) log.Println("actual size" , actual, "exceeds" , tempinfo.Size) w.WriteHeader(http.StatusInternalServerError) } } func readFromFile (uuid string ) (*tempInfo, error) { f, e := os.Open(os.Getenv("STORAGE_ROOT" ) + "/temp/" + uuid) if e != nil { return nil , e } defer f.Close() b, _ := ioutil.ReadAll(f) var info tempInfo json.Unmarshal(b, &info) return &info, nil }
接口服务调用PATCH方法将整个临时对象上传完毕后,自己也已经完成了数据校验的工作,根据数据校验的结果决定是调用PUT方法将临时文件转正还是调用DELETE 方法删除临时文件
temp包的put相关函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 func put (w http.ResponseWriter, r *http.Request) { uuid := strings.Split(r.URL.EscapedPath(), "/" )[2 ] tempinfo, e := readFromFile(uuid) if e != nil { log.Println(e) w.WriteHeader(http.StatusNotFound) return } infoFile := os.Getenv("STORAGE_ROOT" ) + "/temp/" + uuid datFile := infoFile + ".dat" f, e := os.Open(datFile) if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) return } defer f.Close() info, e := f.Stat() if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) return } actual := info.Size() os.Remove(infoFile) if actual != tempinfo.Size { os.Remove(datFile) log.Println("actual size mismatch, expect" , tempinfo.Size, "actual" , actual) w.WriteHeader(http.StatusInternalServerError) return } commitTempObject(datFile, tempinfo) }
temp包的del函数:
1 2 3 4 5 6 7 8 func del (w http.ResponseWriter, r *http.Request) { uuid := strings.Split(r.URL.EscapedPath(), "/" )[2 ] infoFile := os.Getenv("STORAGE_ROOT" ) + "/temp/" + uuid datFile := infoFile + ".dat" os.Remove(infoFile) os.Remove(datFile) }
数据服务的objects包 数据服务的put方法需要做相关修改。
由于当前版本在数据服务的对象上传完全依靠temp接口的临时对象转正,不再需要objects接口的PUT方法,objects的Handler函数做修改如下:
1 2 3 4 5 6 7 8 func Handler (w http.ResponseWriter, r *http.Request) { m := r.Method if m == http.MethodGet { get(w, r) return } w.WriteHeader(http.StatusMethodNotAllowed) }
读取对象的时候需要进行一次数据校验。
对objects.get相关函数做修改:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 func get (w http.ResponseWriter, r *http.Request) { file := getFile(strings.Split(r.URL.EscapedPath(), "/" )[2 ]) if file == "" { w.WriteHeader(http.StatusNotFound) return } sendFile(w, file) } func getFile (hash string ) string { file := os.Getenv("STORAGE_ROOT" ) + "/objects/" + hash f, _ := os.Open(file) d := url.PathEscape(utils.CalculateHash(f)) f.Close() if d != hash { log.Println("object hash mismatch, remove" , file) locate.Del(hash) os.Remove(file) return "" } return file } func sendFile (w io.Writer, file string ) { f, _ := os.Open(file) defer f.Close() io.Copy(w, f) }
注意:即使再接口层已经对数据进行过校验,在数据服务层进行校验依然很有必要。本版本的数据校验是用于防止存储系统的数据降解,哪怕在上传时正确的数据也有可能随着时间的流逝而逐渐发生损坏。
功能测试 本版本实现的程序可见添加数据校验和去重的分布式对象存储系统 ,功能测试请参考shell脚本进行。
去重带来的性能问题 实际的功能测试中可以发现,系统在第一次PUT对象时等待了约1s。这是locate定位的超时时间。 为了去重,每一个新对象上传时都不得不等待这个时间以确保数据服务中不存在散列值相等的对象。实际使用中大多数情况下上传的都是内容不同的新对象,这是一个很严重的性能问题。减少定位的超时时间可以减少用户的等待时间,但这并不算是从根本上解决了问题,且超时时间设置过短也会提升SIS检查的失败概率(比如某个对象其实存在于数据服务中但没能及时返回定位消息),这么做得不偿失。
有一个看上去可行的解决方案是免除小对象的去重 :对于大对象,其上传的时间本来就比较长,比如1个10MB的对象在20Mbit/s 上行带宽的连接上需要4s的传输时间,1s 的定位超时只是25%的额外时间,看上去这个并不特别突出。而一个10KB 的对象上传只需要0.004s,25000%的额外等待就显得无法忍受了。如果免除小对象的去重,看上去性能会好很多,小对象本身占用的空间也不大,不去重似乎也可以接受。
但很可惜这样是不行的,原因有两点:
首先,对小对象不去重会导致它们在对象存储系统的每一个数据服务节点上都存在一个备份,这就会占用大量的磁盘资源 。
更重要的原因在于,一旦接口服务定位一个这样的小对象,所有的数据服务节点都会响应,然后每一个节点都会反馈一个消息以通知该对象的存在。渐渐的消息队列会塞满反馈消息。 而如果有用户在同一时间下载大量小对象(比如用户从云端恢复客户机的操作系统),那就成了系统的灾难,真正的生产环境可不会像测试这样只有寥寥几台数据服务节点,而是可能有成千上万的数据节点。
这个性能问题单靠对象存储服务端是无法解决的。一个有效的解决方案是优化客户端的行为。如果客户端能将多个小对象尽量打包成一个大对象上传而不是分别上传,那么1s的等待时间就可以忽略 。而且,当客户端下载小对象时,就需要下载含有该小对象的大对象,然后从中取出小对象。这样看上去有些烦琐,但是在需要一次性恢复大量小对象时非常有利,因为无须为每个小对象而频繁访问对象存储服务。
参考 《分布式对象存储—原理、架构及Go语言实现》