TCP连接耗时

开发过程中调用mysql, redis等组件获取数据或者执行rpc远程调用以及调用restful api的时候底层使用的是TCP协议。这是因为在传输层协议中,TCP协议具备可靠的连接,错误重传,拥塞控制等优点。

正常的TCP连接建立过程

在软中断中,当一个包被内核从RingBuffer中摘下来的时候,在内核中是用struct sk_buff结构体来表示的(参见内核代码include/linux/skbuff.h)。其中的data成员是接收到的数据,在协议栈逐层被处理的时候,通过修改指针指向data的不同位置,来找到每一层协议关心的数据。

网络包读取过程

对于TCP协议包来说,它的Header中有一个重要的字段:flags,也就是标志位

通过设置不同的标记为,可以将TCP包分成SYNC、FIN、ACK、RST等类型。客户端通过connect系统调用命令内核发出SYNC、ACK等包来实现和服务器TCP连接的建立。在服务器端,可能会接收许许多多的连接请求,内核还需要借助一些辅助数据结构-半连接队列和全连接队列。看一下整个连接过程:

TCP三次握手具体过程

简单分析每一步的耗时:

  • 客户端发出SYNC包:客户端一般是通过connect系统调用来发出SYN的,这里牵涉到本机的系统调用和软中断的CPU耗时开销
  • SYN传到服务器:SYN从客户端网卡被发出,开始长途远距离的网络传输
  • 服务器处理SYN包:内核通过软中断来收包,然后放到半连接队列中,然后再发出SYN/ACK响应。又是CPU耗时开销
  • SYC/ACK传到客户端:SYN/ACK从服务器端被发出后,同样进行一次长途网络跋涉
  • 客户端处理SYN/ACK:客户端内核收包并处理SYN后,经过几us的CPU处理,接着发出ACK。同样是软中断处理开销
  • ACK传到服务器:和SYN包,一样,再经过几乎同样远的路,传输一遍。 又一次长途网络跋涉
  • 服务端收到ACK:服务器端内核收到并处理ACK,然后把对应的连接从半连接队列中取出来,然后放到全连接队列中。一次软中断CPU开销
  • 服务器端用户进程唤醒:正在被accpet系统调用阻塞的用户进程被唤醒,然后从全连接队列中取出来已经建立好的连接。一次上下文切换的CPU开销

以上操作主要可以分为两类:

  • 第一类是内核消耗CPU进行接收、发送或者是处理,包括系统调用、软中断和上下文切换。它们的耗时基本都是几个us左右
  • 第二类是网络传输,当包被从一台机器上发出以后,中间要经过各式各样的网线、各种交换机路由器。所以网络传输的耗时相比本机的CPU处理,就要高的多了。根据网络远近一般在几ms~到几百ms不等。

1ms等于1000us,因此网络传输耗时比双端的CPU开销要高1000倍左右,甚至更高可能还到100000倍。所以,在正常的TCP连接的建立过程中,一般可以考虑网络延时即可。一个RTT指的是包从一台服务器到另外一台服务器的一个来回的延迟时间。所以从全局来看,TCP连接建立的网络耗时大约需要三次传输,再加上少许的双方CPU开销,总共大约比1.5倍RTT大一点点。不过从客户端视角来看,只要ACK包发出了,内核就认为连接是建立成功了。所以如果在客户端打点统计TCP连接建立耗时的话,只需要两次传输耗时-既1个RTT多一点的时间。(对于服务器端视角来看同理,从SYN包收到开始算,到收到ACK,中间也是一次RTT耗时)

TCP连接建立时的异常情况

在正常情况下一次TCP连接总的耗时也就就大约是一次网络RTT的耗时。但在某些情况下,可能会导致连接时的网络传输耗时上涨、CPU处理开销增加、甚至是连接失败。

客户端connect系统调用耗时失控

正常一个系统调用的耗时也就是几个us(微秒)左右。但当TCP客户端TIME_WAIT有30000左右,导致可用端口不是特别充足的时候,connect系统调用的CPU开销直接上涨了100多倍,会达到毫秒级别。

查看本机的端口内核参数配置

1
2
$ sudo sysctl -a | grep ip_local_port_range
net.ipv4.ip_local_port_range = 32768 60999

可以看到内核分配的端口其实就3万个左右,当端口快占满的时候(TIME_WAIT过多),端口的选择会十分耗时,且该段时间内cpu一直处于找空闲端口阶段,一直在占用CPU。

主要原因:临时端口选择过程是生成一个随机数,利用随机数在ip_local_port_range范围内取值,如果取到的值在ip_local_reserved_ports范围内 ,那就再依次取下一个值,直到不在ip_local_reserved_ports范围内为止。原来临时端口竟然是随机撞出来的。也就是说假如就有range里配置了5W个端口可以用,已经使用掉了49999个。那么新建立连接的时候,可能需要调用这个随机函数5W次才能撞到这个没用的端口身上。

解决办法
  • 保证可用临时端口的充裕,避免你的connect系统调用进入反复查找模式。正常端口充足的时候,只需要微秒几倍。但是一旦出现端口紧张,则一次系统调用耗时会上升到毫秒几倍,整整多出100倍。这个开销比正常tcp连接的建立吃掉的cpu时间(每个30usec左右)的开销要大的多。

    修改方法:

    1
    2
    # vim /etc/sysctl.conf
    net.ipv4.ip_local_port_range = 10000 65000
  • 可以考虑设置net.ipv4.tcp_tw_recycle和net.ipv4.tcp_tw_reuse这两个参数,避免端口长时间保守地等待2MSL时间。

  • 参考https://blog.csdn.net/enweitech/article/details/79261439
半/全连接队列全满

如果连接建立的过程中,任意一个队列满了,那么客户端发送过来的syn或者ack就会被丢弃。客户端等待很长一段时间无果后,然后会发出TCP Retransmission重传。

TCP握手超时重传的时间是秒级别的。也就是说一旦server端的连接队列导致连接建立不成功,那么光建立连接就至少需要秒级以上。而正常的在同机房的情况下只是不到1毫秒的事情,整整高了1000倍左右。尤其是对于给用户提供实时服务的程序来说,用户体验将会受到较大影响。如果连重传也没有握手成功的话,很可能等不及二次重试,这个用户访问直接就超时了。

更坏的情况是,它还有可能会影响其它的用户。假如使用的是进程/线程池这种模型提供服务,比如php-fpm。fpm进程是阻塞的,当它响应一个用户请求的时候,该进程是没有办法再响应其它请求的。假如开了100个进程/线程,而某一段时间内有50个进程/线程卡在和redis或者mysql服务器的握手连接上了(注意:这个时候服务器是TCP连接的客户端一方)。这一段时间内相当于可以用的正常工作的进程/线程只有50个了。而这个50个worker可能根本处理不过来,这时候服务可能就会产生拥堵。再持续稍微时间长一点的话,可能就产生雪崩了,整个服务都有可能会受影响。

解决办法

查看服务是否有因为半/全连接队列满的情况发生。在客户端,可以抓包查看是否有SYN的TCP Retransmission。如果有偶发的TCP Retransmission,那就说明对应的服务端连接队列可能有问题了。

在服务端的话,查看起来就更方便一些。netstat -s可查看到当前系统半连接队列满导致的丢包统计,但该数字记录的是总丢包数。需要再借助watch命令动态监控。如果下面的数字在监控的过程中变了,那说明当前服务器有因为半连接队列满而产生的丢包。可能需要加大半连接队列的长度。

1
$ watch 'netstat -s | grep LISTEN'

对于全连接队列来说,查看方法也类似:

1
$ watch 'netstat -s  | grep overflowed'

如果服务因为队列满产生丢包,其中一个做法就是加大半/全连接队列的长度。 半连接队列长度Linux内核中,主要受tcp_max_syn_backlog影响 加大它到一个合适的值就可以。

1
2
3
# cat /proc/sys/net/ipv4/tcp_max_syn_backlog
512
# echo "2048" > /proc/sys/net/ipv4/tcp_max_syn_backlog

全连接队列长度是应用程序调用listen时传入的backlog以及内核参数net.core.somaxconn二者之中较小的那个。可能需要同时调整应用程序和该内核参数

1
2
3
# cat /proc/sys/net/core/somaxconn
128
# echo "256" > /proc/sys/net/core/somaxconn

改完之后可以通过ss命令输出的Send-Q确认最终生效长度:

1
2
3
$ ss -nlt
Recv-Q Send-Q Local Address:Port Address:Port
0 128 *:80 *:*

Recv-Q告知当前该进程的全连接队列使用长度情况如果Recv-Q已经逼近了Send-Q,那么可能不需要等到丢包也应该准备加大全连接队列。

如果加大队列后仍然有非常偶发的队列溢出的话,可以暂且容忍。如果仍然有较长时间处理不过来怎么办?另外一个做法就是直接报错,不要让客户端超时等待。例如将Redis、Mysql等后端接口的内核参数tcp_abort_on_overflow为1。如果队列满了,直接发reset给client。告诉后端进程/线程不要痴情地傻等。这时候client会收到错误“connection reset by peer”。牺牲一个用户的访问请求,要比把整个站都搞崩了还是要强的。

实际测试

参考张彦飞的博客.可以得出的结论主要有:

TCP连接建立异常情况下,可能需要好几秒,一个坏处就是会影响用户体验,甚至导致当前用户访问超时都有可能。另外一个坏处是可能会诱发雪崩。所以当服务器使用短连接的方式访问数据的时候,一定要学会要监控服务器的连接建立是否有异常状态发生。如果有,学会优化掉它。当然可以采用本机内存缓存,或者使用连接池来保持长连接,通过这两种方式直接避免掉TCP握手挥手的各种开销也可以。

当连接队列溢出时需要特别注意:一旦发生队列满,当时撞上的那个连接请求就得需要长时间的连接建立延时。

正常情况下,TCP建立的延时大约就是两台机器之间的一个RTT耗时,这是避免不了的。但是可以控制两台机器之间的物理距离来降低这个RTT,比如把要访问的redis尽可能地部署的离后端接口机器近一点,这样RTT也能从几十ms削减到最低可能零点几ms。

线上部署时,理想的方案是将自己服务依赖的各种mysql、redis等服务和自己部署在同一个地区、同一个机房(再变态一点,甚至可以是甚至是同一个机架)。因为这样包括TCP链接建立啥的各种网络包传输都要快很多。要尽可能避免长途跨地区机房的调用情况出现。

参考

https://mp.weixin.qq.com/s?__biz=MjM5Njg5NDgwNA==&mid=2247484126&idx=1&sn=4c35ea42477ffd5db5f05fe8bc850cdb&chksm=a6e303e591948af3cc31b1db349958c8a879b9eb7100ebd138dc361aaf04ba5878f602a16365&cur_album_id=1532487451997454337&scene=189#rd

https://mp.weixin.qq.com/s/tH8RFmjrveOmgLvk9hmrkw

https://blog.csdn.net/enweitech/article/details/79261439

RabbitMQ简介

Message Broker(消息代理)

RabbitMQ 官网对 Message broker 的定义:Message broker 接收来自发布者的消息并将其路由到消费者。

RabbitMQ 实现了一个加 AMQP(Advanced Message Queuing Protocol)的协议,AMQP 就如互联网的 HTTP 协议,它更注重于如何传输数据,并不关心发送的数据是什么。这也就意味着需要消息的发送者和接收者来协调消息的格式。

Message Broker的作用

有这样一个应用,客户端需要与服务器进行通信,传递数据。最简单的情况就是客户端通过 HTTP 类协议直接与服务器连接,并发送数据。看似非常简单,但如果服务器因为维护或其它原因发生了停机,或者想对其横向扩展,添加更多的服务器来进行响应。那么问题就来了,客户端目前是与这台服务器紧密的耦合在一起了,而随着系统的增长和进化,这种紧耦合就开始让人头疼。

此时,如果在客户端和服务器之间放置一个Message broker则不一样:

在客户端和服务器之间添加Message broker

客户端首先将数据发送给 Message broker。Message broker 会对数据进行检验,并将其发送给服务器。但是在这种简单的情景下,似乎没带来什么好处。然而,Message broker 可以通过简单的设置来允许多种场景,从而让后端服务器的横向扩展或停机维护等工作变得轻松,并且客户端并不会感知到任何的变化。

Exchange和Queue

在最简单的场景下,RabbitMQ 的架构示意图大致如下:

Rabbitmq架构

  • 首先某个消息从发布者那里发往 RabbitMQ
  • 这个消息需要声明一个 Exchange(交换机),并被发往这个 Exchange
    • Exchange 有点类似“暂存区”,消息都会发往 Exchange。用个类比来说:Exchange 就像邮箱一样,写的信件首先都要放到邮箱里才能进行发送。
  • 然后,Exchange 将使用消息内的一些信息以及它自己的配置来决定一条或多条发送消息的路由。
  • 这些路由都通向一个 Queue(队列),消息会存储在这个 Queue 里,等待消息的接收者来进行使用。
  • 一个消息的接收者可以使用 Queue 中的信息。一旦确认这个消息被传递成功,那么它将从 Queue 中被删除。
    • RabbitMQ 所提供的松耦合的特性,主要是因为 Exchange 和 Queue 的分离
    • 继续使用邮箱的类比,Queue 就相当于是接收信件的邮箱。而根据邮件地址,邮件系统会选择不同的邮箱来接收邮件。

