Kafka Connect
Use the following YAML (kafka-connect-confluent-deploy.yaml) to deploy Kafka Connect to EKS:

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka-connect
  name: kafka-connect
  namespace: python
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-connect
  strategy: {}
  template:
    metadata:
      labels:
        app: kafka-connect
    spec:
      containers:
      - args:
        - bash
        - -c
        - |
          echo "Installing connector plugins"
          confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.3.3
          confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:14.0.10
          confluent-hub install --no-prompt confluentinc/csid-secrets-provider-azure:1.0.13
          #
          echo "Launching Kafka Connect worker"
          echo "client-id=xxxxxxxx-xxxx-498e-87f1-xxxxxxxxxxxxx
          client-secret=Ygp12~xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" > /home/appuser/connect-secrets.properties
          echo ""
          /etc/confluent/docker/run &
          #
          echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
          while : ; do
            curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
            echo -e $$(date) " Kafka Connect 8083 listener HTTP state: " $$curl_status " (waiting for 200)"
            if [ $$curl_status -eq 200 ] ; then
              break
            fi
            sleep 5
          done
          echo ".............................................."
          #
          #
          sleep infinity
        env:
        - name: CONNECT_BOOTSTRAP_SERVERS
          value: pkc-xxxxx.southeastasia.azure.confluent.cloud:9092
        - name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
          value: "3"
        - name: CONNECT_CONFIG_STORAGE_TOPIC
          value: connect-cluster-configs
        - name: CONNECT_CONSUMER_REQUEST_TIMEOUT_MS
          value: "20000"
        - name: CONNECT_CONSUMER_RETRY_BACKOFF_MS
          value: "500"
        - name: CONNECT_CONSUMER_SASL_JAAS_CONFIG
          value: org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxxxxxxxxxxxxxx" password=".........................";
        - name: CONNECT_CONSUMER_SASL_MECHANISM
          value: PLAIN
        - name: CONNECT_CONSUMER_SECURITY_PROTOCOL
          value: SASL_SSL
        - name: CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
          value: https
        - name: CONNECT_CUB_KAFKA_TIMEOUT
          value: "300"
        - name: CONNECT_GROUP_ID
          value: connect-cluster-group
        - name: CONNECT_KEY_CONVERTER
          value: io.confluent.connect.avro.AvroConverter
        - name: CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE
          value: USER_INFO
        - name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
          value: xxxxxxxxxxxxxxxx:............................................./
        - name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
          value: https://psrc-xxxxx.southeastasia.azure.confluent.cloud
        - name: CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN
          value: '[%d] %p %X{connector.context}%m (%c:%L)%n'
        - name: CONNECT_LOG4J_LOGGERS
          value: org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
        - name: CONNECT_LOG4J_ROOT_LOGLEVEL
          value: INFO
        - name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
          value: "3"
        - name: CONNECT_OFFSET_STORAGE_TOPIC
          value: connect-cluster-offsets
        - name: CONNECT_PLUGIN_PATH
          value: /usr/share/java,/usr/share/confluent-hub-components/
        - name: CONNECT_PRODUCER_REQUEST_TIMEOUT_MS
          value: "20000"
        - name: CONNECT_PRODUCER_RETRY_BACKOFF_MS
          value: "500"
        - name: CONNECT_PRODUCER_SASL_JAAS_CONFIG
          value: org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxxxxxxxxxxxxxx" password=".........................";
        - name: CONNECT_PRODUCER_SASL_MECHANISM
          value: PLAIN
        - name: CONNECT_PRODUCER_SECURITY_PROTOCOL
          value: SASL_SSL
        - name: CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
          value: https
        - name: CONNECT_REQUEST_TIMEOUT_MS
          value: "20000"
        - name: CONNECT_REST_ADVERTISED_HOST_NAME
          value: kafka-connect-ccloud
        - name: CONNECT_REST_PORT
          value: "8083"
        - name: CONNECT_RETRY_BACKOFF_MS
          value: "500"
        - name: CONNECT_SASL_JAAS_CONFIG
          value: org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxxxxxxxxxxxxxx" password=".........................";
        - name: CONNECT_SASL_MECHANISM
          value: PLAIN
        - name: CONNECT_SECURITY_PROTOCOL
          value: SASL_SSL
        - name: CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
          value: https
        - name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
          value: "3"
        - name: CONNECT_STATUS_STORAGE_TOPIC
          value: connect-cluster-status
        - name: CONNECT_VALUE_CONVERTER
          value: io.confluent.connect.avro.AvroConverter
        - name: CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE
          value: USER_INFO
        - name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
          value: xxxxxxxxxxxxxxxx:............................................./
        - name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
          value: https://psrc-xxxxx.southeastasia.azure.confluent.cloud
        image: confluentinc/cp-kafka-connect:7.4.1
        name: kafka-connect
        ports:
        - containerPort: 8083
        resources: {}
      restartPolicy: Always
status: {}
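The curl commands later in this walkthrough reach the worker through an external hostname, so port 8083 has to be exposed outside the cluster. A minimal sketch of a Service for that (the Ingress or load balancer in front of it is environment-specific and not shown; apply it alongside the Deployment):

kubectl apply -f - <<'EOF'
apiVersion: v1
kind: Service
metadata:
  name: kafka-connect
  namespace: python
spec:
  selector:
    app: kafka-connect   # matches the Deployment's pod label
  ports:
  - port: 8083
    targetPort: 8083
EOF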
Execute the following command to deploy to EKS
kubectl apply -f kafka-connect-confluent-deploy.yaml
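To confirm the rollout, one option is to watch the pod and tail the worker log until the startup loop in the manifest reports HTTP 200 (standard kubectl, using the names from the manifest above):

kubectl -n python get pods -l app=kafka-connect
kubectl -n python logs deploy/kafka-connect -f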

The Kafka Connect instance is running successfully in EKS
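Before creating connectors, the REST API can be smoke-tested from a workstation with a port-forward; listing the installed plugins should show the three connectors installed at startup:

kubectl -n python port-forward deploy/kafka-connect 8083:8083 &
curl -s http://localhost:8083/connector-plugins | jq '.[].class'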

To create the JDBC Source connectors, execute the following curl commands. The Postgres host name and database credentials are retrieved from Azure Key Vault at runtime.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" \
http://www.xxxxxx.com/connectors/shuup_last_login_eks_test_03/config \
-d '{
"config.providers": "file",
"config.providers.file.class": "org.apache.kafka.common.config.provider.FileConfigProvider",
"config.providers": "keyVault",
"config.providers.keyVault.class": "io.confluent.csid.config.provider.azure.KeyVaultConfigProvider",
"config.providers.keyVault.param.vault.url": "https://xxxxxxxxx.vault.azure.net/",
"config.providers.keyVault.param.credential.type": "ClientSecret",
"config.providers.keyVault.param.retry.count": 1,
"config.providers.keyVault.param.thread.count": 1,
"config.providers.keyVault.param.timeout.seconds": 10,
"config.providers.keyVault.param.client.secret": "${file:/home/appuser/connect-secrets.properties:client-secret}",
"config.providers.keyVault.param.tenant.id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"config.providers.keyVault.param.client.id": "${file:/home/appuser/connect-secrets.properties:client-id}",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"auto.create.topics.enable": "true",
"topic.creation.default.partitions": 1,
"topic.creation.default.replication.factor": 3,
"tasks.max": "1",
"connection.url": "jdbc:postgresql://${keyVault:kafka-connect:servername}/shuupf",
"connection.user": "${keyVault:dbidkafka:username}",
"connection.password": "${keyVault:dbidkafka:password}",
"query": "select * from get_encrypt_shuup_users('\''${keyVault:kafka-connect:encrypt_key}'\'')",
"numeric.mapping": "best_fit",
"mode": "timestamp",
"timestamp.column.name": "last_login",
"validate.non.null": "false",
"topic.prefix": "eks_shuup_last_login_test_03"
}'
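The ${keyVault:<secret>:<key>} placeholders above are resolved at runtime by the CSID Key Vault provider. As an illustration only, assuming the provider expects JSON-valued secrets keyed by field name (verify the exact format against the csid-secrets-provider-azure documentation for your version), the referenced secrets could be seeded with the Azure CLI; all values below are hypothetical:

az keyvault secret set --vault-name xxxxxxxxx --name dbidkafka \
  --value '{"username": "kafka_reader", "password": "changeme"}'
az keyvault secret set --vault-name xxxxxxxxx --name kafka-connect \
  --value '{"servername": "pg.example.internal:5432", "encrypt_key": "changeme"}'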
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" \
http://www.xxxxxx.com/connectors/shuup_txn_json/config \
-d '{
"config.providers": "file",
"config.providers.file.class": "org.apache.kafka.common.config.provider.FileConfigProvider",
"config.providers": "keyVault",
"config.providers.keyVault.class": "io.confluent.csid.config.provider.azure.KeyVaultConfigProvider",
"config.providers.keyVault.param.vault.url": "https://xxxxxxxxx.vault.azure.net/",
"config.providers.keyVault.param.credential.type": "ClientSecret",
"config.providers.keyVault.param.retry.count": 1,
"config.providers.keyVault.param.thread.count": 1,
"config.providers.keyVault.param.timeout.seconds": 10,
"config.providers.keyVault.param.client.secret": "${file:/home/appuser/connect-secrets.properties:client-secret}",
"config.providers.keyVault.param.tenant.id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
"config.providers.keyVault.param.client.id": "${file:/home/appuser/connect-secrets.properties:client-id}",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"value.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"auto.create.topics.enable": "true",
"topic.creation.default.partitions": 1,
"topic.creation.default.replication.factor": 3,
"tasks.max": "1",
"connection.url": "jdbc:postgresql://${keyVault:kafka-connect:servername}/shuupf",
"connection.user": "${keyVault:dbidkafka:username}",
"connection.password": "${keyVault:dbidkafka:password}",
"query": "select * from shuup_txn",
"numeric.mapping": "best_fit",
"mode": "timestamp",
"timestamp.column.name": "txn_time",
"validate.non.null": "false",
"topic.prefix": "eks_shuup_txn_json"
}'
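Each connector's state can also be checked individually through the Connect REST API, for example:

curl -s http://www.xxxxxx.com/connectors/shuup_txn_json/status | jq '.connector.state, .tasks[].state'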
Run the following curl command to check the deployment status of all connectors at once
curl -s "http://www.xxxxxx.com/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort

Kafka Connect and the two JDBC Source connectors are running. Let's go to the Confluent Cloud portal and check the topics.
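As an alternative to the portal, a topic can be tailed from a terminal with the Confluent CLI (assuming it is already logged in and targeting this cluster):

confluent kafka topic consume eks_shuup_txn_json --from-beginning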

Customer login data is flowing into the Kafka topic, where the sensitive information is encrypted.

