可扩展分布式系统实现

可扩展分布式系统

分布式系统

一个分布式系统要求各节点分布在网络上,并通过消息传递来合作完成一个共同目标

分布式系统的三大关键特征:

  • 节点之间并发工作
  • 没有全局锁
  • 某个节点上发生的错误不影响其他节点。

分布式系统的好处在于可扩展性,只需要加入新的节点就可以自由扩展集群的性能。

实现分布式系统的关键是实现接口和具体实现的解耦。

接口服务和数据存储服务分离

如图所示,接口服务层提供了对外的REST接口,而数据服务层则提供数据的存储功能。接口服务处理客户端的请求,然后向数据服务存取对象,数据服务处理来自接口服务的请求并在本地磁盘上存取对象。

接口服务和数据存储服务分离

接口服务和数据服务之间的两种接口:

  • 实现对象存取的接口:对象的存取使用REST 接口。也就是说数据服务本身也提供REST 接口。此时,接口服务节点作为HTTP客户端向数据服务请求对象
  • 第二种接口通过RabbitMQ消息队列进行通信,对 RabbitMQ 的使用分为两种模式。
    • 一种模式是向某个exchange进行一对多的消息群发
    • 另一种模式则是向某个消息队列进行一对一的消息单发

每一个数据服务节点都需要向所有的接口服务节点通知自身的存在,为此,创建一个名为apiServers 的 exchange,每一台数据服务节点都会持续向这个exchange发送心跳消息。所有的接口服务节点在启动以后都会创建一个消息队列来绑定这个exchange,任何发往这个exchange 的消息都会被转发给绑定它的所有消息队列,也就是说每一个接口服务节点都会接收到任意一台数据服务节点的心跳消息。

接口服务需要在收到对象GET 请求时定位该对象被保存在哪个数据服务节点上,所以还需要创建一个名为dataServers的exchange。所有的数据服务节点绑定这个exchange并接收来自接口服务的定位消息。拥有该对象的数据服务节点则使用消息单发通知该接口服务节点。

之所以必须使用REST和消息队列这两种不同类型的接口是为了满足不同的需求:对象存取的特点是数据量有可能很大,不适合将一个巨大的对象通过消息队列传输。而REST接口虽然能够处理大数据量传输,但是对于群发却显得力不从心。

REST接口
  • 对于数据服务来说,它的REST接口和单机版本完全相同,也就是对象的PUT和GET方法。
  • 对于接口服务来说,除了对象的PUT和GET 方法之外,还应另外提供一个用于定位的 locate接口,用来帮助验证架构。
    1
    2
    GET /locate/<object_name>
    响应正文

客户端通过GET方法发起对象定位请求,接口服务节点收到该请求后通过dataServers exchange会向数据服务层群发一个定位消息,然后等待数据服务节点的反馈如果有数据服务节点发回确认消息,则返回该数据服务节点的地址;如果超过一定时间没有任何反馈,则返回HTTP错误代码404 NOT FOUND.

RabbitMQ消息设计

数据服务需要通过RabbitMQ将自身的存在通知给所有的接口服务,这样的消息称为心跳消息。

数据服务心跳消息

apiServers和dataServers这两个exchange需要在RabbitMQ服务器上预先创建。每个接口服务节点在启动后都会创建自己的消息队列并绑定至 apiServers exchange。 每个数据服务节点在启动后每隔5s就会发送一条消息给apiServers exchange,消息的正文就是该数据服务节点的 HTTP监听地址。接口服务节点在收到该消息后就会记录这个地址。

定位流程

每个数据服务节点在启动时都必须创建自己的消息队列并绑定至dataServers exchange。当接口服务需要定位时,会创建一个临时消息队列,然后发送一条消息给dataServers exchange消息的正文是需要定位的对象,返回地址则是该临时队列的名字。定位成功的数据服务节点需要将反馈消息发送给这个临时队列,反馈消息的正文是该数据服务节点自身的监听地址临时消息队列会在一定时间后关闭。如果在关闭前没有收到任何反馈则该对象定位失败,接口服务节点就会知道该对象不存在于数据服务层。

接口和存储分离的对象PUT流程

接口和存储分离的对象PUT流程

客户端向接口服务发送HTTP的PUT 请求并提供了<object_name><contentof object>,接口服务选出一个随机数据服务节点并向它转发这个PUT请求,数据服务节点将<content of object>写入$STORAGE_ROOT/objects/<object_name>文件。

接口和存储分离的对象GET流程

接口和存储分离的对象GET流程

客户端的GET 请求提供了<object_name>,接口服务在收到GET请求后会对该object进行定位,如果定位失败则返回404 Not Found;如果定位成功,接口服务会接收到某个数据服务的地址,就可以向该地址转发来自客户端的GET请求,由数据服务读取本地磁盘上的文件并将其内容写入HTTP响应的正文。

具体实现

数据服务

为支持新的功能,数据服务的在实现上扩展了单机版的相关功能:

1
2
3
4
5
6
func main() {
go heartbeat.StartHeartbeat()
go locate.StartLocate()
http.HandleFunc("/objects/", objects.Handler)
log.Fatal(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil))
}

数据服务的heartbeat包:

1
2
3
4
5
6
7
8
9
10
11
12
// 每隔5s向apiServers exchange发送一条消息
// 将本服务节点的监听对地址发送出去
func StartHeartbeat() {
// 调用rabbitmq.New创建一个rabbitmq.RabbitMQ结构体
q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
defer q.Close()
// 无限循环调用Publish方法向apiServers exchange发送本节点的监听地址
for {
q.Publish("apiServers", os.Getenv("LISTEN_ADDRESS"))
time.Sleep(5 * time.Second)
}
}

数据服务的locate包:

locate包有两个函数,分别是用于实际定位对象的Locate函数和用于监听定位消息的StartLocate函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 实际定位对象
func Locate(name string) bool {
// os.Stat访问磁盘上对应的文件名
_, err := os.Stat(name)
// 判断文件名是否存在,存在返回true,失败返回false
return !os.IsNotExist(err)
}

// 用于监听定位消息
func StartLocate() {
q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
defer q.Close()
// 绑定dataServers exchange
q.Bind("dataServers")
// 返回一个channel
c := q.Consume()
// range遍历channel,接收消息
for msg := range c {
// JSON编码使得对象名字上有一对双引号,使用strconv.Unquote将输入的字符串前后的双引号去除并作为结果返回
object, e := strconv.Unquote(string(msg.Body))
if e != nil {
panic(e)
}
if Locate(os.Getenv("STORAGE_ROOT") + "/objects/" + object) {
//文件存在,调用Send方法向消息的发送方返回本服务节点的监听地址,表示该对象存在于本服务节点上。
q.Send(msg.ReplyTo, os.Getenv("LISTEN_ADDRESS"))
}
}
}
接口服务

接口服务除了提供对象的REST接口以外还需要提供locate功能,其main函数为:

1
2
3
4
5
6
7
8
9
func main() {
// 提供locate功能
go heartbeat.ListenHeartbeat()
// 处理URL以/objects/开头的对象请求
http.HandleFunc("/objects/", objects.Handler)
// 处理URL以/locate/开头的定位请求
http.HandleFunc("/locate/", locate.Handler)
log.Fatal(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil))
}

注意:接口服务层和数据服务层的objects包以及heartbeat、locate包虽然名字相同,但具体实现差距较大。

  • 数据服务的objects包负责对象在本地磁盘上的存取;而接口服务的objects包则负责将对象请求转发给数据服务。
  • 数据服务的heartbeat包用于发送心跳消息;而接口服务的heartbeat包则用于接收数据服务节点的心跳消息
  • 数据服务的locate包用于接收定位消息、定位对象以及发送反馈消息;而接口服务的locate包则用于发送定位消息并处理反馈消息

接口服务的heartbeat包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// map,整个包可见,用于缓存所有的数据服务节点
var dataServers = make(map[string]time.Time)
var mutex sync.Mutex

