Commit 6ff5a3ff authored by lepdou's avatar lepdou

Merge pull request #239 from nobodyiam/long-poll-improvement-merge

Add notification id for long poll
parents cdcd35a7 52e88f6e
package com.ctrip.framework.apollo.biz.entity; package com.ctrip.framework.apollo.biz.entity;
import com.google.common.base.MoreObjects;
import java.util.Date; import java.util.Date;
import javax.persistence.Column; import javax.persistence.Column;
...@@ -55,4 +57,14 @@ public class ReleaseMessage { ...@@ -55,4 +57,14 @@ public class ReleaseMessage {
public void setMessage(String message) { public void setMessage(String message) {
this.message = message; this.message = message;
} }
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.omitNullValues()
.add("id", id)
.add("message", message)
.add("dataChangeLastModifiedTime", dataChangeLastModifiedTime)
.toString();
}
} }
package com.ctrip.framework.apollo.biz.message; package com.ctrip.framework.apollo.biz.message;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
/** /**
* @author Jason Song(song_s@ctrip.com) * @author Jason Song(song_s@ctrip.com)
*/ */
public interface MessageListener { public interface ReleaseMessageListener {
void handleMessage(String message, String channel); void handleMessage(ReleaseMessage message, String channel);
} }
...@@ -33,7 +33,7 @@ public class ReleaseMessageScanner implements InitializingBean { ...@@ -33,7 +33,7 @@ public class ReleaseMessageScanner implements InitializingBean {
@Autowired @Autowired
private ReleaseMessageRepository releaseMessageRepository; private ReleaseMessageRepository releaseMessageRepository;
private int databaseScanInterval; private int databaseScanInterval;
private List<MessageListener> listeners; private List<ReleaseMessageListener> listeners;
private ScheduledExecutorService executorService; private ScheduledExecutorService executorService;
private long maxIdScanned; private long maxIdScanned;
...@@ -66,7 +66,7 @@ public class ReleaseMessageScanner implements InitializingBean { ...@@ -66,7 +66,7 @@ public class ReleaseMessageScanner implements InitializingBean {
* add message listeners for release message * add message listeners for release message
* @param listener * @param listener
*/ */
public void addMessageListener(MessageListener listener) { public void addMessageListener(ReleaseMessageListener listener) {
if (!listeners.contains(listener)) { if (!listeners.contains(listener)) {
listeners.add(listener); listeners.add(listener);
} }
...@@ -115,9 +115,9 @@ public class ReleaseMessageScanner implements InitializingBean { ...@@ -115,9 +115,9 @@ public class ReleaseMessageScanner implements InitializingBean {
*/ */
private void fireMessageScanned(List<ReleaseMessage> messages) { private void fireMessageScanned(List<ReleaseMessage> messages) {
for (ReleaseMessage message : messages) { for (ReleaseMessage message : messages) {
for (MessageListener listener : listeners) { for (ReleaseMessageListener listener : listeners) {
try { try {
listener.handleMessage(message.getMessage(), Topics.APOLLO_RELEASE_TOPIC); listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);
} catch (Throwable ex) { } catch (Throwable ex) {
Cat.logError(ex); Cat.logError(ex);
logger.error("Failed to invoke message listener {}", listener.getClass(), ex); logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
......
...@@ -4,6 +4,7 @@ import com.ctrip.framework.apollo.biz.entity.ReleaseMessage; ...@@ -4,6 +4,7 @@ import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import org.springframework.data.repository.PagingAndSortingRepository; import org.springframework.data.repository.PagingAndSortingRepository;
import java.util.Collection;
import java.util.List; import java.util.List;
/** /**
...@@ -13,4 +14,6 @@ public interface ReleaseMessageRepository extends PagingAndSortingRepository<Rel ...@@ -13,4 +14,6 @@ public interface ReleaseMessageRepository extends PagingAndSortingRepository<Rel
List<ReleaseMessage> findFirst500ByIdGreaterThanOrderByIdAsc(Long id); List<ReleaseMessage> findFirst500ByIdGreaterThanOrderByIdAsc(Long id);
ReleaseMessage findTopByOrderByIdDesc(); ReleaseMessage findTopByOrderByIdDesc();
ReleaseMessage findTopByMessageInOrderByIdDesc(Collection<String> messages);
} }
package com.ctrip.framework.apollo.biz.service;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Collection;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@Service
public class ReleaseMessageService {
@Autowired
private ReleaseMessageRepository releaseMessageRepository;
public ReleaseMessage findLatestReleaseMessageForMessages(Collection<String> messages) {
return releaseMessageRepository.findTopByMessageInOrderByIdDesc(messages);
}
}
...@@ -44,8 +44,8 @@ public class ReleaseMessageScannerTest { ...@@ -44,8 +44,8 @@ public class ReleaseMessageScannerTest {
@Test @Test
public void testScanMessageAndNotifyMessageListener() throws Exception { public void testScanMessageAndNotifyMessageListener() throws Exception {
SettableFuture<String> someListenerFuture = SettableFuture.create(); SettableFuture<ReleaseMessage> someListenerFuture = SettableFuture.create();
MessageListener someListener = (message, channel) -> someListenerFuture.set(message); ReleaseMessageListener someListener = (message, channel) -> someListenerFuture.set(message);
releaseMessageScanner.addMessageListener(someListener); releaseMessageScanner.addMessageListener(someListener);
String someMessage = "someMessage"; String someMessage = "someMessage";
...@@ -55,13 +55,14 @@ public class ReleaseMessageScannerTest { ...@@ -55,13 +55,14 @@ public class ReleaseMessageScannerTest {
when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(0L)).thenReturn( when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(0L)).thenReturn(
Lists.newArrayList(someReleaseMessage)); Lists.newArrayList(someReleaseMessage));
String someListenerMessage = ReleaseMessage someListenerMessage =
someListenerFuture.get(5000, TimeUnit.MILLISECONDS); someListenerFuture.get(5000, TimeUnit.MILLISECONDS);
assertEquals(someMessage, someListenerMessage); assertEquals(someMessage, someListenerMessage.getMessage());
assertEquals(someId, someListenerMessage.getId());
SettableFuture<String> anotherListenerFuture = SettableFuture.create(); SettableFuture<ReleaseMessage> anotherListenerFuture = SettableFuture.create();
MessageListener anotherListener = (message, channel) -> anotherListenerFuture.set(message); ReleaseMessageListener anotherListener = (message, channel) -> anotherListenerFuture.set(message);
releaseMessageScanner.addMessageListener(anotherListener); releaseMessageScanner.addMessageListener(anotherListener);
String anotherMessage = "anotherMessage"; String anotherMessage = "anotherMessage";
...@@ -71,10 +72,11 @@ public class ReleaseMessageScannerTest { ...@@ -71,10 +72,11 @@ public class ReleaseMessageScannerTest {
when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(someId)).thenReturn( when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(someId)).thenReturn(
Lists.newArrayList(anotherReleaseMessage)); Lists.newArrayList(anotherReleaseMessage));
String anotherListenerMessage = ReleaseMessage anotherListenerMessage =
anotherListenerFuture.get(5000, TimeUnit.MILLISECONDS); anotherListenerFuture.get(5000, TimeUnit.MILLISECONDS);
assertEquals(anotherMessage, anotherListenerMessage); assertEquals(anotherMessage, anotherListenerMessage.getMessage());
assertEquals(anotherId, anotherListenerMessage.getId());
} }
......
...@@ -59,6 +59,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -59,6 +59,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
private final AtomicBoolean m_longPollingStopped; private final AtomicBoolean m_longPollingStopped;
private SchedulePolicy m_longPollSchedulePolicy; private SchedulePolicy m_longPollSchedulePolicy;
private AtomicReference<ServiceDTO> m_longPollServiceDto; private AtomicReference<ServiceDTO> m_longPollServiceDto;
private AtomicReference<ApolloConfigNotification> m_longPollResult;
/** /**
* Constructor. * Constructor.
...@@ -84,6 +85,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -84,6 +85,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
m_longPollingService = Executors.newFixedThreadPool(2, m_longPollingService = Executors.newFixedThreadPool(2,
ApolloThreadFactory.create("RemoteConfigRepository-LongPolling", true)); ApolloThreadFactory.create("RemoteConfigRepository-LongPolling", true));
m_longPollServiceDto = new AtomicReference<>(); m_longPollServiceDto = new AtomicReference<>();
m_longPollResult = new AtomicReference<>();
this.trySync(); this.trySync();
this.schedulePeriodicRefresh(); this.schedulePeriodicRefresh();
this.scheduleLongPollingRefresh(); this.scheduleLongPollingRefresh();
...@@ -270,7 +272,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -270,7 +272,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
String url = String url =
assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster,
m_namespace, dataCenter); m_namespace, dataCenter, m_longPollResult.get());
logger.debug("Long polling from {}", url); logger.debug("Long polling from {}", url);
HttpRequest request = new HttpRequest(url); HttpRequest request = new HttpRequest(url);
...@@ -286,6 +288,10 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -286,6 +288,10 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url); logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
if (response.getStatusCode() == 200) { if (response.getStatusCode() == 200) {
m_longPollServiceDto.set(lastServiceDto); m_longPollServiceDto.set(lastServiceDto);
if (response.getBody() != null) {
m_longPollResult.set(response.getBody());
transaction.addData("Result", response.getBody().toString());
}
longPollingService.submit(new Runnable() { longPollingService.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
...@@ -320,7 +326,8 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -320,7 +326,8 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
} }
private String assembleLongPollRefreshUrl(String uri, String appId, String cluster, private String assembleLongPollRefreshUrl(String uri, String appId, String cluster,
String namespace, String dataCenter) { String namespace, String dataCenter,
ApolloConfigNotification previousResult) {
Escaper escaper = UrlEscapers.urlPathSegmentEscaper(); Escaper escaper = UrlEscapers.urlPathSegmentEscaper();
Map<String, String> queryParams = Maps.newHashMap(); Map<String, String> queryParams = Maps.newHashMap();
queryParams.put("appId", escaper.escape(appId)); queryParams.put("appId", escaper.escape(appId));
...@@ -337,6 +344,11 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -337,6 +344,11 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
queryParams.put("ip", escaper.escape(localIp)); queryParams.put("ip", escaper.escape(localIp));
} }
if (previousResult != null) {
//number doesn't need encode
queryParams.put("notificationId", String.valueOf(previousResult.getNotificationId()));
}
String params = MAP_JOINER.join(queryParams); String params = MAP_JOINER.join(queryParams);
if (!uri.endsWith("/")) { if (!uri.endsWith("/")) {
uri += "/"; uri += "/";
......
...@@ -234,6 +234,7 @@ public class ConfigIntegrationTest extends BaseIntegrationTest { ...@@ -234,6 +234,7 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
final String someKey = "someKey"; final String someKey = "someKey";
final String someValue = "someValue"; final String someValue = "someValue";
final String anotherValue = "anotherValue"; final String anotherValue = "anotherValue";
long someNotificationId = 1;
long pollTimeoutInMS = 50; long pollTimeoutInMS = 50;
Map<String, String> configurations = Maps.newHashMap(); Map<String, String> configurations = Maps.newHashMap();
...@@ -242,7 +243,7 @@ public class ConfigIntegrationTest extends BaseIntegrationTest { ...@@ -242,7 +243,7 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
ContextHandler configHandler = mockConfigServerHandler(HttpServletResponse.SC_OK, apolloConfig); ContextHandler configHandler = mockConfigServerHandler(HttpServletResponse.SC_OK, apolloConfig);
ContextHandler pollHandler = ContextHandler pollHandler =
mockPollNotificationHandler(pollTimeoutInMS, HttpServletResponse.SC_OK, mockPollNotificationHandler(pollTimeoutInMS, HttpServletResponse.SC_OK,
new ApolloConfigNotification(apolloConfig.getNamespaceName()), false); new ApolloConfigNotification(apolloConfig.getNamespaceName(), someNotificationId), false);
startServerWithHandlers(configHandler, pollHandler); startServerWithHandlers(configHandler, pollHandler);
......
...@@ -10,9 +10,11 @@ import com.google.common.collect.Multimaps; ...@@ -10,9 +10,11 @@ import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.ctrip.framework.apollo.biz.entity.AppNamespace; import com.ctrip.framework.apollo.biz.entity.AppNamespace;
import com.ctrip.framework.apollo.biz.message.MessageListener; import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.message.ReleaseMessageListener;
import com.ctrip.framework.apollo.biz.message.Topics; import com.ctrip.framework.apollo.biz.message.Topics;
import com.ctrip.framework.apollo.biz.service.AppNamespaceService; import com.ctrip.framework.apollo.biz.service.AppNamespaceService;
import com.ctrip.framework.apollo.biz.service.ReleaseMessageService;
import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil; import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil;
import com.ctrip.framework.apollo.core.ConfigConsts; import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification; import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
...@@ -38,7 +40,7 @@ import java.util.Set; ...@@ -38,7 +40,7 @@ import java.util.Set;
*/ */
@RestController @RestController
@RequestMapping("/notifications") @RequestMapping("/notifications")
public class NotificationController implements MessageListener { public class NotificationController implements ReleaseMessageListener {
private static final Logger logger = LoggerFactory.getLogger(NotificationController.class); private static final Logger logger = LoggerFactory.getLogger(NotificationController.class);
private static final long TIMEOUT = 30 * 1000;//30 seconds private static final long TIMEOUT = 30 * 1000;//30 seconds
private final Multimap<String, DeferredResult<ResponseEntity<ApolloConfigNotification>>> private final Multimap<String, DeferredResult<ResponseEntity<ApolloConfigNotification>>>
...@@ -52,6 +54,9 @@ public class NotificationController implements MessageListener { ...@@ -52,6 +54,9 @@ public class NotificationController implements MessageListener {
@Autowired @Autowired
private AppNamespaceService appNamespaceService; private AppNamespaceService appNamespaceService;
@Autowired
private ReleaseMessageService releaseMessageService;
@Autowired @Autowired
private EntityManagerUtil entityManagerUtil; private EntityManagerUtil entityManagerUtil;
...@@ -61,6 +66,7 @@ public class NotificationController implements MessageListener { ...@@ -61,6 +66,7 @@ public class NotificationController implements MessageListener {
@RequestParam(value = "cluster") String cluster, @RequestParam(value = "cluster") String cluster,
@RequestParam(value = "namespace", defaultValue = ConfigConsts.NAMESPACE_DEFAULT) String namespace, @RequestParam(value = "namespace", defaultValue = ConfigConsts.NAMESPACE_DEFAULT) String namespace,
@RequestParam(value = "dataCenter", required = false) String dataCenter, @RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "notificationId", defaultValue = "-1") long notificationId,
@RequestParam(value = "ip", required = false) String clientIp) { @RequestParam(value = "ip", required = false) String clientIp) {
Set<String> watchedKeys = assembleWatchKeys(appId, cluster, namespace, dataCenter); Set<String> watchedKeys = assembleWatchKeys(appId, cluster, namespace, dataCenter);
...@@ -72,12 +78,28 @@ public class NotificationController implements MessageListener { ...@@ -72,12 +78,28 @@ public class NotificationController implements MessageListener {
DeferredResult<ResponseEntity<ApolloConfigNotification>> deferredResult = DeferredResult<ResponseEntity<ApolloConfigNotification>> deferredResult =
new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE); new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE);
//check whether client is out-dated
ReleaseMessage latest = releaseMessageService.findLatestReleaseMessageForMessages(watchedKeys);
/**
* Manually close the entity manager.
* Since for async request, Spring won't do so until the request is finished,
* which is unacceptable since we are doing long polling - means the db connection would be hold
* for a very long time
*/
entityManagerUtil.closeEntityManager();
if (latest != null && latest.getId() != notificationId) {
deferredResult.setResult(new ResponseEntity<>(
new ApolloConfigNotification(namespace, latest.getId()), HttpStatus.OK));
} else {
//register all keys //register all keys
for (String key : watchedKeys) { for (String key : watchedKeys) {
this.deferredResults.put(key, deferredResult); this.deferredResults.put(key, deferredResult);
} }
deferredResult.onTimeout(() -> logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.TimeOutKeys")); deferredResult
.onTimeout(() -> logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
deferredResult.onCompletion(() -> { deferredResult.onCompletion(() -> {
//unregister all keys //unregister all keys
...@@ -90,6 +112,8 @@ public class NotificationController implements MessageListener { ...@@ -90,6 +112,8 @@ public class NotificationController implements MessageListener {
logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.RegisteredKeys"); logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
logger.info("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}", logger.info("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",
watchedKeys, appId, cluster, namespace, dataCenter); watchedKeys, appId, cluster, namespace, dataCenter);
}
return deferredResult; return deferredResult;
} }
...@@ -101,13 +125,6 @@ public class NotificationController implements MessageListener { ...@@ -101,13 +125,6 @@ public class NotificationController implements MessageListener {
String namespace, String namespace,
String dataCenter) { String dataCenter) {
AppNamespace appNamespace = appNamespaceService.findByNamespaceName(namespace); AppNamespace appNamespace = appNamespaceService.findByNamespaceName(namespace);
/**
* Manually close the entity manager.
* Since for async request, Spring won't do so until the request is finished,
* which is unacceptable since we are doing long polling - means the db connection would be hold
* for a very long time
*/
entityManagerUtil.closeEntityManager();
//check whether the namespace's appId equals to current one //check whether the namespace's appId equals to current one
if (Objects.isNull(appNamespace) || Objects.equals(applicationId, appNamespace.getAppId())) { if (Objects.isNull(appNamespace) || Objects.equals(applicationId, appNamespace.getAppId())) {
...@@ -140,27 +157,29 @@ public class NotificationController implements MessageListener { ...@@ -140,27 +157,29 @@ public class NotificationController implements MessageListener {
} }
@Override @Override
public void handleMessage(String message, String channel) { public void handleMessage(ReleaseMessage message, String channel) {
logger.info("message received - channel: {}, message: {}", channel, message); logger.info("message received - channel: {}, message: {}", channel, message);
Cat.logEvent("Apollo.LongPoll.Messages", message);
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(message)) { String content = message.getMessage();
Cat.logEvent("Apollo.LongPoll.Messages", content);
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
return; return;
} }
List<String> keys = STRING_SPLITTER.splitToList(message); List<String> keys = STRING_SPLITTER.splitToList(content);
//message should be appId|cluster|namespace //message should be appId+cluster+namespace
if (keys.size() != 3) { if (keys.size() != 3) {
logger.error("message format invalid - {}", message); logger.error("message format invalid - {}", content);
return; return;
} }
ResponseEntity<ApolloConfigNotification> notification = ResponseEntity<ApolloConfigNotification> notification =
new ResponseEntity<>( new ResponseEntity<>(
new ApolloConfigNotification(keys.get(2)), HttpStatus.OK); new ApolloConfigNotification(keys.get(2), message.getId()), HttpStatus.OK);
//create a new list to avoid ConcurrentModificationException //create a new list to avoid ConcurrentModificationException
List<DeferredResult<ResponseEntity<ApolloConfigNotification>>> results = List<DeferredResult<ResponseEntity<ApolloConfigNotification>>> results =
Lists.newArrayList(deferredResults.get(message)); Lists.newArrayList(deferredResults.get(content));
logger.info("Notify {} clients for key {}", results.size(), message); logger.info("Notify {} clients for key {}", results.size(), content);
for (DeferredResult<ResponseEntity<ApolloConfigNotification>> result : results) { for (DeferredResult<ResponseEntity<ApolloConfigNotification>> result : results) {
result.setResult(notification); result.setResult(notification);
......
...@@ -5,8 +5,10 @@ import com.google.common.collect.Lists; ...@@ -5,8 +5,10 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Multimap; import com.google.common.collect.Multimap;
import com.ctrip.framework.apollo.biz.entity.AppNamespace; import com.ctrip.framework.apollo.biz.entity.AppNamespace;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.message.Topics; import com.ctrip.framework.apollo.biz.message.Topics;
import com.ctrip.framework.apollo.biz.service.AppNamespaceService; import com.ctrip.framework.apollo.biz.service.AppNamespaceService;
import com.ctrip.framework.apollo.biz.service.ReleaseMessageService;
import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil; import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil;
import com.ctrip.framework.apollo.core.ConfigConsts; import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification; import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
...@@ -25,6 +27,8 @@ import java.util.List; ...@@ -25,6 +27,8 @@ import java.util.List;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyCollectionOf;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
/** /**
...@@ -39,10 +43,13 @@ public class NotificationControllerTest { ...@@ -39,10 +43,13 @@ public class NotificationControllerTest {
private String defaultNamespace; private String defaultNamespace;
private String somePublicNamespace; private String somePublicNamespace;
private String someDataCenter; private String someDataCenter;
private long someNotificationId;
private String someClientIp; private String someClientIp;
@Mock @Mock
private AppNamespaceService appNamespaceService; private AppNamespaceService appNamespaceService;
@Mock @Mock
private ReleaseMessageService releaseMessageService;
@Mock
private EntityManagerUtil entityManagerUtil; private EntityManagerUtil entityManagerUtil;
private Multimap<String, DeferredResult<ResponseEntity<ApolloConfigNotification>>> private Multimap<String, DeferredResult<ResponseEntity<ApolloConfigNotification>>>
deferredResults; deferredResults;
...@@ -51,6 +58,7 @@ public class NotificationControllerTest { ...@@ -51,6 +58,7 @@ public class NotificationControllerTest {
public void setUp() throws Exception { public void setUp() throws Exception {
controller = new NotificationController(); controller = new NotificationController();
ReflectionTestUtils.setField(controller, "appNamespaceService", appNamespaceService); ReflectionTestUtils.setField(controller, "appNamespaceService", appNamespaceService);
ReflectionTestUtils.setField(controller, "releaseMessageService", releaseMessageService);
ReflectionTestUtils.setField(controller, "entityManagerUtil", entityManagerUtil); ReflectionTestUtils.setField(controller, "entityManagerUtil", entityManagerUtil);
someAppId = "someAppId"; someAppId = "someAppId";
...@@ -59,6 +67,7 @@ public class NotificationControllerTest { ...@@ -59,6 +67,7 @@ public class NotificationControllerTest {
defaultNamespace = ConfigConsts.NAMESPACE_DEFAULT; defaultNamespace = ConfigConsts.NAMESPACE_DEFAULT;
somePublicNamespace = "somePublicNamespace"; somePublicNamespace = "somePublicNamespace";
someDataCenter = "someDC"; someDataCenter = "someDC";
someNotificationId = 1;
someClientIp = "someClientIp"; someClientIp = "someClientIp";
deferredResults = deferredResults =
...@@ -70,7 +79,8 @@ public class NotificationControllerTest { ...@@ -70,7 +79,8 @@ public class NotificationControllerTest {
public void testPollNotificationWithDefaultNamespace() throws Exception { public void testPollNotificationWithDefaultNamespace() throws Exception {
DeferredResult<ResponseEntity<ApolloConfigNotification>> DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller deferredResult = controller
.pollNotification(someAppId, someCluster, defaultNamespace, someDataCenter, someClientIp); .pollNotification(someAppId, someCluster, defaultNamespace, someDataCenter,
someNotificationId, someClientIp);
List<String> clusters = List<String> clusters =
Lists.newArrayList(someCluster, someDataCenter, ConfigConsts.CLUSTER_NAME_DEFAULT); Lists.newArrayList(someCluster, someDataCenter, ConfigConsts.CLUSTER_NAME_DEFAULT);
...@@ -85,12 +95,35 @@ public class NotificationControllerTest { ...@@ -85,12 +95,35 @@ public class NotificationControllerTest {
} }
} }
@Test
public void testPollNotificationWithDefaultNamespaceWithNotificationIdOutDated() throws Exception {
long notificationId = someNotificationId + 1;
ReleaseMessage someReleaseMessage = mock(ReleaseMessage.class);
when(someReleaseMessage.getId()).thenReturn(notificationId);
when(releaseMessageService.findLatestReleaseMessageForMessages(anyCollectionOf(String.class)))
.thenReturn(someReleaseMessage);
DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller
.pollNotification(someAppId, someCluster, defaultNamespace, someDataCenter,
someNotificationId, someClientIp);
ResponseEntity<ApolloConfigNotification> result =
(ResponseEntity<ApolloConfigNotification>) deferredResult.getResult();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(defaultNamespace, result.getBody().getNamespaceName());
assertEquals(notificationId, result.getBody().getNotificationId());
}
@Test @Test
public void testPollNotificationWithDefaultNamespaceWithDefaultClusterWithDataCenter() public void testPollNotificationWithDefaultNamespaceWithDefaultClusterWithDataCenter()
throws Exception { throws Exception {
DeferredResult<ResponseEntity<ApolloConfigNotification>> DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller deferredResult = controller
.pollNotification(someAppId, defaultCluster, defaultNamespace, someDataCenter, someClientIp); .pollNotification(someAppId, defaultCluster, defaultNamespace, someDataCenter,
someNotificationId, someClientIp);
List<String> clusters = List<String> clusters =
Lists.newArrayList(someDataCenter, defaultCluster); Lists.newArrayList(someDataCenter, defaultCluster);
...@@ -110,7 +143,7 @@ public class NotificationControllerTest { ...@@ -110,7 +143,7 @@ public class NotificationControllerTest {
throws Exception { throws Exception {
DeferredResult<ResponseEntity<ApolloConfigNotification>> DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller deferredResult = controller
.pollNotification(someAppId, defaultCluster, defaultNamespace, null, someClientIp); .pollNotification(someAppId, defaultCluster, defaultNamespace, null, someNotificationId, someClientIp);
List<String> clusters = List<String> clusters =
Lists.newArrayList(defaultCluster); Lists.newArrayList(defaultCluster);
...@@ -137,7 +170,8 @@ public class NotificationControllerTest { ...@@ -137,7 +170,8 @@ public class NotificationControllerTest {
DeferredResult<ResponseEntity<ApolloConfigNotification>> DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller deferredResult = controller
.pollNotification(someAppId, someCluster, somePublicNamespace, someDataCenter, someClientIp); .pollNotification(someAppId, someCluster, somePublicNamespace, someDataCenter,
someNotificationId, someClientIp);
List<String> clusters = List<String> clusters =
Lists.newArrayList(someCluster, someDataCenter, ConfigConsts.CLUSTER_NAME_DEFAULT); Lists.newArrayList(someCluster, someDataCenter, ConfigConsts.CLUSTER_NAME_DEFAULT);
...@@ -159,17 +193,50 @@ public class NotificationControllerTest { ...@@ -159,17 +193,50 @@ public class NotificationControllerTest {
} }
} }
@Test
public void testPollNotificationWithPublicNamespaceWithNotificationIdOutDated() throws Exception {
long notificationId = someNotificationId + 1;
ReleaseMessage someReleaseMessage = mock(ReleaseMessage.class);
when(someReleaseMessage.getId()).thenReturn(notificationId);
when(releaseMessageService.findLatestReleaseMessageForMessages(anyCollectionOf(String.class)))
.thenReturn(someReleaseMessage);
String somePublicAppId = "somePublicAppId";
AppNamespace somePublicAppNamespace =
assmbleAppNamespace(somePublicAppId, somePublicNamespace);
when(appNamespaceService.findByNamespaceName(somePublicNamespace))
.thenReturn(somePublicAppNamespace);
DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller
.pollNotification(someAppId, someCluster, somePublicNamespace, someDataCenter,
someNotificationId, someClientIp);
ResponseEntity<ApolloConfigNotification> result =
(ResponseEntity<ApolloConfigNotification>) deferredResult.getResult();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(somePublicNamespace, result.getBody().getNamespaceName());
assertEquals(notificationId, result.getBody().getNotificationId());
}
@Test @Test
public void testPollNotificationWithDefaultNamespaceAndHandleMessage() throws Exception { public void testPollNotificationWithDefaultNamespaceAndHandleMessage() throws Exception {
DeferredResult<ResponseEntity<ApolloConfigNotification>> DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller deferredResult = controller
.pollNotification(someAppId, someCluster, defaultNamespace, someDataCenter, someClientIp); .pollNotification(someAppId, someCluster, defaultNamespace, someDataCenter,
someNotificationId, someClientIp);
String key = String key =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR) Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, someCluster, defaultNamespace); .join(someAppId, someCluster, defaultNamespace);
long someId = 1;
ReleaseMessage someReleaseMessage = new ReleaseMessage(key);
someReleaseMessage.setId(someId);
controller.handleMessage(key, Topics.APOLLO_RELEASE_TOPIC); controller.handleMessage(someReleaseMessage, Topics.APOLLO_RELEASE_TOPIC);
ResponseEntity<ApolloConfigNotification> response = ResponseEntity<ApolloConfigNotification> response =
(ResponseEntity<ApolloConfigNotification>) deferredResult.getResult(); (ResponseEntity<ApolloConfigNotification>) deferredResult.getResult();
...@@ -177,6 +244,7 @@ public class NotificationControllerTest { ...@@ -177,6 +244,7 @@ public class NotificationControllerTest {
assertEquals(HttpStatus.OK, response.getStatusCode()); assertEquals(HttpStatus.OK, response.getStatusCode());
assertEquals(defaultNamespace, notification.getNamespaceName()); assertEquals(defaultNamespace, notification.getNamespaceName());
assertEquals(someId, notification.getNotificationId());
} }
@Test @Test
...@@ -190,13 +258,17 @@ public class NotificationControllerTest { ...@@ -190,13 +258,17 @@ public class NotificationControllerTest {
DeferredResult<ResponseEntity<ApolloConfigNotification>> DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller deferredResult = controller
.pollNotification(someAppId, someCluster, somePublicNamespace, someDataCenter, someClientIp); .pollNotification(someAppId, someCluster, somePublicNamespace, someDataCenter,
someNotificationId, someClientIp);
String key = String key =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR) Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(somePublicAppId, someDataCenter, somePublicNamespace); .join(somePublicAppId, someDataCenter, somePublicNamespace);
long someId = 1;
ReleaseMessage someReleaseMessage = new ReleaseMessage(key);
someReleaseMessage.setId(someId);
controller.handleMessage(key, Topics.APOLLO_RELEASE_TOPIC); controller.handleMessage(someReleaseMessage, Topics.APOLLO_RELEASE_TOPIC);
ResponseEntity<ApolloConfigNotification> response = ResponseEntity<ApolloConfigNotification> response =
(ResponseEntity<ApolloConfigNotification>) deferredResult.getResult(); (ResponseEntity<ApolloConfigNotification>) deferredResult.getResult();
...@@ -204,7 +276,7 @@ public class NotificationControllerTest { ...@@ -204,7 +276,7 @@ public class NotificationControllerTest {
assertEquals(HttpStatus.OK, response.getStatusCode()); assertEquals(HttpStatus.OK, response.getStatusCode());
assertEquals(somePublicNamespace, notification.getNamespaceName()); assertEquals(somePublicNamespace, notification.getNamespaceName());
assertEquals(someId, notification.getNotificationId());
} }
private AppNamespace assmbleAppNamespace(String appId, String namespace) { private AppNamespace assmbleAppNamespace(String appId, String namespace) {
......
...@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit; ...@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
/** /**
* @author Jason Song(song_s@ctrip.com) * @author Jason Song(song_s@ctrip.com)
...@@ -45,7 +46,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati ...@@ -45,7 +46,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
executorService = Executors.newSingleThreadExecutor(); executorService = Executors.newSingleThreadExecutor();
} }
@Test @Test(timeout = 5000L)
public void testPollNotificationWithDefaultNamespace() throws Exception { public void testPollNotificationWithDefaultNamespace() throws Exception {
AtomicBoolean stop = new AtomicBoolean(); AtomicBoolean stop = new AtomicBoolean();
perodicSendMessage(assembleKey(someAppId, someCluster, defaultNamespace), stop); perodicSendMessage(assembleKey(someAppId, someCluster, defaultNamespace), stop);
...@@ -60,6 +61,39 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati ...@@ -60,6 +61,39 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
ApolloConfigNotification notification = result.getBody(); ApolloConfigNotification notification = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode()); assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(defaultNamespace, notification.getNamespaceName()); assertEquals(defaultNamespace, notification.getNamespaceName());
assertNotEquals(0, notification.getNotificationId());
}
@Test(timeout = 5000L)
@Sql(scripts = "/integration-test/test-release-message.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWithDefaultNamespaceWithNotificationIdNull() throws Exception {
ResponseEntity<ApolloConfigNotification> result = restTemplate.getForEntity(
"{baseurl}/notifications?appId={appId}&cluster={clusterName}&namespace={namespace}",
ApolloConfigNotification.class,
getHostUrl(), someAppId, someCluster, defaultNamespace);
ApolloConfigNotification notification = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(defaultNamespace, notification.getNamespaceName());
assertEquals(10, notification.getNotificationId());
}
@Test(timeout = 5000L)
@Sql(scripts = "/integration-test/test-release.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/integration-test/test-release-message.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWithDefaultNamespaceWithNotificationIdOutDated() throws Exception {
long someOutDatedNotificationId = 1;
ResponseEntity<ApolloConfigNotification> result = restTemplate.getForEntity(
"{baseurl}/notifications?appId={appId}&cluster={clusterName}&namespace={namespace}&notificationId={notificationId}",
ApolloConfigNotification.class,
getHostUrl(), someAppId, someCluster, defaultNamespace, someOutDatedNotificationId);
ApolloConfigNotification notification = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(defaultNamespace, notification.getNamespaceName());
assertEquals(10, notification.getNotificationId());
} }
@Test(timeout = 5000L) @Test(timeout = 5000L)
...@@ -82,6 +116,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati ...@@ -82,6 +116,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
ApolloConfigNotification notification = result.getBody(); ApolloConfigNotification notification = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode()); assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(somePublicNamespace, notification.getNamespaceName()); assertEquals(somePublicNamespace, notification.getNamespaceName());
assertNotEquals(0, notification.getNotificationId());
} }
@Test(timeout = 5000L) @Test(timeout = 5000L)
...@@ -105,6 +140,24 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati ...@@ -105,6 +140,24 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
ApolloConfigNotification notification = result.getBody(); ApolloConfigNotification notification = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode()); assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(somePublicNamespace, notification.getNamespaceName()); assertEquals(somePublicNamespace, notification.getNamespaceName());
assertNotEquals(0, notification.getNotificationId());
}
@Test(timeout = 5000L)
@Sql(scripts = "/integration-test/test-release.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/integration-test/test-release-message.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWithPublicNamespaceWithNotificationIdOutDated() throws Exception {
long someOutDatedNotificationId = 1;
ResponseEntity<ApolloConfigNotification> result = restTemplate.getForEntity(
"{baseurl}/notifications?appId={appId}&cluster={clusterName}&namespace={namespace}&notificationId={notificationId}",
ApolloConfigNotification.class,
getHostUrl(), someAppId, someCluster, somePublicNamespace, someOutDatedNotificationId);
ApolloConfigNotification notification = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(somePublicNamespace, notification.getNamespaceName());
assertEquals(20, notification.getNotificationId());
} }
private String assembleKey(String appId, String cluster, String namespace) { private String assembleKey(String appId, String cluster, String namespace) {
......
...@@ -3,4 +3,5 @@ DELETE FROM Namespace; ...@@ -3,4 +3,5 @@ DELETE FROM Namespace;
DELETE FROM AppNamespace; DELETE FROM AppNamespace;
DELETE FROM Cluster; DELETE FROM Cluster;
DELETE FROM App; DELETE FROM App;
DELETE FROM ReleaseMessage;
INSERT INTO `releasemessage` (`Id`, `Message`)
VALUES
(10, 'someAppId+default+application');
INSERT INTO `releasemessage` (`Id`, `Message`)
VALUES
(20, 'somePublicAppId+default+somePublicNamespace');
...@@ -5,20 +5,30 @@ package com.ctrip.framework.apollo.core.dto; ...@@ -5,20 +5,30 @@ package com.ctrip.framework.apollo.core.dto;
*/ */
public class ApolloConfigNotification { public class ApolloConfigNotification {
private String namespaceName; private String namespaceName;
private long notificationId;
//for json converter //for json converter
public ApolloConfigNotification() { public ApolloConfigNotification() {
} }
public ApolloConfigNotification(String namespaceName) { public ApolloConfigNotification(String namespaceName, long notificationId) {
this.namespaceName = namespaceName; this.namespaceName = namespaceName;
this.notificationId = notificationId;
} }
public String getNamespaceName() { public String getNamespaceName() {
return namespaceName; return namespaceName;
} }
public void setNamespaceName(String namespaceName) { public long getNotificationId() {
this.namespaceName = namespaceName; return notificationId;
}
@Override
public String toString() {
return "ApolloConfigNotification{" +
"namespaceName='" + namespaceName + '\'' +
", notificationId=" + notificationId +
'}';
} }
} }
...@@ -18,7 +18,7 @@ public class ClassLoaderUtil { ...@@ -18,7 +18,7 @@ public class ClassLoaderUtil {
static { static {
if (loader == null) { if (loader == null) {
logger.info("Using system class loader"); logger.warn("Using system class loader");
loader = ClassLoader.getSystemClassLoader(); loader = ClassLoader.getSystemClassLoader();
} }
......
...@@ -19,7 +19,7 @@ public class ResourceUtils { ...@@ -19,7 +19,7 @@ public class ResourceUtils {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static Properties readConfigFile(String configPath, Properties defaults) { public static Properties readConfigFile(String configPath, Properties defaults) {
InputStream in = ClassLoaderUtil.getLoader().getResourceAsStream(configPath); InputStream in = ClassLoaderUtil.getLoader().getResourceAsStream(configPath);
logger.info("Reading config from resource {}", configPath); logger.debug("Reading config from resource {}", configPath);
Properties props = new Properties(); Properties props = new Properties();
try { try {
if (in == null) { if (in == null) {
...@@ -27,9 +27,9 @@ public class ResourceUtils { ...@@ -27,9 +27,9 @@ public class ResourceUtils {
Path path = new File(System.getProperty("user.dir") + configPath).toPath(); Path path = new File(System.getProperty("user.dir") + configPath).toPath();
if (Files.isReadable(path)) { if (Files.isReadable(path)) {
in = new FileInputStream(path.toFile()); in = new FileInputStream(path.toFile());
logger.info("Reading config from file {} ", path); logger.debug("Reading config from file {} ", path);
} else { } else {
logger.info("Could not find available config file"); logger.warn("Could not find available config file");
} }
} }
if (defaults != null) { if (defaults != null) {
...@@ -59,9 +59,9 @@ public class ResourceUtils { ...@@ -59,9 +59,9 @@ public class ResourceUtils {
sb.append(key).append('=').append(val).append('\n'); sb.append(key).append('=').append(val).append('\n');
} }
if (sb.length() > 0) { if (sb.length() > 0) {
logger.info("Reading properties: \n" + sb.toString()); logger.debug("Reading properties: \n" + sb.toString());
} else { } else {
logger.info("No available properties"); logger.warn("No available properties");
} }
return props; return props;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment