#!/usr/bin/env python3 """ Open Sound Control send/recieve daemon for Tascam Firewire control surface ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ :copyright: Copyright (c) 2012-2014 Scott Bahling This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License version 2 as published by the Free Software Foundation. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program (see the file COPYING); if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA :license: GPL-2.0, see COPYING for details """ import argparse import string import re import pyautogui from pathlib import Path from pythonosc import dispatcher from pythonosc import osc_server from pythonosc import udp_client import gi gi.require_version('Hinawa', '2.0') from gi.repository import Hinawa # noqa: E402 from hinawa_utils.tscm.tscm_console_unit import TscmConsoleUnit # noqa: E402 keymap = {'CTRL': 'ctrl', 'ALT': 'alt', 'SHIFT': 'shift', } osc = None osc_addrs = {'STOP': '/transport_stop', 'PLAY': '/transport_play', 'F.FWD': '/ffwd', 'REW': '/rewind', 'REC': '/rec_enable_toggle', } control_state = {} button_state = {} re_strip_num = re.compile('.*([1-9])') current_bank = 1 more_banks_up = False more_banks_down = False def check_bitR2L(data, bit): return bool(data & (0b1 << bit)) def handle_master_fader(unit, flags): addr = '/master/fader' pos = (flags & 0xffff) / 1023 if pos != control_state.get('Master Fader', 0): print('{}: {}'.format('Master', pos)) if strip_states[0].fader_touch: osc.send_message(addr, pos) def handle_strip_fader(unit, index, flags): addr = '/strip/fader' strip1 = (index * 2 + 1) strip2 = (index * 2 + 2) fader1 = 'Strip %s Fader' % strip1 fader2 = 'Strip %s Fader' % strip2 pos1 = (flags & 0xffff) / 1023 pos2 = (flags >> 0x10) / 1023 if pos1 != strip_states[strip1].fader: print('{}: {}'.format(fader1, pos1)) if strip_states[strip1].fader_touch: osc.send_message(addr, (strip1, pos1)) if pos2 != strip_states[strip2].fader: print('{}: {}'.format(fader2, pos2)) if strip_states[strip2].fader_touch: osc.send_message(addr, (strip2, pos2)) def roll_over_delta(delta, ceiling=0xffff): print(delta) if delta > ceiling - 10: return delta - (ceiling + 1) if delta < -ceiling + 10: return delta + (ceiling + 1) return delta def handle_encoder(unit, index, flags): strip1 = (index * 2 - 19) strip2 = (index * 2 - 18) enc1 = 'Strip %s Encoder' % strip1 enc2 = 'Strip %s Encoder' % strip2 val1 = flags & 0xffff val2 = flags >> 0x10 delta1 = roll_over_delta(val1 - control_state.get(enc1, val1)) delta2 = roll_over_delta(val2 - control_state.get(enc2, val2)) if delta1: if control_state.get('PAN', 0): send_pan(strip1, delta1) if delta2: if control_state.get('PAN', 0): send_pan(strip2, delta2) control_state[enc1] = val1 control_state[enc2] = val2 def handle_eq_encoder(unit, index, flags): val1 = flags & 0xffff val2 = flags >> 0x10 if index == 14: enc1 = 'Gain Encoder' enc2 = 'Freq Encoder' if index == 15: enc1 = 'Q Encoder' enc2 = 'Data Wheel' # delta1 = roll_over_delta(val1 - control_state.get(enc1, val1)) delta2 = roll_over_delta(val2 - control_state.get(enc2, val2)) if val1 != control_state.get(enc1, 0): strip_states[strip].mute = 0 print('{}: {}'.format(enc1, val1)) control_state[enc1] = val1 if val2 != control_state.get(enc2, 0): print('{}: {}'.format(enc2, val2)) control_state[enc2] = val2 if enc2 == 'Data Wheel': if button_state.get('SHIFT', 0): amount = 0.01 elif button_state.get('CTRL', 0): amount = 0.001 else: amount = 0.1 osc.send_message('/jog', delta2 * amount) def handle_bit_flags(unit, index, flags): for bit in range(32): high = check_bitR2L(flags, bit) low = not high command_map = control_flags[index][bit] if command_map is None or command_map[1] is None: continue control = command_map[0] on = control_state.get(control, 0) or button_state.get(control, 0) button_state[control] = int(low) if (low and on) or (high and not on): continue bitstate = int(high) print(command_map) handler = command_map[1] args = (None,) if len(command_map) > 2: args = command_map[2:] try: handler(index, flags, bit, bitstate, control, *args) except Exception as e: print(e) def handle_control(unit, index, flags): print('{0:02d}: {1:08x}'.format(index, flags)) if index in [0, 1, 2, 3]: handle_strip_fader(unit, index, flags) return if index == 4: handle_master_fader(unit, flags) return if index in [5, 6, 7, 8, 9]: handle_bit_flags(unit, index, flags) return if index in [10, 11, 12, 13]: handle_encoder(unit, index, flags) return if index in [14, 15]: handle_eq_encoder(unit, index, flags) def _check_hexadecimal(literal): if literal.find('0x') == 0: literal = literal[2:] if len(literal) != 16: return False for character in literal: if character not in string.hexdigits: return False else: return True def _seek_snd_unit_from_guid(guid): for fullpath in Path('/dev/snd').glob('hw*'): fullpath = str(fullpath) try: unit = Hinawa.SndUnit() unit.open(fullpath) if unit.get_property('guid') == guid: return fullpath except Exception as e: pass finally: del unit return None def seek_snd_unit_path(card_id): # Assume as sound card number if it's digit literal. if card_id.isdigit(): return '/dev/snd/hwC{0}D0'.format(card_id) # Assume as GUID on IEEE 1394 bus if it's hexadecimal literal. elif _check_hexadecimal(card_id): return _seek_snd_unit_from_guid(int(card_id, base=16)) return None def print_default_handler(addr, *args): print(addr, args) def strip_fader_handler(addr, args, ssid, pos): unit = args[0] strip = int(ssid) print('fader_handler', addr, ssid, pos) pos = pos * 1023 if strip_states[strip].name != ' ': unit.strips[strip].fader.set_position(pos) strip_states[strip].fader = pos def strip_mute_handler(addr, args, ssid, state): unit = args[0] strip = int(ssid) print('mute_handler', strip, state) if strip_states[strip].name == ' ': return if state: unit.strips[int(ssid)].mute_led.turn_on() strip_states[int(ssid)].mute = True else: unit.strips[int(ssid)].mute_led.turn_off() strip_states[int(ssid)].mute = False def strip_solo_handler(addr, args, ssid, state): unit = args[0] strip = int(ssid) print('solo_handler', strip, state) if strip_states[strip].name == ' ': return if state: unit.strips[strip].solo_led.turn_on() strip_states[strip].solo = True else: unit.strips[strip].solo_led.turn_off() strip_states[strip].solo = False def strip_recenable_handler(addr, args, ssid, state): unit = args[0] strip = int(ssid) print('recenable_handler', strip, state) if strip_states[strip].name == ' ': return if state: unit.strips[strip].rec_led.turn_on() strip_states[strip].recenable = True else: unit.strips[strip].rec_led.turn_off() strip_states[strip].recenable = False def loop_toggle_handler(addr, state): print(addr, state) if state: control_state['LOOP'] = 1 unit.leds.loop.turn_on() else: control_state['LOOP'] = 0 unit.leds.loop.turn_off() def transport_stop_handler(addr, args, state): print(addr, args, state) unit = args[0] if state: unit.leds.stop.turn_on() else: unit.leds.stop.turn_off() def transport_play_handler(addr, args, state): print(addr, args, state) unit = args[0] if state: unit.leds.play.turn_on() else: unit.leds.play.turn_off() def ffwd_handler(addr, args, state): unit = args[0] if state: unit.leds.f_fwd.turn_on() else: unit.leds.f_fwd.turn_off() def rewind_handler(addr, args, state): unit = args[0] if state: unit.leds.rew.turn_on() else: unit.leds.rew.turn_off() def rec_enable_toggle_handler(addr, args, state): unit = args[0] if state: unit.leds.rec.turn_on() else: unit.leds.rec.turn_off() def master_fader_handler(addr, args, pos): unit = args[0] print('master_fader_handler', pos) unit.master_fader.set_position(1023 * pos) def pan_stereo_position_handler(addr, pos, pan): recv_pan(pos, pan) def strip_name_handler(addr, ssid, name): strip_states[int(ssid)].name = name def bank_up_handler(addr, more_up): print(addr, more_up) global more_banks_up more_banks_up = bool(more_up) def bank_down_handler(addr, more_down): print(addr, more_down) global more_banks_down more_banks_down = bool(more_down) def init_console(): handle_encoder_button(1, 1, 1, 1, 'PAN', 1) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--card", default="1", help="number of the ALSA sound card") parser.add_argument("--listen-ip", default="127.0.0.1", help="The ip to listen on") parser.add_argument("--listen-port", type=int, default=5005, help="The port to listen on") parser.add_argument("--ip", default="127.0.0.1", help="The ip of the OSC server") parser.add_argument("--port", type=int, default=3819, help="The port the OSC server is listening on") args = parser.parse_args() fullpath = seek_snd_unit_path(args.card) if fullpath: unit = TscmConsoleUnit(fullpath) else: exit() unit.connect('control', handle_control) osc = udp_client.SimpleUDPClient(args.ip, args.port) init_console() dispatcher = dispatcher.Dispatcher() dispatcher.map("/loop_toggle", loop_toggle_handler) dispatcher.map("/transport_stop", transport_stop_handler, unit) dispatcher.map("/transport_play", transport_play_handler, unit) dispatcher.map("/ffwd", ffwd_handler, unit) dispatcher.map("/rewind", rewind_handler, unit) dispatcher.map("/rec_enable_toggle", rec_enable_toggle_handler, unit) dispatcher.map('/strip/fader', strip_fader_handler, unit) dispatcher.map('/strip/mute', strip_mute_handler, unit) dispatcher.map('/strip/solo', strip_solo_handler, unit) dispatcher.map('/strip/recenable', strip_recenable_handler, unit) dispatcher.map('/master/fader', master_fader_handler, unit) dispatcher.map('/strip/pan_stereo_position', pan_stereo_position_handler) dispatcher.map('/strip/name', strip_name_handler) dispatcher.map('/bank_up', bank_up_handler) dispatcher.map('/bank_down', bank_down_handler) dispatcher.set_default_handler(print_default_handler) server = osc_server.ThreadingOSCUDPServer((args.listen_ip, args.listen_port), dispatcher ) print("Serving on {}".format(server.server_address)) server.serve_forever()