#!/usr/bin/env python3
"""
    Open Sound Control send/receive daemon for Tascam Firewire control surface
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    :copyright: Copyright (c) 2012-2014 Scott Bahling

        This program is free software; you can redistribute it and/or modify
        it under the terms of the GNU General Public License version 2 as
        published by the Free Software Foundation.

        This program is distributed in the hope that it will be useful,
        but WITHOUT ANY WARRANTY; without even the implied warranty of
        MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
        GNU General Public License for more details.

        You should have received a copy of the GNU General Public License
        along with this program (see the file COPYING); if not, write to the
        Free Software Foundation, Inc.,
        51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA

    :license: GPL-2.0, see COPYING for details
"""

import argparse
import string
import time
import threading
from pathlib import Path

import strips
import fw_1884_buttons

import gi

gi.require_version('Hinawa', '2.0')
from gi.repository import Hinawa  # noqa: E402
from hinawa_utils.tscm.tscm_console_unit import TscmConsoleUnit  # noqa: E402

# Formats an int as a zero-padded 32 character binary string (MSB first).
bits32 = '{:032b}'.format

current_bank = 1
more_banks_up = False
more_banks_down = False


def check_bitR2L(data, bit):
    """Return True when bit number ``bit`` of ``data`` is set.

    Bits are counted from the right: bit 0 is the least significant bit.
    """
    return bool(data & (0b1 << bit))


class RunningStatusThread():
    """Background poller that reads the console status and feeds callbacks."""

    def __init__(self, console, interval=1):
        """ Constructor

        :param console: object whose ``unit.get_status()`` is polled
        :type interval: int
        :param interval: Check interval, in seconds
        """
        self.interval = interval

        # These must exist before the thread starts, because run() reads
        # them immediately.
        self.callbacks = set()
        self.last_status = []

        # args has to be a tuple; a bare ``(console)`` is just ``console``.
        thread = threading.Thread(target=self.run, args=(console,))
        thread.daemon = True                            # Daemonize thread
        thread.start()                                  # Start the execution

    def add_callback(self, obj):
        """Register ``obj``; run() uses its ``call`` and ``args`` attributes."""
        self.callbacks.add(obj)

    def remove_callback(self, obj):
        """Unregister ``obj``. Raises KeyError if it was not registered."""
        self.callbacks.remove(obj)

    def fetch_status_values(self, quadlets=None, first_bit=0, last_bit=32):
        """Extract a bit field from quadlets of the last status read.

        :param quadlets: indexes into the last status; defaults to 0..15
        :param first_bit: lowest bit of the field (0 is the LSB)
        :param last_bit: one past the highest bit of the field
        :return: a single int when exactly one quadlet was requested,
                 otherwise a list of ints in ``quadlets`` order
        """
        if quadlets is None:
            quadlets = range(16)

        # Shift the field down to bit 0 and mask off everything above it.
        mask = (1 << (last_bit - first_bit)) - 1
        values = [(self.last_status[q] >> first_bit) & mask
                  for q in quadlets]

        if len(values) == 1:
            return values[0]

        return values

    def _poll_once(self, console):
        """Read the status once and hand the values to every callback."""
        try:
            self.last_status = console.unit.get_status()
        except Exception as err:
            # Keep the poller alive, but do not hide the failure.
            print('warning: failed to read console status: {}'.format(err))
            return

        # Iterate over a snapshot so add_callback()/remove_callback() calls
        # from other threads cannot change the set during the loop.
        for callback in tuple(self.callbacks):
            callback.call(self.fetch_status_values(**callback.args))

    def run(self, console):
        """ Method that runs forever """
        while True:
            if self.callbacks:
                self._poll_once(console)

            # Always sleep, also when idle or after an error, so the loop
            # never spins at full CPU speed.
            time.sleep(self.interval)


