断点续传 使用断点续传的原因 理想环境里网络永远通畅,对象存储系统并不需要断点续传这个功能。但在现实世界,对象存储服务在数据中心运行,而客户端在客户本地机器上运行,它们之间通过互联网连接。互联网的连接速度慢且不稳定,有可能由于网络故障导致断开连接 。在客户端上传或下载一个大对象时,因网络断开导致上传下载失败的概率就会变得不可忽视。为了解决这个问题,对象存储服务必须提供断点续传功能,允许客户端从某个检查点而不是从头开始上传或下载对象 。
断点下载 断点下载的实现比较简单,客户端在GET对象请求时通过设置Range头部来告诉接口服务需要从什么位置开始输出对象的数据 。
接口服务的处理流程在生成对象流之前和上一版本没有任何区别,但是在成功打开了对象数据流之后,接口服务会额外调用rs.RSGetStream.Seek
方法跳至客户端请求的位置,然后才开始输出数据。
断点上传流程 断点上传的流程则要比断点下载复杂得多,这是由 HTTP服务的特性导致的。客户端在下载时并不在乎数据的完整性,一旦发生网络故障,数据下到哪算哪,下次继续从最后下载的数据位置开始续传就可以了 。
但是对于上传来说,接口服务会对数据进行散列值校验,当发生网络故障时,如果上传的数据跟期望的不一致,那么整个上传的数据都会被丢弃。所以断点上传在一开始就需要客户端和接口服务做好约定,使用特定的接口进行上传 。
客户端在知道自己要上传大对象时就主动改用对象POST接口,提供对象的散列值和大小 。接口服务的处理流程和上一版本处理对象PUT一样,搜索6个数据服务并分别POST临时对象接口。数据服务的地址以及返回的uuid 会被记录在一个token里返回给客户端。
客户端POST对象后会得到一个token。对token进行PUT可以上传数据。在上传时客户端需要指定range头部来告诉接口服务上传数据的范围。接口服务对token进行解码,获取6个分片所在的数据服务地址以及uuid,分别调用PATCH将数据写入6个临时对象。
通过PUT上传的数据并不一定会被接口服务完全接收。在上一版本已经知道,经过RS分片的数据是以块的形式分批写入4个数据片的,每个数据片一次写入8000字节,4个数据片一共写入 32 000字节。所以除非是最后一批数据,否则接口服务只接收32000字节的整数倍进行写入。这是一个服务端的行为逻辑,不能要求客户端知道接口服务背后的逻辑,所以接口服务必须提供token 的 HEAD操作,让客户端知道服务端上该token目前的写入进度。
客户端每次用PUT方法访问token之前都需要先用HEAD方法获取当前已上传了多少数据。接口服务对token进行解码,获取6个分片所在的数据服务地址以及uuid,仅对第一个分片调用HEAD获取该分片当前长度,将这个数字乘以4,作为Content-Length响应头部返回给客户端。
接口服务的REST接口 接口服务的objects接口GET 方法新增了Range 请求头部,用于告诉接口服务需要的对象数据范围。
1 2 3 4 5 6 7 GET /objects/<object_name> 请求头部: Range:bytes=<first>- 响应头部: Content-Range : bytes<first>-<size>/<size>响应正文: 从first开始的对象内容
Range 请求头部是HTTP/1.1协议的一部分。给GET请求加上 Range头部意味着这个请求期望的只是全体数据的一个或多个子集。Range 请求主要支持以字节为单位的byte Range (虽然协议本身也支持其他自定义的Range单位,但是如果实现者没有在 IANA申请注册这个自定义的单位,那么就只有自己写的客户端和服务端之间可以互相理解),byte range的格式是固定字符串”bytes=”开头,后面跟上一个或多个数字的范围,由逗号分隔 。假设整体数据是10000字节,那么合法的byte range格式可以有以下几个例子:
1 2 3 4 5 6 7 8 9 请求最后500个字节(9500~9999): bytes=-500 或 bytes=9500- 请求第一个和最后一个字节(字节О和9999): bytes=0-0,—1 其他几个合法但不常见的请求第500~999个字节的格式: bytes=500-600,601-999 bytes=500-700,601-999
本项目的对象存储系统实现的格式是bytes=<first>-
。客户端通过指定first 的值告诉接口服务下载对象的数据范围,接口服务返回的数据从first开始,first之前的对象数据会在服务端被丢弃。 根据Range请求的协议规范,接口服务需要返回HTTP错误代码 206 Partial Content,并设置 Content-Range响应头部告知返回数据的范围<first>-<size>/<size>
,其中<first>
是客户端要求的起始位置,<size>
是对象的大小。
objects接口还新增了POST方法,用于创建token。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 POST /objects/<object_name> 请求头部: Digest:SHA-256=<对象散列值的Base64编码> Size:<对象内容的长度> 响应头部: Location:<访问/temp/token的URI> token被放在Location头部返回给客户端。客户端拿到后可以直接访问该URI. 除了objects接口发生的改变以外,接口服务还新增temp接口: HEAD /temp/<token> 响应头部: Content-Length:<token当前的上传字节数> PUT /temp/<token> 请求头部: Range:bytes=<first>-<last> 请求正文: 对象的内容,字节范围为first~last
客户端通过Range头部指定上传的范围,first必须跟token当前的上传字节数一致,否则接口服务会返回416 Range Not Satisfiable。如果上传的是最后一段数据,<last>
为空。
数据服务的REST接口 数据服务的temp接口新增了HEAD和GET两个方法,HEAD方法用于获取某个分片临时对象当前的大小;而GET方法则用于获取临时对象的数据。
1 2 3 4 5 6 7 HEAD /temp/<uuid> 响应头部: Content-Length:<临时对象当前的上传字节数> GET /temp/<uuid> 响应正文: 临时对象的内容
客户端将对象所有的数据上传完毕之后,接口服务需要调用这个方法从数据服务读取各分片临时对象的内容并进行数据校验,只有在验证了对象的散列值符合预期的情况下,服务端才认为该对象的上传是成功的,进而将临时对象转正。
具体实现 接口服务 接口服务的main函数以及objects包发生了变化,且新增加了temp包。
1 2 3 4 5 6 7 8 9 func main () { go heartbeat.ListenHeartbeat() http.HandleFunc("/objects/" , objects.Handler) http.HandleFunc("/temp/" , temp.Handler) http.HandleFunc("/locate/" , locate.Handler) http.HandleFunc("/versions/" , versions.Handler) log.Fatal(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS" ), nil )) }
接口服务的objects包 objects.Handler函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 func Handler (w http.ResponseWriter, r *http.Request) { m := r.Method if m == http.MethodPost { post(w, r) return } if m == http.MethodPut { put(w, r) return } if m == http.MethodGet { get(w, r) return } if m == http.MethodDelete { del(w, r) return } w.WriteHeader(http.StatusMethodNotAllowed) }
objects.post相关函数:
前半段处理流程与put函数类似
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 func post (w http.ResponseWriter, r *http.Request) { name := strings.Split(r.URL.EscapedPath(), "/" )[2 ] size, e := strconv.ParseInt(r.Header.Get("size" ), 0 , 64 ) if e != nil { log.Println(e) w.WriteHeader(http.StatusForbidden) return } hash := utils.GetHashFromHeader(r.Header) if hash == "" { log.Println("missing object hash in digest header" ) w.WriteHeader(http.StatusBadRequest) return } if locate.Exist(url.PathEscape(hash)) { e = es.AddVersion(name, hash, size) if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) } else { w.WriteHeader(http.StatusOK) } return } ds := heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS, nil ) if len (ds) != rs.ALL_SHARDS { log.Println("cannot find enough dataServer" ) w.WriteHeader(http.StatusServiceUnavailable) return } stream, e := rs.NewRSResumablePutStream(ds, name, url.PathEscape(hash), size) if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) return } w.Header().Set("location" , "/temp/" +url.PathEscape(stream.ToToken())) w.WriteHeader(http.StatusCreated) } type resumableToken struct { Name string Size int64 Hash string Servers []string Uuids []string } type RSResumablePutStream struct { *RSPutStream *resumableToken } func NewRSResumablePutStream (dataServers []string , name, hash string , size int64 ) (*RSResumablePutStream, error) { putStream, e := NewRSPutStream(dataServers, hash, size) if e != nil { return nil , e } uuids := make ([]string , ALL_SHARDS) for i := range uuids { uuids[i] = putStream.writers[i].(*objectstream.TempPutStream).Uuid } token := &resumableToken{name, size, hash, dataServers, uuids} return &RSResumablePutStream{putStream, token}, nil } func (s *RSResumablePutStream) ToToken () string { b, _ := json.Marshal(s) return base64.StdEncoding.EncodeToString(b) }
objects包修改了get函数,相关函数修改如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 func get (w http.ResponseWriter, r *http.Request) { name := strings.Split(r.URL.EscapedPath(), "/" )[2 ] versionId := r.URL.Query()["version" ] version := 0 var e error if len (versionId) != 0 { version, e = strconv.Atoi(versionId[0 ]) if e != nil { log.Println(e) w.WriteHeader(http.StatusBadRequest) return } } meta, e := es.GetMetadata(name, version) if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) return } if meta.Hash == "" { w.WriteHeader(http.StatusNotFound) return } hash := url.PathEscape(meta.Hash) stream, e := GetStream(hash, meta.Size) if e != nil { log.Println(e) w.WriteHeader(http.StatusNotFound) return } offset := utils.GetOffsetFromHeader(r.Header) if offset != 0 { stream.Seek(offset, io.SeekCurrent) w.Header().Set("content-range" , fmt.Sprintf("bytes %d-%d/%d" , offset, meta.Size-1 , meta.Size)) w.WriteHeader(http.StatusPartialContent) } io.Copy(w, stream) stream.Close() } func GetOffsetFromHeader (h http.Header) int64 { byteRange := h.Get("range" ) if len (byteRange) < 7 { return 0 } if byteRange[:6 ] != "bytes=" { return 0 } bytePos := strings.Split(byteRange[6 :], "-" ) offset, _ := strconv.ParseInt(bytePos[0 ], 0 , 64 ) return offset } func (s *RSGetStream) Seek (offset int64 , whence int ) (int64 , error) { if whence != io.SeekCurrent { panic ("only support SeekCurrent" ) } if offset < 0 { panic ("only support forward seek" ) } for offset != 0 { length := int64 (BLOCK_SIZE) if offset < length { length = offset } buf := make ([]byte , length) io.ReadFull(s, buf) offset -= length } return offset, nil }
接口服务的temp包 temp.Handler函数
1 2 3 4 5 6 7 8 9 10 11 12 13 func Handler (w http.ResponseWriter, r *http.Request) { m := r.Method if m == http.MethodHead { head(w, r) return } if m == http.MethodPut { put(w, r) return } w.WriteHeader(http.StatusMethodNotAllowed) }
temp.put相关函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 func put (w http.ResponseWriter, r *http.Request) { token := strings.Split(r.URL.EscapedPath(), "/" )[2 ] stream, e := rs.NewRSResumablePutStreamFromToken(token) if e != nil { log.Println(e) w.WriteHeader(http.StatusForbidden) return } current := stream.CurrentSize() if current == -1 { w.WriteHeader(http.StatusNotFound) return } offset := utils.GetOffsetFromHeader(r.Header) if current != offset { w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) return } bytes := make ([]byte , rs.BLOCK_SIZE) for { n, e := io.ReadFull(r.Body, bytes) if e != nil && e != io.EOF && e != io.ErrUnexpectedEOF { log.Println(e) w.WriteHeader(http.StatusInternalServerError) return } current += int64 (n) if current > stream.Size { stream.Commit(false ) log.Println("resumable put exceed size" ) w.WriteHeader(http.StatusForbidden) return } if n != rs.BLOCK_SIZE && current != stream.Size { return } stream.Write(bytes[:n]) if current == stream.Size { stream.Flush() getStream, e := rs.NewRSResumableGetStream(stream.Servers, stream.Uuids, stream.Size) hash := url.PathEscape(utils.CalculateHash(getStream)) if hash != stream.Hash { stream.Commit(false ) log.Println("resumable put done but hash mismatch" ) w.WriteHeader(http.StatusForbidden) return } if locate.Exist(url.PathEscape(hash)) { stream.Commit(false ) } else { stream.Commit(true ) } e = es.AddVersion(stream.Name, stream.Hash, stream.Size) if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) } return } } } func NewRSResumablePutStreamFromToken (token string ) (*RSResumablePutStream, error) { b, e := base64.StdEncoding.DecodeString(token) if e != nil { return nil , e } var t resumableToken e = json.Unmarshal(b, &t) if e != nil { return nil , e } writers := make ([]io.Writer, ALL_SHARDS) for i := range writers { writers[i] = &objectstream.TempPutStream{t.Servers[i], t.Uuids[i]} } enc := NewEncoder(writers) return &RSResumablePutStream{&RSPutStream{enc}, &t}, nil } func (s *RSResumablePutStream) CurrentSize () int64 { r, e := http.Head(fmt.Sprintf("http://%s/temp/%s" , s.Servers[0 ], s.Uuids[0 ])) if e != nil { log.Println(e) return -1 } if r.StatusCode != http.StatusOK { log.Println(r.StatusCode) return -1 } size := utils.GetSizeFromHeader(r.Header) * DATA_SHARDS if size > s.Size { size = s.Size } return size }
temp.head相关函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 func head (w http.ResponseWriter, r *http.Request) { token := strings.Split(r.URL.EscapedPath(), "/" )[2 ] stream, e := rs.NewRSResumablePutStreamFromToken(token) if e != nil { log.Println(e) w.WriteHeader(http.StatusForbidden) return } current := stream.CurrentSize() if current == -1 { w.WriteHeader(http.StatusNotFound) return } w.Header().Set("content-length" , fmt.Sprintf("%d" , current)) }
数据服务 数据服务的temp包发生改动,新增get和head方法。
Handler函数相比上一版本多了对 HEAD/PUT 方法的处理。如果接口服务以HEAD方式访问数据服务的temp接口,Handler 会调用head;如果接口服务以GET方式访问数据服务的temp接口,则Handler会调用get。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func get (w http.ResponseWriter, r *http.Request) { uuid := strings.Split(r.URL.EscapedPath(), "/" )[2 ] f, e := os.Open(os.Getenv("STORAGE_ROOT" ) + "/temp/" + uuid + ".dat" ) if e != nil { log.Println(e) w.WriteHeader(http.StatusNotFound) return } defer f.Close() io.Copy(w, f) } func head (w http.ResponseWriter, r *http.Request) { uuid := strings.Split(r.URL.EscapedPath(), "/" )[2 ] f, e := os.Open(os.Getenv("STORAGE_ROOT" ) + "/temp/" + uuid + ".dat" ) if e != nil { log.Println(e) w.WriteHeader(http.StatusNotFound) return } defer f.Close() info, e := f.Stat() if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) return } w.Header().Set("content-length" , fmt.Sprintf("%d" , info.Size())) }
测试 本版本所有代码及测试用例可见增加断点续传功能版本代码
参考 《分布式对象存储—原理、架构及Go语言实现》