/* * This file is part of QCluster. * * QCluster is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * QCluster is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with QCluster; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * * Peter Harper * * $Id: message.c,v 1.1 2003/12/02 06:17:38 rmello Exp $ */ static char rcsid[] = "@(#) $Id: message.c,v 1.1 2003/12/02 06:17:38 rmello Exp $"; #include "ns.h" #include "message.h" #include "clustercomms.h" #include "locks.h" Ns_RWLock g_msg_types_mutex; Tcl_HashTable g_msg_types; static void q_delegate_group(struct q_server *remote_server_ptr, struct q_group *local_group_ptr); /* *---------------------------------------------------------------------- * * q_register_msg_type -- * * Registers a particular message type with the qcluster system. This * function should be used by the layer above qcluster. Two function * pointers should be provided for encoding and decoding between local * host and network tranmission. * * Results: * Pointer to the newly created message type object. * *---------------------------------------------------------------------- */ struct q_message_type * q_register_msg_type( int msg_type_id, char *msg_type_name, int (*construct_in_func)(void *, int, void *, int *), int (*construct_out_func)(void *, int, void *, int *)) { struct q_message_type *type_ptr; Tcl_HashEntry *entry_ptr; int new_flag; /* * Create the message type object, and add it to the message types * hash table. */ type_ptr = ns_malloc(sizeof(struct q_message_type)); type_ptr->msg_type_id = msg_type_id; strcpy(type_ptr->type_name, msg_type_name); type_ptr->construct_in_func = construct_in_func; type_ptr->construct_out_func = construct_out_func; Ns_RWLockRdLock(&g_msg_types_mutex); entry_ptr = Tcl_CreateHashEntry(&g_msg_types, (char *)msg_type_id, &new_flag); Tcl_SetHashValue(entry_ptr, type_ptr); Ns_RWLockUnlock(&g_msg_types_mutex); return type_ptr; } /* *---------------------------------------------------------------------- * * q_can_process_p -- * * Adds the process_p number to the process_p flag, thus keeping a count * of the number of processors on this server. * * Results: * None * *---------------------------------------------------------------------- */ void q_set_can_process_p(struct q_group *group_ptr, int can_process_p) { q_get_wlock(group_ptr); group_ptr->can_process_p += can_process_p; if (group_ptr->can_process_p < 0) { can_process_p = 0; } /* * Send a can_process update message to remote servers */ q_comms_can_process_msg(group_ptr); q_release_lock(group_ptr); } /* *---------------------------------------------------------------------- * * q_remote_can_process_p -- * * Adds the process_p number to the process_p flag, thus keeping a count * of the number of processors on the remote server. * * Results: * None * *---------------------------------------------------------------------- */ void q_remote_set_can_process_val(struct q_server *remote_server_ptr, struct q_group *group_ptr, int can_process_val) { struct q_group *local_group_ptr; int local_can_process_val; int local_queue_empty; q_get_wlock(group_ptr); group_ptr->can_process_p = can_process_val; q_release_lock(group_ptr); /* * Check whether the remote server is indicating that it can process * a group that this server cannot, and that the local group has messages. * If this is the case, then we need to delegate those messages to the * remote host. */ if (can_process_val > 0) { local_group_ptr = q_get_group_ptr(g_server, group_ptr->grp_name); if (local_group_ptr == NULL) { Ns_Log(Error,"q_remote_set_can_process_val: Cannot find local group %s.", group_ptr->grp_name); } else { q_get_rlock(local_group_ptr); local_can_process_val = local_group_ptr->can_process_p; if (local_group_ptr->queue_ll == NULL) { local_queue_empty = 1; } else { local_queue_empty = 0; } q_release_lock(local_group_ptr); if (!local_queue_empty && local_can_process_val == 0) { /* * Delegate the local group to the remote server. */ q_delegate_group(remote_server_ptr, local_group_ptr); } } } } /* *---------------------------------------------------------------------- * * q_delegate_group -- * * Delegates an entire group to a specified remote server. * * Results: * None. * *---------------------------------------------------------------------- */ void q_delegate_group(struct q_server *remote_server_ptr, struct q_group *local_group_ptr) { Tcl_HashEntry *entry_ptr; int new_flag; int delegated_flag; int can_process_p; struct q_message *message_ptr; struct q_message_search msg_search; // Ns_Log(Notice, "q_delegate_group: entering (%s)", local_group_ptr->grp_name); /* * Loop through each message in the group, attempting to delegate the * message to the remote_server. If the delegation is successful, delete * the message locally and send a delete_recovered_message to delete the * message off the remote queues. */ q_get_rlock(local_group_ptr); message_ptr = q_data_first_msg_entry(local_group_ptr, -1, &msg_search); while (message_ptr != NULL) { // Ns_Log(Notice,"Delegating message %s", message_ptr->msg_id); /* * Delete the message pointer entry from the old group object. */ delegated_flag = q_comms_send_delegate_msg(remote_server_ptr, message_ptr, g_cluster_size); if (delegated_flag == NS_TRUE) { q_comms_delete_recovered_message(g_server, message_ptr); q_data_delete_msg(message_ptr->grp_ptr, message_ptr->msg_id, NS_TRUE, NS_TRUE); } message_ptr = q_data_next_msg_entry(&msg_search); } q_release_lock(local_group_ptr); return; } /* *---------------------------------------------------------------------- * * q_get_group_ptr -- * * Returns a pointer to the group object specified by the server * pointer and group name parameters. * * Results: * Group object pointer if successfully found. * Null otherwise. * *---------------------------------------------------------------------- */ struct q_group * q_get_group_ptr(struct q_server *server_ptr, char *group_name) { Tcl_HashEntry *entry_ptr; struct q_group *group_ptr; /* * Try and find the group in this server entry. If found, return a * pointer to it, otherwise return NULL. */ q_get_rlock(server_ptr); entry_ptr = Tcl_FindHashEntry(&server_ptr->groups_ht, group_name); q_release_lock(server_ptr); if (entry_ptr == NULL) { group_ptr = NULL; } else { group_ptr = Tcl_GetHashValue(entry_ptr); } return group_ptr; } /* *---------------------------------------------------------------------- * * q_get_msg_type_ptr -- * * Returns a pointer to the message type object, specified by the * given message type id parameter. * * Results: * Message type object pointer if successfully found. * Null otherwise. * *---------------------------------------------------------------------- */ struct q_message_type * q_get_msg_type_ptr(int msg_type_id) { Tcl_HashEntry *entry_ptr; struct q_message_type *msg_type_ptr; /* * Try and find the message_type. If found, return a * pointer to it, * otherwise return NULL. */ Ns_RWLockRdLock(&g_msg_types_mutex); entry_ptr = Tcl_FindHashEntry(&g_msg_types, (void *)msg_type_id); Ns_RWLockUnlock(&g_msg_types_mutex); if (entry_ptr == NULL) { msg_type_ptr = NULL; } else { msg_type_ptr = Tcl_GetHashValue(entry_ptr); } return msg_type_ptr; } /* *---------------------------------------------------------------------- * * q_queue_msg -- * * Queues a message within the given group. If the "cat_process_p" * flag is false, it'll pass the message to its nearest neighbour. * * Results: * None. * *---------------------------------------------------------------------- */ int q_queue_msg(struct q_message_type *msg_type_ptr, struct q_group *group_ptr, char *msg_id, int msg_status, void *msg_buf, int msg_size, int cur_hops) { struct q_message *msg_ptr; Tcl_HashEntry *entry_ptr; int new_flag; int delegated_flag; int can_process_p; // Ns_Log(Notice, "q_queue_msg: entering (%s) curhops = %d", msg_id, cur_hops); /* * Construct the new message object. */ msg_ptr = ns_malloc(sizeof(struct q_message)); msg_ptr->msg_ptr = ns_malloc(msg_size); strcpy(msg_ptr->msg_id, msg_id); memcpy(msg_ptr->msg_ptr, msg_buf, msg_size); msg_ptr->msg_size = msg_size; msg_ptr->timestamp = time(NULL); msg_ptr->status = msg_status; msg_ptr->grp_ptr = group_ptr; msg_ptr->type_ptr = msg_type_ptr; q_get_rlock(group_ptr); can_process_p = group_ptr->can_process_p; q_release_lock(group_ptr); delegated_flag = NS_FALSE; if (!can_process_p) { /* * This server can't process this message group, so delegate this * message to the nearest neighbour. If the delegation operation fails, * then we have no choice but to queue the message here. */ delegated_flag = q_comms_delegate_msg(NULL,msg_ptr, cur_hops); } if (can_process_p || delegated_flag == NS_FALSE) { /* * This server can process this message group, so add the message to * the group. */ q_get_wlock(group_ptr); q_data_add_msg(group_ptr,msg_ptr); q_release_lock(group_ptr); /* * Pass a copy of this message to all connected servers. */ q_comms_distribute_msg(NULL,msg_ptr); /* * Signal the queue append event for any consuming threads. */ Ns_CondBroadcast(&group_ptr->event); } // Ns_Log(Notice, "q_queue_msg: exiting (%x)", msg_ptr); return NS_TRUE; } /* *---------------------------------------------------------------------- * * q_remote_queue_msg -- * * Queues a message on another servers queue within the given group. * * Results: * None. * *---------------------------------------------------------------------- */ int q_remote_queue_msg(struct q_message_type *msg_type_ptr, struct q_group *group_ptr, char *msg_id, int msg_status, void *msg_buf, int msg_size) { struct q_message *msg_ptr; Tcl_HashEntry *entry_ptr; int new_flag; int delegated_flag; int can_process_p; // Ns_Log(Notice, "q_queue_remote_msg: entering (%s)", msg_id); /* * Construct the new message object. */ msg_ptr = ns_malloc(sizeof(struct q_message)); msg_ptr->msg_ptr = ns_malloc(msg_size); strcpy(msg_ptr->msg_id, msg_id); memcpy(msg_ptr->msg_ptr, msg_buf, msg_size); msg_ptr->msg_size = msg_size; msg_ptr->timestamp = time(NULL); msg_ptr->status = msg_status; msg_ptr->grp_ptr = group_ptr; msg_ptr->type_ptr = msg_type_ptr; /* * This server can process this message group, so add the message to * the group. */ q_get_wlock(group_ptr); q_data_add_msg(group_ptr, msg_ptr); q_release_lock(group_ptr); // Ns_Log(Notice, "q_queue_remote_msg: exiting"); return NS_TRUE; } /* *---------------------------------------------------------------------- * * q_msg_status_update -- * * Update the status of the given message id. If the status is * Q_MSG_STATUS_COMPLETE/FAILED, the message will be deleted from the * group. Any updates are forwarded to all the other qcluster * servers. * * Results: * None. * *---------------------------------------------------------------------- */ int q_msg_status_update(char *msg_id, struct q_group *group_ptr, int new_status) { struct q_message *msg_ptr; int old_status; // Ns_Log(Notice, "q_msg_status_update: entering"); q_get_wlock(group_ptr); msg_ptr = q_data_get_msg_entry(group_ptr, msg_id); if (msg_ptr == NULL) { Ns_Log(Error, "q_msg_status_update: Couldn't find message id %s in group %s", msg_id, group_ptr->grp_name); q_release_lock(group_ptr); return NS_FALSE; } if (new_status == Q_MSG_STATUS_COMPLETE || new_status == Q_MSG_STATUS_FAILED) { /* * If the new status is COMPLETE or FAILED, delete it. Note a slight * fudge in the status field because of the way the * q_comms_msg_status_update function assuming the msg_ptr is valid and * has the new status. */ old_status = msg_ptr->status; msg_ptr->status = new_status; q_comms_msg_status_update(msg_ptr); msg_ptr->status = old_status; q_data_delete_msg(group_ptr, msg_id, NS_TRUE, NS_TRUE); } else { /* * Try and find the message in this group entry, if found, update its * status to the new value. */ q_data_update_msg_status(group_ptr, msg_ptr, new_status); q_comms_msg_status_update(msg_ptr); } q_release_lock(group_ptr); // Ns_Log(Notice, "q_msg_status_update: exiting"); return NS_TRUE; } /* *---------------------------------------------------------------------- * * q_remote_msg_status_update -- * * Update the status of the given message id. If the status is * Q_MSG_STATUS_COMPLETE, the message will be deleted from the * group. * * Results: * None. * *---------------------------------------------------------------------- */ int q_remote_msg_status_update(char *msg_id, struct q_group *group_ptr, int new_status) { struct q_message *msg_ptr; // Ns_Log(Notice, "q_remote_msg_status_update: entering (%s,%x)", msg_id, new_status); q_get_wlock(group_ptr); msg_ptr = q_data_get_msg_entry(group_ptr, msg_id); if (msg_ptr == NULL) { Ns_Log(Error, "q_remote_msg_status_update: Couldn't find message id %s in group %s", msg_id, group_ptr->grp_name); q_release_lock(group_ptr); return NS_FALSE; } if (new_status == Q_MSG_STATUS_COMPLETE || new_status == Q_MSG_STATUS_FAILED) { /* * If the new status is COMPLETE, delete it. */ q_data_delete_msg(group_ptr, msg_id, NS_TRUE, NS_TRUE); } else { /* * Update its status to the new value. */ q_data_update_msg_status(group_ptr, msg_ptr, new_status); } q_release_lock(group_ptr); // Ns_Log(Notice, "q_remote_msg_status_update: exiting"); return NS_TRUE; } /* *---------------------------------------------------------------------- * * q_recover_msg -- * * Recovers a message from a failed server. * * NB: This function DOES NOT GET LOCKS on the failed server structure * because it relies on the calling function to get this lock. * * Results: * None. * *---------------------------------------------------------------------- */ int q_recover_msg(struct q_server *failed_server_ptr, struct q_message *msg_ptr) { struct q_group *group_ptr; Tcl_HashEntry *entry_ptr; int new_flag; int can_process_p; int delegated_flag; int cur_hops; // Ns_Log(Notice, "q_recover_msg: entering (%s)", msg_ptr->msg_id); /* * Reuse the message object, but update the group pointer to the local * server group. */ group_ptr = q_get_group_ptr(g_server, msg_ptr->grp_ptr->grp_name); if (group_ptr == NULL) { Ns_Log(Error,"q_recover_msg: group name not found: %s", msg_ptr->grp_ptr->grp_name); return NS_FALSE; } /* * Delete the message pointer entry from the old group object. */ q_data_delete_msg(msg_ptr->grp_ptr, msg_ptr->msg_id, NS_FALSE, NS_FALSE); msg_ptr->grp_ptr = group_ptr; msg_ptr->status = Q_MSG_STATUS_READY; q_get_rlock(group_ptr); can_process_p = group_ptr->can_process_p; q_release_lock(group_ptr); delegated_flag = NS_FALSE; if (!can_process_p) { /* * This server can't process this message group, so delegate this * message to the nearest neighbour. If the delegation operation fails, * then we have no choice but to queue the message here. */ delegated_flag = q_comms_delegate_msg(failed_server_ptr, msg_ptr, g_cluster_size); } if (can_process_p || delegated_flag == NS_FALSE) { /* * This server can process this message group, so add the message to * the group. */ q_get_wlock(group_ptr); q_data_add_msg(group_ptr, msg_ptr); q_release_lock(group_ptr); /* * Pass a copy of this message to all connected servers. */ q_comms_distribute_msg(failed_server_ptr, msg_ptr); } // Ns_Log(Notice, "q_recover_msg: exiting"); return NS_TRUE; } /* *---------------------------------------------------------------------- * * q_get_msg_and_set_status -- * * Retrieves a message with the given status from the given group. The * status of the message is then updated to the new status in one, single * atomic operation. * * Results: * Message pointer if a message was found. NULL otherwise. * *---------------------------------------------------------------------- */ struct q_message * q_get_msg_and_set_status(struct q_group *group_ptr, int cur_status, int new_status) { struct q_message *msg_ptr; struct q_message_search search; int old_status; // Ns_Log(Notice, "q_get_msg_and_set_status: entering"); q_get_wlock(group_ptr); msg_ptr = q_data_first_msg_entry(group_ptr, cur_status, &search); if (msg_ptr != NULL) { if (new_status == Q_MSG_STATUS_COMPLETE || new_status == Q_MSG_STATUS_FAILED) { /* * If the new status is COMPLETE or FAILED, delete it. */ old_status = msg_ptr->status; msg_ptr->status = new_status; q_comms_msg_status_update(msg_ptr); msg_ptr->status = old_status; q_data_delete_msg(group_ptr, msg_ptr->msg_id, NS_TRUE, NS_TRUE); } else { /* * Update the status of the message. */ q_data_update_msg_status(group_ptr, msg_ptr, new_status); q_comms_msg_status_update(msg_ptr); } } q_release_lock(group_ptr); // Ns_Log(Notice, "q_get_msg_and_set_status: exiting"); return msg_ptr; }