数据维护

数据维护

对象存储系统的长期运行离不开对系统的维护,系统维护包括3个方面:硬件、软件和数据维护。硬件维护包括现有硬件的维修、更换和升级以及新硬件的添加等;软件维护包括错误原因的调查和修复,软件的升级、回滚和全新的安装部署等;数据维护则包括数据的修复、过期数据的删除等。

硬件和软件方面的维护通常有一整套处理流程,手动或自动分批执行,确保整个系统在部分维护的情况下的可用性。

数据维护主要通过软件执行预先设定的维护工作。

对象存储系统的数据维护工作

对象版本留存

存储空间不可能无限制增长,而用户的对象却是每天都会有新版本添加进来。这些数据很快就会变得过时,被更新的对象所替代。而用户通常也不需要长期保留所有的版本。所以需要提供一种留存策略,在保留用户更关注的版本的情况下清理一些不必要的版本。

版本留存策略就是一套决定哪些版本需要被保留的决策依据。数量限定策略会保留每个对象最后的若干个版本而将其余的删除;时间限定策略会将版本保留一段固定的时间,比如3个月或半年等,超过这个阈值的版本将被删除;除了数量限定和时间限定这两类策略以外,当然还有很多更复杂更精妙的策略。

本项目实现最简单的数量限定策略,对于每个对象仅保留最后5个版本。

维护软件除了需要在元数据服务中删除对象旧版本的元数据以外,对于已经没有任何元数据引用的对象数据也需要进行清除。此处存在一个竞争条件的步骤序列:

  1. 维护软件检查某个对象散列值,没有任何元数据引用它
  2. 有用户需要上传一个相同散列值的对象,由于SIS检查该散列值存在,跳过了上传步骤
  3. 维护软件删除了该对象散列值
  4. 用户添加了新的版本引用这个散列值

这样的步骤序列一旦发生,意味着用户的对下数据丢失。为避免这种情况,在删除对象散列值时并没有彻底删除文件,只是将对象文件移动到另一个grabage目录。隔一段时间后再去真正删除。

数据服务的REST接口
1
DELETE /objects/<hash>

删除对象散列值需要数据服务提供对象的DELETE 操作,该接口不仅将对象文件移动至垃圾目录,且从数据服务的定位对象缓存中删除散列值。

具体实现

删除过期元数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
const MIN_VERSION_COUNT = 5

func main() {
// 调用es.SearchVersionStatus将元数据服务中所有版本数量大于等于6的对象搜索出来保存到Bucket结构体的数组buckets中
buckets, e := es.SearchVersionStatus(MIN_VERSION_COUNT + 1)
if e != nil {
log.Println(e)
return
}
// 遍历buckets
for i := range buckets {
bucket := buckets[i]
// 在for循环中调用es.DelMetadata,从该对象当前最小的版本号开始一一删除
// 直到最后剩下5个
for v := 0; v < bucket.Doc_count-MIN_VERSION_COUNT; v++ {
es.DelMetadata(bucket.Key, v+int(bucket.Min_version.Value))
}
}
}

// 输入min_doc_count用于指示需要搜索对象最小版本数量
func SearchVersionStatus(min_doc_count int) ([]Bucket, error) {
client := http.Client{}
url := fmt.Sprintf("http://%s/metadata/_search", os.Getenv("ES_SERVER"))
// 使用ElasticSearch的aggregation search API搜索元数据
// 以对象的名字分组,搜索版本数量大于等于min_doc_count的对象并返回
body := fmt.Sprintf(`
{
"size": 0,
"aggs": {
"group_by_name": {
"terms": {
"field": "name",
"min_doc_count": %d
},
"aggs": {
"min_version": {
"min": {
"field": "version"
}
}
}
}
}
}`, min_doc_count)
request, _ := http.NewRequest("GET", url, strings.NewReader(body))
r, e := client.Do(request)
if e != nil {
return nil, e
}
b, _ := ioutil.ReadAll(r.Body)
var ar aggregateResult
json.Unmarshal(b, &ar)
return ar.Aggregations.Group_by_name.Buckets, nil
}

// 根据对象的名字name和版本号version删除相应的对象元数据
func DelMetadata(name string, version int) {
client := http.Client{}
url := fmt.Sprintf("http://%s/metadata/objects/%s_%d",
os.Getenv("ES_SERVER"), name, version)
request, _ := http.NewRequest("DELETE", url, nil)
client.Do(request)
}
删除没有元数据引用的对象数据