RabbitMQ的4种Exchange:

  • Direct Exchange。是默认的 Exchange ,会把消息发送到一个接收者。如果注册了多个接收者来监听同样的路由 Key,那么 RabbitMQ 将会向每个 Queue 轮流发送一条消息,相当于提供了一个简单的负载均衡

  • Fanout Exchange。 把消息的副本发送到每个绑定到该 Exchange 的 Queue 上面。而这里的 Queue 没有办法对消息进行过滤,如果需要过滤,则需要在消息接收者那里实现。

  • Topic Exchange。和 Direct Exchange 类似,但不同的是:每个消息接收者监听特定的路由 Key,它们会收到消息的副本。

    • 例如聊天室就可以使用 Topic Exchange。每个聊天室的 ID 可以作为路由 Key,这样就可以保证消息只会发送给同一个聊天室的其他参与者。
  • Headers Exchange。这类 Exchange 会忽略路由 Key,取而代之的是,它们会查看消息的 Header,并由此来决定消息应该发往哪个 Queue。Queue 可以有一个或多个 Header 用来进行匹配。这也就开启了复杂的路由场景,例如某个 Queue 有时可以接收到某类消息而有时则不行。

Fanout Exchange

当消息被发往 RabbitMQ 的时候,需要指明它需要发送到哪个 Exchange。而这个 Exchange 就可以被设置成为所谓的 Fanout Exchange。使用 Fanout Exchange,消息会被克隆,并被发送到所有与这个 Exchange 绑定的 Queue 上.

fanout exchange

这里每一个 Queue 都会得到属于自己的消息的副本,这些消息副本就可以被消息的接收者所使用。

  • 在很多大规模多人游戏的场景中,经常使用这种方式来同步玩家的数据:每个玩家都订阅到一个 Fanout Exchange,游戏的实例只需要将数据发送到一个地方即可,游戏中其他的玩家就会获得更新,而游戏实例就不需要知道如何数据发往每一个玩家了。

基本管理命令

基本安装和配置参考RabbitMQ官网。

  • 获取RabbitMQ Server状态: rabbitmqctl status 得到大量关于RabbitMQ运行的消息
  • 获取所有Queue队列: rabbitmqctl list_queues
  • 获取当前集群的server信息: rabbitmqctl cluster_status
  • 使用service rabbitmq-server start/stop/restart/status也可以进行服务管理

参考

https://www.bilibili.com/read/cv10512424?from=search

linux内核接收网络包的过程

linux内核对网络包的接收过程大致分为:硬中断处理、接收数据到RingBuffer、ksoftirqd软中断处理几个过程。在软中断处理中,将数据包从RingBuffer中获取送到协议栈进行处理,之后再送到用户进程socket的接收队列中。

几个工具

监控网卡的工具:

ethtool

该工具用来查看和设置网卡参数。这个工具其实本身只是提供几个通用接口,真正的实现是都是在网卡驱动中。几个选项:

  • -i 显示网卡驱动的信息,如驱动的名称、版本等
  • -S 查看网卡收发包的统计情况
  • -g/-G 查看或者修改RingBuffer的大小
  • -l/-L 查看或者修改网卡队列
  • -c/-C 查看或者修改硬中断合并策略
1
2
3
4
5
6
# 查看网卡驱动:
$ethtool -i eno1

driver: e1000e
version: 3.2.6-k
firmware-version: 0.13-4

ifconfig

网络管理工具ifconfig不只是可以为网卡配置ip,启动或者禁用网卡,也包含了一些网卡的统计信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
eno1      Link encap:以太网  硬件地址 98:90:96:a5:4c:a7  
inet 地址:172.27.229.18 广播:172.27.255.255 掩码:255.255.0.0
inet6 地址: fe80::aea1:731a:3687:7f4a/64 Scope:Link
UP BROADCAST RUNNING MULTICAST MTU:1500 跃点数:1
接收数据包:10938370 错误:0 丢弃:0 过载:0 帧数:0
发送数据包:1479049 错误:0 丢弃:0 过载:0 载波:0
碰撞:0 发送队列长度:1000
接收字节:2793457160 (2.7 GB) 发送字节:187860463 (187.8 MB)
中断:20 Memory:f7d00000-f7d20000

中英文对应:
RX packets:接收的总包数
RX bytes:接收的字节数
RX errors:表示总的收包的错误数量
RX dropped:数据包已经进入了 Ring Buffer,但是由于其它原因导致的丢包
RX overruns:表示了 fifo 的 overruns,这是由于 Ring Buffer不足导致的丢包

伪文件系统/proc

Linux内核提供了/proc伪文件系统,通过/proc可以查看内核内部数据结构、改变内核设置

主要内容:

  • /proc/sys目录可以查看或修改内核参数
  • /proc/cpuinfo可以查看CPU信息
  • /proc/meminfo可以查看内存信息
  • /proc/interrupts统计所有的硬中断
  • /proc/softirqs统计的所有的软中断信息
  • /proc/slabinfo统计了内核数据结构的slab内存使用情况
  • /proc/net/dev可以看到一些网卡统计数据

关注伪文件 /proc/net/dev查看内核中对网卡的相关统计,包含的主要信息:

  • bytes: 发送或接收的数据的总字节
  • packets: 接口发送或接收的数据总数
  • errs: 由设备驱动程序检测到的发送或接收错误的总数
  • drop: 设备驱动程序丢弃的数据包总数
  • fifo: FIFO缓冲区错误的数量
  • frame: The number of packet framing errors.(分组帧错误的数量)
  • colls: 接口上检测到的冲突数

伪文件系统sysfs

sysfs/proc类似,也是一个伪文件系统,但是比proc更新,结构更清晰。其中的/sys/class/net/eno1/statistics/也包含了网卡的统计信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
# cd /sys/class/net/eno1/statistics/ 
# grep . * | grep tx

tx_aborted_errors:0
tx_bytes:193280753
tx_carrier_errors:0
tx_compressed:0
tx_dropped:0
tx_errors:0
tx_fifo_errors:0
tx_heartbeat_errors:0
tx_packets:1513872
tx_window_errors:0

RingBuffer监控和调优

当网线中的数据帧到达网卡后,第一站就是RingBuffer(网卡通过DMA机制将数据帧送到RingBuffer中)。

使用ethtool查看RingBuffer:

1
2
3
4
5
6
7
8
9
10
11
12
13
# ethtool -g eno1

Ring parameters for eno1:
Pre-set maximums:
RX: 4096
RX Mini: 0
RX Jumbo: 0
TX: 4096
Current hardware settings:
RX: 256
RX Mini: 0
RX Jumbo: 0
TX: 256

本机器的网卡设置RingBuffer最大允许到4096,实际设置为256。

注意:ethtool查看到的是实际是Rx bd的大小。Rx bd位于网卡中,相当于一个指针。RingBuffer在内存中,Rx bd指向RingBufferRx bdRingBuffer中的元素是一一对应的关系。在网卡启动的时候,内核会为网卡的Rx bd在内存中分配RingBuffer,并设置好对应关系。

在Linux的整个网络栈中,RingBuffer起到一个任务的收发中转站的角色。对于接收过程来讲,网卡负责往RingBuffer中写入收到的数据帧,ksoftirqd内核线程负责从中取走处理。只要ksoftirqd线程工作的足够快,RingBuffer这个中转站就不会出现问题。但是设想一下,假如某一时刻,瞬间来了特别多的包,而ksoftirqd处理不过来了,会发生什么?这时RingBuffer可能瞬间就被填满了,后面再来的包网卡直接就会丢弃,不做任何处理!

查看机器上是否有因为RingBuffer设置导致的丢包:

1
2
3
4
# ethtool -S eno1

rx_fifo_errors: 0 #有的机器上不一定有
tx_fifo_errors: 0

rx_fifo_errors如果不为0的话(在 ifconfig中体现为overruns 指标增长),就表示有包因为RingBuffer装不下而被丢弃了。那么怎么解决这个问题呢?很自然首先想到的是,加大RingBuffer这个“中转仓库”的大小。通过ethtool就可以修改:

1
# ethtool -G eth1 rx 512 tx 512

这样网卡会被分配更大一点的”中转站“,可以解决偶发的瞬时的丢包。不过这种方法有个小副作用,那就是排队的包过多会增加处理网络包的延时。所以另外一种解决思路更好,那就是让内核处理网络包的速度更快一些,而不是让网络包傻傻地在RingBuffer中排队。怎么加快内核消费RingBuffer中任务的速度?

硬中断监控与调优

监控部分

硬中断可以通过内核提供的伪文件/proc/interrupts来查看:

1
2
3
4
$ cat /proc/interrupts

CPU0 CPU1 CPU2 CPU3 CPU4 CPU5 CPU6 CPU7
31: 0 0 0 0 0 536 0 4507261 IR-PCI-MSI 409600-edge eno1

分析:

  • 网卡的输入队列eno1的中断号是31
  • 31号中断由CPU5和CPU7来处理

注意:

  • 为什么输入队列的中断在CPU5和CPU7上?

这是因为内核的一个配置,在伪文件系统中可以查看到:

1
2
#cat /proc/irq/31/smp_affinity
80

smp_affinity里是CPU的亲和性的绑定,80是二进制的01010000,第5位和第7位都为1,代表的就是第5和第7个CPU核心CPU5和CPU7。

  • 对于收包来过程来讲,硬中断的总次数表示的是Linux收包总数吗?

不是,硬件中断次数不代表总的网络包数。第一网卡可以设置中断合并,多个网络帧可以只发起一次中断。第二NAPI 运行的时候会关闭硬中断,通过poll来收包。

多队列网络调优

现在的主流网卡基本上都是支持多队列的,可以通过将不同的队列分给不同的CPU核心来处理,从而加快Linux内核处理网络包的速度。这是最为有用的一个优化手段

每一个队列都有一个中断号,可以独立向某个CPU核心发起硬中断请求,让CPU来poll包。通过将接收进来的包被放到不同的内存队列里,多个CPU就可以同时分别向不同的队列发起消费了。这个特性叫做RSS(Receive Side Scaling,接收端扩展)。通过ethtool工具可以查看网卡的队列情况。

1
2
3
4
5
6
7
8
9
10
11
12
# ethtool -l eth0 # 本人网卡不支持该操作,参考开发内功修炼
Channel parameters for eth0:
Pre-set maximums:
RX: 0
TX: 0
Other: 1
Combined: 63
Current hardware settings:
RX: 0
TX: 0
Other: 1
Combined: 8

上述结果表示当前网卡支持的最大队列数是63,当前开启的队列数是8。对于这个配置来讲,最多同时可以有8个核心来参与网络收包。如果想提高内核收包的能力,直接简单加大队列数就可以了,这比加大RingBuffer更为有用。因为加大RingBuffer只是给个更大的空间让网络帧能继续排队,而加大队列数则能让包更早地被内核处理ethtool修改队列数量方法如下:

1
#ethtool -L eth0 combined 32

硬中断发生在哪一个核上,它发出的软中断就由哪个核来处理。所有通过加大网卡队列数,这样硬中断工作、软中断工作都会有更多的核心参与进来。

每一个队列都有一个中断号,每一个中断号都是绑定在特定的CPU上。如果不满意某一个中断的CPU绑定,可以通过修改/proc/irq/{中断号}/smp_affinity来实现。

硬中断合并

一个实际中的例子,假如你是一位开发同学,和你对口的产品经理一天有10个小需求需要让你帮忙来处理。她对你有两种中断方式:

  • 第一种:产品经理想到一个需求,就过来找你,和你描述需求细节,然后让你帮你来改
  • 第二种:产品经理想到需求后,不来打扰你,等攒够5个来找你一次,你集中处理

现在不考虑及时性,只考虑工作整体效率,哪种方案下你的工作效率会高呢?或者换句话说,你更喜欢哪一种工作状态呢?很明显,只要你是一个正常的开发,都会觉得第二种方案更好。对人脑来讲,频繁的中断会打乱你的计划,你脑子里刚才刚想到一半技术方案可能也就废了。当产品经理走了以后,你再想捡起来刚被中断之的工作的时候,很可能得花点时间回忆一会儿才能继续工作。

对于CPU来讲也是一样,CPU要做一件新的事情之前,要加载该进程的地址空间,load进程代码,读取进程数据,各级别cache要慢慢热身。因此如果能适当降低中断的频率,多攒几个包一起发出中断,对提升CPU的工作效率是有帮助的。所以,网卡允许我们对硬中断进行合并。

现在看一下网卡的硬中断合并配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
# ethtool -c eno1

Adaptive RX: off TX: off
...
rx-usecs: 3
rx-frames: 0
rx-usecs-irq: 0
rx-frames-irq: 0
rx-usecs-low: 0
rx-frame-low: 0
rx-usecs-high: 0
rx-frame-high: 0
...

参数的含义:

  • Adaptive RX: 自适应中断合并,网卡驱动自己判断啥时候该合并啥时候不合并
  • rx-usecs:当过这么长时间过后,一个RX interrupt就会被产生
  • rx-frames:当累计接收到这么多个帧后,一个RX interrupt就会被产生

修改相关参数:接使用ethtool -C就可以

1
# ethtool -C eno1 adaptive-rx on

需要注意的是,减少中断数量虽然能使得Linux整体吞吐更高,不过一些包的延迟也会增大,所以用的时候得适当注意。

软中断监控和调优

软中断和它对应的硬中断是在同一个核心上处理的。因此,前面硬中断分散到多核上处理的时候,软中断的优化其实也就跟着做了,也会被多核处理。不过软中断也还有自己的可优化选项。

监控

软中断的信息可以从/proc/softirqs读取:

1
2
3
4
5
6
7
8
9
10
11
12
13
$ cat /proc/softirqs

CPU0 CPU1 CPU2 CPU3 CPU4 CPU5 CPU6 CPU7
HI: 0 0 1228152 865 360 7 0 0
TIMER: 27307279 27853788 27665421 31261338 27720654 27228264 27604399 29304702
NET_TX: 10611 21 33099 1480 103 39090 43 56
NET_RX: 936563 934954 932782 903383 896445 940357 1111527 11896810
BLOCK: 293 1850485 65944 395 365 332 354 333
IRQ_POLL: 0 0 0 0 0 0 0 0
TASKLET: 18 4 104755 75 25 84 748 8691
SCHED: 15801149 13822836 13336518 16738535 13374504 13423444 13224695 14819090
HRTIMER: 0 0 0 0 0 0 0 0
RCU: 23189803 22923131 23217378 25381785 22908760 22872174 23143036 24117847
软中断budget调整

番茄工作法:大致意思就是要有一整段的不被打扰的时间,集中精力处理某一项作业。这一整段时间时长被建议是25分钟。对于Linux的处理软中断的ksoftirqd来说,它也和番茄工作法思路类似。一旦它被硬中断触发开始了工作,它会集中精力处理一波网络包(绝不只是1个),然后再去做别的事情。

处理一波是多少呢,策略略复杂。只说其中一个比较容易理解的,那就是net.core.netdev_budget内核参数:

1
2
3
# sysctl -a | grep core.netdev_budget 

net.core.netdev_budget = 300

这里的意思说的是,ksoftirqd一次最多处理300个包,处理够了就会把CPU主动让出来,以便Linux上其它的任务可以得到处理。那么假如说,现在想提高内核处理网络包的效率。那就可以让ksoftirqd进程多干一会儿网络包的接收,再让出CPU。至于怎么提高,直接修改不这个参数的值就行。

1
# sysctl -w net.core.netdev_budget=600

如果要保证重启仍然生效,需要将这个配置写到/etc/sysctl.conf

软中断GRO合并

GRO和硬中断合并的思想很类似,不过阶段不同。硬中断合并是在中断发起之前,而GRO已经到了软中断上下文中了。

如果应用中是大文件的传输,大部分包都是一段数据,不用GRO的话,会每次都将一个小包传送到协议栈(IP接收函数、TCP接收)函数中进行处理。开启GRO的话,Linux就会智能进行包的合并,之后将一个大包传给协议处理函数。这样CPU的效率也是就提高了。

1
2
# ethtool -k eno1 | grep generic-receive-offload
generic-receive-offload: on

如果网卡驱动没有打开GRO的话,可以通过如下方式打开。

1
# ethtool -K eno1  gro on

GRO说的仅仅只是包的接收阶段的优化方式,对于发送来说是GSO。

参考

https://mp.weixin.qq.com/s?__biz=MjM5Njg5NDgwNA==&mid=2247484065&idx=1&sn=ab0d3e11c472b845dedf6a87dcc38b25&chksm=a6e3039a91948a8c23ec7d426469ebe30d70f59c7a58437e9ea8b8aac697aa1e2e29b4c47dec&scene=178&cur_album_id=1532487451997454337#rd

元数据服务

上一个版本的分布式对象存储实现了接口服务和数据服务的分离,对象的数据被保存在了专门的数据服务节点,而不是保存在接口服务的本地磁盘上。通过解耦后,可以往集群中添加新的接口服务结点或数据服务节点。

但也存在问题:

  • 如果多次PUT同一个对象,会发现该对象在所有的数据服务节点上都有一份副本。这是由于在每次PUT的时候都是随机选择一个数据服务节点,只要PUT次数足够多,那么所有的节点必然都会被选中一次,结果就是每个节点上都保存着这个对象的数据。为了解决这个问题,对象存储系统提出了一个十分重要的概念,叫作数据去重。
    • 最简单的办法似乎式只需要在每次PUT之前先定位一下,确保该对象不存在之后再PUT就好了。然而问题并没有那么简单,更复杂的情况是,两个名字不同的对象有可能内容相同。这样的对象也属于需要去重的范畴。这是怎么回事呢?一个对外提供服务的对象存储系统不可能只有一个用户,而是会有很多用户一起使用,这些用户上传的对象可能存在大量的重复数据。为了节省存储空间,对象存储服务通常都会尽量让数据相同的对象共享系统中的同一份数据存储
  • 另一个问题则是数据的不一致。假设多次PUT同一个对象,且内容不同,这个对象的不同版本会被随机保存在不同的数据服务节点上。当GET它时就会随机取得不同版本的对象,这不仅破坏了对象数据的一致性,也破坏了GET方法的幂等性( 对同一个系统,使用同样的条件,一次请求和重复的多次请求对系统资源的影响是一致的)。
    • 该问题似乎可以通过每次PUT之前先进行定位,如果该对象不存在则随机选择,如果存在则选择相对应的数服务节点。这时候如果客户不要求版本控制,系统的行为表现为用最新的版本覆盖上一个版本。但如果客户希望保存某个对象的所有版本,这时候用户上传的某个对象的所有版本都需要被保存起来。比如说,当用户第一次上传一个对象时,它的初始版本为1:当用户使用PUT方法改变了该对象的内容,那么新对象的版本为2,依次递增。新的版本会覆盖旧的版本,但是旧版本的对象不会被删除。在下载对象时,用户可以指定GET对象的任意一个版本。 为了实现版本控制,需要一个数据库来记录系统中所有对象的所有版本。这个数据库就是元数据服务。

元数据

元数据指的是对象的描述信息。为了和对象的数据本身区分开,赋予了元数据这个名称。对象的哪些信息可以称作元数据?举例来说,有对象的名字、版本、大小以及散列值等。这些都是系统定义的元数据,因为它们的存在对一个对象存储系统有实际意义,比如说客户端和接口服务之间根据对象的名字来引用一个对象:一个对象可以有多个版本,除了删除标记外,每个版本实际都指向数据服务节点上的一份数据存储。

用户自定义的元数据:除了系统定义的元数据以外,用户也可以为这个对象添加自定义的元数据。这些元数据通常以键值对形式保存的任意描述信息,比如一张照片的拍摄时间和拍摄地点,一首歌的作者和演唱者等。对象存储系统不关心这些元数据,但是用户需要将它们添加到对象存储系统中,作为该对象的元数据进行保存

散列值和散列函数

对象的散列值是一种非常特殊的元数据,因为对象存储通常将对象的散列值作为其全局唯一的标识符。在此前,数据服务节点上的对象都是用名字来引用的,如果两个对象名字不同,那么无法知道它们的内容是否相同。如此则无法实现针对不同对象的去重。现在,以对象的散列值作为标识符,就可以将接口服务层访问的对象和数据服务存取的对象数据解耦合客户端和接口服务通过对象的名字来引用一个对象,而实际则是通过其散列值来引用存储在数据节点上的对象数据,只要散列值相同则可以认为对象的数据相同,这样就可以实现名字不同但数据相同的对象之间的去重。

对象的散列值是通过散列函数计算出来,散列函数会将对象的数据进行重复多轮的数学运算,这些运算操作包括按位与、按位或、按位异或等,最后计算出来一个长度固定的数字,作为对象的散列值。一个理想的散列函数具有以下5个特征。

  • 操作具有决定性,同样的数据必定计算出同样的散列值
  • 无论计算任何数据都很快。
  • 无法根据散列值倒推数据,只能遍历尝试所有可能的数据。
  • 数据上微小的变化就会导致散列值的巨大改变,新散列值和旧散列值不具有相关性。
  • 无法找到两个能产生相同散列值的不同数据。

实际情况无法满足所有要求,一个散列函数hash的安全级别根据3种属性决定:

  • 抗原像攻击:给定一个散列值h,难以找到一个数据m令 h=hash(m)。这个属性称为函数的单向性。欠缺单向性的散列函数易受到原像攻击。
  • 抗第二原像攻击:给定一个数据m1,难以找到第二个数据m2令hash(m1)=hash(m2)。欠缺该属性的散列函数易受到第二原像攻击。
  • 抗碰撞性:难以找到两个不同的数据m1和m2令hash(m1)=hash(m2)。这样的一对数据被称为散列碰撞。

本项目使用的散列函数为SHA-256,该函数使用64位的数学运算,产生一个长度为256位的二进制数字作为散列值。

分布式对象存储加入元数据服务

元数据服务就是提供对元数据的存取功能的服务。本项目中实现的元数据服务较为简单,它将只保存系统定义的元数据,也就是对象的名字、版本、大小和散列值,因为这些直接影响到存储功能

和上一版本的架构对,加入元数据服务的架构其他组件不变,而多了一个ElasticSearch (以下简称ES),也就是项目种选择的元数据服务。需要说明的是能做元数据服务的并不只有ES一种,任何一个分布式数据库都可以做元数据服务。选择ES的原因是它足够好且实现方便。和RabbitMQ一样,ES本身也支持集群,但是在本书的测试环境中只使用了一个服务节点。
ES使用的也是REST 接口接口服务节点作为客户端通过HTTP访问ES的索引(index)。ES 的索引就相当于一个数据库,而类型(type)则相当于数据库里的一张表。项目种会创建一个名为metadata的索引,其中有一个名为objects 的类型。

加入元数据服务的分布式存储架构

REST接口

有了元数据服务之后,就可以给接口服务增加新的功能,首先是给对象的GET方法增加一个参数version:

1
GET /objects/<object_name>?version=<version_id>

响应正文:

  • 对象的数据:这个参数可以告诉接口服务客户端需要的是该对象的第几个版本,默认是最新的那个
1
PUT /objects/<object_name>

请求头部(Request Header)

  • Digest: SHA-256=<对象散列值的Base64编码>
  • Content-Length:<对象数据的长度>

请求正文

  • 对象的内容如下:
  • PUT方法没变,但是每次客户端PUT一个对象时,必须提供一个名为Digest 的HTTP请求头部,它记录了用SHA-256散列函数计算出来的对象散列值。

HTTP头部分为请求头部(Request Header)和响应头部(Response Header),它允许客户端和服务器在HTTP的请求和响应中交换额外的信息。一个头部由3个部分组成:一个大小写不敏感的名字,后面跟着一个冒号“:”,然后是该头部的值。注意头部的值不能包含回车。

Digest头部的名字是 Digest,后面跟着一个冒号,然后是 Digest头部的值,也就是”SHA-256=<对象散列值的Base64编码>”。SHA-256是要求使用的散列函数,根据RFC3230的要求,客户端需要在Digest头部提供计算散列值时使用的散列函数,如果服务器发现客户端使用的散列函数跟服务器使用的散列函数不一致则会拒绝整个请求SHA-256计算出的散列值是一个256位的二进制数字,客户端还需要对其进行Base64编码,将数字转化成ASCII字符串格式,以确保不包含回车的二进制数字。

Base64编码规则选定了64个不同的字符,分别代表1个6位的二进制数字。对一个256位的二进制数字进行编码,首先要将其切成11个24位的二进制数字(不足的位在最后一个数字用0补齐),然后每个数字正好用4个Base64字符来表示。

经过Base64编码后的散列值将作为该对象的全局唯一标识符,也是数据服务节点储存的对象名。也就是说,只要对象内容发生了变化,那么原来在数据服务节点上储存的数据不会被更新,而是会储存一个新的对象。

除了Digest头部以外,客户端还必须提供一个名为Content-Length 的HTTP请求头部用来告诉服务端该对象数据的长度。客户端提供的对象散列值和长度会作为元数据被保存在元数据服务中。

将数据服务层存取的对象名和接口服务层访问的对象名区分开对于去重来说至关重要。现在,无论接口服务层收到的对象名是什么,只要从数据服务层角度看到的对象名一致,就可以认为是对象的内容一致,去重就只需要简单地根据数据服务层的对象名来实现就可以

