#!/usr/bin/env python3
"""
    Open Sound Control send/receive daemon for Tascam Firewire control surface
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    :copyright: Copyright (c) 2018 Scott Bahling

        This program is free software; you can redistribute it and/or modify
        it under the terms of the GNU General Public License version 2 as
        published by the Free Software Foundation.

        This program is distributed in the hope that it will be useful,
        but WITHOUT ANY WARRANTY; without even the implied warranty of
        MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
        GNU General Public License for more details.

        You should have received a copy of the GNU General Public License
        along with this program (see the file COPYING); if not, write to the
        Free Software Foundation, Inc.,
        51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA

    :license: GPL-2.0, see COPYING for details
"""

import string
import time
import threading
from pathlib import Path

from tascam_fw_console.encoder import Encoder
from tascam_fw_console.strip import Strip
from tascam_fw_console import fw_1884_buttons

import gi

gi.require_version('Hinawa', '2.0')
from gi.repository import Hinawa  # noqa: E402
from hinawa_utils.tscm.config_rom_parser import TscmConfigRomParser  # noqa: E402 E501
from hinawa_utils.tscm.tscm_console_unit import TscmConsoleUnit  # noqa: E402

# Render an integer as a zero-padded, 32 character binary string.
bits32 = '{:032b}'.format

# LED names that encoder_mode_handler() switches between.
ENCODER_MODES = ['PAN', 'AUX1', 'AUX2', 'AUX3', 'AUX4',
                 'AUX5', 'AUX6', 'AUX7', 'AUX8']


class ConsoleStatus():
    """Wrapper around the sequence of status quadlets read from the unit."""

    def __init__(self, quadlets):
        """
        :param quadlets: Indexable collection of integer status quadlets
        """
        self.quadlets = quadlets

    def field(self, index, first_bit=0, last_bit=32):
        """ Extract a bit field from one status quadlet.

        Bit positions are counted from the least significant bit.
        ``first_bit`` is 1-based and ``last_bit`` is inclusive, so
        ``field(i, 1, 16)`` yields the low 16 bits and ``field(i, 17, 32)``
        the high 16 bits. A ``first_bit`` of 0 (the default) is treated the
        same as 1, so calling ``field(i)`` returns the whole quadlet.

        :param index: Index of the quadlet in the status data
        :param first_bit: Lowest bit of the field (1-based, 0 == 1)
        :param last_bit: Highest bit of the field (inclusive)
        :return: The selected bits as an integer
        """
        bits = self.quadlets[index]

        # Without the clamp a first_bit of 0 would turn into a slice start
        # of -1, which selects only the most significant bit.
        start = max(first_bit - 1, 0)

        # reverse the bit order before slicing so the index '0' is the LSB
        # and reverse back before converting to int
        return int(bits32(bits)[::-1][start:last_bit][::-1], 2)


class RunningStatusThread():
    """Daemon thread that feeds console status fields to callbacks."""

    def __init__(self, console, interval=0.1):
        """ Background thread that will poll the console status data and
        pass it to all objects in the callbacks set. If the callback set
        is empty, polling doesn't happen.

        Objects added to the callback set must have a status_callback()
        method, a status_quadlet attribute containing the index of the
        quadlet of interest, and a status_bits attribute containing a
        2 member iterable with the start and end bits of the quadlet of
        interest.

        :param console: Console object
        :type interval: float
        :param interval: Poll interval, in seconds
        """
        self.console = console
        self.interval = interval
        self.callbacks = set()

        thread = threading.Thread(target=self.run)
        thread.daemon = True  # Daemonize thread
        thread.start()  # Start the execution

    def add_callback(self, obj):
        """Register ``obj`` to receive status updates."""
        self.callbacks.add(obj)

    def remove_callback(self, obj):
        """Unregister ``obj``; raises KeyError if it was not registered."""
        self.callbacks.remove(obj)

    def run(self):
        """Poll forever, sleeping ``interval`` seconds between rounds."""
        while True:
            # Iterate over a snapshot: add_callback()/remove_callback() may
            # be called from another thread while a round is in progress,
            # and mutating a set during iteration raises RuntimeError.
            for obj in tuple(self.callbacks):
                value = self.console.status.field(obj.status_quadlet,
                                                  *obj.status_bits,
                                                  )
                obj.status_callback(value)

            time.sleep(self.interval)


class Console():
    """Tascam FireWire console: control event dispatch and state handlers."""

    def __init__(self, card_id=None, guid=None):
        """ Open the console unit and wire up strips, buttons and encoders.

        The unit is selected by ``guid`` if given, otherwise by ``card_id``
        if given, otherwise the first unit returned by list_units() is used.

        :param card_id: ALSA card number used to build /dev/snd/hwC<n>D0
        :param guid: GUID compared against the unit's 'guid' property
        :raises Exception: if no matching unit could be found
        """
        fullpath = None
        if guid:
            fullpath = self._seek_snd_unit_from_guid(guid)
        elif card_id is not None:
            # 'is not None' so that card number 0 is honoured
            fullpath = '/dev/snd/hwC{0}D0'.format(card_id)
        else:
            try:
                units = list_units()
                if units:
                    model, fullpath, guid = units[0]
            except Exception as err:
                raise Exception('No Tascam FW Console unit found') from err

        if fullpath:
            self.unit = TscmConsoleUnit(fullpath)
            model = self.unit.model_name
            guid = self.unit.get_property('guid')
            print('Found Tascam {0} unit with GUID: {1:016x}'.format(model, guid))  # noqa E501
        else:
            raise Exception('No Tascam FW Console unit found')

        self.state = {}
        self.current_bank = 1
        self.more_banks_up = True
        self.more_banks_down = True
        self.init_strips()
        self.buttons = {}
        self.init_buttons()
        self.init_encoders()

        # Control quadlet index -> (handler for the low 16 bits,
        #                           handler for the high 16 bits);
        # see handle_encoder() for how the halves are split.
        self.encoders = {
            10: (self.strips[1].encoder.update,
                 self.strips[2].encoder.update),
            11: (self.strips[3].encoder.update,
                 self.strips[4].encoder.update),
            12: (self.strips[5].encoder.update,
                 self.strips[6].encoder.update),
            13: (self.strips[7].encoder.update,
                 self.strips[8].encoder.update),
            14: (self.gain_encoder.update,
                 self.freq_encoder.update),
            15: (self.q_encoder.update,
                 self.jogwheel.update),
        }

        self.unit.set_master_fader(False)
        self.unit.connect('control', self.handle_control)

        self.status_thread = RunningStatusThread(self)  # noqa F841

    def init_strips(self):
        """Create strips 0..8; index 0 is used for the master fader."""
        self.strips = [Strip(self, strip_num) for strip_num in range(0, 9)]

    def init_encoders(self):
        """Create the EQ encoders and the jog wheel encoder."""
        self.gain_encoder = Encoder('EQ Gain', '/eq/gain')
        self.freq_encoder = Encoder('EQ Freq', '/eq/freq')
        self.q_encoder = Encoder('EQ Q', '/eq/q')
        self.jogwheel = Encoder('Jog', '/jogwheel')

    def _seek_snd_unit_from_guid(self, guid):
        """ Find the /dev/snd/hw* node whose unit reports ``guid``.

        :param guid: Value compared with the unit's 'guid' property
        :return: Device path as a string, or None if nothing matched
        """
        for fullpath in Path('/dev/snd').glob('hw*'):
            fullpath = str(fullpath)
            # Bind the name up front so the 'del' in the finally clause
            # cannot fail when Hinawa.SndUnit() itself raises.
            unit = None
            try:
                unit = Hinawa.SndUnit()
                unit.open(fullpath)
                if unit.get_property('guid') == guid:
                    return fullpath
            except Exception:
                # Best-effort probe: nodes that cannot be opened or
                # queried are simply skipped.
                pass
            finally:
                del unit

        return None

    def init_buttons(self):
        """Build button_map and index every defined button by its name."""
        self.button_map = fw_1884_buttons.init_buttons(self)
        for items in self.button_map.values():
            for item in items:
                if item is None:
                    continue
                self.buttons[item.name] = item

    @property
    def status(self):
        """A ConsoleStatus built from a fresh self.unit.get_status() call."""
        return ConsoleStatus(self.unit.get_status())

    def handle_bit_flags(self, index, before, after):
        """ Dispatch press/release for every bit that changed in a quadlet.

        A bit that is set in ``after`` is reported as a release, a bit that
        is cleared as a press.

        :param index: Control quadlet index, key into button_map
        :param before: Previous quadlet value
        :param after: New quadlet value
        """
        changed = before ^ after

        for bit in range(changed.bit_length()):
            if not (changed >> bit) & 1:
                continue

            high = bool(after & (0b1 << bit))
            button = self.button_map[index][bit]
            if button is None:
                print('unhandled control bit {}:{}'.format(index, bit))
                continue

            if high:
                button.release()
            else:
                button.press()

    def handle_encoder(self, index, before, after):
        """ Forward encoder movement packed into one control quadlet.

        Each quadlet carries two 16 bit counters: the low half goes to the
        first handler registered for ``index``, the high half to the second.
        Handlers are only called for a non-zero delta.

        :param index: Control quadlet index, key into self.encoders
        :param before: Previous quadlet value
        :param after: New quadlet value
        """
        handler1, handler2 = self.encoders.get(int(index), (None, None))

        bval1 = before & 0xffff
        bval2 = before >> 0x10
        aval1 = after & 0xffff
        aval2 = after >> 0x10

        delta1 = roll_over_delta(aval1 - bval1)
        delta2 = roll_over_delta(aval2 - bval2)

        if delta1 and handler1:
            handler1(delta1)

        if delta2 and handler2:
            handler2(delta2)

    def handle_control(self, unit, index, before, after):
        """ 'control' signal callback connected in __init__.

        Quadlets 5-9 are handled as button bit flags, quadlets 10-15 as
        encoder counters; every other index is ignored.
        """
        # print('{0:02d}: {1:08x} {2:08x}'.format(index, before, after))

        if index in [5, 6, 7, 8, 9]:
            self.handle_bit_flags(index, before, after)
            return

        if index in [10, 11, 12, 13, 14, 15]:
            self.handle_encoder(index, before, after)
            return

    @staticmethod
    def _set_led(led, state):
        """Turn ``led`` on when ``state`` is truthy, off otherwise."""
        if state:
            led.turn_on()
        else:
            led.turn_off()

    def strip_fader_handler(self, addr, ssid, pos):
        """Set the fader position of strip ``ssid`` unless its name is ' '."""
        strip = self.strips[int(ssid)]
        # NOTE(review): a name of ' ' presumably marks an unassigned strip
        if strip.name == ' ':
            return
        strip.fader.position = pos

    def master_fader_handler(self, addr, pos):
        """Set the master fader position (strip 0)."""
        self.strip_fader_handler(addr, 0, pos)

    def default_handler(self, addr, *args):
        """Print any message that has no dedicated handler."""
        print(addr, args)

    def strip_select_handler(self, addr, ssid, state):
        """Update the select state of strip ``ssid`` (not strip 0)."""
        strip = self.strips[int(ssid)]
        print('select_handler', strip, state)
        if strip.num == 0 or strip.name == ' ':
            return
        strip.select = bool(state)

    def strip_mute_handler(self, addr, ssid, state):
        """Update the mute state of strip ``ssid`` (not strip 0)."""
        strip = self.strips[int(ssid)]
        print('mute_handler', strip, state)
        if strip.num == 0 or strip.name == ' ':
            return
        strip.mute = bool(state)

    def strip_solo_handler(self, addr, ssid, state):
        """Update the solo state of strip ``ssid`` (not strip 0)."""
        strip = self.strips[int(ssid)]
        print('solo_handler', strip, state)
        if strip.num == 0 or strip.name == ' ':
            return
        strip.solo = bool(state)

    def strip_recenable_handler(self, addr, ssid, state):
        """Update the record-enable state of strip ``ssid`` (not strip 0)."""
        strip = self.strips[int(ssid)]
        print('recenable_handler', strip, state)
        if strip.num == 0 or strip.name == ' ':
            return
        strip.rec = bool(state)

    def loop_toggle_handler(self, addr, state):
        """Record the loop state in self.state and mirror it on the LED."""
        print(addr, state)
        self.state['LOOP'] = 1 if state else 0
        self._set_led(self.unit.leds.loop, state)

    def transport_stop_handler(self, addr, state):
        """Mirror the transport stop state on the stop LED."""
        print(addr, state)
        self._set_led(self.unit.leds.stop, state)

    def transport_play_handler(self, addr, state):
        """Mirror the transport play state on the play LED."""
        print(addr, state)
        self._set_led(self.unit.leds.play, state)

    def ffwd_handler(self, addr, state):
        """Mirror the fast-forward state on the f_fwd LED."""
        self._set_led(self.unit.leds.f_fwd, state)

    def rewind_handler(self, addr, state):
        """Mirror the rewind state on the rew LED."""
        self._set_led(self.unit.leds.rew, state)

    def rec_enable_toggle_handler(self, addr, state):
        """Mirror the record-enable state on the rec LED."""
        self._set_led(self.unit.leds.rec, state)

    def pan_stereo_position_handler(self, addr, ssid, pan):
        """Pass a received pan value to strip ``ssid``."""
        self.strips[int(ssid)].recv_pan(pan)

    def strip_trimdb_handler(self, addr, ssid, trim):
        """Pass a received trim value to strip ``ssid``."""
        self.strips[int(ssid)].recv_trim(trim)

    def strip_name_handler(self, addr, ssid, name):
        """Store the received name on strip ``ssid``."""
        self.strips[int(ssid)].name = name

    def bank_up_handler(self, addr, more_up):
        """Remember whether more banks are available upwards."""
        print(addr, more_up)
        self.more_banks_up = bool(more_up)

    def bank_down_handler(self, addr, more_down):
        """Remember whether more banks are available downwards."""
        print(addr, more_down)
        self.more_banks_down = bool(more_down)

    def encoder_mode_handler(self, addr, mode):
        """Switch off every ENCODER_MODES LED, then light the one for mode."""
        mode = mode.upper()
        print(mode)
        for other in ENCODER_MODES:
            self.unit.leds.turn_off(other)
        self.unit.leds.turn_on(mode)


def roll_over_delta(delta, ceiling=0xffff):
    """ Correct a counter difference that crossed the wrap-around point.

    A delta greater than ``ceiling - 10`` is shifted down by
    ``ceiling + 1`` and a delta less than ``-ceiling + 10`` is shifted up
    by ``ceiling + 1``; any other delta is returned unchanged.

    :param delta: Raw difference between two counter readings
    :param ceiling: Largest value the counter can hold
    :return: The corrected delta
    """
    if delta > ceiling - 10:
        return delta - (ceiling + 1)

    if delta < -ceiling + 10:
        return delta + (ceiling + 1)

    return delta


def _check_hexadecimal(literal):
    """ Tell whether ``literal`` is a 16 digit hexadecimal string.

    One leading '0x' is ignored when present.

    :param literal: String to check
    :return: True if exactly 16 hexadecimal digits remain, else False
    """
    if literal.startswith('0x'):
        literal = literal[2:]
    if len(literal) != 16:
        return False
    return all(character in string.hexdigits for character in literal)


def list_units():
    """ Scan /dev/snd/hw* for supported Tascam console units.

    :return: List of (model, fullpath, guid) tuples for every node whose
             config ROM reports the model name 'FW-1082' or 'FW-1884'
    """
    units = []
    for fullpath in Path('/dev/snd').glob('hw*'):
        fullpath = str(fullpath)
        try:
            unit = Hinawa.SndUnit()
            unit.open(fullpath)
            parser = TscmConfigRomParser()
            info = parser.parse_rom(unit.get_config_rom())
            model = info['model-name']
            guid = unit.get_property('guid')
        except Exception:
            # Best-effort probe: nodes that cannot be opened or parsed
            # are not candidates.
            continue

        if model in ('FW-1082', 'FW-1884'):
            units.append((model, fullpath, guid))

    return units