数据维护

对象存储系统的长期运行离不开对系统的维护,系统维护包括3个方面:硬件、软件和数据维护。硬件维护包括现有硬件的维修、更换和升级以及新硬件的添加等;软件维护包括错误原因的调查和修复,软件的升级、回滚和全新的安装部署等;数据维护则包括数据的修复、过期数据的删除等。

硬件和软件方面的维护通常有一整套处理流程,手动或自动分批执行,确保整个系统在部分维护的情况下的可用性。

数据维护主要通过软件执行预先设定的维护工作。

对象存储系统的数据维护工作

对象版本留存

存储空间不可能无限制增长,而用户的对象却是每天都会有新版本添加进来。这些数据很快就会变得过时,被更新的对象所替代。而用户通常也不需要长期保留所有的版本。所以需要提供一种留存策略,在保留用户更关注的版本的情况下清理一些不必要的版本。

版本留存策略就是一套决定哪些版本需要被保留的决策依据。数量限定策略会保留每个对象最后的若干个版本而将其余的删除;时间限定策略会将版本保留一段固定的时间,比如3个月或半年等,超过这个阈值的版本将被删除;除了数量限定和时间限定这两类策略以外,当然还有很多更复杂更精妙的策略。

本项目实现最简单的数量限定策略,对于每个对象仅保留最后5个版本。

维护软件除了需要在元数据服务中删除对象旧版本的元数据以外,对于已经没有任何元数据引用的对象数据也需要进行清除。此处存在一个竞争条件的步骤序列:

  1. 维护软件检查某个对象散列值,没有任何元数据引用它
  2. 有用户需要上传一个相同散列值的对象,由于SIS检查该散列值存在,跳过了上传步骤
  3. 维护软件删除了该对象散列值
  4. 用户添加了新的版本引用这个散列值

这样的步骤序列一旦发生,意味着用户的对下数据丢失。为避免这种情况,在删除对象散列值时并没有彻底删除文件,只是将对象文件移动到另一个grabage目录。隔一段时间后再去真正删除。

数据服务的REST接口
1
DELETE /objects/<hash>

删除对象散列值需要数据服务提供对象的DELETE 操作,该接口不仅将对象文件移动至垃圾目录,且从数据服务的定位对象缓存中删除散列值。

具体实现

删除过期元数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
const MIN_VERSION_COUNT = 5

func main() {
// 调用es.SearchVersionStatus将元数据服务中所有版本数量大于等于6的对象搜索出来保存到Bucket结构体的数组buckets中
buckets, e := es.SearchVersionStatus(MIN_VERSION_COUNT + 1)
if e != nil {
log.Println(e)
return
}
// 遍历buckets
for i := range buckets {
bucket := buckets[i]
// 在for循环中调用es.DelMetadata,从该对象当前最小的版本号开始一一删除
// 直到最后剩下5个
for v := 0; v < bucket.Doc_count-MIN_VERSION_COUNT; v++ {
es.DelMetadata(bucket.Key, v+int(bucket.Min_version.Value))
}
}
}

// 输入min_doc_count用于指示需要搜索对象最小版本数量
func SearchVersionStatus(min_doc_count int) ([]Bucket, error) {
client := http.Client{}
url := fmt.Sprintf("http://%s/metadata/_search", os.Getenv("ES_SERVER"))
// 使用ElasticSearch的aggregation search API搜索元数据
// 以对象的名字分组,搜索版本数量大于等于min_doc_count的对象并返回
body := fmt.Sprintf(`
{
"size": 0,
"aggs": {
"group_by_name": {
"terms": {
"field": "name",
"min_doc_count": %d
},
"aggs": {
"min_version": {
"min": {
"field": "version"
}
}
}
}
}
}`, min_doc_count)
request, _ := http.NewRequest("GET", url, strings.NewReader(body))
r, e := client.Do(request)
if e != nil {
return nil, e
}
b, _ := ioutil.ReadAll(r.Body)
var ar aggregateResult
json.Unmarshal(b, &ar)
return ar.Aggregations.Group_by_name.Buckets, nil
}

// 根据对象的名字name和版本号version删除相应的对象元数据
func DelMetadata(name string, version int) {
client := http.Client{}
url := fmt.Sprintf("http://%s/metadata/objects/%s_%d",
os.Getenv("ES_SERVER"), name, version)
request, _ := http.NewRequest("DELETE", url, nil)
client.Do(request)
}
删除没有元数据引用的对象数据