PUT成功后,在元数据服务中该对象就会添加一个新的版本,版本号从1开始递增。

除了对象的GET 和 PUT方法发生了变化以外,还可以添加新的功能,首先是对象的DELETE方法。

1
DELETE /objects/<object_name>

使用DELETE方法来删除一个对象。

在此之前都没有实现对象的删除功能,这是有原因的。对象存储的去重会让名字不同的对象共享同一份数据存储,而删除一个对象意味着要将该对象和数据存储之间的联系断开。在把对象的名字和对象的数据存储解耦合之前,无法做到在删除一个对象的同时保留对象的数据存储。 有了元数据服务,在删除一个对象时,只需要在元数据中给对象添加一个表示删除的特殊版本,而在数据节点上保留其数据。

在GET时,如果该对象的最新版本是一个删除标记,则返回404 Not Found。除了对象的删除功能之外,还需要提供对象的列表功能,用于查询所有对象或指定对象的所有版本。

1
GET /versions/
  • 响应正文
    指定对象的所有版本:客户端GET某个指定对象的版本列表,接口服务节点返回该对象的所有版本。HTTP响应内容结构同上。
ES接口

元数据服务的索引使用映射(mappings)结构:

1
2
3
4
5
6
7
8
9
10
11
12
{
mappings":{
"objects":{
"properties":{
"name" : {"type" : "string" , "index" :"not analyzed"},
"version" :{"type": "integer"},
"size":{"type"": ""integer"},
"hash" : {"type : "string" },
}
}
}
}

ES的索引相当于数据库而类型相当于数据库的表,那么现在这个映射则相当于定义表结构。这个映射会在创建metadata索引时作为参数一并被引入,该索引只有一个类型就是objects,其中包括4个属性分别是 name、version、size和hash,相当于数据库表的4个列。

name属性有个额外的要求”index”:”not_analyzed”,这是为了在搜索时能够精确匹配name默认的 analyzed index 会对name进行分词匹配。这有可能导致不相关的匹配结果。比如我们有一个元数据的name是“little cat”,如果使用analyzed index,那么它会被分成little和cat两个词,之后任何包含little或cat的搜索都会导致“little cat”被选中。

添加对象元数据

当客户端PUT或DELETE对象时,都需要往元数据服务添加新版本,处理步骤如图:

往元数据服务添加新版本

上图显示了往元数据服务添加新版本的流程,当接口服务需要给某个对象添加一个新版本时,首先会去查询该对象当前最新版本的元数据,如果该对象不存在,则新版本从1开始:否则新版本为当前最新版本加1,然后将其添加进元数据服务。

GET对象时分两种情况,如果没有指定版本号,同样需要搜索对象最新版本的元数据;如果指定了版本号,可以根据对象的名字和版本号直接获取对象指定版本的元数据。

用到的ES API

要想荻取对象当前最新版本的元数据需要使用ES搜索API.

1
GET /metadata/_search?q=name:<object_name>&size=1&sort=version:desc

给对象添加一个新版本需要使用ES索引API:

1
PUT /metadata/objects/<object_name>_<version>?op_type=create

在这里,特地将<object_name>_<version>作为_id创建。这是为了当客户端指定版本GET对象时可以直接根据对象名和版本号拼出相对应的_id来从ES中获取元数据,从而免除了搜索的步骤。

使用op_type=create可以确保当多个客户端同时上传同一个对象时不至于发生数据丢失,因为只有第一个请求能成功上传给ES。其他请求会收到HTTP错误代码409 Conflict,这样接口服务节点就能知道发生了版本冲突并重新上传。

当客户端GET对象时分两种情况,如果没有指定版本号,则使用和之前同样的ES搜索API来获取对象的最新版本。

如果客户端指定版本号GET对象,则使用ES Get API直接获取对象指定版本的元数据。

1
GET /metadata/objects/<object_name>_<version_id>/_source

当客户端GET全体对象版本列表时,使用ES搜索API方法如下:

1
GET /metadata/_search?sort=name,version&from=<from>&size=<size>

其中,from和 size用于分页显示。在不指定from和 size的情况下,ES 默认的分页是从0开始显示10条。

当客户端GET指定对象版本列表时,使用ES 搜索API方法如下:

1
GET /metadata/_search?sort=name,versions&from=<from>&size=<size>&q=name:<object_name>

这里多了一个q参数用于指定name。

对象PUT流程

加入元数据服务的对象put流程

客户端的HTTP请求提供了对象的名字、散列值和大小,接口服务以散列值作为数据服务的对象名来保存对象,然后在元数据服务中根据对象的名字搜索当前最新的元数据,使其版本号加1并添加一个新版本的元数据。

对象GET流程

加入元数据服务的对象get流程

客户端在HTTP请求中指定对象的名字,可在 URL的查询参数中指定版本号。如果指定版本号,则接口服务根据对象的名字和版本号获取元数据;否则根据对象的名字搜索最新元数据。然后从元数据中获得对象的散列值作为数据服务的对象名来读取对象。

具体实现

主要关注于与上一版本有变化的部分(主要在接口服务种实现元数据的互动)进行说明。

接口服务
接口服务的main函数
1
2
3
4
5
6
7
func main() {
go heartbeat.ListenHeartbeat()
http.HandleFunc("/objects/", objects.Handler)
http.HandleFunc("/locate/", locate.Handler)
http.HandleFunc("/versions/", versions.Handler)
log.Fatal(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil))
}

本版本的接口服务main函数多了一个用于处理/vesions/的函数,名字为versions.Handler

接口服务的versions

主要工作为调用es包的函数完成相关工作:

versions.Handler函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func Handler(w http.ResponseWriter, r *http.Request) {
// 检查HTTP方法是否为GET
m := r.Method
// 如果不为GET,返回405 Method Not Allowed
if m != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// 方法为GET,获取URL中的<object_name>部分
from := 0
size := 1000
name := strings.Split(r.URL.EscapedPath(), "/")[2]
// 无限循环调用es包的SearchAllVersions函数
for {
// 返回一个元数据的数组
metas, e := es.SearchAllVersions(name, from, size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 遍历数组,将元数据一一写入HTTP响应的正文
for i := range metas {
b, _ := json.Marshal(metas[i])
w.Write(b)
w.Write([]byte("\n"))
}
// 如果返回的数组长度不等于size,说明元数据服务种没有更多的数据,直接返回
if len(metas) != size {
return
}
// 否则把from的值增加1000进行下一轮迭代
from += size
}
}
接口服务的objects包

加入元数据服务以后,接口服务的objects包与上一版本相比发生了较大的变化,除了多了一个对象的DELETE方法以外,对象的PUT和GET方法也都有所改变,它们需要和元数据服务互动。

obejects.Handler函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
if m == http.MethodPut {
put(w, r)
return
}
if m == http.MethodGet {
get(w, r)
return
}
// 与上一版本多一个DELETE方法处理del函数
if m == http.MethodDelete {
del(w, r)
return
}
w.WriteHeader(http.StatusMethodNotAllowed)
}

objects.del函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func del(w http.ResponseWriter, r *http.Request) {
name := strings.Split(r.URL.EscapedPath(), "/")[2]
// 以name为参数调用es.SearchLaestVersion,搜索该对象最新的版本
version, e := es.SearchLatestVersion(name)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// 插入新的元数据,接受元数据的name, version, size和hash
// hash 为空字符串表示这个一个删除标记
e = es.PutMetadata(name, version.Version+1, 0, "")
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
}

objects.put相关函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func put(w http.ResponseWriter, r *http.Request) {
// 先从HTTP请求头部获取对象的散列值
hash := utils.GetHashFromHeader(r.Header)
if hash == "" {
log.Println("missing object hash in digest header")
w.WriteHeader(http.StatusBadRequest)
return
}

// 以散列值作为参数调用stroreObject
c, e := storeObject(r.Body, url.PathEscape(hash))
if e != nil {
log.Println(e)
w.WriteHeader(c)
return
}
if c != http.StatusOK {
w.WriteHeader(c)
return
}

// 从URL中获取对象的名字和对象的大小
name := strings.Split(r.URL.EscapedPath(), "/")[2]
size := utils.GetSizeFromHeader(r.Header)
// 以对象的名字、散列值和大小为参数调用es.AddVersions给对象添加新版本
e = es.AddVersion(name, hash, size)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
}
}

GetHashFromHeader函数和GetSizeFromHeader函数是utils包提供的两个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func GetHashFromHeader(h http.Header) string {
// 获取"digest"头部
digest := h.Get("digest")
// 检查diest头部的形式是否满足"SHA-256=<hash>"
if len(digest) < 9 {
return ""
}
if digest[:8] != "SHA-256=" {
return ""
}
return digest[8:]
}

func GetSizeFromHeader(h http.Header) int64 {
// 得到"conten-length"头部,并调用strconv.PareseInt将字符串转换为int64输出
size, _ := strconv.ParseInt(h.Get("content-length"), 0, 64)
return size
}

objects.get函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func get(w http.ResponseWriter, r *http.Request) {
name := strings.Split(r.URL.EscapedPath(), "/")[2]
// 获取URL并从URL的查询参数中获取"version"参数的值
// Query方法返回一个保存URL所有查询参数的map,该map的键是查询参数的名字,值是一个字符串数组
// HTTP的URL查询参数允许存在多个值,以"version"为key可以得到URL中查询参数的所有值
versionId := r.URL.Query()["version"]
version := 0
var e error
if len(versionId) != 0 {
// 项目中不考虑多个"version"查询参数的情况
// 始终以versionId数组的第一个元素作为客户端提供的版本号
// 将字符串转换为整型
version, e = strconv.Atoi(versionId[0])
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusBadRequest)
return
}
}
// 调用es的GetMetadata函数得到对象的元数据meta
meta, e := es.GetMetadata(name, version)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusInternalServerError)
return
}
// meta.Hash为对象的散列值,如果为空表示该对象版本是一个删除标记
// 返回404 Not Found
if meta.Hash == "" {
w.WriteHeader(http.StatusNotFound)
return
}
// 以散列值为对象名从数据服务层获取对象并输出
object := url.PathEscape(meta.Hash)
stream, e := getStream(object)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
io.Copy(w, stream)
}
es包

es包封装了以HTTP访问ES的各种API的操作。

es.getMetadata函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 结构与ES映射中定义的objects类型属性一一对应
type Metadata struct {
Name string
Version int
Size int64
Hash string
}

// 根据对象的名字和版本号来获取对象的元数据
func getMetadata(name string, versionId int) (meta Metadata, e error) {
// ES服务器地址来自环境变量ES_SERVER,索引是metadata,类型是objects
// 文档的id由对象的名字和版本号拼接而成。
url := fmt.Sprintf("http://%s/metadata/objects/%s_%d/_source",
os.Getenv("ES_SERVER"), name, versionId)
// GET到URL中的对象的元数据,免除耗时的搜索操作
r, e := http.Get(url)
if e != nil {
return
}
if r.StatusCode != http.StatusOK {
e = fmt.Errorf("fail to get %s_%d: %d", name, versionId, r.StatusCode)
return
}
// 读出数据
result, _ := ioutil.ReadAll(r.Body)
// ES返回的结果经过JSON解码后被es,SearchLatestVersion函数使用
json.Unmarshal(result, &meta)
return
}

es.SearchLatestVersion函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
type hit struct {
Source Metadata `json:"_source"`
}

type searchResult struct {
Hits struct {
Total int
Hits []hit
}
}

func SearchLatestVersion(name string) (meta Metadata, e error) {
// 调用ES搜索API.在URL中指定对象的名字,版本号以降序排列且只返回第一个结果。
url := fmt.Sprintf("http://%s/metadata/_search?q=name:%s&size=1&sort=version:desc",
os.Getenv("ES_SERVER"), url.PathEscape(name))
fmt.Println(url)
r, e := http.Get(url)
if e != nil {
return
}
if r.StatusCode != http.StatusOK {
e = fmt.Errorf("fail to search latest metadata: %d", r.StatusCode)
return
}
result, _ := ioutil.ReadAll(r.Body)
var sr searchResult
// ES返回的结果通过JSON解码到一个searchResult结构体中
// searchResult和ES搜索API返回的结构体保持一致
// 方便读取搜索到的元数据并赋值给meta返回。
json.Unmarshal(result, &sr)
if len(sr.Hits.Hits) != 0 {
meta = sr.Hits.Hits[0].Source
}
// ES的返回结果长度为0,说明没有搜到相对应的元数据,直接返回
// 此时meta中各属性都为初始值:字符串为空,整型为0
return
}

es.GetMetadata函数:

GetMetadata函数的功能类似getMetadata,输入对象的名字和版本号返回对象,区别在于当version为0时会调用SearchLatestVersion获取当前最新的版本。

