Detecting the onset of COVID-19 early using heart rate measurements collected from wearable devices

In this blog post I shall build a solution architecture for the following:

  1. Collecting wearable-device data from multiple users, for example heart rate from Fitbit and Apple HealthKit devices.
  2. Ingesting this data into an InfluxDB database running inside a Docker container.
  3. Running a PySpark job on an AWS EMR cluster to calculate heart rate variability and monitor increases in resting heart rate and breathing rate.
  4. Generating values that could be used in a deep learning model to predict the onset of coronavirus in a patient at least a week before actual symptoms show up.

Medical studies indicate that breathing rate, resting heart rate, and heart rate variability can be used to detect the onset of illnesses, especially when tracked while the user is asleep.

Researchers have found that in many cases heart rate variability decreases, while resting heart rate and breathing rate increase, in people who have COVID-19 symptoms. In some cases, those measurements could signal changes in a person’s health nearly a week before symptoms were reported.

Set up InfluxDB locally.

We shall go with InfluxDB since it is a solid time-series database and is available as open source.

Copy the attached conf file to C:/ProgramData/InfluxDB (the same directory that is mounted into the container below).

docker run -p 8086:8086 -v C:/ProgramData/InfluxDB:/var/lib/influxdb influxdb -config /var/lib/influxdb/influxdb.conf

Verify using

http://localhost:8086/query?q=show%20databases
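
Alternatively, here is a quick sanity check from Python using the 1.x client library (assuming it has been installed with pip install influxdb):

# Quick check that InfluxDB is reachable from Python
from influxdb import InfluxDBClient

client = InfluxDBClient(host='localhost', port=8086)
print(client.get_list_database())   # should list at least the internal databases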

In order to use the InfluxDB CLI:

$ docker ps
copy the container ID from the output

$ docker exec -it 10733ba6d014 /bin/bash
root@10733ba6d014:/# influx

Ingesting data from Fitbit devices into InfluxDB.

Create a new app by registering it with Fitbit at https://dev.fitbit.com/apps/new

Request all users who would like to share their Fitbit data with you to log in to their respective Fitbit accounts and authorize your app.

Let us now create a database in InfluxDB to store all our participants’ Fitbit data.

> CREATE DATABASE fitbit
> USE fitbit
> CREATE USER root WITH PASSWORD 'root' WITH ALL PRIVILEGES

Execute the code provided below.

#!/usr/bin/python
# -*- coding: utf-8 -*-
import requests
import sys
import os
import pytz
from datetime import datetime, date, timedelta
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError

LOCAL_TIMEZONE = pytz.timezone('America/New_York')
FITBIT_LANGUAGE = 'en_US'
FITBIT_CLIENT_ID = '2XXX2'
FITBIT_CLIENT_SECRET = 'cc2f9f738c695a763xxxxxxxxxx4480'
FITBIT_ACCESS_TOKEN = \
    'eyJhbGciOiJIUzI1NiJ9.eyJhdWQiOiIyMkJUWDIiLCJzdWIiOiI2WUxXM0YiLCJpc3MiOiJGaXRiaXQiLCJ0eXAiOiJhY2Nlc3NfdGxxxxxxxxxxxxxxxiJ3aHIgd3BybyB3bnV0IHdzbGUgd3dlaSB3c29jIHdzZXQgd2FjdCB3bG9jIiwiZXhwIjoxNjAwMTQzNDUzLCJpYXQiOjE2MDAxMTQ2NTN9.506-BPp5P5rRcTwBujGszfWZAziCJV2WY7_-QIRybQM'
FITBIT_INITIAL_CODE = '37f3932f38322df1547xxxxxxxxx33398aa6e0'
REDIRECT_URI = 'https://localhost'
INFLUXDB_HOST = 'localhost'
INFLUXDB_PORT = 8086
INFLUXDB_USERNAME = 'root'
INFLUXDB_PASSWORD = 'root'
INFLUXDB_DATABASE = 'fitbit'
points = []


def fetch_data(category, type):
    try:
        response = requests.get('https://api.fitbit.com/1/user/-/'
                                + category + '/' + type
                                + '/date/today/1d.json',
                                headers={'Authorization': 'Bearer '
                                + FITBIT_ACCESS_TOKEN,
                                'Accept-Language': FITBIT_LANGUAGE})
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        print('HTTP request failed: %s' % err)
        sys.exit()

    data = response.json()
    print('Got ' + type + ' from Fitbit')

    for day in data[category.replace('/', '-') + '-' + type]:
        points.append({'measurement': type,
                      'time': LOCAL_TIMEZONE.localize(datetime.fromisoformat(day['dateTime'
                      ])).astimezone(pytz.utc).isoformat(),
                      'fields': {'value': float(day['value'])}})


def fetch_heartrate(date):
    try:
        response = \
            requests.get('https://api.fitbit.com/1/user/-/activities/heart/date/'
                          + date + '/1d/1min.json',
                         headers={'Authorization': 'Bearer '
                         + FITBIT_ACCESS_TOKEN,
                         'Accept-Language': FITBIT_LANGUAGE})
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        print('HTTP request failed: %s' % err)
        sys.exit()

    data = response.json()
    print('Got heartrates from Fitbit')

    for day in data['activities-heart']:
        if 'restingHeartRate' in day['value']:
            points.append({'measurement': 'restingHeartRate',
                          'time': datetime.fromisoformat(day['dateTime'
                          ]), 'fields': {'value': float(day['value'
                          ]['restingHeartRate'])}})

        if 'heartRateZones' in day['value']:
            for zone in day['value']['heartRateZones']:
                if 'caloriesOut' in zone and 'min' in zone and 'max' \
                    in zone and 'minutes' in zone:
                    points.append({
                        'measurement': 'heartRateZones',
                        'time': datetime.fromisoformat(day['dateTime'
                                ]),
                        'tags': {'zone': zone['name']},
                        'fields': {
                            'caloriesOut': float(zone['caloriesOut']),
                            'min': float(zone['min']),
                            'max': float(zone['max']),
                            'minutes': float(zone['minutes']),
                            },
                        })
                elif 'min' in zone and 'max' in zone and 'minutes' \
                    in zone:

                    points.append({
                        'measurement': 'heartRateZones',
                        'time': datetime.fromisoformat(day['dateTime'
                                ]),
                        'tags': {'zone': zone['name']},
                        'fields': {'min': float(zone['min']),
                                   'max': float(zone['max']),
                                   'minutes': float(zone['minutes'])},
                        })

    if 'activities-heart-intraday' in data:
        for value in data['activities-heart-intraday']['dataset']:
            time = datetime.fromisoformat(date + 'T' + value['time'])
            utc_time = \
                LOCAL_TIMEZONE.localize(time).astimezone(pytz.utc).isoformat()
            points.append({'measurement': 'heartrate',
                          'time': utc_time,
                          'fields': {'value': float(value['value'])}})


def process_levels(levels):
    for level in levels:
        type = level['level']
        if type == 'asleep':
            type = 'light'
        if type == 'restless':
            type = 'rem'
        if type == 'awake':
            type = 'wake'

        time = datetime.fromisoformat(level['dateTime'])
        utc_time = \
            LOCAL_TIMEZONE.localize(time).astimezone(pytz.utc).isoformat()
        points.append({'measurement': 'sleep_levels', 'time': utc_time,
                      'fields': {'seconds': int(level['seconds'])}})


def fetch_activities(date):
    try:
        response = \
            requests.get('https://api.fitbit.com/1/user/-/activities/list.json'
                         , headers={'Authorization': 'Bearer '
                         + FITBIT_ACCESS_TOKEN,
                         'Accept-Language': FITBIT_LANGUAGE}, params={
            'beforeDate': date,
            'sort': 'desc',
            'limit': 10,
            'offset': 0,
            })
        response.raise_for_status()
    except requests.exceptions.HTTPError as err:
        print('HTTP request failed: %s' % err)
        sys.exit()

    data = response.json()
    print('Got activities from Fitbit')

    for activity in data['activities']:
        fields = {}

        if 'activeDuration' in activity:
            fields['activeDuration'] = int(activity['activeDuration'])
        if 'averageHeartRate' in activity:
            fields['averageHeartRate'] = int(activity['averageHeartRate'
                    ])
        if 'calories' in activity:
            fields['calories'] = int(activity['calories'])
        if 'duration' in activity:
            fields['duration'] = int(activity['duration'])
        if 'distance' in activity:
            fields['distance'] = float(activity['distance'])
            fields['distanceUnit'] = activity['distanceUnit']
        if 'pace' in activity:
            fields['pace'] = float(activity['pace'])
        if 'speed' in activity:
            fields['speed'] = float(activity['speed'])
        if 'elevationGain' in activity:
            fields['elevationGain'] = int(activity['elevationGain'])
        if 'steps' in activity:
            fields['steps'] = int(activity['steps'])

        for level in activity['activityLevel']:
            if level['name'] == 'sedentary':
                fields[level['name'] + 'Minutes'] = int(level['minutes'
                        ])
            else:
                fields[level['name'] + 'ActiveMinutes'] = \
                    int(level['minutes'])

        time = datetime.fromisoformat(activity['startTime'].strip('Z'))
        utc_time = time.astimezone(pytz.utc).isoformat()
        points.append({
            'measurement': 'activity',
            'time': utc_time,
            'tags': {'activityName': activity['activityName']},
            'fields': fields,
            })


try:
    client = InfluxDBClient(host=INFLUXDB_HOST, port=INFLUXDB_PORT,
                            username=INFLUXDB_USERNAME,
                            password=INFLUXDB_PASSWORD)
    client.create_database(INFLUXDB_DATABASE)
    client.switch_database(INFLUXDB_DATABASE)
except InfluxDBClientError as err:
    print('InfluxDB connection failed: %s' % err)
    sys.exit()

if not FITBIT_ACCESS_TOKEN:
    if os.path.isfile('.fitbit-refreshtoken'):
        f = open('.fitbit-refreshtoken', 'r')
        token = f.read()
        f.close()
        response = requests.post('https://api.fitbit.com/oauth2/token',
                                 data={
            'client_id': FITBIT_CLIENT_ID,
            'grant_type': 'refresh_token',
            'redirect_uri': REDIRECT_URI,
            'refresh_token': token,
            }, auth=(FITBIT_CLIENT_ID, FITBIT_CLIENT_SECRET))
    else:
        response = requests.post('https://api.fitbit.com/oauth2/token',
                                 data={
            'client_id': FITBIT_CLIENT_ID,
            'grant_type': 'authorization_code',
            'redirect_uri': REDIRECT_URI,
            'code': FITBIT_INITIAL_CODE,
            }, auth=(FITBIT_CLIENT_ID, FITBIT_CLIENT_SECRET))

    response.raise_for_status()

    json = response.json()
    FITBIT_ACCESS_TOKEN = json['access_token']
    refresh_token = json['refresh_token']
    f = open('.fitbit-refreshtoken', 'w+')
    f.write(refresh_token)
    f.close()

end = date.today()
start = end - timedelta(days=1)

try:
    response = \
        requests.get('https://api.fitbit.com/1.2/user/-/sleep/date/'
                     + start.isoformat() + '/' + end.isoformat()
                     + '.json', headers={'Authorization': 'Bearer '
                     + FITBIT_ACCESS_TOKEN,
                     'Accept-Language': FITBIT_LANGUAGE})
    response.raise_for_status()
except requests.exceptions.HTTPError as err:
    print('HTTP request failed: %s' % err)
    sys.exit()

data = response.json()
print('Got sleep sessions from Fitbit')

for day in data['sleep']:
    time = datetime.fromisoformat(day['startTime'])
    utc_time = \
        LOCAL_TIMEZONE.localize(time).astimezone(pytz.utc).isoformat()
    if day['type'] == 'stages':
        points.append({'measurement': 'sleep', 'time': utc_time,
                      'fields': {
            'duration': int(day['duration']),
            'efficiency': int(day['efficiency']),
            'is_main_sleep': bool(day['isMainSleep']),
            'minutes_asleep': int(day['minutesAsleep']),
            'minutes_awake': int(day['minutesAwake']),
            'time_in_bed': int(day['timeInBed']),
            'minutes_deep': int(day['levels']['summary']['deep'
                                ]['minutes']),
            'minutes_light': int(day['levels']['summary']['light'
                                 ]['minutes']),
            'minutes_rem': int(day['levels']['summary']['rem']['minutes'
                               ]),
            'minutes_wake': int(day['levels']['summary']['wake'
                                ]['minutes']),
            }})
    else:

        points.append({'measurement': 'sleep', 'time': utc_time,
                      'fields': {
            'duration': int(day['duration']),
            'efficiency': int(day['efficiency']),
            'is_main_sleep': bool(day['isMainSleep']),
            'minutes_asleep': int(day['minutesAsleep']),
            'minutes_awake': int(day['minutesAwake']),
            'time_in_bed': int(day['timeInBed']),
            'minutes_deep': 0,
            'minutes_light': int(day['levels']['summary']['asleep'
                                 ]['minutes']),
            'minutes_rem': int(day['levels']['summary']['restless'
                               ]['minutes']),
            'minutes_wake': int(day['levels']['summary']['awake'
                                ]['minutes']),
            }})

    if 'data' in day['levels']:
        process_levels(day['levels']['data'])

    if 'shortData' in day['levels']:
        process_levels(day['levels']['shortData'])

fetch_data('activities', 'steps')
fetch_data('activities', 'distance')
fetch_data('activities', 'floors')
fetch_data('activities', 'elevation')
fetch_data('activities', 'minutesSedentary')
fetch_data('activities', 'minutesLightlyActive')
fetch_data('activities', 'minutesFairlyActive')
fetch_data('activities', 'minutesVeryActive')
fetch_data('activities', 'calories')
fetch_data('activities', 'activityCalories')
fetch_data('body', 'weight')
fetch_data('body', 'fat')
fetch_data('body', 'bmi')
fetch_data('foods/log', 'water')
fetch_data('foods/log', 'caloriesIn')
fetch_heartrate(date.today().isoformat())
fetch_activities((date.today() + timedelta(days=1)).isoformat())

try:
    client.write_points(points)
except InfluxDBClientError as err:
    print('Unable to write points to InfluxDB: %s' % err)
    sys.exit()

print('Successfully wrote %s data points to InfluxDB' % len(points))

The primary credential needed to execute the code above is the access token.

To fetch the access token we first need an authorization code, which is obtained by visiting a link like the one below.

https://www.fitbit.com/oauth2/authorize?response_type=code&client_id=2XXXX2&redirect_uri=http%3A%2F%2F127.0.0.1%3A8085&scope=activity%20heartrate%20location%20nutrition%20profile%20settings%20sleep%20social%20weight&expires_in=604800

This link keeps changing; the proper procedure to get a valid one is to open the OAuth tutorial page for your app (link below) and follow the steps.

https://dev.fitbit.com/apps/oauthinteractivetutorial?clientEncodedId=2XXXX2&clientSecret=cc2f9f738c695a763c0ecxxxxxxc4480&redirectUri=http://127.0.0.1:8085&applicationType=SERVER
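
For reference, the token exchange that the script performs can also be run on its own. Below is a minimal sketch using the requests library, mirroring the authorization_code branch of the script; the client id, client secret, and code are placeholders, and the redirect URI must match the one registered for your Fitbit app:

# Sketch: exchange the one-time authorization code for an access/refresh token pair
import requests

CLIENT_ID = '2XXXX2'                        # placeholder
CLIENT_SECRET = '<your client secret>'      # placeholder
CODE = '<code returned on the redirect>'    # placeholder

response = requests.post(
    'https://api.fitbit.com/oauth2/token',
    data={
        'client_id': CLIENT_ID,
        'grant_type': 'authorization_code',
        'redirect_uri': 'https://localhost',   # must match the app's registered redirect URI
        'code': CODE,
    },
    auth=(CLIENT_ID, CLIENT_SECRET),
)
response.raise_for_status()
tokens = response.json()
print(tokens['access_token'])
print(tokens['refresh_token'])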

Once the code executes successfully, it prints the number of data points written to InfluxDB.

Query InfluxDB to verify the data was inserted.
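
For example, from Python with the same 1.x client; the measurement name below is one of those written by the script:

# Sketch: read back a few of the points written by the Fitbit script
from influxdb import InfluxDBClient

client = InfluxDBClient(host='localhost', port=8086, username='root',
                        password='root', database='fitbit')
result = client.query('SELECT * FROM "heartrate" ORDER BY time DESC LIMIT 5')
for point in result.get_points():
    print(point)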

Ingesting data from Apple HealthKit devices into InfluxDB.

Use the following files:

healthkit.py

import healthkitfn
import healthkitcnf
import zipfile
import os
import shutil
import smtplib  # needed by healthkit_notify()
import dropbox
from influxdb import InfluxDBClient
from datetime import datetime
from xml.dom import minidom
import csv

#Initiate Logging
logger = healthkitfn.init_logging()
#Temp time for tracking
startTime = datetime.now()

###################
# Start Functions #
###################

#Dropbox Download export.zip Function
def healthkit_import():
    #Connect To Dropbox
    dbx = dropbox.Dropbox(healthkitcnf.access_token)
    #Clean Existing Files
    if os.path.isdir(healthkitcnf.ExportDir):
        shutil.rmtree(healthkitcnf.ExportDir)
        logger.info(f"Removed {healthkitcnf.ExportDir}")
    if os.path.isfile(healthkitcnf.ExportZip):
        os.remove(healthkitcnf.ExportZip)
        logger.info(f"Removed {healthkitcnf.ExportZip}")
    if os.path.isfile(healthkitcnf.ExportXML):
        os.remove(healthkitcnf.ExportXML)
        logger.info(f"Removed {healthkitcnf.ExportXML}")
    #Download New export.zip and unzip
    with open("export.zip", "wb") as f:
        metadata, res = dbx.files_download(path="/export.zip")
        f.write(res.content)
        logger.info('Downloaded export.zip Successfully')
        zip_ref = zipfile.ZipFile(healthkitcnf.ExportZip, 'r')
        zip_ref.extractall()
        zip_ref.close()
        logger.info("Unzipped export.zip Successfully")
        shutil.copy("apple_health_export/export.xml", healthkitcnf.ExportXML)
        logger.info("Copied export.xml to primary directory")
    return logger.info('Download and Copy Completed')

#Connect to Database
def healthkit_db():
        client = InfluxDBClient(healthkitcnf.IP, healthkitcnf.PORT, healthkitcnf.USERNAME, healthkitcnf.PASSWORD, healthkitcnf.DB_NAME)
        client.create_database(healthkitcnf.DB_NAME)
        logger.info('Connected to Database Successfully')
        return client

#Load HK Type Values Into Array
def healthkit_text():
        f = open('HKValues.txt', 'r')
        HKValues = f.readlines()
        f.close()
        return HKValues

def healthkit_csv():
        with open('HKValues.csv', newline='') as csvfile:
                HKValues = list(csv.reader(csvfile))
        return HKValues

#Send Notification once completed
def healthkit_notify():
    try:
        email_ssl = smtplib.SMTP_SSL('smtp.gmail.com', 465)
        email_ssl.ehlo()
        email_ssl.login(healthkitcnf.email_user, healthkitcnf.email_pass)
        sent_from = healthkitcnf.email_user  
        send_to = healthkitcnf.email_sendto  
        subject = healthkitcnf.email_subject
        email_text = healthkitcnf.email_body
        email_ssl.sendmail(sent_from, send_to, email_text)
        email_ssl.close()
        logger.info("Notification Sent")
    except:
        logger.debug("Failed to send completed notification.")
    return (logger.info("Email sent"))

#XML Parse / Import HKValues.txt / Update DB
def healthkit_xml():
        NAME = healthkitcnf.NAME
        #Stat Keeping
        NUMRECORDS = 12
        RECORDINC = 1

        #Setup XML Parse
        logger.info("Importing XML Into Record List")
        xmldoc = minidom.parse('export.xml')
        recordlist = xmldoc.getElementsByTagName('Record')
        logger.info('Imported XML Records Successfully')
        #Import Healthkit Values Into Array
        #HKValues = healthkit_csv()
        #logger.info('Imported Health Kit Type Values')

        logger.info("Starting Heart Rate Export")
        for s in recordlist:
            if s.attributes['type'].value == "HKQuantityTypeIdentifierHeartRate":
                client.write_points([{"measurement": "heartrate","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": s.attributes['startDate'].value,"fields": {"watch_heartrate": float(s.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Heart Rate Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #Resting Heart Rate
        logger.info("Starting Resting Heart Rate Export")
        for restingHeart in recordlist:
            if restingHeart.attributes['type'].value == "HKQuantityTypeIdentifierRestingHeartRate":
                client.write_points([{"measurement": "restingheartrate","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": restingHeart.attributes['startDate'].value,"fields": {"resting_heartrate": float(restingHeart.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Resting Heart Rate Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #Walking Heart Rate
        logger.info("Starting Walking Heart Rate Export")
        for walkingHeart in recordlist:
            if walkingHeart.attributes['type'].value == "HKQuantityTypeIdentifierWalkingHeartRateAverage":
                client.write_points([{"measurement": "walkingHeart","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": walkingHeart.attributes['startDate'].value,"fields": {"walking_heartrate": float(walkingHeart.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Walking Heart Rate Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #distance walked
        logger.info("Starting Distance Walked Export")
        for distance in recordlist:
            if distance.attributes['type'].value == 'HKQuantityTypeIdentifierDistanceWalkingRunning':
                client.write_points([{"measurement": "distance","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": distance.attributes['startDate'].value,"fields": {"distance": float(distance.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Distance Walked Rate Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #basal calories
        logger.info("Starting Basal Calories Export")
        for basal in recordlist:
            if basal.attributes['type'].value == 'HKQuantityTypeIdentifierBasalEnergyBurned':
                client.write_points([{"measurement": "basal","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": basal.attributes['startDate'].value,"fields": {"basal": float(basal.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Basal Calories Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #active calories
        logger.info("Starting Active Calories Export")
        for active in recordlist:
            if active.attributes['type'].value == 'HKQuantityTypeIdentifierActiveEnergyBurned':
                client.write_points([{"measurement": "active","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": active.attributes['startDate'].value,"fields": {"active": float(active.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Active Calories Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #BMI
        logger.info("Starting BMI Export")
        for bmi in recordlist:
            if bmi.attributes['type'].value == 'HKQuantityTypeIdentifierBodyMassIndex':
                client.write_points([{"measurement": "bmi","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": bmi.attributes['startDate'].value,"fields": {"bmi": float(bmi.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"BMI Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #WEIGHT
        logger.info("Starting Weight Export")
        for weight in recordlist:
            if weight.attributes['type'].value == 'HKQuantityTypeIdentifierBodyMass':
                client.write_points([{"measurement": "weight","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": weight.attributes['startDate'].value,"fields": {"weight": float(weight.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Weight Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #BodyFatPercentage
        print("Starting Body Fat Percentage Export")
        for bodyfat in recordlist:
            if bodyfat.attributes['type'].value == 'HKQuantityTypeIdentifierBodyFatPercentage':
                client.write_points([{"measurement": "bodyfat","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": bodyfat.attributes['startDate'].value,"fields": {"bodyfat": float(bodyfat.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Body Fat Percentage Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #LeanBodyMass
        logger.info("Starting Lean Body Mass Export")
        for leanmass in recordlist:
            if leanmass.attributes['type'].value == 'HKQuantityTypeIdentifierLeanBodyMass':
                client.write_points([{"measurement": "leanmass","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": leanmass.attributes['startDate'].value,"fields": {"leanmass": float(leanmass.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Lean Body Mass Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #Systolic BP
        logger.info("Starting Systolic BP Export")
        for systolic in recordlist:
            if systolic.attributes['type'].value == 'HKQuantityTypeIdentifierBloodPressureSystolic':
                client.write_points([{"measurement": "systolic","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": systolic.attributes['startDate'].value,"fields": {"systolic": float(systolic.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Systolic BP Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1

        #Diastolic BP
        logger.info("Starting Diastolic BP Export")
        for diastolic in recordlist:
            if diastolic.attributes['type'].value == 'HKQuantityTypeIdentifierBloodPressureDiastolic':
                client.write_points([{"measurement": "diastolic","tags": {"service": "HealthKit","person": healthkitcnf.NAME},"time": diastolic.attributes['startDate'].value,"fields": {"diastolic": float(diastolic.attributes['value'].value)}}])
            else:
                pass
        TEMPTIME = datetime.now() - startTime
        logger.info(f"Diastolic Completed in {TEMPTIME} ({RECORDINC}/{NUMRECORDS})")
        RECORDINC += 1


        TEMPTIME = datetime.now() - startTime
        return logger.info(f"Export Completed in {TEMPTIME}")


healthkit_import()
client = healthkit_db()
healthkit_xml()
healthkit_notify()
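
As a possible simplification (untested, like the script itself), the twelve near-identical loops inside healthkit_xml could be collapsed into a single pass over recordlist driven by a lookup table; the measurement and field names below mirror the ones used above, and client is the InfluxDB client returned by healthkit_db():

# Sketch: one pass over recordlist instead of one loop per HealthKit type
HK_TYPE_MAP = {
    "HKQuantityTypeIdentifierHeartRate": ("heartrate", "watch_heartrate"),
    "HKQuantityTypeIdentifierRestingHeartRate": ("restingheartrate", "resting_heartrate"),
    "HKQuantityTypeIdentifierWalkingHeartRateAverage": ("walkingHeart", "walking_heartrate"),
    "HKQuantityTypeIdentifierDistanceWalkingRunning": ("distance", "distance"),
    "HKQuantityTypeIdentifierBasalEnergyBurned": ("basal", "basal"),
    "HKQuantityTypeIdentifierActiveEnergyBurned": ("active", "active"),
    "HKQuantityTypeIdentifierBodyMassIndex": ("bmi", "bmi"),
    "HKQuantityTypeIdentifierBodyMass": ("weight", "weight"),
    "HKQuantityTypeIdentifierBodyFatPercentage": ("bodyfat", "bodyfat"),
    "HKQuantityTypeIdentifierLeanBodyMass": ("leanmass", "leanmass"),
    "HKQuantityTypeIdentifierBloodPressureSystolic": ("systolic", "systolic"),
    "HKQuantityTypeIdentifierBloodPressureDiastolic": ("diastolic", "diastolic"),
}

def write_records(recordlist, client, person):
    points = []
    for record in recordlist:
        mapping = HK_TYPE_MAP.get(record.attributes['type'].value)
        if mapping is None:
            continue
        measurement, field = mapping
        points.append({
            "measurement": measurement,
            "tags": {"service": "HealthKit", "person": person},
            "time": record.attributes['startDate'].value,
            "fields": {field: float(record.attributes['value'].value)},
        })
    # one batched write instead of one HTTP call per record
    client.write_points(points)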

healthkitcnf.py

#Zip Import / Export Config
#NO NEED TO CHANGE
ExportZip = "export.zip"
ExportDir = "apple_health_export"
ExportXML = "export.xml"

#Dropbox Config
access_token = '*************************'

#InfluxDB Server
IP = 'XXX.XXX.XXX.XXX'
PORT = '8086'
USERNAME = 'user'
PASSWORD = 'password'
DB_NAME = 'database'
NAME = 'Me' #Name of persons data

vCount = 0

#SMTP Config - For Gmail.  Email password is an app password generated from your security settings if using 2 factor auth.
email_user = 'email'
email_pass = 'password'
email_sendto = 'sendto email'
email_subject = 'Healthkit Import Notification'
email_body = 'SUBJECT: Healthkit Import Completed Successfully!'

healthkitfn.py

#Imports for Functions
import logging
import zipfile
import os
import shutil

#Logging Function
def init_logging():
    rootLogger = logging.getLogger('healthkit_logger')

    LOG_DIR = os.getcwd() + '/' + 'logs'
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)
    fileHandler = logging.FileHandler("{0}/{1}.log".format(LOG_DIR, "healthkit"))
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fileHandler.setFormatter(formatter)

    rootLogger.addHandler(fileHandler)

    rootLogger.setLevel(logging.DEBUG)

    consoleHandler = logging.StreamHandler()
    rootLogger.addHandler(consoleHandler)

    return rootLogger

HKValues.csv

HKQuantityTypeIdentifierBloodGlucose,HKQuantityTypeIdentifierBodyMassIndex,HKQuantityTypeIdentifierHeight,HKQuantityTypeIdentifierBodyMass,HKQuantityTypeIdentifierHeartRate,HKQuantityTypeIdentifierBloodPressureSystolic,HKQuantityTypeIdentifierBloodPressureDiastolic,HKQuantityTypeIdentifierBodyFatPercentage,HKQuantityTypeIdentifierLeanBodyMass,HKQuantityTypeIdentifierStepCount,HKQuantityTypeIdentifierDistanceWalkingRunning,HKQuantityTypeIdentifierBasalEnergyBurned,HKQuantityTypeIdentifierActiveEnergyBurned,HKQuantityTypeIdentifierFlightsClimbed,HKQuantityTypeIdentifierDietaryFatTotal,HKQuantityTypeIdentifierDietaryFatPolyunsaturated,HKQuantityTypeIdentifierDietaryFatMonounsaturated,HKQuantityTypeIdentifierDietaryFatSaturated,HKQuantityTypeIdentifierDietaryCholesterol,HKQuantityTypeIdentifierDietarySodium,HKQuantityTypeIdentifierDietaryCarbohydrates,HKQuantityTypeIdentifierDietaryFiber,HKQuantityTypeIdentifierDietarySugar,HKQuantityTypeIdentifierDietaryEnergyConsumed,HKQuantityTypeIdentifierDietaryProtein,HKQuantityTypeIdentifierDietaryVitaminC,HKQuantityTypeIdentifierDietaryCalcium,HKQuantityTypeIdentifierDietaryIron,HKQuantityTypeIdentifierDietaryPotassium,HKQuantityTypeIdentifierAppleExerciseTime,HKQuantityTypeIdentifierRestingHeartRate,HKQuantityTypeIdentifierWalkingHeartRateAverage

HKValues.txt

HKQuantityTypeIdentifierBloodGlucose
HKQuantityTypeIdentifierBodyMassIndex
HKQuantityTypeIdentifierHeight
HKQuantityTypeIdentifierBodyMass
HKQuantityTypeIdentifierHeartRate
HKQuantityTypeIdentifierBloodPressureSystolic
HKQuantityTypeIdentifierBloodPressureDiastolic
HKQuantityTypeIdentifierBodyFatPercentage
HKQuantityTypeIdentifierLeanBodyMass
HKQuantityTypeIdentifierStepCount
HKQuantityTypeIdentifierDistanceWalkingRunning
HKQuantityTypeIdentifierBasalEnergyBurned
HKQuantityTypeIdentifierActiveEnergyBurned
HKQuantityTypeIdentifierFlightsClimbed
HKQuantityTypeIdentifierDietaryFatTotal
HKQuantityTypeIdentifierDietaryFatPolyunsaturated
HKQuantityTypeIdentifierDietaryFatMonounsaturated
HKQuantityTypeIdentifierDietaryFatSaturated
HKQuantityTypeIdentifierDietaryCholesterol
HKQuantityTypeIdentifierDietarySodium
HKQuantityTypeIdentifierDietaryCarbohydrates
HKQuantityTypeIdentifierDietaryFiber
HKQuantityTypeIdentifierDietarySugar
HKQuantityTypeIdentifierDietaryEnergyConsumed
HKQuantityTypeIdentifierDietaryProtein
HKQuantityTypeIdentifierDietaryVitaminC
HKQuantityTypeIdentifierDietaryCalcium
HKQuantityTypeIdentifierDietaryIron
HKQuantityTypeIdentifierDietaryPotassium
HKQuantityTypeIdentifierAppleExerciseTime
HKQuantityTypeIdentifierRestingHeartRate
HKQuantityTypeIdentifierWalkingHeartRateAverage
HKCategoryTypeIdentifierSleepAnalysis
HKCategoryTypeIdentifierAppleStandHour
HKCategoryTypeIdentifierMindfulSession
HKQuantityTypeIdentifierHeartRateVariabilitySDNN

Use the code files above as follows:

  1. Edit healthkitcnf.py with your database info and Dropbox access token.
  2. Export your Apple Health data from your device and save it to Dropbox.
  3. Run the script.

Disclaimer:

Unlike the Fitbit script above, I have not run the HealthKit script, since I do not own the device yet, so I cannot vouch for the correctness of this code.

In fact, there may be several sections further down, including this one, where I approach the topic at a more theoretical level. I would encourage readers to try this code locally and let me know the results.

However, it is worth mentioning that in all my previous posts I have shared only code that I tried and got working locally, even when the original code was copied from the web. In case you are not aware, a large share of the code found on the web fails to work out of the box, because it is outdated, written for a different OS, or needs settings changed for your local environment. Debugging these issues can take a long time, and at times you may be forced to abandon that approach and look for another page on the web.

Create an AWS EMR-based PySpark job to run heart rate calculations.

Create your AWS account if you haven’t already. Install and configure the AWS Command Line Interface.

Create a new S3 bucket.

Upload the attached pyspark_job.py file to that bucket.

# pyspark_job.py
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# NOTE: this step assumes the InfluxDB 2.x Python client (pip install influxdb-client),
# since the query below is written in Flux; the URL, token and org are placeholders.
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

INFLUXDB_URL = 'http://localhost:8086'
INFLUXDB_TOKEN = 'my-token'
INFLUXDB_ORG = 'my-org'


def create_spark_session():
    """Create spark session.

    Returns:
        spark (SparkSession) - spark session connected to the AWS EMR cluster
    """
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages",
                "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark


def process_heartrate_data(spark, input_bucket, output_bucket):
    """Read recent heart rate points from InfluxDB and write derived values back."""
    client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN,
                            org=INFLUXDB_ORG)

    try:
        # Pull the last 10 minutes of points from the input bucket.
        query = f'from(bucket: "{input_bucket}") |> range(start: -10m)'
        tables = client.query_api().query(query, org=INFLUXDB_ORG)

        for table in tables:
            print(">>>>>>>>>> Showing table details >>>>>>>>>>>>")
            print(table)
            for row in table.records:
                print(">>>>>>>>>> Showing Rows >>>>>>>>>>>>")
                print(row.values)
    except Exception as err:
        print(f'Reading from InfluxDB failed: {err}')

    try:
        # Compute averages here, e.g. load the rows into a Spark DataFrame with
        # spark.createDataFrame and aggregate with F.avg; for now a static
        # line-protocol record is written as a placeholder.
        data = "mem,host=host1 used_percent=23.43234543"

        write_api = client.write_api(write_options=SYNCHRONOUS)
        write_api.write(output_bucket, INFLUXDB_ORG, data)

        print(f"Data has been saved {data}")
    except Exception as err:
        print(f"Writing to InfluxDB failed: {err}")


def main():
    spark = create_spark_session()
    input_bucket = 'fitbit'             # bucket holding the ingested wearable data
    output_bucket = 'heartrate_stats'   # placeholder bucket for the derived values
    process_heartrate_data(spark, input_bucket, output_bucket)


if __name__ == '__main__':
    main()

To install useful packages on all of the nodes of our cluster, we’ll need to create the file emr_bootstrap.sh and add it to a bucket on S3.

#!/bin/bash
sudo pip install -U \
matplotlib \
pandas \
spark-nlp
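
If you prefer to script the uploads, a minimal boto3 sketch could look like the following; the bucket name and keys are placeholders, and it assumes boto3 is installed and your AWS credentials are configured:

# Sketch: upload the job file and the bootstrap script to S3
import boto3

s3 = boto3.client('s3')
s3.upload_file('pyspark_job.py', 'your-bucket', 'pyspark_job.py')
s3.upload_file('emr_bootstrap.sh', 'your-bucket', 'emr_bootstrap.sh')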

Create a key pair file

Navigate to EC2 from the homepage of your console:

Select “Key Pairs”

Click “Create Key Pair” then enter a name and click “Create”.

The file emr-key.pem should download automatically. Store it in a directory you’ll remember.

Now run the command below to create the EMR cluster.

aws emr create-cluster --name "Spark cluster with step" \
--release-label emr-5.24.1 \
--applications Name=Spark \
--log-uri s3://your-bucket/logs/ \
--ec2-attributes KeyName=your-key-pair \
--instance-type m5.xlarge \
--instance-count 3 \
--bootstrap-actions Path=s3://your-bucket/emr_bootstrap.sh \
--steps Type=Spark,Name="Spark job",ActionOnFailure=CONTINUE,Args=[--deploy-mode,cluster,--master,yarn,s3://your-bucket/pyspark_job.py] \
--use-default-roles \
--auto-terminate
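
The create-cluster command prints a cluster id. If you want to check on the cluster from Python rather than from the console, a small boto3 sketch (the cluster id and region are placeholders) could be:

# Sketch: poll the state of the EMR cluster created above
import boto3

emr = boto3.client('emr', region_name='us-east-1')   # region is a placeholder
status = emr.describe_cluster(ClusterId='j-XXXXXXXXXXXXX')['Cluster']['Status']
print(status['State'])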

Building a deep learning model for prediction

(this section is work-in-progress)

Build a deep learning model, based on the information provided in the paper below, to predict the likelihood of a COVID-19 diagnosis. Note that we are dealing with heart rate alone here and are not concerned with the other tests mentioned in the paper.

https://www.medrxiv.org/content/10.1101/2020.08.14.20175265v1.full.pdf
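
While this section is a work in progress, a minimal illustrative sketch of such a model (not the model from the paper) could be a small Keras network over per-day features such as resting heart rate, heart rate variability, and breathing rate deviations from the user's baseline; the feature extraction and labels are assumed to come from the pipeline above:

# Illustrative only: a tiny binary classifier over per-day wearable features
import numpy as np
from tensorflow import keras

model = keras.Sequential([
    # inputs (assumed): [rhr_delta, hrv_delta, breathing_rate_delta]
    keras.layers.Dense(16, activation='relu', input_shape=(3,)),
    keras.layers.Dense(8, activation='relu'),
    keras.layers.Dense(1, activation='sigmoid'),   # probability of illness onset
])
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# X: one row per user-day, y: 1 if symptoms were reported within the next 7 days
X = np.random.rand(100, 3)                 # placeholder data
y = np.random.randint(0, 2, size=100)      # placeholder labels
model.fit(X, y, epochs=10, batch_size=16, validation_split=0.2)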

References:

https://github.com/nu1lx/HealthKit
https://towardsdatascience.com/production-data-processing-with-apache-spark-96a58dfd3fe7
https://towardsdatascience.com/getting-started-with-pyspark-on-amazon-emr-c85154b6b921
https://www.hindawi.com/journals/ddns/2020/6152041/
https://www.mobihealthnews.com/news/early-data-fitbit-study-indicates-it-can-predict-covid-19-symptoms-show
https://blog.fitbit.com/early-findings-covid-19-study/
https://www.medrxiv.org/content/10.1101/2020.08.14.20175265v1.full.pdf
https://medium.com/welltory/alternative-at-home-test-for-coronavirus-heart-rate-variability-326a7237abe7
