Publish and consume from localhost to Streams for Apache Kafka on OCP

Ensure Kafka is running

oc get all
NAME READY STATUS RESTARTS AGE
pod/amq-streams-cluster-operator-v3.0.0-13-654db94d57-fjxrw 1/1 Running 0 5h35m
pod/my-cluster-entity-operator-857556699d-ql69z 2/2 Running 0 4h9m
pod/my-cluster-my-pool-0 1/1 Running 0 30m
pod/my-cluster-my-pool-1 1/1 Running 0 29m
pod/my-cluster-my-pool-2 1/1 Running 0 28m
pod/my-cluster-my-pool-controller-3 1/1 Running 0 4h9m
pod/my-cluster-my-pool-controller-4 1/1 Running 0 4h9m
pod/my-cluster-my-pool-controller-5 1/1 Running 0 4h9m


Configure listener in Kafka cluster

oc edit kafka my-cluster
- name: listener1
port: 9094
tls: true
type: route


Check that the routes have been created

oc get routes


Extract secret and create truststore

oc extract secret/my-cluster-cluster-ca-cert --keys=ca.crt --to=- > ca.crt
keytool -keystore client.truststore.jks -alias CARoot -import -file ca.crt


Get the boostrap server address

public_hostname_and_port_of_the_Kafka_bootstrap_service. Note: The client connects on port 443, the default router port. B But traffic is then routed to the port you configure, which is 9094 in this example.

oc get routes
my-cluster-kafka-listener1-bootstrap \
my-cluster-kafka-listener1-bootstrap-kafka.apps.my-ocp-cluster 9094 passthrough
#the bootstrap server will be:
--bootstrap-server my-cluster-kafka-listener1-bootstrap-kafka.apps.my-ocp-cluster:443


List topics

./bin/kafka-topics.sh --list \
--bootstrap-server my-cluster-kafka-listener1-bootstrap-kafka.apps.my-ocp-cluster:9094


Produce

kafka-console-producer.sh \
--bootstrap-server public_hostname_and_port_of_the_Kafka_bootstrap_service \
--producer-property security.protocol=SSL \
--producer-property ssl.truststore.password=password \
--producer-property ssl.truststore.location=client.truststore.jks \
--topic my-topic
#so with the bootstrap server:
/opt/kafka_2.13-4.0.0.redhat-00010/bin/kafka-console-producer.sh \
--bootstrap-server my-cluster-kafka-listener1-bootstrap-kafka.apps.my-ocp-cluster:443 \
--producer-property security.protocol=SSL \
--producer-property ssl.truststore.password=password \
--producer-property ssl.truststore.location=client.truststore.jks \
--topic my-topic


Consumer

/opt/kafka_2.13-4.0.0.redhat-00010/bin/kafka-console-consumer.sh \
--bootstrap-server my-cluster-kafka-listener1-bootstrap-kafka.apps.my-ocp-cluster:443 \
--consumer-property security.protocol=SSL \
--consumer-property ssl.truststore.password=password \
--consumer-property ssl.truststore.location=client.truststore.jks \
--topic my-topic --from-beginning


Test

#producer
/opt/kafka_2.13-4.0.0.redhat-00010/bin/kafka-console-producer.sh \
--bootstrap-server my-cluster-kafka-listener1-bootstrap-kafka.apps.my-ocp-cluster:443 \
--producer-property security.protocol=SSL --producer-property ssl.truststore.password=password \
--producer-property ssl.truststore.location=client.truststore.jks --topic my-topic
>test
#consumer
/opt/kafka_2.13-4.0.0.redhat-00010/bin/kafka-console-consumer.sh --bootstrap-server \
my-cluster-kafka-listener1-bootstrap-kafka.apps.my-ocp-cluster:443 \
--consumer-property security.protocol=SSL --consumer-property ssl.truststore.password=password \
--consumer-property ssl.truststore.location=client.truststore.jks --topic my-topic --from-beginning
test

Comments