refactor(rebalance): use smart_ptr to manage pull request by ShannonDing · Pull Request #206 · apache/rocketmq-client-cpp
Expand Up
@@ -103,14 +103,25 @@ MessageListenerType ConsumeMessageOrderlyService::getConsumeMsgSerivceListenerTy
return m_pMessageListener->getMessageListenerType();
}
void ConsumeMessageOrderlyService::submitConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs) { void ConsumeMessageOrderlyService::submitConsumeRequest(boost::weak_ptr<PullRequest> pullRequest, vector<MQMessageExt>& msgs) { boost::shared_ptr<PullRequest> request = pullRequest.lock(); if (!request) { LOG_WARN("Pull request has been released"); return; } m_ioService.post(boost::bind(&ConsumeMessageOrderlyService::ConsumeRequest, this, request)); }
void ConsumeMessageOrderlyService::static_submitConsumeRequestLater(void* context, PullRequest* request, boost::weak_ptr<PullRequest> pullRequest, bool tryLockMQ, boost::asio::deadline_timer* t) { boost::shared_ptr<PullRequest> request = pullRequest.lock(); if (!request) { LOG_WARN("Pull request has been released"); return; } LOG_INFO("submit consumeRequest later for mq:%s", request->m_messageQueue.toString().c_str()); vector<MQMessageExt> msgs; ConsumeMessageOrderlyService* orderlyService = (ConsumeMessageOrderlyService*)context; Expand All @@ -122,7 +133,12 @@ void ConsumeMessageOrderlyService::static_submitConsumeRequestLater(void* contex deleteAndZero(t); }
void ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) { void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> pullRequest) { boost::shared_ptr<PullRequest> request = pullRequest.lock(); if (!request) { LOG_WARN("Pull request has been released"); return; } bool bGetMutex = false; boost::unique_lock<boost::timed_mutex> lock(request->getPullRequestCriticalSection(), boost::try_to_lock); if (!lock.owns_lock()) { Expand All @@ -140,7 +156,7 @@ void ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) { // request->m_messageQueue.toString().c_str()); return; } if (!request || request->isDroped()) { if (!request || request->isDropped()) { LOG_WARN("the pull result is NULL or Had been dropped"); request->clearAllMsgs(); // add clear operation to avoid bad state when // dropped pullRequest returns normal Expand Down Expand Up @@ -189,11 +205,16 @@ void ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) { } } } void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(PullRequest* request, bool tryLockMQ) { void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(boost::weak_ptr<PullRequest> pullRequest, bool tryLockMQ) { boost::shared_ptr<PullRequest> request = pullRequest.lock(); if (!request) { LOG_WARN("Pull request has been released"); return; } int retryTimer = tryLockMQ ? 500 : 100; boost::asio::deadline_timer* t = new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(retryTimer)); t->async_wait( boost::bind(&(ConsumeMessageOrderlyService::static_submitConsumeRequestLater), this, request, tryLockMQ, t)); } } } // namespace rocketmq
void ConsumeMessageOrderlyService::submitConsumeRequest(PullRequest* request, vector<MQMessageExt>& msgs) { void ConsumeMessageOrderlyService::submitConsumeRequest(boost::weak_ptr<PullRequest> pullRequest, vector<MQMessageExt>& msgs) { boost::shared_ptr<PullRequest> request = pullRequest.lock(); if (!request) { LOG_WARN("Pull request has been released"); return; } m_ioService.post(boost::bind(&ConsumeMessageOrderlyService::ConsumeRequest, this, request)); }
void ConsumeMessageOrderlyService::static_submitConsumeRequestLater(void* context, PullRequest* request, boost::weak_ptr<PullRequest> pullRequest, bool tryLockMQ, boost::asio::deadline_timer* t) { boost::shared_ptr<PullRequest> request = pullRequest.lock(); if (!request) { LOG_WARN("Pull request has been released"); return; } LOG_INFO("submit consumeRequest later for mq:%s", request->m_messageQueue.toString().c_str()); vector<MQMessageExt> msgs; ConsumeMessageOrderlyService* orderlyService = (ConsumeMessageOrderlyService*)context; Expand All @@ -122,7 +133,12 @@ void ConsumeMessageOrderlyService::static_submitConsumeRequestLater(void* contex deleteAndZero(t); }
void ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) { void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> pullRequest) { boost::shared_ptr<PullRequest> request = pullRequest.lock(); if (!request) { LOG_WARN("Pull request has been released"); return; } bool bGetMutex = false; boost::unique_lock<boost::timed_mutex> lock(request->getPullRequestCriticalSection(), boost::try_to_lock); if (!lock.owns_lock()) { Expand All @@ -140,7 +156,7 @@ void ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) { // request->m_messageQueue.toString().c_str()); return; } if (!request || request->isDroped()) { if (!request || request->isDropped()) { LOG_WARN("the pull result is NULL or Had been dropped"); request->clearAllMsgs(); // add clear operation to avoid bad state when // dropped pullRequest returns normal Expand Down Expand Up @@ -189,11 +205,16 @@ void ConsumeMessageOrderlyService::ConsumeRequest(PullRequest* request) { } } } void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(PullRequest* request, bool tryLockMQ) { void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(boost::weak_ptr<PullRequest> pullRequest, bool tryLockMQ) { boost::shared_ptr<PullRequest> request = pullRequest.lock(); if (!request) { LOG_WARN("Pull request has been released"); return; } int retryTimer = tryLockMQ ? 500 : 100; boost::asio::deadline_timer* t = new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(retryTimer)); t->async_wait( boost::bind(&(ConsumeMessageOrderlyService::static_submitConsumeRequestLater), this, request, tryLockMQ, t)); } } } // namespace rocketmq