NEURON
bbsmpipack.cpp
Go to the documentation of this file.
1 #include <../../nrnconf.h>
2 #include <stdio.h>
3 #include <stdlib.h>
4 
5 /* do not want the redef in the dynamic load case */
6 #include <nrnmpiuse.h>
7 
8 #if NRNMPI_DYNAMICLOAD
9 #include <nrnmpi_dynam.h>
10 #endif
11 
12 #include <nrnmpi.h>
13 
14 #if NRNMPI
15 #if HAVE_STRING_H
16 #include <string.h>
17 #endif
18 #include <assert.h>
19 #include <errno.h>
20 #include <mpi.h>
21 #include <nrnmpidec.h>
22 #include <nrnmpi_impl.h>
23 #include <hocdec.h>
24 
25 #if 0
26 #define guard(f) nrn_assert(f == MPI_SUCCESS)
27 #else
28 #define guard(f) \
29  { \
30  int _i = f; \
31  if (_i != MPI_SUCCESS) { \
32  printf("%s %d\n", #f, _i); \
33  assert(0); \
34  } \
35  }
36 #endif
37 
38 #define nrnmpidebugleak 0
39 #define debug 0
40 
41 extern MPI_Comm nrn_bbs_comm;
42 
43 #if nrnmpidebugleak
44 static int nrnmpi_bufcnt_;
45 #endif
46 
47 /* we want to find the key easily and I am assuming that all MPI
48  implementations allow one to start unpacking from a particular position.
49  Therefore we give space for the key position at the beginning and
50  an int always takes up the same space at position 0.
51  Now I regret not forcing the key to come first at the user level.
52 */
53 
54 #define my_MPI_INT 0
55 #define my_MPI_DOUBLE 1
56 #define my_MPI_CHAR 2
57 #define my_MPI_PACKED 3
58 #define my_MPI_PICKLE 4
59 
60 static MPI_Datatype mytypes[] = {MPI_INT, MPI_DOUBLE, MPI_CHAR, MPI_PACKED, MPI_CHAR};
61 
62 static void unpack(void* buf, int count, int my_datatype, bbsmpibuf* r, const char* errmes) {
63  int type[2];
64  assert(r && r->buf);
65 #if debug
66  printf("%d unpack upkpos=%d pkposition=%d keypos=%d size=%d\n",
68  r->upkpos,
69  r->pkposition,
70  r->keypos,
71  r->size);
72 #endif
73  assert(r->upkpos >= 0 && r->size >= r->upkpos);
74  guard(MPI_Unpack(r->buf, r->size, &r->upkpos, type, 2, MPI_INT, nrn_bbs_comm));
75 #if debug
76  printf("%d unpack r=%p size=%d upkpos=%d type[0]=%d datatype=%d type[1]=%d count=%d\n",
78  r,
79  r->size,
80  r->upkpos,
81  type[0],
82  my_datatype,
83  type[1],
84  count);
85 #endif
86  if (type[0] != my_datatype || type[1] != count) {
87  printf("%d unpack size=%d upkpos=%d type[0]=%d datatype=%d type[1]=%d count=%d\n",
89  r->size,
90  r->upkpos,
91  type[0],
92  my_datatype,
93  type[1],
94  count);
95  }
96  assert(type[0] == my_datatype);
97  assert(type[1] == count);
98  guard(MPI_Unpack(r->buf, r->size, &r->upkpos, buf, count, mytypes[my_datatype], nrn_bbs_comm));
99 }
100 
101 void nrnmpi_upkbegin(bbsmpibuf* r) {
102  int type;
103  int p;
104 #if debug
105  printf("%d nrnmpi_upkbegin %p (preunpack upkpos=%d keypos=%d)\n",
107  r,
108  r->upkpos,
109  r->keypos);
110 #endif
111  assert(r && r->buf && r->size > 0);
112  if (nrnmpi_myid_bbs == -1) {
113  hoc_execerror("subworld process with nhost > 0 cannot use", "the bulletin board");
114  }
115  r->upkpos = 0;
116  guard(MPI_Unpack(r->buf, r->size, &r->upkpos, &p, 1, MPI_INT, nrn_bbs_comm));
117  if (p > r->size) {
118  printf("\n %d nrnmpi_upkbegin keypos=%d size=%d\n", nrnmpi_myid_bbs, p, r->size);
119  }
120  assert(p <= r->size);
121  guard(MPI_Unpack(r->buf, r->size, &p, &type, 1, MPI_INT, nrn_bbs_comm));
122 #if debug
123  printf("%d nrnmpi_upkbegin type=%d keypos=%d\n", nrnmpi_myid_bbs, type, p);
124 #endif
125  assert(type == 0);
126  r->keypos = p;
127 }
128 
129 char* nrnmpi_getkey(bbsmpibuf* r) {
130  char* s;
131  int type;
132  type = r->upkpos;
133  r->upkpos = r->keypos;
134 #if debug
135  printf("%d nrnmpi_getkey %p keypos=%d\n", nrnmpi_myid_bbs, r, r->keypos);
136 #endif
137  s = nrnmpi_upkstr(r);
138  assert(r->pkposition == 0 || r->pkposition == r->upkpos);
139  r->pkposition = r->upkpos;
140  r->upkpos = type;
141 #if debug
142  printf("getkey return %s\n", s);
143 #endif
144  return s;
145 }
146 
147 int nrnmpi_getid(bbsmpibuf* r) {
148  int i, type;
149  type = r->upkpos;
150  r->upkpos = r->keypos;
151 #if debug
152  printf("%d nrnmpi_getid %p keypos=%d\n", nrnmpi_myid_bbs, r, r->keypos);
153 #endif
154  i = nrnmpi_upkint(r);
155  r->upkpos = type;
156 #if debug
157  printf("getid return %d\n", i);
158 #endif
159  return i;
160 }
161 
162 int nrnmpi_upkint(bbsmpibuf* r) {
163  int i;
164  unpack(&i, 1, my_MPI_INT, r, "upkint");
165  return i;
166 }
167 
168 double nrnmpi_upkdouble(bbsmpibuf* r) {
169  double x;
170  unpack(&x, 1, my_MPI_DOUBLE, r, "upkdouble");
171  return x;
172 }
173 
174 void nrnmpi_upkvec(int n, double* x, bbsmpibuf* r) {
175  unpack(x, n, my_MPI_DOUBLE, r, "upkvec");
176 }
177 
178 #if NRNMPI_DYNAMICLOAD
179 /* for some unknown reason mpiexec -n 2 python3 test0.py gives
180 load_nrnmpi: /home/hines/neuron/nrndynam/x86_64/lib/libnrnmpi.so: undefined symbol: cxx_char_alloc
181 So fill this in explicitly in nrnmpi_dynam.cpp
182 */
183 char* (*p_cxx_char_alloc)(int len);
184 #endif
185 
186 char* nrnmpi_upkstr(bbsmpibuf* r) {
187  int len;
188  char* s;
189  unpack(&len, 1, my_MPI_INT, r, "upkstr length");
190 #if NRNMPI_DYNAMICLOAD
191  s = (*p_cxx_char_alloc)(len + 1); /* will be delete not free */
192 #else
193  s = cxx_char_alloc(len + 1); /* will be delete not free */
194 #endif
195  unpack(s, len, my_MPI_CHAR, r, "upkstr string");
196  s[len] = '\0';
197  return s;
198 }
199 
200 char* nrnmpi_upkpickle(size_t* size, bbsmpibuf* r) {
201  int len;
202  char* s;
203  unpack(&len, 1, my_MPI_INT, r, "upkpickle length");
204  *size = len;
205 #if NRNMPI_DYNAMICLOAD
206  s = (*p_cxx_char_alloc)(len + 1); /* will be delete not free */
207 #else
208  s = cxx_char_alloc(len + 1); /* will be delete, not free */
209 #endif
210  unpack(s, len, my_MPI_PICKLE, r, "upkpickle data");
211  return s;
212 }
213 
214 static void resize(bbsmpibuf* r, int size) {
215  int newsize;
216  if (r->size < size) {
217  newsize = (size / 64) * 64 + 128;
218  r->buf = static_cast<char*>(hoc_Erealloc(r->buf, newsize));
219  hoc_malchk();
220  r->size = newsize;
221  }
222 }
223 
224 void nrnmpi_pkbegin(bbsmpibuf* r) {
225  int type;
226  if (nrnmpi_myid_bbs == -1) {
227  hoc_execerror("subworld process with nhost > 0 cannot use", "the bulletin board");
228  }
229  r->pkposition = 0;
230  type = 0;
231 #if debug
232  printf(
233  "%d nrnmpi_pkbegin %p size=%d pkposition=%d\n", nrnmpi_myid_bbs, r, r->size, r->pkposition);
234 #endif
235  guard(MPI_Pack(&type, 1, MPI_INT, r->buf, r->size, &r->pkposition, nrn_bbs_comm));
236 }
237 
238 void nrnmpi_enddata(bbsmpibuf* r) {
239  int p, type, isize, oldsize;
240  p = r->pkposition;
241  type = 0;
242 #if debug
243  printf("%d nrnmpi_enddata %p size=%d pkposition=%d\n", nrnmpi_myid_bbs, r, r->size, p);
244 #endif
245  guard(MPI_Pack_size(1, MPI_INT, nrn_bbs_comm, &isize));
246  oldsize = r->size;
247  resize(r, r->pkposition + isize);
248 #if debug
249  if (oldsize < r->pkposition + isize) {
250  printf("%d %p need %d more. end up with total of %d\n", nrnmpi_myid_bbs, r, isize, r->size);
251  }
252 #endif
253  guard(MPI_Pack(&type, 1, MPI_INT, r->buf, r->size, &r->pkposition, nrn_bbs_comm));
254 #if debug
255  printf("%d nrnmpi_enddata buf=%p size=%d pkposition=%d\n",
257  r->buf,
258  r->size,
259  r->pkposition);
260 #endif
261  guard(MPI_Pack(&p, 1, MPI_INT, r->buf, r->size, &type, nrn_bbs_comm));
262 #if debug
263  printf("%d after nrnmpi_enddata, %d was packed at beginning and 0 was packed before %d\n",
265  p,
266  r->pkposition);
267 #endif
268 }
269 
270 static void pack(void* inbuf, int incount, int my_datatype, bbsmpibuf* r, const char* e) {
271  int type[2];
272  int dsize, isize, oldsize;
273 #if debug
274  printf("%d pack %p count=%d type=%d outbuf-%p pkposition=%d %s\n",
276  r,
277  incount,
278  my_datatype,
279  r->buf,
280  r->pkposition,
281  e);
282 #endif
283  guard(MPI_Pack_size(incount, mytypes[my_datatype], nrn_bbs_comm, &dsize));
284  guard(MPI_Pack_size(2, MPI_INT, nrn_bbs_comm, &isize));
285  oldsize = r->size;
286  resize(r, r->pkposition + dsize + isize);
287 #if debug
288  if (oldsize < r->pkposition + dsize + isize) {
289  printf("%d %p need %d more. end up with total of %d\n",
291  r,
292  dsize + isize,
293  r->size);
294  }
295 #endif
296  type[0] = my_datatype;
297  type[1] = incount;
298  guard(MPI_Pack(type, 2, MPI_INT, r->buf, r->size, &r->pkposition, nrn_bbs_comm));
299  guard(MPI_Pack(
300  inbuf, incount, mytypes[my_datatype], r->buf, r->size, &r->pkposition, nrn_bbs_comm));
301 #if debug
302  printf("%d pack done pkposition=%d\n", nrnmpi_myid_bbs, r->pkposition);
303 #endif
304 }
305 
306 void nrnmpi_pkint(int i, bbsmpibuf* r) {
307  int ii;
308  ii = i;
309  pack(&ii, 1, my_MPI_INT, r, "pkint");
310 }
311 
312 void nrnmpi_pkdouble(double x, bbsmpibuf* r) {
313  double xx;
314  xx = x;
315  pack(&xx, 1, my_MPI_DOUBLE, r, "pkdouble");
316 }
317 
318 void nrnmpi_pkvec(int n, double* x, bbsmpibuf* r) {
319  pack(x, n, my_MPI_DOUBLE, r, "pkvec");
320 }
321 
322 void nrnmpi_pkstr(const char* s, bbsmpibuf* r) {
323  int len;
324  len = strlen(s);
325  pack(&len, 1, my_MPI_INT, r, "pkstr length");
326  pack((char*) s, len, my_MPI_CHAR, r, "pkstr string");
327 }
328 
329 void nrnmpi_pkpickle(const char* s, size_t size, bbsmpibuf* r) {
330  int len = size;
331  pack(&len, 1, my_MPI_INT, r, "pkpickle length");
332  pack((char*) s, len, my_MPI_PICKLE, r, "pkpickle data");
333 }
334 
335 void nrnmpi_bbssend(int dest, int tag, bbsmpibuf* r) {
336 #if debug
337  printf("%d nrnmpi_bbssend %p dest=%d tag=%d size=%d\n",
339  r,
340  dest,
341  tag,
342  (r) ? r->size : 0);
343 #endif
344 
345  /* Some MPI implementations limit tags to be less than full MPI_INT domain
346  so pack tag if > 20 in second slot (now reserved) of buffer and use 20 as the tag.
347  */
348  if (tag > 20) {
349  int save_position = r->pkposition;
350  int save_upkpos = r->upkpos;
351  nrnmpi_upkbegin(r);
352  nrnmpi_upkint(r);
353  r->pkposition = r->upkpos;
354  nrnmpi_pkint(tag, r);
355  r->pkposition = save_position;
356  r->upkpos = save_upkpos;
357  tag = 20;
358  }
359 
360  if (r) {
361  assert(r->buf && r->keypos <= r->size);
362  guard(MPI_Send(r->buf, r->size, MPI_PACKED, dest, tag, nrn_bbs_comm));
363  } else {
364  guard(MPI_Send(NULL, 0, MPI_PACKED, dest, tag, nrn_bbs_comm));
365  }
366  errno = 0;
367 #if debug
368  printf("%d return from send\n", nrnmpi_myid_bbs);
369 #endif
370 }
371 
372 int nrnmpi_bbsrecv(int source, bbsmpibuf* r) {
373  MPI_Status status;
374  int size;
375  if (source == -1) {
376  source = MPI_ANY_SOURCE;
377  }
378 #if debug
379  printf("%d nrnmpi_bbsrecv %p\n", nrnmpi_myid_bbs, r);
380 #endif
381  guard(MPI_Probe(source, MPI_ANY_TAG, nrn_bbs_comm, &status));
382  guard(MPI_Get_count(&status, MPI_PACKED, &size));
383 #if debug
384  printf("%d nrnmpi_bbsrecv probe size=%d source=%d tag=%d\n",
386  size,
387  status.MPI_SOURCE,
388  status.MPI_TAG);
389 #endif
390  resize(r, size);
391  guard(MPI_Recv(r->buf, r->size, MPI_PACKED, source, MPI_ANY_TAG, nrn_bbs_comm, &status));
392  errno = 0;
393  /* Some MPI implementations limit tags to be less than full MPI_INT domain
394  In the past we allowed TODO mesages to have tags > 20 (FIRSTID of src/parallel/bbssrv.h)
395  To fix the bug we no longer send or receive such tags and instead
396  copy the tag into the second pkint of the r->buf and send with
397  a tag of 20.
398  */
399  if (status.MPI_TAG == 20) {
400  int tag;
401  int save_upkpos = r->upkpos; /* possibly not needed */
402  nrnmpi_upkbegin(r);
403  nrnmpi_upkint(r);
404  tag = nrnmpi_upkint(r);
405  r->upkpos = save_upkpos;
406  return tag;
407  }
408  return status.MPI_TAG;
409 }
410 
411 int nrnmpi_bbssendrecv(int dest, int tag, bbsmpibuf* s, bbsmpibuf* r) {
412  int size, itag, source;
413  int msgtag;
414  MPI_Status status;
415 #if debug
416  printf("%d nrnmpi_bbssendrecv dest=%d tag=%d\n", nrnmpi_myid_bbs, dest, tag);
417 #endif
418  if (!nrnmpi_iprobe(&size, &itag, &source) || source != dest) {
419 #if debug
420  printf("%d nrnmpi_bbssendrecv nothing available so send\n", nrnmpi_myid_bbs);
421 #endif
422  nrnmpi_bbssend(dest, tag, s);
423  }
424  return nrnmpi_bbsrecv(dest, r);
425 }
426 
427 int nrnmpi_iprobe(int* size, int* tag, int* source) {
428  int flag = 0;
429  MPI_Status status;
430  guard(MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, nrn_bbs_comm, &flag, &status));
431  if (flag) {
432  if (source)
433  *source = status.MPI_SOURCE;
434  if (tag)
435  *tag = status.MPI_TAG;
436  if (size)
437  guard(MPI_Get_count(&status, MPI_PACKED, size));
438  }
439  return flag;
440 }
441 
442 void nrnmpi_probe(int* size, int* tag, int* source) {
443  int flag = 0;
444  MPI_Status status;
445  guard(MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, nrn_bbs_comm, &status));
446  if (source)
447  *source = status.MPI_SOURCE;
448  if (tag)
449  *tag = status.MPI_TAG;
450  if (size)
451  guard(MPI_Get_count(&status, MPI_PACKED, size));
452 }
453 
454 bbsmpibuf* nrnmpi_newbuf(int size) {
455  bbsmpibuf* buf;
456  buf = (bbsmpibuf*) hoc_Emalloc(sizeof(bbsmpibuf));
457  hoc_malchk();
458 #if debug
459  printf("%d nrnmpi_newbuf %p\n", nrnmpi_myid_bbs, buf);
460 #endif
461  buf->buf = (char*) 0;
462  if (size > 0) {
463  buf->buf = static_cast<char*>(hoc_Emalloc(size * sizeof(char)));
464  hoc_malchk();
465  }
466  buf->size = size;
467  buf->pkposition = 0;
468  buf->upkpos = 0;
469  buf->keypos = 0;
470  buf->refcount = 0;
471 #if nrnmpidebugleak
472  ++nrnmpi_bufcnt_;
473 #endif
474  return buf;
475 }
476 
477 void nrnmpi_copy(bbsmpibuf* dest, bbsmpibuf* src) {
478  int i;
479  resize(dest, src->size);
480  for (i = 0; i < src->size; ++i) {
481  dest->buf[i] = src->buf[i];
482  }
483  dest->pkposition = src->pkposition;
484  dest->upkpos = src->upkpos;
485  dest->keypos = src->keypos;
486 }
487 
488 static void nrnmpi_free(bbsmpibuf* buf) {
489 #if debug
490  printf("%d nrnmpi_free %p\n", nrnmpi_myid_bbs, buf);
491 #endif
492  if (buf->buf) {
493  free(buf->buf);
494  }
495  free(buf);
496 #if nrnmpidebugleak
497  --nrnmpi_bufcnt_;
498 #endif
499 }
500 
501 void nrnmpi_ref(bbsmpibuf* buf) {
502  assert(buf);
503  buf->refcount += 1;
504 }
505 
506 void nrnmpi_unref(bbsmpibuf* buf) {
507  if (buf) {
508  --buf->refcount;
509  if (buf->refcount <= 0) {
510  nrnmpi_free(buf);
511  }
512  }
513 }
514 
515 #if nrnmpidebugleak
516 void nrnmpi_checkbufleak() {
517  if (nrnmpi_bufcnt_ > 0) {
518  printf("%d nrnmpi_bufcnt=%d\n", nrnmpi_myid_bbs, nrnmpi_bufcnt_);
519  }
520 }
521 #endif
522 
523 #endif /*NRNMPI*/
short type
Definition: cabvars.h:9
void hoc_execerror(const char *, const char *)
Definition: hoc.cpp:754
char buf[512]
Definition: init.cpp:13
char * cxx_char_alloc(size_t sz)
Definition: ivoc.cpp:118
#define assert(ex)
Definition: hocassrt.h:32
#define i
Definition: md1redef.h:12
#define printf
Definition: mwprefix.h:26
int const size_t const size_t n
Definition: nrngsl.h:11
return status
size_t p
int nrnmpi_myid_bbs
void * hoc_Erealloc(void *buf, size_t size)
Definition: symbol.cpp:251
void * hoc_Emalloc(size_t size)
Definition: symbol.cpp:190
void hoc_malchk()
Definition: symbol.cpp:183
static double resize(void *v)
static double pack(void *v)
Definition: ocbbs.cpp:313
static double unpack(void *v)
Definition: ocbbs.cpp:354
#define e
Definition: passive0.cpp:22
#define NULL
Definition: sptree.h:16