Autoscaling a Kubernetes cluster using Horizontal Pod AutoScaler (HPA)

In the blog I shall be describing the step by step process to setup a Kubernetes cluster using minikube to run a node js based application in Azure cloud and autoscaling the application using Horizontal Pod AutoScaler.

  • Spin up a Standard D4s v3 (4 vcpus, 16 GiB memory) Virtual machine in Azure based on Linux (ubuntu 18.04)
  • Putty into the server and sudo su -. Execute all following commands as root.
sudo apt-get update
sudo apt-get install -y apt-transport-https
sudo apt-get install -y virtualbox virtualbox-ext-pack
  • Install kubectl
curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add –

sudo touch /etc/apt/sources.list.d/kubernetes.list

echo "deb http://apt.kubernetes.io/ kubernetes-xenial main" | sudo tee -a /etc/apt/sources.list.d/kubernetes.list

sudo apt-get update

sudo apt-get install -y kubectl
  • Install minikube
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-latest.x86_64.rpm

sudo rpm -ivh minikube-latest.x86_64.rpm


chmod +x minikube && sudo mv minikube /usr/local/bin/

minikube start
  • Run the following commands to ensure proper installation.
kubectl api-versions
       
kubectl get services
  • Create a new node js application
apt-get install npm

npm init  // this create a new package.json file in the directory
  • Create an index.js file with the following code.
const http = require(‘http’);
const port = process.env.PORT || 3000;
const server = http.createServer((req, res) => {
         res.statusCode = 200;
         res.setHeader(‘Content-Type’, ‘text/plain’);
        res.end(‘Hello World\n’);
});server.listen(port, () => {
 console.log(‘Server running on port: ${port}’);
});
  • Create a Dockerfile in the same directory with the following content
FROM node:alpine

RUN mkdir -p /usr/src/app

WORKDIR /usr/src/app

ADD index.js ./

ADD package.json ./

RUN npm install

CMD ["npm", "start"]

Sudo apt-get install docker.io
docker login

docker build -t rtdhaccount/node-application .

docker push rtdhaccount/node-application
  • Deploy to kubernetes
    • Create a folder called k8s
    • Create deployment.yml inside k8s with the following content
apiVersion: extensions/v1
kind: Deployment
metadata:
  name: node-application-deployment
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: node-application
    spec:
      containers:
      - name: node-application
        image: rtdhaccount/node-application
        imagePullPolicy: Always
        ports:
        - containerPort: 3000
  • Create service.yml inside k8s with the following content
apiVersion: v1
kind: Service
metadata:
  name: node-application
  labels:
    app: node-application
spec:
  selector:
    app: node-application
  ports:
  - port: 3000
    protocol: TCP
    nodePort: 30001
  type: LoadBalancer
  • Verify the service using the following commands
kubectl apply -f k8s
kubectl get services

kubectl get deployments
kubectl get pods
  • Access Minikube dashboard. To access the minikube dashboard. Open a new putty window and run
sudo kubectl proxy --address='0.0.0.0' --disable-filter=true

open a new browser window and enter the url below

http://xx.yy.zz.ww:8001/api/v1/namespaces/kube-system/services/kubernetes-dashboard/proxy/#!/deployment?namespace=default

where xx.yy.zz.ww is the public ipaddress of your VM.

Ensure that port 8001 is open to inbound traffic in the VM

Incase you get the error :

Unable to start VM: Error getting state for host: machine does not exist

minikube delete
minikube start
  • Accessing the node application.Open a new putty window and setup port forwarding like below
sudo ssh -i ~/.minikube/machines/minikube/id_rsa docker@$(minikube ip) -L \*:30001:0.0.0.0:30001

And the access your application from the browser window using the url

http://xx.yy.zz.ww:30001/

Ensure that port 30001 is open to inbound traffic in the VM

Incase of any errors in port forwarding

ssh-keygen -f "/root/.ssh/known_hosts" -R "192.168.99.100"
  • Scaling the cluster using Horizontal pod autoscaler. Create a new file hpa.yml under the k8s folder with the following content
apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
 annotations:
 name: node-application
 namespace: default
spec:
 maxReplicas: 5
 minReplicas: 1
 scaleTargetRef:
  apiVersion: extensions/v1
  kind: Deployment
  name: node-application
 targetCPUUtilizationPercentage: 1

With targetCPUUtilizationPercentage option, we are saying that, Once the cpu load inside observed CPU more than 1%, scale this pod.

kubectl apply -f k8s/hpa.yml
  • Deploy the metrics-server
kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/download/v0.3.6/components.yaml

kubectl get deployment metrics-server -n kube-system

minikube addons list

minikube addons enable metrics-server

kubectl get hpa
  • Testing using apache benchmark : Install apache benchmark using the below command
apt-get install apache2-utils

ab -c 500 -n 100000 -t 100000 http://xx.yy.zz.ww:30001/
  • Observe the pods scaling up using the commands
kubectl get pods

kubectl get hpa

References :

https://matthewpalmer.net/kubernetes-app-developer/articles/install-kubernetes-ubuntu-tutorial.html

https://blog.kloia.com/deploy-auto-scalable-node-js-application-on-kubernetes-cluster-part-1-f40e622f2337

Augmented Reality – Healthcare Use Case

This blog demonstrates creating augmented reality application using Unity game engine and Vuforia.

Augmented reality (AR) can be defined as the overlaying of digital content (images, video, text, sound, etc.) onto physical objects or locations, and it is typically experienced by looking through the camera lens of an electronic device such as a smartphone, tablet, or specialized viewing devices like google glass or Microsoft hololens.

Objective

The purpose of this blog will be to help a patient taking prescription medications to view the pharmaceutical details of the medication under a mobile phone by just scanning the prescription label on the medication bottle.

Examples are shown below. They might not be medically accurate but for the purpose of this case study have just been picked up as images.

When the bottle with the label (We shall call this our Image Target) shown below is scanned using a laptop with a camera, the medication literature as shown below is displayed to the user for reading (We shall refer to this as our Image Overlay).

Fig 1 : Image Target
Fig 2 : Image Overlay

Download and Installation

Setting up Unity

  1. Navigate to the GameObject dropdown menu and select “Vuforia > AR Camera.” If a dialog box appears requesting that you import additional assets, select “Import.”
  2. Select “Vuforia > Image” in the GameObject dropdown menu to add an Image Target to your scene.
  3. Locate the directional widget in the top right corner of your scene panel. Click the green y-axis option of your directional widget. This will make the green arrow disappear and shift your perspective in the scene panel to a y-axis birds-eye view of your project.

Go to the Vuforia developer portal to create your free Vuforia account and generate a development license key for your application.

  1. Select “Register” in the upper right corner.
  2. Once your account is setup, select “Develop” in the menu panel and then “License Manager.”
  3. Select “Get Development Key” on the License Manager page.
  4. On the “Add a free Development License Key” page, give your application a name and agree to the terms.
  5. To access your license key, return to the “License Manager” page in the Vuforia developer portal and click the application name you just created. Copy the alphanumeric string on the right.
  6. Return to Unity and File > Build Setting and click the “Player Settings” button in the bottom of the pop up window.
  7. Navigate back to the Inspector panel and click the “XR Settings” option located at the bottom of the accordion menu. Select “Vuforia Augmented Reality Support.” Accept the Vuforia license in Unity.
  8. Select ARCamera in the Hierarchy pane. In the inspector panel, click the “Open Vuforia Configuration” button in the Vuforia Behavious (Script)” component. Copy and paste your license key in the App License Key text field.
  9. Navigate to “File > Save Scene as…” and give your scene a name. By default, Unity will save your scenes to the “Assets” folder of your project.

Convert your Image Target to a Dataset

Upload the medication bottle image shown above to Vuforia inorder to code it with the tracking information.

  1. Go to the Vuforia developer portal and click “Develop” in the top menu and then select “Target Manager.”
  2. Click the “Add Database” button on the Target Manager page. In the dialog box that appears, give your database a name and select the device option. Click “Create.”
  3. Once your database has been created, return to the Target Manager page and click your database. Select “Add Target,” choose “Single Image” as the type, add the width and height of your image, and upload by clicking on the “Browse” button.

To determine the width of your image, right-click the image file and choose Properties. Select the Details tab in the dialog box that appears and scroll down to the “Image” section to find the width of the image.

Vuforia assigns target images a feature tracking rating on a scale of 1 to 5 stars. Navigate to the Image Targets database in the Vuforia Developer Portal and select the image you just uploaded. click the Show Features link to reveal the image’s trackable features.

The yellow markers represent the areas in the image that your application will use to determine if it is looking at the proper image target. If you notice that Vuforia is tracking unstable elements in your image (e.g. shadow, person, etc.), then you will need to re-edit or choose a different image.

If your image is given a good augmentability rating (anything between 3-5 stars should work), access the Target Manager page, then select your image and click “Download Database.”

Clicking download database will create a package file like the one shown below. Doubleclick and Import it

Import the Image Target

Select the ImageTarget GameObject in the project Hierarchy panel. Navigate to the inspector panel and select the drop-down menu of the Database parameter in the Image Target Behaviour. Select the name of the database you created earlier. The ImageTarget game object should now be associated with your book cover.

Add an Image Overlay

You will need to add an overlay to test if your AR camera is tracking the image. The overlay is the multimedia content that your application will superimpose onto your trigger image. In Unity, you can add images, videos, audio files, and animated 3D models as overlays. In our case it is the Rx detailed information Fig 2 above.

Drag the image into the “Assets” folder in the project panel at the bottom of the editor. Once the image has finished importing, select it and change its Texture Type to “Sprite (2D and UI)” in the inspector panel. Click “Apply.”

Next, drag the sprite you just imported onto your “Image Target” game object in the project Hierarchy.

