Ik ben bezig met het maken van een Python script dat meerdere Processen wat werk kan laten uitvoeren en de resultaten verzameld en naar op disk wegschrijft.
Ik gebruik dit voorbeeld http://eli.thegreenplace....sks-with-multiprocessing/
Met als enige aanpassing, dat in mijn code, de Queue.Get() ná de Process.join() gebeurt.
Fetcher.Run pseudo code
1) Wat ik niet zo goed begrijp uit het voorbeeld, is waarom er een Process.Join() gedaan word, als er ook al een Queue.Get() gedaan word. .Get() is een blocking call die wacht tot er een item in de Queue zit.
2) Hoe kan ik (netjes) opvangen dat de Queue niet genoeg resultaten bevat als 1 van mijn Processen door een timeout is afgeschoten?
3) Ik krijg een beetje het idee dat Process.Join() en Queue.Get() mutualy exclusive zijn... Tenzij je meerdere .Put's in 1 run zou doen.
Heb ik het correct?
Ik gebruik dit voorbeeld http://eli.thegreenplace....sks-with-multiprocessing/
Met als enige aanpassing, dat in mijn code, de Queue.Get() ná de Process.join() gebeurt.
Python:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
| # Result Queue resultQueue = Queue(); # Create Fetcher Instance fetcher = fetcherClass() # Create Fetcher Process List fetcherProcesses = [] # Run Fetchers for config in configList: # Create Process to encapsulate Fetcher log.debug("Creating Fetcher for Target: %s" % config['object_name']) fetcherProcess = Process(target=fetcher.Run, args=(config,resultQueue)) log.debug("Starting Fetcher for Target: %s" % config['object_name']) fetcherProcess.start() fetcherProcesses.append((config, fetcherProcess)) # Wait for all Workers to complete for config, fetcherProcess in fetcherProcesses: log.debug("Waiting for Thread to complete (%s)." % str(config['object_name'])) fetcherProcess.join(DEFAULT_FETCHER_TIMEOUT) if fetcherProcess.is_alive(): log.critical("Fetcher thread for object %s Timed Out! Terminating..." % config['object_name']) fetcherProcess.terminate() # Loop thru results, and save them in RRD while not resultQueue.empty(): config, fetcherResult = resultQueue.get() result = storage.Save(config, fetcherResult) |
Fetcher.Run pseudo code
code:
1
2
3
| Run(config, queue):
resultaat = doeIets()
queue.Put((config, resultaat)) |
1) Wat ik niet zo goed begrijp uit het voorbeeld, is waarom er een Process.Join() gedaan word, als er ook al een Queue.Get() gedaan word. .Get() is een blocking call die wacht tot er een item in de Queue zit.
2) Hoe kan ik (netjes) opvangen dat de Queue niet genoeg resultaten bevat als 1 van mijn Processen door een timeout is afgeschoten?
3) Ik krijg een beetje het idee dat Process.Join() en Queue.Get() mutualy exclusive zijn... Tenzij je meerdere .Put's in 1 run zou doen.
Heb ik het correct?
Even niets...