package hbase import ( "bytes" "crypto/md5" "encoding/binary" "encoding/hex" "fmt" "sync" "time" pb "github.com/golang/protobuf/proto" "github.com/juju/errors" "github.com/ngaut/go-zookeeper/zk" "github.com/ngaut/log" "github.com/pingcap/go-hbase/proto" ) const ( zkRootRegionPath = "/meta-region-server" zkMasterAddrPath = "/master" magicHeadByte = 0xff magicHeadSize = 1 idLengthSize = 4 md5HexSize = 32 servernameSeparator = "," rpcTimeout = 30000 pingTimeout = 30000 callTimeout = 5000 defaultMaxActionRetries = 3 // Some operations can take a long time such as disable of big table. // numRetries is for 'normal' stuff... Multiply by this factor when // want to wait a long time. retryLongerMultiplier = 31 socketDefaultRetryWaitMs = 200 defaultRetryWaitMs = 100 // always >= any unix timestamp(hbase version) beyondMaxTimestamp = "99999999999999" ) var ( hbaseHeaderBytes []byte = []byte("HBas") metaTableName []byte = []byte("hbase:meta") metaRegionName []byte = []byte("hbase:meta,,1") ) var retryPauseTime = []int64{1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200} type RegionInfo struct { Server string StartKey []byte EndKey []byte Name string Ts string TableNamespace string TableName string Offline bool Split bool } type tableInfo struct { tableName string families []string } // export client interface type HBaseClient interface { Get(tbl string, g *Get) (*ResultRow, error) Put(tbl string, p *Put) (bool, error) Delete(tbl string, d *Delete) (bool, error) TableExists(tbl string) (bool, error) DropTable(t string) error DisableTable(t string) error EnableTable(t string) error CreateTable(t *TableDescriptor, splits [][]byte) error ServiceCall(table string, call *CoprocessorServiceCall) (*proto.CoprocessorServiceResponse, error) LocateRegion(table, row []byte, useCache bool) (*RegionInfo, error) GetRegions(table []byte, useCache bool) ([]*RegionInfo, error) Split(tblOrRegion, splitPoint string) error CleanRegionCache(table []byte) CleanAllRegionCache() Close() error } // hbase client implemetation var _ HBaseClient = (*client)(nil) type client struct { mu sync.RWMutex // for read/update region info zkClient *zk.Conn zkHosts []string zkRoot string prefetched map[string]bool cachedConns map[string]*connection cachedRegionInfo map[string]map[string]*RegionInfo maxRetries int rootServerName *proto.ServerName masterServerName *proto.ServerName } func serverNameToAddr(server *proto.ServerName) string { return fmt.Sprintf("%s:%d", server.GetHostName(), server.GetPort()) } func cachedConnKey(addr string, srvType ServiceType) string { return fmt.Sprintf("%s|%d", addr, srvType) } func NewClient(zkHosts []string, zkRoot string) (HBaseClient, error) { cl := &client{ zkHosts: zkHosts, zkRoot: zkRoot, cachedConns: make(map[string]*connection), cachedRegionInfo: make(map[string]map[string]*RegionInfo), prefetched: make(map[string]bool), maxRetries: defaultMaxActionRetries, } err := cl.init() if err != nil { return nil, errors.Trace(err) } return cl, nil } func (c *client) decodeMeta(data []byte) (*proto.ServerName, error) { if data[0] != magicHeadByte { return nil, errors.New("unknown packet") } var n int32 err := binary.Read(bytes.NewBuffer(data[1:]), binary.BigEndian, &n) if err != nil { return nil, errors.Trace(err) } dataOffset := magicHeadSize + idLengthSize + int(n) data = data[(dataOffset + 4):] var mrs proto.MetaRegionServer err = pb.Unmarshal(data, &mrs) if err != nil { return nil, errors.Trace(err) } return mrs.GetServer(), nil } // init and get root region server addr and master addr func (c *client) init() error { zkclient, _, err := zk.Connect(c.zkHosts, time.Second*30) if err != nil { return errors.Trace(err) } c.zkClient = zkclient res, _, _, err := c.zkClient.GetW(c.zkRoot + zkRootRegionPath) if err != nil { return errors.Trace(err) } c.rootServerName, err = c.decodeMeta(res) if err != nil { return errors.Trace(err) } log.Debug("connect root region server...", c.rootServerName) serverAddr := serverNameToAddr(c.rootServerName) conn, err := newConnection(serverAddr, ClientService) if err != nil { return errors.Trace(err) } // Set buffered regionserver conn. cachedKey := cachedConnKey(serverAddr, ClientService) c.cachedConns[cachedKey] = conn res, _, _, err = c.zkClient.GetW(c.zkRoot + zkMasterAddrPath) if err != nil { return errors.Trace(err) } c.masterServerName, err = c.decodeMeta(res) if err != nil { return errors.Trace(err) } return nil } // get connection func (c *client) getConn(addr string, srvType ServiceType) (*connection, error) { connKey := cachedConnKey(addr, srvType) c.mu.RLock() conn, ok := c.cachedConns[connKey] c.mu.RUnlock() if ok { return conn, nil } var err error conn, err = newConnection(addr, srvType) if err != nil { return nil, errors.Errorf("create new connection failed - %v", errors.ErrorStack(err)) } c.mu.Lock() c.cachedConns[connKey] = conn c.mu.Unlock() return conn, nil } func (c *client) getAdminConn(addr string) (*connection, error) { return c.getConn(addr, AdminService) } func (c *client) getClientConn(addr string) (*connection, error) { return c.getConn(addr, ClientService) } func (c *client) getMasterConn() (*connection, error) { return c.getConn(serverNameToAddr(c.masterServerName), MasterService) } func (c *client) doAction(conn *connection, req pb.Message) (chan pb.Message, error) { cl := newCall(req) err := conn.call(cl) if err != nil { return nil, errors.Trace(err) } return cl.responseCh, nil } func (c *client) adminAction(req pb.Message) (chan pb.Message, error) { conn, err := c.getMasterConn() if err != nil { return nil, errors.Trace(err) } return c.doAction(conn, req) } func (c *client) regionAction(addr string, req pb.Message) (chan pb.Message, error) { conn, err := c.getAdminConn(addr) if err != nil { return nil, errors.Trace(err) } return c.doAction(conn, req) } // http://stackoverflow.com/questions/27602013/correct-way-to-get-region-name-by-using-hbase-api func (c *client) createRegionName(table, startKey []byte, id string, newFormat bool) []byte { if len(startKey) == 0 { startKey = make([]byte, 1) } b := bytes.Join([][]byte{table, startKey, []byte(id)}, []byte{','}) if newFormat { m := md5.Sum(b) mhex := []byte(hex.EncodeToString(m[:])) b = append(bytes.Join([][]byte{b, mhex}, []byte{'.'}), '.') } return b } func (c *client) parseRegion(rr *ResultRow) (*RegionInfo, error) { regionInfoCol, ok := rr.Columns["info:regioninfo"] if !ok { return nil, errors.Errorf("Unable to parse region location (no regioninfo column): %#v", rr) } offset := bytes.Index(regionInfoCol.Value, []byte("PBUF")) + 4 regionInfoBytes := regionInfoCol.Value[offset:] var info proto.RegionInfo err := pb.Unmarshal(regionInfoBytes, &info) if err != nil { return nil, errors.Errorf("Unable to parse region location: %#v", err) } ri := &RegionInfo{ StartKey: info.GetStartKey(), EndKey: info.GetEndKey(), Name: bytes.NewBuffer(rr.Row).String(), TableNamespace: string(info.GetTableName().GetNamespace()), TableName: string(info.GetTableName().GetQualifier()), Offline: info.GetOffline(), Split: info.GetSplit(), } if v, ok := rr.Columns["info:server"]; ok { ri.Server = string(v.Value) } return ri, nil } func (c *client) getMetaRegion() *RegionInfo { return &RegionInfo{ StartKey: []byte{}, EndKey: []byte{}, Name: string(metaRegionName), Server: serverNameToAddr(c.rootServerName), } } func (c *client) getCachedLocation(table, row []byte) *RegionInfo { c.mu.RLock() defer c.mu.RUnlock() tableStr := string(table) if regions, ok := c.cachedRegionInfo[tableStr]; ok { for _, region := range regions { if (len(region.EndKey) == 0 || bytes.Compare(row, region.EndKey) < 0) && (len(region.StartKey) == 0 || bytes.Compare(row, region.StartKey) >= 0) { return region } } } return nil } func (c *client) updateRegionCache(table []byte, region *RegionInfo) { c.mu.Lock() defer c.mu.Unlock() tableStr := string(table) if _, ok := c.cachedRegionInfo[tableStr]; !ok { c.cachedRegionInfo[tableStr] = make(map[string]*RegionInfo) } c.cachedRegionInfo[tableStr][region.Name] = region } func (c *client) CleanRegionCache(table []byte) { c.mu.Lock() defer c.mu.Unlock() delete(c.cachedRegionInfo, string(table)) } func (c *client) CleanAllRegionCache() { c.mu.Lock() defer c.mu.Unlock() c.cachedRegionInfo = map[string]map[string]*RegionInfo{} } func (c *client) LocateRegion(table, row []byte, useCache bool) (*RegionInfo, error) { // If user wants to locate metaregion, just return it. if bytes.Equal(table, metaTableName) { return c.getMetaRegion(), nil } // Try to get location from cache. if useCache { if r := c.getCachedLocation(table, row); r != nil { return r, nil } } // If cache missed or not using cache, try to get and update region info. metaRegion := c.getMetaRegion() conn, err := c.getClientConn(metaRegion.Server) if err != nil { return nil, errors.Trace(err) } regionRow := c.createRegionName(table, row, beyondMaxTimestamp, true) call := newCall(&proto.GetRequest{ Region: &proto.RegionSpecifier{ Type: proto.RegionSpecifier_REGION_NAME.Enum(), Value: metaRegionName, }, Get: &proto.Get{ Row: regionRow, Column: []*proto.Column{&proto.Column{ Family: []byte("info"), }}, ClosestRowBefore: pb.Bool(true), }, }) err = conn.call(call) if err != nil { return nil, errors.Trace(err) } response := <-call.responseCh switch r := response.(type) { case *proto.GetResponse: res := r.GetResult() if res == nil { return nil, errors.Errorf("Empty region: [table=%s] [row=%q] [region_row=%q]", table, row, regionRow) } rr := NewResultRow(res) region, err := c.parseRegion(rr) if err != nil { return nil, errors.Trace(err) } c.updateRegionCache(table, region) return region, nil case *exception: return nil, errors.New(r.msg) default: log.Warnf("Unknown response - %T - %v", r, r) } return nil, errors.Errorf("Couldn't find the region: [table=%s] [row=%q] [region_row=%q]", table, row, regionRow) } func (c *client) GetRegions(table []byte, useCache bool) ([]*RegionInfo, error) { var regions []*RegionInfo startKey := []byte("") // Get first region. region, err := c.LocateRegion(table, []byte(startKey), useCache) if err != nil { return nil, errors.Errorf("couldn't find any region: [table=%s] [useCache=%t]", table, useCache) } regions = append(regions, region) startKey = region.EndKey for len(startKey) > 0 { region, err = c.LocateRegion(table, []byte(startKey), useCache) if err != nil { return nil, errors.Trace(err) } regions = append(regions, region) startKey = region.EndKey } return regions, nil } func (c *client) Close() error { if c.zkClient != nil { c.zkClient.Close() } for _, conn := range c.cachedConns { err := conn.close() if err != nil { return errors.Trace(err) } } return nil }