Skip to content
Snippets Groups Projects
Commit 8d6c2eed authored by leo's avatar leo
Browse files

~various; Major: Runs on Python3 now

parent 54d16853
No related branches found
No related tags found
No related merge requests found
Color.py 0 → 100644
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
import colorsys
import numpy
class Color(object):
@staticmethod
def hex_to_rgb(hex):
"""Turns a hex value #123456 into rgb (18, 52, 86)"""
return tuple(int(hex[i:i+2], 16) for i in (0, 2 ,4))
red = 0
green = 0
blue = 0
hue = 0
saturation = 0
value = 0
def __init__(self, rgb=None, hsv=None, hex=None):
rgb_passed = type(rgb) is tuple
hsv_passed = type(hsv) is tuple
hex_passed = type(hex) is str
if rgb_passed and hsv_passed and hex_passed:
raise ValueError("Color can't be initialized with RGB, HSV and Hex at the same time.")
elif hsv_passed:
hue, saturation, value = hsv
if not hue:
hue = 0.0
if not saturation:
saturation = 0.0
if not value:
value = 0.0
self.hue = hue
self.saturation = saturation
self.value = value
elif rgb_passed:
red, green, blue = rgb
if not red:
red = 0
if not green:
green = 0
if not blue:
blue = 0
self.red = red
self.green = green
self.blue = blue
elif hex_passed:
red, green, blue = Color.hex_to_rgb(hex)
self.red = red
self.green = green
self.blue = blue
else:
raise ValueError("No format was specified (RGB, HSV, Hex)")
def _update_hsv(self):
hue, saturation, value = colorsys.rgb_to_hsv(self.red/255.0, self.green/255.0, self.blue/255.0)
super(Color, self).__setattr__('hue', hue * 360.0)
super(Color, self).__setattr__('saturation', saturation)
super(Color, self).__setattr__('value', value)
def _update_rgb(self):
red, green, blue = colorsys.hsv_to_rgb(self.hue / 360.0, self.saturation, self.value)
super(Color, self).__setattr__('red', red * 255.0)
super(Color, self).__setattr__('green', green * 255.0)
super(Color, self).__setattr__('blue', blue * 255.0)
def __setattr__(self, key, value):
if key in ('red', 'green', 'blue'):
if value > 255.0:
value = value % 255.0
super(Color, self).__setattr__(key, value)
self._update_hsv()
elif key in ('hue', 'saturation', 'value'):
if key == 'hue' and (value >= 360.0 or value < 0):
value = value % 360.0
elif key != 'hue' and value > 1.0:
value = 1.0
super(Color, self).__setattr__(key, value)
self._update_rgb()
else:
super(Color, self).__setattr__(key, value)
def __repr__(self):
return '<%s: red %f, green %f, blue %f, hue %f, saturation %f, value %f>' % (
self.__class__.__name__,
self.red,
self.green,
self.blue,
self.hue,
self.saturation,
self.value,
)
def __str__(self):
return "%d %d %d" % (
int(self.red),
int(self.green),
int(self.blue),
)
def copy(self):
return Color(self.red, self.green, self.blue)
def hex(self):
return "%.2X%.2X%.2X" % (self.red, self.green, self.blue)
def rgb_tuple(self, p_int=False):
if p_int:
return (int(self.red), int(self.green), int(self.blue))
else:
return (self.red, self.green, self.blue)
import time
from threading import Thread
from blinkenfoo import DMX, WARP_SIGN, SPHERES, PANEL, CUBES
class Animation(Thread):
"""
An animation describes transition of pixels on various blinkenfoos. The
blinkenfoo can not be changed after the animation has been initialized. It
is possible to start an animation with a different blinkenroom everytime.
"""
required_args = {}
def __init__(self):
Thread.__init__(self)
self.name = None
self.args = None
self.blinkenroom = None
self._frame = None
self._fps = 0
self._time_should = 0
self._currently_adjusted_for = None
self._running = False
self._paused = False
self._last_cycle = time.time()
self.set_fps(30)
def set_args(self, p_args):
self.args = p_args
def calc_devices_adjusted_fps(self, p_fps):
for device in self.blinkenroom.device_list:
device.calc_fps(p_fps)
def adjust_fps_for_device(self, p_device):
adjusted_fps = p_device.current_fps
if not self._currently_adjusted_for == p_device:
self.set_fps(adjusted_fps)
self._currently_adjusted_for = p_device
def adjust_fps_for_index(self, p_index):
device = self.blinkenroom.get_device_by_index(p_index)
self.adjust_fps_for_device(device)
def cycle(self):
pass
def set_fps(self, p_fps):
self._fps = p_fps
self._time_should = 1 / float(self._fps)
def run(self):
"""
Code stolen from olel
"""
self._running = True
while self._running:
if self._paused:
continue
self.cycle()
if self._fps:
time_spend = time.time() - self._last_cycle
sleep_time = self._time_should - time_spend
if sleep_time > 0:
time.sleep(sleep_time)
self._last_cycle = time.time()
def stop(self):
self._running = False
def pause(self):
self._paused = True
def resume(self):
self._paused = False
from animation import Animation
from blinkenroom import Blinkenroom
import blinkenroom
import animation
import time
from darken import darken
class AnimationHandler(object):
ERROR_ANIMATION_RUNNING = 100
ERROR_ANIMATION_NOT_FOUND = 101
ERROR_OVERLAPPING = 102
def __init__(self):
self.running_animations = {}
self.known_animations = {}
self.last_frame = None
#######################
###Animation control###
######################
def stop_animation(self, p_name):
if p_name in self.running_animations:
animation = self.running_animations[p_name]
print("Stopping %s" % p_name)
animation.stop()
del self.running_animations[p_name]
def stop_all_animations(self, p_args=None):
for running in self.running_animations.values():
running.stop()
self.running_animations = {}
self.last_frame = blinkenroom.BLINKENROOM_LOUNGE.get_last_frame()
def pause_animation(self, p_animation_name):
if p_animation_name in self.running_animations:
animation = self.running_animations[p_animation_name]
animation.pause()
def pause_all_animations(self):
for animation in self.running_animations.values():
animation.pause()
def resume_animation(self, p_animation_name):
if p_animation_name in self.running_animations:
animation = self.running_animations[p_animation_name]
animation.resume()
def resume_all_animations(self):
for animation in self.running_animations.values():
animation.resume()
def start_animation(self, p_name, p_animation_args):
#def start_animation(self, p_name):
p_name = p_name.lower()
#if p_name in self.running_animations:
# print("Animation already running")
# return self.ERROR_ANIMATION_RUNNING
animation_class = None
try:
exec("from %s import %s" % (p_name, p_name))
animation_class = eval(p_name)
except ImportError:
print("Animation not found '%s'" % p_name)
return self.ERROR_ANIMATION_NOT_FOUND
animation = animation_class()
required_args = animation.required_args
for arg in required_args:
required = required_args[arg]
if arg not in p_animation_args:
p_animation_args[arg] = required[1]
else:
casting_method = required[0]
current_arg = p_animation_args[arg]
p_animation_args[arg] = casting_method(current_arg)
animation.set_args(p_animation_args)
animation.init_animation()
to_stop = []
for running in self.running_animations.values():
if animation.blinkenroom.is_overlapping(running.blinkenroom):
to_stop.append(running.name)
for running in to_stop:
self.stop_animation(running)
self.running_animations[p_name] = animation
animation.start()
#################
###Light-Level###
#################
def darken(self, p_args):
self.stop_all_animations()
darken_dicts = {
"frame": self.last_frame,
"rate": 30,
"steps": 100
}
animation = darken()
animation.set_args(darken_dicts)
animation.init_animation()
animation.start()
import socket
from Color import Color
STANDARD_WIDTH = 3.5
COLOR_BLACK = Color((0, 0, 0))
class Blinkenfoo(object):
"""This class represents a blinkenfoo instance.
A blinkenfoo can display information send via upd and can listen to mqtt to play
preconfigured animations. It als knows how many pixels it has:
A pixel is one blinken-unit which can be controlled separately from the other
units. A pixel can be a single LED on a i.E. ws2812 or a whole stripe.
If the blinkenfoo is a led matrix then the pixel count is a*b.
The physical width describes the width in cm of one blinkenunit/pixel. This is
to make animations run the same speed on different blinkenfoo devices"""
def __init__(self, p_name, p_udp_host, p_length, p_physikal_width = STANDARD_WIDTH, p_udp_port = 2390, p_brightness = 100):
self.name = p_name
self.udp_host = p_udp_host
self.udp_port = p_udp_port
self.length = p_length
self.physikal_width = p_physikal_width
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.buffer = None
self._reset_buffer()
self.current_fps = None
if p_brightness < 100:
self.brightness_factor = (100.0 / p_brightness)
else:
self.brightness_factor = 1
def _split_to_byte(self, p_number):
"""
Splits an integer into an array of two bytes. the lower bytes are in [1]
"""
return [(p_number >> 8) & 0xFF, p_number & 0xFF]
#return [0, p_number]
def _generate_package(self, startpixel, endpixel, rgb):
""" Generates a udp package the blinkenfoo understands"""
pack = bytearray()
pack.extend(self._split_to_byte(startpixel))
pack.extend(self._split_to_byte(endpixel))
for color in rgb:
r, g, b = color
pack.extend([r, g, b])
return pack
def _send_package(self, p_package):
"""Sends the given udp_package to this's host:port"""
try:
self.udp_socket.sendto(p_package, (self.udp_host, self.udp_port))
except socket.error:
pass
def _reset_buffer(self):
"""
Sets the content of the buffer to black.
"""
self.buffer = [COLOR_BLACK for _ in range(self.length)]
####################
###Public methods###
####################
def is_reachable(self):
"""
Checks wether a blinkenfoo is reachable or not. sends a dummy package.
"""
rgb_frame = [(0, 0, 0) for _ in range(self.length)]
package = self._generate_package(0, 1, [(0, 0, 0)])
try:
self.udp_socket.sendto(package, (self.udp_host, self.udp_port))
return True
except socket.error:
return False
def calc_fps(self, p_fps):
"""
Calculates fps according to this blinkenfoo width in order for an
animation to run the same "speed" on every device.
"""
return STANDARD_WIDTH * p_fps / self.physikal_width
def send_frame(self, p_frame):
"""
Sends an frame to the blinkenfoo. A frame is a list of color objects.
Other types may cause erros later in the programm. The length of the
frame must match the pixel count of this blinkenfoo.
"""
length = len(p_frame)
if length == self.length:
rgb_frame = []
self.buffer = p_frame
for i in range(len(p_frame)):
r, g, b = p_frame[i].rgb_tuple(True)
r = int(r / self.brightness_factor)
g = int(g / self.brightness_factor)
b = int(b / self.brightness_factor)
rgb_frame.append((r, g, b))
package = self._generate_package(0, length-1, rgb_frame)
else:
raise ValueError("Given list length does not match pixel count!: Given %d, expected %d" % (length, self.length))
self._send_package(package)
def set_full_color(self, p_color):
"""Displays the given color on every pixel of the blinkenfoo"""
rgb_list = [p_color for _ in range(self.length)]
self.send_frame(rgb_list)
def set_pixel(self, p_index, p_color):
"""Sets an specific color at the given index. The change is visible after
flush is called"""
if p_index > self.length:
raise ValueError("Out of range: given %d, length %d" % (p_index, self.length))
self.buffer[p_index] = p_color
def flush(self, p_reset=False):
"""
Sends the content of the buffer to the blinkenfoo. If p_reset = True
then the buffer is resetet (see _reset_buffer())
"""
self.send_frame(self.buffer)
if p_reset:
self._reset_buffer()
##################
###End of class###
##################
SPHERES = Blinkenfoo("Spheres", "ESP_35D447.warpzone", 9, 20)
PANEL = Blinkenfoo("Panel", "ESP_35d9E4.warpzone", 8, 12)
WARP_SIGN = Blinkenfoo("Warp-Sign", "ESP_133C4C.warpzone", 1, 25)
DMX = Blinkenfoo("DMX", "10.0.3.27", 5, 10, p_brightness = 50)
CUBES = Blinkenfoo("Cubes", "cubes.warpzone", 8, 5)
TISCH = Blinkenfoo("Tisch", "tisch.warpzone", 700)
DEVICE_LIST_LOUNGE = [DMX, WARP_SIGN, SPHERES, PANEL, CUBES, TISCH]
#DEVICE_LIST_LOUNGE = [PANEL]
#DEVICE_LIST_LOUNGE = [TISCH]
import blinkenfoo
from Color import Color
class Blinkenroom:
def __init__(self, p_device_list, p_fps = 42):
self.fps = p_fps
self.device_list = p_device_list
self._calc_total_length()
self._generate_index_map()
self.previous_device = None
self._remove_unreachable_devices()
#####################
###private methods###
#####################
def _remove_unreachable_devices(self):
"""
Removes devices that are not reachable from the devices list
"""
for device in self.device_list:
if not device.is_reachable():
print(device.name + " is not available")
del self.device_list[self.device_list.index(device)]
def _calc_total_length(self):
self.total_length = 0
for device in self.device_list:
self.total_length += device.length
return self.total_length
def _generate_index_map(self):
self.global_map = [0 for _ in range(self.total_length)]
current_index = 0
for device in self.device_list:
for i in range(device.length):
self.global_map[current_index + i] = device
current_index += device.length
####################
###public methods###
####################
def send_frame(self, p_frame, p_offset = 0):
"""
Sends an whole frame to all blinkenfoo devices. The frames length has to
match all blinkenfoos length combined.
"""
frame_length = len(p_frame)
#TODO Make this more efficient -> use blinkenfoo's send_frame, dont send
#a frame pixel wise, as every single pixel has to be mapped to a
#blinkenfoo
if not frame_length == self.total_length:
raise ValueError("Frame length does not match total length: Given %d, expected %d" % (frame_length, self.total_length))
for i in range(frame_length):
current_index = (i + p_offset) % frame_length
pixel = p_frame[current_index]
self.set_pixel(i, pixel)
self.flush()
def set_full_color(self, p_color):
"""
Sets all blinkenfoo to the given color.
"""
for device in self.device_list:
device.set_full_color(p_color)
def set_pixel(self, p_index, p_color):
"""
Sets a single pixel. The index is mapped to a blinkenfoo in the
blinkenroom.
"""
device = self.global_map[p_index]
if device == None:
raise ValueError("Index is not mapped to any device (out of range)")
# Mapping index to device
# (Substracting foregone device length)
for i in range(len(self.device_list)):
current_device = self.device_list[i]
if current_device == device:
break
p_index -= current_device.length
device.set_pixel(p_index, p_color)
def flush(self, p_reset=False):
"""
Flushes the changes made in the buffer (blinkefoo buffers, a
blinkenroom has no buffer).
"""
for device in self.device_list:
device.flush(p_reset)
def get_device_by_index(self, p_index):
"""
Returns the blinkenfoo that the given index is mapped on
"""
if p_index > self.total_length:
raise ValueError("Index out of range:%d > %d" % (p_index, self.total_length))
return self.global_map[p_index]
def is_overlapping(self, p_blinkeroom):
"""
Checks wether a blinkenroom's devices overlap with this's devices
"""
return len(set(self.device_list).intersection(p_blinkeroom.device_list)) is not 0
def get_last_frame(self):
"""
Returns the buffer of all blinkenfoos concatenated. Does not work if
flush(True) is invoked, since this will clear the buffer after sending.
"""
frame = []
for device in self.device_list:
frame += device.buffer
return frame
BLINKENROOM_LOUNGE = Blinkenroom(blinkenfoo.DEVICE_LIST_LOUNGE)
import animation
import blinkenfoo
from animation import Animation
from blinkenroom import Blinkenroom
class darken(Animation):
def init_animation(self):
self.frame = self.args["frame"]
self.rate = float(self.args["rate"])
self.steps = float(self.args["steps"])
self.blinkenroom = Blinkenroom(blinkenfoo.DEVICE_LIST_LOUNGE)
self.name = "darken"
self._calc_pixel_delta()
def cycle(self):
is_dark = True
for i in range(len(self.frame)):
delta = self.delta_list[i]
pixel = self.frame[i]
value = pixel.value
if value <= 0:
continue
else:
is_dark = False
pixel.__setattr__("value", value - delta)
self.frame[i] = pixel
self.blinkenroom.send_frame(self.frame)
if is_dark:
self.stop()
def _calc_pixel_delta(self):
delta_list = []
self.factor = 100.0 / self.rate
for i in range(len(self.frame)):
pixel = self.frame[i]
value = pixel.value
new_value = value / self.factor
delta_per_step = (value - new_value) / self.steps
delta_list.append(delta_per_step)
self.delta_list = delta_list
flash.py 0 → 100644
from animation import Animation
from blinkenroom import BLINKENROOM_LOUNGE
import random
from Color import Color
import time
class flash(Animation):
required_args = {
"wait_range": [int, 100],
"stay": [float, 0.1]
}
def init_animation(self):
self.name = "flash"
self.wait_range = int(self.args["wait_range"])
self.stay = float(self.args["stay"])
self.wait = random.randrange(self.wait_range)
self.blinkenroom = BLINKENROOM_LOUNGE
self.white = Color((255, 255, 255))
def cycle(self):
if self.wait > 0:
self.wait -= 1
else:
index = random.randrange(self.blinkenroom.total_length)
self.blinkenroom.set_pixel(index, self.white)
#flushing and setting every pixel to black
self.blinkenroom.flush(True)
time.sleep(self.stay)
self.blinkenroom.flush()
self.wait = random.randrange(self.wait_range)
mqtt.py 0 → 100644
import paho.mqtt.client as mqtt
from animation_handler import AnimationHandler
handler = AnimationHandler()
command_handler_dict = {}
client = mqtt.Client()
def on_connect(c, u, f, rc):
print("connected")
client.subscribe("light/blinkenfoo")
def on_message(client, userdata, p_message):
p_message = p_message.payload.decode("UTF-8")
print("Received message:", p_message)
args = p_message.split(" ")
if args[0] in command_handler_dict:
command_handler = command_handler_dict[args[0]]
command_handler(args[1:])
client.on_connect = on_connect
client.on_message = on_message
def handler_start_animation(p_args):
if len(p_args) > 1 and len(p_args) % 2 == 0:
print("Animation args count invalid")
return
animation_name = p_args[0]
animation_args_dict = {}
for i in range(1, len(p_args), 2):
key = str(p_args[i])
value = p_args[i+1]
animation_args_dict[key] = value
handler.start_animation(animation_name, animation_args_dict)
def handler_pause(p_args):
length = len(p_args)
if length is 0:
handler.pause_all_animations()
else:
args = p_args[1:]
for name in args:
handler.pause_animation(name)
def handler_resume(p_args):
length = len(p_args)
if length is 0:
handler.resume_all_animations()
else:
args = p_args[1:]
for name in args:
handler.resume_animation(name)
command_handler_dict["play"] = handler_start_animation
command_handler_dict["exit"] = handler.stop_all_animations
command_handler_dict["resume"] = handler_resume
command_handler_dict["pause"] = handler_pause
command_handler_dict["darken"] = handler.darken
client.connect("warpsrvint.warpzone", 1883, 60)
client.loop_forever()
from animation import Animation
import blinkenfoo
from blinkenroom import Blinkenroom
from Color import Color
import random
class Blink(object):
def __init__(self, p_index):
self.index = p_index
hue = random.randrange(360)
self.color = Color(hsv=(hue,1,0))
self.step_per_cycle = 1.0/ random.randrange(5, 25)
self.rising = 1
self.dest_value = 1
def step(self):
current_value = self.color.value
if current_value >= self.dest_value:
self.rising = -1
next_value = current_value + self.step_per_cycle * self.rising
if next_value < 0:
next_value = 0
self.color.__setattr__("value", next_value)
if next_value <= 0:
return True
return False
class random_color(Animation):
def init_animation(self):
self.blinkenroom = Blinkenroom(blinkenfoo.DEVICE_LIST_LOUNGE)
self.set_fps(20)
self.blink_dict = {}
self.name = "random_color"
def cycle(self):
for _ in range(3):
index = random.randrange(self.blinkenroom.total_length)
if index not in self.blink_dict:
blink = Blink(index)
self.blink_dict[index] = blink
to_pop =[]
for blink in self.blink_dict.values():
result = blink.step()
index = blink.index
if result:
to_pop.append(index)
self.blinkenroom.set_pixel(index, blink.color)
for index in to_pop:
self.blink_dict.pop(index, None)
self.blinkenroom.flush()
from animation import Animation
from Color import Color
from blinkenroom import Blinkenroom
import blinkenfoo
import time
class sirene(Animation):
def init_animation(self):
red = Color((255, 0, 0))
blue = Color((0, 0, 255))
white = Color((255, 255, 255))
black = Color((0, 0, 0))
grey = Color((32, 32, 32))
self.flash_left = [
[red, blue, red, grey, grey, black, black, black],
[white, white, white, grey, grey, black, black, black],
[blue, red, blue, grey, grey, black, black, black],
[white, white, white, grey, grey, black, black, black],
[red, blue, red, grey, grey, black, black, black],
[white, white, white, grey, grey, black, black, black]
]
self.flash_right = [
[black, black, black, grey, grey, red, blue, red],
[black, black, black, grey, grey, white, white, white],
[black, black, black, grey, grey, blue, red, blue],
[black, black, black, grey, grey, white, white, white],
[black, black, black, grey, grey, red, blue, red],
[black, black, black, grey, grey, white, white, white]
]
self.states = [
self.flash_right,
self.flash_left
]
self.counter = 0
self.name = "sirene"
self.blinkenroom = Blinkenroom([blinkenfoo.CUBES])
self.set_fps(5)
def cycle(self):
i = self.counter % 2
state = self.states[i]
for line in state:
self.blinkenroom.send_frame(line)
time.sleep(0.06)
self.counter += 1
from blinkenfoo import Blinkenfoo
from blinkenfoo import spheres, warp_sign, panel, cubes
from blinkenfoo import DMX, WARP_SIGN, SPHERES, PANEL, CUBES
from blinkenroom import Blinkenroom
from Color import Color
from animation import Animation
import animation
device_list = [spheres, panel, cubes, warp_sign]
blinkenroom = Blinkenroom(device_list)
blinkenroom.set_full_color("ff0000")
device_list = [DMX, WARP_SIGN, SPHERES, PANEL, CUBES]
blinkenroom = Blinkenroom(animation.DEVICE_LIST_LOUNGE)
#blinkenroom.set_full_color("ff0000")
class test(Animation):
def init_animation(self):
self.counter = 0
self.color = Color(hsv=(0, 1, 1))
self.blinkenroom = blinkenroom
self.name = "test"
self.set_fps(self.args["fps"])
def cycle(self):
self.color.__setattr__("hue", self.counter % 360)
self.blinkenroom.set_full_color(self.color)
self.counter += 1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment