In this blog post I shall build a solution architecture for the following:
- Collecting wearable-device data from multiple users, for example heart rate from Fitbit and Apple HealthKit devices.
- Ingesting this data into an InfluxDB database running inside a Docker container.
- Running a PySpark job on an AWS EMR cluster to calculate heart rate variability and to monitor increases in resting heart rate and breathing rate.
- Generating values that could be used in a deep learning model to predict the onset of Covid-19 in a patient at least a week before symptoms show up.
Medical studies indicate that breathing rate, resting heart rate and heart rate variability can be used to detect the onset of illness, especially when they are tracked while the user is asleep.
Researchers have found that in many people with Covid-19 symptoms, heart rate variability decreases while resting heart rate and breathing rate increase. In some cases, these measurements could signal changes in a person's health nearly a week before symptoms were reported.
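Heart rate variability is normally computed from the intervals between successive heartbeats (RR intervals, in milliseconds). Two common time-domain measures are:
- SDNN, the standard deviation of those intervals. This is the metric behind HKQuantityTypeIdentifierHeartRateVariabilitySDNN in the HealthKit list further down.
- RMSSD, the root mean square of successive differences.

The sketch below assumes you already have a list of RR intervals from a device or API that exposes them. The function names are our own, and SDNN here uses the sample standard deviation.

```python
import math
import statistics


def sdnn(rr_ms):
    """SDNN: sample standard deviation of RR (NN) intervals, in ms."""
    return statistics.stdev(rr_ms)


def rmssd(rr_ms):
    """RMSSD: root mean square of successive RR-interval differences, in ms."""
    diffs = [b - a for a, b in zip(rr_ms, rr_ms[1:])]
    return math.sqrt(sum(d * d for d in diffs) / len(diffs))


def mean_heart_rate(rr_ms):
    """Average heart rate in beats per minute implied by the RR intervals."""
    return 60000 / statistics.mean(rr_ms)


rr = [800, 810, 790, 820, 800]  # hypothetical RR intervals in ms
print(round(rmssd(rr), 2), round(sdnn(rr), 2), round(mean_heart_rate(rr), 1))  # 21.21 11.4 74.6
```

A falling RMSSD or SDNN together with a rising mean heart rate over several nights is the pattern the studies above describe.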
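Set up InfluxDB locally.
We shall go with InfluxDB since it is a capable time-series database and is available in an open-source version.
Copy the attached conf file to C:/ProgramData/InfluxDB, then start the container. If you don't have a conf file, docker run --rm influxdb:1.8 influxd config > influxdb.conf writes out the defaults. The image is pinned to the 1.8 tag because the influx shell commands, InfluxQL and config file used in this post are for InfluxDB 1.x, and the latest tag of the influxdb image now points to 2.x.
docker run -p 8086:8086 -v C:/ProgramData/InfluxDB:/var/lib/influxdb influxdb:1.8 -config /var/lib/influxdb/influxdb.conf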
Verify that the server is up by opening:
http://localhost:8086/query?q=show%20databases
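The same check can be scripted. The /query endpoint returns JSON, and the sketch below extracts the database names from it. It assumes the InfluxDB 1.x response shape (results, then series, then values) and uses only the standard library. The helper names are our own.

```python
import json
import urllib.parse
import urllib.request


def show_databases_url(host="localhost", port=8086):
    """Build the /query?q=SHOW DATABASES URL for an InfluxDB 1.x server."""
    return f"http://{host}:{port}/query?" + urllib.parse.urlencode({"q": "SHOW DATABASES"})


def database_names(payload):
    """Extract database names from an InfluxDB 1.x /query JSON response."""
    names = []
    for result in payload.get("results", []):
        for series in result.get("series", []):
            names.extend(row[0] for row in series.get("values", []))
    return names


def list_databases(host="localhost", port=8086):
    """Query a running server and return its database names."""
    with urllib.request.urlopen(show_databases_url(host, port), timeout=5) as resp:
        return database_names(json.load(resp))


# Offline example using a response of the documented shape
sample = {"results": [{"statement_id": 0, "series": [
    {"name": "databases", "columns": ["name"], "values": [["_internal"], ["fitbit"]]}]}]}
print(database_names(sample))  # ['_internal', 'fitbit']
```

Once the container is running, calling list_databases() should return at least _internal.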

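To use the InfluxDB CLI, list the running containers:
$ docker ps
Copy the container ID from the output, then open a shell in the container and start the CLI:
$ docker exec -it 10733ba6d014 /bin/bash
root@10733ba6d014:/# influx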
Ingesting data from Fitbit devices into InfluxDB.
Register a new app with Fitbit at https://dev.fitbit.com/apps/new

Ask every user who would like to share their Fitbit data with you to log in to their Fitbit account and authorize your app. Each authorization produces that user's own access token. The script below requests /user/-/, which is the user the token belongs to.
Let us now create a database in InfluxDB to store all our participants' Fitbit data. In the influx CLI, run:
> CREATE DATABASE fitbit
> USE fitbit
> CREATE USER root WITH PASSWORD 'root' WITH ALL PRIVILEGES

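Then execute the code below. It needs Python 3.7 or later (for datetime.fromisoformat) and the requests, pytz and influxdb packages.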
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import requests
import sys
import os
import pytz
from datetime import datetime, date, timedelta
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError
LOCAL_TIMEZONE = pytz.timezone('America/New_York')
FITBIT_LANGUAGE = 'en_US'
FITBIT_CLIENT_ID = '2XXX2'
FITBIT_CLIENT_SECRET = 'cc2f9f738c695a763xxxxxxxxxx4480'
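# Paste the token from the Fitbit OAuth tutorial here, or set it to '' so that the
# script obtains one from FITBIT_INITIAL_CODE or the saved refresh token.
FITBIT_ACCESS_TOKEN = \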
'eyJhbGciOiJIUzI1NiJ9.eyJhdWQiOiIyMkJUWDIiLCJzdWIiOiI2WUxXM0YiLCJpc3MiOiJGaXRiaXQiLCJ0eXAiOiJhY2Nlc3NfdGxxxxxxxxxxxxxxxiJ3aHIgd3BybyB3bnV0IHdzbGUgd3dlaSB3c29jIHdzZXQgd2FjdCB3bG9jIiwiZXhwIjoxNjAwMTQzNDUzLCJpYXQiOjE2MDAxMTQ2NTN9.506-BPp5P5rRcTwBujGszfWZAziCJV2WY7_-QIRybQM'
FITBIT_INITIAL_CODE = '37f3932f38322df1547xxxxxxxxx33398aa6e0'
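# Must match the callback URL registered for your Fitbit app and the
# redirect_uri used in the authorization link.
REDIRECT_URI = 'https://localhost'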
INFLUXDB_HOST = 'localhost'
INFLUXDB_PORT = 8086
INFLUXDB_USERNAME = 'root'
INFLUXDB_PASSWORD = 'root'
INFLUXDB_DATABASE = 'fitbit'
points = []
def fetch_data(category, resource):
    """Fetch today's value of one Fitbit time series, e.g. activities/steps."""
    try:
        response = requests.get('https://api.fitbit.com/1/user/-/'
                                + category + '/' + resource
                                + '/date/today/1d.json',
                                headers={'Authorization': 'Bearer '
                                         + FITBIT_ACCESS_TOKEN,
                                         'Accept-Language': FITBIT_LANGUAGE})
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        print('HTTP request failed: %s' % err)
        sys.exit(1)
    data = response.json()
    print('Got ' + resource + ' from Fitbit')
    # The response key is e.g. 'activities-steps' or 'foods-log-water'
    for day in data[category.replace('/', '-') + '-' + resource]:
        local_midnight = LOCAL_TIMEZONE.localize(datetime.fromisoformat(day['dateTime']))
        points.append({'measurement': resource,
                       'time': local_midnight.astimezone(pytz.utc).isoformat(),
                       'fields': {'value': float(day['value'])}})
def fetch_heartrate(date):
    """Fetch resting heart rate, heart-rate zones and 1-minute intraday heart rate."""
    try:
        response = requests.get(
            'https://api.fitbit.com/1/user/-/activities/heart/date/'
            + date + '/1d/1min.json',
            headers={'Authorization': 'Bearer ' + FITBIT_ACCESS_TOKEN,
                     'Accept-Language': FITBIT_LANGUAGE})
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        print('HTTP request failed: %s' % err)
        sys.exit(1)
    data = response.json()
    print('Got heartrates from Fitbit')
    for day in data['activities-heart']:
        # Daily values are stamped at local midnight and converted to UTC,
        # the same way as the other daily series.
        day_time = LOCAL_TIMEZONE.localize(
            datetime.fromisoformat(day['dateTime'])).astimezone(pytz.utc).isoformat()
        if 'restingHeartRate' in day['value']:
            points.append({'measurement': 'restingHeartRate',
                           'time': day_time,
                           'fields': {'value': float(day['value']['restingHeartRate'])}})
        for zone in day['value'].get('heartRateZones', []):
            if 'min' in zone and 'max' in zone and 'minutes' in zone:
                fields = {'min': float(zone['min']),
                          'max': float(zone['max']),
                          'minutes': float(zone['minutes'])}
                # caloriesOut is not present for every zone
                if 'caloriesOut' in zone:
                    fields['caloriesOut'] = float(zone['caloriesOut'])
                points.append({'measurement': 'heartRateZones',
                               'time': day_time,
                               'tags': {'zone': zone['name']},
                               'fields': fields})
    if 'activities-heart-intraday' in data:
        for value in data['activities-heart-intraday']['dataset']:
            time = datetime.fromisoformat(date + 'T' + value['time'])
            utc_time = LOCAL_TIMEZONE.localize(time).astimezone(pytz.utc).isoformat()
            points.append({'measurement': 'heartrate',
                           'time': utc_time,
                           'fields': {'value': float(value['value'])}})
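def process_levels(levels):
    """Store each sleep-level segment, tagged with its (normalised) level name."""
    for level in levels:
        level_name = level['level']
        # 'classic' sleep logs use asleep/restless/awake; map them onto stage names
        if level_name == 'asleep':
            level_name = 'light'
        if level_name == 'restless':
            level_name = 'rem'
        if level_name == 'awake':
            level_name = 'wake'
        time = datetime.fromisoformat(level['dateTime'])
        utc_time = LOCAL_TIMEZONE.localize(time).astimezone(pytz.utc).isoformat()
        points.append({'measurement': 'sleep_levels', 'time': utc_time,
                       'tags': {'level': level_name},
                       'fields': {'seconds': int(level['seconds'])}})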
def fetch_activities(date):
    """Fetch the 10 most recent logged activities before the given date."""
    try:
        response = requests.get(
            'https://api.fitbit.com/1/user/-/activities/list.json',
            headers={'Authorization': 'Bearer ' + FITBIT_ACCESS_TOKEN,
                     'Accept-Language': FITBIT_LANGUAGE},
            params={'beforeDate': date, 'sort': 'desc', 'limit': 10, 'offset': 0})
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        print('HTTP request failed: %s' % err)
        sys.exit(1)
    data = response.json()
    print('Got activities from Fitbit')
    for activity in data['activities']:
        fields = {}
        if 'activeDuration' in activity:
            fields['activeDuration'] = int(activity['activeDuration'])
        if 'averageHeartRate' in activity:
            fields['averageHeartRate'] = int(activity['averageHeartRate'])
        if 'calories' in activity:
            fields['calories'] = int(activity['calories'])
        if 'duration' in activity:
            fields['duration'] = int(activity['duration'])
        if 'distance' in activity:
            fields['distance'] = float(activity['distance'])
            fields['distanceUnit'] = activity['distanceUnit']
        if 'pace' in activity:
            fields['pace'] = float(activity['pace'])
        if 'speed' in activity:
            fields['speed'] = float(activity['speed'])
        if 'elevationGain' in activity:
            fields['elevationGain'] = int(activity['elevationGain'])
        if 'steps' in activity:
            fields['steps'] = int(activity['steps'])
        for level in activity['activityLevel']:
            if level['name'] == 'sedentary':
                fields[level['name'] + 'Minutes'] = int(level['minutes'])
            else:
                fields[level['name'] + 'ActiveMinutes'] = int(level['minutes'])
        time = datetime.fromisoformat(activity['startTime'].strip('Z'))
        utc_time = time.astimezone(pytz.utc).isoformat()
        points.append({'measurement': 'activity',
                       'time': utc_time,
                       'tags': {'activityName': activity['activityName']},
                       'fields': fields})
try:
    client = InfluxDBClient(host=INFLUXDB_HOST, port=INFLUXDB_PORT,
                            username=INFLUXDB_USERNAME,
                            password=INFLUXDB_PASSWORD)
    client.create_database(INFLUXDB_DATABASE)
    client.switch_database(INFLUXDB_DATABASE)
except (InfluxDBClientError, requests.exceptions.ConnectionError) as err:
    # An unreachable server raises ConnectionError rather than InfluxDBClientError
    print('InfluxDB connection failed: %s' % err)
    sys.exit(1)
if not FITBIT_ACCESS_TOKEN:
    # Exchange the saved refresh token (or, on the first run, the one-time
    # authorization code) for a fresh access token.
    if os.path.isfile('.fitbit-refreshtoken'):
        with open('.fitbit-refreshtoken') as f:
            token = f.read().strip()
        response = requests.post('https://api.fitbit.com/oauth2/token',
                                 data={'client_id': FITBIT_CLIENT_ID,
                                       'grant_type': 'refresh_token',
                                       'redirect_uri': REDIRECT_URI,
                                       'refresh_token': token},
                                 auth=(FITBIT_CLIENT_ID, FITBIT_CLIENT_SECRET))
    else:
        response = requests.post('https://api.fitbit.com/oauth2/token',
                                 data={'client_id': FITBIT_CLIENT_ID,
                                       'grant_type': 'authorization_code',
                                       'redirect_uri': REDIRECT_URI,
                                       'code': FITBIT_INITIAL_CODE},
                                 auth=(FITBIT_CLIENT_ID, FITBIT_CLIENT_SECRET))
    response.raise_for_status()
    tokens = response.json()
    FITBIT_ACCESS_TOKEN = tokens['access_token']
    # A new refresh token comes back each time; keep it for the next run
    with open('.fitbit-refreshtoken', 'w') as f:
        f.write(tokens['refresh_token'])
end = date.today()
start = end - timedelta(days=1)
try:
    response = requests.get('https://api.fitbit.com/1.2/user/-/sleep/date/'
                            + start.isoformat() + '/' + end.isoformat() + '.json',
                            headers={'Authorization': 'Bearer ' + FITBIT_ACCESS_TOKEN,
                                     'Accept-Language': FITBIT_LANGUAGE})
    response.raise_for_status()
except requests.exceptions.HTTPError as err:
    print('HTTP request failed: %s' % err)
    sys.exit(1)
data = response.json()
print('Got sleep sessions from Fitbit')
for day in data['sleep']:
    time = datetime.fromisoformat(day['startTime'])
    utc_time = LOCAL_TIMEZONE.localize(time).astimezone(pytz.utc).isoformat()
    summary = day['levels']['summary']
    fields = {'duration': int(day['duration']),
              'efficiency': int(day['efficiency']),
              'is_main_sleep': bool(day['isMainSleep']),
              'minutes_asleep': int(day['minutesAsleep']),
              'minutes_awake': int(day['minutesAwake']),
              'time_in_bed': int(day['timeInBed'])}
    if day['type'] == 'stages':
        fields.update({'minutes_deep': int(summary['deep']['minutes']),
                       'minutes_light': int(summary['light']['minutes']),
                       'minutes_rem': int(summary['rem']['minutes']),
                       'minutes_wake': int(summary['wake']['minutes'])})
    else:
        # 'classic' logs have no stages; map asleep/restless/awake onto them
        fields.update({'minutes_deep': 0,
                       'minutes_light': int(summary['asleep']['minutes']),
                       'minutes_rem': int(summary['restless']['minutes']),
                       'minutes_wake': int(summary['awake']['minutes'])})
    points.append({'measurement': 'sleep', 'time': utc_time, 'fields': fields})
    if 'data' in day['levels']:
        process_levels(day['levels']['data'])
    if 'shortData' in day['levels']:
        process_levels(day['levels']['shortData'])
fetch_data('activities', 'steps')
fetch_data('activities', 'distance')
fetch_data('activities', 'floors')
fetch_data('activities', 'elevation')
fetch_data('activities', 'minutesSedentary')
fetch_data('activities', 'minutesLightlyActive')
fetch_data('activities', 'minutesFairlyActive')
fetch_data('activities', 'minutesVeryActive')
fetch_data('activities', 'calories')
fetch_data('activities', 'activityCalories')
fetch_data('body', 'weight')
fetch_data('body', 'fat')
fetch_data('body', 'bmi')
fetch_data('foods/log', 'water')
fetch_data('foods/log', 'caloriesIn')
fetch_heartrate(date.today().isoformat())
fetch_activities((date.today() + timedelta(days=1)).isoformat())
try:
    client.write_points(points)
except InfluxDBClientError as err:
    print('Unable to write points to InfluxDB: %s' % err)
    sys.exit(1)
print('Successfully wrote %s data points to InfluxDB' % len(points))
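The studies quoted at the start look at heart rate while the user is asleep. A natural next step is therefore to keep only the intraday heart-rate samples that fall inside a sleep session.

The minimal sketch below assumes that the sessions and samples have already been read back as timezone-aware datetimes, for example from the sleep and heartrate measurements written above. The function name and data layout are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from statistics import mean


def sleeping_heart_rate(sessions, samples):
    """Summarise heart-rate samples that fall inside any sleep session.

    sessions: list of (start, end) datetimes
    samples:  list of (timestamp, bpm) tuples
    Returns (mean_bpm, min_bpm), or None if no sample falls in a session.
    """
    asleep = [bpm for ts, bpm in samples
              if any(start <= ts < end for start, end in sessions)]
    if not asleep:
        return None
    return mean(asleep), min(asleep)


start = datetime(2020, 9, 14, 3, 0, tzinfo=timezone.utc)
sessions = [(start, start + timedelta(hours=7))]
samples = [(start - timedelta(minutes=5), 80),   # before falling asleep, ignored
           (start + timedelta(minutes=10), 58),
           (start + timedelta(hours=2), 54),
           (start + timedelta(hours=8), 75)]     # after waking, ignored
print(sleeping_heart_rate(sessions, samples))
```

The nested any() is fine for one night of 1-minute samples. For months of data, sorting both lists and walking them together would scale better.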
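The primary artifact needed to run the code above is the access token.
To fetch an access token we first need an authorization code, which is obtained by opening a link like the one below. The redirect_uri in the link must match both the callback URL registered for your app and REDIRECT_URI in the script.
https://www.fitbit.com/oauth2/authorize?response_type=code&client_id=2XXXX2&redirect_uri=http%3A%2F%2F127.0.0.1%3A8085&scope=activity%20heartrate%20location%20nutrition%20profile%20settings%20sleep%20social%20weight&expires_in=604800
The code changes every time, so the reliable way to get a valid one is to click the tutorial link on the page below and follow the steps. Then do one of the following:
- Paste the resulting access token into FITBIT_ACCESS_TOKEN.
- Leave FITBIT_ACCESS_TOKEN empty and paste the code into FITBIT_INITIAL_CODE. The script then exchanges the code for a token itself, and uses the saved refresh token on later runs.
https://dev.fitbit.com/apps/oauthinteractivetutorial?clientEncodedId=2XXXX2&clientSecret=cc2f9f738c695a763c0ecxxxxxxc4480&redirectUri=http://127.0.0.1:8085&applicationType=SERVER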
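Rather than editing the authorization link by hand, it can be assembled from its parameters. The sketch below rebuilds a link of the same shape as the one above, using placeholder values taken from that link. It also shows the HTTP Basic header that requests builds from auth=(FITBIT_CLIENT_ID, FITBIT_CLIENT_SECRET) when the script calls the token endpoint. build_authorize_url and basic_auth_header are our own helpers, not part of any Fitbit SDK.

```python
import base64
from urllib.parse import parse_qs, quote, urlencode, urlparse

AUTHORIZE_URL = "https://www.fitbit.com/oauth2/authorize"


def build_authorize_url(client_id, redirect_uri, scopes, expires_in=604800):
    """Assemble an OAuth 2.0 authorization-code link like the one above."""
    params = {
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": " ".join(scopes),
        "expires_in": expires_in,
    }
    # quote (not quote_plus) encodes spaces as %20 and '/' ':' as %2F %3A
    return f"{AUTHORIZE_URL}?{urlencode(params, quote_via=quote)}"


def basic_auth_header(client_id, client_secret):
    """HTTP Basic credentials sent to the token endpoint."""
    raw = f"{client_id}:{client_secret}".encode()
    return "Basic " + base64.b64encode(raw).decode()


url = build_authorize_url("2XXXX2", "http://127.0.0.1:8085",
                          ["activity", "heartrate", "sleep", "weight"])
print(url)
```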


Once the code runs successfully, you should see output like the following.

Query InfluxDB to verify the inserted data.
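When checking the data, aggregates are easier to read than raw rows. The sketch below builds InfluxQL queries for the measurements written above (heartrate, restingHeartRate, sleep and so on). build_query is a hypothetical helper. The resulting string can be pasted into the influx CLI after USE fitbit, or passed to client.query() from the influxdb package.

```python
ALLOWED_AGGREGATES = {"mean", "min", "max", "count", "median"}


def build_query(measurement, field="value", hours=24, interval="1h", func="mean"):
    """Build an InfluxQL query aggregating one field over the last N hours."""
    if func not in ALLOWED_AGGREGATES:
        raise ValueError(f"unsupported aggregate: {func}")
    return (f'SELECT {func}("{field}") FROM "{measurement}" '
            f'WHERE time > now() - {hours}h GROUP BY time({interval})')


print(build_query("heartrate"))
# SELECT mean("value") FROM "heartrate" WHERE time > now() - 24h GROUP BY time(1h)
print(build_query("sleep", field="minutes_asleep", hours=168, interval="1d", func="max"))
```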



Ingesting data from Apple HealthKit devices into InfluxDB.
Use the following files:
healthkit.py
import csv
import os
import shutil
import smtplib
import zipfile
from datetime import datetime
from xml.dom import minidom

import dropbox
from influxdb import InfluxDBClient

import healthkitcnf
import healthkitfn

# Initiate logging
logger = healthkitfn.init_logging()
# Start time, used to report elapsed time
startTime = datetime.now()

# HealthKit record type -> (InfluxDB measurement, field name)
RECORD_TYPES = {
    'HKQuantityTypeIdentifierHeartRate': ('heartrate', 'watch_heartrate'),
    'HKQuantityTypeIdentifierRestingHeartRate': ('restingheartrate', 'resting_heartrate'),
    'HKQuantityTypeIdentifierWalkingHeartRateAverage': ('walkingHeart', 'walking_heartrate'),
    'HKQuantityTypeIdentifierDistanceWalkingRunning': ('distance', 'distance'),
    'HKQuantityTypeIdentifierBasalEnergyBurned': ('basal', 'basal'),
    'HKQuantityTypeIdentifierActiveEnergyBurned': ('active', 'active'),
    'HKQuantityTypeIdentifierBodyMassIndex': ('bmi', 'bmi'),
    'HKQuantityTypeIdentifierBodyMass': ('weight', 'weight'),
    'HKQuantityTypeIdentifierBodyFatPercentage': ('bodyfat', 'bodyfat'),
    'HKQuantityTypeIdentifierLeanBodyMass': ('leanmass', 'leanmass'),
    'HKQuantityTypeIdentifierBloodPressureSystolic': ('systolic', 'systolic'),
    'HKQuantityTypeIdentifierBloodPressureDiastolic': ('diastolic', 'diastolic'),
}


def healthkit_import():
    """Download export.zip from Dropbox and extract export.xml."""
    dbx = dropbox.Dropbox(healthkitcnf.access_token)
    # Clean up files left over from a previous run
    if os.path.isdir(healthkitcnf.ExportDir):
        shutil.rmtree(healthkitcnf.ExportDir)
        logger.info(f"Removed {healthkitcnf.ExportDir}")
    for path in (healthkitcnf.ExportZip, healthkitcnf.ExportXML):
        if os.path.isfile(path):
            os.remove(path)
            logger.info(f"Removed {path}")
    # Download the new export.zip and unzip it
    with open(healthkitcnf.ExportZip, "wb") as f:
        metadata, res = dbx.files_download(path="/export.zip")
        f.write(res.content)
    logger.info('Downloaded export.zip Successfully')
    with zipfile.ZipFile(healthkitcnf.ExportZip, 'r') as zip_ref:
        zip_ref.extractall()
    logger.info("Unzipped export.zip Successfully")
    shutil.copy(os.path.join(healthkitcnf.ExportDir, "export.xml"), healthkitcnf.ExportXML)
    logger.info("Copied export.xml to primary directory")
    logger.info('Download and Copy Completed')


def healthkit_db():
    """Connect to InfluxDB and make sure the target database exists."""
    client = InfluxDBClient(healthkitcnf.IP, healthkitcnf.PORT, healthkitcnf.USERNAME,
                            healthkitcnf.PASSWORD, healthkitcnf.DB_NAME)
    client.create_database(healthkitcnf.DB_NAME)
    logger.info('Connected to Database Successfully')
    return client


def healthkit_text():
    """Load HealthKit type identifiers from HKValues.txt (not used by the import below)."""
    with open('HKValues.txt') as f:
        return [line.strip() for line in f]


def healthkit_csv():
    """Load HealthKit type identifiers from HKValues.csv (not used by the import below)."""
    with open('HKValues.csv', newline='') as csvfile:
        return list(csv.reader(csvfile))


def healthkit_notify():
    """Send a notification e-mail once the import has completed."""
    try:
        email_ssl = smtplib.SMTP_SSL('smtp.gmail.com', 465)
        email_ssl.ehlo()
        email_ssl.login(healthkitcnf.email_user, healthkitcnf.email_pass)
        email_ssl.sendmail(healthkitcnf.email_user, healthkitcnf.email_sendto,
                           healthkitcnf.email_body)
        email_ssl.close()
        logger.info("Notification Sent")
    except Exception as err:
        logger.warning(f"Failed to send completed notification: {err}")


def healthkit_xml(client):
    """Parse export.xml and write the record types in RECORD_TYPES to InfluxDB."""
    logger.info("Importing XML Into Record List")
    xmldoc = minidom.parse(healthkitcnf.ExportXML)
    recordlist = xmldoc.getElementsByTagName('Record')
    logger.info('Imported XML Records Successfully')
    # One pass over all records instead of one pass per record type
    points = []
    for record in recordlist:
        record_type = record.attributes['type'].value
        if record_type not in RECORD_TYPES:
            continue
        measurement, field = RECORD_TYPES[record_type]
        points.append({"measurement": measurement,
                       "tags": {"service": "HealthKit", "person": healthkitcnf.NAME},
                       "time": record.attributes['startDate'].value,
                       "fields": {field: float(record.attributes['value'].value)}})
    # Write in batches rather than one HTTP request per record
    client.write_points(points, batch_size=5000)
    logger.info(f"Export of {len(points)} records completed in {datetime.now() - startTime}")


healthkit_import()
client = healthkit_db()
healthkit_xml(client)
healthkit_notify()
healthkitcnf.py
#Zip Import / Export Config
#NO NEED TO CHANGE
ExportZip = "export.zip"
ExportDir = "apple_health_export"
ExportXML = "export.xml"
#Dropbox Config
access_token = '*************************'
#InfluxDB Server
IP = 'XXX.XXX.XXX.XXX'
PORT = 8086
USERNAME = 'user'
PASSWORD = 'password'
DB_NAME = 'database'
NAME = 'Me' #Name of persons data
vCount = 0
#SMTP Config - For Gmail. Email password is an app password generated from your security settings if using 2 factor auth.
email_user = 'email'
email_pass = 'password'
email_sendto = 'sendto email'
email_subject = 'Healthkit Import Notification'
email_body = 'SUBJECT: Healthkit Import Completed Successfully!'
healthkitfn.py
#Imports for Functions
import logging
import os

#Logging Function
def init_logging():
    """Log to logs/healthkit.log and to the console."""
    rootLogger = logging.getLogger('healthkit_logger')
    LOG_DIR = os.path.join(os.getcwd(), 'logs')
    os.makedirs(LOG_DIR, exist_ok=True)
    fileHandler = logging.FileHandler(os.path.join(LOG_DIR, "healthkit.log"))
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fileHandler.setFormatter(formatter)
    rootLogger.addHandler(fileHandler)
    rootLogger.setLevel(logging.DEBUG)
    consoleHandler = logging.StreamHandler()
    rootLogger.addHandler(consoleHandler)
    return rootLogger
HKValues.csv
HKQuantityTypeIdentifierBloodGlucose,HKQuantityTypeIdentifierBodyMassIndex,HKQuantityTypeIdentifierHeight,HKQuantityTypeIdentifierBodyMass,HKQuantityTypeIdentifierHeartRate,HKQuantityTypeIdentifierBloodPressureSystolic,HKQuantityTypeIdentifierBloodPressureDiastolic,HKQuantityTypeIdentifierBodyFatPercentage,HKQuantityTypeIdentifierLeanBodyMass,HKQuantityTypeIdentifierStepCount,HKQuantityTypeIdentifierDistanceWalkingRunning,HKQuantityTypeIdentifierBasalEnergyBurned,HKQuantityTypeIdentifierActiveEnergyBurned,HKQuantityTypeIdentifierFlightsClimbed,HKQuantityTypeIdentifierDietaryFatTotal,HKQuantityTypeIdentifierDietaryFatPolyunsaturated,HKQuantityTypeIdentifierDietaryFatMonounsaturated,HKQuantityTypeIdentifierDietaryFatSaturated,HKQuantityTypeIdentifierDietaryCholesterol,HKQuantityTypeIdentifierDietarySodium,HKQuantityTypeIdentifierDietaryCarbohydrates,HKQuantityTypeIdentifierDietaryFiber,HKQuantityTypeIdentifierDietarySugar,HKQuantityTypeIdentifierDietaryEnergyConsumed,HKQuantityTypeIdentifierDietaryProtein,HKQuantityTypeIdentifierDietaryVitaminC,HKQuantityTypeIdentifierDietaryCalcium,HKQuantityTypeIdentifierDietaryIron,HKQuantityTypeIdentifierDietaryPotassium,HKQuantityTypeIdentifierAppleExerciseTime,HKQuantityTypeIdentifierRestingHeartRate,HKQuantityTypeIdentifierWalkingHeartRateAverage
HKValues.txt
HKQuantityTypeIdentifierBloodGlucose
HKQuantityTypeIdentifierBodyMassIndex
HKQuantityTypeIdentifierHeight
HKQuantityTypeIdentifierBodyMass
HKQuantityTypeIdentifierHeartRate
HKQuantityTypeIdentifierBloodPressureSystolic
HKQuantityTypeIdentifierBloodPressureDiastolic
HKQuantityTypeIdentifierBodyFatPercentage
HKQuantityTypeIdentifierLeanBodyMass
HKQuantityTypeIdentifierStepCount
HKQuantityTypeIdentifierDistanceWalkingRunning
HKQuantityTypeIdentifierBasalEnergyBurned
HKQuantityTypeIdentifierActiveEnergyBurned
HKQuantityTypeIdentifierFlightsClimbed
HKQuantityTypeIdentifierDietaryFatTotal
HKQuantityTypeIdentifierDietaryFatPolyunsaturated
HKQuantityTypeIdentifierDietaryFatMonounsaturated
HKQuantityTypeIdentifierDietaryFatSaturated
HKQuantityTypeIdentifierDietaryCholesterol
HKQuantityTypeIdentifierDietarySodium
HKQuantityTypeIdentifierDietaryCarbohydrates
HKQuantityTypeIdentifierDietaryFiber
HKQuantityTypeIdentifierDietarySugar
HKQuantityTypeIdentifierDietaryEnergyConsumed
HKQuantityTypeIdentifierDietaryProtein
HKQuantityTypeIdentifierDietaryVitaminC
HKQuantityTypeIdentifierDietaryCalcium
HKQuantityTypeIdentifierDietaryIron
HKQuantityTypeIdentifierDietaryPotassium
HKQuantityTypeIdentifierAppleExerciseTime
HKQuantityTypeIdentifierRestingHeartRate
HKQuantityTypeIdentifierWalkingHeartRateAverage
HKCategoryTypeIdentifierSleepAnalysis
HKCategoryTypeIdentifierAppleStandHour
HKCategoryTypeIdentifierMindfulSession
HKQuantityTypeIdentifierHeartRateVariabilitySDNN
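To use the code files above:
- Edit healthkitcnf.py with your InfluxDB details and Dropbox access token.
- Export your Apple Health data from the Health app on your iPhone and save export.zip to the root folder of your Dropbox (the script downloads /export.zip).
- Run the script with python3 healthkit.py.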
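minidom loads the whole export.xml into memory, and Apple Health exports can be large. As an alternative, xml.etree.ElementTree.iterparse can stream the file and clear each Record element once it has been read. Note that this is a swapped-in technique, not what healthkit.py does.

The sketch assumes the same Record attributes the script reads (type, startDate, value). It reuses two of the script's measurement names and adds a hypothetical hrv measurement for the SDNN type listed in HKValues.txt. iter_points is our own name.

```python
import io
import xml.etree.ElementTree as ET

# HealthKit record type -> (measurement, field); "hrv"/"sdnn" are hypothetical names
RECORD_TYPES = {
    "HKQuantityTypeIdentifierHeartRate": ("heartrate", "watch_heartrate"),
    "HKQuantityTypeIdentifierRestingHeartRate": ("restingheartrate", "resting_heartrate"),
    "HKQuantityTypeIdentifierHeartRateVariabilitySDNN": ("hrv", "sdnn"),
}


def iter_points(source, person="Me"):
    """Yield InfluxDB points for matching <Record> elements, one at a time."""
    for _, elem in ET.iterparse(source, events=("end",)):
        if elem.tag != "Record":
            continue
        if elem.get("type") in RECORD_TYPES:
            measurement, field = RECORD_TYPES[elem.get("type")]
            yield {"measurement": measurement,
                   "tags": {"service": "HealthKit", "person": person},
                   "time": elem.get("startDate"),
                   "fields": {field: float(elem.get("value"))}}
        elem.clear()  # drop the record's attributes and children once processed


sample = b"""<HealthData>
 <Record type="HKQuantityTypeIdentifierHeartRate" unit="count/min"
         startDate="2020-09-14 07:30:00 -0400" value="62"/>
 <Record type="HKQuantityTypeIdentifierStepCount" unit="count"
         startDate="2020-09-14 07:31:00 -0400" value="120"/>
</HealthData>"""
points = list(iter_points(io.BytesIO(sample)))
print(points[0]["fields"])  # {'watch_heartrate': 62.0}
```

With a real export you would pass the file path instead of a BytesIO. You would also collect the points in chunks before handing each chunk to client.write_points().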
Disclaimer:
Unlike the Fitbit script above, I haven't run the HealthKit script, since I do not own the device yet, so I can't vouch for the correctness of this code.
In fact, several sections further down, including this one, approach the topic at a more theoretical level. I would encourage readers to try this code locally and let me know the results.
However, it is worth mentioning that in all my previous posts I have made sure to share only code that I have tried and got working locally, even when the original code was copied from the web. In case you are not aware, in my experience around 80% of the code found on the web fails to work out of the box. This is because it is outdated, written for a different OS, or needs its settings changed to suit your local environment. Such fixes can take a long time to debug, and at times you may be forced to abandon that approach and search for another page on the web.
Create an AWS EMR-based PySpark job to run heart rate calculations.
Create your AWS account if you haven't already, then install and configure the AWS Command Line Interface.
Create a new S3 bucket.
Upload the pyspark_job.py file below to that bucket.
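# pyspark_job.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# Placeholders: InfluxDB must be reachable from the EMR nodes, so "localhost"
# on your own machine will not work here.
INFLUXDB_URL = 'http://XXX.XXX.XXX.XXX:8086'
INFLUXDB_USERNAME = 'root'
INFLUXDB_PASSWORD = 'root'
# InfluxDB 1.8 compatibility mode of influxdb-client: the token is
# "username:password", the org is ignored ("-") and a bucket is
# "database/retention_policy". Flux queries also need flux-enabled = true
# in the [http] section of influxdb.conf.
INFLUXDB_TOKEN = f'{INFLUXDB_USERNAME}:{INFLUXDB_PASSWORD}'
org = '-'
input_bucket = 'fitbit/autogen'
output_bucket = 'fitbit/autogen'


def create_spark_session():
    """Create spark session.
    Returns:
        spark (SparkSession) - spark session connected to AWS EMR
        cluster
    """
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages",
                "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark


def process_heartrate_data(spark, input_bucket, output_bucket):
    client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=org)
    try:
        # Flux query: everything written to the input bucket in the last 10 minutes
        query = f'from(bucket: "{input_bucket}") |> range(start: -10m)'
        tables = client.query_api().query(query, org=org)
        for table in tables:
            print(">>>>>>>>>> Showing table details >>>>>>>>>>>>")
            print(table)
            for row in table.records:
                print(">>>>>>>>>> Showing Rows >>>>>>>>>>>>")
                print(row.values)
    except Exception as err:
        print(f'Query failed: {err}')
    try:
        # Compute averages here; for now a sample line-protocol record is written
        data = "mem,host=host1 used_percent=23.43234543"
        write_api = client.write_api(write_options=SYNCHRONOUS)
        write_api.write(output_bucket, org, data)
        print(f"Data has been saved {data}")
    except Exception as err:
        print(f"Write failed: {err}")
    finally:
        client.close()


def main():
    spark = create_spark_session()
    process_heartrate_data(spark, input_bucket, output_bucket)


if __name__ == '__main__':
    main()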
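The "Compute averages here" placeholder is where the heart-rate calculations would go. The sketch below shows one possible shape for that logic. It is our own assumption, not a method taken from the papers referenced below.
- It flags days whose resting heart rate is more than k standard deviations above the user's trailing baseline.
- It formats each result as an InfluxDB line-protocol record, like the sample record written above.

It is plain Python so it can be tested locally. Inside the Spark job it would run once per user, for example on that user's collected daily values. flag_elevated_rhr, to_line_protocol and the rhr_daily measurement are hypothetical names.

```python
from statistics import mean, stdev


def flag_elevated_rhr(daily_rhr, window=7, k=2.0):
    """Return (day_index, rhr, elevated) for every day after the first `window` days.

    A day is flagged when its resting heart rate exceeds the mean of the
    previous `window` days by more than k sample standard deviations.
    """
    results = []
    for i in range(window, len(daily_rhr)):
        baseline = daily_rhr[i - window:i]
        mu, sigma = mean(baseline), stdev(baseline)
        results.append((i, daily_rhr[i], daily_rhr[i] > mu + k * sigma))
    return results


def to_line_protocol(measurement, tags, fields, timestamp_ns):
    """Format one point as line protocol (simple names and tag values only, no escaping)."""
    head = measurement
    if tags:
        head += "," + ",".join(f"{key}={value}" for key, value in sorted(tags.items()))
    parts = []
    for key, value in fields.items():
        if isinstance(value, bool):          # check bool before int: bool is an int subclass
            parts.append(f"{key}={'true' if value else 'false'}")
        elif isinstance(value, int):
            parts.append(f"{key}={value}i")  # integer fields carry an 'i' suffix
        else:
            parts.append(f"{key}={value}")
    return f"{head} {','.join(parts)} {timestamp_ns}"


rhr = [58, 57, 59, 58, 57, 58, 59, 66]  # hypothetical daily resting heart rate
flags = flag_elevated_rhr(rhr)
print(flags)  # [(7, 66, True)]
day, value, elevated = flags[-1]
print(to_line_protocol("rhr_daily", {"user": "u1"},
                       {"rhr": float(value), "elevated": elevated}, 1600128000000000000))
```

The same baseline comparison could be applied to nightly RMSSD, where the flag would be a drop below mu - k * sigma instead of a rise.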
To install useful packages on all of the nodes of our cluster, we’ll need to create the file emr_bootstrap.sh and add it to a bucket on S3.
#!/bin/bash
sudo python3 -m pip install -U \
  matplotlib \
  pandas \
  spark-nlp \
  influxdb-client
Create a key pair file
Navigate to EC2 from the homepage of your console:

Select “Key Pairs”

Click “Create Key Pair”, enter a name (for example emr-key) and click “Create”.


The file emr-key.pem should download automatically. Store it in a directory you’ll remember.
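Now run the command below to create the EMR cluster. If you have not used EMR in this account before, run aws emr create-default-roles first so that --use-default-roles has roles to use. The release label is emr-5.30.0 because PySpark on EMR only defaults to Python 3 from release 5.30.0 onwards, and pyspark_job.py uses Python 3 syntax such as f-strings.
aws emr create-cluster --name "Spark cluster with step" \
--release-label emr-5.30.0 \
--applications Name=Spark \
--log-uri s3://your-bucket/logs/ \
--ec2-attributes KeyName=your-key-pair \
--instance-type m5.xlarge \
--instance-count 3 \
--bootstrap-actions Path=s3://your-bucket/emr_bootstrap.sh \
--steps Type=Spark,Name="Spark job",ActionOnFailure=CONTINUE,Args=[--deploy-mode,cluster,--master,yarn,s3://your-bucket/pyspark_job.py] \
--use-default-roles \
--auto-terminate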
Building a deep learning model for prediction
(this section is work-in-progress)
Build a deep learning model based on the information provided in the paper below to predict the likelihood of a Covid-19 diagnosis. Kindly note that we are dealing with heart rate alone here and are not concerned with the other tests mentioned in the paper.
https://www.medrxiv.org/content/10.1101/2020.08.14.20175265v1.full.pdf
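Whatever architecture the model ends up using, a sequence model needs fixed-length inputs. The minimal sketch below turns one user's daily feature rows into labelled windows. It assumes one row per day (for example resting heart rate, RMSSD and sleeping heart rate) and a label meaning "symptom onset within the next horizon days". make_windows and this labelling scheme are hypothetical and are not taken from the paper.

```python
def make_windows(features, onset_day=None, window=7, horizon=7):
    """Slice daily feature rows into (window, label) training pairs.

    features:  list of per-day feature lists, oldest first
    onset_day: index of the first symptomatic day, or None if the user never fell ill
    A window ending on day t is labelled 1 when onset falls in (t, t + horizon].
    """
    pairs = []
    for end in range(window - 1, len(features)):
        x = features[end - window + 1:end + 1]
        y = int(onset_day is not None and end < onset_day <= end + horizon)
        pairs.append((x, y))
    return pairs


# Hypothetical data: 10 days of [resting HR, RMSSD], symptoms start on day 9
features = [[58 + d, 40 - d] for d in range(10)]
pairs = make_windows(features, onset_day=9, window=3, horizon=2)
print([y for _, y in pairs])  # [0, 0, 0, 0, 0, 1, 1, 0]
```

The windows can then be stacked into an array of shape (samples, window, features) for whichever framework is used to train the model.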
References:
https://github.com/nu1lx/HealthKit
https://towardsdatascience.com/production-data-processing-with-apache-spark-96a58dfd3fe7
https://towardsdatascience.com/getting-started-with-pyspark-on-amazon-emr-c85154b6b921
https://www.hindawi.com/journals/ddns/2020/6152041/
https://www.mobihealthnews.com/news/early-data-fitbit-study-indicates-it-can-predict-covid-19-symptoms-show
https://blog.fitbit.com/early-findings-covid-19-study/
https://www.medrxiv.org/content/10.1101/2020.08.14.20175265v1.full.pdf
https://medium.com/welltory/alternative-at-home-test-for-coronavirus-heart-rate-variability-326a7237abe7