mpiexec patch for very large jobs

Alex korobka at nankai.edu.cn
Mon Sep 20 13:19:01 EDT 2004


Here it is. The previous version did not check for closed connections (with
poll(), a hangup is reported in revents as POLLHUP, which has to be tested for
explicitly), which, as far as I remember, led to problems with the -allstdin option.

Alex Korobka

在您的来信中曾经提到: [In your letter you wrote:]
>From: Pete Wyckoff <pw at osc.edu>
>Reply-To: 
>To: Alex <korobka at nankai.edu.cn>
>Subject: Re: mpiexec patch for very large jobs
>Date:Fri, 17 Sep 2004 13:50:40 -0400
>
>korobka at nankai.edu.cn wrote on Fri, 17 Sep 2004 11:19 +0800:
> > I have an update to this, it fixes a corner case. I will send it next week
> > after I get back to the office.
> 
> Great.  I haven't started integrating your previous patch yet, so this
> update from you will be quite timely.
> 
> 		-- Pete
>


-------------- next part --------------
diff -urd mpiexec-0.76/start_tasks.c mpiexec-0.76.poll/start_tasks.c
--- mpiexec-0.76/start_tasks.c	2004-04-20 01:01:39.000000000 +0800
+++ mpiexec-0.76.poll/start_tasks.c	2004-05-29 13:43:16.000000000 +0800
@@ -16,6 +16,7 @@
 #include <errno.h>
 #include <pwd.h>
 #include <sys/time.h>
+#include <sys/poll.h>
 #include <signal.h>
 #include <netinet/in.h>
 #include <sys/socket.h>
@@ -130,7 +131,7 @@
 static void
 env_add_int(const char *name, int value)
 {
-    char s[1024];
+    char s[4096];
 
     sprintf(s, "%d", value);
     env_add(name, s);
@@ -744,7 +745,7 @@
 	if (getsockname(gmpi_fd[i], (struct sockaddr *) &sin, &len) < 0)
 	    error_errno("%s: getsockname", __func__);
 	gmpi_port[i] = ntohs(sin.sin_port);
-	if (listen(gmpi_fd[i], 1024) < 0)
+	if (listen(gmpi_fd[i], 4096) < 0)
 	    error_errno("%s: listen", __func__);
     }
 }
@@ -771,7 +772,7 @@
 read_gm_startup_ports(void)
 {
     int i;
-    char s[64*1024];  /* max recv len */
+    char s[64*4096];  /* max recv len */
     int flags;
 
     gmpi_info = Malloc(numtask * sizeof(*gmpi_info));
@@ -903,7 +904,7 @@
     growstr_append(g, "|||");
 
     for (i=0; i<numtask; i++) {
-	char s[64*1024];  /* max recv len */
+	char s[64*4096];  /* max recv len */
 	int id, j, cc, fd = 0;
 	growstr_t *h;
 	
@@ -982,12 +983,12 @@
 	    for (;;) {
 		tm_event_t evt;
 		int ret;
-		struct timeval tv = { 0, 200000 };
-		fd_set wfs;
+		struct pollfd pfs;
 
-		FD_ZERO(&wfs);
-		FD_SET(fd, &wfs);
-		ret = select(fd+1, 0, &wfs, 0, &tv);
+		pfs.fd = fd;
+		pfs.events = POLLOUT;
+		pfs.revents = 0;
+		ret = poll( &pfs, 1, 200 );
 		if (ret == 1) {
 		    int f, flen = sizeof(f);
 		    ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &f, &flen);
@@ -1002,10 +1003,10 @@
 		    break;
 		}
 		if (ret < 0)
-		    error_errno("%s: select waiting connect iter %d",
+		    error_errno("%s: poll waiting connect iter %d",
 		      __func__, i);
 		if (ret != 0)
-		    error("%s: really not expecting select to return %d",
+		    error("%s: really not expecting poll to return %d",
 		      __func__, ret);
 
 		/* check to see if any process died, and abort if so */
@@ -1036,10 +1037,11 @@
 	growstr_append(h, "]]]");
 	cc = write_full(fd, h->s, h->len);
 	if (cc < 0)
-	    error_errno("%s: write gmpi_port#2 iter %d", __func__, i);
+	    error_errno("%s: write gmpi_port#2 iter %d, fd %d, task %s, size %d", __func__, i, fd, tasks[id].name, (int) h->len);
 	close(fd);
 	growstr_free(h);
-    }
+    } /* for each of numtask tasks */
+
     /* do not close gmpi_fd[1], it has been passed to stdio listener */
     /* why not?  seems if he has a copy, we can always close it */
     close(gmpi_fd[1]);
@@ -1070,7 +1072,7 @@
 	error_errno("%s: bind", __func__);
     if (getsockname(mport_fd, (struct sockaddr *) &sin, &len) < 0)
 	error_errno("%s: getsockname", __func__);
-    if (listen(mport_fd, 1024) < 0)
+    if (listen(mport_fd, 4096) < 0)
 	error_errno("%s: listen", __func__);
     *fd = mport_fd;
     return ntohs(sin.sin_port);
@@ -1275,7 +1277,7 @@
 	error_errno("%s: bind", __func__);
     if (getsockname(*pmi_fd, (struct sockaddr *) &sin, &len) < 0)
 	error_errno("%s: getsockname", __func__);
-    if (listen(*pmi_fd, 1024) < 0)
+    if (listen(*pmi_fd, 4096) < 0)
 	error_errno("%s: listen", __func__);
     return ntohs(sin.sin_port);
 }
diff -urd mpiexec-0.76/stdio.c mpiexec-0.76.poll/stdio.c
--- mpiexec-0.76/stdio.c	2004-04-20 01:01:39.000000000 +0800
+++ mpiexec-0.76.poll/stdio.c	2004-05-29 13:58:12.000000000 +0800
@@ -23,6 +23,7 @@
 #include <signal.h>  /* sun location */
 #include <sys/signal.h>  /* linux location */
 #include <sys/socket.h>
+#include <sys/poll.h>
 #include <netinet/in.h>
 #include <arpa/inet.h>
 #include "mpiexec.h"
@@ -81,7 +82,9 @@
 static fd_state_t *fds;
 
 /* global set of readables */
-static fd_set rfs;
+static struct pollfd *pfs;	/* entries 0..numfs-1 are handed to poll() */
+static int   *pfsmap;	/* indexed by fd: index into pfs plus one, 0 if not polled */
+static int   numfs;	/* number of live entries in pfs */
 
 /* extra fd where MPI_Abort callers will connect in mpich-gm */
 static int abort_fd_array[2];
@@ -144,6 +147,63 @@
     }
 }
 
+/*
+ * Poll-set helpers.  pfs[0..numfs-1] is the dense array handed to poll();
+ * pfsmap[fd] is fd's index in pfs plus one, or 0 if fd is not polled.
+ */
+
+/* Add events to fd's entry, appending a fresh entry if fd is not polled yet. */
+static void poll_set(int fd, short events)
+{
+    int n = pfsmap[fd];
+
+    if (n) {			/* already in the set */
+	pfs[n - 1].events |= events;
+    } else {
+	pfs[numfs].fd = fd;
+	pfs[numfs].events = events;
+	pfs[numfs].revents = 0;	/* slot may still hold stale revents */
+	pfsmap[fd] = ++numfs;
+    }
+}
+
+/* Nonzero if the last poll() reported any of events on fd. */
+static int poll_isset(int fd, short events)
+{
+    int n = pfsmap[fd];
+
+    return n ? (pfs[n - 1].revents & events) : 0;
+}
+
+/* Drop the given reported events for fd so later checks do not see them. */
+static void poll_clr(int fd, short events)
+{
+    int n = pfsmap[fd];
+
+    if (n)
+	pfs[n - 1].revents &= ~events;
+    else
+	fprintf(stderr, "%s: stray fd: %i !!!\n", __func__, fd);
+}
+
+/* Remove fd from the poll set.  The last entry, revents included, moves into
+ * the freed slot, so a by-index scan over pfs[] that deletes its current
+ * entry may skip the moved one until the next poll(). */
+static void poll_del(int fd)
+{
+    int n = pfsmap[fd];
+
+    if (n--) {
+	int m = numfs - 1;
+
+	pfs[n] = pfs[m];		/* move last entry into the hole */
+	pfsmap[pfs[n].fd] = n + 1;	/* update reverse mapping */
+	pfsmap[fd] = 0;			/* zero out mapping for the removed fd */
+	numfs--;
+    } else
+	fprintf(stderr, "%s: stray fd: %i !!!\n", __func__, fd);
+}
+
 /*
  * Main entry point:  fork a process to handle process streams.
  * Assumes 0,1,2 are the actual stdin,stdout,stderr where input
@@ -203,6 +263,11 @@
 	error("stdio_fork: need %d sockets, only %d available in system",
 	  n+6, maxfd);
 
+    pfs = Malloc( sizeof(struct pollfd) * maxfd );	/* at most one poll slot per fd */
+    pfsmap = Malloc( sizeof(int) * maxfd );	/* indexed by fd; assumes every fd < maxfd */
+    memset( pfsmap, 0, sizeof(int) * maxfd );
+    numfs = 0;
+
     /*
      * Must setup listener sockets before fork so valid port numbers
      * can be handed back and the spawn can continue.
@@ -383,7 +448,7 @@
 
     if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0)
 	error_errno("%s: socket", __func__);
-    if (listen(s, 1024) < 0)
+    if (listen(s, 4096) < 0)
 	error_errno("%s: listen", __func__);
     if (getsockname(s, (struct sockaddr *)&addr, &len) < 0)
 	error_errno("%s: getsockname", __func__);
@@ -392,7 +457,7 @@
 }
 
 /*
- * Close a socket and remove it from the select set.  (Not for
+ * Close a socket and remove it from the polled set.  (Not for
  * listener sockets or aggregate fds.)
  */
 static void
@@ -437,19 +502,19 @@
 
     /* ignore this socket, but leave it linked onto the print queue, if it is */
     fds[s].which = NONE;
-    FD_CLR(s, &rfs);
+    poll_del(s);
 }
 
 /*
  * Close a listener socket (or stdin file/socket).
- * Also remove it from rfs.  n == 0,1,2.
+ * Also remove it from the polled set.  n == 0,1,2.
  */
 static void
 close_listener(int n)
 {
     if (close(listener[n]))  /* stop listening */
 	error_errno("%s: close %d at fd %d", __func__, n, listener[n]);
-    FD_CLR(listener[n], &rfs);
+    poll_del(listener[n]);
     listener[n] = -1;
 #if 0
     if (n == OUT || n == ERR)  /* just the _real_ listeners, not my own stdin */
@@ -468,7 +533,7 @@
     int i;
 
     close(aggregate[IN]);
-    FD_CLR(aggregate[IN], &rfs);
+    poll_del(aggregate[IN]);
     aggregate[IN] = -1;
 
     /* tell processes there is no more stdin */
@@ -641,7 +706,7 @@
 }
 
 /*
- * Listening socket has found a new one, as reported by select.
+ * Listening socket has found a new one, as reported by poll.
  * Accept it and pay attention to it.
  */
 static void
@@ -674,10 +739,10 @@
 	if (which == IN) {
 	    if (aggregate[IN] >= 0)
 		/* begin to listen to the stdin of mpiexec itself */
-		FD_SET(aggregate[IN], &rfs);
+		poll_set(aggregate[IN], POLLIN);
 	}
     }
-    FD_SET(t, &rfs);
+    poll_set(t, POLLIN);
 }
 
 /*
@@ -727,7 +792,7 @@
     }
     close(fd);
     close(abort_fd);
-    FD_CLR(abort_fd, &rfs);
+    poll_del(abort_fd);
     abort_fd_used = 1;
     /* let parent deal with it, he'll tell us to exit later */
     kill(getppid(), SIGALRM);
@@ -740,7 +805,7 @@
 static void ATTR_NORETURN
 do_child(void)
 {
-    int i;
+    int i, j;	/* j indexes pfs[] in the process-output scan */
 
     /* allocate socket storage */
     fds = (fd_state_t *) Malloc(maxfd * sizeof(*fds));
@@ -752,32 +817,31 @@
 	memset(&fds[i].sin, 0, sizeof(fds[i].sin));
     }
 
-    /* initial select source */
-    FD_ZERO(&rfs);
+    /* start the child with an empty poll set, then add the listeners */
+    memset((void*)pfs, 0, sizeof(struct pollfd) * maxfd );
+    memset((void*)pfsmap, 0, sizeof(int) * maxfd );
+    numfs = 0;
+
     /* put in listeners */
     for (i=0; i<3; i++)
 	if (listener[i] >= 0)
-	    FD_SET(listener[i], &rfs);
+	    poll_set( listener[i], POLLIN );
     if (pmi_listen_fd >= 0)
