C++ multithreading: mainloop en meerdere workers

Pagina: 1
Acties:

Onderwerpen

Vraag


Acties:
  • 0 Henk 'm!

Verwijderd

Topicstarter
Ik ben bezig met het multithreaded maken van een applicatie. Deze bestaat uit een main loop en een aantal workers. De workers kunnen onafhankelijk van elkaar tegelijk worden uitgevoerd, maar de main loop mag niet worden uitgevoerd als de workers aan het werk zijn. Dit alles moet continue worden uitgevoerd, zoals hieronder met pseudocode kan worden weergegeven

C++:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
mainLoop()
{
    while (true)
    {
        wachtTotAlleWorkersKlaarZijn();
        DoeZelfBerekeningen();
        NotifyWorkersDatZeVerderKunnen();
    }
}

worker(int id)
{
    while (true)
    {
        wachtOpNotifyVanMainLoop();
        DoeBerekeningen();
    }
}


Middels boost heb ik zelf wat geprobeerd middels een aantal mutexen en condition variables, maar ik krijg nog niet het gewenste resultaat. Met de onderstaande code krijg ik als output dat de mainLoop twee keer wordt uitgevoerd en slechts een enkele worker wordt drie keer uitgevoerd. De gewenste output is dat continue de mainLoop wordt uitgevoerd gevolgd door alle workers.

C++:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
class ThreadManager
{
    private:
        boost::condition_variable updateReadyCondition;
        boost::mutex updateReadyMutex;
        bool updateReady = false;

        boost::condition_variable threadsReadyCondition;
        boost::mutex threadsReadyMutex;
        bool threadsReady = true;

        int numThreadsRunning = 0;
        int numThreadsStarted = 0;

        boost::mutex numThreadsRunningMutex;
        boost::mutex numThreadsStartedMutex;

    public:

        void threadStarted()
        {
            numThreadsStartedMutex.lock();
            numThreadsStarted++;
            if (numThreadsStarted == NUM_THREADS)
            {
                updateReady = false;
            }
            checkIfThreadsAreReady();
            numThreadsStartedMutex.unlock();
        }

        void threadEnded()
        {
            numThreadsRunningMutex.lock();
            numThreadsRunning--;
            checkIfThreadsAreReady();
            numThreadsRunningMutex.unlock();
        }

        void checkIfThreadsAreReady()
        {
            if (numThreadsRunning == 0 && numThreadsStarted == NUM_THREADS)
            {
                boost::unique_lock<boost::mutex> lock(threadsReadyMutex);
                threadsReady = true;
                updateReady = false;
                threadsReadyCondition.notify_all();
            }
        }

        void mainLoop()
        {
            while (true)
            {
                boost::unique_lock<boost::mutex> lockThread(threadsReadyMutex);
                while (!threadsReady)
                {
                    threadsReadyCondition.wait(lockThread);
                }
            
                cout << "update\n";
                std::this_thread::sleep_for(std::chrono::milliseconds(1000));
                cout << "update done\n";

                numThreadsRunning = NUM_THREADS;
                numThreadsStarted = 0;

                boost::unique_lock<boost::mutex> lockUpdate(updateReadyMutex);
                updateReady = true;
                updateReadyCondition.notify_all();
            }
        }

        void runThread(int id)
        {
            while (true)
            {
                boost::unique_lock<boost::mutex> lockUpdate(updateReadyMutex);
                while (!updateReady)
                {
                    updateReadyCondition.wait(lockUpdate);
                }
            
                threadStarted();
                cout << "start " << id << "\n";
                std::this_thread::sleep_for(std::chrono::milliseconds(1000 * id));
                cout << "end " << id << "\n";
                threadEnded();
            }
        }

        void start()
        {
            vector<boost::thread> threads;
            threads.push_back(boost::thread(&ThreadManager::mainLoop, this));
            for (int i = 0; i < NUM_THREADS; i++)
            {
                threads.push_back(boost::thread(&ThreadManager::runThread, this, i));
            }

            for (int i = 0; i < NUM_THREADS + 1; i++)
            {
                threads[i].join();
            }
        }
};

Beste antwoord (via Verwijderd op 05-04-2016 22:06)


  • farlane
  • Registratie: Maart 2000
  • Laatst online: 15:26
Oke, ben even aan het pielen geweest, maar dit lijkt volgens mij meer op wat je probeert te bereiken. De crux zit 'em in regel 15, waar je de lock op het "werk" weer loslaat nadat je deze hebt gekregen in de wait( .... ). ( Deze versie van wait encapsulate de while loop die jij expliciet uitschrijft ) en zoals je ziet hoef je voor de notify niets te locken.

C++:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
condition_variable work, complete;
mutex mtx;
int work_cnt, busy_cnt;

void worker( int id )
{

    while( true )
    {
        unique_lock<mutex> lk(mtx);

        work.wait(lk, []{ return work_cnt > 0; } );
        ++busy_cnt;
        --work_cnt;
        lk.unlock();


        // Do work here

        {
            lock_guard<mutex> lk(mtx);
            --busy_cnt;
        }
        complete.notify_all();
    }
}

