summaryrefslogtreecommitdiff
path: root/tascam_fw_console/console.py
diff options
context:
space:
mode:
authorsbahling <sbahling@mudgum.net>2018-10-26 14:23:08 +0200
committersbahling <sbahling@mudgum.net>2018-10-26 14:23:08 +0200
commit1d1b23e5154e093a3e3e3e40b05946ab06c9738c (patch)
treea57773bf632c21cecc6427619ec23fd8c88d8b5d /tascam_fw_console/console.py
parent38c4e8310ef6bde30d3628ee93103c631caed47b (diff)
downloadtascam-fw-osc-1d1b23e5154e093a3e3e3e40b05946ab06c9738c.tar.gz
tascam-fw-osc-1d1b23e5154e093a3e3e3e40b05946ab06c9738c.tar.xz
tascam-fw-osc-1d1b23e5154e093a3e3e3e40b05946ab06c9738c.zip
Create initial package setup with setuptools
Diffstat (limited to 'tascam_fw_console/console.py')
-rw-r--r--tascam_fw_console/console.py340
1 files changed, 340 insertions, 0 deletions
diff --git a/tascam_fw_console/console.py b/tascam_fw_console/console.py
new file mode 100644
index 0000000..103f755
--- /dev/null
+++ b/tascam_fw_console/console.py
@@ -0,0 +1,340 @@
+#!/usr/bin/env python3
+"""
+ Open Sound Control send/recieve daemon for Tascam Firewire control surface
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ :copyright: Copyright (c) 2018 Scott Bahling
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License version 2 as
+ published by the Free Software Foundation.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program (see the file COPYING); if not, write to the
+ Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
+ :license: GPL-2.0, see COPYING for details
+"""
+
+import string
+import time
+import threading
+from pathlib import Path
+
+from tascam_fw_console import strips
+from tascam_fw_console import fw_1884_buttons
+
+import gi
+
+gi.require_version('Hinawa', '2.0')
+from gi.repository import Hinawa # noqa: E402
+from hinawa_utils.tscm.config_rom_parser import TscmConfigRomParser # noqa: E402 E501
+from hinawa_utils.tscm.tscm_console_unit import TscmConsoleUnit # noqa: E402
+
+
+bits32 = '{:032b}'.format
+
+
+class ConsoleStatus():
+ def __init__(self, quadlets):
+ self.quadlets = quadlets
+
+ def field(self, quadlet, first_bit=0, last_bit=32):
+ bits = self.quadlets[quadlet]
+
+ # reverse the bit order before slicing so the index '0' is the LSB
+ # and reverse back before converting to int
+ return int(bits32(bits)[::-1][first_bit-1:last_bit][::-1], 2)
+
+
+class RunningStatusThread():
+ def __init__(self, console, interval=0.1):
+ """ Constructor
+ :type interval: int
+ :param interval: Check interval, in seconds
+ """
+ self.console = console
+ self.interval = interval
+ self.callbacks = set()
+ self.last_status = []
+
+ thread = threading.Thread(target=self.run)
+ thread.daemon = True # Daemonize thread
+ thread.start() # Start the execution
+
+ def add_callback(self, obj):
+ self.callbacks.add(obj)
+
+ def remove_callback(self, obj):
+ self.callbacks.remove(obj)
+
+ def run(self):
+ """ Method that runs forever """
+ while True:
+ for obj in self.callbacks:
+ value = self.console.status.field(obj.status_quadlet,
+ *obj.status_bits,
+ )
+ obj.status_callback(value)
+
+ time.sleep(self.interval)
+
+
+class Console():
+ def __init__(self, card_id=None, guid=None):
+
+ fullpath = None
+ if guid:
+ fullpath = self._seek_snd_unit_from_guid(card_id)
+ elif card_id:
+ fullpath = '/dev/snd/hwC{0}D0'.format(card_id)
+ else:
+ try:
+ units = list_units()
+ if units:
+ model, fullpath, guid = units[0]
+ except Exception:
+ raise Exception('No Tascam FW Console unit found')
+
+ if fullpath:
+ self.unit = TscmConsoleUnit(fullpath)
+ model = self.unit.model_name
+ guid = self.unit.get_property('guid')
+ print('Found Tascam {0} unit with GUID: {1:016x}'.format(model, guid)) # noqa E501
+ else:
+ raise Exception('No Tascam FW Console unit found')
+
+ self.state = {}
+ self.current_bank = 1
+ self.more_banks_up = False
+ self.more_banks_down = False
+ self.strips = strips.init_strips(self)
+ self.buttons = {}
+ self.init_buttons()
+
+ self.unit.connect('control', self.handle_control)
+
+ self.status_thread = RunningStatusThread(self) # noqa F841
+
+ def _seek_snd_unit_from_guid(self, guid):
+ for fullpath in Path('/dev/snd').glob('hw*'):
+ fullpath = str(fullpath)
+ try:
+ unit = Hinawa.SndUnit()
+ unit.open(fullpath)
+ if unit.get_property('guid') == guid:
+ return fullpath
+ except Exception as e:
+ pass
+ finally:
+ del unit
+ return None
+
+ def init_buttons(self):
+ self.button_map = fw_1884_buttons.init_buttons(self)
+ for index, items in self.button_map.items():
+ for item in items:
+ if item is None:
+ continue
+ self.buttons[item.name] = item
+
+ @property
+ def status(self):
+ try:
+ return ConsoleStatus(self.unit.get_status())
+ except Exception as e:
+ raise e
+
+ def handle_bit_flags(self, index, before, after):
+
+ changed = before ^ after
+
+ bits = reversed(bits32(changed))
+ for bit in [i for i, b in enumerate(bits) if int(b)]:
+ high = bool(after & (0b1 << bit))
+ button = self.button_map[index][int(bit)]
+ if button is None:
+ print('unhandled control bit {}:{}'.format(index, bit))
+ continue
+
+ if high:
+ button.release()
+ else:
+ button.press()
+
+ def handle_encoder(self, index, before, after):
+ strip1 = (index * 2 - 19)
+ strip2 = (index * 2 - 18)
+ bval1 = before & 0xffff
+ bval2 = before >> 0x10
+ aval1 = after & 0xffff
+ aval2 = after >> 0x10
+
+ delta1 = roll_over_delta(aval1 - bval1)
+ delta2 = roll_over_delta(aval2 - bval2)
+
+ if delta1:
+ if self.state.get('encoder_mode', '') == 'PAN':
+ self.strips[strip1].send_pan(delta1)
+
+ if delta2:
+ if self.state.get('encoder_mode', '') == 'PAN':
+ self.strips[strip2].send_pan(delta2)
+
+ def handle_control(self, unit, index, before, after):
+
+ print('{0:02d}: {1:08x} {2:08x}'.format(index, before, after))
+
+ if index in [5, 6, 7, 8, 9]:
+ self.handle_bit_flags(index, before, after)
+ return
+
+ if index in [10, 11, 12, 13, 14, 15]:
+ self.handle_encoder(index, before, after)
+ return
+
+ def strip_fader_handler(self, addr, ssid, pos):
+ print('fader_handler', addr, ssid, pos)
+ strip = self.strips[int(ssid)]
+ if strip.name == ' ':
+ return
+ pos = pos * 1023
+ strip.fader.position = pos
+
+ def master_fader_handler(self, addr, pos):
+ print('master_fader_handler', pos)
+ self.strips[0].fader.position(1023 * pos)
+
+ def default_handler(self, addr, *args):
+ print(addr, args)
+
+ def strip_mute_handler(self, addr, ssid, state):
+ strip = self.strips[int(ssid)]
+ print('mute_handler', strip, state)
+ if strip.name == ' ':
+ return
+ if state:
+ strip.mute = True
+ else:
+ strip.mute = False
+
+ def strip_solo_handler(self, addr, ssid, state):
+ strip = self.strips[int(ssid)]
+ print('solo_handler', strip, state)
+ if strip.name == ' ':
+ return
+ if state:
+ strip.solo = True
+ else:
+ strip.solo = False
+
+ def strip_recenable_handler(self, addr, ssid, state):
+ strip = self.strips[int(ssid)]
+ print('recenable_handler', strip, state)
+ if strip.name == ' ':
+ return
+ if state:
+ strip.rec = True
+ else:
+ strip.rec = False
+
+ def loop_toggle_handler(self, addr, state):
+ print(addr, state)
+ if state:
+ self.state['LOOP'] = 1
+ self.unit.leds.loop.turn_on()
+ else:
+ self.state['LOOP'] = 0
+ self.unit.leds.loop.turn_off()
+
+ def transport_stop_handler(self, addr, state):
+ print(addr, state)
+ if state:
+ self.unit.leds.stop.turn_on()
+ else:
+ self.unit.leds.stop.turn_off()
+
+ def transport_play_handler(self, addr, state):
+ print(addr, state)
+ if state:
+ self.unit.leds.play.turn_on()
+ else:
+ self.unit.leds.play.turn_off()
+
+ def ffwd_handler(self, addr, state):
+ if state:
+ self.unit.leds.f_fwd.turn_on()
+ else:
+ self.unit.leds.f_fwd.turn_off()
+
+ def rewind_handler(self, addr, state):
+ if state:
+ self.unit.leds.rew.turn_on()
+ else:
+ self.unit.leds.rew.turn_off()
+
+ def rec_enable_toggle_handler(self, addr, state):
+ if state:
+ self.unit.leds.rec.turn_on()
+ else:
+ self.unit.leds.rec.turn_off()
+
+ def pan_stereo_position_handler(self, addr, pos, pan):
+ self.strips[pos].recv_pan(pan)
+
+ def strip_name_handler(self, addr, ssid, name):
+ self.strips[int(ssid)].name = name
+
+ def bank_up_handler(self, addr, more_up):
+ print(addr, more_up)
+ self.more_banks_up = bool(more_up)
+
+ def bank_down_handler(self, addr, more_down):
+ print(addr, more_down)
+ self.more_banks_down = bool(more_down)
+
+
+def roll_over_delta(delta, ceiling=0xffff):
+
+ print(delta)
+ if delta > ceiling - 10:
+ return delta - (ceiling + 1)
+ if delta < -ceiling + 10:
+ return delta + (ceiling + 1)
+ return delta
+
+
+def _check_hexadecimal(literal):
+ if literal.find('0x') == 0:
+ literal = literal[2:]
+ if len(literal) != 16:
+ return False
+ for character in literal:
+ if character not in string.hexdigits:
+ return False
+ else:
+ return True
+
+
+def list_units():
+ units = []
+ for fullpath in Path('/dev/snd').glob('hw*'):
+ fullpath = str(fullpath)
+ unit = None
+ try:
+ unit = Hinawa.SndUnit()
+ unit.open(fullpath)
+ parser = TscmConfigRomParser()
+ info = parser.parse_rom(unit.get_config_rom())
+ model = info['model-name']
+ guid = unit.get_property('guid')
+ if model in ('FW-1082', 'FW-1884'):
+ units.append((model, fullpath, guid))
+ except Exception as e:
+ pass
+
+ return units