断点续传

断点续传

使用断点续传的原因

理想环境里网络永远通畅,对象存储系统并不需要断点续传这个功能。但在现实世界,对象存储服务在数据中心运行,而客户端在客户本地机器上运行,它们之间通过互联网连接。互联网的连接速度慢且不稳定,有可能由于网络故障导致断开连接。在客户端上传或下载一个大对象时,因网络断开导致上传下载失败的概率就会变得不可忽视。为了解决这个问题,对象存储服务必须提供断点续传功能,允许客户端从某个检查点而不是从头开始上传或下载对象

断点下载

断点下载的实现比较简单,客户端在GET对象请求时通过设置Range头部来告诉接口服务需要从什么位置开始输出对象的数据

接口服务的处理流程在生成对象流之前和上一版本没有任何区别,但是在成功打开了对象数据流之后,接口服务会额外调用rs.RSGetStream.Seek方法跳至客户端请求的位置,然后才开始输出数据。

断点下载流程

断点上传流程

断点上传的流程则要比断点下载复杂得多,这是由 HTTP服务的特性导致的。客户端在下载时并不在乎数据的完整性,一旦发生网络故障,数据下到哪算哪,下次继续从最后下载的数据位置开始续传就可以了

但是对于上传来说,接口服务会对数据进行散列值校验,当发生网络故障时,如果上传的数据跟期望的不一致,那么整个上传的数据都会被丢弃。所以断点上传在一开始就需要客户端和接口服务做好约定,使用特定的接口进行上传

对象POST接口

客户端在知道自己要上传大对象时就主动改用对象POST接口,提供对象的散列值和大小。接口服务的处理流程和上一版本处理对象PUT一样,搜索6个数据服务并分别POST临时对象接口。数据服务的地址以及返回的uuid 会被记录在一个token里返回给客户端。

客户端POST对象后会得到一个token。对token进行PUT可以上传数据。在上传时客户端需要指定range头部来告诉接口服务上传数据的范围。接口服务对token进行解码,获取6个分片所在的数据服务地址以及uuid,分别调用PATCH将数据写入6个临时对象。

用PUT方法访问token上传数据

通过PUT上传的数据并不一定会被接口服务完全接收。在上一版本已经知道,经过RS分片的数据是以块的形式分批写入4个数据片的,每个数据片一次写入8000字节,4个数据片一共写入 32 000字节。所以除非是最后一批数据,否则接口服务只接收32000字节的整数倍进行写入。这是一个服务端的行为逻辑,不能要求客户端知道接口服务背后的逻辑,所以接口服务必须提供token 的 HEAD操作,让客户端知道服务端上该token目前的写入进度。

用HEAD方法访问token获取当前已经上传了多少数据

客户端每次用PUT方法访问token之前都需要先用HEAD方法获取当前已上传了多少数据。接口服务对token进行解码,获取6个分片所在的数据服务地址以及uuid,仅对第一个分片调用HEAD获取该分片当前长度,将这个数字乘以4,作为Content-Length响应头部返回给客户端。

接口服务的REST接口

接口服务的objects接口GET 方法新增了Range 请求头部,用于告诉接口服务需要的对象数据范围。

1
2
3
4
5
6
7
GET /objects/<object_name>
请求头部:
Range:bytes=<first>-
响应头部:
Content-Range: bytes<first>-<size>/<size>
响应正文:
从first开始的对象内容

Range 请求头部是HTTP/1.1协议的一部分。给GET请求加上 Range头部意味着这个请求期望的只是全体数据的一个或多个子集。Range 请求主要支持以字节为单位的byte Range(虽然协议本身也支持其他自定义的Range单位,但是如果实现者没有在 IANA申请注册这个自定义的单位,那么就只有自己写的客户端和服务端之间可以互相理解),byte range的格式是固定字符串”bytes=”开头,后面跟上一个或多个数字的范围,由逗号分隔。假设整体数据是10000字节,那么合法的byte range格式可以有以下几个例子:

1
2
3
4
5
6
7
8
9
请求最后500个字节(9500~9999):
bytes=-500 或 bytes=9500-

请求第一个和最后一个字节(字节О和9999):
bytes=0-0,—1

其他几个合法但不常见的请求第500~999个字节的格式:
bytes=500-600,601-999
bytes=500-700,601-999

本项目的对象存储系统实现的格式是bytes=<first>-客户端通过指定first 的值告诉接口服务下载对象的数据范围,接口服务返回的数据从first开始,first之前的对象数据会在服务端被丢弃。根据Range请求的协议规范,接口服务需要返回HTTP错误代码 206 Partial Content,并设置 Content-Range响应头部告知返回数据的范围<first>-<size>/<size>,其中<first>是客户端要求的起始位置,<size>是对象的大小。

objects接口还新增了POST方法,用于创建token。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
POST /objects/<object_name>
请求头部:
Digest:SHA-256=<对象散列值的Base64编码>
Size:<对象内容的长度>
响应头部:
Location:<访问/temp/token的URI>
token被放在Location头部返回给客户端。客户端拿到后可以直接访问该URI.

除了objects接口发生的改变以外,接口服务还新增temp接口:
HEAD /temp/<token>
响应头部:
Content-Length:<token当前的上传字节数>

PUT /temp/<token>
请求头部:
Range:bytes=<first>-<last>
请求正文:
对象的内容,字节范围为first~last

客户端通过Range头部指定上传的范围,first必须跟token当前的上传字节数一致,否则接口服务会返回416 Range Not Satisfiable。如果上传的是最后一段数据,<last>为空。

数据服务的REST接口

数据服务的temp接口新增了HEAD和GET两个方法,HEAD方法用于获取某个分片临时对象当前的大小;而GET方法则用于获取临时对象的数据。

1
2
3
4
5
6
7
HEAD /temp/<uuid>
响应头部:
Content-Length:<临时对象当前的上传字节数>

GET /temp/<uuid>
响应正文:
临时对象的内容

客户端将对象所有的数据上传完毕之后,接口服务需要调用这个方法从数据服务读取各分片临时对象的内容并进行数据校验,只有在验证了对象的散列值符合预期的情况下,服务端才认为该对象的上传是成功的,进而将临时对象转正。

具体实现

接口服务

接口服务的main函数以及objects包发生了变化,且新增加了temp包。

1
2
3
4
5
6
7
8
9
func main() {
go heartbeat.ListenHeartbeat()
http.HandleFunc("/objects/", objects.Handler)
// 增加temp.Handler函数用于处理对/temp的请求
http.HandleFunc("/temp/", temp.Handler)
http.HandleFunc("/locate/", locate.Handler)
http.HandleFunc("/versions/", versions.Handler)
log.Fatal(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil))
}
接口服务的objects包

objects.Handler函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
// 增加POST方法的处理函数post
if m == http.MethodPost {
post(w, r)
return
}
if m == http.MethodPut {
put(w, r)
return
}
if m == http.MethodGet {
get(w, r)
return
}
if m == http.MethodDelete {
del(w, r)
return
}
w.WriteHeader(http.StatusMethodNotAllowed)
}

objects.post相关函数:

前半段处理流程与put函数类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
func post(w http.ResponseWriter, r *http.Request) {
// 从请求的URL中获得对象的名字
name := strings.Split(r.URL.EscapedPath(), "/")[2]
// 从请求的相应头部获得对象的大小
size, e := strconv.ParseInt(r.Header.Get("size"), 0, 64)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusForbidden)
return
}
// 从请求的相应头部获得对象的散列值
hash := utils.GetHashFromHeader(r.Header)
if hash == "" {
log.Println("missing object hash in digest header")
w.WriteHeader(http.StatusBadRequest)
return
}
// 如果散列值已经存在,可以直接往元数据服务添加新版本并返回200OK
if locate.Exist(url.PathEscape(hash)) {
e = es.AddVersion(name, hash, size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
}
return
}
// 散列值不存在,随机选出6个数据节点
ds := heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS, nil)
if len(ds) != rs.ALL_SHARDS {
log.Println("cannot find enough dataServer")
w.WriteHeader(http.StatusServiceUnavailable)
return
}
// 调用rs.NewRSResumablePutStream生成数据流stream
stream, e := rs.NewRSResumablePutStream(ds, name, url.PathEscape(hash), size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 调用ToToken方法生成一个字符串token,放入Location响应头部
w.Header().Set("location", "/temp/"+url.PathEscape(stream.ToToken()))
// 返回HTTP代码201 Created
w.WriteHeader(http.StatusCreated)
}

// 保存对象的名字、大小、散列值以及6个分片所在数据服务节点的地址和uuid
type resumableToken struct {
Name string
Size int64
Hash string
Servers []string
Uuids []string
}

type RSResumablePutStream struct {
*RSPutStream
*resumableToken
}

// 传入保存数据服务节点地址的dataServers数组,对象的名字name,散列值以及大小
func NewRSResumablePutStream(dataServers []string, name, hash string, size int64) (*RSResumablePutStream, error) {
// 调用NewRSPutStream创建一个类型为RSPutStream的变量putStream
putStream, e := NewRSPutStream(dataServers, hash, size)
if e != nil {
return nil, e
}
uuids := make([]string, ALL_SHARDS)
// 从putStream的成员writers数组中获取6个分片的uuid,保存到uuid数组
for i := range uuids {
uuids[i] = putStream.writers[i].(*objectstream.TempPutStream).Uuid
}
// 创建resumableToken结构体token
token := &resumableToken{name, size, hash, dataServers, uuids}
return &RSResumablePutStream{putStream, token}, nil
}