deleteOrphanObject相关函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func main() {
// 需要在每个数据节点上定期运行
// 调用filepath.Global获取$STORAGE_ROOT/objects/目录下的所有文件
files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/*")

// for循环中遍历访问文件
for i := range files {
// 从文件中获得对象的散列值
hash := strings.Split(filepath.Base(files[i]), ".")[0]
// 调用es.HasHash检查元数据服务中是否存在该散列值
hashInMetadata, e := es.HasHash(hash)
if e != nil {
log.Println(e)
return
}
// 不存在,调用del删除散列值
if !hashInMetadata {
del(hash)
}
}
}

// 通过ES的search API搜索所有对象元数据中hash属性等于散列值的文档
func HasHash(hash string) (bool, error) {
url := fmt.Sprintf("http://%s/metadata/_search?q=hash:%s&size=0", os.Getenv("ES_SERVER"), hash)
r, e := http.Get(url)
if e != nil {
return false, e
}
b, _ := ioutil.ReadAll(r.Body)
var sr searchResult
json.Unmarshal(b, &sr)
// 如果满足条件的文档数量不为0,说明还存在对该散列值的引用,函数返回true,否则返回false
return sr.Hits.Total != 0, nil
}

// 访问数据服务的DELETE对象接口进行散列值的删除
func del(hash string) {
log.Println("delete", hash)
url := "http://" + os.Getenv("LISTEN_ADDRESS") + "/objects/" + hash
request, _ := http.NewRequest("DELETE", url, nil)
client := http.Client{}
client.Do(request)
}

为支持对象的删除操作,数据服务的objects包发生变化:

DELETE对象相关函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
if m == http.MethodGet {
get(w, r)
return
}
if m == http.MethodDelete {
del(w, r)
return
}
w.WriteHeader(http.StatusMethodNotAllowed)
}

// 根据对象散列值搜索对象文件
func del(w http.ResponseWriter, r *http.Request) {
hash := strings.Split(r.URL.EscapedPath(), "/")[2]
files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/" + hash + ".*")
if len(files) != 1 {
return
}
// 调用locate.Del将散列值移出对象定位缓存
locate.Del(hash)
// 调用os.Rename将对象文件移动到$STORAGE_ROOT/garbage/目录下
os.Rename(files[0], os.Getenv("STORAGE_ROOT")+"/garbage/"+filepath.Base(files[0]))
}
对象数据的检查和修复
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func main() {
// 在数据服务节点上定期运行
// 调用filepath.Glob获取$STOTAGE_ROOT/objects/目录下所有文件
files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/*")

// 在for循环中遍历访问文件
for i := range files {
// 从文件名中获得对象的散列值
hash := strings.Split(filepath.Base(files[i]), ".")[0]
// 调用verify检查数据
verify(hash)
}
}

func verify(hash string) {
log.Println("verify", hash)
// 调用es.SearchHashSize从元数据服务中获取该散列值对应的对象大小
size, e := es.SearchHashSize(hash)
if e != nil {
log.Println(e)
return
}
// 以对象的散列值和大小为参数调用objects.GetStream创建一个对象数据流
// 底层实现会自动完成数据的修复。
stream, e := objects.GetStream(hash, size)
if e != nil {
log.Println(e)
return
}
// 调用utils.CalculateHash计算对象的散列值
d := utils.CalculateHash(stream)
// 检查hash是否一致,不一致则以log的形式报告错误(数据损坏,已经不可修复)
if d != hash {
log.Printf("object hash mismatch, calculated=%s, requested=%s", d, hash)
}
// 关闭数据对象流
stream.Close()
}

// 输入对象的散列值hash,通过ES的seach API查询元数据属性中hash等于该散列值的文档的size属性
func SearchHashSize(hash string) (size int64, e error) {
url := fmt.Sprintf("http://%s/metadata/_search?q=hash:%s&size=1",
os.Getenv("ES_SERVER"), hash)
r, e := http.Get(url)
if e != nil {
return
}
if r.StatusCode != http.StatusOK {
e = fmt.Errorf("fail to search hash size: %d", r.StatusCode)
return
}
result, _ := ioutil.ReadAll(r.Body)
var sr searchResult
json.Unmarshal(result, &sr)
if len(sr.Hits.Hits) != 0 {
size = sr.Hits.Hits[0].Source.Size
}
// 返回size
return
}

测试

本版本所有代码及测试用例可见增加数据维护版本全部代码

参考

《分布式对象存储—原理、架构及Go语言实现》

数据压缩

对象存储服务端并不是最适合做数据压缩的地方。最适合做数据压缩的地方是客户端。一个高性能的客户端不仅可以将大量小对象打包成大对象提高存储和传输的效率,也可以在客户机本地进行数据压缩,进一步节省网络带宽和存储空间。如果云存储系统在设计最初就包含了专门的客户端,那么一定要将数据压缩功能放在客户端,而不是服务端。

数据压缩的效率和使用的压缩算法以及待压缩数据的特征密切相关,存放随机数据的二进制文件的压缩比惨不忍睹,文本文件的压缩比会好很多。如果云存储系统中没有一个专门的客户端,或者用户更倾向使用通用的客户端比如浏览器,且用户上传的对象大多数都是一些适合数据压缩的文档,那么可以考虑在服务端实现数据压缩功能,将客户上传的对象压缩起来再进行存储。

可以应用数据压缩功能的不仅仅在数据存储这一块,数据的传输也一样可以进行压缩。对于对象的上传来说,由于没有一个专门的客户端,没办法限定客户上传的数据。但是对于对象的下载,服务端可以提供一种选择,只要客户端支持,接口服务就可以传输压缩后的数据给客户端。

Go语言原生支持的压缩算法包有bzip2、flate、gzip、lzw 和zlib。

本项目采用的压缩算法是 gzip,它不是压缩速度最快的也不是压缩比最高的压缩算法,但是对于功能的实现来说,gzip足够好且足够简单。

用gzip实现对象存储和下载时的数据压缩

存储时的数据压缩

在本版本之前,数据服务节点把分片临时对象转正时使用的是os.Rename 操作,将$STORAGE_ROOT/temp/<uuid>.dat重命名为$STORAGE_ROOT/objects/<hash>.X.<hash>of shard X>。而本版本的实现则需要读取$STORAGE_ROOT/temp/<uid>.dat文件的内容,并使用gzip压缩后写入$STORAGE_ROOT/objects/<hash>.X.<hash of shard X>

在临时对象转正时进行数据压缩

在读取对象分片时,数据服务节点需要在读取$STORAGE_ROOT/objects/<hash>.X.<hash of shard X>文件的内容后先进行 gzip解压,然后才作为HTTP响应的正文输出。

get对象时进行数据解压

下载时进行数据压缩

客户端在下载对象时可以设置Accept-Encoding头部为gzip。接口服务在检查到这个头部后会将对象数据流经过gzip压缩后写入HTTP响应的正文。

对象下载时进行数据压缩

接口服务的REST接口
1
2
3
4
5
6
7
GET /objects/<object_name>
请求头部
Accept-Encoding:gzip
响应头部
Content-Encoding:gzip
响应正文
gzip压缩后的对象内容

具体实现

接口服务

接口服务的objects.get函数发生改变:多了一个对Accpet-Encoding请求头部的检查。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
func get(w http.ResponseWriter, r *http.Request) {
name := strings.Split(r.URL.EscapedPath(), "/")[2]
versionId := r.URL.Query()["version"]
version := 0
var e error
if len(versionId) != 0 {
version, e = strconv.Atoi(versionId[0])
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusBadRequest)
return
}
}
meta, e := es.GetMetadata(name, version)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
if meta.Hash == "" {
w.WriteHeader(http.StatusNotFound)
return
}
hash := url.PathEscape(meta.Hash)
stream, e := GetStream(hash, meta.Size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
offset := utils.GetOffsetFromHeader(r.Header)
if offset != 0 {
stream.Seek(offset, io.SeekCurrent)
w.Header().Set("content-range", fmt.Sprintf("bytes %d-%d/%d", offset, meta.Size-1, meta.Size))
w.WriteHeader(http.StatusPartialContent)
}
// 增加对Accept-Encoding请求头部的检查
acceptGzip := false
encoding := r.Header["Accept-Encoding"]
for i := range encoding {
if encoding[i] == "gzip" {
acceptGzip = true
break
}
}
// 如果头部中含有gzip,说明客户端可以接受gzip压缩数据
if acceptGzip {
// 设置Content-Encoding响应头部为gzip
w.Header().Set("content-encoding", "gzip")
// 以w为参数调用gzip.NewWriter创建一个指向gzip.Writer结构体的指针w2
w2 := gzip.NewWriter(w)
// 用io.Copy将对象数据流stream的内容用io.Copy写入w2,数据会被自动压缩,然后写入w
io.Copy(w2, stream)
w2.Close()
} else {
io.Copy(w, stream)
}
stream.Close()
}
数据服务

用于将临时对象转正的commitTempObject函数发生改变:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func commitTempObject(datFile string, tempinfo *tempInfo) {
f, _ := os.Open(datFile)
defer f.Close()
d := url.PathEscape(utils.CalculateHash(f))
f.Seek(0, io.SeekStart)
// 使用os.Create创建正式对象文件w
w, _ := os.Create(os.Getenv("STORAGE_ROOT") + "/objects/" + tempinfo.Name + "." + d)
// 然后以w为参数调用gzip.NewWriter创建w2
w2 := gzip.NewWriter(w)
// 将临时对象文件f中的数据复制进w2
io.Copy(w2, f)
w2.Close()
// 删除临时对象文件
os.Remove(datFile)
// 添加对象定位缓存
locate.Add(tempinfo.hash(), tempinfo.id())
}

用于读取对象的objects.SendFile函数发生改变:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func sendFile(w io.Writer, file string) {
f, e := os.Open(file)
if e != nil {
log.Println(e)
return
}
defer f.Close()
// 在对象文件上用gzip.NewReader创建一个指向gzip.Reader结构体的指针gzipStream
gzipStream, e := gzip.NewReader(f)
if e != nil {
log.Println(e)
return
}
// 读出gzipStream中的数据
io.Copy(w, gzipStream)
gzipStream.Close()
}

测试

本版本所有代码及测试用例可见增加数据压缩版本全部代码

参考

《分布式对象存储—原理、架构及Go语言实现》

断点续传

使用断点续传的原因

理想环境里网络永远通畅,对象存储系统并不需要断点续传这个功能。但在现实世界,对象存储服务在数据中心运行,而客户端在客户本地机器上运行,它们之间通过互联网连接。互联网的连接速度慢且不稳定,有可能由于网络故障导致断开连接。在客户端上传或下载一个大对象时,因网络断开导致上传下载失败的概率就会变得不可忽视。为了解决这个问题,对象存储服务必须提供断点续传功能,允许客户端从某个检查点而不是从头开始上传或下载对象

断点下载

断点下载的实现比较简单,客户端在GET对象请求时通过设置Range头部来告诉接口服务需要从什么位置开始输出对象的数据

接口服务的处理流程在生成对象流之前和上一版本没有任何区别,但是在成功打开了对象数据流之后,接口服务会额外调用rs.RSGetStream.Seek方法跳至客户端请求的位置,然后才开始输出数据。

断点下载流程

断点上传流程

断点上传的流程则要比断点下载复杂得多,这是由 HTTP服务的特性导致的。客户端在下载时并不在乎数据的完整性,一旦发生网络故障,数据下到哪算哪,下次继续从最后下载的数据位置开始续传就可以了

但是对于上传来说,接口服务会对数据进行散列值校验,当发生网络故障时,如果上传的数据跟期望的不一致,那么整个上传的数据都会被丢弃。所以断点上传在一开始就需要客户端和接口服务做好约定,使用特定的接口进行上传

对象POST接口

客户端在知道自己要上传大对象时就主动改用对象POST接口,提供对象的散列值和大小。接口服务的处理流程和上一版本处理对象PUT一样,搜索6个数据服务并分别POST临时对象接口。数据服务的地址以及返回的uuid 会被记录在一个token里返回给客户端。

客户端POST对象后会得到一个token。对token进行PUT可以上传数据。在上传时客户端需要指定range头部来告诉接口服务上传数据的范围。接口服务对token进行解码,获取6个分片所在的数据服务地址以及uuid,分别调用PATCH将数据写入6个临时对象。

用PUT方法访问token上传数据

通过PUT上传的数据并不一定会被接口服务完全接收。在上一版本已经知道,经过RS分片的数据是以块的形式分批写入4个数据片的,每个数据片一次写入8000字节,4个数据片一共写入 32 000字节。所以除非是最后一批数据,否则接口服务只接收32000字节的整数倍进行写入。这是一个服务端的行为逻辑,不能要求客户端知道接口服务背后的逻辑,所以接口服务必须提供token 的 HEAD操作,让客户端知道服务端上该token目前的写入进度。

用HEAD方法访问token获取当前已经上传了多少数据

客户端每次用PUT方法访问token之前都需要先用HEAD方法获取当前已上传了多少数据。接口服务对token进行解码,获取6个分片所在的数据服务地址以及uuid,仅对第一个分片调用HEAD获取该分片当前长度,将这个数字乘以4,作为Content-Length响应头部返回给客户端。

接口服务的REST接口

接口服务的objects接口GET 方法新增了Range 请求头部,用于告诉接口服务需要的对象数据范围。

1
2
3
4
5
6
7
GET /objects/<object_name>
请求头部:
Range:bytes=<first>-
响应头部:
Content-Range: bytes<first>-<size>/<size>
响应正文:
从first开始的对象内容

Range 请求头部是HTTP/1.1协议的一部分。给GET请求加上 Range头部意味着这个请求期望的只是全体数据的一个或多个子集。Range 请求主要支持以字节为单位的byte Range(虽然协议本身也支持其他自定义的Range单位,但是如果实现者没有在 IANA申请注册这个自定义的单位,那么就只有自己写的客户端和服务端之间可以互相理解),byte range的格式是固定字符串”bytes=”开头,后面跟上一个或多个数字的范围,由逗号分隔。假设整体数据是10000字节,那么合法的byte range格式可以有以下几个例子:

1
2
3
4
5
6
7
8
9
请求最后500个字节(9500~9999):
bytes=-500 或 bytes=9500-

请求第一个和最后一个字节(字节О和9999):
bytes=0-0,—1

其他几个合法但不常见的请求第500~999个字节的格式:
bytes=500-600,601-999
bytes=500-700,601-999

本项目的对象存储系统实现的格式是bytes=<first>-客户端通过指定first 的值告诉接口服务下载对象的数据范围,接口服务返回的数据从first开始,first之前的对象数据会在服务端被丢弃。根据Range请求的协议规范,接口服务需要返回HTTP错误代码 206 Partial Content,并设置 Content-Range响应头部告知返回数据的范围<first>-<size>/<size>,其中<first>是客户端要求的起始位置,<size>是对象的大小。

objects接口还新增了POST方法,用于创建token。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
POST /objects/<object_name>
请求头部:
Digest:SHA-256=<对象散列值的Base64编码>
Size:<对象内容的长度>
响应头部:
Location:<访问/temp/token的URI>
token被放在Location头部返回给客户端。客户端拿到后可以直接访问该URI.

除了objects接口发生的改变以外,接口服务还新增temp接口:
HEAD /temp/<token>
响应头部:
Content-Length:<token当前的上传字节数>

PUT /temp/<token>
请求头部:
Range:bytes=<first>-<last>
请求正文:
对象的内容,字节范围为first~last

客户端通过Range头部指定上传的范围,first必须跟token当前的上传字节数一致,否则接口服务会返回416 Range Not Satisfiable。如果上传的是最后一段数据,<last>为空。

数据服务的REST接口

数据服务的temp接口新增了HEAD和GET两个方法,HEAD方法用于获取某个分片临时对象当前的大小;而GET方法则用于获取临时对象的数据。

1
2
3
4
5
6
7
HEAD /temp/<uuid>
响应头部:
Content-Length:<临时对象当前的上传字节数>

GET /temp/<uuid>
响应正文:
临时对象的内容

客户端将对象所有的数据上传完毕之后,接口服务需要调用这个方法从数据服务读取各分片临时对象的内容并进行数据校验,只有在验证了对象的散列值符合预期的情况下,服务端才认为该对象的上传是成功的,进而将临时对象转正。

具体实现

接口服务

接口服务的main函数以及objects包发生了变化,且新增加了temp包。

1
2
3
4
5
6
7
8
9
func main() {
go heartbeat.ListenHeartbeat()
http.HandleFunc("/objects/", objects.Handler)
// 增加temp.Handler函数用于处理对/temp的请求
http.HandleFunc("/temp/", temp.Handler)
http.HandleFunc("/locate/", locate.Handler)
http.HandleFunc("/versions/", versions.Handler)
log.Fatal(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil))
}
接口服务的objects包

objects.Handler函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
// 增加POST方法的处理函数post
if m == http.MethodPost {
post(w, r)
return
}
if m == http.MethodPut {
put(w, r)
return
}
if m == http.MethodGet {
get(w, r)
return
}
if m == http.MethodDelete {
del(w, r)
return
}
w.WriteHeader(http.StatusMethodNotAllowed)
}

objects.post相关函数:

前半段处理流程与put函数类似

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
func post(w http.ResponseWriter, r *http.Request) {
// 从请求的URL中获得对象的名字
name := strings.Split(r.URL.EscapedPath(), "/")[2]
// 从请求的相应头部获得对象的大小
size, e := strconv.ParseInt(r.Header.Get("size"), 0, 64)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusForbidden)
return
}
// 从请求的相应头部获得对象的散列值
hash := utils.GetHashFromHeader(r.Header)
if hash == "" {
log.Println("missing object hash in digest header")
w.WriteHeader(http.StatusBadRequest)
return
}
// 如果散列值已经存在,可以直接往元数据服务添加新版本并返回200OK
if locate.Exist(url.PathEscape(hash)) {
e = es.AddVersion(name, hash, size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
} else {
w.WriteHeader(http.StatusOK)
}
return
}
// 散列值不存在,随机选出6个数据节点
ds := heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS, nil)
if len(ds) != rs.ALL_SHARDS {
log.Println("cannot find enough dataServer")
w.WriteHeader(http.StatusServiceUnavailable)
return
}
// 调用rs.NewRSResumablePutStream生成数据流stream
stream, e := rs.NewRSResumablePutStream(ds, name, url.PathEscape(hash), size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 调用ToToken方法生成一个字符串token,放入Location响应头部
w.Header().Set("location", "/temp/"+url.PathEscape(stream.ToToken()))
// 返回HTTP代码201 Created
w.WriteHeader(http.StatusCreated)
}

// 保存对象的名字、大小、散列值以及6个分片所在数据服务节点的地址和uuid
type resumableToken struct {
Name string
Size int64
Hash string
Servers []string
Uuids []string
}

type RSResumablePutStream struct {
*RSPutStream
*resumableToken
}

// 传入保存数据服务节点地址的dataServers数组,对象的名字name,散列值以及大小
func NewRSResumablePutStream(dataServers []string, name, hash string, size int64) (*RSResumablePutStream, error) {
// 调用NewRSPutStream创建一个类型为RSPutStream的变量putStream
putStream, e := NewRSPutStream(dataServers, hash, size)
if e != nil {
return nil, e
}
uuids := make([]string, ALL_SHARDS)
// 从putStream的成员writers数组中获取6个分片的uuid,保存到uuid数组
for i := range uuids {
uuids[i] = putStream.writers[i].(*objectstream.TempPutStream).Uuid
}
// 创建resumableToken结构体token
token := &resumableToken{name, size, hash, dataServers, uuids}
return &RSResumablePutStream{putStream, token}, nil
}

// 将自身数据以JSON格式编入,然后返回结果Base64编码后的字符串
func (s *RSResumablePutStream) ToToken() string {
b, _ := json.Marshal(s)
return base64.StdEncoding.EncodeToString(b)
}

objects包修改了get函数,相关函数修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
func get(w http.ResponseWriter, r *http.Request) {
name := strings.Split(r.URL.EscapedPath(), "/")[2]
versionId := r.URL.Query()["version"]
version := 0
var e error
if len(versionId) != 0 {
version, e = strconv.Atoi(versionId[0])
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusBadRequest)
return
}
}
meta, e := es.GetMetadata(name, version)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
if meta.Hash == "" {
w.WriteHeader(http.StatusNotFound)
return
}
hash := url.PathEscape(meta.Hash)
stream, e := GetStream(hash, meta.Size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
// 调用utils.GetOffsetFromHeader函数获取HTTP的Range头部
offset := utils.GetOffsetFromHeader(r.Header)
if offset != 0 {
// offset不为0,调用stream的Seek方法跳到offset位置
stream.Seek(offset, io.SeekCurrent)
// 设置Content-Range响应头部以及HTTP代码206 Partial Content
w.Header().Set("content-range", fmt.Sprintf("bytes %d-%d/%d", offset, meta.Size-1, meta.Size))
w.WriteHeader(http.StatusPartialContent)
}
// 通过io.Copy输出数据
io.Copy(w, stream)
stream.Close()
}

// Range的头部格式必须为bytes=<first>- 开头
func GetOffsetFromHeader(h http.Header) int64 {
byteRange := h.Get("range")
if len(byteRange) < 7 {
return 0
}
if byteRange[:6] != "bytes=" {
return 0
}
// 调用strings.Split将<first>部分切取出来
bytePos := strings.Split(byteRange[6:], "-")
// 调用strconv.ParseInt将字符串转换为int64返回
offset, _ := strconv.ParseInt(bytePos[0], 0, 64)
return offset
}

// offset表示需要跳过的字节数,whence表示起跳点
func (s *RSGetStream) Seek(offset int64, whence int) (int64, error) {
// 方法只支持从当前位置io.SeekCurrent起跳
if whence != io.SeekCurrent {
panic("only support SeekCurrent")
}
// 跳过的字节数不能为负
if offset < 0 {
panic("only support forward seek")
}
// for循环中每次读取32000字节并丢弃,直到读到offset位置为止
for offset != 0 {
length := int64(BLOCK_SIZE)
if offset < length {
length = offset
}
buf := make([]byte, length)
io.ReadFull(s, buf)
offset -= length
}
return offset, nil
}
接口服务的temp包

temp.Handler函数

1
2
3
4
5
6
7
8
9
10
11
12
13
// 检查访问方式并调用相应的函数
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
if m == http.MethodHead {
head(w, r)
return
}
if m == http.MethodPut {
put(w, r)
return
}
w.WriteHeader(http.StatusMethodNotAllowed)
}

temp.put相关函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
func put(w http.ResponseWriter, r *http.Request) {
token := strings.Split(r.URL.EscapedPath(), "/")[2]
stream, e := rs.NewRSResumablePutStreamFromToken(token)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusForbidden)
return
}
// 调用CurrentSize获取token当前大小
current := stream.CurrentSize()
if current == -1 {
w.WriteHeader(http.StatusNotFound)
return
}
// 从Range头部获得offset
offset := utils.GetOffsetFromHeader(r.Header)
// 如果offset和当前的大小不一致,接口服务返回416Range Not Satisfiable
if current != offset {
w.WriteHeader(http.StatusRequestedRangeNotSatisfiable)
return
}
bytes := make([]byte, rs.BLOCK_SIZE)
// 在for循环中以32000字节为长度读取HTTP请求的正文并写入stream
for {
n, e := io.ReadFull(r.Body, bytes)
if e != nil && e != io.EOF && e != io.ErrUnexpectedEOF {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
current += int64(n)
// 如果读到的总长度超出了对象的大小,说明客户端上传的数据有误
// 接口服务删除临时对象并返回403 Forbidden
if current > stream.Size {
stream.Commit(false)
log.Println("resumable put exceed size")
w.WriteHeader(http.StatusForbidden)
return
}
if n != rs.BLOCK_SIZE && current != stream.Size {
return
}
stream.Write(bytes[:n])
// 读到的总长度等于对象的大小,说明客户端上传了对象的全部数据
if current == stream.Size {
// 调用Flush方法将剩余数据写入临时对象
stream.Flush()
// 调用NewRSResumableGetStream生成一个临时对象读取流getStream
getStream, e := rs.NewRSResumableGetStream(stream.Servers, stream.Uuids, stream.Size)
// 读取getStream中的数据并计算散列值
hash := url.PathEscape(utils.CalculateHash(getStream))
// 散列值不一致,客户端上传数据有错误
if hash != stream.Hash {
// 接口服务删除临时对象
stream.Commit(false)
log.Println("resumable put done but hash mismatch")
// 返回403 Forbidden
w.WriteHeader(http.StatusForbidden)
return
}
// 检查该散列值是否已经存在
if locate.Exist(url.PathEscape(hash)) {
// 存在则删除临时对象
stream.Commit(false)
} else {
// 否则将对象转正
stream.Commit(true)
}
// 调用es.AddVersion添加新版本
e = es.AddVersion(stream.Name, stream.Hash, stream.Size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
}
return
}
}
}

func NewRSResumablePutStreamFromToken(token string) (*RSResumablePutStream, error) {
// 对token进行Base64解码
b, e := base64.StdEncoding.DecodeString(token)
if e != nil {
return nil, e
}

var t resumableToken
// 将JSON数据编出形成resumableToken结构体t
e = json.Unmarshal(b, &t)
if e != nil {
return nil, e
}

writers := make([]io.Writer, ALL_SHARDS)
// t的Servers和uuid数组中保存了当初创建的6个分片临时对象所在的数据服务节点地址和uuid
for i := range writers {
// 保存到writers数组中
writers[i] = &objectstream.TempPutStream{t.Servers[i], t.Uuids[i]}
}
// 以writers数组为参数创建encoder结构体enc
enc := NewEncoder(writers)
// 以enc为内嵌结构体创建RSPutStream
// 最终以RSPutStream和t为内嵌结构体创建RSResumablePutStream返回
return &RSResumablePutStream{&RSPutStream{enc}, &t}, nil
}

// 以HEAD方法获取第一个分片临时对象的大小并乘4作为size返回
func (s *RSResumablePutStream) CurrentSize() int64 {
r, e := http.Head(fmt.Sprintf("http://%s/temp/%s", s.Servers[0], s.Uuids[0]))
if e != nil {
log.Println(e)
return -1
}
if r.StatusCode != http.StatusOK {
log.Println(r.StatusCode)
return -1
}
size := utils.GetSizeFromHeader(r.Header) * DATA_SHARDS
// 如果size超出对象的大小,返回对象大小
if size > s.Size {
size = s.Size
}
return size
}

temp.head相关函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 根据token恢复出stream后调用CurrentSize获取当前大小并方在Content-Length头部返回
func head(w http.ResponseWriter, r *http.Request) {
token := strings.Split(r.URL.EscapedPath(), "/")[2]
stream, e := rs.NewRSResumablePutStreamFromToken(token)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusForbidden)
return
}
current := stream.CurrentSize()
if current == -1 {
w.WriteHeader(http.StatusNotFound)
return
}
w.Header().Set("content-length", fmt.Sprintf("%d", current))
}
数据服务

数据服务的temp包发生改动,新增get和head方法。

Handler函数相比上一版本多了对 HEAD/PUT 方法的处理。如果接口服务以HEAD方式访问数据服务的temp接口,Handler 会调用head;如果接口服务以GET方式访问数据服务的temp接口,则Handler会调用get。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 打开$STORAGE_ROOT/temp/<uuid>.dat文件并将其内容作为HTTP的响应正文输出
func get(w http.ResponseWriter, r *http.Request) {
uuid := strings.Split(r.URL.EscapedPath(), "/")[2]
f, e := os.Open(os.Getenv("STORAGE_ROOT") + "/temp/" + uuid + ".dat")
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
defer f.Close()
io.Copy(w, f)
}

// 将$STORAGE_ROOT/temp/<uuid>.dat文件的大小放在Content-Length响应头部返回
func head(w http.ResponseWriter, r *http.Request) {
uuid := strings.Split(r.URL.EscapedPath(), "/")[2]
f, e := os.Open(os.Getenv("STORAGE_ROOT") + "/temp/" + uuid + ".dat")
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
defer f.Close()
info, e := f.Stat()
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Header().Set("content-length", fmt.Sprintf("%d", info.Size()))
}

测试

本版本所有代码及测试用例可见增加断点续传功能版本代码

参考

《分布式对象存储—原理、架构及Go语言实现》

数据冗余和即时修复

数据冗余

上一版本的去重主要是用于避免同一个对象在系统中各处都被保存一份副本。本版本的冗余是在受到版本控制的情况下增加对象数据的稳定性。

数据丢失和数据不可用

数据丢失是指信息在存储、传输或处理的过程中由于错误或遗漏而发生损失。

数据在传输过程中的丢失通常是由于网络不稳定导致的,对数据进行校验可以有效检测出传输过程中发生的数据丢失,然后服务端就可以拒绝接收有损的数据。

数据处理过程中的丢失则可能是由于软件或人为的错误而造成的。对于软件错误,需要对其进行修复并重新部署;对于人为错误,需要制定严格的操作规范。

重点是:已经丢失的数据无法恢复

存储硬件损坏是数据在存储过程中丢失的最常见的原因,可能发生的硬件损坏从某个硬盘出现坏道到整个数据中心受灾等不一而足,使用数据备份以及灾难恢复可以在一定程度上弥补损失,但是这通常都会造成几小时到几天不等的停机时间,而且系统最后一次备份点之后加入的数据也依然是无法恢复的。

服务器的维护可能导致数据暂时的不可用,比如预先安排的服务器重启等。和永久性的数据丢失不同,数据不可用是暂时性的,当服务器重启完成后,数据就会恢复可用的状态。但是在服务器重启过程中如果恰好有用户需要对其上的对象进行访问,那么同样会表现成数据丢失。

除了硬件损坏和服务器维护以外,还有数据降解也会造成数据丢失。数据降解是由数据存储设备的非关键故障累积导致的数据逐渐损坏,即使在没有发生任何软件错误或硬件损坏的情况下,存储介质上的数据依然有可能随时间的推移而丢失。比如说,固态硬盘会由于存在缺陷的绝缘封装工艺而导致其中保存的电荷慢慢流失:磁盘上保存的比特会随时间的推移而消磁:潮湿温暖的空气会加速磁性材质的降解等。

单个数据的损坏和丢失是不可避免的。为了保护用户的数据,在计算机存储领域,依靠数据冗余来对抗数据丢失数据冗余不仅可以在一定程度上克服数据丢失,而且在发生数据丢失的时候还可以帮助对其进行修复。

数据冗余

在计算机领域,数据冗余是指在存储和传输的过程中,除了实际需要的数据,还存在一些额外数据用来纠正错误。这些额外的数据可以是一份简单的原始数据的复制,也可以是一些经过精心选择的校验数据,允许在一定程度上检测并修复损坏的数据。

比如,ECC (Error-correcting memory)内存在其每一个存储字中会额外包含存储数据的校验和,可以检测并修正一个比特的错误;RAID 1使用两个硬盘组成单个逻辑存储单元,在任何一个硬盘发生故障的情况下依然可以有效运行:Btrfs和ZFS这样的文件系统通过结合使用校验和以及数据复制这两种方式来检测并修正硬盘上的数据降解等。
在对象存储领域,我们也有很多数据冗余的策略。

对象存储系统的数据冗余策略

最显而易见的冗余策略是每个对象都保留两个或更多副本,由接口服务负责将其存储在不同的数据服务节点上。跟去重之前完全无控制的状况不同,多副本冗余是受接口服务控制的有限副本冗余,副本对象的数量有限,而不是散落在每一个数据服务节点上。多副本冗余的概念很简单,实现也很方便,任何一台数据服务节点停机都不会影响数据的可用性,因为在另外一台数据服务节点上还存在一个副本。

多副本冗余的策略胜在实现简单,但是代价也比较高,本项目将要介绍和实现的冗余策略比多副本冗余复杂,叫作 Reed Solomon 纠删码。在编码理论学中,RS 纠删码属于非二进制循环码,它的实现基于有限域上的一元多项式,并被广泛应用于CD、DVD、蓝光、QR码等消费品技术,DSL、WiMAX等数据传输技术,DVB、ATSC等广播技术以及卫星通信技术等。

RS纠删码允许选择数据片和校验片的数量,本项目选择4个数据片加两个校验片,也就是说会把一个完整的对象平均分成6个分片对象,其中包括4个数据片对象,每个对象的大小是原始对象大小的25%,另外还有两个校验片,其大小和数据片一样。这6个分片对象被接口服务存储在6个不同的数据服务节点上,只需要其中任意4个就可以恢复出完整的对象。

要在对象存储系统中评价一个冗余策略的好坏,主要是衡量该策略对存储空间的要求和其抗数据损坏的能力。对存储空间的要求是指采用的冗余策略相比不使用冗余要额外支付的存储空间,以百分比表示。抗数据损坏的能力以允许损坏或丢失的对象数量来衡量。

比如说,在没有任何冗余策略的情况下,对象占用存储空间的大小就是它本身的大小,而一旦该对象损坏,就丢失了这个对象,那么它对存储空间的要求是100%,而抵御能力则是0。如果采用双副本冗余策略,当任何一个副本损坏或丢失时,都可以通过另外一个副本将其恢复出来。也就是说,这个冗余策略对存储空间要求是200%,抵御数据损坏的能力是1(可以丢失两个副本中的任意1个),而使用4+2的RS码的策略,存储空间要求是150%,抵御能力是2(可以丢失6个分片对象中的任意两个)。总的来说,对于一个M+N的RS码(M个数据片加N个校验片),其对存储空间的要求是(M+N)/M*100%,抵御能力是N。

可以看到,RS码冗余策略对存储空间的要求更低,而抵御数据损坏的能力却更强。选择RS码还有一个好处,就是它会将一个大对象拆分成多个分片对象分别存储,有助于数据服务层的负载均衡。

使用了RS 码冗余策略之后,对象存储系统单个节点的维护就不会导致整个系统的停机。只要每次维护的节点数小于N,那么任意对象的分片数依然大于M,对象就可以正确恢复。

数据冗余的实现

由于底层使用的数据冗余策略对上层的接口不产生任何影响。REST接口不发生变化。

对象PUT流程

使用RS纠删码实现冗余策略的对象PUT流程

接口服务会将客户端PUT上来的数据流切成4+2的分片,然后向6个数据服务节点上传临时对象<hash>.X,同时计算对象散列值。如果散列值一致,6个临时对象会被转成正式的分片对象<hash>.X,其内容被保存在$STORAGE_ROOT/objects/<hash>.X.<hash of shard X>文件中。其中, X的取值范围为0~5的整数,表示该分片的id,0~3是数据片,4和5则是校验片<hash of shard X>是该分片的散列值,在转正时通过计算临时对象的内容得到。

对象GET流程

使用RS冗余编码的对象GET流程

接口服务发送针对对象散列值<hash>的定位信息,含有该对象分片的数据服务共有6个,它们都会发送反馈的消息。接口服务在收到所有反馈消息后向响应的数据服务分别GET分片对象<hash>.X。数据服务读取分片X的内容,并响应接口服务的请求。然后接口服务将数据片0到3的内容组合成对象来响应客户端的请求。

接口服务在进行定位时的超时和之前一样是1s,如果在1s内收到所有6个定位反馈则定位成功:如果在1s超时后收到的定位反馈大于等于4,那么依然可以恢复出完整的对象:如果定位反馈小于等于3,则定位失败。

当数据服务的接口收到的请求对象是<hash>.X,数据服务需要自己去$STORAGE_ROOT/objects目录下寻找<hash>.X开头的文件<hash>.X.<hash of shard X>然后在返回该文件内容时还需要进行数据校验,确保其内容的散列值和<hash of shard X>一致。如果不一致,则需要删除该分片对象并返回404 Not Found。这是为了防止该分片对象被数据降解破坏。

接口服务可能由于分片定位失败或分片散列值不一致等原因无法获取某个分片的数据。如果失败的分片数小于等于2,接口服务会根据成功获取的分片重塑数据,并将新的分片写入随机的数据服务;如果失败的分片数大于等于3,对这样的数据丢失无能为力,只能向客户端返回404 Not Found。

具体实现

为实现RS码,接口服务的locate、heartbeat和objects包需要做相应变化。

接口服务的locate包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func Locate(name string) (locateInfo map[int]string) {
q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
q.Publish("dataServers", name)
c := q.Consume()
go func() {
// 设置1秒超时,无论当前收到多少条反馈消息都会立即返回
time.Sleep(time.Second)
q.Close()
}()
locateInfo = make(map[int]string)
// 使用for循环获取6条消息,每条消息包含拥有某个分片的数据服务节点的地址和分片的id
// rs.ALL_SHARDS为rs包中的常数6,代表一共有4+2个分片
for i := 0; i < rs.ALL_SHARDS; i++ {
msg := <-c
if len(msg.Body) == 0 {
return
}
var info types.LocateMessage
json.Unmarshal(msg.Body, &info)
// 获取到的分片信息放到输出参数locateInfo中
locateInfo[info.Id] = info.Addr
}
return
}

// 判断收到的反馈消息数量是否大于等于4
func Exist(name string) bool {
// 大于等于4则说明对象存在,否则说明对象不存在(或者存在也无法读取)
return len(Locate(name)) >= rs.DATA_SHARDS
}
接口服务的heartbeat包

接口服务的heartbeat包也需要进行改动,将原来的返回一个随机数据服务节点的ChooseRandomDataServer函数改为能够返回多个随机数据服务节点ChooseRandomDataServers函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 用于获取上传复原分片的随机数据服务节点。目前已有的分片所在的数据服务节点需要被排除
func ChooseRandomDataServers(n int, exclude map[int]string) (ds []string) {
// 输入参数n表示需要多少个随机数服务节点
// 输入参数exclude表明要求返回的数据服务节点不能包含哪些节点。
// 当定位完成后,实际收到的反馈消息可能不足6个,此时需要进行数据恢复,即根据目前已有的分片将丢失的分片复原出来并再次上传到数据服务
candidates := make([]string, 0)
reverseExcludeMap := make(map[string]int)
for id, addr := range exclude {
reverseExcludeMap[addr] = id
}
servers := GetDataServers()
for i := range servers {
s := servers[i]
_, excluded := reverseExcludeMap[s]
if !excluded {
// 不需要被排除的加入到condaidate数组
candidates = append(candidates, s)
}
}
length := len(candidates)
if length < n {
// 无法满足要求的n个数据服务节点,返回一个空数组
return
}
// 将0-length-1的所有整数乱序排列返回一个数组
p := rand.Perm(length)
// 取前n个作为candicate数组的下标取数据节点地址返回
for i := 0; i < n; i++ {
ds = append(ds, candidates[p[i]])
}
return
}
接口服务的objects包
PUT相关函数

PUT对象时需要用到的putStream函数,调用新的heartbeat.ChooseRandomDataServers函数获取随机数据服务节点地址并调用rs.NewRSPutStream来生成一个数据流:

1
2
3
4
5
6
7
8
9
func putStream(hash string, size int64) (*rs.RSPutStream, error) {
servers := heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS, nil)
if len(servers) != rs.ALL_SHARDS {
return nil, fmt.Errorf("cannot find enough dataServer")
}

// 返回一个指向rs.RSPutStream结构体的指针
return rs.NewRSPutStream(servrs, hash, size)
}

创建rs.RSPutStream:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
const (
DATA_SHARDS = 4
PARITY_SHARDS = 2
ALL_SHARDS = DATA_SHARDS + PARITY_SHARDS
BLOCK_PER_SHARD = 8000
BLOCK_SIZE = BLOCK_PER_SHARD * DATA_SHARDS
)

// 内嵌一个encoder结构体,RPutStream的使用者可以像访问RPutStream的方法或成员一样访问*encoder的方法或成员
type RSPutStream struct {
*encoder
}

func NewRSPutStream(dataServers []string, hash string, size int64) (*RSPutStream, error) {
// dataServers长度不为6,返回错误
if len(dataServers) != ALL_SHARDS {
return nil, fmt.Errorf("dataServers number mismatch")
}
// 根据size计算出每个分片的大小perShard,也就是size/4再向上取整
perShard := (size + DATA_SHARDS - 1) / DATA_SHARDS
// 创建长度为6的io.Writers数组
writers := make([]io.Writer, ALL_SHARDS)
var e error
for i := range writers {
// writers数组中的每个元素都是一个objectstream.TempPutSterm,用于上传一个分片对象
writers[i], e = objectstream.NewTempPutStream(dataServers[i],
fmt.Sprintf("%s.%d", hash, i), perShard)
if e != nil {
return nil, e
}
}
// 调用NewEncoder函数创建一个encoder结构体指针enc
enc := NewEncoder(writers)
// 将enc作为RSPutSterm的内嵌结构体返回
return &RSPutStream{enc}, nil
}

type encoder struct {
writers []io.Writer
enc reedsolomon.Encoder // reedsolomon.Encoder接口
cache []byte //用于做输入数据缓存的字节数组
}

// 调用reeddolomon.New生成具有4个数据片加两个校验片的RS码编码器enc
func NewEncoder(writers []io.Writer) *encoder {
enc, _ := reedsolomon.New(DATA_SHARDS, PARITY_SHARDS)
// 将输入参数writers和enc作为生成的encoder结构体的成员返回
return &encoder{writers, enc, nil}
}

reedsolomon包是一个RS编解码的开源库,需要用以下命令下载该包:
go get github.com/klauspost/reedsolomon

在对象PUT过程中,写入对象数据的流调用的是rs.RSPutStream,实际背后调用的Write方法也不一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// RSPutStream本身没有实现Write方法,实现的时候函数会直接调用其内嵌结构体encoder的Write方法
func (e *encoder) Write(p []byte) (n int, err error) {
length := len(p)
current := 0
// 将p中待写入的数据以块的形式放入缓存
for length != 0 {
next := BLOCK_SIZE - len(e.cache)
if next > length {
next = length
}
e.cache = append(e.cache, p[current:current+next]...)
// 如果缓存已满,调用Flush方法将缓存实际写入writers
// 缓存的上限是每个数据片8000字节,4个数据片共32000字节。
// 如果缓存里剩余的数据不满32000字节就暂不刷新,等待Write方法下一次调用
if len(e.cache) == BLOCK_SIZE {
e.Flush()
}
current += next
length -= next
}
return len(p), nil
}

func (e *encoder) Flush() {
if len(e.cache) == 0 {
return
}
// 调用Split方法将缓存的数据切成4个数据片
shards, _ := e.enc.Split(e.cache)
// 调用Encode方法生成两个校验片
e.enc.Encode(shards)
// 循环将6个片的数据依次写入wtiters并清空缓存
for i := range shards {
e.writers[i].Write(shards[i])
}
e.cache = []byte{}
}

用户上传的对象数据经过散列值校验后,RSPutStrem需要一个Commit方法用来将临时对象转正或删除。

1
2
3
4
5
6
7
8
func (s *RSPutStream) Commit(success bool) {
// 调用RSPutStream的内嵌结构体encoder的Flush方法将缓存中最后的数据写入
s.Flush()
// 对调用encoder的成员数组writers中的元素调用Commit方法将6个临时对象依次转正或删除
for i := range s.writers {
s.writers[i].(*objectstream.TempPutStream).Commit(success)
}
}
GET相关函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func get(w http.ResponseWriter, r *http.Request) {
name := strings.Split(r.URL.EscapedPath(), "/")[2]
versionId := r.URL.Query()["version"]
version := 0
var e error
if len(versionId) != 0 {
version, e = strconv.Atoi(versionId[0])
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusBadRequest)
return
}
}
meta, e := es.GetMetadata(name, version)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
if meta.Hash == "" {
w.WriteHeader(http.StatusNotFound)
return
}
hash := url.PathEscape(meta.Hash)

stream, e := GetStream(hash, meta.Size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
// 调用io.Copy将对象数据流写入HTTP响应
_, e = io.Copy(w, stream)
// 如果有错误,说明对象数据在RS解码过程中发生了错误,意味着该对象已经无法被读取
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
stream.Close()
}

// 提供给外部包使用。增加size参数是由于RS码的实现要求每一个数据片的长度完全一样
// 在编码如果对象长度不能被4整除,函数会队最后一个数片进行填充。
// 解码时必须提供对象的准确长度,防止填充数据被当成元数对象数据返回。
func GetStream(hash string, size int64) (*rs.RSGetStream, error) {
// 根据对象散列值hash定位对象
locateInfo := locate.Locate(hash)
// 反馈的定位结果数组长度小于4,返回错误
if len(locateInfo) < rs.DATA_SHARDS {
return nil, fmt.Errorf("object %s locate fail, result %v", hash, locateInfo)
}
dataServers := make([]string, 0)
// 长度不为6,说明对象有部分分片丢失
if len(locateInfo) != rs.ALL_SHARDS {
// 调用heartbeat.ChooseRandomDataServers随机选择用于接收恢复分片的数据服务节点
dataServers = heartbeat.ChooseRandomDataServers(rs.ALL_SHARDS-len(locateInfo), locateInfo)
}
// 调用rs.NewRSGetStream函数创建rs.RSGetStream
return rs.NewRSGetStream(locateInfo, dataServers, hash, size)
}

创建Re.RSGetStream:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
func NewRSGetStream(locateInfo map[int]string, dataServers []string, hash string, size int64) (*RSGetStream, error) {
// 检查是否满足4+2 的RS码需求
// 不满足,返回错误
if len(locateInfo)+len(dataServers) != ALL_SHARDS {
return nil, fmt.Errorf("dataServers number mismatch")
}

// 创建长度为6的io.Reader数组readers,用于读取6个分片的数据
readers := make([]io.Reader, ALL_SHARDS)
// 遍历6个分片的id
for i := 0; i < ALL_SHARDS; i++ {
// 在locateinfo中查找分片所在的数据服务节点地址
server := locateInfo[i]
// 如果某个分片id相对的数据服务节点地址为空,说明该分片丢失
// 取一个随机数据服务节点补上
if server == "" {
locateInfo[i] = dataServers[0]
dataServers = dataServers[1:]
continue
}
// 数据服务节点存在,调用objectstream.NewGetStream打开一个对象读取流用于读取该分片数据
reader, e := objectstream.NewGetStream(server, fmt.Sprintf("%s.%d", hash, i))
// 打开的流保存在readers数组相应的元素中
if e == nil {
readers[i] = reader
}
}

writers := make([]io.Writer, ALL_SHARDS)
perShard := (size + DATA_SHARDS - 1) / DATA_SHARDS
var e error
// 遍历readers
for i := range readers {
// 第一个次遍历出现nil的情况
// 1.该分片数据服务节点地址为空
// 2.数据服务节点存在但打开流失败
if readers[i] == nil {
// 某个元素为nil,调用NewTempPutStream创建相应的临时对象写入流用于恢复分片
// 打开的流保存到writers数组相应元素中
writers[i], e = objectstream.NewTempPutStream(locateInfo[i], fmt.Sprintf("%s.%d", hash, i), perShard)
if e != nil {
return nil, e
}
}
}
// 处理完成,readers和writers数组形成互补关系
// 对于某个分片id,要么在readers中存在相应的读取流,要么在writers中存在相应的写入流
// 将两个数组以及对象的大小size作为参数调用NewDecoder生成decoder结构体的指针dec
dec := NewDecoder(readers, writers, size)
// 将dec作为RSGetStream的内嵌结构体返回
return &RSGetStream{dec}, nil
}

type decoder struct {
readers []io.Reader
writers []io.Writer
enc reedsolomon.Encoder // 接口,用于RS解码
size int64 // 对象的大小
cache []byte // cache用于缓存读取的数据
cacheSize int // cachSize
total int64 // 当前读了多少字节
}

func NewDecoder(readers []io.Reader, writers []io.Writer, size int64) *decoder {
// 调用reedsolomon.New创建4+2 RS码的解码器enc
enc, _ := reedsolomon.New(DATA_SHARDS, PARITY_SHARDS)
// 设置decoder结构体中相应属性后返回
return &decoder{readers, writers, enc, size, nil, 0, 0}
}

对象GET过程中读取对象数据的流调用的是rs.RSGetStream,实际调用的Read方法也发生了变化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
func (d *decoder) Read(p []byte) (n int, err error) {
// 当cache中没有更多数据时会调用getData方法获取数据
if d.cacheSize == 0 {
// getData返回的e不为nil,说明没能获取更多数据
e := d.getData()
if e != nil {
// 返回0和e通知调用方
return 0, e
}
}
// 函数参数p的数组长度
length := len(p)
if d.cacheSize < length {
// length超出当前缓冲区的数据大小,令length等于缓存的数据大小
length = d.cacheSize
}
// 将缓存中length长度的数据复制给输入参数p,然后调整缓存,仅保留剩下的部分
d.cacheSize -= length
copy(p, d.cache[:length])
d.cache = d.cache[length:]
// 返回length,通知调用方本次读取一共有多少数据被复制到p中
return length, nil
}

func (d *decoder) getData() error {
// 先判断当前解码的数据大小是否等于对象原始大小
// 如果已经相等,说明所有数都已经被读取,返回io.EOF
if d.total == d.size {
return io.EOF
}
// 如果还有数需要读取,创建一个长度为6的数组
// 每个元素都是一个字节数组,用于保存相应分片中读取的数据
shards := make([][]byte, ALL_SHARDS)
repairIds := make([]int, 0)
// 遍历6个shards
for i := range shards {
// 若某个分片对应的reader为nil,说明该分片已经丢失
if d.readers[i] == nil {
// 在repairIds中添加该分片的id
repairIds = append(repairIds, i)
} else {
// 对应的reader不为nil,那么对应的shards需要被初始化为一个长度为8000的字节数组
shards[i] = make([]byte, BLOCK_PER_SHARD)
// 调用io.ReadFull从reader中完整读取8000字节的数据保存在shards中。
n, e := io.ReadFull(d.readers[i], shards[i])
// 如果发生了非EOF失败,该shards被置为nil
if e != nil && e != io.EOF && e != io.ErrUnexpectedEOF {
shards[i] = nil
// 读取数据长度n不到8000字节,将shards实际的长度缩减为n
} else if n != BLOCK_PER_SHARD {
shards[i] = shards[i][:n]
}
}
}
// 调用成员enc的Reconstruct方法尝试将被置为nil的shards恢复出来
e := d.enc.Reconstruct(shards)
// 若发生错误,说明对象遭到不可恢复的破坏,返回错误给上层
if e != nil {
return e
}
// 恢复成功,6个shards中都保存了对应分片的正确数据
// 遍历repairIds,将需要恢复的分片数据写入到相应的writer
for i := range repairIds {
id := repairIds[i]
d.writers[id].Write(shards[id])
}
// 遍历4个数据分片
for i := 0; i < DATA_SHARDS; i++ {
shardSize := int64(len(shards[i]))
if d.total+shardSize > d.size {
shardSize -= d.total + shardSize - d.size
}
// 将每个分片中的数据添加到缓存cache中
d.cache = append(d.cache, shards[i][:shardSize]...)
// 修改缓存当前的大小cacheSize
d.cacheSize += int(shardSize)
// 修改当前已经读取的全部数据的大小total
d.total += shardSize
}
return nil
}
数据服务

数据服务的locate包:由于在磁盘上保存的对象文件名格式发生了变化,locate的实现也有相应变化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
var objects = make(map[string]int)
var mutex sync.Mutex

// 告知某个对象是否存在,同时告知本节点保存的是该对象哪个分片
func Locate(hash string) int {
mutex.Lock()
id, ok := objects[hash]
mutex.Unlock()
if !ok {
return -1
}
return id
}

// 将对象及其分片id加入缓存
func Add(hash string, id int) {
mutex.Lock()
objects[hash] = id
mutex.Unlock()
}

func Del(hash string) {
mutex.Lock()
delete(objects, hash)
mutex.Unlock()
}

func StartLocate() {
q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
defer q.Close()
q.Bind("dataServers")
c := q.Consume()
for msg := range c {
// 读取来自接口服务需要定位的对象散列值hash
hash, e := strconv.Unquote(string(msg.Body))
if e != nil {
panic(e)
}
// 调用Locate获得分片id
id := Locate(hash)
if id != -1 {
// id不为-1,将自身的节点监听地址和id打包成一个types.Locate Message结构体作为反馈消息发送
q.Send(msg.ReplyTo, types.LocateMessage{Addr: os.Getenv("LISTEN_ADDRESS"), Id: id})
}
}
}

func CollectObjects() {
// 获取存储目录下的所有文件
files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/*")
for i := range files {
// 以'.'分割其基本文件名,获得对象的散列值hash以及分片id
file := strings.Split(filepath.Base(files[i]), ".")
if len(file) != 3 {
panic(files[i])
}
hash := file[0]
id, e := strconv.Atoi(file[1])
if e != nil {
panic(e)
}
// 将对象的散列值hash以及分片id加入定位缓存
objects[hash] = id
}
}

数据服务的temp包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (t *tempInfo) hash() string {
s := strings.Split(t.Name, ".")
return s[0]
}

func (t *tempInfo) id() int {
s := strings.Split(t.Name, ".")
id, _ := strconv.Atoi(s[1])
return id
}

func commitTempObject(datFile string, tempinfo *tempInfo) {
// 读取临时对象的数据并计算散列值<hash of shard X>
f, _ := os.Open(datFile)
d := url.PathEscape(utils.CalculateHash(f))
f.Close()
os.Rename(datFile, os.Getenv("STORAGE_ROOT")+"/objects/"+tempinfo.Name+"."+d)
// 调用locate.Add,以<hash>为键,分片的id为值添加进定位缓存
locate.Add(tempinfo.hash(), tempinfo.id())
}

数据服务的objects包:

getFile函数发生改变:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func getFile(name string) string {
// 在$STORAGE_ROOT/objects/目录下查找所有以<hash>.X开头的文件
files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/" + name + ".*")
// 找不到,返回空字符串
if len(files) != 1 {
return ""
}
// 找到之后计算散列值
file := files[0]
h := sha256.New()
sendFile(h, file)
d := url.PathEscape(base64.StdEncoding.EncodeToString(h.Sum(nil)))
hash := strings.Split(file, ".")[2]
// 如果与<hash of shard X>的值不匹配则删除该对象并返回空字符串
if d != hash {
log.Println("object hash mismatch, remove", file)
locate.Del(hash)
os.Remove(file)
return ""
}
// 否则返回对象的文件名
return file
}

测试

本版本所有代码及测试用例可见添加数据冗余和修复的分布式对象存储系统

参考

《分布式对象存储—原理、架构及Go语言实现》

HTTP基本概念

定义:超文本传输协议

  • 协议:一个用在计算机中的协议。使用计算机语言确定了一种计算机之间通信的规范(两个及以上的参与者)以及相关的控制和错误处理方式。
  • 传输:两点之间的双向数据通信(可以服务器到服务器),该过程中还允许有中转或接力。(在 HTTP 里,需要中间人遵从 HTTP 协议,只要不打扰基本的数据传输,就可以添加任意额外的东西)
  • 超文本:传输的内容是超文本(是文字、图片、视频等的混合体,能从一个超文本跳转到另外一个超文本,eg. html

常见的状态码

  • 1xx:该类状态码属于提示信息,是协议处理中的一种中间状态,实际用到的比较少。
  • 2xx :该类状态码表示服务器成功处理了客户端的请求。
    • 200 OK是最常见的成功状态码,表示一切正常。如果是HEAD 请求,服务器返回的响应头都会有 body 数据
    • 204 No Content也是常见的成功状态码,与 200 OK 基本相同,但响应头没有 body 数据
    • 206 Partial Content应用于 HTTP分块下载或断点续传,表示响应返回的 body 数据并不是资源的全部,而是其中的一部分,也是服务器处理成功的状态。
  • 3xx:该 类状态码表示客户端请求的资源发生了变动,需要客户端用新的 URL 重新发送请求获取资源,也就是重定向
    • 301 Moved Permanentl表示永久重定向,说明请求的资源已经不存在了,需改用新的 URL 再次访问
    • 302 Moved Temporarily表示临时重定向,说明请求的资源还在,但暂时需要用另一个 URL 来访问
    • 301 和 302 都会在响应头里使用字段 Location,指明后续要跳转的 URL,浏览器会自动重定向新的 URL。
    • 304 Not Modified不具有跳转的含义,表示资源未修改,重定向已存在的缓冲文件,也称缓存重定向,用于缓存控制。
  • 4xx :该类状态码表示客户端发送的报文有误,服务器无法处理,也就是错误码的含义。
    • 400 Bad Request表示客户端请求的报文有错误,但只是个笼统的错误。
    • 403 Forbidden表示服务器禁止访问资源,并不是客户端的请求出错。
    • 404 Not Found表示请求的资源在服务器上不存在或未找到,所以无法提供给客户端
  • 5xx:该 类状态码表示客户端请求报文正确,但是服务器处理时内部发生了错误,属于服务器端的错误码
    • 500 Internal Server Error与 400 类型,是个笼统通用的错误码,服务器发生了什么错误并不知道。
    • 501 Not Implemented表示客户端请求的功能还不支持,类似“即将开业,敬请期待”的意思。
    • 502 Bad Gateway通常是服务器作为网关或代理时返回的错误码,表示服务器自身工作正常,访问后端服务器发生了错误。
    • 503 Service Unavailable表示服务器当前很忙,暂时无法响应服务器,类似“网络服务正忙,请稍后重试”的意思。

HTTP常用字段

  • Host:客户端发送请求时,用来指定服务器的域名Host: www.A.com。有了 Host 字段,就可以将请求发往[同一台]服务器上的不同网站。
  • Connection: 该字段最常用于客户端要求服务器使用 TCP 持久连接,以便其他请求复用HTTP/1.1 版本的默认连接都是持久连接,但为了兼容老版本的 HTTP,需要指定 Connection 首部字段的值为 Keep-Alive
  • Content-Length :服务器在返回数据时,会有 Content-Length 字段Content-Length: 1000表明本次回应的数据长度
  • Content-Type 字段用于服务器回应时,告诉客户端,本次数据是什么格式.eg.Content-Type: text/html; charset=utf-8,该类型表明,发送的是网页,而且编码是UTF-8.
    • 客户端请求的时候,可以使用 Accept 字段声明自己可以接受哪些数据格式Accept: */*表明客户端声明自己可以接受任何格式的数据。
  • Content-Encoding :该字段说明数据的压缩方法。表示服务器返回的数据使用了什么压缩格式eg.Content-Encoding: gzip表示服务器返回的数据采用了gzip 方式压缩,告知客户端需要用此方式解压。
    • 客户端在请求时,用 Accept-Encoding 字段说明自己可以接受哪些压缩方法。eg.Accept-Encoding: gzip, deflate

Get与Post

基本功能

  • Get 方法的含义是请求从服务器获取资源,这个资源可以是静态的文本、页面、图片视频等
  • POST 方法则是相反操作,它向 URI 指定的资源提交数据,数据就放在报文的 body 里。

安全和幂等

  • 在 HTTP 协议里,安全是指请求方法不会破坏服务器上的资源。
  • 幂等意思是多次执行相同的操作,结果都是相同的

很明显 GET 方法就是安全且幂等的,因为它是只读操作,无论操作多少次,服务器上的数据都是安全的,且每次的结果都是相同的。

POST 因为是新增或提交数据的操作,会修改服务器上的资源,所以是不安全的,且多次提交数据就会创建多个资源,所以不是幂等的。

HTTP特性

HTTP的优点

  • 简单:基本的报文格式就是 header + body,头部信息也是 key-value 简单文本的形式,易于理解,降低学习和使用的门槛。
  • 灵活和易于扩展:各类请求方法、URI/URL、状态码、头字段等每个组成要求都没有被固定死,允许开发人员自定义和扩充HTTP工作在应用层,则它下层可以随意变化
  • 应用广泛/跨平台:从台式机的浏览器到手机上的各种 APP,从看新闻、刷贴吧到购物、理财、游戏,HTTP 的应用无处不在,同时天然具有跨平台的优越性。

HTTP的缺点

  • 无状态
    • 好处:服务器不会去记忆HTTP 的状态,所以不需要额外的资源来记录状态信息,这能减轻服务器的负担,能够把更多的 CPU 和内存用来对外提供服务
    • 坏处:服务器没有记忆能力,它在完成有关联性的操作时会非常麻烦。例如:网上购物过程中,登录->添加购物车->下单->结算->支付,这系列操作都要知道用户的身份才行。但服务器不知道这些请求是有关联的,每次都要问一遍身份信息。
    • 解决办法:Cookie 技术。通过在请求和响应报文中写入Cookie 信息来控制客户端的状态

cookie

  • 明文传输:虽然为调试等工作带来便利,但该传输方式相当于信息裸奔。在传输的漫长的过程中,信息的内容都毫无隐私可言,很容易就能被窃取,如果里面有账号密码信息,则可能出现账号被盗等情况。
  • 不安全
    • 通信使用明文(不加密),内容可能会被窃听。比如,账号信息容易泄漏
    • 不验证通信方的身份,因此有可能遭遇伪装。比如,访问假的淘宝、拼多多。
    • 无法证明报文的完整性,所以有可能已遭篡改。比如,网页上植入垃圾广告

HTTP/1.1的性能

HTTP 协议基于 TCP/IP,并且使用了请求 - 应答的通信模式,所以性能的关键就在这里。

  • 长连接。早期 HTTP/1.0 性能上的一个很大的问题,那就是每发起一个请求,都要新建一次 TCP 连接(三次握手),而且是串行请求,做了无畏的 TCP 连接建立和断开,增加了通信开销。为了解决该 问题,HTTP/1.1 提出了长连接的通信方式,也叫持久连接。这种方式的好处在于减少了 TCP 连接的重复建立和断开所造成的额外开销,减轻了服务器端的负载

    • 持久连接的特点是,只要任意一端没有明确提出断开连接,则保持 TCP 连接状态
    • 长连接与短连接
  • 管道网络传输:HTTP/1.1 采用了长连接的方式,这使得管道(pipeline)网络传输成为可能。即可在同一个 TCP 连接里面,客户端可以发起多个请求,只要第一个请求发出去了,不必等其回来,就可以发第二个请求出去,可以减少整体的响应时间。举例来说,客户端需要请求两个资源。以前的做法是,在同一个TCP连接里面,先发送 A 请求,然后等待服务器做出回应,收到后再发出 B 请求。管道机制则是允许浏览器同时发出 A 请求和 B 请求。

  • 队头阻塞:但是服务器还是按照顺序,先回应 A 请求,完成后再回应 B 请求。要是前面的回应特别慢,后面就会有许多请求排队等着。这称为队头堵塞

HTTPSHTTP

HTTPHTTPS的区别

  • HTTP 是超文本传输协议,信息是明文传输,存在安全风险的问题。HTTPS 则解决 HTTP 不安全的缺陷,在 TCPHTTP网络层之间加入了SSL/TLS 安全协议,使得报文能够加密传输。

  • HTTP 连接建立相对简单, TCP 三次握手之后便可进行 HTTP 的报文传输。而 HTTPSTCP 三次握手之后,还需进行 SSL/TLS 的握手过程,才可进入加密报文传输。

  • HTTP 的端口号是 80,HTTPS 的端口号是 443。

  • HTTPS 协议需要向 CA(证书权威机构)申请数字证书,来保证服务器的身份是可信的。

HTTPS解决窃听、篡改、冒充采取的措施

  • 混合加密的方式实现信息的机密性,解决了窃听的风险。
    • 在通信建立前采用非对称加密的方式交换会话秘钥,后续就不再使用非对称加密。(非对称加密使用两个密钥:公钥和私钥,公钥可以任意分发而私钥保密,解决了密钥交换问题但速度慢
    • 在通信过程中全部使用对称加密的会话秘钥的方式加密明文数据(对称加密只使用一个密钥,运算速度快,密钥必须保密,无法做到安全的密钥交换)
  • 摘要算法的方式来实现完整性,它能够为数据生成独一无二的”指纹”,指纹用于校验数据的完整性,解决了篡改的风险。
    • 客户端在发送明文之前会通过摘要算法算出明文的指纹,发送的时候把指纹 + 明文一同加密成密文后,发送给服务器,服务器解密后,用相同的摘要算法算出发送过来的明文,通过比较客户端携带的指纹和当前算出的指纹做比较,若指纹相同,说明数据是完整的。
  • 将服务器公钥放入到数字证书中,解决了冒充的风险。
    • 客户端先向服务器端索要公钥,然后用公钥加密信息,服务器收到密文后,用自己的私钥解密。
    • 如何保证公钥不被篡改:需要借助第三方权威机构 CA (数字证书认证机构),将服务器公钥放在数字证书(由数字证书认证机构颁发)中,只要证书是可信的,公钥就是可信的。
    • CA

HTTPS建立的过程

  • 客户端向服务器索要并验证服务器的公钥
  • 双方协商产生会话秘钥
  • 双方采用会话秘钥进行加密通信。

前两步也就是 SL/TLS的建立过程,也就是握手阶段

https握手过程

SSL/TLS 协议建立的详细流程:

1.ClientHello

首先,由客户端向服务器发起加密通信请求,也就是 ClientHello 请求。

在这一步,客户端主要向服务器发送以下信息:

  • 客户端支持的SSL/TLS 协议版本,如 TLS 1.2 版本。

  • 客户端生产的随机数(Client Random),后面用于生产会话秘钥。

  • 客户端支持的密码套件列表,如 RSA加密算法。

2. SeverHello

服务器收到客户端请求后,向客户端发出响应,也就是 SeverHello。服务器回应的内容有如下内容:

  • 确认SSL/ TLS 协议版本,如果浏览器不支持,则关闭加密通信。

  • 服务器生产的随机数(Server Random),后面用于生产会话秘钥。

  • 确认的密码套件列表,如 RSA 加密算法。

  • 服务器的数字证书。

3.客户端回应

客户端收到服务器的回应之后,首先通过浏览器或者操作系统中的 CA 公钥,确认服务器的数字证书的真实性。如果证书没有问题,客户端会从数字证书中取出服务器的公钥,然后使用它加密报文,向服务器发送如下信息:

  • 一个随机数(pre-master key)。该随机数会被服务器公钥加密。

  • 加密通信算法改变通知,表示随后的信息都将用会话秘钥加密通信。

  • 客户端握手结束通知,表示客户端的握手阶段已经结束。这一项同时把之前所有内容的发生的数据做个摘要,用来供服务端校验。

上面第一项的随机数是整个握手阶段的第三个随机数,这样服务器和客户端就同时有三个随机数,接着就用双方协商的加密算法,各自生成本次通信的会话秘钥。

4. 服务器的最后回应

服务器收到客户端的第三个随机数(pre-master key)之后,通过协商的加密算法,计算出本次通信的会话秘钥。然后,向客户端发生最后的信息:

  • 加密通信算法改变通知,表示随后的信息都将用会话秘钥加密通信。

  • 服务器握手结束通知,表示服务器的握手阶段已经结束。这一项同时把之前所有内容的发生的数据做个摘要,用来供客户端校验。

至此,整个 SSL/TLS 的握手阶段全部结束。接下来,客户端与服务器进入加密通信,就完全是使用普通的 HTTP协议,只不过用会话秘钥加密内容。

HTTP/1.1、HTTP/2、HTTP/3演变

HTTP/1.1 相比 HTTP/1.0性能上的改进:

  • 使用 TCP 长连接的方式改善了HTTP/1.0 短连接造成的性能开销。
  • 支持管道(pipeline)网络传输,只要第一个请求发出去了,不必等其回来,就可以发第二个请求出去,可以减少整体的响应时间。

存在的性能瓶颈:

  • 请求 / 响应头部(Header)未经压缩就发送,首部信息越多延迟越大。只能压缩 Body 的部分
  • 发送冗长的首部。每次互相发送相同的首部造成的浪费较多
  • 服务器是按请求的顺序响应的,如果服务器响应慢,会招致客户端一直请求不到数据,也就是队头阻塞
  • 没有请求优先级控制
  • 请求只能从客户端开始,服务器只能被动响应。

HTTP/2的优化

HTTP/2协议是基于 HTTPS 的,所以 HTTP/2 的安全性也是有保障的。

HTTP/2 相比 HTTP/1.1 性能上的改进:

1. 头部压缩

HTTP/2压缩头(Header),如果同时发出多个请求,他们的头是一样的或是相似的,那么,协议会帮助消除重复的部分。使用的是 HPACK 算法:在客户端和服务器同时维护一张头信息表,所有字段都会存入这个表,生成一个索引号,以后就不发送同样字段了,只发送索引号,这样就提高速度了。

2. 二进制格式

HTTP/2不再像HTTP/1.1里的纯文本形式的报文,而是全面采用了二进制格式。

头信息和数据体都是二进制,并且统称为帧(frame)头信息帧和数据帧

因为计算机只懂二进制,那么收到报文后,无需再将明文的报文转成二进制,而是直接解析二进制报文,这增加了数据传输的效率

3. 数据流

HTTP/2 的数据包不是按顺序发送的,同一个连接里面连续的数据包,可能属于不同的回应。因此,必须要对数据包做标记,指出它属于哪个回应。

每个请求或回应的所有数据包,称为一个数据流(Stream)。

每个数据流都标记着一个独一无二的编号,其中规定客户端发出的数据流编号为奇数, 服务器发出的数据流编号为偶数

客户端还可以指定数据流的优先级。优先级高的请求,服务器就先响应该请求。

4. 多路复用

HTTP/2是可以在一个连接中并发多个请求或回应,而不用按照顺序一一对应

移除了 HTTP/1.1 中的串行请求,不需要排队等待,也就不会再出现队头阻塞问题,降低了延迟,大幅度提高了连接的利用率

举例来说,在一个 TCP 连接里,服务器收到了客户端 A 和 B 的两个请求,如果发现 A 处理过程非常耗时,于是就回应 A 请求已经处理好的部分,接着回应 B 请求,完成后,再回应 A 请求剩下的部分。

5. 服务器推送

HTTP/2还在一定程度上改善了传统的请求 - 应答工作模式,服务不再是被动地响应,也可以主动向客户端发送消息。

举例来说,在浏览器刚请求 HTML 的时候,就提前把可能会用到的 JS、CSS 文件等静态资源主动发给客户端,减少延时的等待,也就是服务器推送(Server Push,也叫 Cache Push)。

HTTP/2的缺陷

多个HTTP 请求在复用一个 TCP 连接,下层的 TCP 协议是不知道有多少个 HTTP 请求的。

所以一旦发生了丢包现象,就会触发 TCP 的重传机制,这样在一个 TCP 连接中的所有的 HTTP 请求都必须等待这个丢了的包被重传回来

  • HTTP/1.1 中的管道( pipeline)传输中如果有一个请求阻塞了,那么队列后请求也统统被阻塞住了
  • HTTP/2 多请求复用一个TCP连接,一旦发生丢包,就会阻塞住所有的 HTTP 请求。

这都是基于TCP 传输层的问题,所以 HTTP/3 把 HTTP 下层的 TCP 协议改成了 UDP!

UDP是不可靠传输的,但基于 UDPQUIC 协议 可以实现类似 TCP 的可靠性传输。

  • QUIC有自己的一套机制可以保证传输的可靠性的。当某个流发生丢包时,只会阻塞这个流,其他流不会受到影响
  • TL3 升级成了最新的 1.3 版本,头部压缩算法也升级成了 QPack
  • HTTPS 要建立一个连接,要花费 6 次交互,先是建立三次握手,然后是 TLS/1.3 的三次握手。QUIC 直接把以往的 TCP 和 TLS/1.3 的 6 次交互合并成了 3 次,减少了交互次数

quic

所以, QUIC 是一个在 UDP 之上的 TCP + TLS + HTTP/2多路复用的协议。

参考

https://mp.weixin.qq.com/s/bUy220-ect00N4gnO0697A

使用带https访问的慕布托管博客图片

之前博客 一直使用有道云笔记来保存图片,对于保存的每张图片都可以生成一个唯一的URL,但实际访问的时候提供的只是http的访问。浏览器的进步(Chrome为代表)使得http协议变得不安全,之前写的博客中的http访问的图片在浏览器中居然不能正常显示了(虽然火狐可以,但同步还是Chrome好用),找了好久都没有找到合适的图片保存库。

今天意外发现慕布这款在线编辑软件可以给保存的图片一个带httpsURL,从而满足了浏览器强制的https访问。遂将所有博客的图片更新了一下。

有需要的同学可根据需要注册使用慕布。图片保存到文件后进行共享,将共享的链接打开后右键可以获取图片的https地址,然后就可以在自己的博客中使用啦。(慕布还支持脑图以及markdown,其实还是挺好用的)。

关于httphttps,后面写个博客来讲清楚吧(最近要写导师的本子,还有科研任务和找工作的事情,忙的有点混乱了)。

使用来必力为博客添加评论

之前搭建博客评论功能的时候使用的是gitalk, 但使用gitalk必须创建issue来进行评论,使用起来并不是很方便。

经过网上查找,发现了一款好用的第三方评论工具:livere(来必力)。这款工具只需要注册并得到一个livere_uid值就可以在自己的网页上添加评论功能,登录后台管理还可以对评论进行管理。白嫖可真相!!!

现在说下主要步骤:

1
2
3
4
5
打开 https://livere.com/ 网站进行用户注册(注册的时候是韩文,可以有道或者百度翻译对应一下内容,其实也就是邮箱,名称和密码这几项),注册完成后选择【安装】,然后点击“现在安装”。

在“现在安装”页面填写相关的信息,然后获取授权ID码(主要需要填写自己的网站链接)

获取到授权ID码之后会打开主题目录下的配置文件,修改“livere_uid”的值为授权ID码

livere_uid获取

1
在自己的博客主体目录下找到_config.yml文件(我的是~/blog/dongshifu/themes/next/_config.yml)然后找到livere_uid选项,将livere管理页面中的代码管理打开并找到data-uid,复制到livere_uid后面保存即可

_config.yaml配置

重新部署自己的网站到github就可以看到添加的评论功能了。(授权的时候可以用QQ,微信等方式,真的是很方便了)

博客评论功能添加

设置git的用户名和邮箱

一直在使用自己搭建的github.io来保存博客,虽然更新比较慢,也算是记录了自己的学习过程。

今天在使用的时候意外发现自己在git上的contribution一直没有,鉴于自己时不时会提交博客记录,遂发现不对,难道是git出错了??明显是不可能的,经过查看,之前的大部分提交居然都是用实验室小伙伴的git帐号提交的。估计是之前小伙伴用我的电脑登录了他的git帐号吧,发现之后赶快改了一下git的帐号设置。

1
2
3
4
5
[~/path/to/repo]$ git config user.name "dongsihfu"
[~/path/to/repo]$ git config user.email 自己的git邮箱
This change will only affect future commits. Past commits will retain the username and address they were committed with.

设置好以后就用 :git config --list 命令查看。

数据校验和去重

上一个版本加入了对象存储服务的元数据服务。有了元数据服务,可以在不实际删除数据的情况下实现对象的删除功能,可以实现对象的版本控制,可以确保数据对象的一致性和GET方法的幂等性。这些都是因为元数据服务可以保存对象的元数据。

另外,上一个版本的接口服务要求客户端提供对象的散列值作为全局唯一的标识符,也就是数据服务存储的对象名,但实现过程中并没有对这个散列值进行校验,用户提供的对象散列值和数据有可能是不一致的,产生不一致的原因有很多,本版本需要解决该问题。

去重基本概念

去重是一种消除重复数据多余副本的数据压缩技术。对于一个对象存储系统来说,通常都会有来自不同(或相同)用户的大量重复数据。如果没有去重,每一份重复的数据都会占据存储空间。去重能够让重复数据在系统中只保留一个实体,是一个极好的节省存储空间、提升存储利用率的技术。

一个很常见的去重例子是邮件的转发。假设某个邮件内含一个大小为1MB的附件,如果该邮件被转发了100次,那么邮件服务器上就保存了100个一模一样的附件,总共占用100MB的空间。每次管理员对该邮件服务器进行云备份,都会上传100个一模一样的附件对象到对象存储系统。如果这个对象存储系统使用了数据去重技术,那么无论这个管理员备份多少次,在对象存储系统中,这个附件所代表的对象就只有一份。

本项目的去重基于对象的全局唯一标识符,也就是通过对该对象的散列值进行单例检查(Single Instance Storage,SIS)来实现。具体来说,每次当接口服务节点接收到对象的PUT请求之后,都会进行一次定位,如果PUT对象的散列值已经存在于数据服务中,就会跳过之后的数据服务PUT请求,直接生成该对象的新版本插入元数据服务;如果PUT对象的散列值不存在于数据服务中,说明这是一个全新的对象。接口服务会读取PUT请求的正文,写入数据服务。

但是在实现去重之前,还有一个步骤要做,就是数据校验

数据校验的原因

一般来说,客户端上传的数据不一致可能由以下几种情况导致。

  • 客户端是一个恶意客户端,故意上传不一致的数据给服务器。
  • 客户端有bug,计算出来的数据是错误的。
  • 客户端计算的数据正确,但是传输过程中发生了错误。

对象存储是一个服务,如果全盘接收来自客户端的数据,而不对这个散列值进行校验,那么恶意客户端就可以通过随意编造散列值的方式上传大量内容和散列值不符的对象来污染数据;且即使是善意的客户端也难免因为软件错误或上传的数据损坏而导致对象数据和散列值不符。如果不对数据进行校验,允许错误的对象数据被保存在系统中,那么当另一个用户上传的数据的散列值恰好跟错误数据的相同时,就会因为SIS检查而导致其数据并没有被真正上传。然后当这个用户需要下载自己的对象时,下载到的就会是那个错误的数据。

为了防止这种情况发生,必须进行数据校验,验证客户端提供的散列值和服务器根据对象数据计算出来的散列值是否一致。有了数据校验,才能确保数据服务中保存的对象数据和散列值一致,然后放心对后续上传的对象根据散列值进行去重。

现在的主要问题:之前的版本一直都是以数据流的形式处理来自客户端的请求,接口服务调用io.Copy 从对象PUT请求的正文中直接读取对象数据并写入数据服务。这是因为客户端上传的对象大小可能超出接口服务节点的内存,不能把整个对象读入内存后再进行处理。而现在必须等整个对象都上传完以后才能算出散列值,然后才能决定是否要存进数据服务。这就形成了一个悖论:在客户端的对象完全上传完毕之前,不知道要不要把这个对象写入数据服务;但是等客户端的对象上传完毕之后再开始写入又做不到,因为对象可能太大,内存里根本放不下。

最简单的解决办法只需要在数据服务节点进行数据校验,将校验一致的对象保留,不一致的删除不就可以了吗? 这样的设计在版本是没问题的。在数据服务节点上进行数据校验的前提是数据服务节点上的数据和用户上传的数据完全相同,本版本的设计满足这个前提。但是在后续版本中会看到,随着对象存储系统的不断完善,最终保存在数据服务节点上的对象数据和用户上传的对象数据可能截然不同。那时就无法在数据服务节点上进行数据校验。数据校验这一步骤必须在接口服务节点完成。

实现数据校验的方法

为了真正解决上述矛盾,需要在数据服务上提供对象的缓存功能,接口服务不需要将用户上传的对象缓存在自身节点的内存里,而是传输到某个数据服务节点的一个临时对象里,并在传输数据的同时计算其散列值。当整个数据传输完毕以后,散列值计算也同步完成,如果一致,接口节点需要将临时对象转成正式对象:如果不一致,则将临时对象删除。

为数据服务加入缓存功能

本版本接口服务功能没有发生变化,数据服务删除了objects接口的PUT方法并新添加了temp接口的POST,PATCH,PUT,DELETE4种方法。

数据服务的REST接口
1
POST /temp/<hash>

请求头部

  • Size:<需要缓存的对象的大小>

响应正文

  • uuid

    接口服务以POST方法访问数据服务temp接口,在 URL的<hash>部分指定对象散列值,并提供一个名为size的HTTP请求头部,用于指定对象的大小。这会在数据服务节点上创建一个临时对象。该接口返回一个随机生成的uuid用以标识这个临时对象,后续操作通过uuid进行。

    1
    PATCH /temp/<uuid>

    请求正文

  • 对象的内容

接口服务以PATCH方法访问数据服务节点上的临时对象,HTTP请求的正文会被写入该临时对象。

1
PUT /temp/<uuid>

接口服务数据校验一致,调用PUT方法将该临时文件转正。

1
DELETE /temp/<uuid>

接口服务数据校验不一致,调用DELETE方法将临时文件删除

对象PUT流程

加入数据校验和去重的PUT流程

客户端在PUT对象时需要提供对象的散列值和大小。接口服务首先在数据服务层定位散列值,如果已经存在,则直接添加元数据;如果不存在,则用POST方法访问数据服务节点的temp接口,提供对象的散列值和大小。数据服务节点返回一个uuid。然后接口服务用PATCH方法将客户端的数据上传给数据服务,同时计算数据的散列值客户端数据上传完毕后核对计算出的散列值和客户端提供的散列值是否一致,如果一致则用PUT方法将临时对象转正;否则用DELETE方法删除临时对象。临时对象的内容首先被保存在数据服务本地磁盘的$STORAGE_ROOT/temp/<uuid>.dat文件,转正后会被重命名为$STORAGE_ROOTlobjects/<hash>文件。

具体实现

接口服务

接口服务主要修改了objects.put部分的相关函数。

objects.put相关函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
func put(w http.ResponseWriter, r *http.Request) {
// 先从HTTP请求头部获取对象的散列值
hash := utils.GetHashFromHeader(r.Header)
if hash == "" {
log.Println("missing object hash in digest header")
w.WriteHeader(http.StatusBadRequest)
return
}
// 从URL中获取对象的大小
size := utils.GetSizeFromHeader(r.Header)
// 以散列值和size作为参数调用stroreObject
// 新实现的storeObject需要在一开始就确定临时对象大小
c, e := storeObject(r.Body, hash, size)
if e != nil {
log.Println(e)
w.WriteHeader(c)
return
}
if c != http.StatusOK {
w.WriteHeader(c)
return
}

name := strings.Split(r.URL.EscapedPath(), "/")[2]
e = es.AddVersion(name, hash, size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
}
}

func storeObject(r io.Reader, hash string, size int64) (int, error) {
// 首先调用locate.Exist方法定位对象的散列值
// 如果已经存在,跳过后续上传操作直接返回200 OK
if locate.Exist(url.PathEscape(hash)) {
return http.StatusOK, nil
}

// 不存在,调用putStream生成对象的写入流stream用于写入
stream, e := putStream(url.PathEscape(hash), size)
if e != nil {
return http.StatusInternalServerError, e
}

// 两个输入参数,分别是作为io.Reader的r和io.Writer的stream
// 返回的reader也是一个io.Reader
reader := io.TeeReader(r, stream)
// reader被读取的时候,实际的内容读取自r,同时也会写入stream
// 用utils.CalculateHash从reader中读取数据的同时也写入了stream
d := utils.CalculateHash(reader)
// 计算出来的散列值和hash做比较
// 不一致则调用stream.Commit(false)删除临时对象,并返回400 Bad Request
if d != hash {
stream.Commit(false)
return http.StatusBadRequest, fmt.Errorf("object hash mismatch, calculated=%s, requested=%s", d, hash)
}
// 一致则调用stream.Commit(true)将临时对象转正并返回200 OK
stream.Commit(true)
return http.StatusOK, nil
}

func CalculateHash(r io.Reader) string {
h := sha256.New()
io.Copy(h, r)
return base64.StdEncoding.EncodeToString(h.Sum(nil))
}

func putStream(hash string, size int64) (*objectstream.TempPutStream, error) {
server := heartbeat.ChooseRandomDataServer()
if server == "" {
return nil, fmt.Errorf("cannot find any dataServer")
}

// 数据服务的temp接口代替了原先的对象PUT接口,调用objectstream.NewTempPutStream
return objectstream.NewTempPutStream(server, hash, size)
}

objectstream.TempPutStream相关代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
type TempPutStream struct {
Server string
Uuid string
}

func NewTempPutStream(server, object string, size int64) (*TempPutStream, error) {
// 根据数据服务的节点地址server,对象散列值hash和对象大小size
// 以POST方法访问数据服务的temp接口获得uuid
request, e := http.NewRequest("POST", "http://"+server+"/temp/"+object, nil)
if e != nil {
return nil, e
}
request.Header.Set("size", fmt.Sprintf("%d", size))
client := http.Client{}
response, e := client.Do(request)
if e != nil {
return nil, e
}
uuid, e := ioutil.ReadAll(response.Body)
if e != nil {
return nil, e
}
// 将server和uuid保存在TempPutStrem结构体的相应属性中返回
return &TempPutStream{server, string(uuid)}, nil
}

func (w *TempPutStream) Write(p []byte) (n int, err error) {
// 根据Server和Uuid属性的值,以PATCH方法访问数据服务的temp接口,将需要写入的数据上传
request, e := http.NewRequest("PATCH", "http://"+w.Server+"/temp/"+w.Uuid, strings.NewReader(string(p)))
if e != nil {
return 0, e
}
client := http.Client{}
r, e := client.Do(request)
if e != nil {
return 0, e
}
if r.StatusCode != http.StatusOK {
return 0, fmt.Errorf("dataServer return http code %d", r.StatusCode)
}
return len(p), nil
}

// 根据输入参数good决定用PUT还是DELETE方法访问数据服务的temp接口
func (w *TempPutStream) Commit(good bool) {
method := "DELETE"
if good {
method = "PUT"
}
request, _ := http.NewRequest(method, "http://"+w.Server+"/temp/"+w.Uuid, nil)
client := http.Client{}
client.Do(request)
}
数据服务

main函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {
// 之前的版本定位对象通过调用os.Stat来检查对象文件是否存在
// 每次定位请求都会导致一次磁盘访问,会对系统带来很大负担
// 为减少对磁盘的访问次数,数据服务定位功能仅在程序启动时候扫描一遍本地磁盘
// 将磁盘中所有的对象散列值读入内存,之后的定位不需要再次访问磁盘,只需搜索内存即可
locate.CollectObjects()
go heartbeat.StartHeartbeat()
go locate.StartLocate()
http.HandleFunc("/objects/", objects.Handler)
// 引入temp.Handler处理函数注册
http.HandleFunc("/temp/", temp.Handler)
log.Fatal(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil))
}
数据服务的locate包

具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// 用于缓存所有对象
var objects = make(map[string]int)
// 保护对objects的读写操作
var mutex sync.Mutex

// 利用map操作判断某个散列值是否存在于objects中,存在返回true,否则返回false
func Locate(hash string) bool {
mutex.Lock()
_, ok := objects[hash]
mutex.Unlock()
return ok
}

// 将一个散列值加入缓存,输入参数hash作为存入map的键,值为1
func Add(hash string) {
mutex.Lock()
objects[hash] = 1
mutex.Unlock()
}

// 将一个散列值移出缓存
func Del(hash string) {
mutex.Lock()
delete(objects, hash)
mutex.Unlock()
}

func StartLocate() {
q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
defer q.Close()
q.Bind("dataServers")
c := q.Consume()
for msg := range c {
hash, e := strconv.Unquote(string(msg.Body))
if e != nil {
panic(e)
}

// 直接将从RabbitMQ消息队列中收到的对象散列值作为Locate参数
exist := Locate(hash)
if exist {
q.Send(msg.ReplyTo, os.Getenv("LISTEN_ADDRESS"))
}
}
}

func CollectObjects() {
// 读取存储目录里的所有文件
files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/*")
for i := range files {
// 对读出的文件一一调用filepath.Base获取其基本文件名
// 也就是对象的散列值,将散列值加入objects缓存
hash := filepath.Base(files[i])
objects[hash] = 1
}
}
数据服务的temp包

Handler函数针对访问temp接口的HTTP方法分别调用相应的处理函数put,patch,post和del:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
if m == http.MethodPut {
put(w, r)
return
}
if m == http.MethodPatch {
patch(w, r)
return
}
if m == http.MethodPost {
post(w, r)
return
}
if m == http.MethodDelete {
del(w, r)
return
}
w.WriteHeader(http.StatusMethodNotAllowed)
}

temp包的post相关函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
type tempInfo struct {
Uuid string
Name string
Size int64
}

func post(w http.ResponseWriter, r *http.Request) {
// 生成一个随机的uuid
output, _ := exec.Command("uuidgen").Output()
uuid := strings.TrimSuffix(string(output), "\n")
// 从请求的URL获取对象的名字,也即散列值
name := strings.Split(r.URL.EscapedPath(), "/")[2]
// 从头部获取对象的大小
size, e := strconv.ParseInt(r.Header.Get("size"), 0, 64)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 将uuid,name,size拼成一个tempInfo结构体
t := tempInfo{uuid, name, size}
// 调用tempInfo的writeToFile方法将结构体内容写入磁盘文件
e = t.writeToFile()
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 保存临时对象的内容
os.Create(os.Getenv("STORAGE_ROOT") + "/temp/" + t.Uuid + ".dat")
// 将uuid通过HTTP响应返回给接口服务
w.Write([]byte(uuid))
}

func (t *tempInfo) writeToFile() error {
f, e := os.Create(os.Getenv("STORAGE_ROOT") + "/temp/" + t.Uuid)
if e != nil {
return e
}
defer f.Close()
// 将tempInfo的内容经过JSON编码后写入文件
// 用于保存临时对象信息,与实际的对象内容不同
b, _ := json.Marshal(t)
f.Write(b)
return nil
}

接口服务调用POST方法之后会从数据服务获得一个uuid,这意味着数据服务已经为这个临时对象做好了准备。之后接口服务还需要继续调用PATCH方法将数据上传。

patch方法的相关函数实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
func patch(w http.ResponseWriter, r *http.Request) {
// 先找到URL的<uuid>部分
uuid := strings.Split(r.URL.EscapedPath(), "/")[2]
// 从相关信息文件中读取tempInfo结构体
tempinfo, e := readFromFile(uuid)
// 找不到,返回404
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
infoFile := os.Getenv("STORAGE_ROOT") + "/temp/" + uuid
datFile := infoFile + ".dat"
// 找到,打开临时文件
f, e := os.OpenFile(datFile, os.O_WRONLY|os.O_APPEND, 0)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
defer f.Close()
// 将请求的正文写入到数据文件
_, e = io.Copy(f, r.Body)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 写完数据,调用f.Stat方法获取数据文件的信息
info, e := f.Stat()
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
actual := info.Size()
if actual > tempinfo.Size {
os.Remove(datFile)
os.Remove(infoFile)
log.Println("actual size", actual, "exceeds", tempinfo.Size)
w.WriteHeader(http.StatusInternalServerError)
}
}

// 根据uuid打开temp目录下的uuid文件,获取全部内容并经过JSON解码成一个tempInfo结构体
func readFromFile(uuid string) (*tempInfo, error) {
f, e := os.Open(os.Getenv("STORAGE_ROOT") + "/temp/" + uuid)
if e != nil {
return nil, e
}
defer f.Close()
b, _ := ioutil.ReadAll(f)
var info tempInfo
json.Unmarshal(b, &info)
return &info, nil
}

接口服务调用PATCH方法将整个临时对象上传完毕后,自己也已经完成了数据校验的工作,根据数据校验的结果决定是调用PUT方法将临时文件转正还是调用DELETE 方法删除临时文件

temp包的put相关函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func put(w http.ResponseWriter, r *http.Request) {
// 获取uuid
uuid := strings.Split(r.URL.EscapedPath(), "/")[2]
// 打开数据文件读取对象
tempinfo, e := readFromFile(uuid)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
infoFile := os.Getenv("STORAGE_ROOT") + "/temp/" + uuid
datFile := infoFile + ".dat"
f, e := os.Open(datFile)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
defer f.Close()
// 读取文件大小
info, e := f.Stat()
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 进行文件大小比较
actual := info.Size()
os.Remove(infoFile)
if actual != tempinfo.Size {
os.Remove(datFile)
log.Println("actual size mismatch, expect", tempinfo.Size, "actual", actual)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 大小一致,调用commitTempObject将临时文件对象转正
// commitTempObject会将临时对象的数据文件修改名字
// 还会调用locate.Add将<hash>加入数据服务的对象定位缓存
commitTempObject(datFile, tempinfo)
}

temp包的del函数:

1
2
3
4
5
6
7
8
// 获取uuid,删除相应的信息文件和数据文件
func del(w http.ResponseWriter, r *http.Request) {
uuid := strings.Split(r.URL.EscapedPath(), "/")[2]
infoFile := os.Getenv("STORAGE_ROOT") + "/temp/" + uuid
datFile := infoFile + ".dat"
os.Remove(infoFile)
os.Remove(datFile)
}
数据服务的objects包

数据服务的put方法需要做相关修改。

由于当前版本在数据服务的对象上传完全依靠temp接口的临时对象转正,不再需要objects接口的PUT方法,objects的Handler函数做修改如下:

1
2
3
4
5
6
7
8
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
if m == http.MethodGet {
get(w, r)
return
}
w.WriteHeader(http.StatusMethodNotAllowed)
}

读取对象的时候需要进行一次数据校验。

对objects.get相关函数做修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
func get(w http.ResponseWriter, r *http.Request) {
// 先从URL中获取对象的散列值,然后以散列值为参数调用getFile获得对象的文件名file
file := getFile(strings.Split(r.URL.EscapedPath(), "/")[2])
// file为空,返回404
if file == "" {
w.WriteHeader(http.StatusNotFound)
return
}
// file不为空,调用sendFile将对象文件的内容输出到HTTP响应
sendFile(w, file)
}

func getFile(hash string) string {
// 先根据hash值找到对应的文件
file := os.Getenv("STORAGE_ROOT") + "/objects/" + hash
// 打开文件并计算文件的hash值和URL中的hash进行比较
f, _ := os.Open(file)
d := url.PathEscape(utils.CalculateHash(f))
f.Close()
// hash不一致,出现问题,从缓存和磁盘上删除对象
// 返回空字符串
if d != hash {
log.Println("object hash mismatch, remove", file)
locate.Del(hash)
os.Remove(file)
return ""
}
// 一致则返回对象的文件名
return file
}

// 两个输入参数,用于写入对象数据的w和对象的文件名file
func sendFile(w io.Writer, file string) {
// 调用os.Open打开对象文件
f, _ := os.Open(file)
defer f.Close()
// 用io.Copy将文件内容写入w
io.Copy(w, f)
}

注意:即使再接口层已经对数据进行过校验,在数据服务层进行校验依然很有必要。本版本的数据校验是用于防止存储系统的数据降解,哪怕在上传时正确的数据也有可能随着时间的流逝而逐渐发生损坏。

功能测试

本版本实现的程序可见添加数据校验和去重的分布式对象存储系统,功能测试请参考shell脚本进行。

去重带来的性能问题

实际的功能测试中可以发现,系统在第一次PUT对象时等待了约1s。这是locate定位的超时时间。 为了去重,每一个新对象上传时都不得不等待这个时间以确保数据服务中不存在散列值相等的对象。实际使用中大多数情况下上传的都是内容不同的新对象,这是一个很严重的性能问题。减少定位的超时时间可以减少用户的等待时间,但这并不算是从根本上解决了问题,且超时时间设置过短也会提升SIS检查的失败概率(比如某个对象其实存在于数据服务中但没能及时返回定位消息),这么做得不偿失。

有一个看上去可行的解决方案是免除小对象的去重:对于大对象,其上传的时间本来就比较长,比如1个10MB的对象在20Mbit/s 上行带宽的连接上需要4s的传输时间,1s 的定位超时只是25%的额外时间,看上去这个并不特别突出。而一个10KB 的对象上传只需要0.004s,25000%的额外等待就显得无法忍受了。如果免除小对象的去重,看上去性能会好很多,小对象本身占用的空间也不大,不去重似乎也可以接受。

但很可惜这样是不行的,原因有两点:

  • 首先,对小对象不去重会导致它们在对象存储系统的每一个数据服务节点上都存在一个备份,这就会占用大量的磁盘资源
  • 更重要的原因在于,一旦接口服务定位一个这样的小对象,所有的数据服务节点都会响应,然后每一个节点都会反馈一个消息以通知该对象的存在。渐渐的消息队列会塞满反馈消息。而如果有用户在同一时间下载大量小对象(比如用户从云端恢复客户机的操作系统),那就成了系统的灾难,真正的生产环境可不会像测试这样只有寥寥几台数据服务节点,而是可能有成千上万的数据节点。

这个性能问题单靠对象存储服务端是无法解决的。一个有效的解决方案是优化客户端的行为。如果客户端能将多个小对象尽量打包成一个大对象上传而不是分别上传,那么1s的等待时间就可以忽略。而且,当客户端下载小对象时,就需要下载含有该小对象的大对象,然后从中取出小对象。这样看上去有些烦琐,但是在需要一次性恢复大量小对象时非常有利,因为无须为每个小对象而频繁访问对象存储服务。

参考

《分布式对象存储—原理、架构及Go语言实现》

一台服务器最大可以支持的TCP连接

理论情况

TCP连接四元组是源IP地址、源端口、目的IP地址和目的端口。任意一个元素发生了改变,那么就代表的是一条完全不同的连接了。对任意一个网络服务来说,它的端口是固定。另外IP也是固定的,这样目的IP地址、目的端口都是固定的。剩下源IP地址、源端口是可变的。所以理论上该服务最多可以建立2的32次方(ip数)×2的16次方(port数)个连接。这是两百多万亿的一个大数字!!

实际情况

进程每打开一个文件(linux下一切皆文件,包括socket),都会消耗一定的内存资源。如果有不怀好心的人启动一个进程来无限的创建和打开新的文件,会让服务器崩溃。所以linux系统出于安全角度的考虑,在多个位置都限制了可打开的文件描述符的数量,包括系统级、用户级、进程级。这三个限制的含义和修改方式如下:

  • 系统级:当前系统可打开的最大数量,通过fs.file-max参数可修改
  • 用户级:指定用户可打开的最大数量,修改/etc/security/limits.conf
  • 进程级:单个进程可打开的最大数量,通过fs.nr_open参数可修改

修改以上内核参数后可以将文件句柄数据加大到100W,但是每一条TCP连接在服务器端都需要file, socket等内核对象。一条空TCP连接大概占用3.3KB内存。

接收区的缓存设置:

1
2
3
4
$ sudo sysctl -a | grep rmem
net.ipv4.tcp_rmem = 4096 131072 629145
net.core.rmem_default = 212992
net.core.rmem_max = 212992

其中在tcp_rmem中的第一个值是为TCP连接所需分配的最少字节数。该值默认是4K,最大的话8MB之多。也就是说有数据发送的时候需要至少为对应的socket再分配4K内存,甚至可能更大。

TCP分配发送缓存区的大小受参数net.ipv4.tcp_wmem配置影响:

1
2
3
4
$ sudo sysctl -a | grep wmem
net.ipv4.tcp_wmem = 4096 16384 4194304
net.core.wmem_default = 212992
net.core.wmem_max = 212992

net.ipv4.tcp_wmem中的第一个值是发送缓存区的最小值,默认也是4K。如果数据很大的话,该缓存区实际分配的也会比默认值大。

查询实际消耗

活动连接数量查询:

1
$ ss -n | grep ESTAB | wc -l

查看内存开销:

1
$ cat /proc/meminfo

通过slabtop命令可以查看到densty、flip、sock_inode_cache、TCP四个内核对象.

客户端最大建立的TCP连接数

修改ip_local_port_range和文件描述符

修改该值可以使客户端可用的端口号范围变大:

1
echo "5000 65000" > /proc/sys/net/ipv4/ip_local_port_range

还可以通过给主机增加多个IP地址来实现客户端的并发提高,但还需要修改文件描述符。

文件描述符修改:

1
2
3
4
5
6
7
8
9
10
11
//修改整个系统能打开的文件描述符为20W
echo 200000 > /proc/sys/fs/file-max


//修改所有用户每个进程可打开文件描述符为20W
#vi /etc/sysctl.conf
fs.nr_open=210000
#sysctl -p
#vi /etc/security/limits.conf
* soft nofile 200000
* hard nofile 200000

注意: limits中的hard limit不能超过nr_open, 所以要先改nr_open。而且最好是在sysctl.conf中改。避免重启的时候 hard limit生效了,nr_open不生效导致启动问题。

连接多个服务进行端口复用

TCP连接的本质是客户端和服务器分别在内存维护一堆socket内核对象,它们只要能找到对方就算一条连接。在连接过程中,端口只是socket对象找到另一半的信物之一。

源码分析:

socket中有一个主要的数据结构sock_common,在它里面有两个联合体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// file: include/net/sock.h
struct sock_common {
union {
__addrpair skc_addrpair; //TCP连接IP对
struct {
__be32 skc_daddr;
__be32 skc_rcv_saddr;
};
};
union {
__portpair skc_portpair; //TCP连接端口对
struct {
__be16 skc_dport;
__u16 skc_num;
};
};
......
}

其中skc_addrpair记录的是TCP连接里的IP对,skc_portpair记录的是端口对。

当客户端的两个连接使用一个端口和服务器的两个服务进行连接时,如何区分服务器给客户端发送的数据属于哪条连接?

在网络包到达网卡之后,依次经历DMA、硬中断、软中断等处理,最后被送到socket的接收队列中。该过程设计到协议层对网络帧的处理,主要的tcp_v4_rcv代码如下:

1
2
3
4
5
6
7
8
9
10
// file: net/ipv4/tcp_ipv4.c
int tcp_v4_rcv(struct sk_buff *skb)
{
......
th = tcp_hdr(skb); //获取tcp header
iph = ip_hdr(skb); //获取ip header

sk = __inet_lookup_skb(&tcp_hashinfo, skb, th->source, th->dest); // 返回socket,这里设计到连接
......
}

连接的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// file: include/net/inet_hashtables.h
static inline struct sock *__inet_lookup(struct net *net,
struct inet_hashinfo *hashinfo,
const __be32 saddr, const __be16 sport,
const __be32 daddr, const __be16 dport,
const int dif)
{
u16 hnum = ntohs(dport);
struct sock *sk = __inet_lookup_established(net, hashinfo,
saddr, sport, daddr, hnum, dif);

return sk ? : __inet_lookup_listener(net, hashinfo, saddr, sport,
daddr, hnum, dif);
}

先判断有没有连接状态的socket,这会走到__inet_lookup_established函数中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
struct sock *__inet_lookup_established(struct net *net,
struct inet_hashinfo *hashinfo,
const __be32 saddr, const __be16 sport,
const __be32 daddr, const u16 hnum,
const int dif)
{
//将源端口、目的端口拼成一个32位int整数
const __portpair ports = INET_COMBINED_PORTS(sport, hnum);
......

//内核用hash的方法加速socket的查找
unsigned int hash = inet_ehashfn(net, daddr, hnum, saddr, sport);
unsigned int slot = hash & hashinfo->ehash_mask;
struct inet_ehash_bucket *head = &hashinfo->ehash[slot];

begin:
//遍历链表,逐个对比直到找到
sk_nulls_for_each_rcu(sk, node, &head->chain) {
if (sk->sk_hash != hash)
continue;
if (likely(INET_MATCH(sk, net, acookie,
saddr, daddr, ports, dif))) {
if (unlikely(!atomic_inc_not_zero(&sk->sk_refcnt)))
goto begintw;
if (unlikely(!INET_MATCH(sk, net, acookie,
saddr, daddr, ports, dif))) {
sock_put(sk);
goto begin;
}
goto out;
}
}
}

内核使用hash+链表的方式来管理其维护的socket。服务器端计算完hash值以后找到对应的链表进行遍历。

socket的对比函数(宏)INET_MATCH:

1
2
3
4
5
6
7
8
// include/net/inet_hashtables.h
#define INET_MATCH(__sk, __net, __cookie, __saddr, __daddr, __ports, __dif) \
((inet_sk(__sk)->inet_portpair == (__ports)) && \
(inet_sk(__sk)->inet_daddr == (__saddr)) && \
(inet_sk(__sk)->inet_rcv_saddr == (__daddr)) && \
(!(__sk)->sk_bound_dev_if || \
((__sk)->sk_bound_dev_if == (__dif))) && \
net_eq(sock_net(__sk), (__net)))

INET_MATCH中将网络包tcp header中的__saddr、__daddr、__ports和Linux中的socket中inet_portpair、inet_daddr、inet_rcv_saddr进行对比。如果匹配socket就找到了。当然除了ip和端口,INET_MATCH还比较了其它一些东西,所以TCP还有五元组、七元组之类的说法。

由此:可以把同一个端口用于两条连接,只要server端的ip或两者端口不一样就能正确找到socket,而不是串线。

所以在客户端增加TCP最大并发能力有两个方法。第一个办法,为客户端配置多个ip。第二个办法,连接多个不同的server。

参考

https://mp.weixin.qq.com/s?__biz=MjM5Njg5NDgwNA==&mid=2247484207&idx=1&sn=50ae06628062bcdd5b2aff044f34fa80&chksm=a6e3021491948b0287e4f856791e4d1880ddfb76a76c3de4ea7c8e59a0cb1f2312c49e9ff5ce&cur_album_id=1532487451997454337&scene=189#rd

https://mp.weixin.qq.com/s?__biz=MjM5Njg5NDgwNA==&mid=2247484310&idx=1&sn=025f7787f39a9eef322ab73c4687b910&chksm=a6e302ad91948bbb17479890b03b47b93cc33af7db4ce5f22d63de83c498c424b71da7bc884b&cur_album_id=1532487451997454337&scene=189#rd