The sprite is now a “child” of the Image Target game object and will only appear if the ARCamera recognizes the Image Target.

Create a User Interface

Now we shall create a simple user interface that guides the patient to scan their pharma Rx label.

Import a .jpg or .png file of your target image into your Assets folder.

You can do this by simply clicking and dragging the image file from its current file location into the Unity Assets folder. Once the image is imported, convert it to “Sprite (2D and UI)” in the inspector panel like you did for the author image overlay.

Next, navigate to the Hierarchy panel and select “Create > UI > Image.” Select the “2D” option in your scene view menu and center the image you just added by selecting it and clicking “F” on your keyboard. Click and drag the blue corners to center the image in your Canvas frame. Then, drag your Rx label into the “Source Image” parameter in the inspector panel.

Next, add some instructions to your UI by

  1. Selecting “Create > UI > Text.”
  2. In the inspector panel, write some instructions in the text field, such as “Scan this Prescription Label.”
  3. Position your text box directly below your book cover and modify the font and color accordingly.

Before you can add functionality to the UI, you will need to combine your Canvas elements into a single game object.

  1. Select “Create > Create Empty.” This will add an empty GameObject to your project Hierarchy.
  2. Rename this GameObject “Finder” and make it a child of the Canvas by dragging it into the Canvas game object.
  3. Drag the Image and Text game objects and make them children of your new “Finder” game object.

For this next step of the UI design, you will need to modify Vuforia’s “Default Trackable Event Handler” so that your “Finder” game object disappears from the user’s view once the Image Target has been found by the application.

Locate the script number packages folder and rightclick on open c# project as shown below.

Clicking open c# project will open Visual Studio editor like below

For this simple case study. We might just be able to modify the code through Unity console.

Select ImageTarget in Hierarchy and find the DefaultTrackableEvent in Inspector. Initially there will not be entries under On Target Found () and On Target Lost () methods.

Click the “+” button.

Select values for the 3 fields under On Target Found and On Target Lost as below. Note the values for the check box under each.

These settings tell the application to disable the Finder game object if the target image is found and to enable the Finder game object if the target image is lost.

Test Your Scene

Save your scene and press the play icon in the menu above the scene view.

Hold your prescription label in front of your webcam. You should see your UI overlaid onto the camera view. Place your label into the camera view and the prescription information / medication literature details image should appear overlaid on top of it.

Digital Patient Health Record Locker using Azure Blockchain Workbench

With Blockchain Workbench, you can define blockchain applications using configuration and writing smart contract code. In this blog I shall walk you through a set of generic steps to create and execute a blockchain application workflow in Azure Blockchain Workbench and then using those steps we shall create a Digital Patient Health Record Locker to help Clinics in the frontline of Covid19 maintain patient records securely and exchange them with authorized third parties.

Define a configuration file:

It contains configuration metadata defining the high-level workflows and interaction model of the blockchain application. 

  1. Application Metadata like name and description
  2. Application roles, defining the user roles who can act or participate within the blockchain application
  3. Workflows define one or more stages and actions of the contract.

Smart contract code file

Smart contracts represent the business logic of the blockchain application. Azure Blockchain workbench internally uses Ethereum for blockchain ledger.

Ethereum uses Solidity as its programming language. Logic is written in Solidity to enforce contract, contains state and functions to implement stages and actions of the smart contract.

Solidity code contains the following sections:

  1. Version Pragma:  Version of Solidity
  2. Contract header: Header of Smart Contract
  3. State Variables: State variables store values of the state for each contract instance.
  4. Constructor: The constructor defines input parameters for a new smart contract instance of a workflow. 
  5. Functions: Functions are the executable units of business logic within a contract. 

Deploy Azure Blockchain Workbench

  • Sign in to the Azure portal.
  • Select Create a resource in the upper left-hand corner of the Azure portal.
  • Select Blockchain > Azure Blockchain Workbench
  • Select OK to finish the basic setting configuration section.
  • In Advanced Settings, choose create a new blockchain network
  • Select Create to agree to the terms and deploy your Azure Blockchain Workbench.

After deployment is completed

  • Navigate to your resource group in Azure Portal
  • There are two resources with type App Service. Select the resource of type App Service without the “-api” suffix.
  • In the App Service Overview, copy the URL value, which represents the web URL to your deployed Blockchain Workbench.

Something like: https://CovidFrontlineblockchain-p6c7yh.azurewebsites.net

Azure AD must be configured to complete your Blockchain Workbench deployment.

  • In a browser, navigate to the https://CovidFrontlineblockchain-p6c7yh.azurewebsites.net
  • You’ll see instructions to set up Azure AD using Cloud Shell. Copy the command and launch Cloud Shell.
  • In Cloud Shell PowerShell environment, paste and run the command.
  • When prompted, enter the Azure AD tenant you want to use for Blockchain Workbench. 
  • You’ll be prompted to authenticate to the Azure AD tenant using a browser. Open the web URL in a browser, enter the code, and authenticate.
  • The script outputs several status messages. You get a SUCCESS status message if the tenant was successfully provisioned.
  • Navigate to the Blockchain Workbench URL. 
  • Select Accept to consent.
  • After consent, the Blockchain Workbench web app can be used.

Add blockchain application to Blockchain Workbench

  • In a web browser, navigate to the https://CovidFrontlineblockchain-p6c7yh.azurewebsites.net
  • Sign in as a Blockchain Workbench administrator.
  • Select Applications > New. The New application pane is displayed.
  • Select Upload the contract configuration > Browse to locate the PHRLocker.json configuration file you created.
  • Select Upload the contract code > Browse to locate the PHRLocker.sol smart contract code file.
  • Select Deploy to create the blockchain application based on the configuration and smart contract files.
  • When deployment is finished, the new application is displayed in Applications.

Add blockchain application members

  • Select Applications > PHR Locker!.
  • The number of members associated to the application is displayed in the upper right corner of the page. For a new application, the number of members will be zero.
  • Select the members link in the upper right corner of the page. A current list of members for the application is displayed.
  • In the membership list, select Add members.
  • Select or enter the member’s name you want to add.
  • Select the Role for the member.
  • Select Add to add the member with the associated role to the application.

Create new contract

  • In Blockchain Workbench application section, select the application tile that contains the contract you want to create.
  • To create a new contract, select New contract.
  • The New contract pane is displayed. Specify the initial parameters values. Select Create.
  • The newly created contract is displayed in the list with the other active contracts.

Take action on contract

  • Depending on the state the contract is in, members can take actions to transition to the next state of the contract.
  • In Blockchain Workbench application section, select the application tile that contains the contract to take the action.
  • Select the contract in the list. Details about the contract are displayed in different sections.
  • In the Action section, select Take action.
  • The details about the current state of the contract are displayed in a pane. Choose the action you want to take in the drop-down.
  • Select Take action to initiate the action.
  • If parameters are required for the action, specify the values for the action.
  • Select Take action to execute the action.

Now lets Kick the tires and light the fires !!

Using the steps defined in the sections above we shall now create a Digital Locker Application for holding patient health records.

PHR Digital Locker Application

Overview 

The PHR Digital Locker application expresses a workflow of sharing digitally locked patient record files where the Patient controls the access to his/her health record files. We illustrate Digital Locker using an example of a Patient performing access control to their document held by a Insurance Provider.  The state transition diagram below shows the interactions among the states in this workflow. 

Application Roles 

——————

NameDescription
 
PatientThe owner of the digital asset.                  
InsuranceAgentThe keeper of the digital asset.
ClinicAn Entity requesting access to the digital asset. 
CurrentAuthorizedUserAn Entity currently authorized to access the digital asset. 

States 

——————

StateDescription
 
Requested             Indicates patient’s request to make the digital asset available. 
DocumentReviewIndicates that the insurance agent has reviewed the patient’s request.                                           
AvailableToShareIndicates that the insurance agent has uploaded the digital asset and the digital asset is available for sharing
SharingWithThirdPartyIndicates that the patient is reviewing a Clinic’s request to access the digital asset.          
Terminated            Indicates termination of sharing the digital asset.                                                       

State Transition Diagram (Fig. 1)

An instance of the PHR (Patient Health Records) Digital Locker application’s workflow starts in the Requested state when a Patient requests their Insurance Agent to begin a process of sharing a document held by the Insurance Provider. 

An InsuranceAgent causes the state to transition to DocumentReview by calling the function BeginReviewProcess indicating that the process to review the request has begun. 

Once the review is complete, the InsuranceAgent then makes the document available by uploading the documents. 

The AvailableToShare state can be thought of a perpetual state, more on this in a bit. Once the document is available to share, the document can be shared either with a Clinic that the patient has identified or any random Clinic requestor. 

If the patient specifies the Clinic requestor, then the state transitions from AvailableToShare to SharingWithThirdParty.  If a random Clinic requestor needs access to the document, then that Clinic requestor first requests access to the document.  At this point, the patient can either accept the request and grant access or reject the request. 

If the patient rejects the request to the random Clinic requestor, then the state goes back to AvailableToShare.  If the patient accepts the request to allow the random Clinic request to access the document, then the state transitions to SharingWithThirdParty. 

Once the Clinic requestor is done with the document, they can release the lock to the document and the state transitions to AvailableToShare.  The patient can also cause the state to transition from SharingWithThirdParty to AvailableToShare when they revoke access from the third-party (Clinic) requestor. 

Finally, at any time during these transitions the insurance agent can decide to terminate the sharing of the document once the document becomes available to share. 

The happy path shown in the state transition diagram (Fig. 1) traces a path where the patient grants access to a random third party (Clinic). 

Some Screenshots

Workbench home

PHR Locker Smart Contract

Adding Users

Taking Action

Ledger Entries

PHRLocker.json (Configuration File)


