This is the mail archive of the
cluster-cvs@sourceware.org
mailing list for the cluster project.
RHEL5 - clogd: Updates to address bugs 464550 & 464934
- From: Jonathan Brassow <jbrassow at fedoraproject dot org>
- To: cluster-cvs-relay at redhat dot com
- Date: Mon, 13 Oct 2008 22:24:13 +0000 (UTC)
- Subject: RHEL5 - clogd: Updates to address bugs 464550 & 464934
Gitweb: http://git.fedorahosted.org/git/cluster.git?p=cluster.git;a=commitdiff;h=33ac113b87bc000cb801de2a9f6f2bfe2f20cf22
Commit: 33ac113b87bc000cb801de2a9f6f2bfe2f20cf22
Parent: adae80e90795c462f15ba1c09d7968afd00a7746
Author: Jonathan Brassow <jbrassow@redhat.com>
AuthorDate: Mon Oct 13 17:23:35 2008 -0500
Committer: Jonathan Brassow <jbrassow@redhat.com>
CommitterDate: Mon Oct 13 17:23:35 2008 -0500
clogd: Updates to address bugs 464550 & 464934
Allow time for exiting server responses to settle out
before resending requests.
Various debugging enhancements included as well.
---
cmirror-kernel/src/dm-clog-tfr.c | 9 --
cmirror/src/Makefile | 16 +++-
cmirror/src/cluster.c | 191 ++++++++++++++++++++++++++++----------
cmirror/src/functions.c | 2 +-
cmirror/src/link_mon.c | 9 +--
cmirror/src/local.c | 11 +--
cmirror/src/logging.c | 21 ++++
cmirror/src/logging.h | 25 ++----
8 files changed, 184 insertions(+), 100 deletions(-)
diff --git a/cmirror-kernel/src/dm-clog-tfr.c b/cmirror-kernel/src/dm-clog-tfr.c
index 15d971c..3ceb320 100644
--- a/cmirror-kernel/src/dm-clog-tfr.c
+++ b/cmirror-kernel/src/dm-clog-tfr.c
@@ -32,7 +32,6 @@ struct receiving_pkg {
struct list_head list;
struct completion complete;
- unsigned long long start_time;
uint32_t seq;
int error;
@@ -203,7 +202,6 @@ resend:
spin_lock(&receiving_list_lock);
list_add(&(pkg.list), &receiving_list);
spin_unlock(&receiving_list_lock);
- pkg.start_time = jiffies;
r = dm_clog_sendto_server(tfr);
@@ -229,13 +227,6 @@ resend:
goto resend;
}
- /* FIXME: Pull time checking code - it's just for profiling */
- pkg.start_time = (jiffies - pkg.start_time);
- do_div(pkg.start_time, HZ);
- if (pkg.start_time > 0)
- DMWARN("Excessive delay in request processing, %llu sec: [%s/%u]",
- pkg.start_time, RQ_TYPE(request_type), pkg.seq);
-
r = pkg.error;
if (r == -EAGAIN)
goto resend;
diff --git a/cmirror/src/Makefile b/cmirror/src/Makefile
index 892008b..c4e3b8b 100644
--- a/cmirror/src/Makefile
+++ b/cmirror/src/Makefile
@@ -31,15 +31,23 @@ else
TARGET=no_files
endif
-ifeq ($(DEBUG),log)
+ifneq ($(DEBUG), )
CFLAGS += -DDEBUG
endif
-CFLAGS += -g
-ifeq ($(DEBUG),all)
-CFLAGS += -DDEBUG -DDEBUG_FUNCTIONS
+ifneq ($(MEMB), )
+CFLAGS += -DMEMB
+endif
+
+ifneq ($(CKPT), )
+CFLAGS += -DCKPT
endif
+ifneq ($(RESEND), )
+CFLAGS += -DRESEND
+endif
+
+CFLAGS += -g
LDFLAGS += -L${libdir}/openais -L${libdir} -lcpg -lSaCkpt -lext2fs
all: ${TARGET}
diff --git a/cmirror/src/cluster.c b/cmirror/src/cluster.c
index 8c49dee..16289e9 100644
--- a/cmirror/src/cluster.c
+++ b/cmirror/src/cluster.c
@@ -59,6 +59,8 @@ static SaCkptHandleT ckpt_handle = 0;
static SaCkptCallbacksT callbacks = { 0, 0 };
static SaVersionT version = { 'B', 1, 1 };
+static int log_resp_rec = 0;
+
struct checkpoint_data {
uint32_t requester;
char uuid[CPG_MAX_NAME_LENGTH];
@@ -84,6 +86,7 @@ struct clog_cpg {
int state;
int cpg_state; /* FIXME: debugging */
int free_me;
+ int resend_requests;
struct queue *startup_queue;
int checkpoints_needed;
@@ -185,7 +188,7 @@ static int handle_cluster_request(struct clog_tfr *tfr, int server)
return r;
}
-static int handle_cluster_response(struct clog_tfr *tfr)
+static int handle_cluster_response_poop(struct clog_tfr *tfr, uint32_t who)
{
int r = 0;
struct clog_tfr *orig_tfr;
@@ -226,6 +229,14 @@ static int handle_cluster_response(struct clog_tfr *tfr)
return -EINVAL;
}
+ if (log_resp_rec > 0) {
+ LOG_COND(log_resend_requests,
+ "[%s] Response received to %s/#%u from %u",
+ SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type),
+ tfr->seq, who);
+ log_resp_rec--;
+ }
+
/* FIXME: Ensure memcpy cannot explode */
memcpy(orig_tfr, tfr, sizeof(*tfr) + tfr->data_size);
r = kernel_send(orig_tfr);
@@ -662,8 +673,9 @@ static void do_checkpoints(struct clog_cpg *entry)
struct checkpoint_data *cp;
for (cp = entry->checkpoint_list; cp;) {
- LOG_DBG("[%s] Checkpoint data available for node %u",
- SHORT_UUID(entry->name.value), cp->requester);
+ LOG_COND(log_checkpoint,
+ "[%s] Checkpoint data available for node %u",
+ SHORT_UUID(entry->name.value), cp->requester);
/*
* FIXME: Check return code. Could send failure
@@ -672,8 +684,9 @@ static void do_checkpoints(struct clog_cpg *entry)
*/
switch (export_checkpoint(cp)) {
case -EEXIST:
- LOG_DBG("[%s] Checkpoint for %u already handled",
- SHORT_UUID(entry->name.value), cp->requester);
+ LOG_COND(log_checkpoint,
+ "[%s] Checkpoint for %u already handled",
+ SHORT_UUID(entry->name.value), cp->requester);
case 0:
entry->checkpoint_list = cp->next;
free_checkpoint(cp);
@@ -687,22 +700,105 @@ static void do_checkpoints(struct clog_cpg *entry)
}
}
+static int resend_requests(struct clog_cpg *entry)
+{
+ int r = 0;
+ struct list_head l, *p, *n;
+ struct clog_tfr *tfr;
+
+ if (!entry->resend_requests)
+ return 0;
+
+ if (entry->resend_requests == 1) {
+ LOG_COND(log_resend_requests,
+ "[%s] Giving another chance before resending requests",
+ SHORT_UUID(entry->name.value));
+ sleep(1);
+ entry->resend_requests++;
+ return 1;
+ }
+
+ entry->resend_requests = 0;
+
+ log_resp_rec=2;
+ INIT_LIST_HEAD(&l);
+ queue_remove_all(&l, cluster_queue);
+
+ list_for_each_safe(p, n, &l) {
+ list_del_init(p);
+ tfr = (struct clog_tfr *)p;
+
+ if (!strcmp(entry->name.value, tfr->uuid)) {
+ switch (tfr->request_type) {
+ case DM_CLOG_POSTSUSPEND:
+ /*
+ * Don't resend DM_CLOG_POSTSUSPEND request, it will
+ * be handled when we get our own config leave
+ */
+ queue_add(tfr, cluster_queue);
+ break;
+ case DM_CLOG_RESUME:
+ /* We are only concerned about this request locally */
+ case DM_CLOG_SET_REGION_SYNC:
+ /*
+ * Some requests simply do not need to be resent.
+ * If it is a request that just changes log state,
+ * then it doesn't need to be resent (everyone makes
+ * updates).
+ */
+ LOG_COND(log_resend_requests,
+ "[%s] Skipping resend of %s...",
+ SHORT_UUID(entry->name.value),
+ RQ_TYPE(tfr->request_type));
+ tfr->data_size = 0;
+ kernel_send(tfr);
+
+ break;
+ default:
+ /*
+ * If an action or a response is required, then
+ * the request must be resent.
+ */
+ r = 1;
+ log_resp_rec++;
+ LOG_COND(log_resend_requests,
+ "[%s] Resending %s(#%u) due to new server(%u)",
+ SHORT_UUID(entry->name.value),
+ RQ_TYPE(tfr->request_type),
+ tfr->seq, entry->lowest_id);
+ queue_add(tfr, cluster_queue);
+ if (cluster_send(tfr))
+ LOG_ERROR("Failed resend");
+ }
+ }
+ }
+
+ return r;
+}
+
static int do_cluster_work(void *data)
{
+ int i, go_again = 1;
int r = SA_AIS_OK;
struct clog_cpg *entry, *tmp;
- list_for_each_entry_safe(entry, tmp, &clog_cpg_list, list) {
- r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL);
- if (r != SA_AIS_OK)
- LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r));
+ for (i = 0; ((i < 5) && go_again); i++) {
+ go_again = 0;
+ list_for_each_entry_safe(entry, tmp, &clog_cpg_list, list) {
+ r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL);
+ if (r != SA_AIS_OK)
+ LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r));
- if (entry->free_me) {
- free(entry);
- continue;
+ if (entry->free_me) {
+ free(entry);
+ continue;
+ }
+ do_checkpoints(entry);
+
+ go_again = resend_requests(entry);
}
- do_checkpoints(entry);
}
+
return (r == SA_AIS_OK) ? 0 : -1; /* FIXME: good error number? */
}
@@ -770,8 +866,9 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
/* Could we retry? */
goto out;
} else if (match->state == INVALID) {
- LOG_DBG("[%s] Checkpoint data received. Log is now valid",
- SHORT_UUID(match->name.value));
+ LOG_COND(log_checkpoint,
+ "[%s] Checkpoint data received from %u. Log is now valid",
+ SHORT_UUID(match->name.value), nodeid);
match->state = VALID;
while ((startup_tfr = queue_remove(match->startup_queue))) {
@@ -835,7 +932,7 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
if (tfr->request_type & DM_CLOG_RESPONSE) {
response = 1;
- r = handle_cluster_response(tfr);
+ r = handle_cluster_response_poop(tfr, nodeid);
} else {
tfr->originator = nodeid;
@@ -947,18 +1044,18 @@ out:
match->lowest_id = member_list[i].nodeid;
if (lowest == 0xDEAD)
- LOG_DBG("[%s] Server change <none> -> %u (%u %s)",
- SHORT_UUID(match->name.value), match->lowest_id,
- joined->nodeid, (member_list_entries == 1) ?
- "is first to join" : "joined");
+ LOG_COND(log_membership_change, "[%s] Server change <none> -> %u (%u %s)",
+ SHORT_UUID(match->name.value), match->lowest_id,
+ joined->nodeid, (member_list_entries == 1) ?
+ "is first to join" : "joined");
else if (lowest != match->lowest_id)
- LOG_DBG("[%s] Server change %u -> %u (%u joined)",
- SHORT_UUID(match->name.value), lowest,
- match->lowest_id, joined->nodeid);
+ LOG_COND(log_membership_change, "[%s] Server change %u -> %u (%u joined)",
+ SHORT_UUID(match->name.value), lowest,
+ match->lowest_id, joined->nodeid);
else
- LOG_DBG("[%s] Server unchanged at %u (%u joined)",
- SHORT_UUID(match->name.value),
- lowest, joined->nodeid);
+ LOG_COND(log_membership_change, "[%s] Server unchanged at %u (%u joined)",
+ SHORT_UUID(match->name.value),
+ lowest, joined->nodeid);
}
static void cpg_leave_callback(struct clog_cpg *match,
@@ -1021,10 +1118,10 @@ static void cpg_leave_callback(struct clog_cpg *match,
/* Find the lowest_id, i.e. the server */
if (!member_list_entries) {
match->lowest_id = 0xDEAD;
- LOG_DBG("[%s] Server change %u -> <none> "
- "(%u is last to leave)",
- SHORT_UUID(match->name.value), left->nodeid,
- left->nodeid);
+ LOG_COND(log_membership_change, "[%s] Server change %u -> <none> "
+ "(%u is last to leave)",
+ SHORT_UUID(match->name.value), left->nodeid,
+ left->nodeid);
return;
}
@@ -1034,9 +1131,9 @@ static void cpg_leave_callback(struct clog_cpg *match,
match->lowest_id = member_list[i].nodeid;
if (lowest != match->lowest_id) {
- LOG_DBG("[%s] Server change %u -> %u (%u left)",
- SHORT_UUID(match->name.value), lowest,
- match->lowest_id, left->nodeid);
+ LOG_COND(log_membership_change, "[%s] Server change %u -> %u (%u left)",
+ SHORT_UUID(match->name.value), lowest,
+ match->lowest_id, left->nodeid);
list_for_each_safe(p, n, &cluster_queue->list) {
tfr = (struct clog_tfr *)p;
@@ -1050,18 +1147,13 @@ static void cpg_leave_callback(struct clog_cpg *match,
*/
break;
default:
- LOG_PRINT("[%s] Resending %s due to new server(%u -> %u)",
- SHORT_UUID(match->name.value),
- RQ_TYPE(tfr->request_type),
- lowest, match->lowest_id);
- if (cluster_send(tfr))
- LOG_ERROR("Failed resend");
+ match->resend_requests = 1;
}
}
}
} else
- LOG_DBG("[%s] Server unchanged at %u (%u left)",
- SHORT_UUID(match->name.value), lowest, left->nodeid);
+ LOG_COND(log_membership_change, "[%s] Server unchanged at %u (%u left)",
+ SHORT_UUID(match->name.value), lowest, left->nodeid);
}
@@ -1173,13 +1265,13 @@ static void abort_startup(struct clog_cpg *del)
SaCkptCheckpointHandleT h;
struct clog_tfr *startup_tfr = NULL;
- LOG_ERROR("[%s] CPG teardown before checkpoint received",
- SHORT_UUID(del->name.value));
+ LOG_DBG("[%s] CPG teardown before checkpoint received",
+ SHORT_UUID(del->name.value));
while ((startup_tfr = queue_remove(del->startup_queue))) {
- LOG_ERROR("[%s] Ignoring request from %u: %s",
- SHORT_UUID(del->name.value), startup_tfr->originator,
- RQ_TYPE(startup_tfr->request_type));
+ LOG_DBG("[%s] Ignoring request from %u: %s",
+ SHORT_UUID(del->name.value), startup_tfr->originator,
+ RQ_TYPE(startup_tfr->request_type));
queue_add(startup_tfr, free_queue);
}
@@ -1196,13 +1288,10 @@ open_retry:
goto open_retry;
}
- if (rv != SA_AIS_OK) {
- LOG_ERROR("[%s] Failed to open checkpoint: %s",
- SHORT_UUID(del->name.value), str_ais_error(rv));
+ if (rv != SA_AIS_OK)
return;
- }
- LOG_ERROR("[%s] Removing checkpoint", SHORT_UUID(del->name.value));
+ LOG_DBG("[%s] Removing checkpoint", SHORT_UUID(del->name.value));
unlink_retry:
rv = saCkptCheckpointUnlink(ckpt_handle, &name);
if (rv == SA_AIS_ERR_TRY_AGAIN) {
diff --git a/cmirror/src/functions.c b/cmirror/src/functions.c
index 9f6109c..10e2158 100644
--- a/cmirror/src/functions.c
+++ b/cmirror/src/functions.c
@@ -698,7 +698,7 @@ static int clog_resume(struct clog_tfr *tfr)
switch (lc->resume_override) {
case 1000:
- LOG_ERROR("[%s] ERROR:: Additional resume issued before suspend",
+ LOG_ERROR("[%s] Additional resume issued before suspend",
SHORT_UUID(tfr->uuid));
return 0;
case 0:
diff --git a/cmirror/src/link_mon.c b/cmirror/src/link_mon.c
index 71d2024..8799c9b 100644
--- a/cmirror/src/link_mon.c
+++ b/cmirror/src/link_mon.c
@@ -23,21 +23,16 @@ int links_register(int fd, char *name, int (*callback)(void *data), void *data)
int i;
struct link_callback *lc;
- ENTER();
-
for (i = 0; i < used_pfds; i++) {
if (fd == pfds[i].fd) {
LOG_ERROR("links_register: Duplicate file descriptor");
- EXIT();
return -EINVAL;
}
}
lc = malloc(sizeof(*lc));
- if (!lc) {
- EXIT();
+ if (!lc)
return -ENOMEM;
- }
lc->fd = fd;
lc->name = name;
@@ -49,7 +44,6 @@ int links_register(int fd, char *name, int (*callback)(void *data), void *data)
tmp = realloc(pfds, sizeof(struct pollfd) * ((used_pfds*2) + 1));
if (!tmp) {
free(lc);
- EXIT();
return -ENOMEM;
}
@@ -69,7 +63,6 @@ int links_register(int fd, char *name, int (*callback)(void *data), void *data)
LOG_DBG(" used_pfds = %d, free_pfds = %d",
used_pfds, free_pfds);
- EXIT();
return 0;
}
diff --git a/cmirror/src/local.c b/cmirror/src/local.c
index aa5b6e7..8e7212a 100644
--- a/cmirror/src/local.c
+++ b/cmirror/src/local.c
@@ -331,7 +331,6 @@ int kernel_send(struct clog_tfr *tfr)
int r;
int size;
- ENTER();
if (!tfr)
return -EINVAL;
@@ -360,7 +359,6 @@ int kernel_send(struct clog_tfr *tfr)
queue_add(tfr, free_queue);
- EXIT();
return r;
}
@@ -377,13 +375,9 @@ int init_local(void)
int opt;
struct sockaddr_nl addr;
- ENTER();
-
cn_fd = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
- if (cn_fd < 0) {
- EXIT();
+ if (cn_fd < 0)
return EXIT_KERNEL_TFR_SOCKET;
- }
/* memset to fix valgrind complaint */
memset(&addr, 0, sizeof(struct sockaddr_nl));
@@ -395,7 +389,6 @@ int init_local(void)
r = bind(cn_fd, (struct sockaddr *) &addr, sizeof(addr));
if (r < 0) {
close(cn_fd);
- EXIT();
return EXIT_KERNEL_TFR_BIND;
}
@@ -403,7 +396,6 @@ int init_local(void)
r = setsockopt(cn_fd, 270, NETLINK_ADD_MEMBERSHIP, &opt, sizeof(opt));
if (r) {
close(cn_fd);
- EXIT();
return EXIT_KERNEL_TFR_SETSOCKOPT;
}
@@ -413,7 +405,6 @@ int init_local(void)
links_register(cn_fd, "local", do_local_work, NULL);
- EXIT();
return 0;
}
diff --git a/cmirror/src/logging.c b/cmirror/src/logging.c
index 182f15d..8c327e9 100644
--- a/cmirror/src/logging.c
+++ b/cmirror/src/logging.c
@@ -2,3 +2,24 @@
int log_tabbing = 0;
int log_is_open = 0;
+
+/*
+ * Variables for various conditional logging
+ */
+#ifdef MEMB
+int log_membership_change = 1;
+#else
+int log_membership_change = 0;
+#endif
+
+#ifdef CKPT
+int log_checkpoint = 1;
+#else
+int log_checkpoint = 0;
+#endif
+
+#ifdef RESEND
+int log_resend_requests = 1;
+#else
+int log_resend_requests = 0;
+#endif
diff --git a/cmirror/src/logging.h b/cmirror/src/logging.h
index b971ae7..4d5b5e7 100644
--- a/cmirror/src/logging.h
+++ b/cmirror/src/logging.h
@@ -33,6 +33,9 @@
extern int log_tabbing;
extern int log_is_open;
+extern int log_membership_change;
+extern int log_checkpoint;
+extern int log_resend_requests;
#define LOG_OPEN(ident, option, facility) do { \
openlog(ident, option, facility); \
@@ -63,27 +66,15 @@ extern int log_is_open;
#ifdef DEBUG
#define LOG_DBG(f, arg...) LOG_OUTPUT(LOG_DEBUG, f, ## arg)
-
-#ifdef DEBUG_FUNCTIONS
-#define ENTER(f, arg...) do { \
- LOG_DBG("Entering %s: " f, __FUNCTION__, ## arg); \
- log_tabbing++; \
- } while (0)
-#define EXIT(f, arg...) do { \
- log_tabbing--; \
- LOG_DBG("Exiting %s: " f, __FUNCTION__, ## arg); \
- } while (0)
-#else /* DEBUG_FUNCTIONS */
-#define ENTER(f, arg...)
-#define EXIT(f, arg...)
-#endif /* DEBUG_FUNCTIONS */
-
#else /* DEBUG */
#define LOG_DBG(f, arg...)
-#define ENTER(f, arg...)
-#define EXIT(f, arg...)
#endif /* DEBUG */
+#define LOG_COND(__X, f, arg...) do {\
+ if (__X) { \
+ LOG_OUTPUT(LOG_NOTICE, f, ## arg); \
+ } \
+ } while (0)
#define LOG_PRINT(f, arg...) LOG_OUTPUT(LOG_NOTICE, f, ## arg)
#define LOG_ERROR(f, arg...) LOG_OUTPUT(LOG_ERR, f, ## arg)