[C++] Multithreaded probleem bij buffer

Pagina: 1
Acties:

Onderwerpen


Acties:
  • 0 Henk 'm!

  • TD-er
  • Registratie: Januari 2000
  • Laatst online: 03-09 09:15
Ik ben al even bezig met een multi-threaded programma waarbij ik beeld-data en bijbehorende metadata per frame tussen 2 threads wil kunnen bufferen.
Dus ik heb een BufferQueue object gemaakt.
Omdat de hoeveelheid data per frame kan verschillen, moet er een write-request gedaan worden waarbij een pointer wordt opgeleverd (die dus NULL kan zijn) waar begonnen kan worden met schrijven en na afloop een writeCommit met de bijbehorende metadata en hoeveel data er werkelijk is geschreven.
Bij het request wordt dan aangegeven wat de geschatte bovengrens is van de hoeveelheid data die je wilt gaan schrijven.
N.B. er is dus 1 producer en 1 consumer.

Om dit op te lossen heeft het BufferQueue object een (hele) grote aaneengesloten buffer voor de pixel-data.
De informatie waar voor elk frame de data begint, hoeveel data erin zit en de metadata, wordt opgeslagen in een BufferInfo object.
Zo'n BufferInfo-object wordt weer opgeslagen in een CircularBuffer object.

Dat CircularBuffer-object bestaat uit een std::vector die 3 indices bij moet houden:
- readIndex
- writeIndex
- lastWrittenIndex.

Die lastWrittenIndex is nodig omdat de BufferQueue in 2 modi moet kunnen werken:
- processAllData
- processLastWritten

Oftewel de consumer-thread moet in bepaalde toepassingen data kunnen skippen wanneer de consumer het niet bij kan houden. Bijvoorbeeld live video moet gewoon snappy zijn en dus altijd het laatste beeld tonen, maar bij de focus kun je je niet permitteren om wat te missen, maar even wachten is dan niet erg.

Hieruit volgen de volgende definities voor isFull() en isEmpty():
code:
1
2
3
4
5
6
7
       bool isFull() const {
          return (_writeIndex == _readIndex);
        }

        bool isEmpty() const {
          return (_readIndex == _lastWrittenIndex);
        }


De indices zijn allen gedeclareerd als volatile size_t.

Om de boel een beetje rap te houden, heb ik een aantal optimalisaties toegepast:
- Geen locking tenzij echt niet anders kan (duurt te lang)
- Data in de "pixelbuffer" is cache-aligned. Oftewel elke write-pointer begint altijd op een adres wat een veelvoud is van de grootte van een cache-line op de processor. Dit om te voorkomen dat lees- en write-thread op elkaar moeten wachten.
- de indices zijn ook op 'cache-line-size' afstand van elkaar gedeclareerd, zodat ze elkaar niet in de weg zitten.

Dit is een hele lange inleiding om het probleem te kunnen beschrijven.
Deze buffer werkt geweldig.... maar niet altijd.
Heel soms (soms pas na 100GB aan data) komt het voor dat de read-thread een pointer krijgt die wijst naar data die nog net niet overschreven is met nieuwe data.
Oftewel je loopt dan qua beelden net zolang achter als dat de buffer lang is. (voor 100 MB buffer met 40 MB/s is dat dus 2.5 sec)
Het heeft er dus alle schijn van dat de read-index niet op lastWritten wordt gezet, maar op write-index, of mogelijk zelfs nog daarvoor.
Ik heb allerlei asserts in de code gehad om dat te proberen te detecteren, maar tot op heden niets kunnen vinden wat dat aantoont. Telkens als ik de debugger inspecteer, staan er precies de waarden die je zou verwachten.
En nu komt het gekke, die vertraging blijft constant. Je zou verwachten dat wanneer de readpointer op writepointer komt te staan, dat de buffer dan vol is, dus dat er geen data bij komt, dus dat de reader vanzelf de buffer verder leeg moet lezen. De reader zou het makkelijk moeten kunnen bijbenen (CPU-load is laag genoeg)
De vertraging kan makkelijk uren aan een stuk zo blijven, tot de reader-thread het een keer niet meer bij kan houden en dus weer een paar frames achter komt te liggen. Dan gaat het weer gewoon verder zoals het hoort.

Het enige wat ik kan bedenken is dat de reader op een gegeven moment een BufferInfo object krijgt wat een andere inhoud heeft dan die er net in is geschreven. Oftewel dat ik naar een oude cache zit te kijken.
De std::vector waarin ik de BufferInfo objecten opsla is niet volatile gedeclareerd.
Nu kun je std-containers ook niet volatile declareren, maar het element BufferInfo wat ik erin zet is niet volatile.

Mijn vraag is, is het überhaupt mogelijk dat ik steeds tegen een verouderde set data aan zit te kijken?

Nog een paar toevoegingen:
- er is een flush-functie in BufferQueue die in CircularQueue de readIndex gelijk maakt aan lastWritten. Hiervoor heb ik de update van de readIndex in een critical section geplaatst. Die is heel snel wanneer je 'm vrijwel altijd uitvoert vanuit dezelfde thread.
- ik gebruik een core i7 (1 socket dus) en de hele code bestaat uit zo'n 12 threads (1x QT, 1x USB read-thread, 1x image processing thread, 1x display thread en 8 threads van OpenMP voor de deBayering en verdere post-processing van het beeld)
- ik heb echt performance nodig in de zin van veel frames per second, dus locking is niet een optie. Ik haal nu makkelijk 1000-en frames per sec, maar er komen ook wel situaties voor dat ik ook echt 300 fps moet verwerken. Locks kunnen zomaar 10 - 50 ms duren als het tegenzit en dan heb ik al tig frames verloren.

Een goedkope voeding is als een lot in de loterij, je maakt kans op een paar tientjes korting, maar meestal betaal je de hoofdprijs. mijn posts (nodig wegens nieuwe layout)


Acties:
  • 0 Henk 'm!

  • H!GHGuY
  • Registratie: December 2002
  • Niet online

H!GHGuY

Try and take over the world...

TD-er schreef op zondag 05 februari 2012 @ 10:49:
Om de boel een beetje rap te houden, heb ik een aantal optimalisaties toegepast:
- Geen locking tenzij echt niet anders kan (duurt te lang)
Als locking te lang duurt bij een 1:1 producer consumer, dan doe je iets fout. Geen idee welk OS je gebruikt, maar de Linux futex en Windows CriticalSection zijn volledig userspace als de lock genomen kan worden.
Bij een 1:1 producer consumer, met zulke hoeveelheden data, zou de lock geen contention point mogen zijn.
- Data in de "pixelbuffer" is cache-aligned. Oftewel elke write-pointer begint altijd op een adres wat een veelvoud is van de grootte van een cache-line op de processor. Dit om te voorkomen dat lees- en write-thread op elkaar moeten wachten.
Dit is allemaal mooi, maar helpt in realiteit vaak weinig bij zulke hoeveelheden data. Voor aanvang van een queue item zit je cache vol met stale data. Daarbovenop heb je bij video buffers veelal een (quasi-)lineair access patroon. Dus de writer zit meestal al op een heel andere plaats bezig dan de reader.
- de indices zijn ook op 'cache-line-size' afstand van elkaar gedeclareerd, zodat ze elkaar niet in de weg zitten.
Ook hier weer: 1 consumer, 1 producer. Per iteratie verwerkt elke thread vele kbytes/Mbytes aan data. Die ene pointer access zal voor cache access het verschil niet maken.
Dit is een hele lange inleiding om het probleem te kunnen beschrijven.
Deze buffer werkt geweldig.... maar niet altijd.
Heel soms (soms pas na 100GB aan data) komt het voor dat de read-thread een pointer krijgt die wijst naar data die nog net niet overschreven is met nieuwe data.
Oftewel je loopt dan qua beelden net zolang achter als dat de buffer lang is. (voor 100 MB buffer met 40 MB/s is dat dus 2.5 sec)
Het heeft er dus alle schijn van dat de read-index niet op lastWritten wordt gezet, maar op write-index, of mogelijk zelfs nog daarvoor.
Ik heb allerlei asserts in de code gehad om dat te proberen te detecteren, maar tot op heden niets kunnen vinden wat dat aantoont. Telkens als ik de debugger inspecteer, staan er precies de waarden die je zou verwachten.
En nu komt het gekke, die vertraging blijft constant. Je zou verwachten dat wanneer de readpointer op writepointer komt te staan, dat de buffer dan vol is, dus dat er geen data bij komt, dus dat de reader vanzelf de buffer verder leeg moet lezen. De reader zou het makkelijk moeten kunnen bijbenen (CPU-load is laag genoeg)
De vertraging kan makkelijk uren aan een stuk zo blijven, tot de reader-thread het een keer niet meer bij kan houden en dus weer een paar frames achter komt te liggen. Dan gaat het weer gewoon verder zoals het hoort.
- Vermoedelijk heb je een overflow of underflow, typisch door een off-by-one. Ergens maak je een rekenfout (modulo-rekenen?) die ervoor zorgt dat bij bepaalde condities de reader de writer voorbijsteekt of zoiets.
- Een andere mogelijkheid is dat je memory barriers mist. Locks voorzien, los van locking zelf, ook automatische memory barriers. Alles volatile declareren is meestal niet voldoende. Volatile is geen vervanging voor memory barriers (hoewel sommige compilers wel bepaalde efforts doen)
Het enige wat ik kan bedenken is dat de reader op een gegeven moment een BufferInfo object krijgt wat een andere inhoud heeft dan die er net in is geschreven. Oftewel dat ik naar een oude cache zit te kijken.
De std::vector waarin ik de BufferInfo objecten opsla is niet volatile gedeclareerd.
Nu kun je std-containers ook niet volatile declareren, maar het element BufferInfo wat ik erin zet is niet volatile.

Mijn vraag is, is het überhaupt mogelijk dat ik steeds tegen een verouderde set data aan zit te kijken?

Nog een paar toevoegingen:
- er is een flush-functie in BufferQueue die in CircularQueue de readIndex gelijk maakt aan lastWritten. Hiervoor heb ik de update van de readIndex in een critical section geplaatst. Die is heel snel wanneer je 'm vrijwel altijd uitvoert vanuit dezelfde thread.
- ik gebruik een core i7 (1 socket dus) en de hele code bestaat uit zo'n 12 threads (1x QT, 1x USB read-thread, 1x image processing thread, 1x display thread en 8 threads van OpenMP voor de deBayering en verdere post-processing van het beeld)
- ik heb echt performance nodig in de zin van veel frames per second, dus locking is niet een optie. Ik haal nu makkelijk 1000-en frames per sec, maar er komen ook wel situaties voor dat ik ook echt 300 fps moet verwerken. Locks kunnen zomaar 10 - 50 ms duren als het tegenzit en dan heb ik al tig frames verloren.
Nogmaals, locks die 10-50ms duren zie ik niet zo snel gebeuren, tenzij je zelf teveel werk doet binnen die locks.

ASSUME makes an ASS out of U and ME


Acties:
  • 0 Henk 'm!

  • TD-er
  • Registratie: Januari 2000
  • Laatst online: 03-09 09:15
Ik had het eerst opgelost met een triple buffer, waarin ik maar 1 swap-statement deed tijdens die lock en daar heb ik situaties voorbij zien komen waarbij het verkrijgen van een lock echt 10 - 50 ms kon duren.
Maar die triple buffer is niet voldoende om de data bij te kunnen houden, aangezien je dan hooguit 1 iteratie achter kunt lopen in het verwerken.
Ik werk trouwens onder Windows.

De update van de index wordt als volgt gedaan:
code:
1
2
3
4
5
6
7
8
      size_t computeNextIndex(size_t pointer) const {
          ++pointer;
          if (pointer < _queue.size())
            return pointer;
          else
            return 0;
          // return ((pointer + 1) % _queue.size());
        }


En bijvoorbeeld de update van de read-pointer:
code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
    template<class DATA_TYPE>
    bool sgm::base::CircularQueue<DATA_TYPE>::updateReadIndex(bool performFlush) {
      bool indexChanged = false;
      _mutex.lock();
      if (performFlush) {
        _readIndex = _writeIndexAtFlush;
        indexChanged = true;
      } else if (!isEmpty()) {
        if (_mustGetLatest) {
          _readIndex = _lastWrittenIndex;
          _mustGetLatest = false;
        } else 
          _readIndex = computeNextIndex(_readIndex);
        indexChanged = true;
      }
      _mutex.unlock();
      return indexChanged;
    }


Die wordt aangeroepen vanuit deze functies:
code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
    template<class DATA_TYPE>
    bool CircularQueue<DATA_TYPE>::get(DATA_TYPE &data) {
      if (!updateReadIndex())
        return false;
      if (isEmpty())
        _newDataGate.close(); // GN: there is a race-condition possible where the gate is opened between isEmpty() and close().
      data = _queue[_readIndex];
      return true;
    }

    template<class DATA_TYPE>
    bool CircularQueue<DATA_TYPE>::getLatest(DATA_TYPE &data) {
      _mustGetLatest = true;
      return get(data);
    }

    template<class DATA_TYPE>
    void CircularQueue<DATA_TYPE>::flush() {
      _newDataGate.close();
      _writeIndexAtFlush = _lastWrittenIndex;
      updateReadIndex(true);
    }

En het write-deel:
code:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
    template<class DATA_TYPE>
    DATA_TYPE * sgm::base::CircularQueue<DATA_TYPE>::getWriteItem() {
      if (isFull())
        return NULL;
      DATA_TYPE *currentWriteItem = &_queue[_writeIndex];
      return currentWriteItem;
    }

    template<class DATA_TYPE>
    void CircularQueue<DATA_TYPE>::commitWrite() {
      SGM_DEBUG_ASSERT(!isFull());
      SGM_DEBUG_ASSERT(_queue[_writeIndex].isCommitted());
      _lastWrittenIndex = _writeIndex;
      _writeIndex = computeNextIndex(_writeIndex);
      _newDataGate.open();
    }

En het pollen of er al nieuwe data is:
code:
1
2
3
4
5
       bool waitForNewData(unsigned int timeOut) const {
          if (!isEmpty())
            return true; // GN: to overcome a possible race-condition mentioned in the get() function.
          return _newDataGate.tryWait(timeOut);
        }


Ik zie dus echt niet waar ik mis zou gaan met de index.
Ik heb her-en-der ook al de nodige asserts erin gehad om te kijken of de indices verkeerd gingen, maar die gingen dus ook nergens af.
Maar goed, vrijwel alle mogelijke compiler-bugs die ik gevonden dacht te hebben in het verleden, bleken toch eigen bugs te zijn, dus ik ga er nu ook maar vanuit dat ik gewoon (weer eens) blind ben voor m'n eigen bugs.
Vandaar ook dat ik hier kom voor advies waar de fout te zoeken.

Een goedkope voeding is als een lot in de loterij, je maakt kans op een paar tientjes korting, maar meestal betaal je de hoofdprijs. mijn posts (nodig wegens nieuwe layout)


Acties:
  • 0 Henk 'm!

  • H!GHGuY
  • Registratie: December 2002
  • Niet online

H!GHGuY

Try and take over the world...

Een tijd terug kwam een goed artikel voorbij over lockless queues van Herb Sutter:
http://drdobbs.com/parallel/210604448?pgno=1

Het is de moeite om die eens door te nemen (het is een reeks artikelen, dus lees ze allemaal).

Ik kan enkele besluiten dat je functies te veel doen en ze nogal cryptisch (aka unmaintainable) zijn.
updateReadIndex doet _4_ verschillende dingen afhankelijk van een parameter en een 'global variable in disguise'. Splits die dingen op in aparte functies en roep de juiste aan.
Dit maakt:
- code leesbaarder en beter onderhoudbaar
- sneller (geen if-else's meer)

Ik mag bvb ook hopen dat _newDataGate geen condition variable is. (waar is dan de lock?)

Mijn conclusie blijft hetzelfde:
ofwel mis je memory barriers (op te lossen door locks te gebruiken)
ofwel heb je ergens een off-by-one (-achtige?) fout die sporadisch optreedt (op te lossen door te unit-testen)

ASSUME makes an ASS out of U and ME


Acties:
  • 0 Henk 'm!

  • Soultaker
  • Registratie: September 2000
  • Laatst online: 03:22
Ik ben het grotendeels met H!GHGuY eens, maar je kunt locks ook vermijden door atomaire instructies te gebruiken voor het bijhouden van vrije posities in je buffer. Die zijn wel compiler-specifiek (deze functies voor GCC, bijvoorbeeld) maar hebben als voordeel dat 't impliciete memory barriers zijn zonder verder veel overhead te hebben (aaangenomen dat ze hardwarematig geïmplementeerd kunnen worden op het doelplatform).

Een circulaire buffer is nu niet echt complex, dus het lijkt me dat je daarmee een relatief simpele en toch correcte buffer zou moeten kunnen implementeren.

Er zijn ook libraries als Intel's building blocks for threads die wel concurrent queues bevatten. Performance is waarschijnlijk geen issue als je genoeg werk per frame doet.

Acties:
  • 0 Henk 'm!

  • TD-er
  • Registratie: Januari 2000
  • Laatst online: 03-09 09:15
Ik had in eerste instantie die updateReadIndex ook wat simpeler. maar je zit met de flush die feitelijk de read-pointer ook ophoogt.
De eerdere poging was dat ik een global bool zette (of een gate, kan ook) die aangaf of de get-functie een flush moest doen danwel een read.
_newDataGate is een gate. De code die met een tryWait staat te wachten gaat pas verder als de gate open is (of als er een timeout is opgetreden, maar dan is de returnwaarde van tryWait false)
Die wordt alleen open gezet door de writethread en dichtgezet door de read-thread.
Gates zijn systeemcalls die je niet hoeft te locken. Ik zal morgen even opzoeken welke calls erachter zitten.

Die artikelen van Herb Sutter had ik allemaal al doorgelezen. Als ik me niet vergis gebruikt hij linked lists waarbij hij de allocatie van die elementen ook moet doen. (maar het principe blijft natuurlijk gelijk)

@Soultaker:
Ik wilde maandag inderdaad eens gaan kijken naar de MS-alternatieven van de operaties die je noemt.
atomaire CAS- en Inc operaties.

Er is sprake van dat we over gaan schakelen naar de Intel compiler, dus dan heb je inderdaad die building-blocks erbij. Blijft over mijn eigen trots dat ik het toch zelf ook moet kunnen.
Performance voor met name de write is heel belangrijk in deze toepassing. Door beslissingen die helaas buiten mij om zijn genomen, zit er hardwarematig heel weinig bufferruimte tussen de camera en de PC, dus ik moet bloedsnel de data kunnen accepteren. Dus de write-thread moet echt snel kunnen gaan schrijven en om er een triple-buffer tussen te gaan zetten klinkt allemaal een beetje als overkill.

Een goedkope voeding is als een lot in de loterij, je maakt kans op een paar tientjes korting, maar meestal betaal je de hoofdprijs. mijn posts (nodig wegens nieuwe layout)


Acties:
  • 0 Henk 'm!

  • Soultaker
  • Registratie: September 2000
  • Laatst online: 03:22
TD-er schreef op zondag 05 februari 2012 @ 14:29:
Er is sprake van dat we over gaan schakelen naar de Intel compiler, dus dan heb je inderdaad die building-blocks erbij.
Die library is niet alleen voor de Intel compiler hoor. ;)

