
ML-based Scalable Bioinformatics Cron Job for Computational Drug Discovery

Objective :

The aim of this post is to create a scheduled job that runs nightly over data from the ChEMBL database and generates reports and graphs that researchers can use for further detailed analysis and the discovery of new drugs.

Architecture :

This job is based upon the Celery framework (https://docs.celeryproject.org/), running on top of RabbitMQ for scalability. When triggered nightly, it picks up new compounds or target proteins added to the ChEMBL database and generates reports, which are then uploaded to an S3 bucket for clinicians and researchers to pore over.

Architecture Diagram TBD
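The Celery/RabbitMQ wiring is still on the to-do list (see the Conclusion below), but a minimal sketch of how the nightly trigger could look is shown here; the module name tasks.py, the broker URL and the exact schedule are assumptions rather than the current implementation.

# tasks.py (hypothetical): Celery app with a nightly beat schedule on RabbitMQ.
import subprocess

from celery import Celery
from celery.schedules import crontab

app = Celery('bioactivity_pipeline', broker='amqp://guest:guest@localhost:5672//')

@app.task
def run_nightly_pipeline():
    # Run the workflow driver described in the next section as a single task.
    subprocess.run(['python3', '/root/biopython/dataprofessor-scripts/process.py'], check=True)

# Trigger the pipeline every night at 01:00 via celery beat.
app.conf.beat_schedule = {
    'nightly-chembl-run': {
        'task': 'tasks.run_nightly_pipeline',
        'schedule': crontab(hour=1, minute=0),
    },
}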

Code Structure :

Workflow file

Create a file called process.py with the following contents

import os

SCRIPTS_DIR = '/root/biopython/dataprofessor-scripts'

# Run the five phases of the workflow (book1.py .. book5.py) sequentially.
for phase in range(1, 6):
    print(f'------------------------------Starting phase {phase} execution ---------------------------------------')
    os.system(f'python3 {SCRIPTS_DIR}/book{phase}.py')
    print(f'------------------------------Completed phase {phase} execution ---------------------------------------')

Script to perform Data Collection and Pre-Processing from the ChEMBL Database

# Import necessary libraries
import pandas as pd
from chembl_webresource_client.new_client import new_client

# Target search for coronavirus
target = new_client.target
target_query = target.search('coronavirus')
targets = pd.DataFrame.from_dict(target_query)

print(f'targets : {targets}')


selected_target = targets.target_chembl_id[4]

print(f'selected_target : {selected_target}')


activity = new_client.activity
res = activity.filter(target_chembl_id=selected_target).filter(standard_type="IC50")
df = pd.DataFrame.from_dict(res)
df.head(3)
df.to_csv('bioactivity_data_raw.csv', index=False)

df2 = df[df.standard_value.notna()]

print(f'df2 : {df2}')


# Label each compound by IC50: <= 1000 nM is active, >= 10000 nM is inactive,
# everything in between is intermediate (filtered out later, before plotting).
bioactivity_class = []
for i in df2.standard_value:
  if float(i) >= 10000:
    bioactivity_class.append("inactive")
  elif float(i) <= 1000:
    bioactivity_class.append("active")
  else:
    bioactivity_class.append("intermediate")

selection = ['molecule_chembl_id','canonical_smiles','standard_value']
df3 = df2[selection].reset_index(drop=True)

print(f'df3 : {df3}')


# Align row-by-row with df3 (both now use a 0..n-1 index).
bioactivity_class = pd.Series(bioactivity_class, name='bioactivity_class')
df4 = pd.concat([df3, bioactivity_class], axis=1)

print(f'df4 : {df4}')


df4.to_csv('bioactivity_data_preprocessed.csv', index=False)  

Script for performing Descriptor Calculation and Exploratory Data Analysis.

import pandas as pd
import os

df = pd.read_csv('bioactivity_data_preprocessed.csv')

import numpy as np
from rdkit import Chem
from rdkit.Chem import Descriptors, Lipinski

# Inspired by: https://codeocean.com/explore/capsules?query=tag:data-curation

def lipinski(smiles, verbose=False):

    moldata= []
    for elem in smiles:
        mol=Chem.MolFromSmiles(elem) 
        moldata.append(mol)
       
    baseData= np.arange(1,1)
    i=0  
    for mol in moldata:        
       
        desc_MolWt = Descriptors.MolWt(mol)
        desc_MolLogP = Descriptors.MolLogP(mol)
        desc_NumHDonors = Lipinski.NumHDonors(mol)
        desc_NumHAcceptors = Lipinski.NumHAcceptors(mol)
           
        row = np.array([desc_MolWt,
                        desc_MolLogP,
                        desc_NumHDonors,
                        desc_NumHAcceptors])   
    
        if(i==0):
            baseData=row
        else:
            baseData=np.vstack([baseData, row])
        i=i+1      
    
    columnNames=["MW","LogP","NumHDonors","NumHAcceptors"]   
    descriptors = pd.DataFrame(data=baseData,columns=columnNames)
    
    return descriptors
	
df_lipinski = lipinski(df.canonical_smiles)

print(f'df_lipinski : {df_lipinski}')

print(f'df : {df}')

df_combined = pd.concat([df,df_lipinski], axis=1)

print(f'df_combined : {df_combined}')

# https://github.com/chaninlab/estrogen-receptor-alpha-qsar/blob/master/02_ER_alpha_RO5.ipynb

import numpy as np

def pIC50(input):
    pIC50 = []

    for i in input['standard_value_norm']:
        molar = i*(10**-9) # Converts nM to M
        pIC50.append(-np.log10(molar))

    input['pIC50'] = pIC50
    x = input.drop('standard_value_norm', axis=1)
        
    return x

print(df_combined.standard_value.describe())

# Rationale for capping standard_value at 100,000,000 nM below:
# that value corresponds to pIC50 = 1, while anything larger would go negative.
print(-np.log10( (10**-9)* 100000000 ))

print(-np.log10( (10**-9)* 10000000000 ))

def norm_value(input):
    norm = []

    for i in input['standard_value']:
        if i > 100000000:
          i = 100000000
        norm.append(i)

    input['standard_value_norm'] = norm
    x = input.drop('standard_value', axis=1)
        
    return x

df_norm = norm_value(df_combined)
print(df_norm)

print(df_norm.standard_value_norm.describe())

df_final = pIC50(df_norm)
print(f'df_final : {df_final}')

print(df_final.pIC50.describe())

df_2class = df_final[df_final.bioactivity_class != 'intermediate']
print(f'df_2class : {df_2class}')

import seaborn as sns
sns.set(style='ticks')
import matplotlib.pyplot as plt

plt.figure(figsize=(5.5, 5.5))

sns.countplot(x='bioactivity_class', data=df_2class, edgecolor='black')

plt.xlabel('Bioactivity class', fontsize=14, fontweight='bold')
plt.ylabel('Frequency', fontsize=14, fontweight='bold')

plt.savefig('plot_bioactivity_class.pdf')

plt.figure(figsize=(5.5, 5.5))

sns.scatterplot(x='MW', y='LogP', data=df_2class, hue='bioactivity_class', size='pIC50', edgecolor='black', alpha=0.7)

plt.xlabel('MW', fontsize=14, fontweight='bold')
plt.ylabel('LogP', fontsize=14, fontweight='bold')
plt.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0)
plt.savefig('plot_MW_vs_LogP.pdf')

plt.figure(figsize=(5.5, 5.5))

sns.boxplot(x = 'bioactivity_class', y = 'pIC50', data = df_2class)

plt.xlabel('Bioactivity class', fontsize=14, fontweight='bold')
plt.ylabel('pIC50 value', fontsize=14, fontweight='bold')

plt.savefig('plot_ic50.pdf')

def mannwhitney(descriptor, verbose=False):
  # https://machinelearningmastery.com/nonparametric-statistical-significance-tests-in-python/
  from numpy.random import seed
  from numpy.random import randn
  from scipy.stats import mannwhitneyu

  # seed the random number generator
  seed(1)

  # actives and inactives
  selection = [descriptor, 'bioactivity_class']
  df = df_2class[selection]
  active = df[df.bioactivity_class == 'active']
  active = active[descriptor]

  selection = [descriptor, 'bioactivity_class']
  df = df_2class[selection]
  inactive = df[df.bioactivity_class == 'inactive']
  inactive = inactive[descriptor]

  # compare samples
  stat, p = mannwhitneyu(active, inactive)
  #print('Statistics=%.3f, p=%.3f' % (stat, p))

  # interpret
  alpha = 0.05
  if p > alpha:
    interpretation = 'Same distribution (fail to reject H0)'
  else:
    interpretation = 'Different distribution (reject H0)'
  
  results = pd.DataFrame({'Descriptor':descriptor,
                          'Statistics':stat,
                          'p':p,
                          'alpha':alpha,
                          'Interpretation':interpretation}, index=[0])
  filename = 'mannwhitneyu_' + descriptor + '.csv'
  results.to_csv(filename)

  return results
  
print(f"mannwhitney('pIC50') : {mannwhitney('pIC50')}")

plt.figure(figsize=(5.5, 5.5))

sns.boxplot(x = 'bioactivity_class', y = 'MW', data = df_2class)

plt.xlabel('Bioactivity class', fontsize=14, fontweight='bold')
plt.ylabel('MW', fontsize=14, fontweight='bold')

plt.savefig('plot_MW.pdf')

print(f"mannwhitney('MW') : {mannwhitney('MW')}")


plt.figure(figsize=(5.5, 5.5))

sns.boxplot(x = 'bioactivity_class', y = 'LogP', data = df_2class)

plt.xlabel('Bioactivity class', fontsize=14, fontweight='bold')
plt.ylabel('LogP', fontsize=14, fontweight='bold')

plt.savefig('plot_LogP.pdf')


print(f"mannwhitney('LogP') : {mannwhitney('LogP')}")

plt.figure(figsize=(5.5, 5.5))

sns.boxplot(x = 'bioactivity_class', y = 'NumHDonors', data = df_2class)

plt.xlabel('Bioactivity class', fontsize=14, fontweight='bold')
plt.ylabel('NumHDonors', fontsize=14, fontweight='bold')

plt.savefig('plot_NumHDonors.pdf')


print(f"mannwhitney('NumHDonors') : {mannwhitney('NumHDonors')}")

plt.figure(figsize=(5.5, 5.5))

sns.boxplot(x = 'bioactivity_class', y = 'NumHAcceptors', data = df_2class)

plt.xlabel('Bioactivity class', fontsize=14, fontweight='bold')
plt.ylabel('NumHAcceptors', fontsize=14, fontweight='bold')

plt.savefig('plot_NumHAcceptors.pdf')


print(f"mannwhitney('NumHAcceptors') : {mannwhitney('NumHAcceptors')}")

os.system("zip -r results.zip . -i *.csv *.pdf")

Script for calculating molecular descriptors, which are quantitative descriptions of the compounds in the dataset, and preparing the dataset for subsequent model building.

import pandas as pd
import os
import subprocess
import sys


os.system("wget https://github.com/dataprofessor/bioinformatics/raw/master/padel.zip")
os.system("wget https://github.com/dataprofessor/bioinformatics/raw/master/padel.sh")
os.system("unzip padel.zip")
os.system("wget https://raw.githubusercontent.com/dataprofessor/data/master/acetylcholinesterase_04_bioactivity_data_3class_pIC50.csv")

df3 = pd.read_csv('acetylcholinesterase_04_bioactivity_data_3class_pIC50.csv')
print(f'df3 : {df3}')

selection = ['canonical_smiles','molecule_chembl_id']
df3_selection = df3[selection]
df3_selection.to_csv('molecule.smi', sep='\t', index=False, header=False)

print("cat molecule.smi | head -5 : ")
cmd = 'cat molecule.smi'
cmdnext = 'head -5' 
p1 = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
p2 = subprocess.Popen(cmdnext,shell=True, stdin=p1.stdout, stdout=subprocess.PIPE)
output = p2.communicate()[0]
print(output)

print("cat molecule.smi | wc -l : ")
os.system("cat molecule.smi | wc -l")

print("cat padel.sh : ")
os.system("cat padel.sh")

print("bash padel.sh : ")
os.system("bash padel.sh")

print("ls -l : ")
os.system("ls -l")

df3_X = pd.read_csv('descriptors_output.csv')

print(f'df3_X : {df3_X}')

df3_X = df3_X.drop(columns=['Name'])

print(f'df3_X : {df3_X}')

df3_Y = df3['pIC50']
print(f'df3_Y : {df3_Y}')

dataset3 = pd.concat([df3_X,df3_Y], axis=1)
print(f'dataset3 : {dataset3}')

dataset3.to_csv('acetylcholinesterase_06_bioactivity_data_3class_pIC50_pubchem_fp.csv', index=False)

Script for building a regression model of acetylcholinesterase or coronavirus (or any other protein target) inhibitors using the random forest algorithm.

import pandas as pd
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
import os

os.system("wget https://github.com/dataprofessor/data/raw/master/acetylcholinesterase_06_bioactivity_data_3class_pIC50_pubchem_fp.csv")

df = pd.read_csv('acetylcholinesterase_06_bioactivity_data_3class_pIC50_pubchem_fp.csv')

X = df.drop('pIC50', axis=1)
print(f'X : {X}')

Y = df.pIC50
print(f'Y : {Y}')

print(f'X.shape : {X.shape}')

print(f'Y.shape : {Y.shape}')

from sklearn.feature_selection import VarianceThreshold
selection = VarianceThreshold(threshold=(.8 * (1 - .8)))    
X = selection.fit_transform(X)

print(f'X.shape : {X.shape}')

X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2)

print(X_train.shape, Y_train.shape)

print(X_test.shape, Y_test.shape)

model = RandomForestRegressor(n_estimators=100)
model.fit(X_train, Y_train)
r2 = model.score(X_test, Y_test)
print(f'r2 : {r2}')

Y_pred = model.predict(X_test)
print(f'Y_pred : {Y_pred}')

import seaborn as sns
import matplotlib.pyplot as plt

sns.set(color_codes=True)
sns.set_style("white")

ax = sns.regplot(x=Y_test, y=Y_pred, scatter_kws={'alpha':0.4})
ax.set_xlabel('Experimental pIC50', fontsize='large', fontweight='bold')
ax.set_ylabel('Predicted pIC50', fontsize='large', fontweight='bold')
ax.set_xlim(0, 12)
ax.set_ylim(0, 12)
ax.figure.set_size_inches(5, 5)
plt.savefig('pIC50_graph.pdf')
plt.show()
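Because this script is meant to run inside a scheduled job, the fitted model can also be persisted instead of being retrained from scratch each night. A minimal sketch using joblib; the file names are assumptions:

import joblib

# Persist the trained random forest and the fitted variance-threshold selector.
joblib.dump(model, 'acetylcholinesterase_model.pkl')
joblib.dump(selection, 'variance_selector.pkl')

# In a later run the model can be reloaded and applied to new descriptors.
loaded_model = joblib.load('acetylcholinesterase_model.pkl')
print(loaded_model.predict(X_test[:5]))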

Script for comparing several ML algorithms for building regression models of acetylcholinesterase inhibitors.

import pandas as pd
import seaborn as sns
from sklearn.model_selection import train_test_split
import lazypredict
from lazypredict.Supervised import LazyRegressor

df = pd.read_csv('acetylcholinesterase_06_bioactivity_data_3class_pIC50_pubchem_fp.csv')

X = df.drop('pIC50', axis=1)
Y = df.pIC50

# Examine X dimension
print(f'X.shape {X.shape}')

# Remove low variance features
from sklearn.feature_selection import VarianceThreshold
selection = VarianceThreshold(threshold=(.8 * (1 - .8)))    
X = selection.fit_transform(X)
print(f'X.shape {X.shape}')

# Perform data splitting using 80/20 ratio
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)

# Define and build the LazyRegressor
clf = LazyRegressor(verbose=0,ignore_warnings=True, custom_metric=None)
models_train,predictions_train = clf.fit(X_train, X_train, Y_train, Y_train)
models_test,predictions_test = clf.fit(X_train, X_test, Y_train, Y_test)

# Performance table of the training set (80% subset)
print(predictions_train)

# Performance table of the test set (20% subset)
print(predictions_test)

# Bar plot of R-squared values
import matplotlib.pyplot as plt
import seaborn as sns

#train["R-Squared"] = [0 if i < 0 else i for i in train.iloc[:,0] ]

plt.figure(figsize=(25, 30))
sns.set_theme(style="whitegrid")
ax = sns.barplot(y=predictions_train.index, x="R-Squared", data=predictions_train)
ax.set(xlim=(0, 1))
plt.savefig('rsquared.pdf')

# Bar plot of RMSE values
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(25, 30))
sns.set_theme(style="whitegrid")
ax = sns.barplot(y=predictions_train.index, x="RMSE", data=predictions_train)
ax.set(xlim=(0, 10))
plt.savefig('rmse.pdf')

# Bar plot of calculation time
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(25, 30))
sns.set_theme(style="whitegrid")
ax = sns.barplot(y=predictions_train.index, x="Time Taken", data=predictions_train)
ax.set(xlim=(0, 10))
plt.savefig('bar_calculation_time.pdf')

High Level code structure

Run the workflow with the command

~/biopython/dataprofessor-scripts/output# python3 ../process.py > output.log

Observed output :

Time taken to complete the script end to end : 11 mins

List of files generated as output :

Conclusion:

We haven't concluded yet. This is just the beginning. The following tasks are pending:

  1. Clean up all scripts and make them object oriented.
  2. Build the Celery framework with RabbitMQ and trigger the tasks from the Celery scheduler.
  3. Upload the PDF files and reports to an S3 bucket in the cloud (a sketch of this step is shown below).
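For item 3, here is a minimal sketch of the upload step using boto3; the bucket name and key prefix are assumptions, and credentials are expected to come from the usual AWS configuration.

import glob
import boto3

s3 = boto3.client('s3')
bucket = 'bioactivity-nightly-reports'  # assumed bucket name

# Upload every generated report, plot and archive from the output directory.
for path in glob.glob('*.csv') + glob.glob('*.pdf') + glob.glob('*.zip'):
    s3.upload_file(path, bucket, f'reports/{path}')
    print(f'uploaded {path} to s3://{bucket}/reports/{path}')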

Blockchain and IPFS based Mobile Health system

Here we shall be implementing a highly secure and high-performance mHealth system as described in this research paper:

https://www.ncbi.nlm.nih.gov/pmc/articles/PMC8073055/

Setting up an ipfs based file system

The InterPlanetary File System (IPFS) is a protocol and peer-to-peer network for storing and sharing data in a distributed file system. IPFS uses content-addressing to uniquely identify each file in a global namespace connecting all computing devices.

Use the below provided docker-compose.yaml

version: '3'
services:
  ipfs:
    image: ipfs/go-ipfs:v0.8.0
    environment:
      - IPFS_PROFILE=server
      - IPFS_PATH=/ipfsdata
    volumes:
      - ./data/ipfs:/ipfsdata
    ports:
      - "4001:4001"
      - "127.0.0.1:8080:8080"
      - "127.0.0.1:8081:8081"
      - "127.0.0.1:5001:5001"

Run the command:

root@80:~/hello-eth/ipfs# docker-compose up

Create a Python file called testipfs.py with the code below to test our IPFS filesystem.

import ipfshttpclient
client = ipfshttpclient.connect('/dns/0.0.0.0/tcp/5001/http')  # Connects to: /dns/localhost/tcp/5001/http
res = client.add('test.txt')
print(res)
print(res['Hash'])

Make sure you also create a test file, say test.txt, with a few lines of text to test the upload feature. Install ipfshttpclient and run the script:

pip3 install ipfshttpclient==0.8.0a2
python3 testipfs.py

You can also view the contents of the file locally using either

http://localhost:8080/ipfs/QmVyyENm5fk1NKw6MhAdb5d3AM4ecXkU22fdABBgdKkP7y/

where QmVyyENm5fk1NKw6MhAdb5d3AM4ecXkU22fdABBgdKkP7y is the hash code returned by our testipfs Python program,

or using the webui

http://127.0.0.1:5001/webui
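You can also fetch the file contents programmatically with the same ipfshttpclient library; a small sketch (the hash is the one printed by testipfs.py and will differ for your file):

import ipfshttpclient

client = ipfshttpclient.connect('/dns/localhost/tcp/5001/http')
# Replace with the hash printed by testipfs.py
content = client.cat('QmVyyENm5fk1NKw6MhAdb5d3AM4ecXkU22fdABBgdKkP7y')
print(content.decode('utf-8'))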

Basics of setting up and deploying a smart contract on Ethereum with Python, Truffle and web3py

Next we shall set up a smart contract on the Ethereum blockchain, upload the contents of the text file to IPFS, and store the returned hash in the contract using Truffle and the web3.py Python library.

Installing Truffle

$ mkdir hello-eth
$ cd hello-eth
$ npm install truffle
ro01180:~/hello-eth# ./node_modules/.bin/truffle init

Create a file called HelloWorld.sol inside the contracts/ folder with the following content

// mHealth Smart Contract 
pragma solidity >=0.5.0 <0.7.0;
contract HelloWorld {

    string public payload;

    function setPayload(string memory content) public {
        payload = content;
    }

    function get()public pure returns (string memory){
        return 'Uploading File Contract';
    }
}

From the project root folder, use truffle to compile the contract:

ro180:~/hello-eth# ./node_modules/.bin/truffle compile

Deploying smart contract to the blockchain

To deploy our smart contract to the blockchain we first need:

  • a migration script
  • a blockchain to deploy the contract to

For the migration script, create a file named 2_deploy_contract.js inside migrations/ folder and add the following content to it:

var HelloWorld = artifacts.require("HelloWorld");

module.exports = function(deployer) {
	    deployer.deploy(HelloWorld);
};

Spin up Truffle's built-in personal blockchain for testing. Open another terminal and run:

ro01180:~/hello-eth# ./node_modules/.bin/truffle develop

Run migrate at the prompt:

truffle(develop)> migrate

Python application to test the workflow by calling the deployed contract

Under the project root folder (hello-eth/) create a file named app.py with the following content

Make sure you replace deployed_contract_address in the code with the contract address from your local deployment (see the migrate output).

import json
from web3 import Web3, HTTPProvider
import ipfshttpclient

# truffle development blockchain address
blockchain_address = 'http://127.0.0.1:9545'
# Client instance to interact with the blockchain
web3 = Web3(HTTPProvider(blockchain_address))
# Set the default account (so we don't need to set the "from" for every transaction call)
web3.eth.defaultAccount = web3.eth.accounts[0]

# Path to the compiled contract JSON file
compiled_contract_path = 'build/contracts/HelloWorld.json'
# Deployed contract address (see `migrate` command output: `contract address`)
deployed_contract_address = '0x1D1DC7C89dABDC0C415ddd2FeA98943284A23b7C'

with open(compiled_contract_path) as file:
    contract_json = json.load(file)  # load contract info as JSON
    contract_abi = contract_json['abi']  # fetch contract's abi - necessary to call its functions

# Fetch deployed contract reference
contract = web3.eth.contract(address=deployed_contract_address, abi=contract_abi)

# Call contract function (this is not persisted to the blockchain)
message = contract.functions.get().call()

print(message)

client = ipfshttpclient.connect('/dns/0.0.0.0/tcp/5001/http')  # Connects to: /dns/localhost/tcp/5001/http
res = client.add('test.txt')
print(res['Hash'])

# executes setPayload function
tx_hash = contract.functions.setPayload(res['Hash']).transact()
# waits for the specified transaction (tx_hash) to be confirmed
# (included in a mined block)
tx_receipt = web3.eth.waitForTransactionReceipt(tx_hash)
print('tx_hash: {}'.format(tx_hash.hex()))

We can verify that the contract was properly created by fetching the contract instance and checking for setPayload method existence, as well as the value of payload variable:

truffle(develop)> let instance = await HelloWorld.deployed()
truffle(develop)> instance.setPayload
truffle(develop)> instance.payload.call()

Executing a transaction on blockchain

We now execute a transaction that uploads our file to IPFS and gets back a hash code, which is then stored in our payload variable. For this, open a new terminal, install web3, and run our Python script app.py:

pip3 install web3
python3 app.py

In order to verify that the data was persisted on the blockchain, go back to the Truffle console and call the payload variable.

You can then inspect the blockchain in detail.

Finally, you check that the file was uploaded to IPFS using the hash code output by app.py.
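This last check can also be scripted; a hedged sketch (a hypothetical verify.py) that reads the stored hash back through the auto-generated getter of the public payload variable and fetches the file from IPFS. Replace the contract address with the one from your own deployment:

import json
from web3 import Web3, HTTPProvider
import ipfshttpclient

web3 = Web3(HTTPProvider('http://127.0.0.1:9545'))

with open('build/contracts/HelloWorld.json') as f:
    contract_abi = json.load(f)['abi']

# Use the contract address printed by the `migrate` command.
contract = web3.eth.contract(address='0x1D1DC7C89dABDC0C415ddd2FeA98943284A23b7C', abi=contract_abi)

# Public state variables get an auto-generated getter in the ABI.
stored_hash = contract.functions.payload().call()
print('payload stored on chain:', stored_hash)

# Fetch the file back from IPFS using the stored hash.
client = ipfshttpclient.connect('/dns/localhost/tcp/5001/http')
print(client.cat(stored_hash).decode('utf-8'))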

Conclusion

We are not done yet; we have just scratched the surface. Our final objective will be to implement the full workflow detailed in the paper, for which we shall be modifying our HelloWorld.sol and app.py as per the directions in the diagram below.

We shall be using the https://remix.ethereum.org/ IDE for faster development and a cool UI library, https://streamlit.io/, for building some basic UI.

More on that when I find time over the weekends.

Stay Tuned !

Until then Feed on IoT, Breathe in AI, Drink to Blockchain and keep Living Digital !!


Serverless Sonarqube using AWS ECS Fargate

SonarQube is a popular tool used to derive code quality metrics like code coverage, code duplication, cyclomatic complexity and method cohesion.

The most typical way of using SonarQube is to install it on an on-premise server or an EC2 instance in the cloud and keep it running. The alternative to installing SonarQube on a server is to use the hosted SonarQube service, which has its own cost implications. Through this blog I would like to present a cheaper, serverless way of running SonarQube for large development projects.

We shall be running SonarQube as a Docker container on AWS ECS (Elastic Container Service), connecting to an RDS database instance to store its metrics.

Step 1 : Setting up the database

Navigate to the aws RDS service on aws console :

https://xx-xxxx-x.console.aws.amazon.com/rds/home?region=us-west-2#launch-dbinstance:gdb=false;s3-import=false

  • Click on Create database
  • Click on MySQL
  • Under the engine options, select MySQL
  • Under the settings section, provide the DB instance identifier name as sonar
  • Type the master password as yourpassword123 and confirm the same password in the next field
  • For DB instance class, select Burstable classes
  • For storage, select an allocated storage of 100
  • Set the maximum storage threshold to 500
  • Click on Create database
  • While entering the password, don't use special characters (!@#$%)
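If you prefer scripting this over the console, the same database can be provisioned with boto3; a hedged sketch using the settings above (the instance class and master username are assumptions that the console would otherwise choose for you):

import boto3

rds = boto3.client('rds', region_name='us-west-2')

# Mirrors the console settings: identifier 'sonar', MySQL engine,
# 100 GB allocated storage with autoscaling up to 500 GB.
rds.create_db_instance(
    DBInstanceIdentifier='sonar',
    Engine='mysql',
    DBInstanceClass='db.t3.medium',   # assumed burstable class
    MasterUsername='admin',           # assumed master username
    MasterUserPassword='yourpassword123',
    AllocatedStorage=100,
    MaxAllocatedStorage=500,
)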

Step 2 : Creating the docker container from Sonarqube

We shall be using an EC2 instance just for the purpose of creating and packaging the Docker image. It can be recycled after our serverless SonarQube is up and running.

  • Login to the AWS dashboard
  • Click on Services
  • Select EC2
  • Click on the Launch instance button
  • Select the Ubuntu Server 18.04 LTS (64-bit) AMI
  • Select the instance type as t2.medium
  • Configure Instance: no changes, click Next
  • Add storage: choose a 30 GB storage size
  • Add a tag with any relevant name of your choice
  • Configure Security Group: select all traffic from anywhere and enable port 9000
  • Click on Review and Launch. The EC2 instance will be created.
  • Install the MySQL CLI to interact with the database:
$ sudo apt-get install mysql-client-core-5.7
$ mysql -h <DB endpoint URL> -u admin -p
  Enter password: [mysql password]
    CREATE DATABASE sonar CHARACTER SET utf8 COLLATE utf8_general_ci;
    CREATE USER 'sonar' IDENTIFIED BY 'sonar';
    GRANT ALL ON sonar.* TO 'sonar'@'%' IDENTIFIED BY 'sonar';
    GRANT ALL ON sonar.* TO 'sonar'@'localhost' IDENTIFIED BY 'sonar';
    FLUSH PRIVILEGES;
  • Install docker on the EC2 instance.
$ curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -

Add the Docker repository to APT sources:

$ sudo add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"

update the package database with the Docker packages from the newly added repo:

$ sudo apt-get update
$ apt-cache policy docker-ce

 Install docker:

$ sudo apt-get install -y docker-ce

Docker is installed, the daemon started, and the process enabled to start on boot

Ensure that it’s running:

$ sudo systemctl status docker
  • Create the Dockerfile for the SonarQube installation and configure SonarQube with the RDS MySQL database. Build the image from the Dockerfile using the command below.
$ docker build -t sonarqube .

Dockerfile

FROM openjdk:11-jre-slim

RUN apt-get update \
    && apt-get install -y curl gnupg2 unzip \
    && rm -rf /var/lib/apt/lists/*

ENV SONAR_VERSION=7.5 \
    SONARQUBE_HOME=/opt/sonarqube \
    SONARQUBE_JDBC_USERNAME=admin \
    SONARQUBE_JDBC_PASSWORD=MyPassword123 \
    SONARQUBE_JDBC_URL="jdbc:mysql://sonar.cxxxxxxxxxx.us-west-2.rds.amazonaws.com:3306/sonar?characterEncoding=utf8&useUnicode=true&rewriteBatchedStatements=true&useSSL=false"

# Http port
EXPOSE 9000

RUN groupadd -r sonarqube && useradd -r -g sonarqube sonarqube

# pub   2048R/D26468DE 2015-05-25
#       Key fingerprint = F118 2E81 C792 9289 21DB  CAB4 CFCA 4A29 D264 68DE
# uid                  sonarsource_deployer (Sonarsource Deployer) <infra@sonarsource.com>
# sub   2048R/06855C1D 2015-05-25
RUN for server in $(shuf -e ha.pool.sks-keyservers.net \
                            hkp://p80.pool.sks-keyservers.net:80 \
                            keyserver.ubuntu.com \
                            hkp://keyserver.ubuntu.com:80 \
                            pgp.mit.edu) ; do \
        gpg --batch --keyserver "$server" --recv-keys F1182E81C792928921DBCAB4CFCA4A29D26468DE && break || : ; \
    done

RUN set -x \
    && cd /opt \
    && curl -o sonarqube.zip -fSL https://binaries.sonarsource.com/Distribution/sonarqube/sonarqube-$SONAR_VERSION.zip \
    && curl -o sonarqube.zip.asc -fSL https://binaries.sonarsource.com/Distribution/sonarqube/sonarqube-$SONAR_VERSION.zip.asc \
    && gpg --batch --verify sonarqube.zip.asc sonarqube.zip \
    && unzip -q sonarqube.zip \
    && mv sonarqube-$SONAR_VERSION sonarqube \
    && chown -R sonarqube:sonarqube sonarqube \
    && rm sonarqube.zip* \
# create copies or delete directories allowed to be mounted as volumes, original directories will be recreated below as symlinks
    && rm --recursive --force "$SONARQUBE_HOME/bin"/* \
    && mv "$SONARQUBE_HOME/conf" "$SONARQUBE_HOME/conf_save" \
    && mv "$SONARQUBE_HOME/extensions" "$SONARQUBE_HOME/extensions_save" \
    && rm --recursive --force "$SONARQUBE_HOME/logs" \
    && rm --recursive --force "$SONARQUBE_HOME/data" \

# create directories to be declared as volumes
# copy into them to ensure they are initialized by 'docker run' when new volume is created
# 'docker run' initialization will not work if volume is bound to the host's filesystem or when volume already exists
# initialization is implemented in 'run.sh' for these cases
    && mkdir --parents "$SONARQUBE_HOME/conf" \
    && mkdir --parents "$SONARQUBE_HOME/extensions" \
    && mkdir --parents "$SONARQUBE_HOME/logs" \
    && mkdir --parents "$SONARQUBE_HOME/data" \
    && cp --recursive "$SONARQUBE_HOME/conf_save"/* "$SONARQUBE_HOME/conf/" \
    && cp --recursive "$SONARQUBE_HOME/extensions_save"/* "$SONARQUBE_HOME/extensions/" \
# create symlinks to volume directories
    && ln -s "$SONARQUBE_HOME/conf" "$SONARQUBE_HOME/conf" \
    && ln -s "$SONARQUBE_HOME/extensions" "$SONARQUBE_HOME/extensions" \
    && ln -s "$SONARQUBE_HOME/logs" "$SONARQUBE_HOME/logs" \
    && ln -s "$SONARQUBE_HOME/data" "$SONARQUBE_HOME/data" \
    && chown --recursive sonarqube:sonarqube "$SONARQUBE_HOME"

VOLUME "$SONARQUBE_HOME/data"
VOLUME "$SONARQUBE_HOME/conf"
VOLUME "$SONARQUBE_HOME/logs"
VOLUME "$SONARQUBE_HOME/extensions"

COPY --chown=sonarqube:sonarqube run.sh "$SONARQUBE_HOME/bin/"

USER sonarqube
WORKDIR $SONARQUBE_HOME
RUN ["chmod", "+x", "./bin/run.sh"]
ENTRYPOINT ["./bin/run.sh"]
$ docker run -d --name sonarqube[container name] \
  -p 9000:9000 \
  -e sonar.jdbc.username=sonar \
  -e sonar.jdbc.password=sonar \
  -v sonarqube_conf:/opt/sonarqube/conf \
  -v sonarqube_extensions:/opt/sonarqube/extensions \
  -v sonarqube_logs:/opt/sonarqube/logs \
  -v sonarqube_data:/opt/sonarqube/data \
  sonarqube
  • The docker container will now be created and can be verified as below
$ docker ps

Open a browser pointing to http://y.x.w.z:9000, where y.x.w.z is the public IP address of the EC2 instance on which the Docker container is running.

The SonarQube dashboard will launch.
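As a scripted sanity check, SonarQube's system status endpoint can be polled until it reports UP; a small sketch (replace y.x.w.z with the instance's public IP):

import requests

# /api/system/status returns JSON such as {"status": "UP", ...} once SonarQube is ready.
resp = requests.get('http://y.x.w.z:9000/api/system/status')
print(resp.status_code, resp.json())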

Step 3 : Creating Repository in AWS ECR to push the docker image created above

On the same EC2 instance referred to above, install aws cli

$ sudo apt-get install awscli
$ aws --version

Authenticate into aws using

aws configure
aws access key: **********
secret key: *********

Login to the AWS elastic container registry through aws cli:

$ aws ecr get-login --region [your-region] > text.txt

Authenticate docker to AWS Elastic container registry.

$ docker login -u AWS https://aws_account_id.dkr.ecr.eu-west-3.amazonaws.com
Password: *****[AWS account password]

Create a repository in AWS Elastic container registry (ECR)

$ aws ecr create-repository --repository-name sonarqube

List the images stored into docker and tag them:

$ docker images
$ docker tag {base imagename} aws_account_id.dkr.ecr.eu-west-3.amazonaws.com/imagename[sonarqube]:tag[latest]

Push the image into ECR

$ docker push aws_account_id.dkr.ecr.us-east-1.amazonaws.com/imagename[sonarqube]:tag name[latest]

Step 4 : Creating Cluster in ECS

  • Open the Amazon ECS console at https://console.aws.amazon.com/ecs/
  • From the navigation bar, select the Region to use.
  • In the navigation pane, choose Clusters.
  • On the Clusters page, choose Create Cluster with the following configuration.
  • For Select cluster compatibility, choose Networking only, then choose Next step.
  • On the Configure cluster page, enter a Cluster name.
  • In the Networking section, configure the VPC for your cluster. You can retain the default settings, or you can modify these settings with the following steps.

a). If you choose to create a new VPC, for CIDR Block, select a CIDR block for your VPC.

b). For Subnets, select the subnets to use for your VPC.

  • In the CloudWatch Container Insights section, choose whether to enable Container Insights for the cluster
  • Choose Create.

Step 5 : Creating Task Definition

  • Open the Amazon ECS console at https://console.aws.amazon.com/ecs/ 
  • In the navigation pane, choose Task Definitions, then Create new Task Definition
  • On the Select compatibilities page, select the launch type that your task should use and choose Next step.
  • Choose the Fargate launch type and configure it.
  • For Task Definition Name, type a name for your task definition.
  • For Task Role, choose an IAM role that provides permissions for containers in your task to make calls to AWS API operations 
  • For Task execution IAM role, either select your task execution role or choose Create new role so that the console can create one for you.
  • For Task size, choose a value for Task memory (GB) and Task CPU (vCPU); only certain memory/CPU combinations are valid.
  • Add container details.
  • Choose Create.

Step 6 : Creating Service

A service runs and maintains the desired number of copies of the task definition in the cluster. Create a service that runs the task definition from Step 5 (a scripted sketch is shown below).
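The cluster name, task definition, subnets and security groups in this boto3 sketch are placeholders that must match what was created in Steps 4 and 5; it is one possible scripted alternative to the console flow, not the post's original method.

import boto3

ecs = boto3.client('ecs', region_name='us-west-2')

ecs.create_service(
    cluster='sonarqube-cluster',          # placeholder cluster name
    serviceName='sonarqube-service',
    taskDefinition='sonarqube-task',      # placeholder task definition name
    desiredCount=1,
    launchType='FARGATE',
    networkConfiguration={
        'awsvpcConfiguration': {
            'subnets': ['subnet-xxxxxxxx'],
            'securityGroups': ['sg-xxxxxxxx'],
            'assignPublicIp': 'ENABLED',
        }
    },
)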

Step 7 : Accessing the sonarqube dashboard from the ECS cluster

  • Navigate to the cluster
  • Click on Tasks
  • Click on the container
  • On the next page, under the Network section, you will find the public IP address
  • Open any browser and go to <public IP address>:9000; the SonarQube dashboard will launch

Destination reached

Sonarqube Dashboard

Reference :

https://www.systems-plus.com/deploying-docker-containers-in-amazon-ecs-using-fargate/

End-to-end DevOps pipeline using AWS services

The objective of this blog post is to

  1. Demonstrate the construction of an end-to-end DevOps pipeline using the AWS CodePipeline service
  2. Demo the triggering of a build and deployment without human intervention

Why AWS CodePipeline?

  1. It helps you leverage your existing investment in AWS services.
  2. It makes the DevOps pipeline highly scalable, as opposed to setting up Jenkins on EC2 instances and scaling them manually or with autoscaling as the number of build jobs grows.
  3. Security comes out of the box versus using an open-source tool like Jenkins.
  4. It comes with the inherent advantages of a serverless environment, compared to managing a Jenkins server and its underlying infrastructure.

We shall be using Atlassian Bitbucket as the source code control system for this example.

Java is assumed to be the language in which all code is written, although the steps are almost identical for Python or any other language.

Prerequisites for creating the pipeline

  1. A Bitbucket repository with the artifacts described below.
  2. A default S3 bucket for CodePipeline (one bucket for all pipelines in the region).
  3. An S3 bucket for artifacts.
  4. IAM roles with policies for stack creation.

The bitbucket repo should have the following directory structure:

Projectfolder (folder)

            .git (folder)

            Module (folder)

                        pom.xml (file)

                        env-buildspec.yml (where env like dev,qa,prod) (file)

                        Submodule (folder)

                                    pom.xml (file)

                                    env-submodule-cf.template (cloudformation template) (file)

Sample env-buildspec.yml and env-submodule-serverless.template files have been provided below. Let's call them dev-buildspec.yml and dev-submodule-cf.template.

version: 0.2
phases:
  install:
    runtime-versions:
       java: openjdk8
  build:
    commands:
      - CurrEPOCHTime=$(date +%F_%T) 
      - cd $CODEBUILD_SRC_DIR
      - ls
      - cd $CODEBUILD_SRC_DIR/module/
      - ls
      - echo Build started on `date`
      - mvn clean install
      #- echo override parameter for submodule
      - cd $CODEBUILD_SRC_DIR/module/submodule/target/
      - mv submodule-1.0.0-SNAPSHOT.jar submodule-1.0.0-SNAPSHOT-$CurrEPOCHTime.jar
      - configData='{"Parameters":{"CodeBucketName":"module-mvp-artifacts","CodeObjectName":"submodule-1.0.0-SNAPSHOT-'$CurrEPOCHTime'.jar"}}'
      - echo $configData
      - echo $configData > device-parameter_config.json
      

 
reports:
  SurefireReports: # CodeBuild will create a report group called "SurefireReports".
     files: # Store all of the files
      - submodule/target/surefire-reports/*
     base-directory: '$CODEBUILD_SRC_DIR/module/'
artifacts:
  files:
      - 'submodule/target/submodule*'
      - 'submodule/env-submodule-serverless*'
  base-directory : '$CODEBUILD_SRC_DIR/module/'
  discard-paths: yes
  name: $(date +%Y-%m-%d)

CloudFormation template for the submodule. (This is just a sample and yours could be totally different; it is meant to showcase the use of variables and other configuration parameters referenced in buildspec.yml.)

{
	"AWSTemplateFormatVersion": "2010-09-09",
	"Transform": [
		"AWS::Serverless-2016-10-31"
	],
	"Description": "Submodule Template. Stack-Name : Env-Module-Submodule",
	"Globals": {
		"Function": {
			"Runtime": "java8",
			"CodeUri": { "Bucket": {"Ref":"CodeBucketName"}, "Key": {"Ref":"CodeObjectName"} },
            "Timeout": 300,
			"Environment": {
				"Variables": {
					"REGION": {
						"Ref": "AWS::Region"
					},
					"ACCESS_KEY_ID": "XXXXXXXXXXXXXXXXXXXX",
					"SECRET_ACCESS_KEY": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
					"ACCOUNT_ID": {
						"Ref": "AWS::AccountId"
					},
					"CLIENT_END_POINT": "xxxxxxxxxxxxx.amazonaws.com",
					"CORS_URL" : "*",
                    "STCT_TR_SE_VAL" : "max-age=31536000,includeSubdomains,preload"
				}
			},
			"Tags": {
				"Application": {
					"Ref": "Application"
				},
				"Environment": {
					"Ref": "Environment"
				},
				"Owner": {
					"Ref": "Owner"
				},
				"SubDivision": {
					"Ref": "SubDivision"
				}
			}
		},
		"Api": {
			"Auth": {
				"ResourcePolicy":{
                    "CustomStatements": [{
                      	"Effect": "Allow",
                      	"Principal": "*",
                      	"Action": "execute-api:Invoke",
                      	"Resource": "execute-api:/*"
                    }]
                },
				"DefaultAuthorizer": "LambdaCustomAuthorizer",
				"Authorizers": {
					"LambdaCustomAuthorizer": {
						"FunctionArn": {
							"Fn::Sub": "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:sciex_customized_auth_lambda"
						},
						"Identity": {
							"Header": "AccessToken",
							"ReauthorizeEvery": 0
						}
					}
				},
				"AddDefaultAuthorizerToCorsPreflight": false
			}
		}
	},
	"Parameters": {
		"CodeBucketName" : {
			"Description": "Code Bucket Name",
			"Type": "String",
			"Default": "NA"
		},
		"CodeObjectName" : {
			"Description": "Code Object Name",
			"Type": "String",
			"Default": "NA"
		},
		"Application": {
			"Description": "Application Name",
			"Type": "String",
			"Default": "StatusScope",
			"AllowedPattern": "^[a-zA-Z0-9 ]*$",
			"ConstraintDescription": "Malformed input-Parameter MyParameter must only contain uppercase and lowercase letters and numbers"
		},
		"Environment": {
			"Description": "Environment information",
			"Type": "String",
			"Default": "Dev",
			"AllowedValues": [
				"Dev",
				"Prod",
				"Stag",
				"Test"
			]
		},
		"Owner": {
			"Description": "Owner Name",
			"Type": "String",
			"Default": "Raghu",
			"AllowedPattern": "^[a-zA-Z0-9 ]*$",
			"ConstraintDescription": "Malformed input-Parameter MyParameter must only contain uppercase and lowercase letters and numbers"
		},
		"SubDivision": {
			"Description": "SubDivision Name",
			"Type": "String",
			"AllowedPattern": "^[a-zA-Z0-9 ]*$",
			"Default": "Device Provisioning Module",
			"ConstraintDescription": "Malformed input-Parameter MyParameter must only contain uppercase and lowercase letters and numbers"
		},
		"AWSCertificates" : {
            "Type" : "String",
            "Default" : "arn:aws:acm:eu-east-1:9999999999999999:certificate/xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxx"
		},
		"ModuleURL" : {
            "Type" : "String",
            "Default" : "env.module.submodule.sitedomain.com"
		},
		"Route53HostedZoneId" : {
            "Type" : "String",
            "Default" : "XXXXXXXXXXXXXXXXXXXXXX"
        }
	},
	"Resources": {
		"GetFeatureDetails": {
			"Type": "AWS::Serverless::Function",
			"Properties": {
				"Role": null,
				"MemorySize": 512,
				"Description": null,
				"Policies": [
					"AmazonDynamoDBFullAccess",
					"AmazonS3FullAccess"
				],
				"Events": {
					"PostResource": {
						"Type": "Api",
						"Properties": {
							"Path": "/v2/submodule/feature/{paramNo}",
							"Method": "get",
							"RestApiId": {
								"Ref": "SubmoduleAPIGateway"
							},
							"Auth": {
								"Authorizer": "SubmoduleAuthorizer"
							}
						}
					}
				},
				"Timeout": 300,
				"Handler": "com.domain.submodule.function.GetFeatureDetails",
				"Environment": {
					"Variables": {
						"FEATURE_TABLE_NAME": "feature_information"
					}
				}
			}
		},
		"SubmoduleTopicRulePermission": {
			"Type": "AWS::Lambda::Permission",
			"Properties": {
				"Action": "lambda:InvokeFunction",
				"FunctionName": {
					"Fn::Sub": "${FeatureStatusUpdateAction.Arn}"
				},
				"Principal": "iot.amazonaws.com",
				"SourceAccount": {
					"Ref": "AWS::AccountId"
				},
				"SourceArn": {
					"Fn::Sub": "${FeatureStatusUpdateTopicRule.Arn}"
				}
			}
		},
		"SubmoduleAPIGateway": {
			"Type": "AWS::Serverless::Api",
			"Properties": {
				"StageName": {
					"Ref": "Environment"
				},
				"Cors": {
					"AllowMethods": "'POST,GET,DELETE,PUT,OPTIONS'",
					"AllowHeaders": "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,x-requested-with,AccessToken'",
					"AllowOrigin": "'*'"
				},
				"GatewayResponses": {
					"BAD_REQUEST_BODY": {
						"StatusCode": "400",
						"ResponseParameters": {
							"Headers": {
								"Access-Control-Allow-Methods": "'POST,GET,DELETE,PUT,OPTIONS'",
								"X-Requested-With": "'*'",
								"Access-Control-Allow-Headers": "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,x-requested-with,AccessToken'",
								"Access-Control-Allow-Origin": "'*'"
							}
						}
					},
					"WAF_FILTERED": {
						"StatusCode": "403",
						"ResponseParameters": {
							"Headers": {
								"Access-Control-Allow-Methods": "'POST,GET,DELETE,PUT,OPTIONS'",
								"X-Requested-With": "'*'",
								"Access-Control-Allow-Headers": "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,x-requested-with,AccessToken'",
								"Access-Control-Allow-Origin": "'*'"
							}
						}
					},
					"EXPIRED_TOKEN": {
						"StatusCode": "503",
						"ResponseParameters": {
							"Headers": {
								"Access-Control-Allow-Methods": "'POST,GET,DELETE,PUT,OPTIONS'",
								"X-Requested-With": "'*'",
								"Access-Control-Allow-Headers": "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,x-requested-with,AccessToken'",
								"Access-Control-Allow-Origin": "'*'"
							}
						}
					},
					"AUTHORIZER_CONFIGURATION_ERROR": {
						"StatusCode": "500",
						"ResponseParameters": {
							"Headers": {
								"Access-Control-Allow-Methods": "'POST,GET,DELETE,PUT,OPTIONS'",
								"X-Requested-With": "'*'",
								"Access-Control-Allow-Headers": "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,x-requested-with,AccessToken'",
								"Access-Control-Allow-Origin": "'*'"
							}
						}
					},
					"UNAUTHORIZED": {
						"StatusCode": "401",
						"ResponseParameters": {
							"Headers": {
								"Access-Control-Allow-Methods": "'POST,GET,DELETE,PUT,OPTIONS'",
								"X-Requested-With": "'*'",
								"Access-Control-Allow-Headers": "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,x-requested-with,AccessToken'",
								"Access-Control-Allow-Origin": "'*'"
							}
						}
					},
					"DEFAULT_5XX": {
						"ResponseParameters": {
							"Headers": {
								"Access-Control-Allow-Methods": "'POST,GET,DELETE,PUT,OPTIONS'",
								"X-Requested-With": "'*'",
								"Access-Control-Allow-Headers": "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,x-requested-with,AccessToken'",
								"Access-Control-Allow-Origin": "'*'"
							}
						}
					},
					"DEFAULT_4XX": {
						"ResponseParameters": {
							"Headers": {
								"Access-Control-Allow-Methods": "'POST,GET,DELETE,PUT,OPTIONS'",
								"X-Requested-With": "'*'",
								"Access-Control-Allow-Headers": "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,x-requested-with,AccessToken'",
								"Access-Control-Allow-Origin": "'*'"
							}
						}
					}
				},
				"Tags": {
					"Application": {
						"Ref": "Application"
					},
					"Environment": {
						"Ref": "Environment"
					},
					"Owner": {
						"Ref": "Owner"
					},
					"SubDivision": {
						"Ref": "SubDivision"
					}
				},
				"Domain" : {
					"DomainName": { "Ref" : "ModuleURL" },
					"CertificateArn": { "Ref" : "AWSCertificates" },
					"EndpointConfiguration": "EDGE",
					"Route53": {
						"HostedZoneId": { "Ref" : "Route53HostedZoneId" }
					},
					"BasePath": ["/"]
				}
			}
		}
	}
}

Step 1 : Login to AWS Console

Select CodePipeline service from Developer Tools

Click on Create pipeline.

Add a pipeline name and click Next.

Step 2 : Bitbucket Connectivity

Select the source code provider (Bitbucket).

Click on Connect to Bitbucket Cloud

  • A popup window appears to connect to Bitbucket; provide any connection name, say ModuleAlphaConnection.
  • For a first-time connection, click on Install a new App; this will take you to the Bitbucket login page.
  • Otherwise, click on the search bar to view previous connections.

  • On the app installation page, a message shows that the AWS CodeStar app is trying to connect to your Bitbucket account. Choose Grant access.
  • The connection ID for your new installation is displayed. Choose Complete connection.
  • In Repository name, choose the name of your third-party repository. In Branch name, choose the branch where you want your pipeline to detect source changes.
  • In Output artifact format, you must choose the format for your artifacts.
  • To store output artifacts from the Bitbucket action using the default method, choose CodePipeline default. The action accesses the files from the Bitbucket repository and stores the artifacts in a ZIP file in the pipeline artifact store.
  • Once logged in, select the repository and branch whose source code should be built, and click Next.

Step 3 : Add Build Stage

The next step is to add the Build stage: select CodeBuild and click Next.

If a CodeBuild project is already present, click on the search bar to select it; otherwise click on Create project.

A popup will appear; enter a project name to create the build project.

Step 4 : Build Stage Configuration

Select the operating system Ubuntu.

Once you select Ubuntu, follow the screenshot below to select the Image and Image version.

If a buildspec.yml is present in the repository, select Use a buildspec file and optionally give the path to the buildspec file; otherwise select Insert build commands.

Next, click on Continue to CodePipeline.

Once the CodeBuild project is created, a success message is shown; then click Next.

Click on Skip deploy stage.

Step 5 : Configuration Overview

Review all the configuration, then click on Create pipeline.

Once created, the pipeline will trigger automatically and should look like the screenshot below.

Add Artifacts

  • Before adding the S3 bucket as an artifact store, create a bucket in S3
  • In the pipeline, click on Add stage after the Build stage

Enter the stage name as Artifacts and click on Add stage.

Click on Add action group to add the S3 bucket for artifacts.

Select Input artifacts as BuildArtifacts.

Create IAM role for Deployment

AlphaSQSandIot
{
    "Version": "2012-10-17",
    "Statement": [ {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "iot:ReplaceTopicRule",
                "iot:DeleteTopicRule",
                "iot:DisableTopicRule",
                "iot:GetTopicRule",
                "iot:EnableTopicRule",
                "sqs:*",
                "iot:CreateTopicRule"
            ],
            "Resource": "*"
        } ]
}
AlphaApiGatewayAndOthersAccess
{
    "Version": "2012-10-17",
    "Statement": [ {
            "Sid": "AllowPublicHostedZonePermissions",
            "Effect": "Allow",
            "Action": [
                "route53:GetHostedZone",
                "route53:ListHostedZones",
                "route53:ChangeResourceRecordSets",
                "route53:ListResourceRecordSets",
                "route53:GetChange"
            ],
            "Resource": "*"
        }  ]
}
AlphaRoute53Access
 {
    "Version": "2012-10-17",
    "Statement": [ {
            "Effect": "Allow",
            "Action": [
                "acm:ListCertificates",
                "cloudfront:*"
            ],
            "Resource": "*"
        }  ]
}
Lambda@EdgePolicy
{
    "Version": "2012-10-17",
    "Statement": [ {
            "Effect": "Allow",
            "Action": "iam:CreateServiceLinkedRole",
            "Resource": "arn:aws:iam::*:role/aws-service-role/*"
        } ]
}

Step 6 : Add a section for each submodule

Deploy Stage  

Click on Action

  • Save the Pipeline & Click on the Edit Pipeline and Add the Deployment Stage
  • Click on Add Action Group

Note: Repeat the above deploy configuration for all submodules.

Stacks Created

Step 7 : Pipeline in action

Now go ahead and make source code changes in Bitbucket and check them into the branch that has been configured in the pipeline:

git add mysource.java
git commit -m "file modified to trigger pipeline"
git push origin branch_name

As soon as the code is checked in, the pipeline will be triggered and its progress can be monitored through the AWS console.

The pipeline can also be manually triggered by clicking on the release change button.

You are now all set to use Postman or an appropriate REST client to test the REST API for which you made the above code change.
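For example, a quick scripted check with the Python requests library; the URL is built from the sample domain and path in the CloudFormation template above, and the AccessToken header matches the custom authorizer, so substitute your own values:

import requests

# Placeholder endpoint: custom domain from the template plus the GetFeatureDetails path.
url = 'https://env.module.submodule.sitedomain.com/v2/submodule/feature/123'
headers = {'AccessToken': '<token accepted by your Lambda authorizer>'}

resp = requests.get(url, headers=headers)
print(resp.status_code)
print(resp.text)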


Load testing a Python Flask based REST API server from Python REST clients – AWS Auto Scaling use case

Objective :

  1. Create GET, PUT and POST APIs using the Python based Flask framework
  2. Invoke the API from a Python based REST client
  3. Set up an AWS load balancer in front of the server
  4. Launch a swarm of 1000+ clients that invoke the REST API from a Python client, using the AWS EC2 Auto Scaling feature
  5. Both the server and client code will be uploaded to an S3 bucket, from where they will be downloaded at run time whenever new clients or servers are launched. This saves us the arduous task of updating all 1000+ clients/servers whenever the codebase changes or we scale from 1000 to 5000 clients.

High level steps on the server side

  1. Set up an AWS Network Load Balancer
  2. Create a Target group attached to the load balancer
  3. Create a new public certificate using AWS Certificate Manager
  4. Create an EC2 instance
  5. Install all required software:
sudo apt install python3-pip

pip3 install flask-restful
  6. Generate a certificate and private key:
openssl req -x509 -newkey rsa:4096 -nodes -out cert.pem -keyout key.pem -days 365

Make sure you specify the common name as the load balancer DNS name or the Route 53 record corresponding to that name.

  7. Launch the server application (code for the server app.py is provided at the end of this section):
sudo python3 app.py

and test it from another standalone client to verify that it works. Two ways of testing from a client are listed below:

  • the curl based method, which can be used for quick checks, and
  • the Python client, which will later be used when we autoscale the AWS EC2 instances (code for the client appclient.py is provided at the end of this section).
echo quit | openssl s_client -showcerts -servername ALB-xxxxxxxxxxx.elb.eu-east-x.amazonaws.com -connect ALB-xxxxxxxxxxx.elb.eu-east-x.amazonaws.com:443 > cacert.pem

curl --cacert cacert.pem https://ALB-xxxxxxxxxxx.elb.eu-east-x.amazonaws.com/device/FME2445
  8. Create a new S3 bucket, say s3://server-code-template, and upload an app.zip file containing app.py, cert.pem and key.pem into it.
  9. Create a new cronfile.txt with the following code:
@reboot . /home/ubuntu/startupscript.sh

startupscript.sh contains the following code

#!/bin/bash  -x
DATE=$(date +'%F %H:%M:%S')

DIR=/home/ubuntu
echo "Inside startupscript" > $DIR/scriptoutput.txt
sudo rm -rf artifactsfroms3 >> $DIR/scriptoutput.txt
mkdir artifactsfroms3 >> $DIR/scriptoutput.txt

/usr/local/bin/aws s3 cp --recursive s3://server-code-template/ /home/ubuntu/artifactsfroms3/ --profile default --debug  >> $DIR/scriptoutput.txt


cd /home/ubuntu/artifactsfroms3
pwd

unzip -o app.zip >> $DIR/scriptoutput.txt

chmod +x app.py >> $DIR/scriptoutput.txt

sudo python3 app.py >> $DIR/scriptoutput.txt

echo "file executed" >> $DIR/scriptoutput.txt
  10. Shutdown the instance.
  11. Create an AMI from the instance.
  12. Create a Launch template from the AMI.
  13. Create an Auto Scaling group from the Launch template with 0 instances.
  14. Scale up the Auto Scaling group to 2 instances.
  15. Add the 2 new instances to the target group and ensure that they pass health checks.
  16. Post data to the load balancer from a standalone client:
sudo python3 appclient.py
  17. PuTTY into the servers to view the server logs.

Code for app.py

from flask import Flask
from flask_restful import Api, Resource, reqparse

app = Flask(__name__)
api = Api(app)

devices = [
    {
        "deviceSerialNumber": "ELK9045",
        "type": 42,
        "location": "New York"
    },
    {
        "deviceSerialNumber": "FME2445",
        "type": 42,
        "location": "London"
    },
    {
        "deviceSerialNumber": "JCB2489",
        "type": 42,
        "location": "Tokyo"
    }
]

class Devices(Resource):
    def get(self, deviceSerialNumber):
        for device in devices:
            if(deviceSerialNumber == device["deviceSerialNumber"]):
                return device, 200
        return "Device not found", 404

    def post(self, deviceSerialNumber):
        parser = reqparse.RequestParser()
        parser.add_argument("type")
        parser.add_argument("location")
        args = parser.parse_args()

        for device in devices:
            if(deviceSerialNumber == device["deviceSerialNumber"]):
                return "Device with deviceSerialNumber {} already exists".format(deviceSerialNumber), 400

        device = {
            "deviceSerialNumber": deviceSerialNumber,
            "type": args["type"],
            "location": args["location"]
        }
        devices.append(device)
        return device, 201

    def put(self, deviceSerialNumber):
        parser = reqparse.RequestParser()
        parser.add_argument("type")
        parser.add_argument("location")
        args = parser.parse_args()

        for device in devices:
            if(deviceSerialNumber == device["deviceSerialNumber"]):
                device["type"] = args["type"]
                device["location"] = args["location"]
                return device, 200
        
        device = {
            "deviceSerialNumber": deviceSerialNumber,
            "type": args["type"],
            "location": args["location"]
        }
        devices.append(device)
        return device, 201

    def delete(self, deviceSerialNumber):
        global devices
        devices = [device for device in devices if device["deviceSerialNumber"] != deviceSerialNumber]
        return "{} is deleted.".format(deviceSerialNumber), 200
      
api.add_resource(Devices, "/device/<string:deviceSerialNumber>")

app.run(debug=True,host='0.0.0.0',port=443,ssl_context=('cert.pem', 'key.pem'))

Code for appclient.py

import json
import requests

def consumeGETRequestSync():
 clientCrt = "cert.pem"
 clientKey = "key.pem"
 url = "https://ALB-xxxxxxxxxxx.elb.eu-east-x.amazonaws.com/device/JCB2489"
 certServer = 'cacert.pem'
 headers = {'content-type': 'application/json'}
 r = requests.get(url,verify=certServer, headers=headers, cert=(clientCrt, clientKey))
 print(r.status_code)
 print(r.json())



def consumePOSTRequestSync():
 clientCrt = "cert.pem"
 clientKey = "key.pem"
 url = "https://ALB-xxxxxxxxxxx.elb.eu-east-x.amazonaws.com/device/ABJX9357"
 deviceParam = {"deviceSerialNumber": "ABJX9357","type": 20,"location": "Delhi"}
 certServer = 'cacert.pem'
 headers = {'content-type': 'application/json'}
 r = requests.post(url, data=json.dumps(deviceParam), verify=certServer, headers=headers, cert=(clientCrt, clientKey))
 print(r.status_code)
 print(r.json())

# call 
consumeGETRequestSync()
#consumePOSTRequestSync()
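To make each client instance generate sustained load rather than a single call, the invocation can be wrapped in a simple thread pool appended to appclient.py; a hedged sketch (the request count and concurrency are arbitrary):

from concurrent.futures import ThreadPoolExecutor

def worker(requests_per_worker):
    # Each worker repeatedly invokes the GET endpoint defined above.
    for _ in range(requests_per_worker):
        consumeGETRequestSync()

# 10 concurrent workers x 100 requests each = 1000 requests per client instance.
with ThreadPoolExecutor(max_workers=10) as pool:
    for _ in range(10):
        pool.submit(worker, 100)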

High level steps on the client side

  1. Create a new S3 bucket, say s3://client-code-template, and upload appclient.zip containing the appclient.py and cacert.pem files into it.
  2. Create an EC2 instance
  3. Create a new cronfile.txt with the following code
@reboot . /home/ubuntu/startupscript.sh

startupscript.sh contains the following code

#!/bin/bash  -x
DATE=$(date +'%F %H:%M:%S')

DIR=/home/ubuntu
echo "Inside startupscript" > $DIR/scriptoutput.txt
sudo rm -rf $DIR/artifactsfroms3 >> $DIR/scriptoutput.txt
mkdir $DIR/artifactsfroms3 >> $DIR/scriptoutput.txt

/usr/local/bin/aws s3 cp --recursive s3://client-code-template/ $DIR/artifactsfroms3/ --profile default --debug  >> $DIR/scriptoutput.txt


# aliases are not expanded in non-interactive scripts, so change directory directly
cd $DIR/artifactsfroms3
pwd

unzip -o appclient.zip >> $DIR/scriptoutput.txt

chmod +x appclient.py >> $DIR/scriptoutput.txt

sudo python appclient.py >> $DIR/scriptoutput.txt

echo "file executed" >> $DIR/rootscriptoutput.txt
  1. Fire sudo reboot at the command prompt to confirm that the files are downloaded from S3, the scripts execute to fetch and install the latest executable (appclient.py) on the client, and the client script invokes the POST method to post data to the server
  2. Shut down the instance.
  3. Create an ami from the instance
  4. Create a Launch template from the ami.
  5. Create an AutoScaling group from the Launch Template with 0 instances
  6. Scale up the Auto Scaling group to 1 instance (these launch-template and scaling steps can also be scripted; see the boto3 sketch after this list)
  7. Once the instance has finished initializing, PuTTY into it and view the client logs to confirm that data is being sent to the server.
  8. You are now ready to scale it up to 1k+ instances
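
For reference, steps 4 to 7 above can also be scripted. Below is a minimal boto3 sketch; the AMI id, resource names, instance type and availability zone are hypothetical placeholders rather than values from this setup.

# Sketch: create the launch template and Auto Scaling group for the client AMI with boto3.
# All ids and names below are placeholders.
import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')
autoscaling = boto3.client('autoscaling', region_name='us-east-1')

# Launch template built from the client AMI created earlier
ec2.create_launch_template(
    LaunchTemplateName='appclient-template',
    LaunchTemplateData={
        'ImageId': 'ami-0123456789abcdef0',   # AMI created from the client instance
        'InstanceType': 't3.micro',
    },
)

# Auto Scaling group starting with 0 instances, then scaled up to 1
autoscaling.create_auto_scaling_group(
    AutoScalingGroupName='appclient-asg',
    LaunchTemplate={'LaunchTemplateName': 'appclient-template', 'Version': '$Latest'},
    MinSize=0,
    MaxSize=1000,
    DesiredCapacity=0,
    AvailabilityZones=['us-east-1a'],
)
autoscaling.set_desired_capacity(AutoScalingGroupName='appclient-asg', DesiredCapacity=1)

Scaling out to 1k+ clients later is then just a matter of raising DesiredCapacity (and MaxSize) on the same group.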

Detailed steps

  • Creating a load balancer
  • Creating an ami from the server EC2 instance
  • Create a launch template
  • Create an autoscaling group
  • Scale up the clients

Next, navigate to the EC2 instances dashboard and you will find your new instance spinning up. Click refresh until the initializing indicator goes away. This might take some time since we have added the startup client script to the reboot cronjob of the instance AMI.

Featured

Autoscaling a Kubernetes cluster using Horizontal Pod AutoScaler (HPA)

In this blog I shall describe the step-by-step process to set up a Kubernetes cluster using minikube in the Azure cloud, run a Node.js based application on it, and autoscale the application using the Horizontal Pod Autoscaler.

  • Spin up a Standard D4s v3 (4 vcpus, 16 GiB memory) Virtual machine in Azure based on Linux (ubuntu 18.04)
  • Putty into the server and sudo su -. Execute all following commands as root.
sudo apt-get update
sudo apt-get install -y apt-transport-https
sudo apt-get install -y virtualbox virtualbox-ext-pack
  • Install kubectl
curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -

sudo touch /etc/apt/sources.list.d/kubernetes.list

echo "deb http://apt.kubernetes.io/ kubernetes-xenial main" | sudo tee -a /etc/apt/sources.list.d/kubernetes.list

sudo apt-get update

sudo apt-get install -y kubectl
  • Install minikube
curl -LO https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64


chmod +x minikube-linux-amd64 && sudo mv minikube-linux-amd64 /usr/local/bin/minikube

minikube start
  • Run the following commands to ensure proper installation.
kubectl api-versions
       
kubectl get services
  • Create a new node js application
apt-get install npm

npm init  # this creates a new package.json file in the directory
  • Create an index.js file with the following code.
const http = require('http');
const port = process.env.PORT || 3000;
const server = http.createServer((req, res) => {
         res.statusCode = 200;
         res.setHeader('Content-Type', 'text/plain');
         res.end('Hello World\n');
});
server.listen(port, () => {
  console.log(`Server running on port: ${port}`);
});
  • Create a Dockerfile in the same directory with the following content
FROM node:alpine

RUN mkdir -p /usr/src/app

WORKDIR /usr/src/app

ADD index.js ./

ADD package.json ./

RUN npm install

CMD ["npm", "start"]

sudo apt-get install docker.io
docker login

docker build -t rtdhaccount/node-application .

docker push rtdhaccount/node-application
  • Deploy to kubernetes
    • Create a folder called k8s
    • Create deployment.yml inside k8s with the following content
apiVersion: apps/v1
kind: Deployment
metadata:
  name: node-application-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: node-application
  template:
    metadata:
      labels:
        app: node-application
    spec:
      containers:
      - name: node-application
        image: rtdhaccount/node-application
        imagePullPolicy: Always
        ports:
        - containerPort: 3000
  • Create service.yml inside k8s with the following content
apiVersion: v1
kind: Service
metadata:
  name: node-application
  labels:
    app: node-application
spec:
  selector:
    app: node-application
  ports:
  - port: 3000
    protocol: TCP
    nodePort: 30001
  type: LoadBalancer
  • Verify the service using the following commands
kubectl apply -f k8s
kubectl get services

kubectl get deployments
kubectl get pods
  • Access the minikube dashboard. Open a new PuTTY window and run
sudo kubectl proxy --address='0.0.0.0' --disable-filter=true

Open a new browser window and enter the URL below

http://xx.yy.zz.ww:8001/api/v1/namespaces/kube-system/services/kubernetes-dashboard/proxy/#!/deployment?namespace=default

where xx.yy.zz.ww is the public IP address of your VM.

Ensure that port 8001 is open to inbound traffic in the VM

In case you get the error:

Unable to start VM: Error getting state for host: machine does not exist

minikube delete
minikube start
  • Accessing the node application. Open a new PuTTY window and set up port forwarding as below
sudo ssh -i ~/.minikube/machines/minikube/id_rsa docker@$(minikube ip) -L \*:30001:0.0.0.0:30001

Then access your application from the browser window using the URL

http://xx.yy.zz.ww:30001/

Ensure that port 30001 is open to inbound traffic in the VM

In case of any errors in port forwarding, remove the stale host key:

ssh-keygen -f "/root/.ssh/known_hosts" -R "192.168.99.100"
  • Scaling the cluster using Horizontal pod autoscaler. Create a new file hpa.yml under the k8s folder with the following content
apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
 name: node-application
 namespace: default
spec:
 maxReplicas: 5
 minReplicas: 1
 scaleTargetRef:
  apiVersion: apps/v1
  kind: Deployment
  name: node-application-deployment
 targetCPUUtilizationPercentage: 1

With the targetCPUUtilizationPercentage option we are saying that once the observed CPU utilization of the pods exceeds 1%, the deployment should be scaled out. Note that the HPA can only compute CPU utilization if the deployment's container specifies a CPU resource request.

kubectl apply -f k8s/hpa.yml
  • Deploy the metrics-server
kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/download/v0.3.6/components.yaml

kubectl get deployment metrics-server -n kube-system

minikube addons list

minikube addons enable metrics-server

kubectl get hpa
  • Testing using apache benchmark : Install apache benchmark using the below command
apt-get install apache2-utils

ab -c 500 -n 100000 -t 100000 http://xx.yy.zz.ww:30001/
  • Observe the pods scaling up using the commands
kubectl get pods

kubectl get hpa

References :

https://matthewpalmer.net/kubernetes-app-developer/articles/install-kubernetes-ubuntu-tutorial.html

https://blog.kloia.com/deploy-auto-scalable-node-js-application-on-kubernetes-cluster-part-1-f40e622f2337

Featured

Augmented Reality – Healthcare Use Case

This blog demonstrates creating an augmented reality application using the Unity game engine and Vuforia.

Augmented reality (AR) can be defined as the overlaying of digital content (images, video, text, sound, etc.) onto physical objects or locations, and it is typically experienced by looking through the camera lens of an electronic device such as a smartphone, tablet, or a specialized viewing device like Google Glass or Microsoft HoloLens.

Objective

The purpose of this blog is to help a patient taking prescription medications view the pharmaceutical details of the medication on a mobile phone by simply scanning the prescription label on the medication bottle.

Examples are shown below. They might not be medically accurate; they have been picked simply as sample images for this case study.

When the bottle with the label (We shall call this our Image Target) shown below is scanned using a laptop with a camera, the medication literature as shown below is displayed to the user for reading (We shall refer to this as our Image Overlay).

Fig 1 : Image Target
Fig 2 : Image Overlay

Download and Installation

Setting up Unity

  1. Navigate to the GameObject dropdown menu and select “Vuforia > AR Camera.” If a dialog box appears requesting that you import additional assets, select “Import.”
  2. Select “Vuforia > Image” in the GameObject dropdown menu to add an Image Target to your scene.
  3. Locate the directional widget in the top right corner of your scene panel. Click the green y-axis option of your directional widget. This will make the green arrow disappear and shift your perspective in the scene panel to a y-axis birds-eye view of your project.

Go to the Vuforia developer portal to create your free Vuforia account and generate a development license key for your application.

  1. Select “Register” in the upper right corner.
  2. Once your account is setup, select “Develop” in the menu panel and then “License Manager.”
  3. Select “Get Development Key” on the License Manager page.
  4. On the “Add a free Development License Key” page, give your application a name and agree to the terms.
  5. To access your license key, return to the “License Manager” page in the Vuforia developer portal and click the application name you just created. Copy the alphanumeric string on the right.
  6. Return to Unity, go to File > Build Settings and click the “Player Settings” button at the bottom of the pop-up window.
  7. Navigate back to the Inspector panel and click the “XR Settings” option located at the bottom of the accordion menu. Select “Vuforia Augmented Reality Support.” Accept the Vuforia license in Unity.
  8. Select ARCamera in the Hierarchy pane. In the inspector panel, click the “Open Vuforia Configuration” button in the “Vuforia Behaviour (Script)” component. Copy and paste your license key into the App License Key text field.
  9. Navigate to “File > Save Scene as…” and give your scene a name. By default, Unity will save your scenes to the “Assets” folder of your project.

Convert your Image Target to a Dataset

Upload the medication bottle image shown above to Vuforia in order to encode it with the tracking information.

  1. Go to the Vuforia developer portal and click “Develop” in the top menu and then select “Target Manager.”
  2. Click the “Add Database” button on the Target Manager page. In the dialog box that appears, give your database a name and select the device option. Click “Create.”
  3. Once your database has been created, return to the Target Manager page and click your database. Select “Add Target,” choose “Single Image” as the type, add the width and height of your image, and upload by clicking on the “Browse” button.

To determine the width of your image, right-click the image file and choose Properties. Select the Details tab in the dialog box that appears and scroll down to the “Image” section to find the width of the image.

Vuforia assigns target images a feature tracking rating on a scale of 1 to 5 stars. Navigate to the Image Targets database in the Vuforia Developer Portal and select the image you just uploaded. Click the Show Features link to reveal the image’s trackable features.

The yellow markers represent the areas in the image that your application will use to determine if it is looking at the proper image target. If you notice that Vuforia is tracking unstable elements in your image (e.g. shadow, person, etc.), then you will need to re-edit or choose a different image.

If your image is given a good augmentability rating (anything between 3-5 stars should work), access the Target Manager page, then select your image and click “Download Database.”

Clicking “Download Database” will create a package file like the one shown below. Double-click it to import it into Unity.

Import the Image Target

Select the ImageTarget GameObject in the project Hierarchy panel. Navigate to the inspector panel and select the drop-down menu of the Database parameter in the Image Target Behaviour. Select the name of the database you created earlier. The ImageTarget game object should now be associated with your book cover.

Add an Image Overlay

You will need to add an overlay to test if your AR camera is tracking the image. The overlay is the multimedia content that your application will superimpose onto your trigger image. In Unity, you can add images, videos, audio files, and animated 3D models as overlays. In our case it is the Rx detailed information Fig 2 above.

Drag the image into the “Assets” folder in the project panel at the bottom of the editor. Once the image has finished importing, select it and change its Texture Type to “Sprite (2D and UI)” in the inspector panel. Click “Apply.”

Next, drag the sprite you just imported onto your “Image Target” game object in the project Hierarchy.

The sprite is now a “child” of the Image Target game object and will only appear if the ARCamera recognizes the Image Target.

Create a User Interface

Now we shall create a simple user interface that guides the patient to scan their pharma Rx label.

Import a .jpg or .png file of your target image into your Assets folder.

You can do this by simply clicking and dragging the image file from its current file location into the Unity Assets folder. Once the image is imported, convert it to “Sprite (2D and UI)” in the inspector panel like you did for the author image overlay.

Next, navigate to the Hierarchy panel and select “Create > UI > Image.” Select the “2D” option in your scene view menu and center the image you just added by selecting it and clicking “F” on your keyboard. Click and drag the blue corners to center the image in your Canvas frame. Then, drag your Rx label into the “Source Image” parameter in the inspector panel.

Next, add some instructions to your UI by

  1. Selecting “Create > UI > Text.”
  2. In the inspector panel, write some instructions in the text field, such as “Scan this Prescription Label.”
  3. Position your text box directly below your book cover and modify the font and color accordingly.

Before you can add functionality to the UI, you will need to combine your Canvas elements into a single game object.

  1. Select “Create > Create Empty.” This will add an empty GameObject to your project Hierarchy.
  2. Rename this GameObject “Finder” and make it a child of the Canvas by dragging it into the Canvas game object.
  3. Drag the Image and Text game objects and make them children of your new “Finder” game object.

For this next step of the UI design, you will need to modify Vuforia’s “Default Trackable Event Handler” so that your “Finder” game object disappears from the user’s view once the Image Target has been found by the application.

Locate the script under the Packages folder and right-click “Open C# Project” as shown below.

Clicking “Open C# Project” will open the Visual Studio editor, as shown below.

For this simple case study, we can just make these changes through the Unity editor.

Select ImageTarget in the Hierarchy and find the DefaultTrackableEvent in the Inspector. Initially there will be no entries under the On Target Found () and On Target Lost () methods.

Click the “+” button.

Select values for the 3 fields under On Target Found and On Target Lost as below. Note the values for the check box under each.

These settings tell the application to disable the Finder game object if the target image is found and to enable the Finder game object if the target image is lost.

Test Your Scene

Save your scene and press the play icon in the menu above the scene view.

Hold your prescription label in front of your webcam. You should see your UI overlaid onto the camera view. Place your label into the camera view and the prescription information / medication literature details image should appear overlaid on top of it.

Detecting the onset of COVID-19 early using heart rate measurements collected from wearable devices

Through this blog post I shall be creating a solution architecture for the following

  1. Collecting wearable device data from multiple users, for example heart rate from Fitbit and Apple HealthKit devices.
  2. Ingesting this data into an InfluxDB database running inside a docker container
  3. Running a PySpark job on an AWS EMR cluster to calculate heart rate variability and monitor increases in resting heart rate and breathing rate
  4. Generating values that could possibly be used in a deep learning model to predict the onset of coronavirus in a patient at least a week before actual symptoms show up.

Medical studies indicate that breathing rate, resting heart rate and heart rate variability can be used to detect the onset of illnesses, especially when tracked while the user is asleep.

Researchers have found that in many cases heart rate variability decreases, while resting heart rate and breathing rate increase, in people who have Covid-19 symptoms. In some cases, those measurements could signal changes in a person’s health nearly a week before symptoms were reported.
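
For context, a commonly used heart rate variability metric is RMSSD, the root mean square of successive differences between beat-to-beat (RR) intervals; lower values generally mean lower HRV. A minimal sketch (the RR series below is made up purely for illustration):

# Sketch: RMSSD, a common HRV metric, computed from RR intervals in milliseconds.
import math

def rmssd(rr_intervals_ms):
    # root mean square of successive differences between consecutive RR intervals
    diffs = [b - a for a, b in zip(rr_intervals_ms, rr_intervals_ms[1:])]
    return math.sqrt(sum(d * d for d in diffs) / len(diffs))

print(rmssd([812, 790, 805, 798, 820]))  # illustrative RR series, not real patient data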

Set up InfluxDB locally.

We shall go with InfluxDB since it is a good time series database and is available in an open source version.

Copy the attached conf file to C:/ProgramData/InfluxDB

docker run -p 8086:8086 -v C:/ProgramData/InfluxDB:/var/lib/influxdb influxdb -config /var/lib/influxdb/influxdb.conf

Verify using

http://localhost:8086/query?q=show%20databases
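
The same check can be run from Python, which is handy once the verification has to happen inside a scheduled job; a small sketch using the requests library against the InfluxDB 1.x HTTP API:

# Sketch: verify that the InfluxDB HTTP API is reachable from Python.
import requests

resp = requests.get('http://localhost:8086/query', params={'q': 'SHOW DATABASES'})
print(resp.status_code, resp.json())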

In order to use the InfluxDB CLI:

$ docker ps
copy the container id

$ docker exec -it  10733ba6d014 /bin/bash
root@10733ba6d014:/# influx

Ingesting data from Fitbit devices into InfluxDB.

Create a new app by registering with Fitbit at https://dev.fitbit.com/apps/new

Request all users who would like to share their Fitbit data with you to login to their respective Fitbit apps and authorize your app.

Let us now create a database in InfluxDB to store all our participating users' Fitbit data.

$ create database fitbit
$ use fitbit
CREATE USER root WITH PASSWORD 'root' WITH ALL PRIVILEGES

execute the code provided below

#!/usr/bin/python
# -*- coding: utf-8 -*-
import requests
import sys
import os
import pytz
from datetime import datetime, date, timedelta
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError

LOCAL_TIMEZONE = pytz.timezone('America/New_York')
FITBIT_LANGUAGE = 'en_US'
FITBIT_CLIENT_ID = '2XXX2'
FITBIT_CLIENT_SECRET = 'cc2f9f738c695a763xxxxxxxxxx4480'
FITBIT_ACCESS_TOKEN = \
    'eyJhbGciOiJIUzI1NiJ9.eyJhdWQiOiIyMkJUWDIiLCJzdWIiOiI2WUxXM0YiLCJpc3MiOiJGaXRiaXQiLCJ0eXAiOiJhY2Nlc3NfdGxxxxxxxxxxxxxxxiJ3aHIgd3BybyB3bnV0IHdzbGUgd3dlaSB3c29jIHdzZXQgd2FjdCB3bG9jIiwiZXhwIjoxNjAwMTQzNDUzLCJpYXQiOjE2MDAxMTQ2NTN9.506-BPp5P5rRcTwBujGszfWZAziCJV2WY7_-QIRybQM'
FITBIT_INITIAL_CODE = '37f3932f38322df1547xxxxxxxxx33398aa6e0'
REDIRECT_URI = 'https://localhost'
INFLUXDB_HOST = 'localhost'
INFLUXDB_PORT = 8086
INFLUXDB_USERNAME = 'root'
INFLUXDB_PASSWORD = 'root'
INFLUXDB_DATABASE = 'fitbit'
points = []


def fetch_data(category, type):
    try:
        response = requests.get('https://api.fitbit.com/1/user/-/'
                                + category + '/' + type
                                + '/date/today/1d.json',
                                headers={'Authorization': 'Bearer '
                                + FITBIT_ACCESS_TOKEN,
                                'Accept-Language': FITBIT_LANGUAGE})
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        print('HTTP request failed: %s' % err)
        sys.exit()

    data = response.json()
    print('Got ' + type + ' from Fitbit')

    for day in data[category.replace('/', '-') + '-' + type]:
        points.append({'measurement': type,
                      'time': LOCAL_TIMEZONE.localize(datetime.fromisoformat(day['dateTime'
                      ])).astimezone(pytz.utc).isoformat(),
                      'fields': {'value': float(day['value'])}})


def fetch_heartrate(date):
    try:
        response = \
            requests.get('https://api.fitbit.com/1/user/-/activities/heart/date/'
                          + date + '/1d/1min.json',
                         headers={'Authorization': 'Bearer '
                         + FITBIT_ACCESS_TOKEN,
                         'Accept-Language': FITBIT_LANGUAGE})
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        print('HTTP request failed: %s' % err)
        sys.exit()

    data = response.json()
    print('Got heartrates from Fitbit')

    for day in data['activities-heart']:
        if 'restingHeartRate' in day['value']:
            points.append({'measurement': 'restingHeartRate',
                          'time': datetime.fromisoformat(day['dateTime'
                          ]), 'fields': {'value': float(day['value'
                          ]['restingHeartRate'])}})

        if 'heartRateZones' in day['value']:
            for zone in day['value']['heartRateZones']:
                if 'caloriesOut' in zone and 'min' in zone and 'max' \
                    in zone and 'minutes' in zone:
                    points.append({
                        'measurement': 'heartRateZones',
                        'time': datetime.fromisoformat(day['dateTime'
                                ]),
                        'tags': {'zone': zone['name']},
                        'fields': {
                            'caloriesOut': float(zone['caloriesOut']),
                            'min': float(zone['min']),
                            'max': float(zone['max']),
                            'minutes': float(zone['minutes']),
                            },
                        })
                elif 'min' in zone and 'max' in zone and 'minutes' \
                    in zone:

                    points.append({
                        'measurement': 'heartRateZones',
                        'time': datetime.fromisoformat(day['dateTime'
                                ]),
                        'tags': {'zone': zone['name']},
                        'fields': {'min': float(zone['min']),
                                   'max': float(zone['max']),
                                   'minutes': float(zone['minutes'])},
                        })

    if 'activities-heart-intraday' in data:
        for value in data['activities-heart-intraday']['dataset']:
            time = datetime.fromisoformat(date + 'T' + value['time'])
            utc_time = \
                LOCAL_TIMEZONE.localize(time).astimezone(pytz.utc).isoformat()
            points.append({'measurement': 'heartrate',
                          'time': utc_time,
                          'fields': {'value': float(value['value'])}})


def process_levels(levels):
    for level in levels:
        type = level['level']
        if type == 'asleep':
            type = 'light'
        if type == 'restless':
            type = 'rem'
        if type == 'awake':
            type = 'wake'

        time = datetime.fromisoformat(level['dateTime'])
        utc_time = \
            LOCAL_TIMEZONE.localize(time).astimezone(pytz.utc).isoformat()
        points.append({'measurement': 'sleep_levels', 'time': utc_time,
                      'fields': {'seconds': int(level['seconds'])}})


def fetch_activities(date):
    try:
        response = \
            requests.get('https://api.fitbit.com/1/user/-/activities/list.json'
                         , headers={'Authorization': 'Bearer '
                         + FITBIT_ACCESS_TOKEN,
                         'Accept-Language': FITBIT_LANGUAGE}, params={
            'beforeDate': date,
            'sort': 'desc',
            'limit': 10,
            'offset': 0,
            })
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        print('HTTP request failed: %s' % err)
        sys.exit()

    data = response.json()
    print('Got activities from Fitbit')

    for activity in data['activities']:
        fields = {}

        if 'activeDuration' in activity:
            fields['activeDuration'] = int(activity['activeDuration'])
        if 'averageHeartRate' in activity:
            fields['averageHeartRate'] = int(activity['averageHeartRate'
                    ])
        if 'calories' in activity:
            fields['calories'] = int(activity['calories'])
        if 'duration' in activity:
            fields['duration'] = int(activity['duration'])
        if 'distance' in activity:
            fields['distance'] = float(activity['distance'])
            fields['distanceUnit'] = activity['distanceUnit']
        if 'pace' in activity:
            fields['pace'] = float(activity['pace'])
        if 'speed' in activity:
            fields['speed'] = float(activity['speed'])
        if 'elevationGain' in activity:
            fields['elevationGain'] = int(activity['elevationGain'])
        if 'steps' in activity:
            fields['steps'] = int(activity['steps'])

        for level in activity['activityLevel']:
            if level['name'] == 'sedentary':
                fields[level['name'] + 'Minutes'] = int(level['minutes'
                        ])
            else:
                fields[level['name'] + 'ActiveMinutes'] = \
                    int(level['minutes'])

        time = datetime.fromisoformat(activity['startTime'].strip('Z'))
        utc_time = time.astimezone(pytz.utc).isoformat()
        points.append({
            'measurement': 'activity',
            'time': utc_time,
            'tags': {'activityName': activity['activityName']},
            'fields': fields,
            })


try:
    client = InfluxDBClient(host=INFLUXDB_HOST, port=INFLUXDB_PORT,
                            username=INFLUXDB_USERNAME,
                            password=INFLUXDB_PASSWORD)
    client.create_database(INFLUXDB_DATABASE)
    client.switch_database(INFLUXDB_DATABASE)
except InfluxDBClientError as err:
    print('InfluxDB connection failed: %s' % err)
    sys.exit()

if not FITBIT_ACCESS_TOKEN:
    if os.path.isfile('.fitbit-refreshtoken'):
        f = open('.fitbit-refreshtoken', 'r')
        token = f.read()
        f.close()
        response = requests.post('https://api.fitbit.com/oauth2/token',
                                 data={
            'client_id': FITBIT_CLIENT_ID,
            'grant_type': 'refresh_token',
            'redirect_uri': REDIRECT_URI,
            'refresh_token': token,
            }, auth=(FITBIT_CLIENT_ID, FITBIT_CLIENT_SECRET))
    else:
        response = requests.post('https://api.fitbit.com/oauth2/token',
                                 data={
            'client_id': FITBIT_CLIENT_ID,
            'grant_type': 'authorization_code',
            'redirect_uri': REDIRECT_URI,
            'code': FITBIT_INITIAL_CODE,
            }, auth=(FITBIT_CLIENT_ID, FITBIT_CLIENT_SECRET))

    response.raise_for_status()

    json = response.json()
    FITBIT_ACCESS_TOKEN = json['access_token']
    refresh_token = json['refresh_token']
    f = open('.fitbit-refreshtoken', 'w+')
    f.write(refresh_token)
    f.close()

end = date.today()
start = end - timedelta(days=1)

try:
    response = \
        requests.get('https://api.fitbit.com/1.2/user/-/sleep/date/'
                     + start.isoformat() + '/' + end.isoformat()
                     + '.json', headers={'Authorization': 'Bearer '
                     + FITBIT_ACCESS_TOKEN,
                     'Accept-Language': FITBIT_LANGUAGE})
    response.raise_for_status()
except requests.exceptions.HTTPError as err:
    print('HTTP request failed: %s' % err)
    sys.exit()

data = response.json()
print('Got sleep sessions from Fitbit')

for day in data['sleep']:
    time = datetime.fromisoformat(day['startTime'])
    utc_time = \
        LOCAL_TIMEZONE.localize(time).astimezone(pytz.utc).isoformat()
    if day['type'] == 'stages':
        points.append({'measurement': 'sleep', 'time': utc_time,
                      'fields': {
            'duration': int(day['duration']),
            'efficiency': int(day['efficiency']),
            'is_main_sleep': bool(day['isMainSleep']),
            'minutes_asleep': int(day['minutesAsleep']),
            'minutes_awake': int(day['minutesAwake']),
            'time_in_bed': int(day['timeInBed']),
            'minutes_deep': int(day['levels']['summary']['deep'
                                ]['minutes']),
            'minutes_light': int(day['levels']['summary']['light'
                                 ]['minutes']),
            'minutes_rem': int(day['levels']['summary']['rem']['minutes'
                               ]),
            'minutes_wake': int(day['levels']['summary']['wake'
                                ]['minutes']),
            }})
    else:

        points.append({'measurement': 'sleep', 'time': utc_time,
                      'fields': {
            'duration': int(day['duration']),
            'efficiency': int(day['efficiency']),
            'is_main_sleep': bool(day['isMainSleep']),
            'minutes_asleep': int(day['minutesAsleep']),
            'minutes_awake': int(day['minutesAwake']),
            'time_in_bed': int(day['timeInBed']),
            'minutes_deep': 0,
            'minutes_light': int(day['levels']['summary']['asleep'
                                 ]['minutes']),
            'minutes_rem': int(day['levels']['summary']['restless'
                               ]['minutes']),
            'minutes_wake': int(day['levels']['summary']['awake'
                                ]['minutes']),
            }})

    if 'data' in day['levels']:
        process_levels(day['levels']['data'])

    if 'shortData' in day['levels']:
        process_levels(day['levels']['shortData'])

fetch_data('activities', 'steps')
fetch_data('activities', 'distance')
fetch_data('activities', 'floors')
fetch_data('activities', 'elevation')
fetch_data('activities', 'distance')
fetch_data('activities', 'minutesSedentary')
fetch_data('activities', 'minutesLightlyActive')
fetch_data('activities', 'minutesFairlyActive')
fetch_data('activities', 'minutesVeryActive')
fetch_data('activities', 'calories')
fetch_data('activities', 'activityCalories')
fetch_data('body', 'weight')
fetch_data('body', 'fat')
fetch_data('body', 'bmi')
fetch_data('foods/log', 'water')
fetch_data('foods/log', 'caloriesIn')
fetch_heartrate(date.today().isoformat())
fetch_activities((date.today() + timedelta(days=1)).isoformat())

try:
    client.write_points(points)
except InfluxDBClientError as err:
    print('Unable to write points to InfluxDB: %s' % err)
    sys.exit()

print('Successfully wrote %s data points to InfluxDB' % len(points))

The primary artifact needed for executing the code above is the access token.

To fetch the access token we first need the authorization code, which is obtained by clicking a link like the one below.

https://www.fitbit.com/oauth2/authorize?response_type=code&client_id=2XXXX2&redirect_uri=http%3A%2F%2F127.0.0.1%3A8085&scope=activity%20heartrate%20location%20nutrition%20profile%20settings%20sleep%20social%20weight&expires_in=604800

This link keeps changing; the proper procedure to get a valid one is to click on the tutorial link on the page below and follow the steps.

https://dev.fitbit.com/apps/oauthinteractivetutorial?clientEncodedId=2XXXX2&clientSecret=cc2f9f738c695a763c0ecxxxxxxc4480&redirectUri=http://127.0.0.1:8085&applicationType=SERVER

Once the code is executed successfully, you will observe the following output.

Query the influxdb to verify the data inserted
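
A small sketch of such a verification query, using the same influxdb 1.x Python client as the ingestion script and the credentials created above:

# Sketch: read back the most recent heartrate points written by the script above.
from influxdb import InfluxDBClient

client = InfluxDBClient(host='localhost', port=8086,
                        username='root', password='root', database='fitbit')
result = client.query('SELECT * FROM heartrate ORDER BY time DESC LIMIT 5')
for point in result.get_points():
    print(point)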

Ingesting data from Apple Healthkit devices into InfluxDB.

Use the following files:

healthkit.py

import healthkitfn
import healthkitcnf
import zipfile
import os
import shutil
import dropbox
from influxdb import InfluxDBClient
from datetime import datetime
from xml.dom import minidom
import csv
import smtplib

#Initiate Logging
logger = healthkitfn.init_logging()
#Temp time for tracking
startTime = datetime.now()

###################
# Start Functions #
###################

#Dropbox Download export.zip Function
def healthkit_import():
    #Connect To Dropbox
    dbx = dropbox.Dropbox(healthkitcnf.access_token)
    #Clean Existing Files
    if os.path.isdir(healthkitcnf.ExportDir):
        shutil.rmtree(healthkitcnf.ExportDir)
        logger.info(f"Removed {healthkitcnf.ExportDir}")
    if os.path.isfile(healthkitcnf.ExportZip):
        os.remove(healthkitcnf.ExportZip)
        logger.info(f"Removed {healthkitcnf.ExportZip}")
    if os.path.isfile(healthkitcnf.ExportXML):
        os.remove(healthkitcnf.ExportXML)
        logger.info(f"Removed {healthkitcnf.ExportXML}")
    #Download New export.zip and unzip
    with open("export.zip", "wb") as f:
        metadata, res = dbx.files_download(path="/export.zip")
        f.write(res.content)
        logger.info('Downloaded export.zip Successfully')
        zip_ref = zipfile.ZipFile(healthkitcnf.ExportZip, 'r')
        zip_ref.extractall()
        zip_ref.close()
        logger.info("Unzipped export.zip Successfully")
        shutil.copy("apple_health_export/export.xml", healthkitcnf.ExportXML)
        logger.info("Copied export.xml to primary directory")
    return logger.info('Download and Copy Completed')

#Connect to Database
def healthkit_db():
        client = InfluxDBClient(healthkitcnf.IP, healthkitcnf.PORT, healthkitcnf.USERNAME, healthkitcnf.PASSWORD, healthkitcnf.DB_NAME)
        client.create_database(healthkitcnf.DB_NAME)
        logger.info('Connected to Database Successfully')
        return client

#Load HK Type Values Into Array
def healthkit_text():
        f = open('HKValues.txt', 'r')
        HKValues = f.readlines()
        f.close()
        return HKValues

def healthkit_csv():
        with open('HKValues.csv', newline='') as csvfile:
                HKValues = list(csv.reader(csvfile))
        return HKValues

#Send Notification once completed
def healthkit_notify():
    try:
        email_ssl = smtplib.SMTP_SSL('smtp.gmail.com', 465)
        email_ssl.ehlo()
        email_ssl.login(healthkitcnf.email_user, healthkitcnf.email_pass)
        sent_from = healthkitcnf.email_user  
        send_to = healthkitcnf.email_sendto  
        subject = healthkitcnf.email_subject
        email_text = healthkitcnf.email_body
        email_ssl.sendmail(sent_from, send_to, email_text)
        email_ssl.close()
        logger.info("Notification Sent")
    except:
        logger.debug("Failed to send completed notification.")
    return (logger.info("Email sent"))

#XML Parse / Import HKValues.txt / Update DB
def healthkit_xml():
        NAME = healthkitcnf.NAME
        #Stat Keeping
        NUMRECORDS = 12
        RECORDINC = 1

        #Setup XML Parse
        logger.info("Importing XML Into Record List")
        xmldoc = minidom.parse('export.xml')
        recordlist = xmldoc.getElementsByTagName('Record')
        logger.info('Imported XML Records Successfully')
        #Import Healthkit Values Into Array
        #HKValues = healthkit_csv()
        #logger.info('Imported Health Kit Type Values')

        logger.info("Starting Heart Rate Export")
        for s in recordlist:
            if s.attributes['type'].value == "HKQuantityTypeIdentifierHeartRate":
                client.write_points([{"measurement": "heartrate","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": s.attributes['startDate'].value,"fields": {"watch_heartrate": float(s.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Heart Rate Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #Resting Heart Rate
        logger.info("Starting Resting Heart Rate Export")
        for restingHeart in recordlist:
            if restingHeart.attributes['type'].value == "HKQuantityTypeIdentifierRestingHeartRate":
                client.write_points([{"measurement": "restingheartrate","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": restingHeart.attributes['startDate'].value,"fields": {"resting_heartrate": float(restingHeart.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Resting Heart Rate Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #Walking Heart Rate
        logger.info("Starting Walking Heart Rate Export")
        for walkingHeart in recordlist:
            if walkingHeart.attributes['type'].value == "HKQuantityTypeIdentifierWalkingHeartRateAverage":
                client.write_points([{"measurement": "walkingHeart","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": walkingHeart.attributes['startDate'].value,"fields": {"walking_heartrate": float(walkingHeart.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Walking Heart Rate Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #distance walked
        logger.info("Starting Distance Walked Export")
        for distance in recordlist:
            if distance.attributes['type'].value == 'HKQuantityTypeIdentifierDistanceWalkingRunning':
                client.write_points([{"measurement": "distance","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": distance.attributes['startDate'].value,"fields": {"distance": float(distance.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Distance Walked Rate Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #basal calories
        logger.info("Starting Basal Calories Export")
        for basal in recordlist:
            if basal.attributes['type'].value == 'HKQuantityTypeIdentifierBasalEnergyBurned':
                client.write_points([{"measurement": "basal","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": basal.attributes['startDate'].value,"fields": {"basal": float(basal.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Basal Calories Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #active calories
        logger.info("Starting Active Calories Export")
        for active in recordlist:
            if active.attributes['type'].value == 'HKQuantityTypeIdentifierActiveEnergyBurned':
                client.write_points([{"measurement": "active","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": active.attributes['startDate'].value,"fields": {"active": float(active.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Active Calories Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #BMI
        logger.info("Starting BMI Export")
        for bmi in recordlist:
            if bmi.attributes['type'].value == 'HKQuantityTypeIdentifierBodyMassIndex':
                client.write_points([{"measurement": "bmi","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": bmi.attributes['startDate'].value,"fields": {"bmi": float(bmi.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"BMI Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #WEIGHT
        logger.info("Starting Weight Export")
        for weight in recordlist:
            if weight.attributes['type'].value == 'HKQuantityTypeIdentifierBodyMass':
                client.write_points([{"measurement": "weight","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": weight.attributes['startDate'].value,"fields": {"weight": float(weight.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Weight Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #BodyFatPercentage
        print("Starting Body Fat Percentage Export")
        for bodyfat in recordlist:
            if bodyfat.attributes['type'].value == 'HKQuantityTypeIdentifierBodyFatPercentage':
                client.write_points([{"measurement": "bodyfat","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": bodyfat.attributes['startDate'].value,"fields": {"bodyfat": float(bodyfat.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Body Fat Percentage Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #LeanBodyMass
        logger.info("Starting Lean Body Mass Export")
        for leanmass in recordlist:
            if leanmass.attributes['type'].value == 'HKQuantityTypeIdentifierLeanBodyMass':
                client.write_points([{"measurement": "leanmass","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": leanmass.attributes['startDate'].value,"fields": {"leanmass": float(leanmass.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Lean Body Mass Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #Systolic BP
        logger.info("Starting Systolic BP Export")
        for systolic in recordlist:
            if systolic.attributes['type'].value == 'HKQuantityTypeIdentifierBloodPressureSystolic':
                client.write_points([{"measurement": "systolic","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": systolic.attributes['startDate'].value,"fields": {"systolic": float(systolic.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Systolic BP Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #Diastolic BP
        logger.info("Starting Diastolic BP Export")
        for diastolic in recordlist:
            if diastolic.attributes['type'].value == 'HKQuantityTypeIdentifierBloodPressureDiastolic':
                client.write_points([{"measurement": "diastolic","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": diastolic.attributes['startDate'].value,"fields": {"diastolic": float(diastolic.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Diastolic Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1


        TEMPTIME = datetime.now() - startTime
        return logger.info(f"Export Completed in {TEMPTIME}")


healthkit_import()
client = healthkit_db()
healthkit_xml()
healthkit_notify()

healthkitcnf.py

#Zip Import / Export Config
#NO NEED TO CHANGE
ExportZip = "export.zip"
ExportDir = "apple_health_export"
ExportXML = "export.xml"

#Dropbox Config
access_token = '*************************'

#InfluxDB Server
IP = 'XXX.XXX.XXX.XXX'
PORT = '8086'
USERNAME = 'user'
PASSWORD = 'password'
DB_NAME = 'database'
NAME = 'Me' #Name of persons data

vCount = 0

#SMTP Config - For Gmail.  Email password is an app password generated from your security settings if using 2 factor auth.
email_user = 'email'
email_pass = 'password'
email_sendto = 'sendto email'
email_subject = 'Healthkit Import Notification'
email_body = 'SUBJECT: Healthkit Import Completed Successfully!'

healthkitfn.py

#Imports for Functions
import logging
import zipfile
import os
import shutil

#Logging Function
def init_logging():
    rootLogger = logging.getLogger('healthkit_logger')

    LOG_DIR = os.getcwd() + '/' + 'logs'
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)
    fileHandler = logging.FileHandler("{0}/{1}.log".format(LOG_DIR, "healthkit"))
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fileHandler.setFormatter(formatter)

    rootLogger.addHandler(fileHandler)

    rootLogger.setLevel(logging.DEBUG)

    consoleHandler = logging.StreamHandler()
    rootLogger.addHandler(consoleHandler)

    return rootLogger

HKValues.csv

HKQuantityTypeIdentifierBloodGlucose,HKQuantityTypeIdentifierBodyMassIndex,HKQuantityTypeIdentifierHeight,HKQuantityTypeIdentifierBodyMass,HKQuantityTypeIdentifierHeartRate,HKQuantityTypeIdentifierBloodPressureSystolic,HKQuantityTypeIdentifierBloodPressureDiastolic,HKQuantityTypeIdentifierBodyFatPercentage,HKQuantityTypeIdentifierLeanBodyMass,HKQuantityTypeIdentifierStepCount,HKQuantityTypeIdentifierDistanceWalkingRunning,HKQuantityTypeIdentifierBasalEnergyBurned,HKQuantityTypeIdentifierActiveEnergyBurned,HKQuantityTypeIdentifierFlightsClimbed,HKQuantityTypeIdentifierDietaryFatTotal,HKQuantityTypeIdentifierDietaryFatPolyunsaturated,HKQuantityTypeIdentifierDietaryFatMonounsaturated,HKQuantityTypeIdentifierDietaryFatSaturated,HKQuantityTypeIdentifierDietaryCholesterol,HKQuantityTypeIdentifierDietarySodium,HKQuantityTypeIdentifierDietaryCarbohydrates,HKQuantityTypeIdentifierDietaryFiber,HKQuantityTypeIdentifierDietarySugar,HKQuantityTypeIdentifierDietaryEnergyConsumed,HKQuantityTypeIdentifierDietaryProtein,HKQuantityTypeIdentifierDietaryVitaminC,HKQuantityTypeIdentifierDietaryCalcium,HKQuantityTypeIdentifierDietaryIron,HKQuantityTypeIdentifierDietaryPotassium,HKQuantityTypeIdentifierAppleExerciseTime,HKQuantityTypeIdentifierRestingHeartRate,HKQuantityTypeIdentifierWalkingHeartRateAverage

HKValues.txt

HKQuantityTypeIdentifierBloodGlucose
HKQuantityTypeIdentifierBodyMassIndex
HKQuantityTypeIdentifierHeight
HKQuantityTypeIdentifierBodyMass
HKQuantityTypeIdentifierHeartRate
HKQuantityTypeIdentifierBloodPressureSystolic
HKQuantityTypeIdentifierBloodPressureDiastolic
HKQuantityTypeIdentifierBodyFatPercentage
HKQuantityTypeIdentifierLeanBodyMass
HKQuantityTypeIdentifierStepCount
HKQuantityTypeIdentifierDistanceWalkingRunning
HKQuantityTypeIdentifierBasalEnergyBurned
HKQuantityTypeIdentifierActiveEnergyBurned
HKQuantityTypeIdentifierFlightsClimbed
HKQuantityTypeIdentifierDietaryFatTotal
HKQuantityTypeIdentifierDietaryFatPolyunsaturated
HKQuantityTypeIdentifierDietaryFatMonounsaturated
HKQuantityTypeIdentifierDietaryFatSaturated
HKQuantityTypeIdentifierDietaryCholesterol
HKQuantityTypeIdentifierDietarySodium
HKQuantityTypeIdentifierDietaryCarbohydrates
HKQuantityTypeIdentifierDietaryFiber
HKQuantityTypeIdentifierDietarySugar
HKQuantityTypeIdentifierDietaryEnergyConsumed
HKQuantityTypeIdentifierDietaryProtein
HKQuantityTypeIdentifierDietaryVitaminC
HKQuantityTypeIdentifierDietaryCalcium
HKQuantityTypeIdentifierDietaryIron
HKQuantityTypeIdentifierDietaryPotassium
HKQuantityTypeIdentifierAppleExerciseTime
HKQuantityTypeIdentifierRestingHeartRate
HKQuantityTypeIdentifierWalkingHeartRateAverage
HKCategoryTypeIdentifierSleepAnalysis
HKCategoryTypeIdentifierAppleStandHour
HKCategoryTypeIdentifierMindfulSession
HKQuantityTypeIdentifierHeartRateVariabilitySDNN

Use the code files attached above

  1. Edit healthkitcnf.py with Database info and Dropbox Access Token.
  2. Export your Apple Health data from device and select Dropbox.
  3. Run script.

Disclaimer :

Unlike the Fitbit script above, I haven’t run the HealthKit script since I do not own the device yet, so I can’t vouch for the correctness of this code.

In fact, there may be several sections further down, including this one, where I approach the topic at a more theoretical level. I would encourage readers to try this code locally and let me know the results.

However, it may be worth mentioning that in all my previous posts I have ensured that I only share code that has been tried and works on my machine, even when the original code was copied from the web. In case you are not aware, in my experience around 80% of the code found on the web fails to work out of the box, because it is outdated, written for a different OS, or needs settings changed to adapt it to your local environment. These tasks can take a long time to debug, and you might at times be forced to abandon the approach and search for another page on the web.

Create an AWS EMR based pyspark job to run heart rate calculations.

Create your AWS account if you haven’t already. Install and configure the AWS Command Line Interface.

Create a new s3 bucket.

Upload the attached pyspark_job.py file to that bucket.
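
If you prefer to script this step, a minimal boto3 sketch for creating the bucket and uploading the job file (the bucket name is the same placeholder used in the EMR command further below):

# Sketch: create the S3 bucket and upload the PySpark job script with boto3.
import boto3

s3 = boto3.client('s3', region_name='us-east-1')
s3.create_bucket(Bucket='your-bucket')                      # placeholder bucket name
s3.upload_file('pyspark_job.py', 'your-bucket', 'pyspark_job.py')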

# pyspark_job.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# InfluxDB 2.x client, which provides the query_api/write_api used below
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# InfluxDB connection settings - placeholders, replace with your own
INFLUXDB_URL = 'http://localhost:8086'
INFLUXDB_TOKEN = 'my-token'
org = 'my-org'
def create_spark_session():
    """Create spark session.
Returns:
        spark (SparkSession) - spark session connected to AWS EMR
            cluster
    """
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages",
                "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark
	
	
def process_heartrate_data(spark, input_bucket, output_bucket):

    client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=org)
    
    try:

        query = f'from(bucket: \"{input_bucket}\") |> range(start: -10m)'
        tables = client.query_api().query(query, org=org)

        for table in tables:
            print(">>>>>>>>>> Showing table details >>>>>>>>>>>>")
            print(table)
            for row in table.records:
                print(">>>>>>>>>> Showing Rows >>>>>>>>>>>>")
                print (row.values)

    except Exception as err:
        print(f'Query failed: {err}')
        
        
    try:
        #Compute averages here
        data = "mem,host=host1 used_percent=23.43234543"
        
        write_api = client.write_api(write_options=SYNCHRONOUS)
        
        write_api.write(output_bucket, org, data)

        print(f"Data has been saved {data}")
    except Exception as err:
        print(f'Write failed: {err}')
		
		
def main():
    spark = create_spark_session()
    # bucket names are placeholders - replace with your own InfluxDB buckets
    input_bucket = "fitbit"
    output_bucket = "heartrate_metrics"
    process_heartrate_data(spark, input_bucket, output_bucket)


if __name__ == '__main__':
    main()
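
The "#Compute averages here" placeholder above could, for example, aggregate a per-user, per-night resting heart rate once the raw points have been pulled into a Spark DataFrame. A minimal sketch, assuming a DataFrame with user, timestamp and bpm columns (the column names are hypothetical):

# Sketch: per-user, per-day heart rate aggregates as a crude resting-heart-rate signal.
# Assumes df has columns (user, timestamp, bpm); names are placeholders.
from pyspark.sql import functions as F

def nightly_heartrate_summary(df):
    return (df
            .withColumn("date", F.to_date("timestamp"))
            .groupBy("user", "date")
            .agg(F.avg("bpm").alias("avg_bpm"),
                 F.min("bpm").alias("min_bpm")))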

To install useful packages on all of the nodes of our cluster, we’ll need to create the file emr_bootstrap.sh and add it to a bucket on S3.

#!/bin/bash
sudo pip install -U \
matplotlib \
pandas \
spark-nlp

Create a key pair file

Navigate to EC2 from the homepage of your console:

Select “Key Pairs”

Click “Create Key Pair” then enter a name and click “Create”.

The file emr-key.pem should download automatically. Store it in a directory you’ll remember.

Now run the command below to create the EMR cluster.

aws emr create-cluster --name "Spark cluster with step" \
--release-label emr-5.24.1 \
--applications Name=Spark \
--log-uri s3://your-bucket/logs/ \
--ec2-attributes KeyName=your-key-pair \
--instance-type m5.xlarge \
--instance-count 3 \
--bootstrap-actions Path=s3://your-bucket/emr_bootstrap.sh \
--steps Type=Spark,Name="Spark job",ActionOnFailure=CONTINUE,Args=[--deploy-mode,cluster,--master,yarn,s3://your-bucket/pyspark_job.py] \
--use-default-roles \
--auto-terminate

Building a deep learning model for prediction

(this section is work-in-progress)

Build a deep learning model based upon the information provided in the paper below to predict the possibility of covid diagnosis. Kindly note that we are dealing with heart rate alone here and are not concerned with all the other tests mentioned in the paper.

https://www.medrxiv.org/content/10.1101/2020.08.14.20175265v1.full.pdf

Reference :

https://github.com/nu1lx/HealthKit
https://towardsdatascience.com/production-data-processing-with-apache-spark-96a58dfd3fe7
https://towardsdatascience.com/getting-started-with-pyspark-on-amazon-emr-c85154b6b921
https://www.hindawi.com/journals/ddns/2020/6152041/
https://www.mobihealthnews.com/news/early-data-fitbit-study-indicates-it-can-predict-covid-19-symptoms-show
https://blog.fitbit.com/early-findings-covid-19-study/
https://www.medrxiv.org/content/10.1101/2020.08.14.20175265v1.full.pdf
https://medium.com/welltory/alternative-at-home-test-for-coronavirus-heart-rate-variability-326a7237abe7

LocalStack : The zero cost aws cloud subscription

Development using AWS cloud services requires a subscription. Development and testing can be a costly affair for some organizations, incurring expenses to the tune of around 1000 USD. LocalStack is a cheaper way of developing and testing code locally before it is deployed to the AWS cloud.

Architecture

Objective

The objective of this post is to demonstrate the following using LocalStack:

LocalStack in Developer Machine

  1. Create an s3 bucket
  2. Create an SQS queue
  3. Set up an event in S3 such that when a file is loaded into the S3 bucket, a notification is sent to the SQS queue
  4. Create an AWS step function workflow consisting of
    • A lambda that picks up a file from an S3 bucket and delivers it to an SQS queue, and
    • inserts the data in the file into DynamoDB

LocalStack in Continuous Integration

  1. Write some pytest unit test cases for a lambda
  2. Build the zip and upload it to LocalStack to create the lambda, and
  3. Run the pytest unit test cases and generate a report.

Deploying Application to AWS

  1. Spin up a Jenkins in docker that will then
  2. Download the code for Lambda from git
  3. Run a ci/cd pipeline to deploy the artifacts to s3
  4. Run the unit test cases

Reference :

More information can be found at :

https://localstack.cloud/ or

https://github.com/localstack/localstack

All the steps in this post use the free Community Edition of LocalStack. There are also the Pro and Enterprise editions available at a small subscription fee that come with more powerful AWS services like RDS and AWS IoT.

Setting up :

Install and Configure aws cli

D:\localstack> aws configure --profile stack-profile
AWS Access Key ID [None]: covid
AWS Secret Access Key [None]: covid
Default region name [None]: us-east-1
Default output format [None]: json

Install and Configure docker

D:\localstack> docker --version
Docker version 19.03.12, build 48a66213fe

Create the following docker-compose.yml file

version: '2.1'

services:
  localstack:
    container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}"
    image: localstack/localstack
    ports:
      - "4566-4599:4566-4599"
      - "${PORT_WEB_UI-8080}:${PORT_WEB_UI-8080}"
    environment:
      - SERVICES=${SERVICES- }
      - DEBUG=${DEBUG- }
      - DATA_DIR=${DATA_DIR- }
      - PORT_WEB_UI=${PORT_WEB_UI- }
      - LAMBDA_EXECUTOR=${LAMBDA_EXECUTOR- }
      - KINESIS_ERROR_PROBABILITY=${KINESIS_ERROR_PROBABILITY- }
      - DOCKER_HOST=unix:///var/run/docker.sock
      - HOST_TMP_FOLDER=${TMPDIR}
    volumes:
      - "${TMPDIR:-/tmp/localstack}:/tmp/localstack"
      - "/var/run/docker.sock:/var/run/docker.sock"

Then bring up the stack:

docker-compose up -d

LocalStack in Developer Machine

Create an s3 bucket

aws --profile stack-profile --endpoint-url=http://localhost:4566 s3 mb s3://mulan

Create an SQS queue

aws --profile stack-profile --endpoint-url=http://localhost:4566 sqs create-queue --queue-name mulan

Add an event notification configuration to the s3 bucket using notification.json file

aws --profile stack-profile --endpoint-url=http://localhost:4566 s3api put-bucket-notification-configuration --bucket mulan --notification-configuration file://notification.json

Contents of notification.json file

{
    "QueueConfigurations": [
        {
            "QueueArn": "arn:aws:sqs:us-east-1:000000000000:mulan",
            "Events": [
                "s3:ObjectCreated:*"
            ]
        }
    ]
}

Get the arn above from :

aws --profile stack-profile --endpoint-url=http://localhost:4566 sqs get-queue-attributes --queue-url http://localhost:4566/queue/mulan --attribute-names All

Add a file test.csv to the s3 bucket

aws --profile stack-profile --endpoint-url=http://localhost:4566 s3 cp test.csv s3://mulan

Check if a message is received in sqs using the command

aws --profile stack-profile --endpoint-url=http://localhost:4566 sqs receive-message --queue-url http://localhost:4566/queue/mulan
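
The same round trip can be exercised from Python, which becomes useful once the lambda work starts. A small boto3 sketch pointed at LocalStack's edge port, reusing the bucket, queue and dummy credentials configured above:

# Sketch: upload the file via boto3 and poll the SQS queue for the S3 notification.
import boto3

endpoint = 'http://localhost:4566'
s3 = boto3.client('s3', endpoint_url=endpoint, region_name='us-east-1',
                  aws_access_key_id='covid', aws_secret_access_key='covid')
sqs = boto3.client('sqs', endpoint_url=endpoint, region_name='us-east-1',
                   aws_access_key_id='covid', aws_secret_access_key='covid')

s3.upload_file('test.csv', 'mulan', 'test.csv')
messages = sqs.receive_message(QueueUrl=f'{endpoint}/queue/mulan', MaxNumberOfMessages=1)
print(messages.get('Messages', []))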

AWS Step function workflow

Create a lambda.py file with the following code

import urllib.parse
import boto3
import json

# print('Loading function')

HOST = "http://127.0.0.1"
# Get the service resource
# To production it's not necessary inform the "endpoint_url" and "region_name"
s3 = boto3.client('s3',
                  endpoint_url= HOST + ":4566",
                  region_name="us-east-1")
sqs = boto3.client('sqs',
                  endpoint_url= HOST + ":4566",
                  region_name="us-east-1")

def handler(event, context):
    # print("Received event: " + json.dumps(event, indent=2))

    # Get the object from the event and show its content type
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    url_queue = HOST + ":4566/queue/lambda-tutorial"

    try:

        response = s3.get_object(Bucket=bucket, Key=key)

        deb = {
            "request_id": response['ResponseMetadata']['RequestId'],
            "queue_url": url_queue,
            "key": key,
            "bucket": bucket,
            "message": "aws lambda with localstack..."
        }

        print("#########################################################")
        print("Send Message")
        #Send message to SQS queue
        response = sqs.send_message(
                QueueUrl=deb["queue_url"],
                MessageBody=json.dumps(deb)
        )

        print("response: {}".format(response))

        print("#########################################################")
        print("Receive 10 Messages From SQS Queue")
        response = sqs.receive_message(
            QueueUrl=deb["queue_url"],
            MaxNumberOfMessages=10,
            VisibilityTimeout=0,
            WaitTimeSeconds=0
        )

        print("#########################################################")
        print("Read All Messages From Response")
        messages = response['Messages']
        for message in messages:
            print("Message: {}".format(message))

        print("Final Output: {}".format(json.dumps(response)))
        return json.dumps(response)

    except Exception as e:
        print(e)
        raise e

Zip the above lambda.py file into papi-handler.zip

Create a new Lambda function in AWS and wire up the supporting resources using the commands below

aws --profile stack-profile --endpoint-url=http://localhost:4566 lambda create-function --function-name papi-handler --runtime python3.8 --handler lambda.handler --memory-size 128 --zip-file fileb://papi-handler.zip --role arn:aws:iam::123456:role/irrelevant
aws --profile stack-profile --endpoint-url=http://localhost:4566 lambda invoke --function-name papi-handler
aws --profile stack-profile --endpoint-url=http://localhost:4566 s3 mb s3://tutorial
aws --profile stack-profile --endpoint-url=http://localhost:4566 s3api put-object --bucket tutorial --key lambda
aws --profile stack-profile --endpoint-url=http://localhost:4566 s3 cp ./test/files/ s3://tutorial/lambda/ --recursive
curl -v http://localhost:4572/tutorial
aws --profile stack-profile --endpoint-url=http://localhost:4566 sqs create-queue --queue-name lambda-tutorial

Create the AWS Step Functions state machine

aws stepfunctions --profile stack-profile --endpoint-url=http://localhost:4566 create-state-machine --definition "{\"Comment\": \"Localstack step function example\",\"StartAt\": \"HelloWorld1\",\"States\": {\"HelloWorld1\": {\"Type\": \"Task\",\"Resource\": \"arn:aws:lambda:us-east-1:000000000000:function:papi-handler\",\"End\": true}}}" --name "HelloWorld1" --role-arn "arn:aws:iam::000000000000:role/a-role"

Execute the AWS Step Functions state machine

aws stepfunctions --profile stack-profile --endpoint-url=http://localhost:4566 start-execution --state-machine arn:aws:states:us-east-1:000000000000:stateMachine:HelloWorld1 --name test
aws stepfunctions --profile stack-profile --endpoint-url=http://localhost:4566 describe-execution --execution-arn arn:aws:states:us-east-1:000000000000:execution:HelloWorld1:test
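The same execution can also be driven from Python. Here is a minimal boto3 sketch against the LocalStack endpoint (state machine ARN and dummy credentials as above; the execution name is arbitrary):

import boto3

sfn = boto3.client('stepfunctions',
                   endpoint_url='http://localhost:4566',
                   region_name='us-east-1',
                   aws_access_key_id='covid',
                   aws_secret_access_key='covid')

# Start an execution of the HelloWorld1 state machine created above
execution = sfn.start_execution(
    stateMachineArn='arn:aws:states:us-east-1:000000000000:stateMachine:HelloWorld1',
    name='test-from-python')

# Check its status (RUNNING, SUCCEEDED or FAILED)
print(sfn.describe_execution(executionArn=execution['executionArn'])['status'])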

Note: The above steps contain some minor bugs. I plan to resolve them and update the page over the coming days and months, whenever I find time off from work.

DynamoDB functions :

#Create a table in DynamoDB
aws --profile stack-profile --endpoint-url=http://localhost:4566 dynamodb create-table --table-name mulan_table  --attribute-definitions AttributeName=first,AttributeType=S AttributeName=second,AttributeType=N --key-schema AttributeName=first,KeyType=HASH AttributeName=second,KeyType=RANGE --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

#List all tables in DynamoDB
aws --profile stack-profile --endpoint-url=http://localhost:4566 dynamodb list-tables

#Describe a table in DynamoDB
aws --profile stack-profile --endpoint-url=http://localhost:4566 dynamodb describe-table --table-name mulan_table

#Put item into a table
aws --profile stack-profile --endpoint-url=http://localhost:4566 dynamodb put-item --table-name mulan_table  --item "{\"first\":{\"S\":\"Eddy\"},\"second\":{\"N\":\"1542\"}}"

#Put another item
aws --profile stack-profile --endpoint-url=http://localhost:4566 dynamodb put-item --table-name mulan_table  --item "{\"first\":{\"S\":\"James\"},\"second\":{\"N\":\"6097\"}}"

#Scan a table
aws --profile stack-profile --endpoint-url=http://localhost:4566 dynamodb scan --table-name mulan_table

#Perform get item on a table
aws --profile stack-profile --endpoint-url=http://localhost:4566 dynamodb get-item --table-name mulan_table  --key "{\"first\":{\"S\":\"Eddy\"},\"second\":{\"N\":\"1542\"}}"

#Perform query on a table
aws --profile stack-profile --endpoint-url=http://localhost:4566 dynamodb query --table-name mulan_table --projection-expression "#first, #second" --key-condition-expression "#first = :value" --expression-attribute-values "{\":value\" : {\"S\":\"James\"}}" --expression-attribute-names "{\"#first\":\"first\", \"#second\":\"second\"}"
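The same table operations can be performed from Python. A minimal boto3 sketch against the LocalStack endpoint, reusing the mulan_table key schema and dummy credentials from above:

import boto3

dynamodb = boto3.client('dynamodb',
                        endpoint_url='http://localhost:4566',
                        region_name='us-east-1',
                        aws_access_key_id='covid',
                        aws_secret_access_key='covid')

# Put an item using the same first (HASH) / second (RANGE) key schema
dynamodb.put_item(TableName='mulan_table',
                  Item={'first': {'S': 'Eddy'}, 'second': {'N': '1542'}})

# Read the item back
print(dynamodb.get_item(TableName='mulan_table',
                        Key={'first': {'S': 'Eddy'}, 'second': {'N': '1542'}}).get('Item'))

# Query every item under the hash key 'Eddy'
result = dynamodb.query(TableName='mulan_table',
                        KeyConditionExpression='#first = :value',
                        ExpressionAttributeNames={'#first': 'first'},
                        ExpressionAttributeValues={':value': {'S': 'Eddy'}})
print(result['Items'])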

LocalStack in Continuous Integration

In this section we shall write the following :

lambda.py : A simple hello-world Lambda function

testutils.py : A utility to create, invoke and delete the Lambda function

test_lambda.py : A script that runs the unit test cases against lambda.py

  • Create a lambda.py as per the source code provided below
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info("I've been called!")
    return {
        "message": "Hello LocalStack pytest!"
    }
  • Create a testutils.py as per the source code provided below
import json
import os
from zipfile import ZipFile

import boto3
import botocore

CONFIG = botocore.config.Config(retries={'max_attempts': 0})
LAMBDA_ZIP = './lambda.zip'


def get_lambda_client():
    return boto3.client(
        'lambda',
        aws_access_key_id='covid',
        aws_secret_access_key='covid',
        region_name='us-east-1',
        endpoint_url='http://localhost:4566',  # LocalStack edge port
        config=CONFIG
    )


def create_lambda_zip(function_name):
    with ZipFile(LAMBDA_ZIP, 'w') as z:
        z.write(function_name + '.py')


def create_lambda(function_name):
    lambda_client = get_lambda_client()
    create_lambda_zip(function_name)
    with open(LAMBDA_ZIP, 'rb') as f:
        zipped_code = f.read()
    lambda_client.create_function(
        FunctionName=function_name,
        Runtime='python3.6',
        Role='role',
        Handler=function_name + '.handler',
        Code=dict(ZipFile=zipped_code)
    )


def delete_lambda(function_name):
    lambda_client = get_lambda_client()
    lambda_client.delete_function(
        FunctionName=function_name
    )
    os.remove(LAMBDA_ZIP)


def invoke_function_and_get_message(function_name):
    lambda_client = get_lambda_client()
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType='RequestResponse'
    )
    return json.loads(
        response['Payload']
        .read()
        .decode('utf-8')
    )

  • Create a test_lambda.py as per the source code provided below

import testutils
from unittest import TestCase


class Test(TestCase):

    @classmethod
    def setup_class(cls):
        print('\r\nSetting up the class')
        testutils.create_lambda('lambda')

    @classmethod
    def teardown_class(cls):
        print('\r\nTearing down the class')
        testutils.delete_lambda('lambda')

    def test_that_lambda_returns_correct_message(self):
        payload = testutils.invoke_function_and_get_message('lambda')
        self.assertEqual(payload['message'], 'Hello LocalStack pytest!')

Run

pytest -s .

and check the output below

Deploying Application to AWS

To deploy the application to AWS we first need to set up the CI/CD process locally. Let's start with Jenkins.

Spin up Jenkins in a docker container locally

sudo yum install python3
sudo yum install python3-pip
sudo pip3 install -U pytest
pip3 install pytest-html
sudo pip3 install pylint


git clone https://github.com/4OH4/jenkins-docker.git

cd jenkins-docker

docker build -t jenkins-docker .

docker run -it -p 8080:8080 -p 50000:50000 -v jenkins_home:/var/jenkins_home -v /var/run/docker.sock:/var/run/docker.sock --restart unless-stopped jenkins-docker

Create a new pipeline job

  1. Click New Item on your Jenkins home page, enter a name for your pipeline job (e.g. pipelinejob), select Pipeline, and click OK.
  2. In the Script text area of the configuration screen, enter the script below.
pipeline {
	environment {

		AWS_ID = credentials("awsaccesskey")
		AWS_ACCESS_KEY_ID = "XXXXXXXXXXXXXX"
		AWS_SECRET_ACCESS_KEY = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

	}

	agent any

	stages {
		stage('checkout') {
			steps {
				git branch: 'master', credentialsId: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx', url: 'https://github.com/mulan/cfcode.git'
			}
		}

		stage('pylint') {
			steps {
				sh '''
				cd $WORKSPACE/hello_world
				/usr/local/bin/pylint app.py > pylintreport.txt --exit-zero
				'''
			}
		}

		stage('validate cloudformation template') {
			steps {
				sh '''
				aws --region us-east-1 cloudformation validate-template --template-body file://template.yaml
				'''
			}
		}

		stage('zip the sourcefiles') {
			steps {
				sh '''
				zip -r hello_world.zip hello_world
				'''
			}
		}

		stage('push the sourcefiles to S3') {
			steps {
				sh '''
				aws s3 cp hello_world.zip s3://mulan-folder/hello_world.zip
				'''
			}
		}

		stage('SamPackage') {
			steps {
				sh '''
				aws --region us-east-1 cloudformation package --template template.yaml --s3-bucket awsdeploy001 --output-template-file template.packaged.yml
				'''
			}
		}

		stage('deploy') {
			steps {
				sh '''
				aws --region us-east-1 cloudformation deploy --template-file template.packaged.yml --stack-name mymulanstack --capabilities CAPABILITY_IAM
				'''
			}
		}

		stage('unittest') {
			steps {
				sh '''
				cd $WORKSPACE/hello_world
				/usr/local/bin/pytest test_handler.py --html=Report.html
				'''
			}
		}
	}
}

Use the below template.yaml file for the above pipeline

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  AWS

  Sample SAM Template for AWS

# More info about Globals: https://github.com/awslabs/serverless-application-model/blob/master/docs/globals.rst
Globals:
  Function:
    Timeout: 3

Resources:
  HelloWorldFunction:
    Type: AWS::Serverless::Function # More info about Function Resource: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#awsserverlessfunction
    Properties:
      CodeUri: "s3://mulan-folder/hello_world.zip"
      Handler: app.lambda_handler
      Runtime: python3.8
      Events:
        HelloWorld:
          Type: Api # More info about API Event Source: https://github.com/awslabs/serverless-application-model/blob/master/versions/2016-10-31.md#api
          Properties:
            Path: /hello
            Method: get

Outputs:
  # ServerlessRestApi is an implicit API created out of Events key under Serverless::Function
  # Find out more about other implicit resources you can reference within SAM
  # https://github.com/awslabs/serverless-application-model/blob/master/docs/internals/generated_resources.rst#api
  HelloWorldApi:
    Description: "API Gateway endpoint URL for Prod stage for Hello World function"
    Value: !Sub "https://${ServerlessRestApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/hello/"
  HelloWorldFunction:
    Description: "Hello World Lambda Function ARN"
    Value: !GetAtt HelloWorldFunction.Arn
  HelloWorldFunctionIamRole:
    Description: "Implicit IAM Role created for Hello World function"
    Value: !GetAtt HelloWorldFunctionRole.Arn
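The pipeline's pylint and unittest stages expect a hello_world/app.py and hello_world/test_handler.py, which are not shown in this post. A minimal sketch of what they might look like is given below; only the app.lambda_handler entry point comes from the template above, while the function body and the assertion are assumptions.

# hello_world/app.py - hypothetical handler matching Handler: app.lambda_handler in template.yaml
import json

def lambda_handler(event, context):
    """Return a simple JSON response for the /hello GET endpoint."""
    return {
        "statusCode": 200,
        "body": json.dumps({"message": "hello world"})
    }

# hello_world/test_handler.py - hypothetical unit test executed by the 'unittest' stage
from app import lambda_handler

def test_handler_returns_200():
    response = lambda_handler({}, None)
    assert response["statusCode"] == 200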

Check the stage view and trigger the Jenkins job as per the screenshots below.

After the Jenkins job succeeds, the reports are published in the following format.

pylint report :

************* Module app
app.py:14:0: C0301: Line too long (166/100) (line-too-long)
app.py:25:0: C0301: Line too long (118/100) (line-too-long)
app.py:1:0: C0114: Missing module docstring (missing-module-docstring)
app.py:6:19: W0613: Unused argument 'event' (unused-argument)
app.py:6:26: W0613: Unused argument 'context' (unused-argument)

--------------------------------------------------------------------
Your code has been rated at -6.67/10 (previous run: -6.67/10, +0.00)

Test report

Python based framework for FOTA (Firmware Over The Air)

A very common requirement in IoT-based systems is to remotely install updated versions of firmware on devices.

Presented in this blog is a Python-based generic framework for performing such updates.

Salient features :

  1. Meager code footprint of less than 4 KB.
  2. Secure download from an S3 bucket using an access key and secret.
  3. Secure device-pull methodology rather than cloud-push updates.
  4. Extraction and installation without human intervention.
  5. Firmware file stored in encrypted format, with the encryption key in S3.

High level Architecture

Setup process flow

Source code files

Code to encrypt the zip file (crypt.py)

from cryptography.fernet import Fernet
import os
import hashlib


def write_key():
    """
    Generates a key and saves it into the file key.key
    """
    key = Fernet.generate_key()
    with open("key.key", "wb") as key_file:
        key_file.write(key)

def load_key():
    """
    Loads the key named `key.key` from the current directory
    """
    return open("key.key", "rb").read()


def encrypt(filename, key):
    """
    Given a filename (str) and key (bytes), encrypts the file and writes it back in place
    """
    f = Fernet(key)
    with open(filename, "rb") as file:
        # read all file data
        file_data = file.read()
    # encrypt data
    encrypted_data = f.encrypt(file_data)
    # write the encrypted file
    with open(filename, "wb") as file:
        file.write(encrypted_data)


def decrypt(filename, key):
    """
    Given a filename (str) and key (bytes), decrypts the file and writes it back in place
    """
    f = Fernet(key)
    with open(filename, "rb") as file:
        # read the encrypted data
        encrypted_data = file.read()
    # decrypt data
    decrypted_data = f.decrypt(encrypted_data)
    # write the original file
    with open(filename, "wb") as file:
        file.write(decrypted_data)


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Simple File Encryptor Script")
    parser.add_argument("file", help="File to encrypt/decrypt")
    parser.add_argument("-g", "--generate-key", dest="generate_key", action="store_true",
                        help="Whether to generate a new key or use existing")
    parser.add_argument("-e", "--encrypt", action="store_true",
                        help="Whether to encrypt the file, only -e or -d can be specified.")
    parser.add_argument("-d", "--decrypt", action="store_true",
                        help="Whether to decrypt the file, only -e or -d can be specified.")

    args = parser.parse_args()
    file = args.file
    generate_key = args.generate_key

    if generate_key:
        write_key()
    # load the key
    key = load_key()

    encrypt_ = args.encrypt
    decrypt_ = args.decrypt

    if encrypt_ and decrypt_:
        raise TypeError("Please specify whether you want to encrypt the file or decrypt it.")
    elif encrypt_:
        encrypt(file, key)
        calculated_checksum = hashlib.md5(open(file,'rb').read()).hexdigest()
        print("Calculated checksum : " , calculated_checksum)
    elif decrypt_:
        decrypt(file, key)
    else:
        raise TypeError("Please specify whether you want to encrypt the file or decrypt it.")
Run the script to encrypt the firmware package

python crypt.py <path>/firmware.zip --encrypt
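Since --encrypt loads the key via load_key(), pass -g (--generate-key) on the first run, or call write_key() beforehand, so that key.key exists before the file is encrypted.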

Code to calculate checksum of encrypted zip file (calc-checksum.py)

import boto3
import botocore
import hashlib
from boto3 import client
import zipfile
import subprocess
import os
import json
from cryptography.fernet import Fernet



def readConfig():
    with open("config.yaml", 'r') as file:
        data = file.read()
        line = json.loads(json.dumps(data))
    
    for key in line.split('\n'):
        val1, val2 = key.split(":")
        if val1 == "file_name":
            file_name = val2
    file.close()
    return file_name


    
def checksum():

    
    file_name = readConfig()
    calculated_checksum = hashlib.md5(open(file_name,'rb').read()).hexdigest()
    print("Calculated checksum : " , calculated_checksum)   

    
checksum()
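After encryption, the firmware archive, its checksum and the Fernet key have to be placed in the download bucket so that filedownload.py (below) can pull them. Here is a minimal boto3 upload sketch; the bucket name and local file names are assumptions chosen to match the keys used in filedownload.py, and checksum.txt is assumed to contain the md5 printed by crypt.py:

import boto3

BUCKET_NAME = 'xxx-download'   # must match BUCKET_NAME in filedownload.py

# Credentials are taken from the usual AWS configuration/environment
s3 = boto3.client('s3')

# Upload the encrypted firmware, its checksum and the encryption key
s3.upload_file('firmware.zip', BUCKET_NAME, 'firmware.zip')
s3.upload_file('checksum.txt', BUCKET_NAME, 'checksum.txt')
s3.upload_file('key.key', BUCKET_NAME, 'key.key')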

Key source file that implements FOTA (filedownload.py)

import boto3
import botocore
import hashlib
from boto3 import client
import zipfile
import subprocess
import os
import json
from cryptography.fernet import Fernet

#pip3 install cryptography

BUCKET_NAME = 'xxx-download' 
FILE_KEY = 'firmware.zip' 
CHECKSUM_KEY = 'checksum.txt'
ECDC_KEY = 'key.key'

def readConfig():
    with open("config.yaml", 'r') as file:
        data = file.read()
        line = json.loads(json.dumps(data))
    
    for key in line.split('\n'):
        val1, val2 = key.split(":")
        if val1 == "accesskey":
            accesskey = val2
        if val1 == "secretkey":
            secretkey = val2
        if val1 == "region":
            region = val2 
    file.close()
    return accesskey, secretkey, region



def encrypt(filename, key):
    """
    Given a filename (str) and key (bytes), encrypts the file and writes it back in place
    """
    f = Fernet(key)
    with open(filename, "rb") as file:
        # read all file data
        file_data = file.read()
    # encrypt data
    encrypted_data = f.encrypt(file_data)
    # write the encrypted file
    with open(filename, "wb") as file:
        file.write(encrypted_data)


def decrypt(filename, key):
    """
    Given a filename (str) and key (bytes), decrypts the file and writes it back in place
    """
    f = Fernet(key)
    with open(filename, "rb") as file:
        # read the encrypted data
        encrypted_data = file.read()
    # decrypt data
    decrypted_data = f.decrypt(encrypted_data)
    # write the original file
    with open(filename, "wb") as file:
        file.write(decrypted_data)

    
def index():
    session = boto3.Session()
    accesskey, secretkey, region = readConfig()
    s3 = session.resource(
    "s3",
    region_name=region,
    aws_access_key_id=accesskey,
    aws_secret_access_key=secretkey)
    try:
        s3.Bucket(BUCKET_NAME).download_file(FILE_KEY, 'local_firmware.zip')
        print('File downloaded successfully')
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            print("The object does not exist.")
        else:
            raise
            
    try:
        s3.Bucket(BUCKET_NAME).download_file(CHECKSUM_KEY, 'local_checksum.txt')
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            print("The object does not exist.")
        else:
            raise 

    try:
        s3.Bucket(BUCKET_NAME).download_file(ECDC_KEY, 'local_key.key')
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            print("The object does not exist.")
        else:
            raise             
            
    checksum()

    
def checksum():

    ecdc_file_name = '<path>/local_key.key'
    downloaded_key = open(ecdc_file_name).read()
    print("Downloaded Encryption Decryption Key : " ,downloaded_key)
    
    
    checksum_file_name = '<path>/local_checksum.txt'
    downloaded_checksum = open(checksum_file_name).read()
    print("Downloaded Checksum : " , downloaded_checksum)
    
    file_name = '<path>/local_firmware.zip'
    calculated_checksum = hashlib.md5(open(file_name,'rb').read()).hexdigest()
    print("Calculated checksum : " , calculated_checksum)   

    if downloaded_checksum==calculated_checksum:
        print('Checksum verified successfully')
        

    decrypt(file_name, downloaded_key)
    
    with zipfile.ZipFile(file_name,"r") as zip_ref:
        zip_ref.extractall("<path>") 

    print('File unzipped to <path> successfully')
    
    
    # os.startfile launches the installed firmware; note that it is available on Windows only
    os.startfile("<path>/firmware.exe")
    #pyinstaller.exe --onefile --windowed app.py
    
    if os.path.exists("<path>/local_key.key"):
        os.remove("<path>/local_key.key")
    else:
        print("The local key file does not exist")
    
index()

Configuration file (config.yaml)

accesskey:XXXXXXXXXXXXXXXX
secretkey:XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
region:xx-west-x
file_name:xyz.zip
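Note that readConfig() splits each line on the colon without stripping whitespace, so the values must follow the colon immediately (no space) exactly as shown above; otherwise the parsed access key and secret would carry a leading space and the S3 download would fail to authenticate.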

Sample firmware file used (firmware.c)

It is a C file, compiled into an executable and zipped.

#include <stdio.h>
#include <unistd.h>   /* provides sleep(); note that sleep() takes seconds */

int main() {
   // printf() displays the string inside the quotation marks
   printf("Kindly wait till your software is installed");
   sleep(10);
   printf("\nInstallation completed");
   sleep(1);
   return 0;
}

FOTA Code in action

Creating a monitoring dashboard using Prometheus and Grafana

Prometheus is a monitoring platform that collects metrics from monitored targets by scraping metrics HTTP endpoints on these targets.

Grafana is a graphical tool that connects to Prometheus and helps build a visualization dashboard.

In this blog

  1. We shall spin up Prometheus as a docker container on an Ubuntu-based server in the Azure cloud.
  2. Install Node Exporter, a tool that exposes metrics from the host Linux machine.
  3. Spin up Grafana as another docker container on the same Ubuntu server.
  4. Create a data source in Grafana pointing to Prometheus.
  5. Build a dashboard to view the Node Exporter metrics.
  • Spin up an Ubuntu server in the Azure cloud
  • Copy the following Prometheus configuration file, prometheus.yml, to the /home/ubuntu/config directory
scrape_configs:
  - job_name: 'prometheus'

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.
    static_configs:
    - targets: ['xx.yy.ww.zz:9090']

  - job_name: 'node'
    static_configs:
    - targets: ['xx.yy.ww.zz:9100']

where xx.yy.ww.zz is the public IP address of the Azure server.

The job name 'node' tells the Prometheus instance to scrape the Node Exporter at xx.yy.ww.zz:9100.

  • Download, install and start up Node Exporter
sudo wget https://github.com/prometheus/node_exporter/releases/download/v1.0.1/node_exporter-1.0.1.linux-amd64.tar.gz

tar zxvf node_exporter-1.0.1.linux-amd64.tar.gz

cd node_exporter-1.0.1.linux-amd64

./node_exporter
  • Fire up the Prometheus docker container
sudo apt-get install docker.io

sudo docker run --name prometheus -d -p 0.0.0.0:9090:9090 --volume="$PWD/config":/etc/config prom/prometheus --config.file=/etc/config/prometheus.yml

The bind address is set to 0.0.0.0 above so that the server can be accessed remotely. It's ironic that practically all content on the web about setting up Prometheus, or for that matter any other tool, runs it on localhost or 127.0.0.1. Who would ever run a server on localhost?

  • Fire up the Grafana docker container
sudo docker run -d --name=grafana -p 0.0.0.0:3000:3000 grafana/grafana
  • Ensure that ports 9090, 3000 and 9100 are opened up for inbound traffic in Azure and also in Ubuntu using
sudo ufw allow <port>
  • In case of any issues, stop and restart the docker containers using the commands
docker container stop 65dccb7ea5eb970da62f639457bbc28962ac90f7bc84f2fcb10b1fa3a9691aa1

docker container rm 65dccb7ea5eb970da62f639457bbc28962ac90f7bc84f2fcb10b1fa3a9691aa1
  • Create a new data source for Prometheus in Grafana.
  • Enter the Node Exporter dashboard ID in the import field and click Load.
  • View the real-time metrics generated.
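Once everything is wired up, the scraped metrics can also be verified directly against the Prometheus HTTP API. A minimal sketch using the requests library (replace xx.yy.ww.zz with the server's public IP address):

import requests

PROMETHEUS = 'http://xx.yy.ww.zz:9090'

# The 'up' metric is 1 for every target Prometheus can scrape;
# expect one series for the 'prometheus' job and one for the 'node' job
response = requests.get(PROMETHEUS + '/api/v1/query', params={'query': 'up'})
for result in response.json()['data']['result']:
    print(result['metric'].get('job'), result['value'][1])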