Cassandra源码阅读之Batch实现

Cassandra的batch,就是一次提交多个修改操作,节省传输请求的资源消耗。同时也可以理解为一种事务的解决方案——all or nothing,这些操作可以保证要么都成功要么都不成功,即原子性。

这里需要注意几点:

1:batch只保证原子性,要么都成功要么都失败,不保证隔离性。就是说可能存在某个时间点,batch的若干个修改只能读到一部分。同时batch也可以指定本次操作不需要保证原子性,这样显然性能会更高。另外counter因为不是幂等操作(多加了一次总和就变了)以及其实现比较复杂,不支持原子性的batch。

2:batch中针对同一个row的操作会先合并再写,直接变成了一条操作。所以Cassandra是可以通过static column的batch来实现订单和余额在一个表,一个人的全部订单和余额放在一个row,对一个row key(userid)同时写成交新订单和余额修改操作,这样可以通过batch来实现原子性+隔离性。换句话说,只涉及到一个人的余额变化及订单成交,没有另一个人的余额变化,是可以搞的,而另一种事务的典型场景——一个人给另一个人转账,一个余额增加一个余额减少——这种跨行事务Cassandra应该至少目前是搞不定的。

3:非batch的普通修改请求,在写失败的时候不会回滚,比如QUORUM的操作,写两份client才认为成功,假如3个节点只写成了一份另两份超时,client会报错说写入失败,但实际上写成了的那份也没回滚。不过这个倒是问题不大,毕竟写失败的时候client可以重试的,幂等的操作反复写也没事。

执行batch操作的逻辑,和其他逻辑一样,CQL解析完了之后发现这是batch,就把每个写操作封装成List<Mutation>执行StorageProxy.mutateWithTriggers,然后根据操作的对象或语句option来判断应该mutateAtomically还是mutate。mutate就跟普通的写操作没区别了,每条操作挨个执行。

为了保证原子性,写入之前会先生成一个batchlog,将整个batch内全部Mutation序列化,带上当前时间戳和batch请求的时间戳(可以是客户端指定也可以是用接受客户端请求的节点的时间戳),传给本地机房的若干个节点,要求满足一致性要求的节点数并且不小于2。因为batch请求的row key可能不同,这里存储batchlog的节点的选择其实只是找若干个尽可能不在同一个机架上的节点,就是为了防丢数据。

CREATE TABLE system.batchlog (
    id uuid PRIMARY KEY,
    data blob,
    version int,
    written_at timestamp
)

batchlog的表定义如上,也因为会把全部Mutation序列化成blob存储,如果操作占用的空间太大(默认是5K),会报warning,毕竟大的blob的读写会略微影响Cassandra的性能。如果这里写入超时了,整个请求报超时异常返回。

然后就可以正式的写数据了,逐条发送给row key所在的节点。这步总体是同步的,但batch中的每个Mutation是异步的发,最后阻塞等待全部返回。如果这里有某个Mutation超时,那整个请求抛超时异常返回。

执行完这步没抛异常,说明所有Mutation都写成功了,返回client成功并异步删除batchlog。

显然只有这些是不够的,因为如果在写Mutation的时候部分请求没写成,会导致整个batch部分成功部分失败。当然,如果这个时候马上来读(或者Mutation正在执行还不知道是否全部成功的时候),就是所谓不满足隔离性的时候,这个时候读只能读到那些已经成功的,还在执行的和已经失败的还是读不到。

而这个时候就需要batchlog起作用了。每个节点有个独立的BatchlogManagerMBean,来维护batchlog信息。这个MBean用scheduleWithFixedDelay间隔60秒来起一个线程把本地batchlog表中所有的log拿出来去再把每个Mutation写一遍(也因此batchlog要求必须是幂等的操作,也就是不能操作counter,因为不怕重复执行)。这里会涉及到一些细节:比如为了不加重负载写的时候有个限速(顺便说下Cassandra的所有限速都是通过guava的RateLimiter来实现);每次从batchlog里拿128条去写,然后while循环不断的取直到剩余的log不到128了就停止隔下一分钟再取;写batchlog的时候得记录batch生成的时间戳以及整个batch的超时时间,确保只把那些确实超时了之后还没删除的batchlog执行replay(因为这些很可能是因为写失败了没删掉log)。全部Mutation都写成了,就删除这条log,否则留着等下一次执行再试一次,直到最终都写成了。

因此,所谓batch保证原子性,要么都成功要么都失败,其实就是只要batchlog写成了,那最终一定保证都成功,只有写batchlog的时候就悲剧了,才会都失败。而且因为batchlog至少要写两份,只要有一份写成了,那最终也肯定都会把所有Mutation写进去。

另外,batch中的多次写操作如果其中有一个是用了CAS的,那么整个batch的所有操作必须是针对同一个row的,因为同一个row的操作会合并成一个操作,这样可以一次CAS请求写全部。如果多个row的batch有了CAS就是不可控的了,一旦这个CAS挂了其他row的操作没办法回滚,就不能保证原子性了。

总结起来说,就是这个batch,只能降低一点点client到node的网络传输,但并不能提高性能(反而下降因为要有batchlog),而且因为一旦跨行/跨表就不能保证隔离性,所以本质上跟client逐个写然后把没成功的若干个反复重试没有区别,顶多是他是俩节点存batchlog不怕宕机而客户端可能写一半机器挂了。甚至理论上client上用异步接口同时写N条可能更快,因为client driver一般会优化,在选择节点的时候是先看row key的位置的,也就是说虽然Cassandra不需要smart client,随便连一个node就能执行全部操作,但实际上一般client实现的时候还是自己很smart的从3个节点中选择一个来连,这样三备份的时候有其中一个请求是直接读写本地,速度肯定最快,对系统的负担也最小。

所以batch这个功能在大多数情况下是不建议你用的……只有前面提到的基于static column来实现在一个row中同时成交订单并修改余额,才有用batch的意义,因为CQL的写操作是针对一个column的,而batch可以合并同一个row的各种操作变成一次保证原子性+隔离性的操作。另外官方举的余额相关的例子是通过CAS改一个普通的数字column然后logged batch一个row来实现的,而非counter上直接减去多少,毕竟余额这玩意你得先判断有才能成交。


已发布

分类

来自

标签:

评论

  1. […] RAMP的全称是“Read Atomic MultiPartition”,就是分布式的“读原子性”。文章定义出一种新的隔离性——Read Atomic Isolation。简单说就是在分布式数据库中一个对多行同时写入的事务,需要保证其他客户端在读取事务发生时要么可以见到其修改的所有行,要么都见不到。相对来说,ACID中的A是描述的是“写原子性”,写入多行的事务要么都成功要么都不成功。而Cassandra已经通过logged batch来实现了写原子性。读原子性只保证了单个写入事务在其他读取事务面前的原子性,但是多个事务同时写的话不保证隔离性。所以RA隔离性的事务可以满足诸如“不能在某个时刻同时读到A的好友里有B,而B的好友里没A的状态”这样的需求,经典的“银行转账”类跨行事务的需求因为可能涉及到并发写,RA隔离性是搞不定的。这就意味着RAMP事务只满足了一部分需求,当然性能也会比满足大部分需求的要好。 […]

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注