{
  "ApplicationName": "PHRLocker",
  "DisplayName": "PHR Locker",
  "Description": "...",
  "ApplicationRoles": [
    {
      "Name": "Patient",
      "Description": "..."
    },
    {
      "Name": "InsuranceAgent",
      "Description": "..."
    },
    {
      "Name": "Clinic",
      "Description": "..."
    },
    {
      "Name": "CurrentAuthorizedUser",
      "Description": "..."
    }
  ],
  "Workflows": [
    {
      "Name": "PHRLocker",
      "DisplayName": "PHR Locker",
      "Description": "...",
      "Initiators": [
        "Patient"
      ],
      "StartState": "Requested",
      "Properties": [
        {
          "Name": "State",
          "DisplayName": "State",
          "Description": "Holds the state of the contract",
          "Type": {
            "Name": "state"
          }
        },
        {
          "Name": "Patient",
          "DisplayName": "Patient",
          "Description": "...",
          "Type": {
            "Name": "Patient"
          }
        },
        {
          "Name": "InsuranceAgent",
          "DisplayName": "Insurance Agent",
          "Description": "...",
          "Type": {
            "Name": "InsuranceAgent"
          }
        },
        {
          "Name": "LockerFriendlyName",
          "DisplayName": "Locker Friendly Name",
          "Description": "...",
          "Type": {
            "Name": "string"
          }
        },
        {
          "Name": "LockerIdentifier",
          "DisplayName": "Locker Identifier",
          "Description": "...",
          "Type": {
            "Name": "string"
          }
        },
        {
          "Name": "CurrentAuthorizedUser",
          "DisplayName": "Current Authorized User",
          "Description": "...",
          "Type": {
            "Name": "CurrentAuthorizedUser"
          }
        },
        {
          "Name": "ExpirationDate",
          "DisplayName": "Expiration Date",
          "Description": "...",
          "Type": {
            "Name": "string"
          }
        },
        {
          "Name": "Image",
          "DisplayName": "Image",
          "Description": "...",
          "Type": {
            "Name": "string"
          }
        },
        {
          "Name": "Clinic",
          "DisplayName": "Clinic",
          "Description": "...",
          "Type": {
            "Name": "Clinic"
          }
        },
        {
          "Name": "IntendedPurpose",
          "DisplayName": "Intended Purpose",
          "Description": "...",
          "Type": {
            "Name": "string"
          }
        },
        {
          "Name": "LockerStatus",
          "DisplayName": "Locker Status",
          "Description": "...",
          "Type": {
            "Name": "string"
          }
        },
        {
          "Name": "RejectionReason",
          "DisplayName": "Rejection Reason",
          "Description": "...",
          "Type": {
            "Name": "string"
          }
        }
      ],
      "Constructor": {
        "Parameters": [
          {
            "Name": "lockerFriendlyName",
            "Description": "...",
            "DisplayName": "Locker Friendly Name",
            "Type": {
              "Name": "string"
            }
          },
          {
            "Name": "insuranceAgent",
            "Description": "...",
            "DisplayName": "Insurance Agent",
            "Type": {
              "Name": "InsuranceAgent"
            }
          }
        ]
      },
      "Functions": [
        {
          "Name": "BeginReviewProcess",
          "DisplayName": "Begin Review Process",
          "Description": "...",
          "Parameters": []
        },
        {
          "Name": "RejectApplication",
          "DisplayName": "Reject Application",
          "Description": "...",
          "Parameters": [
            {
              "Name": "rejectionReason",
              "Description": "...",
              "DisplayName": "Rejection Reason",
              "Type": {
                "Name": "string"
              }
            }
          ]
        },
        {
          "Name": "UploadDocuments",
          "DisplayName": "Documents Upload Placeholder",
          "Description": "...",
          "Parameters": [
            {
              "Name": "lockerIdentifier",
              "Description": "...",
              "DisplayName": "Locker Identifier Placeholder",
              "Type": {
                "Name": "string"
              }
            },
            {
              "Name": "image",
              "Description": "...",
              "DisplayName": "Image Upload Placeholder",
              "Type": {
                "Name": "string"
              }
            }
          ]
        },
        {
          "Name": "ShareWithThirdParty",
          "DisplayName": "Share With Third Party",
          "Description": "...",
          "Parameters": [
            {
              "Name": "clinic",
              "Description": "...",
              "DisplayName": "Clinic",
              "Type": {
                "Name": "Clinic"
              }
            },
            {
              "Name": "expirationDate",
              "Description": "...",
              "DisplayName": "Expiration Date Placeholder",
              "Type": {
                "Name": "string"
              }
            },
            {
              "Name": "intendedPurpose",
              "Description": "...",
              "DisplayName": "Intended Purpose",
              "Type": {
                "Name": "string"
              }
            }
          ]
        },
        {
          "Name": "AcceptSharingRequest",
          "DisplayName": "Accept Sharing Request",
          "Description": "...",
          "Parameters": []
        },
        {
          "Name": "RejectSharingRequest",
          "DisplayName": "Reject Sharing Request",
          "Description": "...",
          "Parameters": []
        },
        {
          "Name": "RequestLockerAccess",
          "DisplayName": "Request Locker Access",
          "Description": "...",
          "Parameters": [
            {
              "Name": "intendedPurpose",
              "Description": "...",
              "DisplayName": "Intended Purpose",
              "Type": {
                "Name": "string"
              }
            }
          ]
        },
        {
          "Name": "ReleaseLockerAccess",
          "DisplayName": "Release Locker Access",
          "Description": "...",
          "Parameters": []
        },
        {
          "Name": "RevokeAccessFromThirdParty",
          "DisplayName": "Revoke Access From Third Party",
          "Description": "...",
          "Parameters": []
        },
        {
          "Name": "Terminate",
          "DisplayName": "Terminate",
          "Description": "...",
          "Parameters": []
        }
      ],
      "States": [
        {
          "Name": "Requested",
          "DisplayName": "Requested",
          "Description": "...",
          "PercentComplete": 0,
          "Style": "Success",
          "Transitions": [
            {
              "AllowedRoles": [
                "InsuranceAgent"
              ],
              "AllowedInstanceRoles": [],
              "Description": "...",
              "Function": "BeginReviewProcess",
              "NextStates": [
                "DocumentReview"
              ],
              "DisplayName": "Begin Review Process"
            }
          ]
        },
        {
          "Name": "DocumentReview",
          "DisplayName": "DocumentReview",
          "Description": "...",
          "PercentComplete": 20,
          "Style": "Success",
          "Transitions": [
            {
              "AllowedRoles": [],
              "AllowedInstanceRoles": [ "InsuranceAgent" ],
              "Description": "...",
              "Function": "UploadDocuments",
              "NextStates": [ "AvailableToShare" ],
              "DisplayName": "Upload Documents Placeholder"
            }
          ]
        },
        {
          "Name": "AvailableToShare",
          "DisplayName": "Available To Share",
          "Description": "...",
          "PercentComplete": 30,
          "Style": "Success",
          "Transitions": [
            {
              "AllowedRoles": [],
              "AllowedInstanceRoles": [ "Patient" ],
              "Description": "...",
              "Function": "ShareWithThirdParty",
              "NextStates": ["SharingWithThirdParty" ],
              "DisplayName": "Share With Third Party"
            },
            {
              "AllowedRoles": [],
              "AllowedInstanceRoles": [ "Patient" ],
              "Description": "...",
              "Function": "Terminate",
              "NextStates": [ "Terminated" ],
              "DisplayName": "Terminate"
            },
            {
              "AllowedRoles": [ "Clinic" ],
              "AllowedInstanceRoles": [],
              "Description": "...",
              "Function": "RequestLockerAccess",
              "NextStates": [ "SharingRequestPending" ],
              "DisplayName": "Request Locker Access"
            }
          ]
        },
        {
          "Name": "SharingRequestPending",
          "DisplayName": "Sharing Request Pending",
          "Description": "...",
          "PercentComplete": 40,
          "Style": "Success",
          "Transitions": [
            {
              "AllowedRoles": [],
              "AllowedInstanceRoles": [ "Patient" ],
              "Description": "...",
              "Function": "AcceptSharingRequest",
              "NextStates": [
                "SharingWithThirdParty"
              ],
              "DisplayName": "Accept Sharing Request"
            },
            {
              "AllowedRoles": [],
              "AllowedInstanceRoles": [ "Patient" ],
              "Description": "...",
              "Function": "RejectSharingRequest",
              "NextStates": [ "AvailableToShare" ],
              "DisplayName": "Reject Sharing Request"
            },
            {
              "AllowedRoles": [],
              "AllowedInstanceRoles": [ "Patient" ],
              "Description": "...",
              "Function": "Terminate",
              "NextStates": [ "Terminated" ],
              "DisplayName": "Terminate"
            }
          ]
        },
        {
          "Name": "SharingWithThirdParty",
          "DisplayName": "Sharing With Third Party",
          "Description": "...",
          "PercentComplete": 45,
          "Style": "Success",
          "Transitions": [
            {
              "AllowedRoles": [],
              "AllowedInstanceRoles": [ "Patient" ],
              "Description": "...",
              "Function": "RevokeAccessFromThirdParty",
              "NextStates": [ "AvailableToShare" ],
              "DisplayName": "Revoke Access From Third Party"
            },
            {
              "AllowedRoles": [],
              "AllowedInstanceRoles": [ "Patient" ],
              "Description": "...",
              "Function": "Terminate",
              "NextStates": [ "Terminated" ],
              "DisplayName": "Terminate"
            },
            {
              "AllowedRoles": [],
              "AllowedInstanceRoles": [ "Clinic" ],
              "Description": "...",
              "Function": "ReleaseLockerAccess",
              "NextStates": [ "AvailableToShare" ],
              "DisplayName": "Release Locker Access"
            }
          ]
        },
        {
          "Name": "Terminated",
          "DisplayName": "Terminated",
          "Description": "...",
          "PercentComplete": 100,
          "Style": "Failure",
          "Transitions": []
        }
      ]
    }
  ]
}

PHRLocker.sol (Smartcontract code File)


pragma solidity >=0.4.25 <0.6.0;

contract PHRLocker
{
    enum StateType { Requested, DocumentReview, AvailableToShare, SharingRequestPending, SharingWithThirdParty, Terminated }
    address public Patient;
    address public InsuranceAgent;
    string public LockerFriendlyName;
    string public LockerIdentifier;
    address public CurrentAuthorizedUser;
    string public ExpirationDate;
    string public Image;
    address public Clinic;
    string public IntendedPurpose;
    string public LockerStatus;
    string public RejectionReason;
    StateType public State;

    constructor(string memory lockerFriendlyName, address insuranceAgent) public
    {
        Patient = msg.sender;
        LockerFriendlyName = lockerFriendlyName;

        State = StateType.DocumentReview; //////////////// should be StateType.Requested?

        InsuranceAgent = insuranceAgent;
    }

    function BeginReviewProcess() public
    {
        /* Need to update, likely with registry to confirm sender is agent
        Also need to add a function to re-assign the agent.
        */
        if (Patient == msg.sender)
        {
            revert();
        }
        InsuranceAgent = msg.sender;

        LockerStatus = "Pending";
        State = StateType.DocumentReview;
    }

    function RejectApplication(string memory rejectionReason) public
    {
        if (InsuranceAgent != msg.sender)
        {
            revert();
        }

        RejectionReason = rejectionReason;
        LockerStatus = "Rejected";
        State = StateType.DocumentReview;
    }

    function UploadDocuments(string memory lockerIdentifier, string memory image) public
    {
        if (InsuranceAgent != msg.sender)
        {
            revert();
        }
        LockerStatus = "Approved";
        Image = image;
        LockerIdentifier = lockerIdentifier;
        State = StateType.AvailableToShare;
    }

    function ShareWithThirdParty(address clinic, string memory expirationDate, string memory intendedPurpose) public
    {
        if (Patient != msg.sender)
        {
            revert();
        }

        Clinic = clinic;
        CurrentAuthorizedUser = Clinic;

        LockerStatus = "Shared";
        IntendedPurpose = intendedPurpose;
        ExpirationDate = expirationDate;
        State = StateType.SharingWithThirdParty;
    }

    function AcceptSharingRequest() public
    {
        if (Patient != msg.sender)
        {
            revert();
        }

        CurrentAuthorizedUser = Clinic;
        State = StateType.SharingWithThirdParty;
    }

    function RejectSharingRequest() public
    {
        if (Patient != msg.sender)
        {
            revert();
        }
        LockerStatus = "Available";
        CurrentAuthorizedUser = 0x0000000000000000000000000000000000000000;
        State = StateType.AvailableToShare;
    }

    function RequestLockerAccess(string memory intendedPurpose) public
    {
        if (Patient == msg.sender)
        {
            revert();
        }

        Clinic = msg.sender;
        IntendedPurpose = intendedPurpose;
        State = StateType.SharingRequestPending;
    }

    function ReleaseLockerAccess() public
    {

        if (CurrentAuthorizedUser != msg.sender)
        {
            revert();
        }
        LockerStatus = "Available";
        Clinic = 0x0000000000000000000000000000000000000000;
        CurrentAuthorizedUser = 0x0000000000000000000000000000000000000000;
        IntendedPurpose = "";
        State = StateType.AvailableToShare;
    }
    
    function RevokeAccessFromThirdParty() public
    {
        if (Patient != msg.sender)
        {
            revert();
        }
        LockerStatus = "Available";
        CurrentAuthorizedUser = 0x0000000000000000000000000000000000000000;
        State = StateType.AvailableToShare;
    }

    function Terminate() public
    {
        if (Patient != msg.sender)
        {
            revert();
        }
        CurrentAuthorizedUser = 0x0000000000000000000000000000000000000000;
        State = StateType.Terminated;
    }
}

References :

  1. https://docs.microsoft.com/en-us/azure/blockchain/workbench/create-app
  2. https://github.com/Azure-Samples/blockchain/tree/master/blockchain-workbench/application-and-smart-contract-samples

Disclaimer :

What is illustrated above is just the Hello World of Patient Medical record. PHRs typically contain all of the following content and much more :

  • Surgical history
  • Obstetric history
  • Medications and medical allergies
  • Family history
  • Social history
  • Habits
  • Immunization history
  • Growth chart and developmental history
  • Medical encounters
  • Chief complaint
  • History of the present illness
  • Physical examination
  • Assessment and plan
  • Orders and prescriptions
  • Progress notes
  • Test results
  • ………………

Two ways to talk to Azure IoT Hub

This article lists down 2 ways to communicate with Azure IoT hub :

  1. An insecure method using a standalone java client
  2. A secure certificates based method using a standalone python client

Most of these instructions can be found on the Azure site but is scattered all over the place.

This code has been been run and tested locally and guaranteed to be bug free.

Connecting to Azure IoT hub using Java client (unsecure)

Step 1: Create an IoT hub

From the Azure homepage, select the + Create a resource button, and then enter IoT Hub in the Search the Marketplace field.

Select IoT Hub from the search results, and then select Create.

Step 2 : Register a device

Run the following command in Azure Cloud Shell to create the device identity.

az iot hub device-identity create --hub-name {YourIoTHubName} --device-id MyJavaDevice

Run the following command in Azure Cloud Shell to get the device connection string for the device you just registered:

az iot hub device-identity show-connection-string --hub-name {YourIoTHubName} --device-id MyJavaDevice --output table

Incase you plan to build REST api later to retrieve messages ingested into IoT hub , you will need the following details :

Event Hubs-compatible endpointEvent Hubs-compatible path, and service primary key. The following commands retrieve these values for your IoT hub:

az iot hub show --query properties.eventHubEndpoints.events.endpoint --name {YourIoTHubName}

az iot hub show --query properties.eventHubEndpoints.events.path --name {YourIoTHubName}

az iot hub policy show --name service --query primaryKey --hub-name {YourIoTHubName}

Step 3 : Send simulated telemetry

Download the test java client from : https://github.com/Azure-Samples/azure-iot-samples-java/archive/master.zip

  1. In a local terminal window, navigate to the root folder of the sample Java project. Then navigate to the iot-hub\Quickstarts\simulated-device folder.
  2. Open the src/main/java/com/microsoft/docs/iothub/samples/SimulatedDevice.java file in a text editor of your choice.

Replace the value of the connString variable with the device connection string you made a note of earlier in step 2 above. Then save your changes to SimulatedDevice.java.

  • In the local terminal window, run the following commands to install the required libraries and build the simulated device application:
mvn clean package

  • In the local terminal window, run the following commands to run the simulated device application:
java -jar target/simulated-device-1.0.0-with-deps.jar


Connecting to Azure IoT hub using Python client and device certificates (secure)

Step 0 – Get X.509 CA certificates

This section describes how to create your own X.509 certificates using a third-party tool such as OpenSSL.

Git clone https://github.com/Azure/azure-iot-sdk-c.git

Step 1 – Initial Setup

At linux prompt.

cd <path>\azure-iot-sdk-c\tools\CACertificates\

chmod 700 certGen.sh

Step 2 – Create the certificate chain

./certGen.sh create_root_and_intermediate

This will create azure-iot-test-only.root.ca.cert.pem

Next, go to Azure IoT Hub and navigate to Certificates. Add a new certificate, providing the root CA file when prompted.

Step 3 – Proof of Possession

Select the new certificate that you’ve created in IoT Hub and navigate to and select “Generate Verification Code”. This will give you a verification string you will need to place as the subject name of a certificate that you need to sign. Highlighted below to be replaced.

./certGen.sh create_verification_certificate 106A5SD242AF512B3498BD6098C4941E66R34H268DDB3288

the script will output the name of the file containing "CN=106A5SD242AF512B3498BD6098C4941E66R34H268DDB3288" to the console.

Upload this file to IoT Hub (in the same UX that had the “Generate Verification Code”) and select “Verify”.

Step 4 – Create a new device

On Azure IoT Hub, navigate to the IoT Devices section, or launch Azure IoT Explorer.

Add a new device (e.g. avengersDevice), and for its authentication type chose “X.509 CA Signed”. 

Run the below line  to create the new device certificate.

./certGen.sh create_device_certificate avengersDevice

This will create

/certs/new-device.cert.pem and

/private/new-device.key.pem

cd ./certs && cat new-device.cert.pem azure-iot-test-only.intermediate.cert.pem azure-iot-test-only.root.ca.cert.pem > new-device-full-chain.cert.pem

 to get the public key.

Step 5 – Authenticate your X.509 device with the X.509 certificates

pip install paho-mqtt

Kindly note : in the code snippet below the certificate mentioned in the line :

path_to_root_cert = "/home/ubuntu/azure/digicert.cer"

Is DigiCert’s Baltimore Root certificate.

You can create this file by copying the certificate information from certs.c in the Azure IoT SDK for C. <path>\azure-iot-sdk-c\certs\certs.c

Include the lines -----BEGIN CERTIFICATE----- and -----END CERTIFICATE-----, remove the " marks at the beginning and end of every line, and remove the \r\n characters at the end of every line.

Use the following python code to connect : python-cert-device-client.py

from paho.mqtt import client as mqtt
import ssl

#path_to_root_cert = "/home/ubuntu/azure/certs/certs/azure-iot-test-only.root.ca.cert.pem"
path_to_root_cert = "/home/ubuntu/azure/digicert.cer"
device_id = "avengersDevice"
sas_token = "<generated SAS token>"
iot_hub_name = "AvengersHub"


def on_connect(client, userdata, flags, rc):
    print("Device connected with result code: " + str(rc))


def on_disconnect(client, userdata, rc):
    print("Device disconnected with result code: " + str(rc))


def on_publish(client, userdata, mid):
    print("Device sent message")


client = mqtt.Client(client_id=device_id, protocol=mqtt.MQTTv311)

client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_publish = on_publish

# Set the username but not the password on your client
client.username_pw_set(username=iot_hub_name+".azure-devices.net/" +
                       device_id + "/?api-version=2018-06-30", password=None)

# Set the certificate and key paths on your client
cert_file = "/home/ubuntu/azure/certs/certs/new-device.cert.pem"
key_file = "/home/ubuntu/azure/certs/private/new-device.key.pem"
client.tls_set(ca_certs=path_to_root_cert, certfile=cert_file, keyfile=key_file,
               cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
client.tls_insecure_set(False)

client.connect(iot_hub_name+".azure-devices.net", port=8883)

client.publish("devices/" + device_id + "/messages/events/", "{id=123}", qos=1)
client.loop_forever()

Data Analysis of COVID-19 outbreak using matplotlib and fbprophet

The purpose of this blog post is to perform a data analysis of the COVID-19 outbreak, arrive at some charts and perform some forecasting for the near future.

Install Python and Jupyter Notebook to Windows 10 (64 bit)

  1. Download Python 3.7.4 or latest from “https://www.python.org/downloads/release/python-374/” url
  2. Choose and select “x86–64 executable installer” for Windows 10–64 bit computer and install
  3. Check the installation by running “Idle”
  4. Set “Python37” path and “Python37/Scripts” path to environment variable
  5. Open command prompt and check the python version using “python –version”
  6. Install pip using “pip install virtualenv”
  7. Upgrade pip using “python -m pip install -upgrade pip”
  8. Create a virtual environment called opencv (“virtualenv opencv”)
  9. Move to Scripts folder and activate opencv virtual environment using “activate.bat”
  10. Install numpy using “pip install numpy”
  11. Install OpenCV using “pip install opencv-python”
  12. Install Matplotlib using “pip install matplotlib”
  13. Install Jupyter using “python -m pip install jupyter”
  14. Run Jupyter Notebook using “jupyter notebook”
  15. Start the notebook server and popup dashboard in browser using “localhost:8888/tree” url
  16. Create your first notebook using dashboard by clicking on new Python 3

17. Type print (“Hello World”) in the cell and click Run to view the output or Type 3 + 5 in the cell and view output.

Reference :

https://medium.com/@kswalawage/install-python-and-jupyter-notebook-to-windows-10-64-bit-66db782e1d02

Plot a graph of existing data

  1. Install conda using miniconda https://docs.conda.io/projects/conda/en/latest/user-guide/install/. Advisable to reboot the laptop post installation.
  2. Open conda powershell prompt

cd covid-19-notebooks

conda env create -f environment.yaml

  • (base) PS C:\MainFolder\CondaInstalledFolder> conda activate COVID19
  • (COVID19) PS C:\MainFolder\CondaInstalledFolder> cd D:\covid-19\covid-19-notebooks\notebooks

(COVID19) PS D:\covid-19\covid-19-notebooks\notebooks> jupyter notebook

  • Ensure that the attached csv (data) file and the notebook file are present in the folder. Have modified the code to update the graph. You can customize it the way you like.
  • The output can be viewed like in the attached screen below.

Reference : https://github.com/alexamici/covid-19-notebooks

Plot a graph for forecasted data

  1. Download the corona virus dataset from https://www.kaggle.com/chriscc/coronavirus-confirmed-prediction-with-prophet
  1. Create a new environment with Python3.5
    1. conda create -n pht python=3.5 anaconda
  2. Install Prophet using the command.
    1. conda install -c conda-forge fbprophet
  3. (base) PS C:\MainFolder\CondaInstalledFolder> conda activate pht
  4. (pht) PS C:\MainFolder\CondaInstalledFolder> cd D:\covid-19\covid-19-notebooks\notebooks
  5. (pht) PS D:\covid-19\covid-19-notebooks\notebooks> jupyter notebook
  6. Run the cells in the attached notebook along with the attached data.

8. The output can be viewed like in the attached screen below. It is prediction of cases over the next 1 month.

(The specific graph below is the prediction of number of cases for Canada till May-09-2020.)

Reference :

https://stackoverflow.com/questions/42822902/can-someone-help-me-in-installing-python-package-prophet-on-windows-10

https://www.worldometers.info/coronavirus/

The Notebook

Run

$ jupyter nbconvert --to html mynotebook.ipynb

to convert the notebook to html or pdf.

xelatex will need to be installed for converting to pdf.

Concluding Statement

It goes without saying that

Your analytics and Insights are only as good as the data that feeds them !!

Glaucoma, Cataract and Retina Disease Detection

Using TensorFlow based model using CustomVision (Azure Cognitive Services)

Synopsis

Globally more than 1 billion people are affected by vision impairment or blindness due to unaddressed cataract (65.2 million), glaucoma (6.9 million), retina disease (3 million).

Proposed here is the development of an AI based system using Azure Cognitive Services CustomVision tool to predict the probability of existence of one of chronic conditions in an eye scan.

Preparing data

Navigate to the url below and download the full normal eye dataset of 300 images

https://www.kaggle.com/jr2ngb/cataractdataset/version/2

Use the url below to download BinRushed.zip file containing all Glaucoma Images

https://deepblue.lib.umich.edu/data/concern/data_sets/3b591905z?locale=en

Deepdiving into Azure CustomVision.ai

Navigate to CustomVision.ai and click on the New Project icon

Creating a new project

Create a new project by entering the mandatory details below

Leave all radio buttons to default except for Domains where you choose the domain of relevance to your task. Else General (compact) should be good.

Select Basic platforms under Export Capabilities which uses a Tensorflow based model.

You should see the newly created project page as below

Adding new tags

Hit the + button next to Tags and add 2 new tags : Glaucoma-Eye and Normal-Eye as shown below.

Uploading images

Upload all the 300 images in the BinRushed4 dataset. Click on Add Images , select all files

Click on MyTags textbox below and make sure that you click the Glaucoma-Eye tag before uploading all 357 files.

Next Upload 200 images from the cataractdatasetdataset1_normal folder. Click on Add Images , select 200 out of 300 files, select the Normal-Eye tag and hit upload

Training the model

Hit the green Train button on top , select Quick Training and click Train.

After the training is completed you should be able to see the Iteration details as below.

Click the info icon next to Precision, Recall and AP to understand these terms.

Quick Test of model

Hit the Quick Test button on top and then “Browse Local Files”, select any image from the folder cataractdatasetdataset1_normal from 201 to 300 (which has not been part of the training set)

Observe the Tag and Probability values for the image. It confirms that the Eye is normal.

Click “Browse Local Files”, select any Glaucoma positive image from BinRushedBinRushed1.

Observe Tag and Probability values. It confirms that the eye Glaucoma Positive

Adding Cataract and Retina Disease detection

Next add a new Tag called “Cataract Eye” and upload 90 images from cataractdatasetdataset2_cataract. Hit the train button to retrain the model and observe the performance values change.

Click on Quick Test and use one of the remaining 10 images in the untrained dataset to predict if it is a Cataract Eye or not.

Repeat all steps for Cataract Detection for Retina Disease and test with an Eye image that is positive for Retina Disease as below.

Altogether

The Journey Forward

Find as many images as possible on the web about Glaucoma, Cataract, Retina Disease and Normal Eye as possible, upload them to the appropriate tags and retrain the model.

Try sliding the Probability Threshold bar on the Performance tab to check if you get more accurate predictions.

Docker to Docker networking between TCP Client and Server

A TCP based client and server is a frequently needed setup in IoT based applications.

Presented in this article is :

  1. A method to create a java based client and server to send and receive binary data
  2. The client and server each run inside a docker container and communicate through docker networking.

Create a TCPClient.java as per the code provided below



import java.net.*; 
import java.io.*; 
import java.security.SecureRandom;
import java.nio.charset.Charset;

public class TCPClient 
{ 
	// initialize socket and input output streams 
	private Socket socket		 = null; 
	private DataInputStream input = null; 
	private DataOutputStream out	 = null; 
	
	private static final String CHAR_LOWER = "abcdefghijklmnopqrstuvwxyz";
	private static 	final String CHAR_UPPER = CHAR_LOWER.toUpperCase();
	private static 	final String NUMBER = "0123456789";

	private static final String DATA_FOR_RANDOM_STRING = CHAR_LOWER + CHAR_UPPER + NUMBER;
	private static SecureRandom random = new SecureRandom();

	// constructor to put ip address and port 
	public TCPClient(String address, int port) 
	{ 
	

	
		// establish a connection 
		try
		{ 
			socket = new Socket(address, port); 
			System.out.println("Connected"); 

		

		} 
		catch(UnknownHostException u) 
		{ 
			System.out.println(u); 
		} 
		catch(IOException i) 
		{ 
			System.out.println(i); 
		} 

		// string to read message from input 
		String line = ""; 
		int linecount = 0;

		// keep reading until "Over" is input 
		while (linecount++ != 300) 
		{ 
	
	
			try{
				String name = generateRandomString(8);

				InputStream is = new ByteArrayInputStream(name.getBytes(Charset.forName("UTF-8")));
				// takes input from terminal 
				input = new DataInputStream(is); 

				// sends output to the socket 
				out = new DataOutputStream(socket.getOutputStream()); 
			} 
			catch(UnknownHostException u) 
			{ 
				System.out.println(u); 
			} 
			catch(IOException i) 
			{ 
				System.out.println(i); 
			} 				
			
			try
			{ 
				line = input.readLine(); 
				out.writeUTF(line); 
			} 
			catch(IOException i) 
			{ 
				System.out.println(i); 
			} 
		} 

		// close the connection 
		try
		{ 
			input.close(); 
			out.close(); 
			socket.close(); 
		} 
		catch(IOException i) 
		{ 
			System.out.println(i); 
		} 
	} 

    public static String generateRandomString(int length) {
        if (length < 1) throw new IllegalArgumentException();

        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {

			// 0-62 (exclusive), random returns 0-61
            int rndCharAt = random.nextInt(DATA_FOR_RANDOM_STRING.length());
            char rndChar = DATA_FOR_RANDOM_STRING.charAt(rndCharAt);

            // debug
            System.out.format("%d\t:\t%c%n", rndCharAt, rndChar);

            sb.append(rndChar);

        }

        return sb.toString();

    }
	public static void main(String args[]) 
	{ 
		TCPClient client = new TCPClient("server", 5678); 
	} 
} 

Create a manifest file for TCPClient as below

Manifest-Version: 1.0
Created-By: Me
Main-Class: TCPClient

Create a Dockerfile for TCPClient as below

FROM java:8
WORKDIR /
ADD TCPClient.jar TCPClient.jar
EXPOSE 5678
CMD java -jar TCPClient.jar

Create a TCPServer.java as per the code provided below


 
import java.net.*; 
import java.io.*; 

public class TCPServer 
{ 
	//initialize socket and input stream 
	private Socket		 socket = null; 
	private ServerSocket server = null; 
	private DataInputStream in	 = null; 
	private File file;
	private FileWriter fileWriter;

	// constructor with port 
	public TCPServer(int port) 
	{ 
		// starts server and waits for a connection 
		try
		{ 
			server = new ServerSocket(port); 
			System.out.println("Server started"); 

			System.out.println("Waiting for a client ..."); 

			socket = server.accept(); 
			System.out.println("Client accepted"); 

			// takes input from the client socket 
			in = new DataInputStream( 
				new BufferedInputStream(socket.getInputStream())); 

			String line = ""; 
			int linecount = 0;
			
			file = new File("outfile.txt");
			fileWriter = new FileWriter(file);


			// reads message from client until "Over" is sent 
			while (linecount++ != 300) 
			{ 
				try
				{ 
					line = in.readUTF(); 
					System.out.println(line); 
					fileWriter.write(line);



				} 
				catch(IOException i) 
				{ 
					System.out.println(i); 
				} 
			} 
			System.out.println("Closing connection"); 
			fileWriter.flush();
			fileWriter.close();
			// close connection 
			socket.close(); 
			in.close(); 
		} 
		catch(IOException i) 
		{ 
			System.out.println(i); 
		} 
	} 

	public static void main(String args[]) 
	{ 
		TCPServer server = new TCPServer(5678); 
	} 
} 

Create a manifest file for TCPServer as below

Manifest-Version: 1.0
Created-By: Me
Main-Class: TCPServer

Create a Dockerfile for TCPServer as below

FROM java:8
WORKDIR /
ADD TCPServer.jar TCPServer.jar
EXPOSE 5678
CMD java -jar TCPServer.jar

The directory structure should be as displayed below

Run the following commands at the unix prompt :

javac TCPServer.java

javac TCPClient.java

 

jar cfm TCPServer.jar manifest.txt TCPServer.class

jar cfm TCPClient.jar manifest.txt TCPClient.class

 

docker build -t tcpserver .

docker build -t tcpclient .

 

docker images (to list the docker images)

docker tag 00c5f2d27133 docker.io/<your account name>/<repo name>:tcpserver

docker push docker.io/<your account name>/<repo name>:tcpserver

 

docker images (to list the docker images)

docker tag 00c5f2d27133 docker.io/<your account name>/<repo name>:tcpclient

docker push docker.io/<your account name>/<repo name>:tcpclient

 

docker pull docker.io/<your account name>/<repo name>:tcpserver

docker pull docker.io/<your account name>/<repo name>:tcpclient

docker run <your account name>/<repo name>:tcpserver

docker run <your account name>/<repo name>:tcpclient

Create the network – 

sudo docker network create client_server_network

Run the server –

docker run --network-alias server --network client_server_network -it <your account name>/<repo name>:tcpserver

Run the client –

docker run --network client_server_network -it <your account name>/<repo name>:tcpclient

You should see the client output as below

And the server output should be as shown below

10 ideas for a smart workplace

  1. Smart water dispenser
    – scan with employee badge to dispense water
    – total quantity consumed in a day recorded against employee id in cloud
  2. Smart Chair
    – Records employee weight once a day when seated
    – scan with employee badge to record.
    – The chair automatically sends an alert which is displayed as a popup notification on laptop to get up and start walking when someone is seated on it for more than say 2 hours.
  3. Smart payment card
    – records the food type and calories consumed automatically when the payment is made in the food court.
    – Alerts on high carbs, high fat consumption etc.
  4. Smart badge
    – Fitted with a pedometer inside, records the total footsteps from the time the employee walks into the office till they sign out.
  5. Smart mouse
    – Measures blood pressure and pulse rate to determine employee stress level
  6. Smart No touch thermometer
    – Fitted at the attendance recording machine near the exit door records employee temperature at entry and exit.
    – This could be linked to employee attendance too. For example If and when the employee looks into this thermometer and it records the temperature a signal sent to the attendance register to record in-time or out-time.
  7. Smart voice recorder application
    – Installed in the employee laptop systray and mobile phone recognizes a particular employee voice. For that voice records the upper and lower levels to determine if an employee is stressed at work.
  8. Smart facial expression recorder application
    – Installed in the employee laptop systray records the employee facial expression every 60 mins in a day to gauge their mood.
  9. Employee social credits
    – Many a times employees perform tasks like conducting interviews, creating slides, running scrum calls , sitting in at meetings in absence of a coworker in office or if a coworker is called in for another imp task. These usually go unrecorded and forgotten. Instead each employee can be allowed to earn social credit points for each of these tasks and it can be made a company policy to trade social credits such that no employee is overburdened with the credits.
  10. Daily manager and reportees rating app
    – A simple star based rating app similar to the one found in radio taxis. App shows up on laptop.
    – Employee selects the name from auto search box and provides rating and any specific comments for the day or every couple of days or on a day there were issues in relationship with the co-worker.
    – at the end of month ratings collated to determine employee behavior.

De-Identifying or Randomizing production data in MySQL database

There is a necessity in several Web application projects to pull valid data from production and test against the same in Pre-prod environments

Fortunately or Unfortunately there are compliance issues associated with using production data.

For example in the medical domain you have HIPAA compliance, in aerospace, banking and other domains you have GDPR which prohibit disclosing production environment user details , what is appropriately called PHI (Personal Health Information) to unauthorized users.

Therefore it becomes necessary to de-identify or randomize production data before using it in a non-production environment.

Presented below are some queries and functions which will help a developer to :

1. Export data from a production database into a dump file

2. Import either the entire data or a subset of the tables into a pre-prod database.

3. Change user first names and last name to random anglical names.

4. Change key patient attributes like social security number, mobile phone number, email id , address,zipcode to valid but random value.

5. To verify the de-identified pre-prod database schema for correctness

All the queries are presented with reference to MySQL database. But the same can be replicated for other database like MS SqlServer or Postgresql

Assumptions :

my production database schema name is : batman_prod

user id is : joker

password is : joker#007

database hostname : darkKnight

my production database schema name is : batman_uat

user id is : nolan

password is : nolan#007

database hostname : punkRock

developer test email id :

test.developer.google@gmail.com

Export Data

mysqldump --routines -u joker -pjoker#007 -h darkKnight batman_prod > /path/batman_full_proddbdump-date +%F_%T.sql
mysqldump --routines -u joker -pjoker#007 -h darkKnight batman_prod BILL_TABLE BOB_TABLE JERRY_TABLE DAVID_TABLE > /path/batman_part_proddbdump-date +%F_%T.sql

Import data

mysql -h punkRock -P 3306 -u nolan -pnolan#007 batman_uat < batman_part_proddbdump-2019-08-25_00_00_01.sql

De-identify

update HOSPITAL_USER set first_name = ELT(FLOOR(1 + (RAND() * (100-1))), "James","Mary","John","Patricia","Robert","Linda","Michael","Barbara","William","Elizabeth","David","Jennifer","Richard","Maria","Charles","Susan","Joseph","Margaret","Thomas","Dorothy","Christopher","Lisa","Daniel","Nancy","Paul","Karen","Mark","Betty","Donald","Helen","George","Sandra","Kenneth","Donna","Steven","Carol","Edward","Ruth","Brian","Sharon","Ronald","Michelle","Anthony","Laura","Kevin","Sarah","Jason","Kimberly","Matthew","Deborah","Gary","Jessica","Timothy","Shirley","Jose","Cynthia","Larry","Angela","Jeffrey","Melissa","Frank","Brenda","Scott","Amy","Eric","Anna","Stephen","Rebecca","Andrew","Virginia","Raymond","Kathleen","Gregory","Pamela","Joshua","Martha","Jerry","Debra","Dennis","Amanda","Walter","Stephanie","Patrick","Carolyn","Peter","Christine","Harold","Marie","Douglas","Janet","Henry","Catherine","Carl","Frances","Arthur","Ann","Ryan","Joyce","Roger","Diane");
update HOSPITAL_USER set last_name = ELT(FLOOR(1 + (RAND() * (100-1))), "Smith","Johnson","Williams","Jones","Brown","Davis","Miller","Wilson","Moore","Taylor","Anderson","Thomas","Jackson","White","Harris","Martin","Thompson","Garcia","Martinez","Robinson","Clark","Rodriguez","Lewis","Lee","Walker","Hall","Allen","Young","Hernandez","King","Wright","Lopez","Hill","Scott","Green","Adams","Baker","Gonzalez","Nelson","Carter","Mitchell","Perez","Roberts","Turner","Phillips","Campbell","Parker","Evans","Edwards","Collins","Stewart","Sanchez","Morris","Rogers","Reed","Cook","Morgan","Bell","Murphy","Bailey","Rivera","Cooper","Richardson","Cox","Howard","Ward","Torres","Peterson","Gray","Ramirez","James","Watson","Brooks","Kelly","Sanders","Price","Bennett","Wood","Barnes","Ross","Henderson","Coleman","Jenkins","Perry","Powell","Long","Patterson","Hughes","Flores","Washington","Butler","Simmons","Foster","Gonzales","Bryant","Alexander","Russell","Griffin","Diaz","Hayes");
update batman_uat.HOSPITAL_PATIENT_DETAILS set ssn_id = str_random('(d{3})-d{2}-d{4}');
update batman_uat.HOSPITAL_PATIENT_DETAILS set email = CONCAT('test.developer.google+', first_name ,last_name,'@gmail.com');

Here we use the unique feature available in gmail where you can append any string to a valid email id test.developer.google with the sign + and generate N number of unique email ids. That way a single inbox can be used to test send/receive of emails on behalf of several user ids.

update batman_uat.HOSPITAL_PATIENT_DETAILS set address = concat(str_random('(d{5}) '), ELT(FLOOR(1 + (RAND() * (20-1))),"Second","Third","First","Fourth","Park","Fifth","Main","Sixth","Oak","Seventh","Pine","Maple","Cedar","Eighth","Elm","View","Washington","Ninth","Lake","Hill" ),str_random(' [street|lane|road|park] '))

Here I used the 20 most commonly found street names in the US to randomize the address.

update batman_uat.HOSPITAL_PATIENT_DETAILS set zipcode = str_random('(d{5})');
update batman_uat.HOSPITAL_PATIENT_DETAILS set dob = str_random_date('1920-01-01','2018-12-31','%Y-%m-%d');
update batman_uat.HOSPITAL_PATIENT_DETAILS set mobile_phone = str_random('(d{3})-d{3}-d{4}');

Below are some queries to check the resultant pre-prod schema

Find total number of rows by each table in all tables :

SELECT table_name, table_rows

FROM INFORMATION_SCHEMA.TABLES

WHERE TABLE_SCHEMA = 'batman_uat';

Find tables matching a given column name:

SELECT DISTINCT TABLE_NAME, COLUMN_NAME

FROM INFORMATION_SCHEMA.COLUMNS

WHERE COLUMN_NAME LIKE 'hospital_patient_first_name'

References:

fn_str_random.sql and fn_str_random_date.sql : MySQL Function code below. Thanks to the author !

SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='STRICT_TRANS_TABLES';

DELIMITER //
DROP FUNCTION IF EXISTS str_random;
//

CREATE FUNCTION str_random(p_pattern VARCHAR(200))
    RETURNS VARCHAR(2000)
    NO SQL
    BEGIN
    /**
    * String function. Returns a random string based on a mask
    * <br>
    * %author Ronald Speelman
    * %version 2.3
    * Example usage:
    * SELECT str_random('dddd CC') AS DutchZipCode;
    * SELECT str_random('d{4} C{2}') AS DutchZipCode;
    * SELECT str_random('*{5}*(4)') AS password;
    * select str_random('Cccc(4)') as name;
    * SELECT str_random('#X{6}') AS htmlColorCode;
    * See more complex examples and a description on www.moinne.com/blog/ronald
    *
    * %param p_pattern String: the pattern describing the random values
    *                          MASKS:
    *                          c returns lower-case character [a-z]
    *                          C returns upper-case character [A-Z]
    *                          A returns either upper or lower-case character [a-z A-Z]
    *                          d returns a digit [0-9]
    *                          D returns a digit without a zero [1-9]
    *                          b returns a bit [0-1]
    *                          X returns hexadecimal character [0-F]
    *                          * returns characters, decimals and special characters [a-z A-Z 0-9 !?-_@$#]
    *                          DIRECTIVES
    *                          "text"      : text is taken literally
    *                          {nn}        : repeat the last mask nn times
    *                          (nn)        : repeat random, but max nn times
    *                          [item|item] : pick a random item from this list, items are separated by a pipe symbol
    *                          All other characters are taken literally
    * %return String
    */

    DECLARE v_iter              SMALLINT DEFAULT 1;
    DECLARE v_char              VARCHAR(1) DEFAULT '';
    DECLARE v_next_char         VARCHAR(1) DEFAULT '';
    DECLARE v_list              VARCHAR(200) DEFAULT '';
    DECLARE v_text              VARCHAR(200) DEFAULT '';
    DECLARE v_result            VARCHAR(2000) DEFAULT '';
    DECLARE v_count             SMALLINT DEFAULT 0;
    DECLARE v_jump_characters   TINYINT DEFAULT 0;
    DECLARE v_end_position      SMALLINT DEFAULT 0;
    DECLARE v_list_count        TINYINT DEFAULT 0;
    DECLARE v_random_item       TINYINT DEFAULT 0;

    WHILE v_iter <= CHAR_LENGTH(p_pattern) DO

        SET v_char := BINARY SUBSTRING(p_pattern,v_iter,1);
        SET v_next_char := BINARY SUBSTRING(p_pattern,(v_iter + 1),1);

        -- check if text is a fixed text
        IF (v_char = '"') THEN
            -- get the text
            SET v_end_position := LOCATE('"', p_pattern, v_iter + 1);
            SET v_text := SUBSTRING(p_pattern,v_iter + 1,(v_end_position - v_iter) - 1);
            -- add the text to the result
            SET v_result := CONCAT(v_result, v_text);
            SET v_iter := v_iter + CHAR_LENGTH(v_text) + 2;
        -- if character has a count specified: repeat it
        ELSEIF (v_next_char = '{') OR (v_next_char = '(') THEN
            -- find out what the count is (max 999)...
            IF (SUBSTRING(p_pattern,(v_iter + 3),1) = '}') OR
               (SUBSTRING(p_pattern,(v_iter + 3),1) = ')') THEN
                SET v_count := SUBSTRING(p_pattern,(v_iter + 2),1);
                SET v_jump_characters := 4;
            ELSEIF (SUBSTRING(p_pattern,(v_iter + 4),1) = '}') OR
                   (SUBSTRING(p_pattern,(v_iter + 4),1) = ')')THEN
                SET v_count := SUBSTRING(p_pattern,(v_iter + 2),2);
                SET v_jump_characters := 5;
            ELSEIF (SUBSTRING(p_pattern,(v_iter + 5),1) = '}') OR
                   (SUBSTRING(p_pattern,(v_iter + 5),1) = ')')THEN
                SET v_count := SUBSTRING(p_pattern,(v_iter + 2),3);
                SET v_jump_characters := 6;
            ELSE
                SET v_count := 0;
                SET v_jump_characters := 3;
            END IF;
            -- if random count: make it random with a max of count
            IF (v_next_char = '(') THEN
                SET v_count := FLOOR((RAND() * v_count));
            END IF;
            -- repeat the characters
            WHILE v_count > 0 DO
                SET v_result := CONCAT(v_result,str_random_character(v_char));
                SET v_count := v_count - 1;
            END WHILE;
            SET v_iter := v_iter + v_jump_characters;
        -- check if there is a list in the pattern
        ELSEIF (v_char = '[') THEN
            -- get the list
            SET v_end_position := LOCATE(']', p_pattern, v_iter + 1);
            SET v_list := SUBSTRING(p_pattern,v_iter + 1,(v_end_position - v_iter) - 1);
            -- find out how many items are in the list, items are seperated by a pipe
            SET v_list_count := (LENGTH(v_list) - LENGTH(REPLACE(v_list, '|', '')) + 1);
            -- pick a random item from the list
            SET v_random_item := FLOOR(1 + (RAND() * v_list_count));
            -- add the item from the list
            SET v_result := CONCAT(v_result,
                                   REPLACE(SUBSTRING(SUBSTRING_INDEX(v_list, '|' ,v_random_item),
                                           CHAR_LENGTH(SUBSTRING_INDEX(v_list,'|', v_random_item -1)) + 1),
                                           '|', '')
                                  );
            SET v_iter := v_iter + CHAR_LENGTH(v_list) + 2;
        -- no directives: just get a random character
        ELSE
            SET v_result := CONCAT(v_result, str_random_character(v_char));
            SET v_iter := v_iter + 1;
        END IF;

   END WHILE;

   RETURN v_result;
END;
//
DELIMITER ;

SET SQL_MODE=@OLD_SQL_MODE;
SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='STRICT_TRANS_TABLES';

DELIMITER //
DROP FUNCTION IF EXISTS str_random_date;
//

CREATE FUNCTION str_random_date(p_date_start VARCHAR(20)
                               ,p_date_end VARCHAR(20)
                               ,p_format VARCHAR(20)
                                 )
    RETURNS VARCHAR(50) 
    NO SQL
    BEGIN
    /**
    * String function. Returns a random date string
    * <br>
    * %author Ronald Speelman
    * %version 1.0
    * Example usage:
    * SELECT str_random_date('1980-01-01','2012-01-01','%Y-%m-%d') AS MysqlDate;
    * See more complex examples and a description on www.moinne.com/blog/ronald
    *
    * %param p_date_start   string: the start date/ time
    * %param p_date_end     string: the end date/ time
    * %param p_format       string: the format of the returned date/time 
    * %return String
    */

    DECLARE v_format VARCHAR(20) DEFAULT '%Y-%m-%d';
    DECLARE v_rand_secs BIGINT DEFAULT 0;

    SET v_format := COALESCE(p_format, v_format);
    SET v_rand_secs  := FLOOR(0 + (RAND() * (86400 * (DATEDIFF(p_date_end , p_date_start)))));
    RETURN DATE_FORMAT(DATE_ADD(p_date_start , INTERVAL  v_rand_secs SECOND),v_format);
END;
//
DELIMITER ;

SET SQL_MODE=@OLD_SQL_MODE;

Mqtt-Based Data Transfer With Kafka Cloud

Through this article, I would like to list down the detailed steps for communicating from a device to a server in the cloud running a scalable Kafka infrastructure and back from Kafka to the device using Mosquitto broker and Kafka connect.

Architecture Diagram

Device to cloud connectivity using mqtt-mosquitto broker and kafka connect :

Step 1:

Download and install mosquito for windows from : https://mosquitto.org/download/

Step 2:

Navigate to D:\Program Files\mosquitto\mosquitto.conf and modify the following

# =================================================================

# Extra listeners

# =================================================================

# Listen on a port/ip address combination. By using this variable

# multiple times, mosquitto can listen on more than one port. If

# this variable is used and neither bind_address nor port given,

# Note that for a websockets listener it is not possible to bind to a host

# name.

# listener port-number [ip address/host name]

listener 1883 0.0.0.0

Step 3:

Run the command

D:\Program Files\mosquitto>mosquitto_pub -h avengers.eastus.cloudapp.azure.com -t “mqtt-mosquitto-topic” -m “This request is coming from LOCALLAPTOP007 to Avengers”

D:\Program Files\mosquitto>mosquitto_sub -h avengers.eastus.cloudapp.azure.com -t “mqtt-mosquitto-topic”

Step 4:

Navigate to the url : https://howtoprogram.xyz/2016/04/30/getting-started-apache-kafka-0-9/ and install kafka on the server avengers@12.XX.XXX.XXX and test the producer consumer message interchange using the following commands

./kafka-topics.sh –list -zookeeper 12.XX.XXX.XXX:2181

bin/kafka-console-consumer.sh –bootstrap-server 12.XX.XXX.XXX:9092 –topic HelloKafkaTopic

bin/kafka-console-producer.sh –broker-list 12.XX.XXX.XXX:9092 –topic HelloKafkaTopic

Step 5:

modify /etc/hosts and add the entries as below

10.XXX.XX.XX avengers.eastus.cloudapp.azure.com

#Above is the private ip of the cloud machine

12.XX.XXX.XXX avengers.eastus.cloudapp.azure.com

#Above is the public ip of the cloud machine

Step 6:

Install mosquito on the cloud (just for testing) using the instructions at the url : https://www.disk91.com/2016/technology/internet-of-things-technology/install-mosquitto-mqtt-server-on-centos-to-publish-iot-data/

yum install mosquitto

# service mosquitto start
# systemctl enable mosquitto

Step 7:

Check if port 1883 and 9092 are open in cloud for inbound traffic using

netstat | grep 1883

netstat | grep 9092

If the ports are not open , open them for inbound traffic in Azure also run the following commands on server to open them

firewall-cmd –zone=public –add-port=1883/tcp –permanent

firewall-cmd –reload

iptables-save | grep 1883

Step 8:

Navigate to /etc/mosquitto/mosquitto.conf and modify the following

# =================================================================

# Extra listeners

# =================================================================

# Listen on a port/ip address combination. By using this variable

# multiple times, mosquitto can listen on more than one port. If

# this variable is used and neither bind_address nor port given,

# then the default listener will not be started.

# The port number to listen on must be given. Optionally, an ip

# address or host name may be supplied as a second argument. In

# this case, mosquitto will attempt to bind the listener to that

# address and so restrict access to the associated network and

# interface. By default, mosquitto will listen on all interfaces.

# Note that for a websockets listener it is not possible to bind to a host

# name.

# listener port-number [ip address/host name]

listener 1883 0.0.0.0

Step 9:

Test publish and subscribe between the mosquito broker using the following commands

mosquitto_sub -h 127.0.0.1 -t dummy
mosquitto_pub -h 127.0.0.1 -t dummy -m "Hello world"

Step 10:

Navigate to https://howtoprogram.xyz/wp-content/uploads/2016/07/kafka-mqtt-bin.zip

Unzip and copy the following files

  • kafka-connect-mqtt-1.0-SNAPSHOT.jar
  • org.eclipse.paho.client.mqttv3-1.0.2.jar

to

/ opt/kafka_2.12-2.3.0/libs

Step 11:

Navigate to /opt/kafka_2.12-2.3.0/config and vi server.properties and just make the following change

advertised.listeners=PLAINTEXT://12.XX.XXX.XXX:9092
 
Navigate to /opt/kafka_2.12-2.3.0/config and vi connect-standalone.properties and just make the following change

bootstrap.servers=avengers.eastus.cloudapp.azure.com:9092

Navigate to /opt/kafka_2.12-2.3.0/config and vi mqtt.properties and ensure the file has following entries

name=mqtt

connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector

tasks.max=1

kafka.topic=mqtt-mosquitto-topic-kafka

mqtt.client_id=mqtt-kafka-123456789

mqtt.clean_session=true

mqtt.connection_timeout=30

mqtt.keep_alive_interval=60

mqtt.server_uris=tcp://avengers.eastus.cloudapp.azure.com:1883

mqtt.topic=mqtt-mosquitto-topic

Step 12:

cd /opt/kafka_2.12-2.3.0

Run all the following on separate putty windows

./bin/zookeeper-server-start.sh config/zookeeper.properties &

./bin/kafka-server-start.sh config/server.properties &

./bin/connect-standalone.sh config/connect-standalone.properties config/mqtt.properties

bin/kafka-console-consumer.sh –bootstrap-server avengers.eastus.cloudapp.azure.com:9092 –topic mqtt-mosquitto-topic-kafka

Run the following first from within server itself and then from windows local laptop

mosquitto_pub -h avengers.eastus.cloudapp.azure.com -t “mqtt-mosquitto-topic” -m “This request is coming from LOCALLAPTOP007 to Avengers”

Ensure that the message sent from mosquito publisher on local laptop arrives in base64 encoded format on the server kafka consumer.

Cloud to device connectivity using mqtt-mosquitto broker and kafka connect :

Step 1:

Download the kafka stream connector library and copy the jar to /home/plugins folder :

wget https://github.com/Landoop/stream-reactor/releases/download/1.2.2/kafka-connect-mqtt-1.2.2-2.1.0-all.tar.gz

tar -xf kafka-connect-mqtt-1.2.2-2.1.0-all.tar.gz

cp kafka-connect-mqtt-1.2.2-2.1.0-all.jar /home/plugins

Step 2:

Create a new connect.properties file  under config folder as below:

[root@avengers config]# pwd

/opt/kafka_2.12-2.3.0/config

[root@avengers config]# cat connect.properties

# Kafka broker IP addresses to connect to

bootstrap.servers=avengers.eastus.cloudapp.azure.com:9092

# Path to directory containing the connector jar

plugin.path=/root/plugins

# Converters to use to convert keys and values

key.converter=org.apache.kafka.connect.json.JsonConverter

#key.converter=org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=false

value.converter=org.apache.kafka.connect.json.JsonConverter

#value.converter=org.apache.kafka.connect.storage.StringConverter

value.converter.schemas.enable=false

# The internal converters Kafka Connect uses for storing offset and configuration data

internal.key.converter=org.apache.kafka.connect.json.JsonConverter

internal.value.converter=org.apache.kafka.connect.json.JsonConverter

#internal.key.converter=org.apache.kafka.connect.storage.StringConverter

#internal.value.converter=org.apache.kafka.connect.storage.StringConverter

internal.key.converter.schemas.enable=false

internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets

[root@avengers config]#

Step 3:

Create a new kafka topic called mqtt-sink using the instructions below :

bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic mqtt-sink

bin/kafka-topics.sh –list -zookeeper localhost:2181

Step 4:

Create a new mqtt-sink.properties file  under config folder as below:

[root@avengers config]# pwd

/opt/kafka_2.12-2.3.0/config

[root@avengers config]# cat mqtt-sink.properties

name=mqtt-sink

connector.class=com.datamountaineer.streamreactor.connect.mqtt.sink.MqttSinkConnector

tasks.max=1

topics=mqtt-sink

connect.mqtt.hosts=tcp://avengers.eastus.cloudapp.azure.com:1883

connect.mqtt.clean=true

connect.mqtt.timeout=1000

connect.mqtt.keep.alive=1000

connect.mqtt.service.quality=1

connect.mqtt.kcql=INSERT INTO /lttsspskafka/test SELECT * FROM mqtt-sink

WITHCONVERTER=com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter

connect.progress.enabled=true

[root@avengers config]#

Step 5:

cd /opt/kafka_2.12-2.3.0

Run all the following on separate putty windows

./bin/zookeeper-server-start.sh config/zookeeper.properties &

./bin/kafka-server-start.sh config/server.properties &

./bin/connect-standalone.sh config/connect.properties config/mqtt-sink.properties

bin/kafka-console-producer.sh –broker-list avengers.eastus.cloudapp.azure.com:9092 -topic mqtt-sink

Sent a test json message like

{“id”:3,”temp”:21.9,”timestamp”:1530511201,”Note”:”This message going from LTTS Avengers server to the device LOCALLAPTOP007″}

Run the following first from within server itself and then from windows local laptop

mosquitto_sub -h avengers.eastus.cloudapp.azure.com  -t “/lttsspskafka/test” -q 1

Verify if the message sent from server is received by the laptop

References:

https://mosquitto.org/download/

https://mosquitto.org (test.mosquitto.org )