diff options
Diffstat (limited to 'console.py')
| -rw-r--r-- | console.py | 202 |
1 files changed, 202 insertions, 0 deletions
diff --git a/console.py b/console.py new file mode 100644 index 0000000..a06c5ee --- /dev/null +++ b/console.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +""" + Open Sound Control send/recieve daemon for Tascam Firewire control surface + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + :copyright: Copyright (c) 2012-2014 Scott Bahling + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License version 2 as + published by the Free Software Foundation. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program (see the file COPYING); if not, write to the + Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA + :license: GPL-2.0, see COPYING for details +""" + +import argparse +import string +import time +import threading +from pathlib import Path + +import strips +import fw_1884_buttons + +import gi + +gi.require_version('Hinawa', '2.0') +from gi.repository import Hinawa # noqa: E402 +from hinawa_utils.tscm.tscm_console_unit import TscmConsoleUnit # noqa: E402 + + +bits32 = '{:032b}'.format + +current_bank = 1 +more_banks_up = False +more_banks_down = False + + +def check_bitR2L(data, bit): + return bool(data & (0b1 << bit)) + + +class RunningStatusThread(): + def __init__(self, console, interval=1): + """ Constructor + :type interval: int + :param interval: Check interval, in seconds + """ + self.interval = interval + + thread = threading.Thread(target=self.run, args=(console)) + thread.daemon = True # Daemonize thread + thread.start() # Start the execution + self.callbacks = set() + self.last_status = [] + + def add_callback(self, obj): + self.callbacks.add(obj) + + def remove_callback(self, obj): + self.callbacks.remove(obj) + + def fetch_status_values(self, quadlets=None, first_bit=0, last_bit=32): + if quadlets is None: + quadlets = [i for i in range(0, 16)] + + def getvalue(bits): + # reverse the bit order before slicing so the index '0' is the LSB + # and reverse back before converting to int + return int(bits32(bits)[::-1][first_bit:last_bit][::-1], 2) + + values = [getvalue(self.last_status[q]) for q in quadlets] + + if len(values) == 1: + return values[0] + + return values + + def run(self, console): + """ Method that runs forever """ + while True: + + if not self.callbacks: + continue + + try: + self.last_status = console.unit.get_status() + except Exception: + continue # we should emit warning or error here! + + for callback in self.callbacks: + callback.call(self.fetch_status_values(**callback.args)) + + time.sleep(self.interval) + + +class Console(): + def __init__(self, card): + + fullpath = seek_snd_unit_path(card) + if fullpath: + self.unit = TscmConsoleUnit(fullpath) + else: + exit() + + self.state = {} + self.strips = strips.init_strips(self.unit) + self.buttons = fw_1884_buttons.init_buttons(self) + + self.unit.connect('control', self.handle_control) + + status_thread = RunningStatusThread(self) # noqa F841 + + def handle_bit_flags(self, index, before, after): + + changed = before ^ after + + for bit in [i for i, b in enumerate(bits32(changed)) if int(b)]: + high = check_bitR2L(after, bit) + button = self.buttons[index][int(bit)] + if button is None: + print('unhandled control bit {}:{}'.format(index, bit)) + continue + + if high: + button.release() + else: + button.press() + + def handle_control(self, index, before, after): + + print('{0:02d}: {1:08x} {2:08x}'.format(index, before, after)) + + if index in [5, 6, 7, 8, 9]: + self.handle_bit_flags(index, before, after) + return + + if index in [10, 11, 12, 13, 14, 15]: + self.handle_encoder(index, before, after) + return + + +def _check_hexadecimal(literal): + if literal.find('0x') == 0: + literal = literal[2:] + if len(literal) != 16: + return False + for character in literal: + if character not in string.hexdigits: + return False + else: + return True + + +def _seek_snd_unit_from_guid(guid): + for fullpath in Path('/dev/snd').glob('hw*'): + fullpath = str(fullpath) + try: + unit = Hinawa.SndUnit() + unit.open(fullpath) + if unit.get_property('guid') == guid: + return fullpath + except Exception as e: + pass + finally: + del unit + return None + + +def seek_snd_unit_path(card_id): + # Assume as sound card number if it's digit literal. + if card_id.isdigit(): + return '/dev/snd/hwC{0}D0'.format(card_id) + # Assume as GUID on IEEE 1394 bus if it's hexadecimal literal. + elif _check_hexadecimal(card_id): + return _seek_snd_unit_from_guid(int(card_id, base=16)) + return None + + +if __name__ == "__main__": + + parser = argparse.ArgumentParser() + parser.add_argument("--card", default="1", + help="number of the ALSA sound card") + parser.add_argument("--listen-ip", default="127.0.0.1", + help="The ip to listen on") + parser.add_argument("--listen-port", type=int, default=5005, + help="The port to listen on") + parser.add_argument("--ip", default="127.0.0.1", + help="The ip of the OSC server") + parser.add_argument("--port", type=int, default=3819, + help="The port the OSC server is listening on") + args = parser.parse_args() + + console = Console(args.card) |
