This is the mail archive of the cluster-cvs@sourceware.org mailing list for the Cluster project.


Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]
Other format: [Raw text]

RHEL5 - clogd: Updates to address bugs 464550 & 464934


Gitweb:        http://git.fedorahosted.org/git/cluster.git?p=cluster.git;a=commitdiff;h=33ac113b87bc000cb801de2a9f6f2bfe2f20cf22
Commit:        33ac113b87bc000cb801de2a9f6f2bfe2f20cf22
Parent:        adae80e90795c462f15ba1c09d7968afd00a7746
Author:        Jonathan Brassow <jbrassow@redhat.com>
AuthorDate:    Mon Oct 13 17:23:35 2008 -0500
Committer:     Jonathan Brassow <jbrassow@redhat.com>
CommitterDate: Mon Oct 13 17:23:35 2008 -0500

clogd: Updates to address bugs 464550 & 464934

Allow time for outstanding responses from the exiting server
to settle out before resending requests.

Various debugging enhancements are included as well, such as
per-category conditional logging (MEMB, CKPT, RESEND build options).
---
 cmirror-kernel/src/dm-clog-tfr.c |    9 --
 cmirror/src/Makefile             |   16 +++-
 cmirror/src/cluster.c            |  191 ++++++++++++++++++++++++++++----------
 cmirror/src/functions.c          |    2 +-
 cmirror/src/link_mon.c           |    9 +--
 cmirror/src/local.c              |   11 +--
 cmirror/src/logging.c            |   21 ++++
 cmirror/src/logging.h            |   25 ++----
 8 files changed, 184 insertions(+), 100 deletions(-)

diff --git a/cmirror-kernel/src/dm-clog-tfr.c b/cmirror-kernel/src/dm-clog-tfr.c
index 15d971c..3ceb320 100644
--- a/cmirror-kernel/src/dm-clog-tfr.c
+++ b/cmirror-kernel/src/dm-clog-tfr.c
@@ -32,7 +32,6 @@ struct receiving_pkg {
 	struct list_head list;
 	struct completion complete;
 
-	unsigned long long start_time;
 	uint32_t seq;
 
 	int error;
@@ -203,7 +202,6 @@ resend:
 	spin_lock(&receiving_list_lock);
 	list_add(&(pkg.list), &receiving_list);
 	spin_unlock(&receiving_list_lock);
-	pkg.start_time = jiffies;
 
 	r = dm_clog_sendto_server(tfr);
 
@@ -229,13 +227,6 @@ resend:
 		goto resend;
 	}
 
-	/* FIXME: Pull time checking code - it's just for profiling */
-	pkg.start_time = (jiffies - pkg.start_time);
-	do_div(pkg.start_time, HZ);
-	if (pkg.start_time > 0)
-		DMWARN("Excessive delay in request processing, %llu sec: [%s/%u]",
-		       pkg.start_time, RQ_TYPE(request_type), pkg.seq);
-
 	r = pkg.error;
 	if (r == -EAGAIN)
 		goto resend;
diff --git a/cmirror/src/Makefile b/cmirror/src/Makefile
index 892008b..c4e3b8b 100644
--- a/cmirror/src/Makefile
+++ b/cmirror/src/Makefile
@@ -31,15 +31,23 @@ else
 TARGET=no_files
 endif
 
-ifeq ($(DEBUG),log)
+ifneq ($(DEBUG), )
 CFLAGS += -DDEBUG
 endif
 
-CFLAGS += -g
-ifeq ($(DEBUG),all)
-CFLAGS += -DDEBUG -DDEBUG_FUNCTIONS
+ifneq ($(MEMB), )
+CFLAGS += -DMEMB
+endif
+
+ifneq ($(CKPT), )
+CFLAGS += -DCKPT
 endif
 
+ifneq ($(RESEND), )
+CFLAGS += -DRESEND
+endif
+
+CFLAGS += -g
 LDFLAGS += -L${libdir}/openais -L${libdir} -lcpg -lSaCkpt -lext2fs
 
 all: ${TARGET}
diff --git a/cmirror/src/cluster.c b/cmirror/src/cluster.c
index 8c49dee..16289e9 100644
--- a/cmirror/src/cluster.c
+++ b/cmirror/src/cluster.c
@@ -59,6 +59,8 @@ static SaCkptHandleT ckpt_handle = 0;
 static SaCkptCallbacksT callbacks = { 0, 0 };
 static SaVersionT version = { 'B', 1, 1 };
 
+static int log_resp_rec = 0;
+
 struct checkpoint_data {
 	uint32_t requester;
 	char uuid[CPG_MAX_NAME_LENGTH];
@@ -84,6 +86,7 @@ struct clog_cpg {
 	int state;
 	int cpg_state;  /* FIXME: debugging */
 	int free_me;
+	int resend_requests;
 	struct queue *startup_queue;
 
 	int checkpoints_needed;
@@ -185,7 +188,7 @@ static int handle_cluster_request(struct clog_tfr *tfr, int server)
 	return r;
 }
 
-static int handle_cluster_response(struct clog_tfr *tfr)
+static int handle_cluster_response_poop(struct clog_tfr *tfr, uint32_t who)
 {
 	int r = 0;
 	struct clog_tfr *orig_tfr;
@@ -226,6 +229,14 @@ static int handle_cluster_response(struct clog_tfr *tfr)
 		return -EINVAL;
 	}
 
+	if (log_resp_rec > 0) {
+		LOG_COND(log_resend_requests,
+			 "[%s] Response received to %s/#%u from %u",
+			 SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type),
+			 tfr->seq, who);
+		log_resp_rec--;
+	}
+
 	/* FIXME: Ensure memcpy cannot explode */
 	memcpy(orig_tfr, tfr, sizeof(*tfr) + tfr->data_size);
 	r = kernel_send(orig_tfr);
@@ -662,8 +673,9 @@ static void do_checkpoints(struct clog_cpg *entry)
 	struct checkpoint_data *cp;
 
 	for (cp = entry->checkpoint_list; cp;) {
-		LOG_DBG("[%s] Checkpoint data available for node %u",
-			SHORT_UUID(entry->name.value), cp->requester);
+		LOG_COND(log_checkpoint,
+			 "[%s] Checkpoint data available for node %u",
+			 SHORT_UUID(entry->name.value), cp->requester);
 
 		/*
 		 * FIXME: Check return code.  Could send failure
@@ -672,8 +684,9 @@ static void do_checkpoints(struct clog_cpg *entry)
 		 */
 		switch (export_checkpoint(cp)) {
 		case -EEXIST:
-			LOG_DBG("[%s] Checkpoint for %u already handled",
-				SHORT_UUID(entry->name.value), cp->requester);
+			LOG_COND(log_checkpoint,
+				 "[%s] Checkpoint for %u already handled",
+				 SHORT_UUID(entry->name.value), cp->requester);
 		case 0:
 			entry->checkpoint_list = cp->next;
 			free_checkpoint(cp);
@@ -687,22 +700,105 @@ static void do_checkpoints(struct clog_cpg *entry)
 	}
 }
 
+static int resend_requests(struct clog_cpg *entry)
+{
+	int r = 0;
+	struct list_head l, *p, *n;
+	struct clog_tfr *tfr;
+
+	if (!entry->resend_requests)
+		return 0;
+
+	if (entry->resend_requests == 1) {
+		LOG_COND(log_resend_requests,
+			 "[%s]  Giving another chance before resending requests",
+			 SHORT_UUID(entry->name.value));
+		sleep(1);
+		entry->resend_requests++;
+		return 1;
+	}
+
+	entry->resend_requests = 0;
+
+	log_resp_rec=2;
+	INIT_LIST_HEAD(&l);
+	queue_remove_all(&l, cluster_queue);
+
+	list_for_each_safe(p, n, &l) {
+		list_del_init(p);
+		tfr = (struct clog_tfr *)p;
+
+		if (!strcmp(entry->name.value, tfr->uuid)) {
+			switch (tfr->request_type) {
+			case DM_CLOG_POSTSUSPEND:
+				/*
+				 * Don't resend DM_CLOG_POSTSUSPEND request, it will
+				 * be handled when we get our own config leave
+				 */
+				queue_add(tfr, cluster_queue);
+				break;
+			case DM_CLOG_RESUME:
+				/* We are only concerned about this request locally */
+			case DM_CLOG_SET_REGION_SYNC:
+				/*
+				 * Some requests simply do not need to be resent.
+				 * If it is a request that just changes log state,
+				 * then it doesn't need to be resent (everyone makes
+				 * updates).
+				 */
+				LOG_COND(log_resend_requests,
+					 "[%s] Skipping resend of %s...",
+					  SHORT_UUID(entry->name.value),
+					  RQ_TYPE(tfr->request_type));
+				tfr->data_size = 0;
+				kernel_send(tfr);
+				
+				break;
+			default:
+				/*
+				 * If an action or a response is required, then
+				 * the request must be resent.
+				 */
+				r = 1;
+				log_resp_rec++;
+				LOG_COND(log_resend_requests,
+					 "[%s] Resending %s(#%u) due to new server(%u)",
+					  SHORT_UUID(entry->name.value),
+					  RQ_TYPE(tfr->request_type),
+					  tfr->seq, entry->lowest_id);
+				queue_add(tfr, cluster_queue);
+				if (cluster_send(tfr))
+					LOG_ERROR("Failed resend");
+			}
+		}
+	}
+
+	return r;
+}
+
 static int do_cluster_work(void *data)
 {
+	int i, go_again = 1;
 	int r = SA_AIS_OK;
 	struct clog_cpg *entry, *tmp;
 
-	list_for_each_entry_safe(entry, tmp, &clog_cpg_list, list) {
-		r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL);
-		if (r != SA_AIS_OK)
-			LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r));
+	for (i = 0; ((i < 5) && go_again); i++) {
+		go_again = 0;
+		list_for_each_entry_safe(entry, tmp, &clog_cpg_list, list) {
+			r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL);
+			if (r != SA_AIS_OK)
+				LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r));
 
-		if (entry->free_me) {
-			free(entry);
-			continue;
+			if (entry->free_me) {
+				free(entry);
+				continue;
+			}
+			do_checkpoints(entry);
+
+			go_again = resend_requests(entry);
 		}
-		do_checkpoints(entry);
 	}
+
 	return (r == SA_AIS_OK) ? 0 : -1;  /* FIXME: good error number? */
 }
 
@@ -770,8 +866,9 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
 				/* Could we retry? */
 				goto out;
 			} else if (match->state == INVALID) {
-				LOG_DBG("[%s] Checkpoint data received.  Log is now valid",
-					SHORT_UUID(match->name.value));
+				LOG_COND(log_checkpoint,
+					 "[%s] Checkpoint data received from %u.  Log is now valid",
+					 SHORT_UUID(match->name.value), nodeid);
 				match->state = VALID;
 
 				while ((startup_tfr = queue_remove(match->startup_queue))) {
@@ -835,7 +932,7 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
 
 	if (tfr->request_type & DM_CLOG_RESPONSE) {
 		response = 1;
-		r = handle_cluster_response(tfr);
+		r = handle_cluster_response_poop(tfr, nodeid);
 	} else {
 		tfr->originator = nodeid;
 
@@ -947,18 +1044,18 @@ out:
 			match->lowest_id = member_list[i].nodeid;
 
 	if (lowest == 0xDEAD)
-		LOG_DBG("[%s]  Server change <none> -> %u (%u %s)",
-			SHORT_UUID(match->name.value), match->lowest_id,
-			joined->nodeid, (member_list_entries == 1) ?
-			"is first to join" : "joined");
+		LOG_COND(log_membership_change, "[%s]  Server change <none> -> %u (%u %s)",
+			 SHORT_UUID(match->name.value), match->lowest_id,
+			 joined->nodeid, (member_list_entries == 1) ?
+			 "is first to join" : "joined");
 	else if (lowest != match->lowest_id)
-		LOG_DBG("[%s]  Server change %u -> %u (%u joined)",
-			SHORT_UUID(match->name.value), lowest,
-			match->lowest_id, joined->nodeid);
+		LOG_COND(log_membership_change, "[%s]  Server change %u -> %u (%u joined)",
+			 SHORT_UUID(match->name.value), lowest,
+			 match->lowest_id, joined->nodeid);
 	else
-		LOG_DBG("[%s]  Server unchanged at %u (%u joined)",
-			SHORT_UUID(match->name.value),
-			lowest, joined->nodeid);
+		LOG_COND(log_membership_change, "[%s]  Server unchanged at %u (%u joined)",
+			 SHORT_UUID(match->name.value),
+			 lowest, joined->nodeid);
 }
 
 static void cpg_leave_callback(struct clog_cpg *match,
@@ -1021,10 +1118,10 @@ static void cpg_leave_callback(struct clog_cpg *match,
 	/* Find the lowest_id, i.e. the server */
 	if (!member_list_entries) {
 		match->lowest_id = 0xDEAD;
-		LOG_DBG("[%s]  Server change %u -> <none> "
-			"(%u is last to leave)",
-			SHORT_UUID(match->name.value), left->nodeid,
-			left->nodeid);
+		LOG_COND(log_membership_change, "[%s]  Server change %u -> <none> "
+			 "(%u is last to leave)",
+			 SHORT_UUID(match->name.value), left->nodeid,
+			 left->nodeid);
 		return;
 	}
 		
@@ -1034,9 +1131,9 @@ static void cpg_leave_callback(struct clog_cpg *match,
 			match->lowest_id = member_list[i].nodeid;
 
 	if (lowest != match->lowest_id) {
-		LOG_DBG("[%s]  Server change %u -> %u (%u left)",
-			SHORT_UUID(match->name.value), lowest,
-			match->lowest_id, left->nodeid);
+		LOG_COND(log_membership_change, "[%s]  Server change %u -> %u (%u left)",
+			 SHORT_UUID(match->name.value), lowest,
+			 match->lowest_id, left->nodeid);
 
 		list_for_each_safe(p, n, &cluster_queue->list) {
 			tfr = (struct clog_tfr *)p;
@@ -1050,18 +1147,13 @@ static void cpg_leave_callback(struct clog_cpg *match,
 					 */
 					break;
 				default:
-					LOG_PRINT("[%s] Resending %s due to new server(%u -> %u)",
-						  SHORT_UUID(match->name.value),
-						  RQ_TYPE(tfr->request_type),
-						  lowest, match->lowest_id);
-					if (cluster_send(tfr))
-						LOG_ERROR("Failed resend");
+					match->resend_requests = 1;
 				}
 			}
 		}
 	} else
-		LOG_DBG("[%s]  Server unchanged at %u (%u left)",
-			SHORT_UUID(match->name.value), lowest, left->nodeid);
+		LOG_COND(log_membership_change, "[%s]  Server unchanged at %u (%u left)",
+			 SHORT_UUID(match->name.value), lowest, left->nodeid);
 
 }
 
@@ -1173,13 +1265,13 @@ static void abort_startup(struct clog_cpg *del)
 	SaCkptCheckpointHandleT h;
 	struct clog_tfr *startup_tfr = NULL;
 
-	LOG_ERROR("[%s]  CPG teardown before checkpoint received",
-		  SHORT_UUID(del->name.value));
+	LOG_DBG("[%s]  CPG teardown before checkpoint received",
+		SHORT_UUID(del->name.value));
 
 	while ((startup_tfr = queue_remove(del->startup_queue))) {
-		LOG_ERROR("[%s]  Ignoring request from %u: %s",
-			  SHORT_UUID(del->name.value), startup_tfr->originator,
-			  RQ_TYPE(startup_tfr->request_type));
+		LOG_DBG("[%s]  Ignoring request from %u: %s",
+			SHORT_UUID(del->name.value), startup_tfr->originator,
+			RQ_TYPE(startup_tfr->request_type));
 		queue_add(startup_tfr, free_queue);
 	}
 
@@ -1196,13 +1288,10 @@ open_retry:
                 goto open_retry;
         }
 
-	if (rv != SA_AIS_OK) {
-                LOG_ERROR("[%s] Failed to open checkpoint: %s",
-                          SHORT_UUID(del->name.value), str_ais_error(rv));
+	if (rv != SA_AIS_OK)
                 return;
-        }
 
-	LOG_ERROR("[%s]  Removing checkpoint", SHORT_UUID(del->name.value));
+	LOG_DBG("[%s]  Removing checkpoint", SHORT_UUID(del->name.value));
 unlink_retry:
         rv = saCkptCheckpointUnlink(ckpt_handle, &name);
         if (rv == SA_AIS_ERR_TRY_AGAIN) {
diff --git a/cmirror/src/functions.c b/cmirror/src/functions.c
index 9f6109c..10e2158 100644
--- a/cmirror/src/functions.c
+++ b/cmirror/src/functions.c
@@ -698,7 +698,7 @@ static int clog_resume(struct clog_tfr *tfr)
 
 	switch (lc->resume_override) {
 	case 1000:
-		LOG_ERROR("[%s] ERROR:: Additional resume issued before suspend",
+		LOG_ERROR("[%s] Additional resume issued before suspend",
 			  SHORT_UUID(tfr->uuid));
 		return 0;
 	case 0:
diff --git a/cmirror/src/link_mon.c b/cmirror/src/link_mon.c
index 71d2024..8799c9b 100644
--- a/cmirror/src/link_mon.c
+++ b/cmirror/src/link_mon.c
@@ -23,21 +23,16 @@ int links_register(int fd, char *name, int (*callback)(void *data), void *data)
 	int i;
 	struct link_callback *lc;
 
-	ENTER();
-
 	for (i = 0; i < used_pfds; i++) {
 		if (fd == pfds[i].fd) {
 			LOG_ERROR("links_register: Duplicate file descriptor");
-			EXIT();
 			return -EINVAL;
 		}
 	}
 
 	lc = malloc(sizeof(*lc));
-	if (!lc) {
-		EXIT();
+	if (!lc)
 		return -ENOMEM;
-	}
 
 	lc->fd = fd;
 	lc->name = name;
@@ -49,7 +44,6 @@ int links_register(int fd, char *name, int (*callback)(void *data), void *data)
 		tmp = realloc(pfds, sizeof(struct pollfd) * ((used_pfds*2) + 1));
 		if (!tmp) {
 			free(lc);
-			EXIT();
 			return -ENOMEM;
 		}
 		
@@ -69,7 +63,6 @@ int links_register(int fd, char *name, int (*callback)(void *data), void *data)
 	LOG_DBG(" used_pfds = %d, free_pfds = %d",
 		used_pfds, free_pfds);
 
-	EXIT();
 	return 0;
 }
 
diff --git a/cmirror/src/local.c b/cmirror/src/local.c
index aa5b6e7..8e7212a 100644
--- a/cmirror/src/local.c
+++ b/cmirror/src/local.c
@@ -331,7 +331,6 @@ int kernel_send(struct clog_tfr *tfr)
 	int r;
 	int size;
 
-	ENTER();
 	if (!tfr)
 		return -EINVAL;
 
@@ -360,7 +359,6 @@ int kernel_send(struct clog_tfr *tfr)
 
 	queue_add(tfr, free_queue);
 
-	EXIT();
 	return r;
 }
 
@@ -377,13 +375,9 @@ int init_local(void)
 	int opt;
 	struct sockaddr_nl addr;
 
-	ENTER();
-
 	cn_fd = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
-	if (cn_fd < 0) {
-		EXIT();
+	if (cn_fd < 0)
 		return EXIT_KERNEL_TFR_SOCKET;
-	}
 
 	/* memset to fix valgrind complaint */
 	memset(&addr, 0, sizeof(struct sockaddr_nl));
@@ -395,7 +389,6 @@ int init_local(void)
 	r = bind(cn_fd, (struct sockaddr *) &addr, sizeof(addr));
 	if (r < 0) {
 		close(cn_fd);
-		EXIT();
 		return EXIT_KERNEL_TFR_BIND;
 	}
 
@@ -403,7 +396,6 @@ int init_local(void)
 	r = setsockopt(cn_fd, 270, NETLINK_ADD_MEMBERSHIP, &opt, sizeof(opt));
 	if (r) {
 		close(cn_fd);
-		EXIT();
 		return EXIT_KERNEL_TFR_SETSOCKOPT;
 	}
 
@@ -413,7 +405,6 @@ int init_local(void)
 
 	links_register(cn_fd, "local", do_local_work, NULL);
 
-	EXIT();
 	return 0;
 }
 
diff --git a/cmirror/src/logging.c b/cmirror/src/logging.c
index 182f15d..8c327e9 100644
--- a/cmirror/src/logging.c
+++ b/cmirror/src/logging.c
@@ -2,3 +2,24 @@
 
 int log_tabbing = 0;
 int log_is_open = 0;
+
+/*
+ * Variables for various conditional logging
+ */
+#ifdef MEMB
+int log_membership_change = 1;
+#else
+int log_membership_change = 0;
+#endif
+
+#ifdef CKPT
+int log_checkpoint = 1;
+#else
+int log_checkpoint = 0;
+#endif
+
+#ifdef RESEND
+int log_resend_requests = 1;
+#else
+int log_resend_requests = 0;
+#endif
diff --git a/cmirror/src/logging.h b/cmirror/src/logging.h
index b971ae7..4d5b5e7 100644
--- a/cmirror/src/logging.h
+++ b/cmirror/src/logging.h
@@ -33,6 +33,9 @@
 
 extern int log_tabbing;
 extern int log_is_open;
+extern int log_membership_change;
+extern int log_checkpoint;
+extern int log_resend_requests;
 
 #define LOG_OPEN(ident, option, facility) do { \
 		openlog(ident, option, facility); \
@@ -63,27 +66,15 @@ extern int log_is_open;
 
 #ifdef DEBUG
 #define LOG_DBG(f, arg...) LOG_OUTPUT(LOG_DEBUG, f, ## arg)
-
-#ifdef DEBUG_FUNCTIONS
-#define ENTER(f, arg...) do {						\
-		LOG_DBG("Entering %s: " f, __FUNCTION__, ## arg);	\
-		log_tabbing++;						\
-	} while (0)
-#define EXIT(f, arg...) do {						\
-		log_tabbing--;						\
-		LOG_DBG("Exiting %s: " f, __FUNCTION__, ## arg);	\
-	} while (0)
-#else /* DEBUG_FUNCTIONS */
-#define ENTER(f, arg...)
-#define EXIT(f, arg...)
-#endif /* DEBUG_FUNCTIONS */
-
 #else /* DEBUG */
 #define LOG_DBG(f, arg...)
-#define ENTER(f, arg...)
-#define EXIT(f, arg...)
 #endif /* DEBUG */
 
+#define LOG_COND(__X, f, arg...) do {\
+		if (__X) { 	     \
+			LOG_OUTPUT(LOG_NOTICE, f, ## arg); \
+		} \
+	} while (0)
 #define LOG_PRINT(f, arg...) LOG_OUTPUT(LOG_NOTICE, f, ## arg)
 #define LOG_ERROR(f, arg...) LOG_OUTPUT(LOG_ERR, f, ## arg)
 


Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]