// 将自身数据以JSON格式编入,然后返回结果Base64编码后的字符串
func (s *RSResumablePutStream) ToToken() string {
b, _ := json.Marshal(s)
return base64.StdEncoding.EncodeToString(b)
}

objects包修改了get函数,相关函数修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func get(w http.ResponseWriter, r *http.Request) {
name := strings.Split(r.URL.EscapedPath(), "/")[2]
versionId := r.URL.Query()["version"]
version := 0
var e error
if len(versionId) != 0 {
version, e = strconv.Atoi(versionId[0])
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusBadRequest)
return
}
}
meta, e := es.GetMetadata(name, version)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
if meta.Hash == "" {
w.WriteHeader(http.StatusNotFound)
return
}
hash := url.PathEscape(meta.Hash)
stream, e := GetStream(hash, meta.Size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
// 调用utils.GetOffsetFromHeader函数获取HTTP的Range头部
offset := utils.GetOffsetFromHeader(r.Header)
if offset != 0 {
// offset不为0,调用stream的Seek方法跳到offset位置
stream.Seek(offset, io.SeekCurrent)
// 设置Content-Range响应头部以及HTTP代码206 Partial Content
w.Header().Set("content-range", fmt.Sprintf("bytes %d-%d/%d", offset, meta.Size-1, meta.Size))
w.WriteHeader(http.StatusPartialContent)
}
// 通过io.Copy输出数据
io.Copy(w, stream)
stream.Close()
}

// Range的头部格式必须为bytes=<first>- 开头
func GetOffsetFromHeader(h http.Header) int64 {
byteRange := h.Get("range")
if len(byteRange) < 7 {
return 0
}
if byteRange[:6] != "bytes=" {
return 0
}
// 调用strings.Split将<first>部分切取出来
bytePos := strings.Split(byteRange[6:], "-")
// 调用strconv.ParseInt将字符串转换为int64返回
offset, _ := strconv.ParseInt(bytePos[0], 0, 64)
return offset
}

// offset表示需要跳过的字节数,whence表示起跳点
func (s *RSGetStream) Seek(offset int64, whence int) (int64, error) {
// 方法只支持从当前位置io.SeekCurrent起跳
if whence != io.SeekCurrent {
panic("only support SeekCurrent")
}
// 跳过的字节数不能为负
if offset < 0 {
panic("only support forward seek")
}
// for循环中每次读取32000字节并丢弃,直到读到offset位置为止
for offset != 0 {
length := int64(BLOCK_SIZE)
if offset < length {
length = offset
}
buf := make([]byte, length)
io.ReadFull(s, buf)
offset -= length
}
return offset, nil
}
接口服务的temp包

temp.Handler函数

1
2
3
4
5
6
7
8
9
10
11
12
13
// 检查访问方式并调用相应的函数
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
if m == http.MethodHead {
head(w, r)
return
}
if m == http.MethodPut {
put(w, r)
return
}
w.WriteHeader(http.StatusMethodNotAllowed)
}

temp.put相关函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
func put(w http.ResponseWriter, r *http.Request) {
token := strings.Split(r.URL.EscapedPath(), "/")[2]
stream, e := rs.NewRSResumablePutStreamFromToken(token)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusForbidden)
return
}
// 调用CurrentSize获取token当前大小
current := stream.CurrentSize()
if current == -1 {
w.WriteHeader(http.StatusNotFound)
return
}
// 从Range头部获得offset
offset := utils.GetOffsetFromHeader(r.Header)
// 如果offset和当前的大小不一致,接口服务返回416Range Not Satisfiable
if current != offset {
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return
}
bytes := make([]byte, rs.BLOCK_SIZE)
// 在for循环中以32000字节为长度读取HTTP请求的正文并写入stream
for {
n, e := io.ReadFull(r.Body, bytes)
if e != nil && e != io.EOF && e != io.ErrUnexpectedEOF {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
current += int64(n)
// 如果读到的总长度超出了对象的大小,说明客户端上传的数据有误
// 接口服务删除临时对象并返回403 Forbidden
if current > stream.Size {
stream.Commit(false)
log.Println("resumable put exceed size")
w.WriteHeader(http.StatusForbidden)
return
}
if n != rs.BLOCK_SIZE && current != stream.Size {
return
}
stream.Write(bytes[:n])
// 读到的总长度等于对象的大小,说明客户端上传了对象的全部数据
if current == stream.Size {
// 调用Flush方法将剩余数据写入临时对象
stream.Flush()
// 调用NewRSResumableGetStream生成一个临时对象读取流getStream
getStream, e := rs.NewRSResumableGetStream(stream.Servers, stream.Uuids, stream.Size)
// 读取getStream中的数据并计算散列值
hash := url.PathEscape(utils.CalculateHash(getStream))
// 散列值不一致,客户端上传数据有错误
if hash != stream.Hash {
// 接口服务删除临时对象
stream.Commit(false)
log.Println("resumable put done but hash mismatch")
// 返回403 Forbidden
w.WriteHeader(http.StatusForbidden)
return
}
// 检查该散列值是否已经存在
if locate.Exist(url.PathEscape(hash)) {
// 存在则删除临时对象
stream.Commit(false)
} else {
// 否则将对象转正
stream.Commit(true)
}
// 调用es.AddVersion添加新版本
e = es.AddVersion(stream.Name, stream.Hash, stream.Size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
}
return
}
}
}

