#!/usr/bin/env python3
"""
    Open Sound Control send/receive daemon for Tascam Firewire control surface
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    :copyright: Copyright (c) 2012-2014 Scott Bahling
        This program is free software; you can redistribute it and/or modify
        it under the terms of the GNU General Public License version 2 as
        published by the Free Software Foundation.

        This program is distributed in the hope that it will be useful,
        but WITHOUT ANY WARRANTY; without even the implied warranty of
        MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
        GNU General Public License for more details.

        You should have received a copy of the GNU General Public License
        along with this program (see the file COPYING); if not, write to the
        Free Software Foundation, Inc.,
        51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
    :license: GPL-2.0, see COPYING for details
"""

import string
import time
import threading
from pathlib import Path

import strips
import fw_1884_buttons

import gi

gi.require_version('Hinawa', '2.0')
from gi.repository import Hinawa  # noqa: E402
from hinawa_utils.tscm.config_rom_parser import TscmConfigRomParser  # noqa: E402 E501
from hinawa_utils.tscm.tscm_console_unit import TscmConsoleUnit  # noqa: E402

# Render an integer as a zero-padded 32 character binary string.
bits32 = '{:032b}'.format


class ConsoleStatus():
    """Wrapper around a sequence of status quadlets with bit-field access."""

    def __init__(self, quadlets):
        """
        :param quadlets: Indexable collection of integer status words.
        """
        self.quadlets = quadlets

    def field(self, quadlet, first_bit=0, last_bit=32):
        """ Extract a bit field from one status quadlet.

        Bit positions are counted from the least significant bit, starting
        at 1, and the range ``first_bit`` .. ``last_bit`` is inclusive.
        The defaults select the whole quadlet.

        :param quadlet: Index into the quadlet collection.
        :param first_bit: Lowest bit of the field (values below 1 are
            treated as 1).
        :param last_bit: Highest bit of the field.
        :return: The selected bits as an integer.
        """
        bits = self.quadlets[quadlet]
        # Clamp the slice start: 'first_bit - 1' used to become -1 for the
        # default first_bit=0, which selected only the most significant bit
        # instead of the complete quadlet.
        start = max(first_bit - 1, 0)
        # reverse the bit order before slicing so the index '0' is the LSB
        # and reverse back before converting to int
        return int(bits32(bits)[::-1][start:last_bit][::-1], 2)


class RunningStatusThread():
    """Background poller that feeds status bit fields to registered objects."""

    def __init__(self, console, interval=0.1):
        """ Constructor

        Starts the polling thread immediately.

        :param console: Object exposing a ``status`` attribute that provides
            ``field(quadlet, first_bit, last_bit)``.
        :type interval: float
        :param interval: Check interval, in seconds
        """
        self.console = console
        self.interval = interval
        self.callbacks = set()
        self.last_status = []

        thread = threading.Thread(target=self.run)
        thread.daemon = True  # Daemonize thread
        thread.start()  # Start the execution

    def add_callback(self, obj):
        """ Register *obj* for status updates.

        *obj* must provide ``status_quadlet``, ``status_bits`` and
        ``status_callback(value)``.
        """
        self.callbacks.add(obj)

    def remove_callback(self, obj):
        """ Unregister *obj*; raises KeyError if it was not registered. """
        self.callbacks.remove(obj)

    def run(self):
        """ Method that runs forever """
        while True:
            # Iterate over a snapshot: add_callback()/remove_callback() are
            # called from other threads and resizing the set while it is
            # being iterated raises RuntimeError.
            for obj in tuple(self.callbacks):
                value = self.console.status.field(obj.status_quadlet,
                                                  *obj.status_bits,
                                                  )
                obj.status_callback(value)

            time.sleep(self.interval)


class Console():
    """Tascam FW console unit together with its strips, buttons and the
    handlers that react to control changes and incoming messages."""

    def __init__(self, card_id=None, guid=None):
        """ Open a console unit and set up strips, buttons and polling.

        The unit is selected by *guid* if given, otherwise by *card_id*,
        otherwise the first unit reported by ``list_units()`` is used.

        :param card_id: Sound card number used to build
            ``/dev/snd/hwC<card_id>D0``.
        :param guid: GUID compared against each unit's 'guid' property.
        :raises Exception: If no unit could be located.
        """
        fullpath = None
        if guid:
            # Search by the GUID (the card id used to be passed here by
            # mistake, so a GUID lookup could never match).
            fullpath = self._seek_snd_unit_from_guid(guid)
        elif card_id not in (None, ''):
            # Explicit test so that card number 0 is honoured.
            fullpath = '/dev/snd/hwC{0}D0'.format(card_id)
        else:
            try:
                units = list_units()
                if units:
                    model, fullpath, guid = units[0]
            except Exception as err:
                raise Exception('No Tascam FW Console unit found') from err

        if not fullpath:
            raise Exception('No Tascam FW Console unit found')

        self.unit = TscmConsoleUnit(fullpath)
        model = self.unit.model_name
        guid = self.unit.get_property('guid')
        print('Found Tascam {0} unit with GUID: {1:016x}'.format(model, guid))  # noqa E501

        self.state = {}
        self.current_bank = 1
        self.more_banks_up = False
        self.more_banks_down = False
        self.strips = strips.init_strips(self)
        self.buttons = {}
        self.init_buttons()

        self.unit.connect('control', self.handle_control)

        self.status_thread = RunningStatusThread(self)  # noqa F841

    def _seek_snd_unit_from_guid(self, guid):
        """ Return the path of the /dev/snd/hw* node whose unit reports
        *guid*, or None when no node matches. """
        for fullpath in Path('/dev/snd').glob('hw*'):
            fullpath = str(fullpath)
            # Bind the name up front so the 'finally' clause cannot fail
            # when Hinawa.SndUnit() itself raises.
            unit = None
            try:
                unit = Hinawa.SndUnit()
                unit.open(fullpath)
                if unit.get_property('guid') == guid:
                    return fullpath
            except Exception:
                # Best effort: nodes that cannot be opened or queried are
                # simply not candidates.
                pass
            finally:
                # Drop the probe object before trying the next node.
                del unit

        return None

    def init_buttons(self):
        """ Build ``self.button_map`` and index every button by its name
        in ``self.buttons``; empty (None) slots are skipped. """
        self.button_map = fw_1884_buttons.init_buttons(self)

        for items in self.button_map.values():
            for item in items:
                if item is None:
                    continue
                self.buttons[item.name] = item

    @property
    def status(self):
        """ A fresh ConsoleStatus built from the unit's current status.

        Any exception raised while reading the status propagates.
        """
        return ConsoleStatus(self.unit.get_status())

    def handle_bit_flags(self, index, before, after):
        """ Dispatch press/release for every bit that differs between
        *before* and *after*.

        A bit that is set in *after* is treated as a release and a cleared
        bit as a press (presumably the hardware reports active-low
        buttons - confirm against the device documentation).

        :param index: Control quadlet index, used to look up the button row.
        :param before: Previous quadlet value.
        :param after: New quadlet value.
        """
        changed = before ^ after

        for bit in range(changed.bit_length()):
            if not (changed >> bit) & 1:
                continue

            button = self.button_map[index][bit]
            if button is None:
                print('unhandled control bit {}:{}'.format(index, bit))
                continue

            if after & (1 << bit):
                button.release()
            else:
                button.press()

    def handle_encoder(self, index, before, after):
        """ Translate an encoder quadlet change into pan messages.

        Each quadlet carries two 16 bit counters; the low half belongs to
        strip ``index * 2 - 19`` and the high half to ``index * 2 - 18``.
        Deltas are only forwarded while state['encoder_mode'] is 'PAN'.
        """
        low_strip = index * 2 - 19
        high_strip = index * 2 - 18

        low_delta = roll_over_delta((after & 0xffff) - (before & 0xffff))
        high_delta = roll_over_delta((after >> 0x10) - (before >> 0x10))

        for strip, delta in ((low_strip, low_delta),
                             (high_strip, high_delta)):
            if delta and self.state.get('encoder_mode', '') == 'PAN':
                self.strips[strip].send_pan(delta)

    def handle_control(self, unit, index, before, after):
        """ Callback connected to the unit's 'control' signal.

        Quadlets 5-9 are handled as button bit flags and quadlets 10-15 as
        encoders; other indices are only printed.
        """
        print('{0:02d}: {1:08x} {2:08x}'.format(index, before, after))

        if index in (5, 6, 7, 8, 9):
            self.handle_bit_flags(index, before, after)
            return

        if index in (10, 11, 12, 13, 14, 15):
            self.handle_encoder(index, before, after)
            return

    def strip_fader_handler(self, addr, ssid, pos):
        """ Set the fader of strip *ssid* to ``pos * 1023``.

        Strips whose name is a single space are ignored.
        """
        print('fader_handler', addr, ssid, pos)
        strip = self.strips[int(ssid)]
        if strip.name == ' ':
            return
        # presumably pos is normalised to 0..1 and the fader range is
        # 0..1023 - confirm against the strips module
        strip.fader.position = pos * 1023

    def master_fader_handler(self, addr, pos):
        """ Set the master fader (strip 0) to ``1023 * pos``. """
        print('master_fader_handler', pos)
        # Assign, as strip_fader_handler does; 'position' was called like a
        # method here, which is inconsistent with the per-strip handler.
        self.strips[0].fader.position = 1023 * pos

    def default_handler(self, addr, *args):
        """ Fallback handler: print the address and its arguments. """
        print(addr, args)

    def strip_mute_handler(self, addr, ssid, state):
        """ Set the mute flag of strip *ssid* from the truthiness of
        *state*; strips named ' ' are ignored. """
        strip = self.strips[int(ssid)]
        print('mute_handler', strip, state)
        if strip.name == ' ':
            return
        strip.mute = bool(state)

    def strip_solo_handler(self, addr, ssid, state):
        """ Set the solo flag of strip *ssid* from the truthiness of
        *state*; strips named ' ' are ignored. """
        strip = self.strips[int(ssid)]
        print('solo_handler', strip, state)
        if strip.name == ' ':
            return
        strip.solo = bool(state)

    def strip_recenable_handler(self, addr, ssid, state):
        """ Set the rec flag of strip *ssid* from the truthiness of
        *state*; strips named ' ' are ignored. """
        strip = self.strips[int(ssid)]
        print('recenable_handler', strip, state)
        if strip.name == ' ':
            return
        strip.rec = bool(state)

    @staticmethod
    def _set_led(led, state):
        """ Turn *led* on when *state* is truthy, off otherwise. """
        if state:
            led.turn_on()
        else:
            led.turn_off()

    def loop_toggle_handler(self, addr, state):
        """ Record the loop state (1/0) in state['LOOP'] and mirror it on
        the loop LED. """
        print(addr, state)
        self.state['LOOP'] = 1 if state else 0
        self._set_led(self.unit.leds.loop, state)

    def transport_stop_handler(self, addr, state):
        """ Mirror *state* on the stop LED. """
        print(addr, state)
        self._set_led(self.unit.leds.stop, state)

    def transport_play_handler(self, addr, state):
        """ Mirror *state* on the play LED. """
        print(addr, state)
        self._set_led(self.unit.leds.play, state)

    def ffwd_handler(self, addr, state):
        """ Mirror *state* on the fast-forward LED. """
        self._set_led(self.unit.leds.f_fwd, state)

    def rewind_handler(self, addr, state):
        """ Mirror *state* on the rewind LED. """
        self._set_led(self.unit.leds.rew, state)

    def rec_enable_toggle_handler(self, addr, state):
        """ Mirror *state* on the record LED. """
        self._set_led(self.unit.leds.rec, state)

    def pan_stereo_position_handler(self, addr, pos, pan):
        """ Forward a received pan value to strip *pos*. """
        # Coerce the strip id like every other strip handler does.
        self.strips[int(pos)].recv_pan(pan)

    def strip_name_handler(self, addr, ssid, name):
        """ Store *name* on strip *ssid*. """
        self.strips[int(ssid)].name = name

    def bank_up_handler(self, addr, more_up):
        """ Remember whether more banks are available upwards. """
        print(addr, more_up)
        self.more_banks_up = bool(more_up)

    def bank_down_handler(self, addr, more_down):
        """ Remember whether more banks are available downwards. """
        print(addr, more_down)
        self.more_banks_down = bool(more_down)


