
Consistent hashing is typically used in distributed systems to minimize the amount of data that has to be migrated when nodes are added or removed. This article summarizes several common implementations of consistent hashing.

The Distributed Cache Problem

Suppose a distributed cache system has three servers with the following addresses and ports: 127.0.0.1:40000, 127.0.0.2:40000, and 127.0.0.3:40000.

When accessing the cache, all that is required is that requests for the same key always go to the same server. The most common approach is to number the servers and choose one by hashing the key and taking the result modulo the number of servers:

h = Hash(key) % N

A reference implementation is shown below; it keeps the servers in an ArrayList and hashes keys with the 32-bit FNV-1a algorithm. The scheme is simple but scales poorly: adding or removing a single server relocates a large fraction of keys to different servers and causes cache misses.

private List<CacheServer> servers = new ArrayList<>();

public void addServer(String name) {
    servers.add(new CacheServer(name));
}

public void removeServer(String name) {
    Iterator<CacheServer> it = servers.iterator();
    while (it.hasNext()) {
        if (it.next().name().equals(name)) {
            it.remove();
        }
    }
}

protected CacheServer getServer(K key) {
    // Hash the key and map it to a server index by unsigned modulo over the server count.
    int hash = FNV.fvn1a32(key.toString().getBytes());
    return servers.get(Integer.remainderUnsigned(hash, servers.size()));
}
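
The FNV and CacheServer helpers referenced throughout these snippets are not shown in the original code. The minimal sketch below is an assumption that only follows the identifiers already used above (fvn1a32, name()); the constants are the standard 32-bit FNV-1a parameters.

// Minimal sketch of the helpers assumed by the snippets in this article.
public final class FNV {
    private static final int FNV_32_OFFSET = 0x811c9dc5; // FNV-1a 32-bit offset basis
    private static final int FNV_32_PRIME = 0x01000193;  // FNV-1a 32-bit prime

    public static int fvn1a32(byte[] data) {
        int hash = FNV_32_OFFSET;
        for (byte b : data) {
            hash ^= (b & 0xff);   // XOR in the next byte
            hash *= FNV_32_PRIME; // multiply by the prime (implicitly mod 2^32)
        }
        return hash;
    }
}

public class CacheServer {
    private final String name;

    public CacheServer(String name) {
        this.name = name;
    }

    public String name() {
        return name;
    }
}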

With three servers and 100,000 test keys, the results are as follows. In the output, "total" is the number of keys queried, "available" is the number of key copies held across all servers, and the two hit ratios divide the hit count by each of these respectively. Removing or adding a node clearly hurts the hit ratio badly, although the load across servers is quite even.
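
For reference, the following hypothetical sketch shows how such figures can be reproduced; it is not the author's actual test harness, and it only assumes the getServer/addServer methods above plus a plain Map to remember which server stored each key.

Map<String, Set<String>> stored = new HashMap<>(); // server name -> keys it currently holds
for (int i = 0; i < 100_000; i++) {
    String key = "key-" + i;
    stored.computeIfAbsent(getServer(key).name(), s -> new HashSet<>()).add(key);
}

addServer("127.0.0.4:40000"); // the topology change under test

int hit = 0;
for (int i = 0; i < 100_000; i++) {
    String key = "key-" + i;
    // A hit means the key is still routed to a server that actually holds it.
    if (stored.getOrDefault(getServer(key).name(), Collections.emptySet()).contains(key)) {
        hit++;
    }
}
System.out.printf("total hit ratio = %.2f%%%n", 100.0 * hit / 100_000);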

#### Without consistent hashing ####
==== No node change ====
servers: 3
127.0.0.1:40000, keysize: 33369
127.0.0.2:40000, keysize: 33333
127.0.0.3:40000, keysize: 33298
total: 100000
available: 100000
hit:100000,
total hit ratio = 100.00%
available hit ratio = 100.00%

==== Add one node ====
servers: 4
127.0.0.1:40000, keysize: 33369
127.0.0.2:40000, keysize: 33333
127.0.0.3:40000, keysize: 33298
127.0.0.4:40000, keysize: 0
total: 100000
available: 100000
hit:24983,
total hit ratio = 24.98%
available hit ratio = 24.98%

==== Re-hash after adding a node ====
servers: 4
127.0.0.1:40000, keysize: 50031
127.0.0.2:40000, keysize: 50010
127.0.0.3:40000, keysize: 49975
127.0.0.4:40000, keysize: 25001
total: 100000
available: 175017
hit:100000,
total hit ratio = 100.00%
available hit ratio = 57.14%

==== Remove one node ====
servers: 2
127.0.0.1:40000, keysize: 33369
127.0.0.2:40000, keysize: 33333
total: 100000
available: 66702
hit:33319,
total hit ratio = 33.32%
available hit ratio = 49.95%

==== Re-hash after removing a node ====
servers: 2
127.0.0.1:40000, keysize: 66693
127.0.0.2:40000, keysize: 66690
total: 100000
available: 133383
hit:100000,
total hit ratio = 100.00%
available hit ratio = 74.97%

The Classic Consistent Hashing Algorithm

The classic consistent hashing algorithm is the basic scheme proposed by David Karger et al. in Consistent Hashing and Random Trees: Distributed Caching Protocols for Relieving Hot Spots on the World Wide Web. It works as follows:

  1. Construct an integer ring of length 2^32 (known as the consistent hash ring).
  2. Hash each server (the values fall in [0, 2^32-1]) and place it on the ring.
  3. Hash the key of the data being accessed (also in [0, 2^32-1]) and place it on the ring as well.
  4. Starting from the key's position, walk clockwise and use the nearest server node found.

The hash ring can be implemented with a balanced binary tree, and the hash function should have few collisions and spread values well. The reference implementation below uses a TreeMap as the hash ring to hold the servers (giving O(log N) clockwise lookups via tailMap) and again hashes with 32-bit FNV-1a.

private SortedMap<Integer, CacheServer> servers = new TreeMap<>();

public void addServer(String name) {
    int hash = FNV.fvn1a32(name.getBytes());
    servers.put(hash, new CacheServer(name));
}

public void removeServer(String name) {
    int hash = FNV.fvn1a32(name.getBytes());
    servers.remove(hash);
}

protected CacheServer getServer(K key) {
    int hash = FNV.fvn1a32(key.toString().getBytes());
    // Exact match: the key lands directly on a server's ring position.
    if (servers.containsKey(hash)) {
        return servers.get(hash);
    }
    // Otherwise walk clockwise: take the first server at or after the key's hash,
    // wrapping around to the first server on the ring if there is none.
    SortedMap<Integer, CacheServer> tailMap = servers.tailMap(hash);
    if (tailMap.isEmpty()) {
        return servers.get(servers.firstKey());
    }
    return servers.get(tailMap.firstKey());
}

Again with three servers and 100,000 test keys, the results are as follows. Adding a server now has a small impact, and removing a node does not affect reads from the remaining nodes. Note, however, how uneven the key distribution is with only three points on the ring (3,303 keys on one server versus 54,030 on another).

#### Consistent hashing ####
==== No node change ====
servers: 3
127.0.0.3:40000, keysize = 54030, hash = 0x86d81976
127.0.0.2:40000, keysize = 42667, hash = 0xf34e8f45
127.0.0.1:40000, keysize = 3303, hash = 0xfc5a05c8
total: 100000
available: 100000
hit:100000,
total hit ratio = 100.00%
available hit ratio = 100.00%

==== Add one node ====
servers: 4
127.0.0.3:40000, keysize = 54030, hash = 0x86d81976
127.0.0.4:40000, keysize = 0, hash = 0x9c516553
127.0.0.2:40000, keysize = 42667, hash = 0xf34e8f45
127.0.0.1:40000, keysize = 3303, hash = 0xfc5a05c8
total: 100000
available: 100000
hit:91660,
total hit ratio = 91.66%
available hit ratio = 91.66%

==== Re-hash after adding a node ====
servers: 4
127.0.0.3:40000, keysize = 54030, hash = 0x86d81976
127.0.0.4:40000, keysize = 8340, hash = 0x9c516553
127.0.0.2:40000, keysize = 42667, hash = 0xf34e8f45
127.0.0.1:40000, keysize = 3303, hash = 0xfc5a05c8
total: 100000
available: 108340
hit:100000,
total hit ratio = 100.00%
available hit ratio = 92.30%

==== Remove one node ====
servers: 2
127.0.0.2:40000, keysize = 42667, hash = 0xf34e8f45
127.0.0.1:40000, keysize = 3303, hash = 0xfc5a05c8
total: 100000
available: 45970
hit:45970,
total hit ratio = 45.97%
available hit ratio = 100.00%

==== Re-hash after removing a node ====
servers: 2
127.0.0.2:40000, keysize = 96697, hash = 0xf34e8f45
127.0.0.1:40000, keysize = 3303, hash = 0xfc5a05c8
total: 100000
available: 100000
hit:100000,
total hit ratio = 100.00%
available hit ratio = 100.00%

Consistent Hashing with Virtual Nodes

Although consistent hashing reduces the impact of adding and removing nodes, it has a balance problem. When a node is added to the cluster, it only takes over part of the data lying between two existing nodes on the ring, so the load is uneven. And when a node is removed, all of its data falls onto the next node on the ring, increasing that node's burden.

The fix is to introduce virtual nodes: keys are not mapped directly to physical servers but to virtual nodes, and each physical node corresponds to a number of virtual nodes. The reference implementation below uses a TreeMap as the hash ring to hold the virtual nodes. A virtual node name is the physical node name plus the suffix "_VN" and the virtual node index; for example, the 0th virtual node of 127.0.0.1:40000 is 127.0.0.1:40000_VN0. By default each physical node maps to 160 virtual nodes, and the hash is again 32-bit FNV-1a.

private static final String VIRTUAL_NODE_FLAG = "_VN"; // virtual node name suffix, as described above

private SortedMap<Integer, String> vservers = new TreeMap<>(); // hash ring: ring position -> virtual node name
private Map<String, CacheServer> servers = new HashMap<>();    // physical node name -> server
private int virtualNodeNum = 160;                              // virtual nodes per physical node

public void addServer(String name) {
    servers.put(name, new CacheServer(name));
    IntStream.range(0, virtualNodeNum).forEach(i -> {
        String vname = name2vname(name, i);
        int vhash = FNV.fvn1a32(vname.getBytes());
        vservers.put(vhash, vname);
    });
}

public void removeServer(String name) {
    Iterator<Map.Entry<Integer, String>> it = vservers.entrySet().iterator();
    while (it.hasNext()) {
        if (vname2name(it.next().getValue()).equals(name)) {
            it.remove();
        }
    }
    servers.remove(name);
}

private String name2vname(String name, int i) {
    return name + VIRTUAL_NODE_FLAG + i;
}

private String vname2name(String vname) {
    return vname.substring(0, vname.lastIndexOf(VIRTUAL_NODE_FLAG));
}

@Override
protected CacheServer getServer(K key) {
    int hash = FNV.fvn1a32(key.toString().getBytes());
    String vname = null;
    if (vservers.containsKey(hash)) {
        vname = vservers.get(hash);
    } else {
        SortedMap<Integer, String> tailMap =
                vservers.tailMap(hash);
        if (tailMap.isEmpty()) {
            vname = vservers.get(vservers.firstKey());
        } else {
            vname = vservers.get(tailMap.firstKey());
        }
    }

    return servers.get(vname2name(vname));
}

Again with three servers and 100,000 test keys, the results are as follows. Adding a server has a small impact, removing a node does not affect reads from the remaining nodes, and the keys are spread noticeably more evenly than without virtual nodes.

#### Consistent hashing with virtual nodes ####
==== No node change ====
servers: 3
127.0.0.3:40000, keysize = 24015
127.0.0.1:40000, keysize = 46441
127.0.0.2:40000, keysize = 29544
vservers: 480
total: 100000
available: 100000
hit:100000,
total hit ratio = 100.00%
available hit ratio = 100.00%

==== Add one node ====
servers: 4
127.0.0.3:40000, keysize = 24015
127.0.0.1:40000, keysize = 46441
127.0.0.4:40000, keysize = 0
127.0.0.2:40000, keysize = 29544
vservers: 640
total: 100000
available: 100000
hit:74329,
total hit ratio = 74.33%
available hit ratio = 74.33%

==== Re-hash after adding a node ====
servers: 4
127.0.0.3:40000, keysize = 24015
127.0.0.1:40000, keysize = 46441
127.0.0.4:40000, keysize = 25671
127.0.0.2:40000, keysize = 29544
vservers: 640
total: 100000
available: 125671
hit:100000,
total hit ratio = 100.00%
available hit ratio = 79.57%

==== Remove one node ====
servers: 2
127.0.0.3:40000, keysize = 24015
127.0.0.2:40000, keysize = 29544
vservers: 320
total: 100000
available: 53559
hit:53559,
total hit ratio = 53.56%
available hit ratio = 100.00%

==== Re-hash after removing a node ====
servers: 2
127.0.0.3:40000, keysize = 25222
127.0.0.2:40000, keysize = 74778
vservers: 320
total: 100000
available: 100000
hit:100000,
total hit ratio = 100.00%
available hit ratio = 100.00%

Ketama Consistent Hashing

The Ketama algorithm was originally implemented by programmers at Last.fm and has been widely adopted; open-source projects such as spymemcached and twemproxy ship built-in implementations. Compared with the virtual-node implementation above, the difference lies in how a server's virtual nodes are placed on the ring: each virtual node name is hashed with MD5, and the 16-byte digest is split into four 4-byte integers, so each digest contributes four ring points (with virtualNodeNum = 160, only 40 digests per server are needed):

// Placement of a server's virtual nodes on the ring, Ketama style: the MD5 digest
// of each virtual node name yields four 4-byte ring positions.
for (int i = 0; i < virtualNodeNum / 4; i++) {
    String vname = name2vname(name, i);
    byte[] digest = MD.md5(vname.getBytes());
    for (int h = 0; h < 4; h++) {
        int k = ((digest[3 + h * 4] & 0xFF) << 24)
                | ((digest[2 + h * 4] & 0xFF) << 16)
                | ((digest[1 + h * 4] & 0xFF) << 8)
                | (digest[h * 4] & 0xFF);
        vservers.put(k, vname);
    }
}
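
The MD helper above is likewise not shown in the original code; a minimal sketch using the JDK's MessageDigest (an assumption, matching only the md5 name used above) might look like this:

public final class MD {
    // Returns the 16-byte MD5 digest of the input.
    public static byte[] md5(byte[] data) {
        try {
            return java.security.MessageDigest.getInstance("MD5").digest(data);
        } catch (java.security.NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is required on every JVM", e);
        }
    }
}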

Again with three servers and 100,000 test keys, servers are placed using the Ketama scheme while keys are still hashed with FNV-1a. The results below show that adding a server has a small impact and that the keys are distributed more evenly.

==== No node change ====
servers: 3
127.0.0.3:40000, keysize = 32266
127.0.0.1:40000, keysize = 38020
127.0.0.2:40000, keysize = 29714
vservers: 480
total: 100000
available: 100000
hit:100000,
total hit ratio = 100.00%
available hit ratio = 100.00%

==== Add one node ====
servers: 4
127.0.0.3:40000, keysize = 32266
127.0.0.1:40000, keysize = 38020
127.0.0.4:40000, keysize = 0
127.0.0.2:40000, keysize = 29714
vservers: 640
total: 100000
available: 100000
hit:76272,
total hit ratio = 76.27%
available hit ratio = 76.27%

==== Re-hash after adding a node ====
servers: 4
127.0.0.3:40000, keysize = 32266
127.0.0.1:40000, keysize = 38020
127.0.0.4:40000, keysize = 23728
127.0.0.2:40000, keysize = 29714
vservers: 640
total: 100000
available: 123728
hit:100000,
total hit ratio = 100.00%
available hit ratio = 80.82%

==== Remove one node ====
servers: 2
127.0.0.3:40000, keysize = 32266
127.0.0.2:40000, keysize = 29714
vservers: 320
total: 100000
available: 61980
hit:61980,
total hit ratio = 61.98%
available hit ratio = 100.00%

==== Re-hash after removing a node ====
servers: 2
127.0.0.3:40000, keysize = 52839
127.0.0.2:40000, keysize = 47161
vservers: 320
total: 100000
available: 100000
hit:100000,
total hit ratio = 100.00%
available hit ratio = 100.00%

Jump Consistent Hash

Jump consistent hash is another consistent hashing scheme, described in the paper A Fast, Minimal Memory, Consistent Hash Algorithm. The C++ code from the paper is:

int32_t JumpConsistentHash(uint64_t key, int32_t num_buckets) {
    int64_t b = -1, j = 0;
    while (j < num_buckets) {
        b = j;
        key = key * 2862933555777941757ULL + 1;
        j = (b + 1) * (double(1LL << 31) / double((key >> 33) + 1));
    }
    return b;
}

Guava ships a ready-made implementation (guava-22.0-sources.jar!/com/google/common/hash/Hashing.java); its core code is:

public static int consistentHash(long input, int buckets) {
    LinearCongruentialGenerator generator = new LinearCongruentialGenerator(input);
    int candidate = 0;
    int next;

    // Jump from bucket to bucket until we go out of range
    while (true) {
        next = (int) ((candidate + 1) / generator.nextDouble());
        if (next >= 0 && next < buckets) {
            candidate = next;
        } else {
            return candidate;
        }
    }
}

private static final class LinearCongruentialGenerator {
    private long state;

    public LinearCongruentialGenerator(long seed) {
        this.state = seed;
    }

    public double nextDouble() {
        state = 2862933555777941757L * state + 1;
        return ((double) ((int) (state >>> 33) + 1)) / (0x1.0p31);
    }
}

Again with three servers and 100,000 test keys, keys are hashed with the 64-bit FNV-1a algorithm before being fed to the jump hash. The results below show that adding a server has a small impact and that the keys are distributed very evenly.
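
The original snippets do not show how the bucket index returned by the jump hash is mapped back to a server. The sketch below is a hypothetical wiring that assumes the ArrayList-based servers field from the first implementation, Guava on the classpath, and an inline 64-bit FNV-1a helper (fnv1a64 is not part of the original code). Because jump consistent hash returns an index in [0, servers.size()), it works best when servers are only appended or removed at the tail; the removal test below drops the last server, 127.0.0.3, which fits this model.

protected CacheServer getServer(K key) {
    long hash = fnv1a64(key.toString().getBytes());
    // Guava's Hashing.consistentHash implements the jump algorithm shown above.
    int bucket = com.google.common.hash.Hashing.consistentHash(hash, servers.size());
    return servers.get(bucket);
}

// Standard 64-bit FNV-1a, shown inline so the sketch is self-contained.
private static long fnv1a64(byte[] data) {
    long hash = 0xcbf29ce484222325L; // FNV-1a 64-bit offset basis
    for (byte b : data) {
        hash ^= (b & 0xff);
        hash *= 0x100000001b3L;      // FNV-1a 64-bit prime
    }
    return hash;
}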

#### Jump consistent hash ####
==== No node change ====
servers: 3
127.0.0.1:40000, keysize: 33253
127.0.0.2:40000, keysize: 33655
127.0.0.3:40000, keysize: 33092
total: 100000
available: 100000
hit:100000,
total hit ratio = 100.00%
available hit ratio = 100.00%

==== Add one node ====
servers: 4
127.0.0.1:40000, keysize: 33253
127.0.0.2:40000, keysize: 33655
127.0.0.3:40000, keysize: 33092
127.0.0.4:40000, keysize: 0
total: 100000
available: 100000
hit:74967,
total hit ratio = 74.97%
available hit ratio = 74.97%

==== Re-hash after adding a node ====
servers: 4
127.0.0.1:40000, keysize: 33253
127.0.0.2:40000, keysize: 33655
127.0.0.3:40000, keysize: 33092
127.0.0.4:40000, keysize: 25033
total: 100000
available: 125033
hit:100000,
total hit ratio = 100.00%
available hit ratio = 79.98%

==== Remove one node ====
servers: 2
127.0.0.1:40000, keysize: 33253
127.0.0.2:40000, keysize: 33655
total: 100000
available: 66908
hit:66908,
total hit ratio = 66.91%
available hit ratio = 100.00%

==== Re-hash after removing a node ====
servers: 2
127.0.0.1:40000, keysize: 49832
127.0.0.2:40000, keysize: 50168
total: 100000
available: 100000
hit:100000,
total hit ratio = 100.00%
available hit ratio = 100.00%

References

  1. David Karger, Eric Lehman, Tom Leighton, Rina Panigrahy, Matthew Levine, Daniel Lewin. Consistent Hashing and Random Trees: Distributed Caching Protocols for Relieving Hot Spots on the World Wide Web. STOC 1997.
  2. John Lamping, Eric Veach. A Fast, Minimal Memory, Consistent Hash Algorithm. arXiv:1406.2294, 2014.

This article is from [时光记 - 王智超的个人空间](www.hiwzc.com). Please credit the source when reposting.