mpiexec scalability improved!

Pete Wyckoff pw at osc.edu
Wed Apr 12 12:49:53 EDT 2006


pw at osc.edu wrote on Tue, 11 Apr 2006 10:29 -0400:
> Hang onto your patch.  I'll take a crack at converting gm.c to
> do periodic servicing without fork and you can see how you like
> that.

Are you willing to test my vision for GM async?  Here's a patch.
It works here on 4 GM nodes on ia64, and the debug statements
appear to show it's doing the right things, but you may run
into issues at scale.  I am curious to know if it is as fast
as your fork() version or the mpich-gm perl script.

		-- Pete
-------------- next part --------------
Index: mpiexec.h
===================================================================
--- mpiexec.h	(revision 357)
+++ mpiexec.h	(working copy)
@@ -286,6 +286,7 @@
 /* gm.c */
 void prepare_gm_startup_ports(int gmpi_port[2]);
 int read_gm_startup_ports(void);
+int service_gm_startup(int created_new_task);
 
 /* ib.c */
 int prepare_ib_startup_port(int *fd);
Index: gm.c
===================================================================
--- gm.c	(revision 357)
+++ gm.c	(working copy)
@@ -1,9 +1,9 @@
 /*
  * gm.c - code specific to initializing MPICH/GM or MPICH/MX
  *
- * $Id: gm.c,v 1.10 2005/12/20 02:40:14 pw Exp $
+ * $Id$
  *
- * Copyright (C) 2000-5 Pete Wyckoff <pw at osc.edu>
+ * Copyright (C) 2000-6 Pete Wyckoff <pw at osc.edu>
  *
  * Distributed under the GNU Public License Version 2 or later (See LICENSE)
  */
@@ -38,6 +38,16 @@
 
 static int mpich_gm_version = -1;
 
+/* GM startup socket bookkeeping, updated by service_gm_startup() */
+static int num_waiting_to_accept;  /* tasks spawned, not yet accepted */
+static int num_waiting_to_read;    /* accepted, checkin not yet read */
+#ifdef HAVE_POLL
+static struct pollfd *pfs;  /* numtasks+1 slots; live: [0, num_waiting_to_read) */
+#else
+static fd_set rfs;          /* accepted sockets whose checkin is unread */
+static int fdmax;           /* highest fd ever added to rfs */
+#endif
+
 static tm_event_t scatter_gm_startup_ports(void);
 
 /*
@@ -52,7 +62,7 @@
 {
     struct sockaddr_in sin;
     socklen_t len = sizeof(sin);
-    int i;
+    int flags, i;
 
     for (i=0; i<2; i++) {
 	gmpi_fd[i] = socket(PF_INET, SOCK_STREAM, 0);
@@ -81,9 +91,101 @@
 	if (listen(gmpi_fd[i], 1024) < 0)
 	    error_errno("%s: listen", __func__);
     }
+
+    /*
+     * Poll for connection while checking if process died to avoid
+     * hanging due to gm startup problems.
+     */
+    flags = fcntl(gmpi_fd[0], F_GETFL);
+    if (flags < 0)
+	error_errno("%s: get listen socket flags", __func__);
+    if (fcntl(gmpi_fd[0], F_SETFL, flags | O_NONBLOCK) < 0)
+	error_errno("%s: set listen socket nonblocking", __func__);
+
+    /* alloc */
+    gmpi_info = Malloc(numtasks * sizeof(*gmpi_info));
+    for (i=0; i<numtasks; i++)
+	gmpi_info[i].port = -1;
+
+#ifdef HAVE_POLL
+    pfs = Malloc((numtasks+1) * sizeof(*pfs));
+#else
+    FD_ZERO(&rfs);
+    fdmax = 0;
+#endif
+
+    num_waiting_to_accept = 0;  /* incremented on each call to service... */
+    num_waiting_to_read = 0;
 }
 
 /*
+ * Read a newly accepted socket.
+ */
+static void read_gm_one(int fd)
+{
+    int magic, id, port, board, numanode, pid, remote_port;
+    int cc;
+    unsigned int node;
+    char s[1024];
+
+    cc = read_until(fd, s, sizeof(s), ">>>", 0);
+    if (cc < 0)
+	error_errno("%s: read", __func__);
+    if (cc == 0)
+	error("%s: eof", __func__);
+    if (sscanf(s, "<<<%d:%d:%d:%d:%u:%d:%d::%d>>>", &magic, &id,
+               &port, &board, &node, &numanode, &pid, &remote_port) == 8) {
+	/* format used by mpich-gm-1.2.5..10 and later */
+	if (mpich_gm_version < 0) {
+	    mpich_gm_version = 12510;
+	    debug(1, "%s: mpich gm or mx version %d", __func__,
+	          mpich_gm_version);
+	}
+	else if (mpich_gm_version != 12510)
+	    error("%s: expecting version 12150 got %d", __func__,
+	          mpich_gm_version);
+    } else if (sscanf(s, "<<<%d:%d:%d:%d:%u:%d>>>", &magic, &id, &port,
+                      &board, &node, &pid) == 6) {
+	/* format used by mpich-gm-1.2.4..8a */
+	if (mpich_gm_version < 0) {
+	    mpich_gm_version = 1248;
+	    debug(1, "%s: mpich gm or mx version %d", __func__,
+	          mpich_gm_version);
+	}
+	else if (mpich_gm_version != 1248)
+	    error("%s: expecting version 1248 got %d", __func__,
+	          mpich_gm_version);
+	numanode = 0;
+	remote_port = 0;
+    } else {
+	error("%s: read gmpi_port#1 <<<...>>> string not recognized",
+	      __func__);
+    }
+    if (magic != atoi(jobid))
+	error("%s: received bad magic %d", __func__, magic);
+    if (id < 0 || id >= numtasks)
+	error("%s: received id %d out of range", __func__, id);
+    if (gmpi_info[id].port != -1)
+	error("%s: received duplicate response for id %d", __func__, id);
+    gmpi_info[id].port = port;
+    gmpi_info[id].board = board;
+    gmpi_info[id].node = node;
+    gmpi_info[id].numanode = numanode;
+    gmpi_info[id].remote_port = remote_port;
+    gmpi_info[id].pid = pid;
+    debug(1, "%s: rank %d in, %d + %d left", __func__, id,
+	  num_waiting_to_read + num_waiting_to_accept,
+	  numtasks - numspawned);
+    if (cl_args->verbose >= 2) {
+	printf("%s: id %d port %d board %d node_id 0x%08x\n",
+	  __func__, id, port, board, node);
+	printf("  numanode %d pid %5d remote_port %5d\n",
+	  numanode, pid, remote_port);
+    }
+    close(fd);
+}
+
+/*
  * Two big steps here.  Listen for info from all processes, then put it
  * together, and send it out when requested.
  *
@@ -92,118 +194,32 @@
 int
 read_gm_startup_ports(void)
 {
-    int i;
     int flags;
-    /* max recv len for the <<<%d...>>> string from a single process,
-     * never more than 8 ints and delimeters */
-    char s[1024];
 
-    gmpi_info = Malloc(numtasks * sizeof(*gmpi_info));
-    for (i=0; i<numtasks; i++)
-	gmpi_info[i].port = -1;
+    debug(1, "%s: waiting for checkin: %d to accept, %d to read", __func__,
+      num_waiting_to_accept, num_waiting_to_read);
 
-    if (cl_args->verbose)
-	printf("%s: waiting for info\n", __func__);
     /*
-     * Poll for connection while checking if process died to avoid
-     * hanging due to gm startup problems.
+     * Watch the sockets until all clients have been accepted and sent
+     * their data.
      */
+    while (num_waiting_to_accept + num_waiting_to_read > 0) {
+	int ret = service_gm_startup(0);
+	if (ret < 0)
+	    return 1;
+	if (ret == 0)  /* did nothing, sleep a bit */
+	    usleep(200000);
+    }
+
+    /*
+     * Put listen socket back in blocking, in case this is an old gm
+     * version that uses it for abort.
+     */
     flags = fcntl(gmpi_fd[0], F_GETFL);
     if (flags < 0)
-	error_errno("%s: get listen socket flags", __func__);
-    if (fcntl(gmpi_fd[0], F_SETFL, flags | O_NONBLOCK) < 0)
-	error_errno("%s: set listen socket nonblocking", __func__);
-
-    for (i=0; i<numtasks; i++) {
-	int magic, id, port, board, numanode, pid, remote_port;
-	unsigned int node;
-	int cc, fd;
-
-	for (;;) {
-	    fd = accept(gmpi_fd[0], 0, 0);
-	    if (fd >= 0)
-		break;
-	    if (errno != EAGAIN)
-		error_errno("%s: accept gmpi_port#1 iter %d", __func__, i);
-
-	    /* check to see if any process died, and abort if so */
-	    if (poll_events_until_obit()) {
-		close(gmpi_fd[0]);
-		return 1;
-	    }
-	    usleep(200000);
-	}
-
-	/*
-	 * Explictly turn off nonblocking.  Some OSes (Mac and perhaps its
-	 * BSD ancestors) inherit socket flags from the listening one to
-	 * the newly accepted one.  Others (like linux) reset all F_GETFL
-	 * flags to default.  This should be harmless even if O_NONBLOCK
-	 * was already turned off.
-	 */
-	flags = fcntl(fd, F_GETFL);
-	if (flags < 0)
-	    error_errno("%s: get new socket flags", __func__);
-	if (fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) < 0)
-	    error_errno("%s: set new socket blocking", __func__);
-
-	/*
-	 * Read the new socket.
-	 */
-	cc = read_until(fd, s, sizeof(s), ">>>", 0);
-	if (cc < 0)
-	    error_errno("%s: read gmpi_port#1 iter %d", __func__, i);
-	if (cc == 0)
-	    error("%s: eof in gmpi_port#1 iter %d", __func__, i);
-	if (sscanf(s, "<<<%d:%d:%d:%d:%u:%d:%d::%d>>>", &magic, &id,
-	  &port, &board, &node, &numanode, &pid, &remote_port) == 8) {
-	    /* format used by mpich-gm-1.2.5..10 and later */
-	    if (mpich_gm_version < 0) {
-		mpich_gm_version = 12510;
-		debug(1, "%s: mpich gm or mx version %d\n", __func__,
-		  mpich_gm_version);
-	    }
-	    else if (mpich_gm_version != 12510)
-		error("%s: read gmpi_port#1 iter %d expect version %d got %d",
-		  __func__, i, mpich_gm_version, 12510);
-	} else if (sscanf(s, "<<<%d:%d:%d:%d:%u:%d>>>", &magic, &id, &port,
-	  &board, &node, &pid) == 6) {
-	    /* format used by mpich-gm-1.2.4..8a */
-	    if (mpich_gm_version < 0) {
-		mpich_gm_version = 1248;
-		debug(1, "%s: mpich gm or mx version %d\n", __func__,
-		  mpich_gm_version);
-	    }
-	    else if (mpich_gm_version != 1248)
-		error("%s: read gmpi_port#1 iter %d expect version %d got %d",
-		  __func__, i, mpich_gm_version, 1248);
-	    numanode = 0;
-	    remote_port = 0;
-	} else {
-	    error(
-	      "%s: read gmpi_port#1 iter %d <<<...>>> string not recognized",
-	      __func__, i);
-	}
-	if (magic != atoi(jobid))
-	    error("%s: received bad magic %d", __func__, magic);
-	if (id < 0 || id >= numtasks)
-	    error("%s: received id %d out of range", __func__, id);
-	if (gmpi_info[id].port != -1)
-	    error("%s: received duplicate response for id %d", __func__, id);
-	gmpi_info[id].port = port;
-	gmpi_info[id].board = board;
-	gmpi_info[id].node = node;
-	gmpi_info[id].numanode = numanode;
-	gmpi_info[id].remote_port = remote_port;
-	gmpi_info[id].pid = pid;
-	if (cl_args->verbose) {
-	    printf("%s: id %d port %d board %d node_id 0x%08x\n",
-	      __func__, id, port, board, node);
-	    printf("  numanode %d pid %5d remote_port %5d\n",
-	      numanode, pid, remote_port);
-	}
-	close(fd);
-    }
+	error_errno("%s: get socket flags", __func__);
+    if (fcntl(gmpi_fd[0], F_SETFL, flags & ~O_NONBLOCK) < 0)
+	error_errno("%s: set listen socket blocking", __func__);
     close(gmpi_fd[0]);
 
     return scatter_gm_startup_ports();
@@ -212,7 +228,7 @@
 /*
  * Second step:  send all this info back out as they request new
  * connections on the second part.  No I do not understand why a
- * second connection is necessary.
+ * second connection is necessary, but that's gm.
  */
 static int
 scatter_gm_startup_ports(void)
@@ -393,6 +409,9 @@
     close(gmpi_fd[1]);
     growstr_free(g);
     free(gmpi_info);
+#ifdef HAVE_POLL
+    free(pfs);
+#endif
 
     /* signal stdio that it should pay attention to the abort_fd now */
     if (fret == 0)
@@ -401,3 +420,110 @@
     return fret;
 }
 
+/*
+ * Nonblocking service of the MPICH/GM startup sockets: accept at most one
+ * pending gmpi_port#1 connection, then read every accepted socket that is
+ * ready.  Called after each task spawn (created_new_task != 0) so earlier
+ * tasks do not time out in their connect phase, and in a loop by
+ * read_gm_startup_ports().  Returns -1 if a task exited (listen socket is
+ * then closed), 0 if nothing happened, >0 if it accepted or read something.
+ */
+int
+service_gm_startup(int created_new_task)
+{
+    int fd, ret = 0;
+    int numspawned_entry = numspawned;
+
+    if (created_new_task)
+	++num_waiting_to_accept;  /* expect one more connection */
+
+    debug(2, "%s: %snew task, now accept wait %d", __func__,
+          created_new_task ? "" : "no ", num_waiting_to_accept);
+
+    /* If anything died, give up. */
+    ret = poll_events_until_obit();
+    if (ret || numspawned_entry != numspawned) {
+	close(gmpi_fd[0]);
+	ret = -1;
+	goto out;
+    }
+
+    /*
+     * Accept one new connection, if any, and queue it for reading.  EINTR
+     * is treated like EAGAIN: nothing to do until the next call.
+     */
+    fd = accept(gmpi_fd[0], 0, 0);
+    if (fd == -1) {
+	if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR)
+	    error_errno("%s: accept", __func__);
+    } else {
+	int flags;
+
+	/*
+	 * Explicitly turn off nonblocking.  Some OSes (Mac and perhaps its
+	 * BSD ancestors) inherit socket flags from the listening one to
+	 * the newly accepted one.  Others (like linux) reset all F_GETFL
+	 * flags to default.  This should be harmless even if O_NONBLOCK
+	 * was already turned off.
+	 */
+	flags = fcntl(fd, F_GETFL);
+	if (flags < 0)
+	    error_errno("%s: get new socket flags", __func__);
+	if (fcntl(fd, F_SETFL, flags & ~O_NONBLOCK) < 0)
+	    error_errno("%s: set new socket blocking", __func__);
+
+	--num_waiting_to_accept;
+	++ret;
+	debug(2, "%s: accepted fd %d, accept wait %d", __func__, fd,
+	  num_waiting_to_accept);
+
+	/* add to the set of sockets waiting to be read */
+#ifdef HAVE_POLL
+	pfs[num_waiting_to_read].fd = fd;
+	pfs[num_waiting_to_read].events = POLLIN;
+	pfs[num_waiting_to_read].revents = 0;
+#else
+	FD_SET(fd, &rfs);
+	if (fd > fdmax)
+	    fdmax = fd;
+#endif
+	++num_waiting_to_read;
+    }
+
+    /*
+     * Read every ready socket; the zero timeout means the readiness check
+     * never waits.  On EINTR the result sets are meaningless, so skip the
+     * scan until the next call.  Both variants share the loop tail below.
+     */
+#ifdef HAVE_POLL
+    int k;
+    int pret = poll(pfs, num_waiting_to_read, 0);
+    if (pret < 0 && errno != EINTR)
+	error_errno("%s: poll", __func__);
+    for (k=0; pret > 0 && k<num_waiting_to_read; k++) {
+	if (pfs[k].revents & (POLLIN | POLLHUP | POLLERR)) {
+	    fd = pfs[k].fd;
+	    pfs[k] = pfs[num_waiting_to_read-1];  /* move last entry here */
+	    --k;  /* so the moved entry is examined next */
+#else  /* }} */
+    struct timeval tv = { 0, 0 };
+    fd_set trfs = rfs;
+    int sret = select(fdmax+1, &trfs, 0, 0, &tv);
+    if (sret < 0 && errno != EINTR)
+	error_errno("%s: select", __func__);
+    for (fd=0; sret > 0 && fd<=fdmax; fd++) {  /* fdmax is itself in use */
+	if (FD_ISSET(fd, &trfs)) {
+	    FD_CLR(fd, &rfs);
+#endif
+	    --num_waiting_to_read;
+	    ++ret;
+	    debug(2, "%s: reading fd %d, read wait %d", __func__, fd,
+	          num_waiting_to_read);
+	    read_gm_one(fd);  /* also closes fd */
+	}
+    }
+
+  out:
+    return ret;
+}
+
Index: start_tasks.c
===================================================================
--- start_tasks.c	(revision 357)
+++ start_tasks.c	(working copy)
@@ -648,6 +648,19 @@
 		    break;
 	    }
 	}
+	if (cl_args->comm == COMM_MPICH_GM) {
+	    int one = 1;
+	    for (;;) {
+		ret = service_gm_startup(one);
+		one = 0;  /* only report the new task that first time */
+		if (ret < 0) {
+		    ret = 1;
+		    goto out;
+		}
+		if (ret == 0)  /* nothing accomplished */
+		    break;
+	    }
+	}
     }
 
     /* don't need these anymore */
@@ -720,6 +733,17 @@
 		    break;
 	    }
 	}
+	if (cl_args->comm == COMM_MPICH_GM) {
+	    for (;;) {
+		ret = service_gm_startup(0);
+		if (ret < 0) {
+		    ret = 1;
+		    break;
+		}
+		if (ret == 0)  /* nothing accomplished */
+		    break;
+	    }
+	}
 
 	ep = poll_event();
 	if (ep)
@@ -727,7 +751,9 @@
 	else
 	    usleep(200000);
 
-	if (numtasks_waiting_start_entry != numtasks_waiting_start)
+	debug(2, "%s: nwse %d nws %d ret %d", __func__,
+	      numtasks_waiting_start_entry, numtasks_waiting_start, ret);
+	if (ret || numtasks_waiting_start_entry != numtasks_waiting_start)
 	    break;
     }
 


More information about the mpiexec mailing list