summaryrefslogtreecommitdiff
path: root/console.py
diff options
context:
space:
mode:
Diffstat (limited to 'console.py')
-rw-r--r--console.py202
1 files changed, 202 insertions, 0 deletions
diff --git a/console.py b/console.py
new file mode 100644
index 0000000..a06c5ee
--- /dev/null
+++ b/console.py
@@ -0,0 +1,202 @@
+#!/usr/bin/env python3
+"""
+ Open Sound Control send/recieve daemon for Tascam Firewire control surface
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ :copyright: Copyright (c) 2012-2014 Scott Bahling
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License version 2 as
+ published by the Free Software Foundation.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program (see the file COPYING); if not, write to the
+ Free Software Foundation, Inc.,
+ 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
+ :license: GPL-2.0, see COPYING for details
+"""
+
+import argparse
+import string
+import time
+import threading
+from pathlib import Path
+
+import strips
+import fw_1884_buttons
+
+import gi
+
+gi.require_version('Hinawa', '2.0')
+from gi.repository import Hinawa # noqa: E402
+from hinawa_utils.tscm.tscm_console_unit import TscmConsoleUnit # noqa: E402
+
+
+bits32 = '{:032b}'.format
+
+current_bank = 1
+more_banks_up = False
+more_banks_down = False
+
+
+def check_bitR2L(data, bit):
+ return bool(data & (0b1 << bit))
+
+
+class RunningStatusThread():
+ def __init__(self, console, interval=1):
+ """ Constructor
+ :type interval: int
+ :param interval: Check interval, in seconds
+ """
+ self.interval = interval
+
+ thread = threading.Thread(target=self.run, args=(console))
+ thread.daemon = True # Daemonize thread
+ thread.start() # Start the execution
+ self.callbacks = set()
+ self.last_status = []
+
+ def add_callback(self, obj):
+ self.callbacks.add(obj)
+
+ def remove_callback(self, obj):
+ self.callbacks.remove(obj)
+
+ def fetch_status_values(self, quadlets=None, first_bit=0, last_bit=32):
+ if quadlets is None:
+ quadlets = [i for i in range(0, 16)]
+
+ def getvalue(bits):
+ # reverse the bit order before slicing so the index '0' is the LSB
+ # and reverse back before converting to int
+ return int(bits32(bits)[::-1][first_bit:last_bit][::-1], 2)
+
+ values = [getvalue(self.last_status[q]) for q in quadlets]
+
+ if len(values) == 1:
+ return values[0]
+
+ return values
+
+ def run(self, console):
+ """ Method that runs forever """
+ while True:
+
+ if not self.callbacks:
+ continue
+
+ try:
+ self.last_status = console.unit.get_status()
+ except Exception:
+ continue # we should emit warning or error here!
+
+ for callback in self.callbacks:
+ callback.call(self.fetch_status_values(**callback.args))
+
+ time.sleep(self.interval)
+
+
+class Console():
+ def __init__(self, card):
+
+ fullpath = seek_snd_unit_path(card)
+ if fullpath:
+ self.unit = TscmConsoleUnit(fullpath)
+ else:
+ exit()
+
+ self.state = {}
+ self.strips = strips.init_strips(self.unit)
+ self.buttons = fw_1884_buttons.init_buttons(self)
+
+ self.unit.connect('control', self.handle_control)
+
+ status_thread = RunningStatusThread(self) # noqa F841
+
+ def handle_bit_flags(self, index, before, after):
+
+ changed = before ^ after
+
+ for bit in [i for i, b in enumerate(bits32(changed)) if int(b)]:
+ high = check_bitR2L(after, bit)
+ button = self.buttons[index][int(bit)]
+ if button is None:
+ print('unhandled control bit {}:{}'.format(index, bit))
+ continue
+
+ if high:
+ button.release()
+ else:
+ button.press()
+
+ def handle_control(self, index, before, after):
+
+ print('{0:02d}: {1:08x} {2:08x}'.format(index, before, after))
+
+ if index in [5, 6, 7, 8, 9]:
+ self.handle_bit_flags(index, before, after)
+ return
+
+ if index in [10, 11, 12, 13, 14, 15]:
+ self.handle_encoder(index, before, after)
+ return
+
+
+def _check_hexadecimal(literal):
+ if literal.find('0x') == 0:
+ literal = literal[2:]
+ if len(literal) != 16:
+ return False
+ for character in literal:
+ if character not in string.hexdigits:
+ return False
+ else:
+ return True
+
+
+def _seek_snd_unit_from_guid(guid):
+ for fullpath in Path('/dev/snd').glob('hw*'):
+ fullpath = str(fullpath)
+ try:
+ unit = Hinawa.SndUnit()
+ unit.open(fullpath)
+ if unit.get_property('guid') == guid:
+ return fullpath
+ except Exception as e:
+ pass
+ finally:
+ del unit
+ return None
+
+
+def seek_snd_unit_path(card_id):
+ # Assume as sound card number if it's digit literal.
+ if card_id.isdigit():
+ return '/dev/snd/hwC{0}D0'.format(card_id)
+ # Assume as GUID on IEEE 1394 bus if it's hexadecimal literal.
+ elif _check_hexadecimal(card_id):
+ return _seek_snd_unit_from_guid(int(card_id, base=16))
+ return None
+
+
+if __name__ == "__main__":
+
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--card", default="1",
+ help="number of the ALSA sound card")
+ parser.add_argument("--listen-ip", default="127.0.0.1",
+ help="The ip to listen on")
+ parser.add_argument("--listen-port", type=int, default=5005,
+ help="The port to listen on")
+ parser.add_argument("--ip", default="127.0.0.1",
+ help="The ip of the OSC server")
+ parser.add_argument("--port", type=int, default=3819,
+ help="The port the OSC server is listening on")
+ args = parser.parse_args()
+
+ console = Console(args.card)