This is the mail archive of the cluster-cvs@sourceware.org mailing list for the cluster.


Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]
Other format: [Raw text]

RHEL5 - clogd: Additional fixes for bug 460156 and 464550


Gitweb:        http://git.fedorahosted.org/git/cluster.git?p=cluster.git;a=commitdiff;h=e07369b28d7a569e742d80152ef10c9d42bc2650
Commit:        e07369b28d7a569e742d80152ef10c9d42bc2650
Parent:        6714b7fe4d3fb1aca04e2996e9b0763c8be2814e
Author:        Jonathan Brassow <jbrassow@redhat.com>
AuthorDate:    Mon Oct 20 12:27:08 2008 -0500
Committer:     Jonathan Brassow <jbrassow@redhat.com>
CommitterDate: Mon Oct 20 12:27:54 2008 -0500

clogd: Additional fixes for bug 460156 and 464550

- Fixed a checkpoint ordering issue, where a checkpoint could be populated
  with data from the wrong moment in time.
- Fixed problem where resending requests when a server leaves would include
  requests that should have been recieved after the server had gone (so no
  resend should have been necessary).
---
 cmirror-kernel/src/dm-clog.c |    3 +
 cmirror/src/clogd.c          |    2 +-
 cmirror/src/cluster.c        |  561 +++++++++++++++++++++++++++++-------------
 3 files changed, 390 insertions(+), 176 deletions(-)

diff --git a/cmirror-kernel/src/dm-clog.c b/cmirror-kernel/src/dm-clog.c
index f21823e..626f1c8 100644
--- a/cmirror-kernel/src/dm-clog.c
+++ b/cmirror-kernel/src/dm-clog.c
@@ -67,6 +67,9 @@ retry:
 	r = dm_clog_consult_server(uuid, request_type, data,
 				   data_size, rdata, rdata_size);
 
+	if (r)
+		DMERR("cluster_do_request error: %d", r);
+
 	if (r != -ESRCH)
 		return r;
 
diff --git a/cmirror/src/clogd.c b/cmirror/src/clogd.c
index 0fa00ec..99543ba 100644
--- a/cmirror/src/clogd.c
+++ b/cmirror/src/clogd.c
@@ -43,7 +43,7 @@ int main(int argc, char *argv[])
 	/* Parent can now exit, we're ready to handle requests */
 	kill(getppid(), SIGTERM);
 
-	set_priority();
+	/* set_priority(); -- let's try to do w/o this */
 
 	LOG_PRINT("Starting clogd:");
 	LOG_PRINT(" Built: "__DATE__" "__TIME__"\n");
diff --git a/cmirror/src/cluster.c b/cmirror/src/cluster.c
index 16289e9..24c2540 100644
--- a/cmirror/src/cluster.c
+++ b/cmirror/src/cluster.c
@@ -52,13 +52,25 @@
 
 #define DM_CLOG_RESPONSE 0x1000 /* in last byte of 32-bit value */
 #define DM_CLOG_CHECKPOINT_READY 21
-#define DM_CLOG_CONFIG_CHANGE    22
+#define DM_CLOG_MEMBER_JOIN      22
+#define DM_CLOG_MEMBER_LEAVE     23
+
+#define _RQ_TYPE(x) \
+	((x) == DM_CLOG_CHECKPOINT_READY) ? "DM_CLOG_CHECKPOINT_READY": \
+	((x) == DM_CLOG_MEMBER_JOIN) ? "DM_CLOG_MEMBER_JOIN": \
+	((x) == DM_CLOG_MEMBER_LEAVE) ? "DM_CLOG_MEMBER_LEAVE": \
+	RQ_TYPE((x) & ~DM_CLOG_RESPONSE)
+
 
 static uint32_t my_cluster_id = 0xDEAD;
 static SaCkptHandleT ckpt_handle = 0;
 static SaCkptCallbacksT callbacks = { 0, 0 };
 static SaVersionT version = { 'B', 1, 1 };
 
+#define DEBUGGING_HISTORY 20
+static char debugging[DEBUGGING_HISTORY][128];
+static int idx = 0;
+
 static int log_resp_rec = 0;
 
 struct checkpoint_data {
@@ -87,7 +99,10 @@ struct clog_cpg {
 	int cpg_state;  /* FIXME: debugging */
 	int free_me;
 	int resend_requests;
+	int delay;
+	struct queue *delay_queue;
 	struct queue *startup_queue;
+	struct queue *cluster_queue;
 
 	int checkpoints_needed;
 	uint32_t checkpoint_requesters[10];
@@ -126,11 +141,23 @@ int cluster_send(struct clog_tfr *tfr)
 	iov.iov_len = sizeof(struct clog_tfr) + tfr->data_size;
 
 	if (entry->cpg_state != VALID) {
-		LOG_ERROR("[%s] Attempt to send request to cluster while CPG not valid: "
-			  "request = %s", SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type));
+		LOG_ERROR("[%s] Attempt to send %s to cluster while CPG not valid: %s",
+			  SHORT_UUID(tfr->uuid), (tfr->request_type & DM_CLOG_RESPONSE) ?
+			  "response" : "request",
+			  _RQ_TYPE(tfr->request_type));
 		return -EINVAL;
 	}
 
+	if ((tfr->request_type != DM_CLOG_CHECKPOINT_READY) &&
+	    (tfr->request_type != DM_CLOG_MEMBER_LEAVE) &&
+	    (entry->delay || !queue_empty(entry->delay_queue))) {
+		LOG_COND(log_resend_requests, "[%s] Delaying request%s, %s/%u",
+			 SHORT_UUID(tfr->uuid), queue_empty(entry->delay_queue) ? "" : "*",
+			 _RQ_TYPE(tfr->request_type), tfr->seq);
+		queue_add_tail(tfr, entry->delay_queue);
+		return 0;
+	}
+
 	do {
 		r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
 		if ((r != SA_AIS_ERR_TRY_AGAIN) || (count++ > 5))
@@ -158,7 +185,8 @@ static int clog_tfr_cmp(struct clog_tfr *a, struct clog_tfr *b)
 	return r;
 }
 
-static int handle_cluster_request(struct clog_tfr *tfr, int server)
+static int handle_cluster_request(struct clog_cpg *entry,
+				  struct clog_tfr *tfr, int server)
 {
 	int r = 0;
 
@@ -173,8 +201,7 @@ static int handle_cluster_request(struct clog_tfr *tfr, int server)
 	    (tfr->originator == my_cluster_id))
 		r = do_request(tfr, server);
 
-	if (server &&
-	    (tfr->request_type != DM_CLOG_CLEAR_REGION)) {
+	if (server && (tfr->request_type != DM_CLOG_CLEAR_REGION)) {
 		tfr->request_type |= DM_CLOG_RESPONSE;
 
 		/*
@@ -188,7 +215,8 @@ static int handle_cluster_request(struct clog_tfr *tfr, int server)
 	return r;
 }
 
-static int handle_cluster_response_poop(struct clog_tfr *tfr, uint32_t who)
+static int handle_cluster_response_poop(struct clog_cpg *entry,
+					struct clog_tfr *tfr, uint32_t who)
 {
 	int r = 0;
 	struct clog_tfr *orig_tfr;
@@ -200,7 +228,7 @@ static int handle_cluster_response_poop(struct clog_tfr *tfr, uint32_t who)
 		return 0;
 
 	tfr->request_type &= ~DM_CLOG_RESPONSE;
-	orig_tfr = queue_remove_match(cluster_queue, clog_tfr_cmp, tfr);
+	orig_tfr = queue_remove_match(entry->cluster_queue, clog_tfr_cmp, tfr);
 
 	if (!orig_tfr) {
 		struct list_head l, *p, *n;
@@ -208,31 +236,47 @@ static int handle_cluster_response_poop(struct clog_tfr *tfr, uint32_t who)
 
 		/* Unable to find match for response */
 
-		LOG_ERROR("[%s] No match for cluster response: %s:%u",
-			  SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type),
-			  tfr->seq);
+		LOG_ERROR("[%s] No match for cluster response from %u: %s:%u",
+			  SHORT_UUID(tfr->uuid), who,
+			  _RQ_TYPE(tfr->request_type), tfr->seq);
 
 		INIT_LIST_HEAD(&l);
 		queue_remove_all(&l, cluster_queue);
-		LOG_ERROR("Current list:");
+		LOG_ERROR("Current global list:");
 		if (list_empty(&l))
 			LOG_ERROR("   [none]");
+
 		list_for_each_safe(p, n, &l) {
 			list_del_init(p);
 			t = (struct clog_tfr *)p;
 			LOG_ERROR("   [%s]  %s:%u", SHORT_UUID(t->uuid),
-				  RQ_TYPE(t->request_type),
+				  _RQ_TYPE(t->request_type),
 				  t->seq);
 			queue_add(t, cluster_queue);
 		}
 
+		INIT_LIST_HEAD(&l);
+		queue_remove_all(&l, entry->cluster_queue);
+		LOG_ERROR("Current local list:");
+		if (list_empty(&l))
+			LOG_ERROR("   [none]");
+
+		list_for_each_safe(p, n, &l) {
+			list_del_init(p);
+			t = (struct clog_tfr *)p;
+			LOG_ERROR("   [%s]  %s:%u", SHORT_UUID(t->uuid),
+				  _RQ_TYPE(t->request_type),
+				  t->seq);
+			queue_add(t, entry->cluster_queue);
+		}
+
 		return -EINVAL;
 	}
 
 	if (log_resp_rec > 0) {
 		LOG_COND(log_resend_requests,
 			 "[%s] Response received to %s/#%u from %u",
-			 SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type),
+			 SHORT_UUID(tfr->uuid), _RQ_TYPE(tfr->request_type),
 			 tfr->seq, who);
 		log_resp_rec--;
 	}
@@ -273,6 +317,16 @@ static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
 	int r;
 	struct checkpoint_data *new;
 
+	if (entry->state != VALID) {
+		/*
+		 * We can't store bitmaps yet, because the log is not
+		 * valid yet.
+		 */
+		LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet",
+			  cp_requester);
+		return NULL;
+	}
+
 	new = malloc(sizeof(*new));
 	if (!new) {
 		LOG_ERROR("Unable to create checkpoint data for %u",
@@ -283,47 +337,37 @@ static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
 	new->requester = cp_requester;
 	strncpy(new->uuid, entry->name.value, entry->name.length);
 
-	if (entry->state == VALID) {
-		new->bitmap_size = push_state(entry->name.value, "clean_bits",
-					      &new->clean_bits);
-		if (new->bitmap_size <= 0) {
-			LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
-				  new->requester);
-			free(new);
-			return NULL;
-		}
+	new->bitmap_size = push_state(entry->name.value, "clean_bits",
+				      &new->clean_bits);
+	if (new->bitmap_size <= 0) {
+		LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
+			  new->requester);
+		free(new);
+		return NULL;
+	}
 
-		new->bitmap_size = push_state(entry->name.value,
-					      "sync_bits", &new->sync_bits);
-		if (new->bitmap_size <= 0) {
-			LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
-				  new->requester);
-			free(new->clean_bits);
-			free(new);
-			return NULL;
-		}
+	new->bitmap_size = push_state(entry->name.value,
+				      "sync_bits", &new->sync_bits);
+	if (new->bitmap_size <= 0) {
+		LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
+			  new->requester);
+		free(new->clean_bits);
+		free(new);
+		return NULL;
+	}
 
-		r = push_state(entry->name.value, "recovering_region", &new->recovering_region);
-		if (r <= 0) {
-			LOG_ERROR("Failed to store recovering_region to checkpoint for node %u",
-				  new->requester);
-			free(new->sync_bits);
-			free(new->clean_bits);
-			free(new);
-			return NULL;
-		}
-		LOG_DBG("[%s] Checkpoint prepared for node %u:",
-			SHORT_UUID(new->uuid), new->requester);
-		LOG_DBG("  bitmap_size = %d", new->bitmap_size);
-	} else {
-		/*
-		 * We can't store bitmaps yet, because the log is not
-		 * valid yet.
-		 */
-		LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet",
+	r = push_state(entry->name.value, "recovering_region", &new->recovering_region);
+	if (r <= 0) {
+		LOG_ERROR("Failed to store recovering_region to checkpoint for node %u",
 			  new->requester);
-		new->bitmap_size = 0;
+		free(new->sync_bits);
+		free(new->clean_bits);
+		free(new);
+		return NULL;
 	}
+	LOG_DBG("[%s] Checkpoint prepared for node %u:",
+		SHORT_UUID(new->uuid), new->requester);
+	LOG_DBG("  bitmap_size = %d", new->bitmap_size);
 
 	return new;
 }
@@ -703,76 +747,88 @@ static void do_checkpoints(struct clog_cpg *entry)
 static int resend_requests(struct clog_cpg *entry)
 {
 	int r = 0;
-	struct list_head l, *p, *n;
+	struct list_head resend, delay, *p, *n;
 	struct clog_tfr *tfr;
 
-	if (!entry->resend_requests)
+	if (entry->delay)
 		return 0;
 
-	if (entry->resend_requests == 1) {
-		LOG_COND(log_resend_requests,
-			 "[%s]  Giving another chance before resending requests",
-			 SHORT_UUID(entry->name.value));
-		sleep(1);
-		entry->resend_requests++;
-		return 1;
-	}
+	INIT_LIST_HEAD(&resend);
+	INIT_LIST_HEAD(&delay);
+	queue_remove_all(&delay, entry->delay_queue);
 
-	entry->resend_requests = 0;
+	if (entry->resend_requests) {
+		entry->resend_requests = 0;
 
-	log_resp_rec=2;
-	INIT_LIST_HEAD(&l);
-	queue_remove_all(&l, cluster_queue);
+		log_resp_rec = 0;
+		queue_remove_all(&resend, entry->cluster_queue);
 
-	list_for_each_safe(p, n, &l) {
-		list_del_init(p);
-		tfr = (struct clog_tfr *)p;
+		list_for_each_safe(p, n, &resend) {
+			list_del_init(p);
+			tfr = (struct clog_tfr *)p;
 
-		if (!strcmp(entry->name.value, tfr->uuid)) {
-			switch (tfr->request_type) {
-			case DM_CLOG_POSTSUSPEND:
-				/*
-				 * Don't resend DM_CLOG_POSTSUSPEND request, it will
-				 * be handled when we get our own config leave
-				 */
-				queue_add(tfr, cluster_queue);
-				break;
-			case DM_CLOG_RESUME:
-				/* We are only concerned about this request locally */
-			case DM_CLOG_SET_REGION_SYNC:
-				/*
-				 * Some requests simply do not need to be resent.
-				 * If it is a request that just changes log state,
-				 * then it doesn't need to be resent (everyone makes
-				 * updates).
-				 */
-				LOG_COND(log_resend_requests,
-					 "[%s] Skipping resend of %s...",
-					  SHORT_UUID(entry->name.value),
-					  RQ_TYPE(tfr->request_type));
-				tfr->data_size = 0;
-				kernel_send(tfr);
+			if (!strcmp(entry->name.value, tfr->uuid)) {
+				switch (tfr->request_type) {
+				case DM_CLOG_POSTSUSPEND:
+					/*
+					 * Don't resend DM_CLOG_POSTSUSPEND request, it will
+					 * be handled when we get our own config leave
+					 */
+					queue_add_tail(tfr, cluster_queue);
+					break;
+				case DM_CLOG_RESUME:
+					/* We are only concerned about this request locally */
+				case DM_CLOG_SET_REGION_SYNC:
+					/*
+					 * Some requests simply do not need to be resent.
+					 * If it is a request that just changes log state,
+					 * then it doesn't need to be resent (everyone makes
+					 * updates).
+					 */
+					LOG_COND(log_resend_requests,
+						 "[%s] Skipping resend of %s/#%u...",
+						 SHORT_UUID(entry->name.value),
+						 _RQ_TYPE(tfr->request_type), tfr->seq);
+					tfr->data_size = 0;
+					kernel_send(tfr);
 				
-				break;
-			default:
-				/*
-				 * If an action or a response is required, then
-				 * the request must be resent.
-				 */
-				r = 1;
-				log_resp_rec++;
-				LOG_COND(log_resend_requests,
-					 "[%s] Resending %s(#%u) due to new server(%u)",
-					  SHORT_UUID(entry->name.value),
-					  RQ_TYPE(tfr->request_type),
-					  tfr->seq, entry->lowest_id);
-				queue_add(tfr, cluster_queue);
-				if (cluster_send(tfr))
-					LOG_ERROR("Failed resend");
+					break;
+				default:
+					/*
+					 * If an action or a response is required, then
+					 * the request must be resent.
+					 */
+					r = 1;
+					log_resp_rec++;
+					LOG_COND(log_resend_requests,
+						 "[%s] Resending %s(#%u) due to new server(%u)",
+						 SHORT_UUID(entry->name.value),
+						 _RQ_TYPE(tfr->request_type),
+						 tfr->seq, entry->lowest_id);
+					queue_add_tail(tfr, cluster_queue);
+					if (cluster_send(tfr))
+						LOG_ERROR("Failed resend");
+				}
 			}
 		}
 	}
 
+	list_for_each_safe(p, n, &delay) {
+		list_del_init(p);
+		tfr = (struct clog_tfr *)p;
+
+		LOG_COND(log_resend_requests,
+			 "[%s] Sending delayed request, %s/#%u",
+			 SHORT_UUID(entry->name.value),
+			 _RQ_TYPE(tfr->request_type),
+			 tfr->seq);
+		queue_add_tail(tfr, cluster_queue);
+
+		if ((tfr->request_type != DM_CLOG_POSTSUSPEND) &&
+		    cluster_send(tfr))
+			LOG_ERROR("Failed resend");
+	}
+
 	return r;
 }
 
@@ -790,6 +846,9 @@ static int do_cluster_work(void *data)
 				LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r));
 
 			if (entry->free_me) {
+				free(entry->cluster_queue);
+				free(entry->delay_queue);
+				free(entry->startup_queue);
 				free(entry);
 				continue;
 			}
@@ -821,17 +880,17 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
 	if (tfr->request_type & DM_CLOG_RESPONSE)
 		LOG_DBG("Response to %u from %u received [%s/%u]",
 			tfr->originator, nodeid,
-			RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
+			_RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
 			tfr->seq);
 	else
 		LOG_DBG("Request from %u received [%s/%u]",
-			nodeid, RQ_TYPE(tfr->request_type),
+			nodeid, _RQ_TYPE(tfr->request_type),
 			tfr->seq);
 
 	if (my_cluster_id == 0xDEAD) {
 		LOG_ERROR("[%s]  Message from %u before init [%s/%u]",
 			  SHORT_UUID(tfr->uuid), nodeid,
-			  RQ_TYPE(tfr->request_type), tfr->seq);
+			  _RQ_TYPE(tfr->request_type), tfr->seq);
 		return;
 	}
 
@@ -844,10 +903,61 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
 	if (match->lowest_id == 0xDEAD) {
 		LOG_ERROR("[%s]  Message from %u before init* [%s/%u]",
 			  SHORT_UUID(tfr->uuid), nodeid,
-			  RQ_TYPE(tfr->request_type), tfr->seq);
+			  _RQ_TYPE(tfr->request_type), tfr->seq);
 		return;
 	}
 
+	if (nodeid == my_cluster_id) {
+		struct clog_tfr *orig_tfr;
+
+		orig_tfr = queue_remove_match(cluster_queue, clog_tfr_cmp, tfr);
+		if (orig_tfr)
+			queue_add_tail(orig_tfr, match->cluster_queue);
+	}
+
+	if (tfr->request_type == DM_CLOG_MEMBER_LEAVE) {
+		/*
+		 * If the server (lowest_id) indicates it is leaving,
+		 * then we must resend any outstanding requests.  However,
+		 * we do not want to resend them if the next server in
+		 * line is in the process of leaving.
+		 */
+		if (nodeid == my_cluster_id)
+			LOG_COND(log_resend_requests, "[%s] I am leaving (2).....",
+				 SHORT_UUID(tfr->uuid));
+
+		if ((nodeid == match->lowest_id) &&
+		    (nodeid != my_cluster_id)) {
+			struct list_head l, *p, *n;
+			struct clog_tfr *t;
+
+			log_resp_rec++;
+
+			match->resend_requests = 1;
+			LOG_COND(log_resend_requests, "[%s] %u is leaving, resend required%s",
+				 SHORT_UUID(tfr->uuid), nodeid,
+				 (queue_empty(match->cluster_queue)) ? " -- cluster_queue empty": "");
+			
+			INIT_LIST_HEAD(&l);
+			queue_remove_all(&l, match->cluster_queue);
+			list_for_each_safe(p, n, &l) {
+				list_del_init(p);
+				t = (struct clog_tfr *)p;
+
+				LOG_COND(log_resend_requests,
+					 "[%s]                %s/%u",
+					 SHORT_UUID(t->uuid), _RQ_TYPE(t->request_type), t->seq);
+				queue_add_tail(t, match->cluster_queue);
+			}
+		}
+		if (nodeid < my_cluster_id) {
+			match->delay++;
+			LOG_COND(log_resend_requests, "[%s] %u is leaving, delay = %d",
+				 SHORT_UUID(tfr->uuid), nodeid, match->delay);
+		}
+		goto out;
+	}
+
 	/*
 	 * We can receive messages after we do a cpg_leave but before we
 	 * get our config callback.  However, since we can't respond after
@@ -872,7 +982,7 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
 				match->state = VALID;
 
 				while ((startup_tfr = queue_remove(match->startup_queue))) {
-					if (startup_tfr->request_type == DM_CLOG_CONFIG_CHANGE) {
+					if (startup_tfr->request_type == DM_CLOG_MEMBER_JOIN) {
 						struct checkpoint_data *new;
 
 						new = prepare_checkpoint(match, startup_tfr->originator);
@@ -882,13 +992,15 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
 								  startup_tfr->originator);
 							goto out;
 						}
+						LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u",
+							 SHORT_UUID(tfr->uuid), startup_tfr->originator);
 					} else {
 						LOG_DBG("Processing delayed request %d: %s",
 							match->startup_queue->count,
-							RQ_TYPE(startup_tfr->request_type));
+							_RQ_TYPE(startup_tfr->request_type));
 						i_was_server = (startup_tfr->error == my_cluster_id) ? 1 : 0;
 						startup_tfr->error = 0;
-						r = handle_cluster_request(startup_tfr, i_was_server);
+						r = handle_cluster_request(match, startup_tfr, i_was_server);
 
 						if (r) {
 							LOG_ERROR("Error while processing delayed CPG message");
@@ -909,36 +1021,34 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
 	/*
 	 * If the log is now valid, we can queue the checkpoints
 	 */
-	for (i = match->checkpoints_needed; i;) {
+	for (i = match->checkpoints_needed; i; ) {
 		struct checkpoint_data *new;
 
 		i--;
-		if (log_get_state(tfr) != LOG_RESUMED) {
-			LOG_DBG("Skipping checkpoint for %u, my log is not ready",
-				match->checkpoint_requesters[i]);
-			continue;
-		}
 		new = prepare_checkpoint(match, match->checkpoint_requesters[i]);
 		if (!new) {
 			/* FIXME: Need better error handling */
-			LOG_ERROR("Failed to prepare checkpoint for %u!!!",
-				  match->checkpoint_requesters[i]);
+			LOG_ERROR("[%s] Failed to prepare checkpoint for %u!!!",
+				  SHORT_UUID(tfr->uuid), match->checkpoint_requesters[i]);
 			break;
 		}
-		match->checkpoints_needed = i;
+		LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u*",
+			 SHORT_UUID(tfr->uuid), match->checkpoint_requesters[i]);
+		match->checkpoints_needed--;
+
 		new->next = match->checkpoint_list;
 		match->checkpoint_list = new;
-	}
+	}		
 
 	if (tfr->request_type & DM_CLOG_RESPONSE) {
 		response = 1;
-		r = handle_cluster_response_poop(tfr, nodeid);
+		r = handle_cluster_response_poop(match, tfr, nodeid);
 	} else {
 		tfr->originator = nodeid;
 
 		if (match->state == LEAVING) {
 			LOG_ERROR("[%s]  Ignoring %s from %u.  Reason: I'm leaving",
-				  SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type),
+				  SHORT_UUID(tfr->uuid), _RQ_TYPE(tfr->request_type),
 				  tfr->originator);
 			goto out;
 		}
@@ -964,14 +1074,14 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
 			goto out;
 		}
 
-		r = handle_cluster_request(tfr, i_am_server);
+		r = handle_cluster_request(match, tfr, i_am_server);
 	}
 
 out:
 	if (r) {
 		LOG_ERROR("[%s] Error while processing CPG message, %s: %s",
 			  SHORT_UUID(tfr->uuid),
-			  RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
+			  _RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
 			  strerror(-r));
 		LOG_ERROR("[%s]    Response  : %s", SHORT_UUID(tfr->uuid),
 			  (response) ? "YES" : "NO");
@@ -980,6 +1090,28 @@ out:
 		if (response)
 			LOG_ERROR("[%s]    Responder : %u",
 				  SHORT_UUID(tfr->uuid), nodeid);
+
+               LOG_ERROR("HISTORY::");
+               for (i = 0; i < DEBUGGING_HISTORY; i++) {
+                       idx++;
+                       idx = idx % DEBUGGING_HISTORY;
+                       if (debugging[idx][0] == '\0')
+                               continue;
+                       LOG_ERROR("%d:%d) %s", i, idx, debugging[idx]);
+               }
+       } else if (!(tfr->request_type & DM_CLOG_RESPONSE) ||
+                  (tfr->originator == my_cluster_id)) {
+               int len;
+               idx++;
+               idx = idx % DEBUGGING_HISTORY;
+               len = sprintf(debugging[idx],
+                             "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s",
+                             tfr->seq,
+                             SHORT_UUID(tfr->uuid),
+                             _RQ_TYPE(tfr->request_type),
+                             tfr->originator, (response) ? "YES" : "NO");
+               if (response)
+                       sprintf(debugging[idx] + len, ", RSPR=%u", nodeid);
 	}
 }
 
@@ -993,6 +1125,13 @@ static void cpg_join_callback(struct clog_cpg *match,
 	uint32_t lowest = match->lowest_id;
 	struct clog_tfr *tfr;
 
+	{
+               idx++;
+               idx = idx % DEBUGGING_HISTORY;
+               sprintf(debugging[idx], "+++  UUID=%s  %u join  +++",
+		       SHORT_UUID(match->name.value), joined->nodeid);
+	}
+
 	/* Assign my_cluster_id */
 	if ((my_cluster_id == 0xDEAD) && (joined->pid == my_pid))
 		my_cluster_id = joined->nodeid;
@@ -1007,13 +1146,14 @@ static void cpg_join_callback(struct clog_cpg *match,
 	if (joined->nodeid == my_cluster_id)
 		goto out;
 
-	LOG_DBG("Joining node, %u needs checkpoint", joined->nodeid);
+	LOG_COND(log_checkpoint, "[%s] Joining node, %u needs checkpoint",
+		 SHORT_UUID(match->name.value), joined->nodeid);
 
 	/*
 	 * FIXME: remove checkpoint_requesters/checkpoints_needed, and use
 	 * the startup_queue interface exclusively
 	 */
-	if (queue_empty(match->startup_queue)) {
+	if (queue_empty(match->startup_queue) && (match->state == VALID)) {
 		match->checkpoint_requesters[match->checkpoints_needed++] = joined->nodeid;
 		goto out;
 	}
@@ -1031,11 +1171,10 @@ static void cpg_join_callback(struct clog_cpg *match,
 			goto out;
 		}
 	}
-	tfr->request_type = DM_CLOG_CONFIG_CHANGE;
+	tfr->request_type = DM_CLOG_MEMBER_JOIN;
 	tfr->originator   = joined->nodeid;
 	queue_add_tail(tfr, match->startup_queue);
 
-
 out:
 	/* Find the lowest_id, i.e. the server */
 	match->lowest_id = member_list[0].nodeid;
@@ -1068,7 +1207,12 @@ static void cpg_leave_callback(struct clog_cpg *match,
 	uint32_t lowest = match->lowest_id;
 	struct clog_tfr *tfr;
 
-	INIT_LIST_HEAD(&l);
+	{
+               idx++;
+               idx = idx % DEBUGGING_HISTORY;
+               sprintf(debugging[idx], "---  UUID=%s  %u left  ---",
+		       SHORT_UUID(match->name.value), left->nodeid);
+	}
 
 	/* Am I leaving? */
 	if (my_cluster_id == left->nodeid) {
@@ -1080,8 +1224,8 @@ static void cpg_leave_callback(struct clog_cpg *match,
 
 		cluster_postsuspend(match->name.value);
 
+		INIT_LIST_HEAD(&l);
 		queue_remove_all(&l, cluster_queue);
-
 		list_for_each_safe(p, n, &l) {
 			list_del_init(p);
 			tfr = (struct clog_tfr *)p;
@@ -1098,6 +1242,18 @@ static void cpg_leave_callback(struct clog_cpg *match,
 				queue_add(tfr, free_queue);
 		}
 
+		INIT_LIST_HEAD(&l);
+		queue_remove_all(&l, match->cluster_queue);
+		list_for_each_safe(p, n, &l) {
+			list_del_init(p);
+			tfr = (struct clog_tfr *)p;
+
+			if (tfr->request_type == DM_CLOG_POSTSUSPEND)
+				kernel_send(tfr);
+			else
+				queue_add(tfr, free_queue);
+		}
+
 		cpg_finalize(match->handle);
 
 		if (match->startup_queue->count) {
@@ -1109,12 +1265,20 @@ static void cpg_leave_callback(struct clog_cpg *match,
 			}
 		}
 
-		free(match->startup_queue);
 		match->free_me = 1;
 		match->lowest_id = 0xDEAD;
 		match->state = INVALID;
 	}			
 
+	if (left->nodeid < my_cluster_id) {
+		match->delay = (match->delay > 0) ? match->delay - 1 : 0;
+		if (!match->delay && queue_empty(match->cluster_queue))
+			match->resend_requests = 0;
+		LOG_COND(log_resend_requests, "[%s] %u has left, delay = %d%s",
+			 SHORT_UUID(match->name.value), left->nodeid,
+			 match->delay, (queue_empty(match->cluster_queue)) ? " -- cluster_queue empty": "");
+	}
+
 	/* Find the lowest_id, i.e. the server */
 	if (!member_list_entries) {
 		match->lowest_id = 0xDEAD;
@@ -1134,23 +1298,6 @@ static void cpg_leave_callback(struct clog_cpg *match,
 		LOG_COND(log_membership_change, "[%s]  Server change %u -> %u (%u left)",
 			 SHORT_UUID(match->name.value), lowest,
 			 match->lowest_id, left->nodeid);
-
-		list_for_each_safe(p, n, &cluster_queue->list) {
-			tfr = (struct clog_tfr *)p;
-
-			if (!strcmp(match->name.value, tfr->uuid)) {
-				switch (tfr->request_type) {
-				case DM_CLOG_POSTSUSPEND:
-					/*
-					 * Don't resend DM_CLOG_POSTSUSPEND request, it will
-					 * be handled when we get our own config leave
-					 */
-					break;
-				default:
-					match->resend_requests = 1;
-				}
-			}
-		}
 	} else
 		LOG_COND(log_membership_change, "[%s]  Server unchanged at %u (%u left)",
 			 SHORT_UUID(match->name.value), lowest, left->nodeid);
@@ -1200,7 +1347,7 @@ int create_cluster_cpg(char *str)
 {
 	int r;
 	int size;
-	struct clog_cpg *new;
+	struct clog_cpg *new = NULL;
 	struct clog_cpg *tmp, *tmp2;
 
 	list_for_each_entry_safe(tmp, tmp2, &clog_cpg_list, list)
@@ -1220,13 +1367,30 @@ int create_cluster_cpg(char *str)
 
 	new->startup_queue = malloc(sizeof(struct queue));
 	if (!new->startup_queue) {
-		free(new);
 		LOG_ERROR("Unable to allocate memory for clog_cpg");
-		return -ENOMEM;
+		r = -ENOMEM;
+		goto fail;
 	}
 	INIT_LIST_HEAD(&(new->startup_queue->list));
 	new->startup_queue->count = 0;
 
+	new->delay_queue = malloc(sizeof(struct queue));
+	if (!new->delay_queue) {
+		LOG_ERROR("Unable to allocate memory for clog_cpg");
+		r = -ENOMEM;
+		goto fail;
+	}
+	INIT_LIST_HEAD(&(new->delay_queue->list));
+	new->delay_queue->count = 0;
+
+	new->cluster_queue = malloc(sizeof(struct queue));
+	if (!new->cluster_queue) {
+		LOG_ERROR("Unable to allocate memory for clog_cpg");
+		r = -ENOMEM;
+		goto fail;
+	}
+	INIT_LIST_HEAD(&(new->cluster_queue->list));
+	new->cluster_queue->count = 0;
 
 	size = ((strlen(str) + 1) > CPG_MAX_NAME_LENGTH) ?
 		CPG_MAX_NAME_LENGTH : (strlen(str) + 1);
@@ -1236,13 +1400,15 @@ int create_cluster_cpg(char *str)
 	r = cpg_initialize(&new->handle, &cpg_callbacks);
 	if (r != SA_AIS_OK) {
 		LOG_ERROR("cpg_initialize failed:  Cannot join cluster");
-		return -EPERM;
+		r = -EPERM;
+		goto fail;
 	}
 
 	r = cpg_join(new->handle, &new->name);
 	if (r != SA_AIS_OK) {
 		LOG_ERROR("cpg_join failed:  Cannot join cluster");
-		return -EPERM;
+		r = -EPERM;
+		goto fail;
 	}
 
 	new->cpg_state = VALID;
@@ -1255,6 +1421,18 @@ int create_cluster_cpg(char *str)
 	links_register(r, "cluster", do_cluster_work, NULL);
 
 	return 0;
+
+fail:
+	if (new) {
+		if (new->startup_queue)
+			free(new->startup_queue);
+		if (new->delay_queue)
+			free(new->delay_queue);
+		if (new->cluster_queue)
+			free(new->cluster_queue);
+		free(new);
+	}
+	return r;
 }
 
 static void abort_startup(struct clog_cpg *del)
@@ -1263,16 +1441,22 @@ static void abort_startup(struct clog_cpg *del)
 	SaNameT name;
 	SaAisErrorT rv;
 	SaCkptCheckpointHandleT h;
-	struct clog_tfr *startup_tfr = NULL;
+	struct clog_tfr *tfr = NULL;
 
 	LOG_DBG("[%s]  CPG teardown before checkpoint received",
 		SHORT_UUID(del->name.value));
 
-	while ((startup_tfr = queue_remove(del->startup_queue))) {
+	while ((tfr = queue_remove(del->startup_queue))) {
 		LOG_DBG("[%s]  Ignoring request from %u: %s",
-			SHORT_UUID(del->name.value), startup_tfr->originator,
-			RQ_TYPE(startup_tfr->request_type));
-		queue_add(startup_tfr, free_queue);
+			SHORT_UUID(del->name.value), tfr->originator,
+			_RQ_TYPE(tfr->request_type));
+		queue_add(tfr, free_queue);
+	}
+
+	while ((tfr = queue_remove(del->delay_queue))) {
+		LOG_DBG("[%s]  Ignoring delayed request: %s",
+			SHORT_UUID(del->name.value), _RQ_TYPE(tfr->request_type));
+		queue_add(tfr, free_queue);
 	}
 
 	len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u",
@@ -1309,23 +1493,44 @@ unlink_retry:
 	saCkptCheckpointClose(h);
 }
 
-int destroy_cluster_cpg(char *str)
+static int _destroy_cluster_cpg(struct clog_cpg *del)
 {
 	int r;
+	struct clog_tfr *tfr;
+	
+	LOG_COND(log_resend_requests, "[%s] I am leaving (1).....",
+		 SHORT_UUID(del->name.value));
+
+	tfr = queue_remove(free_queue);
+	if (tfr) {
+		tfr->request_type = DM_CLOG_MEMBER_LEAVE;
+		tfr->originator = my_cluster_id;
+		tfr->seq = 0;
+		strncpy(tfr->uuid, del->name.value, CPG_MAX_NAME_LENGTH);
+		cluster_send(tfr);
+		queue_add(tfr, free_queue);
+	}
+
+	del->cpg_state = INVALID;
+	del->state = LEAVING;
+
+	if (!queue_empty(del->startup_queue) ||
+	    !queue_empty(del->delay_queue))
+		abort_startup(del);
+
+	r = cpg_leave(del->handle, &del->name);
+	if (r != CPG_OK)
+		LOG_ERROR("Error leaving CPG!");
+	return 0;
+}
+
+int destroy_cluster_cpg(char *str)
+{
 	struct clog_cpg *del, *tmp;
 
 	list_for_each_entry_safe(del, tmp, &clog_cpg_list, list)
-		if (!strncmp(del->name.value, str, CPG_MAX_NAME_LENGTH)) {
-			del->cpg_state = INVALID;
-			del->state = LEAVING;
-			if (!queue_empty(del->startup_queue))
-				abort_startup(del);
-
-			r = cpg_leave(del->handle, &del->name);
-			if (r != CPG_OK)
-				LOG_ERROR("Error leaving CPG!");
-			break;
-		}
+		if (!strncmp(del->name.value, str, CPG_MAX_NAME_LENGTH))
+			_destroy_cluster_cpg(del);
 
 	return 0;
 }
@@ -1334,6 +1539,12 @@ int init_cluster(void)
 {
 	SaAisErrorT rv;
 
+	{
+		int i;
+		for (i = 0; i < DEBUGGING_HISTORY; i++)
+			debugging[i][0] = '\0';
+	}
+
 	INIT_LIST_HEAD(&clog_cpg_list);
 	rv = saCkptInitialize(&ckpt_handle, &callbacks, &version);
 


Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]