Through this article, I would like to list down the detailed steps for communicating from a device to a server in the cloud running a scalable Kafka infrastructure and back from Kafka to the device using Mosquitto broker and Kafka connect.

Device to cloud connectivity using mqtt-mosquitto broker and kafka connect :
Step 1:
Download and install mosquito for windows from : https://mosquitto.org/download/
Step 2:
Navigate to D:\Program Files\mosquitto\mosquitto.conf and modify the following
# =================================================================
# Extra listeners
# =================================================================
# Listen on a port/ip address combination. By using this variable
# multiple times, mosquitto can listen on more than one port. If
# this variable is used and neither bind_address nor port given,
# Note that for a websockets listener it is not possible to bind to a host
# name.
# listener port-number [ip address/host name]
listener 1883 0.0.0.0
Step 3:
Run the command
D:\Program Files\mosquitto>mosquitto_pub -h avengers.eastus.cloudapp.azure.com -t “mqtt-mosquitto-topic” -m “This request is coming from LOCALLAPTOP007 to Avengers”
D:\Program Files\mosquitto>mosquitto_sub -h avengers.eastus.cloudapp.azure.com -t “mqtt-mosquitto-topic”
Step 4:
Navigate to the url : https://howtoprogram.xyz/2016/04/30/getting-started-apache-kafka-0-9/ and install kafka on the server avengers@12.XX.XXX.XXX and test the producer consumer message interchange using the following commands
./kafka-topics.sh –list -zookeeper 12.XX.XXX.XXX:2181
bin/kafka-console-consumer.sh –bootstrap-server 12.XX.XXX.XXX:9092 –topic HelloKafkaTopic
bin/kafka-console-producer.sh –broker-list 12.XX.XXX.XXX:9092 –topic HelloKafkaTopic
Step 5:
modify /etc/hosts and add the entries as below
10.XXX.XX.XX avengers.eastus.cloudapp.azure.com
#Above is the private ip of the cloud machine
12.XX.XXX.XXX avengers.eastus.cloudapp.azure.com
#Above is the public ip of the cloud machine
Step 6:
Install mosquito on the cloud (just for testing) using the instructions at the url : https://www.disk91.com/2016/technology/internet-of-things-technology/install-mosquitto-mqtt-server-on-centos-to-publish-iot-data/
yum install mosquitto
# service mosquitto start
# systemctl enable mosquitto
Step 7:
Check if port 1883 and 9092 are open in cloud for inbound traffic using
netstat | grep 1883
netstat | grep 9092
If the ports are not open , open them for inbound traffic in Azure also run the following commands on server to open them
firewall-cmd –zone=public –add-port=1883/tcp –permanent
firewall-cmd –reload
iptables-save | grep 1883
Step 8:
Navigate to /etc/mosquitto/mosquitto.conf and modify the following
# =================================================================
# Extra listeners
# =================================================================
# Listen on a port/ip address combination. By using this variable
# multiple times, mosquitto can listen on more than one port. If
# this variable is used and neither bind_address nor port given,
# then the default listener will not be started.
# The port number to listen on must be given. Optionally, an ip
# address or host name may be supplied as a second argument. In
# this case, mosquitto will attempt to bind the listener to that
# address and so restrict access to the associated network and
# interface. By default, mosquitto will listen on all interfaces.
# Note that for a websockets listener it is not possible to bind to a host
# name.
# listener port-number [ip address/host name]
listener 1883 0.0.0.0
Step 9:
Test publish and subscribe between the mosquito broker using the following commands
mosquitto_sub -h 127.0.0.1 -t dummy
mosquitto_pub -h 127.0.0.1 -t dummy -m "Hello world"
Step 10:
Navigate to https://howtoprogram.xyz/wp-content/uploads/2016/07/kafka-mqtt-bin.zip
Unzip and copy the following files
- kafka-connect-mqtt-1.0-SNAPSHOT.jar
- org.eclipse.paho.client.mqttv3-1.0.2.jar
to
/ opt/kafka_2.12-2.3.0/libs
Step 11:
Navigate to /opt/kafka_2.12-2.3.0/config and vi server.properties and just make the following change
advertised.listeners=PLAINTEXT://12.XX.XXX.XXX:9092
Navigate to /opt/kafka_2.12-2.3.0/config and vi connect-standalone.properties and just make the following change
bootstrap.servers=avengers.eastus.cloudapp.azure.com:9092
Navigate to /opt/kafka_2.12-2.3.0/config and vi mqtt.properties and ensure the file has following entries
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
kafka.topic=mqtt-mosquitto-topic-kafka
mqtt.client_id=mqtt-kafka-123456789
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60
mqtt.server_uris=tcp://avengers.eastus.cloudapp.azure.com:1883
mqtt.topic=mqtt-mosquitto-topic
Step 12:
cd /opt/kafka_2.12-2.3.0
Run all the following on separate putty windows
./bin/zookeeper-server-start.sh config/zookeeper.properties &
./bin/kafka-server-start.sh config/server.properties &
./bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties
bin/kafka-console-consumer.sh –bootstrap-server avengers.eastus.cloudapp.azure.com:9092 –topic mqtt-mosquitto-topic-kafka
Run the following first from within server itself and then from windows local laptop
mosquitto_pub -h avengers.eastus.cloudapp.azure.com -t “mqtt-mosquitto-topic” -m “This request is coming from LOCALLAPTOP007 to Avengers”
Ensure that the message sent from mosquito
publisher on local laptop arrives in base64 encoded format on the server kafka
consumer.
Cloud to device connectivity using mqtt-mosquitto broker and kafka connect :
Step 1:
Download the kafka stream connector library and copy the jar to /home/plugins folder :
tar -xf kafka-connect-mqtt-1.2.2-2.1.0-all.tar.gz
cp kafka-connect-mqtt-1.2.2-2.1.0-all.jar /home/plugins
Step 2:
Create a new connect.properties file under config folder as below:
[root@avengers config]# pwd
/opt/kafka_2.12-2.3.0/config
[root@avengers config]# cat connect.properties
# Kafka broker IP addresses to connect to
bootstrap.servers=avengers.eastus.cloudapp.azure.com:9092
# Path to directory containing the connector jar
plugin.path=/root/plugins
# Converters to use to convert keys and values
key.converter=org.apache.kafka.connect.json.JsonConverter
#key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schemas.enable=false
# The internal converters Kafka Connect uses for storing offset and configuration data
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
#internal.key.converter=org.apache.kafka.connect.storage.StringConverter
#internal.value.converter=org.apache.kafka.connect.storage.StringConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
[root@avengers config]#
Step 3:
Create a new kafka topic called mqtt-sink using the instructions below :
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic mqtt-sink
bin/kafka-topics.sh –list -zookeeper localhost:2181
Step 4:
Create a new mqtt-sink.properties file under config folder as below:
[root@avengers config]# pwd
/opt/kafka_2.12-2.3.0/config
[root@avengers config]# cat mqtt-sink.properties
name=mqtt-sink
connector.class=com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector
tasks.max=1
topics=mqtt-sink
connect.mqtt.hosts=tcp://avengers.eastus.cloudapp.azure.com:1883
connect.mqtt.clean=true
connect.mqtt.timeout=1000
connect.mqtt.keep.alive=1000
connect.mqtt.service.quality=1
connect.mqtt.kcql=INSERT INTO /lttsspskafka/test SELECT * FROM mqtt-sink
WITHCONVERTER=com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter
connect.progress.enabled=true
[root@avengers config]#
Step 5:
cd /opt/kafka_2.12-2.3.0
Run all the following on separate putty windows
./bin/zookeeper-server-start.sh config/zookeeper.properties &
./bin/kafka-server-start.sh config/server.properties &
./bin/connect-standalone.sh config/connect.properties config/mqtt-sink.properties
bin/kafka-console-producer.sh –broker-list avengers.eastus.cloudapp.azure.com:9092 -topic mqtt-sink
Sent a test json message like
{“id”:3,”temp”:21.9,”timestamp”:1530511201,”Note”:”This message going from LTTS Avengers server to the device LOCALLAPTOP007″}
Run the following first from within server itself and then from windows local laptop
mosquitto_sub -h avengers.eastus.cloudapp.azure.com -t “/lttsspskafka/test” -q 1
Verify if the message sent from server is
received by the laptop