This is the mail archive of the
cluster-cvs@sourceware.org
mailing list for the cluster project.
RHEL5 - clogd: Additional fixes for bug 460156 and 464550
- From: Jonathan Brassow <jbrassow at fedoraproject dot org>
- To: cluster-cvs-relay at redhat dot com
- Date: Mon, 20 Oct 2008 17:29:05 +0000 (UTC)
- Subject: RHEL5 - clogd: Additional fixes for bug 460156 and 464550
Gitweb: http://git.fedorahosted.org/git/cluster.git?p=cluster.git;a=commitdiff;h=e07369b28d7a569e742d80152ef10c9d42bc2650
Commit: e07369b28d7a569e742d80152ef10c9d42bc2650
Parent: 6714b7fe4d3fb1aca04e2996e9b0763c8be2814e
Author: Jonathan Brassow <jbrassow@redhat.com>
AuthorDate: Mon Oct 20 12:27:08 2008 -0500
Committer: Jonathan Brassow <jbrassow@redhat.com>
CommitterDate: Mon Oct 20 12:27:54 2008 -0500
clogd: Additional fixes for bug 460156 and 464550
- Fixed a checkpoint ordering issue, where a checkpoint could be populated
with data from the wrong moment in time.
- Fixed problem where resending requests when a server leaves would include
requests that should have been received after the server had gone (so no
resend should have been necessary).
---
cmirror-kernel/src/dm-clog.c | 3 +
cmirror/src/clogd.c | 2 +-
cmirror/src/cluster.c | 561 +++++++++++++++++++++++++++++-------------
3 files changed, 390 insertions(+), 176 deletions(-)
diff --git a/cmirror-kernel/src/dm-clog.c b/cmirror-kernel/src/dm-clog.c
index f21823e..626f1c8 100644
--- a/cmirror-kernel/src/dm-clog.c
+++ b/cmirror-kernel/src/dm-clog.c
@@ -67,6 +67,9 @@ retry:
r = dm_clog_consult_server(uuid, request_type, data,
data_size, rdata, rdata_size);
+ if (r)
+ DMERR("cluster_do_request error: %d", r);
+
if (r != -ESRCH)
return r;
diff --git a/cmirror/src/clogd.c b/cmirror/src/clogd.c
index 0fa00ec..99543ba 100644
--- a/cmirror/src/clogd.c
+++ b/cmirror/src/clogd.c
@@ -43,7 +43,7 @@ int main(int argc, char *argv[])
/* Parent can now exit, we're ready to handle requests */
kill(getppid(), SIGTERM);
- set_priority();
+ /* set_priority(); -- let's try to do w/o this */
LOG_PRINT("Starting clogd:");
LOG_PRINT(" Built: "__DATE__" "__TIME__"\n");
diff --git a/cmirror/src/cluster.c b/cmirror/src/cluster.c
index 16289e9..24c2540 100644
--- a/cmirror/src/cluster.c
+++ b/cmirror/src/cluster.c
@@ -52,13 +52,25 @@
#define DM_CLOG_RESPONSE 0x1000 /* in last byte of 32-bit value */
#define DM_CLOG_CHECKPOINT_READY 21
-#define DM_CLOG_CONFIG_CHANGE 22
+#define DM_CLOG_MEMBER_JOIN 22
+#define DM_CLOG_MEMBER_LEAVE 23
+
+#define _RQ_TYPE(x) \
+ ((x) == DM_CLOG_CHECKPOINT_READY) ? "DM_CLOG_CHECKPOINT_READY": \
+ ((x) == DM_CLOG_MEMBER_JOIN) ? "DM_CLOG_MEMBER_JOIN": \
+ ((x) == DM_CLOG_MEMBER_LEAVE) ? "DM_CLOG_MEMBER_LEAVE": \
+ RQ_TYPE((x) & ~DM_CLOG_RESPONSE)
+
static uint32_t my_cluster_id = 0xDEAD;
static SaCkptHandleT ckpt_handle = 0;
static SaCkptCallbacksT callbacks = { 0, 0 };
static SaVersionT version = { 'B', 1, 1 };
+#define DEBUGGING_HISTORY 20
+static char debugging[DEBUGGING_HISTORY][128];
+static int idx = 0;
+
static int log_resp_rec = 0;
struct checkpoint_data {
@@ -87,7 +99,10 @@ struct clog_cpg {
int cpg_state; /* FIXME: debugging */
int free_me;
int resend_requests;
+ int delay;
+ struct queue *delay_queue;
struct queue *startup_queue;
+ struct queue *cluster_queue;
int checkpoints_needed;
uint32_t checkpoint_requesters[10];
@@ -126,11 +141,23 @@ int cluster_send(struct clog_tfr *tfr)
iov.iov_len = sizeof(struct clog_tfr) + tfr->data_size;
if (entry->cpg_state != VALID) {
- LOG_ERROR("[%s] Attempt to send request to cluster while CPG not valid: "
- "request = %s", SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type));
+ LOG_ERROR("[%s] Attempt to send %s to cluster while CPG not valid: %s",
+ SHORT_UUID(tfr->uuid), (tfr->request_type & DM_CLOG_RESPONSE) ?
+ "response" : "request",
+ _RQ_TYPE(tfr->request_type));
return -EINVAL;
}
+ if ((tfr->request_type != DM_CLOG_CHECKPOINT_READY) &&
+ (tfr->request_type != DM_CLOG_MEMBER_LEAVE) &&
+ (entry->delay || !queue_empty(entry->delay_queue))) {
+ LOG_COND(log_resend_requests, "[%s] Delaying request%s, %s/%u",
+ SHORT_UUID(tfr->uuid), queue_empty(entry->delay_queue) ? "" : "*",
+ _RQ_TYPE(tfr->request_type), tfr->seq);
+ queue_add_tail(tfr, entry->delay_queue);
+ return 0;
+ }
+
do {
r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
if ((r != SA_AIS_ERR_TRY_AGAIN) || (count++ > 5))
@@ -158,7 +185,8 @@ static int clog_tfr_cmp(struct clog_tfr *a, struct clog_tfr *b)
return r;
}
-static int handle_cluster_request(struct clog_tfr *tfr, int server)
+static int handle_cluster_request(struct clog_cpg *entry,
+ struct clog_tfr *tfr, int server)
{
int r = 0;
@@ -173,8 +201,7 @@ static int handle_cluster_request(struct clog_tfr *tfr, int server)
(tfr->originator == my_cluster_id))
r = do_request(tfr, server);
- if (server &&
- (tfr->request_type != DM_CLOG_CLEAR_REGION)) {
+ if (server && (tfr->request_type != DM_CLOG_CLEAR_REGION)) {
tfr->request_type |= DM_CLOG_RESPONSE;
/*
@@ -188,7 +215,8 @@ static int handle_cluster_request(struct clog_tfr *tfr, int server)
return r;
}
-static int handle_cluster_response_poop(struct clog_tfr *tfr, uint32_t who)
+static int handle_cluster_response_poop(struct clog_cpg *entry,
+ struct clog_tfr *tfr, uint32_t who)
{
int r = 0;
struct clog_tfr *orig_tfr;
@@ -200,7 +228,7 @@ static int handle_cluster_response_poop(struct clog_tfr *tfr, uint32_t who)
return 0;
tfr->request_type &= ~DM_CLOG_RESPONSE;
- orig_tfr = queue_remove_match(cluster_queue, clog_tfr_cmp, tfr);
+ orig_tfr = queue_remove_match(entry->cluster_queue, clog_tfr_cmp, tfr);
if (!orig_tfr) {
struct list_head l, *p, *n;
@@ -208,31 +236,47 @@ static int handle_cluster_response_poop(struct clog_tfr *tfr, uint32_t who)
/* Unable to find match for response */
- LOG_ERROR("[%s] No match for cluster response: %s:%u",
- SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type),
- tfr->seq);
+ LOG_ERROR("[%s] No match for cluster response from %u: %s:%u",
+ SHORT_UUID(tfr->uuid), who,
+ _RQ_TYPE(tfr->request_type), tfr->seq);
INIT_LIST_HEAD(&l);
queue_remove_all(&l, cluster_queue);
- LOG_ERROR("Current list:");
+ LOG_ERROR("Current global list:");
if (list_empty(&l))
LOG_ERROR(" [none]");
+
list_for_each_safe(p, n, &l) {
list_del_init(p);
t = (struct clog_tfr *)p;
LOG_ERROR(" [%s] %s:%u", SHORT_UUID(t->uuid),
- RQ_TYPE(t->request_type),
+ _RQ_TYPE(t->request_type),
t->seq);
queue_add(t, cluster_queue);
}
+ INIT_LIST_HEAD(&l);
+ queue_remove_all(&l, entry->cluster_queue);
+ LOG_ERROR("Current local list:");
+ if (list_empty(&l))
+ LOG_ERROR(" [none]");
+
+ list_for_each_safe(p, n, &l) {
+ list_del_init(p);
+ t = (struct clog_tfr *)p;
+ LOG_ERROR(" [%s] %s:%u", SHORT_UUID(t->uuid),
+ _RQ_TYPE(t->request_type),
+ t->seq);
+ queue_add(t, entry->cluster_queue);
+ }
+
return -EINVAL;
}
if (log_resp_rec > 0) {
LOG_COND(log_resend_requests,
"[%s] Response received to %s/#%u from %u",
- SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type),
+ SHORT_UUID(tfr->uuid), _RQ_TYPE(tfr->request_type),
tfr->seq, who);
log_resp_rec--;
}
@@ -273,6 +317,16 @@ static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
int r;
struct checkpoint_data *new;
+ if (entry->state != VALID) {
+ /*
+ * We can't store bitmaps yet, because the log is not
+ * valid yet.
+ */
+ LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet",
+ cp_requester);
+ return NULL;
+ }
+
new = malloc(sizeof(*new));
if (!new) {
LOG_ERROR("Unable to create checkpoint data for %u",
@@ -283,47 +337,37 @@ static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
new->requester = cp_requester;
strncpy(new->uuid, entry->name.value, entry->name.length);
- if (entry->state == VALID) {
- new->bitmap_size = push_state(entry->name.value, "clean_bits",
- &new->clean_bits);
- if (new->bitmap_size <= 0) {
- LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
- new->requester);
- free(new);
- return NULL;
- }
+ new->bitmap_size = push_state(entry->name.value, "clean_bits",
+ &new->clean_bits);
+ if (new->bitmap_size <= 0) {
+ LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
+ new->requester);
+ free(new);
+ return NULL;
+ }
- new->bitmap_size = push_state(entry->name.value,
- "sync_bits", &new->sync_bits);
- if (new->bitmap_size <= 0) {
- LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
- new->requester);
- free(new->clean_bits);
- free(new);
- return NULL;
- }
+ new->bitmap_size = push_state(entry->name.value,
+ "sync_bits", &new->sync_bits);
+ if (new->bitmap_size <= 0) {
+ LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
+ new->requester);
+ free(new->clean_bits);
+ free(new);
+ return NULL;
+ }
- r = push_state(entry->name.value, "recovering_region", &new->recovering_region);
- if (r <= 0) {
- LOG_ERROR("Failed to store recovering_region to checkpoint for node %u",
- new->requester);
- free(new->sync_bits);
- free(new->clean_bits);
- free(new);
- return NULL;
- }
- LOG_DBG("[%s] Checkpoint prepared for node %u:",
- SHORT_UUID(new->uuid), new->requester);
- LOG_DBG(" bitmap_size = %d", new->bitmap_size);
- } else {
- /*
- * We can't store bitmaps yet, because the log is not
- * valid yet.
- */
- LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet",
+ r = push_state(entry->name.value, "recovering_region", &new->recovering_region);
+ if (r <= 0) {
+ LOG_ERROR("Failed to store recovering_region to checkpoint for node %u",
new->requester);
- new->bitmap_size = 0;
+ free(new->sync_bits);
+ free(new->clean_bits);
+ free(new);
+ return NULL;
}
+ LOG_DBG("[%s] Checkpoint prepared for node %u:",
+ SHORT_UUID(new->uuid), new->requester);
+ LOG_DBG(" bitmap_size = %d", new->bitmap_size);
return new;
}
@@ -703,76 +747,88 @@ static void do_checkpoints(struct clog_cpg *entry)
static int resend_requests(struct clog_cpg *entry)
{
int r = 0;
- struct list_head l, *p, *n;
+ struct list_head resend, delay, *p, *n;
struct clog_tfr *tfr;
- if (!entry->resend_requests)
+ if (entry->delay)
return 0;
- if (entry->resend_requests == 1) {
- LOG_COND(log_resend_requests,
- "[%s] Giving another chance before resending requests",
- SHORT_UUID(entry->name.value));
- sleep(1);
- entry->resend_requests++;
- return 1;
- }
+ INIT_LIST_HEAD(&resend);
+ INIT_LIST_HEAD(&delay);
+ queue_remove_all(&delay, entry->delay_queue);
- entry->resend_requests = 0;
+ if (entry->resend_requests) {
+ entry->resend_requests = 0;
- log_resp_rec=2;
- INIT_LIST_HEAD(&l);
- queue_remove_all(&l, cluster_queue);
+ log_resp_rec = 0;
+ queue_remove_all(&resend, entry->cluster_queue);
- list_for_each_safe(p, n, &l) {
- list_del_init(p);
- tfr = (struct clog_tfr *)p;
+ list_for_each_safe(p, n, &resend) {
+ list_del_init(p);
+ tfr = (struct clog_tfr *)p;
- if (!strcmp(entry->name.value, tfr->uuid)) {
- switch (tfr->request_type) {
- case DM_CLOG_POSTSUSPEND:
- /*
- * Don't resend DM_CLOG_POSTSUSPEND request, it will
- * be handled when we get our own config leave
- */
- queue_add(tfr, cluster_queue);
- break;
- case DM_CLOG_RESUME:
- /* We are only concerned about this request locally */
- case DM_CLOG_SET_REGION_SYNC:
- /*
- * Some requests simply do not need to be resent.
- * If it is a request that just changes log state,
- * then it doesn't need to be resent (everyone makes
- * updates).
- */
- LOG_COND(log_resend_requests,
- "[%s] Skipping resend of %s...",
- SHORT_UUID(entry->name.value),
- RQ_TYPE(tfr->request_type));
- tfr->data_size = 0;
- kernel_send(tfr);
+ if (!strcmp(entry->name.value, tfr->uuid)) {
+ switch (tfr->request_type) {
+ case DM_CLOG_POSTSUSPEND:
+ /*
+ * Don't resend DM_CLOG_POSTSUSPEND request, it will
+ * be handled when we get our own config leave
+ */
+ queue_add_tail(tfr, cluster_queue);
+ break;
+ case DM_CLOG_RESUME:
+ /* We are only concerned about this request locally */
+ case DM_CLOG_SET_REGION_SYNC:
+ /*
+ * Some requests simply do not need to be resent.
+ * If it is a request that just changes log state,
+ * then it doesn't need to be resent (everyone makes
+ * updates).
+ */
+ LOG_COND(log_resend_requests,
+ "[%s] Skipping resend of %s/#%u...",
+ SHORT_UUID(entry->name.value),
+ _RQ_TYPE(tfr->request_type), tfr->seq);
+ tfr->data_size = 0;
+ kernel_send(tfr);
- break;
- default:
- /*
- * If an action or a response is required, then
- * the request must be resent.
- */
- r = 1;
- log_resp_rec++;
- LOG_COND(log_resend_requests,
- "[%s] Resending %s(#%u) due to new server(%u)",
- SHORT_UUID(entry->name.value),
- RQ_TYPE(tfr->request_type),
- tfr->seq, entry->lowest_id);
- queue_add(tfr, cluster_queue);
- if (cluster_send(tfr))
- LOG_ERROR("Failed resend");
+ break;
+ default:
+ /*
+ * If an action or a response is required, then
+ * the request must be resent.
+ */
+ r = 1;
+ log_resp_rec++;
+ LOG_COND(log_resend_requests,
+ "[%s] Resending %s(#%u) due to new server(%u)",
+ SHORT_UUID(entry->name.value),
+ _RQ_TYPE(tfr->request_type),
+ tfr->seq, entry->lowest_id);
+ queue_add_tail(tfr, cluster_queue);
+ if (cluster_send(tfr))
+ LOG_ERROR("Failed resend");
+ }
}
}
}
+ list_for_each_safe(p, n, &delay) {
+ list_del_init(p);
+ tfr = (struct clog_tfr *)p;
+
+ LOG_COND(log_resend_requests,
+ "[%s] Sending delayed request, %s/#%u",
+ SHORT_UUID(entry->name.value),
+ _RQ_TYPE(tfr->request_type),
+ tfr->seq);
+ queue_add_tail(tfr, cluster_queue);
+
+ if ((tfr->request_type != DM_CLOG_POSTSUSPEND) &&
+ cluster_send(tfr))
+ LOG_ERROR("Failed resend");
+ }
+
return r;
}
@@ -790,6 +846,9 @@ static int do_cluster_work(void *data)
LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r));
if (entry->free_me) {
+ free(entry->cluster_queue);
+ free(entry->delay_queue);
+ free(entry->startup_queue);
free(entry);
continue;
}
@@ -821,17 +880,17 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
if (tfr->request_type & DM_CLOG_RESPONSE)
LOG_DBG("Response to %u from %u received [%s/%u]",
tfr->originator, nodeid,
- RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
+ _RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
tfr->seq);
else
LOG_DBG("Request from %u received [%s/%u]",
- nodeid, RQ_TYPE(tfr->request_type),
+ nodeid, _RQ_TYPE(tfr->request_type),
tfr->seq);
if (my_cluster_id == 0xDEAD) {
LOG_ERROR("[%s] Message from %u before init [%s/%u]",
SHORT_UUID(tfr->uuid), nodeid,
- RQ_TYPE(tfr->request_type), tfr->seq);
+ _RQ_TYPE(tfr->request_type), tfr->seq);
return;
}
@@ -844,10 +903,61 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
if (match->lowest_id == 0xDEAD) {
LOG_ERROR("[%s] Message from %u before init* [%s/%u]",
SHORT_UUID(tfr->uuid), nodeid,
- RQ_TYPE(tfr->request_type), tfr->seq);
+ _RQ_TYPE(tfr->request_type), tfr->seq);
return;
}
+ if (nodeid == my_cluster_id) {
+ struct clog_tfr *orig_tfr;
+
+ orig_tfr = queue_remove_match(cluster_queue, clog_tfr_cmp, tfr);
+ if (orig_tfr)
+ queue_add_tail(orig_tfr, match->cluster_queue);
+ }
+
+ if (tfr->request_type == DM_CLOG_MEMBER_LEAVE) {
+ /*
+ * If the server (lowest_id) indicates it is leaving,
+ * then we must resend any outstanding requests. However,
+ * we do not want to resend them if the next server in
+ * line is in the process of leaving.
+ */
+ if (nodeid == my_cluster_id)
+ LOG_COND(log_resend_requests, "[%s] I am leaving (2).....",
+ SHORT_UUID(tfr->uuid));
+
+ if ((nodeid == match->lowest_id) &&
+ (nodeid != my_cluster_id)) {
+ struct list_head l, *p, *n;
+ struct clog_tfr *t;
+
+ log_resp_rec++;
+
+ match->resend_requests = 1;
+ LOG_COND(log_resend_requests, "[%s] %u is leaving, resend required%s",
+ SHORT_UUID(tfr->uuid), nodeid,
+ (queue_empty(match->cluster_queue)) ? " -- cluster_queue empty": "");
+
+ INIT_LIST_HEAD(&l);
+ queue_remove_all(&l, match->cluster_queue);
+ list_for_each_safe(p, n, &l) {
+ list_del_init(p);
+ t = (struct clog_tfr *)p;
+
+ LOG_COND(log_resend_requests,
+ "[%s] %s/%u",
+ SHORT_UUID(t->uuid), _RQ_TYPE(t->request_type), t->seq);
+ queue_add_tail(t, match->cluster_queue);
+ }
+ }
+ if (nodeid < my_cluster_id) {
+ match->delay++;
+ LOG_COND(log_resend_requests, "[%s] %u is leaving, delay = %d",
+ SHORT_UUID(tfr->uuid), nodeid, match->delay);
+ }
+ goto out;
+ }
+
/*
* We can receive messages after we do a cpg_leave but before we
* get our config callback. However, since we can't respond after
@@ -872,7 +982,7 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
match->state = VALID;
while ((startup_tfr = queue_remove(match->startup_queue))) {
- if (startup_tfr->request_type == DM_CLOG_CONFIG_CHANGE) {
+ if (startup_tfr->request_type == DM_CLOG_MEMBER_JOIN) {
struct checkpoint_data *new;
new = prepare_checkpoint(match, startup_tfr->originator);
@@ -882,13 +992,15 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
startup_tfr->originator);
goto out;
}
+ LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u",
+ SHORT_UUID(tfr->uuid), startup_tfr->originator);
} else {
LOG_DBG("Processing delayed request %d: %s",
match->startup_queue->count,
- RQ_TYPE(startup_tfr->request_type));
+ _RQ_TYPE(startup_tfr->request_type));
i_was_server = (startup_tfr->error == my_cluster_id) ? 1 : 0;
startup_tfr->error = 0;
- r = handle_cluster_request(startup_tfr, i_was_server);
+ r = handle_cluster_request(match, startup_tfr, i_was_server);
if (r) {
LOG_ERROR("Error while processing delayed CPG message");
@@ -909,36 +1021,34 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
/*
* If the log is now valid, we can queue the checkpoints
*/
- for (i = match->checkpoints_needed; i;) {
+ for (i = match->checkpoints_needed; i; ) {
struct checkpoint_data *new;
i--;
- if (log_get_state(tfr) != LOG_RESUMED) {
- LOG_DBG("Skipping checkpoint for %u, my log is not ready",
- match->checkpoint_requesters[i]);
- continue;
- }
new = prepare_checkpoint(match, match->checkpoint_requesters[i]);
if (!new) {
/* FIXME: Need better error handling */
- LOG_ERROR("Failed to prepare checkpoint for %u!!!",
- match->checkpoint_requesters[i]);
+ LOG_ERROR("[%s] Failed to prepare checkpoint for %u!!!",
+ SHORT_UUID(tfr->uuid), match->checkpoint_requesters[i]);
break;
}
- match->checkpoints_needed = i;
+ LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u*",
+ SHORT_UUID(tfr->uuid), match->checkpoint_requesters[i]);
+ match->checkpoints_needed--;
+
new->next = match->checkpoint_list;
match->checkpoint_list = new;
- }
+ }
if (tfr->request_type & DM_CLOG_RESPONSE) {
response = 1;
- r = handle_cluster_response_poop(tfr, nodeid);
+ r = handle_cluster_response_poop(match, tfr, nodeid);
} else {
tfr->originator = nodeid;
if (match->state == LEAVING) {
LOG_ERROR("[%s] Ignoring %s from %u. Reason: I'm leaving",
- SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type),
+ SHORT_UUID(tfr->uuid), _RQ_TYPE(tfr->request_type),
tfr->originator);
goto out;
}
@@ -964,14 +1074,14 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
goto out;
}
- r = handle_cluster_request(tfr, i_am_server);
+ r = handle_cluster_request(match, tfr, i_am_server);
}
out:
if (r) {
LOG_ERROR("[%s] Error while processing CPG message, %s: %s",
SHORT_UUID(tfr->uuid),
- RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
+ _RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
strerror(-r));
LOG_ERROR("[%s] Response : %s", SHORT_UUID(tfr->uuid),
(response) ? "YES" : "NO");
@@ -980,6 +1090,28 @@ out:
if (response)
LOG_ERROR("[%s] Responder : %u",
SHORT_UUID(tfr->uuid), nodeid);
+
+ LOG_ERROR("HISTORY::");
+ for (i = 0; i < DEBUGGING_HISTORY; i++) {
+ idx++;
+ idx = idx % DEBUGGING_HISTORY;
+ if (debugging[idx][0] == '\0')
+ continue;
+ LOG_ERROR("%d:%d) %s", i, idx, debugging[idx]);
+ }
+ } else if (!(tfr->request_type & DM_CLOG_RESPONSE) ||
+ (tfr->originator == my_cluster_id)) {
+ int len;
+ idx++;
+ idx = idx % DEBUGGING_HISTORY;
+ len = sprintf(debugging[idx],
+ "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s",
+ tfr->seq,
+ SHORT_UUID(tfr->uuid),
+ _RQ_TYPE(tfr->request_type),
+ tfr->originator, (response) ? "YES" : "NO");
+ if (response)
+ sprintf(debugging[idx] + len, ", RSPR=%u", nodeid);
}
}
@@ -993,6 +1125,13 @@ static void cpg_join_callback(struct clog_cpg *match,
uint32_t lowest = match->lowest_id;
struct clog_tfr *tfr;
+ {
+ idx++;
+ idx = idx % DEBUGGING_HISTORY;
+ sprintf(debugging[idx], "+++ UUID=%s %u join +++",
+ SHORT_UUID(match->name.value), joined->nodeid);
+ }
+
/* Assign my_cluster_id */
if ((my_cluster_id == 0xDEAD) && (joined->pid == my_pid))
my_cluster_id = joined->nodeid;
@@ -1007,13 +1146,14 @@ static void cpg_join_callback(struct clog_cpg *match,
if (joined->nodeid == my_cluster_id)
goto out;
- LOG_DBG("Joining node, %u needs checkpoint", joined->nodeid);
+ LOG_COND(log_checkpoint, "[%s] Joining node, %u needs checkpoint",
+ SHORT_UUID(match->name.value), joined->nodeid);
/*
* FIXME: remove checkpoint_requesters/checkpoints_needed, and use
* the startup_queue interface exclusively
*/
- if (queue_empty(match->startup_queue)) {
+ if (queue_empty(match->startup_queue) && (match->state == VALID)) {
match->checkpoint_requesters[match->checkpoints_needed++] = joined->nodeid;
goto out;
}
@@ -1031,11 +1171,10 @@ static void cpg_join_callback(struct clog_cpg *match,
goto out;
}
}
- tfr->request_type = DM_CLOG_CONFIG_CHANGE;
+ tfr->request_type = DM_CLOG_MEMBER_JOIN;
tfr->originator = joined->nodeid;
queue_add_tail(tfr, match->startup_queue);
-
out:
/* Find the lowest_id, i.e. the server */
match->lowest_id = member_list[0].nodeid;
@@ -1068,7 +1207,12 @@ static void cpg_leave_callback(struct clog_cpg *match,
uint32_t lowest = match->lowest_id;
struct clog_tfr *tfr;
- INIT_LIST_HEAD(&l);
+ {
+ idx++;
+ idx = idx % DEBUGGING_HISTORY;
+ sprintf(debugging[idx], "--- UUID=%s %u left ---",
+ SHORT_UUID(match->name.value), left->nodeid);
+ }
/* Am I leaving? */
if (my_cluster_id == left->nodeid) {
@@ -1080,8 +1224,8 @@ static void cpg_leave_callback(struct clog_cpg *match,
cluster_postsuspend(match->name.value);
+ INIT_LIST_HEAD(&l);
queue_remove_all(&l, cluster_queue);
-
list_for_each_safe(p, n, &l) {
list_del_init(p);
tfr = (struct clog_tfr *)p;
@@ -1098,6 +1242,18 @@ static void cpg_leave_callback(struct clog_cpg *match,
queue_add(tfr, free_queue);
}
+ INIT_LIST_HEAD(&l);
+ queue_remove_all(&l, match->cluster_queue);
+ list_for_each_safe(p, n, &l) {
+ list_del_init(p);
+ tfr = (struct clog_tfr *)p;
+
+ if (tfr->request_type == DM_CLOG_POSTSUSPEND)
+ kernel_send(tfr);
+ else
+ queue_add(tfr, free_queue);
+ }
+
cpg_finalize(match->handle);
if (match->startup_queue->count) {
@@ -1109,12 +1265,20 @@ static void cpg_leave_callback(struct clog_cpg *match,
}
}
- free(match->startup_queue);
match->free_me = 1;
match->lowest_id = 0xDEAD;
match->state = INVALID;
}
+ if (left->nodeid < my_cluster_id) {
+ match->delay = (match->delay > 0) ? match->delay - 1 : 0;
+ if (!match->delay && queue_empty(match->cluster_queue))
+ match->resend_requests = 0;
+ LOG_COND(log_resend_requests, "[%s] %u has left, delay = %d%s",
+ SHORT_UUID(match->name.value), left->nodeid,
+ match->delay, (queue_empty(match->cluster_queue)) ? " -- cluster_queue empty": "");
+ }
+
/* Find the lowest_id, i.e. the server */
if (!member_list_entries) {
match->lowest_id = 0xDEAD;
@@ -1134,23 +1298,6 @@ static void cpg_leave_callback(struct clog_cpg *match,
LOG_COND(log_membership_change, "[%s] Server change %u -> %u (%u left)",
SHORT_UUID(match->name.value), lowest,
match->lowest_id, left->nodeid);
-
- list_for_each_safe(p, n, &cluster_queue->list) {
- tfr = (struct clog_tfr *)p;
-
- if (!strcmp(match->name.value, tfr->uuid)) {
- switch (tfr->request_type) {
- case DM_CLOG_POSTSUSPEND:
- /*
- * Don't resend DM_CLOG_POSTSUSPEND request, it will
- * be handled when we get our own config leave
- */
- break;
- default:
- match->resend_requests = 1;
- }
- }
- }
} else
LOG_COND(log_membership_change, "[%s] Server unchanged at %u (%u left)",
SHORT_UUID(match->name.value), lowest, left->nodeid);
@@ -1200,7 +1347,7 @@ int create_cluster_cpg(char *str)
{
int r;
int size;
- struct clog_cpg *new;
+ struct clog_cpg *new = NULL;
struct clog_cpg *tmp, *tmp2;
list_for_each_entry_safe(tmp, tmp2, &clog_cpg_list, list)
@@ -1220,13 +1367,30 @@ int create_cluster_cpg(char *str)
new->startup_queue = malloc(sizeof(struct queue));
if (!new->startup_queue) {
- free(new);
LOG_ERROR("Unable to allocate memory for clog_cpg");
- return -ENOMEM;
+ r = -ENOMEM;
+ goto fail;
}
INIT_LIST_HEAD(&(new->startup_queue->list));
new->startup_queue->count = 0;
+ new->delay_queue = malloc(sizeof(struct queue));
+ if (!new->delay_queue) {
+ LOG_ERROR("Unable to allocate memory for clog_cpg");
+ r = -ENOMEM;
+ goto fail;
+ }
+ INIT_LIST_HEAD(&(new->delay_queue->list));
+ new->delay_queue->count = 0;
+
+ new->cluster_queue = malloc(sizeof(struct queue));
+ if (!new->cluster_queue) {
+ LOG_ERROR("Unable to allocate memory for clog_cpg");
+ r = -ENOMEM;
+ goto fail;
+ }
+ INIT_LIST_HEAD(&(new->cluster_queue->list));
+ new->cluster_queue->count = 0;
size = ((strlen(str) + 1) > CPG_MAX_NAME_LENGTH) ?
CPG_MAX_NAME_LENGTH : (strlen(str) + 1);
@@ -1236,13 +1400,15 @@ int create_cluster_cpg(char *str)
r = cpg_initialize(&new->handle, &cpg_callbacks);
if (r != SA_AIS_OK) {
LOG_ERROR("cpg_initialize failed: Cannot join cluster");
- return -EPERM;
+ r = -EPERM;
+ goto fail;
}
r = cpg_join(new->handle, &new->name);
if (r != SA_AIS_OK) {
LOG_ERROR("cpg_join failed: Cannot join cluster");
- return -EPERM;
+ r = -EPERM;
+ goto fail;
}
new->cpg_state = VALID;
@@ -1255,6 +1421,18 @@ int create_cluster_cpg(char *str)
links_register(r, "cluster", do_cluster_work, NULL);
return 0;
+
+fail:
+ if (new) {
+ if (new->startup_queue)
+ free(new->startup_queue);
+ if (new->delay_queue)
+ free(new->delay_queue);
+ if (new->cluster_queue)
+ free(new->cluster_queue);
+ free(new);
+ }
+ return r;
}
static void abort_startup(struct clog_cpg *del)
@@ -1263,16 +1441,22 @@ static void abort_startup(struct clog_cpg *del)
SaNameT name;
SaAisErrorT rv;
SaCkptCheckpointHandleT h;
- struct clog_tfr *startup_tfr = NULL;
+ struct clog_tfr *tfr = NULL;
LOG_DBG("[%s] CPG teardown before checkpoint received",
SHORT_UUID(del->name.value));
- while ((startup_tfr = queue_remove(del->startup_queue))) {
+ while ((tfr = queue_remove(del->startup_queue))) {
LOG_DBG("[%s] Ignoring request from %u: %s",
- SHORT_UUID(del->name.value), startup_tfr->originator,
- RQ_TYPE(startup_tfr->request_type));
- queue_add(startup_tfr, free_queue);
+ SHORT_UUID(del->name.value), tfr->originator,
+ _RQ_TYPE(tfr->request_type));
+ queue_add(tfr, free_queue);
+ }
+
+ while ((tfr = queue_remove(del->delay_queue))) {
+ LOG_DBG("[%s] Ignoring delayed request: %s",
+ SHORT_UUID(del->name.value), _RQ_TYPE(tfr->request_type));
+ queue_add(tfr, free_queue);
}
len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u",
@@ -1309,23 +1493,44 @@ unlink_retry:
saCkptCheckpointClose(h);
}
-int destroy_cluster_cpg(char *str)
+static int _destroy_cluster_cpg(struct clog_cpg *del)
{
int r;
+ struct clog_tfr *tfr;
+
+ LOG_COND(log_resend_requests, "[%s] I am leaving (1).....",
+ SHORT_UUID(del->name.value));
+
+ tfr = queue_remove(free_queue);
+ if (tfr) {
+ tfr->request_type = DM_CLOG_MEMBER_LEAVE;
+ tfr->originator = my_cluster_id;
+ tfr->seq = 0;
+ strncpy(tfr->uuid, del->name.value, CPG_MAX_NAME_LENGTH);
+ cluster_send(tfr);
+ queue_add(tfr, free_queue);
+ }
+
+ del->cpg_state = INVALID;
+ del->state = LEAVING;
+
+ if (!queue_empty(del->startup_queue) ||
+ !queue_empty(del->delay_queue))
+ abort_startup(del);
+
+ r = cpg_leave(del->handle, &del->name);
+ if (r != CPG_OK)
+ LOG_ERROR("Error leaving CPG!");
+ return 0;
+}
+
+int destroy_cluster_cpg(char *str)
+{
struct clog_cpg *del, *tmp;
list_for_each_entry_safe(del, tmp, &clog_cpg_list, list)
- if (!strncmp(del->name.value, str, CPG_MAX_NAME_LENGTH)) {
- del->cpg_state = INVALID;
- del->state = LEAVING;
- if (!queue_empty(del->startup_queue))
- abort_startup(del);
-
- r = cpg_leave(del->handle, &del->name);
- if (r != CPG_OK)
- LOG_ERROR("Error leaving CPG!");
- break;
- }
+ if (!strncmp(del->name.value, str, CPG_MAX_NAME_LENGTH))
+ _destroy_cluster_cpg(del);
return 0;
}
@@ -1334,6 +1539,12 @@ int init_cluster(void)
{
SaAisErrorT rv;
+ {
+ int i;
+ for (i = 0; i < DEBUGGING_HISTORY; i++)
+ debugging[i][0] = '\0';
+ }
+
INIT_LIST_HEAD(&clog_cpg_list);
rv = saCkptInitialize(&ckpt_handle, &callbacks, &version);