import atexit
import threading
import time
from sys import exit, version_info
try:
from smbus import SMBus
except ImportError:
if version_info[0] < 3:
exit("This library requires python-smbus\nInstall with: sudo apt-get install python-smbus")
elif version_info[0] == 3:
exit("This library requires python3-smbus\nInstall with: sudo apt-get install python3-smbus")
try:
import RPi.GPIO as GPIO
except ImportError:
exit("This library requires the RPi.GPIO module\nInstall with: sudo pip install RPi.GPIO")
__version__ = '0.0.7'
SW_ADDR = 0x42
SW_RESET_PIN = 17
SW_XFER_PIN = 27
SW_HEADER_SIZE = 4
SW_DATA_DSP = 0b0000000000000001
SW_DATA_GESTURE = 0b0000000000000010
SW_DATA_TOUCH = 0b0000000000000100
SW_DATA_AIRWHEEL = 0b0000000000001000
SW_DATA_XYZ = 0b0000000000010000
SW_SYSTEM_STATUS = 0x15
SW_REQUEST_MSG = 0x06
SW_FW_VERSION = 0x83
SW_SET_RUNTIME = 0xA2
SW_SENSOR_DATA = 0x91
i2c_bus = 0
if GPIO.RPI_REVISION == 2 or GPIO.RPI_REVISION == 3:
i2c_bus = 1
i2c = SMBus(i2c_bus)
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(SW_RESET_PIN, GPIO.OUT, initial=GPIO.HIGH)
GPIO.setup(SW_XFER_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
x = 0.0
y = 0.0
z = 0.0
rotation = 0.0
_lastrotation = 0.0
gesture = 0
io_error_count = 0
_worker = None
_on_flick = None
_on_move = None
_on_airwheel = []
_on_touch = {}
_on_touch_repeat = {}
_on_touch_last = {}
_on_garbage = None
_on_circle = {}
def millis():
return int(round(time.time() * 1000))
def reset():
GPIO.output(SW_RESET_PIN, GPIO.LOW)
time.sleep(.1)
GPIO.output(SW_RESET_PIN, GPIO.HIGH)
time.sleep(.5) # Datasheet delay of 200ms plus change
[docs]class StoppableThread(threading.Thread):
'''Basic stoppable thread wrapper
Adds Event for stopping the execution loop
and exiting cleanly.
'''
def __init__(self):
threading.Thread.__init__(self)
self.stop_event = threading.Event()
self.daemon = True
def start(self):
if self.isAlive() == False:
self.stop_event.clear()
threading.Thread.start(self)
def stop(self):
if self.isAlive() == True:
# set event to signal thread to terminate
self.stop_event.set()
# block calling thread until thread really has terminated
self.join()
[docs]class AsyncWorker(StoppableThread):
'''Basic thread wrapper class for asyncronously running functions
Basic thread wrapper class for running functions
asyncronously. Return False from your function
to abort looping.
'''
def __init__(self, todo):
StoppableThread.__init__(self)
self.todo = todo
def run(self):
while self.stop_event.is_set() == False:
if self.todo() == False:
self.stop_event.set()
break
def _handle_sensor_data(data):
global _lastrotation, rotation
d_configmask = data.pop(0) | data.pop(0) << 8
d_timestamp = data.pop(0) # 200hz, 8-bit counter, max ~1.25sec
d_sysinfo = data.pop(0)
d_dspstatus = data[0:2]
d_gesture = data[2:6]
d_touch = data[6:10]
d_airwheel = data[10:12]
d_xyz = data[12:20]
d_noisepow = data[20:24]
if d_configmask & SW_DATA_XYZ and d_sysinfo & 0b0000001:
# We have xyz info, and it's valid
x, y, z = (
(d_xyz[1] << 8 | d_xyz[0]) / 65536.0,
(d_xyz[3] << 8 | d_xyz[2]) / 65536.0,
(d_xyz[5] << 8 | d_xyz[4]) / 65536.0
)
if callable(_on_move):
_on_move(x, y, z)
if d_configmask & SW_DATA_GESTURE and not d_gesture[0] == 0:
# We have a gesture!
is_edge = (d_gesture[3] & 0b00000001) > 0
gestures = [
('garbage','',''),
('flick','west','east'),
('flick','east','west'),
('flick','south','north'),
('flick','north','south'),
('circle','clockwise',''),
('circle','counter-clockwise','')
]
for i,gesture in enumerate(gestures):
if d_gesture[0] == i + 1:
if gesture[0] == 'flick' and callable(_on_flick):
_on_flick(gesture[1], gesture[2])
break
if d_configmask & SW_DATA_TOUCH:
# We have a touch
d_action = d_touch[1] << 8 | d_touch[0]
d_touchcount = d_touch[2] * 5 # Time to touch in ms
actions = [
('touch','south'),
('touch','west'),
('touch','north'),
('touch','east'),
('touch','center'),
('tap','south'),
('tap','west'),
('tap','north'),
('tap','east'),
('tap','center'),
('doubletap','south'),
('doubletap','west'),
('doubletap','north'),
('doubletap','east'),
('doubletap','center')
]
comp = 0b0000000000000001 << len(actions)-1
for action in reversed(actions):
if d_action & comp:
handle_touch = False
if action[0] in _on_touch.keys() and action[1] in _on_touch[action[0]].keys():
if not action[0] in _on_touch_last.keys():
_on_touch_last[action[0]] = {}
handle_touch = True
if not action[1] in _on_touch_last[action[0]].keys():
_on_touch_last[action[0]][action[1]] = None
handle_touch = True
elif (millis() - _on_touch_last[action[0]][action[1]]) >= 1000.0 / _on_touch_repeat[action[0]][action[1]]:
handle_touch = True
if callable(_on_touch[action[0]][action[1]]) and handle_touch:
_on_touch[action[0]][action[1]]()
_on_touch_last[action[0]][action[1]] = millis()
if action[0] in _on_touch.keys() and 'all' in _on_touch[action[0]].keys():
if not action[0] in _on_touch_last.keys():
_on_touch_last[action[0]] = {}
handle_touch = True
if not 'all' in _on_touch_last[action[0]].keys():
_on_touch_last[action[0]]['all'] = None
handle_touch = True
elif (millis() - _on_touch_last[action[0]]['all']) >= 1000.0 / _on_touch_repeat[action[0]]['all']:
handle_touch = True
if callable(_on_touch[action[0]]['all']) and handle_touch:
_on_touch[action[0]]['all'](action[1])
_on_touch_last[action[0]]['all'] = millis()
break
comp = comp >> 1
if d_configmask & SW_DATA_AIRWHEEL and d_sysinfo & 0b00000010:
# Airwheel
delta = (d_airwheel[0] - _lastrotation) / 32.0
# Delta is in degrees, with 1 = full 360 degree rotation
# Positive numbers equal clockwise delta, negative are counter-clockwise
if delta != 0 and delta > -0.5 and delta < 0.5:
if callable(_on_airwheel):
_on_airwheel(delta * 360.0)
rotation += delta
if rotation < 0:
rotation = 0
if rotation > 1000:
rotation = 1000
_lastrotation = d_airwheel[0]
def _handle_status_info(data):
error = data[7] << 8 | data[6]
def _handle_firmware_info(data):
print('Got firmware info')
d_fw_valid = data.pop(0)
d_hw_rev = data.pop(0) | data.pop(0) << 8
d_param_st = data.pop(0)
d_loader_version = [ data.pop(0), data.pop(0), data.pop(0) ],
d_fw_st = data.pop(0)
d_fw_version = ''.join(map(chr,data))
print(d_fw_version)
if d_fw_valid == 0:
raise Exception("No valid GestIC Library could be located")
if d_fw_valid == 0x0A:
raise Exception("An invalid GestiIC Library was stored, or the last update failed")
def _do_poll():
global io_error_count
time.sleep(0.001)
if not GPIO.input(SW_XFER_PIN):
'''
Assert transfer line low to ensure
MGC3130 doesn't update data buffers
'''
GPIO.setup(SW_XFER_PIN, GPIO.OUT, initial=GPIO.LOW)
try:
data = i2c.read_i2c_block_data(SW_ADDR, 0x00, 26)
io_error_count = 0
except IOError:
io_error_count += 1
if io_error_count > 10:
raise Exception("Skywriter encoutered nore than 10 consecutive I2C IO errors!")
return
d_size = data.pop(0)
d_flags = data.pop(0)
d_seq = data.pop(0)
d_ident = data.pop(0)
if d_ident == 0x91:
_handle_sensor_data(data)
elif d_ident == 0x15:
_handle_status_info(data)
elif d_ident == 0x83:
_handle_firmware_info(data)
else:
pass
GPIO.setup(SW_XFER_PIN, GPIO.IN, pull_up_down=GPIO.PUD_UP)
def _start_poll():
global _worker
if _worker == None:
_worker = AsyncWorker(_do_poll)
_worker.start()
def _stop_poll():
global _worker
_worker.stop()
def get_arg(args, arg, default = None):
if arg in args.keys():
return args[arg]
return default
[docs]def flick(*args, **kwargs):
'''Bind flick event'''
def register(handler):
global _on_flick
_on_flick = handler
return register
[docs]def touch(*args, **kwargs):
'''Bind touch event
:param repeat_rate: Max number of times/second to fire the touch event
:param position: Position of touch to watch- north, south, east, west, center
'''
global _on_touch, _on_touch_repeat
t_position = get_arg(kwargs, 'position', 'all')
t_repeat_rate = get_arg(kwargs, 'repeat_rate', 4)
if not 'touch' in _on_touch.keys():
_on_touch['touch'] = {}
if not 'touch' in _on_touch_repeat.keys():
_on_touch_repeat['touch'] = {}
def register(handler):
global _on_touch, _on_touch_repeat
_on_touch['touch'][t_position] = handler
_on_touch_repeat['touch'][t_position] = t_repeat_rate
return register
[docs]def tap(*args, **kwargs):
'''Bind tap event
:param repeat_rate: Max number of times/second to fire the touch event
:param position: Position of tap to watch- north, south, east, west, center
'''
global _on_touch, _on_touch_repeat
t_position = get_arg(kwargs, 'position', 'all')
t_repeat_rate = get_arg(kwargs, 'repeat_rate', 4)
if not 'tap' in _on_touch.keys():
_on_touch['tap'] = {}
if not 'tap' in _on_touch_repeat.keys():
_on_touch_repeat['tap'] = {}
def register(handler):
global _on_touch, _on_touch_repeat
_on_touch['tap'][t_position] = handler
_on_touch_repeat['tap'][t_position] = t_repeat_rate
return register
[docs]def double_tap(*args, **kwargs):
'''Bind double tap event
:param repeat_rate: Max number of times/second to fire the double tap event
:param position: Position of double tap to watch- north, south, east, west, center
'''
global _on_touch, _on_touch_repeat
t_position = get_arg(kwargs, 'position', 'all')
t_repeat_rate = get_arg(kwargs, 'repeat_rate', 4)
if not 'doubletap' in _on_touch.keys():
_on_touch['doubletap'] = {}
if not 'doubletap' in _on_touch_repeat.keys():
_on_touch_repeat['doubletap'] = {}
def register(handler):
global _on_touch
_on_touch['doubletap'][t_position] = handler
_on_touch_repeat['doubletap'][t_position] = t_repeat_rate
return register
[docs]def garbage():
'''Bind an action to the "garbage" gesture
A sort of grab-and-throw-away-garbage above the Skywriter
'''
def register(handler):
global _on_garbage
_on_garbage = handler
return register
[docs]def move():
'''Bind an action to move
The handler will receive x, y and z values
describing the tracked finger in 3D space above
the Skywriter.
'''
def register(handler):
global _on_move
_on_move = handler
return register
[docs]def airwheel():
'''Bind an action to the "airhweel" gesture
Point your finger at the Skywriter and spin it in a wheel
The handler will receive a rotation delta in degrees
'''
def register(handler):
global _on_airwheel
_on_airwheel = handler
return register
def _exit():
_stop_poll()
if GPIO != None:
GPIO.cleanup()
atexit.register(_exit)
reset()
i2c.write_i2c_block_data(SW_ADDR, 0xa1, [0b00000000, 0b00011111, 0b00000000, 0b00011111])
_start_poll()