MQTT-Based Data Transfer With Kafka Cloud

This article lists the detailed steps for sending data from a device to a cloud server running a scalable Kafka infrastructure, and back from Kafka to the device, using the Mosquitto broker and Kafka Connect.

Architecture Diagram

Device-to-cloud connectivity using the Mosquitto MQTT broker and Kafka Connect:

Step 1:

Download and install Mosquitto for Windows from: https://mosquitto.org/download/

Step 2:

Open D:\Program Files\mosquitto\mosquitto.conf and modify the extra listeners section as follows:

# =================================================================

# Extra listeners

# =================================================================

# Listen on a port/ip address combination. By using this variable

# multiple times, mosquitto can listen on more than one port. If

# this variable is used and neither bind_address nor port given,

# then the default listener will not be started.

# Note that for a websockets listener it is not possible to bind to a host

# name.

# listener port-number [ip address/host name]

listener 1883 0.0.0.0
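As a quick sanity check, the active listener lines can be pulled out of the config with a few lines of Python. This is only a sketch: the helper name find_listeners is mine, and it assumes the simple "listener port [address]" form shown above rather than every option mosquitto.conf supports.

```python
def find_listeners(conf_text):
    """Return (port, bind_address) for every uncommented `listener` line."""
    listeners = []
    for raw in conf_text.splitlines():
        line = raw.strip()
        # Skip blank lines and comments such as "# listener port-number ..."
        if not line or line.startswith("#"):
            continue
        parts = line.split()
        if parts[0] == "listener" and len(parts) >= 2:
            port = int(parts[1])
            address = parts[2] if len(parts) > 2 else None
            listeners.append((port, address))
    return listeners


sample_conf = """\
# listener port-number [ip address/host name]
listener 1883 0.0.0.0
"""

if __name__ == "__main__":
    print(find_listeners(sample_conf))
```

If the list comes back empty, the listener line is probably still commented out.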

Step 3:

Start the subscriber first, then publish from a second command prompt:

D:\Program Files\mosquitto>mosquitto_sub -h avengers.eastus.cloudapp.azure.com -t "mqtt-mosquitto-topic"

D:\Program Files\mosquitto>mosquitto_pub -h avengers.eastus.cloudapp.azure.com -t "mqtt-mosquitto-topic" -m "This request is coming from LOCALLAPTOP007 to Avengers"
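For the curious, here is roughly what a QoS 0 publish looks like on the wire. This is a sketch of the MQTT 3.1.1 PUBLISH packet layout only; the function names are mine, and a real client such as mosquitto_pub also performs the CONNECT handshake first, which is left out here.

```python
import struct


def encode_remaining_length(n):
    """Encode the MQTT 'remaining length' field (7 bits per byte, MSB = continue)."""
    if not 0 <= n <= 268_435_455:
        raise ValueError("remaining length out of range")
    out = bytearray()
    while True:
        n, digit = divmod(n, 128)
        if n > 0:
            digit |= 0x80  # more bytes follow
        out.append(digit)
        if n == 0:
            return bytes(out)


def encode_publish_qos0(topic, message):
    """Build a PUBLISH packet with QoS 0, no DUP and no RETAIN flag."""
    topic_bytes = topic.encode("utf-8")
    # Variable header: 2-byte big-endian topic length + topic; no packet id at QoS 0.
    body = struct.pack("!H", len(topic_bytes)) + topic_bytes + message.encode("utf-8")
    return bytes([0x30]) + encode_remaining_length(len(body)) + body


if __name__ == "__main__":
    packet = encode_publish_qos0(
        "mqtt-mosquitto-topic",
        "This request is coming from LOCALLAPTOP007 to Avengers",
    )
    print(packet.hex())
```

Seeing the payload as plain bytes here also explains why it later shows up base64 encoded once Kafka Connect serializes it as JSON.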

Step 4:

Follow the guide at https://howtoprogram.xyz/2016/04/30/getting-started-apache-kafka-0-9/ to install Kafka on the server avengers@12.XX.XXX.XXX, then test the producer-consumer message exchange using the following commands:

bin/kafka-topics.sh --list --zookeeper 12.XX.XXX.XXX:2181

bin/kafka-console-consumer.sh --bootstrap-server 12.XX.XXX.XXX:9092 --topic HelloKafkaTopic

bin/kafka-console-producer.sh --broker-list 12.XX.XXX.XXX:9092 --topic HelloKafkaTopic

Step 5:

Modify /etc/hosts and add the entries below:

10.XXX.XX.XX avengers.eastus.cloudapp.azure.com

# Above is the private IP of the cloud machine

12.XX.XXX.XXX avengers.eastus.cloudapp.azure.com

# Above is the public IP of the cloud machine
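Because both lines map the same host name, it can help to list which addresses a hosts file actually offers for it. The sketch below does that; the helper name addresses_for and the sample IPs are hypothetical (the real addresses are masked in this article). Resolvers commonly use the first matching line, but treat that as an assumption to verify on your own system.

```python
def addresses_for(hosts_text, hostname):
    """Return the IPs mapped to `hostname`, in the order they appear in the file."""
    matches = []
    for raw in hosts_text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comments
        if not line:
            continue
        ip, *names = line.split()
        if hostname in names:
            matches.append(ip)
    return matches


# Hypothetical stand-ins for the masked private and public IPs.
sample_hosts = """\
10.0.0.4 avengers.eastus.cloudapp.azure.com
# Above is the private IP of the cloud machine
203.0.113.10 avengers.eastus.cloudapp.azure.com
# Above is the public IP of the cloud machine
"""

if __name__ == "__main__":
    print(addresses_for(sample_hosts, "avengers.eastus.cloudapp.azure.com"))
```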

Step 6:

Install Mosquitto on the cloud server (just for testing) using the instructions at: https://www.disk91.com/2016/technology/internet-of-things-technology/install-mosquitto-mqtt-server-on-centos-to-publish-iot-data/

Run the following as root:

yum install mosquitto

service mosquitto start

systemctl enable mosquitto

Step 7:

Check whether ports 1883 and 9092 are listening on the cloud server using:

netstat -tln | grep 1883

netstat -tln | grep 9092

If the ports are not open, allow inbound traffic on them in Azure, and also run the following commands on the server to open them in the local firewall:

firewall-cmd --zone=public --add-port=1883/tcp --permanent

firewall-cmd --zone=public --add-port=9092/tcp --permanent

firewall-cmd --reload

iptables-save | grep 1883
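The checks above run on the server itself. To confirm that the ports are also reachable from the laptop, a small TCP connect test is enough. This is a minimal sketch using only the Python standard library; the helper name port_is_open and the 3 second timeout are my own choices.

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and DNS failures.
        return False


if __name__ == "__main__":
    host = "avengers.eastus.cloudapp.azure.com"
    for port in (1883, 9092):
        state = "open" if port_is_open(host, port) else "closed or filtered"
        print(f"{host}:{port} is {state}")
```

A port that is listening on the server but reported closed from the laptop usually points at the Azure inbound rule or the firewall rather than at the broker.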

Step 8:

Open /etc/mosquitto/mosquitto.conf and modify the extra listeners section as follows:

# =================================================================

# Extra listeners

# =================================================================

# Listen on a port/ip address combination. By using this variable

# multiple times, mosquitto can listen on more than one port. If

# this variable is used and neither bind_address nor port given,

# then the default listener will not be started.

# The port number to listen on must be given. Optionally, an ip

# address or host name may be supplied as a second argument. In

# this case, mosquitto will attempt to bind the listener to that

# address and so restrict access to the associated network and

# interface. By default, mosquitto will listen on all interfaces.

# Note that for a websockets listener it is not possible to bind to a host

# name.

# listener port-number [ip address/host name]

listener 1883 0.0.0.0

Step 9:

Test publish and subscribe against the Mosquitto broker on the server using the following commands, each in its own terminal:

mosquitto_sub -h 127.0.0.1 -t dummy
mosquitto_pub -h 127.0.0.1 -t dummy -m "Hello world"

Step 10:

Download https://howtoprogram.xyz/wp-content/uploads/2016/07/kafka-mqtt-bin.zip

Unzip it and copy the following files

  • kafka-connect-mqtt-1.0-SNAPSHOT.jar
  • org.eclipse.paho.client.mqttv3-1.0.2.jar

to

/opt/kafka_2.12-2.3.0/libs

Step 11:

Open /opt/kafka_2.12-2.3.0/config/server.properties in vi and make the following change:

advertised.listeners=PLAINTEXT://12.XX.XXX.XXX:9092

Open /opt/kafka_2.12-2.3.0/config/connect-standalone.properties in vi and make the following change:

bootstrap.servers=avengers.eastus.cloudapp.azure.com:9092

Open /opt/kafka_2.12-2.3.0/config/mqtt.properties in vi and ensure the file has the following entries:

name=mqtt

connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector

tasks.max=1

kafka.topic=mqtt-mosquitto-topic-kafka

mqtt.client_id=mqtt-kafka-123456789

mqtt.clean_session=true

mqtt.connection_timeout=30

mqtt.keep_alive_interval=60

mqtt.server_uris=tcp://avengers.eastus.cloudapp.azure.com:1883

mqtt.topic=mqtt-mosquitto-topic
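A typo in this file is easy to miss until the connector fails to start, so a quick pre-flight check can save a restart. The sketch below parses simple key=value lines and reports missing entries. The function names and the REQUIRED list are my own assumptions based on the entries above, not something the connector publishes.

```python
def parse_properties(text):
    """Parse simple `key=value` lines, ignoring blanks and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


# Assumed minimum set of keys, taken from the mqtt.properties shown above.
REQUIRED = ("name", "connector.class", "kafka.topic", "mqtt.server_uris", "mqtt.topic")


def missing_keys(props, required=REQUIRED):
    """Return the required keys that are absent or empty."""
    return [key for key in required if not props.get(key)]


sample = """\
name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
kafka.topic=mqtt-mosquitto-topic-kafka
mqtt.server_uris=tcp://avengers.eastus.cloudapp.azure.com:1883
mqtt.topic=mqtt-mosquitto-topic
"""

if __name__ == "__main__":
    print(missing_keys(parse_properties(sample)))
```

It is also worth comparing mqtt.topic with the topic passed to mosquitto_pub, since a mismatch there fails silently.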

Step 12:

cd /opt/kafka_2.12-2.3.0

Run each of the following in a separate PuTTY window:

./bin/zookeeper-server-start.sh config/zookeeper.properties &

./bin/kafka-server-start.sh config/server.properties &

./bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties

bin/kafka-console-consumer.sh --bootstrap-server avengers.eastus.cloudapp.azure.com:9092 --topic mqtt-mosquitto-topic-kafka

Run the following first from the server itself and then from the local Windows laptop:

mosquitto_pub -h avengers.eastus.cloudapp.azure.com -t "mqtt-mosquitto-topic" -m "This request is coming from LOCALLAPTOP007 to Avengers"

Verify that the message sent by the Mosquitto publisher on the local laptop arrives at the Kafka console consumer on the server in base64-encoded form.
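To confirm that the base64 text really is the original message, it can be decoded with a few lines of Python. This is a sketch under an assumption: I expect the consumer line to be either the bare base64 string or a JSON envelope with a "payload" field (which is what Kafka Connect's JsonConverter typically produces for byte payloads). The helper name decode_consumer_line is mine.

```python
import base64
import json


def decode_consumer_line(line):
    """Decode one console-consumer line back to the original MQTT message text."""
    line = line.strip()
    try:
        record = json.loads(line)
    except json.JSONDecodeError:
        record = line  # not JSON, treat the whole line as base64
    if isinstance(record, dict):
        encoded = record["payload"]  # assumed envelope: {"schema": ..., "payload": "..."}
    elif isinstance(record, str):
        encoded = record
    else:
        encoded = line
    return base64.b64decode(encoded).decode("utf-8")


if __name__ == "__main__":
    original = "This request is coming from LOCALLAPTOP007 to Avengers"
    encoded = base64.b64encode(original.encode("utf-8")).decode("ascii")
    print(encoded)
    print(decode_consumer_line(encoded))
```

Paste a line from the consumer window into decode_consumer_line to check it against what was published.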

Cloud-to-device connectivity using the Mosquitto MQTT broker and Kafka Connect:

Step 1:

Download the Stream Reactor MQTT connector for Kafka Connect and copy the jar to the /root/plugins folder, which is the directory that plugin.path points to in Step 2:

wget https://github.com/Landoop/stream-reactor/releases/download/1.2.2/kafka-connect-mqtt-1.2.2-2.1.0-all.tar.gz

tar -xf kafka-connect-mqtt-1.2.2-2.1.0-all.tar.gz

cp kafka-connect-mqtt-1.2.2-2.1.0-all.jar /root/plugins

Step 2:

Create a new connect.properties file under the config folder as below:

[root@avengers config]# pwd

/opt/kafka_2.12-2.3.0/config

[root@avengers config]# cat connect.properties

# Kafka broker IP addresses to connect to

bootstrap.servers=avengers.eastus.cloudapp.azure.com:9092

# Path to directory containing the connector jar

plugin.path=/root/plugins

# Converters to use to convert keys and values

key.converter=org.apache.kafka.connect.json.JsonConverter

#key.converter=org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=false

value.converter=org.apache.kafka.connect.json.JsonConverter

#value.converter=org.apache.kafka.connect.storage.StringConverter

value.converter.schemas.enable=false

# The internal converters Kafka Connect uses for storing offset and configuration data

internal.key.converter=org.apache.kafka.connect.json.JsonConverter

internal.value.converter=org.apache.kafka.connect.json.JsonConverter

#internal.key.converter=org.apache.kafka.connect.storage.StringConverter

#internal.value.converter=org.apache.kafka.connect.storage.StringConverter

internal.key.converter.schemas.enable=false

internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

[root@avengers config]#
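Kafka Connect only finds the connector if its jar sits under one of the plugin.path directories, so it is worth checking before starting the worker. The sketch below does that with the standard library. The helper name find_connector_jars and the file name pattern are my own assumptions.

```python
from pathlib import Path


def find_connector_jars(plugin_path, pattern="kafka-connect-mqtt*.jar"):
    """Return matching jars found under each comma-separated plugin.path entry."""
    jars = []
    for entry in plugin_path.split(","):
        directory = Path(entry.strip())
        if directory.is_dir():
            # Search recursively, since plugins may live in subdirectories.
            jars.extend(sorted(directory.rglob(pattern)))
    return jars


if __name__ == "__main__":
    found = find_connector_jars("/root/plugins")
    if found:
        for jar in found:
            print("found", jar)
    else:
        print("no MQTT connector jar under plugin.path")
```

An empty result means the jar was copied to a different directory than the one plugin.path names.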

Step 3:

Create a new Kafka topic called mqtt-sink and confirm that it is listed:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mqtt-sink

bin/kafka-topics.sh --list --zookeeper localhost:2181

Step 4:

Create a new mqtt-sink.properties file under the config folder as below:

[root@avengers config]# pwd

/opt/kafka_2.12-2.3.0/config

[root@avengers config]# cat mqtt-sink.properties

name=mqtt-sink

connector.class=com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector

tasks.max=1

topics=mqtt-sink

connect.mqtt.hosts=tcp://avengers.eastus.cloudapp.azure.com:1883

connect.mqtt.clean=true

connect.mqtt.timeout=1000

connect.mqtt.keep.alive=1000

connect.mqtt.service.quality=1

connect.mqtt.kcql=INSERT INTO /lttsspskafka/test SELECT * FROM mqtt-sink WITHCONVERTER=com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter

connect.progress.enabled=true

[root@avengers config]#
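The KCQL statement ties two names together: the Kafka topic to read from and the MQTT topic to write to. Both must match what is used elsewhere (the topics entry and the topic passed to mosquitto_sub). The sketch below pulls the two names out with a regular expression. It is a simplification of my own, not the connector's real KCQL parser, and it only handles the plain INSERT INTO ... SELECT ... FROM ... form used here.

```python
import re

KCQL_PATTERN = re.compile(
    r"INSERT\s+INTO\s+(?P<target>\S+)\s+SELECT\s+(?P<fields>.+?)\s+FROM\s+(?P<source>\S+)",
    re.IGNORECASE,
)


def parse_kcql(statement):
    """Return the target (MQTT topic), fields and source (Kafka topic) of a KCQL line."""
    match = KCQL_PATTERN.search(statement)
    if match is None:
        raise ValueError(f"not a recognised KCQL statement: {statement!r}")
    return match.groupdict()


if __name__ == "__main__":
    parsed = parse_kcql("INSERT INTO /lttsspskafka/test SELECT * FROM mqtt-sink")
    print("Kafka topic:", parsed["source"])
    print("MQTT topic: ", parsed["target"])
```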

Step 5:

cd /opt/kafka_2.12-2.3.0

Run each of the following in a separate PuTTY window:

./bin/zookeeper-server-start.sh config/zookeeper.properties &

./bin/kafka-server-start.sh config/server.properties &

./bin/connect-standalone.sh config/connect.properties config/mqtt-sink.properties

bin/kafka-console-producer.sh --broker-list avengers.eastus.cloudapp.azure.com:9092 --topic mqtt-sink

Send a test JSON message such as:

{"id":3,"temp":21.9,"timestamp":1530511201,"Note":"This message going from LTTS Avengers server to the device LOCALLAPTOP007"}
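The console producer sends each line as one message, and the JSON converter rejects anything that is not valid JSON, including the curly quotes that word processors and blog editors like to insert. Generating the line with json.dumps avoids both problems. This is a small sketch; the helper name to_producer_line is mine.

```python
import json


def to_producer_line(record):
    """Serialize a dict to single-line JSON suitable for pasting into the console producer."""
    return json.dumps(record)


message = {
    "id": 3,
    "temp": 21.9,
    "timestamp": 1530511201,
    "Note": "This message going from LTTS Avengers server to the device LOCALLAPTOP007",
}

if __name__ == "__main__":
    print(to_producer_line(message))
```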

Run the following first from the server itself and then from the local Windows laptop:

mosquitto_sub -h avengers.eastus.cloudapp.azure.com -t "/lttsspskafka/test" -q 1

Verify that the message sent from the server is received by the laptop.

References:

https://mosquitto.org/download/

https://mosquitto.org (test.mosquitto.org)
