Kudu Tablet Design (KUDU Table Design)

Reprinted: http://www.voidcn.com/article/p-rbkrvsdb-ug.html
Tablet is a horizontal partition of the Kudu table, similar to the tablet of Google Bigtable, or the region of HBase. Each tablet stores a certain continuous range of data (key), and the ranges between the two tablets will not overlap. All the tablets of a table contain all the key spaces of this table.

Tablet is composed of RowSet, and RowSet is composed of a set of rows (n pieces of data, n rows of data). RowSets are disjoint, that is, rows between different RowSets will not cross, so a given piece of data will only exist in one RowSet. Although Rowsets are disjoint, the key space between the two can be intersected (key range).

**

Handling Insertions

**

A RowSet Stored in memory, it is called MemRowSet, and there is only one MemRowSet in a tablet. MemRowSet is an in-memory B-Tree tree, and it is sorted according to the primary key of the table. All inserts are written directly into MemRowSet. Benefit from MVCC (Multi-Version Concurrency Control, which will be described below), once the data is written to the MemRowSet, subsequent readers can immediately query it.

Note: Unlike BigTable, Kudu only inserts and mutations before inserting and flushing will be recorded to MemRowSet. Mutations, such as update and deletion based on disk data, will be introduced below.

Any piece of data exactly exists in a MemRowSet in the form of an entry, which is composed of a special header and the actual row data content. Since MemRowSet is only stored in memory, it will eventually be filled up, and then Flush to disk (in one or more DiskRowSets). (Detailed introduction below)
**

MVCC overview

**

In order to provide some useful features, Kudu uses multi-version concurrency control:

Snapshot scanner: Snapshot query, when a query is created, the system will operate a point-in-time snapshot of the tablet at a specified time. Any update for this tablet during this query will be ignored. In addition, point-in-time snapshots can be stored and reused in other queries. For example, an application performs multiple interactive analysis queries on a set of continuous data.
Time-travel scanners: Historical snapshot query, the same as the snapshot query above. Users can specify a point in history to create a query, and MVCC can ensure the consistency of historical snapshots. This function can be used for consistent backup at a certain point in time.
Change-history queries: historical change queries, given two MVCC snapshots, users can query task data between these two snapshots. This function can be used for incremental backup, cross-cluster synchronization, or offline audit analysis.
Multi-row atomic updates within a tablet: Atomic updates of multiple rows of data within a tablet. In a tablet, one operation (mutation) can modify multiple rows of data, and it is visible in atomic operations in one data. (It should be an atomic operation for the column)
In order to provide the MVCC function, each operation (mutation) will have a timestamp (timestamp). Timestamp is provided by TS-wide Clock instance, tablet MvccManager can guarantee that timestamp is unique and unique in this tablet. MvccManager determines the timestamp of the data submission, so that all queries after this point in time can get the data just submitted. When the query is created, the scanner extracts a snapshot of the time state of the MvccManager. All data visible to the scanner will be compared with the MvccSnapshot to determine which insertion, update, or delete operation data is visible.

The Timestamp of each tablet is monotonically increasing. We use HybridTime technology to create timestamps, which can ensure that the timestamps between nodes are consistent.

In order to support snapshot and historical snapshot functions, multiple versions of data must be stored. In order to prevent unlimited expansion of the space, users can configure a retention time and GC the records before this time (this function can prevent each query from starting to read from the original version).

**

MVCC Mutations in MemRowSet

**

In order to support the MVCC function in MemRowSet, each row of inserted data will carry a timestamp. Moreover, row will have a pointer that points to the list of mutations immediately following it, and each mutation has a timestamp:
Write the picture description here

In traditional relational database terms , This ordered list of mutations can be called “RODO log”.

Any reader needs to access the mutations in the row of MemRowSet to get the correct snapshot. The logic is as follows:

If the timestamp of this row of data is not in the MVCC snapshot of the scanner (that is, the timestamp specified by the scanner snapshot is less than the timestamp of the data insertion, and the data has not been created), ignore this row.
If the above is not satisfied, put this line of data into the output cache.
Loop the mutation in the list:

  1. If the timestamp of the mutation is in the MVCC snapshot, perform the update in the memory cache. If not, skip this mutation.
  2. If the mutation is a DELETE operation, it is marked as deleted in the buffer, and the data in the previously loaded cache is cleared.

Note that mutation can be any of the following:

UPDATE: update value, one or more columns in a row of data
DELETE: delete a row of data
REINSERT: Reinsert a row of data (this situation only occurs when there is a DELETE mutation before and the data is in the MemRowSet.)
For a real example, the table structure (key STRING, val UINT32), after the following operations :

