Debezium MySQL Connector
Debezium is a Kafka Connect source connector: it implements the SourceConnector and SourceTask abstract classes defined in Kafka.
Kafka can therefore call Debezium as a connector and deploy it in standalone or distributed mode.
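The split between connector and task can be sketched in plain Java. This is a simplified model and not the Kafka Connect API: MiniSourceConnector, MiniSourceTask, CountingConnector, CountingTask and MiniWorker are hypothetical names, and the real framework adds offsets, converters and threading on top of this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a source task: start, poll, stop.
abstract class MiniSourceTask {
    abstract void start(Map<String, String> props);
    abstract List<String> poll();
    abstract void stop();
}

// Hypothetical stand-in for a source connector: it names the task class
// and produces the configuration handed to each task.
abstract class MiniSourceConnector {
    abstract void start(Map<String, String> props);
    abstract Class<? extends MiniSourceTask> taskClass();
    abstract List<Map<String, String>> taskConfigs(int maxTasks);
    abstract void stop();
}

class CountingTask extends MiniSourceTask {
    private int next;
    private int limit;

    @Override void start(Map<String, String> props) {
        next = 0;
        limit = Integer.parseInt(props.get("limit"));
    }

    // Returns at most one event per call, and an empty batch once the limit is reached.
    @Override List<String> poll() {
        List<String> batch = new ArrayList<>();
        if (next < limit) {
            batch.add("event-" + next++);
        }
        return batch;
    }

    @Override void stop() { }
}

class CountingConnector extends MiniSourceConnector {
    private Map<String, String> props;

    @Override void start(Map<String, String> props) { this.props = props; }
    @Override Class<? extends MiniSourceTask> taskClass() { return CountingTask.class; }
    @Override List<Map<String, String>> taskConfigs(int maxTasks) { return List.of(props); }
    @Override void stop() { }
}

class MiniWorker {
    // Plays the role of the framework: asks the connector for the task class,
    // instantiates it reflectively, then drives start, poll and stop.
    static List<String> runOnce(MiniSourceConnector connector, Map<String, String> props, int polls) {
        connector.start(props);
        MiniSourceTask task;
        try {
            task = connector.taskClass().getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate task", e);
        }
        task.start(connector.taskConfigs(1).get(0));
        List<String> collected = new ArrayList<>();
        for (int i = 0; i < polls; i++) {
            collected.addAll(task.poll());
        }
        task.stop();
        connector.stop();
        return collected;
    }
}
```

Calling MiniWorker.runOnce(new CountingConnector(), Map.of("limit", "2"), 5) collects the two events the task produces. The remaining polls return empty batches.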
What do I care about?
- How does Debezium read the binlog and generate events?
- How does Kafka call Debezium to work in standalone deployment mode?
Source Code List
Debezium MySqlConnector which implements SourceConnector
A Kafka Connector has four main functions: start, stop, taskClass and taskConfigs. taskClass specifies the class of the task to run.
public Class<? extends Task> taskClass() {
final String implementation = properties.get(IMPLEMENTATION_PROP);
if (isLegacy(implementation)) {
LOGGER.warn("Legacy MySQL connector implementation is enabled");
return io.debezium.connector.mysql.legacy.MySqlConnectorTask.class;
}
return io.debezium.connector.mysql.MySqlConnectorTask.class;
}
Debezium MySqlConnectorTask which implements SourceTask
A Kafka SourceTask has three main functions: start, stop and poll. poll provides the data that needs to be sent to Kafka.
The start function creates the EventDispatcher, MySqlStreamingChangeEventSource and ChangeEventSourceCoordinator objects, then starts the ChangeEventSourceCoordinator:
... ... ... ...
final EventDispatcher<TableId> dispatcher = new EventDispatcher<>(
connectorConfig,
topicSelector,
schema,
queue,
connectorConfig.getTableFilters().dataCollectionFilter(),
DataChangeEvent::new,
null,
metadataProvider,
heartbeat,
schemaNameAdjuster,
connection);
... ... ... ...
ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
previousOffsets,
errorHandler,
MySqlConnector.class,
connectorConfig,
new MySqlChangeEventSourceFactory(connectorConfig, connection, errorHandler, dispatcher, clock, schema, taskContext, streamingMetrics, queue),
new MySqlChangeEventSourceMetricsFactory(streamingMetrics),
dispatcher,
schema);
... ... ... ...
coordinator.start(taskContext, this.queue, metadataProvider);
The poll function calls ChangeEventQueue::poll to get the events that need to be sent to the Kafka broker.
Debezium MySqlStreamingChangeEventSource
Entry points: the MySqlStreamingChangeEventSource constructor and MySqlStreamingChangeEventSource::execute.
public MySqlStreamingChangeEventSource(MySqlConnectorConfig connectorConfig, MySqlConnection connection, EventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock, MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics metrics) {
... ... ... ...
eventDeserializationFailureHandlingMode = connectorConfig.getEventProcessingFailureHandlingMode();
inconsistentSchemaHandlingMode = connectorConfig.inconsistentSchemaFailureHandlingMode();
// Set up the log reader ...
client = taskContext.getBinaryLogClient();
// BinaryLogClient will overwrite thread names later
client.setThreadFactory(
Threads.threadFactory(MySqlConnector.class, connectorConfig.getLogicalName(), "binlog-client", false, false,
x -> binaryLogClientThreads.put(x.getName(), x)));
... ... ... ...
boolean filterDmlEventsByGtidSource = configuration.getBoolean(MySqlConnectorConfig.GTID_SOURCE_FILTER_DML_EVENTS);
gtidDmlSourceFilter = filterDmlEventsByGtidSource ? connectorConfig.gtidSourceFilter() : null;
// Set up the event deserializer with additional type(s) ...
final Map<Long, TableMapEventData> tableMapEventByTableId = new HashMap<Long, TableMapEventData>();
EventDeserializer eventDeserializer = new EventDeserializer() {
public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
try {
// Delegate to the superclass ...
Event event = super.nextEvent(inputStream);
// We have to record the most recent TableMapEventData for each table number for our custom deserializers ...
if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
TableMapEventData tableMapEvent = event.getData();
tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
}
return event;
}
// DBZ-217 In case an event couldn't be read we create a pseudo-event for the sake of logging
catch (EventDataDeserializationException edde) {
... ... ... ...
}
};
// Add our custom deserializers ...
eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
... ... ... ...
client.setEventDeserializer(eventDeserializer);
}
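The custom EventDeserializer in the constructor remembers the most recent TABLE_MAP event per table id. In row-based binlogs a rows event refers to its table only by that numeric id, so later processing needs the mapping. A minimal sketch of the idea, with a hypothetical simplified Event type (real binlog events carry much more data, and TableMapCacheSketch is not a Debezium class):

```java
import java.util.HashMap;
import java.util.Map;

class TableMapCacheSketch {
    enum Type { TABLE_MAP, UPDATE_ROWS }

    // Hypothetical event: for TABLE_MAP the payload is the table name,
    // for UPDATE_ROWS it is a textual stand-in for the row data.
    static final class Event {
        final Type type;
        final long tableId;
        final String payload;

        Event(Type type, long tableId, String payload) {
            this.type = type;
            this.tableId = tableId;
            this.payload = payload;
        }
    }

    private final Map<Long, String> tableNameById = new HashMap<>();

    // Returns a description for row events, or null for TABLE_MAP events,
    // which only refresh the cache.
    String handle(Event event) {
        if (event.type == Type.TABLE_MAP) {
            tableNameById.put(event.tableId, event.payload);
            return null;
        }
        String table = tableNameById.getOrDefault(event.tableId, "<unknown table>");
        return table + ": " + event.payload;
    }
}
```

Keeping only the latest entry per id matters because the server may reuse or reassign table ids over time, which is why the cache is overwritten on every TABLE_MAP event.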
public void execute(ChangeEventSourceContext context, MySqlPartition partition, MySqlOffsetContext offsetContext) throws InterruptedException {
... ... ... ...
// Start the log reader, which starts background threads ...
if (context.isRunning()) {
long timeout = connectorConfig.getConnectionTimeout().toMillis();
long started = clock.currentTimeInMillis();
try {
LOGGER.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout);
client.connect(timeout);
... ... ... ...
}
The client object is a BinaryLogClient.
MySQL Binlog Connector Java BinaryLogClient
The connect function:
public void connect() throws IOException, IllegalStateException {
if (!connectLock.tryLock()) {
throw new IllegalStateException("BinaryLogClient is already connected");
}
boolean notifyWhenDisconnected = false;
try {
Callable cancelDisconnect = null;
try {
try {
long start = System.currentTimeMillis();
channel = openChannel();
if (connectTimeout > 0 && !isKeepAliveThreadRunning()) {
cancelDisconnect = scheduleDisconnectIn(connectTimeout -
(System.currentTimeMillis() - start));
}
if (channel.getInputStream().peek() == -1) {
throw new EOFException();
}
} catch (IOException e) {
throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port +
". Please make sure it's running.", e);
}
... ... ... ...
setupConnection();
gtid = null;
tx = false;
requestBinaryLogStream();
}
... ... ... ...
listenForEventPackets();
... ... ... ...
}
The requestBinaryLogStream function:
private void requestBinaryLogStream() throws IOException {
long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178
Command dumpBinaryLogCommand;
synchronized (gtidSetAccessLock) {
if (gtidSet != null) {
dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId,
useBinlogFilenamePositionInGtidMode ? binlogFilename : "",
useBinlogFilenamePositionInGtidMode ? binlogPosition : 4,
gtidSet);
} else {
dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition);
}
}
channel.write(dumpBinaryLogCommand);
}
The listenForEventPackets function reads packets from the channel in a loop, deserializes each packet into an event and passes it to the registered event listeners.
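The packet loop in listenForEventPackets relies on MySQL's packet framing: a 3-byte little-endian length, a 1-byte sequence number, then the payload, whose first byte acts as a marker (0xFF for an error packet, 0xFE for end of stream). A minimal sketch of just that header parsing follows. PacketHeaderSketch is a hypothetical helper, not part of the library, and it ignores the blocking flag that the real code checks before treating 0xFE as a shutdown signal.

```java
class PacketHeaderSketch {
    enum Kind { ERROR, EOF, EVENT }

    static final class Header {
        final int length;    // payload length, including the marker byte
        final int sequence;  // packet sequence number
        final Kind kind;     // classification of the marker byte

        Header(int length, int sequence, Kind kind) {
            this.length = length;
            this.sequence = sequence;
            this.kind = kind;
        }
    }

    // Parses the first five bytes of a packet: length (3), sequence (1), marker (1).
    static Header parse(byte[] bytes) {
        if (bytes.length < 5) {
            throw new IllegalArgumentException("need at least 5 bytes, got " + bytes.length);
        }
        // Little-endian: the least significant byte comes first.
        int length = (bytes[0] & 0xFF) | ((bytes[1] & 0xFF) << 8) | ((bytes[2] & 0xFF) << 16);
        int sequence = bytes[3] & 0xFF;
        int marker = bytes[4] & 0xFF;
        Kind kind;
        if (marker == 0xFF) {
            kind = Kind.ERROR;
        } else if (marker == 0xFE) {
            kind = Kind.EOF;
        } else {
            kind = Kind.EVENT;
        }
        return new Header(length, sequence, kind);
    }
}
```

Because the marker byte is counted in the length, the real code passes packetLength - 1 when it reads the rest of an error packet or a packet split into chunks.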
private void listenForEventPackets() throws IOException {
ByteArrayInputStream inputStream = channel.getInputStream();
boolean completeShutdown = false;
try {
while (inputStream.peek() != -1) {
int packetLength = inputStream.readInteger(3);
inputStream.skip(1); // 1 byte for sequence
int marker = inputStream.read();
if (marker == 0xFF) {
ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1));
throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
errorPacket.getSqlState());
}
if (marker == 0xFE && !blocking) {
completeShutdown = true;
break;
}
Event event;
try {
event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?
new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :
inputStream);
if (event == null) {
throw new EOFException();
}
} catch (Exception e) {
Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
if (cause instanceof EOFException || cause instanceof SocketException) {
throw e;
}
if (isConnected()) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onEventDeserializationFailure(this, e);
}
}
continue;
}
if (isConnected()) {
eventLastSeen = System.currentTimeMillis();
updateGtidSet(event);
notifyEventListeners(event);
updateClientBinlogFilenameAndPosition(event);
}
}
} catch (Exception e) {
if (isConnected()) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onCommunicationFailure(this, e);
}
}
} finally {
if (isConnected()) {
if (completeShutdown) {
disconnect(); // initiate complete shutdown sequence (which includes keep alive thread)
} else {
disconnectChannel();
}
}
}
}
Debezium MySqlStreamingChangeEventSource Handle Events Generated by BinaryLogClient
The handleEvent function:
eventHandlers.getOrDefault(eventType, (e) -> ignoreEvent(offsetContext, e)).accept(event);
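The dispatch pattern in that line, a map from event type to handler with a fallback that ignores unregistered types, can be sketched on its own. This is an illustration with assumed names: HandlerDispatchSketch and its three-value EventType are hypothetical, and java.util.function.Consumer stands in for Debezium's BlockingConsumer, which can additionally throw InterruptedException.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class HandlerDispatchSketch {
    enum EventType { WRITE_ROWS, UPDATE_ROWS, ROTATE }

    // Records what each handler did, so the effect of the dispatch is observable.
    final List<String> log = new ArrayList<>();

    private final Map<EventType, Consumer<String>> handlers = new EnumMap<>(EventType.class);

    HandlerDispatchSketch() {
        // Only some event types get a handler; ROTATE is deliberately left out.
        handlers.put(EventType.WRITE_ROWS, data -> log.add("insert " + data));
        handlers.put(EventType.UPDATE_ROWS, data -> log.add("update " + data));
    }

    // Looks up the handler for the type and falls back to an "ignore" handler.
    void handleEvent(EventType type, String data) {
        handlers.getOrDefault(type, d -> log.add("ignored " + type)).accept(data);
    }
}
```

An EnumMap fits here because the keys are a fixed enum, and getOrDefault keeps the dispatch free of null checks for event types nobody registered.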
The handleEvent function calls the handler registered for the event type. For example, the handleUpdate function calls the handleChange function, whose main logic is shown below:
// RowsProvider<T, U> rowsProvider, BinlogChangeEmitter<U> changeEmitter
// changeEmitter is created by
// new MySqlChangeRecordEmitter(partition, offsetContext, clock, Operation.UPDATE, row.getKey(), row.getValue())
changeEmitter.emit(tableId, rows.get(row));
The main logic in MySqlChangeRecordEmitter, which extends RelationalChangeRecordEmitter, is shown below:
// The receiver object is created by the logic below.
new Receiver() {
public void changeRecord(Partition partition,
DataCollectionSchema schema,
Operation operation,
Object key, Struct value,
OffsetContext offset,
ConnectHeaders headers)
throws InterruptedException {
if (operation == Operation.CREATE && signal.isSignal(dataCollectionId)) {
signal.process(partition, value, offset);
}
if (neverSkip || !skippedOperations.contains(operation)) {
transactionMonitor.dataEvent(partition, dataCollectionId, offset, key, value);
eventListener.onEvent(dataCollectionId, offset, key, value);
if (incrementalSnapshotChangeEventSource != null) {
incrementalSnapshotChangeEventSource.processMessage(partition, dataCollectionId, key, offset);
}
streamingReceiver.changeRecord(partition, schema, operation, key, value, offset, headers);
}
}
}
receiver.changeRecord(getPartition(), tableSchema, Operation.UPDATE, newKey, envelope, getOffset(), null);
streamingReceiver is the object that receives the events that need to be sent to the Kafka broker, and its type is StreamingChangeRecordReceiver.
Debezium EventDispatcher Saves Data Change Event into queue
Main logic in StreamingChangeRecordReceiver::changeRecord:
queue.enqueue(changeEventCreator.createDataChangeEvent(record));
The queue object is a parameter passed to StreamingChangeRecordReceiver. Its type is ChangeEventQueue, and it is created by the logic below:
this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
.pollInterval(connectorConfig.getPollInterval())
.maxBatchSize(connectorConfig.getMaxBatchSize())
.maxQueueSize(connectorConfig.getMaxQueueSize())
.maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
.loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
.buffering()
.build();
Debezium ChangeEventQueue Controls When to send
protected void doEnqueue(T record) throws InterruptedException {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Enqueuing source record '{}'", record);
}
synchronized (this) {
while (queue.size() >= maxQueueSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {
// notify poll() to drain queue
this.notify();
// queue size or queue sizeInBytes threshold reached, so wait a bit
this.wait(pollInterval.toMillis());
}
queue.add(record);
// If we pass a positiveLong max.queue.size.in.bytes to enable handling queue size in bytes feature
if (maxQueueSizeInBytes > 0) {
long messageSize = ObjectSizeCalculator.getObjectSize(record);
sizeInBytesQueue.add(messageSize);
currentQueueSizeInBytes += messageSize;
}
if (queue.size() >= maxBatchSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {
// notify poll() to start draining queue and do not wait
this.notify();
}
}
}
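The wait/notify handshake in doEnqueue is easier to follow next to a matching consumer. The sketch below is a simplified model under stated assumptions: BoundedBatchQueueSketch is hypothetical, it leaves out the byte-size limit, and its poll method is only a guess at how a consumer could pair with this producer, not the actual ChangeEventQueue::poll implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class BoundedBatchQueueSketch<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final int maxQueueSize;
    private final int maxBatchSize;
    private final long pollIntervalMillis;

    BoundedBatchQueueSketch(int maxQueueSize, int maxBatchSize, long pollIntervalMillis) {
        this.maxQueueSize = maxQueueSize;
        this.maxBatchSize = maxBatchSize;
        this.pollIntervalMillis = pollIntervalMillis;
    }

    // Producer side: blocks while the queue is full, then adds the record.
    synchronized void enqueue(T record) throws InterruptedException {
        while (queue.size() >= maxQueueSize) {
            notifyAll();              // ask a consumer to drain
            wait(pollIntervalMillis); // back off for one poll interval
        }
        queue.add(record);
        if (queue.size() >= maxBatchSize) {
            notifyAll();              // a full batch is ready
        }
    }

    // Consumer side: waits up to one poll interval for a full batch,
    // then drains at most maxBatchSize records.
    synchronized List<T> poll() throws InterruptedException {
        long deadline = System.currentTimeMillis() + pollIntervalMillis;
        while (queue.size() < maxBatchSize) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                break;
            }
            wait(remaining);
        }
        List<T> batch = new ArrayList<>();
        while (!queue.isEmpty() && batch.size() < maxBatchSize) {
            batch.add(queue.poll());
        }
        notifyAll();                  // wake producers blocked on a full queue
        return batch;
    }

    // Single-threaded walk-through: three records, batch size two.
    static List<Integer> demo() {
        BoundedBatchQueueSketch<String> q = new BoundedBatchQueueSketch<>(10, 2, 50);
        List<Integer> batchSizes = new ArrayList<>();
        try {
            q.enqueue("a");
            q.enqueue("b");
            q.enqueue("c");
            batchSizes.add(q.poll().size());
            batchSizes.add(q.poll().size());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return batchSizes;
    }
}
```

In demo() the first poll returns a full batch of two immediately, and the second waits out the poll interval before returning the single remaining record. The sketch uses notifyAll rather than notify so that a wake-up cannot be consumed by the wrong kind of waiter when several threads share the monitor.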
Core Class
MySqlPartition: holds the serverName, which is used to identify the task.
ChangeEventSourceContext: indicates whether the source should continue running.
MySqlOffsetContext
DataChangeEvent
EnumMap<EventType, BlockingConsumer<Event>>
BinaryLogClient