func ListenHeartbeat() {
// 创建消息队列绑定apiServers exchange
q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
defer q.Close()
q.Bind("apiServers")
// 通过go channel监听每个来自数据服务节点的心跳信息
c := q.Consume()
go removeExpiredDataServer() //goroutine并行处理
for msg := range c {
//将数据服务节点的监听地址作为map的键,收到消息的时间作为值存入map中
dataServer, e := strconv.Unquote(string(msg.Body))
if e != nil {
panic(e)
}
mutex.Lock()
dataServers[dataServer] = time.Now()
mutex.Unlock()
}
}

// 每隔5s扫描一遍map,清除其中超过10s没有收到心跳消息的数据服务节点
func removeExpiredDataServer() {
for {
time.Sleep(5 * time.Second)
mutex.Lock()
for s, t := range dataServers {
if t.Add(10 * time.Second).Before(time.Now()) {
delete(dataServers, s)
}
}
mutex.Unlock()
}
}

// 遍历map并返回当前所有的数据服务节点
// 为防止多个goroutine并发读写map造成错误,map读写全部需要mutex的保护
func GetDataServers() []string {
mutex.Lock()
defer mutex.Unlock()
ds := make([]string, 0)
for s, _ := range dataServers {
ds = append(ds, s)
}
return ds
}

// 从当前所有的数据服务节点中随机选择一个节点并返回
func ChooseRandomDataServer() string {
ds := GetDataServers()
n := len(ds)
if n == 0 {
return ""
}
return ds[rand.Intn(n)]
}

接口服务的locate包:主要用于向数据服务节点群发定位消息并接收反馈

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 处理HTTP请求
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
if m != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// 将文件名作为Locate函数的参数进行定位
info := Locate(strings.Split(r.URL.EscapedPath(), "/")[2])
// 为空,说明定位失败
if len(info) == 0 {
w.WriteHeader(http.StatusNotFound)
return
}
// 不为空,则拥有该对象的一个数据服务节点的地址,将地址作为HTTP响应的正文输出
b, _ := json.Marshal(info)
w.Write(b)
}

func Locate(name string) string {
//创建一个消息队列
q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
// 向dataServers exchange群发对象名字的定位信息
q.Publish("dataServers", name)
c := q.Consume()
go func() { //1s后关闭临时消息队列
//设置超时机制,避免无止境的等待。
//1s后没有任何反馈,消息队列关闭,收到一个长度为0的消息,返回一个空字符串
time.Sleep(time.Second)
q.Close()
}()
//阻塞等待数据服务节点向自己发送反馈消息
//若在1s内有来自数据服务节点的消息,返回该消息的正文内容,也就是该数据服务节点的监听地址
msg := <-c
s, _ := strconv.Unquote(string(msg.Body))
return s
}

// 检查Locate结果是否为空字符串来判定对象是否存在
func Exist(name string) bool {
return Locate(name) != ""
}

接口服务的objects包:

接口服务的objects包跟数据服务有很大区别,其put函数和get函数并不会访问本地磁盘上的对象,而是将HTTP请求转发给数据服务。put函数负责处理对象PUT请求,其相关函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func put(w http.ResponseWriter, r *http.Request) {
// 从URL中获取objects_name
object := strings.Split(r.URL.EscapedPath(), "/")[2]
// 将r.Body和objects作为参数调用storeObject
// 第一个返回值为int类型的变量,用于表示HTTP错误码
// 第二个返回值为error,如果error不为nil,出错并打印
c, e := storeObject(r.Body, object)
if e != nil {
log.Println(e)
}
w.WriteHeader(c)
}

func storeObject(r io.Reader, object string) (int, error) {
stream, e := putStream(object)
// 没有找到可用的数据服务节点
if e != nil {
return http.StatusServiceUnavailable, e
}

// 找到可用的数服务节点并得到一个objectstream.PutStream的指针
// objectstream.PutStream实现了Write方法,是一个io.Write接口
// 用io.Copy将HTTP请求的正文写入stream
io.Copy(stream, r)
e = stream.Close()
//写入出错
if e != nil {
return http.StatusInternalServerError, e
}
//写入成功
return http.StatusOK, nil
}