INSERT INTO t VALUES (“row”, 1); [timestamp 1]
UPDATE t SET val = 2 WHERE key = “row”; [timestamp 2]
DELETE FROM t WHERE key = “row”; [timestamp 3]
INSERT INTO t VALUES (“row”, 3); [timestamp 4]
In MemRowSet, there will be the following structure:
Write picture description here

Note that when the update is too frequent, there will be the following effects:

Readers need to track the pointer of the linked list, resulting in a lot of CPU cache tasks.
Updates need to be appended to the linked list At the end of, the time complexity of each update is O(n).
Considering the above inefficient operation, we give the following assumptions:

Kudu is suitable for relatively low-frequency update scenarios, that is, it is assumed that the data will not be updated too frequently.
Of the entire data, only a small part is stored in the MemRowSet: once the MemRowSet reaches a certain threshold, it will be flushed to disk. Therefore, even if the mutation of MemRowSet leads to low performance, it only takes up a small part of the overall query time.
If the inefficiency mentioned above affects the actual application, there will be many optimizations to reduce overhead that can be done in the future.

**

MemRowSet Flushes

**

When MemRowSet If it is full, the Flush operation will be triggered, and it will continue to write data to the disk.

Write the picture description here
The data is flushed to disk and becomes a CFiles file (see src/kudu/cfile/README). Each row in the data is identified by an ordered rowid, and this rowid is dense, immutable, and unique in the DiskRowSet. For example, if a given DiskRowSet contains 5 rows of data, they will be allocated as rowid 0~4 in ascending order of key. Different DiskRowSet will have different rows, but they may have the same rowid.

When reading, the system will use an index structure to map the user-visible primary key key to the rowid inside the system. The primary key in the above example is a simple key whose structure is embedded in the cfile of the primary key column. In addition, an independent index cfile saves the encoded combination key, and provides a similar method. (Don’t understand)

Note: The rowid does not exactly exist with the data of each row of data, but an implicit identification based on the ordered index of the data in this cfile. In part of the source code, rowid is defined as “row indexes” or “ordinal indexes”.

Note: Other systems, such as C-Store, refer to MemRowSet as “write optimized store” (WOS) and DiskRowSet as “read-optimized store” (ROS).

Historical MVCC in DiskRowSets

In order to provide on-disk data with MVCC function, each on-disk data The Rowset of the disk not only contains the data of the current version of the row, but also contains the records of UNDO. In this way, the historical version of this row of data can be obtained.
Write picture description here
When users want to read the latest version of data after flush, they only need to get base data. Because base data is stored in columns, this query performance is very high. If you are not reading the latest data, but a time-travel query, you have to roll back to a version of the specified historical time. At this time, you need to use UNDO record data.

When a query gets a piece of data, its process of processing MVCC information is:

Read base data
Loop each UNDO record: if the relevant operation timestamp has not Submit, then perform a rollback operation. That is, the specified snapshot timestamp of the query is less than the timestamp of the mutation, and the mutation has not occurred yet.
For example, review the series of operations in the previous MVCC Mutations in MemRowSet chapter example:
Write the picture description here
When this piece of data is flushed into the disk, it will be stored in the following format: < br> Write the picture description here
Each UNDO record is the opposite of execution processing. For example, in UNDO record, the first INSERT transaction will be converted to DELETE. UNDO recod aims to retain the timestamp of the inserted or updated data: when the specified time of the MVCC snapshot of the query is earlier than Tx1, and Tx1 has not yet been submitted, the DELETE operation will be executed at this time, then this data does not exist at this time.

Two more examples of different queries:
here Write picture description
Each example deals with UNDO records at the correct time to generate correct data.

The most common scenario is to query the latest data. At this point, we need to optimize the query strategy to avoid processing all UNDO records. In order to achieve this goal, we introduce file-level metadata, pointing to the data range of UNDO record. If all the transactions that the queried MVCC snapshot meets have been submitted (to query the latest data), this group of deltas will be short-circuited (UNDO record is not processed), and the query will have no MVCC overhead.

Handling mutations against on-disk files

Update or delete flushed to disk The data does not operate on MemRowSet. Its processing process is as follows: In order to determine which RowSet the update/delete key is in, the system will patrol all RowSets. This process first uses an interval tree to locate a set of RowSets that may contain this key. Then, use the boom filter to determine whether all candidate RowSets contain this key. If a certain RowSet passes the above two checks at the same time, the system will look for the rowid corresponding to the primary key in these RowSets.

Once the RowSet where the data is located is determined, the mutation will get the rowid corresponding to the primary key, and then the mutation will be written to a memory structure called DeltaMemStore.

There is only one DeltaMemStore in a DiskRowSet. DeltaMemStore is a parallel BTree. The key of the BTree is a mixture of rowid and mutation timestamp. When querying, after the qualified mutation is executed, the corresponding data of the snapshot timestamp is obtained. The execution method is similar to the mutation after the new data is inserted (MemRowSet).

When the data stored in DeltaMemStore is large, flush to disk will also be executed, landing as DeltaFile file:
Write the picture description here
The information type of DeltaFile is the same as DeltaMemStore , It is just compacted and serialized in a dense disk. In order to update the data from base data to the latest data, the mutation transactions in these DeltaFiles need to be executed when querying. These DeltaFile collections are called REDO files, and these mutations in the file are called REDO records. Similar to the mutations stored in MemRowSet, when reading data with a newer version than base data, they need to be applied (executed) once.

The delta information of a piece of data may be contained in multiple DeltaFile files. In this case, the DeltaFile is ordered, and subsequent changes will take precedence over the previous changes.

Note that the mutation storage structure does not necessarily contain the entire row of data. If only one column of data is updated in a row, then the mutation structure will only contain the update information for this column. Do not read or rewrite irrelevant columns, so that the update data operation is fast and efficient.

Summary of delta file processing

To summarize, each DiskRowSet is logically divided into three parts :
Write the picture description here
Base data: The latest data when MemRowSet flushes to DiskRowSet. The data is stored in column.
UNDO records: historical data, used to roll back some historical version data before Base data.
REDO records: Some updated data after Base data can be used to get the latest version of data.
UNDO record and REDO record have the same storage format, both are called DeltaFile.

Delta Compactions

As mutations in DeltaFile accumulate more and more, reading RowSet data becomes more efficient Low, worst case, reading the latest version of data requires traversing all REDO records and merging with base data. In other words, if the data is updated too many times, in order to get the latest version of the data, so many mutations need to be performed.

In order to improve the reading performance, Kudu converts the inefficient physical layout into a more efficient layout in the background, and after the conversion, it has the same logical content. This conversion is called: delta compaction. Its goals are as follows:

  1. Reduce the number of delta files. The more delta files of RowSet flush, the more independent delta
    files to read in order to read the latest version of data. This work is not suitable to be placed in memory (RAM), because each read will have a delta file disk addressing, which will suffer performance loss.
  2. Migrate REDO records to UNDO records. As mentioned above, a RowSet contains a base
    data and is stored in columns. The next paragraph is UNDO records, and the previous paragraph is REDO
    records. Most of the queries are to get the latest version of the data, so we need to minimize the number of REDO records.
  3. Recycle old UNDO records. UNDO recods only needs to save the data after the earliest time point set by the user, and UNDO
    records before this time can be removed from the disk.

Note: The design of BigTable is that timestamp is bound to data and does not retain change information (insert update delete); while Kudu’s design is that timestamp is bound to change, not data. If the historical UNDO record is deleted, you will not be able to get a certain row of data or when a certain column of data was inserted or updated. If users need this feature, they need to save the inserted or updated timestamp column, just like a traditional relational database.

Types of Delta Compaction

Delta campaction is divided into minor and major.

Minor delta compactoin:
Minor compaction is a compaction of multiple delta files, and does not include base data. Compact generates delta files.
Write picture description here
Major delta compaction:
Major compaction is a compaction of base data and any number of delta files.
Write the picture description here
Major compaction consumes more performance than minor compaction, because it needs to read and rewrite base data, and base data is much larger than delta data (because base data stores a row of data, and delta Data is a mutation of certain columns. The base data that needs to be noted is columnar storage, but delta data is not).

Major compaction can compact any number or one column in DiskRowSet. If only one column of data has undergone multiple important updates, then compact can read and rewrite only this column. This situation is often encountered in enterprise-level applications, such as updating the status of orders and updating user visits.

Both types of compaction maintain the rowid in the RowSet. Because they are executed completely in the background and will not be locked. The compact result file will be introduced into the RowSet by means of atomic swapping. After the Swap operation is over, the old files before compact will be deleted.

Merging compactions

As more and more data is written to the tablet, the number of DiskRowSets will also accumulate more. This will reduce kudu performance:

  1. Random access (to obtain or update a piece of data through the primary key), in this case, as long as its key range contains this primary key, each RowSet will Respectively to locate the position of the primary key. Boom
    filter can alleviate a certain amount of physical addressing, but access to a very large bloom filter will affect the CPU and also increase memory consumption.
  2. Query data in a certain key range (for example, query data with a primary key between A and B). At this time, each RowSet, as long as its key range overlaps with the provided range, will be addressed separately. Do not use bloom
    filter. A dedicated index structure may help, but it will also consume memory.
  3. Sorted query, if the user requires the query result to have the same order as the primary key, then the query result set must go through a merge process. The consumption of Merge usually grows logarithmically with the amount of input data, that is, as the amount of data increases, the merge will consume more performance.

As mentioned above, we should merge RowSets to reduce the number of RowSets:
Write the picture description here
Different from the Delta Compaction mentioned above, please note that merging Compaction Will not keep the rowid the same. This makes the handling of concurrent mutations complicated. This process is described in more detail in the compaction.txt file.

Overall picture

Write the picture description here

Comparison to BigTable approach

The different design methods from BigTable are as follows:

  1. In kudu, a given The key will only exist in the RowSet of a tablet.
    In BigTable, a key can exist in multiple different SSTables. An entire tablet of BigTable is similar to kudu’s RowSet: reading a piece of data requires merging all the data found in SSTable (according to the key), similar to Kudu, reading a piece of data requires merging
    base data and all DeltaFile data.
    The advantage of Kudu is that there is no need to merge to read a piece of data or perform a non-sorted query. For example, aggregating keys within a certain range can independently query each RowSet (even in parallel), and then perform the summation, because the order of the keys is not important, obviously the query efficiency is higher.
    The disadvantage of Kudu is that, unlike BigTable, insert and mutation are different operations: insert writes data to the MemRowSet, and mutation (delete, update) writes to the DeltaMemStore of the RowSet where this data is stored. The performance impacts are as follows:
    a) When writing, you must make sure that this is a new piece of data. This will generate a bloom
    filter to query all RowSets. If the Bloom filter gets a possible match (that is, it is calculated that it may be in a RowSet), then in order to determine whether it is an insert or an update, an address must be performed.
    Assuming that as long as the RowSet is small enough, the result of the bloom
    filter will be accurate enough, so most inserts will not require physical disk addressing. In addition, if the inserted key is ordered, such as timeseries+”_”+xxx, due to frequent use, the block where the key is located may be stored in the data block cache.
    b) When updating, you need to determine which RowSet the key is in. Same as above, bloom filter needs to be implemented.

    This is a bit similar to the relational database RDBMS. When inserting a data with a primary key, an error will be reported and the data will not be updated. Similarly, when updating a piece of data, an error will be reported if the piece of data does not exist. The syntax of BigTable is different.

  2. Mutation manipulates disk data through rowid, not key in the actual sense.
    In BigTable, the same primary key data can be stored in multiple SSTables. In order to combine the mutation and the stored key on the disk, BigTable needs to perform merge based on the rowkey. Rowkey can be a string of any length, so comparing rowkey is very expensive. In addition, in a query, even if the key columns are not used (for example, aggregation calculations), they must be read out, which leads to additional IO. Composite primary keys are very common in BigTable applications. The size of the primary key may be an order of magnitude larger than the column you care about, especially when the query column is compressed.
    In contrast, Kudu’s mutation is bound to rowid. So merge will be more efficient, by maintaining the counter: given the next mutation to be saved, we can simply subtract it, and then we can get how many mutations there are from base
    data to the current version. Or, direct addressing can be used to efficiently obtain the latest version of the data.
    In addition, if the key is not specified in the query, the execution plan will not consult the key, except for the key boundary conditions. For example:
    Write the picture description here
    As the primary key of the above table is (host,unitx_time), the pseudocode for execution in kudu is as follows: sum = 0 foreach RowSet:
    start_rowid = rowset.lookup_key(1349658729) end_rowid =
    rowset.lookup_key(1352250720) iter =
    rowset.new_iterator(“cpu_usage”) iter.seek(start_rowid) remaining =
    end_rowid-start_rowid while remaining> 0: block =
    iter. fetch_upto(remaining) sum += sum(block).
    Obtaining a block is also very efficient, because the mutation directly points to the index address of the block.
  3. timgstamp is not part of the data model.
    In the BigTable-like system, the timstamp of each cell is exposed to the user, which essentially constitutes a matching primary key of the cell. This means that this method can efficiently and directly access the specified version of the cell, and it stores all the versions of the entire time series of a cell. But Kudu is not efficient (need to perform multiple mutations), its timestamp is implemented from MVCC, it is not another description of the primary key.

    As an alternative, kudu can use native composite primary keys to meet time series scenarios, such as primary keys (host, unix_time).

Reference

Source document: kudu table design

< p>

Leave a Comment

Your email address will not be published.