[Java] WorkerManager met Workers en externe controle

Pagina: 1
Acties:

Acties:
  • 0 Henk 'm!

  • Spockz
  • Registratie: Augustus 2003
  • Laatst online: 10:08

Spockz

Live and Let Live

Topicstarter
Ik zit nu al een paar dagen met een irritant probleem. De situatie is als volgt: Er is een aantal Worker threads die berekeningen uitvoeren. Deze worden beheerd door de WorkerManager. De simulatie (want daar is het voor) gebeurd in rondes. Er zijn een aantal berekeningen die uitgevoerd moeten worden (gesimuleerd door reqReports). Als deze berekeningen 'op' zijn moeten de Workers wachten totdat er weer nieuwe berekeningen zijn. De nieuwe berekeningen worden weer klaargezet door de method WorkerManager.step. Deze step functie wordt extern, door de simulatieclass aangeroepen worden. Dit mag echter pas gebeuren zodra alle Workers klaar zijn met hun werk c.q. al het werk voor de ronde gedaan is.

Het eerste gedeelte, het laten wachten van de Workers todat er weer werk is, krijg ik voorelkaar. Het laten wachten van de simulatie op het voltooien van al het werk echter niet. Hieronder heb ik twee sets code, een waarbij er een sleep is ingebouwd voor elke step om, erg karig, de block te simuleren. Wat ik al gedaan heb om de steps te blocken:
  • Het gebruik van de eigen monitor om te wachten, dit werkt natuurlijk niet aangezien die ook gebruikt wordt om te wachten bij het dispensen.
  • Het toevoegen van een extra monitor, binnen de Manager, waarop gewait wordt als een step wordt aangeroepen, en waar dan een notify of notifyall (beiden geprobeerd) naar geroepen wordt op het moment dat de laatste Worker zijn werk inlevert.
  • Wat nachtelijk gepruts met semaphoren
Bovenstaande manieren leverden of deadlock op, of hadden geen effect... Hopelijk hebben jullie nog wat ideeën. In de code is het gebruik van de extra monitoren uitgecommentarieerd om aan te geven hoe ik het o.a. geprobeerd heb.

Java:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Simulation extends Thread {
    private WorkerManager wm;
    private boolean run;
    public Monitor stepMonitor; 
    
    public Simulation() {
        wm = new WorkerManager(this);
        run = true;
        stepMonitor = new Monitor();
    }
    
    public void run() {
        System.out.println("Starting simulation");
        wm.firstStep();
        System.out.println("Done with first step");
        while(run) {
            try { sleep(10000); } 
            catch (InterruptedException e) { e.printStackTrace(); }
//          stepMonitor.wacht();
            wm.step();
        }
        System.out.println("Ending simulation");
    }
}


Java:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class WorkerManager {
//  private Simulation simulation;
    private volatile int reqReports;
    private LinkedList<Worker> workers;
    
    public WorkerManager(Simulation aSimulation) {
        workers = new LinkedList<Worker>();
        int workerCount = 2;
        for (int i=0; i<workerCount; i++) {
            workers.add(new Worker(this));
        }
//      simulation = aSimulation;
        reqReports = 200;
    }
    
    public synchronized List<Object> dispense() {
        System.out.println("Dispensing");
        if (reqReports <= 0) {
            try {
                System.out.println("Waiting on new resources");
                wait();
            } catch (InterruptedException e) { e.printStackTrace(); }
        }
        
        List<Object> result = new LinkedList<Object>();
        
        return result;
    }
    
    public synchronized void reportIn(List<Object> list) {
        System.out.println(""+reqReports);
        --reqReports;
//      if (reqReports==0) {
//          System.out.println("Reached end of round. Notifying for next step.");
//          simulation.stepMonitor.verwittigAllen();
//      }
    }
    
    public synchronized void step() {
        System.out.println("Stepping");
        // do stuff
//      if (reqReports == 0) {
//          try { simulation.stepMonitor.wacht(); }
//          catch (InterruptedException e) { e.printStackTrace(); }
//      }
        
        
        // Reset conditions
        reqReports = 200;
        // Vertel de Workers dat ze weer verder kunnen
        notifyAll();
    }

    public synchronized void firstStep() {
        for (int i=0; i<workers.size(); i++) {
            workers.get(i).start();
        }
    }
}


Java:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class Worker extends Thread{
    private WorkerManager wm;
    
    public Worker(WorkerManager aWM) {
        wm = aWM;
    }
    
    public void run() {
        while (true) {
            List<Object> list = wm.dispense();
            wm.reportIn(list);
        }
    }
}


Deze monitor klasse is zodat ik niet zelf bij elke aanroep een lock op hoef te vragen.
Java:
1
2
3
4
5
6
7
8
9
10
11
12
13
public class Monitor {
    public synchronized void wacht() throws InterruptedException {
        wait();
    }
    
    public synchronized void verwittig() {
        notify();
    }
    
    public synchronized void verwittigAllen() {
        notifyAll();
    }
}


Ik hoop dat jullie kunnen zeggen of ik ergens een grote denkfout maak, of gewoon in compleet de verkeerde richting zit te denken en dat er een veel makkelijkere, robustere oplossing is. Want ik zie het even niet meer. :?

C'est le ton qui fait la musique. | Blog | @linkedin
R8 | 18-55 IS | 50mm 1.8 2 | 70-200 2.8 APO EX HSM | 85 1.8


Acties:
  • 0 Henk 'm!

  • bomberboy
  • Registratie: Mei 2007
  • Laatst online: 18:26

bomberboy

BOEM!

Spockz schreef op maandag 04 mei 2009 @ 21:23:
Ik hoop dat jullie kunnen zeggen of ik ergens een grote denkfout maak, of gewoon in compleet de verkeerde richting zit te denken en dat er een veel makkelijkere, robustere oplossing is. Want ik zie het even niet meer. :?
Java heeft heel wat ingebouwde functionaliteit om deze zaken elegant op te lossen zodat je zelf niet met wait en notify moet spelen. Tenzij je dit doet om de eigenlijke werking van deze zaken te leren gebruik je dus beter deze functionaliteit. (Of het zou moeten zijn dat een java versie < 5.0 vereist is voor je project)

Een aantal zaken die je vast kan gebruiken:
http://java.sun.com/javas...urrent/CyclicBarrier.html
http://java.sun.com/javas...urrent/BlockingQueue.html
http://java.sun.com/javas...rent/ExecutorService.html

Kijk vooral eens naar alles wat in het package java.util.concurrent en kinderen zit. Een samenvattend artikeltje kan je vinden op http://java.sun.com/devel...rticles/J2SE/concurrency/

Meer gedetailleerde tutorials over de zaken die je wil gebruiken zijn er vast ook te vinden.

Acties:
  • 0 Henk 'm!

  • Spockz
  • Registratie: Augustus 2003
  • Laatst online: 10:08

Spockz

Live and Let Live

Topicstarter
Java heeft heel wat ingebouwde functionaliteit om deze zaken elegant op te lossen zodat je zelf niet met wait en notify moet spelen. Tenzij je dit doet om de eigenlijke werking van deze zaken te leren gebruik je dus beter deze functionaliteit. (Of het zou moeten zijn dat een java versie < 5.0 vereist is voor je project)
Het is inderdaad de bedoeling om de werking te leren. Het gebruik van libs waar het dus al in verwerkt zit is dan niet echt handig.

Al lijken mij de CyclicBarrier en de CountDownLatch wel handig om te gebruiken. Maar toch zou ik liever zien hoe het wel moet via de meer low-level manier. Dat geeft toch altijd een beter inzicht in hoe het geheel werkt. (Als je het goed doet that is. :P)

C'est le ton qui fait la musique. | Blog | @linkedin
R8 | 18-55 IS | 50mm 1.8 2 | 70-200 2.8 APO EX HSM | 85 1.8


Acties:
  • 0 Henk 'm!

  • Mr_Light
  • Registratie: Maart 2006
  • Niet online

Mr_Light

Zo-i-Zo de gekste.

Java:
1
catch (InterruptedException e) { e.printStackTrace(); } 

By een InterruptedException wil je de interrupted status opnieuw setten(zie google) of zorg er voor dat onafhankelijk dat hoe je code word uitgevoerd dat je thread dood gaat. De enkele uitzonderingen herken overduidelijk als je ze nog hebt.

synchronized methoden tja ik gebruik ze nooit(ik maak altijd apart mutex object:)
Java:
1
2
3
Object mutex = new Object();
...
synchronized(mutex) { }

Als je ze gebruikt gebruik ze iig niet in het wilde weg - en wees ook zeker dat je dat stuk van de 'implementatie' van je klasse openbaar wilt maken.

Misschien is je code incompleet en is er nog een methode om run daadwerkelijk op false te zetten - ook al is die er gaat het niet werken. En wel omdat run niet volatile is. Goede kans dat de compiler while(run) {} optimaliseert naar while(true) {} en ja dat is legaal als run niet volatile is. (je kan ook AtomicBoolean gebruiken)

Ook wil ik je aanraden het enkele verwoordelijkheidsprincipe toe te passen(Do one thing and do it well)
of de klasse houd bij hoeveel werk bij/secduled het werk of hij coördineert de threads.

Wat betreft je daadwerkelijke probleem volgens mij komen die allemaal omdat je worker(waarom engels/nederlands door elkaar eigenlijk?) klasse wat onhandig is dit komt omdat je alleen maar
ik kan er me vinger niet goed op leggen.

Beetje laat op de avond dus vergeef me de bugs:
Java:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public class Worker implements Runnable {

  private volatile Thread thread;
  private volatile Work currentWork;

  public void run() {
    if(thread != Thread.currentThread()) {
      throw new IllegalStateException("Run should not be called externally");
    }

    try {
      while(true) {
        synchronized(thread) {
          thread.wait();
        
          if(currentWork!=null && !currentWork.isFinished()) {
            currentWork.execute();
            currentWork.assertFinishedExecuting();
            /*
             * Als execute goed is geschreven
             * ala
             * try { eenAbstracteMethode() } finally { finishedExecuting = true }
             * + subclasses van Work heb je deze laste call niet nodig (maar
             * goed geschreven code is nooit een zekerheid het is beter om
             * onszelf te beschermen.)
             */
            currentWork = null;
          }
        }
      }
    } catch (InterruptedException e) {
      thread.interrupt();
    }
  }

  public void execute(Work work) {
    assertOpen();    
    synchronized(thread) {
      if(isBuzy()) throw new DontCallThisTillImFinishedException();
      currentWork = work;    
      thread.notify();
    }
  }

  public boolean isBuzy() {
    return currentWork != null;
  }
  
  /** makes sure that this worker is open for bisness. */
  protected void assertOpen() {
    if(thread==null) {
      thread = new Thread(this);
    }
  }

  public void close() throws InterruptedException {
    thread.interrupt();
    thread.join();
    thread = null;
  }
}


Een counter zou wel moeten werken - wat je er instopt zou er ook weer uit moeten komen...

Ik zou gewoon een collectie Work pakken - als je event-driven wilt een WorkListener maken:
Java:
1
2
3
4
5
6
7
8
9
10
11
12
listener = new WorkListener() {
  public void onFinished(Work work) {
    step.remove(work);
    availableWorkers.push(buzyWorkers.remove(work));
    if(step.isEmpty()) fireStepIsFinished();
  }
}
...
work.addListener(listener);
freeWorker = availableWorkers.pop();
buzyWorkers.put(work, freeWorker);
freeWorker.execute(work);
pollen kan ook via work.isFinished() resp worker.isBuzy()

IceManX schreef: sowieso


Acties:
  • 0 Henk 'm!

  • Hydra
  • Registratie: September 2000
  • Laatst online: 21-08 17:09
Spockz schreef op maandag 04 mei 2009 @ 22:44:
Het is inderdaad de bedoeling om de werking te leren. Het gebruik van libs waar het dus al in verwerkt zit is dan niet echt handig.
Tijdens mijn studie was er toen we met het vak Concurrent Programming begonnen nog geen Java 1.5, en als die er wel was geweest hadden we de BlockingQueue ook niet mogen gebruiken waarschijnlijk, maar het is een concept dat vrij simpel zelf te implementeren is (was ook een onderdeel van het vak). Message queues zijn IMHO een van de beste manieren om concurrency problemen zoals deadlocks te voorkomen. Waar je erg mee moet oppassen zijn verschillende threads die mekaar gaan benaderen, dat is een deadlock waiting to happen, zoals je zelf ondervonden hebt. Als ik kijk naar je opdracht, zijn message queues volgens mij zeker "the way to go".

- Je hebt 2 threads, workers en managers.
- De worker thread voert de simulatie uit
- De manager thread zet werk klaar, en verwerkt resultaten

Hoe je het volgens mij het beste op kunt zetten is een verzameling met X worker threads, een manager thread, een input queue en een output queue. De manager stop N (N is minimaal het aantal threads dat je hebt, maar kan veel meer zijn) brokken simulatiewerk in de input queue, de worker threads wachten op de input queue. Een goeie blocking queue laat 1 thread een work object uit de queue grijpen. Terwijl de worker threads aan de gang gaan, gaat de manager thread wachten op de output queue. Elke keer als een worker klaar is, stop 'ie het simulatie resultaat in de output, en kijkt weer in de input of het volgende item klaar staat. De manager wacht op de output, en verwerkt de simulatie resultaten.