-	FD_SET(pmi_listen_fd, &rfs);
-    /* writeable sockets do not get selected on, just hope they can take it */
+	poll_set( pmi_listen_fd, POLLIN );
+
+    /* writeable sockets do not get polled on, just hope they can take it */
 
     /*
-     * Infinite blocking select-driven loop.
+     * Infinite blocking poll-driven loop.
      */
     for (;;) {
 	int n;
-	fd_set rfsd;
-	struct timeval tv;
-
-	rfsd = rfs;  /* copy in the static set */
-	tv.tv_sec = 0;
-	tv.tv_usec = 100000;  /* check for timeouts every 100ms */
 
-	if ((n = select(FD_SETSIZE, &rfsd, 0, 0, &tv)) < 0) {
+	/* check for timeouts every 100ms */
+	if ((n = poll( pfs, numfs, 100 )) < 0) {
 	    if (errno == EINTR)
 		continue;
-	    error_errno("%s: select", __func__);
+	    error_errno("%s: poll", __func__);
 	}
 
 	if (n == 0) {
@@ -800,61 +864,63 @@
 	    continue;
 	}
 
-	PRINTF("%s: select got %d\n", __func__, n);
+	PRINTF("%s: poll got %d\n", __func__, n);
 
 	PRINTF("%s:  listeners are %d %d %d\n", __func__, listener[0],
 	  listener[1], listener[2]);
 	
 	/* my aggregate stdin */
-	if (aggregate[IN] >= 0 && FD_ISSET(aggregate[IN], &rfsd)) {
+	if (aggregate[IN] >= 0 && poll_isset(aggregate[IN], (POLLIN | POLLHUP))) {
 	    --n;
-	    FD_CLR(aggregate[IN], &rfsd);
-	    PRINTF("%s: input at aggregate[IN], %d select bits left\n",
+	    poll_clr(aggregate[IN], (POLLIN | POLLHUP));  /* like FD_CLR: hide it from the scan below */
+	    PRINTF("%s: input at aggregate[IN], %d poll sockets left\n",
 	      __func__, n);
 	    read_stdin();
 	}
 
 	/* new incoming connections */
 	for (i=0; n && i<3; i++) {
-	    if (listener[i] >= 0 && FD_ISSET(listener[i], &rfsd)) {
-		FD_CLR(listener[i], &rfsd);  /* so processes don't see it */
+	    if (listener[i] >= 0 && poll_isset(listener[i], (POLLIN | POLLHUP))) {
 		--n;
-		PRINTF("%s: input at listener[%d], %d select bits left\n",
+		poll_clr(listener[i], (POLLIN | POLLHUP));  /* so processes don't see it */
+		PRINTF("%s: input at listener[%d], %d poll sockets left\n",
 		  __func__, i, n);
 		accept_new_conn((fd_which_t)i);
 	    }
 	}
 
 	/* abort fd connection */
-	if (abort_fd >= 0 && FD_ISSET(abort_fd, &rfsd)) {
-	    FD_CLR(abort_fd, &rfsd);
+	if (abort_fd >= 0 && poll_isset(abort_fd, (POLLIN | POLLHUP))) {
 	    --n;
+	    poll_clr(abort_fd, (POLLIN | POLLHUP));
 	    accept_abort_conn();
 	}
 
 	/* PMI listener */
-	if (pmi_listen_fd >= 0 && FD_ISSET(pmi_listen_fd, &rfsd)) {
-	    FD_CLR(pmi_listen_fd, &rfsd);  /* before the function */
+	if (pmi_listen_fd >= 0 && poll_isset(pmi_listen_fd, (POLLIN | POLLHUP))) {
+	    poll_clr(pmi_listen_fd, (POLLIN | POLLHUP));  /* before the function */
 	    --n;
 	    accept_pmi_conn();
 	}
 
 	/* existing PMI connections */
 	for (i=0; i<numtask; i++) {
-	    if (pmi_fds && pmi_fds[i] >= 0 && FD_ISSET(pmi_fds[i], &rfsd)) {
-		FD_CLR(pmi_fds[i], &rfsd);
+	    if (pmi_fds && pmi_fds[i] >= 0 && poll_isset(pmi_fds[i], (POLLIN | POLLHUP))) {
+		poll_clr(pmi_fds[i], (POLLIN | POLLHUP));
 		--n;
 		handle_pmi(i);
 	    }
 	}
 
 	/* out,err from processes */
-	for (i=0; n && i<maxfd; i++) {
-	    if (FD_ISSET(i, &rfsd)) {
+	for (j=0; n && j < numfs; j++) {
+	    if (pfs[j].revents & (POLLIN | POLLHUP)) {
 		--n;
+		i = pfs[j].fd;
+		/* a poll_del() of i below moves the last entry into slot j; it is rechecked after the next poll() */
 		PRINTF("%s: output from process at %d type %s,"
-		  " %d select bits left\n", __func__, i,
-		  which_name(fds[i].which), n);
+		  " %d poll sockets left\n", __func__, i, which_name(fds[i].which), n);
+
 		if (fds[i].which == OUT || fds[i].which == ERR)
 		    readsome(i);
 		else if (fds[i].which == IN)
@@ -998,7 +1064,7 @@
     PRINTF("%s: parent says via sig %d to listen to abort fd %d\n",
       __func__, sig, abort_fd);
 
-    FD_SET(abort_fd, &rfs);
+    poll_set(abort_fd, POLLIN);  /* NOTE(review): "sig" suggests a signal handler; if so this can race with poll_del() in the main loop -- TODO confirm */
 }
 
 /*
@@ -1144,7 +1210,7 @@
     if (pmi_fds[rank] != -1)
 	error("%s: rank %d checked in twice", __func__, rank);
     pmi_fds[rank] = fd;
-    FD_SET(pmi_fds[rank], &rfs);
+    poll_set(pmi_fds[rank], POLLIN);
 
     if (cl_args->verbose)
 	printf("%s: rank %d checks in\n", __func__, rank);
@@ -1247,7 +1313,7 @@
 	if (pmi_fds[i] == -1) break;
     if (i == numtask) {
 	close(pmi_listen_fd);
-	FD_CLR(pmi_listen_fd, &rfs);
+	poll_del(pmi_listen_fd);
 	pmi_listen_fd = -1;
     }
 
@@ -1375,7 +1441,7 @@
 	    error("%s: in cmd=finalize, expecting 1 word total", __func__);
       finalize:
 	close(pmi_fds[rank]);
-	FD_CLR(pmi_fds[rank], &rfs);
+	poll_del(pmi_fds[rank]);
 	pmi_fds[rank] = -1;
 
     } else
diff -urd mpiexec-0.76/util.c mpiexec-0.76.poll/util.c
--- mpiexec-0.76/util.c	2004-04-20 01:01:39.000000000 +0800
+++ mpiexec-0.76.poll/util.c	2004-05-29 13:43:16.000000000 +0800
@@ -12,11 +12,44 @@
 #include <stdarg.h>
 #include <errno.h>
 #include <unistd.h>
+#include <sys/poll.h>
 
 /* pbse_to_txt found in pbs liblog.a */
 #include <pbs_error.h>
 #include "mpiexec.h"
 
+#define MPIEXEC_NBIO_TRIES	10
+
+/*
+ * Wait until fd reports one of "events", used after read() or write()
+ * failed with EAGAIN.  Polls up to MPIEXEC_NBIO_TRIES times with a
+ * growing timeout (100 ms, 200 ms, ...); an EINTR does not use up a
+ * try.  Returns 1 when ready, 0 if every try timed out, or -1 with
+ * errno set if poll() failed.
+ */
+static int
+nbio_wait(int fd, short events)
+{
+    struct pollfd pfd;
+    int tries, ret = 0;
+
+    pfd.fd = fd;
+    pfd.events = events;
+    pfd.revents = 0;
+
+    for (tries = 0; tries < MPIEXEC_NBIO_TRIES; tries++) {
+	ret = poll(&pfd, 1, 100 * (tries + 1));
+	if (ret > 0)
+	    break;		/* ready */
+	if (ret < 0) {
+	    if (errno != EINTR)
+		break;		/* real poll() failure, errno is set */
+	    tries--;		/* interrupted: retry without using up a try */
+	}
+    }
+    return ret;
+}
+
 /*
  * Set the program name, first statement of code usually.
  */
@@ -128,7 +161,7 @@
     va_start(ap, fmt);
     vfprintf(stderr, fmt, ap);
     va_end(ap);
-    fprintf(stderr, ": %s.\n", strerror(errno));
+    fprintf(stderr, ": %s.(%d)\n", strerror(errno), errno);
     try_kill_stdio();
     exit(1);
 }
@@ -295,6 +328,14 @@
     while (num > 0) {
 	i = read(fd, (char *)buf + offset, num);
 	if (i < 0)
+	{
+	    if (errno == EINTR)
+		continue;
+
+	    /* EAGAIN: no data yet; wait for it, then retry the read */
+	    if (errno == EAGAIN && nbio_wait(fd, POLLIN) > 0)
+		continue;
+
 	    /* ia64 needs %lu for size_t; x86 wants %u */
 #	    ifdef USE_FORMAT_SIZE_T
 		error_errno("%s: read %zu bytes", __func__, num);
@@ -302,6 +343,7 @@
 		error_errno("%s: read %lu bytes", __func__,
 		  (long unsigned int) num);
 #	    endif
+	}
 	if (i == 0)
 	    error("%s: EOF, only %d of %d bytes", __func__, offset, total);
 	num -= i;
@@ -321,7 +363,20 @@
     while (count > 0) {
 	cc = write(fd, (const char *)buf + ptr, count);
 	if (cc < 0)
+	{
+	    if (errno == EINTR)
+		continue;
+
+	    if (errno == EAGAIN) {
+		int ret = nbio_wait(fd, POLLOUT);
+
+		if (ret > 0)
+		    continue;	/* writable again: retry the write */
+		if (ret == 0)
+		    fprintf( stderr, "%s: timed out after %i tries\n", __func__, MPIEXEC_NBIO_TRIES );
+	    }
 	    return cc;
+	}
 	count -= cc;
 	ptr += cc;
     }


More information about the mpiexec mailing list