Cassandra&HBase源码阅读之“顺序”读写

除了redis这类单线程的数据库,一般的多线程数据库在读写数据的时候必然会涉及到资源的抢占或同时操作一行数据带来的版本控制问题。说白了就是,某次读可以读到哪些写、需要屏蔽哪些写、需要暂时阻塞住哪些请求。于是就需要将读写请求维护出一个“顺序”来。

Cassandra不需要太多的保证读写顺序,因为最后都是靠时间戳控制版本。需要考虑顺序的主要是涉及到MemTable需要switch或者flush的时候保证不会因为MemTable被清空而导致读写失败。在2.0和之前的版本,直接通过 ReentrantReadWriteLock来加锁维护,读写操作都是读锁,切换MemTable是写锁。在2.1开始用了一个OpOrder的概念来无锁地控制顺序,只需要在需要控制顺序的地方用jdk7的新语法糖来获取一个对象并自动close:

try (OpOrder.Group op = writeOrdering.start()){
    //do anything
}

若干个通过这种方式维护的操作被囊括在一个Group内,每个Group内部通过cas维护一个原子计数器,可以理解为start一次+1,close一次-1。需要在若干次操作的序列中制造一个间隔点时——比如当MemTable需要flush时,确保某次写入及之前的读取还可以写入这个MemTable,等都写完了才flush,并确保下一次写入就需要去写其他的MemTable ——也就类似之前需要获取写锁的时候,执行两个操作——获取和等待:

writeBarrier = writeOrder.newBarrier();
writeBarrier.issue();

writeBarrier.await();

在这个OpOrder对象中新建一个Barrier,并将这个Barrier issue(没issue的barrier不影响这个OpOrder)。issue操作会将OpOrder当前的Group过期,之后再执行start返回的会是下一个新的Group。过期的Group继续监控计数器,一旦其对应的各种操作都close掉,则这个Group的所有操作都执行完毕,这时会将自己unlink。Barrier被issue后可以在需要的时候执行await,也就是阻塞等待之前的Group unlink。同时每个group对象也可以跟某个barrier比对,如果group “after” barrier,就意味着不能进行对应的操作。比如某个MemTable的barrier “before” 某个写入操作的Group,那么这个写入操作就不能往这个MemTable写,需要找别的MemTable内存池。

读取的时候逻辑更简单,每个Group不需要判断和barrier的关系,而是上层逻辑确保每个MemTable在flush后、删除前,等待这个时间点之前开启的读取操作完成(即某个readBarrier等待readOrder的这个group全部close)再删掉这个MemTable即可。标记为flush的MemTable相当于变成只读。

此外在Commitlog层面,维护了一个顺序叫ReplayPosition。内部分segment和position。后者是个原子int来保证每条commit log有递增的id。同时每个SSTable会保存自己数据所对应的最大的ReplayPosition。这样当重启读log恢复时直接从对应segment的最大的position+1开始读即可。

 

然后是HBase。

HBase有region的概念,在HRegion层面首先要保证各种增删改查和region的split不冲突。于是用了一个读写锁来保证。写锁只被split之类的锁住,其他的普通操作包括compaction都是锁住读锁。感觉上似乎可以也改成OpOrder这种无锁的形式?不知道两者的性能差多少。

在写操作的时候,region会以行和线程为单位维护锁,一个ConcurrentHashMap<Row,RowLock>来存。取的时候要保证拿出来的锁是本线程放的,如果不是需要等对应线程释放。不过没太仔细看没明白啥时候会导致一个线程反复拿锁。拿到锁之后获取当前系统时间戳(貌似直接拿System的,不知道遇到闰秒或者时间被修改之类情况如何处理)作为Cell内部的各种时间戳。然后拿一个”updateLock”读写锁的读锁,这个锁和Cassandra2.0的读写锁用处差不多,负责保证MemTable的flush不会影响写操作,应该也是可以改成类似OpOrder的无锁版本的。

类似Cassandra的ReplayPosition,HBase在每个region中维护了一个AtomicLong,以写入WAL的顺序来给每个写操作分配一个序列号——sequenceId。不过这个id的作用会更多。这个id首先会加+ 1000000000来作为一个mvcc的版本号,之所以加一个大数,注释里说是为了保证当前scanner不会扫到这个未完成的插入。然后这里又用了一个加锁的链表来将每次修改放入链表末尾,后面会用。而这个版本号实际上就是确保读数据的时候不会读到未完整执行完毕(即commit)的写操作。同时每个region维护一个全局的memstoreRead来设置可以读取的最大mvcc版本号,随着MemStore里的Entry更新也就是commit随时将这个值设置成最大的版本号来允许读取。

然后把数据插入到MemStore中,因为mvcc的版本设的很大,所以这时候是读不到的,同时会备份所有update的对象引用到一个list留作后续更新mvcc的版本号。另外似乎HBase的MemStore一直没搞off heap吧?然后写WAL,做完持久化后可以释放各种region锁、行锁,并将更新mvcc的版本号变成实际的版本号这样就可以读了。这里更新mvcc的时候会用到前面说的那个链表,阻塞等待保证这次更新前的其他更新已经更新了mvcc,这样就保证了所有写操作都是有序的,不会出现读取的时候只看到后写的没看到先写的情况。而Scan的时候一般会以当前memstoreRead为基准,从而保证扫到的数据是某个时间点的一致数据,当然也可以设这个为maxlongint,这样可以扫到哪怕是没commit的任何数据。同时flush MemStore的时候也利用mvcc来等待未完成的写操作(相当于C*里那个writeBarrier.await()),不过hbase的MemStore每个CF只有一个(每个CF又有一个读写锁),里面最多有两个SkipList,一个是正常用的,当准备flush的时候切成snapshot然后开一个新的,所以读的时候要读两个。而非像C*一样在上层搞了多个MemTable每个是一个二维map。

所以执行一次写操作要拿好多个锁,读的话也至少要有一个Store内的锁。这些都是为了保证HBase所需要的ACID相关特性,不知道会多大程度上的影响HBase的性能。

 


已发布

分类

来自

标签:

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注