Patch for MVAPICH-1.0beta

Frank Mietke frank.mietke at informatik.tu-chemnitz.de
Tue Feb 5 08:30:43 EST 2008


Hi Pete,

>> Can you look this over and test?  The only substantive change I made
>> from your version (barring missed pieces) was to change the way you
>> managed the poll set to keep the sockets open for the second phase.
>> I made it more apparent that this was a hack, and put it up in the
>> phase transition code area, and handled poll() users too.  Still not
>> so pretty, but I'm impressed you were able to accommodate this new
>> style in the existing code!

>Okay, I compiled mpiexec twice, first with --disable-poll and then with
>--enable-poll. Both builds passed the runtest.pl script, so it seems to be
>working correctly. ;)

I've found a small mistake in your additions for poll() users. Strangely, the
runtest.pl script did not report any error; maybe that is due to some path
mangling on my side here.

In your for-loop you did:
pfs[num_waiting_to_read].fd = fds[i];
...
but the value of num_waiting_to_read does not change inside this loop, so every iteration writes to the same pfs[] entry.

It should be:
pfs[i].fd = fds[i];
...

The latest diff against my SVN checkout is attached.

Best Regards,
Frank


-- 
Dipl.-Inf. Frank Mietke     |     Fakultätsrechen- und Informationszentrum
Tel.: 0371 - 531 - 35538    |     Fak. für Informatik
Fax:  0371 - 531 8 35538    |     TU-Chemnitz
Key-ID: 60F59599            |     frank.mietke at informatik.tu-chemnitz.de
-------------- next part --------------
--- ib.c_old	2007-12-17 13:26:27.000000000 +0100
+++ ib.c	2008-02-05 10:04:21.000000000 +0100
@@ -30,7 +30,10 @@
 static int address_size = 0;
 static char *pids = 0;
 static int pids_size = 0;
-static int phase = 0;  /* for two-phase version 5 */
+static int phase = 0;  /* for two-phase versions 5 and 6 */
+static int *hca_type = NULL;
+static int hca_first_rank = -1;  /* to determine if homogeneous or not */
+static int is_homogeneous = 1;   /* until proven otherwise */
 
 /* state of all the sockets */
 static int num_waiting_to_accept;  /* first accept all numtasks */
@@ -169,6 +172,23 @@
  *     pids[]    # <pidlen> bytes
  *   Write back personalized out_addrs[] and full pids[].
  *
+ * Version 6:
+ *   Two-phase like version 5, with the major change that the socket to
+ *   each task stays open across phases, plus some data tweaks.
+ *  First phase distributes hostids and hca_types:
+ *    version    # 6
+ *    rank       # 0..np-1
+ *    hostidlen  # 4 bytes
+ *    hostid     # <hostidlen> bytes
+ *    hca_type   # 4 bytes
+ *  Write back is_homogeneous (4 bytes)  and the entire hostid[] array. 
+ *  Keep the fds open, go to phase 2 and gather:
+ *    addrlen    # 4 bytes, could be 0
+ *    addrs[]    # <addrlen> bytes
+ *    pidlen     # 4 bytes
+ *    pids[]     # <pidlen> bytes
+ *  Write back personalized out_addrs[] and full pids[].
+ *
  * Return negative on error, or new rank number for success.
  */
 static int read_ib_one(int fd)
@@ -178,15 +198,28 @@
     int j, ret = -1;
     pid_t pidlen;
 
-    if (version == 5 && phase == 1) {
+    if ((version == 5 || version == 6) && phase == 1) {
 	/* no version again on second phase */
 	testvers = version;
     } else {
 	if (read_full_ret(fd, &testvers, sizeof(int)) != sizeof(int))
 	    goto out;
     }
-    if (read_full_ret(fd, &rank, sizeof(int)) != sizeof(int))
-	goto out;
+
+    if (version == 6 && phase == 1) {
+	/* since socket stays open, tasks do not send rank, so figure it out
+	 * from the fd the slow way.  This can not fail.
+	 */
+	for (j = 0; j < numtasks; j++)
+	    if (fds[j] == fd) {
+		rank = j;
+		break;
+	    }
+    } else{
+	if (read_full_ret(fd, &rank, sizeof(int)) != sizeof(int))
+	    goto out;
+    }
+
     if (read_full_ret(fd, &addrlen, sizeof(int)) != sizeof(int))
 	goto out;
 
@@ -218,11 +251,12 @@
 
     if (version == -1) {
 	version = testvers;
-	if (!(version == 1 || version == 2 || version == 3 || version == 5)) {
+	if (!(version == 1 || version == 2 || version == 3 || version == 5 ||
+	      version == 6)) {
 	    warning(
 	      "%s: protocol version %d not known, but might still work",
 	      __func__, version);
-	    version = 5;  /* guess the latest still works */
+	    version = 6;  /* guess the latest still works */
 	}
 	debug(1, "%s: version %d startup%s", __func__, version,
 	  non_versioned_092 ? " (unversioned)" : "");
@@ -249,6 +283,10 @@
 	          __func__, rank, addrlen, address_size);
     }
 
+    /* New to protocol version 6 of MVAPICH-1.0 */
+    if (version == 6 && hca_type == NULL)
+	    hca_type = malloc(numtasks * sizeof(int));
+
     if (non_versioned_092) {
 	/* push back the bit we accidentally read in guessing the version */
 	for (j=0; j<4; j++)
@@ -262,7 +300,16 @@
 	    goto out;
     }
 
-    if (version == 3 || (version == 5 && phase == 1)) {
+    if (version == 6 && phase == 0) {
+	if (read_full_ret(fd, &hca_type[rank], sizeof(int)) != sizeof(int))
+	    goto out;
+	if (hca_first_rank == -1)
+	    hca_first_rank = rank;
+	if (hca_type[hca_first_rank] != hca_type[rank])
+	    is_homogeneous = 0;
+    }
+
+    if (version == 3 || ((version == 5 || version == 6) && phase == 1)) {
 	read_full(fd, &pidlen, sizeof(pidlen));
 	if (!pids) {
 	    pids_size = pidlen;
@@ -366,23 +413,45 @@
 	    }
 	    free(pids);
 	}
-    } else if (version == 5) {
+    } else if (version == 5 || version == 6) {
 	if (phase == 0) {
 	    /* These are actually the hostids, in mvapich parlance.  Next
 	     * phase will be the personalized addresses. */
 	    for (i=0; i<numtasks; i++) {
+		if (version == 6)
+		    if (write_full(fds[i], &is_homogeneous, sizeof(int)) < 0)
+			error_errno("%s: write homogeneous flag to rank %d",
+				    __func__, i);
 		if (write_full(fds[i], address, numtasks * address_size) < 0)
 		    error_errno("%s: write addresses to rank %d", __func__, i);
 	    }
 	    phase = 1;
-	    for (i=0; i<numtasks; i++) {
-		close(fds[i]);
-		fds[i] = -1;
+	    if (version == 5) {
+		for (i=0; i<numtasks; i++) {
+		    close(fds[i]);
+		    fds[i] = -1;
+		}
+		num_waiting_to_accept = numtasks;  /* will reconnect */
+	    } else {
+		/* Hackity hack.  Put these back on the ready-to-read list. */
+		for (i=0; i<numtasks; i++) {
+#ifdef HAVE_POLL
+		    pfs[i].fd = fds[i];
+		    pfs[i].events = POLLIN;
+		    pfs[i].revents = 0;
+#else
+		    FD_SET(fds[i], &rfs);
+#endif
+		}
+		num_waiting_to_read = numtasks;  /* socks stay open */
 	    }
 	    address_size = 0;
 	    free(address);
 	    address = NULL;
-	    num_waiting_to_accept = numtasks;
+	    if (version == 6) {
+		free(hca_type);
+		hca_type = NULL;
+	    }
 	    goto next_phase;
 	} else if (phase == 1) {
 	    /*
@@ -626,6 +695,9 @@
 		ret = rank;
 		goto out;  /* let obit poll catch it later */
 	    }
+		
+		if (version == 6 && phase == 1) /* fds not cleaned during phase transition */
+			continue;
 
 	    /* rank checked in already? */
 	    if (fds[rank] != -1)


More information about the mpiexec mailing list