可扩展分布式系统 分布式系统 一个分布式系统要求各节点分布在网络上,并通过消息传递来合作完成一个共同目标 。
分布式系统的三大关键特征:
节点之间并发 工作
没有全局锁
某个节点上发生的错误不影响其他节点。
分布式系统的好处在于可扩展性,只需要加入新的节点就可以自由扩展集群的性能。
实现分布式系统的关键是实现接口和具体实现的解耦。
接口服务和数据存储服务分离 如图所示,接口服务层提供了对外的REST接口 ,而数据服务层则提供数据的存储功能 。接口服务处理客户端的请求,然后向数据服务存取对象,数据服务处理来自接口服务的请求并在本地磁盘上存取对象。
接口服务和数据服务之间的两种接口:
实现对象存取 的接口:对象的存取使用REST 接口。也就是说数据服务本身也提供REST 接口。此时,接口服务节点作为HTTP客户端向数据服务请求对象 。
第二种接口通过RabbitMQ消息队列进行通信,对 RabbitMQ 的使用分为两种模式。
一种模式是向某个exchange进行一对多 的消息群发
另一种模式则是向某个消息队列进行一对一 的消息单发 。
每一个数据服务节点都需要向所有的接口服务节点通知自身的存在 ,为此,创建一个名为apiServers 的 exchange,每一台数据服务节点都会持续向这个exchange发送心跳消息。所有的接口服务节点在启动以后都会创建一个消息队列来绑定这个exchange,任何发往这个exchange 的消息都会被转发给绑定它的所有消息队列,也就是说每一个接口服务节点都会接收到任意一台数据服务节点的心跳消息。
接口服务需要在收到对象GET 请求时定位该对象被保存在哪个数据服务节点上 ,所以还需要创建一个名为dataServers的exchange。所有的数据服务节点绑定这个exchange并接收来自接口服务的定位消息。拥有该对象的数据服务节点则使用消息单发通知该接口服务节点。
之所以必须使用REST和消息队列这两种不同类型的接口是为了满足不同的需求:对象存取的特点是数据量有可能很大,不适合将一个巨大的对象通过消息队列传输。而REST接口虽然能够处理大数据量传输,但是对于群发却显得力不从心。
REST接口
对于数据服务来说,它的REST接口和单机版本完全相同,也就是对象的PUT和GET方法。
对于接口服务来说,除了对象的PUT和GET 方法之外,还应另外提供一个用于定位的 locate接口,用来帮助验证架构。 1 2 GET /locate/<object_name> 响应正文
客户端通过GET方法发起对象定位请求,接口服务节点收到该请求后通过dataServers exchange会向数据服务层群发一个定位消息,然后等待数据服务节点的反馈 。如果有数据服务节点发回确认消息,则返回该数据服务节点的地址;如果超过一定时间没有任何反馈,则返回HTTP错误代码404 NOT FOUND
.
RabbitMQ消息设计 数据服务需要通过RabbitMQ将自身的存在通知给所有的接口服务,这样的消息称为心跳消息。
apiServers和dataServers这两个exchange需要在RabbitMQ服务器上预先创建。每个接口服务节点在启动后都会创建自己的消息队列并绑定至 apiServers exchange。 每个数据服务节点在启动后每隔5s就会发送一条消息给apiServers exchange,消息的正文就是该数据服务节点的 HTTP监听地址。接口服务节点在收到该消息后就会记录这个地址。
每个数据服务节点在启动时都必须创建自己的消息队列并绑定至dataServers exchange。当接口服务需要定位时,会创建一个临时消息队列,然后发送一条消息给dataServers exchange ,消息的正文是需要定位的对象,返回地址则是该临时队列的名字。定位成功的数据服务节点需要将反馈消息发送给这个临时队列,反馈消息的正文是该数据服务节点自身的监听地址 。临时消息队列会在一定时间后关闭。如果在关闭前没有收到任何反馈则该对象定位失败,接口服务节点就会知道该对象不存在于数据服务层。
接口和存储分离的对象PUT流程
客户端向接口服务发送HTTP的PUT 请求并提供了<object_name>
和<contentof object>
,接口服务选出一个随机数据服务节点并向它转发这个PUT请求,数据服务节点将<content of object>
写入$STORAGE_ROOT/objects/<object_name>
文件。
接口和存储分离的对象GET流程
客户端的GET 请求提供了<object_name>
,接口服务在收到GET请求后会对该object进行定位,如果定位失败则返回404 Not Found
;如果定位成功,接口服务会接收到某个数据服务的地址,就可以向该地址转发来自客户端的GET请求,由数据服务读取本地磁盘上的文件并将其内容写入HTTP响应的正文。
具体实现 数据服务 为支持新的功能,数据服务的在实现上扩展了单机版的相关功能:
1 2 3 4 5 6 func main () { go heartbeat.StartHeartbeat() go locate.StartLocate() http.HandleFunc("/objects/" , objects.Handler) log.Fatal(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS" ), nil )) }
数据服务的heartbeat包:
1 2 3 4 5 6 7 8 9 10 11 12 func StartHeartbeat () { q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER" )) defer q.Close() for { q.Publish("apiServers" , os.Getenv("LISTEN_ADDRESS" )) time.Sleep(5 * time.Second) } }
数据服务的locate包:
locate包有两个函数,分别是用于实际定位对象的Locate函数和用于监听定位消息的StartLocate函数 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func Locate (name string ) bool { _, err := os.Stat(name) return !os.IsNotExist(err) } func StartLocate () { q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER" )) defer q.Close() q.Bind("dataServers" ) c := q.Consume() for msg := range c { object, e := strconv.Unquote(string (msg.Body)) if e != nil { panic (e) } if Locate(os.Getenv("STORAGE_ROOT" ) + "/objects/" + object) { q.Send(msg.ReplyTo, os.Getenv("LISTEN_ADDRESS" )) } } }
接口服务 接口服务除了提供对象的REST接口以外还需要提供locate功能 ,其main函数为:
1 2 3 4 5 6 7 8 9 func main () { go heartbeat.ListenHeartbeat() http.HandleFunc("/objects/" , objects.Handler) http.HandleFunc("/locate/" , locate.Handler) log.Fatal(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS" ), nil )) }
注意:接口服务层和数据服务层的objects包以及heartbeat、locate包虽然名字相同,但具体实现差距较大。
数据服务的objects包负责对象在本地磁盘上的存取 ;而接口服务的objects包则负责将对象请求转发给数据服 务。
数据服务的heartbeat包用于发送心跳消息 ;而接口服务的heartbeat包则用于接收数据服务节点的心跳消息 。
数据服务的locate包用于接收定位消息、定位对象以及发送反馈消息 ;而接口服务的locate包则用于发送定位消息并处理反馈消息 。
接口服务的heartbeat包:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 var dataServers = make (map [string ]time.Time)var mutex sync.Mutexfunc ListenHeartbeat () { q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER" )) defer q.Close() q.Bind("apiServers" ) c := q.Consume() go removeExpiredDataServer() for msg := range c { dataServer, e := strconv.Unquote(string (msg.Body)) if e != nil { panic (e) } mutex.Lock() dataServers[dataServer] = time.Now() mutex.Unlock() } } func removeExpiredDataServer () { for { time.Sleep(5 * time.Second) mutex.Lock() for s, t := range dataServers { if t.Add(10 * time.Second).Before(time.Now()) { delete (dataServers, s) } } mutex.Unlock() } } func GetDataServers () []string { mutex.Lock() defer mutex.Unlock() ds := make ([]string , 0 ) for s, _ := range dataServers { ds = append (ds, s) } return ds } func ChooseRandomDataServer () string { ds := GetDataServers() n := len (ds) if n == 0 { return "" } return ds[rand.Intn(n)] }
接口服务的locate包:主要用于向数据服务节点群发定位消息并接收反馈 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 func Handler (w http.ResponseWriter, r *http.Request) { m := r.Method if m != http.MethodGet { w.WriteHeader(http.StatusMethodNotAllowed) return } info := Locate(strings.Split(r.URL.EscapedPath(), "/" )[2 ]) if len (info) == 0 { w.WriteHeader(http.StatusNotFound) return } b, _ := json.Marshal(info) w.Write(b) } func Locate (name string ) string { q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER" )) q.Publish("dataServers" , name) c := q.Consume() go func () { time.Sleep(time.Second) q.Close() }() msg := <-c s, _ := strconv.Unquote(string (msg.Body)) return s } func Exist (name string ) bool { return Locate(name) != "" }
接口服务的objects包:
接口服务的objects包跟数据服务有很大区别,其put函数和get函数并不会访问本地磁盘上的对象,而是将HTTP请求转发给数据服务。put函数负责处理对象PUT请求,其相关函数如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 func put (w http.ResponseWriter, r *http.Request) { object := strings.Split(r.URL.EscapedPath(), "/" )[2 ] c, e := storeObject(r.Body, object) if e != nil { log.Println(e) } w.WriteHeader(c) } func storeObject (r io.Reader, object string ) (int , error) { stream, e := putStream(object) if e != nil { return http.StatusServiceUnavailable, e } io.Copy(stream, r) e = stream.Close() if e != nil { return http.StatusInternalServerError, e } return http.StatusOK, nil } func putStream (object string ) (*objectstream.PutStream, error) { server := heartbeat.ChooseRandomDataServer() fmt.Println("data server =" , server) if server == "" { return nil , fmt.Errorf("cannot find any dataServer" ) } return objectstream.NewPutStream(server, object), nil }
objectstream包:对Go中的http包的一个封装,用于将一些http函数的调用转换成写流的形式,方便处理。具体实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 type PutStream struct { writer *io.PipeWriter c chan error } func NewPutStream (server, object string ) *PutStream { reader, writer := io.Pipe() c := make (chan error) go func () { request, _ := http.NewRequest("PUT" , "http://" +server+"/objects/" +object, reader) client := http.Client{} r, e := client.Do(request) if e == nil && r.StatusCode != http.StatusOK { e = fmt.Errorf("dataServer return http code %d" , r.StatusCode) } c <- e }() return &PutStream{writer, c} } func (w *PutStream) Write (p []byte ) (n int , err error) { return w.writer.Write(p) } func (w *PutStream) Close () error { w.writer.Close() return <-w.c }
接口服务层object包用于处理GET请求具体实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 func get (w http.ResponseWriter, r *http.Request) { object := strings.Split(r.URL.EscapedPath(), "/" )[2 ] stream, e := getStream(object) if e != nil { log.Println(e) w.WriteHeader(http.StatusNotFound) return } io.Copy(w, stream) } func getStream (object string ) (io.Reader, error) { server := locate.Locate(object) if server == "" { return nil , fmt.Errorf("object %s locate fail" , object) } return objectstream.NewGetStream(server, object) }
objectstream包的GetStream相关实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 type GetStream struct { reader io.Reader } func newGetStream (url string ) (*GetStream, error) { r, e := http.Get(url) if e != nil { return nil , e } if r.StatusCode != http.StatusOK { return nil , fmt.Errorf("dataServer return http code %d" , r.StatusCode) } return &GetStream{r.Body}, nil } func NewGetStream (server, object string ) (*GetStream, error) { if server == "" || object == "" { return nil , fmt.Errorf("invalid server %s object %s" , server, object) } return newGetStream("http://" + server + "/objects/" + object) } func (r *GetStream) Read (p []byte ) (n int , err error) { return r.reader.Read(p) }
rabbitmq包实现:
为使用RabbitMQ,需要下载RabbitMQ提供的Go语言包"github.com/streadway/amqp"
。
1 go get -u "github.com/streadway/amqp"
为方便使用,实现一个rabbitmq包,该包对"github.com/streadway/amqp
进行封装以简化接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 package rabbitmqimport ( "encoding/json" "github.com/streadway/amqp" ) type RabbitMQ struct { channel *amqp.Channel conn *amqp.Connection Name string exchange string } func New (s string ) *RabbitMQ { conn, e := amqp.Dial(s) if e != nil { panic (e) } ch, e := conn.Channel() if e != nil { panic (e) } q, e := ch.QueueDeclare( "" , false , true , false , false , nil , ) if e != nil { panic (e) } mq := new (RabbitMQ) mq.channel = ch mq.conn = conn mq.Name = q.Name return mq } func (q *RabbitMQ) Bind (exchange string ) { e := q.channel.QueueBind( q.Name, "" , exchange, false , nil ) if e != nil { panic (e) } q.exchange = exchange } func (q *RabbitMQ) Send (queue string , body interface {}) { str, e := json.Marshal(body) if e != nil { panic (e) } e = q.channel.Publish("" , queue, false , false , amqp.Publishing{ ReplyTo: q.Name, Body: []byte (str), }) if e != nil { panic (e) } } func (q *RabbitMQ) Publish (exchange string , body interface {}) { str, e := json.Marshal(body) if e != nil { panic (e) } e = q.channel.Publish(exchange, "" , false , false , amqp.Publishing{ ReplyTo: q.Name, Body: []byte (str), }) if e != nil { panic (e) } } func (q *RabbitMQ) Consume () <-chan amqp .Delivery { c, e := q.channel.Consume(q.Name, "" , true , false , false , false , nil , ) if e != nil { panic (e) } return c } func (q *RabbitMQ) Close () { q.channel.Close() q.conn.Close() }
功能测试 测试环境:包括6个数据服务节点和2个接口服务节点,共8个节点。为了方便测试,8个节点其实都运行在同一台服务器上,只是绑定了8个不同的地址加以区分。 6个数据服务节点地址分别是10.29.1.1:12346、10.29.1.2:12346、10.29.1.3:12346、10.29.1.4:12346、10.29.1.5:12346、10.29.1.6:12346
。2个接口服务节点地址是10.29.2.1:12346,10.29.2.2:12346
在同一台服务器上绑定多个地址命令:
1 2 3 4 5 6 7 8 sudo ifconfig eno1:1 10.29.1.1/16 sudo ifconfig eno1:2 10.29.1.2/16 sudo ifconfig eno1:3 10.29.1.3/16 sudo ifconfig eno1:4 10.29.1.4/16 sudo ifconfig eno1:5 10.29.1.5/16 sudo ifconfig eno1:6 10.29.1.6/16 sudo ifconfig eno1:7 10.29.2.1/16 sudo ifconfig eno1:8 10.29.2.2/16
eno1是这台机器的网络接口,可使用ifconfig
查询得到,由于Ubuntu 16.0.4的内核支持接口别名,只需要在ifconfig 命令上使用别名接口(eno1后面加上冒号和一个数字)就可以在同一个接口上绑定多个地址。 为了让节点能够建立消息队列,还需要一台 RabbitMQ服务器(地址设置为本地即可),在其上安装rabbitmq-server:
1 2 3 4 sudo apt-get install rabbitmq-server sudo rabbitmq-plugins enable rabbitmq_managements wget localhost:15672/cli /rabbitmqadmin
1 2 3 $ python3 rabbitmqadmin declare exchange name=apiServers type =fanout $ python3 rabbitmqadmin declare exchange name=dataServers type =fanout
1 2 3 4 $ sudo rabbitmqctl add_user test test $ sudo rabbitmqctl set_permissions -p / test ".*" ".*" ".*"
消息队列服务器就绪,现在需要同时启动8个服务程序,在启动前还要记得创建相应的$STORAGE_ROOT
目录及其子目录objects
,接下来的操作可使用shell脚本进行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 for i in `seq 1 6`do rm -rf /tmp/$i /objects/* rm -rf /tmp/$i /temp/* done for i in `seq 1 6`do mkdir -p /tmp/$i /objects mkdir -p /tmp/$i /temp mkdir -p /tmp/$i /garbage done python3 rabbitmqadmin declare exchange name=apiServers type =fanout python3 rabbitmqadmin declare exchange name=dataServers type =fanout export RABBITMQ_SERVER=amqp://test :test @localhost:5672export ES_SERVER=localhost:9200LISTEN_ADDRESS=10.29.1.1:12346 STORAGE_ROOT=/tmp/1 go run ../dataServer/dataServer.go & LISTEN_ADDRESS=10.29.1.2:12346 STORAGE_ROOT=/tmp/2 go run ../dataServer/dataServer.go & LISTEN_ADDRESS=10.29.1.3:12346 STORAGE_ROOT=/tmp/3 go run ../dataServer/dataServer.go & LISTEN_ADDRESS=10.29.1.4:12346 STORAGE_ROOT=/tmp/4 go run ../dataServer/dataServer.go & LISTEN_ADDRESS=10.29.1.5:12346 STORAGE_ROOT=/tmp/5 go run ../dataServer/dataServer.go & LISTEN_ADDRESS=10.29.1.6:12346 STORAGE_ROOT=/tmp/6 go run ../dataServer/dataServer.go & LISTEN_ADDRESS=10.29.2.1:12346 go run ../apiServer/apiServer.go & LISTEN_ADDRESS=10.29.2.2:12346 go run ../apiServer/apiServer.go &
使用curl测试:
1 2 3 4 5 6 7 8 #!/bin/bash curl -v 10.29 .2 .1 :12346 /objects/test2 -XPUT -d"this is object test2" curl 10.29 .2 .2 :12346 /locate/test2 echo curl 10.29 .2 .1 :12346 /objects/test2 echo
测试结果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 * Trying 10.29.2.1... * TCP_NODELAY set * Connected to 10.29.2.1 (10.29.2.1) port 12346 ( > PUT /objects/test2 HTTP/1.1 > Host: 10.29.2.1:12346 > User-Agent: curl/7.63.0 > Accept: */* > Content-Length: 20 > Content-Type: application/x-www-form-urlencoded > * upload completely sent off: 20 out of 20 bytes < HTTP/1.1 200 OK < Date: Sat, 10 Apr 2021 08:11:54 GMT < Content-Length: 0 < * Connection "10.29.1.4:12346" this is object test2
完整代码可见:可扩展分布式系统实现
参考 《分布式对象存储—原理、架构及Go语言实现》