[java] thread onderbreken tijdens I/O

Pagina: 1
Acties:

  • Johnny
  • Registratie: December 2001
  • Laatst online: 22-05 10:01

Johnny

ondergewaardeerde internetguru

Topicstarter
Ik ben bezig met een java applicatie die meerdere threads gebruikt. De meeste threads voeren om de zoveel tijd een actie uit, maar liggen voor het grootste deel van de tijd te slapen.

Een thread, werkt een lijst met internetadressen af waarmee hij een verbinding probeert te maken, het grootste gedeelte van de tijd is hij bezig met het wachten op antwoord en het ontvangen van data.

Om het te stoppen probeer ik egbruik te maken van intterupt(). Deze methode zou alle slapende/wachtende threads een schop moeten geven waardoor ze verder gaan, ook wordt er een vlag (flag) gehesen, waarmee met behlup van isIntterupted() kan worden gekeken of de thread moet stoppen of niet.

Het probleem is echter dat intterupt() wordt genegereerd tijdens I/O operaties waardoor het programma gewoon blijft lopen.

In een ander topic stond hier over het volgende:
What if a thread doesn't respond to Thread.interrupt?

In some cases, you can use application specific tricks. For example, if a thread is waiting on a known socket, you can close the socket to cause the thread to return immediately. Unfortunately, there really isn't any technique that works in general. It should be noted that in all situations where a waiting thread doesn't respond to Thread.interrupt, it wouldn't respond to Thread.stop either. Such cases include deliberate denial-of-service attacks, and I/O operations for which thread.stop and thread.interrupt do not work properly.
Het verbreken van de verbinding is echter niet mijn bedoeling. De thread mag gewoon de verbinding afwachten en netjes afsluiten, als hij maar niet weer een nieuwe opent.

IK heb al wat gevonden over InterruptedIOException, maar als ik die probeer krijg ik de melding dat die nooit kan worden gegooit (InterruptedIOException is never thrown), waarschijnlijk omdat er nergens een sleep/wait staat.


Pseudo code:
Java:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class ControlClass {

    if(startCommand) {
        foobar1Thread.start();
        foobar2Thread.start();
    }

    if(stopCommand) {
        foobar1Thread.interrupt();
        toobar2Thread.interrupt();
    }

}

//stopt wel goed
class foobar1 implements Runnable {

  private Thread myThread = Thread.currentThread();

    public void run() {

        try {
            while(!myThread.isInterrupted()) {

            //ga een uurtje slapen als er niets meer te doen is
            if(!doeIets()) {
                int sleepTime = 3600000;
                Thread.sleep(sleepTime);
            }
            }
        } catch(InterruptedException e){
            System.out.println("Thread Error: " + e);
        }
    }   

    private boolean doeIets() {
        //blabla
    }

}

//stopt niet
class foobar2 implements Runnable {

    private Thread myThread = Thread.currentThread();

    public void run() {

        try {
            while(!myThread.isInterrupted()) {

            //ga een uurtje slapen als er niets meer te doen is
            if(!doeEenLoop()) {
                int sleepTime = 3600000;
                Thread.sleep(sleepTime);
            }
            }
        } catch(InterruptedException e){
            System.out.println("Thread Error: " + e);
        }
    }   

    //blijft doorgaan!!! :(
    private void doeEenLoop() {

        while(dataInDB && !myThread.interrupted()) {
            openConnectie();
        }
    }

    private void openConnectie() {

        //open socket, I/O block, lang wachten enzo
    }

}


Hoe zorg ik dat als een intterupt() wordt gedaan terwijl een connectie open is hij na het sluiten van het socket niet meer verder gaat?

Aan de inhoud van de bovenstaande tekst kunnen geen rechten worden ontleend, tenzij dit expliciet in dit bericht is verwoord.


  • Voutloos
  • Registratie: Januari 2002
  • Niet online
Klopt het wel dat de while van de methode doeEenLoop() interrupted() gebruikt ipv isInterrupted()?

Beiden bestaan, maar interrupted() haalt de vlag na de 1x weg.
Tests whether the current thread has been interrupted. The interrupted status of the thread is cleared by this method.
NB: ik ben geen thread-expert, het verschil viel me alleen op... O-)
NB2: doeEenLoop returnt geen boolean, maar dat zal wel een knip/plak foutje in de pseudocode zijn.

[ Voor 10% gewijzigd door Voutloos op 08-07-2004 08:46 ]

{signature}


Verwijderd

Is het mogelijk om je IO asynchroon te doen?
Een thread die asynchrone IO doet echt bijna nooit wat en kun je dus ook interrupteren.
Het werkt zo:
Je zet een IO-rqst uit, en die functiecall komt direct terug.
Je object in kwestie heeft een callback-handler (aka eventhandler) method. Die method is bekend aan de IO-module.
De IO module roept jouw event handler aan als er iets te melden is.

Belangrijkste is dus: Je thread hangt niet terwijl hij op antwoord wacht.

  • Macros
  • Registratie: Februari 2000
  • Laatst online: 30-04 09:28

Macros

I'm watching...

Je moet je IO thread gewoon een var laten checken of hij moet stoppen ja of nee. Als je de methodes om die var te controleren en te setten syncronised maakt gaat alles nog goed ook. Dan kan je in een andere thread (de main thread) die var op false zetten (of true, whatever je kiest). En als je IO thread klaar is met de socket kijkt ie naar die var voordat ie verder gaat.

Java:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private boolean continue = false;
syncronized public void stop(){
  continue = false;
}
syncronized public void resume(){
  // misschien wil je deze ook echt de thread laten starten...
  continue = true;
}

public void loopingIO(){
  resume();
  while(continue){
    //do my thing
  }
}

"Beauty is the ultimate defence against complexity." David Gelernter


  • frice
  • Registratie: Januari 2001
  • Laatst online: 01-04 17:55
Een interrupt naar een thread sturen is eigenlijk bedoeld als een soort nood-mechanisme. Een elegantere manier is om een wait(..) te doen. Een notify() naar de thread sturen stopt dan de wait.

Als je wilt dat IO niet te lang duurt moet je een timeout zetten. Als je bv van een socket wilt lezen en niet wilt dat er langer dan drie seconden gewacht wordt op input, doe dan socket.setSoTimeout(3000); en hij throwt een SocketTimeoutException als het te lang duurt. Deze timout werkt bij een read() op een Socket en een accept() op een ServerSocket.

Als je wilt dat het connecten van een socket een beperkte tijd duurt, moet je de socket iets anders openen. Ipv new Socket(url, port); maak je eerst een unbound socket: socket=new Socket(); waarna je de socket connect: socket.connect(address, timeoutMs); waar address een SocketAddress is wat het poortnr ook bevat.

Succes

Verwijderd

Interrupt werkt niet op een thread die blocked staat in een IO request. (staat netjes in de API)

Het werkt alleen maar als je een IO request doet op een interruptable channel of als de thread blocked staat in een Selector.select()

Maar als je dus gewoon een read doet van een inputstream (zonder channels) en je roept interrupt aan... dan gebeurd er dus niets.

  • Johnny
  • Registratie: December 2001
  • Laatst online: 22-05 10:01

Johnny

ondergewaardeerde internetguru

Topicstarter
Voutloos schreef op 08 juli 2004 @ 08:44:
Klopt het wel dat de while van de methode doeEenLoop() interrupted() gebruikt ipv isInterrupted()?

Beiden bestaan, maar interrupted() haalt de vlag na de 1x weg.

[...]

NB: ik ben geen thread-expert, het verschil viel me alleen op... O-)
NB2: doeEenLoop returnt geen boolean, maar dat zal wel een knip/plak foutje in de pseudocode zijn.
Dat is inderdaad een foutje inde pseudo-code, net als dat intterupted()
Verwijderd schreef op 08 juli 2004 @ 08:55:
Is het mogelijk om je IO asynchroon te doen?
Een thread die asynchrone IO doet echt bijna nooit wat en kun je dus ook interrupteren.
Het werkt zo:
Je zet een IO-rqst uit, en die functiecall komt direct terug.
Je object in kwestie heeft een callback-handler (aka eventhandler) method. Die method is bekend aan de IO-module.
De IO module roept jouw event handler aan als er iets te melden is.

Belangrijkste is dus: Je thread hangt niet terwijl hij op antwoord wacht.
Ik snap je wel ongeveer maar niet helemaal.

Ik heb nergens synchronized gebruikt, dat impliceert dan toch dat het al asynchroom is?

Ik zou dus de klasse foobar2 een callbackhandler moeten geven, deze werkt net als een eventhandler en wordt pas aangeroepen wanneer de ontvangen data is verwerkt. Zodra dat gebeurt weet ik zeker dat de connecties is beindigd, en controleer ik of de thread moet worden onderbroken.

Ik heb alleen geen flauw idee hoe ik dat zou moeten implementeren.
Macros schreef op 08 juli 2004 @ 09:23:
Je moet je IO thread gewoon een var laten checken of hij moet stoppen ja of nee. Als je de methodes om die var te controleren en te setten syncronised maakt gaat alles nog goed ook. Dan kan je in een andere thread (de main thread) die var op false zetten (of true, whatever je kiest). En als je IO thread klaar is met de socket kijkt ie naar die var voordat ie verder gaat.

Java:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private boolean continue = false;
syncronized public void stop(){
  continue = false;
}
syncronized public void resume(){
  // misschien wil je deze ook echt de thread laten starten...
  continue = true;
}

public void loopingIO(){
  resume();
  while(continue){
    //do my thing
  }
}
Die variable is nu toch gewoon isInterrupted()?

Dat syncronized klinkt wel interressant, daarmee zou ik dus tijdens de IO toch nog die methoden kunnen benaderen om die variable/ isInterrupted() in te stellen zodat ze niet geblockt worden?

Maar stop() mag je toch niet meer gebruiken omdat hij onveilig is?
frice schreef op 08 juli 2004 @ 10:05:
Een interrupt naar een thread sturen is eigenlijk bedoeld als een soort nood-mechanisme. Een elegantere manier is om een wait(..) te doen. Een notify() naar de thread sturen stopt dan de wait.
Dat zou dan als vervanging van sleep() moeten zijn? Maar er is eigenlijk niets waarop moet worden gewacht, om de zoveel tijd moet er een functie worden uitgevoerd, er is geen gebruikersinteractie, het enige moment waarop een thread/sleep wordt onderbroken is als het programma wordt afgesloten, wat (hopelijk) niet te vaak voor zal komen.
Als je wilt dat IO niet te lang duurt moet je een timeout zetten. Als je bv van een socket wilt lezen en niet wilt dat er langer dan drie seconden gewacht wordt op input, doe dan socket.setSoTimeout(3000); en hij throwt een SocketTimeoutException als het te lang duurt. Deze timout werkt bij een read() op een Socket en een accept() op een ServerSocket.
Dat heb ik ook gedaan, juist om die reden maakik gebruik van een socket en niet van URLConnection.
Verwijderd schreef op 08 juli 2004 @ 11:28:
Interrupt werkt niet op een thread die blocked staat in een IO request. (staat netjes in de API)

Het werkt alleen maar als je een IO request doet op een interruptable channel of als de thread blocked staat in een Selector.select()

Maar als je dus gewoon een read doet van een inputstream (zonder channels) en je roept interrupt aan... dan gebeurd er dus niets.
Dat hij geblocked wordt was ik ook al achter. Maar wat ik uit de documentatie van InterruptibleChannel begrijp is dat het nogal hardhandig de verbinding verbreekt en een ClosedByInterruptException gooit, dat lijkt me niet een erg elegante manier om een programma af te sluiten.

Aan de inhoud van de bovenstaande tekst kunnen geen rechten worden ontleend, tenzij dit expliciet in dit bericht is verwoord.


  • Macros
  • Registratie: Februari 2000
  • Laatst online: 30-04 09:28

Macros

I'm watching...

Johnny schreef op 08 juli 2004 @ 14:56:
Ik heb nergens synchronized gebruikt, dat impliceert dan toch dat het al asynchroom is?
Dat syncronized klinkt wel interressant, daarmee zou ik dus tijdens de IO toch nog die methoden kunnen benaderen om die variable/ isInterrupted() in te stellen zodat ze niet geblockt worden?

Maar stop() mag je toch niet meer gebruiken omdat hij onveilig is?
syncronized als keyword bij een methode betekend dat Java ervoor zorgt dat altijd maar 1 thread tegelijk in die methode komt.

Je mag de stop methode van Thread niet meer gebruiken, maar dit is een eigen stop. De methode die ik aangeef wordt ook uitgelegd in de Java documentatie als alternatief voor stop().

"Beauty is the ultimate defence against complexity." David Gelernter


  • Johnny
  • Registratie: December 2001
  • Laatst online: 22-05 10:01

Johnny

ondergewaardeerde internetguru

Topicstarter
Macros schreef op 08 juli 2004 @ 20:01:
[...]

syncronized als keyword bij een methode betekend dat Java ervoor zorgt dat altijd maar 1 thread tegelijk in die methode komt.

Je mag de stop methode van Thread niet meer gebruiken, maar dit is een eigen stop. De methode die ik aangeef wordt ook uitgelegd in de Java documentatie als alternatief voor stop().
Zo dus ongeveer? (hij stopt nog steeds niet)

Java:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class controlClass {

    Thread testThread;
    Foobar foobar = new Foobar();

    private void startThread() {
    testThread = new Thread(foobar);
    //zet de variable op true, en zorgt er voor dat deze thread toegang heeft tot de methode
    foobar.resume();
    //start het uitvoeren van de thread
    foobar.start();

private void stopThread() {
    testThread.stop();
    //foobar.stop(); werkt ook niet, de variable blijft op true staan.
}

}

class Foobar implements Runnable{

    private Thread myThread = Thread.currentThread();
    //variable, geen "continue" noemen, is een gereserveerd woord
    private boolean keepGoing = false;

  
    synchronized public void stop() {
    keepGoing = false;
    }
    
    synchronized public void resume() {
    // misschien wil je deze ook echt de thread laten starten...
        //dat doe ik nu toch?
    keepGoing = true;
    }

private void doedeIO() {
    //moet die hier?! Als ik hem weg haal maakt het ook weinig verschil
    resume();

    //geeft uiteraard altijd "true" terug
    System.out.println("keepGoing: " + keepGoing);


    while(dataInDB && keepGoing) {
       //allerlei IO
    }

}
    public void run() {
    
    try {
        while(keepGoing) {

        //slapen
        if(alleIOklaar) {
            int sleepTime = 3600;
            Thread.sleep(sleepTime);
        }
        }
    } catch(InterruptedException e){
        System.out.println("Thread Error: " + e);
    }
    }   

}

}

Aan de inhoud van de bovenstaande tekst kunnen geen rechten worden ontleend, tenzij dit expliciet in dit bericht is verwoord.


  • frice
  • Registratie: Januari 2001
  • Laatst online: 01-04 17:55
Hier een voorbeeldje dat je misschien verder helpt. Het start jouw 'werker' in een aparte thread, laat 'm een aantal seconden lopen en stopt hem dan weer.


Java:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class ControlClass {
    boolean running;
    boolean stop;
    Thread runner;
    
    public ControlClass() {
        running = false;
    }
    
    /** Do 'some work' in a seperate thread until stop() is called.
      */
    public void start() {
        if( running) return;
        stop = false;
        
        runner = new Thread( new Runnable() {
            public void run() {
                while( !stop ) {
                    // do some work
                    System.out.println("did some work");
                    
                    // sleep for a while
                    synchronized( this ) {
                        try {
                            wait( 1000 );
                        } catch(Exception ignored) {
                        }
                    }
                }
            }
        });
        runner.start();
    }
    
    /** Stop the thread calling doSomeWork()
      * @param wait  When true blocks until the thread to stop has finished
      */
    public void stop(boolean wait) {
        stop = true;
        synchronized( runner ) { runner.notify(); } // in case it is waiting
        runner.interrupt(); // to interrupt IO (dirty!)

        if( wait ) {
            try { runner.join(); } catch(Exception ignored) {}
        }
    }

    /** Start the control class, let it do some work, then stop it again. */
    public static void main(String[] args) {
        ControlClass c = new ControlClass();

        c.start();
        try { Thread.sleep( 5*1000 ); } catch(Exception ignored) {}
        c.stop( /*wait=*/true );

        System.out.println("finished");
    }
}

[ Voor 2% gewijzigd door frice op 09-07-2004 14:37 . Reden: was syntax highlighting vergeten ]


  • Johnny
  • Registratie: December 2001
  • Laatst online: 22-05 10:01

Johnny

ondergewaardeerde internetguru

Topicstarter
Ik heb getracht om de I/O in een aparte klasse/thread te zetten om te voorkomen dat hij geblokkeerd raakt.

Het is een beter ontwerp omdat ik de I/O klasse nu ook ergens anders kan hergebruiken, of meerdere threads I/O requests tegelijkertijd zou kunnen doen.

Afbeeldingslocatie: http://people.zeelandnet.nl/hoogstrate/ontwerp.png

Ik heb echter het idee dat ik iets fout do met de I/O thread, dat hij niet gestart wordt, of er toch voor zorgt dat de parent-thread ook geblocked wordt tijdens een I/O operatie.

Java:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
class controlClass {

    Thread testThread;
    Foobar foobar = new Foobar();

    private void startThread() {
        testThread = new Thread(foobar);
        //zet de variable op true, en zorgt er voor dat deze thread toegang heeft tot de methode
        foobar.resume();
        //start het uitvoeren van de thread
        foobar.start();
    }

    private void stopThread() {
        testThread.stop();
        //foobar.stop(); werkt ook niet, de variable blijft op true staan.
    }

}

//********************************************************

class Foobar implements Runnable{

    private Thread myThread = Thread.currentThread();
    private Thread IOKlasseThread;
    private IOKlasse IOKlasse;

    //variable, geen "continue" noemen, is een gereserveerd woord
    private boolean keepGoing = false;

    //start bij de contructor
    public FooBar() {
        keepGoing = true;
    }

    synchronized public void stop() {
        keepGoing = false;
    }
 

    private void doedeIO() {

        while(dataInDB && keepGoing) {
           
            
            IOKlasse = new IOKlasse();
            IOKlasseThread = new Thread(IOKlasse);

            //start de IO thread
            IOKlasseThread.start();
            
            //haal ik de data nu uit de thread of uit de klasse?!
            Reader r = IOKlasse.visit(url);

            //verwerk data uit r
        }

    }

    public void run() {
    
        try {
            while(keepGoing) {

                //slapen
                if(alleIOklaar) {
                    int sleepTime = 3600;
                    Thread.sleep(sleepTime);
                }
            }
        } catch(InterruptedException e){
            System.out.println("Thread Error: " + e);
        }
    }   

}

//********************************************************

public class IOKlasse implements Runnable {
    
    private Thread myThread = Thread.currentThread();
        
    public VisitGalleryPage() {
    }
    
    //geeft een Reader object terug
    public Reader visit(URL pageURL) {
    
        //IO blabla

        Reader r = new InputStreamReader(response.getInputStream());
        
        return r;
    }

    //moet hier niks in?
    public void run() {
    }
}

Aan de inhoud van de bovenstaande tekst kunnen geen rechten worden ontleend, tenzij dit expliciet in dit bericht is verwoord.


  • Johnny
  • Registratie: December 2001
  • Laatst online: 22-05 10:01

Johnny

ondergewaardeerde internetguru

Topicstarter
Ik heb weer wat wijzigingen doorgevoerd. Het ontwerp werkt nu helemaal, behalve het stoppen.

Er zijn nu drie aparte threads. De derde (rechts) zou nu alleen geblocked mogen worden door de IO.

Tijdens de IO moet de tweede gewoon afwachten, en ondertussen nog wel beschikbaar zijn voor interrupt().

Wanneer de intterupt() is ontvangen wordt de data nog verwerkt en daarna stopt de thread.

Afbeeldingslocatie: http://people.zeelandnet.nl/hoogstrate/ontwerp2.png

Het enige probleem is volgens mij dat tijdens het wachten op getReader() de tweede thread ook geblocked wordt. Hoe voorkom ik dat?

Java:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
class controlClass {

    Thread testThread;
    Foobar foobar = new Foobar();

    private void startThread() {
        testThread = new Thread(foobar);
    
        //start het uitvoeren van de thread
        foobar.start();
    }

    private void stopThread() {
        testThread.interrupt();
    }

}

//********************************************************

class Foobar implements Runnable{

    private Thread myThread = Thread.currentThread();
    private Thread IOKlasseThread;
    private IOKlasse IOKlasse;


    private void doedeIO() {

        while(dataInDB && !myThread.isInterrupted()) {
           
            //maak het IO object met de URL
            IOKlasse = new IOKlasse(URL);
            IOKlasseThread = new Thread(IOKlasse);

            //start de IO thread
            IOKlasseThread.start();
            
            //haal de data uit de synchronized methode zodra de data beschikbaar is
            Reader r = IOKlasse.getReader();

            //verwerk data uit r

        }

    }

    public void run() {
    
        try {
            while(!myThread.isInterrupted()) {

                //slapen
                if(alleIOklaar) {
                    int sleepTime = 3600;
                    Thread.sleep(sleepTime);
                }
            }
        } catch(InterruptedException e){
            System.out.println("Thread Error: " + e);
        }
    }   

}

//********************************************************

public class IOKlasse implements Runnable {
    
    private Thread myThread = Thread.currentThread();
    private reader remoteReader;
    private URL pageURL;
        
    public VisitGalleryPage(URL pageURL) {
        //zet de te bezoeken URL zodra het object wordt gemaakt
        this.pageURL = pageURL;
    }
    
    //geeft een Reader object terug
    private URL pageURL Reader visit() {

        remoteReader = null;

        //haal de data op
        remoteReader = new InputStreamReader(response.getInputStream());
        
    }


//geeft de Reader terug zodra hij niet meer "null" is.
    public synchronized Reader getReader() {
    
        while(remoteReader == null) {
         
            try {
                wait();
            } catch (InterruptedException e) {
                System.out.println("InterruptedException: " + e);
            }
        }
    
    return remoteReader;
    }

    //start de IO zodra de thread wordt gestart
    public void run() {
        visit();
    }
}

Aan de inhoud van de bovenstaande tekst kunnen geen rechten worden ontleend, tenzij dit expliciet in dit bericht is verwoord.

Pagina: 1