Check alle échte Black Friday-deals Ook zo moe van nepaanbiedingen? Wij laten alleen échte deals zien

[Python] Queue.Get() icm Process.Join()

Pagina: 1
Acties:

  • FireDrunk
  • Registratie: November 2002
  • Laatst online: 18:53
Ik ben bezig met het maken van een Python script dat meerdere Processen wat werk kan laten uitvoeren en de resultaten verzameld en naar op disk wegschrijft.

Ik gebruik dit voorbeeld http://eli.thegreenplace....sks-with-multiprocessing/

Met als enige aanpassing, dat in mijn code, de Queue.Get() ná de Process.join() gebeurt.

Python:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# Result Queue
resultQueue = Queue();

# Create Fetcher Instance
fetcher = fetcherClass()

# Create Fetcher Process List
fetcherProcesses = []

# Run Fetchers
for config in configList:
    # Create Process to encapsulate Fetcher
    log.debug("Creating Fetcher for Target: %s" % config['object_name'])
    fetcherProcess = Process(target=fetcher.Run, args=(config,resultQueue))
    
    log.debug("Starting Fetcher for Target: %s" % config['object_name'])
    fetcherProcess.start()
    fetcherProcesses.append((config, fetcherProcess))

# Wait for all Workers to complete
for config, fetcherProcess in fetcherProcesses:
    log.debug("Waiting for Thread to complete (%s)." % str(config['object_name']))
    fetcherProcess.join(DEFAULT_FETCHER_TIMEOUT)
    if fetcherProcess.is_alive():
        log.critical("Fetcher thread for object %s Timed Out! Terminating..." % config['object_name'])
        fetcherProcess.terminate()
    
# Loop thru results, and save them in RRD
while not resultQueue.empty():
    config, fetcherResult = resultQueue.get()
    result = storage.Save(config, fetcherResult)


Fetcher.Run pseudo code

code:
1
2
3
Run(config, queue):
    resultaat =  doeIets()
    queue.Put((config, resultaat))


1) Wat ik niet zo goed begrijp uit het voorbeeld, is waarom er een Process.Join() gedaan word, als er ook al een Queue.Get() gedaan word. .Get() is een blocking call die wacht tot er een item in de Queue zit.

2) Hoe kan ik (netjes) opvangen dat de Queue niet genoeg resultaten bevat als 1 van mijn Processen door een timeout is afgeschoten?

3) Ik krijg een beetje het idee dat Process.Join() en Queue.Get() mutualy exclusive zijn... Tenzij je meerdere .Put's in 1 run zou doen.

Heb ik het correct?

Even niets...


  • FireDrunk
  • Registratie: November 2002
  • Laatst online: 18:53
Iemand nog een idee?

Even niets...


  • ValHallASW
  • Registratie: Februari 2003
  • Niet online
Gebruik Multiprocessing.Pool.map in plaats van zelf met queues te klooien. https://docs.python.org/2...l#using-a-pool-of-workers

Verder: wat is eigenlijk het probleem? Wat werkt er niet? Heb je deadlocks?
1) Wat ik niet zo goed begrijp uit het voorbeeld, is waarom er een Process.Join() gedaan word, als er ook al een Queue.Get() gedaan word. .Get() is een blocking call die wacht tot er een item in de Queue zit.
Omdat je queue nog heel goed leeg kan zijn voordat je bij while not resultQueue.empty(): aankomt.
2) Hoe kan ik (netjes) opvangen dat de Queue niet genoeg resultaten bevat als 1 van mijn Processen door een timeout is afgeschoten?
Nu worden die resultaten overgeslagen. Is dat niet wat je wilt? Wat dan wel? Als je het in je for-loop wilt weten dan kan je na het afschieten van je process een error-object in je queue zetten.
3) Ik krijg een beetje het idee dat Process.Join() en Queue.Get() mutualy exclusive zijn... Tenzij je meerdere .Put's in 1 run zou doen.
Want?

[ Voor 77% gewijzigd door ValHallASW op 01-10-2014 11:41 ]