Ik ben reeds enige tijd bezig met de gedachte van een multi-producer, multi-consumer queue.
Hierin ben ik gelukkig niet de enige en er is dan ook een hoop te vinden. Een van de belangrijkste is natuurlijk
boost. Een eigenaardige beperking aan deze boost::lockfree::queue is echter dat het datatype een triviale destructor moet hebben.
Dat vind ik aardig afbreuk doen aan het (wat mij betreft) ideaal beeld van elke thread die een job mag queuen. Die job kan namelijk niet in de uitvoerende thread-memory-space gealloceerd worden. Je kan wel een job maken die dat dan weer doet, maar ook dat vereist weer werk wat imho. niet triviaal is om goed te doen.
Wat mij betreft zou je gewoon een std::function< void() > op een queue moeten pushen and that's it.
Met die insteek ben ik aan de slag gegaan en heb ik uiteindelijk iets geschreven wat dat ook daadwerkelijk kan.
Om dit alles in perspectief te zetten had ik een test setup geschreven zodat ik ook zinnige vergelijkingen kon maken met andere methoden. Wat ik syntactisch probeer te doen lijkt nog het meest op boost::asio, daar heb ik dus ook een vergelijking mee gemaakt.
Daarnaast heb ik ook een vergelijking gemaakt met een standaard vector beschermt met een std::mutex, als een soort sanity check zeg maar.
Over het algemeen zijn de resultaten nogal rooskleurig voor mijn brouwsel, hetgeen mij doet vermoeden dat ik een onbedoelde bias voor mijn setup heb ingebouwd. Ik had dit al eens op codereview.stackoverflow gepost maar daar gebeurt niet zoveel..
Om de bias zoveel mogelijk uit te sluiten heb ik de test aangepast zodat deze direct aansluit op boost. Dit gaf de volgende resultaten op mijn dual core laptop:
De implementatie zou nog netter kunnen door mbv. templates de queue fixed size of niet te maken, custom allocators, dat soort dingen. Maar graag hoor ik jullie mening over de test-setup en of de vergelijking eerlijk en representatief is voor daadwerkelijke performance impact.
Hier is de main.cpp
De rest van de code is hier te vinden:
https://github.com/arjanhouben/lock_free_fifo
Hierin ben ik gelukkig niet de enige en er is dan ook een hoop te vinden. Een van de belangrijkste is natuurlijk
boost. Een eigenaardige beperking aan deze boost::lockfree::queue is echter dat het datatype een triviale destructor moet hebben.
Dat vind ik aardig afbreuk doen aan het (wat mij betreft) ideaal beeld van elke thread die een job mag queuen. Die job kan namelijk niet in de uitvoerende thread-memory-space gealloceerd worden. Je kan wel een job maken die dat dan weer doet, maar ook dat vereist weer werk wat imho. niet triviaal is om goed te doen.
Wat mij betreft zou je gewoon een std::function< void() > op een queue moeten pushen and that's it.
Met die insteek ben ik aan de slag gegaan en heb ik uiteindelijk iets geschreven wat dat ook daadwerkelijk kan.
Om dit alles in perspectief te zetten had ik een test setup geschreven zodat ik ook zinnige vergelijkingen kon maken met andere methoden. Wat ik syntactisch probeer te doen lijkt nog het meest op boost::asio, daar heb ik dus ook een vergelijking mee gemaakt.
Daarnaast heb ik ook een vergelijking gemaakt met een standaard vector beschermt met een std::mutex, als een soort sanity check zeg maar.
Over het algemeen zijn de resultaten nogal rooskleurig voor mijn brouwsel, hetgeen mij doet vermoeden dat ik een onbedoelde bias voor mijn setup heb ingebouwd. Ik had dit al eens op codereview.stackoverflow gepost maar daar gebeurt niet zoveel..
Om de bias zoveel mogelijk uit te sluiten heb ik de test aangepast zodat deze direct aansluit op boost. Dit gaf de volgende resultaten op mijn dual core laptop:
boostlockfree:
{
single producer, single consumer took: 1.10634 seconds
single producer, multi consumer took: 1.10573 seconds
multi producer, single consumer took: 0.867554 seconds
multi producer, multi consumer took: 0.712594 seconds
total: 4.29545 seconds
}
boostasio:
{
single producer, single consumer took: 2.14071 seconds
single producer, multi consumer took: 6.53309 seconds
multi producer, single consumer took: 6.81575 seconds
multi producer, multi consumer took: 11.5268 seconds
total: 27.0172 seconds
}
lock_free::fifo:
{
single producer, single consumer took: 0.568918 seconds
single producer, multi consumer took: 0.449427 seconds
multi producer, single consumer took: 0.509566 seconds
multi producer, multi consumer took: 0.423657 seconds
total: 2.00084 seconds
}
mutex_queue:
{
single producer, single consumer took: 0.33865 seconds
single producer, multi consumer took: 4.2441 seconds
multi producer, single consumer took: 4.19272 seconds
multi producer, multi consumer took: 8.12005 seconds
total: 16.9422 seconds
}
De implementatie zou nog netter kunnen door mbv. templates de queue fixed size of niet te maken, custom allocators, dat soort dingen. Maar graag hoor ik jullie mening over de test-setup en of de vergelijking eerlijk en representatief is voor daadwerkelijke performance impact.
Hier is de main.cpp
C++:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
| #include <iostream> #include <functional> #include <thread> #include <sstream> #include <atomic> #include <vector> #include <chrono> #include <mutex> #include <lock_free/fifo.h> #include <lock_free/shared_mutex.h> #include <boost/asio/io_service.hpp> #include <boost/lockfree/queue.hpp> using namespace std; using namespace chrono; using namespace boost::asio; typedef function< void() >function_type; template < typename T > struct boostlockfree { boostlockfree( size_t r = 1024 ) : jobs_( r ) {} inline void push_back( T t ) { jobs_.push( t ); } inline bool pop( T &t ) { return jobs_.pop( t ); } boost::lockfree::queue< T > jobs_; }; template < typename T > struct boostasio { boostasio( size_t = 1024 ) {} inline void push_back( T t ) { service_.post( *t ); } inline bool pop( T &t ) { static function_type tmp = [](){}; t = &tmp; return service_.run_one() > 0; } io_service service_; }; template < typename T > struct mutex_queue { mutex_queue( size_t r = 1024 ) : lock_(), index_( 0 ), data_( r ) { data_.clear(); } inline void push_back( const T &t ) { lock_guard< mutex > guard( lock_ ); data_.push_back( t ); } inline bool pop( T &t ) { lock_guard< mutex > guard( lock_ ); if ( index_ == data_.size() ) { return false; } t = data_[ index_++ ]; return true; } mutex lock_; size_t index_; vector< T > data_; }; template < typename T > T to( const string &str ) { T result; stringstream( str ) >> result; return result; } template < typename T > function_type get_producer( T &&t ) { return get< 0 >( t ); } template < typename T > function_type get_consumer( T &&t ) { return get< 1 >( t ); } template < typename T > function_type get_result( T &&t ) { return get< 2 >( t ); } template < typename Q > void test( const string &testname, size_t count, size_t threadcount ) { auto create_producer_consumer_result = [=]( const string &name ) { high_resolution_clock::time_point t1 = high_resolution_clock::now(); auto data = make_shared< Q >( count ); auto tmp = new function_type( [data]() { ++data->consumer_count; } ); function_type producer = [data,tmp]() { while ( data->producer_count++ < data->expected ) { data->queue.push_back( tmp ); } if ( data->producer_count >= data->expected ) { --data->producer_count; } }; function_type consumer = [data]() { while ( data->consumer_count < data->expected ) { function_type *func; while ( data->queue.pop( func ) ) { (*func)(); } } }; function_type result = [=]() { high_resolution_clock::time_point t2 = high_resolution_clock::now(); duration< double > time_span = duration_cast< duration< double > >( t2 - t1 ); if ( data->expected != data->consumer_count ) { cout << "\texpected: " << data->expected << ", actual: " << data->consumer_count << endl; } cout << '\t' << name << " took: " << time_span.count() << " seconds" << endl; delete tmp; }; return make_tuple( producer, consumer, result ); }; high_resolution_clock::time_point teststart = high_resolution_clock::now(); cout << testname << ":\n{\n"; // single producer, single consumer { auto pcr = create_producer_consumer_result( "single producer, single consumer" ); get_producer( pcr )(); get_consumer( pcr )(); get_result( pcr )(); } // single producer, multi consumer { auto pcr = create_producer_consumer_result( "single producer, multi consumer" ); get_producer( pcr )(); vector< thread > threads; size_t c = threadcount; while ( c-- ) { threads.push_back( thread( get_consumer( pcr ) ) ); } for ( auto &t : threads ) { t.join(); } get_result( pcr )(); } // multi producer, single consumer { auto pcr = create_producer_consumer_result( "multi producer, single consumer" ); vector< thread > threads; size_t c = threadcount; while ( c-- ) { threads.push_back( thread( get_producer( pcr ) ) ); } for ( auto &t : threads ) { t.join(); } get_consumer( pcr )(); get_result( pcr )(); } // multi producer, multi consumer { auto pcr = create_producer_consumer_result( "multi producer, multi consumer" ); vector< thread > threads; size_t c = threadcount / 2; while ( c-- ) { threads.push_back( thread( get_producer( pcr ) ) ); threads.push_back( thread( get_consumer( pcr ) ) ); } for ( auto &t : threads ) { t.join(); } get_result( pcr )(); } duration< double > time_span = duration_cast< duration< double > >( high_resolution_clock::now() - teststart ); cout << "\ttotal: " << time_span.count() << " seconds\n}" << endl; } template < typename T > struct test_data { test_data( size_t e ) : expected( e ), queue(), producer_count( 0 ), consumer_count( 0 ) { } const size_t expected; T queue; atomic_size_t producer_count; atomic_size_t consumer_count; }; int main( int argc, char *argv[] ) { const auto test_count = 1e6; const auto thread_count = argc > 1 ? to< size_t >( argv[ 1 ] ) : 16; test< test_data< boostlockfree< function_type* > > >( "boostlockfree", test_count, thread_count ); test< test_data< boostasio< function_type* > > >( "boostasio", test_count, thread_count ); test< test_data< lock_free::fifo< function_type* > > >( "lock_free::fifo", test_count, thread_count ); test< test_data< mutex_queue< function_type* > > >( "mutex_queue", test_count, thread_count ); return 0; } |
De rest van de code is hier te vinden:
https://github.com/arjanhouben/lock_free_fifo
oprecht vertrouwen wordt nooit geschaad