1
2
3
4
5
6
7
func GetMetadata(name string, version int) (Metadata, error) {
// 当version为0的时候,调用SearchLatestVersion获取当前最新的版本
if version == 0 {
return SearchLatestVersion(name)
}
return getMetadata(name, version)
}

es.PutMetadata函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 用于向ES服务上传一个新的元数据,输入的4个参数对应元数据的4个属性
func PutMetadata(name string, version int, size int64, hash string) error {
// 生成ES文档,一个ES的文档相当于数据库的一条记录
doc := fmt.Sprintf(`{"name":"%s","version":%d,"size":%d,"hash":"%s"}`,
name, version, size, hash)
client := http.Client{}
// 使用op_type=create参数,如果同时又多个客户端上传同一个数据,结果会发生冲突
// 只有第一个文档被成功创建,之后的PUT请求,ES会返回409Conflict
url := fmt.Sprintf("http://%s/metadata/objects/%s_%d?op_type=create",
os.Getenv("ES_SERVER"), name, version)
// 用PUT方法将文档上传到metadata索引的objects类型
request, _ := http.NewRequest("PUT", url, strings.NewReader(doc))
r, e := client.Do(request)
if e != nil {
return e
}
// 如果为409Conflict,函数让版本号加1并递归调用自身继续上传
if r.StatusCode == http.StatusConflict {
return PutMetadata(name, version+1, size, hash)
}
if r.StatusCode != http.StatusCreated {
result, _ := ioutil.ReadAll(r.Body)
return fmt.Errorf("fail to put metadata: %d %s", r.StatusCode, string(result))
}
return nil
}

es.AddVersion函数:

1
2
3
4
5
6
7
8
9
func AddVersion(name, hash string, size int64) error {
// 获取对象最新的版本
version, e := SearchLatestVersion(name)
if e != nil {
return e
}
// 在版本号上加1调用PutMetadata
return PutMetadata(name, version.Version+1, size, hash)
}

es.SearchAllVersion函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 用于搜索某个对象或所有对象的全部版本
func SearchAllVersions(name string, from, size int) ([]Metadata, error) {
// name表示对象的名字,如果name不为空字符粗则搜索指定对象的所有版本
// 否则搜索所有对象的全部版本
// from和size指定分页的显示结果
// 搜索的结果按照对象的名字和版本号排序
url := fmt.Sprintf("http://%s/metadata/_search?sort=name,version&from=%d&size=%d",
os.Getenv("ES_SERVER"), from, size)
if name != "" {
url += "&q=name:" + name
}
r, e := http.Get(url)
if e != nil {
return nil, e
}
metas := make([]Metadata, 0)
result, _ := ioutil.ReadAll(r.Body)
var sr searchResult
json.Unmarshal(result, &sr)
for i := range sr.Hits.Hits {
metas = append(metas, sr.Hits.Hits[i].Source)
}
return metas, nil
}

测试

测试环境与上一版本相同,具体代码见带元数据服务的分布式对象存储

参考

《分布式对象存储—原理、架构及Go语言实现》

可扩展分布式系统

分布式系统

一个分布式系统要求各节点分布在网络上,并通过消息传递来合作完成一个共同目标

分布式系统的三大关键特征:

  • 节点之间并发工作
  • 没有全局锁
  • 某个节点上发生的错误不影响其他节点。

分布式系统的好处在于可扩展性,只需要加入新的节点就可以自由扩展集群的性能。

实现分布式系统的关键是实现接口和具体实现的解耦。

接口服务和数据存储服务分离

如图所示,接口服务层提供了对外的REST接口,而数据服务层则提供数据的存储功能。接口服务处理客户端的请求,然后向数据服务存取对象,数据服务处理来自接口服务的请求并在本地磁盘上存取对象。

接口服务和数据存储服务分离

接口服务和数据服务之间的两种接口:

  • 实现对象存取的接口:对象的存取使用REST 接口。也就是说数据服务本身也提供REST 接口。此时,接口服务节点作为HTTP客户端向数据服务请求对象
  • 第二种接口通过RabbitMQ消息队列进行通信,对 RabbitMQ 的使用分为两种模式。
    • 一种模式是向某个exchange进行一对多的消息群发
    • 另一种模式则是向某个消息队列进行一对一的消息单发

每一个数据服务节点都需要向所有的接口服务节点通知自身的存在,为此,创建一个名为apiServers 的 exchange,每一台数据服务节点都会持续向这个exchange发送心跳消息。所有的接口服务节点在启动以后都会创建一个消息队列来绑定这个exchange,任何发往这个exchange 的消息都会被转发给绑定它的所有消息队列,也就是说每一个接口服务节点都会接收到任意一台数据服务节点的心跳消息。

接口服务需要在收到对象GET 请求时定位该对象被保存在哪个数据服务节点上,所以还需要创建一个名为dataServers的exchange。所有的数据服务节点绑定这个exchange并接收来自接口服务的定位消息。拥有该对象的数据服务节点则使用消息单发通知该接口服务节点。

之所以必须使用REST和消息队列这两种不同类型的接口是为了满足不同的需求:对象存取的特点是数据量有可能很大,不适合将一个巨大的对象通过消息队列传输。而REST接口虽然能够处理大数据量传输,但是对于群发却显得力不从心。

REST接口
  • 对于数据服务来说,它的REST接口和单机版本完全相同,也就是对象的PUT和GET方法。
  • 对于接口服务来说,除了对象的PUT和GET 方法之外,还应另外提供一个用于定位的 locate接口,用来帮助验证架构。
    1
    2
    GET /locate/<object_name>
    响应正文

客户端通过GET方法发起对象定位请求,接口服务节点收到该请求后通过dataServers exchange会向数据服务层群发一个定位消息,然后等待数据服务节点的反馈如果有数据服务节点发回确认消息,则返回该数据服务节点的地址;如果超过一定时间没有任何反馈,则返回HTTP错误代码404 NOT FOUND.

RabbitMQ消息设计

数据服务需要通过RabbitMQ将自身的存在通知给所有的接口服务,这样的消息称为心跳消息。

数据服务心跳消息

apiServers和dataServers这两个exchange需要在RabbitMQ服务器上预先创建。每个接口服务节点在启动后都会创建自己的消息队列并绑定至 apiServers exchange。 每个数据服务节点在启动后每隔5s就会发送一条消息给apiServers exchange,消息的正文就是该数据服务节点的 HTTP监听地址。接口服务节点在收到该消息后就会记录这个地址。

定位流程

每个数据服务节点在启动时都必须创建自己的消息队列并绑定至dataServers exchange。当接口服务需要定位时,会创建一个临时消息队列,然后发送一条消息给dataServers exchange消息的正文是需要定位的对象,返回地址则是该临时队列的名字。定位成功的数据服务节点需要将反馈消息发送给这个临时队列,反馈消息的正文是该数据服务节点自身的监听地址临时消息队列会在一定时间后关闭。如果在关闭前没有收到任何反馈则该对象定位失败,接口服务节点就会知道该对象不存在于数据服务层。

接口和存储分离的对象PUT流程

接口和存储分离的对象PUT流程

客户端向接口服务发送HTTP的PUT 请求并提供了<object_name><contentof object>,接口服务选出一个随机数据服务节点并向它转发这个PUT请求,数据服务节点将<content of object>写入$STORAGE_ROOT/objects/<object_name>文件。

接口和存储分离的对象GET流程

接口和存储分离的对象GET流程

客户端的GET 请求提供了<object_name>,接口服务在收到GET请求后会对该object进行定位,如果定位失败则返回404 Not Found;如果定位成功,接口服务会接收到某个数据服务的地址,就可以向该地址转发来自客户端的GET请求,由数据服务读取本地磁盘上的文件并将其内容写入HTTP响应的正文。

具体实现

数据服务

为支持新的功能,数据服务的在实现上扩展了单机版的相关功能:

1
2
3
4
5
6
func main() {
go heartbeat.StartHeartbeat()
go locate.StartLocate()
http.HandleFunc("/objects/", objects.Handler)
log.Fatal(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil))
}

数据服务的heartbeat包:

1
2
3
4
5
6
7
8
9
10
11
12
// 每隔5s向apiServers exchange发送一条消息
// 将本服务节点的监听对地址发送出去
func StartHeartbeat() {
// 调用rabbitmq.New创建一个rabbitmq.RabbitMQ结构体
q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
defer q.Close()
// 无限循环调用Publish方法向apiServers exchange发送本节点的监听地址
for {
q.Publish("apiServers", os.Getenv("LISTEN_ADDRESS"))
time.Sleep(5 * time.Second)
}
}

数据服务的locate包:

locate包有两个函数,分别是用于实际定位对象的Locate函数和用于监听定位消息的StartLocate函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 实际定位对象
func Locate(name string) bool {
// os.Stat访问磁盘上对应的文件名
_, err := os.Stat(name)
// 判断文件名是否存在,存在返回true,失败返回false
return !os.IsNotExist(err)
}

// 用于监听定位消息
func StartLocate() {
q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
defer q.Close()
// 绑定dataServers exchange
q.Bind("dataServers")
// 返回一个channel
c := q.Consume()
// range遍历channel,接收消息
for msg := range c {
// JSON编码使得对象名字上有一对双引号,使用strconv.Unquote将输入的字符串前后的双引号去除并作为结果返回
object, e := strconv.Unquote(string(msg.Body))
if e != nil {
panic(e)
}
if Locate(os.Getenv("STORAGE_ROOT") + "/objects/" + object) {
//文件存在,调用Send方法向消息的发送方返回本服务节点的监听地址,表示该对象存在于本服务节点上。
q.Send(msg.ReplyTo, os.Getenv("LISTEN_ADDRESS"))
}
}
}
接口服务

接口服务除了提供对象的REST接口以外还需要提供locate功能,其main函数为:

1
2
3
4
5
6
7
8
9
func main() {
// 提供locate功能
go heartbeat.ListenHeartbeat()
// 处理URL以/objects/开头的对象请求
http.HandleFunc("/objects/", objects.Handler)
// 处理URL以/locate/开头的定位请求
http.HandleFunc("/locate/", locate.Handler)
log.Fatal(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil))
}

注意:接口服务层和数据服务层的objects包以及heartbeat、locate包虽然名字相同,但具体实现差距较大。

  • 数据服务的objects包负责对象在本地磁盘上的存取;而接口服务的objects包则负责将对象请求转发给数据服务。
  • 数据服务的heartbeat包用于发送心跳消息;而接口服务的heartbeat包则用于接收数据服务节点的心跳消息
  • 数据服务的locate包用于接收定位消息、定位对象以及发送反馈消息;而接口服务的locate包则用于发送定位消息并处理反馈消息

接口服务的heartbeat包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// map,整个包可见,用于缓存所有的数据服务节点
var dataServers = make(map[string]time.Time)
var mutex sync.Mutex

func ListenHeartbeat() {
// 创建消息队列绑定apiServers exchange
q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
defer q.Close()
q.Bind("apiServers")
// 通过go channel监听每个来自数据服务节点的心跳信息
c := q.Consume()
go removeExpiredDataServer() //goroutine并行处理
for msg := range c {
//将数据服务节点的监听地址作为map的键,收到消息的时间作为值存入map中
dataServer, e := strconv.Unquote(string(msg.Body))
if e != nil {
panic(e)
}
mutex.Lock()
dataServers[dataServer] = time.Now()
mutex.Unlock()
}
}

// 每隔5s扫描一遍map,清除其中超过10s没有收到心跳消息的数据服务节点
func removeExpiredDataServer() {
for {
time.Sleep(5 * time.Second)
mutex.Lock()
for s, t := range dataServers {
if t.Add(10 * time.Second).Before(time.Now()) {
delete(dataServers, s)
}
}
mutex.Unlock()
}
}

// 遍历map并返回当前所有的数据服务节点
// 为防止多个goroutine并发读写map造成错误,map读写全部需要mutex的保护
func GetDataServers() []string {
mutex.Lock()
defer mutex.Unlock()
ds := make([]string, 0)
for s, _ := range dataServers {
ds = append(ds, s)
}
return ds
}

// 从当前所有的数据服务节点中随机选择一个节点并返回
func ChooseRandomDataServer() string {
ds := GetDataServers()
n := len(ds)
if n == 0 {
return ""
}
return ds[rand.Intn(n)]
}

接口服务的locate包:主要用于向数据服务节点群发定位消息并接收反馈

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 处理HTTP请求
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method
if m != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// 将文件名作为Locate函数的参数进行定位
info := Locate(strings.Split(r.URL.EscapedPath(), "/")[2])
// 为空,说明定位失败
if len(info) == 0 {
w.WriteHeader(http.StatusNotFound)
return
}
// 不为空,则拥有该对象的一个数据服务节点的地址,将地址作为HTTP响应的正文输出
b, _ := json.Marshal(info)
w.Write(b)
}

