summaryrefslogtreecommitdiff
path: root/console.py
diff options
context:
space:
mode:
authorsbahling <sbahling@mudgum.net>2018-10-26 14:23:08 +0200
committersbahling <sbahling@mudgum.net>2018-10-26 14:23:08 +0200
commit1d1b23e5154e093a3e3e3e40b05946ab06c9738c (patch)
treea57773bf632c21cecc6427619ec23fd8c88d8b5d /console.py
parent38c4e8310ef6bde30d3628ee93103c631caed47b (diff)
downloadtascam-fw-osc-1d1b23e5154e093a3e3e3e40b05946ab06c9738c.tar.gz
tascam-fw-osc-1d1b23e5154e093a3e3e3e40b05946ab06c9738c.tar.xz
tascam-fw-osc-1d1b23e5154e093a3e3e3e40b05946ab06c9738c.zip
Create initial package setup with setuptools
Diffstat (limited to 'console.py')
-rw-r--r--console.py340
1 files changed, 0 insertions, 340 deletions
diff --git a/console.py b/console.py
deleted file mode 100644
index 3fdb3d0..0000000
--- a/console.py
+++ /dev/null
@@ -1,340 +0,0 @@
-#!/usr/bin/env python3
-"""
- Open Sound Control send/recieve daemon for Tascam Firewire control surface
- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-
- :copyright: Copyright (c) 2018 Scott Bahling
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License version 2 as
- published by the Free Software Foundation.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with this program (see the file COPYING); if not, write to the
- Free Software Foundation, Inc.,
- 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
- :license: GPL-2.0, see COPYING for details
-"""
-
-import string
-import time
-import threading
-from pathlib import Path
-
-import strips
-import fw_1884_buttons
-
-import gi
-
-gi.require_version('Hinawa', '2.0')
-from gi.repository import Hinawa # noqa: E402
-from hinawa_utils.tscm.config_rom_parser import TscmConfigRomParser # noqa: E402 E501
-from hinawa_utils.tscm.tscm_console_unit import TscmConsoleUnit # noqa: E402
-
-
-bits32 = '{:032b}'.format
-
-
-class ConsoleStatus():
- def __init__(self, quadlets):
- self.quadlets = quadlets
-
- def field(self, quadlet, first_bit=0, last_bit=32):
- bits = self.quadlets[quadlet]
-
- # reverse the bit order before slicing so the index '0' is the LSB
- # and reverse back before converting to int
- return int(bits32(bits)[::-1][first_bit-1:last_bit][::-1], 2)
-
-
-class RunningStatusThread():
- def __init__(self, console, interval=0.1):
- """ Constructor
- :type interval: int
- :param interval: Check interval, in seconds
- """
- self.console = console
- self.interval = interval
- self.callbacks = set()
- self.last_status = []
-
- thread = threading.Thread(target=self.run)
- thread.daemon = True # Daemonize thread
- thread.start() # Start the execution
-
- def add_callback(self, obj):
- self.callbacks.add(obj)
-
- def remove_callback(self, obj):
- self.callbacks.remove(obj)
-
- def run(self):
- """ Method that runs forever """
- while True:
- for obj in self.callbacks:
- value = self.console.status.field(obj.status_quadlet,
- *obj.status_bits,
- )
- obj.status_callback(value)
-
- time.sleep(self.interval)
-
-
-class Console():
- def __init__(self, card_id=None, guid=None):
-
- fullpath = None
- if guid:
- fullpath = self._seek_snd_unit_from_guid(card_id)
- elif card_id:
- fullpath = '/dev/snd/hwC{0}D0'.format(card_id)
- else:
- try:
- units = list_units()
- if units:
- model, fullpath, guid = units[0]
- except Exception:
- raise Exception('No Tascam FW Console unit found')
-
- if fullpath:
- self.unit = TscmConsoleUnit(fullpath)
- model = self.unit.model_name
- guid = self.unit.get_property('guid')
- print('Found Tascam {0} unit with GUID: {1:016x}'.format(model, guid)) # noqa E501
- else:
- raise Exception('No Tascam FW Console unit found')
-
- self.state = {}
- self.current_bank = 1
- self.more_banks_up = False
- self.more_banks_down = False
- self.strips = strips.init_strips(self)
- self.buttons = {}
- self.init_buttons()
-
- self.unit.connect('control', self.handle_control)
-
- self.status_thread = RunningStatusThread(self) # noqa F841
-
- def _seek_snd_unit_from_guid(self, guid):
- for fullpath in Path('/dev/snd').glob('hw*'):
- fullpath = str(fullpath)
- try:
- unit = Hinawa.SndUnit()
- unit.open(fullpath)
- if unit.get_property('guid') == guid:
- return fullpath
- except Exception as e:
- pass
- finally:
- del unit
- return None
-
- def init_buttons(self):
- self.button_map = fw_1884_buttons.init_buttons(self)
- for index, items in self.button_map.items():
- for item in items:
- if item is None:
- continue
- self.buttons[item.name] = item
-
- @property
- def status(self):
- try:
- return ConsoleStatus(self.unit.get_status())
- except Exception as e:
- raise e
-
- def handle_bit_flags(self, index, before, after):
-
- changed = before ^ after
-
- bits = reversed(bits32(changed))
- for bit in [i for i, b in enumerate(bits) if int(b)]:
- high = bool(after & (0b1 << bit))
- button = self.button_map[index][int(bit)]
- if button is None:
- print('unhandled control bit {}:{}'.format(index, bit))
- continue
-
- if high:
- button.release()
- else:
- button.press()
-
- def handle_encoder(self, index, before, after):
- strip1 = (index * 2 - 19)
- strip2 = (index * 2 - 18)
- bval1 = before & 0xffff
- bval2 = before >> 0x10
- aval1 = after & 0xffff
- aval2 = after >> 0x10
-
- delta1 = roll_over_delta(aval1 - bval1)
- delta2 = roll_over_delta(aval2 - bval2)
-
- if delta1:
- if self.state.get('encoder_mode', '') == 'PAN':
- self.strips[strip1].send_pan(delta1)
-
- if delta2:
- if self.state.get('encoder_mode', '') == 'PAN':
- self.strips[strip2].send_pan(delta2)
-
- def handle_control(self, unit, index, before, after):
-
- print('{0:02d}: {1:08x} {2:08x}'.format(index, before, after))
-
- if index in [5, 6, 7, 8, 9]:
- self.handle_bit_flags(index, before, after)
- return
-
- if index in [10, 11, 12, 13, 14, 15]:
- self.handle_encoder(index, before, after)
- return
-
- def strip_fader_handler(self, addr, ssid, pos):
- print('fader_handler', addr, ssid, pos)
- strip = self.strips[int(ssid)]
- if strip.name == ' ':
- return
- pos = pos * 1023
- strip.fader.position = pos
-
- def master_fader_handler(self, addr, pos):
- print('master_fader_handler', pos)
- self.strips[0].fader.position(1023 * pos)
-
- def default_handler(self, addr, *args):
- print(addr, args)
-
- def strip_mute_handler(self, addr, ssid, state):
- strip = self.strips[int(ssid)]
- print('mute_handler', strip, state)
- if strip.name == ' ':
- return
- if state:
- strip.mute = True
- else:
- strip.mute = False
-
- def strip_solo_handler(self, addr, ssid, state):
- strip = self.strips[int(ssid)]
- print('solo_handler', strip, state)
- if strip.name == ' ':
- return
- if state:
- strip.solo = True
- else:
- strip.solo = False
-
- def strip_recenable_handler(self, addr, ssid, state):
- strip = self.strips[int(ssid)]
- print('recenable_handler', strip, state)
- if strip.name == ' ':
- return
- if state:
- strip.rec = True
- else:
- strip.rec = False
-
- def loop_toggle_handler(self, addr, state):
- print(addr, state)
- if state:
- self.state['LOOP'] = 1
- self.unit.leds.loop.turn_on()
- else:
- self.state['LOOP'] = 0
- self.unit.leds.loop.turn_off()
-
- def transport_stop_handler(self, addr, state):
- print(addr, state)
- if state:
- self.unit.leds.stop.turn_on()
- else:
- self.unit.leds.stop.turn_off()
-
- def transport_play_handler(self, addr, state):
- print(addr, state)
- if state:
- self.unit.leds.play.turn_on()
- else:
- self.unit.leds.play.turn_off()
-
- def ffwd_handler(self, addr, state):
- if state:
- self.unit.leds.f_fwd.turn_on()
- else:
- self.unit.leds.f_fwd.turn_off()
-
- def rewind_handler(self, addr, state):
- if state:
- self.unit.leds.rew.turn_on()
- else:
- self.unit.leds.rew.turn_off()
-
- def rec_enable_toggle_handler(self, addr, state):
- if state:
- self.unit.leds.rec.turn_on()
- else:
- self.unit.leds.rec.turn_off()
-
- def pan_stereo_position_handler(self, addr, pos, pan):
- self.strips[pos].recv_pan(pan)
-
- def strip_name_handler(self, addr, ssid, name):
- self.strips[int(ssid)].name = name
-
- def bank_up_handler(self, addr, more_up):
- print(addr, more_up)
- self.more_banks_up = bool(more_up)
-
- def bank_down_handler(self, addr, more_down):
- print(addr, more_down)
- self.more_banks_down = bool(more_down)
-
-
-def roll_over_delta(delta, ceiling=0xffff):
-
- print(delta)
- if delta > ceiling - 10:
- return delta - (ceiling + 1)
- if delta < -ceiling + 10:
- return delta + (ceiling + 1)
- return delta
-
-
-def _check_hexadecimal(literal):
- if literal.find('0x') == 0:
- literal = literal[2:]
- if len(literal) != 16:
- return False
- for character in literal:
- if character not in string.hexdigits:
- return False
- else:
- return True
-
-
-def list_units():
- units = []
- for fullpath in Path('/dev/snd').glob('hw*'):
- fullpath = str(fullpath)
- unit = None
- try:
- unit = Hinawa.SndUnit()
- unit.open(fullpath)
- parser = TscmConfigRomParser()
- info = parser.parse_rom(unit.get_config_rom())
- model = info['model-name']
- guid = unit.get_property('guid')
- if model in ('FW-1082', 'FW-1884'):
- units.append((model, fullpath, guid))
- except Exception as e:
- pass
-
- return units