#!/usr/bin/env python3
"""
    Open Sound Control send/receive daemon for Tascam Firewire control surface
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    :copyright: Copyright (c) 2018 Scott Bahling

        This program is free software; you can redistribute it and/or modify
        it under the terms of the GNU General Public License version 2 as
        published by the Free Software Foundation.

        This program is distributed in the hope that it will be useful,
        but WITHOUT ANY WARRANTY; without even the implied warranty of
        MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
        GNU General Public License for more details.

        You should have received a copy of the GNU General Public License
        along with this program (see the file COPYING); if not, write to the
        Free Software Foundation, Inc.,
        51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA

    :license: GPL-2.0, see COPYING for details
"""

import string
import time
import threading

from pathlib import Path

from tascam_fw_console import strips
from tascam_fw_console import osc
from tascam_fw_console import fw_1884_buttons

import gi

gi.require_version('Hinawa', '2.0')
from gi.repository import Hinawa  # noqa: E402
from hinawa_utils.tscm.config_rom_parser import TscmConfigRomParser  # noqa: E402 E501
from hinawa_utils.tscm.tscm_console_unit import TscmConsoleUnit  # noqa: E402

# Render an integer as a zero-padded 32 character binary string (MSB first).
bits32 = '{:032b}'.format

# LED names that are switched off when the encoder mode is reset.
ENCODER_MODES = ['FLIP', 'AUX1', 'AUX2', 'AUX3', 'AUX4',
                 'AUX5', 'AUX6', 'AUX7', 'AUX8']


class ConsoleStatus():
    """Read-only view over the list of status quadlets read from the unit."""

    def __init__(self, quadlets):
        """
        :param quadlets: Indexable collection of integer status quadlets
        """
        self.quadlets = quadlets

    def field(self, quadlet, first_bit=0, last_bit=32):
        """
        Extract a bit field from one status quadlet.

        Bit positions are 1-based and inclusive, counted from the least
        significant bit: ``field(q, 5, 8)`` returns the second nibble.

        :param quadlet: Index into ``self.quadlets``
        :param first_bit: Lowest bit of the field; ``0`` and ``1`` both mean
                          "start at the LSB"
        :param last_bit: Highest bit of the field
        :return: The selected bits as an integer
        """
        bits = self.quadlets[quadlet]

        # Clamp the slice start: with the default first_bit=0 the expression
        # ``first_bit - 1`` is -1, which as a slice index means "last
        # character" and would yield only bit 31 instead of the whole quadlet.
        start = max(first_bit - 1, 0)

        # reverse the bit order before slicing so the index '0' is the LSB
        # and reverse back before converting to int
        lsb_first = bits32(bits)[::-1]
        return int(lsb_first[start:last_bit][::-1], 2)


class RunningStatusThread():
    """Background poller that feeds status bit fields to registered objects."""

    def __init__(self, console, interval=0.1):
        """ Constructor

        Starts the polling thread immediately.

        :param console: Object exposing a ``status`` attribute whose value
                        provides ``field(quadlet, first_bit, last_bit)``
        :type interval: float
        :param interval: Check interval, in seconds
        """
        self.console = console
        self.interval = interval
        self.callbacks = set()
        self.last_status = []

        thread = threading.Thread(target=self.run)
        thread.daemon = True                            # Daemonize thread
        thread.start()                                  # Start the execution

    def add_callback(self, obj):
        """
        Register ``obj`` for polling.

        ``obj`` must provide ``status_quadlet``, ``status_bits`` and
        ``status_callback(value)``; these are the attributes ``run`` uses.
        """
        self.callbacks.add(obj)

    def remove_callback(self, obj):
        """Unregister ``obj``; raises ``KeyError`` if it is not registered."""
        self.callbacks.remove(obj)

    def run(self):
        """ Method that runs forever """
        while True:
            # Iterate over a snapshot: add_callback()/remove_callback() are
            # called from other threads, and mutating a set while it is being
            # iterated raises RuntimeError, which would end this thread.
            for obj in tuple(self.callbacks):
                value = self.console.status.field(obj.status_quadlet,
                                                  *obj.status_bits,
                                                  )
                obj.status_callback(value)

            time.sleep(self.interval)


class Console():
    """
    Bridge between a Tascam FireWire console unit and OSC.

    Owns the hardware unit, the channel strips, the button map and the
    status polling thread, and provides the handlers for incoming OSC
    messages and for control events emitted by the unit.
    """

    def __init__(self, card_id=None, guid=None):
        """
        Locate and open the console unit and wire up all handlers.

        The unit is selected by ``guid`` if given, otherwise by ``card_id``,
        otherwise the first unit returned by ``list_units()`` is used.

        :param card_id: ALSA card number used to build ``/dev/snd/hwC<n>D0``
        :param guid: GUID of the unit to look for among ``/dev/snd/hw*``
        :raises Exception: if no unit could be located
        """
        fullpath = None
        if guid:
            # Search by the requested GUID (the original passed card_id here,
            # which is None on this branch unless both were supplied).
            fullpath = self._seek_snd_unit_from_guid(guid)
        elif card_id is not None:
            # ``is not None`` so that card index 0 is honoured.
            fullpath = '/dev/snd/hwC{0}D0'.format(card_id)
        else:
            try:
                units = list_units()
                if units:
                    model, fullpath, guid = units[0]
            except Exception as err:
                raise Exception('No Tascam FW Console unit found') from err

        if not fullpath:
            raise Exception('No Tascam FW Console unit found')

        self.unit = TscmConsoleUnit(fullpath)
        model = self.unit.model_name
        guid = self.unit.get_property('guid')
        print('Found Tascam {0} unit with GUID: {1:016x}'.format(model, guid))  # noqa E501

        self.state = {}
        self.current_bank = 1
        self.more_banks_up = True
        self.more_banks_down = True
        self.strips = strips.init_strips(self)
        self.buttons = {}
        self.init_buttons()
        self.init_encoder_mode()

        # Control quadlet index -> (handler for the low 16 bits,
        #                           handler for the high 16 bits).
        # See handle_encoder() for how a quadlet is split.
        self.encoders = {
            10: (self.strips[1].handle_encoder, self.strips[2].handle_encoder),
            11: (self.strips[3].handle_encoder, self.strips[4].handle_encoder),
            12: (self.strips[5].handle_encoder, self.strips[6].handle_encoder),
            13: (self.strips[7].handle_encoder, self.strips[8].handle_encoder),
            14: (self.handle_gain, self.handle_freq),
            15: (self.handle_q, self.handle_jog),
        }

        self.unit.connect('control', self.handle_control)

        self.status_thread = RunningStatusThread(self)  # noqa F841

    def init_encoder_mode(self):
        """Reset the encoder mode to 'PAN' and update the mode LEDs."""
        self.state['encoder_mode'] = 'PAN'
        for mode in ENCODER_MODES:
            self.unit.leds.turn_off(mode)

        self.unit.leds.turn_on(self.state['encoder_mode'])

    def _seek_snd_unit_from_guid(self, guid):
        """
        Return the ``/dev/snd/hw*`` path of the unit with the given GUID.

        Devices that cannot be opened or queried are skipped.

        :param guid: Value compared against each unit's 'guid' property
        :return: Device path as a string, or ``None`` if nothing matched
        """
        for fullpath in Path('/dev/snd').glob('hw*'):
            fullpath = str(fullpath)
            # Bind the name before the try block so the ``del`` in ``finally``
            # cannot fail when Hinawa.SndUnit() itself raises.
            unit = None
            try:
                unit = Hinawa.SndUnit()
                unit.open(fullpath)
                if unit.get_property('guid') == guid:
                    return fullpath
            except Exception:
                # Best effort: not every hw* node can be opened this way.
                pass
            finally:
                del unit
        return None

    def init_buttons(self):
        """Build the button map and index every button by its name."""
        self.button_map = fw_1884_buttons.init_buttons(self)
        for items in self.button_map.values():
            for item in items:
                if item is None:
                    continue
                self.buttons[item.name] = item

    @property
    def status(self):
        """A fresh ``ConsoleStatus`` built from the unit's current status."""
        return ConsoleStatus(self.unit.get_status())

    def handle_bit_flags(self, index, before, after):
        """
        Dispatch press/release events for every bit that changed.

        :param index: Control quadlet index, used as key into ``button_map``
        :param before: Previous quadlet value
        :param after: New quadlet value
        """
        changed = before ^ after

        for bit in range(changed.bit_length()):
            if not changed & (1 << bit):
                continue

            high = bool(after & (1 << bit))
            button = self.button_map[index][bit]
            if button is None:
                print('unhandled control bit {}:{}'.format(index, bit))
                continue

            # A bit going high is handled as a release, going low as a press
            # NOTE(review): presumably the surface reports buttons
            # active-low - confirm against the hardware documentation.
            if high:
                button.release()
            else:
                button.press()

    def handle_gain(self, mode, delta):
        """Gain encoder handler; currently only logs the event."""
        print('Handle Gain:', mode, delta)

    def handle_freq(self, mode, delta):
        """Frequency encoder handler; currently only logs the event."""
        print('Handle Freq:', mode, delta)

    def handle_q(self, mode, delta):
        """Q encoder handler; currently only logs the event."""
        print('Handle Q:', mode, delta)

    def handle_jog(self, mode, delta):
        """
        Translate jog wheel movement into OSC messages.

        With ALT held the movement zooms the editor in or out. Otherwise a
        '/jog' message is sent with the halved delta, scaled down further
        while SHIFT (x0.3) and/or CTRL (x0.1) are held.

        :param mode: Current encoder mode (only logged)
        :param delta: Signed encoder movement
        """
        print('Handle Jog:', mode, delta)
        delta = delta * 0.5

        if self.buttons['ALT'].pressed:
            if delta > 0:
                action = 'Editor/temporal-zoom-in'
            else:
                action = 'Editor/temporal-zoom-out'
            osc.client.send_message('/access_action', action)
            return

        if self.buttons['SHIFT'].pressed:
            delta = delta * 0.3

        if self.buttons['CTRL'].pressed:
            delta = delta * 0.1

        osc.client.send_message('/jog', delta)

    def handle_encoder(self, index, before, after):
        """
        Split an encoder quadlet into its two counters and dispatch deltas.

        The low 16 bits go to the first handler registered for ``index`` in
        ``self.encoders`` and the remaining high bits to the second. Handlers
        are only called for non-zero deltas.

        :param index: Control quadlet index
        :param before: Previous quadlet value
        :param after: New quadlet value
        """
        handler1, handler2 = self.encoders.get(int(index), (None, None))

        bval1 = before & 0xffff
        bval2 = before >> 0x10
        aval1 = after & 0xffff
        aval2 = after >> 0x10

        delta1 = roll_over_delta(aval1 - bval1)
        delta2 = roll_over_delta(aval2 - bval2)

        encoder_mode = self.state.get('encoder_mode', '')

        if delta1 and handler1:
            handler1(encoder_mode, delta1)

        if delta2 and handler2:
            handler2(encoder_mode, delta2)

    def handle_control(self, unit, index, before, after):
        """
        Handler connected to the unit's 'control' signal.

        Quadlets 5-9 are dispatched as button bit flags, quadlets 10-15 as
        encoder counters; anything else is only logged.
        """
        print('{0:02d}: {1:08x} {2:08x}'.format(index, before, after))

        if index in (5, 6, 7, 8, 9):
            self.handle_bit_flags(index, before, after)
            return

        if index in (10, 11, 12, 13, 14, 15):
            self.handle_encoder(index, before, after)
            return

    def strip_fader_handler(self, addr, ssid, pos):
        """
        Move a strip's fader to the position received via OSC.

        :param ssid: Strip number (index into ``self.strips``)
        :param pos: Position, multiplied by 1023 before being applied
                    (presumably a 0.0-1.0 value - TODO confirm)
        """
        print('fader_handler', addr, ssid, pos)
        strip = self.strips[int(ssid)]
        # Strips named ' ' are ignored.
        if strip.name == ' ':
            return
        pos = pos * 1023
        strip.fader.position = pos

    def master_fader_handler(self, addr, pos):
        """Forward a master fader position to strip 0."""
        print('master_fader_handler', pos)
        self.strip_fader_handler(addr, 0, pos)

    def default_handler(self, addr, *args):
        """Log OSC messages that have no dedicated handler."""
        print(addr, args)

    def strip_select_handler(self, addr, ssid, state):
        """Set the select state of a strip (strip 0 and ' ' are ignored)."""
        strip = self.strips[int(ssid)]
        print('select_handler', strip, state)
        if strip.num == 0 or strip.name == ' ':
            return
        strip.select = bool(state)

    def strip_mute_handler(self, addr, ssid, state):
        """Set the mute state of a strip (strip 0 and ' ' are ignored)."""
        strip = self.strips[int(ssid)]
        print('mute_handler', strip, state)
        if strip.num == 0 or strip.name == ' ':
            return
        strip.mute = bool(state)

    def strip_solo_handler(self, addr, ssid, state):
        """Set the solo state of a strip (strip 0 and ' ' are ignored)."""
        strip = self.strips[int(ssid)]
        print('solo_handler', strip, state)
        if strip.num == 0 or strip.name == ' ':
            return
        strip.solo = bool(state)

    def strip_recenable_handler(self, addr, ssid, state):
        """Set the rec state of a strip (strip 0 and ' ' are ignored)."""
        strip = self.strips[int(ssid)]
        print('recenable_handler', strip, state)
        if strip.num == 0 or strip.name == ' ':
            return
        strip.rec = bool(state)

    def loop_toggle_handler(self, addr, state):
        """Record the loop state and switch the loop LED accordingly."""
        print(addr, state)
        if state:
            self.state['LOOP'] = 1
            self.unit.leds.loop.turn_on()
        else:
            self.state['LOOP'] = 0
            self.unit.leds.loop.turn_off()

    def transport_stop_handler(self, addr, state):
        """Switch the stop LED on or off."""
        print(addr, state)
        if state:
            self.unit.leds.stop.turn_on()
        else:
            self.unit.leds.stop.turn_off()

    def transport_play_handler(self, addr, state):
        """Switch the play LED on or off."""
        print(addr, state)
        if state:
            self.unit.leds.play.turn_on()
        else:
            self.unit.leds.play.turn_off()

    def ffwd_handler(self, addr, state):
        """Switch the fast-forward LED on or off."""
        if state:
            self.unit.leds.f_fwd.turn_on()
        else:
            self.unit.leds.f_fwd.turn_off()

    def rewind_handler(self, addr, state):
        """Switch the rewind LED on or off."""
        if state:
            self.unit.leds.rew.turn_on()
        else:
            self.unit.leds.rew.turn_off()

    def rec_enable_toggle_handler(self, addr, state):
        """Switch the record LED on or off."""
        if state:
            self.unit.leds.rec.turn_on()
        else:
            self.unit.leds.rec.turn_off()

    def pan_stereo_position_handler(self, addr, ssid, pan):
        """Pass a received pan value on to the addressed strip."""
        self.strips[int(ssid)].recv_pan(pan)

    def strip_trimdb_handler(self, addr, ssid, trim):
        """Pass a received trim value on to the addressed strip."""
        self.strips[int(ssid)].recv_trim(trim)

    def strip_name_handler(self, addr, ssid, name):
        """Store the received name on the addressed strip."""
        self.strips[int(ssid)].name = name

    def bank_up_handler(self, addr, more_up):
        """Remember whether more banks are available upwards."""
        print(addr, more_up)
        self.more_banks_up = bool(more_up)

    def bank_down_handler(self, addr, more_down):
        """Remember whether more banks are available downwards."""
        print(addr, more_down)
        self.more_banks_down = bool(more_down)


def roll_over_delta(delta, ceiling=0xffff):
    """
    Correct a counter difference for wrap-around.

    A difference within 10 counts of ``+ceiling`` or ``-ceiling`` is taken to
    be a counter that wrapped and is folded back into a small signed delta;
    e.g. going from 0xfffe to 0x0001 gives -65533, which is returned as 3.

    :param delta: Raw difference ``after - before``
    :param ceiling: Largest value the counter can hold
    :return: The corrected signed delta
    """
    print(delta)
    modulus = ceiling + 1

    if delta > ceiling - 10:
        return delta - modulus

    if delta < -ceiling + 10:
        return delta + modulus

    return delta


def _check_hexadecimal(literal):
    """
    Tell whether ``literal`` is a 16 digit hexadecimal string.

    A leading '0x' is allowed and not counted towards the 16 digits.

    :param literal: String to check
    :return: ``True`` if it consists of exactly 16 hex digits
    """
    if literal.find('0x') == 0:
        literal = literal[2:]
    if len(literal) != 16:
        return False
    return all(character in string.hexdigits for character in literal)


def list_units():
    """
    Enumerate the supported Tascam units found under ``/dev/snd``.

    Every ``hw*`` node is opened and its config ROM parsed; nodes that fail
    at any step are skipped.

    :return: List of ``(model, fullpath, guid)`` tuples for units whose model
             name is 'FW-1082' or 'FW-1884'
    """
    units = []
    for fullpath in Path('/dev/snd').glob('hw*'):
        fullpath = str(fullpath)
        try:
            unit = Hinawa.SndUnit()
            unit.open(fullpath)
            parser = TscmConfigRomParser()
            info = parser.parse_rom(unit.get_config_rom())
            model = info['model-name']
            guid = unit.get_property('guid')
            if model in ('FW-1082', 'FW-1884'):
                units.append((model, fullpath, guid))
        except Exception:
            # Best effort: this node could not be opened or parsed, skip it.
            continue
    return units