元数据服务

元数据服务

上一个版本的分布式对象存储实现了接口服务和数据服务的分离,对象的数据被保存在了专门的数据服务节点,而不是保存在接口服务的本地磁盘上。通过解耦后,可以往集群中添加新的接口服务结点或数据服务节点。

但也存在问题:

  • 如果多次PUT同一个对象,会发现该对象在所有的数据服务节点上都有一份副本。这是由于在每次PUT的时候都是随机选择一个数据服务节点,只要PUT次数足够多,那么所有的节点必然都会被选中一次,结果就是每个节点上都保存着这个对象的数据。为了解决这个问题,对象存储系统提出了一个十分重要的概念,叫作数据去重。
    • 最简单的办法似乎式只需要在每次PUT之前先定位一下,确保该对象不存在之后再PUT就好了。然而问题并没有那么简单,更复杂的情况是,两个名字不同的对象有可能内容相同。这样的对象也属于需要去重的范畴。这是怎么回事呢?一个对外提供服务的对象存储系统不可能只有一个用户,而是会有很多用户一起使用,这些用户上传的对象可能存在大量的重复数据。为了节省存储空间,对象存储服务通常都会尽量让数据相同的对象共享系统中的同一份数据存储
  • 另一个问题则是数据的不一致。假设多次PUT同一个对象,且内容不同,这个对象的不同版本会被随机保存在不同的数据服务节点上。当GET它时就会随机取得不同版本的对象,这不仅破坏了对象数据的一致性,也破坏了GET方法的幂等性( 对同一个系统,使用同样的条件,一次请求和重复的多次请求对系统资源的影响是一致的)。
    • 该问题似乎可以通过每次PUT之前先进行定位,如果该对象不存在则随机选择,如果存在则选择相对应的数服务节点。这时候如果客户不要求版本控制,系统的行为表现为用最新的版本覆盖上一个版本。但如果客户希望保存某个对象的所有版本,这时候用户上传的某个对象的所有版本都需要被保存起来。比如说,当用户第一次上传一个对象时,它的初始版本为1:当用户使用PUT方法改变了该对象的内容,那么新对象的版本为2,依次递增。新的版本会覆盖旧的版本,但是旧版本的对象不会被删除。在下载对象时,用户可以指定GET对象的任意一个版本。 为了实现版本控制,需要一个数据库来记录系统中所有对象的所有版本。这个数据库就是元数据服务。

元数据

元数据指的是对象的描述信息。为了和对象的数据本身区分开,赋予了元数据这个名称。对象的哪些信息可以称作元数据?举例来说,有对象的名字、版本、大小以及散列值等。这些都是系统定义的元数据,因为它们的存在对一个对象存储系统有实际意义,比如说客户端和接口服务之间根据对象的名字来引用一个对象:一个对象可以有多个版本,除了删除标记外,每个版本实际都指向数据服务节点上的一份数据存储。

用户自定义的元数据:除了系统定义的元数据以外,用户也可以为这个对象添加自定义的元数据。这些元数据通常以键值对形式保存的任意描述信息,比如一张照片的拍摄时间和拍摄地点,一首歌的作者和演唱者等。对象存储系统不关心这些元数据,但是用户需要将它们添加到对象存储系统中,作为该对象的元数据进行保存

散列值和散列函数

对象的散列值是一种非常特殊的元数据,因为对象存储通常将对象的散列值作为其全局唯一的标识符。在此前,数据服务节点上的对象都是用名字来引用的,如果两个对象名字不同,那么无法知道它们的内容是否相同。如此则无法实现针对不同对象的去重。现在,以对象的散列值作为标识符,就可以将接口服务层访问的对象和数据服务存取的对象数据解耦合客户端和接口服务通过对象的名字来引用一个对象,而实际则是通过其散列值来引用存储在数据节点上的对象数据,只要散列值相同则可以认为对象的数据相同,这样就可以实现名字不同但数据相同的对象之间的去重。

对象的散列值是通过散列函数计算出来,散列函数会将对象的数据进行重复多轮的数学运算,这些运算操作包括按位与、按位或、按位异或等,最后计算出来一个长度固定的数字,作为对象的散列值。一个理想的散列函数具有以下5个特征。

  • 操作具有决定性,同样的数据必定计算出同样的散列值
  • 无论计算任何数据都很快。
  • 无法根据散列值倒推数据,只能遍历尝试所有可能的数据。
  • 数据上微小的变化就会导致散列值的巨大改变,新散列值和旧散列值不具有相关性。
  • 无法找到两个能产生相同散列值的不同数据。

实际情况无法满足所有要求,一个散列函数hash的安全级别根据3种属性决定:

  • 抗原像攻击:给定一个散列值h,难以找到一个数据m令 h=hash(m)。这个属性称为函数的单向性。欠缺单向性的散列函数易受到原像攻击。
  • 抗第二原像攻击:给定一个数据m1,难以找到第二个数据m2令hash(m1)=hash(m2)。欠缺该属性的散列函数易受到第二原像攻击。
  • 抗碰撞性:难以找到两个不同的数据m1和m2令hash(m1)=hash(m2)。这样的一对数据被称为散列碰撞。

本项目使用的散列函数为SHA-256,该函数使用64位的数学运算,产生一个长度为256位的二进制数字作为散列值。

分布式对象存储加入元数据服务

元数据服务就是提供对元数据的存取功能的服务。本项目中实现的元数据服务较为简单,它将只保存系统定义的元数据,也就是对象的名字、版本、大小和散列值,因为这些直接影响到存储功能

和上一版本的架构对,加入元数据服务的架构其他组件不变,而多了一个ElasticSearch (以下简称ES),也就是项目种选择的元数据服务。需要说明的是能做元数据服务的并不只有ES一种,任何一个分布式数据库都可以做元数据服务。选择ES的原因是它足够好且实现方便。和RabbitMQ一样,ES本身也支持集群,但是在本书的测试环境中只使用了一个服务节点。
ES使用的也是REST 接口接口服务节点作为客户端通过HTTP访问ES的索引(index)。ES 的索引就相当于一个数据库,而类型(type)则相当于数据库里的一张表。项目种会创建一个名为metadata的索引,其中有一个名为objects 的类型。

加入元数据服务的分布式存储架构

REST接口

有了元数据服务之后,就可以给接口服务增加新的功能,首先是给对象的GET方法增加一个参数version:

1
GET /objects/<object_name>?version=<version_id>

响应正文:

  • 对象的数据:这个参数可以告诉接口服务客户端需要的是该对象的第几个版本,默认是最新的那个
1
PUT /objects/<object_name>

请求头部(Request Header)

  • Digest: SHA-256=<对象散列值的Base64编码>
  • Content-Length:<对象数据的长度>

请求正文

  • 对象的内容如下:
  • PUT方法没变,但是每次客户端PUT一个对象时,必须提供一个名为Digest 的HTTP请求头部,它记录了用SHA-256散列函数计算出来的对象散列值。

HTTP头部分为请求头部(Request Header)和响应头部(Response Header),它允许客户端和服务器在HTTP的请求和响应中交换额外的信息。一个头部由3个部分组成:一个大小写不敏感的名字,后面跟着一个冒号“:”,然后是该头部的值。注意头部的值不能包含回车。

Digest头部的名字是 Digest,后面跟着一个冒号,然后是 Digest头部的值,也就是”SHA-256=<对象散列值的Base64编码>”。SHA-256是要求使用的散列函数,根据RFC3230的要求,客户端需要在Digest头部提供计算散列值时使用的散列函数,如果服务器发现客户端使用的散列函数跟服务器使用的散列函数不一致则会拒绝整个请求SHA-256计算出的散列值是一个256位的二进制数字,客户端还需要对其进行Base64编码,将数字转化成ASCII字符串格式,以确保不包含回车的二进制数字。

Base64编码规则选定了64个不同的字符,分别代表1个6位的二进制数字。对一个256位的二进制数字进行编码,首先要将其切成11个24位的二进制数字(不足的位在最后一个数字用0补齐),然后每个数字正好用4个Base64字符来表示。

经过Base64编码后的散列值将作为该对象的全局唯一标识符,也是数据服务节点储存的对象名。也就是说,只要对象内容发生了变化,那么原来在数据服务节点上储存的数据不会被更新,而是会储存一个新的对象。

除了Digest头部以外,客户端还必须提供一个名为Content-Length 的HTTP请求头部用来告诉服务端该对象数据的长度。客户端提供的对象散列值和长度会作为元数据被保存在元数据服务中。

将数据服务层存取的对象名和接口服务层访问的对象名区分开对于去重来说至关重要。现在,无论接口服务层收到的对象名是什么,只要从数据服务层角度看到的对象名一致,就可以认为是对象的内容一致,去重就只需要简单地根据数据服务层的对象名来实现就可以

PUT成功后,在元数据服务中该对象就会添加一个新的版本,版本号从1开始递增。

除了对象的GET 和 PUT方法发生了变化以外,还可以添加新的功能,首先是对象的DELETE方法。

1
DELETE /objects/<object_name>

使用DELETE方法来删除一个对象。

在此之前都没有实现对象的删除功能,这是有原因的。对象存储的去重会让名字不同的对象共享同一份数据存储,而删除一个对象意味着要将该对象和数据存储之间的联系断开。在把对象的名字和对象的数据存储解耦合之前,无法做到在删除一个对象的同时保留对象的数据存储。 有了元数据服务,在删除一个对象时,只需要在元数据中给对象添加一个表示删除的特殊版本,而在数据节点上保留其数据。

在GET时,如果该对象的最新版本是一个删除标记,则返回404 Not Found。除了对象的删除功能之外,还需要提供对象的列表功能,用于查询所有对象或指定对象的所有版本。

1
GET /versions/
  • 响应正文
    指定对象的所有版本:客户端GET某个指定对象的版本列表,接口服务节点返回该对象的所有版本。HTTP响应内容结构同上。
ES接口

元数据服务的索引使用映射(mappings)结构:

1
2
3
4
5
6
7
8
9
10
11
12
{
mappings":{
"objects":{
"properties":{
"name" : {"type" : "string" , "index" :"not analyzed"},
"version" :{"type": "integer"},
"size":{"type"": ""integer"},
"hash" : {"type : "string" },
}
}
}
}

ES的索引相当于数据库而类型相当于数据库的表,那么现在这个映射则相当于定义表结构。这个映射会在创建metadata索引时作为参数一并被引入,该索引只有一个类型就是objects,其中包括4个属性分别是 name、version、size和hash,相当于数据库表的4个列。

name属性有个额外的要求”index”:”not_analyzed”,这是为了在搜索时能够精确匹配name默认的 analyzed index 会对name进行分词匹配。这有可能导致不相关的匹配结果。比如我们有一个元数据的name是“little cat”,如果使用analyzed index,那么它会被分成little和cat两个词,之后任何包含little或cat的搜索都会导致“little cat”被选中。

添加对象元数据

当客户端PUT或DELETE对象时,都需要往元数据服务添加新版本,处理步骤如图:

往元数据服务添加新版本

上图显示了往元数据服务添加新版本的流程,当接口服务需要给某个对象添加一个新版本时,首先会去查询该对象当前最新版本的元数据,如果该对象不存在,则新版本从1开始:否则新版本为当前最新版本加1,然后将其添加进元数据服务。

GET对象时分两种情况,如果没有指定版本号,同样需要搜索对象最新版本的元数据;如果指定了版本号,可以根据对象的名字和版本号直接获取对象指定版本的元数据。

用到的ES API

要想荻取对象当前最新版本的元数据需要使用ES搜索API.

1
GET /metadata/_search?q=name:<object_name>&size=1&sort=version:desc

给对象添加一个新版本需要使用ES索引API:

1
PUT /metadata/objects/<object_name>_<version>?op_type=create

在这里,特地将<object_name>_<version>作为_id创建。这是为了当客户端指定版本GET对象时可以直接根据对象名和版本号拼出相对应的_id来从ES中获取元数据,从而免除了搜索的步骤。

使用op_type=create可以确保当多个客户端同时上传同一个对象时不至于发生数据丢失,因为只有第一个请求能成功上传给ES。其他请求会收到HTTP错误代码409 Conflict,这样接口服务节点就能知道发生了版本冲突并重新上传。

当客户端GET对象时分两种情况,如果没有指定版本号,则使用和之前同样的ES搜索API来获取对象的最新版本。

如果客户端指定版本号GET对象,则使用ES Get API直接获取对象指定版本的元数据。

1
GET /metadata/objects/<object_name>_<version_id>/_source

当客户端GET全体对象版本列表时,使用ES搜索API方法如下:

1
GET /metadata/_search?sort=name,version&from=<from>&size=<size>

其中,from和 size用于分页显示。在不指定from和 size的情况下,ES 默认的分页是从0开始显示10条。

当客户端GET指定对象版本列表时,使用ES 搜索API方法如下:

1
GET /metadata/_search?sort=name,versions&from=<from>&size=<size>&q=name:<object_name>

这里多了一个q参数用于指定name。

对象PUT流程

加入元数据服务的对象put流程

客户端的HTTP请求提供了对象的名字、散列值和大小,接口服务以散列值作为数据服务的对象名来保存对象,然后在元数据服务中根据对象的名字搜索当前最新的元数据,使其版本号加1并添加一个新版本的元数据。

对象GET流程

加入元数据服务的对象get流程

客户端在HTTP请求中指定对象的名字,可在 URL的查询参数中指定版本号。如果指定版本号,则接口服务根据对象的名字和版本号获取元数据;否则根据对象的名字搜索最新元数据。然后从元数据中获得对象的散列值作为数据服务的对象名来读取对象。

具体实现

主要关注于与上一版本有变化的部分(主要在接口服务种实现元数据的互动)进行说明。

接口服务
接口服务的main函数
1
2
3
4
5
6
7
func main() {
go heartbeat.ListenHeartbeat()
http.HandleFunc("/objects/", objects.Handler)
http.HandleFunc("/locate/", locate.Handler)
http.HandleFunc("/versions/", versions.Handler)
log.Fatal(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil))
}

本版本的接口服务main函数多了一个用于处理/vesions/的函数,名字为versions.Handler

接口服务的versions

主要工作为调用es包的函数完成相关工作:

versions.Handler函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func Handler(w http.ResponseWriter, r *http.Request) {
// 检查HTTP方法是否为GET
m := r.Method
// 如果不为GET,返回405 Method Not Allowed
if m != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// 方法为GET,获取URL中的<object_name>部分
from := 0
size := 1000
name := strings.Split(r.URL.EscapedPath(), "/")[2]
// 无限循环调用es包的SearchAllVersions函数
for {
// 返回一个元数据的数组
metas, e := es.SearchAllVersions(name, from, size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 遍历数组,将元数据一一写入HTTP响应的正文
for i := range metas {
b, _ := json.Marshal(metas[i])
w.Write(b)
w.Write([]byte("\n"))
}
// 如果返回的数组长度不等于size,说明元数据服务种没有更多的数据,直接返回
if len(metas) != size {
return
}
// 否则把from的值增加1000进行下一轮迭代
from += size
}
}
接口服务的objects包

加入元数据服务以后,接口服务的objects包与上一版本相比发生了较大的变化,除了多了一个对象的DELETE方法以外,对象的PUT和GET方法也都有所改变,它们需要和元数据服务互动。

obejects.Handler函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
if m == http.MethodPut {
put(w, r)
return
}
if m == http.MethodGet {
get(w, r)
return
}
// 与上一版本多一个DELETE方法处理del函数
if m == http.MethodDelete {
del(w, r)
return
}
w.WriteHeader(http.StatusMethodNotAllowed)
}

objects.del函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func del(w http.ResponseWriter, r *http.Request) {
name := strings.Split(r.URL.EscapedPath(), "/")[2]
// 以name为参数调用es.SearchLaestVersion,搜索该对象最新的版本
version, e := es.SearchLatestVersion(name)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 插入新的元数据,接受元数据的name, version, size和hash
// hash 为空字符串表示这个一个删除标记
e = es.PutMetadata(name, version.Version+1, 0, "")
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
}

objects.put相关函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func put(w http.ResponseWriter, r *http.Request) {
// 先从HTTP请求头部获取对象的散列值
hash := utils.GetHashFromHeader(r.Header)
if hash == "" {
log.Println("missing object hash in digest header")
w.WriteHeader(http.StatusBadRequest)
return
}

// 以散列值作为参数调用stroreObject
c, e := storeObject(r.Body, url.PathEscape(hash))
if e != nil {
log.Println(e)
w.WriteHeader(c)
return
}
if c != http.StatusOK {
w.WriteHeader(c)
return
}

// 从URL中获取对象的名字和对象的大小
name := strings.Split(r.URL.EscapedPath(), "/")[2]
size := utils.GetSizeFromHeader(r.Header)
// 以对象的名字、散列值和大小为参数调用es.AddVersions给对象添加新版本
e = es.AddVersion(name, hash, size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
}
}

GetHashFromHeader函数和GetSizeFromHeader函数是utils包提供的两个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func GetHashFromHeader(h http.Header) string {
// 获取"digest"头部
digest := h.Get("digest")
// 检查diest头部的形式是否满足"SHA-256=<hash>"
if len(digest) < 9 {
return ""
}
if digest[:8] != "SHA-256=" {
return ""
}
return digest[8:]
}

func GetSizeFromHeader(h http.Header) int64 {
// 得到"conten-length"头部,并调用strconv.PareseInt将字符串转换为int64输出
size, _ := strconv.ParseInt(h.Get("content-length"), 0, 64)
return size
}

objects.get函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func get(w http.ResponseWriter, r *http.Request) {
name := strings.Split(r.URL.EscapedPath(), "/")[2]
// 获取URL并从URL的查询参数中获取"version"参数的值
// Query方法返回一个保存URL所有查询参数的map,该map的键是查询参数的名字,值是一个字符串数组
// HTTP的URL查询参数允许存在多个值,以"version"为key可以得到URL中查询参数的所有值
versionId := r.URL.Query()["version"]
version := 0
var e error
if len(versionId) != 0 {
// 项目中不考虑多个"version"查询参数的情况
// 始终以versionId数组的第一个元素作为客户端提供的版本号
// 将字符串转换为整型
version, e = strconv.Atoi(versionId[0])
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusBadRequest)
return
}
}
// 调用es的GetMetadata函数得到对象的元数据meta
meta, e := es.GetMetadata(name, version)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// meta.Hash为对象的散列值,如果为空表示该对象版本是一个删除标记
// 返回404 Not Found
if meta.Hash == "" {
w.WriteHeader(http.StatusNotFound)
return
}
// 以散列值为对象名从数据服务层获取对象并输出
object := url.PathEscape(meta.Hash)
stream, e := getStream(object)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
io.Copy(w, stream)
}
es包

es包封装了以HTTP访问ES的各种API的操作。

es.getMetadata函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 结构与ES映射中定义的objects类型属性一一对应
type Metadata struct {
Name string
Version int
Size int64
Hash string
}

// 根据对象的名字和版本号来获取对象的元数据
func getMetadata(name string, versionId int) (meta Metadata, e error) {
// ES服务器地址来自环境变量ES_SERVER,索引是metadata,类型是objects
// 文档的id由对象的名字和版本号拼接而成。
url := fmt.Sprintf("http://%s/metadata/objects/%s_%d/_source",
os.Getenv("ES_SERVER"), name, versionId)
// GET到URL中的对象的元数据,免除耗时的搜索操作
r, e := http.Get(url)
if e != nil {
return
}
if r.StatusCode != http.StatusOK {
e = fmt.Errorf("fail to get %s_%d: %d", name, versionId, r.StatusCode)
return
}
// 读出数据
result, _ := ioutil.ReadAll(r.Body)
// ES返回的结果经过JSON解码后被es,SearchLatestVersion函数使用
json.Unmarshal(result, &meta)
return
}

es.SearchLatestVersion函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
type hit struct {
Source Metadata `json:"_source"`
}

type searchResult struct {
Hits struct {
Total int
Hits []hit
}
}