func putStream(object string) (*objectstream.PutStream, error) {
// 先获得一个随机数服务节点的地址
server := heartbeat.ChooseRandomDataServer()
fmt.Println("data server =", server)
// 没有可用的数据服务节点,返回objectstream.PutStream的空指针
if server == "" {
return nil, fmt.Errorf("cannot find any dataServer")
}

return objectstream.NewPutStream(server, object), nil
}

objectstream包:对Go中的http包的一个封装,用于将一些http函数的调用转换成写流的形式,方便处理。具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

type PutStream struct {
// writer用于实现Write方法
writer *io.PipeWriter
// c用于把在一个gouroutine传输数据过程中发生的错误传回主线程
c chan error
}

// 用于生成一个PutStream结构体
func NewPutStream(server, object string) *PutStream {
// 用io.Pipe创建一对reader和writer,类型为*io.PipeReader和*io.PipeWriter
// 管道互联,写入writer的内容可以从reader中读出
// 希望以写入数据流的方法操作HTTP的PUT请求
reader, writer := io.Pipe()
c := make(chan error)
go func() {
// 生成put请求,需要提供一个io.Reader作为http.NewRequest的参数
request, _ := http.NewRequest("PUT", "http://"+server+"/objects/"+object, reader)
// http.Client负责从request中读取需要PUT的内容
client := http.Client{}
// 由于管道的读写阻塞特性,在goroutine中调用client.Do方法
r, e := client.Do(request)
if e == nil && r.StatusCode != http.StatusOK {
e = fmt.Errorf("dataServer return http code %d", r.StatusCode)
}
c <- e
}()
return &PutStream{writer, c}
}

// 用于写入writer,实现该方法PutStream才被认为实现了io.Write接口
func (w *PutStream) Write(p []byte) (n int, err error) {
return w.writer.Write(p)
}

// 关闭writer,为了让管道另一端的reader读到io.EOF,否则在gouroutine中运行的client.Do将始终阻塞无法返回
func (w *PutStream) Close() error {
w.writer.Close()
// 从c中读取发送自goroutine得错误并返回
return <-w.c
}

接口服务层object包用于处理GET请求具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func get(w http.ResponseWriter, r *http.Request) {
object := strings.Split(r.URL.EscapedPath(), "/")[2]
// 调用getStream生成一个类型为io.Reader的stream
stream, e := getStream(object)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
// 调用io.Copy将stream的内容写入HTTP响应的正文
io.Copy(w, stream)
}

func getStream(object string) (io.Reader, error) {
// 定位object对象
server := locate.Locate(object)
if server == "" {
return nil, fmt.Errorf("object %s locate fail", object)
}
// 调用objectstream.NewGetStream并返回结果
return objectstream.NewGetStream(server, object)
}

objectstream包的GetStream相关实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type GetStream struct {
reader io.Reader
}

func newGetStream(url string) (*GetStream, error) {
// 输入的url表示用于获取数据流的HTTP服务地址
// 调用http.Get发起一个GET请求,获取该地址的HTTP响应
r, e := http.Get(url) //返回的r类型为*http.Response,其body是用于读取HTTP响应正文的io.Reader
if e != nil {
return nil, e
}
if r.StatusCode != http.StatusOK {
return nil, fmt.Errorf("dataServer return http code %d", r.StatusCode)
}
return &GetStream{r.Body}, nil
}

// 封装newGetStream函数
func NewGetStream(server, object string) (*GetStream, error) {
if server == "" || object == "" {
return nil, fmt.Errorf("invalid server %s object %s", server, object)
}
// 内部拼凑一个url传给newGetStream,对外隐藏url的细节
return newGetStream("http://" + server + "/objects/" + object)
}

