[C#] Asynchronous Socket stream raakt corrupt door verzenden

Pagina: 1
Acties:

Onderwerpen


Acties:
  • 0 Henk 'm!

Verwijderd

Topicstarter
Ik heb een class geschreven die data over tcp/ip verstuurd naar een server.
Deze 'data packages' zijn als volgt opgebouwd:

[WORD] Packet groote
[PACKET]

Nu is het probleem dat verschillende threads tegelijkertijd Packages kunnen versturen waardoor de packages door elkaar gaan lopen:

[[WORD]Dit is [[WORD]Dit is package 2] package 1]

Ik heb al geprobeerd lock's toe te voegen maar dit heeft blijkbaar geen nut.

C#:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
    public class Server
    {
        private System.Net.Sockets.TcpClient pr_tcpClient;
        private System.Threading.Thread pr_parseThread;

        private bool pr_isConnected = false;
        private byte[] pr_buffer = new byte[5120];
        private string pr_parseBuffer = "";
        
        private System.Net.IPAddress pr_ipAddress;
        private int pr_portNumber;

        public delegate void connectedDelegate();
        public event connectedDelegate Connected;

        public delegate void disconnectedDelegate();
        public event disconnectedDelegate Disconnected;

        public delegate void packetReceivedDelegate(string Packet);
        public event packetReceivedDelegate PacketReceived;

        public bool isConnected
        {
            get { return pr_isConnected; }
        }
        public void Connect(string Hostname, int PortNumber)
        {
            pr_buffer = new byte[5120];
            pr_parseBuffer = "";
            System.Threading.Thread.Sleep(3000);
            pr_isConnected = false;
            System.Net.IPAddress[] _ipAddresses = System.Net.Dns.GetHostAddresses(Hostname);
            pr_ipAddress = null;
            foreach (System.Net.IPAddress _ipAddress in _ipAddresses)
            {
                if (_ipAddress.AddressFamily == System.Net.Sockets.AddressFamily.InterNetwork)
                {
                    pr_ipAddress = _ipAddress;
                    break;
                }
            }
            pr_portNumber = PortNumber;
            if (pr_ipAddress != null)
            {
                pr_tcpClient = new System.Net.Sockets.TcpClient();
                pr_tcpClient.BeginConnect(pr_ipAddress, pr_portNumber, new AsyncCallback(this.connectCallback), null);
            }
        }
        private void reConnect()
        {
            System.Threading.Thread.Sleep(3000);
            pr_tcpClient.BeginConnect(pr_ipAddress, pr_portNumber, new AsyncCallback(this.connectCallback), null);
        }
        public void Disconnect()
        {
            // Close ParseThread
            if (pr_parseThread != null)
            {
                pr_parseThread.Abort();
                pr_parseThread = null;
            }
            try
            {
                System.Net.Sockets.NetworkStream _networkStream = pr_tcpClient.GetStream();
                _networkStream.Flush();
            }
            catch { }
            Console.WriteLine("Connection closed");
            pr_tcpClient.Close();
            pr_isConnected = false;
            if (this.Disconnected != null) { this.Disconnected(); }
        }
      
        public void Send(string Packet)
        {
            try
            {
                lock (pr_tcpClient.GetStream())
                {
                    System.IO.StreamWriter _streamWriter = new System.IO.StreamWriter(pr_tcpClient.GetStream());
                    _streamWriter.Write(Packet);
                    _streamWriter.Flush();
                }
            }
            catch { }
        }
     
        private void connectCallback(IAsyncResult ar)
        {
            try
            {
                pr_tcpClient.EndConnect(ar);
                pr_isConnected = true;
                pr_parseThread = new System.Threading.Thread(new System.Threading.ThreadStart(this.parseThread));
                pr_parseThread.Start();
                if (this.Connected != null) { this.Connected(); }
                Console.WriteLine("Connected to server.");
                pr_tcpClient.GetStream().BeginRead(pr_buffer, 0, 5120, new AsyncCallback(this.receiveCallback), null);
            }
            catch (Exception Ex)
            {
                Console.WriteLine(Ex.Message);
                this.reConnect();
            }
        }
        private void receiveCallback(IAsyncResult ar)
        {
            int _bytesRead = 0;
            try
            {
                try
                {
                    lock (pr_tcpClient.GetStream())
                    {
                        _bytesRead = pr_tcpClient.GetStream().EndRead(ar);
                    }
                }
                catch { }
                if (_bytesRead < 1)
                {
                    try
                    {
                        System.Net.Sockets.NetworkStream _networkStream = pr_tcpClient.GetStream();
                        _networkStream.Flush();
                    }
                    catch { }
                    pr_tcpClient.Client.Shutdown(System.Net.Sockets.SocketShutdown.Both);
                    pr_tcpClient.Close();
                    pr_isConnected = false;
                    Console.WriteLine("Connection closed by server.");
                    if (this.Disconnected != null) { this.Disconnected(); }
                    return;
                }
                string _received = System.Text.Encoding.UTF8.GetString(pr_buffer, 0, _bytesRead);
                lock (pr_parseBuffer) { pr_parseBuffer += _received; }
                lock (pr_tcpClient.GetStream())
                {
                    pr_tcpClient.GetStream().BeginRead(pr_buffer, 0, 5120, new AsyncCallback(this.receiveCallback), null);
                }
            }
            catch { }
        }
        private void parseThread()
        {
            do
            {
                System.Threading.Thread.Sleep(100);
                if (pr_parseBuffer.Length > 1)
                {
                    int _packetLength = InstanceId.Routines.getWord(pr_parseBuffer.Substring(0, 2));
                    lock (pr_parseBuffer) { pr_parseBuffer = pr_parseBuffer.Remove(0, 2); }
                    do
                    {
                        System.Threading.Thread.Sleep(250);
                    } while (pr_parseBuffer.Length < _packetLength);
                    string _packet = pr_parseBuffer.Substring(0, _packetLength);
                    lock (pr_parseBuffer) { pr_parseBuffer = pr_parseBuffer.Remove(0, _packetLength); }
                    if (this.PacketReceived != null) { this.PacketReceived(packet); }
                }
            } while (pr_isConnected == true);
        }
    }

Acties:
  • 0 Henk 'm!

  • Xiphalon
  • Registratie: Juni 2001
  • Laatst online: 17-09 17:27
Kijk even op welke objecten je lock'ed. Dat zijn altijd objecten in de huidige thread. De overige threads zien dat dus niet, en dus is het locken ineffectief.
Laatmaar. Volgens mij moet je gewoon breder locken. Dus meer code in je lock { ... } blok.

[ Voor 23% gewijzigd door Xiphalon op 01-03-2010 15:59 ]


Acties:
  • 0 Henk 'm!

Verwijderd

Topicstarter
darkmage schreef op maandag 01 maart 2010 @ 15:57:
Kijk even op welke objecten je lock'ed. Dat zijn altijd objecten in de huidige thread. De overige threads zien dat dus niet, en dus is het locken ineffectief.
ok, dit is duidelijk, snap ik meteen waarom de locks geen nut hadden.
Maar ik heb verschillende threads draaien die events raisen wanneer er data verstuurd moet worden hoe kan ik er voor zorgen dat deze threads in de juiste thread data verzenden.

C#:
1
2
3
4
lock(server.send())
{
  server.send(data);
}


Heeft dus ook geen nut omdat elke event in een apparte thread draait.

Acties:
  • 0 Henk 'm!

  • Bozozo
  • Registratie: Januari 2005
  • Laatst online: 20-02 16:10

Bozozo

Your ad here?

Misschien kun je de architectuur beter aanpassen. Iets van een 'queue' thread waar de andere threads hun pakketjes aan doorgeven lijkt me geen overbodige luxe.

TabCinema : NiftySplit


Acties:
  • 0 Henk 'm!

  • urk_forever
  • Registratie: Juni 2001
  • Laatst online: 17-09 15:08
Verwijderd schreef op maandag 01 maart 2010 @ 16:05:
[...]


ok, dit is duidelijk, snap ik meteen waarom de locks geen nut hadden.
Maar ik heb verschillende threads draaien die events raisen wanneer er data verstuurd moet worden hoe kan ik er voor zorgen dat deze threads in de juiste thread data verzenden.

C#:
1
2
3
4
lock(server.send())
{
  server.send(data);
}


Heeft dus ook geen nut omdat elke event in een apparte thread draait.
Vziw is het beter om een lock object te maken wat je lockt ipv zoals je het nu doet. Voorbeeldje:

C#:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Server
    { 
        private object LockObject = new object();

        public void Send(string Packet)
        {
            lock(LockObject)
            {
                //Verstuur je data
                System.IO.StreamWriter _streamWriter = new System.IO.StreamWriter(pr_tcpClient.GetStream());
                    _streamWriter.Write(Packet);
                    _streamWriter.Flush(); 
            }
        }
    }

Hail to the king baby!


Acties:
  • 0 Henk 'm!

  • CodeCaster
  • Registratie: Juni 2003
  • Niet online

CodeCaster

Can I get uhm...

Locking is in deze situatie toch helemaal niet wenselijk, want dan moeten alle sockets alsnog op elkaar wachten. Dan kun je 'm net zo goed single-threaded uitvoeren?

https://oneerlijkewoz.nl
Op papier is hij aan het tekenen, maar in de praktijk...


Acties:
  • 0 Henk 'm!

Verwijderd

Topicstarter
CodeCaster schreef op maandag 01 maart 2010 @ 16:53:
Locking is in deze situatie toch helemaal niet wenselijk, want dan moeten alle sockets alsnog op elkaar wachten. Dan kun je 'm net zo goed single-threaded uitvoeren?
Hier heb je gelijk in.... hoewel ik het ontvangen van de data wel multithreaded wil... verzenden zou eigenlijk gewoon single-threaded moeten zijn.

Acties:
  • 0 Henk 'm!

Verwijderd

Topicstarter
urk_forever schreef op maandag 01 maart 2010 @ 16:49:
[...]


Vziw is het beter om een lock object te maken wat je lockt ipv zoals je het nu doet. Voorbeeldje:

C#:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Server
    { 
        private object LockObject = new object();

        public void Send(string Packet)
        {
            lock(LockObject)
            {
                //Verstuur je data
                System.IO.StreamWriter _streamWriter = new System.IO.StreamWriter(pr_tcpClient.GetStream());
                    _streamWriter.Write(Packet);
                    _streamWriter.Flush(); 
            }
        }
    }
Na het proberen op deze manier blijf ik nog steeds met hetzelfde probleem zitten....

Acties:
  • 0 Henk 'm!

  • Bozozo
  • Registratie: Januari 2005
  • Laatst online: 20-02 16:10

Bozozo

Your ad here?

Waarom reageer je niet op mijn suggestie?

De meest logische aanpak is om een wachtrij thread te maken, waaraan alle andere threads data leveren. Zodra er een item in de rij staat begin je te zenden. Als er nog een item in de rij komt te staan tijdens het zenden gebeurt er niets. Als het huidige item is verzonden (dan wordt er ongetwijfeld een event afgevuurd) kan het eerstvolgende item in de wachtrij de deur uit. Je hebt dan zelfs de mogelijkheid om meerdere items in de wachtrij te mergen, zodat je minder requests hoeft te doen.

TabCinema : NiftySplit


Acties:
  • 0 Henk 'm!

  • ThaStealth
  • Registratie: Oktober 2004
  • Laatst online: 11-09 10:19
Misschien maak je elke keer een nieuw Server object aan (dan wordt het lockobject ook opnieuw aangemaakt).

De beste oplossing is wat Bozozo omschrijft, een Queue<T> gebruiken. Als je het perse multithreading wil verzenden moet je meerdere sockets (=connecties) aanmaken ben ik bang

Mess with the best, die like the rest


Acties:
  • 0 Henk 'm!

  • pedorus
  • Registratie: Januari 2008
  • Niet online
Wat me opvalt is dat je GetStream() veels te vaak aanroept (zelfs in lock), zonder dat je deze sluit zoals staat aangegeven in de documentatie. Verder is die Thread.Sleep() (2x) een indicatie dat je niet helemaal begrijpt hoe je met meerdere threads om moet gaan. Ik zou zeggen, pak er eerst een boek of tutorial bij (voorbeeld). :p
offtopic:
Overigens vind ik die naamgeving met pr_ en _ niet handig - kleine beginletters horen private/local aan te geven, zie oa hier.
ThaStealth schreef op maandag 01 maart 2010 @ 18:27:
Als je het perse multithreading wil verzenden moet je meerdere sockets (=connecties) aanmaken ben ik bang
Op zich kun je prima een connectie delen over zeg meerdere rekenthreads.

Vitamine D tekorten in Nederland | Dodelijk coronaforum gesloten


Acties:
  • 0 Henk 'm!

  • Soultaker
  • Registratie: September 2000
  • Laatst online: 03:13
Het essentiële probleem zit waarschijnlijk in het gebruik van meerdere TcpStreams. Ik ben niet heel bekend met .NET, maar ik vermoed dat die niet uniek zijn, en je dus bij elke call naar GetStream een nieuwe krijgt. Het feit dat je 'm moet closen na gebruik suggereert dat ook (hoewel het dan nog wel één enkel reference-counted object zou kunnen zijn).

De oplossing is dan simpel: roep één keer GetStream aan, en zorg dat alle threads dezelfde stream gebruiken. Je kunt dan simpelweg het (enige) stream-object locken voor je er naar schrijft.

[ Voor 14% gewijzigd door Soultaker op 01-03-2010 19:40 ]


Acties:
  • 0 Henk 'm!

Verwijderd

Topicstarter
pedorus schreef op maandag 01 maart 2010 @ 18:46:
Wat me opvalt is dat je GetStream() veels te vaak aanroept (zelfs in lock), zonder dat je deze sluit zoals staat aangegeven in de documentatie. Verder is die Thread.Sleep() (2x) een indicatie dat je niet helemaal begrijpt hoe je met meerdere threads om moet gaan. Ik zou zeggen, pak er eerst een boek of tutorial bij (voorbeeld). :p
offtopic:
Overigens vind ik die naamgeving met pr_ en _ niet handig - kleine beginletters horen private/local aan te geven, zie oa hier.


[...]

Op zich kun je prima een connectie delen over zeg meerdere rekenthreads.
Soultaker schreef op maandag 01 maart 2010 @ 19:39:
Het essentiële probleem zit waarschijnlijk in het gebruik van meerdere TcpStreams. Ik ben niet heel bekend met .NET, maar ik vermoed dat die niet uniek zijn, en je dus bij elke call naar GetStream een nieuwe krijgt. Het feit dat je 'm moet closen na gebruik suggereert dat ook (hoewel het dan nog wel één enkel reference-counted object zou kunnen zijn).

De oplossing is dan simpel: roep één keer GetStream aan, en zorg dat alle threads dezelfde stream gebruiken. Je kunt dan simpelweg het (enige) stream-object locken voor je er naar schrijft.
probeem is opgelost, GetStream() maakte elke keer een nieuwe networkstream aan waardoor locken dus ook geen zin had. Nu word er bij het openen van de connectie 1x GetStream aangeroepen en word deze telkens opnieuw gebruikt. Locken werkt nu ook.

en de thread.sleep (2x) klopt iid niet, waarom ik dit gedaan heb weet ik niet... beetje stom
"Capitalization Styles" daar ga ik me in het vervolg aanhouden, bedankt voor de tip

[ Voor 4% gewijzigd door Verwijderd op 01-03-2010 20:48 ]


Acties:
  • 0 Henk 'm!

  • Hydra
  • Registratie: September 2000
  • Laatst online: 21-08 17:09
Verwijderd schreef op maandag 01 maart 2010 @ 20:40:
probeem is opgelost, GetStream() maakte elke keer een nieuwe networkstream aan waardoor locken dus ook geen zin had. Nu word er bij het openen van de connectie 1x GetStream aangeroepen en word deze telkens opnieuw gebruikt. Locken werkt nu ook.
FF een 'tip': over het algemeen gebruik je een monitor object als semafoor dat je aan je verschillende threads geeft, om twee redenen: je maakt expliciet dat je dat object als lock gebruikt (duidelijkere code), en je voorkomt bovenstaande probleem.

Verder is het een beetje een nono om zaken die je uit een API krijgt te gebruiken als semafoor omdat ze daar niet voor bedoeld zijn. Je hebt geen garanties over het uniek zijn van het object, locken op iets als Console.OpenStandardOutput() bijvoorbeeld gaat je problemen opleveren als andere threads binnen je VM dat object voor dezelfde zaken 'misbruiken'.

[ Voor 3% gewijzigd door Hydra op 02-03-2010 11:10 ]

https://niels.nu

Pagina: 1