Check alle échte Black Friday-deals Ook zo moe van nepaanbiedingen? Wij laten alleen échte deals zien

Java PriorityBlockingQueue OnPutListener/OnTakeListener

Pagina: 1
Acties:

  • redesign
  • Registratie: Juni 2001
  • Laatst online: 14-10-2021
Hallo,

Kort vraagje: bestaat er zoiets als een onPutListener en een onTakeListener in een PriorityBlockingQueue in java waarbij de onPutListener aangeroepen wordt op het moment dat iemand (= stukje code) iets nieuws toevoegt in de queue? En andersom: waarbij de onTakeListener aangeroepen wordt op het moment dat iemand (= stukje code) iets van de queue afhaalt?

In de documentatie kom ik dit soort gewenste functionaliteit niet tegen, of zoek ik naar de verkeerde spelt in de hooiberg, dat kan ook :+

Wat ik wil bereiken is dat ik twee queues gebruik die door 1 gemeenschappelijke thread (de dispatch thread) beheert worden, en waarbij andere threads (stuk of 10) zowel kunnen put-en en take-en zonder dat deze threads daarwerkelijk de queue beinvloeden (maw: NIET de berichten in de queue verwijderen, maar overlaten aan de dispatch thread zelf)

Wat ik zou willen is dat de queue en/of dispatch thread weet welke threads een bericht hebben geplaatst en ook welke threads een bericht van de queue hebben gepakt (eigenlijk hebben gekopieerd ipv eraf pakken). Op die manier weet ik wanneer een bericht uit de queue verwijderd kan worden, nadat de laatste thread heeft aangegeven: "ik heb dit bericht ook gelezen". De threads mogen niet bepalen wat van de queue afgehaald wordt, dat moet de dispatch thread doen.

Dus technisch gezien:

Zodra een thread iets op de queue zet: worden alle andere threads geinformeerd dat er een nieuw bericht op de queue is gezet. Zodra alle threads iets van de queue hebben afgehaald/gekopieerd, zal de dispatch thread ook daadwerkelijk het bericht van de queue verwijderen.

Let op: dit is GEEN event bus! Bij een event bus worden de events altijd afgeleverd bij de event-subscriber. In mijn geval is het andersom: de subscriber bepaalt zelf wanneer hij dit bericht van de queue afhaalt. Het kan dus ook zijn dat dit bericht door 9 van de 10 threads direct wordt opgehaald na een "seintje" te hebben gekregen dmv een onPutListener(), maar dat de 10de thread zoiets heeft van: ik haal dit bericht later wel van de queue, ik ben nu met iets anders bezig.

Bestaat er zoiets? Ik zoek nu op PriorityBlockingQueue, BlockingDeque, Event Listenener, Put/Take, maar kom niets op google/stackoverflow tegen dat ook maar in de buurt komt van wat ik wil.

Let wel: de threads dienen allemaal geinformeerd te worden wanneer er iets op de queue gezet wordt, zonder telkens te moeten pollen op de queue. Dus een soort van onClickListener, maar dan voor queues.

Dit zelf implementeren gaan me toch iets boven mijn petje, vooral omdat het gegarandeerd threadsafe moet zijn. Ik ben vast niet de eerste met zulke "queue-wensen".

Alvast bedankt!

  • pedorus
  • Registratie: Januari 2008
  • Niet online
Zo'n listener bestaat niet, maar PriorityBlockingQueue is niet final, dus die zou je zelf kunnen maken.

Maar eigenlijk snap ik niet wat je wil bereiken. Waarom niet gewoon elke thread zijn eigen event-queue geven? Waarom niet een afgehandeld-tellertje bijhouden per event (AtomicInteger) en bij bereiken tellertje een push naar de dispatch-thread queue doen?

Fundamenteel kun je niet een event plaats laten vinden in een andere thread zonder dat die thread iets pollt. Dat iets is dan vaak een event-queue, zoals bijna iedere desktop-applicatie die heeft.

Je huidige model gaat een enorme synchronisatie-overhead geven, waarbij ik me sterk af vraag of je dat wel nodig hebt.

Vitamine D tekorten in Nederland | Dodelijk coronaforum gesloten


  • redesign
  • Registratie: Juni 2001
  • Laatst online: 14-10-2021
pedorus schreef op dinsdag 19 november 2013 @ 21:19:
Zo'n listener bestaat niet, maar PriorityBlockingQueue is niet final, dus die zou je zelf kunnen maken.
Die route ga ik liever niet op, qua deadlocks en dat soort fratsen. Ik ken mijn beperkingen :)
Maar eigenlijk snap ik niet wat je wil bereiken. Waarom niet gewoon elke thread zijn eigen event-queue geven? Waarom niet een afgehandeld-tellertje bijhouden per event (AtomicInteger) en bij bereiken tellertje een push naar de dispatch-thread queue doen?
Bedoel je soms dat ik elke thread zijn eigen takeQueue moet geven waarin de dispatch thread een bericht plaatst wanneer deze voor die thread is bedoeld? De dispatch thread hoeft dan inderdaad niet meer bij te houden wie welk bericht heeft "afgehaald" uit de dispatch queue.
Ik kan natuurlijk ook iedere thread de putQueue van de dispatch thread meegeven, waarin elke thread zijn antwoord terug plempt richting de dispatch thread. Op die manier hoef ik ook niet bij te houden of elke subscribers het bericht hebben opgehaald. Het enige dat ik dan in deze oplossingsrichting nog mis is: hoe weet de dispatch thread wie welk type bericht wilt hebben, om die vervolgens in zijn eigen queue te plempen? Moet ik dan een soort van subscriberArray maken? Ik zou bijvoorbeeld elke thread die zich wil abonneren op de berichten van de dispatch thread eerst een bericht in de putQueue van de dispatch thread plaatsen dat aangeeft "abonneer mij op deze berichten: P, X en Z". Dan weet de dispatch thread welke takeQueues hij moet beantwoorden als hij de berichten moet verdelen. Zo ook met "unsubscribe".
Fundamenteel kun je niet een event plaats laten vinden in een andere thread zonder dat die thread iets pollt. Dat iets is dan vaak een event-queue, zoals bijna iedere desktop-applicatie die heeft.
Dat zou dus (kunnen) betekenen dat ik eigenlijk in elke thread, die berichten uitwisselt met de dispatch thread, twee threads moet hebben voor IO werkzaamheden (lezen en schrijven naar blocking IO, zoals een BluetoothSocket)? Het nadeel is een beetje dat een read() of write() op een socket blocked kan zijn, of foutmeldingen bevat. Dus terwijl de foutafhandeling plaatsvind van een write(), kan ik geen berichten inlezen met read() en deze vervolgens in de queue zetten. Ik ben op zoek naar een mechanisme dat mij in staat stelt te lezen en schrijven naar zoeen socket waarbij de berichten over en weer gestuurd worden over meerdere threads (vandaar ook de dispatch thread).
Je huidige model gaat een enorme synchronisatie-overhead geven, waarbij ik me sterk af vraag of je dat wel nodig hebt.
Dat vraag ik mij dus ook af. Ik probeer zo min mogelijk synchronized methoden te hebben, en gebruik makende van een PriorityBlockingQueue voorkomt dit in mijn geval.

Volgens mij zoek ik mijn oplossing in de verkeerde hooiberg. Het kan/moet simpeler kunnen dunkt mij.

EDIT: mmh, ik lees net in de javadoc dat de read() en write() van een BluetoothSocket (InputStream/OutputStream) helemaal niet blocked zijn. Ik was in de war met close() en connect().
Het dient volgens mij wel de aanbeveling apart exception handling voor read() en write() toe te passen, zodat write() wel kan plaatsvinden als een read() problemen zou geven.
Maw: read() en write() kunnen in één while(true) lus door de queue te pollen voor een write() en de hardware te pollen voor een read(). Als de read() iets oplevert, dan put-en in de queue. Als de queue een bericht bevat, dan take-en en een write() toepassen. Toch?

[ Voor 9% gewijzigd door redesign op 19-11-2013 22:17 ]


  • pedorus
  • Registratie: Januari 2008
  • Niet online
Over dat blocking io, klopt in combinatie met available(), zie het antwoord bij http://stackoverflow.com/...call-if-x-time-has-passed

Over subscriben: lijkt me prima, al weet ik nog steeds niet precies wat je probeert te doen. Wel zou ik een dispatcher wat meer los zien van de thread waarin deze werkt. Je kunt meestal prima direct berichten naar andere threads sturen vanuit de thread waar het bericht vandaan komt. Een dispatcher object zou dat prima kunnen afhandelen vanuit de thread waar deze wordt aangeroepen.

Vitamine D tekorten in Nederland | Dodelijk coronaforum gesloten


  • redesign
  • Registratie: Juni 2001
  • Laatst online: 14-10-2021
pedorus schreef op dinsdag 19 november 2013 @ 22:46:
Over dat blocking io, klopt in combinatie met available(), zie het antwoord bij http://stackoverflow.com/...call-if-x-time-has-passed
Interessante post bij SO. Die had ik niet zelf gevonden. Ik moet inderdaad na 15 seconden volgens specs kappen met de socket uit te lezen als er niks binnenkomt, dus dit SO-topic komt zeker van pas.
Over subscriben: lijkt me prima, al weet ik nog steeds niet precies wat je probeert te doen.
Dat vraag ik mijzelf ook vaak af 8)7 |:(
Technisch gezien: managen van meerdere Bluetooth verbindingen tegelijkertijd die onderling ook nog eens met elkaar in verbinding staan, maar waarbij ieder apparaat zijn eigen autonomiteit heeft/behoudt (maw: zelfstandig in het bepalen wat elk apparaat wilt doen).
Wel zou ik een dispatcher wat meer los zien van de thread waarin deze werkt. Je kunt meestal prima direct berichten naar andere threads sturen vanuit de thread waar het bericht vandaan komt. Een dispatcher object zou dat prima kunnen afhandelen vanuit de thread waar deze wordt aangeroepen.
Op zich klinkt dit handig voor 1 of misschien 2 Bluetooth verbindingen, maar niet voor 3 en meer. Dan heb ik een "regelaar" nodig die alles grondig aanstuurt anders wordt het een zooitje, zowel qua berichtgevingen aan elkaar alsook java code (spagetti code). Ik wil eigenlijk helemaal geen onderlinge relaties tussen de verschillende threads, tenzij deze bij elkaar horen, zoals een time-out timer voor het bijhouden hoelang er al geen berichten meer zijn ontvangen. Dat hoort bij elkaar. Maar een server thread, connectie opbouw thread, connectie thread zelf en berichten-verdeler thread horen wel met elkaar samen te werken, maar dan wel decoupled. Dat wil ik o.a. dmv queues bewerkstelligen. Als ik een verbinding opbouw met een extern apparaat, en dat is gelukt (geldige BluetoothSocket ontvangen na een blocking call) dan roep ik niet direct een andere thread aan om met deze socket een verbinding op te bouwen, maar "post" ik deze socket of een queue (weet nog niet precies hoe) zodat de dispatcher/regelaar thread dan weet: aha, dus jij wilt dat ik voor jou een verbinding ga beheren, prima doe ik voor je! Zoiets dus.
De connectie opbouw thread kan dan ophouden te bestaan, want verbinding is gelegd. Dat een andere thread het stokje overneemt is meer dan logisch.

Voordeel is: de dispatcher weet precies wat iedereen aan het doen is, wat de noden van anderen zijn en wie-met-wie aan het communiceren is. Dat weet anders niemand (meer) nadat je rechtstreeks verbindingen opbouwt. Ik heb ergens een "gemeenschappelijke basis administratie" nodig om dat te bewerkstelligen. Daarnaast moet ik een dispatcher hebben omdat ik geen 1-op-1 communicatie kan gebruiken, want meerdere apparaten kunnen (maar hoeft niet) met elkaar in verbinding treden (op ieders eigen initiatief). Eigenlijk een "zoek het verder maar lekker zelf uit verbindingen beheer thread" :-)

Door met queues te werken ontkoppel ik hardcoded java aanroepen van de ene naar de andere thread, hierdoor hoef ik dus het synchronized keyword niet meer te gebruiken. Synchronized is onnodig dunkt mij. Nu hoef ik nieteens een lock of atomic integer te gebruiken: niemand die in elkaars vaarwater kan komen.

Corrigeer me aub als ik ongelijk heb _/-\o_