Skip to content
Snippets Groups Projects
Commit 54d16853 authored by leo's avatar leo
Browse files

~folder structure

parent f8a67292
No related branches found
No related tags found
No related merge requests found
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
import colorsys
import numpy
class Color(object):
red = None
green = None
blue = None
hue = None
saturation = None
value = None
alpha = None
shortcuts = { "h":"hue", "s":"saturation", "v":"value", "r":"red", "g":"green", "b":"blue" }
attributes_range = { "hue":360, "saturation":1, "value":1, "red":255, "green":255, "blue":255 }
def __init__(self, red=None, green=None, blue=None, hue=None, saturation=None, value=None, alpha=None):
rgb_passed = bool(red)|bool(green)|bool(blue)
hsv_passed = bool(hue)|bool(saturation)|bool(value)
if not alpha:
alpha = 0.0
if rgb_passed and hsv_passed:
raise ValueError("Color can't be initialized with RGB and HSV at the same time.")
elif hsv_passed:
if not hue:
hue = 0.0
if not saturation:
saturation = 0.0
if not value:
value = 0.0
super(Color, self).__setattr__('hue', hue)
super(Color, self).__setattr__('saturation', saturation)
super(Color, self).__setattr__('value', value)
self._update_rgb()
else:
if not red:
red = 0
if not green:
green = 0
if not blue:
blue = 0
super(Color, self).__setattr__('red', red)
super(Color, self).__setattr__('green', green)
super(Color, self).__setattr__('blue', blue)
self._update_hsv()
super(Color, self).__setattr__('alpha', alpha)
def __getattr__(self, key):
if key == "hue" or key == "h": return self.hue
if key == "saturation" or key == "s": return self.saturation
if key == "value" or key == "v": return self.value
if key == "red" or key == "r": return self.red
if key == "green" or key == "g": return self.green
if key == "blue" or key == "b": return self.blue
else: return None
def translate_shortcut( self, p_shortcut ):
return self.shortcuts[ p_shortcut ]
def get_attributes_range( self, key ):
return self.attributes_range[ key ]
def __setattr__(self, key, value):
if key in ('red', 'green', 'blue'):
if value > 255.0:
value = value % 255.0
super(Color, self).__setattr__(key, value)
self._update_hsv()
elif key in ('hue', 'saturation', 'value'):
if key == 'hue' and (value >= 360.0 or value < 0):
value = value % 360.0
elif key != 'hue' and value > 1.0:
value = 1.0
super(Color, self).__setattr__(key, value)
self._update_rgb()
else:
if key == 'alpha' and value > 1.0: # TODO: Might this be more fitting in another place?
value = 1.0
super(Color, self).__setattr__(key, value)
def __repr__(self):
return '<%s: red %f, green %f, blue %f, hue %f, saturation %f, value %f, alpha %f>' % (
self.__class__.__name__,
self.red,
self.green,
self.blue,
self.hue,
self.saturation,
self.value,
self.alpha
)
def __str__(self):
return "%d %d %d" % (
int(round(self.red * self.alpha)),
int(round(self.green * self.alpha)),
int(round(self.blue * self.alpha)),
)
def blend(self, other, mode='normal'):
if self.alpha != 1.0: # no clue how to blend with a translucent bottom layer
self.red = self.red * self.alpha
self.green = self.green * self.alpha
self.blue = self.blue * self.alpha
self.alpha = 1.0
if mode == 'normal':
own_influence = 1.0 - other.alpha
self.red = (self.red * own_influence) + (other.red * other.alpha)
self.green = (self.green * own_influence) + (other.green * other.alpha)
self.blue = (self.blue * own_influence) + (other.blue * other.alpha)
def lighten(self, other):
if isinstance(other, int) or isinstance(other, float):
other = Color(red=other, green=other, blue=other, alpha=1.0)
if self.alpha != 1.0:
self.red = self.red * self.alpha
self.green = self.green * self.alpha
self.blue = self.blue * self.alpha
self.alpha = 1.0
red = self.red + (other.red * other.alpha)
green = self.green + (other.green * other.alpha)
blue = self.blue + (other.blue * other.alpha)
if red > 255.0:
red = 255.0
if green > 255.0:
green = 255.0
if blue > 255.0:
blue = 255.0
self.red = red
self.green = green
self.blue = blue
def darken(self, other):
if isinstance(other, int) or isinstance(other, float):
other = Color(red=other, green=other, blue=other, alpha=1.0)
red = self.red - other.red
green = self.green - other.green
blue = self.blue - other.blue
if red < 0:
red = 0
if green < 0:
green = 0
if blue < 0:
blue = 0
self.red = red
self.green = green
self.blue = blue
def hex(self):
return "%.2X%.2X%.2X" % (self.red, self.green, self.blue)
def _update_hsv(self):
hue, saturation, value = colorsys.rgb_to_hsv(self.red/255.0, self.green/255.0, self.blue/255.0)
super(Color, self).__setattr__('hue', hue * 360.0)
super(Color, self).__setattr__('saturation', saturation)
super(Color, self).__setattr__('value', value)
def _update_rgb(self):
red, green, blue = colorsys.hsv_to_rgb(self.hue / 360.0, self.saturation, self.value)
super(Color, self).__setattr__('red', red * 255.0)
super(Color, self).__setattr__('green', green * 255.0)
super(Color, self).__setattr__('blue', blue * 255.0)
def copy(self):
return Color( self.r, self.g, self.b )
def rgb_tripel(self, p_int=False):
if p_int:
return (int(self.red), int(self.green), int(self.blue))
else:
return (self.red, self.green, self.blue)
def bin_ranges(length, num_bins):
length = numpy.float64(length)
bin_range = []
current_range = length / (2 ** num_bins -1)
lower = 0
upper = current_range -1
if upper < lower:
upper = lower
bin_range.append([int(round(lower)), int(round(upper))])
for i in range(1, num_bins):
current_range *=2
lower = upper + 1
upper = lower + current_range - 1
if upper < lower:
upper = lower
bin_range.append([int(round(lower)), int(round(upper))])
return bin_range
def octave_amplitudes(amplitudes, num_bins):
bins = numpy.zeros(num_bins)
i = 0
ranges = bin_ranges(len(amplitudes), num_bins)
for lower, upper in ranges:
if lower == upper:
bin = amplitudes[lower]
else:
#bin = amplitudes[lower:upper].mean()
#bin = sum(amplitudes[lower:upper]) / float(len(amplitudes[lower:upper])) # average instead of mean
#bin = numpy.max(amplitudes[lower:upper]) # this is an ugly hack instead of doing, like, statistical analysis or anything.
bin = (numpy.max(amplitudes[lower:upper]) + numpy.mean(amplitudes[lower:upper])) / 2 # mean and max mixed
if numpy.isnan(bin):
bin = numpy.float64(0)
bins[i] = bin
i += 1
return bins
import socket
STANDARD_WIDTH = 3.5
class Blinkenfoo(object):
"""This class represents a blinkenfoo instance.
A blinkenfoo can display information send via upd and can listen to mqtt to play
preconfigured animations. It als knows how many pixels it has:
A pixel is one blinken-unit which can be controlled separately from the other
units. A pixel can be a single LED on a i.E. ws2812 or a whole stripe.
If the blinkenfoo is a led matrix then the pixel count is a*b.
The physical width describes the width in cm of one blinkenunit/pixel. This is
to make animations run the same speed on different blinkenfoo devices"""
def __init__(self, p_name, p_udp_host, p_length, p_physikal_width = STANDARD_WIDTH, p_udp_port = 2390, p_mqtt_channel = None):
self.name = p_name
self.udp_host = p_udp_host
self.udp_port = p_udp_port
self.length = p_length
self.physikal_width = p_physikal_width
self.mqtt_channel = p_mqtt_channel
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._reset_buffer()
def _unify_color_to_rgb(self, p_color):
if type(p_color) is tuple:
return p_color
elif type(p_color) is str:
return self._hex_to_rgb(p_color)
else:
raise ValueError("Unknown type! Color must be RGB Tuple or hex values as string")
def _hex_string_to_rgb_list(self, p_hex_string):
"""
Converts a hex string to an rgb frame
"""
length = len(p_hex_string)
if not length % 6 == 0:
raise ValueError("Invalid Hex-string: length does not divide by 6")
rgb_list = []
for i in range(0, length, 6):
hex_value = p_hex_string[i:i+6]
rgb_list.append(self._hex_to_rgb(hex_value))
return rgb_list
def _hex_list_to_rgb_list(self, p_hex_list):
"""
Converts a hex frame into a rgb frame
"""
rgb_list = []
for hex in p_hex_list:
rgb_list.append(self._hex_to_rgb(hex))
return rgb_list
def _hex_to_rgb(self, p_hex):
"""Turns a hex value #123456 into rgb (18, 52, 86)"""
return tuple(int(p_hex[i:i+2], 16) for i in (0, 2 ,4))
"""
Example:
package = _generate_packet(0, 9, [(255, 0, 0), (255, 0, 0), ...., (255, 0, 0)])
_send_packet(package)
Works for python2
"""
def _generate_packet(self, startpixel, endpixel, rgb):
""" Generates a udp packet the blinkenfoo understands"""
pack = chr(0) + chr(startpixel) + chr(0) + chr(endpixel)
# pack = chr((startpixel >> 8) & 0xff) + chr(startpixel & 0xff) + chr((startpixel) >> 8) & 0xff) + chr(endpixel & 0xff)
for color in rgb:
r, g, b = color
pack += chr(r) + chr(g) + chr(b)
# pack += chr(r & 0xff) + chr(g & 0xff) + chr(b & 0xff)
return pack
def _send_packet(self, p_packet):
"""Sends the given udp_packet to this's host:port"""
try:
self.udp_socket.sendto(p_packet, (self.udp_host, self.udp_port))
except socket.error:
pass
def _reset_buffer(self):
self.buffer = [(0, 0, 0) for _ in range(self.length)]
def _send_rgb_frame(self, p_rgb_list):
"""
Assuming the type/length have already been checked.
"""
packet = self._generate_packet(0, len(p_rgb_list)-1, p_rgb_list)
self._send_packet(packet)
####################
###Public methods###
####################
def is_reachable(self):
rgb_frame = [(0, 0, 0) for _ in range(self.length)]
package = self._generate_packet(0, self.length -1, rgb_frame)
try:
self.udp_socket.sendto(package, (self.udp_host, self.udp_port))
return True
except socket.error:
return False
def calc_fps(self, p_fps):
return STANDARD_WIDTH * p_fps / self.physikal_width
def send_frame(self, p_frame):
"""
Sends an frame to the blinkenfoo. A frame can be a hex-string,
an hex list or an rgb list. Other types may cause erros later in the
programm. The length of the frame must match the pixel count of this
blinkenfoo. Example input, length = 3:
Hex-String: "ff000000ff000000ff"
Hex-List: ["ff0000", "00ff00", "0000ff"]
RGB-List: [(255, 0, 0), (0, 255, 0), (0, 0, 255)]
"""
rgb_frame = []
if type(p_frame) is str:
rgb_frame = self._hex_string_to_rgb_list(p_frame)
elif type(p_frame) is list:
if type(p_frame[0]) is tuple:
rgb_frame = p_frame
else:
rgb_frame = self._hex_list_to_rgb_list(p_frame)
else:
raise ValueError("Frame is neither str or list. Given type: %s" % str(type(p_frame)))
length = len(rgb_frame)
if length == self.length:
packet = self._generate_packet(0, len(rgb_frame)-1, rgb_frame)
else:
raise ValueError("Given list length does not match pixel count!: Given %d, expected %d" % (length, self.length))
self._send_packet(packet)
def set_full_color(self, p_color):
"""Displays the given color on every pixel of the blinkenfoo"""
color = self._unify_color_to_rgb(p_color)
rgb_list = [color for _ in range(self.length)]
self._send_rgb_frame(rgb_list)
def set_led(self, p_index, p_color):
"""Sets an specific color at the given index. The change is visible after
flush is called"""
if p_index > self.length:
raise ValueError("Out of range: given %d, length %d" % (p_index, self.length))
p_color = self._unify_color_to_rgb(p_color)
self.buffer[p_index] = p_color
def flush(self, p_reset = True):
self._send_rgb_frame(self.buffer)
if p_reset:
self._reset_buffer()
##################
###End of class###
##################
spheres = Blinkenfoo("Spheres", "ESP_35D447.warpzone", 9, 20)
panel = Blinkenfoo("Panel", "ESP_35d9E4.warpzone", 8, 12)
warp_sign = Blinkenfoo("Warp-Sign", "ESP_133C4C.warpzone", 1, 25)
dmx = Blinkenfoo("DMX", "10.0.3.27", 5, 10)
cubes = Blinkenfoo("Cubes", "cubes.warpzone", 8, 5)
import socket
STANDARD_WIDTH = 3.5
class Blinkenfoo(object):
"""This class represents a blinkenfoo instance.
A blinkenfoo can display information send via upd and can listen to mqtt to play
preconfigured animations. It als knows how many pixels it has:
A pixel is one blinken-unit which can be controlled separately from the other
units. A pixel can be a single LED on a i.E. ws2812 or a whole stripe.
If the blinkenfoo is a led matrix then the pixel count is a*b.
The physikale width describes the width in cm of one blinkenunit/pixel. This is
to make animations run the same speed on different blinkenfoo devices"""
def __init__(self, p_name, p_udp_host, p_length, p_physikal_width = STANDARD_WIDTH, p_udp_port = 2390, p_mqtt_channel = None):
self.name = p_name
self.udp_host = p_udp_host
self.udp_port = p_udp_port
self.length = p_length
self.physikal_width = p_physikal_width
self.mqtt_channel = p_mqtt_channel
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._reset_buffer()
def _send_packet(self, p_packet):
"""Sends the given udp_packet to this's host:port"""
self.udp_socket.sendto(p_packet, (self.udp_host, self.udp_port))
def _hex_string_to_hex_list(self, p_hex_string):
""" Converts a string with hex values to a corresponding list:
"ff0000ff000000ff00" -> ["ff0000", "ff0000", "00ff00"]"""
hex_list = []
length = len(p_hex_string)
if not length % 6 == 0:
raise ValueError("Invalid Hex-string: length does not divide by 6")
for i in range(0, length, 6):
hex_list.append(p_hex_string[i:i+6])
return hex_list
def _reset_buffer(self):
self.buffer = [(0, 0, 0) for _ in range(self.length)]
####################
###Public methods###
####################
def calc_fps(self, p_fps):
return STANDARD_WIDTH * p_fps / self.physikal_width
def send_hex_frame(self, p_hex):
"""Sends a frame described through hex. This methods consumes hex strings
"ff0000ff000000ff00" or hex list ["ff0000", "ff0000", "00ff00"]"""
if type(p_hex) == str:
hex = self._hex_string_to_hex_list(p_hex)
else:
hex = p_hex
rgb_list = []
for value in hex:
rgb_list.append(_hex_to_rgb(value))
self.send_rgb_frame(rgb_list)
def send_rgb_frame(self, p_rgb_list):
"""Sends the given rgb-list to this's host via udp.
The lenght of the list musst match the pixel count"""
length = len(p_rgb_list)
if length == self.length:
packet = _generate_packet(0, len(p_rgb_list)-1, p_rgb_list)
else:
raise ValueError("Given list length does not match pixel count!: Given %d, expected %d" % (length, self.length))
self._send_packet(packet)
def set_full_color(self, p_color):
"""Displays the given color on every pixel of the blinkenfoo"""
if type(p_color) == str:
color = _hex_to_rgb(p_color)
else:
color = p_color
rgb_list = [color for _ in range(self.length)]
self.send_rgb_frame(rgb_list)
def set_led(self, p_index, p_r, p_g, p_b):
if p_index > self.length:
raise ValueError("Out of range: given %d, length %d" % (p_index, self.length))
self.buffer[p_index] = (p_r, p_g, p_b)
def flush(self, p_reset = True):
self.send_rgb_frame(self.buffer)
if p_reset:
self._reset_buffer()
##################
###End of class###
##################
def _hex_to_rgb(p_hex):
"""Turns a hex value #123456 into rgb (18, 52, 86)"""
return tuple(int(p_hex[i:i+2], 16) for i in (0, 2 ,4))
def _generate_packet(startpixel, endpixel, rgb):
""" Generates a udp packet the blinkenfoo understands"""
pack = chr(0) + chr(startpixel) + chr(0) + chr(endpixel)
# pack = chr((startpixel >> 8) & 0xff) + chr(startpixel & 0xff) + chr((startpixel) >> 8) & 0xff) + chr(endpixel & 0xff)
for color in rgb:
r, g, b = color
pack += chr(r) + chr(g) + chr(b)
# pack += chr(r & 0xff) + chr(g & 0xff) + chr(b & 0xff)
return pack
spheres = Blinkenfoo("Spheres", "ESP_35D447.warpzone", 9, 20)
panel = Blinkenfoo("Panel", "ESP_35d9E4.warpzone", 8, 12)
warp_sign = Blinkenfoo("Warp-Sign", "ESP_133C4C.warpzone", 1, 25)
dmx = Blinkenfoo("DMX", "10.0.3.27", 5, 10)
cubes = Blinkenfoo("Cubes", "cubes.warpzone", 8, 5)
import socket
STANDARD_WIDTH = 3.5
class Blinkenfoo(object):
"""This class represents a blinkenfoo instance.
A blinkenfoo can display information send via upd and can listen to mqtt to play
preconfigured animations. It als knows how many pixels it has:
A pixel is one blinken-unit which can be controlled separately from the other
units. A pixel can be a single LED on a i.E. ws2812 or a whole stripe.
If the blinkenfoo is a led matrix then the pixel count is a*b.
The physikale width describes the width in cm of one blinkenunit/pixel. This is
to make animations run the same speed on different blinkenfoo devices"""
def __init__(self, p_name, p_udp_host, p_length, p_physikal_width = STANDARD_WIDTH, p_udp_port = 2390, p_mqtt_channel = None):
self.name = p_name
self.udp_host = p_udp_host
self.udp_port = p_udp_port
self.length = p_length
self.physikal_width = p_physikal_width
self.mqtt_channel = p_mqtt_channel
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._reset_buffer()
def _send_packet(self, p_packet):
"""Sends the given udp_packet to this's host:port"""
self.udp_socket.sendto(p_packet, (self.udp_host, self.udp_port))
def _hex_string_to_hex_list(self, p_hex_string):
""" Converts a string with hex values to a corresponding list:
"ff0000ff000000ff00" -> ["ff0000", "ff0000", "00ff00"]"""
hex_list = []
length = len(p_hex_string)
if not length % 6 == 0:
raise ValueError("Invalid Hex-string: length does not divide by 6")
for i in range(0, length, 6):
hex_list.append(p_hex_string[i:i+6])
return hex_list
def _reset_buffer(self):
self.buffer = [(0, 0, 0) for _ in range(self.length)]
####################
###Public methods###
####################
def calc_fps(self, p_fps):
return STANDARD_WIDTH * p_fps / self.physikal_width
def send_hex_frame(self, p_hex):
"""Sends a frame described through hex. This methods consumes hex strings
"ff0000ff000000ff00" or hex list ["ff0000", "ff0000", "00ff00"]"""
if type(p_hex) == str:
hex = self._hex_string_to_hex_list(p_hex)
else:
hex = p_hex
rgb_list = []
for value in hex:
rgb_list.append(_hex_to_rgb(value))
self.send_rgb_frame(rgb_list)
def send_rgb_frame(self, p_rgb_list):
"""Sends the given rgb-list to this's host via udp.
The lenght of the list musst match the pixel count"""
length = len(p_rgb_list)
if length == self.length:
packet = _generate_packet(0, len(p_rgb_list)-1, p_rgb_list)
else:
raise ValueError("Given list length does not match pixel count!: Given %d, expected %d" % (length, self.length))
self._send_packet(packet)
def set_full_color(self, p_color):
"""Displays the given color on every pixel of the blinkenfoo"""
if type(p_color) == str:
color = _hex_to_rgb(p_color)
else:
color = p_color
rgb_list = [color for _ in range(self.length)]
self.send_rgb_frame(rgb_list)
def set_led(self, p_index, p_r, p_g, p_b):
if p_index > self.length:
raise ValueError("Out of range: given %d, length %d" % (p_index, self.length))
self.buffer[p_index] = (p_r, p_g, p_b)
def flush(self, p_reset = True):
self.send_rgb_frame(self.buffer)
if p_reset:
self._reset_buffer()
##################
###End of class###
##################
def _hex_to_rgb(p_hex):
"""Turns a hex value #123456 into rgb (18, 52, 86)"""
return tuple(int(p_hex[i:i+2], 16) for i in (0, 2 ,4))
def _generate_packet(startpixel, endpixel, rgb):
""" Generates a udp packet the blinkenfoo understands"""
pack = chr(0) + chr(startpixel) + chr(0) + chr(endpixel)
# pack = chr((startpixel >> 8) & 0xff) + chr(startpixel & 0xff) + chr((startpixel) >> 8) & 0xff) + chr(endpixel & 0xff)
for color in rgb:
r, g, b = color
pack += chr(r) + chr(g) + chr(b)
# pack += chr(r & 0xff) + chr(g & 0xff) + chr(b & 0xff)
return pack
spheres = Blinkenfoo("Spheres", "ESP_35D447.warpzone", 9, 20)
panel = Blinkenfoo("Panel", "ESP_35d9E4.warpzone", 8, 12)
warp_sign = Blinkenfoo("Warp-Sign", "ESP_133C4C.warpzone", 1, 25)
dmx = Blinkenfoo("DMX", "10.0.3.27", 5, 10)
cubes = Blinkenfoo("Cubes", "cubes.warpzone", 8, 5)
""" This class represents all warp-ledstripes combined. Starting with cybars 0th pixel up to the 9th sphere. More blikenfoo will be added in the future"""
import blinkenfoo
from Color import Color
class Blinkenroom:
def __init__(self, p_device_list, p_fps = 42):
self.fps = p_fps
self.device_list = p_device_list
self._calc_total_length()
self._generate_index_map()
self.previous_device = None
self._remove_unreachable_devices()
def _remove_unreachable_devices(self):
for device in self.device_list:
if not device.is_reachable():
del self.device_list[self.device_list.index(device)]
def set_rgb_frame(self, p_rgb_frame, p_offset = 0, p_fps_indicator = 0, p_fps_setter = None):
frame_length = len(p_rgb_frame)
if not frame_length == self.total_length:
raise ValueError("Frame length does not match total length: Given %d, expected %d" % (frame_length, self.total_length))
for i in range(frame_length):
current_index = (i + p_offset) % frame_length
r, g, b = p_rgb_frame[current_index]
if not p_fps_setter == None and p_fps_indicator == current_index:
self.set_led(i, r, g, b, p_fps_setter)
else:
self.set_led(i, r, g, b)
self.flush()
def set_full_color(self, p_color):
for device in self.device_list:
device.set_full_color(p_color)
def set_led(self, p_index, p_r, p_g, p_b, p_fps_setter = None):
device_index = self.global_map[p_index]
device = self.device_list[device_index]
if device == None:
raise ValueError("Index is not mapped to any device (out of range)")
if not p_fps_setter == None:
if not self.previous_device == device:
fps = device.calc_fps(self.fps)
p_fps_setter(fps)
for i in range(device_index):
p_index -= self.device_list[i].length
device.set_led(p_index, (p_r, p_g, p_b))
def flush(self, p_reset = True):
for device in self.device_list:
device.flush(p_reset)
def _calc_total_length(self):
self.total_length = 0
for device in self.device_list:
self.total_length += device.length
return self.total_length
def _generate_index_map(self):
self.global_map = [0 for _ in range(self.total_length)]
current_index = 0
for d in range(len(self.device_list)):
device = self.device_list[d]
for i in range(device.length):
self.global_map[current_index + i] = d
current_index += device.length
#warp_lounge = Blinkenroom(blinkenfoo.panel, blinkenfoo.spheres, blinkenfoo.warp_sign)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment