Voor een hobby projectje waar ik mee bezig ben, heb ik een multicast server/client nodig, om state-updates rond te sturen (een array met hinfo dictionary's, zie de code). Nu heb ik aan de hand van PyZeroConf een multicast client/server gemaakt (daar de documentatie over python icm multicast nogal summier is). Het server gedeelte werkt naar behoren (te zien aan de tcpdump output hieronder). Echter krijg ik in de client helemaal niets binnen.. Zie ik nu wat over het hoofd? Mis ik een stukje kennis over multicast / igmp waardoor dit niet werkt? Is m'n code te brak voor woorden waardoor het niet werkt? Hebben jullie meer info nodig? Erm, help 
De code werkt als volgt:
Op het moment dat je de code opstart met de "msd" parameter, dan wordt als eerste een thread gestart, waarin een multicast socket geopent wordt. Hierna wordt een array aangemaakt, welke hinfo dicts bevat. Deze worden mvb cPickle ge-serialized en vervolgens, iedere seconde, verstuurd via de multicast socket.
De client wordt gestart door een willekeurige andere parameter mee te geven. Ook hier wordt een thread gestart, welke een multicast socket opent. Als client wordt echter ook een class(McastBroker) opgestart, welke dmv "select" alle geregistreerde sockets in de gaten houd, en een andere class(in dit geval MasterListener) start en de handle_read() call uitvoerd op het moment dat er data ontvangen wordt.
Als eerste de code:
Ik heb verschillende test scenario's uitgeprobeerd, en deze met tcpdump bekeken. Hieronder de verschillende scenario's:
- starten van de server:
- starten van de client:
- tcpdump output van dataverkeer met intf="127.0.0.1":
- tcpdump output van dataverkeer met intf="172.16.1.20"
De code werkt als volgt:
Op het moment dat je de code opstart met de "msd" parameter, dan wordt als eerste een thread gestart, waarin een multicast socket geopent wordt. Hierna wordt een array aangemaakt, welke hinfo dicts bevat. Deze worden mvb cPickle ge-serialized en vervolgens, iedere seconde, verstuurd via de multicast socket.
De client wordt gestart door een willekeurige andere parameter mee te geven. Ook hier wordt een thread gestart, welke een multicast socket opent. Als client wordt echter ook een class(McastBroker) opgestart, welke dmv "select" alle geregistreerde sockets in de gaten houd, en een andere class(in dit geval MasterListener) start en de handle_read() call uitvoerd op het moment dat er data ontvangen wordt.
Als eerste de code:
code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
| #!/usr/bin/env python
from socket import socket
from socket import inet_aton
from socket import gethostname
from socket import gethostbyname
from socket import AF_INET
from socket import SOCK_DGRAM
from socket import SOL_SOCKET
from socket import SO_REUSEADDR
from socket import SOL_IP
from socket import IP_MULTICAST_IF
from socket import IP_MULTICAST_TTL
from socket import IP_MULTICAST_LOOP
from socket import IP_ADD_MEMBERSHIP
from socket import IP_DROP_MEMBERSHIP
from cPickle import dumps
from threading import Thread
from threading import Condition
from time import sleep
from traceback import print_exc
from sys import argv
from select import select
globals()['_GLOBAL_DONE'] = 0
class McastBroker(Thread):
def __init__(self, masterclient):
Thread.__init__(self)
self.masterclient = masterclient
self.readers = {} # maps socket to reader
self.timeout = 5
self.condition = Condition()
self.setDaemon(True)
self.start()
def run(self):
while not globals()['_GLOBAL_DONE']:
rs = self.getReaders()
print "len(rs) == %s" % (len(rs))
if len(rs) == 0:
self.condition.acquire()
self.condition.wait(self.timeout)
self.condition.release()
else:
#try:
print "before select: rs=%s, readers=%s" % (rs, self.readers)
rr, wr, er = select(rs, [], [], self.timeout)
print "after select: rr=%s, wr=%s, er=%s" % (rr, wr, er)
for socket in rr:
#try:
print "got read request"
self.readers[socket].handle_read()
#except:
# traceback.print_exc()
#except:
# pass
def getReaders(self):
result = []
self.condition.acquire()
result = self.readers.keys()
self.condition.release()
return result
def addReader(self, reader, socket):
self.condition.acquire()
self.readers[socket] = reader
self.condition.notify()
self.condition.release()
def delReader(self, socket):
self.condition.acquire()
del(self.readers[socket])
self.condition.notify()
self.condition.release()
def notify(self):
self.condition.acquire()
self.condition.notify()
self.condition.release()
class MasterListener(object):
def __init__(self, masterclient):
self.masterclient = masterclient
self.masterclient.mcastbroker.addReader(self, self.masterclient.msoc)
def handle_read(self):
print "data received"
data, (addr, port) = self.masterclient.msoc.recvfrom(8972)
print "data received from %s:%s" % (addr, port)
print "data = %s" % (data)
class MasterClient(Thread):
def __init__(self, intf, mcast_addr, mcast_port, hinfo, type):
Thread.__init__(self)
self.__intf = intf
self.__addr = mcast_addr
self.__port = mcast_port
self.__type = type
self.__group = ('', self.__port)
self.__setup_socket()
self.masters = {}
self.masters["msd"] = []
self.masters["msd"].append(hinfo)
if type != "msd":
self.mcastbroker = McastBroker(self)
self.listener = MasterListener(self)
self.condition = Condition()
self.setDaemon(True)
self.start()
def __setup_socket(self):
print "setting up socket"
self.msoc = socket(AF_INET, SOCK_DGRAM)
try:
print "setting REUSE flags"
self.msoc.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
self.msoc.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1)
except:
pass
print "setting MULTICAST options"
self.msoc.setsockopt(SOL_IP, IP_MULTICAST_TTL, 255)
self.msoc.setsockopt(SOL_IP, IP_MULTICAST_LOOP, 1)
try:
print "binding to %s:%s" % (self.__addr, self.__port)
self.msoc.bind(self.group)
except:
pass
print "joining multicast group"
self.msoc.setsockopt(SOL_IP, IP_MULTICAST_IF, inet_aton(self.__intf) +\
inet_aton('0.0.0.0'))
self.msoc.setsockopt(SOL_IP, IP_ADD_MEMBERSHIP, inet_aton(self.__addr)\
+ inet_aton('0.0.0.0'))
print "socket setup"
def __destroy_socket(self):
self.notifyAll()
self.mcastbroker.notify()
self.msoc.setsockopt(SOL_IP, IP_DROP_MEMBERSHIP, inet_aton(self.__port)\
+ inet_aton('0.0.0.0'))
self.msoc.close()
def notifyAll(self):
self.condition.acquire()
self.condition.notifyAll()
self.condition.release()
def send(self, out):
try:
bytes_sent = self.msoc.sendto(out, 0, (self.__addr, self.__port))
print "%s bytes sent to %s:%s" % \
(bytes_sent, self.__addr, self.__port)
except:
print "can't send packet"
def run(self):
while not globals()['_GLOBAL_DONE']:
if self.__type == "msd":
CMD_STR = "%s:%s" % (1, dumps((self.masters), 1))
self.send(CMD_STR)
sleep(1.0)
else:
print "listening"
sleep(1.0)
hinfo = {}
hinfo["hostname"] = gethostname()
hinfo["role"] = 0
hinfo["user"] = "msd_macross"
hinfo["pass"] = "*****"
hinfo["bindto_ip"] = '127.0.0.1'
hinfo["bindto_port"] = 12485
hinfo["global_debug"] = 1
intf = "172.16.1.20"
mcast_addr = "224.0.0.251"
mcast_port = 12485
if __name__ == "__main__":
type = argv[1]
masterclient = MasterClient(intf, mcast_addr, mcast_port, hinfo, type)
tmp = raw_input("running, press a key to quit\n") |
Ik heb verschillende test scenario's uitgeprobeerd, en deze met tcpdump bekeken. Hieronder de verschillende scenario's:
- starten van de server:
code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| r3boot@macross:~/Documents/Projects/ad$ ./mcast.py msd setting up socket setting REUSE flags setting MULTICAST options binding to 224.0.0.251:12485 joining multicast group socket setup running, press a key to quit 162 bytes sent to 224.0.0.251:12485 162 bytes sent to 224.0.0.251:12485 162 bytes sent to 224.0.0.251:12485 162 bytes sent to 224.0.0.251:12485 162 bytes sent to 224.0.0.251:12485 r3boot@macross:~/Documents/Projects/ad$ |
- starten van de client:
code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
| r3boot@macross:~/Documents/Projects/ad$ ./mcast.py client setting up socket setting REUSE flags setting MULTICAST options binding to 224.0.0.251:12485 joining multicast group socket setup running, press a key to quit listening listening listening listening listening r3boot@macross:~/Documents/Projects/ad$ |
- tcpdump output van dataverkeer met intf="127.0.0.1":
code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| r3boot@macross:~$ netstat -rn | grep "224" 224.0.0.0 127.0.0.1 255.0.0.0 UG 0 0 0 lo r3boot@macross:~$ sudo tcpdump -ni lo host 224.0.0.251 or igmp tcpdump: verbose output suppressed, use -v or -vv for full protocol decode listening on lo, link-type EN10MB (Ethernet), capture size 96 bytes 14:01:00.845175 IP 172.16.1.20 > 224.0.0.22: igmp v3 report, 1 group record(s) 14:01:00.905130 IP 127.0.0.1.32873 > 224.0.0.251.12485: UDP, length: 162 14:01:01.969293 IP 127.0.0.1.32873 > 224.0.0.251.12485: UDP, length: 162 14:01:02.969126 IP 127.0.0.1.32873 > 224.0.0.251.12485: UDP, length: 162 14:01:03.968990 IP 127.0.0.1.32873 > 224.0.0.251.12485: UDP, length: 162 14:01:04.968836 IP 127.0.0.1.32873 > 224.0.0.251.12485: UDP, length: 162 14:01:05.952378 IP 172.16.1.20 > 224.0.0.22: igmp v3 report, 1 group record(s) 7 packets captured 14 packets received by filter 0 packets dropped by kernel |
- tcpdump output van dataverkeer met intf="172.16.1.20"
code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| r3boot@macross:~$ netstat -rn | grep "224" 224.0.0.0 172.16.1.20 255.0.0.0 UG 0 0 0 eth0 r3boot@macross:~$ sudo tcpdump -ni eth0 host 224.0.0.251 or igmp tcpdump: verbose output suppressed, use -v or -vv for full protocol decode listening on eth0, link-type EN10MB (Ethernet), capture size 96 bytes 14:09:27.771518 IP 172.16.1.20 > 224.0.0.22: igmp v3 report, 1 group record(s) 14:09:27.843303 IP 172.16.1.20.32876 > 224.0.0.251.12485: UDP, length: 162 14:09:28.909107 IP 172.16.1.20.32876 > 224.0.0.251.12485: UDP, length: 162 14:09:29.908966 IP 172.16.1.20.32876 > 224.0.0.251.12485: UDP, length: 162 14:09:30.908812 IP 172.16.1.20.32876 > 224.0.0.251.12485: UDP, length: 162 14:09:31.908672 IP 172.16.1.20.32876 > 224.0.0.251.12485: UDP, length: 162 14:09:32.261247 IP 172.16.1.20 > 224.0.0.22: igmp v3 report, 1 group record(s) 7 packets captured 7 packets received by filter 0 packets dropped by kernel r3boot@macross:~$ |
offtopic:
Hmm, m'n eerste /14 topic,... be gentle
Hmm, m'n eerste /14 topic,... be gentle
[ Voor 7% gewijzigd door Verwijderd op 20-05-2005 14:30 ]