void supervisor()
{
    while( true )
    {
        {
            lock_guard<mutex> lk(mtx);
            work_cnt = 5;
        }

        work.notify_all();

        unique_lock<mutex> lk(mtx);
        complete.wait(lk, []{   return work_cnt == 0 && busy_cnt == 0; } );
    }
}

int main()
{
    thread t1(worker, 1), t2(worker, 2), t3(worker, 3), t4(supervisor);
    t1.join();
    t2.join();
    t3.join();
    t4.join();
}

Somniferous whisperings of scarlet fields. Sleep calling me and in my dreams i wander. My reality is abandoned (I traverse afar). Not a care if I never everwake.

Alle reacties


Acties:
  • 0 Henk 'm!

  • farlane
  • Registratie: Maart 2000
  • Laatst online: 15:26
Volgens mij is je locking nogal ingewikkeld. Als de threads simpelweg wachten op een notify ( updateReadyCondition, d'r hoeft volgens mij geen boolean+lock achter) en de threadsrunning ophogen en als ze klaar zijn het ding weer verlagen en de threadsReadyCondition notifyen ben je d'r toch? De threadsrunning moet wel een mutex omheen natuurlijk.

Somniferous whisperings of scarlet fields. Sleep calling me and in my dreams i wander. My reality is abandoned (I traverse afar). Not a care if I never everwake.


Acties:
  • +2 Henk 'm!

  • MSalters
  • Registratie: Juni 2001
  • Laatst online: 13-09 00:05
Veel te ingewikkeld. Je wil helemaal niet op dit nivo denken. "Workers"? Het is de 20ste eeuw niet, we leven in de toekomst. Gebruik std::future en laat de implementatie van de details over aan de compilerbouwers.

Man hopes. Genius creates. Ralph Waldo Emerson
Never worry about theory as long as the machinery does what it's supposed to do. R. A. Heinlein


Acties:
  • 0 Henk 'm!

Verwijderd

Topicstarter
Ik heb de locking inderdaad voor de helft weggehaald, maar ik blijf hetzelfde probleem ondervinden. Met een std::future kan het inderdaad ook, maar dan nog snap ik niet waarom de bovenstaande implementatie niet werkt.

Acties:
  • 0 Henk 'm!

  • farlane
  • Registratie: Maart 2000
  • Laatst online: 15:26
OK, wat heb je nu dan?

FWIW, ik vind het een gezonde impuls om je af te vragen waarom het (niet) werkt. In productiecode gebruikt je natuurlijk wel de library bouwers' voorzieningen :)

Somniferous whisperings of scarlet fields. Sleep calling me and in my dreams i wander. My reality is abandoned (I traverse afar). Not a care if I never everwake.


Acties:
  • 0 Henk 'm!

Verwijderd

Topicstarter
Op dit moment is dit de code:
C++:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class ThreadManager
{
    private:
        boost::condition_variable updateReadyCondition;
        boost::mutex updateReadyMutex;
        bool updateReady = false;

        boost::condition_variable threadsReadyCondition;
        boost::mutex threadsReadyMutex;
        bool threadsReady = true;

        int numThreadsRunning = 0;

        boost::mutex numThreadsRunningMutex;

    public:


        void threadEnded()
        {
            numThreadsRunningMutex.lock();
            numThreadsRunning--;
            checkIfThreadsAreReady();
            numThreadsRunningMutex.unlock();
        }

        void checkIfThreadsAreReady()
        {
            if (numThreadsRunning == 0)
            {
                boost::unique_lock<boost::mutex> lock(threadsReadyMutex);
                threadsReady = true;
                updateReady = false;
                threadsReadyCondition.notify_all();
            }
        }

        void mainLoop()
        {
            while (true)
            {
                boost::unique_lock<boost::mutex> lockThread(threadsReadyMutex);
                while (!threadsReady)
                {
                    threadsReadyCondition.wait(lockThread);
                }

                cout << "update\n";
                std::this_thread::sleep_for(std::chrono::milliseconds(1000));
                cout << "update done\n";

                numThreadsRunning = NUM_THREADS;

                boost::unique_lock<boost::mutex> lockUpdate(updateReadyMutex);
                updateReady = true;
                updateReadyCondition.notify_all();
            }
        }

        void runThread(int id)
        {
            while (true)
            {
                boost::unique_lock<boost::mutex> lockUpdate(updateReadyMutex);
                while (!updateReady)
                {
                    updateReadyCondition.wait(lockUpdate);
                }

                cout << "start " << id << "\n";
                std::this_thread::sleep_for(std::chrono::milliseconds(1000 * id));
                cout << "end " << id << "\n";
                threadEnded();
            }
        }

        void start()
        {
            vector<boost::thread> threads;
            threads.push_back(boost::thread(&ThreadManager::mainLoop, this));
            for (int i = 0; i < NUM_THREADS; i++)
            {
                threads.push_back(boost::thread(&ThreadManager::runThread, this, i));
            }

            for (int i = 0; i < NUM_THREADS + 1; i++)
            {
                threads[i].join();
            }
        }
};


Ik krijg hierbij de volgende output, waarbij de thread id wisselend is, maar er wel constant slechts een enkele thread wordt gestart:
code:
1
2
3
4
5
6
7
8
9
10
update
update done
update
start 1
update done
end 1
start 1
end 1
start 1
end 1

Acties:
  • +1 Henk 'm!

  • MSalters
  • Registratie: Juni 2001
  • Laatst online: 13-09 00:05
Wat precies denk je dat een mutex doet?! Het is een afkorting for Mutual Exclusion, en dat is precies wat je ziet. Er is precies 1 thread die updateReadyMutex gelocked kan hebben op elk willekeurig moment,en dat is de enige workerthread die kan runnen.

Overigens heb je nog grof mazzel dat je geen deadlock hebt, want je mutexen zijn niet gesorteerd. Mainloop heeft threadsReadyMutex < updateReadyMutex en runThread heeft updateReadyMutex < threadsReadyMutex.

Man hopes. Genius creates. Ralph Waldo Emerson
Never worry about theory as long as the machinery does what it's supposed to do. R. A. Heinlein


Acties:
  • 0 Henk 'm!

  • Trolando
  • Registratie: April 2005
  • Laatst online: 25-08 13:47
Misschien heb je hier wat aan, mijn eigen project Lace op GitHub?

https://github.com/trolando/lace

Ja, ik zal een keer een Readme toevoegen. Check benchmarks/fib/fib-lace.c voor een voorbeeld. Het is niet echt C++, maar C++ kan wel. C.

Acties:
  • 0 Henk 'm!

Verwijderd

Topicstarter
Ik weet wat een mutex doet, alleen begreep ik uit de documentatie dat door middel van een condition variable er meerdere threads tegelijk konden wachten op een notify. Zie bijvoorbeeld hier: http://www.cplusplus.com/...iable/condition_variable/

Wat bedoel je precies dat er geen deadlock is? In mijn gedachten krijgt elke thread een bericht van de mainLoop om verder te gaan en de mainLoop krijgt een bericht wanneer de threads klaar zijn.

Acties:
  • 0 Henk 'm!

  • farlane
  • Registratie: Maart 2000
  • Laatst online: 15:26
Als je de beschijving van condition_variable leest :
"When the condition variable is notified, a timeout expires, or a spurious wakeup occurs, the thread is awakened, and the mutex is atomically reacquired. The thread should then check the condition and resume waiting if the wake up was spurious."
Dus jouw worker threads willen allemaal, gedurende de gesimuleerde taak, de mutex gelocked hebben, waardoor er dus maar 1 tegelijkertijd kan runnen.

Somniferous whisperings of scarlet fields. Sleep calling me and in my dreams i wander. My reality is abandoned (I traverse afar). Not a care if I never everwake.


Acties:
  • Beste antwoord
  • +1 Henk 'm!

  • farlane
  • Registratie: Maart 2000
  • Laatst online: 15:26
Oke, ben even aan het pielen geweest, maar dit lijkt volgens mij meer op wat je probeert te bereiken. De crux zit 'em in regel 15, waar je de lock op het "werk" weer loslaat nadat je deze hebt gekregen in de wait( .... ). ( Deze versie van wait encapsulate de while loop die jij expliciet uitschrijft ) en zoals je ziet hoef je voor de notify niets te locken.

C++:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
condition_variable work, complete;
mutex mtx;
int work_cnt, busy_cnt;

void worker( int id )
{

    while( true )
    {
        unique_lock<mutex> lk(mtx);

        work.wait(lk, []{ return work_cnt > 0; } );
        ++busy_cnt;
        --work_cnt;
        lk.unlock();


        // Do work here

        {
            lock_guard<mutex> lk(mtx);
            --busy_cnt;
        }
        complete.notify_all();
    }
}

void supervisor()
{
    while( true )
    {
        {
            lock_guard<mutex> lk(mtx);
            work_cnt = 5;
        }

        work.notify_all();

        unique_lock<mutex> lk(mtx);
        complete.wait(lk, []{   return work_cnt == 0 && busy_cnt == 0; } );
    }
}

int main()
{
    thread t1(worker, 1), t2(worker, 2), t3(worker, 3), t4(supervisor);
    t1.join();
    t2.join();
    t3.join();
    t4.join();
}

Somniferous whisperings of scarlet fields. Sleep calling me and in my dreams i wander. My reality is abandoned (I traverse afar). Not a care if I never everwake.


Acties:
  • 0 Henk 'm!

Verwijderd

Topicstarter
Aah super, dit werkt inderdaad een stuk beter dan mijn implementatie.

Acties:
  • 0 Henk 'm!

  • farlane
  • Registratie: Maart 2000
  • Laatst online: 15:26
Verwijderd schreef op dinsdag 05 april 2016 @ 22:06:
Aah super, dit werkt inderdaad een stuk beter dan mijn implementatie.
Maar snap je ook wat er gebeurt? ;)

Somniferous whisperings of scarlet fields. Sleep calling me and in my dreams i wander. My reality is abandoned (I traverse afar). Not a care if I never everwake.

Pagina: 1