mpiexec scalability improved!
Pete Wyckoff
pw at osc.edu
Wed Apr 12 12:49:53 EDT 2006
pw at osc.edu wrote on Tue, 11 Apr 2006 10:29 -0400:
> Hang onto your patch. I'll take a crack at converting gm.c to
> do periodic servicing without fork and you can see how you like
> that.
Are you willing to test my vision for GM async? Here's a patch.
It works here on 4 GM nodes on ia64, and the debug statements
appear to show it's doing the right things, but you may run
into issues at scale. I am curious to know if it is as fast
as your fork() version or the mpich-gm perl script.
-- Pete
-------------- next part --------------
Index: mpiexec.h
===================================================================
--- mpiexec.h (revision 357)
+++ mpiexec.h (working copy)
@@ -286,6 +286,7 @@
/* gm.c */
void prepare_gm_startup_ports(int gmpi_port[2]);
int read_gm_startup_ports(void);
+int service_gm_startup(int created_new_task);
/* ib.c */
int prepare_ib_startup_port(int *fd);
Index: gm.c
===================================================================
--- gm.c (revision 357)
+++ gm.c (working copy)
@@ -1,9 +1,9 @@
/*
* gm.c - code specific to initializing MPICH/GM or MPICH/MX
*
- * $Id: gm.c,v 1.10 2005/12/20 02:40:14 pw Exp $
+ * $Id$
*
- * Copyright (C) 2000-5 Pete Wyckoff <pw at osc.edu>
+ * Copyright (C) 2000-6 Pete Wyckoff <pw at osc.edu>
*
* Distributed under the GNU Public License Version 2 or later (See LICENSE)
*/
@@ -38,6 +38,16 @@
static int mpich_gm_version = -1;
+/* state of all the sockets */
+static int num_waiting_to_accept; /* first accept all numtasks */
+static int num_waiting_to_read; /* then read all the numtasks */
+#ifdef HAVE_POLL
+static struct pollfd *pfs;
+#else
+static fd_set rfs;
+static int fdmax;
+#endif
+
static tm_event_t scatter_gm_startup_ports(void);
/*
@@ -52,7 +62,7 @@
{
struct sockaddr_in sin;
socklen_t len = sizeof(sin);
- int i;
+ int flags, i;
for (i=0; i<2; i++) {
gmpi_fd[i] = socket(PF_INET, SOCK_STREAM, 0);
@@ -81,9 +91,101 @@
if (listen(gmpi_fd[i], 1024) < 0)
error_errno("%s: listen", __func__);
}
+
+ /*
+ * Poll for connection while checking if process died to avoid
+ * hanging due to gm startup problems.
+ */
+ flags = fcntl(gmpi_fd[0], F_GETFL);
+ if (flags < 0)
+ error_errno("%s: get listen socket flags", __func__);
+ if (fcntl(gmpi_fd[0], F_SETFL, flags | O_NONBLOCK) < 0)
+ error_errno("%s: set listen socket nonblocking", __func__);
+
+ /* alloc */
+ gmpi_info = Malloc(numtasks * sizeof(*gmpi_info));
+ for (i=0; i<numtasks; i++)
+ gmpi_info[i].port = -1;
+
+#ifdef HAVE_POLL
+ pfs = Malloc((numtasks+1) * sizeof(*pfs));
+#else
+ FD_ZERO(&rfs);
+ fdmax = 0;
+#endif
+
+ num_waiting_to_accept = 0; /* incremented on each call to service... */
+ num_waiting_to_read = 0;
}
/*
+ * Read a newly accepted socket.
+ */
+static void read_gm_one(int fd)
+{
+ int magic, id, port, board, numanode, pid, remote_port;
+ int cc;
+ unsigned int node;
+ char s[1024];
+
+ cc = read_until(fd, s, sizeof(s), ">>>", 0);
+ if (cc < 0)
+ error_errno("%s: read", __func__);
+ if (cc == 0)
+ error("%s: eof", __func__);
+ if (sscanf(s, "<<<%d:%d:%d:%d:%u:%d:%d::%d>>>", &magic, &id,
+ &port, &board, &node, &numanode, &pid, &remote_port) == 8) {
+ /* format used by mpich-gm-1.2.5..10 and later */
+ if (mpich_gm_version < 0) {
+ mpich_gm_version = 12510;
+ debug(1, "%s: mpich gm or mx version %d", __func__,
+ mpich_gm_version);
+ }
+ else if (mpich_gm_version != 12510)
+ error("%s: expecting version 12150 got %d", __func__,
+ mpich_gm_version);
+ } else if (sscanf(s, "<<<%d:%d:%d:%d:%u:%d>>>", &magic, &id, &port,
+ &board, &node, &pid) == 6) {
+ /* format used by mpich-gm-1.2.4..8a */
+ if (mpich_gm_version < 0) {
+ mpich_gm_version = 1248;
+ debug(1, "%s: mpich gm or mx version %d", __func__,
+ mpich_gm_version);
+ }
+ else if (mpich_gm_version != 1248)
+ error("%s: expecting version 1248 got %d", __func__,
+ mpich_gm_version);
+ numanode = 0;
+ remote_port = 0;
+ } else {
+ error("%s: read gmpi_port#1 <<<...>>> string not recognized",
+ __func__);
+ }
+ if (magic != atoi(jobid))
+ error("%s: received bad magic %d", __func__, magic);
+ if (id < 0 || id >= numtasks)
+ error("%s: received id %d out of range", __func__, id);
+ if (gmpi_info[id].port != -1)
+ error("%s: received duplicate response for id %d", __func__, id);
+ gmpi_info[id].port = port;
+ gmpi_info[id].board = board;
+ gmpi_info[id].node = node;
+ gmpi_info[id].numanode = numanode;
+ gmpi_info[id].remote_port = remote_port;
+ gmpi_info[id].pid = pid;
+ debug(1, "%s: rank %d in, %d + %d left", __func__, id,
+ num_waiting_to_read + num_waiting_to_accept,
+ numtasks - numspawned);
+ if (cl_args->verbose >= 2) {
+ printf("%s: id %d port %d board %d node_id 0x%08x\n",
+ __func__, id, port, board, node);
+ printf(" numanode %d pid %5d remote_port %5d\n",
+ numanode, pid, remote_port);
+ }
+ close(fd);
+}
+
+/*
* Two big steps here. Listen for info from all processes, then put it
* together, and send it out when requested.
*
@@ -92,118 +194,32 @@
int
read_gm_startup_ports(void)
{
- int i;
int flags;
- /* max recv len for the <<<%d...>>> string from a single process,
- * never more than 8 ints and delimeters */
- char s[1024];
- gmpi_info = Malloc(numtasks * sizeof(*gmpi_info));
- for (i=0; i<numtasks; i++)
- gmpi_info[i].port = -1;
+ debug(1, "%s: waiting for checkin: %d to accept, %d to read", __func__,
+ num_waiting_to_accept, num_waiting_to_read);
- if (cl_args->verbose)
- printf("%s: waiting for info\n", __func__);
/*
- * Poll for connection while checking if process died to avoid
- * hanging due to gm startup problems.
+ * Watch the sockets until all clients have been accepted and sent
+ * their data.
*/
+ while (num_waiting_to_accept + num_waiting_to_read > 0) {
+ int ret = service_gm_startup(0);
+ if (ret < 0)
+ return 1;
+ if (ret == 0) /* did nothing, sleep a bit */
+ usleep(200000);
+ }
+
+ /*
+ * Put listen socket back in blocking, in case this is an old gm
+ * version that uses it for abort.
+ */
flags = fcntl(gmpi_fd[0], F_GETFL);
if (flags < 0)
- error_errno("%s: get listen socket flags", __func__);
- if (fcntl(gmpi_fd[0], F_SETFL, flags | O_NONBLOCK) < 0)
- error_errno("%s: set listen socket nonblocking", __func__);
-
- for (i=0; i<numtasks; i++) {
- int magic, id, port, board, numanode, pid, remote_port;
- unsigned int node;
- int cc, fd;
-
- for (;;) {
- fd = accept(gmpi_fd[0], 0, 0);
- if (fd >= 0)
- break;
- if (errno != EAGAIN)
- error_errno("%s: accept gmpi_port#1 iter %d", __func__, i);
-
- /* check to see if any process died, and abort if so */
- if (poll_events_until_obit()) {
- close(gmpi_fd[0]);
- return 1;
- }
- usleep(200000);
- }
-
- /*
- * Explictly turn off nonblocking. Some OSes (Mac and perhaps its
- * BSD ancestors) inherit socket flags from the listening one to
- * the newly accepted one. Others (like linux) reset all F_GETFL
- * flags to default. This should be harmless even if O_NONBLOCK
- * was already turned off.
- */
- flags = fcntl(fd, F_GETFL);
- if (flags < 0)
- error_errno("%s: get new socket flags", __func__);
- if (fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) < 0)
- error_errno("%s: set new socket blocking", __func__);
-
- /*
- * Read the new socket.
- */
- cc = read_until(fd, s, sizeof(s), ">>>", 0);
- if (cc < 0)
- error_errno("%s: read gmpi_port#1 iter %d", __func__, i);
- if (cc == 0)
- error("%s: eof in gmpi_port#1 iter %d", __func__, i);
- if (sscanf(s, "<<<%d:%d:%d:%d:%u:%d:%d::%d>>>", &magic, &id,
- &port, &board, &node, &numanode, &pid, &remote_port) == 8) {
- /* format used by mpich-gm-1.2.5..10 and later */
- if (mpich_gm_version < 0) {
- mpich_gm_version = 12510;
- debug(1, "%s: mpich gm or mx version %d\n", __func__,
- mpich_gm_version);
- }
- else if (mpich_gm_version != 12510)
- error("%s: read gmpi_port#1 iter %d expect version %d got %d",
- __func__, i, mpich_gm_version, 12510);
- } else if (sscanf(s, "<<<%d:%d:%d:%d:%u:%d>>>", &magic, &id, &port,
- &board, &node, &pid) == 6) {
- /* format used by mpich-gm-1.2.4..8a */
- if (mpich_gm_version < 0) {
- mpich_gm_version = 1248;
- debug(1, "%s: mpich gm or mx version %d\n", __func__,
- mpich_gm_version);
- }
- else if (mpich_gm_version != 1248)
- error("%s: read gmpi_port#1 iter %d expect version %d got %d",
- __func__, i, mpich_gm_version, 1248);
- numanode = 0;
- remote_port = 0;
- } else {
- error(
- "%s: read gmpi_port#1 iter %d <<<...>>> string not recognized",
- __func__, i);
- }
- if (magic != atoi(jobid))
- error("%s: received bad magic %d", __func__, magic);
- if (id < 0 || id >= numtasks)
- error("%s: received id %d out of range", __func__, id);
- if (gmpi_info[id].port != -1)
- error("%s: received duplicate response for id %d", __func__, id);
- gmpi_info[id].port = port;
- gmpi_info[id].board = board;
- gmpi_info[id].node = node;
- gmpi_info[id].numanode = numanode;
- gmpi_info[id].remote_port = remote_port;
- gmpi_info[id].pid = pid;
- if (cl_args->verbose) {
- printf("%s: id %d port %d board %d node_id 0x%08x\n",
- __func__, id, port, board, node);
- printf(" numanode %d pid %5d remote_port %5d\n",
- numanode, pid, remote_port);
- }
- close(fd);
- }
+ error_errno("%s: get socket flags", __func__);
+ if (fcntl(gmpi_fd[0], F_SETFL, flags & ~O_NONBLOCK) < 0)
+ error_errno("%s: set listen socket blocking", __func__);
close(gmpi_fd[0]);
return scatter_gm_startup_ports();
@@ -212,7 +228,7 @@
/*
* Second step: send all this info back out as they request new
* connections on the second part. No I do not understand why a
- * second connection is necessary.
+ * second connection is necessary, but that's gm.
*/
static int
scatter_gm_startup_ports(void)
@@ -393,6 +409,9 @@
close(gmpi_fd[1]);
growstr_free(g);
free(gmpi_info);
+#ifdef HAVE_POLL
+ free(pfs);
+#endif
/* signal stdio that it should pay attention to the abort_fd now */
if (fret == 0)
@@ -401,3 +420,110 @@
return fret;
}
+/*
+ * Check for incoming connections and read-readiness of existing sockets
+ * to keep process checking moving along. Called after every process
+ * startup to make sure no previously started tasks time out in their
+ * connect phase.
+ *
+ * Returns negative if error, 0 if did nothing, >0 if did something.
+ */
+int
+service_gm_startup(int created_new_task)
+{
+ int fd, ret = 0;
+ int numspawned_entry = numspawned;
+
+ if (created_new_task)
+ ++num_waiting_to_accept;
+
+ debug(2, "%s: %snew task, now accept wait %d", __func__,
+ created_new_task ? "" : "no ", num_waiting_to_accept);
+
+ /*
+ * If anything died, give up.
+ */
+ ret = poll_events_until_obit();
+ if (ret || numspawned_entry != numspawned) {
+ close(gmpi_fd[0]);
+ ret = -1;
+ goto out;
+ }
+
+ /*
+ * If there's a new connection to accept, do so and add it to the
+ * poll list for later reading.
+ */
+ fd = accept(gmpi_fd[0], 0, 0);
+ if (fd == -1) {
+ if (errno != EAGAIN)
+ error_errno("%s: accept", __func__);
+ } else {
+ int flags;
+
+ /*
+ * Explicitly turn off nonblocking. Some OSes (Mac and perhaps its
+ * BSD ancestors) inherit socket flags from the listening one to
+ * the newly accepted one. Others (like linux) reset all F_GETFL
+ * flags to default. This should be harmless even if O_NONBLOCK
+ * was already turned off.
+ */
+ flags = fcntl(fd, F_GETFL);
+ if (flags < 0)
+ error_errno("%s: get new socket flags", __func__);
+ if (fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) < 0)
+ error_errno("%s: set new socket blocking", __func__);
+
+ --num_waiting_to_accept;
+ ++ret;
+ debug(2, "%s: accepted fd %d, accept wait %d", __func__, fd,
+ num_waiting_to_accept);
+
+ /* add to poll list */
+#ifdef HAVE_POLL
+ pfs[num_waiting_to_read].fd = fd;
+ pfs[num_waiting_to_read].events = POLLIN;
+ pfs[num_waiting_to_read].revents = 0;
+#else
+ FD_SET(fd, &rfs);
+ if (fd > fdmax)
+ fdmax = fd;
+#endif
+ ++num_waiting_to_read;
+ }
+
+ /*
+ * Poll for something to read.
+ */
+#ifdef HAVE_POLL
+ int k;
+ int pret = poll(pfs, num_waiting_to_read, 0);
+ if (pret < 0)
+ error_errno("%s: poll", __func__);
+ for (k=0; k<num_waiting_to_read; k++) {
+ if (pfs[k].revents & (POLLIN | POLLHUP)) {
+ fd = pfs[k].fd;
+ pfs[k] = pfs[num_waiting_to_read-1]; /* bubble up */
+ --k;
+#else /* }} */
+ struct timeval tv = { 0, 0 };
+ fd_set trfs = rfs;
+ int sret = select(fdmax+1, &trfs, 0, 0, &tv);
+ if (sret < 0)
+ error_errno("%s: select", __func__);
+ for (fd=0; fd<fdmax; fd++) {
+ if (FD_ISSET(fd, &trfs)) {
+ FD_CLR(fd, &rfs);
+#endif
+ --num_waiting_to_read;
+ ++ret;
+ debug(2, "%s: reading fd %d, read wait %d", __func__, fd,
+ num_waiting_to_read);
+ read_gm_one(fd);
+ }
+ }
+
+ out:
+ return ret;
+}
+
Index: start_tasks.c
===================================================================
--- start_tasks.c (revision 357)
+++ start_tasks.c (working copy)
@@ -648,6 +648,19 @@
break;
}
}
+ if (cl_args->comm == COMM_MPICH_GM) {
+ int one = 1;
+ for (;;) {
+ ret = service_gm_startup(one);
+ one = 0; /* only report the new task that first time */
+ if (ret < 0) {
+ ret = 1;
+ goto out;
+ }
+ if (ret == 0) /* nothing accomplished */
+ break;
+ }
+ }
}
/* don't need these anymore */
@@ -720,6 +733,17 @@
break;
}
}
+ if (cl_args->comm == COMM_MPICH_GM) {
+ for (;;) {
+ ret = service_gm_startup(0);
+ if (ret < 0) {
+ ret = 1;
+ break;
+ }
+ if (ret == 0) /* nothing accomplished */
+ break;
+ }
+ }
ep = poll_event();
if (ep)
@@ -727,7 +751,9 @@
else
usleep(200000);
- if (numtasks_waiting_start_entry != numtasks_waiting_start)
+ debug(2, "%s: nwse %d nws %d ret %d", __func__,
+ numtasks_waiting_start_entry, numtasks_waiting_start, ret);
+ if (ret || numtasks_waiting_start_entry != numtasks_waiting_start)
break;
}
More information about the mpiexec
mailing list