Black Friday = Pricewatch Bekijk onze selectie van de beste Black Friday-deals en voorkom een miskoop.

[C++] Producer / Consumer wil maar niet lukken?

Pagina: 1
Acties:

  • Darkvater
  • Registratie: Januari 2001
  • Laatst online: 26-08-2024

Darkvater

oh really?

Topicstarter
Ik heb met behulp van CodeProject en pseudocode van Wikipedia een producer/consumer geimplementeerd.

Maar het wil maar niet doen wat ik wil, en ik word er na een dag helemaal gek van. Zie het foute gedeelte ook niet meer... Iemand ideeen?

Ik gebruik een circular buffer om in te lezen/schrijven en deze wordt dan gedeeld door beide threads. Deze zitten in een struct myType waarbij 'usage' gedeeld wordt tussen producer/consumer, bufsize het totale grootte van de buffer geeft en read/write de positie in de buffer.
Ik heb twee semaphores, full/empty en een event shutdown.

...
Zie code in http://gathering.tweakers.net/forum/quote_message/30387649/0 (vierde reply)
...

Ten eerste vind ik het erg vreemd dat nadat de consumer bij een empty buffer heeft gewacht en gesignaled wordt (dus usage is niet meer 0!!) het nog kennelijk mogelijk is dat usage 0 is; getuige een assert. Maar ook waarom de consumer meer inleest dat er geschreven is, is mij een raadsel. Als usage 0 is dan is het gewoon leeg, en dan wacht deze thread ook op een wakeup.
Ik heb ook sporadisch dat de ReleaseSemaphore functie een error teruggeeft over dat er teveel werd aangeroepen (max overschreden die nu op 1 staat).

Wat doe ik nou fout? :(

P.S. Ik heb bij mijn code de CriticalSection weggelaten omdat (volgens wikipedia) deze niet nodig zijn bij Monitors.

[ Voor 42% gewijzigd door Darkvater op 09-07-2008 00:16 . Reden: nieuwe code ]


Windows Vista? *NEVER* Het waarom - Opera forever!!!
I've seen chickens that were more menacing. Chickens in a coma. On ice. In my fridge


  • Zoijar
  • Registratie: September 2001
  • Niet online

Zoijar

Because he doesn't row...

Wat is dat usage? Is dat thread-safe?

De logica met 2 semaphores is goed, als je slechts 1 producer en 1 consumer hebt.

  • Darkvater
  • Registratie: Januari 2001
  • Laatst online: 26-08-2024

Darkvater

oh really?

Topicstarter
Zoijar schreef op maandag 07 juli 2008 @ 20:25:
Wat is dat usage? Is dat thread-safe?

De logica met 2 semaphores is goed, als je slechts 1 producer en 1 consumer hebt.
Ik heb inderdaad maar slechts 1 producer en 1 consumer, meer is ook niet nodig. 'usage' geeft aan hoeveel van de buffer in gebruik is. Bij 0 is de buffer leeg, als het gelijk is aan 'bufsize', is de buffer vol. Als ik een element toevoeg dan komt er eentje bij, als ik er eentje lees gaat er eentje af.

Hoe weet ik of het thread-safe is?


Windows Vista? *NEVER* Het waarom - Opera forever!!!
I've seen chickens that were more menacing. Chickens in a coma. On ice. In my fridge


  • Zoijar
  • Registratie: September 2001
  • Niet online

Zoijar

Because he doesn't row...

Als ik het goed begrijp is usage juist de variabele die je moet synchronizen. Die logica moet binnen je semaphores staan. Je hoeft daar zelf niet op te checken, de semaphore empty houdt juist zelf bij of er nog lege slots zijn.

Ik heb toevallig net zelf een shared memeory producer consumer buffer geschreven. Mijn acquire-release ziet er zo uit:

C++:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
    T* acquire_write_slot(bool block = true) {
        if (block) {
            sem_empty.wait();
        } else if (!sem_empty.try_wait()) {
            return 0;
        }

        return &storage[write_index*slot_sz];
    }

    void release_write_slot() {
        advance(write_index);
        sem_full.post();
    }

    T* acquire_read_slot(bool block = true) {
        if (block) {
            sem_full.wait();
        } else if (!sem_full.try_wait()) {
            return 0;
        }

        return &storage[read_index*slot_sz];
    }

    void release_read_slot() {
        advance(read_index);
        sem_empty.post();
    }

(ik split die functies omdat ik slots uitgeef en dat gaat via een scoped locking object... ingewikkeld... Verder gebruik ik boost::interprocess semaphores) Je kan nu iets toevoegen op deze manier:
C++:
1
2
3
T* x = pcb->acquire_read_slot(true);
x = // my data
pcb->release_write_slot();

  • Darkvater
  • Registratie: Januari 2001
  • Laatst online: 26-08-2024

Darkvater

oh really?

Topicstarter
Dank je wel Zoijar, Ik heb nu EnterCriticalSection/LeaveCriticalSection gezet tussen het veranderen van usage en de ReleaseSemaphore() code. Nu gaat het allemaal wel correct.

Wat mij wel nogal vreemd blijft is het volgende:
Als ik de programma run krijg ik heel vaak een error bij ReleaseSemaphore; 298. Dit is dat de semaphore te vaak is gereleased. Het zou maar een keer mogen, maar kennelijk gebeurt het vaker. Als ik naar de code kijk dan moet het toch correct zijn? Ik doe alleen maar een release bijvoorbeeld als 'usage == 1' en niet anders. Overflowen kan die waarde ook niet want als het gelijk is aan bufsize moet de producer wachten. Er wordt door de consumer heel mooi gewacht op de semaphore bij een lege buffer... wat is er nou mis? Hoe kan er toch dan deze disrepancy ontstaan?

Ook bizar is het feit dat kennelijk de correlatie coefficient van een vector van lengte 200 sneller berekend wordt dan een fprintf? Naja, als dat zo is...

Mijn code zoals die nu is:
Aanroepen threads:
C++:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
    Wavelet w;
    /* read in data from matlab */
    w.data = getMatlabData("matX.mat", "X", timeframes, regions, individuals);

    BufferThread<double> q(50);
    w.buffer = &q;
    HANDLE p = CreateThread(NULL, 0, producer, &w, 0, NULL);
    if (p == NULL) printf("cannot create producer!\n");
    
    HANDLE c = CreateThread(NULL, 0, consumer, &w, 0, NULL);
    if (c == NULL) printf("cannot create consumer!\n");
    WaitForSingleObject(p, INFINITE);
    CloseHandle(p);
    w.buffer->Shutdown();
    WaitForSingleObject(c, INFINITE);
    CloseHandle(c);

De threads zelf:
C++:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
typedef struct Wavelet {
    BufferThread<double> *buffer;
    double *data;
} Wavelet;

static DWORD WINAPI producer(LPVOID param) {
    BufferThread<double> *out = ((Wavelet*)param)->buffer;
    double *data = ((Wavelet*)param)->data;

    for (int i = 0; i != regions; i++) {
        for (int j = i + 1; j != regions; j++) {
            double r = corrcoef<timeframes>(&data[i*timeframes], &data[j*timeframes]);
            out->Add(r);
        }
    }

    printf("add: %d, remove: %d\n", out->add, out->remove);
    return 0;
}

static DWORD WINAPI consumer(LPVOID param) {
    BufferThread<double> *in = ((Wavelet*)param)->buffer;
    FILE *fp = fopen("out_par.txt", "w");

    double rMap = in->Remove();
    while (in->running) {
        fprintf(fp, "%7.4f\n", rMap);
        rMap = in->Remove();
    }

    fclose(fp);
    printf("add: %d, remove: %d\n", in->add, in->remove);
    return 0;
}


Mijn buffer struct:
C++:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/* $Id: buffer.h 70 2008-07-07 18:36:37Z tomi $ */
#include <windows.h>

template <class T>
class BufferThread {
public:
    BufferThread(int size): bufsize(size), read(0), write(0), usage(0), running(true) {
        buffer = new T[bufsize];
        full = CreateSemaphore(NULL, 0, 1, NULL);
        empty = CreateSemaphore(NULL, 0, 1, NULL);
        shutdown = CreateEvent(NULL, true, false, NULL);
        hList[EMPTY] = empty; hList[SHUTDOWN] = shutdown;
        InitializeCriticalSection(&lock);
        add = remove = 0;
    }

    ~BufferThread(void) {
        delete[] buffer;
        buffer = NULL;

        CloseHandle(full);
        CloseHandle(empty);
        CloseHandle(shutdown);
        DeleteCriticalSection(&lock);
    }

    /** PRODUCER
     * Add a new element to the buffer. If the buffer is full, we wait until
     * the consumer object (BufferThread::Remove() has read at least a single item */
    void Add(T p) {
        while (usage == bufsize) {add++;WaitForSingleObject(full, INFINITE);}

        this->buffer[write] = p;
        write = increment(write);

        EnterCriticalSection(&lock);
        usage++;

        if (usage == 1) {
            if (!ReleaseSemaphore(empty, 1, NULL)) 
                printf("Cannot release semaphore for 'Add' (Err: %d)\n", GetLastError());
        }

        LeaveCriticalSection(&lock);
    }

    /** CONSUMER
     * Read an element from the buffer. If the buffer is empty, we wait until
     * the producer object (BufferThread::Add() has written at least a single item */
    T Remove(void) {
        while (usage == 0) {
            remove++;
            switch (WaitForMultipleObjects(2, hList, false, INFINITE)) {
                case WAIT_OBJECT_0 + EMPTY:
                    break;
                case WAIT_OBJECT_0 + SHUTDOWN: 
                    running = false;
                    return (T)NULL;
                default: assert(false); return (T)NULL;
            }
        }

        T result = this->buffer[read];
        read = increment(read);

        EnterCriticalSection(&lock);
        usage--;

        if (usage == bufsize - 1) {
            if (!ReleaseSemaphore(full, 1, NULL)) 
                printf("Cannot release semaphore for 'Remove' (Err: %d)\n", GetLastError());
        }

        LeaveCriticalSection(&lock);

        return result;
    }

    /** Gracefully shutdown the consumer. Will empty the buffer first before exiting */
    void Shutdown(void) {SetEvent(shutdown);}
public:
    bool running;
    int add, remove;
private:
    inline int increment(int i) {return (i + 1) % this->bufsize;}
    enum {EMPTY, SHUTDOWN};
    HANDLE full, empty, shutdown;
    HANDLE hList[2];
    CRITICAL_SECTION lock;
    int bufsize, usage;
    int read, write;
    T *buffer;
};


Windows Vista? *NEVER* Het waarom - Opera forever!!!
I've seen chickens that were more menacing. Chickens in a coma. On ice. In my fridge


  • curry684
  • Registratie: Juni 2000
  • Laatst online: 06-09 00:37

curry684

left part of the evil twins

Darkvater schreef op woensdag 09 juli 2008 @ 00:14:
Ook bizar is het feit dat kennelijk de correlatie coefficient van een vector van lengte 200 sneller berekend wordt dan een fprintf? Naja, als dat zo is...
Dat verbaast me totaal niet eigenlijk. De berekening die je doet is puur een floating point operatie op een paar registers, dus daar walst de FPU lazy doorheen in de achtergrond terwijl de CPU al lang en breed op je synchronization object zit te walsen. Dit in vergelijk met fprintf die je adresbus zit te belasten voor het inlezen van de formatting string en vervolgens voor de output moet gaan zitten queuen op het veruit traagste onderdeel van je hele systeem.

Professionele website nodig?


  • Zoijar
  • Registratie: September 2001
  • Niet online

Zoijar

Because he doesn't row...

Je code is nog steeds niet goed: dit is niet thread-safe. Je gebruikt semaphores verkeerd (als mutex)

Dit moet het zijn:

// Regels 9-10:
full = CreateSemaphore(NULL, 0, bufsize, NULL);
empty = CreateSemaphore(NULL, bufsize, bufsize, NULL);
// bufsize buckets max, full begint op 0, empty op bufsize


void Add(T p) {
WaitForSingleObject(empty, INFINITE);} // we hebben een EMPTY slot nodig hier!

this->buffer[write] = p;
write = increment(write);

usage++;
ReleaseSemaphore(full, 1,NULL); // we notify-en hier een FULL slot, nl. die we net hebben gevuld.
}