Acties:
  • 0 Henk 'm!

  • MLM
  • Registratie: Juli 2004
  • Laatst online: 12-03-2023

MLM

aka Zolo

maak je programma zo, dat je zo min mogelijk synchronisatie moet doen, en als je het doet, gebruik dan de juiste primitive.

producer consumer is zeg maar de perfecte situatie voor een fifo/queue, gelijk de enige primitive die je nodig hebt :)

-niks-


Acties:
  • 0 Henk 'm!

  • TD-er
  • Registratie: Januari 2000
  • Laatst online: 03-09 09:15
MLM schreef op zondag 05 februari 2012 @ 16:41:
maak je programma zo, dat je zo min mogelijk synchronisatie moet doen, en als je het doet, gebruik dan de juiste primitive.

producer consumer is zeg maar de perfecte situatie voor een fifo/queue, gelijk de enige primitive die je nodig hebt :)
Zou je concreet kunnen aangeven wat er dan anders moet?
Een circular buffer is namelijk volgens mij een FIFO/queue.
En per frame synchroniseren lijkt mij juist een redelijk minimaal aantal.

Een goedkope voeding is als een lot in de loterij, je maakt kans op een paar tientjes korting, maar meestal betaal je de hoofdprijs. mijn posts (nodig wegens nieuwe layout)


Acties:
  • 0 Henk 'm!

  • MLM
  • Registratie: Juli 2004
  • Laatst online: 12-03-2023

MLM

aka Zolo

het idee dat je met multithreading performance haalt is dat je dingen tegelijk doet.
elk frame synchroniseren (ik neem aan dat we het hier over video hebben), hoop ik dat je bedoelt dat je producer niet op hetzelfde frame synct als je consumer, want dan is het gewoon serieel (ie, singlethreaded, maar dan moeilijk gedaan).

ik zou eerst eens checken met een profiler of je daadwerkelijk parallelle performance realiseert met jouw buffer, en anders zou ik overwegen om gewoon eens te kijken naar een library die dit soort dingen voor je regelt (TBB bijvoorbeeld)

-niks-


Acties:
  • 0 Henk 'm!

  • TD-er
  • Registratie: Januari 2000
  • Laatst online: 03-09 09:15
Die buffer heb ik ingevoerd, omdat ik de communicatie van de camera gewoonweg niet kan verwerken in dezelfde thread als waar ik de processing doe.
Dan verlies ik meer dan 50% van de frames en zodra je PC iets meer doet, zit je al op 75% verlies.
Dus hoe dan ook is het nodig dat er minimaal 2 threads zijn met een buffer er tussen.
De synchronisatie is dat er een gate geopend wordt zodra er beelden zijn die nog niet gelezen zijn en gesloten wordt zodra er geen nieuwe data meer in de buffer zit.
Dus zeker geen serialisatie.

Inmiddels is het als volgt opgezet:
- Communicatie-thread
- buffer
- Image-Process-thread (met openMP geparallelliseerd op de knelpunten)
- Buffer (meerdere soorten buffers voor verschillende soorten frames)
- postprocessing/weergave/etc. afhankelijk van frame-type.

Een goedkope voeding is als een lot in de loterij, je maakt kans op een paar tientjes korting, maar meestal betaal je de hoofdprijs. mijn posts (nodig wegens nieuwe layout)


Acties:
  • 0 Henk 'm!

  • Zoijar
  • Registratie: September 2001
  • Niet online

Zoijar

Because he doesn't row...

Circulair producer consumer lijkt me wel een goed idee. Heb ik ook ooit gedaan om data van de ene GPU (renderer) naar de andere GPU (post-processor) over te sturen via de PCIe bus. Overigens is een circulair p/c juist geen fifo; een stack is fifo.

Ik gebruikte toen dit; misschien heb je er iets aan? (pas op, oude code ;) Dit draaide trouwens stabiel op 120hz)

C++:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#ifndef PRODUCER_CONSUMER_H_
#define PRODUCER_CONSUMER_H_

#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_semaphore.hpp>
#include <boost/interprocess/allocators/allocator.hpp>
#include <memory>

namespace FS {

template <typename T = char, typename Alloc = std::allocator<T> >
class SingleProducerConsumerBuffer {
public:
    // this is required because the shared memory maps to a different virtual pointer
    // in each process; i.e., regular pointers into shared memory can't be shared among processes
    // to solve this an offset pointer is used, which represents an offset from the start of the
    // shared memory -- wherever that is for each process.
    typedef typename boost::interprocess::offset_ptr<T> pointer;    

    enum AcquireMode {READ, WRITE};

    struct AcquireSlot {
        AcquireSlot(SingleProducerConsumerBuffer<T, Alloc>& x, AcquireMode am, bool block = true) : pcb(x), mode(am) {
            if (mode == READ) {
                ptr = pcb.acquire_read_slot(block, slot_nr);
            } else {
                ptr = pcb.acquire_write_slot(block, slot_nr);
            }
        }

        ~AcquireSlot() {
            if (ptr) {
                if (mode == READ) {
                    pcb.release_read_slot();
                } else {
                    pcb.release_write_slot();
                }
            }
        }

        T* ptr;
        size_t slot_nr;
    private:
        SingleProducerConsumerBuffer<T, Alloc>& pcb;
        AcquireMode mode;
    private:
        AcquireSlot(const AcquireSlot& src);
        AcquireSlot& operator=(const AcquireSlot& src);
    };

    typedef boost::shared_ptr<AcquireSlot> SlotPtr;

    SlotPtr acquireSlot(AcquireMode am, bool block = true) {
        return SlotPtr(new AcquireSlot(*this, am, block));
    }

    SingleProducerConsumerBuffer(size_t nr_slots, size_t slot_size, Alloc a = Alloc()) :
        sem_full(0), sem_empty(nr_slots), alloc(a), read_index(0), write_index(0), slots(nr_slots), slot_sz(slot_size)
    {
        storage = alloc.allocate(slots*slot_sz);
    }

    ~SingleProducerConsumerBuffer() {
        alloc.deallocate(storage.get(), slots*slot_sz);
    }

    size_t getSlotSize() const {
        return slot_sz;
    }

    size_t getNrSlots() const {
        return slots;
    }
    
protected:
    T* acquire_write_slot(bool block, size_t& slot_out) {
        if (block) {
            sem_empty.wait();
        } else if (!sem_empty.try_wait()) {
            return 0;
        }

        T* result = &storage[write_index*slot_sz];
        slot_out = write_index;

        advance(write_index);

        return result;
    }

    void release_write_slot() {
        sem_full.post();
    }

    T* acquire_read_slot(bool block, size_t& slot_out) {
        if (block) {
            sem_full.wait();
        } else if (!sem_full.try_wait()) {
            return 0;
        }

        T* result = &storage[read_index*slot_sz];
        slot_out = read_index;

        advance(read_index);
        return result;
    }

    void release_read_slot() {
        sem_empty.post();
    }

private:
    void advance(size_t& i) const {
        i = (i+1)%slots;
    }

private:
    boost::interprocess::interprocess_semaphore sem_full, sem_empty;
    Alloc alloc;
    size_t read_index, write_index, slots, slot_sz;
    pointer storage;
};

template <typename T, typename Segment = boost::interprocess::managed_shared_memory>
struct SHMTypes {
    typedef T type;
    typedef typename Segment::segment_manager segment_manager;
    typedef typename boost::interprocess::allocator<type, segment_manager> allocator;
    typedef typename FS::SingleProducerConsumerBuffer<type, allocator> ProducerConsumerBuffer;
};

typedef SHMTypes<char>::ProducerConsumerBuffer ShmPCB;
typedef SHMTypes<char>::allocator ShmAlloc;

typedef SingleProducerConsumerBuffer<char, std::allocator<char> > HeapPCB;

} // namespace FS

#endif

Acties:
  • 0 Henk 'm!

  • Soultaker
  • Registratie: September 2000
  • Laatst online: 03:22
Zoijar schreef op maandag 06 februari 2012 @ 10:35:
Overigens is een circulair p/c juist geen fifo; een stack is fifo.
Ik denk dat je in de war bent. :P

FIFO betekent "first in, first out": het element dat er als eerste in gegaan is, komt er ook als eerste weer uit. Dat is een queue. De volgorde van elementen blijft behouden. Dat is wat de TS wil.

LIFO betekent "last in, first out": het element dat het laatst is toegevoegd komt er als eerste weer uit. Dat is een stack.

(Je kunt die dingen natuurlijk ook LILO, FOFI, FILO of nog anders noemen, maar daar wordt het niet minder verwarrend van.)

[ Voor 10% gewijzigd door Soultaker op 06-02-2012 18:30 ]


Acties:
  • 0 Henk 'm!

  • Zoijar
  • Registratie: September 2001
  • Niet online

Zoijar

Because he doesn't row...

Soultaker schreef op maandag 06 februari 2012 @ 18:27:
Ik denk dat je in de war bent. :P

FIFO betekent "first in, first out": het element dat er als eerste in gegaan is, komt er ook als eerste weer uit. Dat is een queue. De volgorde van elementen blijft behouden. Dat is wat de TS wil.
Ik geef toe dat ik ziek ben en niet helder denk, maar.... :)


FIFO behoudt juist niet de volgorde. Je krijgt altijd de nieuwste frame, en werkt je dan terug naar oudere frames, totdat je weer een nieuwe krijgt. Je kan dus een output van frame nummers 1,2,3,4,5,6 hebben, en een verwerking -- afhankelijk van de snelheid -- van 1,3,5,6,4,2 met FIFO

Als je altijd alleen de nieuwste wilt en de oude weg kan gooien, dan heb je helemaal geen echte buffer nodig, dan swap je gewoon tussen twee geheugen locaties.

Een circulair buffer leest alle elementen op volgorde, daar lees je met output 1,2,3,4,5,6 altijd 1,2,3,4,5,6 terug, zij het vertraagd.

Ja ok je hebt gelijk :X :+ duidelijk geen goed moment om code te schrijven...

[ Voor 14% gewijzigd door Zoijar op 06-02-2012 18:54 ]


Acties:
  • 0 Henk 'm!

  • MSalters
  • Registratie: Juni 2001
  • Laatst online: 00:05
Soultaker schreef op zondag 05 februari 2012 @ 14:23:
je kunt locks ook vermijden door atomaire instructies te gebruiken voor het bijhouden van vrije posities in je buffer. Die zijn wel compiler-specifiek
std::atomic<T> is dat niet. Wel bij blijven he ;)

Man hopes. Genius creates. Ralph Waldo Emerson
Never worry about theory as long as the machinery does what it's supposed to do. R. A. Heinlein


Acties:
  • 0 Henk 'm!

  • TD-er
  • Registratie: Januari 2000
  • Laatst online: 03-09 09:15
C++0x (of C++11) zit nog niet echt in Studio 2005.
Maar ik ben inmiddels al wel wat verder en alvast een paar potentiele concurrency-issues eruit gehaald. Helaas treed het probleem nog steeds op.
Een van de potentiele problemen was dat writeIndex eerst aan lastWritten toegekend werd en dan pas writeindex verhoogd werd. Het kan dus voorkomen dat je dan krijgt dat readIndex, lastWrittenIndex en writeIndex op dezelfde positie kunnen komen en tegelijk de gate ook open is.
Theoretisch kan het dan voorkomen dat de readPointer opgehoogd zou worden (omdat de gate open is), waardoor je precies de lengte van de grote buffer achter komt te liggen. Dat is dus precies wat er mis gaat, maar kennelijk kan dat nog op een andere manier optreden, want die fix was dus niet de oplossing.
En de asserts die ik op de index-variabelen hebt gezet gaan niet af.
Dus ik heb nog steeds sterk het idee dat dezelfde variabele in verschillende threads op hetzelfde moment wel eens een verschillende waarde kan hebben. Oftewel dat het toch niet zo volatile is als je zou verwachten.

Een goedkope voeding is als een lot in de loterij, je maakt kans op een paar tientjes korting, maar meestal betaal je de hoofdprijs. mijn posts (nodig wegens nieuwe layout)


Acties:
  • 0 Henk 'm!

  • MLM
  • Registratie: Juli 2004
  • Laatst online: 12-03-2023

MLM

aka Zolo

met threading moet je erg goed opletten wat je doet.

een "volatile" qualifier lost in zijn eentje geen threading problemen op, volatile impliceert alleen (op MSVC, dacht in GCC ook) dat:
1) alle reads en writes naar volatiles zullen in de C++ volgorde in de machinecode terechtkomen
2) reads van dezelfde volatile mogen niet gecached worden in een temporary

maar die voorwaarde van 1) betekend niet dat de CPU zich daaraan houd, die mag gewoon reads uit de "toekomst" doen (in elk geval gaat dat op voor x86 en x64) als een optimalisatie (en nog veel meer mindboggling dingen, maar daar gaat het nu niet over).

om de CPU te forceren te doen wat jij wilt, MOET je memory barriers gebruiken (daar zijn intrinsics voor). daarnaast zijn er ook nog atomic instructions, dat zijn ook memory barriers (wederom, voor x86 en x64).

al met al is dit heel complexe materie, ik raad je echt aan om een library (TBB) te gebruiken, of een high-level primitive (mutex/futex/semaphore/queue) waarvan je zeker weet dat het werkt (ie, al debugged door pro's). als je zelf iets schrijft kan het heel goed LIJKEN dat het werkt, maar kan het nog steeds bugged zijn op andere systemen etc.

zonder precies te weten wat je huidige constructie is qua thread-safety, heb je al gewoon een mutex in je circulaire buffer, en elke read/write serializeren al geprobeerd? kan al prima parallellisatie opleveren als je frames voldoende workload opleveren voor een thread, en dan heb je in elk geval een werkende baseline om vanaf te werken.

-niks-


Acties:
  • 0 Henk 'm!

  • TD-er
  • Registratie: Januari 2000
  • Laatst online: 03-09 09:15
Met de huidige camera zal ik in de praktijk al > 100 fps halen, maar met de volgende camera zullen we gaan tot 300 fps, waarvoor ik dan dezelfde buffer wil gebruiken.
Met zo'n 3 ms per frame gaat locking waarschijnlijk te veel tijd kosten. (al zal de nieuwe camera wel meer geheugen gaan krijgen)

Maar ik zal eens kijken naar de memory barriers.

Ik heb gisteren wat debug-informatie laten verzamelen (en laten analyseren), maar het lijkt altijd "goed" te gaan qua indices en volgorde van buffers die geschreven en gelezen worden.
Die informatie verzamel ik dan in de commitWrite en get functies, in elk aparte datastructuren.

Een goedkope voeding is als een lot in de loterij, je maakt kans op een paar tientjes korting, maar meestal betaal je de hoofdprijs. mijn posts (nodig wegens nieuwe layout)


Acties:
  • 0 Henk 'm!

  • H!GHGuY
  • Registratie: December 2002
  • Niet online

H!GHGuY

Try and take over the world...

TD-er schreef op donderdag 09 februari 2012 @ 07:00:
Met zo'n 3 ms per frame gaat locking waarschijnlijk te veel tijd kosten. (al zal de nieuwe camera wel meer geheugen gaan krijgen)
Nogmaals, 3ms is tijd zat voor locking. Als er geen contention is, dan kost die lock gewoon quasi-niets.

Trouwens is het enige wat je moet locken de indices. Je moet niet beginnen met grote buffers kopieren als je de lock houdt. Enkel het veranderen en vergelijken van de indices moet je locken.

zeer simpel en fout voorbeeld:
C++:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
template<class T>
class CircBuf {
public:
  bool ReadNext(T& out)
  {
     my_return = m_Buf.size();
     {
         ScopedLock lock(m_Mutex);
         if (readIndex +1 < writeIndex) // zeer naïef en fout
            my_return = readIndex++;
     }
     if (my_return < m_Buf.size())
     {
       out = m_Buf[my_return];
       return true;
     }
     return false;
  }
private:
  size_t readIndex, writeIndex;
  Mutex m_Mutex;
  std::vector<T> m_buf;
};

[ Voor 33% gewijzigd door H!GHGuY op 09-02-2012 12:54 ]

ASSUME makes an ASS out of U and ME


Acties:
  • 0 Henk 'm!

  • .oisyn
  • Registratie: September 2000
  • Laatst online: 12-09 15:22

.oisyn

Moderator Devschuur®

Demotivational Speaker

H!GHGuY schreef op donderdag 09 februari 2012 @ 12:48:
[...]


Nogmaals, 3ms is tijd zat voor locking. Als er geen contention is, dan kost die lock gewoon quasi-niets.
En zelfs als er wél contention is is 3ms peanuts. Een cycle op een 3GHz CPU duurt 1/3ns. 3ms is dus 9 miljoen cycles. Een atomaire memory instructie zit in de orde van grootte van 200 cycles.

Ter indicatie, in onze engine schedulen we tasks van enkele tientallen tot enkele hondertallen microseconden. Alle task threads locken op een semaphore voor een nieuwe workload. In die 3ms werken we iets van 500 tasks af.

Als locks te lang duren omdat een andere thread de locks te lang vasthoudt dan kun je nog amper spreken van parallellisatie, en zul je eens op je achterhoofd moeten krabben of je niet iets verkeer aan het doen bent.

[ Voor 14% gewijzigd door .oisyn op 09-02-2012 13:26 ]

Give a man a game and he'll have fun for a day. Teach a man to make games and he'll never have fun again.


Acties:
  • 0 Henk 'm!

  • Soultaker
  • Registratie: September 2000
  • Laatst online: 03:22
MSalters schreef op woensdag 08 februari 2012 @ 00:30:
std::atomic<T> is dat niet. Wel bij blijven he ;)
Goed punt, daarmee wordt het wel heel makkelijk.

C++:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
const int nbuffers = 10;
int read_index = 0, write_index = 1;
std::atomic<int> nreadable(0), nwritable(nbuffers - 2);

int get_next_write_buffer()
{
    int tmp = nwritable;
    if (tmp < 1) return -1;
    while (!nwritable.compare_exchange_weak(tmp, tmp - 1)) tmp = nwritable;
    nreadable += 1;
    return write_index = (write_index + 1)%nbuffers;
}

int get_next_read_buffer(int nth = 1)
{
    int tmp = nreadable;
    if (tmp < nth) return -1;
    while (!nreadable.compare_exchange_weak(tmp, tmp - nth)) tmp = nreadable;
    nwritable += nth;
    return read_index = (read_index + nth)%nbuffers;
}

int get_last_read_buffer()
{
    if (nreadable < 1) return -1;
    return get_next_read_buffer(nreadable);
}


Dan ben je er, toch? Of zie ik wat over het hoofd?

De vraag is nog wel wat je doet als je geen nieuwe buffer kunt krijgen. De beste oplossing is waarschijnlijk een condition variable toevoegen waar je op wacht (en die gesignald wordt door de andere thread elke keer dat 'ie een nieuwe buffer vrijgeeft).

Acties:
  • 0 Henk 'm!

  • TD-er
  • Registratie: Januari 2000
  • Laatst online: 03-09 09:15
Ik zal van de week nog eens een keertje gaan testen met locking, of het inderdaad zo licht is.
Mijn tests van een tijdje terug, met een simpele triple buffer (lock was alleen maar om een swap van 2 pointers heen) op een core2duo had toch echt af en toe meer dan 10 ms nodig.
Maar goed, dat zou natuurlijk met een CAS operatie uitgevoerd kunnen worden en een lock die constant gedeeld wordt tussen 2 threads is natuurlijk een stukje zwaarder dan een lock om een parameter in 1 thread aan te passen.

Inmiddels heb ik memory-barriers toegepast in mijn buffer en ook een paar atomaire operatoren om de indices op te hogen.
Tevens heb ik echt het nodige laten loggen en de buffer lijkt het goed te doen, maar ik heb nog steeds af en toe dat de write-thread aan het schrijven is in een gebied wat tegelijk gelezen wordt. (kan ik zien doordat een deel van het beeld overschreven wordt) Hierdoor loop je dus een complete buffer-lengte achter voor de data die niet overschreven wordt door de write-thread en het stuk wat wel overschreven wordt is wel up-to-date.

Kortom, knap frustrerend allemaal.

De leesthread lijkt netjes na de write thread te komen, de timestamp van het grabben zie ik ook netjes terug in de gelezen data zonder noemenswaardige vertraging (pak 'm beet 0.05 - 0.1 ms later) en de pointers die ik terugkrijg en gebruik zijn ook werkelijk steeds dezelfde.

Ik heb ook nergens dingen die kunnen blijven hangen op oude waarden (pointers worden netjes NULL gemaakt als ik ze niet meer nodig heb).
Nou ja morgen is er weer een dag.

Een goedkope voeding is als een lot in de loterij, je maakt kans op een paar tientjes korting, maar meestal betaal je de hoofdprijs. mijn posts (nodig wegens nieuwe layout)


Acties:
  • 0 Henk 'm!

  • EddoH
  • Registratie: Maart 2009
  • Niet online

EddoH

Backpfeifengesicht

TD-er schreef op donderdag 09 februari 2012 @ 21:58:
Ik zal van de week nog eens een keertje gaan testen met locking, of het inderdaad zo licht is.
Mijn tests van een tijdje terug, met een simpele triple buffer (lock was alleen maar om een swap van 2 pointers heen) op een core2duo had toch echt af en toe meer dan 10 ms nodig.

...

Inmiddels heb ik memory-barriers toegepast in mijn buffer en ook een paar atomaire operatoren om de indices op te hogen.
Tevens heb ik echt het nodige laten loggen en de buffer lijkt het goed te doen, maar ik heb nog steeds af en
Just to be sure: ik weet niet hoe je logging in elkaar steekt, maar ben je er zeker van dat niet juist je logging de gemeten vertraging bepaalt? Je zult niet de eerste zijn die op de verkeerde manier benchmarking uitvoert.

Acties:
  • 0 Henk 'm!

  • Zoijar
  • Registratie: September 2001
  • Niet online

Zoijar

Because he doesn't row...

TD-er schreef op donderdag 09 februari 2012 @ 21:58:
Inmiddels heb ik memory-barriers toegepast in mijn buffer en ook een paar atomaire operatoren om de indices op te hogen.
Je moet eigenlijk niet zo maar dingen toevoegen zodat het dan misschien wel werkt. Dat lukt met concurrent code nooit. Je moet goed uitdenken wat je wilt en waar het fout kan gaan. Elke statement moet je nadenken, wat als nu vlak hierna de variabelene wijzigen. Kan dat? Wat gebeurt er dan? Advies, je code is nogal rommelig. Neem gewoon een bestaande implementatie en breidt die uit voor wat je nodig hebt. Concurrent code is zo'n beetje het lastigste wat er is. Twee statements omdraaien kan soms al zorgen voor een fout. Maar uiteindelijk hoeft het helemaal niet ingewikkeld te zijn; kijk naar mijn code als voorbeeld, twee semaphores, wait de een op een acquire, post de ander op een release, done.

Acties:
  • 0 Henk 'm!

  • MSalters
  • Registratie: Juni 2001
  • Laatst online: 00:05
Soultaker schreef op donderdag 09 februari 2012 @ 19:01:
[...]
C++:
1
2
3
4
5
6
7
8
9
10
11
const int nbuffers = 10;
int read_index = 0, write_index = 1;
std::atomic<int> nreadable(0), nwritable(nbuffers - 2);

//...

int get_last_read_buffer()
{
    if (nreadable < 1) return -1;
    return get_next_read_buffer(nreadable);
}


Dan ben je er, toch? Of zie ik wat over het hoofd?
Die laatste functie ziet er verdacht uit; twee reads?

Man hopes. Genius creates. Ralph Waldo Emerson
Never worry about theory as long as the machinery does what it's supposed to do. R. A. Heinlein


Acties:
  • 0 Henk 'm!

  • TD-er
  • Registratie: Januari 2000
  • Laatst online: 03-09 09:15
EddoH schreef op vrijdag 10 februari 2012 @ 09:46:
[...]


Just to be sure: ik weet niet hoe je logging in elkaar steekt, maar ben je er zeker van dat niet juist je logging de gemeten vertraging bepaalt? Je zult niet de eerste zijn die op de verkeerde manier benchmarking uitvoert.
Loggen van dat soort zaken doe ik door een tijdsduur te berekenen (simpele systemcalls, op 1 en dezelfde thread) en dat in een enkele vector op te slaan (vooraf ruimschoots gealloceerd, zodat duur van allocatie geen issue is).
Die data kun je dan op een gegeven moment gewoon bekijken in de debugger.
Output naar debugout is inderdaad gewoon te traag danwel vertragend.
Zoijar schreef op vrijdag 10 februari 2012 @ 10:43:
[...]

Je moet eigenlijk niet zo maar dingen toevoegen zodat het dan misschien wel werkt. Dat lukt met concurrent code nooit. Je moet goed uitdenken wat je wilt en waar het fout kan gaan. Elke statement moet je nadenken, wat als nu vlak hierna de variabelene wijzigen. Kan dat? Wat gebeurt er dan? Advies, je code is nogal rommelig. Neem gewoon een bestaande implementatie en breidt die uit voor wat je nodig hebt. Concurrent code is zo'n beetje het lastigste wat er is. Twee statements omdraaien kan soms al zorgen voor een fout. Maar uiteindelijk hoeft het helemaal niet ingewikkeld te zijn; kijk naar mijn code als voorbeeld, twee semaphores, wait de een op een acquire, post de ander op een release, done.
De code is inmiddels (mede door jullie opmerkingen) al behoorlijk opgeschoond, maar ik kan die stukjes helaas nu niet hier posten. (vergeten toe te mailen)

Het enige waar ik nog over twijfel is of de elementen in de _queue (std::vector) op de een of andere manier toch nog gecached kunnen zijn. (oftewel niet volatile) Ik geef alleen de pointer door van het element.
Maar aan de andere kant heb ik de objecten die daarin hangen voorzien van wat extra debug-informatie en het lijkt altijd goed te gaan in de buffer. Zo sla ik bijvoorbeeld het moment van opslaan op in de meta-data zodat ik kan zien dat ik niet naar een stuk oudere data zit te kijken bij het ophalen.
Mijn probleem is namelijk dat ik kennelijk in de read-thread zit te kijken naar de data die op dat moment geschreven wordt. Oftewel dat zou betekenen dat de readpointer gelijk is aan de write-pointer, danwel dat ik zit te kijken naar de data die erin stond voordat de writer-data committed is. (vandaar ook mijn twijfel over de volatile-ness van de data in de queue.
Maar alle dumps van de debugdata wijzen uit dat ik wel degelijk naar de juiste data zou zitten te kijken.

Een goedkope voeding is als een lot in de loterij, je maakt kans op een paar tientjes korting, maar meestal betaal je de hoofdprijs. mijn posts (nodig wegens nieuwe layout)


Acties:
  • 0 Henk 'm!

  • H!GHGuY
  • Registratie: December 2002
  • Niet online

H!GHGuY

Try and take over the world...

Zoijar schreef op vrijdag 10 februari 2012 @ 10:43:
[...]

Je moet eigenlijk niet zo maar dingen toevoegen zodat het dan misschien wel werkt. Dat lukt met concurrent code nooit.
Volledig mee eens. 'Zomaar' wat memory barriers toevoegen kan nooit goed gaan.
Je moet ze op de juiste plek toevoegen en als je read en write barriers gebruikt ook nog eens uitbalanceren.
Ik durf te beweren dat ze niet op de juiste plaats staan.

Het eerste punt blijft nog altijd bestaan: gooi er op de juiste plaats locks tegenaan en je probleem is opgelost.

ASSUME makes an ASS out of U and ME


Acties:
  • 0 Henk 'm!

  • Soultaker
  • Registratie: September 2000
  • Laatst online: 03:22
MSalters schreef op vrijdag 10 februari 2012 @ 11:12:
Die laatste functie ziet er verdacht uit; twee reads?
Dat maakt niet uit; het is een single-reader/single-writer scenario is. Dat betekent dat tussen twee reads nreadable hooguit verhoogd kan worden door de writer (maar niet verlaagd, want dat doet de reader).

Als je optimale performance zou willen zou je het zo kunnen schrijven:
C++:
1
2
3
4
5
6
7
8
int get_last_read_buffer()
{
    int tmp = nreadable;
    if (tmp < 1) return -1;
    while (!nreadable.compare_exchange_weak(tmp, 0)) tmp = nreadable;
    nwritable += tmp;
    return read_index = (read_index + tmp)%nbuffers;
}

Dat voorkomt nog een stuk of twee atomaire reads. Dan is de code wel wat uitgebreider. Maar dan kun je de extra parameter van get_next_read_buffer() weer weghalen. Het is maar net wat je mooi vindt; functioneel gezien maakt het weinig uit, denk ik?
Pagina: 1