feat(callback): use start pointer to manager callbacks by ShannonDing · Pull Request #232 · apache/rocketmq-client-cpp
Expand Up
@@ -39,6 +39,7 @@ class MockTcpRemotingClient : public TcpRemotingClient {
: TcpRemotingClient(pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout) {}
MOCK_METHOD3(invokeSync, RemotingCommand*(const string&, RemotingCommand&, int)); MOCK_METHOD6(invokeAsync, bool(const string&, RemotingCommand&, std::shared_ptr<AsyncCallbackWrap>, int64, int, int)); }; class MockMQClientAPIImpl : public MQClientAPIImpl { public: Expand Down Expand Up @@ -137,6 +138,18 @@ TEST(MQClientAPIImplTest, getMinOffset) { int64 offset = impl->getMinOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc); EXPECT_EQ(2048, offset); } class MyMockAutoDeleteSendCallback : public AutoDeleteSendCallBack { public: virtual ~MyMockAutoDeleteSendCallback() {} virtual void onSuccess(SendResult& sendResult) { std::cout << "send Success" << std::endl; return; } virtual void onException(MQException& e) { std::cout << "send Exception" << e << std::endl; return; } };
TEST(MQClientAPIImplTest, sendMessage) { string cid = "testClientId"; Expand Down Expand Up @@ -169,6 +182,57 @@ TEST(MQClientAPIImplTest, sendMessage) { EXPECT_EQ(result.getOffsetMsgId(), "MessageID"); EXPECT_EQ(result.getMessageQueue().getBrokerName(), "testBroker"); EXPECT_EQ(result.getMessageQueue().getTopic(), "testTopic");
// Try to test Async send
EXPECT_CALL(*pClient, invokeAsync(_, _, _, _, _, _)) .Times(7) .WillOnce(Return(false)) .WillOnce(Return(true)) .WillOnce(Return(false)) .WillOnce(Return(true)) .WillOnce(Return(false)) .WillOnce(Return(false)) .WillOnce(Return(false));
SendMessageRequestHeader* requestHeader2 = new SendMessageRequestHeader(); requestHeader2->producerGroup = cid; requestHeader2->topic = (message.getTopic()); requestHeader2->defaultTopic = DEFAULT_TOPIC; requestHeader2->defaultTopicQueueNums = 4; requestHeader2->bornTimestamp = UtilAll::currentTimeMillis(); EXPECT_ANY_THROW( impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader2, 100, 1, ComMode_ASYNC, nullptr, sc));
SendMessageRequestHeader* requestHeader3 = new SendMessageRequestHeader(); requestHeader3->producerGroup = cid; requestHeader3->topic = (message.getTopic()); requestHeader3->defaultTopic = DEFAULT_TOPIC; requestHeader3->defaultTopicQueueNums = 4; requestHeader3->bornTimestamp = UtilAll::currentTimeMillis(); SendCallback* pSendCallback = new MyMockAutoDeleteSendCallback(); EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader3, 100, 1, ComMode_ASYNC, pSendCallback, sc));
SendMessageRequestHeader* requestHeader4 = new SendMessageRequestHeader(); requestHeader4->producerGroup = cid; requestHeader4->topic = (message.getTopic()); requestHeader4->defaultTopic = DEFAULT_TOPIC; requestHeader4->defaultTopicQueueNums = 4; requestHeader4->bornTimestamp = UtilAll::currentTimeMillis(); SendCallback* pSendCallback2 = new MyMockAutoDeleteSendCallback(); EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader4, 1000, 2, ComMode_ASYNC, pSendCallback2, sc));
SendMessageRequestHeader* requestHeader5 = new SendMessageRequestHeader(); requestHeader5->producerGroup = cid; requestHeader5->topic = (message.getTopic()); requestHeader5->defaultTopic = DEFAULT_TOPIC; requestHeader5->defaultTopicQueueNums = 4; requestHeader5->bornTimestamp = UtilAll::currentTimeMillis(); SendCallback* pSendCallback3 = new MyMockAutoDeleteSendCallback(); EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader5, 1000, 3, ComMode_ASYNC, pSendCallback3, sc)); }
TEST(MQClientAPIImplTest, consumerSendMessageBack) { Expand Down
MOCK_METHOD3(invokeSync, RemotingCommand*(const string&, RemotingCommand&, int)); MOCK_METHOD6(invokeAsync, bool(const string&, RemotingCommand&, std::shared_ptr<AsyncCallbackWrap>, int64, int, int)); }; class MockMQClientAPIImpl : public MQClientAPIImpl { public: Expand Down Expand Up @@ -137,6 +138,18 @@ TEST(MQClientAPIImplTest, getMinOffset) { int64 offset = impl->getMinOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc); EXPECT_EQ(2048, offset); } class MyMockAutoDeleteSendCallback : public AutoDeleteSendCallBack { public: virtual ~MyMockAutoDeleteSendCallback() {} virtual void onSuccess(SendResult& sendResult) { std::cout << "send Success" << std::endl; return; } virtual void onException(MQException& e) { std::cout << "send Exception" << e << std::endl; return; } };
TEST(MQClientAPIImplTest, sendMessage) { string cid = "testClientId"; Expand Down Expand Up @@ -169,6 +182,57 @@ TEST(MQClientAPIImplTest, sendMessage) { EXPECT_EQ(result.getOffsetMsgId(), "MessageID"); EXPECT_EQ(result.getMessageQueue().getBrokerName(), "testBroker"); EXPECT_EQ(result.getMessageQueue().getTopic(), "testTopic");
// Try to test Async send
EXPECT_CALL(*pClient, invokeAsync(_, _, _, _, _, _)) .Times(7) .WillOnce(Return(false)) .WillOnce(Return(true)) .WillOnce(Return(false)) .WillOnce(Return(true)) .WillOnce(Return(false)) .WillOnce(Return(false)) .WillOnce(Return(false));
SendMessageRequestHeader* requestHeader2 = new SendMessageRequestHeader(); requestHeader2->producerGroup = cid; requestHeader2->topic = (message.getTopic()); requestHeader2->defaultTopic = DEFAULT_TOPIC; requestHeader2->defaultTopicQueueNums = 4; requestHeader2->bornTimestamp = UtilAll::currentTimeMillis(); EXPECT_ANY_THROW( impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader2, 100, 1, ComMode_ASYNC, nullptr, sc));
SendMessageRequestHeader* requestHeader3 = new SendMessageRequestHeader(); requestHeader3->producerGroup = cid; requestHeader3->topic = (message.getTopic()); requestHeader3->defaultTopic = DEFAULT_TOPIC; requestHeader3->defaultTopicQueueNums = 4; requestHeader3->bornTimestamp = UtilAll::currentTimeMillis(); SendCallback* pSendCallback = new MyMockAutoDeleteSendCallback(); EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader3, 100, 1, ComMode_ASYNC, pSendCallback, sc));
SendMessageRequestHeader* requestHeader4 = new SendMessageRequestHeader(); requestHeader4->producerGroup = cid; requestHeader4->topic = (message.getTopic()); requestHeader4->defaultTopic = DEFAULT_TOPIC; requestHeader4->defaultTopicQueueNums = 4; requestHeader4->bornTimestamp = UtilAll::currentTimeMillis(); SendCallback* pSendCallback2 = new MyMockAutoDeleteSendCallback(); EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader4, 1000, 2, ComMode_ASYNC, pSendCallback2, sc));
SendMessageRequestHeader* requestHeader5 = new SendMessageRequestHeader(); requestHeader5->producerGroup = cid; requestHeader5->topic = (message.getTopic()); requestHeader5->defaultTopic = DEFAULT_TOPIC; requestHeader5->defaultTopicQueueNums = 4; requestHeader5->bornTimestamp = UtilAll::currentTimeMillis(); SendCallback* pSendCallback3 = new MyMockAutoDeleteSendCallback(); EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader5, 1000, 3, ComMode_ASYNC, pSendCallback3, sc)); }
TEST(MQClientAPIImplTest, consumerSendMessageBack) { Expand Down