mpiexec patch for very large jobs
Alex
korobka at nankai.edu.cn
Mon Sep 20 13:19:01 EDT 2004
Here it is. The previous version did not check for closed connections (the
results of poll() must be tested explicitly for POLLHUP), which, as far as I
remember, led to problems with the -allstdin option.
Alex Korobka
在您的来信中曾经提到 (In your message you wrote):
>From: Pete Wyckoff <pw at osc.edu>
>Reply-To:
>To: Alex <korobka at nankai.edu.cn>
>Subject: Re: mpiexec patch for very large jobs
>Date:Fri, 17 Sep 2004 13:50:40 -0400
>
>korobka at nankai.edu.cn wrote on Fri, 17 Sep 2004 11:19 +0800:
> > I have an update to this, it fixes a corner case. I will send it next week
> > after I get back to the office.
>
> Great. I haven't started integrating your previous patch yet, so this
> update from you will be quite timely.
>
> -- Pete
>
-------------- next part --------------
diff -urd mpiexec-0.76/start_tasks.c mpiexec-0.76.poll/start_tasks.c
--- mpiexec-0.76/start_tasks.c 2004-04-20 01:01:39.000000000 +0800
+++ mpiexec-0.76.poll/start_tasks.c 2004-05-29 13:43:16.000000000 +0800
@@ -16,6 +16,7 @@
#include <errno.h>
#include <pwd.h>
#include <sys/time.h>
+#include <sys/poll.h>
#include <signal.h>
#include <netinet/in.h>
#include <sys/socket.h>
@@ -130,7 +131,7 @@
static void
env_add_int(const char *name, int value)
{
- char s[1024];
+ char s[4096];
sprintf(s, "%d", value);
env_add(name, s);
@@ -744,7 +745,7 @@
if (getsockname(gmpi_fd[i], (struct sockaddr *) &sin, &len) < 0)
error_errno("%s: getsockname", __func__);
gmpi_port[i] = ntohs(sin.sin_port);
- if (listen(gmpi_fd[i], 1024) < 0)
+ if (listen(gmpi_fd[i], 4096) < 0)
error_errno("%s: listen", __func__);
}
}
@@ -771,7 +772,7 @@
read_gm_startup_ports(void)
{
int i;
- char s[64*1024]; /* max recv len */
+ char s[64*4096]; /* max recv len */
int flags;
gmpi_info = Malloc(numtask * sizeof(*gmpi_info));
@@ -903,7 +904,7 @@
growstr_append(g, "|||");
for (i=0; i<numtask; i++) {
- char s[64*1024]; /* max recv len */
+ char s[64*4096]; /* max recv len */
int id, j, cc, fd = 0;
growstr_t *h;
@@ -982,12 +983,12 @@
for (;;) {
tm_event_t evt;
int ret;
- struct timeval tv = { 0, 200000 };
- fd_set wfs;
+ struct pollfd pfs;
- FD_ZERO(&wfs);
- FD_SET(fd, &wfs);
- ret = select(fd+1, 0, &wfs, 0, &tv);
+ pfs.fd = fd;
+ pfs.events = POLLOUT;
+ pfs.revents = 0;
+ ret = poll( &pfs, 1, 200 );
if (ret == 1) {
int f, flen = sizeof(f);
ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &f, &flen);
@@ -1002,10 +1003,10 @@
break;
}
if (ret < 0)
- error_errno("%s: select waiting connect iter %d",
+ error_errno("%s: poll waiting connect iter %d",
__func__, i);
if (ret != 0)
- error("%s: really not expecting select to return %d",
+ error("%s: really not expecting poll to return %d",
__func__, ret);
/* check to see if any process died, and abort if so */
@@ -1036,10 +1037,11 @@
growstr_append(h, "]]]");
cc = write_full(fd, h->s, h->len);
if (cc < 0)
- error_errno("%s: write gmpi_port#2 iter %d", __func__, i);
+ error_errno("%s: write gmpi_port#2 iter %d, fd %d, task %s, size %i\n", __func__, i, fd, tasks[id].name, h->len );
close(fd);
growstr_free(h);
- }
+ } /* for numtasks */
+
/* do not close gmpi_fd[1], it has been passed to stdio listener */
/* why not? seems if he has a copy, we can always close it */
close(gmpi_fd[1]);
@@ -1070,7 +1072,7 @@
error_errno("%s: bind", __func__);
if (getsockname(mport_fd, (struct sockaddr *) &sin, &len) < 0)
error_errno("%s: getsockname", __func__);
- if (listen(mport_fd, 1024) < 0)
+ if (listen(mport_fd, 4096) < 0)
error_errno("%s: listen", __func__);
*fd = mport_fd;
return ntohs(sin.sin_port);
@@ -1275,7 +1277,7 @@
error_errno("%s: bind", __func__);
if (getsockname(*pmi_fd, (struct sockaddr *) &sin, &len) < 0)
error_errno("%s: getsockname", __func__);
- if (listen(*pmi_fd, 1024) < 0)
+ if (listen(*pmi_fd, 4096) < 0)
error_errno("%s: listen", __func__);
return ntohs(sin.sin_port);
}
diff -urd mpiexec-0.76/stdio.c mpiexec-0.76.poll/stdio.c
--- mpiexec-0.76/stdio.c 2004-04-20 01:01:39.000000000 +0800
+++ mpiexec-0.76.poll/stdio.c 2004-05-29 13:58:12.000000000 +0800
@@ -23,6 +23,7 @@
#include <signal.h> /* sun location */
#include <sys/signal.h> /* linux location */
#include <sys/socket.h>
+#include <sys/poll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "mpiexec.h"
@@ -81,7 +82,9 @@
static fd_state_t *fds;
/* global set of readables */
-static fd_set rfs;
+static struct pollfd *pfs;
+static int *pfsmap;
+static int numfs;
/* extra fd where MPI_Abort callers will connect in mpich-gm */
static int abort_fd_array[2];
@@ -144,6 +147,63 @@
}
}
+static void poll_set(int fd, short events)
+{
+ int n = pfsmap[fd];
+
+ if( n ) /* already in the set */
+ {
+ n--;
+ pfs[n].events |= events;
+ }
+ else
+ {
+ pfs[numfs].fd = fd;
+ pfs[numfs].events = events;
+ pfsmap[fd] = ++numfs;
+ }
+}
+
+static int poll_isset(int fd, short events)
+{
+ int n = pfsmap[fd];
+
+ if( n-- )
+ {
+ return (pfs[n].revents & events);
+ }
+ return 0;
+}
+
+static void poll_clr(int fd, short events)
+{
+ int n = pfsmap[fd];
+
+ if( n-- )
+ {
+ pfs[n].revents &= ~events;
+ }
+ else
+ printf("%s: stray fd: %i !!!\n", __func__, fd);
+}
+
+static void poll_del(int fd)
+{
+ int n = pfsmap[fd];
+
+ if( n-- )
+ {
+ int m = numfs - 1;
+
+ pfs[n] = pfs[m]; /* move last entry into the hole */
+ pfsmap[pfs[n].fd] = n + 1; /* update reverse mapping */
+ pfsmap[fd] = 0; /* zero out mapping for the removed fd */
+ numfs--;
+ }
+ else
+ printf("%s: stray fd: %i !!!\n", __func__, fd);
+}
+
/*
* Main entry point: fork a process to handle process streams.
* Assumes 0,1,2 are the actual stdin,stdout,stderr where input
@@ -203,6 +263,11 @@
error("stdio_fork: need %d sockets, only %d available in system",
n+6, maxfd);
+ pfs = Malloc( sizeof(struct pollfd) * maxfd );
+ pfsmap = Malloc( sizeof(int) * maxfd );
+ memset( pfsmap, 0, sizeof(int) * maxfd );
+ numfs = 0;
+
/*
* Must setup listener sockets before fork so valid port numbers
* can be handed back and the spawn can continue.
@@ -383,7 +448,7 @@
if ((s = socket(AF_INET, SOCK_STREAM, 0)) < 0)
error_errno("%s: socket", __func__);
- if (listen(s, 1024) < 0)
+ if (listen(s, 4096) < 0)
error_errno("%s: listen", __func__);
if (getsockname(s, (struct sockaddr *)&addr, &len) < 0)
error_errno("%s: getsockname", __func__);
@@ -392,7 +457,7 @@
}
/*
- * Close a socket and remove it from the select set. (Not for
+ * Close a socket and remove it from the polled set. (Not for
* listener sockets or aggregate fds.)
*/
static void
@@ -437,19 +502,19 @@
/* ignore this socket, but leave it linked onto the print queue, if it is */
fds[s].which = NONE;
- FD_CLR(s, &rfs);
+ poll_del(s);
}
/*
* Close a listener socket (or stdin file/socket).
- * Also remove it from rfs. n == 0,1,2.
+ * Also remove it from the polled set. n == 0,1,2.
*/
static void
close_listener(int n)
{
if (close(listener[n])) /* stop listening */
error_errno("%s: close %d at fd %d", __func__, n, listener[n]);
- FD_CLR(listener[n], &rfs);
+ poll_del(listener[n]);
listener[n] = -1;
#if 0
if (n == OUT || n == ERR) /* just the _real_ listeners, not my own stdin */
@@ -468,7 +533,7 @@
int i;
close(aggregate[IN]);
- FD_CLR(aggregate[IN], &rfs);
+ poll_del(aggregate[IN]);
aggregate[IN] = -1;
/* tell processes there is no more stdin */
@@ -641,7 +706,7 @@
}
/*
- * Listening socket has found a new one, as reported by select.
+ * Listening socket has found a new one, as reported by poll.
* Accept it and pay attention to it.
*/
static void
@@ -674,10 +739,10 @@
if (which == IN) {
if (aggregate[IN] >= 0)
/* begin to listen to the stdin of mpiexec itself */
- FD_SET(aggregate[IN], &rfs);
+ poll_set(aggregate[IN], POLLIN);
}
}
- FD_SET(t, &rfs);
+ poll_set(t, POLLIN);
}
/*
@@ -727,7 +792,7 @@
}
close(fd);
close(abort_fd);
- FD_CLR(abort_fd, &rfs);
+ poll_del(abort_fd);
abort_fd_used = 1;
/* let parent deal with it, he'll tell us to exit later */
kill(getppid(), SIGALRM);
@@ -740,7 +805,7 @@
static void ATTR_NORETURN
do_child(void)
{
- int i;
+ int i, j;
/* allocate socket storage */
fds = (fd_state_t *) Malloc(maxfd * sizeof(*fds));
@@ -752,32 +817,31 @@
memset(&fds[i].sin, 0, sizeof(fds[i].sin));
}
- /* initial select source */
- FD_ZERO(&rfs);
+ /* initialize poll structures */
+ memset((void*)pfs, 0, sizeof(struct pollfd) * maxfd );
+ memset((void*)pfsmap, 0, sizeof(int) * maxfd );
+ numfs = 0;
+
/* put in listeners */
for (i=0; i<3; i++)
if (listener[i] >= 0)
- FD_SET(listener[i], &rfs);
+ poll_set( listener[i], POLLIN );
if (pmi_listen_fd >= 0)
- FD_SET(pmi_listen_fd, &rfs);
- /* writeable sockets do not get selected on, just hope they can take it */
+ poll_set( pmi_listen_fd, POLLIN );
+
+ /* writeable sockets do not get polled on, just hope they can take it */
/*
- * Infinite blocking select-driven loop.
+ * Infinite blocking poll-driven loop.
*/
for (;;) {
int n;
- fd_set rfsd;
- struct timeval tv;
-
- rfsd = rfs; /* copy in the static set */
- tv.tv_sec = 0;
- tv.tv_usec = 100000; /* check for timeouts every 100ms */
- if ((n = select(FD_SETSIZE, &rfsd, 0, 0, &tv)) < 0) {
+ /* check for timeouts every 100ms */
+ if ((n = poll( pfs, numfs, 100 )) < 0) {
if (errno == EINTR)
continue;
- error_errno("%s: select", __func__);
+ error_errno("%s: poll", __func__);
}
if (n == 0) {
@@ -800,61 +864,63 @@
continue;
}
- PRINTF("%s: select got %d\n", __func__, n);
+ PRINTF("%s: poll got %d\n", __func__, n);
PRINTF("%s: listeners are %d %d %d\n", __func__, listener[0],
listener[1], listener[2]);
/* my aggregate stdin */
- if (aggregate[IN] >= 0 && FD_ISSET(aggregate[IN], &rfsd)) {
+ if (aggregate[IN] >= 0 && poll_isset(aggregate[IN], (POLLIN | POLLHUP))) {
--n;
- FD_CLR(aggregate[IN], &rfsd);
- PRINTF("%s: input at aggregate[IN], %d select bits left\n",
+ poll_clr(aggregate[IN], POLLIN);
+ PRINTF("%s: input at aggregate[IN], %d poll sockets left\n",
__func__, n);
read_stdin();
}
/* new incoming connections */
for (i=0; n && i<3; i++) {
- if (listener[i] >= 0 && FD_ISSET(listener[i], &rfsd)) {
- FD_CLR(listener[i], &rfsd); /* so processes don't see it */
+ if (listener[i] >= 0 && poll_isset(listener[i], (POLLIN | POLLHUP))) {
--n;
- PRINTF("%s: input at listener[%d], %d select bits left\n",
+ poll_clr(listener[i], POLLIN); /* so processes don't see it */
+ PRINTF("%s: input at listener[%d], %d poll sockets left\n",
__func__, i, n);
accept_new_conn((fd_which_t)i);
}
}
/* abort fd connection */
- if (abort_fd >= 0 && FD_ISSET(abort_fd, &rfsd)) {
- FD_CLR(abort_fd, &rfsd);
+ if (abort_fd >= 0 && poll_isset(abort_fd, (POLLIN | POLLHUP))) {
--n;
+ poll_clr(abort_fd, POLLIN);
accept_abort_conn();
}
/* PMI listener */
- if (pmi_listen_fd >= 0 && FD_ISSET(pmi_listen_fd, &rfsd)) {
- FD_CLR(pmi_listen_fd, &rfsd); /* before the function */
+ if (pmi_listen_fd >= 0 && poll_isset(pmi_listen_fd, (POLLIN | POLLHUP))) {
+ poll_clr(pmi_listen_fd, POLLIN); /* before the function */
--n;
accept_pmi_conn();
}
/* existing PMI connections */
for (i=0; i<numtask; i++) {
- if (pmi_fds && pmi_fds[i] >= 0 && FD_ISSET(pmi_fds[i], &rfsd)) {
- FD_CLR(pmi_fds[i], &rfsd);
+ if (pmi_fds && pmi_fds[i] >= 0 && poll_isset(pmi_fds[i], (POLLIN | POLLHUP))) {
+ poll_clr(pmi_fds[i], POLLIN);
--n;
handle_pmi(i);
}
}
/* out,err from processes */
- for (i=0; n && i<maxfd; i++) {
- if (FD_ISSET(i, &rfsd)) {
+ for (j=0; n && j < numfs; j++) {
+ if (pfs[j].revents & (POLLIN | POLLHUP)) {
--n;
+ i = pfs[j].fd;
+
PRINTF("%s: output from process at %d type %s,"
- " %d select bits left\n", __func__, i,
- which_name(fds[i].which), n);
+ " %d poll sockets left\n", __func__, i, which_name(fds[i].which), n);
+
if (fds[i].which == OUT || fds[i].which == ERR)
readsome(i);
else if (fds[i].which == IN)
@@ -998,7 +1064,7 @@
PRINTF("%s: parent says via sig %d to listen to abort fd %d\n",
__func__, sig, abort_fd);
- FD_SET(abort_fd, &rfs);
+ poll_set(abort_fd, POLLIN);
}
/*
@@ -1144,7 +1210,7 @@
if (pmi_fds[rank] != -1)
error("%s: rank %d checked in twice", __func__, rank);
pmi_fds[rank] = fd;
- FD_SET(pmi_fds[rank], &rfs);
+ poll_set(pmi_fds[rank], POLLIN);
if (cl_args->verbose)
printf("%s: rank %d checks in\n", __func__, rank);
@@ -1247,7 +1313,7 @@
if (pmi_fds[i] == -1) break;
if (i == numtask) {
close(pmi_listen_fd);
- FD_CLR(pmi_listen_fd, &rfs);
+ poll_del(pmi_listen_fd);
pmi_listen_fd = -1;
}
@@ -1375,7 +1441,7 @@
error("%s: in cmd=finalize, expecting 1 word total", __func__);
finalize:
close(pmi_fds[rank]);
- FD_CLR(pmi_fds[rank], &rfs);
+ poll_del(pmi_fds[rank]);
pmi_fds[rank] = -1;
} else
diff -urd mpiexec-0.76/util.c mpiexec-0.76.poll/util.c
--- mpiexec-0.76/util.c 2004-04-20 01:01:39.000000000 +0800
+++ mpiexec-0.76.poll/util.c 2004-05-29 13:43:16.000000000 +0800
@@ -12,11 +12,14 @@
#include <stdarg.h>
#include <errno.h>
#include <unistd.h>
+#include <sys/poll.h>
/* pbse_to_txt found in pbs liblog.a */
#include <pbs_error.h>
#include "mpiexec.h"
+#define MPIEXEC_NBIO_TRIES 10
+
/*
* Set the program name, first statement of code usually.
*/
@@ -128,7 +131,7 @@
va_start(ap, fmt);
vfprintf(stderr, fmt, ap);
va_end(ap);
- fprintf(stderr, ": %s.\n", strerror(errno));
+ fprintf(stderr, ": %s.(%u)\n", strerror(errno), errno);
try_kill_stdio();
exit(1);
}
@@ -295,6 +298,37 @@
while (num > 0) {
i = read(fd, (char *)buf + offset, num);
if (i < 0)
+ {
+ if( errno == EINTR )
+ continue;
+
+ if( errno == EAGAIN )
+ {
+ int tries;
+ struct pollfd pfd;
+
+ pfd.fd = fd;
+ pfd.events = POLLIN;
+
+ for( tries = 0; tries < MPIEXEC_NBIO_TRIES; tries++ )
+ {
+ i = poll( &pfd, 1, 100*(tries+1) );
+ if( i == 1 )
+ break;
+ if( i == 0 )
+ continue;
+ if( errno == EINTR )
+ {
+ tries--;
+ continue;
+ }
+ else
+ break;
+ }
+ }
+
+ if( i > 0 ) continue;
+
/* ia64 needs %lu for size_t; x86 wants %u */
# ifdef USE_FORMAT_SIZE_T
error_errno("%s: read %zu bytes", __func__, num);
@@ -302,6 +336,7 @@
error_errno("%s: read %lu bytes", __func__,
(long unsigned int) num);
# endif
+ }
if (i == 0)
error("%s: EOF, only %d of %d bytes", __func__, offset, total);
num -= i;
@@ -321,7 +356,42 @@
while (count > 0) {
cc = write(fd, (const char *)buf + ptr, count);
if (cc < 0)
+ {
+ if( errno == EINTR )
+ continue;
+
+ if( errno == EAGAIN )
+ {
+ int tries, ret = 0;
+ struct pollfd pfd;
+
+ pfd.fd = fd;
+ pfd.events = POLLOUT;
+
+ for( tries = 0; tries < MPIEXEC_NBIO_TRIES; tries++ )
+ {
+ ret = poll(&pfd, 1, 100 * (tries + 1));
+ if( ret == 1 )
+ break;
+ if( ret == 0 )
+ continue;
+
+ if( errno == EINTR )
+ {
+ tries--;
+ continue;
+ }
+ else
+ break;
+ }
+
+ if( ret > 0 )
+ continue;
+ if( ret == 0 )
+ fprintf( stderr, "%s: timed out after %i tries\n", __func__, MPIEXEC_NBIO_TRIES );
+ }
return cc;
+ }
count -= cc;
ptr += cc;
}
More information about the mpiexec mailing list