WRY

Where Are You?
You are on the brave land,
To experience, to remember...

Debezium Source Code Learning

Debezium MySQL Connector

Debezium is a Kafka Connect source connector: it extends the SourceConnector and SourceTask abstract classes defined by Kafka Connect.

Kafka Connect can therefore load Debezium as a connector and run it in standalone or distributed mode.

What do I care about?

  • How does Debezium read the binlog and generate events?
  • How does Kafka call Debezium to do its work in standalone deployment mode?

Source Code List

  • Debezium MySqlConnector which implements SourceConnector

    A Kafka connector has four main functions: start, stop, taskClass and taskConfigs. taskClass returns the class of the task to run.

    @Override
    public Class<? extends Task> taskClass() {
        final String implementation = properties.get(IMPLEMENTATION_PROP);
        if (isLegacy(implementation)) {
            LOGGER.warn("Legacy MySQL connector implementation is enabled");
            return io.debezium.connector.mysql.legacy.MySqlConnectorTask.class;
        }
        return io.debezium.connector.mysql.MySqlConnectorTask.class;
    }
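
As a rough illustration of the taskClass contract, the self-contained sketch below picks a task class from a configuration property and instantiates it reflectively, which is roughly what a framework has to do with the returned class. The Task interface, both task classes, the property key and the "legacy" value are stand-ins invented for this example; they are not the Kafka Connect or Debezium types.

```java
import java.util.Map;

// Hypothetical, simplified stand-in for the Kafka Connect Task type.
interface Task {
    String name();
}

class StreamingTask implements Task {
    public String name() {
        return "streaming";
    }
}

class LegacyTask implements Task {
    public String name() {
        return "legacy";
    }
}

class TaskClassSketch {
    // Made-up property key, used only in this sketch.
    static final String IMPLEMENTATION_PROP = "sketch.implementation";

    // Mirrors the shape of taskClass(): inspect the config, return a Class object.
    static Class<? extends Task> taskClass(Map<String, String> properties) {
        String implementation = properties.get(IMPLEMENTATION_PROP);
        if ("legacy".equals(implementation)) {
            return LegacyTask.class;
        }
        return StreamingTask.class;
    }

    // The caller only receives a Class, so it has to create the task reflectively.
    static Task newTask(Map<String, String> properties) {
        try {
            return taskClass(properties).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate task", e);
        }
    }
}
```

Returning a Class rather than an instance leaves the decision of when, and how many, tasks to create with the caller.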

  • Debezium MySqlConnectorTask which implements SourceTask

    A Kafka SourceTask has three main functions: start, stop and poll. poll returns the records that need to be sent to Kafka.

    The start function creates the EventDispatcher, the MySqlStreamingChangeEventSource (through MySqlChangeEventSourceFactory) and the ChangeEventSourceCoordinator, then starts the coordinator:

    ... ... ... ...
    final EventDispatcher<TableId> dispatcher = new EventDispatcher<>(
            connectorConfig,
            topicSelector,
            schema,
            queue,
            connectorConfig.getTableFilters().dataCollectionFilter(),
            DataChangeEvent::new,
            null,
            metadataProvider,
            heartbeat,
            schemaNameAdjuster,
            connection);
    ... ... ... ...
    ChangeEventSourceCoordinator<MySqlPartition, MySqlOffsetContext> coordinator = new ChangeEventSourceCoordinator<>(
            previousOffsets,
            errorHandler,
            MySqlConnector.class,
            connectorConfig,
            new MySqlChangeEventSourceFactory(connectorConfig, connection, errorHandler, dispatcher, clock, schema, taskContext, streamingMetrics, queue),
            new MySqlChangeEventSourceMetricsFactory(streamingMetrics),
            dispatcher,
            schema);
    ... ... ... ...
    coordinator.start(taskContext, this.queue, metadataProvider);

    The poll function calls ChangeEventQueue::poll to fetch the events that need to be sent to the Kafka broker.
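
The split between start (launch a background producer) and poll (hand over whatever has been queued) can be sketched without any Kafka or Debezium classes. Everything here is an assumption made for illustration: PollingTaskSketch, its String events and the use of a plain LinkedBlockingQueue instead of ChangeEventQueue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PollingTaskSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>(1024);
    private final int maxBatchSize;
    private Thread producer;

    PollingTaskSketch(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    // start(): spawn the background reader, the role the coordinator plays above.
    void start(List<String> source) {
        producer = new Thread(() -> {
            try {
                for (String event : source) {
                    queue.put(event); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sketch-producer");
        producer.setDaemon(true);
        producer.start();
    }

    // poll(): wait briefly for a first event, then drain up to one batch.
    List<String> poll() {
        List<String> batch = new ArrayList<>();
        try {
            String first = queue.poll(100, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                queue.drainTo(batch, maxBatchSize - 1);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return batch;
    }

    // stop(): interrupt the producer if it is still running.
    void stop() {
        if (producer != null) {
            producer.interrupt();
        }
    }
}
```

The caller keeps invoking poll in a loop; an empty batch simply means nothing arrived within the wait window.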

  • Debezium ChangeEventSourceCoordinator

  • Debezium MySqlStreamingChangeEventSource

    Entry points: the MySqlStreamingChangeEventSource constructor and MySqlStreamingChangeEventSource::execute.

    public MySqlStreamingChangeEventSource(MySqlConnectorConfig connectorConfig, MySqlConnection connection, EventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock, MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics metrics) {

        ... ... ... ...
        eventDeserializationFailureHandlingMode = connectorConfig.getEventProcessingFailureHandlingMode();
        inconsistentSchemaHandlingMode = connectorConfig.inconsistentSchemaFailureHandlingMode();

        // Set up the log reader ...
        client = taskContext.getBinaryLogClient();
        // BinaryLogClient will overwrite thread names later
        client.setThreadFactory(
                Threads.threadFactory(MySqlConnector.class, connectorConfig.getLogicalName(), "binlog-client", false, false,
                        x -> binaryLogClientThreads.put(x.getName(), x)));
        ... ... ... ...

        boolean filterDmlEventsByGtidSource = configuration.getBoolean(MySqlConnectorConfig.GTID_SOURCE_FILTER_DML_EVENTS);
        gtidDmlSourceFilter = filterDmlEventsByGtidSource ? connectorConfig.gtidSourceFilter() : null;

        // Set up the event deserializer with additional type(s) ...
        final Map<Long, TableMapEventData> tableMapEventByTableId = new HashMap<Long, TableMapEventData>();
        EventDeserializer eventDeserializer = new EventDeserializer() {
            @Override
            public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
                try {
                    // Delegate to the superclass ...
                    Event event = super.nextEvent(inputStream);

                    // We have to record the most recent TableMapEventData for each table number for our custom deserializers ...
                    if (event.getHeader().getEventType() == EventType.TABLE_MAP) {
                        TableMapEventData tableMapEvent = event.getData();
                        tableMapEventByTableId.put(tableMapEvent.getTableId(), tableMapEvent);
                    }
                    return event;
                }
                // DBZ-217 In case an event couldn't be read we create a pseudo-event for the sake of logging
                catch (EventDataDeserializationException edde) {
                    ... ... ... ...
                }
            }
        };

        // Add our custom deserializers ...
        eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
        ... ... ... ...
        client.setEventDeserializer(eventDeserializer);
    }

    public void execute(ChangeEventSourceContext context, MySqlPartition partition, MySqlOffsetContext offsetContext) throws InterruptedException {
        ... ... ... ...
        // Start the log reader, which starts background threads ...
        if (context.isRunning()) {
            long timeout = connectorConfig.getConnectionTimeout().toMillis();
            long started = clock.currentTimeInMillis();
            try {
                LOGGER.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout);
                client.connect(timeout);
                ... ... ... ...
    }

    The client object is an instance of BinaryLogClient.

  • MySQL Binlog Connector Java BinaryLogClient

    connect function

    public void connect() throws IOException, IllegalStateException {
        if (!connectLock.tryLock()) {
            throw new IllegalStateException("BinaryLogClient is already connected");
        }
        boolean notifyWhenDisconnected = false;
        try {
            Callable cancelDisconnect = null;
            try {
                try {
                    long start = System.currentTimeMillis();
                    channel = openChannel();
                    if (connectTimeout > 0 && !isKeepAliveThreadRunning()) {
                        cancelDisconnect = scheduleDisconnectIn(connectTimeout -
                                (System.currentTimeMillis() - start));
                    }
                    if (channel.getInputStream().peek() == -1) {
                        throw new EOFException();
                    }
                } catch (IOException e) {
                    throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port +
                            ". Please make sure it's running.", e);
                }
                ... ... ... ...
                setupConnection();
                gtid = null;
                tx = false;
                requestBinaryLogStream();
            }
            ... ... ... ...
            listenForEventPackets();
            ... ... ... ...
    }

    requestBinaryLogStream function

    private void requestBinaryLogStream() throws IOException {
        long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178
        Command dumpBinaryLogCommand;
        synchronized (gtidSetAccessLock) {
            if (gtidSet != null) {
                dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId,
                        useBinlogFilenamePositionInGtidMode ? binlogFilename : "",
                        useBinlogFilenamePositionInGtidMode ? binlogPosition : 4,
                        gtidSet);
            } else {
                dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition);
            }
        }
        channel.write(dumpBinaryLogCommand);
    }

    listenForEventPackets function

    private void listenForEventPackets() throws IOException {
        ByteArrayInputStream inputStream = channel.getInputStream();
        boolean completeShutdown = false;
        try {
            while (inputStream.peek() != -1) {
                int packetLength = inputStream.readInteger(3);
                inputStream.skip(1); // 1 byte for sequence
                int marker = inputStream.read();
                if (marker == 0xFF) {
                    ErrorPacket errorPacket = new ErrorPacket(inputStream.read(packetLength - 1));
                    throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(),
                            errorPacket.getSqlState());
                }
                if (marker == 0xFE && !blocking) {
                    completeShutdown = true;
                    break;
                }
                Event event;
                try {
                    event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ?
                            new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) :
                            inputStream);
                    if (event == null) {
                        throw new EOFException();
                    }
                } catch (Exception e) {
                    Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e;
                    if (cause instanceof EOFException || cause instanceof SocketException) {
                        throw e;
                    }
                    if (isConnected()) {
                        for (LifecycleListener lifecycleListener : lifecycleListeners) {
                            lifecycleListener.onEventDeserializationFailure(this, e);
                        }
                    }
                    continue;
                }
                if (isConnected()) {
                    eventLastSeen = System.currentTimeMillis();
                    updateGtidSet(event);
                    notifyEventListeners(event);
                    updateClientBinlogFilenameAndPosition(event);
                }
            }
        } catch (Exception e) {
            if (isConnected()) {
                for (LifecycleListener lifecycleListener : lifecycleListeners) {
                    lifecycleListener.onCommunicationFailure(this, e);
                }
            }
        } finally {
            if (isConnected()) {
                if (completeShutdown) {
                    disconnect(); // initiate complete shutdown sequence (which includes keep alive thread)
                } else {
                    disconnectChannel();
                }
            }
        }
    }
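
The first lines of the loop decode the packet framing: a 3-byte little-endian payload length, a 1-byte sequence number, then a marker byte where 0xFF signals an error and 0xFE signals end of stream in non-blocking mode. The sketch below reproduces only that header decoding on a raw byte array. PacketHeaderSketch and its methods are hypothetical names, it uses java.io.ByteArrayInputStream rather than the library's own stream class, and it assumes well-formed input with at least five bytes.

```java
import java.io.ByteArrayInputStream;

class PacketHeaderSketch {
    enum Kind { EVENT, ERROR, EOF }

    // Reads an unsigned little-endian integer of the given byte length.
    // Assumes enough bytes are available; there is no end-of-stream handling here.
    static int readInteger(ByteArrayInputStream in, int length) {
        int result = 0;
        for (int i = 0; i < length; i++) {
            result |= (in.read() & 0xFF) << (8 * i);
        }
        return result;
    }

    // Payload length as announced by the first three header bytes.
    static int payloadLength(byte[] packet) {
        return readInteger(new ByteArrayInputStream(packet), 3);
    }

    // Classifies a packet by its marker byte, following the checks in the loop above.
    static Kind classify(byte[] packet, boolean blocking) {
        ByteArrayInputStream in = new ByteArrayInputStream(packet);
        readInteger(in, 3);  // payload length, not needed for classification
        in.skip(1);          // 1 byte for the sequence number
        int marker = in.read();
        if (marker == 0xFF) {
            return Kind.ERROR;
        }
        if (marker == 0xFE && !blocking) {
            return Kind.EOF;
        }
        return Kind.EVENT;
    }
}
```

For example, the header bytes 01 01 00 announce a payload of 1 + 256 = 257 bytes, because the least significant byte comes first.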

  • Debezium MySqlStreamingChangeEventSource Handle Events Generated by BinaryLogClient

    handleEvent function:

    eventHandlers.getOrDefault(eventType, (e) -> ignoreEvent(offsetContext, e)).accept(event);

    handleEvent looks up the handler registered for the event type and calls it. For example, handleUpdate calls handleChange, whose main logic is shown below:

    // RowsProvider<T, U> rowsProvider, BinlogChangeEmitter<U> changeEmitter
    // changeEmitter is created by
    // new MySqlChangeRecordEmitter(partition, offsetContext, clock, Operation.UPDATE, row.getKey(), row.getValue())
    changeEmitter.emit(tableId, rows.get(row));
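
Going back to the lookup in handleEvent, the getOrDefault dispatch pattern can be tried in isolation. This is a minimal sketch under stated assumptions: the EventType enum is a small stand-in for the library's enum, events are plain strings, and the handlers only write to a log list.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class EventDispatchSketch {
    // Stand-in enum; the real library defines many more event types.
    enum EventType { WRITE_ROWS, UPDATE_ROWS, DELETE_ROWS, ROTATE, HEARTBEAT }

    final List<String> log = new ArrayList<>();
    private final Map<EventType, Consumer<String>> eventHandlers = new EnumMap<>(EventType.class);

    EventDispatchSketch() {
        // Register one handler per event type of interest.
        eventHandlers.put(EventType.WRITE_ROWS, e -> log.add("insert:" + e));
        eventHandlers.put(EventType.UPDATE_ROWS, e -> log.add("update:" + e));
        eventHandlers.put(EventType.DELETE_ROWS, e -> log.add("delete:" + e));
    }

    // Unregistered event types fall through to the "ignore" handler.
    void handleEvent(EventType type, String event) {
        eventHandlers.getOrDefault(type, e -> log.add("ignored:" + e)).accept(event);
    }
}
```

An EnumMap keyed by event type keeps the dispatch table compact, and the default handler means new event types never cause a failed lookup.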

    The main logic of MySqlChangeRecordEmitter, which extends RelationalChangeRecordEmitter, is shown below:

    // The receiver object is created by the logic below.
    new Receiver() {

        @Override
        public void changeRecord(Partition partition,
                                 DataCollectionSchema schema,
                                 Operation operation,
                                 Object key, Struct value,
                                 OffsetContext offset,
                                 ConnectHeaders headers)
                throws InterruptedException {
            if (operation == Operation.CREATE && signal.isSignal(dataCollectionId)) {
                signal.process(partition, value, offset);
            }

            if (neverSkip || !skippedOperations.contains(operation)) {
                transactionMonitor.dataEvent(partition, dataCollectionId, offset, key, value);
                eventListener.onEvent(dataCollectionId, offset, key, value);
                if (incrementalSnapshotChangeEventSource != null) {
                    incrementalSnapshotChangeEventSource.processMessage(partition, dataCollectionId, key, offset);
                }
                streamingReceiver.changeRecord(partition, schema, operation, key, value, offset, headers);
            }
        }
    }

    receiver.changeRecord(getPartition(), tableSchema, Operation.UPDATE, newKey, envelope, getOffset(), null);

    So streamingReceiver is the object that receives the events that need to be sent to the Kafka broker, and its type is StreamingChangeRecordReceiver.
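
The receiver above is a wrapper: it decides whether an operation should be skipped and only then delegates to the inner receiver. A stripped-down sketch of that wrapping follows. The types here (Operation with three values, a Receiver taking strings, QueueingReceiver) are simplified stand-ins invented for the example, not the Debezium interfaces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

class ReceiverChainSketch {
    enum Operation { CREATE, UPDATE, DELETE }

    // Simplified stand-in for a change record receiver.
    interface Receiver {
        void changeRecord(Operation operation, String key, String value);
    }

    // Inner receiver: plays the role of the one that enqueues records.
    static class QueueingReceiver implements Receiver {
        final List<String> queue = new ArrayList<>();

        @Override
        public void changeRecord(Operation operation, String key, String value) {
            queue.add(operation + ":" + key + "=" + value);
        }
    }

    // Outer receiver: drops skipped operations, forwards everything else.
    static Receiver filtering(Set<Operation> skippedOperations, Receiver delegate) {
        return (operation, key, value) -> {
            if (!skippedOperations.contains(operation)) {
                delegate.changeRecord(operation, key, value);
            }
        };
    }
}
```

Keeping the filter in a wrapper means the inner receiver stays unaware of skip rules and only has to deal with records that should be published.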

  • Debezium EventDispatcher Saves Data Change Event into queue

    The main logic of StreamingChangeRecordReceiver::changeRecord:

    queue.enqueue(changeEventCreator.createDataChangeEvent(record));

    The queue object is passed to StreamingChangeRecordReceiver as a parameter. Its type is ChangeEventQueue, and it is created by the logic below:

    this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
            .pollInterval(connectorConfig.getPollInterval())
            .maxBatchSize(connectorConfig.getMaxBatchSize())
            .maxQueueSize(connectorConfig.getMaxQueueSize())
            .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
            .loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
            .buffering()
            .build();

  • Debezium ChangeEventQueue Controls When to send

    protected void doEnqueue(T record) throws InterruptedException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Enqueuing source record '{}'", record);
        }

        synchronized (this) {
            while (queue.size() >= maxQueueSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {
                // notify poll() to drain queue
                this.notify();
                // queue size or queue sizeInBytes threshold reached, so wait a bit
                this.wait(pollInterval.toMillis());
            }

            queue.add(record);
            // If we pass a positiveLong max.queue.size.in.bytes to enable handling queue size in bytes feature
            if (maxQueueSizeInBytes > 0) {
                long messageSize = ObjectSizeCalculator.getObjectSize(record);
                sizeInBytesQueue.add(messageSize);
                currentQueueSizeInBytes += messageSize;
            }

            if (queue.size() >= maxBatchSize || (maxQueueSizeInBytes > 0 && currentQueueSizeInBytes >= maxQueueSizeInBytes)) {
                // notify poll() to start draining queue and do not wait
                this.notify();
            }
        }
    }
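
To see the back-pressure in doEnqueue at work, here is a small stand-alone sketch of a bounded queue built on wait and notify. It is not the ChangeEventQueue implementation: the class name, the poll method and the demo helper are assumptions, byte-size accounting is left out, and notifyAll is used in place of notify to keep the sketch safe with several waiting threads.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class BoundedEventQueueSketch<T> {
    private final Queue<T> queue = new ArrayDeque<>();
    private final int maxQueueSize;
    private final int maxBatchSize;
    private final long pollIntervalMs;

    BoundedEventQueueSketch(int maxQueueSize, int maxBatchSize, long pollIntervalMs) {
        this.maxQueueSize = maxQueueSize;
        this.maxBatchSize = maxBatchSize;
        this.pollIntervalMs = pollIntervalMs;
    }

    // Producer side: block while the queue is full, wake the consumer when a batch is ready.
    synchronized void enqueue(T record) throws InterruptedException {
        while (queue.size() >= maxQueueSize) {
            notifyAll();            // nudge poll() to drain
            wait(pollIntervalMs);   // back off until space may be free
        }
        queue.add(record);
        if (queue.size() >= maxBatchSize) {
            notifyAll();
        }
    }

    // Consumer side: wait up to one poll interval for a full batch, then drain.
    synchronized List<T> poll() throws InterruptedException {
        long deadline = System.currentTimeMillis() + pollIntervalMs;
        while (queue.size() < maxBatchSize) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                break;
            }
            wait(remaining);
        }
        List<T> batch = new ArrayList<>();
        while (!queue.isEmpty() && batch.size() < maxBatchSize) {
            batch.add(queue.poll());
        }
        notifyAll();                // space was freed, wake a blocked producer
        return batch;
    }

    // Demo helper: one producer thread, the calling thread consumes.
    static List<Integer> demo(int count) {
        BoundedEventQueueSketch<Integer> q = new BoundedEventQueueSketch<>(2, 2, 50);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    q.enqueue(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.setDaemon(true);
        producer.start();
        List<Integer> seen = new ArrayList<>();
        try {
            while (seen.size() < count) {
                seen.addAll(q.poll());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }
}
```

With a queue size of 2 the producer in demo is forced to wait repeatedly, yet the records still arrive in the order they were enqueued.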

Core Classes

  • MySqlPartition: holds the serverName, which is used to identify the task.
  • ChangeEventSourceContext: indicates whether the source should continue running.
  • MySqlOffsetContext
  • DataChangeEvent
  • EnumMap<EventType, BlockingConsumer<Event>>
  • BinaryLogClient