Skip to content
Snippets Groups Projects
Commit d64c6a70 authored by leo's avatar leo
Browse files

Copied form source repo

parent c8e2b1b0
No related branches found
No related tags found
No related merge requests found
from packets.output_packet import DmxPacket
from packets.poll_packet import PollPacket
pp = PollPacket()
p = DmxPacket([128, 0, 0])
e = p.encode()
print("bytearray(" + str(e))
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
.connect(("10.0.2.22", 6454))
s.send(e)
#s.send(pp.encode())
b = bytearray()
b.extend("Art-Net\x00".encode())
b.extend([0x00])
b.extend([0x0050, 0, 14, 0, 0, 0, 0, 0, 0x02, 255, 0, 0])
print(b)
s.send(b)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import bitstring
class ArtNetPacket(object):
"""
This is the super class of all art-net packets. It contains basic functionality and
some fields needed for creating packets.
"""
opcode = None
schema = ()
opcode_map = dict()
header = 'Art-Net\0'
protocol_version = 14
filler1 = 42
filler2 = 32
filler4 = 0
@classmethod
def register(cls, packet_class):
"""
Registers a packet. It maps the opcode to the given implementation of the packet
"""
cls.opcode_map[packet_class.opcode] = packet_class
return packet_class
@classmethod
def get_available_packets(cls):
"""
Returns a list of all registered packets that are available for use.
"""
return list(cls.opcode_map.values())
@classmethod
def decode(cls, address, data):
"""
Decodes a package
"""
# Retreives the opcode and looks for matching packets.
[opcode] = struct.unpack('!H', data[8:10])
if(opcode not in cls.opcode_map):
raise NotImplementedError('%x' % opcode)
# Iterates through the schema of the corresponding class
klass = cls.opcode_map[opcode]
b = bitstring.BitStream(bytes=data)
fields = dict()
for name, fmt in klass.schema:
# If there is a parse function for value the function is called
accessor = getattr(klass, 'parse_%s' % name, None)
if(callable(accessor)):
fields[name] = accessor(b, fmt)
else:
# Else the value is read directly from the bitstream
fields[name] = b.read(fmt)
# Creating an instance of the packet-class and set the values
p = klass(address=address)
for k,v in fields.items():
setattr(p, k, v)
return p
def __init__(self, address=None, sequence=0, physical=0, universe=0):
self.address = address
self.sequence = sequence
self.physical = physical
self.universe = universe
for name, fmt in self.schema:
if not(hasattr(self, name)):
setattr(self, name, 0)
def __str__(self):
return '<%(klass)s from %(address)s:%(universe)s/%(physical)s>' % dict(
klass = self.__class__.__name__,
address = self.address,
universe = self.universe,
physical = self.physical,
)
def encode(self):
"""
Encodes a package into a bytearray.
"""
fields = []
# Iterates through all entries of the schema
for name, fmt in self.schema:
# If there is a function to access the value, the function is called
# else the value is revtreived directly
accessor = getattr(self, 'format_%s' % name, '\0')
if(callable(accessor)):
value = accessor()
else:
value = getattr(self, name)
# Store values in array
fields.append([name, fmt, value])
# Builds a bytearray to send as packet
fmt = ', '.join(['='.join([f,n]) for n,f,v in fields])
data = dict([(n,v) for n,f,v in fields])
return bitstring.pack(fmt, **data).tobytes()
STANDARD_PORT = 6454
OPCODES = dict(
# This is an ArtPoll packet, no other data is contained in this UDP packet.
OpPoll = 0x0020,
# This is an ArtPollReply Packet. It contains device status information.
OpPollReply = 0x0021,
# Diagnostics and data logging packet.
OpDiagData = 0x0023,
# Used to send text based parameter commands.
OpCommand = 0x0024,
# This is an ArtDmx data packet. It contains zero start code DMX512 information for a single Universe.
OpOutput = 0x0050,
# This is an ArtDmx data packet. It contains zero start code DMX512 information for a single Universe.
OpDmx = 0x0050,
# This is an ArtNzs data packet. It contains non-zero start code (except RDM) DMX512 information for a single Universe.
OpNzs = 0x0051,
# This is an ArtAddress packet. It contains remote programming information for a Node.
OpAddress = 0x0060,
# This is an ArtInput packet. It contains enable disable data for DMX inputs.
OpInput = 0x0070,
# This is an ArtTodRequest packet. It is used to request a Table of Devices (ToD) for RDM discovery.
OpTodRequest = 0x0080,
# This is an ArtTodData packet. It is used to send a Table of Devices (ToD) for RDM discovery.
OpTodData = 0x0081,
# This is an ArtTodControl packet. It is used to send RDM discovery control messages.
OpTodControl = 0x0082,
# This is an ArtRdm packet. It is used to send all non discovery RDM messages.
OpRdm = 0x0083,
# This is an ArtRdmSub packet. It is used to send compressed, RDM Sub-Device data.
OpRdmSub = 0x0084,
# This is an ArtVideoSetup packet. It contains video screen setup information for nodes that implement the extended video features.
OpVideoSetup = 0x10a0,
# This is an ArtVideoPalette packet. It contains colour palette setup information for nodes that implement the extended video features.
OpVideoPalette = 0x20a0,
# This is an ArtVideoData packet. It contains display data for nodes that implement the extended video features.
OpVideoData = 0x40a0,
# This is an ArtMacMaster packet. It is used to program the Node's MAC address, Oem device type and ESTA manufacturer code.
# This is for factory initialisation of a Node. It is not to be used by applications.
OpMacMaster = 0x00f0,
# This is an ArtMacSlave packet. It is returned by the node to acknowledge receipt of an ArtMacMaster packet.
OpMacSlave = 0x00f1,
# This is an ArtFirmwareMaster packet. It is used to upload new firmware or firmware extensions to the Node.
OpFirmwareMaster = 0x00f2,
# This is an ArtFirmwareReply packet. It is returned by the node to acknowledge receipt of an ArtFirmwareMaster packet or ArtFileTnMaster packet.
OpFirmwareReply = 0x00f3,
# Uploads user file to node.
OpFileTnMaster = 0x00f4,
# Downloads user file from node.
OpFileFnMaster = 0x00f5,
# Node acknowledge for downloads.
OpFileFnReply = 0x00f6,
# This is an ArtIpProg packet. It is used to reprogramme the IP, Mask and Port address of the Node.
OpIpProg = 0x00f8,
# This is an ArtIpProgReply packet. It is returned by the node to acknowledge receipt of an ArtIpProg packet.
OpIpProgReply = 0x00f9,
# This is an ArtMedia packet. It is Unicast by a Media Server and acted upon by a Controller.
OpMedia = 0x0090,
# This is an ArtMediaPatch packet. It is Unicast by a Controller and acted upon by a Media Server.
OpMediaPatch = 0x0091,
# This is an ArtMediaControl packet. It is Unicast by a Controller and acted upon by a Media Server.
OpMediaControl = 0x0092,
# This is an ArtMediaControlReply packet. It is Unicast by a Media Server and acted upon by a Controller.
OpMediaContrlReply = 0x0093,
# This is an ArtTimeCode packet. It is used to transport time code over the network.
OpTimeCode = 0x0097,
# Used to synchronise real time date and clock
OpTimeSync = 0x0098,
# Used to send trigger macros
OpTrigger = 0x0099,
# Requests a node's file list
OpDirectory = 0x009a,
# Replies to OpDirectory with file list
OpDirectoryReply = 0x9b00
)
NODE_REPORT_CODES = dict(
RcDebug = ('0x0000', "Booted in debug mode"),
RcPowerOk = ('0x0001', "Power On Tests successful"),
RcPowerFail = ('0x0002', "Hardware tests failed at Power On"),
RcSocketWr1 = ('0x0003', "Last UDP from Node failed due to truncated length, Most likely caused by a collision."),
RcParseFail = ('0x0004', "Unable to identify last UDP transmission. Check OpCode and packet length."),
RcUdpFail = ('0x0005', "Unable to open Udp Socket in last transmission attempt"),
RcShNameOk = ('0x0006', "Confirms that Short Name programming via ArtAddress, was successful."),
RcLoNameOk = ('0x0007', "Confirms that Long Name programming via ArtAddress, was successful."),
RcDmxError = ('0x0008', "DMX512 receive errors detected."),
RcDmxUdpFull = ('0x0009', "Ran out of internal DMX transmit buffers."),
RcDmxRxFull = ('0x000a', "Ran out of internal DMX Rx buffers."),
RcSwitchErr = ('0x000b', "Rx Universe switches conflict."),
RcConfigErr = ('0x000c', "Product configuration does not match firmware."),
RcDmxShort = ('0x000d', "DMX output short detected. See GoodOutput field."),
RcFirmwareFail = ('0x000e', "Last attempt to upload new firmware failed."),
RcUserFail = ('0x000f', "User changed switch settings when address locked by remote programming. User changes ignored.")
)
STYLE_CODES = dict(
# A DMX to / from Art-Net device
StNode = 0x00,
# A lighting console.
StController = 0x01,
# A Media Server.
StMedia = 0x02,
# A network routing device.
StRoute = 0x03,
# A backup device.
StBackup = 0x04,
# A configuration or diagnostic tool.
StConfig = 0x05,
# A visualiser.
StVisual = 0x06
)
\ No newline at end of file
File added
File added
File added
from packets import ArtNetPacket
from packets import OPCODES, STANDARD_PORT, STYLE_CODES
from packets import ArtNetPacket
from packets import OPCODES, STANDARD_PORT, STYLE_CODES
@ArtNetPacket.register
class IpProgPacket(ArtNetPacket):
opcode = OPCODES['OpIpProg']
schema = (
('header', 'bytes:8'),
('opcode', 'int:16'),
('protocol_version', 'uintbe:16'),
('filler1', 'bytes:8'),
('filler2', 'bytes:8'),
('command', 'bytes:8'),
('filler4', 'bytes:8'),
)
\ No newline at end of file
from packets import ArtNetPacket
from packets import OPCODES, STANDARD_PORT, STYLE_CODES
import bitstring
@ArtNetPacket.register
class DmxPacket(ArtNetPacket):
opcode = OPCODES['OpDmx']
schema = (
('header', 'bytes:8'),
('opcode', 'int:16'),
('protocol_version', 'uintbe:16'),
('sequence', 'int:8'),
('physical', 'int:8'),
('universe', 'uintle:16'),
('length', 'uintbe:16'),
('framedata', 'bytes')
)
def __init__(self, frame=None, **kwargs):
super(DmxPacket, self).__init__(**kwargs)
self.frame = frame
#@classmethod
#def parse_framedata(cls, b, fmt):
#from artnet import dmx
#return dmx.Frame([ord(x) for x in b.read('bytes:512')])
def format_length(self):
return len(self.frame)
def format_framedata(self):
return ''.join([chr(i or 0) for i in self.frame])
def __str__(self):
return '<DMX(%(sequence)s): %(channels)s>' % dict(
sequence = self.sequence,
channels = ', '.join([
'%s: %s' % (
address + 1,
self.frame[address]
) for address in range(len(self.frame)) if self.frame[address]
])
)
\ No newline at end of file
import uuid
from packets import ArtNetPacket
from packets import OPCODES, STANDARD_PORT, STYLE_CODES
import bitstring
@ArtNetPacket.register
class PollPacket(ArtNetPacket):
opcode = OPCODES['OpPoll']
schema = (
('header', 'bytes:8'),
('opcode', 'int:16'),
('protocol_version', 'uintbe:16'),
('talktome', 'int:8'),
('priority', 'int:8')
)
def __init__(self, talktome=0x02, priority=0, **kwargs):
super(PollPacket, self).__init__(**kwargs)
self.talktome = talktome
self.priority = priority
@ArtNetPacket.register
class PollReplyPacket(ArtNetPacket):
opcode = OPCODES['OpPollReply']
counter = 0
port = STANDARD_PORT
short_name = 'python-artnet'
long_name = 'https://github.com/philchristensen/python-artnet.git'
style = STYLE_CODES['StController']
esta_manufacturer = 'PA'
version = 1
universe = 0
status1 = 2
status2 = bitstring.Bits('0b0111').int
num_ports = 0
port_types = '\0\0\0\0'
good_input = '\0\0\0\0'
good_output = '\0\0\0\0'
bind_ip = '\0\0\0\0'
mac_address = uuid.getnode()
schema = (
('header', 'bytes:8'),
('opcode', 'int:16'),
('ip_address', 'bytes:4'),
('port', 'int:16'),
('version', 'uintbe:16'),
('net_switch', 'int:8'),
('sub_switch', 'int:8'),
('oem', 'uintbe:16'),
('ubea_version', 'int:8'),
('status1', 'int:8'),
('esta_manufacturer', 'bytes:2'),
('short_name', 'bytes:18'),
('long_name', 'bytes:64'),
('node_report', 'bytes:64'),
('num_ports', 'uintbe:16'),
('port_types', 'bytes:4'),
('good_input', 'bytes:4'),
('good_output', 'bytes:4'),
('switch_in', 'int:8'),
('switch_out', 'int:8'),
('switch_video', 'int:8'),
('switch_macro', 'int:8'),
('switch_remote', 'int:8'),
('spare1', 'int:8'),
('spare2', 'int:8'),
('spare3', 'int:8'),
('style', 'int:8'),
('mac_address', 'uintle:48'),
('bind_ip', 'bytes:4'),
('bind_index', 'int:8'),
('status2', 'int:8'),
('filler', 'bytes')
)
def __init__(self, **kwargs):
super(PollReplyPacket, self).__init__(**kwargs)
PollReplyPacket.counter += 1
def format_ip_address(self):
address = socket.gethostbyname(socket.gethostname())
return bitstring.pack('uint:8, uint:8, uint:8, uint:8', *[int(x) for x in address.split('.')]).bytes
@classmethod
def parse_ip_address(cls, b, fmt):
b = bitstring.BitStream(bytes=b.read(fmt))
address = b.readlist(','.join(['uint:8'] * 4))
return '.'.join([str(x) for x in address])
def format_short_name(self):
return self.short_name[0:18].ljust(18)
@classmethod
def parse_short_name(cls, b, fmt):
short_name = b.read(fmt)
return short_name.strip()
def format_long_name(self):
return self.long_name[0:64].ljust(64)
@classmethod
def parse_long_name(cls, b, fmt):
long_name = b.read(fmt)
return long_name.strip()
def format_node_report(self):
node_report = "#0001 [%s] Power On Tests successful" % PollReplyPacket.counter
return node_report[0:64].ljust(64)
@classmethod
def parse_node_report(cls, b, fmt):
node_report = b.read(fmt)
return node_report.strip()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment