diff options
| author | sbahling <sbahling@mudgum.net> | 2018-10-26 14:23:08 +0200 |
|---|---|---|
| committer | sbahling <sbahling@mudgum.net> | 2018-10-26 14:23:08 +0200 |
| commit | 1d1b23e5154e093a3e3e3e40b05946ab06c9738c (patch) | |
| tree | a57773bf632c21cecc6427619ec23fd8c88d8b5d /tascam_fw_console/console.py | |
| parent | 38c4e8310ef6bde30d3628ee93103c631caed47b (diff) | |
| download | tascam-fw-osc-1d1b23e5154e093a3e3e3e40b05946ab06c9738c.tar.gz tascam-fw-osc-1d1b23e5154e093a3e3e3e40b05946ab06c9738c.tar.xz tascam-fw-osc-1d1b23e5154e093a3e3e3e40b05946ab06c9738c.zip | |
Create initial package setup with setuptools
Diffstat (limited to 'tascam_fw_console/console.py')
| -rw-r--r-- | tascam_fw_console/console.py | 340 |
1 files changed, 340 insertions, 0 deletions
diff --git a/tascam_fw_console/console.py b/tascam_fw_console/console.py new file mode 100644 index 0000000..103f755 --- /dev/null +++ b/tascam_fw_console/console.py @@ -0,0 +1,340 @@ +#!/usr/bin/env python3 +""" + Open Sound Control send/recieve daemon for Tascam Firewire control surface + ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + :copyright: Copyright (c) 2018 Scott Bahling + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License version 2 as + published by the Free Software Foundation. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program (see the file COPYING); if not, write to the + Free Software Foundation, Inc., + 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA + :license: GPL-2.0, see COPYING for details +""" + +import string +import time +import threading +from pathlib import Path + +from tascam_fw_console import strips +from tascam_fw_console import fw_1884_buttons + +import gi + +gi.require_version('Hinawa', '2.0') +from gi.repository import Hinawa # noqa: E402 +from hinawa_utils.tscm.config_rom_parser import TscmConfigRomParser # noqa: E402 E501 +from hinawa_utils.tscm.tscm_console_unit import TscmConsoleUnit # noqa: E402 + + +bits32 = '{:032b}'.format + + +class ConsoleStatus(): + def __init__(self, quadlets): + self.quadlets = quadlets + + def field(self, quadlet, first_bit=0, last_bit=32): + bits = self.quadlets[quadlet] + + # reverse the bit order before slicing so the index '0' is the LSB + # and reverse back before converting to int + return int(bits32(bits)[::-1][first_bit-1:last_bit][::-1], 2) + + +class RunningStatusThread(): + def __init__(self, console, interval=0.1): + """ Constructor + :type interval: int + :param interval: Check interval, in seconds + """ + self.console = console + self.interval = interval + self.callbacks = set() + self.last_status = [] + + thread = threading.Thread(target=self.run) + thread.daemon = True # Daemonize thread + thread.start() # Start the execution + + def add_callback(self, obj): + self.callbacks.add(obj) + + def remove_callback(self, obj): + self.callbacks.remove(obj) + + def run(self): + """ Method that runs forever """ + while True: + for obj in self.callbacks: + value = self.console.status.field(obj.status_quadlet, + *obj.status_bits, + ) + obj.status_callback(value) + + time.sleep(self.interval) + + +class Console(): + def __init__(self, card_id=None, guid=None): + + fullpath = None + if guid: + fullpath = self._seek_snd_unit_from_guid(card_id) + elif card_id: + fullpath = '/dev/snd/hwC{0}D0'.format(card_id) + else: + try: + units = list_units() + if units: + model, fullpath, guid = units[0] + except Exception: + raise Exception('No Tascam FW Console unit found') + + if fullpath: + self.unit = TscmConsoleUnit(fullpath) + model = self.unit.model_name + guid = self.unit.get_property('guid') + print('Found Tascam {0} unit with GUID: {1:016x}'.format(model, guid)) # noqa E501 + else: + raise Exception('No Tascam FW Console unit found') + + self.state = {} + self.current_bank = 1 + self.more_banks_up = False + self.more_banks_down = False + self.strips = strips.init_strips(self) + self.buttons = {} + self.init_buttons() + + self.unit.connect('control', self.handle_control) + + self.status_thread = RunningStatusThread(self) # noqa F841 + + def _seek_snd_unit_from_guid(self, guid): + for fullpath in Path('/dev/snd').glob('hw*'): + fullpath = str(fullpath) + try: + unit = Hinawa.SndUnit() + unit.open(fullpath) + if unit.get_property('guid') == guid: + return fullpath + except Exception as e: + pass + finally: + del unit + return None + + def init_buttons(self): + self.button_map = fw_1884_buttons.init_buttons(self) + for index, items in self.button_map.items(): + for item in items: + if item is None: + continue + self.buttons[item.name] = item + + @property + def status(self): + try: + return ConsoleStatus(self.unit.get_status()) + except Exception as e: + raise e + + def handle_bit_flags(self, index, before, after): + + changed = before ^ after + + bits = reversed(bits32(changed)) + for bit in [i for i, b in enumerate(bits) if int(b)]: + high = bool(after & (0b1 << bit)) + button = self.button_map[index][int(bit)] + if button is None: + print('unhandled control bit {}:{}'.format(index, bit)) + continue + + if high: + button.release() + else: + button.press() + + def handle_encoder(self, index, before, after): + strip1 = (index * 2 - 19) + strip2 = (index * 2 - 18) + bval1 = before & 0xffff + bval2 = before >> 0x10 + aval1 = after & 0xffff + aval2 = after >> 0x10 + + delta1 = roll_over_delta(aval1 - bval1) + delta2 = roll_over_delta(aval2 - bval2) + + if delta1: + if self.state.get('encoder_mode', '') == 'PAN': + self.strips[strip1].send_pan(delta1) + + if delta2: + if self.state.get('encoder_mode', '') == 'PAN': + self.strips[strip2].send_pan(delta2) + + def handle_control(self, unit, index, before, after): + + print('{0:02d}: {1:08x} {2:08x}'.format(index, before, after)) + + if index in [5, 6, 7, 8, 9]: + self.handle_bit_flags(index, before, after) + return + + if index in [10, 11, 12, 13, 14, 15]: + self.handle_encoder(index, before, after) + return + + def strip_fader_handler(self, addr, ssid, pos): + print('fader_handler', addr, ssid, pos) + strip = self.strips[int(ssid)] + if strip.name == ' ': + return + pos = pos * 1023 + strip.fader.position = pos + + def master_fader_handler(self, addr, pos): + print('master_fader_handler', pos) + self.strips[0].fader.position(1023 * pos) + + def default_handler(self, addr, *args): + print(addr, args) + + def strip_mute_handler(self, addr, ssid, state): + strip = self.strips[int(ssid)] + print('mute_handler', strip, state) + if strip.name == ' ': + return + if state: + strip.mute = True + else: + strip.mute = False + + def strip_solo_handler(self, addr, ssid, state): + strip = self.strips[int(ssid)] + print('solo_handler', strip, state) + if strip.name == ' ': + return + if state: + strip.solo = True + else: + strip.solo = False + + def strip_recenable_handler(self, addr, ssid, state): + strip = self.strips[int(ssid)] + print('recenable_handler', strip, state) + if strip.name == ' ': + return + if state: + strip.rec = True + else: + strip.rec = False + + def loop_toggle_handler(self, addr, state): + print(addr, state) + if state: + self.state['LOOP'] = 1 + self.unit.leds.loop.turn_on() + else: + self.state['LOOP'] = 0 + self.unit.leds.loop.turn_off() + + def transport_stop_handler(self, addr, state): + print(addr, state) + if state: + self.unit.leds.stop.turn_on() + else: + self.unit.leds.stop.turn_off() + + def transport_play_handler(self, addr, state): + print(addr, state) + if state: + self.unit.leds.play.turn_on() + else: + self.unit.leds.play.turn_off() + + def ffwd_handler(self, addr, state): + if state: + self.unit.leds.f_fwd.turn_on() + else: + self.unit.leds.f_fwd.turn_off() + + def rewind_handler(self, addr, state): + if state: + self.unit.leds.rew.turn_on() + else: + self.unit.leds.rew.turn_off() + + def rec_enable_toggle_handler(self, addr, state): + if state: + self.unit.leds.rec.turn_on() + else: + self.unit.leds.rec.turn_off() + + def pan_stereo_position_handler(self, addr, pos, pan): + self.strips[pos].recv_pan(pan) + + def strip_name_handler(self, addr, ssid, name): + self.strips[int(ssid)].name = name + + def bank_up_handler(self, addr, more_up): + print(addr, more_up) + self.more_banks_up = bool(more_up) + + def bank_down_handler(self, addr, more_down): + print(addr, more_down) + self.more_banks_down = bool(more_down) + + +def roll_over_delta(delta, ceiling=0xffff): + + print(delta) + if delta > ceiling - 10: + return delta - (ceiling + 1) + if delta < -ceiling + 10: + return delta + (ceiling + 1) + return delta + + +def _check_hexadecimal(literal): + if literal.find('0x') == 0: + literal = literal[2:] + if len(literal) != 16: + return False + for character in literal: + if character not in string.hexdigits: + return False + else: + return True + + +def list_units(): + units = [] + for fullpath in Path('/dev/snd').glob('hw*'): + fullpath = str(fullpath) + unit = None + try: + unit = Hinawa.SndUnit() + unit.open(fullpath) + parser = TscmConfigRomParser() + info = parser.parse_rom(unit.get_config_rom()) + model = info['model-name'] + guid = unit.get_property('guid') + if model in ('FW-1082', 'FW-1884'): + units.append((model, fullpath, guid)) + except Exception as e: + pass + + return units |
