funcHandler(w http.ResponseWriter, r *http.Request) { m := r.Method if m == http.MethodGet { get(w, r) return } if m == http.MethodDelete { del(w, r) return } w.WriteHeader(http.StatusMethodNotAllowed) }
funcHandler(w http.ResponseWriter, r *http.Request) { m := r.Method // 增加POST方法的处理函数post if m == http.MethodPost { post(w, r) return } if m == http.MethodPut { put(w, r) return } if m == http.MethodGet { get(w, r) return } if m == http.MethodDelete { del(w, r) return } w.WriteHeader(http.StatusMethodNotAllowed) }
// 检查访问方式并调用相应的函数 funcHandler(w http.ResponseWriter, r *http.Request) { m := r.Method if m == http.MethodHead { head(w, r) return } if m == http.MethodPut { put(w, r) return } w.WriteHeader(http.StatusMethodNotAllowed) }
funcput(w http.ResponseWriter, r *http.Request) { token := strings.Split(r.URL.EscapedPath(), "/")[2] stream, e := rs.NewRSResumablePutStreamFromToken(token) if e != nil { log.Println(e) w.WriteHeader(http.StatusForbidden) return } // 调用CurrentSize获取token当前大小 current := stream.CurrentSize() if current == -1 { w.WriteHeader(http.StatusNotFound) return } // 从Range头部获得offset offset := utils.GetOffsetFromHeader(r.Header) // 如果offset和当前的大小不一致,接口服务返回416Range Not Satisfiable if current != offset { w.WriteHeader(http.StatusRequestedRangeNotSatisfiable) return } bytes := make([]byte, rs.BLOCK_SIZE) // 在for循环中以32000字节为长度读取HTTP请求的正文并写入stream for { n, e := io.ReadFull(r.Body, bytes) if e != nil && e != io.EOF && e != io.ErrUnexpectedEOF { log.Println(e) w.WriteHeader(http.StatusInternalServerError) return } current += int64(n) // 如果读到的总长度超出了对象的大小,说明客户端上传的数据有误 // 接口服务删除临时对象并返回403 Forbidden if current > stream.Size { stream.Commit(false) log.Println("resumable put exceed size") w.WriteHeader(http.StatusForbidden) return } if n != rs.BLOCK_SIZE && current != stream.Size { return } stream.Write(bytes[:n]) // 读到的总长度等于对象的大小,说明客户端上传了对象的全部数据 if current == stream.Size { // 调用Flush方法将剩余数据写入临时对象 stream.Flush() // 调用NewRSResumableGetStream生成一个临时对象读取流getStream getStream, e := rs.NewRSResumableGetStream(stream.Servers, stream.Uuids, stream.Size) // 读取getStream中的数据并计算散列值 hash := url.PathEscape(utils.CalculateHash(getStream)) // 散列值不一致,客户端上传数据有错误 if hash != stream.Hash { // 接口服务删除临时对象 stream.Commit(false) log.Println("resumable put done but hash mismatch") // 返回403 Forbidden w.WriteHeader(http.StatusForbidden) return } // 检查该散列值是否已经存在 if locate.Exist(url.PathEscape(hash)) { // 存在则删除临时对象 stream.Commit(false) } else { // 否则将对象转正 stream.Commit(true) } // 调用es.AddVersion添加新版本 e = es.AddVersion(stream.Name, stream.Hash, stream.Size) if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) } return } } }
funcNewRSResumablePutStreamFromToken(token string)(*RSResumablePutStream, error) { // 对token进行Base64解码 b, e := base64.StdEncoding.DecodeString(token) if e != nil { returnnil, e }
var t resumableToken // 将JSON数据编出形成resumableToken结构体t e = json.Unmarshal(b, &t) if e != nil { returnnil, e }
接口服务会将客户端PUT上来的数据流切成4+2的分片,然后向6个数据服务节点上传临时对象<hash>.X,同时计算对象散列值。如果散列值一致,6个临时对象会被转成正式的分片对象<hash>.X,其内容被保存在$STORAGE_ROOT/objects/<hash>.X.<hash of shard X>文件中。其中, X的取值范围为0~5的整数,表示该分片的id,0~3是数据片,4和5则是校验片。<hash of shard X>是该分片的散列值,在转正时通过计算临时对象的内容得到。
当数据服务的接口收到的请求对象是<hash>.X,数据服务需要自己去$STORAGE_ROOT/objects目录下寻找<hash>.X开头的文件<hash>.X.<hash of shard X>,然后在返回该文件内容时还需要进行数据校验,确保其内容的散列值和<hash of shard X>一致。如果不一致,则需要删除该分片对象并返回404 Not Found。这是为了防止该分片对象被数据降解破坏。
接口服务可能由于分片定位失败或分片散列值不一致等原因无法获取某个分片的数据。如果失败的分片数小于等于2,接口服务会根据成功获取的分片重塑数据,并将新的分片写入随机的数据服务;如果失败的分片数大于等于3,对这样的数据丢失无能为力,只能向客户端返回404 Not Found。
管道网络传输:HTTP/1.1 采用了长连接的方式,这使得管道(pipeline)网络传输成为可能。即可在同一个 TCP 连接里面,客户端可以发起多个请求,只要第一个请求发出去了,不必等其回来,就可以发第二个请求出去,可以减少整体的响应时间。举例来说,客户端需要请求两个资源。以前的做法是,在同一个TCP连接里面,先发送 A 请求,然后等待服务器做出回应,收到后再发出 B 请求。管道机制则是允许浏览器同时发出 A 请求和 B 请求。
队头阻塞:但是服务器还是按照顺序,先回应 A 请求,完成后再回应 B 请求。要是前面的回应特别慢,后面就会有许多请求排队等着。这称为队头堵塞。
[~/path/to/repo]$ git config user.name "dongsihfu" [~/path/to/repo]$ git config user.email 自己的git邮箱 This change will only affect future commits. Past commits will retain the username and address they were committed with.
funcput(w http.ResponseWriter, r *http.Request) { // 先从HTTP请求头部获取对象的散列值 hash := utils.GetHashFromHeader(r.Header) if hash == "" { log.Println("missing object hash in digest header") w.WriteHeader(http.StatusBadRequest) return } // 从URL中获取对象的大小 size := utils.GetSizeFromHeader(r.Header) // 以散列值和size作为参数调用stroreObject // 新实现的storeObject需要在一开始就确定临时对象大小 c, e := storeObject(r.Body, hash, size) if e != nil { log.Println(e) w.WriteHeader(c) return } if c != http.StatusOK { w.WriteHeader(c) return }
name := strings.Split(r.URL.EscapedPath(), "/")[2] e = es.AddVersion(name, hash, size) if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) } }
funcstoreObject(r io.Reader, hash string, size int64)(int, error) { // 首先调用locate.Exist方法定位对象的散列值 // 如果已经存在,跳过后续上传操作直接返回200 OK if locate.Exist(url.PathEscape(hash)) { return http.StatusOK, nil }
// 不存在,调用putStream生成对象的写入流stream用于写入 stream, e := putStream(url.PathEscape(hash), size) if e != nil { return http.StatusInternalServerError, e } // 两个输入参数,分别是作为io.Reader的r和io.Writer的stream // 返回的reader也是一个io.Reader reader := io.TeeReader(r, stream) // reader被读取的时候,实际的内容读取自r,同时也会写入stream // 用utils.CalculateHash从reader中读取数据的同时也写入了stream d := utils.CalculateHash(reader) // 计算出来的散列值和hash做比较 // 不一致则调用stream.Commit(false)删除临时对象,并返回400 Bad Request if d != hash { stream.Commit(false) return http.StatusBadRequest, fmt.Errorf("object hash mismatch, calculated=%s, requested=%s", d, hash) } // 一致则调用stream.Commit(true)将临时对象转正并返回200 OK stream.Commit(true) return http.StatusOK, nil }
funcputStream(hash string, size int64)(*objectstream.TempPutStream, error) { server := heartbeat.ChooseRandomDataServer() if server == "" { returnnil, fmt.Errorf("cannot find any dataServer") }
funcStartLocate() { q := rabbitmq.New(os.Getenv("RABBITMQ_SERVER")) defer q.Close() q.Bind("dataServers") c := q.Consume() for msg := range c { hash, e := strconv.Unquote(string(msg.Body)) if e != nil { panic(e) }
funcHandler(w http.ResponseWriter, r *http.Request) { m := r.Method if m == http.MethodPut { put(w, r) return } if m == http.MethodPatch { patch(w, r) return } if m == http.MethodPost { post(w, r) return } if m == http.MethodDelete { del(w, r) return } w.WriteHeader(http.StatusMethodNotAllowed) }
funcHandler(w http.ResponseWriter, r *http.Request) { m := r.Method if m == http.MethodGet { get(w, r) return } w.WriteHeader(http.StatusMethodNotAllowed) }