deleteOrphanObject相关函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func main() {
// 需要在每个数据节点上定期运行
// 调用filepath.Global获取$STORAGE_ROOT/objects/目录下的所有文件
files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/*")

// for循环中遍历访问文件
for i := range files {
// 从文件中获得对象的散列值
hash := strings.Split(filepath.Base(files[i]), ".")[0]
// 调用es.HasHash检查元数据服务中是否存在该散列值
hashInMetadata, e := es.HasHash(hash)
if e != nil {
log.Println(e)
return
}
// 不存在,调用del删除散列值
if !hashInMetadata {
del(hash)
}
}
}

// 通过ES的search API搜索所有对象元数据中hash属性等于散列值的文档
func HasHash(hash string) (bool, error) {
url := fmt.Sprintf("http://%s/metadata/_search?q=hash:%s&size=0", os.Getenv("ES_SERVER"), hash)
r, e := http.Get(url)
if e != nil {
return false, e
}
b, _ := ioutil.ReadAll(r.Body)
var sr searchResult
json.Unmarshal(b, &sr)
// 如果满足条件的文档数量不为0,说明还存在对该散列值的引用,函数返回true,否则返回false
return sr.Hits.Total != 0, nil
}

// 访问数据服务的DELETE对象接口进行散列值的删除
func del(hash string) {
log.Println("delete", hash)
url := "http://" + os.Getenv("LISTEN_ADDRESS") + "/objects/" + hash
request, _ := http.NewRequest("DELETE", url, nil)
client := http.Client{}
client.Do(request)
}

为支持对象的删除操作,数据服务的objects包发生变化:

DELETE对象相关函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
if m == http.MethodGet {
get(w, r)
return
}
if m == http.MethodDelete {
del(w, r)
return
}
w.WriteHeader(http.StatusMethodNotAllowed)
}

// 根据对象散列值搜索对象文件
func del(w http.ResponseWriter, r *http.Request) {
hash := strings.Split(r.URL.EscapedPath(), "/")[2]
files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/" + hash + ".*")
if len(files) != 1 {
return
}
// 调用locate.Del将散列值移出对象定位缓存
locate.Del(hash)
// 调用os.Rename将对象文件移动到$STORAGE_ROOT/garbage/目录下
os.Rename(files[0], os.Getenv("STORAGE_ROOT")+"/garbage/"+filepath.Base(files[0]))
}
对象数据的检查和修复
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
func main() {
// 在数据服务节点上定期运行
// 调用filepath.Glob获取$STOTAGE_ROOT/objects/目录下所有文件
files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT") + "/objects/*")

// 在for循环中遍历访问文件
for i := range files {
// 从文件名中获得对象的散列值
hash := strings.Split(filepath.Base(files[i]), ".")[0]
// 调用verify检查数据
verify(hash)
}
}

func verify(hash string) {
log.Println("verify", hash)
// 调用es.SearchHashSize从元数据服务中获取该散列值对应的对象大小
size, e := es.SearchHashSize(hash)
if e != nil {
log.Println(e)
return
}
// 以对象的散列值和大小为参数调用objects.GetStream创建一个对象数据流
// 底层实现会自动完成数据的修复。
stream, e := objects.GetStream(hash, size)
if e != nil {
log.Println(e)
return
}
// 调用utils.CalculateHash计算对象的散列值
d := utils.CalculateHash(stream)
// 检查hash是否一致,不一致则以log的形式报告错误(数据损坏,已经不可修复)
if d != hash {
log.Printf("object hash mismatch, calculated=%s, requested=%s", d, hash)
}
// 关闭数据对象流
stream.Close()
}

// 输入对象的散列值hash,通过ES的seach API查询元数据属性中hash等于该散列值的文档的size属性
func SearchHashSize(hash string) (size int64, e error) {
url := fmt.Sprintf("http://%s/metadata/_search?q=hash:%s&size=1",
os.Getenv("ES_SERVER"), hash)
r, e := http.Get(url)
if e != nil {
return
}
if r.StatusCode != http.StatusOK {
e = fmt.Errorf("fail to search hash size: %d", r.StatusCode)
return
}
result, _ := ioutil.ReadAll(r.Body)
var sr searchResult
json.Unmarshal(result, &sr)
if len(sr.Hits.Hits) != 0 {
size = sr.Hits.Hits[0].Source.Size
}
// 返回size
return
}

测试

本版本所有代码及测试用例可见增加数据维护版本全部代码

参考

《分布式对象存储—原理、架构及Go语言实现》