Python, ANT+, a dude and his Bike...

in #exhaust · 5 years ago (edited)

I've had an idea floating around in my head recently -- mostly inspired by my buddies that are into cycling, and my endeavouring to learn some Python. Now that I think about it -- I've learned quite a lot of Python over the last year or so... Picking a project and running with it is a pretty great way to learn (bad habits).

Regardless.

  • I've got a bike at home w/ a neat indoor trainer set up. Means I can get sweaty while going precisely nowhere, with the added benefit of terrifying my dog the whole time;
  • Said bicycle has ANT+ speed and cadence sensors, and I've got a compatible heart-rate monitor as well;
  • I've got an ANT+ USB dongle that acts as a receiver; and
  • I recently discovered THIS python library that implements the ANT protocols:

Python implementation of the ANT, ANT+, and ANT-FS protocols. For more information about ANT, see http://www.thisisant.com/.

[Images: ANT (ANT+) | python (Python) | Tape that shit on (Bike and Sensors)]

Let's play!

Add the aforementioned python library to your project's requirements.txt file:

# requirements.txt
git+https://github.com/SamyCookie/python-ant

Install:
pip install -r requirements.txt

Enjoy:

Start up a new python file called cadence.py, and copy/paste the following:

import math
import re
import sys
import time

from ant.core import driver, node, event, message
from ant.core.constants import *

NETKEY = b'\xB9\xA5\x21\xFB\xBD\x72\xC3\x45'

class ANTListener(event.EventCallback):
    last_speed_time = 0
    last_speed_revs = 0
    now_speed_revs = 0
#    fivesec_speed_revs = 0
#    tensec_speed_revs = 0
    last_cadence_time = 0
    last_cadence_revs = 0
    now_cadence_revs = 0
#    fives_cadence_revs = 0
#    tens_cadence_revs = 0
    wheel_diameter = 0.673 ## Wheel diameter in m

    def process(self, msg, _channel):
        if isinstance(msg, message.ChannelBroadcastDataMessage):
            self.last_cadence_revs = self.now_cadence_revs
            self.last_speed_revs = self.now_speed_revs
            ## Process Cadence Cumulative Revolutions
            ## ctimeLSB = msg.payload[1],ctimeMSB = msg.payload[2],cadenceLSB = msg.payload[3], cadenceMSB = msg.payload[4]
            ctime = int(format(msg.payload[2],'#010b')+re.sub('0b','',format(msg.payload[1],'#010b')),2)
            self.now_cadence_revs = int(format(msg.payload[4],'#010b')+re.sub('0b','',format(msg.payload[3],'#010b')),2)
            ## Process Speed Cumulative Revolutions
            ## stimeLSB = msg.payload[5],stimeMSB = msg.payload[6],speedLSB = msg.payload[7],speedMSB = msg.payload[8]
            stime = int(format(msg.payload[6],'#010b')+re.sub('0b','',format(msg.payload[5],'#010b')),2)
            self.now_speed_revs = int(format(msg.payload[8],'#010b')+re.sub('0b','',format(msg.payload[7],'#010b')),2)

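            ## Revolution counts and event times are 16-bit counters, so take deltas modulo 65536 to survive rollover
            speed_rev_delta = (self.now_speed_revs - self.last_speed_revs) % 65536
            speed_time_delta = (stime - self.last_speed_time) % 65536
            cadence_rev_delta = (self.now_cadence_revs - self.last_cadence_revs) % 65536
            cadence_time_delta = (ctime - self.last_cadence_time) % 65536
            ## Time deltas are in 1/1024 s; a zero delta means no new sensor event, so report 0 instead of dividing by zero
            speed = (speed_rev_delta/speed_time_delta)*1024*(self.wheel_diameter*math.pi*60*60/1000) if speed_time_delta else 0.0 ## rev/sec x 0.673*math.pi m/rev x 1km/1000m x 60s/min x 60min/hr
            cadence = (cadence_rev_delta/cadence_time_delta)*1024*60 if cadence_time_delta else 0.0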

            print("Cadence Timestamps: ",ctime/1024,"|| Cadence Revolutions: ",self.now_cadence_revs)
            print("Cadence Rev Delta: ", cadence_rev_delta," Cadence Time Delta", cadence_time_delta," cadence: ",cadence," RPM")
            print("Speed Timestamps: ",stime/1024,"|| Speed Revolutions: ",self.now_speed_revs)
            print("Speed Rev Delta: ", speed_rev_delta," Speed Time Delta", speed_time_delta," Speed: ",speed," km/hr")
            print("=============")
            self.last_speed_time = stime
            self.last_cadence_time = ctime


    antDevice = '/dev/ttyUSB0'
    antVendor=0x0fcf
    # antProduct=0x1009
    antProduct=0x1008
    stick = driver.USB2Driver(idVendor=antVendor, idProduct=antProduct)
    antnode = node.Node(stick)
    antnode.start()
    channel = antnode.getFreeChannel()

    def setup(self):
        # Start shit up
        #stick = driver.USB2Driver()
        if not self.antnode.running:
            self.antnode.start()

        # Setup channel
        net = node.Network(name='N:ANT+', key=NETKEY)
        self.antnode.setNetworkKey(0, net)
        self.channel.name = 'C:HRM'
        self.channel.assign(net, CHANNEL_TYPE_TWOWAY_RECEIVE)
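        self.channel.setID(121, 0, 0)  # device type 121 = ANT+ combined speed/cadence sensor; 0, 0 = match any device number / transmission type
        self.channel.searchTimeout = TIMEOUT_NEVER
        self.channel.period = 8070  # message period in units of 1/32768 s (roughly 4 Hz)
        self.channel.frequency = 57  # RF channel 57 = 2457 MHz, the ANT+ frequency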
        self.channel.open()

    def start_listen(self):
        if not self.antnode.running:
            self.setup()
        # Setup callback
        # Note: We could also register an event listener for non-channel events by
        # calling registerEventListener() on antnode rather than channel.
        self.channel.registerCallback(ANTListener())

    def stop_listen(self):
        # Shutdown
        self.channel.close()
        self.channel.unassign()
        self.antnode.stop()

The above script generally does the following:

  • setup() preps the ANT stick for the appropriate idVendor and idProduct, and finds a free channel to work on;
    • Currently, we set up a channel only for our speed/cadence sensors;
    • You can find your device type (if you're a Linux user) via lsusb.
  • start_listen() starts the device listening for broadcast events from any sensors matching our specified type;
  • stop_listen() stops the device and closes the channel and antnode -- freeing the USB device to use again;
    • For whatever reason, I end up having to unplug / re-plug the device to get it connected again.. whatever...
  • process() handles each broadcast event that arrives on the channel, and prints out the cadence and speed it works out from the payload.

Some notes on how this stuff works:

  • Interpreting / handling the data was a massive pain in the ass... for whatever reason, the data is sent out in byte-arrays and required a fair deal of converting back and forth between binary and integer formatting...
  • Had to learn a bit about MSB and LSB bit numbering... That was fun;
  • If your wheels are a different size, you'll probably want to update the wheel_diameter value -- mine is 673mm;
  • Script spits out your instantaneous cadence in RPM, and your instantaneous speed in km/hr -- if you want your shit Americanized, you can 'GIT OUT.
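
For anyone else fighting the MSB / LSB stuff: the byte juggling described above can probably be done more compactly with Python's built-in struct module. What follows is only a rough sketch, not what cadence.py actually does. The names decode_page and SpeedCadencePage are made up for illustration, and it assumes the payload layout matches the comments in the script (bytes 1 through 8 hold four 16-bit values, LSB first, and byte 0 is something else that gets skipped). The sample bytes are hand-built to match the first block of output further down, not captured from a real sensor.

```python
import struct
from typing import NamedTuple


class SpeedCadencePage(NamedTuple):
    """Hypothetical container for the four 16-bit fields in one broadcast."""
    cadence_time: int   # last cadence event time, in 1/1024 s
    cadence_revs: int   # cumulative crank revolutions
    speed_time: int     # last speed event time, in 1/1024 s
    speed_revs: int     # cumulative wheel revolutions


def decode_page(payload) -> SpeedCadencePage:
    # '<' means little-endian (LSB first), 'H' means unsigned 16-bit.
    # Assumption: payload[0] is not sensor data, so only bytes 1..8 are read.
    return SpeedCadencePage(*struct.unpack('<HHHH', bytes(payload[1:9])))


# Hand-built sample: 7124, 285, 7415 and 1581 encoded LSB first
sample = bytes([0x00, 0xD4, 0x1B, 0x1D, 0x01, 0xF7, 0x1C, 0x2D, 0x06])
page = decode_page(sample)
print(page.cadence_time / 1024, page.cadence_revs)
print(page.speed_time / 1024, page.speed_revs)
```

Each field is just (MSB << 8) | LSB, which is the same number the format() / re.sub() approach builds by gluing binary strings together.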

So assuming that you've punched in the appropriate antProduct listed in your lsusb, and updated for any difference in wheel size -- you SHOULD be up and rolling:

Open up your terminal:

source env/bin/activate
python
import cadence
listener = cadence.ANTListener()
listener.setup()
listener.start_listen()

You SHOULD start seeing outputs like the following:

Cadence Timestamps:  6.95703125 || Cadence Revolutions:  285
Cadence Rev Delta:  4  Cadence Time Delta 2742  cadence:  89.62800875273523  RPM
Speed Timestamps:  7.2412109375 || Speed Revolutions:  1581
Speed Rev Delta:  8  Speed Time Delta 3033  Speed:  20.55819452018244  km/hr
=============
Cadence Timestamps:  9.6435546875 || Cadence Revolutions:  289
Cadence Rev Delta:  4  Cadence Time Delta 2460  cadence:  99.90243902439025  RPM
Speed Timestamps:  10.1884765625 || Speed Revolutions:  1589
Speed Rev Delta:  8  Speed Time Delta 3018  Speed:  20.66037242535233  km/hr
=============
Cadence Timestamps:  13.0166015625 || Cadence Revolutions:  294
Cadence Rev Delta:  5  Cadence Time Delta 2896  cadence:  106.07734806629834  RPM
Speed Timestamps:  13.140625 || Speed Revolutions:  1597
Speed Rev Delta:  8  Speed Time Delta 3023  Speed:  20.62620045640534  km/hr
=============
Cadence Timestamps:  15.744140625 || Cadence Revolutions:  298
Cadence Rev Delta:  4  Cadence Time Delta 2666  cadence:  92.18304576144037  RPM
Speed Timestamps:  16.130859375 || Speed Revolutions:  1605
Speed Rev Delta:  8  Speed Time Delta 3062  Speed:  20.363489216104945  km/hr
=============
Cadence Timestamps:  18.486328125 || Cadence Revolutions:  302
Cadence Rev Delta:  4  Cadence Time Delta 2412  cadence:  101.8905472636816  RPM
Speed Timestamps:  18.7626953125 || Speed Revolutions:  1612
Speed Rev Delta:  7  Speed Time Delta 2695  Speed:  20.24448181159524  km/hr
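
If you want to sanity-check those numbers by hand, the arithmetic can be sketched as a few standalone functions. This is a minimal sketch under some assumptions, not a drop-in replacement for the script: the function names are made up, the event times are taken to be in 1/1024 s (which is what the *1024 in the script implies), and the counters are treated as 16-bit values that wrap at 65536.

```python
import math

TICKS_PER_SECOND = 1024   # event times are counted in 1/1024 s
COUNTER_MODULUS = 65536   # 16-bit counters wrap back around to 0


def wrapped_delta(now, last):
    """Difference between two 16-bit counter readings, tolerating rollover."""
    return (now - last) % COUNTER_MODULUS


def cadence_rpm(rev_delta, time_delta):
    """Crank revolutions per minute from a revolution delta and a tick delta."""
    if time_delta == 0:
        return 0.0  # no new sensor event since the last message
    return rev_delta / time_delta * TICKS_PER_SECOND * 60


def speed_kmh(rev_delta, time_delta, wheel_diameter_m=0.673):
    """Speed in km/hr: rev/s x (pi x diameter) m/rev x 3.6 (km/hr per m/s)."""
    if time_delta == 0:
        return 0.0
    revs_per_second = rev_delta / time_delta * TICKS_PER_SECOND
    metres_per_second = revs_per_second * wheel_diameter_m * math.pi
    return metres_per_second * 3.6


# Deltas taken from the second block of output above
print(round(cadence_rpm(4, 2460), 2), "RPM")
print(round(speed_kmh(8, 3018), 2), "km/hr")
```

Feeding in the deltas from the second block of output (4 revs over 2460 ticks, 8 revs over 3018 ticks) should land on roughly 99.9 RPM and 20.66 km/hr, same as the script printed.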

So that's it that's all! That's how you can whip up a quick and dirty ANT+ speed/cadence reader...

Next steps are up to you -- I've got some cool ideas for what I want to accomplish... Images below paint a bit of a story for what I'm thinking...

[Images: Godot (Open Source Game Engine) | Zwift Cycling Game (Inspiration) | xhaust.me (Community)]

What projects / ideas are you working on? Do you like to have some kind of creative fire burning? I feel like most people here are pretty creative -- that's kind of the "early adopter" crowd that Steem has attracted.

Do you think you'd play a cycling game where you can meet up digitally and race other Steemians / Exhaust-o-nauts? I probably would, so long as I didn't have to go up against @run.vince.run or @jgrieco. This would probably be a fun way to have another "Solstice Race" event...


I can see that was too cool to resist. I really ought to look into what I can do with Python as I use it at work. Maybe I can automate some of my posts a bit.

Yeah, man! Play around and see what you can come up with.

@holger80's beem library is a pretty great resource for playing w/ python and Steem.

I've seen @felixxx has some tutorials I will bookmark. It's just a matter of finding time.

I am quite proud of part 3 tbh :)

Looks cool! I might give 'em a read!

You probably won't need them anymore. It is fairly basic stuff and only meant to bridge the gap between knowing some Python and actually getting a tool out for STEEM.

I think I would. In fact, I stopped cycling when the rainy season started. I'm happy to say that is mostly gone, which means I'll be able to enjoy cycling around more, but I would def play at night or maybe when the rain is too bad.

Cool! Good to know!

I think it would be a strong community builder, and a great way to chat w/ some of the stronger athletes. I'm actually really excited about working on this.... Hoping to have a proof-of-concept fleshed out in the coming weeks....

Ideally -- it would integrate w/ EXHAUST nicely, too!

Think about building a developers' support group as well. I could also help if needed. Maybe start adding a few tasks in @utopian-io and get some support from the developers.