数据维护 对象存储系统的长期运行离不开对系统的维护,系统维护包括3个方面:硬件、软件和数据维护。硬件维护包括现有硬件的维修、更换和升级以及新硬件的添加等 ;软件维护包括错误原因的调查和修复,软件的升级、回滚和全新的安装部署等 ;数据维护则包括数据的修复、过期数据的删除等。
硬件和软件方面的维护通常有一整套处理流程,手动或自动分批执行,确保整个系统在部分维护的情况下的可用性。
数据维护主要通过软件执行预先设定的维护工作。
对象存储系统的数据维护工作 对象版本留存 存储空间不可能无限制增长,而用户的对象却是每天都会有新版本添加进来。这些数据很快就会变得过时,被更新的对象所替代。而用户通常也不需要长期保留所有的版本。所以需要提供一种留存策略,在保留用户更关注的版本的情况下清理一些不必要的版本。
版本留存策略就是一套决定哪些版本需要被保留 的决策依据。数量限定策略会保留每个对象最后的若干个版本而将其余的删除;时间限定策略会将版本保留一段固定的时间 ,比如3个月或半年等,超过这个阈值的版本将被删除;除了数量限定和时间限定这两类策略以外,当然还有很多更复杂更精妙的策略。
本项目实现最简单的数量限定策略,对于每个对象仅保留最后5个版本。
维护软件除了需要在元数据服务中删除对象旧版本的元数据以外,对于已经没有任何元数据引用的对象数据也需要进行清除。此处存在一个竞争条件的步骤序列:
维护软件检查某个对象散列值,没有任何元数据引用它
有用户需要上传一个相同散列值的对象,由于SIS检查该散列值存在,跳过了上传步骤
维护软件删除了该对象散列值
用户添加了新的版本引用这个散列值
这样的步骤序列一旦发生,意味着用户的对下数据丢失。为避免这种情况,在删除对象散列值时并没有彻底删除文件,只是将对象文件移动到另一个grabage目录。隔一段时间后再去真正删除。
数据服务的REST接口
删除对象散列值需要数据服务提供对象的DELETE 操作,该接口不仅将对象文件移动至垃圾目录,且从数据服务的定位对象缓存中删除散列值。
具体实现 删除过期元数据 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 const MIN_VERSION_COUNT = 5 func main () { buckets, e := es.SearchVersionStatus(MIN_VERSION_COUNT + 1 ) if e != nil { log.Println(e) return } for i := range buckets { bucket := buckets[i] for v := 0 ; v < bucket.Doc_count-MIN_VERSION_COUNT; v++ { es.DelMetadata(bucket.Key, v+int (bucket.Min_version.Value)) } } } func SearchVersionStatus (min_doc_count int ) ([]Bucket, error) { client := http.Client{} url := fmt.Sprintf("http://%s/metadata/_search" , os.Getenv("ES_SERVER" )) body := fmt.Sprintf(` { "size": 0, "aggs": { "group_by_name": { "terms": { "field": "name", "min_doc_count": %d }, "aggs": { "min_version": { "min": { "field": "version" } } } } } }` , min_doc_count) request, _ := http.NewRequest("GET" , url, strings.NewReader(body)) r, e := client.Do(request) if e != nil { return nil , e } b, _ := ioutil.ReadAll(r.Body) var ar aggregateResult json.Unmarshal(b, &ar) return ar.Aggregations.Group_by_name.Buckets, nil } func DelMetadata (name string , version int ) { client := http.Client{} url := fmt.Sprintf("http://%s/metadata/objects/%s_%d" , os.Getenv("ES_SERVER" ), name, version) request, _ := http.NewRequest("DELETE" , url, nil ) client.Do(request) }
删除没有元数据引用的对象数据 deleteOrphanObject相关函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 func main () { files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT" ) + "/objects/*" ) for i := range files { hash := strings.Split(filepath.Base(files[i]), "." )[0 ] hashInMetadata, e := es.HasHash(hash) if e != nil { log.Println(e) return } if !hashInMetadata { del(hash) } } } func HasHash (hash string ) (bool , error) { url := fmt.Sprintf("http://%s/metadata/_search?q=hash:%s&size=0" , os.Getenv("ES_SERVER" ), hash) r, e := http.Get(url) if e != nil { return false , e } b, _ := ioutil.ReadAll(r.Body) var sr searchResult json.Unmarshal(b, &sr) return sr.Hits.Total != 0 , nil } func del (hash string ) { log.Println("delete" , hash) url := "http://" + os.Getenv("LISTEN_ADDRESS" ) + "/objects/" + hash request, _ := http.NewRequest("DELETE" , url, nil ) client := http.Client{} client.Do(request) }
为支持对象的删除操作,数据服务的objects包发生变化:
DELETE对象相关函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func Handler (w http.ResponseWriter, r *http.Request) { m := r.Method if m == http.MethodGet { get(w, r) return } if m == http.MethodDelete { del(w, r) return } w.WriteHeader(http.StatusMethodNotAllowed) } func del (w http.ResponseWriter, r *http.Request) { hash := strings.Split(r.URL.EscapedPath(), "/" )[2 ] files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT" ) + "/objects/" + hash + ".*" ) if len (files) != 1 { return } locate.Del(hash) os.Rename(files[0 ], os.Getenv("STORAGE_ROOT" )+"/garbage/" +filepath.Base(files[0 ])) }
对象数据的检查和修复 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 func main () { files, _ := filepath.Glob(os.Getenv("STORAGE_ROOT" ) + "/objects/*" ) for i := range files { hash := strings.Split(filepath.Base(files[i]), "." )[0 ] verify(hash) } } func verify (hash string ) { log.Println("verify" , hash) size, e := es.SearchHashSize(hash) if e != nil { log.Println(e) return } stream, e := objects.GetStream(hash, size) if e != nil { log.Println(e) return } d := utils.CalculateHash(stream) if d != hash { log.Printf("object hash mismatch, calculated=%s, requested=%s" , d, hash) } stream.Close() } func SearchHashSize (hash string ) (size int64 , e error) { url := fmt.Sprintf("http://%s/metadata/_search?q=hash:%s&size=1" , os.Getenv("ES_SERVER" ), hash) r, e := http.Get(url) if e != nil { return } if r.StatusCode != http.StatusOK { e = fmt.Errorf("fail to search hash size: %d" , r.StatusCode) return } result, _ := ioutil.ReadAll(r.Body) var sr searchResult json.Unmarshal(result, &sr) if len (sr.Hits.Hits) != 0 { size = sr.Hits.Hits[0 ].Source.Size } return }
测试 本版本所有代码及测试用例可见增加数据维护版本全部代码
参考 《分布式对象存储—原理、架构及Go语言实现》