func NewRSResumablePutStreamFromToken(token string) (*RSResumablePutStream, error) {
// 对token进行Base64解码
b, e := base64.StdEncoding.DecodeString(token)
if e != nil {
return nil, e
}

var t resumableToken
// 将JSON数据编出形成resumableToken结构体t
e = json.Unmarshal(b, &t)
if e != nil {
return nil, e
}

writers := make([]io.Writer, ALL_SHARDS)
// t的Servers和uuid数组中保存了当初创建的6个分片临时对象所在的数据服务节点地址和uuid
for i := range writers {
// 保存到writers数组中
writers[i] = &objectstream.TempPutStream{t.Servers[i], t.Uuids[i]}
}
// 以writers数组为参数创建encoder结构体enc
enc := NewEncoder(writers)
// 以enc为内嵌结构体创建RSPutStream
// 最终以RSPutStream和t为内嵌结构体创建RSResumablePutStream返回
return &RSResumablePutStream{&RSPutStream{enc}, &t}, nil
}

// 以HEAD方法获取第一个分片临时对象的大小并乘4作为size返回
func (s *RSResumablePutStream) CurrentSize() int64 {
r, e := http.Head(fmt.Sprintf("http://%s/temp/%s", s.Servers[0], s.Uuids[0]))
if e != nil {
log.Println(e)
return -1
}
if r.StatusCode != http.StatusOK {
log.Println(r.StatusCode)
return -1
}
size := utils.GetSizeFromHeader(r.Header) * DATA_SHARDS
// 如果size超出对象的大小,返回对象大小
if size > s.Size {
size = s.Size
}
return size
}

temp.head相关函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 根据token恢复出stream后调用CurrentSize获取当前大小并方在Content-Length头部返回
func head(w http.ResponseWriter, r *http.Request) {
token := strings.Split(r.URL.EscapedPath(), "/")[2]
stream, e := rs.NewRSResumablePutStreamFromToken(token)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusForbidden)
return
}
current := stream.CurrentSize()
if current == -1 {
w.WriteHeader(http.StatusNotFound)
return
}
w.Header().Set("content-length", fmt.Sprintf("%d", current))
}
数据服务

数据服务的temp包发生改动,新增get和head方法。

Handler函数相比上一版本多了对 HEAD/PUT 方法的处理。如果接口服务以HEAD方式访问数据服务的temp接口,Handler 会调用head;如果接口服务以GET方式访问数据服务的temp接口,则Handler会调用get。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 打开$STORAGE_ROOT/temp/<uuid>.dat文件并将其内容作为HTTP的响应正文输出
func get(w http.ResponseWriter, r *http.Request) {
uuid := strings.Split(r.URL.EscapedPath(), "/")[2]
f, e := os.Open(os.Getenv("STORAGE_ROOT") + "/temp/" + uuid + ".dat")
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
defer f.Close()
io.Copy(w, f)
}

// 将$STORAGE_ROOT/temp/<uuid>.dat文件的大小放在Content-Length响应头部返回
func head(w http.ResponseWriter, r *http.Request) {
uuid := strings.Split(r.URL.EscapedPath(), "/")[2]
f, e := os.Open(os.Getenv("STORAGE_ROOT") + "/temp/" + uuid + ".dat")
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
defer f.Close()
info, e := f.Stat()
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Header().Set("content-length", fmt.Sprintf("%d", info.Size()))
}

测试

本版本所有代码及测试用例可见增加断点续传功能版本代码

参考

《分布式对象存储—原理、架构及Go语言实现》