import socket
import struct

# Reference pixel width in cm; calc_fps() scales frame rates relative to it.
STANDARD_WIDTH = 3.5


class Blinkenfoo(object):
    """Represent one blinkenfoo device that is driven over UDP.

    A blinkenfoo displays frames sent to it via UDP. An optional MQTT
    channel name can be attached to the instance; this class only stores
    it and does not talk MQTT itself.

    The device knows how many pixels it has. A pixel is one blinken-unit
    that can be controlled separately from the other units: a single LED
    (e.g. on a ws2812) or a whole stripe. For an LED matrix the pixel
    count is a*b.

    The physical width is the width in cm of one blinken-unit/pixel. It
    is used to make animations run at the same speed on different
    blinkenfoo devices.
    """

    def __init__(self, p_name, p_udp_host, p_length,
                 p_physikal_width=STANDARD_WIDTH, p_udp_port=2390,
                 p_mqtt_channel=None):
        """Store the device parameters and open a UDP socket.

        p_name            -- human readable device name
        p_udp_host        -- host name or IP address of the device
        p_length          -- number of pixels of the device
        p_physikal_width  -- width of one pixel in cm
        p_udp_port        -- UDP port of the device
        p_mqtt_channel    -- optional MQTT channel name (only stored)
        """
        self.name = p_name
        self.udp_host = p_udp_host
        self.udp_port = p_udp_port
        self.length = p_length
        self.physikal_width = p_physikal_width
        self.mqtt_channel = p_mqtt_channel
        self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._reset_buffer()

    def _send_packet(self, p_packet):
        """Send the given UDP packet to this instance's host:port."""
        self.udp_socket.sendto(p_packet, (self.udp_host, self.udp_port))

    def _hex_string_to_hex_list(self, p_hex_string):
        """Split a string of hex colors into a list of 6-character chunks.

        "ff0000ff000000ff00" -> ["ff0000", "ff0000", "00ff00"]

        Raises ValueError if the string length is not a multiple of 6.
        """
        length = len(p_hex_string)
        if length % 6 != 0:
            raise ValueError("Invalid Hex-string: length does not divide by 6")
        return [p_hex_string[i:i + 6] for i in range(0, length, 6)]

    def _reset_buffer(self):
        """Reset the frame buffer to all-black, one tuple per pixel."""
        self.buffer = [(0, 0, 0) for _ in range(self.length)]

    ####################
    ###Public methods###
    ####################

    def calc_fps(self, p_fps):
        """Scale p_fps by STANDARD_WIDTH relative to this pixel width."""
        return STANDARD_WIDTH * p_fps / self.physikal_width

    def send_hex_frame(self, p_hex):
        """Send a frame described through hex colors.

        Accepts either one hex string "ff0000ff000000ff00" or a list of
        hex colors ["ff0000", "ff0000", "00ff00"].

        Raises ValueError for a malformed hex string or when the number
        of colors does not match the pixel count.
        """
        if isinstance(p_hex, str):
            hex_colors = self._hex_string_to_hex_list(p_hex)
        else:
            hex_colors = p_hex
        self.send_rgb_frame([_hex_to_rgb(value) for value in hex_colors])

    def send_rgb_frame(self, p_rgb_list):
        """Send the given list of (r, g, b) tuples to the device via UDP.

        The length of the list must match the pixel count, otherwise a
        ValueError is raised and nothing is sent.
        """
        length = len(p_rgb_list)
        if length != self.length:
            raise ValueError("Given list length does not match pixel count!: Given %d, expected %d" % (length, self.length))
        self._send_packet(_generate_packet(0, length - 1, p_rgb_list))

    def set_full_color(self, p_color):
        """Display the given color on every pixel of the blinkenfoo.

        p_color is either a hex color string or an (r, g, b) tuple.
        """
        color = _hex_to_rgb(p_color) if isinstance(p_color, str) else p_color
        self.send_rgb_frame([color for _ in range(self.length)])

    def set_led(self, p_index, p_r, p_g, p_b):
        """Set one pixel in the local buffer; nothing is sent until flush().

        Raises ValueError if p_index is at or beyond the pixel count.
        Negative indices are passed to the list unchanged, as before.
        """
        # Valid indices end at length - 1, so length itself is out of range.
        if p_index >= self.length:
            raise ValueError("Out of range: given %d, length %d" % (p_index, self.length))
        self.buffer[p_index] = (p_r, p_g, p_b)

    def flush(self, p_reset=True):
        """Send the buffered frame; clear the buffer unless p_reset is False."""
        self.send_rgb_frame(self.buffer)
        if p_reset:
            self._reset_buffer()

    def close(self):
        """Close the UDP socket owned by this instance."""
        self.udp_socket.close()

##################
###End of class###
##################


def _hex_to_rgb(p_hex):
    """Turn a hex color "123456" or "#123456" into rgb (18, 52, 86).

    Raises ValueError if the color contains non-hex characters or is too
    short.
    """
    if p_hex.startswith("#"):
        p_hex = p_hex[1:]
    return tuple(int(p_hex[i:i + 2], 16) for i in (0, 2, 4))


def _generate_packet(startpixel, endpixel, rgb):
    """Generate the UDP payload for a run of pixels.

    Layout: start index (2 bytes), end index (2 bytes), then one byte
    each for r, g and b of every color in rgb. The result is a bytes
    object so it can be handed to socket.sendto() directly.

    Raises ValueError if an index is outside 0..65535 or a color
    component is outside 0..255.
    """
    for pixel in (startpixel, endpixel):
        if not 0 <= pixel <= 0xFFFF:
            raise ValueError("Pixel index out of range: %d" % pixel)
    # For indices below 256 this is byte-identical to the previous
    # "0, start, 0, end" header. NOTE(review): big-endian 16-bit for
    # larger indices is assumed from an earlier draft - confirm against
    # the device firmware.
    packet = bytearray(struct.pack(">HH", startpixel, endpixel))
    for r, g, b in rgb:
        # bytearray rejects values outside 0..255 with ValueError.
        packet.extend((r, g, b))
    return bytes(packet)


# Known devices: name, host, pixel count, pixel width in cm.
spheres = Blinkenfoo("Spheres", "ESP_35D447.warpzone", 9, 20)
panel = Blinkenfoo("Panel", "ESP_35d9E4.warpzone", 8, 12)
warp_sign = Blinkenfoo("Warp-Sign", "ESP_133C4C.warpzone", 1, 25)
dmx = Blinkenfoo("DMX", "10.0.3.27", 5, 10)
cubes = Blinkenfoo("Cubes", "cubes.warpzone", 8, 5)