T Remove(void) {
WaitForSingleObject(full, INFINITE); // we hebben een FULL slot nodig hier!

T result = this->buffer[read];
read = increment(read);
usage--;

ReleaseSemaphore(empty, 1, NULL); // we krijgen er een EMPTY slot bij, we halen er eentje weg hier

return result;
}

  • Darkvater
  • Registratie: Januari 2001
  • Laatst online: 26-08-2024

Darkvater

oh really?

Topicstarter
Ik heb de empty/full net andersom geimplementeerd. Mijn redenatie was dat ik wacht tot de buffer niet meer "FULL" is :). Heb het nu veranderd, omgewisseld en de semaphore (initial) size omgezet..

Dit gaat goed bij een buffersize van 50, zoals in mijn code. Als ik dit echter verander naar 2 dan krijg ik een hele waslijst aan errors voor ReleaseSemaphore(empty) (Remove). Bij een buffersize van 1 krijg ik bij beiden ReleaseSemaphore(empty) (Remove) en ReleaseSemaphore(full) (Add) errors.

Iets klopt er nog steeds niet...


Windows Vista? *NEVER* Het waarom - Opera forever!!!
I've seen chickens that were more menacing. Chickens in a coma. On ice. In my fridge


  • .oisyn
  • Registratie: September 2000
  • Laatst online: 00:04

.oisyn

Moderator Devschuur®

Demotivational Speaker

Ik zou die critical sections sowieso niet gebruiken, het enige wat je ermee doet is de 'usage' variabele guarden die je alleen maar wilt incrementen en decrementen. Gebruik dan InterlockedIncrement() en InterlockedDecrement(), die zijn gegarandeerd atomic op elk systeem (en dus lockless en daarmee een stuk sneller dan zo'n synchronisatie-object).

Give a man a game and he'll have fun for a day. Teach a man to make games and he'll never have fun again.


  • Darkvater
  • Registratie: Januari 2001
  • Laatst online: 26-08-2024

Darkvater

oh really?

Topicstarter
.oisyn schreef op woensdag 09 juli 2008 @ 11:18:
Ik zou die critical sections sowieso niet gebruiken, het enige wat je ermee doet is de 'usage' variabele guarden die je alleen maar wilt incrementen en decrementen. Gebruik dan InterlockedIncrement() en InterlockedDecrement(), die zijn gegarandeerd atomic op elk systeem (en dus lockless en daarmee een stuk sneller dan zo'n synchronisatie-object).
Krijg je dan geen problemen dat ReleaseSemaphore dan niet op het juiste moment wordt aangeroepen doordat een andere thread usage daarvoor ook veranderd heeft waardoor mogelijk een deadlock kan ontstaan?


Windows Vista? *NEVER* Het waarom - Opera forever!!!
I've seen chickens that were more menacing. Chickens in a coma. On ice. In my fridge


  • .oisyn
  • Registratie: September 2000
  • Laatst online: 00:04

.oisyn

Moderator Devschuur®

Demotivational Speaker

Je mag van die synchronisatie functies wel verwachten dat ze thread-safe zijn hoor ;). Maar even compleet los daarvan, je ondersteunt sowieso maar 2 threads - de consumer en de producer. Op het moment dat je er meerdere hebt ben je de sjaak want je guard de read en write positions ook niet. Dus of ReleaseSemaphore() nou thread-safe is of niet, je roept 'm sowieso maar voor 1 thread aan per semaphore.

Give a man a game and he'll have fun for a day. Teach a man to make games and he'll never have fun again.


  • Darkvater
  • Registratie: Januari 2001
  • Laatst online: 26-08-2024

Darkvater

oh really?

Topicstarter
Dat snap ik :+ Ik bedoelde meer de guard (usage == 1) en (usage == bufsize - 1) dat die verkeerd gepasseerd worden als niet alles in een critical section zit.

Edit: DOH, ik snap nu wat Zoijar bedoelt. Haal die hele van 'while (usage == 0)' en '(while (usage == bufsize)'. Of eigenlijk de hele usage variabele en doe het met de semaphores :)

Op mijn manier is het eigenlijk een mutex, of zou het moeten zijn, maar werkt niet echt zo :p Mijn enige vraag is nu waarom de mutex niet werkt? Des te meer omdat de parallele code ongeveer twee keer zo langzaam is nu als de sequenitele, naar ik neem aan door de constante overhead van WaitForObject/ReleasSemaphore. Een mutex zou deze calls moeten beperken denk ik.

[ Voor 69% gewijzigd door Darkvater op 09-07-2008 13:02 . Reden: he has seen the light! ]


Windows Vista? *NEVER* Het waarom - Opera forever!!!
I've seen chickens that were more menacing. Chickens in a coma. On ice. In my fridge

Pagina: 1