Dit heeft als voordeel dat het werk in kleine logische stappen opgedeeld is. De worders doen niks anders dan dit:
- Pak item uit input
- Draai simulatie
- Stop item in output

De manager doet niks anders dan:
- Stop N items in input
- Lees N items uit output

Op geen enkel moment is het nodig dat de managerthread the workerthreads benaderd. Met deze opzet kan er, tenzij er in ed simulatie op resources gewacht wordt, geen deadlock optreden.

https://niels.nu


Acties:
  • 0 Henk 'm!

  • Spockz
  • Registratie: Augustus 2003
  • Laatst online: 10:08

Spockz

Live and Let Live

Topicstarter
Ik zal eerst even op de makkelijke dingen reageren. :)
waarom engels/nederlands door elkaar eigenlijk?
Dat komt omdat ik zelf niet meer in de gaten heb wanneer ik engels of nederlands schrijf. In het geval van de monitor is het zo omdat je de wait, notfify, en notifyAll niet mag overloaden. Vandaar dat ik ze een andere naam heb gegeven om ze synchronized te maken.
Beetje laat op de avond dus vergeef me de bugs:
Wat is eigenlijk de reden dat je per worker ook weer een aparte thread bijhoudt? Om die thread makkelijker te kunnen beheren? Maar op deze manier heb je wel twee keer meer threads.

Ik zie niet helemaal wat je probeert te bereiken in je eerste codeblock.
- Je hebt 2 threads, workers en managers.
- De worker thread voert de simulatie uit
- De manager thread zet werk klaar, en verwerkt resultaten
Dit is eigenlijk ook zoals ik het voor ogen had. De input queue wordt gesimuleerd door de dispense() method. En de reportIn() method simuleert de input queue.

Het lastige is nu dat ik het uitdelen van de items in rondes moet verdelen. Op het moment heb ik het zo bedacht dat een externe partij (de simulatie) elke keer moet vertellen dat er een nieuwe ronde mag plaatsvinden omdat het mij dan makkelijker leek om de simulatie te laten pauzeren na een ronde. Maar als ik er nu beter over nadenk kan ik dat 'steppen' beter laten plaatsvinden op het moment dat de output queue vol is.
In de step kan ik dan de inputqueue weer vullen en alle andere administratie bijhouden. Zoals bijvoorbeeld het slopen van threads of het extra aanmaken van threads.

C'est le ton qui fait la musique. | Blog | @linkedin
R8 | 18-55 IS | 50mm 1.8 2 | 70-200 2.8 APO EX HSM | 85 1.8


Acties:
  • 0 Henk 'm!

  • Hydra
  • Registratie: September 2000
  • Laatst online: 21-08 17:09
Je moet die queues niet simuleren, je moet of een BlockingQueue gebruiken, of zelf een Queue class maken.

https://niels.nu


Acties:
  • 0 Henk 'm!

  • Spockz
  • Registratie: Augustus 2003
  • Laatst online: 10:08

Spockz

Live and Let Live

Topicstarter
Dat heb ik nu inderdaad ook gedaan en het werkt als een trein. :) En het steppen wordt aangeroepen zodra de outputQueue `vol' zit. Dus dat gebeurt ook niet meer door de buitenwereld.
Maar wat ik nog wel het meest frappante vind: ik kan nu Workers aanmaken en laten verwijden. Workers melden zichzelf aan bij de WorkerManager als ze gestart worden, en als ze doodgaan dan melden ze zich ook weer af. Het ophogen van het aantal threads gaat prima. Maar als ik dan weer terugga naar een lager getal, dan worden eerst netjes alle overtollige threads afgesloten maar na een poosje dan komt er opeens een extra thread bij. En deze heeft zich niet aangemeld bij de WorkerManager. Nu kan ik natuurlijk wel bij het uitdelen checken of de thread die wat wil hebben ook in de queue zit en hem anders de mond snoeren, maar dat is meer een pleister dan een oplossing vind ik. :(

C'est le ton qui fait la musique. | Blog | @linkedin
R8 | 18-55 IS | 50mm 1.8 2 | 70-200 2.8 APO EX HSM | 85 1.8


Acties:
  • 0 Henk 'm!

  • Mr_Light
  • Registratie: Maart 2006
  • Niet online

Mr_Light

Zo-i-Zo de gekste.

Spockz schreef op dinsdag 05 mei 2009 @ 11:20:
[...]

Dat komt omdat ik zelf niet meer in de gaten heb wanneer ik engels of nederlands schrijf.
Bekend probleem - daarom doe ik ook eigenlijk helemaal niets meer in het nederlands
Wat is eigenlijk de reden dat je per worker ook weer een aparte thread bijhoudt? Om die thread makkelijker te kunnen beheren? Maar op deze manier heb je wel twee keer meer threads.
Er is maar een thread hoor bij de worker als je runnable implementeert word er niet magisch een thread gestart(ongeacht wat sommige docenten ook mogen beweren).

Sterker nog - ik zie nu dat die worker helemaal 'geen' thread heeft... Ik roep nergens start() aan. _O-
Er moet onder thread = new Thread); nog thread.start() aangeroepen worden. Het voordeel van een privateprotected field Thread (behalve het composition vs inheritence gezeurverhaal) is dat je dus voorkomt dat iemand anders het thread kan starten of interrupten of ... etc. Voorkom misbruik ;)

Om op je andere vraag terug te komen soortgelijk verhaal gaat op voor synchronised methods - omdat dit:
Java:
1
2
3
synchronised void foo() {

}

en
Java:
1
2
3
4
5
void foo() {
  synchronised(this) {

  }
}

Het 'zelfde' is betekend het dus dat je rekening moet houden met dat anderen dus ook dit kunnen doen
Java:
1
2
3
4
objectMetSyncronisedMethods = new ObjectMetSyncronisedMethods();
synchronised(objectMetSyncronisedMethods) {

}

Waarbij er dus op de zelfde monitor er word gesynchronised wat weer tot deathlocks kan leiden bij mis-/ge-bruik.
Daarnaast zul je situaties tegen komen waarbij je meerdere monitoren nodig hebt. Het word er mijn inziens dan allemaal ook niet duidelijker op(voor welke taken gebruik je de monitor van het object zelf voor welke taken die monitor) Mocht het zinnig zijn voor andere objecten om die monitoren te gebruiken dan kan je altijd een getter toevoegen en meteen daarop documenteren waarvoor de monitor dient.
Ik zie niet helemaal wat je probeert te bereiken in je eerste codeblock.
zie bovenstaand - ik weet niet of er iemand een sluitende argumentatie om het zo te doen, ik heb hem niet iig niet paraat dus gooi het maar op persoonlijke voorkeur. In de praktijk zie synchronised methoden bijna niet en volgens mij is het voor java api developers ook voorschrift.
// edit het ook de regel dat je de locks zo kort mogelijk moet vast houden(maar niet korter ;)) meer controle over welke regels er wel en niet binnen het syncronized block vallen - meer controle is hierbij handig.
Het lastige is nu dat ik het uitdelen van de items in rondes moet verdelen. Op het moment heb ik het zo bedacht dat een externe partij (de simulatie) elke keer moet vertellen dat er een nieuwe ronde mag plaatsvinden omdat het mij dan makkelijker leek om de simulatie te laten pauzeren na een ronde. Maar als ik er nu beter over nadenk kan ik dat 'steppen' beter laten plaatsvinden op het moment dat de output queue vol is.
Klinkt prima.
Hydra schreef op dinsdag 05 mei 2009 @ 11:55:
Je moet die queues niet simuleren, je moet of een BlockingQueue gebruiken, of zelf een Queue class maken.
Wait/notify kan je inderdaad verplaatsen naar/overlaten aan een blocking queue - je geeft dan wel loose coupling op met je WorkManager - met andere (die ik dus poste) kan je hem ook nog op zich staand gebruiker als je maar een worker nodig hebt.
In het geval van de monitor is het zo omdat je de wait, notfify, en notifyAll niet mag overloaden. Vandaar dat ik ze een andere naam heb gegeven om ze synchronized te maken.
Persoonlijk vind ik het wel prettig dat die niet synchronized zijn dan word ik er aan herinnerend dat ik er zelf een block omheen moet doen dit ook omdat wait notify etc. bijna nooit op zichzelf gebruik en er dus nog vaak regels bij moet betrekken. Ik vraag me af hoe de happens before in jouw situatie werkt - moet ik eens na kijken.

[ Voor 13% gewijzigd door Mr_Light op 05-05-2009 18:02 ]

IceManX schreef: sowieso

Pagina: 1