/*
 * File preamble as visible in this excerpt: configuration / dynamic-MPI
 * includes, the guard() wrapper around MPI calls, and the table mapping this
 * file's my_MPI_* type indices onto MPI datatypes.
 *
 * guard(f): the first visible definition asserts (nrn_assert) that f
 * evaluated to MPI_SUCCESS. The second stores the result of f, and when it is
 * not MPI_SUCCESS prints the stringified call and the result code before
 * hitting assert(0).
 * NOTE(review): the conditional that selects between the two guard()
 * definitions is not visible in this excerpt.
 */
1 #include <../../nrnconf.h> 9 #include <nrnmpi_dynam.h> 26 #define guard(f) nrn_assert(f == MPI_SUCCESS) 28 #define guard(f) {int _i = f; if (_i != MPI_SUCCESS) {printf("%s %d\n", #f, _i); assert(0);}} 31 #define nrnmpidebugleak 0 34 extern MPI_Comm nrn_bbs_comm;
/* File-local counter; nrnmpi_checkbufleak() compares it against 0. */
37 static int nrnmpi_bufcnt_;
/*
 * mytypes[] is indexed by the my_MPI_* constants: index 1 (my_MPI_DOUBLE) is
 * MPI_DOUBLE, index 3 (my_MPI_PACKED) is MPI_PACKED and index 4
 * (my_MPI_PICKLE) is MPI_CHAR.
 * NOTE(review): my_MPI_INT and my_MPI_CHAR are not defined in the visible
 * lines; presumably 0 and 2 to match the table -- confirm.
 */
48 #define my_MPI_DOUBLE 1 50 #define my_MPI_PACKED 3 51 #define my_MPI_PICKLE 4 53 static MPI_Datatype mytypes[] = {MPI_INT, MPI_DOUBLE, MPI_CHAR, MPI_PACKED, MPI_CHAR};
/*
 * unpack: extract `count` items of kind `my_datatype` from the packed buffer
 * r->buf into buf, using r->upkpos as the MPI_Unpack position argument.
 *
 * Each item is preceded in the buffer by a two-int header that is read into
 * type[]: type[0] is compared with my_datatype and type[1] with count. On a
 * mismatch the state is printed and assert(type[0] == my_datatype) is
 * evaluated. The payload is then unpacked as mytypes[my_datatype].
 *
 * buf         destination for the unpacked items
 * count       number of items expected
 * my_datatype one of the my_MPI_* indices into mytypes[]
 * r           buffer being read
 * errmes      label supplied by callers (e.g. "upkint"); its use is not
 *             visible in this excerpt
 *
 * NOTE(review): several lines of this function (declaration of type[], any
 * debug guards around the printf calls, closing braces) are missing from the
 * excerpt.
 */
55 static void unpack(
void*
buf,
int count,
int my_datatype, bbsmpibuf* r,
const char* errmes) {
59 printf(
"%d unpack upkpos=%d pkposition=%d keypos=%d size=%d\n",
/* The unpack position must lie inside the buffer. */
62 assert(r->upkpos >= 0 && r->size >= r->upkpos);
/* Read the {datatype, count} header that precedes the payload. */
63 guard(MPI_Unpack(r->buf, r->size, &r->upkpos, type, 2, MPI_INT, nrn_bbs_comm));
65 printf(
"%d unpack r=%p size=%d upkpos=%d type[0]=%d datatype=%d type[1]=%d count=%d\n",
nrnmpi_myid_bbs, r, r->size, r->upkpos, type[0], my_datatype, type[1], count);
/* Header disagrees with what the caller asked for: report it. */
67 if (type[0] != my_datatype || type[1] != count) {
68 printf(
"%d unpack size=%d upkpos=%d type[0]=%d datatype=%d type[1]=%d count=%d\n",
nrnmpi_myid_bbs, r->size, r->upkpos, type[0], my_datatype, type[1], count);
70 assert(type[0] == my_datatype);
/* Read the payload itself. */
72 guard(MPI_Unpack(r->buf, r->size, &r->upkpos, buf, count, mytypes[my_datatype], nrn_bbs_comm));
/*
 * nrnmpi_upkbegin: prepare r for unpacking.
 *
 * Asserts that r, r->buf are non-NULL and r->size > 0. One int is unpacked
 * into p through r->upkpos; p is then used as the position argument of a
 * second MPI_Unpack that reads `type`.
 *
 * NOTE(review): the printf below dereferences r before the assert that
 * checks r; presumably it is debug-only -- confirm.
 * NOTE(review): the condition guarding hoc_execerror, the initial value of
 * r->upkpos, and what is done with p and type afterwards are not visible in
 * this excerpt.
 */
75 void nrnmpi_upkbegin(bbsmpibuf* r) {
79 printf(
"%d nrnmpi_upkbegin %p (preunpack upkpos=%d keypos=%d)\n",
nrnmpi_myid_bbs, r, r->upkpos, r->keypos);
81 assert(r && r->buf && r->size > 0);
83 hoc_execerror(
"subworld process with nhost > 0 cannot use",
"the bulletin board");
/* First int in the buffer is read into p ... */
86 guard(MPI_Unpack(r->buf, r->size, &r->upkpos,
87 &p, 1, MPI_INT, nrn_bbs_comm));
/* ... and p then serves as the offset from which `type` is read. */
92 guard(MPI_Unpack(r->buf, r->size, &p, &type, 1, MPI_INT, nrn_bbs_comm));
/*
 * nrnmpi_getkey: read the key string stored in r.
 *
 * Moves r->upkpos to r->keypos, unpacks a string with nrnmpi_upkstr(), and
 * records the resulting r->upkpos in r->pkposition after asserting that
 * pkposition was either 0 or already equal to it.
 *
 * NOTE(review): the return statement is not visible (presumably returns s);
 * whether the previous upkpos is restored and who owns the returned string
 * cannot be determined from this excerpt.
 */
100 char* nrnmpi_getkey(bbsmpibuf* r) {
104 r->upkpos = r->keypos;
108 s = nrnmpi_upkstr(r);
109 assert(r->pkposition == 0 || r->pkposition == r->upkpos);
110 r->pkposition = r->upkpos;
113 printf(
"getkey return %s\n", s);
/*
 * nrnmpi_getid: read the int stored at r->keypos.
 *
 * Moves r->upkpos to r->keypos and unpacks one int with nrnmpi_upkint().
 *
 * NOTE(review): the return statement is not visible (presumably returns i),
 * nor is any save/restore of the previous upkpos.
 */
118 int nrnmpi_getid(bbsmpibuf* r) {
121 r->upkpos = r->keypos;
125 i = nrnmpi_upkint(r);
128 printf(
"getid return %d\n", i);
/*
 * nrnmpi_upkint: unpack a single my_MPI_INT item from r into the local i via
 * unpack(), labelled "upkint".
 * NOTE(review): the return statement is not visible (presumably returns i).
 */
133 int nrnmpi_upkint(bbsmpibuf* r) {
135 unpack(&i, 1, my_MPI_INT, r,
"upkint");
/*
 * nrnmpi_upkdouble: unpack a single my_MPI_DOUBLE item from r into the local
 * x via unpack(), labelled "upkdouble".
 * NOTE(review): the return statement is not visible (presumably returns x).
 */
139 double nrnmpi_upkdouble(bbsmpibuf* r) {
141 unpack(&x, 1, my_MPI_DOUBLE, r,
"upkdouble");
/*
 * nrnmpi_upkvec: unpack n my_MPI_DOUBLE items from r into the caller's
 * array x via unpack(), labelled "upkvec".
 *
 * n  number of doubles expected
 * x  destination array; must have room for n doubles
 * r  buffer being read
 */
145 void nrnmpi_upkvec(
int n,
double* x, bbsmpibuf* r) {
146 unpack(x, n, my_MPI_DOUBLE, r,
"upkvec");
/*
 * p_cxx_char_alloc: function pointer taking a length and returning char*.
 * nrnmpi_upkstr() and nrnmpi_upkpickle() call it with len+1 under
 * NRNMPI_DYNAMICLOAD to obtain the buffer they unpack into.
 * NOTE(review): where the pointer is assigned is not visible in this excerpt.
 */
149 #if NRNMPI_DYNAMICLOAD 154 char* (*p_cxx_char_alloc)(
int len);
/*
 * nrnmpi_upkstr: unpack a length-prefixed string from r.
 *
 * Reads an int length, obtains a buffer of len+1 bytes (through
 * (*p_cxx_char_alloc) when NRNMPI_DYNAMICLOAD is set), then unpacks len
 * my_MPI_CHAR items into it.
 *
 * NOTE(review): the non-dynamic allocation path, any NUL termination of
 * s[len], and the return statement are not visible in this excerpt.
 */
157 char* nrnmpi_upkstr(bbsmpibuf* r) {
160 unpack(&len, 1, my_MPI_INT, r,
"upkstr length");
161 #if NRNMPI_DYNAMICLOAD 162 s = (*p_cxx_char_alloc)(len+1);
166 unpack(s, len, my_MPI_CHAR, r,
"upkstr string");
/*
 * nrnmpi_upkpickle: unpack a length-prefixed my_MPI_PICKLE byte block from r.
 *
 * Reads an int length, obtains a buffer of len+1 bytes (through
 * (*p_cxx_char_alloc) when NRNMPI_DYNAMICLOAD is set), then unpacks len
 * my_MPI_PICKLE items into it.
 *
 * NOTE(review): how *size is set (presumably to len), the non-dynamic
 * allocation path, and the return statement are not visible in this excerpt.
 */
171 char* nrnmpi_upkpickle(
size_t* size, bbsmpibuf* r) {
174 unpack(&len, 1, my_MPI_INT, r,
"upkpickle length");
176 #if NRNMPI_DYNAMICLOAD 177 s = (*p_cxx_char_alloc)(len+1);
181 unpack(s, len, my_MPI_PICKLE, r,
"upkpickle data");
/*
 * resize: act only when r->size is smaller than the requested size.
 *
 * The new capacity is (size/64)*64 + 128, i.e. size rounded down to a
 * multiple of 64 (integer division) plus 128.
 *
 * NOTE(review): the reallocation of r->buf and the update of r->size are not
 * visible in this excerpt.
 */
185 static void resize(bbsmpibuf* r,
int size) {
187 if (r->size < size) {
188 newsize = (size/64)*64 + 128;
/*
 * nrnmpi_pkbegin: start packing into r by writing one int (`type`) at
 * r->pkposition.
 *
 * NOTE(review): the condition guarding hoc_execerror, the initialisation of
 * r->pkposition, and the value assigned to type are not visible in this
 * excerpt.
 */
194 void nrnmpi_pkbegin(bbsmpibuf* r) {
197 hoc_execerror(
"subworld process with nhost > 0 cannot use",
"the bulletin board");
204 guard(MPI_Pack(&type, 1, MPI_INT, r->buf, r->size, &r->pkposition, nrn_bbs_comm));
/*
 * nrnmpi_enddata: finish the data section of r.
 *
 * Queries the packed size of one int (isize), grows r to hold
 * r->pkposition + isize, packs `type` at r->pkposition, and then packs p
 * using `type` as the position variable for that second MPI_Pack. The final
 * debug message describes this as p "packed at beginning" and 0 packed
 * before the reported pkposition.
 *
 * NOTE(review): the assignments to p, type and oldsize, and the body of the
 * `oldsize < ...` branch, are not visible in this excerpt.
 */
207 void nrnmpi_enddata(bbsmpibuf* r) {
208 int p,
type, isize, oldsize;
214 guard(MPI_Pack_size(1, MPI_INT, nrn_bbs_comm, &isize));
/* Make sure one more int fits after the current pack position. */
216 resize(r, r->pkposition + isize);
218 if (oldsize < r->pkposition + isize) {
222 guard(MPI_Pack(&type, 1, MPI_INT, r->buf, r->size, &r->pkposition, nrn_bbs_comm));
224 printf(
"%d nrnmpi_enddata buf=%p size=%d pkposition=%d\n",
nrnmpi_myid_bbs, r->buf, r->size, r->pkposition);
/* Here `type` is the in/out position argument, not the data being packed. */
226 guard(MPI_Pack(&p, 1, MPI_INT, r->buf, r->size, &type, nrn_bbs_comm));
228 printf(
"%d after nrnmpi_enddata, %d was packed at beginning and 0 was packed before %d\n",
nrnmpi_myid_bbs, p, r->pkposition);
/*
 * pack: append `incount` items of kind `my_datatype` from inbuf to r.
 *
 * Computes the packed size of the payload (dsize) and of a two-int header
 * (isize), grows r so that r->pkposition + dsize + isize fits, then writes
 * the header {my_datatype, incount} followed by the payload, both through
 * r->pkposition. This is the layout unpack() reads back.
 *
 * inbuf       source items
 * incount     number of items
 * my_datatype one of the my_MPI_* indices into mytypes[]
 * r           buffer being written
 * e           label printed in the debug message
 *
 * NOTE(review): the declaration of type[], the assignment to oldsize and the
 * body of the `oldsize < ...` branch are not visible in this excerpt.
 */
232 static void pack(
void* inbuf,
int incount,
int my_datatype, bbsmpibuf* r,
const char*
e) {
234 int dsize, isize, oldsize;
236 printf(
"%d pack %p count=%d type=%d outbuf-%p pkposition=%d %s\n",
nrnmpi_myid_bbs, r, incount, my_datatype, r->buf, r->pkposition, e);
238 guard(MPI_Pack_size(incount, mytypes[my_datatype], nrn_bbs_comm, &dsize));
239 guard(MPI_Pack_size(2, MPI_INT, nrn_bbs_comm, &isize));
/* Ensure room for header + payload before packing. */
241 resize(r, r->pkposition + dsize + isize);
243 if (oldsize < r->pkposition + dsize + isize) {
247 type[0] = my_datatype; type[1] = incount;
248 guard(MPI_Pack(type, 2, MPI_INT, r->buf, r->size, &r->pkposition, nrn_bbs_comm));
249 guard(MPI_Pack(inbuf, incount, mytypes[my_datatype], r->buf, r->size, &r->pkposition, nrn_bbs_comm));
/*
 * nrnmpi_pkint: pack one my_MPI_INT item into r via pack(), labelled "pkint".
 * NOTE(review): the value is passed through the local ii, presumably a copy
 * of i; its declaration and assignment are not visible in this excerpt.
 */
255 void nrnmpi_pkint(
int i, bbsmpibuf* r) {
258 pack(&ii, 1, my_MPI_INT, r,
"pkint");
/*
 * nrnmpi_pkdouble: pack one my_MPI_DOUBLE item into r via pack(), labelled
 * "pkdouble".
 * NOTE(review): the value is passed through the local xx, presumably a copy
 * of x; its declaration and assignment are not visible in this excerpt.
 */
261 void nrnmpi_pkdouble(
double x, bbsmpibuf* r) {
264 pack(&xx, 1, my_MPI_DOUBLE, r,
"pkdouble");
/*
 * nrnmpi_pkvec: pack n my_MPI_DOUBLE items from the array x into r via
 * pack(), labelled "pkvec".
 *
 * n  number of doubles to pack
 * x  source array of at least n doubles
 * r  buffer being written
 */
267 void nrnmpi_pkvec(
int n,
double* x, bbsmpibuf* r) {
268 pack(x, n, my_MPI_DOUBLE, r,
"pkvec");
/*
 * nrnmpi_pkstr: pack a string into r as an int length followed by len
 * my_MPI_CHAR items. This is the layout nrnmpi_upkstr() reads back.
 *
 * The const qualifier on s is cast away only because pack() takes a
 * non-const void*.
 *
 * NOTE(review): how len is computed is not visible in this excerpt
 * (presumably strlen(s)).
 */
271 void nrnmpi_pkstr(
const char*
s, bbsmpibuf* r) {
274 pack(&len, 1, my_MPI_INT, r,
"pkstr length");
275 pack((
char*)s, len, my_MPI_CHAR, r,
"pkstr string");
/*
 * nrnmpi_pkpickle: pack a byte block into r as an int length followed by len
 * my_MPI_PICKLE items. This is the layout nrnmpi_upkpickle() reads back.
 *
 * NOTE(review): how len is derived from the size_t `size` is not visible in
 * this excerpt; if it is a plain conversion to int, sizes above INT_MAX
 * would not be representable -- confirm.
 */
278 void nrnmpi_pkpickle(
const char* s,
size_t size, bbsmpibuf* r) {
280 pack(&len, 1, my_MPI_INT, r,
"pkpickle length");
281 pack((
char*)s, len, my_MPI_PICKLE, r,
"pkpickle data");
/*
 * nrnmpi_bbssend: send r to rank `dest` with MPI tag `tag` on nrn_bbs_comm.
 *
 * Visible steps:
 *  - the tag is written into the buffer with nrnmpi_pkint() at the offset
 *    held in r->upkpos, with r->pkposition and r->upkpos saved beforehand and
 *    restored afterwards;
 *  - after asserting r->buf is set and r->keypos <= r->size, the buffer is
 *    sent as r->size MPI_PACKED units;
 *  - a second MPI_Send transmits an empty message (NULL, 0).
 *
 * NOTE(review): the conditions selecting these paths are not visible in this
 * excerpt. The debug printf tolerates r == NULL, so presumably the empty
 * send is the r == NULL case -- confirm.
 */
284 void nrnmpi_bbssend(
int dest,
int tag, bbsmpibuf* r) {
286 printf(
"%d nrnmpi_bbssend %p dest=%d tag=%d size=%d\n",
nrnmpi_myid_bbs, r, dest, tag, (r)?r->size:0);
293 int save_position = r->pkposition;
294 int save_upkpos = r->upkpos;
/* Temporarily pack at the upkpos offset, then put both cursors back. */
297 r->pkposition = r->upkpos;
298 nrnmpi_pkint(tag, r);
299 r->pkposition = save_position;
300 r->upkpos = save_upkpos;
305 assert( r->buf && r->keypos <= r->size);
306 guard(MPI_Send(r->buf, r->size, MPI_PACKED, dest, tag, nrn_bbs_comm));
308 guard(MPI_Send(
NULL, 0, MPI_PACKED, dest, tag, nrn_bbs_comm));
/*
 * nrnmpi_bbsrecv: receive a message on nrn_bbs_comm into r.
 *
 * Probes for a message from `source` with any tag, queries its size in
 * MPI_PACKED units, then receives into r->buf (capacity r->size). When the
 * received MPI tag is 20, an int is read from the buffer with
 * nrnmpi_upkint() into `tag` and r->upkpos is restored afterwards. A return
 * of status.MPI_TAG is visible.
 *
 * NOTE(review): the condition under which source becomes MPI_ANY_SOURCE, any
 * resize of r between the probe and the receive, the meaning of tag value
 * 20, and whether another return path uses the unpacked `tag` are not
 * visible in this excerpt.
 */
316 int nrnmpi_bbsrecv(
int source, bbsmpibuf* r) {
320 source = MPI_ANY_SOURCE;
325 guard(MPI_Probe(source, MPI_ANY_TAG, nrn_bbs_comm, &status));
326 guard(MPI_Get_count(&status, MPI_PACKED, &size));
328 printf(
"%d nrnmpi_bbsrecv probe size=%d source=%d tag=%d\n",
nrnmpi_myid_bbs, size, status.MPI_SOURCE, status.MPI_TAG);
331 guard(MPI_Recv(r->buf, r->size, MPI_PACKED, source, MPI_ANY_TAG, nrn_bbs_comm, &status));
339 if (status.MPI_TAG == 20) {
/* Peek at the int stored in the buffer without moving the cursor. */
341 int save_upkpos = r->upkpos;
344 tag = nrnmpi_upkint(r);
345 r->upkpos = save_upkpos;
348 return status.MPI_TAG;
/*
 * nrnmpi_bbssendrecv: send s to `dest` with `tag`, then receive from `dest`
 * into r and return the result of nrnmpi_bbsrecv().
 *
 * A non-blocking probe is made first; the visible condition is true when no
 * message is pending or the pending message does not come from dest.
 *
 * NOTE(review): which statements that condition guards (presumably the
 * send) is not visible in this excerpt.
 */
351 int nrnmpi_bbssendrecv(
int dest,
int tag, bbsmpibuf* s, bbsmpibuf* r) {
352 int size, itag, source;
358 if (!nrnmpi_iprobe(&size, &itag, &source) || source != dest) {
362 nrnmpi_bbssend(dest, tag, s);
364 return nrnmpi_bbsrecv(dest, r);
/*
 * nrnmpi_iprobe: MPI_Iprobe on nrn_bbs_comm for any source and any tag.
 *
 * Each of size, tag and source may be NULL; non-NULL ones receive the
 * message's MPI_PACKED count, MPI_TAG and MPI_SOURCE respectively.
 *
 * NOTE(review): whether the out-parameter writes are guarded by `flag`, and
 * the return statement (presumably flag), are not visible in this excerpt.
 */
367 int nrnmpi_iprobe(
int* size,
int* tag,
int* source) {
370 guard(MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, nrn_bbs_comm, &flag, &status));
372 if (source) *source = status.MPI_SOURCE;
373 if (tag) *tag = status.MPI_TAG;
374 if (size) guard(MPI_Get_count(&status, MPI_PACKED, size));
/*
 * nrnmpi_probe: MPI_Probe on nrn_bbs_comm for any source and any tag.
 *
 * Each of size, tag and source may be NULL; non-NULL ones receive the
 * message's MPI_PACKED count, MPI_TAG and MPI_SOURCE respectively.
 */
379 void nrnmpi_probe(
int* size,
int* tag,
int* source) {
382 guard(MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, nrn_bbs_comm, &status));
383 if (source) *source = status.MPI_SOURCE;
384 if (tag) *tag = status.MPI_TAG;
385 if (size) guard(MPI_Get_count(&status, MPI_PACKED, size));
/*
 * nrnmpi_newbuf: takes an int size and returns a bbsmpibuf*.
 * NOTE(review): the body is not visible in this excerpt; presumably it
 * allocates a buffer of the given size -- confirm against the full file.
 */
388 bbsmpibuf* nrnmpi_newbuf(
int size) {
/*
 * nrnmpi_copy: copy the contents and cursors of src into dest.
 *
 * Copies src->size elements of src->buf into dest->buf one at a time, then
 * copies pkposition, upkpos and keypos.
 *
 * NOTE(review): dest->buf must hold at least src->size elements; any resize
 * of dest before the loop is not visible in this excerpt.
 */
409 void nrnmpi_copy(bbsmpibuf* dest, bbsmpibuf* src){
412 for (i=0; i < src->size; ++
i) {
413 dest->buf[
i] = src->buf[
i];
415 dest->pkposition = src->pkposition;
416 dest->upkpos = src->upkpos;
417 dest->keypos = src->keypos;
/*
 * nrnmpi_free: file-local helper taking a bbsmpibuf*.
 * NOTE(review): the body is not visible in this excerpt; presumably it
 * releases buf and its storage -- confirm against the full file.
 */
420 static void nrnmpi_free(bbsmpibuf* buf){
/*
 * nrnmpi_ref: takes a bbsmpibuf*.
 * NOTE(review): the body is not visible in this excerpt; presumably it
 * increments buf->refcount (nrnmpi_unref tests that field) -- confirm.
 */
433 void nrnmpi_ref(bbsmpibuf* buf) {
/*
 * nrnmpi_unref: takes an action when buf->refcount is <= 0.
 * NOTE(review): the decrement of refcount and the action taken (presumably a
 * call to nrnmpi_free) are not visible in this excerpt.
 */
438 void nrnmpi_unref(bbsmpibuf* buf) {
441 if (buf->refcount <= 0) {
/*
 * nrnmpi_checkbufleak: tests whether the file-local counter nrnmpi_bufcnt_
 * is greater than 0.
 * NOTE(review): the action taken in that case (presumably a diagnostic
 * message) is not visible in this excerpt.
 */
448 void nrnmpi_checkbufleak() {
449 if (nrnmpi_bufcnt_ > 0) {
static double pack(void *v)
int const size_t const size_t n
void hoc_execerror(const char *, const char *)
static double resize(void *v)
char * cxx_char_alloc(size_t sz)
void * hoc_Erealloc(void *buf, size_t size)
static double unpack(void *v)
void * hoc_Emalloc(size_t size)