Kudu is a new storage system designed and implemented from the ground up to fill this gap between high-throughput sequential-access storage systems such as HDFS[27] and low-latency random-access systems such as HBase or Cassandra.
Kudu's positioning is clear: it fills the gap between high-throughput sequential reads and low-latency random reads, striking a balance so that workloads needing both can be served.
In particular, Kudu offers a simple API for row-level inserts, updates, and deletes, while providing table scans at throughputs similar to Parquet, a commonly-used columnar format for static data.
What is Parquet? A columnar storage format that supports nested data, implemented based on the Dremel algorithm.
Kudu provides both a simple row-oriented read/write API and columnar-storage-backed table scans.
Kudu at a high level
Tables and schemas
Kudu is a storage system for tables of structured data.
A Kudu cluster may have any number of tables, each of which has a well-defined schema consisting of a finite number of columns.
Each such column has a name, type (e.g INT32 or STRING) and optional nullability.
Some ordered subset of those columns are specified to be the table's primary key.
The primary key enforces a uniqueness constraint (at most one row may have a given primary key tuple) and acts as the sole index by which rows may be efficiently updated or deleted.
The above is the same as in a relational database, except that the primary key is the only index, since Kudu does not support secondary indexes.
Unlike most relational databases, Kudu does not currently offer secondary indexes or uniqueness constraints other than the primary key.
Currently, Kudu requires that every table has a primary key defined, though we anticipate that a future version will add automatic generation of surrogate keys.
No secondary indexes; every table must have a primary key defined.
Write operations
After creating a table, the user mutates the table using Insert, Update, and Delete APIs.
Currently, Kudu does not offer any multi-row transactional APIs: each mutation conceptually executes as its own transaction, despite being automatically batched with other mutations for better performance.
Modifications within a single row are always executed atomically across columns. For writes, multi-row transactions are not supported; atomicity is guaranteed only within a single row.
Read operations
Kudu offers only a Scan operation to retrieve data from a table.
On a scan, the user may add any number of predicates to filter the results. Currently, we offer only two types of predicates: comparisons between a column and a constant value, and composite primary key ranges. Kudu provides Scan to read a table; the user can attach any number of predicates to filter the results, but currently only those two kinds are supported: column-vs-constant comparisons and composite primary key ranges.
In addition to applying predicates, the user may specify a projection for a scan.
A projection consists of a subset of columns to be retrieved. Because Kudu's on-disk storage is columnar, specifying such a subset can substantially improve performance for typical analytic workloads. Besides predicates, a projection can be used; since Kudu stores data in columnar form, projecting a subset of columns greatly improves efficiency.
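As a reading aid, here is a minimal Python sketch of the scan model just described - column-vs-constant predicates plus a column projection over columnar data. It is purely illustrative; the class and method names are invented and this is not the Kudu client API.

    # Illustrative sketch only -- not the Kudu client API.
    # Models a scan over columnar data with column-vs-constant predicates and a projection.
    import operator

    OPS = {"=": operator.eq, "<": operator.lt, "<=": operator.le,
           ">": operator.gt, ">=": operator.ge}

    class ColumnarTable:
        def __init__(self, columns):
            # columns: dict of column name -> list of values (all lists the same length)
            self.columns = columns
            self.num_rows = len(next(iter(columns.values())))

        def scan(self, predicates=(), projection=None):
            """predicates: (column, op, constant) triples; projection: columns to return."""
            projection = projection or list(self.columns)
            for i in range(self.num_rows):
                if all(OPS[op](self.columns[col][i], const) for col, op, const in predicates):
                    yield {col: self.columns[col][i] for col in projection}

    table = ColumnarTable({"host": ["a", "a", "b"], "metric": ["cpu", "io", "cpu"],
                           "value": [0.9, 0.1, 0.4]})
    print(list(table.scan(predicates=[("host", "=", "a")], projection=["metric", "value"])))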
Other APIs
In addition to data path APIs, the Kudu client library offers other useful functionality.
In particular, the Hadoop ecosystem gains much of its performance by scheduling for data locality. Kudu provides APIs for callers to determine the mapping of data ranges to particular servers to aid distributed execution frameworks such as Spark, MapReduce, or Impala in scheduling. Kudu thus also offers other APIs, for example exposing the mapping of data ranges to particular servers, to help with locality.
Consistency Model
Kudu provides clients the choice between two consistency modes.
The default consistency mode is snapshot consistency. A scan is guaranteed to yield a snapshot with no anomalies in which causality would be violated. As such, it also guarantees read-your-writes consistency from a single client. Snapshot consistency guarantees read-your-writes for a single client, i.e., a single client's reads and writes preserve ordering and causality.
By default, Kudu does not provide an external consistency guarantee.
For example, client 1 finishes writing a and sends a message to client 2; client 2 writes b after receiving the message; from client 3's point of view, only b may be visible while a is not.
Although a happened first in wall-clock time, the two writes come from different clients, so this kind of external consistency cannot be guaranteed by default.
Kudu offers the option to manually propagate timestamps between clients:
after performing a write, the user may ask the client library for a timestamp token. This token may be propagated to another client through the external channel, and passed to the Kudu API on the other side, thus preserving the causal relationship between writes made across the two clients. In other words, Kudu offers an optional, manual way of propagating timestamps between clients:
after performing a write, the user asks the client library for a timestamp token; the token is carried to the other client over an external channel and passed along when that client calls the Kudu API, so Kudu knows the ordering between the two writes.
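A minimal sketch of the token-propagation idea: the server assigns each write a timestamp greater than anything the writer has observed, so importing the first client's token forces the second client's write to be ordered after it. All names here are invented for illustration; this is not the Kudu client library.

    # Illustrative sketch of timestamp-token propagation between two clients.
    class FakeServer:
        def __init__(self):
            self.clock = 0
        def write(self, observed_ts):
            # Assign a timestamp strictly greater than anything the caller has observed.
            self.clock = max(self.clock, observed_ts) + 1
            return self.clock

    class FakeClient:
        def __init__(self, server):
            self.server = server
            self.last_observed_ts = 0
        def write(self, row):
            self.last_observed_ts = self.server.write(self.last_observed_ts)
        def get_timestamp_token(self):
            return self.last_observed_ts                 # hand this to another client out of band
        def import_timestamp_token(self, token):
            self.last_observed_ts = max(self.last_observed_ts, token)

    server = FakeServer()
    c1, c2 = FakeClient(server), FakeClient(server)
    c1.write("a")                                        # write by client 1
    c2.import_timestamp_token(c1.get_timestamp_token())  # token sent over an external channel
    c2.write("b")                                        # guaranteed to get a later timestamp
    assert c2.last_observed_ts > c1.last_observed_ts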
If propagating tokens is too complex, Kudu optionally uses commit-wait as in Spanner[14].
After performing a write with commit-wait enabled, the client may be delayed for a period of time to ensure that any later write will be causally ordered correctly. Absent specialized time-keeping hardware, this can introduce significant latencies in writes (100-1000ms with default NTP configurations), so we anticipate that a minority of users will take advantage of this option. If propagating tokens is too cumbersome, Kudu can use Spanner-style commit-wait; but without specialized hardware, under NTP the write latency reaches 100-1000 ms, so few users are expected to use it.
Given this, it is plausible that within a few years, cloud providers will offer tight global time synchronization as a differentiating service.
The assignment of operation timestamps is based on a clock algorithm termed HybridTime[15].
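Roughly, a hybrid clock of this kind pairs a physical component with a logical counter so that assigned timestamps never move backwards, even when observing timestamps from other nodes. The sketch below captures only that core idea in a simplified form; it is not Kudu's HybridTime implementation.

    # Simplified hybrid physical/logical clock in the spirit of HybridTime (not Kudu code).
    import time

    class HybridClock:
        def __init__(self):
            self.last_physical = 0      # microseconds
            self.logical = 0

        def now(self):
            physical = int(time.time() * 1_000_000)
            if physical > self.last_physical:
                self.last_physical, self.logical = physical, 0
            else:
                self.logical += 1       # physical clock has not advanced: bump the logical counter
            return (self.last_physical, self.logical)

        def update(self, incoming):
            # Observe a timestamp from another node and never fall behind it.
            physical, logical = incoming
            if (physical, logical) >= (self.last_physical, self.logical):
                self.last_physical, self.logical = physical, logical + 1

    clock = HybridClock()
    t1 = clock.now()
    clock.update(t1)                    # e.g. a timestamp carried on an incoming message
    assert clock.now() > t1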
Timestamps
Although Kudu uses timestamps internally to implement concurrency control, Kudu does not allow the user to manually set the timestamp of a write operation. This differs from systems such as Cassandra and HBase, which treat the timestamp of a cell as a first-class part of the data model.
We do, however, allow the user to specify a timestamp for a read operation.
This allows the user to perform point-in-time queries in the past, as well as to ensure that different distributed tasks that together make up a single "query" (e.g. as in Spark or Impala) read a consistent snapshot. Kudu uses timestamps internally to implement concurrency control, so the user is not allowed to manually set the timestamp of a write operation.
For reads, however, the user may specify a timestamp, which enables point-in-time queries.
Architecture
Following the design of BigTable and GFS[18] (and their open-source analogues HBase and HDFS), Kudu relies on a single Master server, responsible for metadata, and an arbitrary number of Tablet Servers, responsible for data.
Partitioning
As in most distributed database systems, tables in Kudu are horizontally partitioned. Kudu, like BigTable, calls these horizontal partitions tablets.
Any row may be mapped to exactly one tablet based on the value of its primary key, thus ensuring that random access operations such as inserts or updates
affect only a single tablet. For large tables where throughput is important, we recommend on the order of 10-100 tablets per machine. Each tablet can be tens of gigabytes.
Unlike BigTable, which offers only key-range-based partitioning, and unlike Cassandra, which is nearly always deployed with hash-based partitioning, Kudu supports a flexible array of partitioning schemes.
Kudu's partitioning strategy is quite flexible, accommodating both hash-based and key-range-based partitioning at the same time.
When creating a table, the user specifies a partition schema for that table.
The partition schema acts as a function which can map from a primary key tuple into a binary partition key. When creating a table, we specify a partition schema, i.e., the function that turns a primary key into a partition key.
Each tablet covers a contiguous range of these partition keys. Thus, a client, when performing a read or write, can easily determine which tablet should hold the given key and route the request accordingly.
Each tablet covers a contiguous range of partition keys, so when performing a read or write a client can easily determine which tablet holds the given key and route the request accordingly.
The partition schema is made up of zero or more hash-partitioning rules followed by an optional range-partitioning rule:
A hash-partitioning rule consists of a subset of the primary key columns and a number of buckets.
For example, as expressed in our SQL dialect, DISTRIBUTE BY HASH(hostname, ts) INTO 16 BUCKETS. These rules convert tuples into binary keys by first concatenating the values of the specified columns, and then computing the hash code of the resulting string modulo the requested number of buckets. The resulting bucket number is encoded as a 32-bit big-endian integer in the partition key.
A range-partitioning rule consists of an ordered subset of the primary key columns. This rule maps tuples into binary strings by concatenating the values of the specified columns using an order-preserving encoding. A partition schema may therefore consist of zero or more hash-partitioning rules plus an optional range-partitioning rule; a sketch of the resulting key encoding follows.
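The sketch below illustrates one reading of this mapping: each hash rule contributes a 32-bit big-endian bucket number, followed by an order-preserving encoding of the range columns. The hash function and encoding details are stand-ins for illustration and do not match Kudu's exact on-disk format.

    # Sketch: primary key tuple -> binary partition key (hash bucket prefix + range encoding).
    import struct, zlib

    def hash_rule(row, columns, num_buckets):
        concatenated = b"".join(str(row[c]).encode() for c in columns)
        bucket = zlib.crc32(concatenated) % num_buckets       # any deterministic hash works here
        return struct.pack(">I", bucket)                      # 32-bit big-endian bucket number

    def range_rule(row, columns):
        # Order-preserving encoding: big-endian ints (offset so negatives sort first),
        # NUL-terminated strings.
        out = b""
        for c in columns:
            v = row[c]
            out += (struct.pack(">Q", v + (1 << 63)) if isinstance(v, int)
                    else str(v).encode() + b"\x00")
        return out

    def partition_key(row):
        # e.g. DISTRIBUTE BY HASH(host, metric) INTO 16 BUCKETS, plus a range rule on time
        return hash_rule(row, ["host", "metric"], 16) + range_rule(row, ["time"])

    print(partition_key({"host": "h1", "metric": "cpu", "time": 1456}).hex())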
By employing these partitioning rules, users can easily trade off between query parallelism and query concurrency based on their particular workload.
For example, consider a time series application which stores rows of the form (host, metric, time, value) and in which inserts are almost always done with monotonically increasing time values. Choosing to hash-partition by timestamp optimally spreads the insert load across all servers; however, a query for a specific metric on a specific host during a short time range must scan all tablets, limiting concurrency.
With hash-partitioning on the timestamp alone, a query over a short time range cannot be served efficiently, since it must scan all tablets.
A user might instead choose to range-partition by timestamp while adding separate hash partitioning rules for the metric name and hostname, which
would provide a good trade-off of parallelism on write and concurrency on read. The approach is to range-partition by timestamp while adding hash partitioning on the metric name and hostname.
Although this flexibility in partitioning is relatively unique in the NoSQL space, it should be quite familiar to users and administrators of analytic MPP database management systems.
Replication
In order to provide high availability and durability while running on large commodity clusters, Kudu replicates all of its table data across multiple machines.
Kudu employs the Raft consensus algorithm to replicate its tablets.
In particular, Kudu uses Raft to agree upon a logical log of operations (e.g. insert/update/delete) for each tablet. When a client wishes to perform a write, it first locates the leader replica (see Section 3.4.3) and sends a Write RPC to this replica.
If the client's information was stale and the replica is no longer the leader, it rejects the request, causing the client to invalidate and refresh its metadata cache and resend the request to the new leader.
If the replica is in fact still acting as the leader, it employs a local lock manager to serialize the operation against other concurrent operations, picks an MVCC timestamp, and proposes the operation via Raft to its followers.
If a majority of replicas accept the write and log it to their own local write-ahead logs, the write is considered durably replicated and thus can be committed on all replicas. Kudu keeps multiple replicas and uses Raft to keep them consistent, specifically to replicate the logical log of operations (e.g. insert/update/delete) between replicas.
When a client wants to write, it first locates the leader replica and issues a write RPC; if the client's metadata is stale and the leader has moved, the request is rejected, the client refreshes its metadata, and the write is resent to the new leader.
The leader replica then uses a local lock manager to serialize the operation against other concurrent operations, picks an MVCC timestamp, and proposes the operation to its followers via Raft.
If a majority of the replicas accept the write and append it to their local write-ahead logs, the write is considered durably replicated and is committed.
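The majority-acknowledgment rule is the crux of this write path; here is a tiny sketch of just that rule, ignoring Raft terms, retries, log matching, and failure handling (not Kudu code):

    # Tiny sketch of the majority-acknowledgment rule on the write path.
    def replicate(operation, followers):
        """followers: callables that append the op to their local WAL and return True/False."""
        acks = 1                                      # the leader has already logged the op locally
        for follower in followers:
            if follower(operation):
                acks += 1
        majority = (len(followers) + 1) // 2 + 1      # majority of leader + followers
        return acks >= majority                       # True => durably replicated, may commit

    always_ok = lambda op: True
    crashed = lambda op: False
    print(replicate("INSERT ...", [always_ok, crashed]))   # 2 of 3 ack -> committed
    print(replicate("INSERT ...", [crashed, crashed]))     # 1 of 3 ack -> not committed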
In the case of a failure of a minority of replicas, the leader can continue to propose and commit operations to the tablet's replicated log.
If the leader itself fails, the Raft algorithm quickly elects a new leader. By default, Kudu uses a 500-millisecond heartbeat interval and a 1500-millisecond election timeout; thus, after a leader fails, a new leader is typically elected within a few seconds. So with a minority of replicas down, the leader keeps proposing and committing operations, and if the leader fails, Raft elects a new one.
Kudu implements some minor improvements on the Raft algorithm. In particular:
1. As proposed in [19] we employ an exponential back-off algorithm after a failed leader election. We found that, as we typically commit Raft's persistent metadata to contended hard disk drives, such an extension was necessary to ensure election convergence on busy clusters.
2. When a new leader contacts a follower whose log diverges from its own, Raft proposes marching backward one operation at a time until discovering the point where they diverged. Kudu instead immediately jumps back to the last known committedIndex, which is always guaranteed to be present on any divergent follower. This minimizes the potential number of round trips at the cost of potentially sending redundant operations over the network.
We found this simple to implement, and it ensures that divergent operations are aborted after a single round-trip.
Kudu does not replicate the tablet's on-disk data itself, only its operation log, so the physical storage of each replica is fully decoupled; this has the advantages described below.
How should this be understood?
It is not that a tablet has no replicas, but that the replicas are not explicitly synchronized at the physical level - they are fully decoupled and independent. Kudu only replicates the operation log: once a majority of followers have written an operation to their logs, the operation is considered successful, and each replica then independently applies the operation log to its own local storage.
Kudu does not replicate the on-disk storage of a tablet, but rather just its operation log.
The physical storage of each replica of a tablet is fully decoupled. This yields several advantages: when one replica is undergoing physical-layer background operations such as flushes or compactions (see Section 4), it is unlikely that other nodes are operating on the same tablet at the same time.
Because Raft may commit after an acknowledgment by a majority of replicas, this reduces the impact of such physical-layer operations on the tail latencies experienced by clients for writes. In the future, we anticipate implementing techniques such as the speculative read requests described in [16] to further decrease tail latencies for reads in concurrent read/write workloads. My understanding: while one replica is busy with a physical-layer flush or compaction, the other replicas are usually not doing the same work on that tablet at the same time, and replication of the operation log is unaffected, so Raft's majority acknowledgment (and hence write latency) is not held up by any single replica's physical-layer work.
During development, we discovered some rare race conditions in the physical storage layer of the Kudu tablet.
Because the storage layer is decoupled across replicas, none of these race conditions resulted in unrecoverable data loss: in all cases, we were able to detect that one replica had become corrupt (or silently diverged from the majority) and repair it. Race conditions in the physical storage layer, or a single-replica failure, therefore do not lead to data loss.
Configuration Change
Kudu implements Raft configuration change following the one-by-one algorithm proposed in [24].
In this approach, the number of voters in the Raft configuration may change by at most one in each configuration change. In order to grow a 3-replica configuration to 5 replicas, two separate configuration changes (3->4, 4->5) must be proposed and committed. Raft configuration changes are committed one by one; for example, going from 3 replicas to 5 requires two changes.
Kudu implements the addition of new servers through a process called remote bootstrap.
In our design, in order to add a new replica, we first add it as a new member in the Raft configuration, even before notifying the destination server that a new replica will be copied to it. When this configuration change has been committed, the current Raft leader replica triggers a StartRemoteBootstrap RPC, which causes the destination server to pull a snapshot of the tablet data and log from the current leader. When the transfer is complete, the new server opens the tablet following the same process as after a server restart. When the tablet has opened the tablet data and replayed any necessary write-ahead logs, it has fully replicated the state of the leader at the time it began the transfer, and may begin responding to Raft RPCs as a fully-functional replica. Adding a new replica thus goes through this process, called remote bootstrap, which works in the straightforward way one would expect.
In our current implementation, new servers are added immediately as VOTER replicas.
This has the disadvantage that, after moving from a 3-server configuration to a 4-server configuration, three out of the four servers must acknowledge each operation. Because the new server is in the process of copying, it is unable to acknowledge operations. If another server were to crash during the snapshot-transfer process, the tablet would become unavailable for writes until the remote bootstrap finished. In other words, a newly added replica currently becomes a VOTER immediately, which is problematic: going from 3 to 4 replicas raises the quorum to 3, but the new replica cannot acknowledge while it is still copying, so if any other replica crashes during the transfer, writes fail until the remote bootstrap completes.
To address this issue, we plan to implement a PRE_VOTER replica state.
In this state, the leader will send Raft updates and trigger remote bootstrap on the target replica, but not count it as a voter when calculating the size of the configuration's majority. Upon detecting that the PRE_VOTER replica has fully caught up to the current logs, the leader will automatically propose and commit another configuration change to transition the new replica to a full VOTER. The fix is simple: add a PRE_VOTER state, and only switch the replica to VOTER once it has fully caught up.
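A small sketch of the intended quorum computation once PRE_VOTER exists - the copying replica receives updates but does not count toward the majority. This mirrors the description above rather than actual Kudu code.

    # Sketch: majority size when PRE_VOTER replicas are not counted as voters.
    def majority_size(replicas):
        voters = [r for r in replicas if r["role"] == "VOTER"]
        return len(voters) // 2 + 1

    config = [{"host": "ts1", "role": "VOTER"},
              {"host": "ts2", "role": "VOTER"},
              {"host": "ts3", "role": "VOTER"},
              {"host": "ts4", "role": "PRE_VOTER"}]   # still copying data via remote bootstrap

    print(majority_size(config))   # 2: the copying replica does not raise the quorum to 3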
When removing replicas from a tablet, we follow a similar approach:
the current Raft leader proposes an operation to change the configuration to one that does not include the node to be evicted. If this is committed, then the remaining nodes will no longer send messages to the evicted node, though the evicted node will not know that it has been removed. When the configuration change is committed, the remaining nodes report the configuration change to the Master, which is responsible for cleaning up the orphaned replica (see Section 3.4.2). Removing a replica thus follows a similarly straightforward process.
The Kudu Master
Kudu's central master process has several key responsibilities:
1. Act as a catalog manager, keeping track of which tables and tablets exist, as well as their schemas, desired replication levels, and other metadata. When tables are created, altered, or deleted, the Master coordinates these actions across the tablets and ensures their eventual completion.
2. Act as a cluster coordinator, keeping track of which servers in the cluster are alive and coordinating redistribution of data after server failures.
3. Act as a tablet directory, keeping track of which tablet servers are hosting replicas of each tablet.
Catalog Manager
The master itself hosts a single-tablet table which is restricted from direct access by users.
The master internally writes catalog information to this tablet, while keeping a full write-through cache of the catalog in memory at all times. The Master keeps the catalog metadata fully cached in memory; this design could in principle have scalability issues, but since the metadata is small it is not a problem at present.
The catalog table maintains a small amount of state for each table in the system.
In particular, it keeps the current version of the table schema, the state of the table (creating, running, deleting, etc.), and the set of tablets which comprise the table. The catalog table thus records the current schema version, the table state, and the set of tablets making up each table.
The master services a request to create a table by first writing a table record to the catalog table indicating a CREATING state.
Asynchronously, it selects tablet servers to host tablet replicas, creates the Master-side tablet metadata, and sends asynchronous requests to create the replicas on the tablet servers. If the replica creation fails or times out on a majority of replicas, the tablet can be safely deleted and a new tablet created with a new set of replicas. If the Master fails in the middle of this operation, the table record indicates that a roll-forward is necessary and the master can resume where it left off. Table creation thus works as follows: first write a table record into the catalog table with state CREATING; then pick tablet servers to host the tablet replicas, store the tablet metadata in the catalog, and asynchronously ask the tablet servers to create the replicas. If replica creation fails, a new tablet with a new set of replicas is simply created; if the Master fails partway through, the record already in the catalog lets it resume and roll forward after restart.
A similar approach is used for other operations such as schema changes and deletion, where the Master ensures that the change is propagated to the relevant tablet servers before writing the new state to its own storage.
In all cases, the messages from the Master to the tablet servers are designed to be idempotent, such that on a crash and restart, they can be safely resent. Other operations such as schema changes and deletion follow roughly the same pattern; the key requirement is that Master-to-tablet-server messages are idempotent, so after a crash and restart they can be resent repeatedly without harm.
Because the catalog table is itself persisted in a Kudu tablet, the Master supports using Raft to replicate its persistent state to backup master processes.
Currently, the backup masters act only as Raft followers and do not serve client requests. Upon becoming elected leader by the Raft algorithm, a backup master scans its catalog table, loads its in-memory cache, and begins acting as an active master following the same process as a master restart. Since the catalog table is itself a Kudu tablet, Raft keeps its replicas consistent; backup masters cannot serve client requests, and only after being elected leader does a backup master load the catalog into memory and become the acting master.
Cluster Coordination
Each of the tablet servers in a Kudu cluster is statically configured with a list of host names for the Kudu masters.
Upon startup, the tablet servers register with the Masters and proceed to send tablet reports indicating the total set of tablets which they are hosting. The first such tablet report contains information about all tablets. All future tablet reports are incremental, only containing reports for tablets that have been newly created, deleted, or modified (e.g. processed a schema change or Raft configuration change). Tablet servers learn about the Masters from a statically configured list of host names; after starting up, a tablet server registers with the Masters and periodically sends reports about the tablets it hosts.
The first report is a full report; subsequent reports are incremental.
A critical design point of Kudu is that, while the Master is the source of truth about catalog information, it is only an observer of the dynamic cluster state.
The tablet servers themselves are always authoritative about the location of tablet replicas, the current Raft configuration, the current schema version of a tablet, etc. Because tablet replicas agree on all state changes via Raft, every such change can be mapped to a specific Raft operation index in which it was committed. This allows the Master to ensure that all tablet state updates are idempotent and resilient to transmission delays: the Master simply compares the Raft operation index of a tablet state update and discards it if the index is not newer than the Master's current view of the world. A core design idea in Kudu is that the Master acts as an observer rather than a decision maker, even though its catalog stores the metadata of all tablets. Every change a tablet makes is decided via Raft and maps to a Raft operation index, so the Master can keep all tablet state updates idempotent and tolerant of delays: if an update's Raft operation index is not newer than the Master's current view, it is simply discarded.
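A small sketch of that idempotency rule, assuming each report carries the Raft operation index at which the reported state was committed (names invented for illustration):

    # Sketch: the Master applies a tablet state update only if its Raft operation
    # index is newer than the index it already has for that tablet.
    master_view = {}   # tablet_id -> {"opid_index": ..., "state": ...}

    def handle_tablet_report(tablet_id, opid_index, reported_state):
        current = master_view.get(tablet_id)
        if current and opid_index <= current["opid_index"]:
            return "discarded (stale or duplicate report)"
        master_view[tablet_id] = {"opid_index": opid_index, "state": reported_state}
        return "applied"

    print(handle_tablet_report("t1", 7, {"leader": "ts1"}))   # applied
    print(handle_tablet_report("t1", 5, {"leader": "ts2"}))   # discarded: older Raft index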
This design choice leaves much responsibility in the hands of the tablet servers themselves.
For example, rather than detecting tablet server crashes from the Master, Kudu instead delegates that responsibility to the Raft LEADER replicas of any tablets with replicas on the crashed machine.
The leader keeps track of the last time it successfully communicated with each follower, and if it has failed to communicate for a significant period of time, it declares the follower dead and proposes a Raft configuration change to evict the follower from the Raft configuration. When this configuration change is successfully committed, the remaining tablet servers will issue a tablet report to the Master to advise it of the decision made by the leader. This design pushes a lot of responsibility onto the tablet servers: for example, tablet server crashes are detected not by the Master but by the Raft leader replicas of the affected tablets; when a leader has been unable to communicate with a follower for long enough, it declares the follower dead, commits a configuration change via Raft to evict it, and then simply notifies the Master.
In order to regain the desired replication count for the tablet, the Master selects a tablet server to host a new replica based on its global view of the cluster.
After selecting a server, the Master suggests a configuration change to the current leader replica for the tablet. However, the Master itself is powerless to change a tablet configuration - it must wait for the leader replica to propose and commit the configuration change operation, at which point the Master is notified of the configuration change's success via a tablet report. If the Master's suggestion failed (e.g. because the message was lost) it will stubbornly retry periodically until successful. Because these operations are tagged with the unique index of the degraded configuration, they are fully idempotent and conflict-free, even if the Master issues several conflicting suggestions, as might happen soon after a master fail-over. In short, after a tablet server crash and the leader's eviction of it, the Master restores the desired replication count by picking a new tablet server based on its global view; but the Master cannot change the configuration itself - it can only suggest the change to the Raft leader and learn of success via a tablet report, retrying until it succeeds, which is safe because the suggestion is idempotent.
The master responds similarly to extra replicas of tablets.
If the Master receives a tablet report which indicates that a replica has been removed from a tablet configuration, it stubbornly sends DeleteTablet RPCs to the removed node until the RPC succeeds. To ensure eventual cleanup even in the case of a master crash, the Master also sends such RPCs in response to a tablet report which identifies that a tablet server is hosting a replica which is not in the newest committed Raft configuration. The same pattern applies when a replica has been removed or is found to be superfluous.
Tablet Directory
In order to efficiently perform read and write operations without intermediate network hops, clients query the Master for tablet location information.
Clients are "thick" and maintain a local metadata cache which includes their most recent information about each tablet they have previously accessed, including the tablet's partition key range and its Raft configuration. At any point in time, the client's cache may be stale; if the client attempts to send a write to a server which is no longer the leader for a tablet, the server will reject the request. The client then contacts the Master to learn about the new leader. In the case that the client receives a network error communicating with its presumed leader, it follows the same strategy, assuming that the tablet has likely elected a new leader. Clients sync metadata from the Master and, being thick clients, cache it so they can talk to tablet servers directly without going through the Master; if the metadata has changed, the request fails and the client refreshes its metadata.
In the future, we plan to piggy-back the current Raft configuration on the error response if a client contacts a non-leader replica.
This will prevent extra round-trips to the master after leader elections, since typically the followers will have up-to-date information. In other words, the Raft configuration will be piggy-backed on the error response so the client does not need an extra lookup, since followers usually already know the latest configuration.
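A sketch of the thick-client behavior described here - a cached tablet location, with invalidate-and-retry when the cached leader turns out to be stale. The classes are stubs invented for illustration; this is not the Kudu client library.

    # Sketch of a "thick" client's metadata cache with invalidate-and-retry.
    class ThickClient:
        def __init__(self, master, servers):
            self.master, self.servers = master, servers
            self.cache = {}                              # tablet_id -> presumed leader server

        def write(self, tablet_id, row, retries=2):
            leader = self.cache.get(tablet_id) or self.master.lookup_leader(tablet_id)
            self.cache[tablet_id] = leader
            if self.servers[leader].write(tablet_id, row):
                return True
            self.cache.pop(tablet_id, None)              # rejected: cached leader was stale
            return retries > 0 and self.write(tablet_id, row, retries - 1)

    class StubMaster:
        def lookup_leader(self, tablet_id):
            return "ts2"

    class StubServer:
        def __init__(self, is_leader):
            self.is_leader = is_leader
        def write(self, tablet_id, row):
            return self.is_leader                        # a non-leader rejects the write

    client = ThickClient(StubMaster(), {"ts1": StubServer(False), "ts2": StubServer(True)})
    client.cache["t1"] = "ts1"            # simulate a stale cache entry after a leader change
    print(client.write("t1", {"k": 1}))   # True: first attempt rejected, then refreshed and retried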
Tablet storage
Within a tablet server, each tablet replica operates as an entirely separate entity, significantly decoupled from the partitioning and replication systems described in sections 3.2 and 3.3.
During development of Kudu, we found that it was convenient to develop the storage layer somewhat independently from the higher-level distributed system, and in fact many of our functional and unit tests operate entirely within the confines of the tablet implementation. Due to this decoupling, we are exploring the idea of providing the ability to select an underlying storage layout on a per-table, per-tablet or even per-replica basis - a distributed analogue of Fractured Mirrors, as proposed in [26]. However, we currently offer only a single storage layout, described in this section. Within a tablet server, each tablet replica is thus a completely separate entity, decoupled from the higher-level distributed system, so the storage layer can be unit-tested in isolation; this decoupling also means that, in principle, a different storage layout could be chosen on a per-table, per-tablet or even per-replica basis.
Overview
The implementation of tablet storage in Kudu addresses several goals:
1. Fast columnar scans - In order to provide analytic performance comparable to best-of-breed immutable data formats such as Parquet and ORCFile[7], it's critical that the majority of scans can be serviced from efficiently encoded columnar data files.
2. Low-latency random updates - In order to provide fast access to update or read arbitrary rows, we require O(lg n) lookup complexity for random access.
3. Consistency of performance - Based on our experiences supporting other data storage systems, we have found that users are willing to trade off peak performance in order to achieve predictability.
In order to provide these characteristics simultaneously, Kudu does not reuse any pre-existing storage engine, but rather chooses to implement a new hybrid columnar store architecture.
In short, the goals are fast columnar scans for analytic performance, low-latency random updates, and consistent (predictable) performance - users are often willing to give up some peak performance for predictability - which is why Kudu implements a new hybrid columnar store architecture rather than reusing an existing storage engine.
RowSets
Tablets in Kudu are themselves subdivided into smaller units called RowSets.
Some RowSets exist in memory only, termed MemRowSets, while others exist in a combination of disk and memory, termed DiskRowSets. Any given live (not deleted) row exists in exactly one RowSet; thus, RowSets form disjoint sets of rows. However, note that the primary key intervals of different RowSets may intersect. At any point in time, a tablet has a single MemRowSet which stores all recently-inserted rows. Because these stores are entirely in-memory, a background thread periodically flushes MemRowSets to disk.
The scheduling of these flushes is described in further detail in Section 4.11. When a MemRowSet has been selected to be flushed, a new, empty MemRowSet is swapped in to replace it. The previous MemRowSet is written to disk, and becomes one or more DiskRowSets. This flush process is fully concurrent: readers can continue to access the old MemRowSet while it is being flushed, and updates and deletes of rows in the flushing MemRowSet are carefully tracked and rolled forward into the on-disk data upon completion of the flush process.
Tablets are subdivided into smaller units called RowSets; there are two kinds, in-memory MemRowSets and on-disk DiskRowSets, similar in spirit to HBase.
Every live row exists in exactly one RowSet, so RowSets are disjoint sets of rows, although their primary key ranges may overlap.
As one would expect, the MemRowSet is periodically flushed into DiskRowSets, and the flush is concurrent, so readers can keep accessing the old MemRowSet while it is being flushed.
MemRowSet Implementation
MemRowSets are implemented by an in-memory concurrent B-tree with optimistic locking, broadly based off the design of MassTree[22], with the following changes:
1. We do not support removal of elements from the tree. Instead, we use MVCC records to represent deletions.
MemRowSets eventually flush to other storage, so we can defer removal of these records to other parts of the system.
2. Similarly, we do not support arbitrary in-place updates of records in the tree. Instead, we allow only modifications which do not change the value's size: this permits atomic compare-and-swap operations to append mutations to a per-record linked list.
3. We link together leaf nodes with a next pointer, as in the B+-tree[13]. This improves our sequential scan performance, a critical operation.
4. We do not implement the full "trie of trees", but rather just a single tree, since we are less concerned about extremely high random access throughput compared to the original application.
In summary, a MemRowSet is an in-memory concurrent B-tree with optimistic locking, based on MassTree: 1. no removal of elements - deletions are represented with MVCC records; 2. no arbitrary in-place updates - only size-preserving modifications are allowed, which permits atomic compare-and-swap appends of mutations to a per-record linked list; 3. leaf nodes are linked with a next pointer to improve sequential scan performance; 4. a single tree rather than a trie of trees, since extremely high random-access throughput matters less for this use case.
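A conceptual sketch of points 1 and 2: the stored value is never resized or removed in place; deletes and updates are appended as MVCC mutation records to a per-row list (the real tree appends with an atomic compare-and-swap, for which a plain pointer update stands in here). Not Kudu code.

    # Sketch: per-record mutation list with MVCC timestamps, as in the MemRowSet description.
    class Mutation:
        def __init__(self, timestamp, change):
            self.timestamp, self.change, self.next = timestamp, change, None

    class MemRow:
        def __init__(self, key, values):
            self.key, self.values = key, values       # base row, never resized in place
            self.mutation_head = None

        def append_mutation(self, mutation):
            # Real implementation: atomic compare-and-swap on the list tail; here, a list walk.
            if self.mutation_head is None:
                self.mutation_head = mutation
                return
            node = self.mutation_head
            while node.next is not None:
                node = node.next
            node.next = mutation

        def read(self, snapshot_ts):
            # Apply the mutations visible at snapshot_ts (MVCC) on top of the base values.
            row, node = dict(self.values), self.mutation_head
            while node is not None:
                if node.timestamp <= snapshot_ts:
                    row.update(node.change)
                node = node.next
            return row

    r = MemRow(("h1", "cpu"), {"value": 0.9})
    r.append_mutation(Mutation(10, {"value": 0.5}))
    print(r.read(snapshot_ts=5))    # {'value': 0.9} -- before the update
    print(r.read(snapshot_ts=20))   # {'value': 0.5} -- after the update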
In order to optimize for scan performance over random access, we use slightly larger internal and leaf nodes sized at four cache-lines (256 bytes) each.
Unlike most data in Kudu, MemRowSets store rows in a row-wise layout. This still provides acceptable performance, since the data is always in memory. To maximize throughput despite the choice of row storage, we utilize SSE2 memory prefetch instructions to prefetch one leaf node ahead of our scanner, and JIT-compile record projection operations using LLVM[5]. These optimizations provide significant performance boosts relative to the naive implementation. In order to form the key for insertion into the B-tree, we encode each row's primary key using an order-preserving encoding as described in Section 3.2. This allows efficient tree traversal using only memcmp operations for comparison, and the sorted nature of the MemRowSet allows for efficient scans over primary key ranges or individual key lookups. In short, to favor scans over random access, internal and leaf nodes are sized at four cache lines (256 bytes); unlike most data in Kudu, MemRowSet data is row-wise, which is acceptable since it is all in memory; and to maximize throughput, SSE2 prefetch instructions pre-load the next leaf node ahead of the scanner while record projection is JIT-compiled with LLVM, giving a large performance boost over the naive implementation.
To form the key inserted into the B-tree, the primary key is encoded with an order-preserving encoding, so tree traversal needs only memcmp comparisons, and the sorted MemRowSet supports efficient range scans and individual key lookups.
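A sketch of an order-preserving composite-key encoding in this spirit: encoded keys compare in the same order as the original tuples under a plain bytewise comparison (memcmp). The escaping rules are simplified relative to Kudu's real encoding.

    # Sketch: order-preserving encoding of a (string, int64) primary key.
    import struct

    def encode_key(host: str, timestamp: int) -> bytes:
        # Strings: value plus a 0x00 terminator so shorter strings sort first.
        # Signed 64-bit ints: offset by 2^63 (equivalent to flipping the sign bit)
        # and written big-endian so negatives sort before positives.
        return host.encode() + b"\x00" + struct.pack(">Q", timestamp + (1 << 63))

    keys = [("b", -5), ("a", 100), ("a", 3), ("aa", 0)]
    encoded = [encode_key(h, t) for h, t in keys]
    assert sorted(encoded) == [encode_key(h, t) for h, t in sorted(keys)]
    print([e.hex() for e in sorted(encoded)])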
DiskRowSet Implementation
When MemRowSets flush to disk, they become DiskRowSets.
While flushing a MemRowSet, we roll the DiskRowSet after each 32 MB of IO. This ensures that no DiskRowSet is too large, thus allowing efficient incremental compaction as described later in Section 4.10. Because a MemRowSet is in sorted order, the flushed DiskRowSets will themselves also be in sorted order, and each rolled segment will have a disjoint interval of primary keys. In other words, the flush rolls to a new DiskRowSet every 32 MB so no DiskRowSet grows too large; since the MemRowSet is sorted, the DiskRowSets are sorted too, and each rolled segment covers a disjoint primary key interval.
A DiskRowSet is made up of two main components: base data and delta stores.
A DiskRowSet is split into base data and delta stores. The base data is a column-organized representation of the rows in the DiskRowSet.
Each column is separately written to disk in a single contiguous block of data. The column itself is subdivided into small pages to allow for granular random reads, and an embedded B-tree index allows efficient seeking to each page based on its ordinal offset within the rowset. Column pages are encoded using a variety of encodings, such as dictionary encoding, bitshuffle[23], or front coding, and are optionally compressed using generic binary compression schemes such as LZ4, gzip, or bzip2. These encodings and compression options may be specified explicitly by the user on a per-column basis, for example to designate that a large infrequently-accessed text column should be gzipped, while a column that typically stores small integers should be bit-packed. Several of the page formats supported by Kudu are common with those supported by Parquet, and our implementation shares much code with Impala's Parquet library. In addition to flushing columns for each of the user-specified columns in the table, we also write a primary key index column, which stores the encoded primary key for each row. We also flush a chunked Bloom filter[10] which can be used to test for the possible presence of a row based on its encoded primary key.
So the base data stores the rows column-organized; each column is split into small pages to allow fine-grained random reads, and an embedded B-tree index makes it easy to locate a page within the rowset. Columns can be encoded in various ways and optionally compressed, depending on the use case.
Besides the user-visible columns, a primary key index column is also written, along with a Bloom filter used to quickly test whether a row with a given encoded primary key might be present.
Because columnar encodings are difficult to update in place, the columns within the base data are considered immutable once flushed.
Instead, updates and deletes are tracked through structures termed delta stores. Delta stores are either in-memory DeltaMemStores, or on-disk DeltaFiles. A DeltaMemStore is a concurrent B-tree which shares the implementation described above.
A DeltaFile is a binary-typed column block. In both cases, delta stores maintain a mapping from (row offset, timestamp) tuples to RowChangeList records. The row offset is simply the ordinal index of a row within the RowSet - for example, the row with the lowest primary key has offset 0. The timestamp is the MVCC timestamp assigned when the operation was originally written. The RowChangeList is a binary-encoded list of changes to a row, for example indicating SET column id 3 = `foo' or DELETE.
When servicing an update to data within a DiskRowSet,
we first consult the primary key index column. By using its embedded B-tree index, we can efficiently seek to the page containing the target row. Using page-level metadata, we can determine the row offset for the first cell within that page. By searching within the page (e.g. via in-memory binary search) we can then calculate the target row's offset within the entire DiskRowSet. Upon determining this offset, we insert a new delta record into the rowset's DeltaMemStore. Recall that once flushed, base data is considered immutable, so updates and deletes are recorded in delta stores, either DeltaMemStores or DeltaFiles; a delta store maintains a mapping from (row offset, timestamp) to a RowChangeList - note that the key is the row offset, not the primary key.
So on an update, the primary key first locates the right rowset, the embedded B-tree finds the right page, a search within the page (e.g. binary search) yields the row's offset within the whole DiskRowSet, and that offset is used to insert a delta record into the DeltaMemStore.
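A sketch of that update path: a binary search over the sorted primary-key column stands in for the embedded B-tree index, and deltas are keyed by (row offset, timestamp) as described. This mirrors the description above, not Kudu's code.

    # Sketch: locate a row's ordinal offset, then record the change in the delta store.
    import bisect

    class DiskRowSetSketch:
        def __init__(self, sorted_keys, base_columns):
            self.keys = sorted_keys                  # encoded primary keys, ascending
            self.base = base_columns                 # column name -> list of values
            self.deltas = {}                         # (row_offset, timestamp) -> change dict

        def update(self, key, timestamp, change):
            offset = bisect.bisect_left(self.keys, key)
            if offset == len(self.keys) or self.keys[offset] != key:
                return False                         # row is not in this DiskRowSet
            self.deltas[(offset, timestamp)] = change
            return True

        def read_row(self, offset, snapshot_ts):
            row = {col: values[offset] for col, values in self.base.items()}
            for (off, ts), change in sorted(self.deltas.items()):
                if off == offset and ts <= snapshot_ts:
                    row.update(change)               # apply deltas visible at snapshot_ts
            return row

    rs = DiskRowSetSketch(["k1", "k2", "k3"], {"value": [10, 20, 30]})
    rs.update("k2", timestamp=5, change={"value": 21})
    print(rs.read_row(1, snapshot_ts=9))   # {'value': 21}
    print(rs.read_row(1, snapshot_ts=1))   # {'value': 20}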
Delta Flushes
Because the DeltaMemStore is an in-memory store, it has finite capacity.
The same background process which schedules flushes of MemRowSets also schedules flushes of DeltaMemStores. When flushing a DeltaMemStore, a new empty store is swapped in while the existing one is written to disk and becomes a DeltaFile. A DeltaFile is a simple binary column which contains an immutable copy of the data that was previously in memory.
INSERT path
As described previously, each tablet has a single MemRowSet which holds recently inserted data;
however, it is not sufficient to simply write all inserts directly to the current MemRowSet, since Kudu enforces a primary key uniqueness constraint. In other words, unlike many NoSQL stores, Kudu differentiates INSERT from UPSERT. In order to enforce the uniqueness constraint, Kudu must consult all of the existing DiskRowSets before inserting the new row.
Because there may be hundreds or thousands of DiskRowSets per tablet, it is important that this be done efficiently, both by culling the number of DiskRowSets to consult and by making the lookup within a DiskRowSet efficient. In order to cull the set of DiskRowSets to consult on an INSERT operation, each DiskRowSet stores a Bloom filter of the set of keys present.
Because new keys are never inserted into an existing DiskRowSet, this Bloom filter is static data. We chunk the Bloom filter into 4KB pages, each corresponding to a small range of keys, and index those pages using an immutable B-tree structure. These pages as well as their index are cached in a server-wide LRU page cache, ensuring that most Bloom filter accesses do not require a physical disk seek. Additionally, for each DiskRowSet, we store the minimum and maximum primary key, and use these key bounds to index the DiskRowSets in an interval tree. This further culls the set of DiskRowSets to consult on any given key lookup.
A background compaction process, described in Section 4.10, reorganizes DiskRowSets to improve the effectiveness of the interval tree-based culling. For any DiskRowSets that are not able to be culled, we must fall back to looking up the key to be inserted within its encoded primary key column. This is done via the embedded B-tree index in that column, which ensures a logarithmic number of disk seeks in the worst case. Again, this data access is performed through the page cache, ensuring that for hot areas of key space, no physical disk seeks are needed. Because Kudu enforces primary key uniqueness, before an insert it must determine whether the key already exists; in principle every DiskRowSet would have to be consulted, and there can be very many per tablet, so for efficiency each DiskRowSet keeps a Bloom filter, chunked into 4 KB pages (each covering a small range of keys) and indexed with a B-tree.
Like other indexes, these pages are cached in the server-wide LRU page cache, so most Bloom filter accesses do not touch disk.
Each DiskRowSet also records its minimum and maximum primary key, which serves as a further culling condition.
The reason for this design: a Bloom filter's "definitely absent" answer is exact, while its "possibly present" answer can be wrong.
So the Bloom filter only rules out the majority of definitely-absent cases; for a "possibly present" answer, an actual lookup is still needed to confirm whether the key really exists.
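A sketch of the culling logic on INSERT: skip a DiskRowSet if the key falls outside its [min, max] bounds or its Bloom filter says "definitely absent", and fall back to a real key lookup only for the survivors. The Bloom filter below is a toy and everything here is illustrative, not Kudu's implementation.

    # Sketch: cull DiskRowSets by key bounds and Bloom filter before the real key lookup.
    import hashlib

    class ToyBloom:
        def __init__(self, keys, bits=1024, hashes=3):
            self.bits, self.hashes, self.bitmap = bits, hashes, 0
            for k in keys:
                for b in self._positions(k):
                    self.bitmap |= 1 << b
        def _positions(self, key):
            digest = hashlib.sha256(key.encode()).digest()
            return [int.from_bytes(digest[i * 4:(i + 1) * 4], "big") % self.bits
                    for i in range(self.hashes)]
        def maybe_contains(self, key):
            return all(self.bitmap >> b & 1 for b in self._positions(key))

    class DiskRowSetMeta:
        def __init__(self, keys):
            self.min_key, self.max_key = min(keys), max(keys)
            self.bloom, self.keys = ToyBloom(keys), set(keys)

    def key_exists(rowsets, key):
        candidates = [rs for rs in rowsets
                      if rs.min_key <= key <= rs.max_key     # key-bounds (interval tree) culling
                      and rs.bloom.maybe_contains(key)]      # Bloom filter culling
        # Only the surviving candidates need the real primary-key lookup.
        return any(key in rs.keys for rs in candidates)

    rowsets = [DiskRowSetMeta(["a", "c", "e"]), DiskRowSetMeta(["m", "p", "t"])]
    print(key_exists(rowsets, "p"))    # True  -> an INSERT of "p" is rejected as a duplicate
    print(key_exists(rowsets, "zz"))   # False -> key bounds cull both rowsets, no lookups needed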
Read path
Similar to systems like X100[11], Kudu's read path always operates in batches of rows in order to amortize function call cost and provide better opportunities for loop unrolling and SIMD instructions.
Kudu's in-memory batch format consists of a top-level structure which contains pointers to smaller blocks for each column being read. Thus, the batch itself is columnar in memory, which avoids any offset calculation cost when copying from columnar on-disk stores into the batch.
When reading data from a DiskRowSet, Kudu first determines if a range predicate on the scan can be used to cull the range of rows within this DiskRowSet.
For example, if the scan has set a primary key lower bound, we perform a seek within the primary key column in order to determine a lower bound row offset; we do the same with any upper bound key. This converts the key range predicate into a row offset range predicate, which is simpler to satisfy as it requires no expensive string comparisons. Next, Kudu performs the scan one column at a time.
First, it seeks the target column to the correct row offset (0, if no predicate was provided, or the start row, if it previously determined a lower bound). Next, it copies cells from the source column into our row batch using the page-encoding specific decoder. Last, it consults the delta stores to see if any later updates have replaced cells with newer versions, based on the current scan's MVCC snapshot, applying those changes to our in-memory batch as necessary. Because deltas are stored based on numerical row offsets rather than primary keys, this delta application process is extremely efficient: it does not require any per-row branching or expensive string comparisons.
This is why deltas are keyed by row offsets rather than primary keys.
After performing this process for each row in the projection, it returns the batch results, which will likely be copied into an RPC response and sent back to the client.
The tablet server maintains stateful iterators on the server side for each scanner so that successive requests do not need to re-seek, but rather can continue from the previous point in each column file.
Lazy Materialization
If predicates have been specified for the scanner, we perform lazy materialization[9] of column data. In particular, we prefer to read columns which have associated range predicates before reading any other columns.
After reading each such column, we evaluate the associated predicate. In the case that the predicate filters all rows in this batch, we short circuit the reading of other columns. This provides a significant speed boost when applying selective predicates, as the majority of data from the other selected columns will never be read from disk. The scanner thus reads columns with range predicates first and filters rows against them, reducing how much of the other columns needs to be read; this is called lazy materialization.
Delta Compaction
Because deltas are not stored in a columnar format, the scan speed of a tablet will degrade as ever more deltas are applied to the base data. Thus, Kudu's background maintenance manager periodically scans DiskRowSets to find any cases where a large number of deltas (as identified by the ratio between base data row count and delta count) have accumulated, and schedules a delta compaction operation which merges those deltas back into the base data columns.
In particular, the delta compaction operation identifies the common case where the majority of deltas only apply to a subset of columns: for example, it is common for a SQL batch operation to update just one column out of a wide table. In this case, the delta compaction will only rewrite that single column, avoiding IO on the other unmodified columns. Since deltas are not stored in columnar format, too many accumulated deltas slow scans down, so Kudu periodically merges oversized delta stores back into the base data in the background.
RowSet Compaction
In addition to compacting deltas into base data, Kudu also periodically compacts different DiskRowSets together in a process called RowSet compaction. This process performs a key-based merge of two or more DiskRowSets, resulting in a sorted stream of output rows. The output is written back to new DiskRowSets, again rolling every 32 MB, to ensure that no DiskRowSet in the system is too large.
RowSet compaction has two goals:
1. We take this opportunity to remove deleted rows.
2. This process reduces the number of DiskRowSets that overlap in key range. By reducing the amount by which RowSets overlap, we reduce the number of RowSets which are expected to contain a randomly selected key in the tablet. This value acts as an upper bound for the number of Bloom filter lookups, and thus disk seeks, expected to service a write operation within the tablet.
So DiskRowSets are merged periodically; this removes deleted rows and reduces the key-range overlap among DiskRowSets.
Scheduling maintenance
As described in the sections above, Kudu has several different background maintenance operations that it performs to reduce memory usage and improve performance of the on-disk layout.
These operations are performed by a pool of maintenance threads that run within the tablet server process. Toward the design goal of consistent performance, these threads run all the time, rather than being triggered by specific events or conditions. Upon the completion of one maintenance operation, a scheduler process evaluates the state of the on-disk storage and picks the next operation to perform based on a set of heuristics meant to balance memory usage, write-ahead log retention, and the performance of future read and write operations. In order to select DiskRowSets to compact, the maintenance scheduler solves an optimization problem: given an IO budget (typically 128 MB), select a set of DiskRowSets such that compacting them would reduce the expected number of seeks, as described above. This optimization turns out to be a series of instances of the well-known integer knapsack problem, and is able to be solved efficiently in a few milliseconds. Because the maintenance threads are always running small units of work, the operations can react quickly to changes in workload behavior. For example, when insertion workload increases, the scheduler quickly reacts and flushes in-memory stores to disk. When the insertion workload reduces, the server performs compactions in the background to increase performance for future writes. This provides smooth transitions in performance, making it easier for developers and operators to perform capacity planning and estimate the latency profile of their workloads.
Maintenance operations are thus scheduled heuristically by these background threads based on the current state of the system.
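The budgeted selection can be sketched as a 0/1 knapsack: pick DiskRowSets under an IO budget so as to maximize the reduction in expected seeks. The sizes and "benefit" numbers below are made up for illustration; this is not Kudu's scheduler.

    # Sketch: choose DiskRowSets to compact under an IO budget (0/1 knapsack via dynamic programming).
    def pick_rowsets(candidates, budget_mb):
        """candidates: list of (name, size_mb, benefit) where benefit ~ reduction in expected seeks."""
        best = {0: (0, [])}                              # IO used -> (total benefit, chosen rowsets)
        for name, size, benefit in candidates:
            for used, (total, chosen) in list(best.items()):
                new_used = used + size
                if new_used <= budget_mb and total + benefit > best.get(new_used, (-1,))[0]:
                    best[new_used] = (total + benefit, chosen + [name])
        return max(best.values())

    candidates = [("rs1", 32, 5), ("rs2", 32, 4), ("rs3", 64, 10), ("rs4", 96, 11)]
    print(pick_rowsets(candidates, budget_mb=128))       # (19, ['rs1', 'rs2', 'rs3'])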