func SearchLatestVersion(name string) (meta Metadata, e error) {
// 调用ES搜索API.在URL中指定对象的名字,版本号以降序排列且只返回第一个结果。
url := fmt.Sprintf("http://%s/metadata/_search?q=name:%s&size=1&sort=version:desc",
os.Getenv("ES_SERVER"), url.PathEscape(name))
fmt.Println(url)
r, e := http.Get(url)
if e != nil {
return
}
if r.StatusCode != http.StatusOK {
e = fmt.Errorf("fail to search latest metadata: %d", r.StatusCode)
return
}
result, _ := ioutil.ReadAll(r.Body)
var sr searchResult
// ES返回的结果通过JSON解码到一个searchResult结构体中
// searchResult和ES搜索API返回的结构体保持一致
// 方便读取搜索到的元数据并赋值给meta返回。
json.Unmarshal(result, &sr)
if len(sr.Hits.Hits) != 0 {
meta = sr.Hits.Hits[0].Source
}
// ES的返回结果长度为0,说明没有搜到相对应的元数据,直接返回
// 此时meta中各属性都为初始值:字符串为空,整型为0
return
}

es.GetMetadata函数:

GetMetadata函数的功能类似getMetadata,输入对象的名字和版本号返回对象,区别在于当version为0时会调用SearchLatestVersion获取当前最新的版本。

1
2
3
4
5
6
7
func GetMetadata(name string, version int) (Metadata, error) {
// 当version为0的时候,调用SearchLatestVersion获取当前最新的版本
if version == 0 {
return SearchLatestVersion(name)
}
return getMetadata(name, version)
}

es.PutMetadata函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 用于向ES服务上传一个新的元数据,输入的4个参数对应元数据的4个属性
func PutMetadata(name string, version int, size int64, hash string) error {
// 生成ES文档,一个ES的文档相当于数据库的一条记录
doc := fmt.Sprintf(`{"name":"%s","version":%d,"size":%d,"hash":"%s"}`,
name, version, size, hash)
client := http.Client{}
// 使用op_type=create参数,如果同时又多个客户端上传同一个数据,结果会发生冲突
// 只有第一个文档被成功创建,之后的PUT请求,ES会返回409Conflict
url := fmt.Sprintf("http://%s/metadata/objects/%s_%d?op_type=create",
os.Getenv("ES_SERVER"), name, version)
// 用PUT方法将文档上传到metadata索引的objects类型
request, _ := http.NewRequest("PUT", url, strings.NewReader(doc))
r, e := client.Do(request)
if e != nil {
return e
}
// 如果为409Conflict,函数让版本号加1并递归调用自身继续上传
if r.StatusCode == http.StatusConflict {
return PutMetadata(name, version+1, size, hash)
}
if r.StatusCode != http.StatusCreated {
result, _ := ioutil.ReadAll(r.Body)
return fmt.Errorf("fail to put metadata: %d %s", r.StatusCode, string(result))
}
return nil
}

es.AddVersion函数:

1
2
3
4
5
6
7
8
9
func AddVersion(name, hash string, size int64) error {
// 获取对象最新的版本
version, e := SearchLatestVersion(name)
if e != nil {
return e
}
// 在版本号上加1调用PutMetadata
return PutMetadata(name, version.Version+1, size, hash)
}

es.SearchAllVersion函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 用于搜索某个对象或所有对象的全部版本
func SearchAllVersions(name string, from, size int) ([]Metadata, error) {
// name表示对象的名字,如果name不为空字符粗则搜索指定对象的所有版本
// 否则搜索所有对象的全部版本
// from和size指定分页的显示结果
// 搜索的结果按照对象的名字和版本号排序
url := fmt.Sprintf("http://%s/metadata/_search?sort=name,version&from=%d&size=%d",
os.Getenv("ES_SERVER"), from, size)
if name != "" {
url += "&q=name:" + name
}
r, e := http.Get(url)
if e != nil {
return nil, e
}
metas := make([]Metadata, 0)
result, _ := ioutil.ReadAll(r.Body)
var sr searchResult
json.Unmarshal(result, &sr)
for i := range sr.Hits.Hits {
metas = append(metas, sr.Hits.Hits[i].Source)
}
return metas, nil
}

测试

测试环境与上一版本相同,具体代码见带元数据服务的分布式对象存储

参考

《分布式对象存储—原理、架构及Go语言实现》