NEURON
bbsmpipack.cpp
Go to the documentation of this file.
1 #include <../../nrnconf.h>
2 #include <stdio.h>
3 #include <stdlib.h>
4 
5 /* do not want the redef in the dynamic load case */
6 #include <nrnmpiuse.h>
7 
8 #if NRNMPI_DYNAMICLOAD
9 #include <nrnmpi_dynam.h>
10 #endif
11 
12 #include <nrnmpi.h>
13 
14 #if NRNMPI
15 #if HAVE_STRING_H
16 #include <string.h>
17 #endif
18 #include <assert.h>
19 #include <errno.h>
20 #include <mpi.h>
21 #include <nrnmpidec.h>
22 #include <nrnmpi_impl.h>
23 #include <hocdec.h>
24 
/*
 * guard(f): evaluate the MPI call f exactly once and fail loudly when it
 * does not return MPI_SUCCESS. The active variant prints the call text and
 * the returned code before asserting.
 *
 * Both variants expand to a single statement that needs a trailing ';',
 * so guard() can be used as the body of an unbraced if/else.
 */
#if 0
#define guard(f) nrn_assert((f) == MPI_SUCCESS)
#else
#define guard(f)                                  \
    do {                                          \
        int guard_rc_ = (f);                      \
        if (guard_rc_ != MPI_SUCCESS) {           \
            printf("%s %d\n", #f, guard_rc_);     \
            assert(0);                            \
        }                                         \
    } while (0)
#endif
30 
31 #define nrnmpidebugleak 0
32 #define debug 0
33 
34 extern MPI_Comm nrn_bbs_comm;
35 
36 #if nrnmpidebugleak
37 static int nrnmpi_bufcnt_;
38 #endif
39 
40 /* we want to find the key easily and I am assuming that all MPI
41  implementations allow one to start unpacking from a particular position.
42  Therefore we give space for the key position at the beginning and
43  an int always takes up the same space at position 0.
44  Now I regret not forcing the key to come first at the user level.
45 */
46 
47 #define my_MPI_INT 0
48 #define my_MPI_DOUBLE 1
49 #define my_MPI_CHAR 2
50 #define my_MPI_PACKED 3
51 #define my_MPI_PICKLE 4
52 
53 static MPI_Datatype mytypes[] = {MPI_INT, MPI_DOUBLE, MPI_CHAR, MPI_PACKED, MPI_CHAR};
54 
55 static void unpack(void* buf, int count, int my_datatype, bbsmpibuf* r, const char* errmes) {
56  int type[2];
57  assert(r && r->buf);
58 #if debug
59 printf("%d unpack upkpos=%d pkposition=%d keypos=%d size=%d\n",
60  nrnmpi_myid_bbs, r->upkpos, r->pkposition, r->keypos, r->size);
61 #endif
62 assert(r->upkpos >= 0 && r->size >= r->upkpos);
63  guard(MPI_Unpack(r->buf, r->size, &r->upkpos, type, 2, MPI_INT, nrn_bbs_comm));
64 #if debug
65 printf("%d unpack r=%p size=%d upkpos=%d type[0]=%d datatype=%d type[1]=%d count=%d\n", nrnmpi_myid_bbs, r, r->size, r->upkpos, type[0], my_datatype, type[1], count);
66 #endif
67 if (type[0] != my_datatype || type[1] != count) {
68 printf("%d unpack size=%d upkpos=%d type[0]=%d datatype=%d type[1]=%d count=%d\n", nrnmpi_myid_bbs, r->size, r->upkpos, type[0], my_datatype, type[1], count);
69 }
70  assert(type[0] == my_datatype);
71  assert(type[1] == count);
72  guard(MPI_Unpack(r->buf, r->size, &r->upkpos, buf, count, mytypes[my_datatype], nrn_bbs_comm));
73 }
74 
75 void nrnmpi_upkbegin(bbsmpibuf* r) {
76  int type;
77  int p;
78 #if debug
79 printf("%d nrnmpi_upkbegin %p (preunpack upkpos=%d keypos=%d)\n", nrnmpi_myid_bbs, r, r->upkpos, r->keypos);
80 #endif
81 assert(r && r->buf && r->size > 0);
82  if (nrnmpi_myid_bbs == -1) {
83  hoc_execerror("subworld process with nhost > 0 cannot use", "the bulletin board");
84  }
85  r->upkpos = 0;
86  guard(MPI_Unpack(r->buf, r->size, &r->upkpos,
87  &p, 1, MPI_INT, nrn_bbs_comm));
88 if (p > r->size) {
89 printf("\n %d nrnmpi_upkbegin keypos=%d size=%d\n", nrnmpi_myid_bbs, p, r->size);
90 }
91 assert(p <= r->size);
92  guard(MPI_Unpack(r->buf, r->size, &p, &type, 1, MPI_INT, nrn_bbs_comm));
93 #if debug
94 printf("%d nrnmpi_upkbegin type=%d keypos=%d\n", nrnmpi_myid_bbs, type, p);
95 #endif
96  assert(type == 0);
97  r->keypos = p;
98 }
99 
100 char* nrnmpi_getkey(bbsmpibuf* r) {
101  char* s;
102  int type;
103  type = r->upkpos;
104  r->upkpos = r->keypos;
105 #if debug
106 printf("%d nrnmpi_getkey %p keypos=%d\n", nrnmpi_myid_bbs, r, r->keypos);
107 #endif
108  s = nrnmpi_upkstr(r);
109  assert(r->pkposition == 0 || r->pkposition == r->upkpos);
110  r->pkposition = r->upkpos;
111  r->upkpos = type;
112 #if debug
113 printf("getkey return %s\n", s);
114 #endif
115  return s;
116 }
117 
118 int nrnmpi_getid(bbsmpibuf* r) {
119  int i, type;
120  type = r->upkpos;
121  r->upkpos = r->keypos;
122 #if debug
123 printf("%d nrnmpi_getid %p keypos=%d\n", nrnmpi_myid_bbs, r, r->keypos);
124 #endif
125  i = nrnmpi_upkint(r);
126  r->upkpos = type;
127 #if debug
128 printf("getid return %d\n", i);
129 #endif
130  return i;
131 }
132 
133 int nrnmpi_upkint(bbsmpibuf* r) {
134  int i;
135  unpack(&i, 1, my_MPI_INT, r, "upkint");
136  return i;
137 }
138 
139 double nrnmpi_upkdouble(bbsmpibuf* r) {
140  double x;
141  unpack(&x, 1, my_MPI_DOUBLE, r, "upkdouble");
142  return x;
143 }
144 
145 void nrnmpi_upkvec(int n, double* x, bbsmpibuf* r) {
146  unpack(x, n, my_MPI_DOUBLE, r, "upkvec");
147 }
148 
149 #if NRNMPI_DYNAMICLOAD
/* For an unknown reason, running `mpiexec -n 2 python3 test0.py` fails with
   load_nrnmpi: /home/hines/neuron/nrndynam/x86_64/lib/libnrnmpi.so: undefined symbol: cxx_char_alloc
   Therefore, in the dynamic load case, cxx_char_alloc is called through the
   function pointer below, which is filled in explicitly in nrnmpi_dynam.cpp.
*/
154 char* (*p_cxx_char_alloc)(int len);
155 #endif
156 
157 char* nrnmpi_upkstr(bbsmpibuf* r) {
158  int len;
159  char* s;
160  unpack(&len, 1, my_MPI_INT, r, "upkstr length");
161 #if NRNMPI_DYNAMICLOAD
162  s = (*p_cxx_char_alloc)(len+1); /* will be delete not free */
163 #else
164  s = cxx_char_alloc(len+1); /* will be delete not free */
165 #endif
166  unpack(s, len, my_MPI_CHAR, r, "upkstr string");
167  s[len] = '\0';
168  return s;
169 }
170 
171 char* nrnmpi_upkpickle(size_t* size, bbsmpibuf* r) {
172  int len;
173  char* s;
174  unpack(&len, 1, my_MPI_INT, r, "upkpickle length");
175  *size = len;
176 #if NRNMPI_DYNAMICLOAD
177  s = (*p_cxx_char_alloc)(len+1); /* will be delete not free */
178 #else
179  s = cxx_char_alloc(len + 1); /* will be delete, not free */
180 #endif
181  unpack(s, len, my_MPI_PICKLE, r, "upkpickle data");
182  return s;
183 }
184 
185 static void resize(bbsmpibuf* r, int size) {
186  int newsize;
187  if (r->size < size) {
188  newsize = (size/64)*64 + 128;
189  r->buf = static_cast<char*>(hoc_Erealloc(r->buf, newsize)); hoc_malchk();
190  r->size = newsize;
191  }
192 }
193 
194 void nrnmpi_pkbegin(bbsmpibuf* r) {
195  int type;
196  if (nrnmpi_myid_bbs == -1) {
197  hoc_execerror("subworld process with nhost > 0 cannot use", "the bulletin board");
198  }
199  r->pkposition = 0;
200  type = 0;
201 #if debug
202 printf("%d nrnmpi_pkbegin %p size=%d pkposition=%d\n", nrnmpi_myid_bbs, r, r->size, r->pkposition);
203 #endif
204  guard(MPI_Pack(&type, 1, MPI_INT, r->buf, r->size, &r->pkposition, nrn_bbs_comm));
205 }
206 
207 void nrnmpi_enddata(bbsmpibuf* r) {
208  int p, type, isize, oldsize;
209  p = r->pkposition;
210  type = 0;
211 #if debug
212 printf("%d nrnmpi_enddata %p size=%d pkposition=%d\n", nrnmpi_myid_bbs, r, r->size, p);
213 #endif
214  guard(MPI_Pack_size(1, MPI_INT, nrn_bbs_comm, &isize));
215 oldsize = r->size;
216  resize(r, r->pkposition + isize);
217 #if debug
218 if (oldsize < r->pkposition + isize) {
219  printf("%d %p need %d more. end up with total of %d\n", nrnmpi_myid_bbs, r, isize, r->size);
220 }
221 #endif
222  guard(MPI_Pack(&type, 1, MPI_INT, r->buf, r->size, &r->pkposition, nrn_bbs_comm));
223 #if debug
224 printf("%d nrnmpi_enddata buf=%p size=%d pkposition=%d\n", nrnmpi_myid_bbs, r->buf, r->size, r->pkposition);
225 #endif
226  guard(MPI_Pack(&p, 1, MPI_INT, r->buf, r->size, &type, nrn_bbs_comm));
227 #if debug
228 printf("%d after nrnmpi_enddata, %d was packed at beginning and 0 was packed before %d\n", nrnmpi_myid_bbs, p, r->pkposition);
229 #endif
230 }
231 
232 static void pack(void* inbuf, int incount, int my_datatype, bbsmpibuf* r, const char* e) {
233  int type[2];
234  int dsize, isize, oldsize;
235 #if debug
236 printf("%d pack %p count=%d type=%d outbuf-%p pkposition=%d %s\n", nrnmpi_myid_bbs, r, incount, my_datatype, r->buf, r->pkposition, e);
237 #endif
238  guard(MPI_Pack_size(incount, mytypes[my_datatype], nrn_bbs_comm, &dsize));
239  guard(MPI_Pack_size(2, MPI_INT, nrn_bbs_comm, &isize));
240 oldsize = r->size;
241  resize(r, r->pkposition + dsize + isize);
242 #if debug
243 if (oldsize < r->pkposition + dsize + isize) {
244  printf("%d %p need %d more. end up with total of %d\n", nrnmpi_myid_bbs, r, dsize+isize, r->size);
245 }
246 #endif
247  type[0] = my_datatype; type[1] = incount;
248  guard(MPI_Pack(type, 2, MPI_INT, r->buf, r->size, &r->pkposition, nrn_bbs_comm));
249  guard(MPI_Pack(inbuf, incount, mytypes[my_datatype], r->buf, r->size, &r->pkposition, nrn_bbs_comm));
250 #if debug
251 printf("%d pack done pkposition=%d\n", nrnmpi_myid_bbs, r->pkposition);
252 #endif
253 }
254 
255 void nrnmpi_pkint(int i, bbsmpibuf* r) {
256  int ii;
257  ii = i;
258  pack(&ii, 1, my_MPI_INT, r, "pkint");
259 }
260 
261 void nrnmpi_pkdouble(double x, bbsmpibuf* r) {
262  double xx;
263  xx = x;
264  pack(&xx, 1, my_MPI_DOUBLE, r, "pkdouble");
265 }
266 
267 void nrnmpi_pkvec(int n, double* x, bbsmpibuf* r) {
268  pack(x, n, my_MPI_DOUBLE, r, "pkvec");
269 }
270 
271 void nrnmpi_pkstr(const char* s, bbsmpibuf* r) {
272  int len;
273  len = strlen(s);
274  pack(&len, 1, my_MPI_INT, r, "pkstr length");
275  pack((char*)s, len, my_MPI_CHAR, r, "pkstr string");
276 }
277 
278 void nrnmpi_pkpickle(const char* s, size_t size, bbsmpibuf* r) {
279  int len = size;
280  pack(&len, 1, my_MPI_INT, r, "pkpickle length");
281  pack((char*)s, len, my_MPI_PICKLE, r, "pkpickle data");
282 }
283 
284 void nrnmpi_bbssend(int dest, int tag, bbsmpibuf* r) {
285 #if debug
286 printf("%d nrnmpi_bbssend %p dest=%d tag=%d size=%d\n", nrnmpi_myid_bbs, r, dest, tag, (r)?r->size:0);
287 #endif
288 
289  /* Some MPI implementations limit tags to be less than full MPI_INT domain
290  so pack tag if > 20 in second slot (now reserved) of buffer and use 20 as the tag.
291  */
292  if (tag > 20) {
293  int save_position = r->pkposition;
294  int save_upkpos = r->upkpos;
295  nrnmpi_upkbegin(r);
296  nrnmpi_upkint(r);
297  r->pkposition = r->upkpos;
298  nrnmpi_pkint(tag, r);
299  r->pkposition = save_position;
300  r->upkpos = save_upkpos;
301  tag = 20;
302  }
303 
304  if (r) {
305  assert( r->buf && r->keypos <= r->size);
306  guard(MPI_Send(r->buf, r->size, MPI_PACKED, dest, tag, nrn_bbs_comm));
307  }else{
308  guard(MPI_Send(NULL, 0, MPI_PACKED, dest, tag, nrn_bbs_comm));
309  }
310  errno = 0;
311 #if debug
312 printf("%d return from send\n", nrnmpi_myid_bbs);
313 #endif
314 }
315 
316 int nrnmpi_bbsrecv(int source, bbsmpibuf* r) {
317  MPI_Status status;
318  int size;
319  if (source == -1) {
320  source = MPI_ANY_SOURCE;
321  }
322 #if debug
323 printf("%d nrnmpi_bbsrecv %p\n", nrnmpi_myid_bbs, r);
324 #endif
325  guard(MPI_Probe(source, MPI_ANY_TAG, nrn_bbs_comm, &status));
326  guard(MPI_Get_count(&status, MPI_PACKED, &size));
327 #if debug
328 printf("%d nrnmpi_bbsrecv probe size=%d source=%d tag=%d\n", nrnmpi_myid_bbs, size, status.MPI_SOURCE, status.MPI_TAG);
329 #endif
330  resize(r, size);
331  guard(MPI_Recv(r->buf, r->size, MPI_PACKED, source, MPI_ANY_TAG, nrn_bbs_comm, &status));
332  errno = 0;
333  /* Some MPI implementations limit tags to be less than full MPI_INT domain
334  In the past we allowed TODO mesages to have tags > 20 (FIRSTID of src/parallel/bbssrv.h)
335  To fix the bug we no longer send or receive such tags and instead
336  copy the tag into the second pkint of the r->buf and send with
337  a tag of 20.
338  */
339  if (status.MPI_TAG == 20) {
340  int tag;
341  int save_upkpos = r->upkpos; /* possibly not needed */
342  nrnmpi_upkbegin(r);
343  nrnmpi_upkint(r);
344  tag = nrnmpi_upkint(r);
345  r->upkpos = save_upkpos;
346  return tag;
347  }
348  return status.MPI_TAG;
349 }
350 
351 int nrnmpi_bbssendrecv(int dest, int tag, bbsmpibuf* s, bbsmpibuf* r) {
352  int size, itag, source;
353  int msgtag;
354  MPI_Status status;
355 #if debug
356 printf("%d nrnmpi_bbssendrecv dest=%d tag=%d\n", nrnmpi_myid_bbs, dest, tag);
357 #endif
358  if (!nrnmpi_iprobe(&size, &itag, &source) || source != dest) {
359 #if debug
360 printf("%d nrnmpi_bbssendrecv nothing available so send\n", nrnmpi_myid_bbs);
361 #endif
362  nrnmpi_bbssend(dest, tag, s);
363  }
364  return nrnmpi_bbsrecv(dest, r);
365 }
366 
367 int nrnmpi_iprobe(int* size, int* tag, int* source) {
368  int flag = 0;
369  MPI_Status status;
370  guard(MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, nrn_bbs_comm, &flag, &status));
371  if (flag) {
372  if (source) *source = status.MPI_SOURCE;
373  if (tag) *tag = status.MPI_TAG;
374  if (size) guard(MPI_Get_count(&status, MPI_PACKED, size));
375  }
376  return flag;
377 }
378 
379 void nrnmpi_probe(int* size, int* tag, int* source) {
380  int flag = 0;
381  MPI_Status status;
382  guard(MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, nrn_bbs_comm, &status));
383  if (source) *source = status.MPI_SOURCE;
384  if (tag) *tag = status.MPI_TAG;
385  if (size) guard(MPI_Get_count(&status, MPI_PACKED, size));
386 }
387 
388 bbsmpibuf* nrnmpi_newbuf(int size) {
389  bbsmpibuf* buf;
390  buf = (bbsmpibuf*)hoc_Emalloc(sizeof(bbsmpibuf)); hoc_malchk();
391 #if debug
392 printf("%d nrnmpi_newbuf %p\n", nrnmpi_myid_bbs, buf);
393 #endif
394  buf->buf = (char*)0;
395  if (size > 0) {
396  buf->buf = static_cast<char*>(hoc_Emalloc(size*sizeof(char))) ; hoc_malchk();
397  }
398  buf->size = size;
399  buf->pkposition = 0;
400  buf->upkpos = 0;
401  buf->keypos = 0;
402  buf->refcount = 0;
403 #if nrnmpidebugleak
404  ++nrnmpi_bufcnt_;
405 #endif
406  return buf;
407 }
408 
409 void nrnmpi_copy(bbsmpibuf* dest, bbsmpibuf* src){
410  int i;
411  resize(dest, src->size);
412  for (i=0; i < src->size; ++i) {
413  dest->buf[i] = src->buf[i];
414  }
415  dest->pkposition = src->pkposition;
416  dest->upkpos = src->upkpos;
417  dest->keypos = src->keypos;
418 }
419 
420 static void nrnmpi_free(bbsmpibuf* buf){
421 #if debug
422 printf("%d nrnmpi_free %p\n", nrnmpi_myid_bbs, buf);
423 #endif
424  if (buf->buf) {
425  free(buf->buf);
426  }
427  free(buf);
428 #if nrnmpidebugleak
429  --nrnmpi_bufcnt_;
430 #endif
431 }
432 
433 void nrnmpi_ref(bbsmpibuf* buf) {
434  assert(buf);
435  buf->refcount += 1;
436 }
437 
438 void nrnmpi_unref(bbsmpibuf* buf) {
439  if (buf) {
440  --buf->refcount;
441  if (buf->refcount <= 0) {
442  nrnmpi_free(buf);
443  }
444  }
445 }
446 
#if nrnmpidebugleak
/* Report how many buffers are still allocated, if any. */
void nrnmpi_checkbufleak() {
    if (nrnmpi_bufcnt_ <= 0) {
        return;
    }
    printf("%d nrnmpi_bufcnt=%d\n", nrnmpi_myid_bbs, nrnmpi_bufcnt_);
}
#endif
454 
455 #endif /*NRNMPI*/
#define assert(ex)
Definition: hocassrt.h:26
short type
Definition: cabvars.h:10
int nrnmpi_myid_bbs
size_t p
#define e
Definition: passive0.cpp:24
return status
static double pack(void *v)
Definition: ocbbs.cpp:310
int const size_t const size_t n
Definition: nrngsl.h:12
_CONST char * s
Definition: system.cpp:74
#define printf
Definition: mwprefix.h:26
void hoc_execerror(const char *, const char *)
Definition: hoc.cpp:741
errno
Definition: system.cpp:98
static double resize(void *v)
char * cxx_char_alloc(size_t sz)
Definition: ivoc.cpp:122
void * hoc_Erealloc(void *buf, size_t size)
Definition: symbol.cpp:253
void hoc_malchk()
Definition: symbol.cpp:187
#define i
Definition: md1redef.h:12
char buf[512]
Definition: init.cpp:13
static double unpack(void *v)
Definition: ocbbs.cpp:351
return NULL
Definition: cabcode.cpp:461
void * hoc_Emalloc(size_t size)
Definition: symbol.cpp:194