Debezium MySQL Connector
Debezium is a Kafka Connect source connector: it implements the SourceConnector and SourceTask abstract classes defined by Kafka Connect.
As a result, Kafka Connect can load Debezium like any other connector and run it in either standalone or distributed mode.
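To make the connector/task split concrete, the following is a minimal sketch of the order in which a standalone worker drives a source connector. Everything in it is hypothetical: MiniConnector and MiniTask are simplified stand-ins for Kafka Connect's SourceConnector and SourceTask (the real poll returns List<SourceRecord> and may throw InterruptedException), CountingConnector and CountingTask are toy implementations, and StandaloneWorkerSketch ignores offsets, converters and the Kafka producer that a real worker uses.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for Kafka Connect's SourceTask.
interface MiniTask {
    void start(Map<String, String> config);
    List<String> poll(); // the real SourceTask.poll() returns List<SourceRecord>
    void stop();
}

// Hypothetical, simplified stand-in for Kafka Connect's SourceConnector.
interface MiniConnector {
    void start(Map<String, String> config);
    Class<? extends MiniTask> taskClass();
    List<Map<String, String>> taskConfigs(int maxTasks);
    void stop();
}

// Toy task: emits one record per poll until "records" records have been produced.
class CountingTask implements MiniTask {
    private int remaining;

    public CountingTask() {
    }

    public void start(Map<String, String> config) {
        remaining = Integer.parseInt(config.get("records"));
    }

    public List<String> poll() {
        if (remaining == 0) {
            return Collections.emptyList();
        }
        remaining--;
        return Collections.singletonList("record-" + remaining);
    }

    public void stop() {
    }
}

// Toy connector: names its task class and hands the task its config.
class CountingConnector implements MiniConnector {
    private Map<String, String> config;

    public void start(Map<String, String> config) {
        this.config = config;
    }

    public Class<? extends MiniTask> taskClass() {
        return CountingTask.class;
    }

    public List<Map<String, String>> taskConfigs(int maxTasks) {
        return Collections.singletonList(config); // one task, like Debezium's MySQL connector
    }

    public void stop() {
    }
}

class StandaloneWorkerSketch {
    // Call order: connector.start -> taskConfigs -> instantiate taskClass -> task.start -> poll loop -> stop.
    static List<String> run(MiniConnector connector, Map<String, String> config, int pollRounds) {
        connector.start(config);
        List<String> delivered = new ArrayList<>();
        for (Map<String, String> taskConfig : connector.taskConfigs(1)) {
            MiniTask task = instantiate(connector.taskClass());
            task.start(taskConfig);
            for (int i = 0; i < pollRounds; i++) {
                delivered.addAll(task.poll()); // a real worker would send these to Kafka
            }
            task.stop();
        }
        connector.stop();
        return delivered;
    }

    // The connector only names its task class; the worker creates the instance.
    private static MiniTask instantiate(Class<? extends MiniTask> taskClass) {
        try {
            return taskClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create task " + taskClass.getName(), e);
        }
    }
}
```

Run with records=2 and three poll rounds, it returns [record-1, record-0]; the third poll finds nothing left. The reflective instantiation mirrors the fact that a connector only names its task class and the worker creates the task instances itself.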
What do I care about?
- How does Debezium read the binlog and generate events?
- How does Kafka Connect drive Debezium in standalone deployment mode?
Source Code List
Debezium MySqlConnector which implements SourceConnector
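A Kafka Connect connector has four main methods: start, stop, taskClass and taskConfigs, where taskClass specifies the class of the task to run. In MySqlConnector, taskClass returns either the legacy or the current MySqlConnectorTask, depending on the implementation property: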
public Class<? extends Task> taskClass() {
    final String implementation = properties.get(IMPLEMENTATION_PROP);
    if (isLegacy(implementation)) {
        LOGGER.warn("Legacy MySQL connector implementation is enabled");
        return io.debezium.connector.mysql.legacy.MySqlConnectorTask.class;
    }
    return io.debezium.connector.mysql.MySqlConnectorTask.class;
}
Debezium MySqlConnectorTask which implements SourceTask
A Kafka Connect SourceTask has three main methods: start, stop and poll, where poll returns the records that need to be sent to Kafka.
The start method creates the EventDispatcher and the ChangeEventSourceCoordinator (whose MySqlChangeEventSourceFactory builds the MySqlStreamingChangeEventSource), then starts the ChangeEventSourceCoordinator:
... ... ... ...
final EventDispatcher<TableId> dispatcher = new EventDispatcher<>(
        connectorConfig,
        topicSelector,
        schema,
        queue,
        connectorConfig.getTableFilters().dataCollectionFilter(),
        DataChangeEvent::new,
        null,
        metadataProvider,
        heartbeat,
        schemaNameAdjuster,
        connection);
... ... ... ...
ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
        previousOffsets,
        errorHandler,
        MySqlConnector.class,
        connectorConfig,
        new MySqlChangeEventSourceFactory(connectorConfig, connection, errorHandler, dispatcher, clock, schema, taskContext, streamingMetrics, queue),
        new MySqlChangeEventSourceMetricsFactory(streamingMetrics),
        dispatcher,
        schema);
... ... ... ...
coordinator.start(taskContext, this.queue, metadataProvider);
The poll method calls ChangeEventQueue::poll to fetch the events that need to be sent to the Kafka broker.
Debezium MySqlStreamingChangeEventSource
Entry points: the MySqlStreamingChangeEventSource constructor, which configures the BinaryLogClient and its event deserializer, and MySqlStreamingChangeEventSource::execute, which connects the client.
public MySqlStreamingChangeEventSource(MySqlConnectorConfig connectorConfig, MySqlConnection connection, EventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock, MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics metrics) {
    ... ... ... ...
    eventDeserializationFailureHandlingMode = connectorConfig.getEventProcessingFailureHandlingMode();
    inconsistentSchemaHandlingMode = connectorConfig.inconsistentSchemaFailureHandlingMode();
    // Set up the log reader ...
    client = taskContext.getBinaryLogClient();
    // BinaryLogClient will overwrite thread names later
    client.setThreadFactory(
            Threads.threadFactory(MySqlConnector.class, connectorConfig.getLogicalName(), "binlog-client", false, false,
                    x -> binaryLogClientThreads.put(x.getName(), x)));
    ... ... ... ...
    boolean filterDmlEventsByGtidSource = configuration.getBoolean(MySqlConnectorConfig.GTID_SOURCE_FILTER_DML_EVENTS);
    gtidDmlSourceFilter = filterDmlEventsByGtidSource ? connectorConfig.gtidSourceFilter() : null;
    // Set up the event deserializer with additional type(s) ...
    final Map<Long, TableMapEventData> tableMapEventByTableId = new HashMap<Long, TableMapEventData>();
    EventDeserializer eventDeserializer = new EventDeserializer() {
        public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
            try {
                // Delegate to the superclass ...
                Event event = super.nextEvent(inputStream);
                // We have to record the most recent TableMapEventData for each table number for our custom deserializers ...
                if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
                    TableMapEventData tableMapEvent = event.getData();
                    tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
                }
                return event;
            }
            // DBZ-217 In case an event couldn't be read we create a pseudo-event for the sake of logging
            catch (EventDataDeserializationException edde) {
                ... ... ... ...
            }
        }
    };
    // Add our custom deserializers ...
    eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
    ... ... ... ...
    client.setEventDeserializer(eventDeserializer);
}
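The custom deserializer above keeps the latest TableMapEventData per table ID because row events only carry a numeric table ID; the TABLE_MAP event that precedes them supplies the database and table name (and the column types). A minimal sketch of that bookkeeping follows. TableMap, RowsEvent and TableMapCache are hypothetical classes for illustration, not the library's event types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified TABLE_MAP event: maps a numeric table ID to a table name.
class TableMap {
    final long tableId;
    final String database;
    final String table;

    TableMap(long tableId, String database, String table) {
        this.tableId = tableId;
        this.database = database;
        this.table = table;
    }
}

// Hypothetical, simplified row event: only the numeric table ID identifies the table.
class RowsEvent {
    final long tableId;
    final Object[] row;

    RowsEvent(long tableId, Object[] row) {
        this.tableId = tableId;
        this.row = row;
    }
}

class TableMapCache {
    private final Map<Long, TableMap> byTableId = new HashMap<>();

    // Called for every TABLE_MAP event: the newest mapping for a table ID wins.
    void onTableMap(TableMap event) {
        byTableId.put(event.tableId, event);
    }

    // Resolves the table a row event belongs to, failing if no TABLE_MAP was seen.
    String resolve(RowsEvent event) {
        TableMap map = byTableId.get(event.tableId);
        if (map == null) {
            throw new IllegalStateException("No TABLE_MAP seen for table id " + event.tableId);
        }
        return map.database + "." + map.table;
    }
}
```

Overwriting the entry on every TABLE_MAP event matches the "most recent TableMapEventData" wording in the original comment: a later mapping for the same table ID replaces the earlier one.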
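public void execute(ChangeEventSourceContext context, MySqlPartition partition, MySqlOffsetContext offsetContext) throws InterruptedException {
    ... ... ... ...
    // Start the log reader, which starts background threads ...
    if (context.isRunning()) {
        long timeout = connectorConfig.getConnectionTimeout().toMillis();
        long started = clock.currentTimeInMillis();
        try {
            LOGGER.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout);
            client.connect(timeout);
            ... ... ... ...
}
The client object is a BinaryLogClient.
MySQL Binlog Connector Java BinaryLogClient
The connect function is shown below. Note that execute calls the overload connect(timeout), which runs this blocking connect() on a separate thread and waits up to timeout milliseconds for the connection to be established. connect() opens the socket channel, sets up the connection, requests the binlog stream, and then stays in listenForEventPackets reading events.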
public void connect() throws IOException, IllegalStateException {
    if (!connectLock.tryLock()) {
        throw new IllegalStateException("BinaryLogClient is already connected");
    }
    boolean notifyWhenDisconnected = false;
    try {
        Callable cancelDisconnect = null;
        try {
            try {
                long start = System.currentTimeMillis();
                channel = openChannel();
                if (connectTimeout > 0 && !isKeepAliveThreadRunning()) {
                    cancelDisconnect = scheduleDisconnectIn(connectTimeout -
                        (System.currentTimeMillis() - start));
                }
                if (channel.getInputStream().peek() == -1) {
                    throw new EOFException();
                }
            } catch (IOException e) {
                throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port +
                    ". Please make sure it's running.", e);
            }
            ... ... ... ...
            setupConnection();
            gtid = null;
            tx = false;
            requestBinaryLogStream();
        }
        ... ... ... ...
        listenForEventPackets();
        ... ... ... ...
}
The requestBinaryLogStream function sends the binlog dump command that asks the server to start streaming events: DumpBinaryLogGtidCommand when a GTID set is configured, otherwise DumpBinaryLogCommand with the binlog filename and position:
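private void requestBinaryLogStream() throws IOException {
    long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178
    Command dumpBinaryLogCommand;
    synchronized (gtidSetAccessLock) {
        if (gtidSet != null) {
            dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId,
                useBinlogFilenamePositionInGtidMode ? binlogFilename : "",
                useBinlogFilenamePositionInGtidMode ? binlogPosition : 4,
                gtidSet);
        } else {
            dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition);
        }
    }
    channel.write(dumpBinaryLogCommand);
}
The listenForEventPackets function then reads packets from the channel in a loop, deserializes each one into an Event, and notifies the registered event listeners. Each packet starts with a 3-byte length and a 1-byte sequence number, followed by a marker byte: 0xFF means an error packet, and 0xFE (in non-blocking mode) means the server has reached the end of the binlog.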
private void listenForEventPackets() throws IOException {
    ByteArrayInputStream inputStream = channel.getInputStream();
    boolean completeShutdown = false;
    try {
        while (inputStream.peek() != -1) {
            int packetLength = inputStream.readInteger(3);
            inputStream.skip(1); // 1 byte for sequence
            int marker = inputStream.read();
            if (marker == 0xFF) {
                ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1));
                throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
                    errorPacket.getSqlState());
            }
            if (marker == 0xFE && !blocking) {
                completeShutdown = true;
                break;
            }
            Event event;
            try {
                event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?
                    new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :
                    inputStream);
                if (event == null) {
                    throw new EOFException();
                }
            } catch (Exception e) {
                Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
                if (cause instanceof EOFException || cause instanceof SocketException) {
                    throw e;
                }
                if (isConnected()) {
                    for (LifecycleListener lifecycleListener : lifecycleListeners) {
                        lifecycleListener.onEventDeserializationFailure(this, e);
                    }
                }
                continue;
            }
            if (isConnected()) {
                eventLastSeen = System.currentTimeMillis();
                updateGtidSet(event);
                notifyEventListeners(event);
                updateClientBinlogFilenameAndPosition(event);
            }
        }
    } catch (Exception e) {
        if (isConnected()) {
            for (LifecycleListener lifecycleListener : lifecycleListeners) {
                lifecycleListener.onCommunicationFailure(this, e);
            }
        }
    } finally {
        if (isConnected()) {
            if (completeShutdown) {
                disconnect(); // initiate complete shutdown sequence (which includes keep alive thread)
            } else {
                disconnectChannel();
            }
        }
    }
}
Debezium MySqlStreamingChangeEventSource Handles Events Generated by BinaryLogClient
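Events delivered by notifyEventListeners reach MySqlStreamingChangeEventSource::handleEvent, because the streaming source registers an event listener on the client. handleEvent looks up the handler registered for the event type: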
eventHandlers.getOrDefault(eventType, (e) -> ignoreEvent(offsetContext, e)).accept(event);
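The handler registry is an EnumMap keyed by event type, so dispatch is a single lookup with a fallback. A minimal sketch of the same pattern follows, assuming a hypothetical SketchEventType enum and SketchEvent class, and using java.util.function.Consumer in place of Debezium's BlockingConsumer (whose accept may throw InterruptedException).

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical subset of binlog event types.
enum SketchEventType { TABLE_MAP, EXT_WRITE_ROWS, EXT_UPDATE_ROWS, ROTATE, HEARTBEAT }

class SketchEvent {
    final SketchEventType type;
    final String payload;

    SketchEvent(SketchEventType type, String payload) {
        this.type = type;
        this.payload = payload;
    }
}

class EventRouter {
    private final Map<SketchEventType, Consumer<SketchEvent>> handlers = new EnumMap<>(SketchEventType.class);
    final List<String> log = new ArrayList<>();

    EventRouter() {
        // Register one handler per event type we care about.
        handlers.put(SketchEventType.EXT_WRITE_ROWS, e -> log.add("insert:" + e.payload));
        handlers.put(SketchEventType.EXT_UPDATE_ROWS, e -> log.add("update:" + e.payload));
        handlers.put(SketchEventType.ROTATE, e -> log.add("rotate:" + e.payload));
    }

    // Same shape as handleEvent: look up by type, fall back to "ignore".
    void handleEvent(SketchEvent event) {
        handlers.getOrDefault(event.type, e -> log.add("ignored:" + e.type)).accept(event);
    }
}
```

An EnumMap is backed by an array indexed by the enum's ordinal, which makes it a natural fit for a fixed set of event types.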
The getOrDefault lookup calls the handler registered for the event's type, or ignoreEvent when no handler is registered. For example, the handleUpdate function calls handleChange, whose main logic is shown below:
// RowsProvider<T, U> rowsProvider, BinlogChangeEmitter<U> changeEmitter
// changeEmitter is created by
// new MySqlChangeRecordEmitter(partition, offsetContext, clock, Operation.UPDATE, row.getKey(), row.getValue())
changeEmitter.emit(tableId, rows.get(row));
The main logic in MySqlChangeRecordEmitter, which extends RelationalChangeRecordEmitter, is shown below. The receiver object passed to the emitter is created in EventDispatcher by the following logic, and the emitter then calls its changeRecord method with Operation.UPDATE:
// The receiver object is created by the logic below.
new Receiver() {
    public void changeRecord(Partition partition,
                             DataCollectionSchema schema,
                             Operation operation,
                             Object key, Struct value,
                             OffsetContext offset,
                             ConnectHeaders headers)
            throws InterruptedException {
        if (operation == Operation.CREATE && signal.isSignal(dataCollectionId)) {
            signal.process(partition, value, offset);
        }
        if (neverSkip || !skippedOperations.contains(operation)) {
            transactionMonitor.dataEvent(partition, dataCollectionId, offset, key, value);
            eventListener.onEvent(dataCollectionId, offset, key, value);
            if (incrementalSnapshotChangeEventSource != null) {
                incrementalSnapshotChangeEventSource.processMessage(partition, dataCollectionId, key, offset);
            }
            streamingReceiver.changeRecord(partition, schema, operation, key, value, offset, headers);
        }
    }
}
// MySqlChangeRecordEmitter (through RelationalChangeRecordEmitter) then emits the update:
receiver.changeRecord(getPartition(), tableSchema, Operation.UPDATE, newKey, envelope, getOffset(), null);
The streamingReceiver object receives the events that need to be sent to the Kafka broker; its type is StreamingChangeRecordReceiver.
Debezium EventDispatcher Saves Data Change Events into the Queue
The main logic in StreamingChangeRecordReceiver::changeRecord is:
queue.enqueue(changeEventCreator.createDataChangeEvent(record));
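Together, the two receivers form a small chain: the anonymous Receiver decides whether a record is emitted (skipped operations are dropped), and StreamingChangeRecordReceiver turns what is left into a queue entry. The following is a simplified sketch of that decorator pattern. ChangeReceiver, FilteringReceiver, EnqueueingReceiver and Op are hypothetical names, a List stands in for ChangeEventQueue, and signals, transaction monitoring and incremental snapshots are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for Debezium's Envelope.Operation.
enum Op { CREATE, UPDATE, DELETE, READ }

interface ChangeReceiver {
    void changeRecord(Op operation, Object key, Object value);
}

// Plays the role of StreamingChangeRecordReceiver: each accepted record becomes a queue entry.
class EnqueueingReceiver implements ChangeReceiver {
    final List<String> queue = new ArrayList<>(); // stand-in for ChangeEventQueue

    public void changeRecord(Op operation, Object key, Object value) {
        queue.add(operation + " " + key + "=" + value);
    }
}

// Plays the role of the anonymous Receiver: drop skipped operations, forward the rest.
class FilteringReceiver implements ChangeReceiver {
    private final Set<Op> skippedOperations;
    private final ChangeReceiver downstream;

    FilteringReceiver(Set<Op> skippedOperations, ChangeReceiver downstream) {
        this.skippedOperations = skippedOperations;
        this.downstream = downstream;
    }

    public void changeRecord(Op operation, Object key, Object value) {
        if (!skippedOperations.contains(operation)) {
            downstream.changeRecord(operation, key, value);
        }
    }
}
```

Keeping the filtering in a wrapper means the enqueueing receiver never has to know about skip rules; it only sees records that should reach Kafka.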
The queue object is the ChangeEventQueue that the start method passes to the EventDispatcher constructor. It is created by the following logic:
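this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
        .pollInterval(connectorConfig.getPollInterval())
        .maxBatchSize(connectorConfig.getMaxBatchSize())
        .maxQueueSize(connectorConfig.getMaxQueueSize())
        .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
        .loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
        .buffering()
        .build();
Debezium ChangeEventQueue Controls When to Send
ChangeEventQueue::doEnqueue blocks the producer while the queue is full (by record count or, if max.queue.size.in.bytes is set, by size in bytes) and notifies poll() as soon as a full batch is available. The limits come from the max.queue.size, max.queue.size.in.bytes, max.batch.size and poll.interval.ms connector properties.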
protected void doEnqueue(T record) throws InterruptedException {
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Enqueuing source record '{}'", record);
    }
    synchronized (this) {
        while (queue.size() >= maxQueueSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {
            // notify poll() to drain queue
            this.notify();
            // queue size or queue sizeInBytes threshold reached, so wait a bit
            this.wait(pollInterval.toMillis());
        }
        queue.add(record);
        // If we pass a positiveLong max.queue.size.in.bytes to enable handling queue size in bytes feature
        if (maxQueueSizeInBytes > 0) {
            long messageSize = ObjectSizeCalculator.getObjectSize(record);
            sizeInBytesQueue.add(messageSize);
            currentQueueSizeInBytes += messageSize;
        }
        if (queue.size() >= maxBatchSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {
            // notify poll() to start draining queue and do not wait
            this.notify();
        }
    }
}
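doEnqueue is the producer half of a classic wait/notify hand-off: the binlog reader thread blocks while the queue is full, and wakes the consumer (the poll side) as soon as a full batch is available. The sketch below shows both halves under simplifying assumptions: BoundedBatchQueue is a hypothetical class, the byte-size limit and error handling are omitted, and its poll waits up to one poll interval for a full batch, which is not necessarily how ChangeEventQueue::poll is implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified bounded queue modeled on the wait/notify hand-off in doEnqueue.
class BoundedBatchQueue<T> {
    private final Deque<T> queue = new ArrayDeque<>();
    private final int maxQueueSize;
    private final int maxBatchSize;
    private final long pollIntervalMillis;

    BoundedBatchQueue(int maxQueueSize, int maxBatchSize, long pollIntervalMillis) {
        this.maxQueueSize = maxQueueSize;
        this.maxBatchSize = maxBatchSize;
        this.pollIntervalMillis = pollIntervalMillis;
    }

    // Producer side (the binlog reader thread).
    synchronized void enqueue(T record) throws InterruptedException {
        while (queue.size() >= maxQueueSize) {
            notifyAll();              // queue is full: wake the consumer so it drains
            wait(pollIntervalMillis); // and back off for up to one poll interval
        }
        queue.addLast(record);
        if (queue.size() >= maxBatchSize) {
            notifyAll();              // a full batch is ready: wake the consumer now
        }
    }

    // Consumer side (what SourceTask.poll would call).
    synchronized List<T> poll() throws InterruptedException {
        long deadline = System.currentTimeMillis() + pollIntervalMillis;
        // Wait for a full batch, but never longer than one poll interval.
        while (queue.size() < maxBatchSize) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                break;
            }
            wait(remaining);
        }
        List<T> batch = new ArrayList<>();
        while (!queue.isEmpty() && batch.size() < maxBatchSize) {
            batch.add(queue.pollFirst());
        }
        notifyAll();                  // space was freed: wake a blocked producer
        return batch;
    }
}
```

With maxBatchSize 3 and five queued records, the first poll returns three records immediately and the second returns the remaining two after waiting about one poll interval. notifyAll is used instead of notify so that no waiting thread is missed if more than one producer or consumer shares the queue.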
Core Classes
- MySqlPartition: holds the serverName, which is used to identify the task.
- ChangeEventSourceContext: indicates whether the source should keep running.
- MySqlOffsetContext
- DataChangeEvent
- EnumMap<EventType, BlockingConsumer<Event>>: the eventHandlers map used by handleEvent.
- BinaryLogClient