func Locate(name string) string {
//创建一个消息队列
q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER"))
// 向dataServers exchange群发对象名字的定位信息
q.Publish("dataServers", name)
c := q.Consume()
go func() { //1s后关闭临时消息队列
//设置超时机制,避免无止境的等待。
//1s后没有任何反馈,消息队列关闭,收到一个长度为0的消息,返回一个空字符串
time.Sleep(time.Second)
q.Close()
}()
//阻塞等待数据服务节点向自己发送反馈消息
//若在1s内有来自数据服务节点的消息,返回该消息的正文内容,也就是该数据服务节点的监听地址
msg := <-c
s, _ := strconv.Unquote(string(msg.Body))
return s
}

// 检查Locate结果是否为空字符串来判定对象是否存在
func Exist(name string) bool {
return Locate(name) != ""
}

接口服务的objects包:

接口服务的objects包跟数据服务有很大区别,其put函数和get函数并不会访问本地磁盘上的对象,而是将HTTP请求转发给数据服务。put函数负责处理对象PUT请求,其相关函数如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func put(w http.ResponseWriter, r *http.Request) {
// 从URL中获取objects_name
object := strings.Split(r.URL.EscapedPath(), "/")[2]
// 将r.Body和objects作为参数调用storeObject
// 第一个返回值为int类型的变量,用于表示HTTP错误码
// 第二个返回值为error,如果error不为nil,出错并打印
c, e := storeObject(r.Body, object)
if e != nil {
log.Println(e)
}
w.WriteHeader(c)
}

func storeObject(r io.Reader, object string) (int, error) {
stream, e := putStream(object)
// 没有找到可用的数据服务节点
if e != nil {
return http.StatusServiceUnavailable, e
}

// 找到可用的数服务节点并得到一个objectstream.PutStream的指针
// objectstream.PutStream实现了Write方法,是一个io.Write接口
// 用io.Copy将HTTP请求的正文写入stream
io.Copy(stream, r)
e = stream.Close()
//写入出错
if e != nil {
return http.StatusInternalServerError, e
}
//写入成功
return http.StatusOK, nil
}

func putStream(object string) (*objectstream.PutStream, error) {
// 先获得一个随机数服务节点的地址
server := heartbeat.ChooseRandomDataServer()
fmt.Println("data server =", server)
// 没有可用的数据服务节点,返回objectstream.PutStream的空指针
if server == "" {
return nil, fmt.Errorf("cannot find any dataServer")
}

return objectstream.NewPutStream(server, object), nil
}

objectstream包:对Go中的http包的一个封装,用于将一些http函数的调用转换成写流的形式,方便处理。具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

type PutStream struct {
// writer用于实现Write方法
writer *io.PipeWriter
// c用于把在一个gouroutine传输数据过程中发生的错误传回主线程
c chan error
}

// 用于生成一个PutStream结构体
func NewPutStream(server, object string) *PutStream {
// 用io.Pipe创建一对reader和writer,类型为*io.PipeReader和*io.PipeWriter
// 管道互联,写入writer的内容可以从reader中读出
// 希望以写入数据流的方法操作HTTP的PUT请求
reader, writer := io.Pipe()
c := make(chan error)
go func() {
// 生成put请求,需要提供一个io.Reader作为http.NewRequest的参数
request, _ := http.NewRequest("PUT", "http://"+server+"/objects/"+object, reader)
// http.Client负责从request中读取需要PUT的内容
client := http.Client{}
// 由于管道的读写阻塞特性,在goroutine中调用client.Do方法
r, e := client.Do(request)
if e == nil && r.StatusCode != http.StatusOK {
e = fmt.Errorf("dataServer return http code %d", r.StatusCode)
}
c <- e
}()
return &PutStream{writer, c}
}

// 用于写入writer,实现该方法PutStream才被认为实现了io.Write接口
func (w *PutStream) Write(p []byte) (n int, err error) {
return w.writer.Write(p)
}

// 关闭writer,为了让管道另一端的reader读到io.EOF,否则在gouroutine中运行的client.Do将始终阻塞无法返回
func (w *PutStream) Close() error {
w.writer.Close()
// 从c中读取发送自goroutine得错误并返回
return <-w.c
}

接口服务层object包用于处理GET请求具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func get(w http.ResponseWriter, r *http.Request) {
object := strings.Split(r.URL.EscapedPath(), "/")[2]
// 调用getStream生成一个类型为io.Reader的stream
stream, e := getStream(object)
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
// 调用io.Copy将stream的内容写入HTTP响应的正文
io.Copy(w, stream)
}

func getStream(object string) (io.Reader, error) {
// 定位object对象
server := locate.Locate(object)
if server == "" {
return nil, fmt.Errorf("object %s locate fail", object)
}
// 调用objectstream.NewGetStream并返回结果
return objectstream.NewGetStream(server, object)
}

objectstream包的GetStream相关实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
type GetStream struct {
reader io.Reader
}

func newGetStream(url string) (*GetStream, error) {
// 输入的url表示用于获取数据流的HTTP服务地址
// 调用http.Get发起一个GET请求,获取该地址的HTTP响应
r, e := http.Get(url) //返回的r类型为*http.Response,其body是用于读取HTTP响应正文的io.Reader
if e != nil {
return nil, e
}
if r.StatusCode != http.StatusOK {
return nil, fmt.Errorf("dataServer return http code %d", r.StatusCode)
}
return &GetStream{r.Body}, nil
}

// 封装newGetStream函数
func NewGetStream(server, object string) (*GetStream, error) {
if server == "" || object == "" {
return nil, fmt.Errorf("invalid server %s object %s", server, object)
}
// 内部拼凑一个url传给newGetStream,对外隐藏url的细节
return newGetStream("http://" + server + "/objects/" + object)
}

// 用于读取reader成员,实现该方法,则GetStream结构体实现io.Reader接口
func (r *GetStream) Read(p []byte) (n int, err error) {
return r.reader.Read(p)
}

rabbitmq包实现:

为使用RabbitMQ,需要下载RabbitMQ提供的Go语言包"github.com/streadway/amqp"

1
go get -u "github.com/streadway/amqp"

为方便使用,实现一个rabbitmq包,该包对"github.com/streadway/amqp进行封装以简化接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package rabbitmq

import (
"encoding/json"
"github.com/streadway/amqp"
)

type RabbitMQ struct {
channel *amqp.Channel
conn *amqp.Connection
Name string
exchange string
}

// 创建一个新的rabbitmq.RabbitMQ结构体
func New(s string) *RabbitMQ {
conn, e := amqp.Dial(s)
if e != nil {
panic(e)
}

ch, e := conn.Channel()
if e != nil {
panic(e)
}

q, e := ch.QueueDeclare(
"", // name
false, // durable
true, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if e != nil {
panic(e)
}

mq := new(RabbitMQ)
mq.channel = ch
mq.conn = conn
mq.Name = q.Name
return mq
}

// 将自己的消息队列和一个exchange绑定
// 所有发往该exchange的消息都能在自己的消息队列中被接收到
func (q *RabbitMQ) Bind(exchange string) {
e := q.channel.QueueBind(
q.Name, // queue name
"", // routing key
exchange, // exchange
false,
nil)
if e != nil {
panic(e)
}
q.exchange = exchange
}

// 往某个消息队列发送消息
func (q *RabbitMQ) Send(queue string, body interface{}) {
str, e := json.Marshal(body)
if e != nil {
panic(e)
}
e = q.channel.Publish("",
queue,
false,
false,
amqp.Publishing{
ReplyTo: q.Name,
Body: []byte(str),
})
if e != nil {
panic(e)
}
}

// 往某个exchange发送消息
func (q *RabbitMQ) Publish(exchange string, body interface{}) {
str, e := json.Marshal(body)
if e != nil {
panic(e)
}
e = q.channel.Publish(exchange,
"",
false,
false,
amqp.Publishing{
ReplyTo: q.Name,
Body: []byte(str),
})
if e != nil {
panic(e)
}
}

// 生成一个接收消息的go channel,使客户程序可以通过Go的原生机制接收队列中的消息。
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
c, e := q.channel.Consume(q.Name,
"",
true,
false,
false,
false,
nil,
)
if e != nil {
panic(e)
}
return c
}

// 关闭消息队列
func (q *RabbitMQ) Close() {
q.channel.Close()
q.conn.Close()
}

功能测试

测试环境:包括6个数据服务节点和2个接口服务节点,共8个节点。为了方便测试,8个节点其实都运行在同一台服务器上,只是绑定了8个不同的地址加以区分。
6个数据服务节点地址分别是10.29.1.1:12346、10.29.1.2:12346、10.29.1.3:12346、10.29.1.4:12346、10.29.1.5:12346、10.29.1.6:12346。2个接口服务节点地址是10.29.2.1:12346,10.29.2.2:12346

在同一台服务器上绑定多个地址命令:

1
2
3
4
5
6
7
8
sudo ifconfig eno1:1 10.29.1.1/16
sudo ifconfig eno1:2 10.29.1.2/16
sudo ifconfig eno1:3 10.29.1.3/16
sudo ifconfig eno1:4 10.29.1.4/16
sudo ifconfig eno1:5 10.29.1.5/16
sudo ifconfig eno1:6 10.29.1.6/16
sudo ifconfig eno1:7 10.29.2.1/16
sudo ifconfig eno1:8 10.29.2.2/16

eno1是这台机器的网络接口,可使用ifconfig查询得到,由于Ubuntu 16.0.4的内核支持接口别名,只需要在ifconfig 命令上使用别名接口(eno1后面加上冒号和一个数字)就可以在同一个接口上绑定多个地址。
为了让节点能够建立消息队列,还需要一台 RabbitMQ服务器(地址设置为本地即可),在其上安装rabbitmq-server:

1
2
3
4
sudo apt-get install rabbitmq-server
# 下载rabbitmqadmin管理工具
sudo rabbitmq-plugins enable rabbitmq_managements
wget localhost:15672/cli /rabbitmqadmin
1
2
3
# 创建apiServers 和 dataServers这两个exchange.
$ python3 rabbitmqadmin declare exchange name=apiServers type=fanout
$ python3 rabbitmqadmin declare exchange name=dataServers type=fanout
1
2
3
4
#添加用户test,密码test。
$ sudo rabbitmqctl add_user test test
#给test用户添加访问所有exchange的权限。
$ sudo rabbitmqctl set_permissions -p / test ".*" ".*" ".*"

消息队列服务器就绪,现在需要同时启动8个服务程序,在启动前还要记得创建相应的$STORAGE_ROOT目录及其子目录objects,接下来的操作可使用shell脚本进行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#clean environment
for i in `seq 1 6`
do
rm -rf /tmp/$i/objects/*
rm -rf /tmp/$i/temp/*
done

# prepare the distributed envrionment
for i in `seq 1 6`
do
mkdir -p /tmp/$i/objects
mkdir -p /tmp/$i/temp
mkdir -p /tmp/$i/garbage
done

# rabbitmq env
# wget localhost:15672/cli/rabbitmqadmin #rabbitmq访问
python3 rabbitmqadmin declare exchange name=apiServers type=fanout
python3 rabbitmqadmin declare exchange name=dataServers type=fanout
#sudo rabbitmqctl add_user test test #首次运行需要创建用户和密码
#sudo rabbitmqctl set_permissions -p / test ".*" ".*" ".*" #修改访问权限

# start test env
export RABBITMQ_SERVER=amqp://test:test@localhost:5672
export ES_SERVER=localhost:9200

LISTEN_ADDRESS=10.29.1.1:12346 STORAGE_ROOT=/tmp/1 go run ../dataServer/dataServer.go &
LISTEN_ADDRESS=10.29.1.2:12346 STORAGE_ROOT=/tmp/2 go run ../dataServer/dataServer.go &
LISTEN_ADDRESS=10.29.1.3:12346 STORAGE_ROOT=/tmp/3 go run ../dataServer/dataServer.go &
LISTEN_ADDRESS=10.29.1.4:12346 STORAGE_ROOT=/tmp/4 go run ../dataServer/dataServer.go &
LISTEN_ADDRESS=10.29.1.5:12346 STORAGE_ROOT=/tmp/5 go run ../dataServer/dataServer.go &
LISTEN_ADDRESS=10.29.1.6:12346 STORAGE_ROOT=/tmp/6 go run ../dataServer/dataServer.go &

LISTEN_ADDRESS=10.29.2.1:12346 go run ../apiServer/apiServer.go &
LISTEN_ADDRESS=10.29.2.2:12346 go run ../apiServer/apiServer.go &

使用curl测试:

1
2
3
4
5
6
7
8
#!/bin/bash

curl -v 10.29.2.1:12346/objects/test2 -XPUT -d"this is object test2"

curl 10.29.2.2:12346/locate/test2
echo
curl 10.29.2.1:12346/objects/test2
echo

测试结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
*   Trying 10.29.2.1...
* TCP_NODELAY set
* Connected to 10.29.2.1 (10.29.2.1) port 12346 (#0)
> PUT /objects/test2 HTTP/1.1
> Host: 10.29.2.1:12346
> User-Agent: curl/7.63.0
> Accept: */*
> Content-Length: 20
> Content-Type: application/x-www-form-urlencoded
>
* upload completely sent off: 20 out of 20 bytes
< HTTP/1.1 200 OK
< Date: Sat, 10 Apr 2021 08:11:54 GMT
< Content-Length: 0
<
* Connection #0 to host 10.29.2.1 left intact
"10.29.1.4:12346"
this is object test2

