Toon posts:

[python] multicast client ontvangt geen data

Pagina: 1
Acties:

Verwijderd

Topicstarter
Voor een hobby projectje waar ik mee bezig ben, heb ik een multicast server/client nodig, om state-updates rond te sturen (een array met hinfo dictionary's, zie de code). Nu heb ik aan de hand van PyZeroConf een multicast client/server gemaakt (daar de documentatie over python icm multicast nogal summier is). Het server gedeelte werkt naar behoren (te zien aan de tcpdump output hieronder). Echter krijg ik in de client helemaal niets binnen.. Zie ik nu wat over het hoofd? Mis ik een stukje kennis over multicast / igmp waardoor dit niet werkt? Is m'n code te brak voor woorden waardoor het niet werkt? Hebben jullie meer info nodig? Erm, help :+

De code werkt als volgt:
Op het moment dat je de code opstart met de "msd" parameter, dan wordt als eerste een thread gestart, waarin een multicast socket geopent wordt. Hierna wordt een array aangemaakt, welke hinfo dicts bevat. Deze worden mvb cPickle ge-serialized en vervolgens, iedere seconde, verstuurd via de multicast socket.

De client wordt gestart door een willekeurige andere parameter mee te geven. Ook hier wordt een thread gestart, welke een multicast socket opent. Als client wordt echter ook een class(McastBroker) opgestart, welke dmv "select" alle geregistreerde sockets in de gaten houd, en een andere class(in dit geval MasterListener) start en de handle_read() call uitvoerd op het moment dat er data ontvangen wordt.

Als eerste de code:
code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/env python

from socket import socket
from socket import inet_aton
from socket import gethostname
from socket import gethostbyname
from socket import AF_INET
from socket import SOCK_DGRAM
from socket import SOL_SOCKET
from socket import SO_REUSEADDR
from socket import SOL_IP
from socket import IP_MULTICAST_IF
from socket import IP_MULTICAST_TTL
from socket import IP_MULTICAST_LOOP
from socket import IP_ADD_MEMBERSHIP
from socket import IP_DROP_MEMBERSHIP

from cPickle    import dumps

from threading  import Thread
from threading  import Condition
from time   import sleep
from traceback  import print_exc
from sys    import argv
from select import select

globals()['_GLOBAL_DONE'] = 0

class McastBroker(Thread):
    def __init__(self, masterclient):
        Thread.__init__(self)
        self.masterclient = masterclient
        self.readers = {} # maps socket to reader
        self.timeout = 5
        self.condition = Condition()
        self.setDaemon(True)
        self.start()

    def run(self):
        while not globals()['_GLOBAL_DONE']:
            rs = self.getReaders()
            print "len(rs) == %s" % (len(rs))
            if len(rs) == 0:
                self.condition.acquire()
                self.condition.wait(self.timeout)
                self.condition.release()
            else:
                #try:
                print "before select: rs=%s, readers=%s" % (rs, self.readers)
                rr, wr, er = select(rs, [], [], self.timeout)
                print "after select: rr=%s, wr=%s, er=%s" % (rr, wr, er)
                for socket in rr:
                        #try:
                    print "got read request"
                    self.readers[socket].handle_read()
                        #except:
                        #   traceback.print_exc()
                #except:
                #   pass

    def getReaders(self):
        result = []
        self.condition.acquire()
        result = self.readers.keys()
        self.condition.release()
        return result

    def addReader(self, reader, socket):
        self.condition.acquire()
        self.readers[socket] = reader
        self.condition.notify()
        self.condition.release()

    def delReader(self, socket):
        self.condition.acquire()
        del(self.readers[socket])
        self.condition.notify()
        self.condition.release()

    def notify(self):
        self.condition.acquire()
        self.condition.notify()
        self.condition.release()

class MasterListener(object):
    def __init__(self, masterclient):
        self.masterclient = masterclient
        self.masterclient.mcastbroker.addReader(self, self.masterclient.msoc)
    def handle_read(self):
        print "data received"
        data, (addr, port) = self.masterclient.msoc.recvfrom(8972)
        print "data received from %s:%s" % (addr, port)
        print "data = %s" % (data)

class MasterClient(Thread):
    def __init__(self, intf, mcast_addr, mcast_port, hinfo, type):
        Thread.__init__(self)
                self.__intf = intf
        self.__addr = mcast_addr
        self.__port = mcast_port
        self.__type = type
        self.__group = ('', self.__port)
        self.__setup_socket()
        self.masters = {}
        self.masters["msd"] = []
        self.masters["msd"].append(hinfo)
        if type != "msd":
            self.mcastbroker = McastBroker(self)
            self.listener = MasterListener(self)
        self.condition = Condition()
        self.setDaemon(True)
        self.start()
    def __setup_socket(self):
        print "setting up socket"
        self.msoc = socket(AF_INET, SOCK_DGRAM)
        try:
            print "setting REUSE flags"
            self.msoc.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
            self.msoc.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1)
        except:
            pass
        print "setting MULTICAST options"
        self.msoc.setsockopt(SOL_IP, IP_MULTICAST_TTL, 255)
        self.msoc.setsockopt(SOL_IP, IP_MULTICAST_LOOP, 1)
    
        try:
            print "binding to %s:%s" % (self.__addr, self.__port)
            self.msoc.bind(self.group)
        except:
            pass

        print "joining multicast group"
        self.msoc.setsockopt(SOL_IP, IP_MULTICAST_IF, inet_aton(self.__intf) +\
                      inet_aton('0.0.0.0'))
        self.msoc.setsockopt(SOL_IP, IP_ADD_MEMBERSHIP, inet_aton(self.__addr)\
                      + inet_aton('0.0.0.0'))
        print "socket setup"
    def __destroy_socket(self):
        self.notifyAll()
        self.mcastbroker.notify()
        self.msoc.setsockopt(SOL_IP, IP_DROP_MEMBERSHIP, inet_aton(self.__port)\
                      + inet_aton('0.0.0.0'))
        self.msoc.close()
    def notifyAll(self):
        self.condition.acquire()
        self.condition.notifyAll()
        self.condition.release()
    def send(self, out):
        try:
            bytes_sent = self.msoc.sendto(out, 0, (self.__addr, self.__port))
            print "%s bytes sent to %s:%s" % \
                            (bytes_sent, self.__addr, self.__port)
        except:
            print "can't send packet"
    def run(self):
        while not globals()['_GLOBAL_DONE']:
            if self.__type == "msd":
                CMD_STR = "%s:%s" % (1, dumps((self.masters), 1))
                self.send(CMD_STR)
                sleep(1.0)
            else:
                print "listening"
                sleep(1.0)

hinfo = {}
hinfo["hostname"] = gethostname()
hinfo["role"] = 0
hinfo["user"] = "msd_macross"
hinfo["pass"] = "*****"
hinfo["bindto_ip"] = '127.0.0.1'
hinfo["bindto_port"] = 12485
hinfo["global_debug"] = 1

intf = "172.16.1.20"
mcast_addr = "224.0.0.251"
mcast_port = 12485

if __name__ == "__main__":
    type = argv[1]
    masterclient = MasterClient(intf, mcast_addr, mcast_port, hinfo, type)
    tmp = raw_input("running, press a key to quit\n")


Ik heb verschillende test scenario's uitgeprobeerd, en deze met tcpdump bekeken. Hieronder de verschillende scenario's:

- starten van de server:
code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
r3boot@macross:~/Documents/Projects/ad$ ./mcast.py msd
setting up socket
setting REUSE flags
setting MULTICAST options
binding to 224.0.0.251:12485
joining multicast group
socket setup
running, press a key to quit
162 bytes sent to 224.0.0.251:12485
162 bytes sent to 224.0.0.251:12485
162 bytes sent to 224.0.0.251:12485
162 bytes sent to 224.0.0.251:12485
162 bytes sent to 224.0.0.251:12485

r3boot@macross:~/Documents/Projects/ad$


- starten van de client:
code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
r3boot@macross:~/Documents/Projects/ad$ ./mcast.py client
setting up socket
setting REUSE flags
setting MULTICAST options
binding to 224.0.0.251:12485
joining multicast group
socket setup
running, press a key to quit
listening
listening
listening
listening
listening

r3boot@macross:~/Documents/Projects/ad$


- tcpdump output van dataverkeer met intf="127.0.0.1":
code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
r3boot@macross:~$ netstat -rn | grep "224"
224.0.0.0       127.0.0.1       255.0.0.0       UG        0 0          0 lo
r3boot@macross:~$ sudo tcpdump -ni lo host 224.0.0.251 or igmp
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on lo, link-type EN10MB (Ethernet), capture size 96 bytes
14:01:00.845175 IP 172.16.1.20 > 224.0.0.22: igmp v3 report, 1 group record(s)
14:01:00.905130 IP 127.0.0.1.32873 > 224.0.0.251.12485: UDP, length: 162
14:01:01.969293 IP 127.0.0.1.32873 > 224.0.0.251.12485: UDP, length: 162
14:01:02.969126 IP 127.0.0.1.32873 > 224.0.0.251.12485: UDP, length: 162
14:01:03.968990 IP 127.0.0.1.32873 > 224.0.0.251.12485: UDP, length: 162
14:01:04.968836 IP 127.0.0.1.32873 > 224.0.0.251.12485: UDP, length: 162
14:01:05.952378 IP 172.16.1.20 > 224.0.0.22: igmp v3 report, 1 group record(s)

7 packets captured
14 packets received by filter
0 packets dropped by kernel


- tcpdump output van dataverkeer met intf="172.16.1.20"
code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
r3boot@macross:~$ netstat -rn | grep "224"
224.0.0.0       172.16.1.20     255.0.0.0       UG        0 0          0 eth0
r3boot@macross:~$ sudo tcpdump -ni eth0 host 224.0.0.251 or igmp
tcpdump: verbose output suppressed, use -v or -vv for full protocol decode
listening on eth0, link-type EN10MB (Ethernet), capture size 96 bytes
14:09:27.771518 IP 172.16.1.20 > 224.0.0.22: igmp v3 report, 1 group record(s)
14:09:27.843303 IP 172.16.1.20.32876 > 224.0.0.251.12485: UDP, length: 162
14:09:28.909107 IP 172.16.1.20.32876 > 224.0.0.251.12485: UDP, length: 162
14:09:29.908966 IP 172.16.1.20.32876 > 224.0.0.251.12485: UDP, length: 162
14:09:30.908812 IP 172.16.1.20.32876 > 224.0.0.251.12485: UDP, length: 162
14:09:31.908672 IP 172.16.1.20.32876 > 224.0.0.251.12485: UDP, length: 162
14:09:32.261247 IP 172.16.1.20 > 224.0.0.22: igmp v3 report, 1 group record(s)

7 packets captured
7 packets received by filter
0 packets dropped by kernel
r3boot@macross:~$


offtopic:
Hmm, m'n eerste /14 topic,... be gentle :+

[ Voor 7% gewijzigd door Verwijderd op 20-05-2005 14:30 ]


  • Domokoen
  • Registratie: Januari 2003
  • Laatst online: 06-05 09:27
Ja je ziet iets over het hoofd (denk ik):
code:
1
2
3
4
5
6
7
8
9
    def run(self):
        while not globals()['_GLOBAL_DONE']:
            if self.__type == "msd":
                CMD_STR = "%s:%s" % (1, dumps((self.masters), 1))
                self.send(CMD_STR)
                sleep(1.0)
            else:
                print "listening"
                sleep(1.0)
edit:
Hmmm... je code is misschien gewoon erg onoverzichtelijk. Waarom moet je server en client per se dezelfde code hebben? Zet het gewoon in 2 modules, da's een stuk leesbaarder. Wat ik hieronder zeg klopt dan ook niet, omdat er blijkbaar nóg een andere thread ook draait met wat dingen. 8)7


Je print alleen listening... moet daar niet ook iets als receive bij in de client!? Want listening wordt wél gewoon geprint. Ik weet niet welke code je nog zou willen aanroepen, maar het gebeurd iig niet.

[ Voor 24% gewijzigd door Domokoen op 21-05-2005 00:47 . Reden: klopt niet wat ik zeg ]