Apache SeaTunnel
Overview
To achieve parallel reading, the Apache SeaTunnel MySQL CDC connector needs to split large tables into multiple splits. For non-primary key tables, the connector provides a variety of intelligent sharding strategies to ensure data integrity and reading efficiency. This article will detail the core sharding strategies supported by Apache SeaTunnel, the mechanism and implementation of sharding strategies, and compare the advantages and disadvantages of each sharding strategy.
1. Sharding Column Selection Strategy
1.1 Selection Priority
Code:
1. User-configured snapshotSplitColumn (preferably a unique key)
2. Primary key column (selected based on data type priority)
3. Unique key column (selected based on data type priority)
4. No available columns → Single split strategy
1.2 Supported Data Types
Data types supported by the MySQL CDC connector, according to the implementation of the AbstractJdbcSourceChunkSplitter.isEvenlySplitColumn() method:
Code:
// AbstractJdbcSourceChunkSplitter.isEvenlySplitColumn()
switch (fromDbzColumn(splitColumn).getSqlType()) {
case TINYINT:
case SMALLINT:
case INT:
case BIGINT:
case DECIMAL:
case STRING:
return true;
default:
return false;
}

- Numeric types: TINYINT, SMALLINT, INT, BIGINT, DECIMAL
- String type: STRING (using hash sharding)
Note: MySQL CDC does not support sharding by datetime types
Unsupported types:
- DATE: Not supported as a sharding column
- DATETIME: Not supported as a sharding column
- TIMESTAMP: Not supported as a sharding column
- TIME: Not supported as a sharding column
Comparison: Support status of the ordinary JDBC connector
It is worth noting that the ordinary JDBC connector (DynamicChunkSplitter) supports sharding by DATE type:
Code:
// Types supported by DynamicChunkSplitter
switch (splitColumnType.getSqlType()) {
case TINYINT:
case SMALLINT:
case INT:
case BIGINT:
case DECIMAL:
case DOUBLE:
case FLOAT:
return evenlyColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
case STRING:
// String sharding logic (omitted in this excerpt)
case DATE: // Ordinary JDBC supports the DATE type
return dateColumnSplitChunks(table, splitColumnName, min, max, chunkSize);
}
Practical impacts and solutions
If a table only has index columns of datetime types, MySQL CDC will:
- Fail to find a suitable sharding column
- Fall back to the single split mode
- Lose the advantage of parallel reading
1.3 Data Type Priority
Code:
// Priority: 1 is the highest, 6 is the lowest
TINYINT(1) > SMALLINT(2) > INT(3) > BIGINT(4) > DECIMAL(5) > STRING(6)
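The priority rule can be sketched in a few lines of Java. This is an illustration rather than SeaTunnel's own code: the SqlType enum, the Column class, and choose() are hypothetical names, and the ranks are simply copied from the listing above.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class SplitColumnPriority {
    // Ranks copied from section 1.3 (1 = most preferred); -1 marks types that cannot be used.
    enum SqlType {
        TINYINT(1), SMALLINT(2), INT(3), BIGINT(4), DECIMAL(5), STRING(6),
        DATE(-1), DATETIME(-1), TIMESTAMP(-1), TIME(-1);

        final int rank;

        SqlType(int rank) {
            this.rank = rank;
        }
    }

    // Hypothetical holder for a candidate key column.
    static final class Column {
        final String name;
        final SqlType type;

        Column(String name, SqlType type) {
            this.name = name;
            this.type = type;
        }
    }

    // Returns the eligible column with the lowest rank, or empty if none qualifies.
    static Optional<Column> choose(List<Column> candidates) {
        return candidates.stream()
                .filter(c -> c.type.rank > 0)
                .min(Comparator.comparingInt((Column c) -> c.type.rank));
    }

    public static void main(String[] args) {
        List<Column> keys = Arrays.asList(
                new Column("order_no", SqlType.STRING),
                new Column("id", SqlType.BIGINT),
                new Column("created_at", SqlType.DATETIME));
        // An empty result corresponds to the single split fallback.
        System.out.println(choose(keys).map(c -> c.name).orElse("<single split>"));
    }
}
```

With the three candidates in main, the BIGINT column wins over the STRING column, and the DATETIME column is never considered.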
2. Sharding Strategy Decision Mechanism
SeaTunnel chooses a sharding strategy through a fixed sequence of checks: the type of the sharding column, the data distribution (measured by the distribution factor), and the table size (measured by the estimated number of splits).
2.1 Overview of the Decision-Making Process
Code:
// The core decision-making logic is located in AbstractJdbcSourceChunkSplitter.generateSplits()
public Collection<SnapshotSplit> generateSplits(JdbcConnection jdbc, TableId tableId) {
// 1. Obtain configuration parameters
final int chunkSize = sourceConfig.getSplitSize(); // Default: 8096
final double distributionFactorUpper = sourceConfig.getDistributionFactorUpper(); // Default: 100.0
final double distributionFactorLower = sourceConfig.getDistributionFactorLower(); // Default: 0.05
final int sampleShardingThreshold = sourceConfig.getSampleShardingThreshold(); // Default: 1000
// 2. Check the sharding column type
if (isEvenlySplitColumn(splitColumn)) {
// 3. Query the approximate number of rows and calculate the distribution factor
long approximateRowCnt = queryApproximateRowCnt(jdbc, tableId);
double distributionFactor = calculateDistributionFactor(tableId, min, max, approximateRowCnt);
// 4. Determine whether the data distribution is uniform
boolean dataIsEvenlyDistributed =
distributionFactor >= distributionFactorLower &&
distributionFactor <= distributionFactorUpper;
if (dataIsEvenlyDistributed) {
// Uniform sharding strategy
return splitEvenlySizedChunks(...);
} else {
// 5. Check if the sampling strategy is needed
int shardCount = (int) (approximateRowCnt / chunkSize);
if (sampleShardingThreshold < shardCount) {
// Sampling-based sharding strategy
return efficientShardingThroughSampling(...);
} else {
// Non-uniform sharding strategy
return splitUnevenlySizedChunks(...);
}
}
} else {
// String type: Non-uniform sharding strategy
return splitUnevenlySizedChunks(...);
}
}
2.2 Calculation of the Distribution Factor
Core formula:
Code:
distributionFactor = (MAX - MIN + 1) / approximateRowCount
Calculation logic:
Code:
protected double calculateDistributionFactor(TableId tableId, Object min, Object max, long approximateRowCnt) {
if (approximateRowCnt == 0) {
return Double.MAX_VALUE; // Handling for empty tables
}
BigDecimal difference = ObjectUtils.minus(max, min);
final BigDecimal subRowCnt = difference.add(BigDecimal.valueOf(1));
double distributionFactor = subRowCnt.divide(
new BigDecimal(approximateRowCnt), 4, ROUND_CEILING).doubleValue();
return distributionFactor;
}
Meaning of the distribution factor:
- factor ≈ 1.0: The data distribution is ideal, with continuous IDs and no gaps
- factor > 100: Sparse data, where the ID range is much larger than the number of rows (e.g., IDs 1-1,000,000 but only 1,000 rows)
- factor < 0.05: Dense data, where multiple rows share similar ID values (e.g., a timestamp column with multiple records in the same second)
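The formula and the three ranges above can be checked with a small standalone program. It is a sketch under simplifying assumptions: the column is treated as a long, the class and method names are made up for illustration, and only the BigDecimal arithmetic mirrors the listing above.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

class DistributionFactorDemo {
    // (max - min + 1) / approximateRowCnt, rounded up to 4 decimal places.
    static double distributionFactor(long min, long max, long approximateRowCnt) {
        if (approximateRowCnt == 0) {
            return Double.MAX_VALUE; // empty table
        }
        BigDecimal span = BigDecimal.valueOf(max)
                .subtract(BigDecimal.valueOf(min))
                .add(BigDecimal.ONE);
        return span.divide(BigDecimal.valueOf(approximateRowCnt), 4, RoundingMode.CEILING)
                .doubleValue();
    }

    // Labels a factor against the lower and upper bounds (defaults 0.05 and 100.0).
    static String classify(double factor, double lower, double upper) {
        if (factor < lower) {
            return "dense";
        }
        if (factor > upper) {
            return "sparse";
        }
        return "even";
    }

    public static void main(String[] args) {
        double continuousIds = distributionFactor(1, 100_000, 100_000);
        double sparseIds = distributionFactor(1, 1_000_000, 1_000);
        System.out.println(continuousIds + " -> " + classify(continuousIds, 0.05, 100.0));
        System.out.println(sparseIds + " -> " + classify(sparseIds, 0.05, 100.0));
    }
}
```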
2.3 Detailed Explanation of Decision-Making Conditions
Condition 1: Sharding Column Type Check
Code:
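// Data types eligible as the sharding column (same check as in section 1.2; datetime types are excluded)
private boolean isEvenlySplitColumn(Column splitColumn) {
    switch (fromDbzColumn(splitColumn).getSqlType()) {
        case TINYINT: case SMALLINT: case INT: case BIGINT: case DECIMAL: case STRING:
            return true;
        default: return false;
    }
}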
Condition 2: Judgment of Data Distribution Uniformity
Code:
boolean dataIsEvenlyDistributed =
doubleCompare(distributionFactor, distributionFactorLower) >= 0 &&
doubleCompare(distributionFactor, distributionFactorUpper) <= 0;
// That is: 0.05 ≤ distributionFactor ≤ 100
Condition 3: Trigger Condition for the Sampling Strategy
Code:
int shardCount = (int) (approximateRowCnt / chunkSize);
if (sampleShardingThreshold < shardCount) {
// Enable the sampling strategy when the estimated number of splits exceeds 1000
}
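Conditions 2 and 3 combine into a small pure function, which makes the decision easy to test in isolation. The sketch below assumes the column has already passed the type check in Condition 1; the Strategy enum and the choose() signature are illustrative names, not part of SeaTunnel.

```java
class StrategyDecision {
    enum Strategy { EVEN, UNEVEN, SAMPLING }

    // Mirrors Conditions 2 and 3: the uniformity test first, then the sampling threshold.
    static Strategy choose(double distributionFactor, long approximateRowCnt, int chunkSize,
                           double lower, double upper, int sampleShardingThreshold) {
        boolean evenlyDistributed = distributionFactor >= lower && distributionFactor <= upper;
        if (evenlyDistributed) {
            return Strategy.EVEN;
        }
        int shardCount = (int) (approximateRowCnt / chunkSize);
        return sampleShardingThreshold < shardCount ? Strategy.SAMPLING : Strategy.UNEVEN;
    }

    public static void main(String[] args) {
        // Default bounds and threshold from section 2.1.
        System.out.println(choose(1.0, 100_000, 10_000, 0.05, 100.0, 1000));
        System.out.println(choose(200.0, 50_000, 1_000, 0.05, 100.0, 1000));
        System.out.println(choose(0.02, 5_000_000, 1_000, 0.05, 100.0, 1000));
    }
}
```

Note that the threshold comparison is strict: a table whose estimated split count equals the threshold still uses non-uniform sharding.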
2.4 Practical Decision-Making Examples
Example 1: Ideal Uniform Distribution
Code:
Table: user_orders
Sharding column: order_id (BIGINT)
Data range: 1 - 100,000
Number of rows: 100,000
chunkSize: 10,000
Calculation:
distributionFactor = (100000 - 1 + 1) / 100000 = 1.0
Judgment: 0.05 ≤ 1.0 ≤ 100 → Uniform data distribution
Result: Use the uniform sharding strategy to generate 10 splits
Example 2: Sparse Data, Sampling Not Triggered
Code:
Table: big_transactions
Sharding column: transaction_id (BIGINT)
Data range: 1 - 10,000,000
Number of rows: 50,000
chunkSize: 1,000
Calculation:
distributionFactor = (10000000 - 1 + 1) / 50000 = 200
Estimated number of splits = 50000 / 1000 = 50
Judgment: 200 > 100 → Non-uniform data distribution
50 < 1000 → Sampling not triggered
Result: Use the non-uniform sharding strategy
Example 3: Large Table Triggering the Sampling Strategy
Code:
Table: log_events
Sharding column: event_id (BIGINT)
Data range: 1 - 100,000
Number of rows: 5,000,000
chunkSize: 1,000
Calculation:
distributionFactor = (100000 - 1 + 1) / 5000000 = 0.02
Estimated number of splits = 5000000 / 1000 = 5000
Judgment: 0.02 < 0.05 → Non-uniform data distribution
5000 > 1000 → Trigger the sampling strategy
Result: Use the sampling-based sharding strategy, generate 5000 splits after sampling
Example 4: Dense Distribution of Timestamp Columns (assuming timestamp type is supported)
Code:
Table: sensor_data
Sharding column: timestamp (TIMESTAMP)
Data range: 2023-01-01 00:00:00 - 2023-01-01 01:00:00 (3600 seconds)
Number of rows: 1,000,000
chunkSize: 10,000
Calculation:
distributionFactor = 3600 / 1000000 = 0.0036
Judgment: 0.0036 < 0.05 → Non-uniform data distribution
Estimated number of splits = 1000000 / 10000 = 100 < 1000
Result: Use the non-uniform sharding strategy
2.5 Summary of Strategy Selection
Condition Combination | Distribution Factor Range | Estimated Number of Splits | Selected Strategy | Applicable Scenarios |
---|---|---|---|---|
Numeric column + Uniform distribution | [0.05, 100] | Any | Uniform sharding | Auto-incrementing IDs, uniformly distributed numeric values |
Numeric column + Non-uniform + Small table | <0.05 or >100 | ≤1000 | Non-uniform sharding | Sparse IDs, dense timestamps |
Numeric column + Non-uniform + Large table | <0.05 or >100 | >1000 | Sampling-based sharding | Large tables with extremely non-uniform distribution |
String column | Not applicable | Any | Non-uniform sharding | String-type sharding columns |
3. Three Core Sharding Strategies
3.1 Uniform Sharding (Evenly Sized Chunks)
Applicable scenarios: Numeric columns with uniform data distribution
Judgment conditions:
Code:
// Calculation of the distribution factor
distributionFactor = (max - min + 1) / approximateRowCount
// Judgment of uniform distribution
distributionFactorLower <= distributionFactor <= distributionFactorUpper
// Default: 0.05 <= distributionFactor <= 100
Sharding logic:
Code:
// Calculation of dynamic chunk size
dynamicChunkSize = Math.max((int)(distributionFactor * chunkSize), 1)
// Calculation of sharding range
chunkStart = null
chunkEnd = min + dynamicChunkSize
while (chunkEnd <= max) {
splits.add(ChunkRange.of(chunkStart, chunkEnd))
chunkStart = chunkEnd
chunkEnd = chunkEnd + dynamicChunkSize
}
// Add the last split
splits.add(ChunkRange.of(chunkStart, null))
Example:
Code:
Table: user_table, Primary key: id, Range: 1-10000, Number of rows: 10000
distributionFactor = (10000-1+1)/10000 = 1.0
chunkSize = 1000, dynamicChunkSize = 1000
Sharding result:
Split1: [null, 1001) // id < 1001
Split2: [1001, 2001) // 1001 <= id < 2001
Split3: [2001, 3001) // 2001 <= id < 3001
...
Split10: [9001, null) // id >= 9001
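The sharding loop can be run as plain Java to see exactly where the boundaries fall. This is a sketch for long-valued columns only; the Range class and split() are hypothetical names, ranges are treated as half-open (start inclusive, end exclusive), and the overflow guard is an added safety measure rather than something stated in the pseudocode.

```java
import java.util.ArrayList;
import java.util.List;

class EvenChunks {
    // Half-open range [start, end); null means unbounded on that side.
    static final class Range {
        final Long start;
        final Long end;

        Range(Long start, Long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    static List<Range> split(long min, long max, int chunkSize, double distributionFactor) {
        int dynamicChunkSize = Math.max((int) (distributionFactor * chunkSize), 1);
        List<Range> splits = new ArrayList<>();
        Long chunkStart = null;
        try {
            long chunkEnd = Math.addExact(min, dynamicChunkSize);
            while (chunkEnd <= max) {
                splits.add(new Range(chunkStart, chunkEnd));
                chunkStart = chunkEnd;
                chunkEnd = Math.addExact(chunkEnd, dynamicChunkSize);
            }
        } catch (ArithmeticException overflow) {
            // The next boundary would exceed Long.MAX_VALUE; the open-ended split below covers the rest.
        }
        splits.add(new Range(chunkStart, null)); // last split has no upper bound
        return splits;
    }

    public static void main(String[] args) {
        // user_table from the example: ids 1..10000, chunkSize 1000, factor 1.0
        for (Range r : split(1, 10_000, 1_000, 1.0)) {
            System.out.println(r);
        }
    }
}
```

For the user_table example this produces 10 ranges, the first ending at 1001 (min + dynamicChunkSize) and the last starting at 9001.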
3.2 Non-Uniform Sharding (Unevenly Sized Chunks)
Applicable scenarios: Non-uniform data distribution or non-numeric columns
Sharding logic:
Code:
// Continuously query the maximum value of the next chunk
Object chunkStart = null
Object chunkEnd = queryNextChunkMax(jdbc, min, tableId, splitColumn, max, chunkSize)
while (chunkEnd != null && chunkEnd <= max) {
splits.add(ChunkRange.of(chunkStart, chunkEnd))
chunkStart = chunkEnd
chunkEnd = queryNextChunkMax(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize)
}
splits.add(ChunkRange.of(chunkStart, null))
SQL example:
Code:
-- Query the maximum value of the next chunk
SELECT MAX(split_column) FROM (
SELECT split_column FROM table_name
WHERE split_column >= ?
ORDER BY split_column
LIMIT ?
) t
Example:
Code:
Table: order_table, Sharding column: create_time (illustrative; assumes a datetime sharding column were supported, see 1.2), chunkSize=1000
Query process:
1. Query the maximum create_time of the first 1000 rows → '2023-01-15 10:30:00'
2. Query the maximum create_time of the next 1000 rows → '2023-02-20 15:45:00'
3. Continue querying...
Sharding result:
Split1: [null, '2023-01-15 10:30:00']
Split2: ['2023-01-15 10:30:00', '2023-02-20 15:45:00']
Split3: ['2023-02-20 15:45:00', '2023-03-25 09:20:00']
...
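To make the boundary-walking loop concrete, the sketch below replays it against a sorted in-memory array instead of a database. Everything here is illustrative: maxOfNext() stands in for the MAX(...) LIMIT query, and the termination rules (returning null once a boundary reaches the column maximum, and stepping past a run of identical values) are assumptions about how such a loop is usually made to terminate, not a transcription of SeaTunnel's code.

```java
import java.util.ArrayList;
import java.util.List;

class UnevenChunks {
    // Stand-in for the SQL query: max of the first `chunkSize` values >= from.
    static Long maxOfNext(long[] sorted, long from, int chunkSize) {
        Long result = null;
        int taken = 0;
        for (long v : sorted) {
            if (v >= from) {
                result = v;
                if (++taken == chunkSize) {
                    break;
                }
            }
        }
        return result;
    }

    // Smallest value strictly greater than `from`, or null if there is none.
    static Long minGreaterThan(long[] sorted, long from) {
        for (long v : sorted) {
            if (v > from) {
                return v;
            }
        }
        return null;
    }

    static Long nextChunkEnd(long[] sorted, long previousEnd, int chunkSize, long max) {
        Long candidate = maxOfNext(sorted, previousEnd, chunkSize);
        if (candidate != null && candidate == previousEnd) {
            // The whole window holds one repeated value: move to the next larger value.
            candidate = minGreaterThan(sorted, previousEnd);
        }
        // Assumed stop rule: no further boundary once the column maximum is reached.
        return (candidate == null || candidate >= max) ? null : candidate;
    }

    // `sorted` must be in ascending order; ranges are half-open [start, end).
    static List<String> split(long[] sorted, int chunkSize) {
        List<String> splits = new ArrayList<>();
        if (sorted.length == 0) {
            splits.add("[null, null)");
            return splits;
        }
        long min = sorted[0];
        long max = sorted[sorted.length - 1];
        Long chunkStart = null;
        Long chunkEnd = nextChunkEnd(sorted, min, chunkSize, max);
        while (chunkEnd != null) {
            splits.add("[" + chunkStart + ", " + chunkEnd + ")");
            chunkStart = chunkEnd;
            chunkEnd = nextChunkEnd(sorted, chunkEnd, chunkSize, max);
        }
        splits.add("[" + chunkStart + ", null)");
        return splits;
    }

    public static void main(String[] args) {
        long[] sparseIds = {1, 2, 3, 100, 200, 5000, 5001, 9000};
        System.out.println(split(sparseIds, 3));
    }
}
```

Because each boundary comes from the data itself, the ranges differ widely in width while holding a similar number of rows, which is the point of this strategy.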
3.3 Sampling-Based Sharding
Applicable scenarios: Large tables with extremely non-uniform data distribution
Trigger conditions:
Code:
// Enable when the estimated number of splits exceeds the threshold
int shardCount = (int)(approximateRowCount / chunkSize)
if (sampleShardingThreshold < shardCount) {
// Use sampling-based sharding
}
// Default threshold: 1000
Sampling logic:
Code:
// Sample data
Object[] sampleData = sampleDataFromColumn(jdbc, tableId, splitColumn, inverseSamplingRate)
// Calculate the number of samples per split
double approxSamplePerShard = (double) sampleData.length / shardCount
// Determine the split boundaries based on the sample data
Object lastEnd = null
for (int i = 0; i < shardCount; i++) {
    Object chunkStart = lastEnd
    Object chunkEnd = (i < shardCount - 1)
        ? sampleData[(int) ((i + 1) * approxSamplePerShard)]
        : null
    splits.add(ChunkRange.of(chunkStart, chunkEnd))
    lastEnd = chunkEnd // the next split starts where this one ends
}
Example:
Code:
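Table: big_table, Number of rows: 10 million, chunkSize=10000, Estimated number of splits=1000 (sampling runs only if sample-sharding.threshold is set below 1000, because the check is threshold < split count)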
inverseSamplingRate=1000 (sampling rate 1/1000)
Sampling process:
1. Sample 10000 rows of data from the table
2. Sort the sample data by the sharding column
3. According to the requirement of 1000 splits, determine a split boundary every 10 samples
Sharding result: Boundaries determined based on the distribution of the sample data
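The boundary selection in step 3 amounts to picking evenly spaced elements from the sorted sample. A minimal sketch follows, assuming long-valued samples; the class name, the boundaries() helper, and the de-duplication of repeated boundary values are illustrative additions rather than documented behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SampleBoundaries {
    // Returns the interior split boundaries; N boundaries describe N + 1 splits.
    static List<Long> boundaries(long[] sample, int shardCount) {
        List<Long> ends = new ArrayList<>();
        if (sample.length == 0 || shardCount < 2) {
            return ends; // nothing to cut: a single split covers the table
        }
        long[] sorted = sample.clone();
        Arrays.sort(sorted);
        double approxSamplePerShard = (double) sorted.length / shardCount;
        for (int i = 0; i < shardCount - 1; i++) {
            long end = sorted[(int) ((i + 1) * approxSamplePerShard)];
            // Skip a boundary equal to the previous one, which would create an empty split.
            if (ends.isEmpty() || ends.get(ends.size() - 1) != end) {
                ends.add(end);
            }
        }
        return ends;
    }

    public static void main(String[] args) {
        long[] sample = {70, 10, 100, 40, 20, 90, 30, 60, 50, 80};
        // 10 samples and 5 shards: one boundary every 2 samples.
        System.out.println(boundaries(sample, 5));
    }
}
```

The quality of the boundaries depends on how representative the sample is, which is why a smaller inverse-sampling.rate (a denser sample) gives more balanced splits at the cost of a heavier sampling query.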
4. Detailed Explanation of SQL Query Cases and Code Implementation
4.1 Mapping of Core SQL Query Methods
SQL Query Type | Corresponding Method | Implementation Class | Specific Function |
---|---|---|---|
MIN/MAX query | queryMinMax() | MySqlUtils.java | Obtain the minimum and maximum values of the sharding column |
Row count query | queryApproximateRowCnt() | MySqlUtils.java | Obtain the approximate number of rows in the table |
Dynamic boundary query | queryNextChunkMax() | MySqlUtils.java | Boundary calculation for non-uniform sharding |
Sample data query | sampleDataFromColumn() | MySqlUtils.java | Data collection for the sampling strategy |
String hash query | hashModForField() | MysqlDialect.java | Hash sharding for string types |
5. Summary of SQL Query Patterns
5.1 Comparison of SQL Query Patterns for Each Strategy
Strategy Type | Data Type | SQL Query Pattern | Example |
---|---|---|---|
Uniform Sharding | Numeric | WHERE col >= start AND col < end | WHERE order_id >= 1 AND order_id < 10001 |
Uniform Sharding | String | Hash modulo query | WHERE ABS(CRC32(name) % 4) = 0 |
Non-uniform Sharding | Numeric | Dynamic boundary query | SELECT MAX(id) FROM (SELECT id FROM table WHERE id >= ? ORDER BY id LIMIT ?) |
Non-uniform Sharding | String | Hash modulo query | WHERE ABS(MD5(name) % 4) = 0 |
Sampling-based Sharding | Numeric | Sampling + boundary query | WHERE MOD((id - (SELECT MIN(id) FROM table)), 1000) = 0 |
Sampling-based Sharding | String | String sampling query | WHERE ABS(CRC32(name) % 1000) = 0 |
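For the range-based patterns in the table, each split ultimately becomes a WHERE clause over its boundaries. The helper below is a hypothetical illustration of that mapping, assuming half-open ranges and open-ended first and last splits; it is not the connector's actual SQL builder.

```java
class SplitPredicate {
    // Builds a parameterized predicate for the half-open range [start, end).
    // A null bound means the split is open on that side.
    static String where(String column, Object start, Object end) {
        if (start == null && end == null) {
            return ""; // single split: scan the whole table
        }
        if (start == null) {
            return "WHERE " + column + " < ?";
        }
        if (end == null) {
            return "WHERE " + column + " >= ?";
        }
        return "WHERE " + column + " >= ? AND " + column + " < ?";
    }

    public static void main(String[] args) {
        System.out.println(where("order_id", null, 10001L));
        System.out.println(where("order_id", 10001L, 20001L));
        System.out.println(where("order_id", 20001L, null));
    }
}
```

Using placeholders instead of inlined values keeps the statement reusable across splits and avoids quoting problems with string boundaries.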
5.2 Comparison of SQL Differences Across Databases
Database | Hash Function | Row Count Statistics | Pagination Syntax |
---|---|---|---|
MySQL | MD5(field) | SHOW TABLE STATUS | LIMIT n |
PostgreSQL | HASHTEXT(field) | pg_class.reltuples | LIMIT n |
SQL Server | HASHBYTES('MD5', field) | sys.dm_db_partition_stats | TOP n |
Oracle | ORA_HASH(field) | all_tables.num_rows | ROWNUM <= n |
6. Performance Optimization and Configuration
6.1 Distribution Factor Tuning
Code:
# Distribution factor configuration
chunk-key.even-distribution.factor.upper-bound = 100.0 # Upper limit, default 100.0
chunk-key.even-distribution.factor.lower-bound = 0.05 # Lower limit, default 0.05
Parameter Description:
- chunk-key.even-distribution.factor.upper-bound: Upper bound of the uniform distribution factor, used to determine if data is uniformly distributed
- chunk-key.even-distribution.factor.lower-bound: Lower bound of the uniform distribution factor. The factor is calculated as: (MAX(id) - MIN(id) + 1) / row count
6.2 Sampling Strategy Tuning
Code:
# Sampling configuration
sample-sharding.threshold = 1000 # Sampling threshold, default 1000
inverse-sampling.rate = 1000 # Inverse of sampling rate, default 1000
Parameter Description:
- sample-sharding.threshold: Threshold of estimated number of splits to trigger the sampling sharding strategy
- inverse-sampling.rate: Inverse of the sampling rate, e.g., 1000 means a sampling rate of 1/1000
6.3 Snapshot Sharding Configuration
Code:
# Snapshot sharding configuration
snapshot.split.size = 8096 # Split size, default 8096 rows
snapshot.fetch.size = 1024 # Fetch size per batch, default 1024 rows
Parameter Description:
- snapshot.split.size: Size of table snapshot splits (in rows)
- snapshot.fetch.size: Maximum fetch size per poll when reading table snapshots
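A quick back-of-the-envelope calculation helps when choosing these two values. The sketch below uses the split count formula from section 2.3 (integer division) and assumes, as a simplification, that a split of N rows is read in ceil(N / fetch size) batches; the class and method names are illustrative.

```java
class SnapshotSizing {
    // Estimated number of splits, as in section 2.3: (int) (rows / chunkSize).
    static int shardCount(long approximateRowCnt, int splitSize) {
        return (int) (approximateRowCnt / splitSize);
    }

    // Assumed number of fetch batches needed to read one full split.
    static int fetchesPerSplit(int splitSize, int fetchSize) {
        return (splitSize + fetchSize - 1) / fetchSize; // ceiling division
    }

    public static void main(String[] args) {
        long rows = 10_000_000L;
        System.out.println("splits: " + shardCount(rows, 8096));
        System.out.println("fetches per split: " + fetchesPerSplit(8096, 1024));
    }
}
```

With the defaults, a 10-million-row table yields an estimate of 1235 splits, which is above the default sample-sharding.threshold of 1000, so sampling comes into play if the distribution is non-uniform.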
6.4 Core Configuration Parameters
Parameter Name | Type | Default Value | Description |
---|---|---|---|
snapshot.split.size | Integer | 8096 | Size of table snapshot splits (in rows) |
snapshot.fetch.size | Integer | 1024 | Maximum number of rows fetched per batch when reading snapshots |
chunk-key.even-distribution.factor.upper-bound | Double | 100.0 | Upper bound of the uniform distribution factor |
chunk-key.even-distribution.factor.lower-bound | Double | 0.05 | Lower bound of the uniform distribution factor |
sample-sharding.threshold | Integer | 1000 | Threshold for sampling-based sharding |
inverse-sampling.rate | Integer | 1000 | Inverse of the sampling rate |
server-id | String | Randomly generated | Unique ID for the database client |
server-time-zone | String | UTC | Session time zone of the database server |
connect.timeout.ms | Duration | 30000 | Connection timeout (milliseconds) |
connect.max-retries | Integer | 3 | Maximum number of retries |
connection.pool.size | Integer | 20 | JDBC connection pool size |
6.5 Configuration Example
Code:
source {
Mysql-CDC {
# Basic connection configuration
url = "jdbc:mysql://localhost:3306/test"
username = "root"
password = "123456"
table-names = ["test.user_table"]
# Snapshot sharding configuration
snapshot.split.size = 8096
snapshot.fetch.size = 1024
# Distribution factor configuration
chunk-key.even-distribution.factor.upper-bound = 100.0
chunk-key.even-distribution.factor.lower-bound = 0.05
# Sampling strategy configuration
sample-sharding.threshold = 1000
inverse-sampling.rate = 1000
# Connection configuration
server-id = "5400"
server-time-zone = "Asia/Shanghai"
connect.timeout.ms = 30000
connect.max-retries = 3
connection.pool.size = 20
# Startup mode
startup.mode = "initial"
# Other configurations
exactly_once = false
format = "DEFAULT"
}
}
7. Sharding Strategy Control and On-Site Application
7.1 Summary of Strategy Control Parameters
By adjusting key parameters, you can precisely control which sharding strategy SeaTunnel uses to cope with different on-site scenarios:
1. Force Using Uniform Sharding Strategy
Applicable Scenario: Relatively uniform data distribution, pursuing optimal parallel performance
Code:
source {
Mysql-CDC {
url = "jdbc:mysql://localhost:3306/test"
username = "root"
password = "123456"
table-names = ["test.uniform_table"]
# Force uniform sharding configuration
chunk-key.even-distribution.factor.upper-bound = 10000.0 # Significantly increase the upper bound
chunk-key.even-distribution.factor.lower-bound = 0.001 # Significantly decrease the lower bound
sample-sharding.threshold = 100000 # Extremely high threshold to avoid sampling
snapshot.split.size = 8096 # Standard split size
}
}
2. Force Using Non-uniform Sharding Strategy
Applicable Scenario: Uneven data distribution, but the table is not particularly large
Code:
source {
Mysql-CDC {
url = "jdbc:mysql://localhost:3306/test"
username = "root"
password = "123456"
table-names = ["test.sparse_table"]
# Force non-uniform sharding configuration
chunk-key.even-distribution.factor.upper-bound = 0.1 # Upper bound set equal to the lower bound
chunk-key.even-distribution.factor.lower-bound = 0.1 # Only a factor of exactly 0.1 would count as uniform
sample-sharding.threshold = 100000 # Extremely high threshold to avoid sampling
snapshot.split.size = 5000 # Moderate split size
}
}
3. Force Using Sampling-Based Sharding Strategy
Applicable Scenario: Super large tables requiring efficient sharding
Code:
source {
Mysql-CDC {
url = "jdbc:mysql://localhost:3306/test"
username = "root"
password = "123456"
table-names = ["test.huge_table"]
# Force sampling-based sharding configuration
chunk-key.even-distribution.factor.upper-bound = 0.01 # Upper bound set equal to the lower bound
chunk-key.even-distribution.factor.lower-bound = 0.01 # Only a factor of exactly 0.01 would count as uniform
sample-sharding.threshold = 100 # Extremely low threshold to force sampling
inverse-sampling.rate = 500 # Increase sampling rate
snapshot.split.size = 10000 # Larger split size
}
}
4. Avoid Sampling Strategy (High Business Database Pressure)
Applicable Scenario: Large tables but with high business database pressure, cannot use sampling
Code:
source {
Mysql-CDC {
url = "jdbc:mysql://localhost:3306/test"
username = "root"
password = "123456"
table-names = ["test.large_table"]
# Avoid sampling configuration
chunk-key.even-distribution.factor.upper-bound = 1000.0 # Relax upper bound
chunk-key.even-distribution.factor.lower-bound = 0.001 # Relax lower bound
sample-sharding.threshold = 50000 # Extremely high threshold
snapshot.split.size = 50000 # Large splits to reduce total count
connection.pool.size = 5 # Reduce connection count
snapshot.fetch.size = 1024 # Control fetch size
}
}
5. High Parallel Performance Optimization
Applicable Scenario: Pursuing maximum parallelism and processing speed
Code:
source {
Mysql-CDC {
url = "jdbc:mysql://localhost:3306/test"
username = "root"
password = "123456"
table-names = ["test.performance_table"]
# High parallelism configuration
snapshot.split.size = 2000 # Small splits to increase parallelism
snapshot.fetch.size = 2048 # Increase fetch size
connection.pool.size = 30 # Increase connection pool
chunk-key.even-distribution.factor.upper-bound = 1000.0 # Prioritize uniform sharding
sample-sharding.threshold = 10000 # Moderate threshold
}
}
7.2 Parameter Decision Matrix
Scenario Type | Split Size | Upper Bound Factor | Lower Bound Factor | Sampling Threshold | Strategy Result |
---|---|---|---|---|---|
Uniform data, pursuing performance | 2000-8096 | 10000.0 | 0.001 | 100000 | Uniform sharding |
Sparse data, medium table | 5000-10000 | 0.1 | 0.1 | 100000 | Non-uniform sharding |
Super large table, allowing sampling | 10000+ | 0.01 | 0.01 | 100 | Sampling-based sharding |
Large table, high business DB pressure | 50000+ | 1000.0 | 0.001 | 50000 | Avoid sampling |
High parallelism requirement | 2000 | 1000.0 | 0.001 | 10000 | Uniform sharding |
7.3 Explanation of Core Control Parameters
Strategy Selection Control:
- chunk-key.even-distribution.factor.upper-bound: Controls whether to use uniform sharding
- chunk-key.even-distribution.factor.lower-bound: Controls the sensitivity of distribution judgment
- sample-sharding.threshold: Controls whether to trigger the sampling strategy
Performance Tuning Control:
- snapshot.split.size: Controls parallelism and memory usage
- snapshot.fetch.size: Controls database query pressure
- connection.pool.size: Controls database connection pressure
- inverse-sampling.rate: Controls sampling accuracy
8. Summary
The table sharding mechanism of the SeaTunnel MySQL CDC connector is implemented through the following core components:
- AbstractJdbcSourceChunkSplitter: Core sharding logic
- MySqlUtils: MySQL-specific SQL query implementation
- JdbcDialect: Database dialect support
Three sharding strategies:
- Uniform sharding: Suitable for numeric columns with uniform data distribution
- Non-uniform sharding: Suitable for sparse or dense (non-uniform) data distribution in tables below the sampling threshold
- Sampling-based sharding: Suitable for efficient sharding of super large tables
Decision-making mechanism:
- Judges data distribution characteristics through the distribution factor
- Selects appropriate sharding strategies based on table size
- Supports special handling for various data types
Through precise parameter control, this mechanism can cope with various complex on-site scenarios, ensuring that MySQL CDC can achieve efficient and balanced data sharding when processing tables of various sizes and types.
Appendix | Sharding Process

SeaTunnel Sharding Strategy Decision Parameters