class Console():
    """Connects a Tascam console unit to its strips and buttons."""

    def __init__(self, card):
        """Open the sound unit for ``card`` and wire up the controls.

        :param card: ALSA card number or 16 digit hexadecimal GUID, as a str
        :raises SystemExit: when no sound unit path is found for ``card``
        """
        fullpath = seek_snd_unit_path(card)
        if not fullpath:
            raise SystemExit(
                'no sound unit found for card {!r}'.format(card))

        self.unit = TscmConsoleUnit(fullpath)

        self.state = {}
        self.strips = strips.init_strips(self.unit)
        self.buttons = fw_1884_buttons.init_buttons(self)

        self.unit.connect('control', self.handle_control)

        # Keep a reference so callbacks can be registered later on.
        self.status_thread = RunningStatusThread(self)

    def handle_bit_flags(self, index, before, after):
        """Dispatch press/release for every bit that changed in a quadlet.

        :param index: index of the control quadlet
        :param before: previous value of the quadlet
        :param after: new value of the quadlet
        """
        changed = before ^ after

        # Bit numbers are counted from the LSB here, the same convention
        # check_bitR2L() uses for the state test below.
        # NOTE(review): assumes self.buttons[index] is indexed by LSB bit
        # number as well - confirm against fw_1884_buttons.
        for bit in range(32):
            if not check_bitR2L(changed, bit):
                continue

            high = check_bitR2L(after, bit)
            button = self.buttons[index][bit]
            if button is None:
                print('unhandled control bit {}:{}'.format(index, bit))
                continue

            # A set bit is treated as released, a cleared bit as pressed.
            if high:
                button.release()
            else:
                button.press()

    def handle_encoder(self, index, before, after):
        """Placeholder for encoder quadlets; only reports the event.

        TODO: implement encoder handling. Without this method
        handle_control() raised AttributeError for indexes 10-15.
        """
        print('unhandled encoder control {}'.format(index))

    def handle_control(self, index, before, after):
        """Handler for the unit's 'control' signal.

        Quadlets 5-9 are handled as button bit flags, quadlets 10-15 as
        encoders. Other indexes are printed only.
        """
        print('{0:02d}: {1:08x} {2:08x}'.format(index, before, after))

        if index in [5, 6, 7, 8, 9]:
            self.handle_bit_flags(index, before, after)
            return

        if index in [10, 11, 12, 13, 14, 15]:
            self.handle_encoder(index, before, after)
            return


def _check_hexadecimal(literal):
    """Return True if ``literal`` is 16 hex digits, optionally '0x' prefixed."""
    if literal.startswith('0x'):
        literal = literal[2:]

    return (len(literal) == 16
            and all(character in string.hexdigits for character in literal))


def _seek_snd_unit_from_guid(guid):
    """Return the /dev/snd/hw* path of the unit with ``guid``, or None."""
    for path in Path('/dev/snd').glob('hw*'):
        fullpath = str(path)
        # Bind the name first so the cleanup below is safe even when the
        # constructor itself fails.
        unit = None
        try:
            unit = Hinawa.SndUnit()
            unit.open(fullpath)
            if unit.get_property('guid') == guid:
                return fullpath
        except Exception:
            # Best effort probing: a node that cannot be opened is skipped.
            continue
        finally:
            del unit

    return None


def seek_snd_unit_path(card_id):
    """Translate a card number or GUID string into a sound unit path.

    :param card_id: decimal card number or 16 digit hexadecimal GUID
    :return: the device path, or None when ``card_id`` is neither form or
             no unit with that GUID is found
    """
    # Assume as sound card number if it's digit literal.
    if card_id.isdigit():
        return '/dev/snd/hwC{0}D0'.format(card_id)
    # Assume as GUID on IEEE 1394 bus if it's hexadecimal literal.
    elif _check_hexadecimal(card_id):
        return _seek_snd_unit_from_guid(int(card_id, base=16))
    return None


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--card", default="1",
                        help="number of the ALSA sound card")
    # The OSC address options below are parsed but not used in this file.
    parser.add_argument("--listen-ip", default="127.0.0.1",
                        help="The ip to listen on")
    parser.add_argument("--listen-port", type=int, default=5005,
                        help="The port to listen on")
    parser.add_argument("--ip", default="127.0.0.1",
                        help="The ip of the OSC server")
    parser.add_argument("--port", type=int, default=3819,
                        help="The port the OSC server is listening on")
    args = parser.parse_args()

    console = Console(args.card)