Sync Table via Kafka Connect

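To synchronize a table from one database to another, you need two Kafka Connect connectors: a JDBC source connector that reads the table into a Kafka topic, and a JDBC sink connector that writes that topic into the target table.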

For further details, refer to the Confluent course Kafka Connect 101.




Below is the configuration of the data pipeline.

Table name - shuup_txn

Row count - 2.9M records (17 columns)

Table size - 612 MB 

JDBC Source Connector - shuup_txn_avro_02 (tasks.max = 4, writing to a topic with 4 partitions)

JDBC Sink Connector - shuup_txn_avro_jdbc_sink_02

Table Format

JDBC Source Connector

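-- Reads shuup_txn from the source database (common) and publishes it to Kafka.
-- Because 'query' is set, 'topic.prefix' is used as the full topic name.
-- 'mode' = 'timestamp' picks up new rows by tracking the txn_time column.
create source connector shuup_txn_avro_02 with (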
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'tasks.max' = '4',
'connection.url' = 'jdbc:postgresql://xxxxxxxx.xxxxxxxx.com:5532/common',
'connection.user' = 'kafka_id',
'connection.password' = '............',
'query' = 'select * from shuup_txn',
'topic.creation.default.replication.factor' = '1',
'topic.creation.default.partitions' = '4',
'numeric.mapping' = 'best_fit',
'mode' = 'timestamp',
'timestamp.column.name' = 'txn_time',
'value.converter.schemas.enable' = 'false',
'validate.non.null' = 'false',
'topic.prefix' = 'pg_shuup_txn_avro_02');

JDBC Sink Connector

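-- Writes the topic pg_shuup_txn_avro_02 into shuup_txn_test in the target database (shuupf).
-- 'auto.create' is 'false', so shuup_txn_test must already exist.
-- 'insert.mode' = 'INSERT' with 'pk.mode' = 'none' appends rows; nothing is upserted.
create sink connector shuup_txn_avro_jdbc_sink_02 with (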
'connector.class' = 'io.confluent.connect.jdbc.JdbcSinkConnector',
'tasks.max' = '4',
'value.converter.schema.registry.url' = 'http://localhost:8081',
'key.converter.schema.registry.url' = 'http://localhost:8081',
'key.converter' = 'io.confluent.connect.avro.AvroConverter',
'value.converter' = 'io.confluent.connect.avro.AvroConverter',
'connection.url' = 'jdbc:postgresql://xxxxxxxx.xxxxxxxx.com:5532/shuupf',
'topics' = 'pg_shuup_txn_avro_02',
'connection.user' = 'kafka_id',
'connection.password' = '...........',
'insert.mode' = 'INSERT',
'table.name.format' = 'shuup_txn_test',
'pk.mode' = 'none',
'pk.fields' = '',
'auto.create' = 'false',
'auto.evolve' = 'true');

With tasks.max set to 4 and a 4-partition topic, it took approximately 100 seconds to synchronize all 2.9 million records (612 MB) into the target table shuup_txn_test. That works out to roughly 29,000 records, or about 6 MB, per second.

Data sync to Kafka topic with 4 partitions
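
One way to confirm that the topic was populated is from the ksqlDB CLI. The statements below are a minimal sketch that reuses the connector and topic names from the configuration above. The LIMIT of 5 is an arbitrary choice, and the exact output layout depends on the ksqlDB version.

```sql
-- List the connectors and check that the source connector is running.
SHOW CONNECTORS;
DESCRIBE CONNECTOR shuup_txn_avro_02;

-- List topics with their partition counts; pg_shuup_txn_avro_02 should show 4.
SHOW TOPICS;

-- Peek at the first few records written by the source connector.
PRINT 'pg_shuup_txn_avro_02' FROM BEGINNING LIMIT 5;
```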


Data sync to new table
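
A simple check that the sink has caught up is to compare row counts on both sides. This is a sketch to run with a PostgreSQL client: the first statement against the source database (common), the second against the target database (shuupf). It assumes that shuup_txn_test has the same txn_time column as the source and that no new rows arrive between the two queries.

```sql
-- Source database (common)
SELECT COUNT(*) AS source_rows, MAX(txn_time) AS latest_txn
FROM shuup_txn;

-- Target database (shuupf)
SELECT COUNT(*) AS target_rows, MAX(txn_time) AS latest_txn
FROM shuup_txn_test;
```

If both the counts and the latest txn_time values match, the sink has most likely consumed everything the source connector published.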

New data from the source table will be continuously synchronized to the new table.
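
This continuous sync comes from 'mode' = 'timestamp' on the source connector: on each poll it appends a filter on txn_time to the configured query and remembers the latest value it has already published. The statement below is only an approximation of what the connector sends to PostgreSQL. The names :last_seen and :now are hypothetical placeholders for illustration, not connector settings.

```sql
-- Approximate shape of the query issued on each poll in timestamp mode.
SELECT *
FROM shuup_txn
WHERE txn_time > :last_seen   -- hypothetical: highest txn_time already published
  AND txn_time < :now         -- hypothetical: current database time
ORDER BY txn_time ASC;
```

Rows that change without a newer txn_time would not be picked up by a query of this shape. Since the sink uses 'insert.mode' = 'INSERT' with 'pk.mode' = 'none', a source row whose txn_time is later updated would likely be appended to the target as an additional row rather than overwriting the earlier copy.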