Kafka Connect

# Run the following yaml to deploy the Kafka Connect worker to EKS
# (kubectl apply -f kafka-connect-confluent-deploy.yaml)
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka-connect
  name: kafka-connect
  namespace: python
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-connect
  strategy: {}
  template:
    metadata:
      labels:
        app: kafka-connect
    spec:
      containers:
        - name: kafka-connect
          image: confluentinc/cp-kafka-connect:7.4.1
          args:
            - bash
            - -c
            # NOTE: '$$' below is the Kubernetes escape for a literal '$' in
            # command/args (it prevents $(VAR) env expansion by the kubelet),
            # so the shell receives '$(curl ...)' and '$curl_status' as intended.
            - |
              echo "Installing connector plugins"
              confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.3.3
              confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:14.0.10
              confluent-hub install --no-prompt confluentinc/csid-secrets-provider-azure:1.0.13
              #
              echo "Launching Kafka Connect worker"
              # NOTE(review): the service-principal client-id/secret are baked into
              # the pod spec in plain text. Move them into a Kubernetes Secret and
              # mount/inject them instead of committing them with the manifest.
              echo "client-id=xxxxxxxx-xxxx-498e-87f1-xxxxxxxxxxxxx
              client-secret=Ygp12~xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" > /home/appuser/connect-secrets.properties
              echo ""
              /etc/confluent/docker/run &
              #
              echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
              while : ; do
                  curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
                  echo -e $$(date) " Kafka Connect 8083 listener HTTP state: " $$curl_status " (waiting for 200)"
                  if [ $$curl_status -eq 200 ] ; then
                      break
                  fi
                  sleep 5
              done
              echo ".............................................."
              #
              # Keep the container alive after the worker is up.
              sleep infinity
          env:
            - name: CONNECT_BOOTSTRAP_SERVERS
              value: pkc-xxxxx.southeastasia.azure.confluent.cloud:9092
            - name: CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR
              value: "3"
            - name: CONNECT_CONFIG_STORAGE_TOPIC
              value: connect-cluster-configs
            - name: CONNECT_CONSUMER_REQUEST_TIMEOUT_MS
              value: "20000"
            - name: CONNECT_CONSUMER_RETRY_BACKOFF_MS
              value: "500"
            # JAAS values are double-quoted YAML scalars so the \" escapes
            # resolve to real quotes, as the JAAS parser requires.
            - name: CONNECT_CONSUMER_SASL_JAAS_CONFIG
              value: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxxxxxxxxxxxxx\" password=\".........................\";"
            - name: CONNECT_CONSUMER_SASL_MECHANISM
              value: PLAIN
            - name: CONNECT_CONSUMER_SECURITY_PROTOCOL
              value: SASL_SSL
            - name: CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
              value: https
            - name: CONNECT_CUB_KAFKA_TIMEOUT
              value: "300"
            - name: CONNECT_GROUP_ID
              value: connect-cluster-group
            - name: CONNECT_KEY_CONVERTER
              value: io.confluent.connect.avro.AvroConverter
            - name: CONNECT_KEY_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE
              value: USER_INFO
            - name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
              value: xxxxxxxxxxxxxxxx:............................................./
            - name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
              value: https://psrc-xxxxx.southeastasia.azure.confluent.cloud
            - name: CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN
              value: '[%d] %p %X{connector.context}%m (%c:%L)%n'
            - name: CONNECT_LOG4J_LOGGERS
              value: org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR
            - name: CONNECT_LOG4J_ROOT_LOGLEVEL
              value: INFO
            - name: CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR
              value: "3"
            - name: CONNECT_OFFSET_STORAGE_TOPIC
              value: connect-cluster-offsets
            - name: CONNECT_PLUGIN_PATH
              value: /usr/share/java,/usr/share/confluent-hub-components/
            - name: CONNECT_PRODUCER_REQUEST_TIMEOUT_MS
              value: "20000"
            - name: CONNECT_PRODUCER_RETRY_BACKOFF_MS
              value: "500"
            - name: CONNECT_PRODUCER_SASL_JAAS_CONFIG
              value: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxxxxxxxxxxxxx\" password=\".........................\";"
            - name: CONNECT_PRODUCER_SASL_MECHANISM
              value: PLAIN
            - name: CONNECT_PRODUCER_SECURITY_PROTOCOL
              value: SASL_SSL
            - name: CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
              value: https
            - name: CONNECT_REQUEST_TIMEOUT_MS
              value: "20000"
            - name: CONNECT_REST_ADVERTISED_HOST_NAME
              value: kafka-connect-ccloud
            - name: CONNECT_REST_PORT
              value: "8083"
            - name: CONNECT_RETRY_BACKOFF_MS
              value: "500"
            - name: CONNECT_SASL_JAAS_CONFIG
              value: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xxxxxxxxxxxxxxxx\" password=\".........................\";"
            - name: CONNECT_SASL_MECHANISM
              value: PLAIN
            - name: CONNECT_SECURITY_PROTOCOL
              value: SASL_SSL
            - name: CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
              value: https
            - name: CONNECT_STATUS_STORAGE_REPLICATION_FACTOR
              value: "3"
            - name: CONNECT_STATUS_STORAGE_TOPIC
              value: connect-cluster-status
            - name: CONNECT_VALUE_CONVERTER
              value: io.confluent.connect.avro.AvroConverter
            - name: CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE
              value: USER_INFO
            - name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO
              value: xxxxxxxxxxxxxxxx:............................................./
            - name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
              value: https://psrc-xxxxx.southeastasia.azure.confluent.cloud
          ports:
            - containerPort: 8083
          resources: {}
      restartPolicy: Always
status: {}

Execute the following command to deploy to EKS:
kubectl apply -f kafka-connect-confluent-deploy.yaml

The Kafka Connect instance is running successfully in EKS

To create the JDBC Source Kafka connectors, execute the following curl commands.
The Postgres DB host name and credentials will be retrieved from Azure Key Vault.

# Create/update the JDBC source connector. Note: "config.providers" must list
# BOTH providers in one value — the original payload repeated the key, and JSON
# last-wins parsing silently dropped the "file" provider that the ${file:...}
# placeholders below depend on.
curl -i -X PUT -H "Accept:application/json" \
        -H  "Content-Type:application/json" \
        http://www.xxxxxx.com/connectors/shuup_last_login_eks_test_03/config \
        -d '{
        "config.providers": "file,keyVault",
        "config.providers.file.class": "org.apache.kafka.common.config.provider.FileConfigProvider",
        "config.providers.keyVault.class": "io.confluent.csid.config.provider.azure.KeyVaultConfigProvider",
        "config.providers.keyVault.param.vault.url": "https://xxxxxxxxx.vault.azure.net/",
        "config.providers.keyVault.param.credential.type": "ClientSecret",
        "config.providers.keyVault.param.retry.count": 1,
        "config.providers.keyVault.param.thread.count": 1,
        "config.providers.keyVault.param.timeout.seconds": 10,
        "config.providers.keyVault.param.client.secret": "${file:/home/appuser/connect-secrets.properties:client-secret}",
        "config.providers.keyVault.param.tenant.id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
        "config.providers.keyVault.param.client.id": "${file:/home/appuser/connect-secrets.properties:client-id}",
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "value.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "auto.create.topics.enable": "true",
        "topic.creation.default.partitions": 1,
        "topic.creation.default.replication.factor": 3,
        "tasks.max": "1",
        "connection.url": "jdbc:postgresql://${keyVault:kafka-connect:servername}/shuupf",
        "connection.user": "${keyVault:dbidkafka:username}",
        "connection.password": "${keyVault:dbidkafka:password}",
        "query": "select * from get_encrypt_shuup_users('\''${keyVault:kafka-connect:encrypt_key}'\'')",
        "numeric.mapping": "best_fit",
        "mode": "timestamp",
        "timestamp.column.name": "last_login",
        "validate.non.null": "false",
        "topic.prefix": "eks_shuup_last_login_test_03"
        }'

# Create/update the second JDBC source connector. As above, the duplicate
# "config.providers" key is collapsed into a single "file,keyVault" value so
# both the FileConfigProvider and the Azure Key Vault provider are registered.
curl -i -X PUT -H "Accept:application/json" \
        -H  "Content-Type:application/json" \
        http://www.xxxxxx.com/connectors/shuup_txn_json/config \
        -d '{
        "config.providers": "file,keyVault",
        "config.providers.file.class": "org.apache.kafka.common.config.provider.FileConfigProvider",
        "config.providers.keyVault.class": "io.confluent.csid.config.provider.azure.KeyVaultConfigProvider",
        "config.providers.keyVault.param.vault.url": "https://xxxxxxxxx.vault.azure.net/",
        "config.providers.keyVault.param.credential.type": "ClientSecret",
        "config.providers.keyVault.param.retry.count": 1,
        "config.providers.keyVault.param.thread.count": 1,
        "config.providers.keyVault.param.timeout.seconds": 10,
        "config.providers.keyVault.param.client.secret": "${file:/home/appuser/connect-secrets.properties:client-secret}",
        "config.providers.keyVault.param.tenant.id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
        "config.providers.keyVault.param.client.id": "${file:/home/appuser/connect-secrets.properties:client-id}",
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "value.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "auto.create.topics.enable": "true",
        "topic.creation.default.partitions": 1,
        "topic.creation.default.replication.factor": 3,
        "tasks.max": "1",
        "connection.url": "jdbc:postgresql://${keyVault:kafka-connect:servername}/shuupf",
        "connection.user": "${keyVault:dbidkafka:username}",
        "connection.password": "${keyVault:dbidkafka:password}",
        "query": "select * from shuup_txn",
        "numeric.mapping": "best_fit",
        "mode": "timestamp",
        "timestamp.column.name": "txn_time",
        "validate.non.null": "false",
        "topic.prefix": "eks_shuup_txn_json"
        }'

Run the following curl command to check the Kafka connector deployment status:

# List every connector with its type, state, task states, and class.
# (The pipeline must be one continuous command — blank lines between the
# backslash continuations would break it when run as a script.)
curl -s "http://www.xxxxxx.com/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t | sed 's/\"//g' | sort

Kafka Connect and the 2 JDBC Source Connectors are running. Let's go to the Confluent portal to check the Topics.

Customer login data is flowing into the Kafka Topic, where sensitive information is encrypted.