def roll_over_delta(delta, ceiling=0xffff):
    """ Correct a counter difference for wrap-around at *ceiling*.

    Differences within 10 counts of +/- *ceiling* are treated as a wrap
    and shifted by ``ceiling + 1``; anything else is returned unchanged.
    The raw delta is printed.
    """
    print(delta)
    if delta > ceiling - 10:
        return delta - (ceiling + 1)

    if delta < -ceiling + 10:
        return delta + (ceiling + 1)

    return delta


def _check_hexadecimal(literal):
    """ Return True if *literal* is exactly 16 hexadecimal digits,
    optionally preceded by '0x'. """
    if literal.startswith('0x'):
        literal = literal[2:]
    if len(literal) != 16:
        return False
    return all(character in string.hexdigits for character in literal)


def list_units():
    """ Scan /dev/snd/hw* for supported consoles.

    :return: List of ``(model, fullpath, guid)`` tuples for every node
        whose config ROM reports model 'FW-1082' or 'FW-1884'. Nodes that
        cannot be opened or parsed are skipped.
    """
    units = []
    for fullpath in Path('/dev/snd').glob('hw*'):
        fullpath = str(fullpath)
        unit = None
        try:
            unit = Hinawa.SndUnit()
            unit.open(fullpath)
            parser = TscmConfigRomParser()
            info = parser.parse_rom(unit.get_config_rom())
            model = info['model-name']
            guid = unit.get_property('guid')
            if model in ('FW-1082', 'FW-1884'):
                units.append((model, fullpath, guid))
        except Exception:
            # Best effort: a node that cannot be opened or parsed is not a
            # supported unit.
            pass

    return units