[moody20@llnl.gov: Re: Startup-Protocol of MVAPICH-1.0]

Frank Mietke frank.mietke at informatik.tu-chemnitz.de
Wed Apr 2 11:46:33 EDT 2008


Hi,

> > I've done some patching work to integrate the latest startup protocol of MVAPICH-1.0
> > into the mpiexec code. They completely changed the startup mechanism to an
> > opcode-based protocol that uses its own collective operations.
> > This new approach seems to be more scalable than the older ones.
> 
> Great news that you have done this work.  I'm sure many will
> appreciate it.
Here is the patch for MVAPICH-1.0 final.

> 
> > Patching took only a little effort, but I would reuse, unchanged, some files from
> > MVAPICH-1.0 that implement the collective operations. Therefore I asked
> > the author of these files about licensing. I could not speak on your behalf.
> 
> You might give a thought to how we can make sure that mpiexec
> continues to run all versions of mvapich in the future, as they
> change the contents of their copy of the startup protocol file.  I
> haven't looked to see if it is versioned, or if there is some other
> way to detect and adapt to changes.  Maybe you could suggest
> improvements to the mvapich guys if there is something we need.
It would be great if they froze this interface. The interface is abstract, so if
they need to exchange new or additional information they can do so without
changing the adopted files. The version variable in ib.c is the version of this
collective interface in MVAPICH-1.0, so if this abstract interface ever changes,
they will change the version variable as well. Such changes should be rare,
though. I think this interface makes the code more maintainable for us.

> 
> > PMGR_COLLECTIVE software.  Since MVAPICH is released under BSD, I'm looking 
> > at going with that.  I'm not familiar with GPLv2.  If I assign BSD 
> > licensing to PMGR_COLLECTIVE, would that be ok for you?
> 
> My naive understanding of licensing is that it is acceptable for me
> to continue to release mpiexec as GPLv2 while it includes a
> BSD-licensed source file.  This is the idea of "compatibility" that
> the gnu people talk about, and they list the FreeBSD license as
> being compatible here: http://www.gnu.org/licenses/license-list.html .
> I'm assuming that's the license you mean, and not the old
> BSD-with-advertising clause.  What's in LICENSE.TXT in older mvapich
> looks fine.  Frank can just glue that to the top of the source file.
Adam suggested using the LICENSE.TXT from MVAPICH. I've integrated it.

Best Regards,
Frank


> 
> Thanks all.
> 
> 		-- Pete
> 

-- 
Dipl.-Inf. Frank Mietke     |     Fakultätsrechen- und Informationszentrum
Tel.: 0371 - 531 - 35538    |     Fak. für Informatik
Fax:  0371 - 531 8 35538    |     TU-Chemnitz
Key-ID: 60F59599            |     frank.mietke at informatik.tu-chemnitz.de
-------------- next part --------------
diff -Nru mpiexec/ib.c mpiexec_mv1.0/ib.c
--- mpiexec/ib.c	2008-03-31 17:26:25.000000000 +0200
+++ mpiexec_mv1.0/ib.c	2008-04-02 16:07:32.000000000 +0200
@@ -16,6 +16,7 @@
 #include <sys/socket.h>
 #include <sys/time.h>
 #include "mpiexec.h"
+#include "pmgr_collective_mpirun.h"
 
 #ifdef HAVE_POLL
 #  include <sys/poll.h>
@@ -190,6 +191,12 @@
  *    pids[]     # <pidlen> bytes
  *  Write back personalized out_addrs[] and full pids[].
  *
+ *  Version 8:
+ *    Completely rewritten, opcode-based handshake protocol that
+ *    uses its own collective operations to speed up startup.
+ *    Only the protocol version and the process rank are received
+ *    the old way; everything else is handled in pmgr_collective*.
+ *
  * Return negative on error, or new rank number for success.
  */
 static int read_ib_one(int fd)
@@ -220,9 +227,9 @@
 	if (read_full_ret(fd, &rank, sizeof(int)) != sizeof(int))
 	    goto out;
     }
-
-    if (read_full_ret(fd, &addrlen, sizeof(int)) != sizeof(int))
-	goto out;
+    if (testvers < 8)
+	if (read_full_ret(fd, &addrlen, sizeof(int)) != sizeof(int))
+	    goto out;
 
     non_versioned_092 = 0;
     if (rank == 32 + numtasks * 8) {
@@ -254,20 +261,25 @@
 	version = testvers;
 	version_as_read = testvers;
 	if (!(version == 1 || version == 2 || version == 3 || version == 5 ||
-	      version == 6)) {
-	    warning("%s: protocol version %d not known, but might still work",
-	            __func__, version);
-	    version = 6;  /* guess the latest still works */
+	      version == 6 || version == 8)) {
+	    warning(
+	      "%s: protocol version %d not known, but might still work",
+	      __func__, version);
+	    version = 8;  /* guess the latest still works */
 	}
 	debug(1, "%s: version %d startup%s", __func__, version,
-	      non_versioned_092 ? " (unversioned)" : "");
+	  non_versioned_092 ? " (unversioned)" : "");
     } else {
 	if (version_as_read != testvers)
 	    error("%s: mixed version executables (%d and %d), no hope",
-	          __func__, version_as_read, testvers);
+	      __func__, version_as_read, testvers);
     }
     if (rank < 0 || rank >= numtasks)
 	error("%s: rank %d out of bounds [0..%d)", __func__, rank, numtasks);
+    if (version == 8) {
+	ret = rank;
+	goto out;
+    }
 
     if (!address) {
 	/*
@@ -483,9 +495,16 @@
 	    }
 	    free(pids);
 	} else
-	    error("%s: programmer error, unknown version 5 phase %d", __func__,
-		  phase);
-    } else
+	    error("%s: programmer error, unknown version %d phase %d", __func__,
+		  version, phase);
+    } else if (version == 8) {
+	/*
+	 * For version 8 of the MVAPICH startup protocol, call
+	 * pmgr_processops() to handle the remaining communication
+	 * with the MPI processes.
+	 */
+	ret = pmgr_processops(fds, numtasks);
+    } else
 	error("%s: programmer error, unknown version %d", __func__, version);
 
     /*
@@ -499,6 +518,9 @@
     close(mport_fd);
     stdio_msg_parent_say_abort_fd(0);
 
+    if (version == 8)
+	    goto out;
+
     /*
      * Finally, implement a simple barrier.  Use a select loop to avoid
      * hanging on a sequential read from #0 which is always quite busy and
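The version handling that the ib.c hunks add to read_ib_one() can be restated as a few small pure functions. This is only a sketch: the names sketch_normalize_version, sketch_expects_addrlen, sketch_versions_consistent and the constant SKETCH_PMGR_COLLECTIVE are hypothetical and not part of mpiexec. They mirror the rules in the patch: known versions 1, 2, 3, 5, 6 and 8 pass through, an unknown version falls back to 8 with a warning, addrlen is read only when the announced version is below 8, and all tasks must announce the same version.

```c
#include <stdio.h>

/* Hypothetical mirror of PMGR_COLLECTIVE (== 8) from pmgr_collective_common.h. */
#define SKETCH_PMGR_COLLECTIVE 8

/* Known versions are used as-is; anything else falls back to the newest one. */
int sketch_normalize_version(int announced)
{
    switch (announced) {
    case 1: case 2: case 3: case 5: case 6:
    case SKETCH_PMGR_COLLECTIVE:
        return announced;
    default:
        fprintf(stderr, "protocol version %d not known, but might still work\n",
                announced);
        return SKETCH_PMGR_COLLECTIVE;  /* guess the latest still works */
    }
}

/* Tasks announcing version 8 or later do not send addrlen after their rank. */
int sketch_expects_addrlen(int announced)
{
    return announced < SKETCH_PMGR_COLLECTIVE;
}

/* Every task must announce the same version ("mixed version executables"). */
int sketch_versions_consistent(const int *announced, int ntasks)
{
    int i;
    for (i = 1; i < ntasks; i++)
        if (announced[i] != announced[0])
            return 0;
    return 1;
}
```

Keeping these decisions in one place would make it straightforward to add another version number should the MVAPICH developers ever bump the collective interface.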
diff -Nru mpiexec/LICENSE_MVAPICH mpiexec_mv1.0/LICENSE_MVAPICH
--- mpiexec/LICENSE_MVAPICH	1970-01-01 01:00:00.000000000 +0100
+++ mpiexec_mv1.0/LICENSE_MVAPICH	2008-01-25 21:09:22.000000000 +0100
@@ -0,0 +1,37 @@
+MVAPICH
+
+Copyright 2002-2008 The Ohio State University.
+Portions Copyright 1999-2002 The Regents of the University of
+California, through Lawrence Berkeley National Laboratory (subject to
+receipt of any required approvals from U.S. Dept. of Energy).
+Portions copyright 1993 University of Chicago.
+Portions copyright 1993 Mississippi State University.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are
+met:
+
+(1) Redistributions of source code must retain the above copyright
+notice, this list of conditions and the following disclaimer.
+
+(2) Redistributions in binary form must reproduce the above copyright
+notice, this list of conditions and the following disclaimer in the
+documentation and/or other materials provided with the distribution.
+
+(3) Neither the name of The Ohio State University, the University of
+California, Lawrence Berkeley National Laboratory, The University of
+Chicago, Argonne National Laboratory, U.S. Dept. of Energy nor the
+names of their contributors may be used to endorse or promote products
+derived from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
diff -Nru mpiexec/Makefile.in mpiexec_mv1.0/Makefile.in
--- mpiexec/Makefile.in	2008-03-31 17:27:13.000000000 +0200
+++ mpiexec_mv1.0/Makefile.in	2008-04-02 15:30:18.000000000 +0200
@@ -7,8 +7,8 @@
 #
 SRC   = mpiexec.c get_hosts.c start_tasks.c task.c event.c util.c config.c \
 	stdio.c growstr.c pmi.c gm.c ib.c psm.c p4.c rai.c concurrent.c \
-	exedist.c spawn.c tv_attach.c
-H     = mpiexec.h util.h growstr.h list.h tv_attach.h
+	exedist.c spawn.c tv_attach.c pmgr_collective_mpirun.c pmgr_collective_common.c
+H     = mpiexec.h util.h growstr.h list.h tv_attach.h pmgr_collective_mpirun.h pmgr_collective_common.h
 OTHER = ChangeLog LICENSE README mpiexec.1 proc-relations.fig \
 	hello.c hellof.f hellomp.f redir-helper.c \
 	runtests.pl README.lam
diff -Nru mpiexec/pmgr_collective_common.c mpiexec_mv1.0/pmgr_collective_common.c
--- mpiexec/pmgr_collective_common.c	1970-01-01 01:00:00.000000000 +0100
+++ mpiexec_mv1.0/pmgr_collective_common.c	2008-04-02 15:39:53.000000000 +0200
@@ -0,0 +1,168 @@
+/*
+ * PMGR_COLLECTIVE ============================================================
+ * This protocol enables MPI to bootstrap itself through a series of collective
+ * operations.  The collective operations are modeled after MPI collectives --
+ * all tasks must call them in the same order and with consistent parameters.
+ *
+ * MPI may invoke any number of collectives, in any order, passing an arbitrary
+ * amount of data.  All message sizes are specified in bytes.
+ * PMGR_COLLECTIVE ============================================================
+ *
+ * This file provides common implementations for
+ *   pmgr_collective_mpirun - the interface used by mpirun
+ *   pmgr_collective_client - the interface used by the MPI tasks
+ *
+ * Copyright (C) 2007 The Regents of the University of California.
+ * Produced at Lawrence Livermore National Laboratory.
+ * Author: Adam Moody <moody20 at llnl.gov>
+ *
+ * Distributed under the BSD-style license (See LICENSE_MVAPICH) 
+*/
+
+#include <stdarg.h>
+#include <unistd.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/time.h>
+#include "pmgr_collective_common.h"
+
+/*
+   my rank
+   -3     ==> uninitialized task (may be mpirun or MPI task)
+   -2     ==> mpirun
+   -1     ==> MPI task before rank is assigned
+   0..N-1 ==> MPI task
+*/
+int pmgr_me = -3;
+
+int pmgr_echo_debug = 0;
+
+/* Return the number of secs as a double between two timeval structs (tv2-tv1) */
+double pmgr_getsecs(struct timeval* tv2, struct timeval* tv1)
+{
+        struct timeval result;
+        timersub(tv2, tv1, &result);
+        return (double) result.tv_sec + (double) result.tv_usec / 1000000.0;
+}
+
+/* Fills in timeval via gettimeofday */
+void pmgr_gettimeofday(struct timeval* tv)
+{
+        if (gettimeofday(tv, NULL) < 0) {
+                pmgr_error("getting time (gettimeofday() %m errno=%d)",
+                        errno);
+        }
+}
+
+/* Reads environment variable; exits if it is ENV_REQUIRED and not set */
+char* pmgr_getenv(const char* envvar, int type)
+{
+    char* str = getenv(envvar);
+    if (str == NULL && type == ENV_REQUIRED) {
+        pmgr_error("Missing required environment variable: %s", envvar);
+        exit(1);
+    }
+    return str;
+}
+
+/* malloc n bytes, and bail out with error msg if fails */
+void* pmgr_malloc(size_t n, const char* msg)
+{
+    void* p = malloc(n);
+    if (!p) {
+        pmgr_error("malloc(%lu) failed: %s (errno %d)", (unsigned long) n, msg, errno);
+        exit(1);
+    }
+    return p;
+}
+
+/* print message to stderr */
+void pmgr_error(const char *fmt, ...)
+{
+    va_list argp;
+    fprintf(stderr, "PMGR_COLLECTIVE ERROR: ");
+    if (pmgr_me >= 0) {
+        fprintf(stderr, "%d: ", pmgr_me);
+    } else if (pmgr_me == -2) {
+        fprintf(stderr, "mpirun: ");
+    } else if (pmgr_me == -1) {
+        fprintf(stderr, "uninitialized MPI task: ");
+    } else {
+        fprintf(stderr, "uninitialized task (mpirun or MPI): ");
+    }
+    va_start(argp, fmt);
+    vfprintf(stderr, fmt, argp);
+    va_end(argp);
+    fprintf(stderr, "\n");
+}
+
+/* print debug message to stderr if pmgr_echo_debug > 0 and >= level */
+void pmgr_debug(int level, const char *fmt, ...)
+{
+    va_list argp;
+    if (pmgr_echo_debug > 0 && pmgr_echo_debug >= level) {
+        fprintf(stderr, "PMGR_COLLECTIVE DEBUG: ");
+        if (pmgr_me >= 0) {
+            fprintf(stderr, "%d: ", pmgr_me);
+        } else if (pmgr_me == -2) {
+            fprintf(stderr, "mpirun: ");
+        } else if (pmgr_me == -1) {
+            fprintf(stderr, "uninitialized MPI task: ");
+        } else {
+            fprintf(stderr, "uninitialized task (mpirun or MPI): ");
+        }
+        va_start(argp, fmt);
+        vfprintf(stderr, fmt, argp);
+        va_end(argp);
+        fprintf(stderr, "\n");
+    }
+}
+
+/* write size bytes from buf into fd, retry if necessary */
+int pmgr_write_fd(int fd, void* buf, int size)
+{
+    int rc;
+    int n = 0;
+    char* offset = (char*) buf;
+
+    while (n < size) {
+	rc = write(fd, offset, size - n);
+
+	if (rc < 0) {
+	    if(errno == EINTR || errno == EAGAIN) { continue; }
+	    return rc;
+	} else if(rc == 0) {
+	    return n;
+	}
+
+	offset += rc;
+	n += rc;
+    }
+
+    return n;
+}
+
+/* read size bytes into buf from fd, retry if necessary */
+int pmgr_read_fd(int fd, void* buf, int size)
+{
+    int rc;
+    int n = 0;
+    char* offset = (char*) buf;
+
+    while (n < size) {
+	rc = read(fd, offset, size - n);
+
+	if (rc < 0) {
+	    if(errno == EINTR || errno == EAGAIN) { continue; }
+	    return rc;
+	} else if(rc == 0) {
+	    return n;
+	}
+
+	offset += rc;
+	n += rc;
+    }
+
+    return n;
+}
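pmgr_read_fd() returns the partial byte count when the peer closes the socket before size bytes have arrived, so a caller has to compare the result against size rather than only test for a negative value. The sketch below (hypothetical names; only POSIX read(), write(), pipe() and close() are used) shows a variant that reports such a short read as an error, plus two small demos built on a pipe.

```c
#include <errno.h>
#include <unistd.h>

/* Read exactly size bytes, retrying on EINTR/EAGAIN like pmgr_read_fd(),
 * but return -1 on a premature EOF instead of the partial count. */
int sketch_read_exact(int fd, void *buf, int size)
{
    char *p = buf;
    int n = 0;
    while (n < size) {
        ssize_t rc = read(fd, p + n, (size_t)(size - n));
        if (rc < 0) {
            if (errno == EINTR || errno == EAGAIN)
                continue;
            return -1;              /* real I/O error */
        }
        if (rc == 0)
            return -1;              /* peer closed before size bytes arrived */
        n += (int)rc;
    }
    return n;
}

/* Demo: the pipe delivers only 3 of the 8 requested bytes, then EOF. */
int sketch_short_read_demo(void)
{
    int fds[2];
    char buf[8];
    int rc;
    if (pipe(fds) != 0)
        return -2;
    if (write(fds[1], "abc", 3) != 3) {
        close(fds[0]);
        close(fds[1]);
        return -2;
    }
    close(fds[1]);                  /* writer gone: EOF after 3 bytes */
    rc = sketch_read_exact(fds[0], buf, (int)sizeof(buf));
    close(fds[0]);
    return rc;                      /* -1: the short read is detected */
}

/* Demo: a complete 4-byte read succeeds and returns 4. */
int sketch_full_read_demo(void)
{
    int fds[2];
    char buf[4];
    int rc;
    if (pipe(fds) != 0)
        return -2;
    if (write(fds[1], "wxyz", 4) != 4) {
        close(fds[0]);
        close(fds[1]);
        return -2;
    }
    rc = sketch_read_exact(fds[0], buf, 4);
    close(fds[0]);
    close(fds[1]);
    return rc;
}
```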
diff -Nru mpiexec/pmgr_collective_common.h mpiexec_mv1.0/pmgr_collective_common.h
--- mpiexec/pmgr_collective_common.h	1970-01-01 01:00:00.000000000 +0100
+++ mpiexec_mv1.0/pmgr_collective_common.h	2008-04-02 15:39:39.000000000 +0200
@@ -0,0 +1,83 @@
+/*
+ * PMGR_COLLECTIVE ============================================================
+ * This protocol enables MPI to bootstrap itself through a series of collective
+ * operations.  The collective operations are modeled after MPI collectives --
+ * all tasks must call them in the same order and with consistent parameters.
+ *
+ * MPI may invoke any number of collectives, in any order, passing an arbitrary
+ * amount of data.  All message sizes are specified in bytes.
+ * PMGR_COLLECTIVE ============================================================
+ *
+ * This file provides common definitions for
+ *   pmgr_collective_mpirun - the interface used by mpirun
+ *   pmgr_collective_client - the interface used by the MPI tasks
+ *
+ * Copyright (C) 2007 The Regents of the University of California.
+ * Produced at Lawrence Livermore National Laboratory.
+ * Author: Adam Moody <moody20 at llnl.gov>
+ *
+ * Distributed under the BSD-style license (See LICENSE_MVAPICH)
+*/
+
+#ifndef _PMGR_COLLECTIVE_COMMON_H
+#define _PMGR_COLLECTIVE_COMMON_H
+
+#include <stdlib.h>     /* size_t, free() used by pmgr_free() */
+#include <sys/time.h>   /* struct timeval */
+
+/* PMGR_VERSION for pmgr_collective is PMGR_COLLECTIVE (== 8) */
+#define PMGR_COLLECTIVE 8
+
+#define PMGR_SUCCESS 0
+
+#define PMGR_OPEN      0
+#define PMGR_CLOSE     1
+#define PMGR_ABORT     2
+#define PMGR_BARRIER   3
+#define PMGR_BCAST     4
+#define PMGR_GATHER    5
+#define PMGR_SCATTER   6
+#define PMGR_ALLGATHER 7
+#define PMGR_ALLTOALL  8
+
+/*
+   my rank
+   -3     ==> uninitialized task (may be mpirun or MPI task)
+   -2     ==> mpirun
+   -1     ==> MPI task before rank is assigned
+   0..N-1 ==> MPI task
+*/
+extern int pmgr_me;
+
+extern int pmgr_echo_debug;
+
+/* Return the number of secs as a double between two timeval structs (tv2-tv1) */
+double pmgr_getsecs(struct timeval* tv2, struct timeval* tv1);
+
+/* Fills in timeval via gettimeofday */
+void pmgr_gettimeofday(struct timeval* tv);
+
+/* Reads environment variable; exits if it is ENV_REQUIRED and not set */
+#define ENV_REQUIRED 0
+#define ENV_OPTIONAL 1
+char* pmgr_getenv(const char* envvar, int type);
+
+/* malloc n bytes, and bail out with error msg if fails */
+void* pmgr_malloc(size_t n, const char* msg);
+
+/* macro to free the pointer if set, then set it to NULL */
+#define pmgr_free(p) do { if (p) { free((void*)(p)); (p) = NULL; } } while (0)
+
+/* print message to stderr */
+void pmgr_error(const char *fmt, ...);
+
+/* print debug message to stderr if pmgr_echo_debug > 0 and >= level */
+void pmgr_debug(int level, const char *fmt, ...);
+
+/* write size bytes from buf into fd, retry if necessary */
+int pmgr_write_fd(int fd, void* buf, int size);
+
+/* read size bytes into buf from fd, retry if necessary */
+int pmgr_read_fd (int fd, void* buf, int size);
+
+#endif /* _PMGR_COLLECTIVE_COMMON_H */
diff -Nru mpiexec/pmgr_collective_mpirun.c mpiexec_mv1.0/pmgr_collective_mpirun.c
--- mpiexec/pmgr_collective_mpirun.c	1970-01-01 01:00:00.000000000 +0100
+++ mpiexec_mv1.0/pmgr_collective_mpirun.c	2008-04-02 15:29:31.000000000 +0200
@@ -0,0 +1,338 @@
+/*
+ * PMGR_COLLECTIVE ============================================================
+ * This protocol enables MPI to bootstrap itself through a series of collective
+ * operations.  The collective operations are modeled after MPI collectives --
+ * all tasks must call them in the same order and with consistent parameters.
+ *
+ * MPI may invoke any number of collectives, in any order, passing an arbitrary
+ * amount of data.  All message sizes are specified in bytes.
+ * PMGR_COLLECTIVE ============================================================
+ *
+ * This file implements the interface used by mpirun.  The mpirun process should call
+ * pmgr_processops after accepting connections from the MPI tasks and negotiating
+ * the protocol version number (PMGR_COLLECTIVE uses protocol 8).
+ *
+ * It should provide an array of open socket file descriptors indexed by MPI rank
+ * (fds) along with the number of MPI tasks (nprocs) as arguments.
+ *
+ * pmgr_processops will handle all PMGR_COLLECTIVE operations and return control
+ * upon an error or after receiving PMGR_CLOSE from the MPI tasks.  If no errors
+ * are encountered, it will close all socket file descriptors before returning.
+ *
+ * Copyright (C) 2007 The Regents of the University of California.
+ * Produced at Lawrence Livermore National Laboratory.
+ * Author: Adam Moody <moody20 at llnl.gov>
+ *
+ * Distributed under the BSD-style license (See LICENSE_MVAPICH)
+*/
+
+#include <stdio.h>
+#include <unistd.h>
+#include <string.h>
+#include <stdlib.h>
+#include <errno.h>
+#include <sys/time.h>
+#include "pmgr_collective_mpirun.h"
+
+/*
+ * We need to abort on pmgr errors
+ */
+//extern void cleanup(void);
+int* fd_by_rank;
+int  N;
+
+/* Write size bytes from buf into socket for rank */
+void pmgr_send(void* buf, int size, int rank)
+{
+	int fd = fd_by_rank[rank];
+	if (pmgr_write_fd(fd, buf, size) != size) {
+		pmgr_error("writing to rank %d (write() %m errno=%d) @ file %s:%d",
+			rank, errno, __FILE__, __LINE__);
+	}
+}
+
+/* Read size bytes from socket for rank into buf */
+void pmgr_recv(void* buf, int size, int rank)
+{
+	int fd = fd_by_rank[rank];
+	if (pmgr_read_fd(fd, buf, size) != size) {
+		pmgr_error("reading from rank %d (read() %m errno=%d) @ file %s:%d",
+			rank, errno, __FILE__, __LINE__);
+	}
+}
+
+/* Read an integer from socket for rank */
+int pmgr_recv_int(int rank)
+{
+	int buf;
+	pmgr_recv(&buf, sizeof(buf), rank);
+	return buf;
+}
+
+/* Scatter data in buf to ranks using chunks of size bytes */
+void pmgr_scatterbcast(void* buf, int size)
+{
+	int i;
+	for (i = 0; i < N; i++) {
+		pmgr_send((char *)buf + i*size, size, i);
+	}
+}
+
+/* Broadcast buf, which is size bytes big, to each rank */
+void pmgr_allgatherbcast(void* buf, int size)
+{
+	int i;
+	for (i = 0; i < N; i++) {
+		pmgr_send(buf, size, i);
+	}
+}
+
+/* Perform alltoall using data in buf with elements of size bytes */
+void pmgr_alltoallbcast(void* buf, int size)
+{
+	int pbufsize = size * N;
+	void* pbuf = (void*) pmgr_malloc(pbufsize, "Temporary buffer for alltoall");	
+
+	int i, src;
+	for (i = 0; i < N; i++) {
+		for (src = 0; src < N; src++) {
+			memcpy( (char *)pbuf + size*src,
+				(char *)buf  + size*(src*N + i),
+				size );
+		}
+		pmgr_send(pbuf, pbufsize, i);
+	}
+	
+	pmgr_free(pbuf);
+}
+
+/* Check that new == curr value if curr has been initialized (-1 == uninitialized) */
+int set_current(int curr, int new)
+{
+	if (curr == -1) { curr = new; }
+	if (new != curr) {
+		pmgr_error("unexpected value: received %d, expecting %d @ file %s:%d",
+			   new, curr, __FILE__, __LINE__);
+	}
+	return curr;
+}
+
+/*
+ * pmgr_processops
+ * This function carries out pmgr_collective operations to bootstrap MPI.
+ * These collective operations are modeled after MPI collectives -- all tasks
+ * must call them in the same order and with consistent parameters.
+ *
+ * fds - integer array of open sockets (file descriptors)
+ *       indexed by MPI rank
+ * nprocs - number of MPI tasks in job
+ *
+ * returns PMGR_SUCCESS on success
+ * If no errors are encountered, all sockets are closed before returning.
+ *
+ * Until a 'CLOSE' or 'ABORT' message is seen, we continuously loop processing ops
+ *   For each op, we read one packet from each rank (socket)
+ *     A packet consists of an integer OP CODE, followed by variable length data
+ *     depending on the operation
+ *   After reading a packet from each rank, mpirun completes the operation by broadcasting
+ *   data back to any destinations, depending on the operation being performed
+ *
+ * Note: Although there are op codes available for PMGR_OPEN and PMGR_ABORT, neither
+ * is fully implemented and should not be used.
+ *
+ * This function assumes there is a set of open sockets (file descriptors) to each MPI
+ * task which can be indexed by MPI rank.
+ *
+ * Packet structures:
+ *   N    ==> Number of MPI tasks
+ *   From ==> data mpirun receives from each MPI task
+ *   To   ==> data mpirun sends to each MPI task
+ *   NULL ==> no data sent / received
+ * 
+ *   Message sizes are always in bytes and give number
+ *   of bytes in vector for each process, not necessarily
+ *   the total number of bytes included in the packet.
+ *   The message size and collective operation together
+ *   imply the total number of bytes in the packet.
+ *
+ * PMGR_OPEN: (not used)
+ *   From: <int opcode == 0, int rank>
+ *   To:   NULL
+ * PMGR_CLOSE:
+ *   From: <int opcode == 1>
+ *   To:   NULL
+ * PMGR_ABORT: (not used)
+ *   From: <int opcode == 2, int errcode>
+ *   To:   NULL
+ * PMGR_BARRIER:
+ *   From: <int opcode == 3>
+ *   To:   <int opcode == 3>
+ * PMGR_BCAST:
+ *   From (root):     <int opcode == 4, int root, int msg_size, msg_size msg>
+ *   From (non-root): <int opcode == 4, int root, int msg_size>
+ *   To:   <msg_size msg>
+ * PMGR_GATHER:
+ *   From: <int opcode == 5, int root, int msg_size, msg_size msg>
+ *   To (root):     <msg_size*N msg>
+ *   To (non-root): NULL
+ * PMGR_SCATTER:
+ *   From (root):     <int opcode == 6, int root, int msg_size, msg_size*N msg>
+ *   From (non-root): <int opcode == 6, int root, int msg_size>
+ *   To:   <msg_size msg>
+ * PMGR_ALLGATHER:
+ *   From: <int opcode == 7, int msg_size, msg_size msg>
+ *   To:   <msg_size*N msg>
+ * PMGR_ALLTOALL:
+ *   From: <int opcode == 8, int msg_size, msg_size*N msg>
+ *   To:   <msg_size*N msg>
+*/
+int pmgr_processops(int* fds, int nprocs)
+{
+  pmgr_me = -2;
+  pmgr_echo_debug = 0;
+  char* value;
+  fd_by_rank = fds;
+  N = nprocs;
+
+  struct timeval time_start, time_end, time_startop, time_endop;
+  pmgr_gettimeofday(&time_start);
+
+  if ((value = pmgr_getenv("MPIRUN_DEBUG", ENV_OPTIONAL)) != NULL) {
+    pmgr_echo_debug = atoi(value);
+  }
+
+  pmgr_debug(1, "Processing PMGR opcodes");
+
+  /* Until a 'CLOSE' or 'ABORT' message is seen, we continuously loop processing ops */
+  int exit = 0;
+  while (!exit) {
+	int opcode = -1;
+	int root   = -1;
+	int size   = -1;
+	void* buf = NULL;
+        pmgr_gettimeofday(&time_startop);
+
+	/* for each process, read in one packet (opcode and its associated data) */
+	int i;
+	for (i = 0; i < N; i++) {
+		/* read in opcode */
+		opcode = set_current(opcode, pmgr_recv_int(i));
+
+		/* read in additional data depending on current opcode */
+		int rank, code;
+		switch(opcode) {
+			case PMGR_OPEN: /* followed by rank */
+				if (i==0) { pmgr_debug(1, "Receiving data for PMGR_OPEN"); }
+				rank = pmgr_recv_int(i);
+				break;
+			case PMGR_CLOSE: /* no data, close the socket */
+				if (i==0) { pmgr_debug(1, "Receiving data for PMGR_CLOSE"); }
+				close(fd_by_rank[i]);
+				break;
+			case PMGR_ABORT: /* followed by exit code */
+				if (i==0) { pmgr_debug(1, "Receiving data for PMGR_ABORT"); }
+				code = pmgr_recv_int(i);
+				pmgr_error("received abort code %d from rank %d @ file %s:%d", code, i, __FILE__, __LINE__);
+				break;
+			case PMGR_BARRIER: /* no data */
+				if (i==0) { pmgr_debug(1, "Receiving data for PMGR_BARRIER"); }
+				break;
+			case PMGR_BCAST: /* root, size of message, then message data (from root only) */
+				if (i==0) { pmgr_debug(1, "Receiving data for PMGR_BCAST"); }
+				root = set_current(root, pmgr_recv_int(i));
+				size = set_current(size, pmgr_recv_int(i));
+				if (!buf) { buf = (void*) pmgr_malloc(size, "Bcast buffer"); }
+				if (i == root) { pmgr_recv(buf, size, i); }
+				break;
+			case PMGR_GATHER: /* root, size of message, then message data */
+				if (i==0) { pmgr_debug(1, "Receiving data for PMGR_GATHER"); }
+				root = set_current(root, pmgr_recv_int(i));
+				size = set_current(size, pmgr_recv_int(i));
+				if (!buf) { buf = (void*) pmgr_malloc(size * N, "Gather buffer"); }
+				pmgr_recv((char *)buf + size*i, size, i);
+				break;
+			case PMGR_SCATTER: /* root, size of message, then message data */
+				if (i==0) { pmgr_debug(1, "Receiving data for PMGR_SCATTER"); }
+				root = set_current(root, pmgr_recv_int(i));
+				size = set_current(size, pmgr_recv_int(i));
+				if (!buf) { buf = (void*) pmgr_malloc(size * N, "Scatter buffer"); }
+				if (i == root) { pmgr_recv(buf, size * N, i); }
+				break;
+			case PMGR_ALLGATHER: /* size of message, then message data */
+				if (i==0) { pmgr_debug(1, "Receiving data for PMGR_ALLGATHER"); }
+				size = set_current(size, pmgr_recv_int(i));
+				if (!buf) { buf = (void*) pmgr_malloc(size * N, "Allgather buffer"); }
+				pmgr_recv((char *)buf + size*i, size, i);
+				break;
+			case PMGR_ALLTOALL: /* size of message, then message data */
+				if (i==0) { pmgr_debug(1, "Receiving data for PMGR_ALLTOALL"); }
+				size = set_current(size, pmgr_recv_int(i));
+				if (!buf) { buf = (void*) pmgr_malloc(size * N * N, "Alltoall buffer"); }
+				pmgr_recv((char *)buf + (size*N)*i, size * N, i);
+				break;
+			default:
+				pmgr_error("unrecognized PMGR_COLLECTIVE opcode: %d @ file %s:%d", opcode, __FILE__, __LINE__);
+				return -1;
+		}
+	} /* end for each process, read in one packet (opcode and its associated data) */
+
+	/* Complete operation */
+	switch(opcode) {
+		case PMGR_OPEN:
+			pmgr_debug(1, "Sending data for PMGR_OPEN");
+			pmgr_debug(1, "Completed PMGR_OPEN");
+			break;
+		case PMGR_CLOSE:
+			pmgr_debug(1, "Sending data for PMGR_CLOSE");
+			pmgr_debug(1, "Completed PMGR_CLOSE");
+			exit = 1;
+			break;
+		case PMGR_ABORT:
+			pmgr_debug(1, "Sending data for PMGR_ABORT");
+			pmgr_debug(1, "Completed PMGR_ABORT");
+			exit = 1;
+			break;
+		case PMGR_BARRIER: /* (just echo the opcode back) */
+			pmgr_debug(1, "Sending data for PMGR_BARRIER");
+			pmgr_allgatherbcast(&opcode, sizeof(opcode));
+			pmgr_debug(1, "Completed PMGR_BARRIER");
+			break;
+		case PMGR_BCAST:
+			pmgr_debug(1, "Sending data for PMGR_BCAST");
+			pmgr_allgatherbcast(buf, size);
+			pmgr_debug(1, "Completed PMGR_BCAST");
+			break;
+		case PMGR_GATHER:
+			pmgr_debug(1, "Sending data for PMGR_GATHER");
+			pmgr_send(buf, size * N, root);
+			pmgr_debug(1, "Completed PMGR_GATHER");
+			break;
+		case PMGR_SCATTER:
+			pmgr_debug(1, "Sending data for PMGR_SCATTER");
+			pmgr_scatterbcast(buf, size);
+			pmgr_debug(1, "Completed PMGR_SCATTER");
+			break;
+		case PMGR_ALLGATHER:
+			pmgr_debug(1, "Sending data for PMGR_ALLGATHER");
+			pmgr_allgatherbcast(buf, size * N);
+			pmgr_debug(1, "Completed PMGR_ALLGATHER");
+			break;
+		case PMGR_ALLTOALL:
+			pmgr_debug(1, "Sending data for PMGR_ALLTOALL");
+			pmgr_alltoallbcast(buf, size);
+			pmgr_debug(1, "Completed PMGR_ALLTOALL");
+			break;
+		default:
+			pmgr_error("unrecognized PMGR_COLLECTIVE opcode: %d @ file %s:%d", opcode, __FILE__, __LINE__);
+	} /* end switch(opcode) for Completing operations */
+
+	pmgr_free(buf);
+        pmgr_gettimeofday(&time_endop);
+        pmgr_debug(1, "Operation took %f seconds for %d procs", pmgr_getsecs(&time_endop, &time_startop), nprocs);
+  } /* while(!exit) must be more opcodes to process */
+
+  pmgr_gettimeofday(&time_end);
+  pmgr_debug(1, "Completed processing PMGR opcodes; took %f seconds for %d procs", pmgr_getsecs(&time_end, &time_start), nprocs);
+
+  return PMGR_SUCCESS;
+}
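The index arithmetic in pmgr_alltoallbcast() is easiest to see in isolation. In the gathered buffer, rank src's packet starts at offset size*N*src, so the block that src addressed to rank dst sits at offset size*(src*N + dst); rank dst then receives those N blocks ordered by source rank. The function below is a hypothetical, self-contained restatement of that loop for a single destination, not code from the patch.

```c
#include <string.h>

/* Copy into out the n blocks (each size bytes) destined for rank dst,
 * ordered by source rank.  Block (src, dst) lives at size*(src*n + dst). */
char *sketch_alltoall_row(const char *buf, int size, int n, int dst, char *out)
{
    int src;
    for (src = 0; src < n; src++)
        memcpy(out + (size_t)size * src,
               buf + (size_t)size * (src * n + dst),
               (size_t)size);
    return out;
}
```

For example, with three ranks and one-byte blocks, the gathered buffer "abcdefghi" yields "adg" for rank 0 and "cfi" for rank 2, i.e. a transpose of the 3x3 block matrix.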
diff -Nru mpiexec/pmgr_collective_mpirun.h mpiexec_mv1.0/pmgr_collective_mpirun.h
--- mpiexec/pmgr_collective_mpirun.h	1970-01-01 01:00:00.000000000 +0100
+++ mpiexec_mv1.0/pmgr_collective_mpirun.h	2008-04-02 15:55:37.000000000 +0200
@@ -0,0 +1,60 @@
+/*
+ * PMGR_COLLECTIVE ============================================================
+ * This protocol enables MPI to bootstrap itself through a series of collective
+ * operations.  The collective operations are modeled after MPI collectives --
+ * all tasks must call them in the same order and with consistent parameters.
+ *
+ * MPI may invoke any number of collectives, in any order, passing an arbitrary
+ * amount of data.  All message sizes are specified in bytes.
+ * PMGR_COLLECTIVE ============================================================
+ *
+ * This file defines the interface used by mpirun.  The mpirun process should call
+ * pmgr_processops after accepting connections from the MPI tasks and negotiating
+ * the protocol version number (PMGR_COLLECTIVE uses protocol 8).
+ *
+ * It should provide an array of open socket file descriptors indexed by MPI rank
+ * (fds) along with the number of MPI tasks (nprocs) as arguments.
+ *
+ * pmgr_processops will handle all PMGR_COLLECTIVE operations and return control
+ * upon an error or after receiving PMGR_CLOSE from the MPI tasks.  If no errors
+ * are encountered, it will close all socket file descriptors before returning.
+ *
+ * Copyright (C) 2007 The Regents of the University of California.
+ * Produced at Lawrence Livermore National Laboratory.
+ * Author: Adam Moody <moody20 at llnl.gov>
+ *
+ * Distributed under the BSD-style license (See LICENSE_MVAPICH)
+*/
+
+#ifndef _PMGR_COLLECTIVE_MPIRUN_H
+#define _PMGR_COLLECTIVE_MPIRUN_H
+
+#include "pmgr_collective_common.h"
+
+/*
+  Prototypes of all functions
+*/
+void pmgr_send(void* buf, int size, int rank);
+void pmgr_recv(void* buf, int size, int rank);
+int pmgr_recv_int(int rank);
+void pmgr_scatterbcast(void* buf, int size);
+void pmgr_allgatherbcast(void* buf, int size);
+void pmgr_alltoallbcast(void* buf, int size);
+int set_current(int curr, int new);
+
+/*
+  pmgr_processops
+  This function carries out pmgr_collective operations to bootstrap MPI.
+  These collective operations are modeled after MPI collectives -- all tasks
+  must call them in the same order and with consistent parameters.
+
+  fds - integer array of open sockets (file descriptors)
+        indexed by MPI rank
+  nprocs - number of MPI tasks in job
+
+  returns PMGR_SUCCESS on success
+  If no errors are encountered, all sockets are closed before returning.
+*/
+int pmgr_processops(int* fds, int nprocs);
+
+#endif /* _PMGR_COLLECTIVE_MPIRUN_H */
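pmgr_processops() only covers the mpirun side; each MPI task has to send packets in the "From" layouts listed in the comment in pmgr_collective_mpirun.c. As an illustration, the sketch below serializes three of those layouts into a byte buffer. It assumes that the tasks and mpirun share the same native int representation (mpirun reads raw ints with read()), and all function and macro names are hypothetical; the real client side lives in MVAPICH (pmgr_collective_client), whose API may differ.

```c
#include <stddef.h>
#include <string.h>

/* Opcode values copied from pmgr_collective_common.h. */
#define SKETCH_PMGR_BARRIER   3
#define SKETCH_PMGR_BCAST     4
#define SKETCH_PMGR_ALLGATHER 7

/* Append one native-endian int at offset off; return the new offset. */
static size_t sketch_put_int(char *out, size_t off, int v)
{
    memcpy(out + off, &v, sizeof(v));
    return off + sizeof(v);
}

/* PMGR_BARRIER: <int opcode == 3> */
size_t sketch_encode_barrier(char *out)
{
    return sketch_put_int(out, 0, SKETCH_PMGR_BARRIER);
}

/* PMGR_BCAST: <int opcode == 4, int root, int msg_size[, msg]>;
 * only the root appends the message bytes. */
size_t sketch_encode_bcast(char *out, int my_rank, int root,
                           const void *msg, int msg_size)
{
    size_t off = sketch_put_int(out, 0, SKETCH_PMGR_BCAST);
    off = sketch_put_int(out, off, root);
    off = sketch_put_int(out, off, msg_size);
    if (my_rank == root) {
        memcpy(out + off, msg, (size_t)msg_size);
        off += (size_t)msg_size;
    }
    return off;
}

/* PMGR_ALLGATHER: <int opcode == 7, int msg_size, msg_size bytes of msg> */
size_t sketch_encode_allgather(char *out, const void *msg, int msg_size)
{
    size_t off = sketch_put_int(out, 0, SKETCH_PMGR_ALLGATHER);
    off = sketch_put_int(out, off, msg_size);
    memcpy(out + off, msg, (size_t)msg_size);
    return off + (size_t)msg_size;
}
```

A task would then write the returned number of bytes to its socket (with a retry loop like pmgr_write_fd()) and, for PMGR_ALLGATHER, read back msg_size*N bytes.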