完整代码可见:可扩展分布式系统实现

参考

《分布式对象存储—原理、架构及Go语言实现》

对象存储

网络存储

  • NAS: Network Attached Storage的简称,是一个提供了存储功能文件系统网络服务器。客户端可以访问NAS上的文件系统,还可以上传和下载文件。NAS客户端和服务端之间使用的协议有SMB、NFS 以及AFS等网络文件系统协议。对于客户端来说,NAS就是一个网络上的文件服务器。
  • SAN: Storage Area Network 的简称。和NAS的区别是SAN只提供了块存储,而把文件系统的抽象交给客户端来管理。SAN 的客户端和服务端之间的协议有FibreChannel、iSCSI、ATA over Ethernet(AoE)和 HyperSCSI。对于客户端来说,SAN就是一块磁盘,可以对其格式化、创建文件系统并挂载。

现代的网络存储通常混合使用NAS和SAN,同时提供文件级别的协议和块级别的协议。

网络文件系统、块存储与对象存储的区别

数据管理
  • 网络文件系统:数据是以一个个文件的形式来管理
  • 块存储:数据是以数据块的形式来管理的,每个数据块有它自己的地址,但是没有额外的背景信息
  • 对象存储:以对象的方式来管理数据的,一个对象通常包含了3个部分:对象的数据、对象的元数据以及一个全局唯一的标识符(即对象的ID)。其中
    • 对象的数据就是该对象中存储的数据本身。一个对象可以用来保存大量无结构的数据(例如:音乐的具体内容)。
    • 对象的元数据是对象的描述信息,为了和对象的数据本身区分开来,称其为元数据(例如:音乐的名字、大小等)。
    • 对象的标识符用于引用该对象。和对象的名字不同,标识符具有全局唯一性。名字不具有这个特性。通常用对象的散列值来做其标识符
数据访问
  • 网络文件系统的客户端通过NFS等网络协议访问某个远程服务器上存储的文件。
  • 块存储的客户端通过数据块的地址访问SAN上的数据块。
  • 对象存储则通过REST网络服务访问对象。

REST为Representational State Transfer的简称。REST网络服务通过标准HTTP服务对网络资源提供一套预先定义的无状态操作。网络资源被定义为可以通过URL 访问的文档或文件。更广发的:网络上一切可以通过任何方式被标识、命名、引用或处理的东西都是一种网络资源。

对于对象存储来说,对象就是一种网络资源,但除了对象本身以外,还需要提供一些其他的网络资源用来访问对象存储的各种功能。客户端向 REST网络服务发起请求并接收响应,以确认网络资源发生了某种变化。HTTP预定义的请求方法(Request Method)通常包括且不限于GET、POST、PUT、DELETE等。它们分别对应不同的处理方式:GET方法在REST 标准中通常用来获取某个网络资源,PUT通常用于创建或替换某个网络资源(注意,它跟PUT的区别是POST一般不同于替换网络资源,如果该资源已经存在,POST通常会返回一个错误而不是覆盖它,POST通常用于创建某个网络资源,DELETE通常用于删除某个网络资源。

对象存储的优势
  • 扩展方便:扩展只需要添加新的存储节点就可以
  • 低代价的数据冗余能力

单机版对象存储系统

通过在一台服务器上运行一个HTTP服务提供的REST接口,并通过接口实现本地服务器上的对象存取。

单机版分布式存储系统

REST接口

实现PUT方法和GET方法

对象PUT
1
2
PUT /object/<object_name>
请求正文(Request Body)

单机版put操作

客户端通过PUT方法将一个对象上传至服务器,服务器则将该对象保存在本地磁盘上。/objects/<object_name>是标识该对象网络资源的URL。URL是Uniform Resource Locator 的简称,也就是一个网络地址,用于引用某个网络资源在网络上的位置。

在对象存储中,通常使用PUT方法来上传一个对象。客户端的PUT请求提供了对象的名字<object_name>和对象的数据<content of object>,它们最终被保存在本地磁盘上的文件STORAGE_ROOT/objects/<object_name>中。$STORAGE_ROOT环境变量保存着在本地磁盘上的存储根目录的名字。

对象GET
1
2
GET /objects/<object_name>
响应正文 (Response Body)

单机版get操作

客户端通过GET方法从服务器上下载一个对象,服务器在本地磁盘上查找并读取该对象,如果该对象不存在,则服务器返回HTTP错误代码404 Not Found

在对象存储中,总是使用GET方法来下载一个对象。客户端的GET请求提供了<object_name>,服务进程从本地磁盘上的文件$STORAGE_ROOT/objects/<object_name>读取对象并将其写入HTTP响应正文。

具体实现

main函数

1
2
3
4
5
6
7
8
9
func main() {
//注册HTTP处理函数objects.Handler,若有客户端访问该服务器的HTTP服务且URL以"objects/"开头,
//则请求将由objects.Handler负责处理。除此之外的HTTP请求会默认返回HTTP错误代码404 Not Found.
http.HandleFunc("/objects/", objects.Handler)
//监听端口
//正常情况下没有返回,程序运行后开始监听端口上的请求
//非正常情况下,该函数将错误返回,log.Fatal打印错误信息并退出程序
log.Fatal(http.ListenAndServe(os.Getenv("LISTEN_ADDRESS"), nil))
}

objects包下的Handler函数、get、put函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package objects

import "net/http"

// 检查HTTP请求方法:PUT则调用put函数,GET则调用共get函数。其余则返回405 Method Not Allowed错误代码
func Handler(w http.ResponseWriter, r *http.Request) {
m := r.Method //Method记录该HTTP请求的方法
if m == http.MethodPut {
put(w, r)
return
}
if m == http.MethodGet {
get(w, r)
return
}
//写HTTP响应的代码
w.WriteHeader(http.StatusMethodNotAllowed)
}

put方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package objects

import (
"io"
"log"
"net/http"
"os"
"strings"
"fmt"
)

func put(w http.ResponseWriter, r *http.Request) {
fmt.Println(r.URL.EscapedPath())
//r.URL.EsccapedPath得到request的路径,此处为/objects/xxx
f, e := os.Create(os.Getenv("STORAGE_ROOT") + "/objects/" +
strings.Split(r.URL.EscapedPath(), "/")[2]) //得到文件名
if e != nil {
//创建文件失败
log.Println(e)
//写入HTTP响应的代码
w.WriteHeader(http.StatusInternalServerError)
return
}
defer f.Close()
io.Copy(f, r.Body) //将r.Body写入文件
}

get方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package objects

import (
"io"
"log"
"net/http"
"os"
"strings"
)

func get(w http.ResponseWriter, r *http.Request) {
f, e := os.Open(os.Getenv("STORAGE_ROOT") + "/objects/" +
strings.Split(r.URL.EscapedPath(), "/")[2])
if e != nil {
log.Println(e)
w.WriteHeader(http.StatusNotFound)
return
}
defer f.Close()
io.Copy(w, f)
//f本身的类型是*os.File,同时实现了io.Writer和io.Reader两个接口,即实现了Write和Read方法
//http.ResponseWriter也是接口,该接口实现了Write方法,也是一个io.Write接口
}

linux下功能测试

运行服务器:

1
2
mkdir /tmp/objects
LISTEN_ADDRESS=:12345 STORAGE_ROOT=/tmp go run server.go

curl 进行http访问:

1
2
3
4
5
curl -v 10.29.102.172:12345/objects/test #默认get操作,此时没有数据,返回404 Not Found

curl -v 10.29.102.172:12345/objects/test -XPUT -d"this is a test object" # put一个对象

curl -v 10.29.102.172:12345/objects/test #默认get操作,成功返回200 OK

参考

《分布式对象存储—原理、架构及Go语言实现》

channel发送和接收元素的本质

1
2
All transfer of value on the go channels happens with the copy of value.
channel 的发送和接收操作本质上都是 “值的拷贝”,无论是从 sender goroutine 的栈到 chan buf,还是从 chan buf 到 receiver goroutine,或者是直接从 sender goroutine 到 receiver goroutine。

举例分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"fmt"
"time"
)

type user struct {
name string
age int8
}

var u = user{name: "Ankur", age: 25}
var g = &u

func modifyUser(pu *user) {
fmt.Println("modifyUser Received Vaule", pu)
pu.name = "Anand"
}

func printUser(u <-chan *user) {
time.Sleep(2 * time.Second)
fmt.Println("printUser goRoutine called", <-u)
}

func main() {
c := make(chan *user, 5)
c <- g
fmt.Println(g)
// modify g
g = &user{name: "Ankur Anand", age: 100}
go printUser(c)
go modifyUser(g)
time.Sleep(5 * time.Second)
fmt.Println(g)
}
/* 结果
&{Ankur 25}
modifyUser Received Vaule &{Ankur Anand 100}
printUser goRoutine called &{Ankur 25}
&{Anand 100}
*/

一开始构造一个结构体 u,地址是 0x56420,图中地址上方就是它的内容。接着把 &u 赋值给指针 g,g 的地址是 0x565bb0,它的内容就是一个地址,指向 u。

main 程序里,先把 g 发送到 c,根据 copy value 的本质,进入到 chan buf 里的就是 0x56420,它是指针 g 的值(不是它指向的内容),所以打印从 channel 接收到的元素时,它就是 &{Ankur 25}。因此,这里并不是将指针 g “发送” 到了 channel 里,只是拷贝它的值而已。

向channel发送数据的过程

发送操作最终转化为 chansend 函数,关注主流程(hchan源码分析见channel底层实现原理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
// 位于 src/runtime/chan.go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 如果 channel 是 nil
if c == nil {
// 不能阻塞,直接返回 false,表示未发送成功
if !block {
return false
}
// 当前 goroutine 被挂起
gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
throw("unreachable")
}

// 省略 debug 相关……

// 对于不阻塞的 send,快速检测失败场景
//
// 如果 channel 未关闭且 channel 没有多余的缓冲空间。这可能是:
// 1. channel 是非缓冲型的,且等待接收队列里没有 goroutine
// 2. channel 是缓冲型的,但循环数组已经装满了元素
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

// 锁住 channel,并发安全
lock(&c.lock)

// 如果 channel 关闭了
if c.closed != 0 {
// 解锁
unlock(&c.lock)
// 直接 panic
panic(plainError("send on closed channel"))
}

// 如果接收队列里有 goroutine,直接将要发送的数据拷贝到接收 goroutine
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

// 对于缓冲型的 channel,如果还有缓冲空间
if c.qcount < c.dataqsiz {
// qp 指向 buf 的 sendx 位置
qp := chanbuf(c, c.sendx)

// ……

// 将数据从 ep 处拷贝到 qp
typedmemmove(c.elemtype, qp, ep)
// 发送游标值加 1
c.sendx++
// 如果发送游标值等于容量值,游标值归 0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 缓冲区的元素数量加一
c.qcount++

// 解锁
unlock(&c.lock)
return true
}

// 如果不需要阻塞,则直接返回错误
if !block {
unlock(&c.lock)
return false
}

// channel 满了,发送方会被阻塞。接下来会构造一个 sudog

// 获取当前 goroutine 的指针
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}

mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.selectdone = nil
mysg.c = c
gp.waiting = mysg
gp.param = nil

// 当前 goroutine 进入发送等待队列
c.sendq.enqueue(mysg)

// 当前 goroutine 被挂起
goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

// 从这里开始被唤醒了(channel 有机会可以发送了)
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
// 被唤醒后,channel 关闭了。坑爹啊,panic
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 去掉 mysg 上绑定的 channel
mysg.c = nil
releaseSudog(mysg)
return true
}

底层数据结构

源码( go 1.9.2)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type hchan struct {
// chan 里元素数量
qcount uint
// chan 底层循环数组的长度
dataqsiz uint
// 指向底层循环数组的指针
// 只针对有缓冲的 channel
buf unsafe.Pointer
// chan 中元素大小
elemsize uint16
// chan 是否被关闭的标志
closed uint32
// chan 中元素类型
elemtype *_type // element type
// 已发送元素在循环数组中的索引
sendx uint // send index
// 已接收元素在循环数组中的索引
recvx uint // receive index
// 等待接收的 goroutine 队列
recvq waitq // list of recv waiters
// 等待发送的 goroutine 队列
sendq waitq // list of send waiters

// 保护 hchan 中所有字段
lock mutex
}

重点字段

  • buf 指向底层循环数组,只有缓冲型的 channel 才有。

  • sendxrecvx 均指向底层循环数组,表示当前可以发送和接收的元素位置索引值(相对于底层数组)

  • sendqrecvq 分别表示被阻塞的 goroutine,这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞。

  • waitqsudog 的一个双向链表,而 sudog 实际上是对 goroutine 的一个封装:

1
2
3
4
type waitq struct {
first *sudog
last *sudog
}
  • lock 用来保证每个读 channel 或写 channel 的操作都是原子的。

例如,创建一个容量为 6 的,元素为 int 型的 channel 数据结构如下 :

channel数据结构

channel的创建过程

通道有两个方向,发送和接收。理论上来说,可以创建一个只发送或只接收的通道,但是这种通道创建出来后,怎么使用呢?一个只能发的通道,怎么接收呢?同样,一个只能收的通道,如何向其发送数据呢?

一般而言,使用 make 创建一个能收能发的通道:

1
2
3
4
// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)

创建 chan 的函数是 makechan

1
func makechan(t *chantype, size int64) *hchan

从函数原型来看,创建的 chan 是一个指针。所以能在函数间直接传递 channel,而不用传递 channel 的指针。

具体过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))

func makechan(t *chantype, size int64) *hchan {
elem := t.elem

// 省略了检查 channel size,align 的代码
// ……

var c *hchan
// 如果元素类型不含指针 或者 size 大小为 0(无缓冲类型)
// 只进行一次内存分配
if elem.kind&kindNoPointers != 0 || size == 0 {
// 如果 hchan 结构体中不含指针,GC 就不会扫描 chan 中的元素
// 只分配 "hchan 结构体大小 + 元素大小*个数" 的内存
c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
// 如果是缓冲型 channel 且元素大小不等于 0(大小等于 0的元素类型:struct{})
if size > 0 && elem.size != 0 {
c.buf = add(unsafe.Pointer(c), hchanSize)
} else {
// race detector uses this l是ocation for synchronization
// Also prevents us from pointing beyond the allocation (see issue 9401).
// 1. 非缓冲型的,buf 没用,直接指向 chan 起始地址处
// 2. 缓冲型的,能进入到这里,说明元素无指针且元素类型为 struct{},也无影响
// 因为只会用到接收和发送游标,不会真正拷贝东西到 c.buf 处(这会覆盖 chan的内容)
c.buf = unsafe.Pointer(c)
}
} else {
// 进行两次内存分配操作
c = new(hchan)
c.buf = newarray(elem, int(size))
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
// 循环数组长度
c.dataqsiz = uint(size)

// 返回 hchan 指针
return c
}

新建一个 chan 后,内存在堆上分配,大概长这样:

chan的内存分配情况

CSP

Go的并发依赖CSP模型,基于channel实现。

1
2
Do not communicate by sharing memory; instead, share memory by communicating.
不要通过共享内存来通信,而要通过通信来实现内存共享。

CSP 全称是 “Communicating Sequential Processes”,用于描述两个独立的并发实体通过共享 channel(管道)进行通信的并发模型。Go语言并没有完全实现了 CSP 并发模型的所有理论,仅仅是实现了 process 和 channel 这两个概念。process 就是Go语言中的 goroutine,每个 goroutine 之间是通过 channel 通讯来实现数据共享。

大多数的编程语言的并发编程模型是基于线程和内存同步访问控制,Go 的并发编程的模型则用 goroutine 和 channel 来替代。Goroutine 和线程类似,channel 和 mutex (用于内存同步访问控制)类似。

Go 的并发原则非常优秀,目标就是简单:尽量使用 channel;把 goroutine 当作免费的资源,随便用。

操作channel的结果

操作 nil channel closed channel not nil, not closed channel
close panic panic 正常关闭
读 <- ch 阻塞 读到对应类型的零值 阻塞或正常读取数据。缓冲型 channel 为空或非缓冲型 channel 没有等待发送者时会阻塞
写 ch <- 阻塞 panic 阻塞或正常写入数据。非缓冲型 channel 没有等待接收者或缓冲型 channel buf 满时会被阻塞

总结一下,发生 panic 的情况有三种:向一个关闭的 channel 进行写操作;关闭一个 nil 的 channel;重复关闭一个 channel。

读、写一个 nil channel 都会被阻塞。

事务隔离

转账场景:要给朋友小王转100块钱,而此时银行卡只有100块钱。转账过程具体到程序里会有一系列的操作,比如查询余额、做加减法、更新余额等,这些操作必须保证是一体的,不然等程序查完之后,还没做减法之前,100块钱完全可以借着这个时间差再查一次,然后再给另外一个朋友转账,如果银行这么整,不就乱了么?这时就要用到“事务”这个概念。

简单来说,事务就是要保证一组数据库操作,要么全部成功,要么全部失败。在MySQL中,事务支持是在引擎层实现的。MySQL是一个支持多引擎的系统,但并不是所有的引擎都支持事务。比如MySQL原生的MyISAM引擎就不支持事务,这也是MyISAM被InnoDB取代的重要原因之一。

隔离性与隔离级别

事务的四要素:ACID(Atomicity、Consistency、Isolation、Durability,即原子性、一致性、隔离性、持久性)。本次主要学习隔离性。

当数据库上有多个事务同时执行的时候,就可能出现脏读(dirty read)、不可重复读(non-repeatable read)、幻读(phantom read)的问题. 为了解决这些问题,就有了“隔离级别”的概念。

隔离得越严实,效率就会越低。因此很多时候,我们都要在二者之间寻找一个平衡点。SQL标准的事务隔离级别包括:读未提交(read uncommitted)、读提交(read committed)、可重复读(repeatable read)和串行化(serializable )

  • 读未提交是指,一个事务还没提交时,它做的变更就能被别的事务看到
  • 读提交是指,一个事务提交之后,它做的变更才会被其他事务看到
  • 可重复读是指,一个事务执行过程中看到的数据,总是跟这个事务在启动时看到的数据是一致的。当然在可重复读隔离级别下,未提交变更对其他事务也是不可见的。
  • 串行化,顾名思义是对于同一行记录,“写”会加“写锁”,“读”会加“读锁”。当出现读写锁冲突的时候,后访问的事务必须等前一个事务执行完成,才能继续执行。

举例说明事务的隔离性:

1
2
mysql> create table T(c int) engine=InnoDB;
insert into T(c) values(1);

事务的隔离性

在不同的隔离级别下,事务A会有哪些不同的返回结果,也就是图里面V1、V2、V3的返回值分别是什么。

  • 若隔离级别是“读未提交”,事务B虽然还没有提交,但是结果已经被A看到了。v1 为2,v2为2,v3为2
  • 若隔离级别是“读提交”,事务B的更新在提交后才能被A看到。v1为1,v2为2,v3为2
  • 若隔离级别是“可重复读”,事务在执行期间看到的数据前后必须是一致的。v1为1,v2为1,v3为2
  • 若隔离级别是“串行化”,则在事务B执行“将1改成2”的时候,会被锁住。直到事务A提交后,事务B才可以继续执行。所以从A的角度看, V1、V2值是1,V3的值是2。

在实现上,数据库里面会创建一个视图访问的时候以视图的逻辑结果为准。在“可重复读”隔离级别下,这个视图是在事务启动时创建的,整个事务存在期间都用这个视图。在“读提交”隔离级别下,这个视图是在每个SQL语句开始执行的时候创建的。这里需要注意的是,“读未提交”隔离级别下直接返回记录上的最新值,没有视图概念;而“串行化”隔离级别下直接用加锁的方式来避免并行访问。

在不同的隔离级别下,数据库行为是有所不同的。Oracle数据库的默认隔离级别其实就是“读提交”,因此对于一些从Oracle迁移到MySQL的应用,为保证数据库隔离级别的一致,一定要记得将MySQL的隔离级别设置为“读提交”。

配置隔离级别

配置的方式是,将启动参数transaction-isolation的值设置成READ-COMMITTED。你可以用show variables来查看当前的值。

1
2
3
4
5
6
7
mysql> show variables like 'transaction_isolation';
select @@tx_isolation;
mysql> set global transaction_isolation=0; -- 读未提交
mysql> set global transaction_isolation=1; -- 读提交
mysql> set global transaction_isolation=2; -- 可重复读
mysql> set global transaction_isolation=3; -- 串行化
set [glogal|session] transaction isolation level 隔离级别名称;

需要“可重复读”的场景:假设在管理一个个人银行账户表。一个表存了每个月月底的余额,一个表存了账单明细。这时候要做数据校对,也就是判断上个月的余额和当前余额的差额,是否与本月的账单明细一致。一定希望在校对过程中,即使有用户发生了一笔新的交易,也不影响校对结果。这时候使用“可重复读”隔离级别就很方便。事务启动时的视图可以认为是静态的,不受其他事务更新的影响。

事务隔离的实现

展开说明“可重复读”事务隔离的具体实现:

在MySQL中,实际上每条记录在更新的时候都会同时记录一条回滚操作。记录上的最新值,通过回滚操作,都可以得到前一个状态的值。假设一个值从1被按顺序改成了2、3、4,在回滚日志里面就会有类似下面的记录:

回滚记录

当前值是4,但是在查询这条记录的时候,不同时刻启动的事务会有不同的read-view。如图中看到的,在视图A、B、C里面,这一个记录的值分别是1、2、4,同一条记录在系统中可以存在多个版本,就是数据库的多版本并发控制(MVCC)。对于read-view A,要得到1,就必须将当前值依次执行图中所有的回滚操作得到。另外,即使现在有另外一个事务正在将4改成5,这个事务跟read-view A、B、C对应的事务是不会冲突的。

回滚日志总不能一直保留,什么时候删除呢?答案是,在不需要的时候才删除。也就是说,系统会判断,当没有事务再需要用到这些回滚日志时,回滚日志会被删除

什么时候不需要?就是当系统里没有比这个回滚日志更早的read-view的时候。

为什么建议尽量不要使用长事务

长事务意味着系统里面会存在很老的事务视图。由于这些事务随时可能访问数据库里面的任何数据,所以这个事务提交之前,数据库里面它可能用到的回滚记录都必须保留,这就会导致大量占用存储空间。

在MySQL 5.5及以前的版本,回滚日志是跟数据字典一起放在ibdata文件里的,即使长事务最终提交,回滚段被清理,文件也不会变小。若回滚段太大,最终只好为了清理回滚段,重建整个库。 除了对回滚段的影响,长事务还占用锁资源,也可能拖垮整个库

事务的启动方式

MySQL的事务启动方式有以下几种:

  • 显式启动事务语句, beginstart transaction。配套的提交语句是commit,回滚语句是rollback

  • set autocommit=0,这个命令会将这个线程的自动提交关掉。意味着如果只执行一个select语句,这个事务就启动了,而且并不会自动提交。这个事务持续存在直到主动执行commit 或 rollback 语句,或者断开连接。

有些客户端连接框架会默认连接成功后先执行一个set autocommit=0的命令。这就导致接下来的查询都在事务中,如果是长连接,就导致了意外的长事务。

因此,建议总是使用set autocommit=1, 通过显式语句的方式来启动事务。

对于一个需要频繁使用事务的业务,第二种方式每个事务在开始时都不需要主动执行一次 “begin”,减少了语句的交互次数。如果也有这个顾虑,建议使用commit work and chain语法: 在autocommit为1的情况下,用begin显式启动的事务,如果执行commit则提交事务。如果执行 commit work and chain,则是提交事务并自动启动下一个事务,这样也省去了再次执行begin语句的开销。同时带来的好处是从程序开发的角度明确地知道每个语句是否处于事务中。

可以在information_schema库的innodb_trx表中查询长事务,比如下面这个语句,用于查找持续时间超过60s的事务。

1
select * from information_schema.innodb_trx where TIME_TO_SEC(timediff(now(),trx_started))>60

如何避免长事务对业务的影响?

从应用开发端和数据库端来看。

首先,从应用开发端来看:

  • 确认是否使用了set autocommit=0。这个确认工作可以在测试环境中开展,把MySQL的general_log开起来,然后随便跑一个业务逻辑,通过general_log的日志来确认。一般框架如果会设置这个值,也就会提供参数来控制行为,你的目标就是把它改成1。

  • 确认是否有不必要的只读事务。有些框架会习惯不管什么语句先用begin/commit框起来。我见过有些是业务并没有这个需要,但是也把好几个select语句放到了事务中。这种只读事务可以去掉。

  • 业务连接数据库的时候,根据业务本身的预估,通过SET MAX_EXECUTION_TIME命令,来控制每个语句执行的最长时间,避免单个语句意外执行太长时间

其次,从数据库端来看:

  • 监控 information_schema.Innodb_trx表,设置长事务阈值,超过就报警/或者kill;

  • Percona的pt-kill这个工具不错,推荐使用;

  • 在业务功能测试阶段要求输出所有的general_log,分析日志行为提前发现问题

  • 如果使用的是MySQL 5.6或者更新版本,把innodb_undo_tablespaces设置成2(或更大的值)。如果真的出现大事务导致回滚段过大,这样设置后清理起来更方便