NEURON
bbssrv2mpi.cpp
Go to the documentation of this file.
1 #include <../../nrnconf.h>
2 #include "bbsconf.h"
3 #include <nrnmpi.h>
4 #if NRNMPI // to end of file
5 #include <stdio.h>
6 #include <string.h>
7 #include <assert.h>
8 #include "bbssrv2mpi.h"
9 #include "bbssrv.h"
10 #include "bbsimpl.h"
11 #include "hocdec.h" //Printf
12 
13 void nrnbbs_context_wait();
14 
16 
17 #if defined(HAVE_STL)
18 #if defined(HAVE_SSTREAM) // the standard ...
19 #include <map>
20 #include <set>
21 #else
22 #include <pair.h>
23 #include <multimap.h>
24 #include <map.h>
25 #include <set.h>
26 #include <multiset.h>
27 #endif
28 
29 #define debug 0
30 
31 #define MessageList MpiMessageList
32 #define WorkItem MpiWorkItem
33 #define WorkList MpiWorkList
34 #define ReadyList MpiReadyList
35 #define ResultList MpiResultList
36 #define PendingList MpiPendingList
37 #define LookingToDoList MpiLookingToDoList
38 
39 class WorkItem {
40 public:
41  WorkItem(int id, bbsmpibuf* buf, int cid);
42  virtual ~WorkItem();
43  WorkItem* parent_;
44  int id_;
45  bbsmpibuf* buf_;
46  int cid_; // mpi host id
47  bool todo_less_than(const WorkItem*)const;
48 };
49 
// Strict "less than" for C strings: orders by strcmp() result so that
// const char* keys compare by content rather than by address.
struct ltstr {
	bool operator()(const char* s1, const char* s2) const {
		const int order = strcmp(s1, s2);
		return order < 0;
	}
};
55 
// Strict "less than" for plain ints (ascending order).
struct ltint {
	bool operator()(int lhs, int rhs) const {
		return rhs > lhs;
	}
};
61 
62 struct ltWorkItem {
63  bool operator() (const WorkItem* w1, const WorkItem* w2) const {
64  return w1->todo_less_than(w2);
65  }
66 };
67 
// Return a new[]-allocated copy of the NUL-terminated string s.
// The caller owns the result and must release it with delete [].
static char* newstr(const char* s) {
	const size_t nbytes = strlen(s) + 1;	// room for the terminating NUL
	char* copy = new char[nbytes];
	memcpy(copy, s, nbytes);
	return copy;
}
73 
74 WorkItem::WorkItem(int id, bbsmpibuf* buf, int cid) {
75 #if debug == 2
76 printf("WorkItem %d\n", id);
77 #endif
78  id_ = id;
79  buf_ = buf;
80  cid_ = cid;
81  parent_ = nil;
82 }
83 
84 WorkItem::~WorkItem() {
85 #if debug
86 printf("~WorkItem %d\n", id_);
87 #endif
88 }
89 
90 bool WorkItem::todo_less_than(const WorkItem* w) const {
91  WorkItem* w1 = (WorkItem*)this;
92  WorkItem* w2 = (WorkItem*)w;
93  while (w1->parent_ != w2->parent_) {
94  if (w1->id_ < w2->id_) {
95  w2 = w2->parent_;
96  }else{
97  w1 = w1->parent_;
98  }
99  }
100 #if debug
101 printf("todo_less_than %d < %d return %d\n", this->id_, w->id_, w1->id_ < w2->id_);
102 #endif
103  return w1->id_ < w2->id_;
104 }
105 
106 class MessageList : public std::multimap <const char*, bbsmpibuf*, ltstr>{};
107 class PendingList : public std::multimap <const char*, const int, ltstr>{};
108 class WorkList : public std::map <int, const WorkItem*, ltint>{};
109 class LookingToDoList : public std::set <int, ltint>{};
110 class ReadyList : public std::set<const WorkItem*, ltWorkItem>{};
111 class ResultList: public std::multimap<int, const WorkItem*, ltint>{};
112 #else
// Without HAVE_STL the container types are empty placeholders so that
// the names still exist.
class MessageList {
};
class PendingList {
};
class WorkList {
};
class LookingToDoList {
};
class ReadyList {
};
class ResultList {
};
119 #endif
120 
122 #if defined(HAVE_STL)
123  messages_ = new MessageList();
124  work_ = new WorkList();
125  todo_ = new ReadyList();
126  results_ = new ResultList();
127  pending_ = new PendingList();
128  looking_todo_ = new LookingToDoList();
129  send_context_ = new LookingToDoList();
130  next_id_ = FIRSTID;
131  context_buf_ = nil;
132  remaining_context_cnt_ = 0;
133 #endif
134 }
135 
137 #if defined(HAVE_STL)
138  delete todo_;
139  delete results_;
140  delete looking_todo_;
141 printf("~BBSLocalServer not deleting everything\n");
142 // need to unref MessageValue in messages_ and delete WorkItem in work_
143  delete pending_;
144  delete messages_;
145  delete work_;
146  delete send_context_;
147 #endif
148 }
149 
150 bool BBSDirectServer::look_take(const char* key, bbsmpibuf** recv) {
151 #if debug
152  printf("DirectServer::look_take |%s|\n", key);
153 #endif
154  bool b = false;
155 #if defined(HAVE_STL)
156  nrnmpi_unref(*recv);
157  *recv = nil;
158  MessageList::iterator m = messages_->find(key);
159  if (m != messages_->end()) {
160  b = true;
161  *recv = (*m).second;
162 //printf("free %d\n", buf);
163  char* s = (char*)((*m).first);
164  messages_->erase(m);
165  delete [] s;
166  }
167 #if debug
168  printf("DirectServer::look_take |%s| recv=%p return %d\n", key, (*recv), b);
169 #endif
170 #endif
171  return b;
172 }
173 
174 bool BBSDirectServer::look(const char* key, bbsmpibuf** recv) {
175 #if debug
176  printf("DirectServer::look |%s|\n", key);
177 #endif
178  bool b = false;
179  nrnmpi_unref(*recv);
180  *recv = nil;
181 #if defined(HAVE_STL)
182  MessageList::iterator m = messages_->find(key);
183  if (m != messages_->end()) {
184  b = true;
185  *recv = (*m).second;
186  if (*recv) {
187  nrnmpi_ref(*recv);
188  }
189  }
190 #if debug
191  printf("DirectServer::look |%s| recv=%p return %d\n", key, (*recv), b);
192 #endif
193 #endif
194  return b;
195 }
196 
197 void BBSDirectServer::put_pending(const char* key, int cid) {
198 #if defined(HAVE_STL)
199 #if debug
200 printf("put_pending |%s| %d\n", key, cid);
201 #endif
202  char* s = newstr(key);
203  pending_->insert(std::pair<const char* const, const int>(s, cid));
204 #endif
205 }
206 
207 bool BBSDirectServer::take_pending(const char* key, int* cid) {
208  bool b = false;
209 #if defined(HAVE_STL)
210  PendingList::iterator p = pending_->find(key);
211  if (p != pending_->end()) {
212  *cid = (*p).second;
213 #if debug
214 printf("take_pending |%s| %d\n", key, *cid);
215 #endif
216  char* s = (char*)((*p).first);
217  pending_->erase(p);
218  delete [] s;
219  b = true;
220  }
221 #endif
222  return b;
223 }
224 
225 void BBSDirectServer::post(const char* key, bbsmpibuf* send) {
226 #if defined(HAVE_STL)
227  int cid;
228 #if debug
229  printf("DirectServer::post |%s| send=%p\n", key, send);
230 #endif
231  if (take_pending(key, &cid)) {
232  nrnmpi_bbssend(cid, TAKE, send);
233  }else{
234  MessageList::iterator m = messages_->insert(
235  std::pair<const char* const, bbsmpibuf*>(newstr(key), send)
236  );
237  nrnmpi_ref(send);
238  }
239 #endif
240 }
241 
242 void BBSDirectServer::add_looking_todo(int cid) {
243 #if defined(HAVE_STL)
244  looking_todo_->insert(cid);
245 #endif
246 }
247 
248 void BBSDirectServer::post_todo(int pid, int cid, bbsmpibuf* send){
249 #if defined(HAVE_STL)
250 #if debug
251 printf("BBSDirectServer::post_todo pid=%d cid=%d send=%p\n", pid, cid, send);
252 #endif
253  WorkItem* w = new WorkItem(next_id_++, send, cid);
254  nrnmpi_ref(send);
255  WorkList::iterator p = work_->find(pid);
256  if (p != work_->end()) {
257  w->parent_ = (WorkItem*)((*p).second);
258  }
259  work_->insert(std::pair<const int, const WorkItem*>(w->id_, w));
260 #if debug
261 printf("work insert %d\n", w->id_);
262 #endif
263  LookingToDoList::iterator i = looking_todo_->begin();
264  if (i != looking_todo_->end()) {
265  cid = (*i);
266  looking_todo_->erase(i);
267  // the send buffer is correct
268  nrnmpi_bbssend(cid, w->id_ + 1, send);
269  }else{
270 #if debug
271 printf("todo insert\n");
272 #endif
273  todo_->insert(w);
274  }
275 #endif
276 }
277 
278 void BBSDirectServer::context(bbsmpibuf* send) {
279 #if defined(HAVE_STL)
280  int cid, j;
281 #if debug
282 printf("numprocs_bbs=%d\n", nrnmpi_numprocs_bbs);
283 #endif
284  // Previous context may complete after allowing more activity
285  for (j=0; j < 1000; ++j) {
286  if (remaining_context_cnt_ == 0) {
287  break;
288  }
289  handle();
290  }
291  if (remaining_context_cnt_ > 0) {
292  Printf("some workers did not receive previous context\n");
293  send_context_->erase(send_context_->begin(), send_context_->end());
294  nrnmpi_unref(context_buf_);
295  context_buf_ = nil;
296  }
297  remaining_context_cnt_ = nrnmpi_numprocs_bbs - 1;
298  for (j = 1; j < nrnmpi_numprocs_bbs; ++j) {
299  send_context_->insert(j);
300  }
301  LookingToDoList::iterator i = looking_todo_->begin();
302  while (i != looking_todo_->end()) {
303  cid = (*i);
304  looking_todo_->erase(i);
305 #if debug
306 printf("sending context to already waiting %d\n", cid);
307 #endif
308  nrnmpi_bbssend(cid, CONTEXT+1, send);
309  i = send_context_->find(cid);
310  send_context_->erase(i);
311  --remaining_context_cnt_;
312  i = looking_todo_->begin();
313  }
314  if (remaining_context_cnt_ > 0) {
315  context_buf_ = send;
316  nrnmpi_ref(context_buf_);
317  handle();
318  }
319 #endif
320 }
321 
322 void nrnbbs_context_wait() {
323  if (BBSImpl::is_master_) {
324  BBSDirectServer::server_->context_wait();
325  }
326 }
327 
329 //printf("context_wait enter %d\n", remaining_context_cnt_);
330  while (remaining_context_cnt_) {
331  handle();
332  }
333 //printf("context_wait exit %d\n", remaining_context_cnt_);
334 }
335 
336 bool BBSDirectServer::send_context(int cid) {
337 #if defined(HAVE_STL)
338  LookingToDoList::iterator i = send_context_->find(cid);
339  if (i != send_context_->end()) {
340  send_context_->erase(i);
341 #if debug
342 printf("sending context to %d\n", cid);
343 #endif
344  nrnmpi_bbssend(cid, CONTEXT+1, context_buf_);
345  if (--remaining_context_cnt_ <= 0) {
346  nrnmpi_unref(context_buf_);
347  context_buf_ = nil;
348  }
349  return true;
350  }
351 #endif
352  return false;
353 }
354 
355 void BBSDirectServer::post_result(int id, bbsmpibuf* send){
356 #if defined(HAVE_STL)
357 #if debug
358 printf("DirectServer::post_result id=%d send=%p\n", id, send);
359 #endif
360  WorkList::iterator i = work_->find(id);
361  WorkItem* w = (WorkItem*)((*i).second);
362  nrnmpi_ref(send);
363  nrnmpi_unref(w->buf_);
364  w->buf_ = send;
365  results_->insert(std::pair<const int, const WorkItem*>(w->parent_ ? w->parent_->id_ : 0, w));
366 #endif
367 }
368 
369 int BBSDirectServer::look_take_todo(bbsmpibuf** recv) {
370 #if defined(HAVE_STL)
371 #if debug
372 printf("DirectServer::look_take_todo\n");
373 #endif
374  nrnmpi_unref(*recv);
375  *recv = nil;
376  ReadyList::iterator i = todo_->begin();
377  if (i != todo_->end()) {
378  WorkItem* w = (WorkItem*)(*i);
379  todo_->erase(i);
380  *recv = w->buf_;
381 #if debug
382 printf("DirectServer::look_take_todo recv %p with keypos=%d return %d\n", *recv, (*recv)->keypos, w->id_);
383 #endif
384  w->buf_ = 0;
385  return w->id_;
386  }else{
387  return 0;
388  }
389 #else
390 return 0;
391 #endif
392 }
393 
394 int BBSDirectServer::look_take_result(int pid, bbsmpibuf** recv) {
395 #if debug
396 printf("DirectServer::look_take_result pid=%d\n", pid);
397 #endif
398 #if defined(HAVE_STL)
399  nrnmpi_unref(*recv);
400  *recv = nil;
401  ResultList::iterator i = results_->find(pid);
402  if (i != results_->end()) {
403  WorkItem* w = (WorkItem*)((*i).second);
404  results_->erase(i);
405  *recv = w->buf_;
406  int id = w->id_;
407  WorkList::iterator j = work_->find(id);
408  work_->erase(j);
409  delete w;
410 #if debug
411 printf("DirectServer::look_take_result recv=%p return %d\n", *recv, id);
412 #endif
413  return id;
414  }else{
415  return 0;
416  }
417 #else
418 return 0;
419 #endif
420 }
421 
422 #endif //NRNMPI
static bool is_master_
Definition: bbsimpl.h:59
#define Printf
Definition: model.h:252
void post(const char *key)
bool look_take(const char *key)
#define TAKE
Definition: bbssrv.h:8
bool look(const char *key)
bool take_pending(const char *key, int *cid)
#define CONTEXT
Definition: bbssrv.h:19
size_t p
int nrnmpi_numprocs_bbs
static BBSDirectServer * server_
Definition: bbslsrv2.h:18
void nrnbbs_context_wait()
Definition: datapath.cpp:30
void context(int ncid, int *cids)
static double map(void *v)
Definition: mlinedit.cpp:46
bool send_context(int cid)
void put_pending(const char *key, int cid)
_CONST char * s
Definition: system.cpp:74
#define printf
Definition: mwprefix.h:26
#define key
Definition: spt2queue.cpp:20
size_t j
#define nil
Definition: enter-scope.h:36
virtual ~BBSDirectServer()
void send(const char *url)
Definition: hel2mos.cpp:212
void post_result(int id)
int look_take_result(int parentid)
#define i
Definition: md1redef.h:12
#define id
Definition: md1redef.h:33
char buf[512]
Definition: init.cpp:13
#define FIRSTID
Definition: bbssrv.h:23
void post_todo(int parentid, int cid)
static N_Vector id_
void add_looking_todo(int cid)