// 用于读取reader成员,实现该方法,则GetStream结构体实现io.Reader接口
func (r *GetStream) Read(p []byte) (n int, err error) {
return r.reader.Read(p)
}

rabbitmq包实现:

为使用RabbitMQ,需要下载RabbitMQ提供的Go语言包"github.com/streadway/amqp"

1
go get -u "github.com/streadway/amqp"

为方便使用,实现一个rabbitmq包,该包对"github.com/streadway/amqp进行封装以简化接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package rabbitmq

import (
"encoding/json"
"github.com/streadway/amqp"
)

type RabbitMQ struct {
channel *amqp.Channel
conn *amqp.Connection
Name string
exchange string
}

// 创建一个新的rabbitmq.RabbitMQ结构体
func New(s string) *RabbitMQ {
conn, e := amqp.Dial(s)
if e != nil {
panic(e)
}

ch, e := conn.Channel()
if e != nil {
panic(e)
}

q, e := ch.QueueDeclare(
"", // name
false, // durable
true, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if e != nil {
panic(e)
}

mq := new(RabbitMQ)
mq.channel = ch
mq.conn = conn
mq.Name = q.Name
return mq
}

// 将自己的消息队列和一个exchange绑定
// 所有发往该exchange的消息都能在自己的消息队列中被接收到
func (q *RabbitMQ) Bind(exchange string) {
e := q.channel.QueueBind(
q.Name, // queue name
"", // routing key
exchange, // exchange
false,
nil)
if e != nil {
panic(e)
}
q.exchange = exchange
}

// 往某个消息队列发送消息
func (q *RabbitMQ) Send(queue string, body interface{}) {
str, e := json.Marshal(body)
if e != nil {
panic(e)
}
e = q.channel.Publish("",
queue,
false,
false,
amqp.Publishing{
ReplyTo: q.Name,
Body: []byte(str),
})
if e != nil {
panic(e)
}
}

// 往某个exchange发送消息
func (q *RabbitMQ) Publish(exchange string, body interface{}) {
str, e := json.Marshal(body)
if e != nil {
panic(e)
}
e = q.channel.Publish(exchange,
"",
false,
false,
amqp.Publishing{
ReplyTo: q.Name,
Body: []byte(str),
})
if e != nil {
panic(e)
}
}

// 生成一个接收消息的go channel,使客户程序可以通过Go的原生机制接收队列中的消息。
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
c, e := q.channel.Consume(q.Name,
"",
true,
false,
false,
false,
nil,
)
if e != nil {
panic(e)
}
return c
}

// 关闭消息队列
func (q *RabbitMQ) Close() {
q.channel.Close()
q.conn.Close()
}

功能测试

测试环境:包括6个数据服务节点和2个接口服务节点,共8个节点。为了方便测试,8个节点其实都运行在同一台服务器上,只是绑定了8个不同的地址加以区分。
6个数据服务节点地址分别是10.29.1.1:12346、10.29.1.2:12346、10.29.1.3:12346、10.29.1.4:12346、10.29.1.5:12346、10.29.1.6:12346。2个接口服务节点地址是10.29.2.1:12346,10.29.2.2:12346

在同一台服务器上绑定多个地址命令:

1
2
3
4
5
6
7
8
sudo ifconfig eno1:1 10.29.1.1/16
sudo ifconfig eno1:2 10.29.1.2/16
sudo ifconfig eno1:3 10.29.1.3/16
sudo ifconfig eno1:4 10.29.1.4/16
sudo ifconfig eno1:5 10.29.1.5/16
sudo ifconfig eno1:6 10.29.1.6/16
sudo ifconfig eno1:7 10.29.2.1/16
sudo ifconfig eno1:8 10.29.2.2/16

eno1是这台机器的网络接口,可使用ifconfig查询得到,由于Ubuntu 16.0.4的内核支持接口别名,只需要在ifconfig 命令上使用别名接口(eno1后面加上冒号和一个数字)就可以在同一个接口上绑定多个地址。
为了让节点能够建立消息队列,还需要一台 RabbitMQ服务器(地址设置为本地即可),在其上安装rabbitmq-server:

1
2
3
4
sudo apt-get install rabbitmq-server
# 下载rabbitmqadmin管理工具
sudo rabbitmq-plugins enable rabbitmq_managements
wget localhost:15672/cli /rabbitmqadmin
1
2
3
# 创建apiServers 和 dataServers这两个exchange.
$ python3 rabbitmqadmin declare exchange name=apiServers type=fanout
$ python3 rabbitmqadmin declare exchange name=dataServers type=fanout
1
2
3
4
#添加用户test,密码test。
$ sudo rabbitmqctl add_user test test
#给test用户添加访问所有exchange的权限。
$ sudo rabbitmqctl set_permissions -p / test ".*" ".*" ".*"

消息队列服务器就绪,现在需要同时启动8个服务程序,在启动前还要记得创建相应的$STORAGE_ROOT目录及其子目录objects,接下来的操作可使用shell脚本进行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#clean environment
for i in `seq 1 6`
do
rm -rf /tmp/$i/objects/*
rm -rf /tmp/$i/temp/*
done

# prepare the distributed envrionment
for i in `seq 1 6`
do
mkdir -p /tmp/$i/objects
mkdir -p /tmp/$i/temp
mkdir -p /tmp/$i/garbage
done

# rabbitmq env
# wget localhost:15672/cli/rabbitmqadmin #rabbitmq访问
python3 rabbitmqadmin declare exchange name=apiServers type=fanout
python3 rabbitmqadmin declare exchange name=dataServers type=fanout
#sudo rabbitmqctl add_user test test #首次运行需要创建用户和密码
#sudo rabbitmqctl set_permissions -p / test ".*" ".*" ".*" #修改访问权限

# start test env
export RABBITMQ_SERVER=amqp://test:test@localhost:5672
export ES_SERVER=localhost:9200

LISTEN_ADDRESS=10.29.1.1:12346 STORAGE_ROOT=/tmp/1 go run ../dataServer/dataServer.go &
LISTEN_ADDRESS=10.29.1.2:12346 STORAGE_ROOT=/tmp/2 go run ../dataServer/dataServer.go &
LISTEN_ADDRESS=10.29.1.3:12346 STORAGE_ROOT=/tmp/3 go run ../dataServer/dataServer.go &
LISTEN_ADDRESS=10.29.1.4:12346 STORAGE_ROOT=/tmp/4 go run ../dataServer/dataServer.go &
LISTEN_ADDRESS=10.29.1.5:12346 STORAGE_ROOT=/tmp/5 go run ../dataServer/dataServer.go &
LISTEN_ADDRESS=10.29.1.6:12346 STORAGE_ROOT=/tmp/6 go run ../dataServer/dataServer.go &

LISTEN_ADDRESS=10.29.2.1:12346 go run ../apiServer/apiServer.go &
LISTEN_ADDRESS=10.29.2.2:12346 go run ../apiServer/apiServer.go &

使用curl测试:

1
2
3
4
5
6
7
8
#!/bin/bash

curl -v 10.29.2.1:12346/objects/test2 -XPUT -d"this is object test2"

curl 10.29.2.2:12346/locate/test2
echo
curl 10.29.2.1:12346/objects/test2
echo

测试结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
*   Trying 10.29.2.1...
* TCP_NODELAY set
* Connected to 10.29.2.1 (10.29.2.1) port 12346 (#0)
> PUT /objects/test2 HTTP/1.1
> Host: 10.29.2.1:12346
> User-Agent: curl/7.63.0
> Accept: */*
> Content-Length: 20
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 20 out of 20 bytes
< HTTP/1.1 200 OK
< Date: Sat, 10 Apr 2021 08:11:54 GMT
< Content-Length: 0
<
* Connection #0 to host 10.29.2.1 left intact
"10.29.1.4:12346"
this is object test2

完整代码可见:可扩展分布式系统实现

参考

《分布式对象存储—原理、架构及Go语言实现》