[Python 2.6] Optimalisatie van subprocess in multiprocessing

Pagina: 1
Acties:

Onderwerpen


Acties:
  • 0 Henk 'm!

  • Nextron
  • Registratie: Maart 2005
  • Nu online

Nextron

Ik weet dat ik niets weet

Topicstarter
Ik heb een bestaande applicatie die een lijst data moet verwerken. De applicatie is singlethreaded en de verwachte verwerkingstijd ligt op enkele weken. Elke dataregel is onafhankelijk van de anderen, dus de verwerking is te paralleliseren. Verder is er een linuxserver beschikbaar met tientalle cores.
Er is echter veel verschil in verwerkingstijd per dataregel.

Met een kleine testdataset heb ik een script gemaakt dat de lijst verspreid opdeelt en de applicatie via multiprocessing parallel opstart met elk een eigen dataset. Het probleem dat zich voordeed is dat na een half uur de helft van de workers al klaar waren terwijl een uur daarna de laatste pas klaar was. Dit is dus geen optimale inzet van de rekencapaciteit.

Vervolgens heb ik alle data in een queue gezet die per dataregel aan een worker wordt uitgedeeld via subprocess.communicate(dataregel). Hiermee blijven tot het eind van de queue alle workers actief. In deze situatie vraagt de worker data uit de queue, start de applicatie op, wacht tot de applicatie afsluit en de data verwerkt is, en herhaalt dit process.
Het probleem is dat het duizenden keren opstarten van de applicatie meer tijd kost dan het actief houden van alle cores oplevert. |:(

Door de data aan te leveren via .stdin.write(dataregel) wordt volgens mij geen EOF meegestuurd en zal de applicatie actief blijven. Alleen heb ik geen idee hoe ik te weten kom wanneer het verwerken van de data klaar is en een nieuwe dataregel kan worden aangeleverd.

De applicatie geeft wel stdout wanneer verwerking van de data klaar is, maar er kunnen afhankelijk van de dataregel meerdere iteraties nodig zijn die automatisch worden herhaald. Wanneer de output verschijnt is voor mij dus niet duidelijk of de applicatie bezig is met een tweede ronde of wacht op nieuwe input.

Verder zal het bepalen of de applicatie klaar is sneller moeten gebeuren dan het herstarten ervan (dat enkele seconde duurt), om tijd te winnen.

Valt hier nog wat aan te redden of moet ik de eerste methode gebruiken en hopen dat de grootte van de complete lijst ervoor zorgt dat de verwerkingstijden uitmiddelen?

Hoe meer ik weet,
hoe meer ik weet,
dat ik niets weet.


Acties:
  • 0 Henk 'm!

  • ValHallASW
  • Registratie: Februari 2003
  • Niet online
Nextron schreef op vrijdag 09 maart 2012 @ 13:25:
Elke dataregel is onafhankelijk van de anderen, dus de verwerking is te paralleliseren. Verder is er een linuxserver beschikbaar met tientalle cores.
Er is echter veel verschil in verwerkingstijd per dataregel.
Met een kleine testdataset heb ik een script gemaakt dat de lijst verspreid opdeelt en de applicatie via multiprocessing parallel opstart met elk een eigen dataset.
Er van uitgaande dat je hier de multiprocessing-module bedoelt: waarom zet je een hele dataset in je queue en niet de losse regels?
Vervolgens heb ik alle data in een queue gezet die per dataregel aan een worker wordt uitgedeeld via subprocess.communicate(dataregel).
Waar komt de subprocess-module opeens vandaan...?


Overall is je verhaal een beetje warrig, /en/ je post geen code, dus dat maakt het lastig to-the-point antwoorden te geven.

Acties:
  • 0 Henk 'm!

  • Soultaker
  • Registratie: September 2000
  • Laatst online: 19:25
Voor de "echte" oplossing moet je zorgen dat je verwerkingsprogramma elke invoerregel direct verwerkt en daar wat feedback van geeft (door een regel uitvoer te genereren bijvoorbeeld).

Als je dat niet kunt fixen (omdat je dat programma niet kan aanpassen) dan zou ik voor een hybride aanpak gaan, waarbij je elke keer (bijvoorbeeld) 1000 regels aan één proces voert. Daarmee behoud je het voordeel dat je het werk redelijk kunt verdelen over de beschikbare cores zonder dat je van te voren het werk op hoeft te delen, maar je verkleint wel de overhead (per regel) van het opstarten van die procesen.

Ik vraag me trouwens wel af wat voor gegevens je aan het verwerken bent waarbij je zowel véél verschillende data items hebt dit toch zulke uiteenlopende verwerkingsduren hebt dat simpelweg de invoer in gelijke delen opsplitsen niet goed genoeg werkt.

Kun je die verschillen in verwerkingsduur enigzins inschatten? Als je het werk van te voren beter verdeelt, kun je gewoon je oorspronkelijke (en relatief simpele) aanpak gebruiken.

Acties:
  • 0 Henk 'm!

  • epic007
  • Registratie: Februari 2004
  • Laatst online: 25-08 11:27
ValHallASW schreef op vrijdag 09 maart 2012 @ 14:05:
Er van uitgaande dat je hier de multiprocessing-module bedoelt: waarom zet je een hele dataset in je queue en niet de losse regels?
Ik denk dat hier meerdere processen worden opgestart via de subprocess module. Ik denk dat je inderdaad beter naar de multiprocessing module kan kijken (zie link van ValHallASW) om het geheel op te delen in threads ipv losse processen, hier heeft python goede support voor. Er hoeft dan maar 1 process (met meerdere threads) te worden opgestart.

Hier een voorbeeld..


[edit]
Met een kleine testdataset heb ik een script gemaakt dat de lijst verspreid opdeelt en de applicatie via multiprocessing parallel opstart met elk een eigen dataset. Het probleem dat zich voordeed is dat na een half uur de helft van de workers al klaar waren terwijl een uur daarna de laatste pas klaar was. Dit is dus geen optimale inzet van de rekencapaciteit.
Hmm het lijkt dat je multiprocessing al hebt gebruikt. Misschien is het beter om niet elke thread een eigen dataset te geven, maar vanuit de main thread één queue te vullen waar alle threads uit lezen. Zodra een item verwerkt is pakt de thread het volgende item.

[ Voor 47% gewijzigd door epic007 op 10-03-2012 21:29 ]


Acties:
  • 0 Henk 'm!

  • t_captain
  • Registratie: Juli 2007
  • Laatst online: 14-09 16:11
Als ik het goed begrijp, had je eerst een programma zonder threads of multiprocessing, was ongeveer het volgende deed:

code:
1
2
3
while (input):
   read 1 line fron stdin
   do processing


Nu wil je de rekenkracht van je multicore server gebruiken, dus ben je het programma gaan splitsen. Het lijkt erop alsof je het volgende hebt gedaan:

code:
1
2
3
start worker 1(input=stdin[0    : X])
start worker 2(input=stdin[X    : 2*X])
start worker 3(input=stdin[2*x : 3*X])


Het gaat mis in de chunking van de jobs: regels 0 t/m X kunnen een heel andere complexiteit hebben dan regels X t/m 2X.

Mijn aanpak zou zijn:

code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def ProcessingFunc(q_in, q_out):
   while True:
      job = q_in.get()
      result = process job
      q_out.put(result)

def Main():
   # init all workers
   q11 = Queue() # queue for upload of jobs to worker 1
   q12 = Queue() # queue for download of results from worker 1
   p1 = Process(target=ProcessingFunc, args=[q11, q12]) # process for worker 1

   q21 = Queue()
   q22 = Queue()
   p2 = Process(target=ProcessingFunc, args=[q21,q22])

   workers = [(p1, q11, q12), (p2, q21, q22)]

   # first job
   for (p, q_up, q_down) in workers:
      q.up.put(stdin.readline())

   # main loop
   while (input):
      select(rlist=[q12, q22]) # wait for the first queue to give a result
      get result and store it in the overall results
      send a new job to this worker


Zo stuur je de jobs stuk voor stuk naar de workers zodra deze vrij zijn.

[ Voor 77% gewijzigd door t_captain op 12-03-2012 08:57 ]


Acties:
  • 0 Henk 'm!

  • Nextron
  • Registratie: Maart 2005
  • Nu online

Nextron

Ik weet dat ik niets weet

Topicstarter
Ik heb dit weekend een grote testset laten draaien op ~40 cores met de methode van verdeelde lijsten. Vanmorgen waren nog twee workers actief en nu nog een. Uitmiddelen van de verwerkingstijden gebeurt dus onvoldoende.
ValHallASW schreef op vrijdag 09 maart 2012 @ 14:05:
Er van uitgaande dat je hier de multiprocessing-module bedoelt: waarom zet je een hele dataset in je queue en niet de losse regels?
Die module inderdaad. De complete dataset wordt in de queue geladen waarbij elke dataregel een entry vormt.
Waar komt de subprocess-module opeens vandaan...?
Ik heb de multiprocessing-module gebruikt om meerdere processes parallel op te starten. De processes die parallel moeten lopen is een extern script. Volgens mij is de subprocess-module wat je daar het beste voor kan gebruiken.
Overall is je verhaal een beetje warrig, /en/ je post geen code, dus dat maakt het lastig to-the-point antwoorden te geven.
De code stelt als samenraapsel van wat voorbeelden nog weinig voor:
Python:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import multiprocessing, subprocess, sys, os
def main() :
    #build queue
    queue=multiprocessing.JoinableQueue()
    for dataregel in sys.stdin :
        queue.put(dataregel)
    for i in range(CPU_CORES) :  
        queue.put('end')
    #queue.close()

    #start workers
    for i in range(CPU_CORES) :        
        proc=multiprocessing.Process(target=worker, name=str(i), args=(queue,))
        proc.start()

    #wait to finish
    queue.join()

#Spawn worker process
def worker(q) :
    i=multiprocessing.current_process().name
    for data in iter(q.get,'end') :
        process=subprocess.Popen(['python', '%s/script.py'%os.path.realpath(os.path.dirname(__file__)), '-i'%int(i)], stdin=subprocess.PIPE)
        process.communicate(data)
        q.task_done()
    q.task_done()

if __name__ == '__main__':
    main()
Soultaker schreef op vrijdag 09 maart 2012 @ 15:45:
Voor de "echte" oplossing moet je zorgen dat je verwerkingsprogramma elke invoerregel direct verwerkt en daar wat feedback van geeft (door een regel uitvoer te genereren bijvoorbeeld).
Daar duik ik eens in.
Als je dat niet kunt fixen (omdat je dat programma niet kan aanpassen) dan zou ik voor een hybride aanpak gaan, waarbij je elke keer (bijvoorbeeld) 1000 regels aan één proces voert. Daarmee behoud je het voordeel dat je het werk redelijk kunt verdelen over de beschikbare cores zonder dat je van te voren het werk op hoeft te delen, maar je verkleind wel de overhead (per regel) van het opstarten van die procesen.
De hybride-aanpak begon ik ook als oplossing te overwegen.
Ik vraag me trouwens wel af wat voor gegevens je aan het verwerken bent waarbij je zowel véél verschillende data items hebt dit toch zulke uiteenlopende verwerkingsduren hebt dat simpelweg de invoer in gelijke delen opsplitsen niet goed genoeg werkt.
Elke dataregel verwijst naar een 3D-model waarin bepaalde interacties berekend moeten worden. Soms is binnen enkele seconde bekend dat er geen relevante interacties zijn, soms een paar en in andere gevallen veel met veel rekenwerk.
t_captain schreef op maandag 12 maart 2012 @ 08:36:
Als ik het goed begrijp, had je eerst een programma zonder threads of multiprocessing, was ongeveer het volgende deed:

code:
1
2
3
while (input):
  read 1 line fron stdin
  do processing
Klopt, alleen zijn er verschillende varianten van het orginele, erg grote script. Een enkele 'wrapper' die universeel te gebruiken was leek me makkelijker dan de werkende scipts te modificeren.

Hoe meer ik weet,
hoe meer ik weet,
dat ik niets weet.


Acties:
  • 0 Henk 'm!

  • ValHallASW
  • Registratie: Februari 2003
  • Niet online
Zo te zien is je verwerking ook een pythonprogramma. Kan je in dat programma niet gewoon een main()-functie aanroepen? Dan kan je het instantieren van het nieuwe proces geheel overslaan, en ipv je subprocess.Popen gewoon een functieaanroep doen.

Verder: multiprocessing.Pool().map() neemt je heel veel werk uit handen -- maar dat is meer voor de volgende keer, want je hebt nu al iets werkends. -- de vergelijkbare code van wat je nu doet is dan ongeveer dit:

code:
1
2
3
4
5
6
7
8
9
10
11
12
import multiprocessing, subprocess, os

def worker(item) :
    process=subprocess.Popen(['python', '%s/script.py'%os.path.realpath(os.path.dirname(__file__)), '-i'%int(i)], stdin=subprocess.PIPE)
    return process.communicate(item)

def main():
    p = subprocess.Pool()
    results = p.map(sys.stdin)

if __name__=="__main__":
    main()
Pagina: 1