Python based framework for FOTA (Firmware Over The Air)

A very common requirement in IoT based systems is to remotely install updated versions of firmware on devices.

Presented in this blog is a python based generic framework for performing the update.

Salient features :

  1. Meager code footprint of less than 4 KB.
  2. Secure download from s3 bucket using access key and secret.
  3. Secure device pull methodology vs cloud push updates.
  4. Extraction and installation without human intervention.
  5. Firmware file stored in encrypted format with encryption key in s3

High level Architecture

Setup process flow

Source code files

Code to encrypt the zip file (crypt.py)

from cryptography.fernet import Fernet
import os
import hashlib


def write_key():
    """
    Generates a key and save it into a file
    """
    key = Fernet.generate_key()
    with open("key.key", "wb") as key_file:
        key_file.write(key)

def load_key():
    """
    Loads the key from the current directory named `key.key`
    """
    return open("key.key", "rb").read()


def encrypt(filename, key):
    """
    Given a filename (str) and key (bytes), it encrypts the file and write it
    """
    f = Fernet(key)
    with open(filename, "rb") as file:
        # read all file data
        file_data = file.read()
    # encrypt data
    encrypted_data = f.encrypt(file_data)
    # write the encrypted file
    with open(filename, "wb") as file:
        file.write(encrypted_data)


def decrypt(filename, key):
    """
    Given a filename (str) and key (bytes), it decrypts the file and write it
    """
    f = Fernet(key)
    with open(filename, "rb") as file:
        # read the encrypted data
        encrypted_data = file.read()
    # decrypt data
    decrypted_data = f.decrypt(encrypted_data)
    # write the original file
    with open(filename, "wb") as file:
        file.write(decrypted_data)


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="Simple File Encryptor Script")
    parser.add_argument("file", help="File to encrypt/decrypt")
    parser.add_argument("-g", "--generate-key", dest="generate_key", action="store_true",
                        help="Whether to generate a new key or use existing")
    parser.add_argument("-e", "--encrypt", action="store_true",
                        help="Whether to encrypt the file, only -e or -d can be specified.")
    parser.add_argument("-d", "--decrypt", action="store_true",
                        help="Whether to decrypt the file, only -e or -d can be specified.")

    args = parser.parse_args()
    file = args.file
    generate_key = args.generate_key

    if generate_key:
        write_key()
    # load the key
    key = load_key()

    encrypt_ = args.encrypt
    decrypt_ = args.decrypt

    if encrypt_ and decrypt_:
        raise TypeError("Please specify whether you want to encrypt the file or decrypt it.")
    elif encrypt_:
        encrypt(file, key)
        calculated_checksum = hashlib.md5(open(file,'rb').read()).hexdigest()
        print("Calculated checksum : " , calculated_checksum)
    elif decrypt_:
        decrypt(file, key)
    else:
        raise TypeError("Please specify whether you want to encrypt the file or decrypt it.")
python crypt.py <path>/firmware.zip --encrypt

Code to calculate checksum of encrypted zip file (calc-checksum.py)

import boto3
import botocore
import hashlib
from boto3 import client
import zipfile
import subprocess
import os
import json
from cryptography.fernet import Fernet



def readConfig():
    with open("config.yaml", 'r') as file:
        data = file.read()
        line = json.loads(json.dumps(data))
    
    for key in line.split('\n'):
        val1, val2 = key.split(":")
        if val1 == "file_name":
            file_name = val2
    file.close()
    return file_name


    
def checksum():

    
    file_name = readConfig()
    calculated_checksum = hashlib.md5(open(file_name,'rb').read()).hexdigest()
    print("Calculated checksum : " , calculated_checksum)   

    
checksum()

Key source file that implements FOTA (filedownload.py)

import boto3
import botocore
import hashlib
from boto3 import client
import zipfile
import subprocess
import os
import json
from cryptography.fernet import Fernet

#pip3 install cryptography

BUCKET_NAME = 'xxx-download' 
FILE_KEY = 'firmware.zip' 
CHECKSUM_KEY = 'checksum.txt'
ECDC_KEY = 'key.key'

def readConfig():
    with open("config.yaml", 'r') as file:
        data = file.read()
        line = json.loads(json.dumps(data))
    
    for key in line.split('\n'):
        val1, val2 = key.split(":")
        if val1 == "accesskey":
            accesskey = val2
        if val1 == "secretkey":
            secretkey = val2
        if val1 == "region":
            region = val2 
    file.close()
    return accesskey, secretkey, region



def encrypt(filename, key):
    """
    Given a filename (str) and key (bytes), it encrypts the file and write it
    """
    f = Fernet(key)
    with open(filename, "rb") as file:
        # read all file data
        file_data = file.read()
    # encrypt data
    encrypted_data = f.encrypt(file_data)
    # write the encrypted file
    with open(filename, "wb") as file:
        file.write(encrypted_data)


def decrypt(filename, key):
    """
    Given a filename (str) and key (bytes), it decrypts the file and write it
    """
    f = Fernet(key)
    with open(filename, "rb") as file:
        # read the encrypted data
        encrypted_data = file.read()
    # decrypt data
    decrypted_data = f.decrypt(encrypted_data)
    # write the original file
    with open(filename, "wb") as file:
        file.write(decrypted_data)

    
def index():
    session = boto3.Session()
    accesskey, secretkey, region = readConfig()
    s3 = session.resource(
    "s3",
    region_name=region,
    aws_access_key_id=accesskey,
    aws_secret_access_key=secretkey)
    try:
        s3.Bucket(BUCKET_NAME).download_file(FILE_KEY, 'local_firmware.zip')
        print('File downloaded successfully')
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            print("The object does not exist.")
        else:
            raise
            
    try:
        s3.Bucket(BUCKET_NAME).download_file(CHECKSUM_KEY, 'local_checksum.txt')
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            print("The object does not exist.")
        else:
            raise 

    try:
        s3.Bucket(BUCKET_NAME).download_file(ECDC_KEY, 'local_key.key')
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            print("The object does not exist.")
        else:
            raise             
            
    checksum()

    
def checksum():

    ecdc_file_name = '<path>/local_key.key'
    downloaded_key = open(ecdc_file_name).read()
    print("Downloaded Encryption Decryption Key : " ,downloaded_key)
    
    
    checksum_file_name = '<path>/local_checksum.txt'
    downloaded_checksum = open(checksum_file_name).read()
    print("Downloaded Checksum : " , downloaded_checksum)
    
    file_name = '<path>/local_firmware.zip'
    calculated_checksum = hashlib.md5(open(file_name,'rb').read()).hexdigest()
    print("Calculated checksum : " , calculated_checksum)   

    if downloaded_checksum==calculated_checksum:
        print('Checksum verified successfully')
        

    decrypt(file_name, downloaded_key)
    
    with zipfile.ZipFile(file_name,"r") as zip_ref:
        zip_ref.extractall("<path>") 

    print('File unzipped to <path> successfully')
    
    
    os.startfile("<path>/firmware.exe")
    #pyinstaller.exe --onefile --windowed app.py
    
    if os.path.exists("<path>/local_key.key"):
        os.remove("<path>/local_key.key")
    else:
        print("The local key file does not exist")
    
index()

Configuration file (config.yaml)

accesskey:XXXXXXXXXXXXXXXX
secretkey:XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
region:xx-west-x
file_name:xyz.zip

Sample firmware file used (firmware.c)

It is a c file , compiled into an exe and zipped.

#include <stdio.h>
int main() {
   // printf() displays the string inside quotation
   printf("Kindly wait till your software is installed");
   sleep(10000);
   printf("\nInstallation completed");
   sleep(1000);
   return 0;
}

FOTA Code in action

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: