Commit 562bc959 authored by 张乐's avatar 张乐 Committed by GitHub

Merge pull request #332 from nobodyiam/client-long-poll-refactor-merge

multiple namespaces reuse the same long poll connection
parents 90d635ec 47d1940b
...@@ -5,14 +5,19 @@ import org.springframework.data.repository.PagingAndSortingRepository; ...@@ -5,14 +5,19 @@ import org.springframework.data.repository.PagingAndSortingRepository;
import com.ctrip.framework.apollo.common.entity.AppNamespace; import com.ctrip.framework.apollo.common.entity.AppNamespace;
import java.util.List; import java.util.List;
import java.util.Set;
public interface AppNamespaceRepository extends PagingAndSortingRepository<AppNamespace, Long>{ public interface AppNamespaceRepository extends PagingAndSortingRepository<AppNamespace, Long>{
AppNamespace findByAppIdAndName(String appId, String namespaceName); AppNamespace findByAppIdAndName(String appId, String namespaceName);
List<AppNamespace> findByAppIdAndNameIn(String appId, Set<String> namespaceNames);
AppNamespace findByNameAndIsPublicTrue(String namespaceName); AppNamespace findByNameAndIsPublicTrue(String namespaceName);
List<AppNamespace> findByNameInAndIsPublicTrue(Set<String> namespaceNames);
List<AppNamespace> findByAppIdAndIsPublic(String appId, boolean isPublic); List<AppNamespace> findByAppIdAndIsPublic(String appId, boolean isPublic);
} }
...@@ -2,7 +2,9 @@ package com.ctrip.framework.apollo.biz.repository; ...@@ -2,7 +2,9 @@ package com.ctrip.framework.apollo.biz.repository;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage; import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.PagingAndSortingRepository; import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.data.repository.query.Param;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
...@@ -16,4 +18,7 @@ public interface ReleaseMessageRepository extends PagingAndSortingRepository<Rel ...@@ -16,4 +18,7 @@ public interface ReleaseMessageRepository extends PagingAndSortingRepository<Rel
ReleaseMessage findTopByOrderByIdDesc(); ReleaseMessage findTopByOrderByIdDesc();
ReleaseMessage findTopByMessageInOrderByIdDesc(Collection<String> messages); ReleaseMessage findTopByMessageInOrderByIdDesc(Collection<String> messages);
@Query("select message, max(id) as id from ReleaseMessage where message in :messages group by message")
List<Object[]> findLatestReleaseMessagesGroupByMessages(@Param("messages") Collection<String> messages);
} }
package com.ctrip.framework.apollo.biz.service; package com.ctrip.framework.apollo.biz.service;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.List; import com.ctrip.framework.apollo.biz.entity.Audit;
import java.util.Objects;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.ctrip.framework.apollo.biz.entity.Cluster; import com.ctrip.framework.apollo.biz.entity.Cluster;
import com.ctrip.framework.apollo.biz.entity.Namespace; import com.ctrip.framework.apollo.biz.entity.Namespace;
import com.ctrip.framework.apollo.common.entity.AppNamespace;
import com.ctrip.framework.apollo.biz.entity.Audit;
import com.ctrip.framework.apollo.biz.repository.AppNamespaceRepository; import com.ctrip.framework.apollo.biz.repository.AppNamespaceRepository;
import com.ctrip.framework.apollo.common.entity.AppNamespace;
import com.ctrip.framework.apollo.common.utils.BeanUtils; import com.ctrip.framework.apollo.common.utils.BeanUtils;
import com.ctrip.framework.apollo.core.ConfigConsts; import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.enums.ConfigFileFormat; import com.ctrip.framework.apollo.core.enums.ConfigFileFormat;
import com.ctrip.framework.apollo.core.exception.ServiceException; import com.ctrip.framework.apollo.core.exception.ServiceException;
import com.ctrip.framework.apollo.core.utils.StringUtils; import com.ctrip.framework.apollo.core.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@Service @Service
public class AppNamespaceService { public class AppNamespaceService {
...@@ -45,6 +48,14 @@ public class AppNamespaceService { ...@@ -45,6 +48,14 @@ public class AppNamespaceService {
return appNamespaceRepository.findByNameAndIsPublicTrue(namespaceName); return appNamespaceRepository.findByNameAndIsPublicTrue(namespaceName);
} }
public List<AppNamespace> findPublicNamespacesByNames(Set<String> namespaceNames) {
if (namespaceNames == null || namespaceNames.isEmpty()) {
return Collections.EMPTY_LIST;
}
return appNamespaceRepository.findByNameInAndIsPublicTrue(namespaceNames);
}
public List<AppNamespace> findPrivateAppNamespace(String appId){ public List<AppNamespace> findPrivateAppNamespace(String appId){
return appNamespaceRepository.findByAppIdAndIsPublic(appId, false); return appNamespaceRepository.findByAppIdAndIsPublic(appId, false);
} }
...@@ -54,6 +65,14 @@ public class AppNamespaceService { ...@@ -54,6 +65,14 @@ public class AppNamespaceService {
return appNamespaceRepository.findByAppIdAndName(appId, namespaceName); return appNamespaceRepository.findByAppIdAndName(appId, namespaceName);
} }
public List<AppNamespace> findByAppIdAndNamespaces(String appId, Set<String> namespaceNames) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(appId), "appId must not be null");
if (namespaceNames == null || namespaceNames.isEmpty()) {
return Collections.EMPTY_LIST;
}
return appNamespaceRepository.findByAppIdAndNameIn(appId, namespaceNames);
}
@Transactional @Transactional
public void createDefaultAppNamespace(String appId, String createBy) { public void createDefaultAppNamespace(String appId, String createBy) {
if (!isAppNamespaceNameUnique(appId, ConfigConsts.NAMESPACE_APPLICATION)) { if (!isAppNamespaceNameUnique(appId, ConfigConsts.NAMESPACE_APPLICATION)) {
......
package com.ctrip.framework.apollo.biz.service; package com.ctrip.framework.apollo.biz.service;
import com.google.common.collect.Lists;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage; import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository; import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;
import com.dianping.cat.Cat;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Map;
/** /**
* @author Jason Song(song_s@ctrip.com) * @author Jason Song(song_s@ctrip.com)
...@@ -19,4 +24,20 @@ public class ReleaseMessageService { ...@@ -19,4 +24,20 @@ public class ReleaseMessageService {
public ReleaseMessage findLatestReleaseMessageForMessages(Collection<String> messages) { public ReleaseMessage findLatestReleaseMessageForMessages(Collection<String> messages) {
return releaseMessageRepository.findTopByMessageInOrderByIdDesc(messages); return releaseMessageRepository.findTopByMessageInOrderByIdDesc(messages);
} }
public List<ReleaseMessage> findLatestReleaseMessagesGroupByMessages(Collection<String> messages) {
List<Object[]> result =
releaseMessageRepository.findLatestReleaseMessagesGroupByMessages(messages);
List<ReleaseMessage> releaseMessages = Lists.newArrayList();
for (Object[] o : result) {
try {
ReleaseMessage releaseMessage = new ReleaseMessage((String) o[0]);
releaseMessage.setId((Long) o[1]);
releaseMessages.add(releaseMessage);
} catch (Exception ex) {
Cat.logError("Parsing LatestReleaseMessagesGroupByMessages failed", ex);
}
}
return releaseMessages;
}
} }
...@@ -64,6 +64,11 @@ ...@@ -64,6 +64,11 @@
<artifactId>jetty-server</artifactId> <artifactId>jetty-server</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<scope>test</scope>
</dependency>
<!-- end of test --> <!-- end of test -->
<!-- dal-jdbc --> <!-- dal-jdbc -->
<dependency> <dependency>
......
...@@ -2,6 +2,7 @@ package com.ctrip.framework.apollo.build; ...@@ -2,6 +2,7 @@ package com.ctrip.framework.apollo.build;
import com.ctrip.framework.apollo.internals.ConfigServiceLocator; import com.ctrip.framework.apollo.internals.ConfigServiceLocator;
import com.ctrip.framework.apollo.internals.DefaultConfigManager; import com.ctrip.framework.apollo.internals.DefaultConfigManager;
import com.ctrip.framework.apollo.internals.RemoteConfigLongPollService;
import com.ctrip.framework.apollo.spi.DefaultConfigFactory; import com.ctrip.framework.apollo.spi.DefaultConfigFactory;
import com.ctrip.framework.apollo.spi.DefaultConfigFactoryManager; import com.ctrip.framework.apollo.spi.DefaultConfigFactoryManager;
import com.ctrip.framework.apollo.spi.DefaultConfigRegistry; import com.ctrip.framework.apollo.spi.DefaultConfigRegistry;
...@@ -33,6 +34,7 @@ public class ComponentConfigurator extends AbstractResourceConfigurator { ...@@ -33,6 +34,7 @@ public class ComponentConfigurator extends AbstractResourceConfigurator {
all.add(A(ConfigUtil.class)); all.add(A(ConfigUtil.class));
all.add(A(HttpUtil.class)); all.add(A(HttpUtil.class));
all.add(A(ConfigServiceLocator.class)); all.add(A(ConfigServiceLocator.class));
all.add(A(RemoteConfigLongPollService.class));
return all; return all;
} }
......
package com.ctrip.framework.apollo.internals;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.escape.Escaper;
import com.google.common.net.UrlEscapers;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
import com.ctrip.framework.apollo.core.dto.ServiceDTO;
import com.ctrip.framework.apollo.core.enums.ConfigFileFormat;
import com.ctrip.framework.apollo.core.schedule.ExponentialSchedulePolicy;
import com.ctrip.framework.apollo.core.schedule.SchedulePolicy;
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.framework.apollo.exceptions.ApolloConfigException;
import com.ctrip.framework.apollo.util.ConfigUtil;
import com.ctrip.framework.apollo.util.ExceptionUtil;
import com.ctrip.framework.apollo.util.http.HttpRequest;
import com.ctrip.framework.apollo.util.http.HttpResponse;
import com.ctrip.framework.apollo.util.http.HttpUtil;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.unidal.lookup.annotation.Inject;
import org.unidal.lookup.annotation.Named;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@Named(type = RemoteConfigLongPollService.class)
public class RemoteConfigLongPollService implements Initializable {
private static final Logger logger = LoggerFactory.getLogger(RemoteConfigLongPollService.class);
private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
private static final Joiner.MapJoiner MAP_JOINER = Joiner.on("&").withKeyValueSeparator("=");
private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper();
private static final long INIT_NOTIFICATION_ID = -1;
private final ExecutorService m_longPollingService;
private final AtomicBoolean m_longPollingStopped;
private SchedulePolicy m_longPollFailSchedulePolicyInSecond;
private RateLimiter m_longPollRateLimiter;
private final AtomicBoolean m_longPollStarted;
private final Multimap<String, RemoteConfigRepository> m_longPollNamespaces;
private final Map<String, Long> m_notifications;
private Type m_responseType;
private Gson gson;
@Inject
private ConfigUtil m_configUtil;
@Inject
private HttpUtil m_httpUtil;
@Inject
private ConfigServiceLocator m_serviceLocator;
/**
* Constructor.
*/
public RemoteConfigLongPollService() {
m_longPollFailSchedulePolicyInSecond = new ExponentialSchedulePolicy(1, 120); //in second
m_longPollingStopped = new AtomicBoolean(false);
m_longPollingService = Executors.newSingleThreadExecutor(
ApolloThreadFactory.create("RemoteConfigLongPollService", true));
m_longPollStarted = new AtomicBoolean(false);
m_longPollNamespaces =
Multimaps.synchronizedSetMultimap(HashMultimap.<String, RemoteConfigRepository>create());
m_notifications = Maps.newConcurrentMap();
m_responseType = new TypeToken<List<ApolloConfigNotification>>() {
}.getType();
gson = new Gson();
}
@Override
public void initialize() throws InitializationException {
m_longPollRateLimiter = RateLimiter.create(m_configUtil.getLongPollQPS());
}
public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
if (!m_longPollStarted.get()) {
startLongPolling();
}
return added;
}
private void startLongPolling() {
if (!m_longPollStarted.compareAndSet(false, true)) {
//already started
return;
}
try {
final String appId = m_configUtil.getAppId();
final String cluster = m_configUtil.getCluster();
final String dataCenter = m_configUtil.getDataCenter();
m_longPollingService.submit(new Runnable() {
@Override
public void run() {
doLongPollingRefresh(appId, cluster, dataCenter);
}
});
} catch (Throwable ex) {
m_longPollStarted.set(false);
ApolloConfigException exception =
new ApolloConfigException("Schedule long polling refresh failed", ex);
Cat.logError(exception);
logger.warn(ExceptionUtil.getDetailMessage(exception));
}
}
void stopLongPollingRefresh() {
this.m_longPollingStopped.compareAndSet(false, true);
}
private void doLongPollingRefresh(String appId, String cluster, String dataCenter) {
final Random random = new Random();
ServiceDTO lastServiceDto = null;
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
//wait at most 5 seconds
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
Transaction transaction = Cat.newTransaction("Apollo.ConfigService", "pollNotification");
try {
if (lastServiceDto == null) {
List<ServiceDTO> configServices = getConfigServices();
lastServiceDto = configServices.get(random.nextInt(configServices.size()));
}
String url =
assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,
m_notifications);
logger.debug("Long polling from {}", url);
HttpRequest request = new HttpRequest(url);
//longer timeout for read - 10 minutes
request.setReadTimeout(600000);
transaction.addData("Url", url);
final HttpResponse<List<ApolloConfigNotification>> response =
m_httpUtil.doGet(request, m_responseType);
logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
if (response.getStatusCode() == 200 && response.getBody() != null) {
updateNotifications(response.getBody());
transaction.addData("Result", response.getBody().toString());
notify(lastServiceDto, response.getBody());
}
m_longPollFailSchedulePolicyInSecond.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Message.SUCCESS);
} catch (Throwable ex) {
lastServiceDto = null;
Cat.logError(ex);
transaction.setStatus(ex);
long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
logger.warn(
"Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, reason: {}",
sleepTimeInSecond, appId, cluster, assembleNamespaces(),
ExceptionUtil.getDetailMessage(ex));
try {
TimeUnit.SECONDS.sleep(sleepTimeInSecond);
} catch (InterruptedException ie) {
//ignore
}
} finally {
transaction.complete();
}
}
}
private void notify(ServiceDTO lastServiceDto, List<ApolloConfigNotification> notifications) {
if (notifications == null || notifications.isEmpty()) {
return;
}
for (ApolloConfigNotification notification : notifications) {
String namespaceName = notification.getNamespaceName();
//create a new list to avoid ConcurrentModificationException
List<RemoteConfigRepository> toBeNotified =
Lists.newArrayList(m_longPollNamespaces.get(namespaceName));
//since .properties are filtered out by default, so we need to check if there is any listener for it
toBeNotified.addAll(m_longPollNamespaces
.get(String.format("%s.%s", namespaceName, ConfigFileFormat.Properties.getValue())));
for (RemoteConfigRepository remoteConfigRepository : toBeNotified) {
try {
remoteConfigRepository.onLongPollNotified(lastServiceDto);
} catch (Throwable ex) {
Cat.logError(ex);
}
}
}
}
private void updateNotifications(List<ApolloConfigNotification> deltaNotifications) {
for (ApolloConfigNotification notification : deltaNotifications) {
if (Strings.isNullOrEmpty(notification.getNamespaceName())) {
continue;
}
m_notifications.put(notification.getNamespaceName(), notification.getNotificationId());
}
}
private String assembleNamespaces() {
return STRING_JOINER.join(m_longPollNamespaces.keySet());
}
String assembleLongPollRefreshUrl(String uri, String appId, String cluster, String dataCenter,
Map<String, Long> notificationsMap) {
Map<String, String> queryParams = Maps.newHashMap();
queryParams.put("appId", queryParamEscaper.escape(appId));
queryParams.put("cluster", queryParamEscaper.escape(cluster));
queryParams
.put("notifications", queryParamEscaper.escape(assembleNotifications(notificationsMap)));
if (!Strings.isNullOrEmpty(dataCenter)) {
queryParams.put("dataCenter", queryParamEscaper.escape(dataCenter));
}
String localIp = m_configUtil.getLocalIp();
if (!Strings.isNullOrEmpty(localIp)) {
queryParams.put("ip", queryParamEscaper.escape(localIp));
}
String params = MAP_JOINER.join(queryParams);
if (!uri.endsWith("/")) {
uri += "/";
}
return uri + "notifications/v2?" + params;
}
String assembleNotifications(Map<String, Long> notificationsMap) {
List<ApolloConfigNotification> notifications = Lists.newArrayList();
for (Map.Entry<String, Long> entry : notificationsMap.entrySet()) {
ApolloConfigNotification notification = new ApolloConfigNotification();
notification.setNamespaceName(entry.getKey());
notification.setNotificationId(entry.getValue());
notifications.add(notification);
}
return gson.toJson(notifications);
}
private List<ServiceDTO> getConfigServices() {
List<ServiceDTO> services = m_serviceLocator.getConfigServices();
if (services.size() == 0) {
throw new ApolloConfigException("No available config service");
}
return services;
}
}
...@@ -11,14 +11,10 @@ import com.google.common.util.concurrent.RateLimiter; ...@@ -11,14 +11,10 @@ import com.google.common.util.concurrent.RateLimiter;
import com.ctrip.framework.apollo.Apollo; import com.ctrip.framework.apollo.Apollo;
import com.ctrip.framework.apollo.core.ConfigConsts; import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfig; import com.ctrip.framework.apollo.core.dto.ApolloConfig;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
import com.ctrip.framework.apollo.core.dto.ServiceDTO; import com.ctrip.framework.apollo.core.dto.ServiceDTO;
import com.ctrip.framework.apollo.core.schedule.ExponentialSchedulePolicy;
import com.ctrip.framework.apollo.core.schedule.SchedulePolicy;
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory; import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.framework.apollo.exceptions.ApolloConfigException; import com.ctrip.framework.apollo.exceptions.ApolloConfigException;
import com.ctrip.framework.apollo.util.ConfigUtil; import com.ctrip.framework.apollo.util.ConfigUtil;
import com.ctrip.framework.apollo.util.ExceptionUtil;
import com.ctrip.framework.apollo.util.http.HttpRequest; import com.ctrip.framework.apollo.util.http.HttpRequest;
import com.ctrip.framework.apollo.util.http.HttpResponse; import com.ctrip.framework.apollo.util.http.HttpResponse;
import com.ctrip.framework.apollo.util.http.HttpUtil; import com.ctrip.framework.apollo.util.http.HttpUtil;
...@@ -36,12 +32,9 @@ import java.util.Collections; ...@@ -36,12 +32,9 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
...@@ -55,15 +48,11 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -55,15 +48,11 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
private final ConfigServiceLocator m_serviceLocator; private final ConfigServiceLocator m_serviceLocator;
private final HttpUtil m_httpUtil; private final HttpUtil m_httpUtil;
private final ConfigUtil m_configUtil; private final ConfigUtil m_configUtil;
private final RemoteConfigLongPollService remoteConfigLongPollService;
private volatile AtomicReference<ApolloConfig> m_configCache; private volatile AtomicReference<ApolloConfig> m_configCache;
private final String m_namespace; private final String m_namespace;
private final static ScheduledExecutorService m_executorService; private final static ScheduledExecutorService m_executorService;
private final ExecutorService m_longPollingService;
private final AtomicBoolean m_longPollingStopped;
private SchedulePolicy m_longPollFailSchedulePolicyInSecond;
private AtomicReference<ServiceDTO> m_longPollServiceDto; private AtomicReference<ServiceDTO> m_longPollServiceDto;
private AtomicReference<ApolloConfigNotification> m_longPollResult;
private RateLimiter m_longPollRateLimiter;
private RateLimiter m_loadConfigRateLimiter; private RateLimiter m_loadConfigRateLimiter;
private static final Escaper pathEscaper = UrlEscapers.urlPathSegmentEscaper(); private static final Escaper pathEscaper = UrlEscapers.urlPathSegmentEscaper();
private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper(); private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper();
...@@ -86,17 +75,12 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -86,17 +75,12 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
m_configUtil = m_container.lookup(ConfigUtil.class); m_configUtil = m_container.lookup(ConfigUtil.class);
m_httpUtil = m_container.lookup(HttpUtil.class); m_httpUtil = m_container.lookup(HttpUtil.class);
m_serviceLocator = m_container.lookup(ConfigServiceLocator.class); m_serviceLocator = m_container.lookup(ConfigServiceLocator.class);
remoteConfigLongPollService = m_container.lookup(RemoteConfigLongPollService.class);
} catch (ComponentLookupException ex) { } catch (ComponentLookupException ex) {
Cat.logError(ex); Cat.logError(ex);
throw new ApolloConfigException("Unable to load component!", ex); throw new ApolloConfigException("Unable to load component!", ex);
} }
m_longPollFailSchedulePolicyInSecond = new ExponentialSchedulePolicy(1, 120); //in second
m_longPollingStopped = new AtomicBoolean(false);
m_longPollingService = Executors.newSingleThreadExecutor(
ApolloThreadFactory.create("RemoteConfigRepository-LongPolling", true));
m_longPollServiceDto = new AtomicReference<>(); m_longPollServiceDto = new AtomicReference<>();
m_longPollResult = new AtomicReference<>();
m_longPollRateLimiter = RateLimiter.create(m_configUtil.getLongPollQPS());
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS()); m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
this.trySync(); this.trySync();
this.schedulePeriodicRefresh(); this.schedulePeriodicRefresh();
...@@ -119,7 +103,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -119,7 +103,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
private void schedulePeriodicRefresh() { private void schedulePeriodicRefresh() {
logger.debug("Schedule periodic refresh with interval: {} {}", logger.debug("Schedule periodic refresh with interval: {} {}",
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit()); m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
this.m_executorService.scheduleAtFixedRate( m_executorService.scheduleAtFixedRate(
new Runnable() { new Runnable() {
@Override @Override
public void run() { public void run() {
...@@ -271,63 +255,11 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -271,63 +255,11 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
} }
private void scheduleLongPollingRefresh() { private void scheduleLongPollingRefresh() {
try { remoteConfigLongPollService.submit(m_namespace, this);
final String appId = m_configUtil.getAppId();
final String cluster = m_configUtil.getCluster();
final String dataCenter = m_configUtil.getDataCenter();
m_longPollingService.submit(new Runnable() {
@Override
public void run() {
doLongPollingRefresh(appId, cluster, dataCenter);
}
});
} catch (Throwable ex) {
ApolloConfigException exception =
new ApolloConfigException("Schedule long polling refresh failed", ex);
Cat.logError(exception);
logger.warn(ExceptionUtil.getDetailMessage(exception));
}
}
private void doLongPollingRefresh(String appId, String cluster, String dataCenter) {
final Random random = new Random();
ServiceDTO lastServiceDto = null;
while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {
if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
//wait at most 5 seconds
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
}
}
Transaction transaction = Cat.newTransaction("Apollo.ConfigService", "pollNotification");
try {
if (lastServiceDto == null) {
List<ServiceDTO> configServices = getConfigServices();
lastServiceDto = configServices.get(random.nextInt(configServices.size()));
} }
String url = public void onLongPollNotified(ServiceDTO longPollNotifiedServiceDto) {
assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, m_longPollServiceDto.set(longPollNotifiedServiceDto);
m_namespace, dataCenter, m_longPollResult.get());
logger.debug("Long polling from {}", url);
HttpRequest request = new HttpRequest(url);
//longer timeout for read - 1 minute
request.setReadTimeout(60000);
transaction.addData("Url", url);
HttpResponse<ApolloConfigNotification> response =
m_httpUtil.doGet(request, ApolloConfigNotification.class);
logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);
if (response.getStatusCode() == 200) {
m_longPollServiceDto.set(lastServiceDto);
if (response.getBody() != null) {
m_longPollResult.set(response.getBody());
transaction.addData("Result", response.getBody().toString());
}
m_executorService.submit(new Runnable() { m_executorService.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
...@@ -336,63 +268,6 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -336,63 +268,6 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
}); });
} }
m_longPollFailSchedulePolicyInSecond.success();
transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Message.SUCCESS);
} catch (Throwable ex) {
lastServiceDto = null;
Cat.logError(ex);
transaction.setStatus(ex);
long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
logger.warn(
"Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespace: {}, reason: {}",
sleepTimeInSecond, appId, cluster, m_namespace, ExceptionUtil.getDetailMessage(ex));
try {
TimeUnit.SECONDS.sleep(sleepTimeInSecond);
} catch (InterruptedException ie) {
//ignore
}
} finally {
transaction.complete();
}
}
}
String assembleLongPollRefreshUrl(String uri, String appId, String cluster,
String namespace, String dataCenter,
ApolloConfigNotification previousResult) {
Map<String, String> queryParams = Maps.newHashMap();
queryParams.put("appId", queryParamEscaper.escape(appId));
queryParams.put("cluster", queryParamEscaper.escape(cluster));
if (!Strings.isNullOrEmpty(namespace)) {
queryParams.put("namespace", queryParamEscaper.escape(namespace));
}
if (!Strings.isNullOrEmpty(dataCenter)) {
queryParams.put("dataCenter", queryParamEscaper.escape(dataCenter));
}
String localIp = m_configUtil.getLocalIp();
if (!Strings.isNullOrEmpty(localIp)) {
queryParams.put("ip", queryParamEscaper.escape(localIp));
}
if (previousResult != null) {
//number doesn't need encode
queryParams.put("notificationId", String.valueOf(previousResult.getNotificationId()));
}
String params = MAP_JOINER.join(queryParams);
if (!uri.endsWith("/")) {
uri += "/";
}
return uri + "notifications?" + params;
}
void stopLongPollingRefresh() {
this.m_longPollingStopped.compareAndSet(false, true);
}
private List<ServiceDTO> getConfigServices() { private List<ServiceDTO> getConfigServices() {
List<ServiceDTO> services = m_serviceLocator.getConfigServices(); List<ServiceDTO> services = m_serviceLocator.getConfigServices();
if (services.size() == 0) { if (services.size() == 0) {
......
...@@ -96,7 +96,7 @@ public class ConfigUtil { ...@@ -96,7 +96,7 @@ public class ConfigUtil {
} }
public String getLocalIp() { public String getLocalIp() {
return Networks.forIp().getLocalHostAddress(); return Foundation.net().getHostAddress();
} }
public String getMetaServerDomainName() { public String getMetaServerDomainName() {
......
...@@ -30,6 +30,15 @@ ...@@ -30,6 +30,15 @@
<role>com.ctrip.framework.apollo.util.ConfigUtil</role> <role>com.ctrip.framework.apollo.util.ConfigUtil</role>
<implementation>com.ctrip.framework.apollo.util.ConfigUtil</implementation> <implementation>com.ctrip.framework.apollo.util.ConfigUtil</implementation>
</component> </component>
<component>
<role>com.ctrip.framework.apollo.util.http.HttpUtil</role>
<implementation>com.ctrip.framework.apollo.util.http.HttpUtil</implementation>
<requirements>
<requirement>
<role>com.ctrip.framework.apollo.util.ConfigUtil</role>
</requirement>
</requirements>
</component>
<component> <component>
<role>com.ctrip.framework.apollo.internals.ConfigServiceLocator</role> <role>com.ctrip.framework.apollo.internals.ConfigServiceLocator</role>
<implementation>com.ctrip.framework.apollo.internals.ConfigServiceLocator</implementation> <implementation>com.ctrip.framework.apollo.internals.ConfigServiceLocator</implementation>
...@@ -43,12 +52,18 @@ ...@@ -43,12 +52,18 @@
</requirements> </requirements>
</component> </component>
<component> <component>
<role>com.ctrip.framework.apollo.util.http.HttpUtil</role> <role>com.ctrip.framework.apollo.internals.RemoteConfigLongPollService</role>
<implementation>com.ctrip.framework.apollo.util.http.HttpUtil</implementation> <implementation>com.ctrip.framework.apollo.internals.RemoteConfigLongPollService</implementation>
<requirements> <requirements>
<requirement> <requirement>
<role>com.ctrip.framework.apollo.util.ConfigUtil</role> <role>com.ctrip.framework.apollo.util.ConfigUtil</role>
</requirement> </requirement>
<requirement>
<role>com.ctrip.framework.apollo.util.http.HttpUtil</role>
</requirement>
<requirement>
<role>com.ctrip.framework.apollo.internals.ConfigServiceLocator</role>
</requirement>
</requirements> </requirements>
</component> </component>
</components> </components>
......
...@@ -6,6 +6,7 @@ import com.ctrip.framework.apollo.internals.DefaultConfigManagerTest; ...@@ -6,6 +6,7 @@ import com.ctrip.framework.apollo.internals.DefaultConfigManagerTest;
import com.ctrip.framework.apollo.internals.DefaultConfigTest; import com.ctrip.framework.apollo.internals.DefaultConfigTest;
import com.ctrip.framework.apollo.internals.LocalFileConfigRepositoryTest; import com.ctrip.framework.apollo.internals.LocalFileConfigRepositoryTest;
import com.ctrip.framework.apollo.internals.PropertiesConfigFileTest; import com.ctrip.framework.apollo.internals.PropertiesConfigFileTest;
import com.ctrip.framework.apollo.internals.RemoteConfigLongPollServiceTest;
import com.ctrip.framework.apollo.internals.RemoteConfigRepositoryTest; import com.ctrip.framework.apollo.internals.RemoteConfigRepositoryTest;
import com.ctrip.framework.apollo.internals.SimpleConfigTest; import com.ctrip.framework.apollo.internals.SimpleConfigTest;
import com.ctrip.framework.apollo.internals.XmlConfigFileTest; import com.ctrip.framework.apollo.internals.XmlConfigFileTest;
...@@ -23,7 +24,8 @@ import org.junit.runners.Suite.SuiteClasses; ...@@ -23,7 +24,8 @@ import org.junit.runners.Suite.SuiteClasses;
ConfigServiceTest.class, DefaultConfigRegistryTest.class, DefaultConfigFactoryManagerTest.class, ConfigServiceTest.class, DefaultConfigRegistryTest.class, DefaultConfigFactoryManagerTest.class,
DefaultConfigManagerTest.class, DefaultConfigTest.class, LocalFileConfigRepositoryTest.class, DefaultConfigManagerTest.class, DefaultConfigTest.class, LocalFileConfigRepositoryTest.class,
RemoteConfigRepositoryTest.class, SimpleConfigTest.class, DefaultConfigFactoryTest.class, RemoteConfigRepositoryTest.class, SimpleConfigTest.class, DefaultConfigFactoryTest.class,
ConfigIntegrationTest.class, ExceptionUtilTest.class, XmlConfigFileTest.class, PropertiesConfigFileTest.class ConfigIntegrationTest.class, ExceptionUtilTest.class, XmlConfigFileTest.class,
PropertiesConfigFileTest.class, RemoteConfigLongPollServiceTest.class
}) })
public class AllTests { public class AllTests {
......
...@@ -49,12 +49,14 @@ public class ConfigIntegrationTest extends BaseIntegrationTest { ...@@ -49,12 +49,14 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
private String someReleaseKey; private String someReleaseKey;
private File configDir; private File configDir;
private String defaultNamespace; private String defaultNamespace;
private String someOtherNamespace;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
super.setUp(); super.setUp();
defaultNamespace = ConfigConsts.NAMESPACE_APPLICATION; defaultNamespace = ConfigConsts.NAMESPACE_APPLICATION;
someOtherNamespace = "someOtherNamespace";
someReleaseKey = "1"; someReleaseKey = "1";
configDir = new File(ClassLoaderUtil.getClassPath() + "config-cache"); configDir = new File(ClassLoaderUtil.getClassPath() + "config-cache");
if (configDir.exists()) { if (configDir.exists()) {
...@@ -246,7 +248,9 @@ public class ConfigIntegrationTest extends BaseIntegrationTest { ...@@ -246,7 +248,9 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
ContextHandler configHandler = mockConfigServerHandler(HttpServletResponse.SC_OK, apolloConfig); ContextHandler configHandler = mockConfigServerHandler(HttpServletResponse.SC_OK, apolloConfig);
ContextHandler pollHandler = ContextHandler pollHandler =
mockPollNotificationHandler(pollTimeoutInMS, HttpServletResponse.SC_OK, mockPollNotificationHandler(pollTimeoutInMS, HttpServletResponse.SC_OK,
new ApolloConfigNotification(apolloConfig.getNamespaceName(), someNotificationId), false); Lists.newArrayList(
new ApolloConfigNotification(apolloConfig.getNamespaceName(), someNotificationId)),
false);
startServerWithHandlers(configHandler, pollHandler); startServerWithHandlers(configHandler, pollHandler);
...@@ -267,14 +271,109 @@ public class ConfigIntegrationTest extends BaseIntegrationTest { ...@@ -267,14 +271,109 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
longPollFinished.get(pollTimeoutInMS * 20, TimeUnit.MILLISECONDS); longPollFinished.get(pollTimeoutInMS * 20, TimeUnit.MILLISECONDS);
assertEquals(anotherValue, config.getProperty(someKey, null)); assertEquals(anotherValue, config.getProperty(someKey, null));
}
@Test
public void testLongPollRefreshWithMultipleNamespacesAndOnlyOneNamespaceNotified() throws Exception {
final String someKey = "someKey";
final String someValue = "someValue";
final String anotherValue = "anotherValue";
long someNotificationId = 1;
long pollTimeoutInMS = 50;
Map<String, String> configurations = Maps.newHashMap();
configurations.put(someKey, someValue);
ApolloConfig apolloConfig = assembleApolloConfig(configurations);
ContextHandler configHandler = mockConfigServerHandler(HttpServletResponse.SC_OK, apolloConfig);
ContextHandler pollHandler =
mockPollNotificationHandler(pollTimeoutInMS, HttpServletResponse.SC_OK,
Lists.newArrayList(
new ApolloConfigNotification(apolloConfig.getNamespaceName(), someNotificationId)),
false);
startServerWithHandlers(configHandler, pollHandler);
Config someOtherConfig = ConfigService.getConfig(someOtherNamespace);
Config config = ConfigService.getAppConfig();
assertEquals(someValue, config.getProperty(someKey, null));
assertEquals(someValue, someOtherConfig.getProperty(someKey, null));
final SettableFuture<Boolean> longPollFinished = SettableFuture.create();
config.addChangeListener(new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {
longPollFinished.set(true);
}
});
apolloConfig.getConfigurations().put(someKey, anotherValue);
longPollFinished.get(pollTimeoutInMS * 50, TimeUnit.MILLISECONDS);
assertEquals(anotherValue, config.getProperty(someKey, null));
TimeUnit.MILLISECONDS.sleep(pollTimeoutInMS * 10);
assertEquals(someValue, someOtherConfig.getProperty(someKey, null));
}
@Test
public void testLongPollRefreshWithMultipleNamespacesAndMultipleNamespaceNotified() throws Exception {
final String someKey = "someKey";
final String someValue = "someValue";
final String anotherValue = "anotherValue";
long someNotificationId = 1;
long pollTimeoutInMS = 50;
Map<String, String> configurations = Maps.newHashMap();
configurations.put(someKey, someValue);
ApolloConfig apolloConfig = assembleApolloConfig(configurations);
ContextHandler configHandler = mockConfigServerHandler(HttpServletResponse.SC_OK, apolloConfig);
ContextHandler pollHandler =
mockPollNotificationHandler(pollTimeoutInMS, HttpServletResponse.SC_OK,
Lists.newArrayList(
new ApolloConfigNotification(apolloConfig.getNamespaceName(), someNotificationId),
new ApolloConfigNotification(someOtherNamespace, someNotificationId)),
false);
startServerWithHandlers(configHandler, pollHandler);
Config config = ConfigService.getAppConfig();
Config someOtherConfig = ConfigService.getConfig(someOtherNamespace);
assertEquals(someValue, config.getProperty(someKey, null));
assertEquals(someValue, someOtherConfig.getProperty(someKey, null));
final SettableFuture<Boolean> longPollFinished = SettableFuture.create();
final SettableFuture<Boolean> someOtherNamespacelongPollFinished = SettableFuture.create();
config.addChangeListener(new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {
longPollFinished.set(true);
}
});
someOtherConfig.addChangeListener(new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {
someOtherNamespacelongPollFinished.set(true);
}
});
apolloConfig.getConfigurations().put(someKey, anotherValue);
longPollFinished.get(pollTimeoutInMS * 20, TimeUnit.MILLISECONDS);
someOtherNamespacelongPollFinished.get(pollTimeoutInMS * 20, TimeUnit.MILLISECONDS);
assertEquals(anotherValue, config.getProperty(someKey, null));
assertEquals(anotherValue, someOtherConfig.getProperty(someKey, null));
} }
private ContextHandler mockPollNotificationHandler(final long pollResultTimeOutInMS, private ContextHandler mockPollNotificationHandler(final long pollResultTimeOutInMS,
final int statusCode, final int statusCode,
final ApolloConfigNotification result, final List<ApolloConfigNotification> result,
final boolean failedAtFirstTime) { final boolean failedAtFirstTime) {
ContextHandler context = new ContextHandler("/notifications"); ContextHandler context = new ContextHandler("/notifications/v2");
context.setHandler(new AbstractHandler() { context.setHandler(new AbstractHandler() {
AtomicInteger counter = new AtomicInteger(0); AtomicInteger counter = new AtomicInteger(0);
......
package com.ctrip.framework.apollo.internals;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
import com.ctrip.framework.apollo.core.dto.ServiceDTO;
import com.ctrip.framework.apollo.util.ConfigUtil;
import com.ctrip.framework.apollo.util.http.HttpRequest;
import com.ctrip.framework.apollo.util.http.HttpResponse;
import com.ctrip.framework.apollo.util.http.HttpUtil;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.springframework.test.util.ReflectionTestUtils;
import org.unidal.lookup.ComponentTestCase;
import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.http.HttpServletResponse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@RunWith(MockitoJUnitRunner.class)
public class RemoteConfigLongPollServiceTest extends ComponentTestCase {
private RemoteConfigLongPollService remoteConfigLongPollService;
@Mock
private HttpResponse<List<ApolloConfigNotification>> pollResponse;
@Mock
private HttpUtil httpUtil;
private Type responseType;
private static String someServerUrl;
private static String someAppId;
private static String someCluster;
@Before
public void setUp() throws Exception {
super.setUp();
defineComponent(ConfigUtil.class, MockConfigUtil.class);
defineComponent(ConfigServiceLocator.class, MockConfigServiceLocator.class);
remoteConfigLongPollService = lookup(RemoteConfigLongPollService.class);
ReflectionTestUtils.setField(remoteConfigLongPollService, "m_httpUtil", httpUtil);
responseType =
(Type) ReflectionTestUtils.getField(remoteConfigLongPollService, "m_responseType");
someServerUrl = "http://someServer";
someAppId = "someAppId";
someCluster = "someCluster";
}
@Test
public void testSubmitLongPollNamespaceWith304Response() throws Exception {
RemoteConfigRepository someRepository = mock(RemoteConfigRepository.class);
final String someNamespace = "someNamespace";
when(pollResponse.getStatusCode()).thenReturn(HttpServletResponse.SC_NOT_MODIFIED);
final SettableFuture<Boolean> longPollFinished = SettableFuture.create();
doAnswer(new Answer<HttpResponse<List<ApolloConfigNotification>>>() {
@Override
public HttpResponse<List<ApolloConfigNotification>> answer(InvocationOnMock invocation)
throws Throwable {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
}
HttpRequest request = invocation.getArgumentAt(0, HttpRequest.class);
assertTrue(request.getUrl().contains(someServerUrl + "/notifications/v2?"));
assertTrue(request.getUrl().contains("appId=" + someAppId));
assertTrue(request.getUrl().contains("cluster=" + someCluster));
assertTrue(request.getUrl().contains("notifications="));
assertTrue(request.getUrl().contains(someNamespace));
longPollFinished.set(true);
return pollResponse;
}
}).when(httpUtil).doGet(any(HttpRequest.class), eq(responseType));
remoteConfigLongPollService.submit(someNamespace, someRepository);
longPollFinished.get(5000, TimeUnit.MILLISECONDS);
remoteConfigLongPollService.stopLongPollingRefresh();
verify(someRepository, never()).onLongPollNotified(any(ServiceDTO.class));
}
@Test
public void testSubmitLongPollNamespaceWith200Response() throws Exception {
RemoteConfigRepository someRepository = mock(RemoteConfigRepository.class);
final String someNamespace = "someNamespace";
ApolloConfigNotification someNotification = mock(ApolloConfigNotification.class);
when(someNotification.getNamespaceName()).thenReturn(someNamespace);
when(pollResponse.getStatusCode()).thenReturn(HttpServletResponse.SC_OK);
when(pollResponse.getBody()).thenReturn(Lists.newArrayList(someNotification));
doAnswer(new Answer<HttpResponse<List<ApolloConfigNotification>>>() {
@Override
public HttpResponse<List<ApolloConfigNotification>> answer(InvocationOnMock invocation)
throws Throwable {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
}
return pollResponse;
}
}).when(httpUtil).doGet(any(HttpRequest.class), eq(responseType));
final SettableFuture<Boolean> onNotified = SettableFuture.create();
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
onNotified.set(true);
return null;
}
}).when(someRepository).onLongPollNotified(any(ServiceDTO.class));
remoteConfigLongPollService.submit(someNamespace, someRepository);
onNotified.get(5000, TimeUnit.MILLISECONDS);
remoteConfigLongPollService.stopLongPollingRefresh();
verify(someRepository, times(1)).onLongPollNotified(any(ServiceDTO.class));
}
@Test
public void testSubmitLongPollMultipleNamespaces() throws Exception {
RemoteConfigRepository someRepository = mock(RemoteConfigRepository.class);
RemoteConfigRepository anotherRepository = mock(RemoteConfigRepository.class);
final String someNamespace = "someNamespace";
final String anotherNamespace = "anotherNamespace";
final ApolloConfigNotification someNotification = mock(ApolloConfigNotification.class);
when(someNotification.getNamespaceName()).thenReturn(someNamespace);
final ApolloConfigNotification anotherNotification = mock(ApolloConfigNotification.class);
when(anotherNotification.getNamespaceName()).thenReturn(anotherNamespace);
final SettableFuture<Boolean> submitAnotherNamespaceStart = SettableFuture.create();
final SettableFuture<Boolean> submitAnotherNamespaceFinish = SettableFuture.create();
doAnswer(new Answer<HttpResponse<List<ApolloConfigNotification>>>() {
final AtomicInteger counter = new AtomicInteger();
@Override
public HttpResponse<List<ApolloConfigNotification>> answer(InvocationOnMock invocation)
throws Throwable {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
}
//the first time
if (counter.incrementAndGet() == 1) {
HttpRequest request = invocation.getArgumentAt(0, HttpRequest.class);
assertTrue(request.getUrl().contains("notifications="));
assertTrue(request.getUrl().contains(someNamespace));
submitAnotherNamespaceStart.set(true);
when(pollResponse.getStatusCode()).thenReturn(HttpServletResponse.SC_OK);
when(pollResponse.getBody()).thenReturn(Lists.newArrayList(someNotification));
} else if (submitAnotherNamespaceFinish.get()) {
HttpRequest request = invocation.getArgumentAt(0, HttpRequest.class);
assertTrue(request.getUrl().contains("notifications="));
assertTrue(request.getUrl().contains(someNamespace));
assertTrue(request.getUrl().contains(anotherNamespace));
when(pollResponse.getStatusCode()).thenReturn(HttpServletResponse.SC_OK);
when(pollResponse.getBody()).thenReturn(Lists.newArrayList(anotherNotification));
} else {
when(pollResponse.getStatusCode()).thenReturn(HttpServletResponse.SC_NOT_MODIFIED);
when(pollResponse.getBody()).thenReturn(null);
}
return pollResponse;
}
}).when(httpUtil).doGet(any(HttpRequest.class), eq(responseType));
final SettableFuture<Boolean> onAnotherRepositoryNotified = SettableFuture.create();
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
onAnotherRepositoryNotified.set(true);
return null;
}
}).when(anotherRepository).onLongPollNotified(any(ServiceDTO.class));
remoteConfigLongPollService.submit(someNamespace, someRepository);
submitAnotherNamespaceStart.get(5000, TimeUnit.MILLISECONDS);
remoteConfigLongPollService.submit(anotherNamespace, anotherRepository);
submitAnotherNamespaceFinish.set(true);
onAnotherRepositoryNotified.get(500, TimeUnit.MILLISECONDS);
remoteConfigLongPollService.stopLongPollingRefresh();
verify(someRepository, times(1)).onLongPollNotified(any(ServiceDTO.class));
verify(anotherRepository, times(1)).onLongPollNotified(any(ServiceDTO.class));
}
@Test
public void testSubmitLongPollMultipleNamespacesWithMultipleNotificationsReturned() throws Exception {
RemoteConfigRepository someRepository = mock(RemoteConfigRepository.class);
RemoteConfigRepository anotherRepository = mock(RemoteConfigRepository.class);
final String someNamespace = "someNamespace";
final String anotherNamespace = "anotherNamespace";
final ApolloConfigNotification someNotification = mock(ApolloConfigNotification.class);
when(someNotification.getNamespaceName()).thenReturn(someNamespace);
final ApolloConfigNotification anotherNotification = mock(ApolloConfigNotification.class);
when(anotherNotification.getNamespaceName()).thenReturn(anotherNamespace);
when(pollResponse.getStatusCode()).thenReturn(HttpServletResponse.SC_OK);
when(pollResponse.getBody()).thenReturn(Lists.newArrayList(someNotification, anotherNotification));
doAnswer(new Answer<HttpResponse<List<ApolloConfigNotification>>>() {
@Override
public HttpResponse<List<ApolloConfigNotification>> answer(InvocationOnMock invocation)
throws Throwable {
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
}
return pollResponse;
}
}).when(httpUtil).doGet(any(HttpRequest.class), eq(responseType));
final SettableFuture<Boolean> someRepositoryNotified = SettableFuture.create();
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
someRepositoryNotified.set(true);
return null;
}
}).when(someRepository).onLongPollNotified(any(ServiceDTO.class));
final SettableFuture<Boolean> anotherRepositoryNotified = SettableFuture.create();
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
anotherRepositoryNotified.set(true);
return null;
}
}).when(anotherRepository).onLongPollNotified(any(ServiceDTO.class));
remoteConfigLongPollService.submit(someNamespace, someRepository);
remoteConfigLongPollService.submit(anotherNamespace, anotherRepository);
someRepositoryNotified.get(5000, TimeUnit.MILLISECONDS);
anotherRepositoryNotified.get(5000, TimeUnit.MILLISECONDS);
remoteConfigLongPollService.stopLongPollingRefresh();
verify(someRepository, times(1)).onLongPollNotified(any(ServiceDTO.class));
verify(anotherRepository, times(1)).onLongPollNotified(any(ServiceDTO.class));
}
@Test
public void testAssembleLongPollRefreshUrl() throws Exception {
String someUri = someServerUrl;
String someAppId = "someAppId";
String someCluster = "someCluster+ &.-_someSign";
String someNamespace = "someName";
long someNotificationId = 1;
Map<String, Long> notificationsMap = ImmutableMap.of(someNamespace, someNotificationId);
String longPollRefreshUrl =
remoteConfigLongPollService
.assembleLongPollRefreshUrl(someUri, someAppId, someCluster, null, notificationsMap);
assertTrue(longPollRefreshUrl.contains(someServerUrl + "/notifications/v2?"));
assertTrue(longPollRefreshUrl.contains("appId=" + someAppId));
assertTrue(longPollRefreshUrl.contains("cluster=someCluster%2B+%26.-_someSign"));
assertTrue(longPollRefreshUrl.contains(
"notifications=%5B%7B%22namespaceName%22%3A%22" + someNamespace
+ "%22%2C%22notificationId%22%3A" + 1 + "%7D%5D"));
}
@Test
public void testAssembleLongPollRefreshUrlWithMultipleNamespaces() throws Exception {
String someUri = someServerUrl;
String someAppId = "someAppId";
String someCluster = "someCluster+ &.-_someSign";
String someNamespace = "someName";
String anotherNamespace = "anotherName";
long someNotificationId = 1;
long anotherNotificationId = 2;
Map<String, Long> notificationsMap =
ImmutableMap.of(someNamespace, someNotificationId, anotherNamespace, anotherNotificationId);
String longPollRefreshUrl =
remoteConfigLongPollService
.assembleLongPollRefreshUrl(someUri, someAppId, someCluster, null, notificationsMap);
assertTrue(longPollRefreshUrl.contains(someServerUrl + "/notifications/v2?"));
assertTrue(longPollRefreshUrl.contains("appId=" + someAppId));
assertTrue(longPollRefreshUrl.contains("cluster=someCluster%2B+%26.-_someSign"));
assertTrue(
longPollRefreshUrl.contains("notifications=%5B%7B%22namespaceName%22%3A%22" + someNamespace
+ "%22%2C%22notificationId%22%3A" + someNotificationId
+ "%7D%2C%7B%22namespaceName%22%3A%22" + anotherNamespace
+ "%22%2C%22notificationId%22%3A" + anotherNotificationId + "%7D%5D"));
}
public static class MockConfigUtil extends ConfigUtil {
@Override
public String getAppId() {
return someAppId;
}
@Override
public String getCluster() {
return someCluster;
}
@Override
public String getDataCenter() {
return null;
}
@Override
public int getLoadConfigQPS() {
return 200;
}
@Override
public int getLongPollQPS() {
return 200;
}
}
public static class MockConfigServiceLocator extends ConfigServiceLocator {
@Override
public List<ServiceDTO> getConfigServices() {
ServiceDTO serviceDTO = mock(ServiceDTO.class);
when(serviceDTO.getHomepageUrl()).thenReturn(someServerUrl);
return Lists.newArrayList(serviceDTO);
}
@Override
public void initialize() throws InitializationException {
//do nothing
}
}
}
...@@ -4,6 +4,7 @@ import com.google.common.collect.ImmutableMap; ...@@ -4,6 +4,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.SettableFuture;
import com.ctrip.framework.apollo.core.dto.ApolloConfig; import com.ctrip.framework.apollo.core.dto.ApolloConfig;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification; import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
import com.ctrip.framework.apollo.core.dto.ServiceDTO; import com.ctrip.framework.apollo.core.dto.ServiceDTO;
...@@ -24,6 +25,7 @@ import org.mockito.runners.MockitoJUnitRunner; ...@@ -24,6 +25,7 @@ import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import org.unidal.lookup.ComponentTestCase; import org.unidal.lookup.ComponentTestCase;
import java.lang.reflect.Type;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
...@@ -34,12 +36,12 @@ import javax.servlet.http.HttpServletResponse; ...@@ -34,12 +36,12 @@ import javax.servlet.http.HttpServletResponse;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
/** /**
* Created by Jason on 4/9/16. * Created by Jason on 4/9/16.
...@@ -52,9 +54,8 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase { ...@@ -52,9 +54,8 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
@Mock @Mock
private static HttpResponse<ApolloConfig> someResponse; private static HttpResponse<ApolloConfig> someResponse;
@Mock @Mock
private static HttpResponse<ApolloConfigNotification> pollResponse; private static HttpResponse<List<ApolloConfigNotification>> pollResponse;
@Mock private RemoteConfigLongPollService remoteConfigLongPollService;
private ConfigUtil someConfigUtil;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
...@@ -66,6 +67,8 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase { ...@@ -66,6 +67,8 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
defineComponent(ConfigUtil.class, MockConfigUtil.class); defineComponent(ConfigUtil.class, MockConfigUtil.class);
defineComponent(ConfigServiceLocator.class, MockConfigServiceLocator.class); defineComponent(ConfigServiceLocator.class, MockConfigServiceLocator.class);
defineComponent(HttpUtil.class, MockHttpUtil.class); defineComponent(HttpUtil.class, MockHttpUtil.class);
remoteConfigLongPollService = lookup(RemoteConfigLongPollService.class);
} }
@Test @Test
...@@ -84,7 +87,7 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase { ...@@ -84,7 +87,7 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
Properties config = remoteConfigRepository.getConfig(); Properties config = remoteConfigRepository.getConfig();
assertEquals(configurations, config); assertEquals(configurations, config);
remoteConfigRepository.stopLongPollingRefresh(); remoteConfigLongPollService.stopLongPollingRefresh();
} }
@Test(expected = ApolloConfigException.class) @Test(expected = ApolloConfigException.class)
...@@ -95,7 +98,7 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase { ...@@ -95,7 +98,7 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
RemoteConfigRepository remoteConfigRepository = new RemoteConfigRepository(someNamespace); RemoteConfigRepository remoteConfigRepository = new RemoteConfigRepository(someNamespace);
//must stop the long polling before exception occurred //must stop the long polling before exception occurred
remoteConfigRepository.stopLongPollingRefresh(); remoteConfigLongPollService.stopLongPollingRefresh();
remoteConfigRepository.getConfig(); remoteConfigRepository.getConfig();
} }
...@@ -124,7 +127,7 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase { ...@@ -124,7 +127,7 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
assertEquals(newConfigurations, captor.getValue()); assertEquals(newConfigurations, captor.getValue());
remoteConfigRepository.stopLongPollingRefresh(); remoteConfigLongPollService.stopLongPollingRefresh();
} }
@Test @Test
...@@ -137,7 +140,7 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase { ...@@ -137,7 +140,7 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
final SettableFuture<Boolean> longPollFinished = SettableFuture.create(); final SettableFuture<Boolean> longPollFinished = SettableFuture.create();
RepositoryChangeListener someListener = mock(RepositoryChangeListener.class); RepositoryChangeListener someListener = mock(RepositoryChangeListener.class);
doAnswer(new Answer<Void>(){ doAnswer(new Answer<Void>() {
@Override @Override
public Void answer(InvocationOnMock invocation) throws Throwable { public Void answer(InvocationOnMock invocation) throws Throwable {
...@@ -153,37 +156,21 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase { ...@@ -153,37 +156,21 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
Map<String, String> newConfigurations = ImmutableMap.of("someKey", "anotherValue"); Map<String, String> newConfigurations = ImmutableMap.of("someKey", "anotherValue");
ApolloConfig newApolloConfig = assembleApolloConfig(newConfigurations); ApolloConfig newApolloConfig = assembleApolloConfig(newConfigurations);
ApolloConfigNotification someNotification = mock(ApolloConfigNotification.class);
when(someNotification.getNamespaceName()).thenReturn(someNamespace);
when(pollResponse.getStatusCode()).thenReturn(HttpServletResponse.SC_OK); when(pollResponse.getStatusCode()).thenReturn(HttpServletResponse.SC_OK);
when(pollResponse.getBody()).thenReturn(Lists.newArrayList(someNotification));
when(someResponse.getBody()).thenReturn(newApolloConfig); when(someResponse.getBody()).thenReturn(newApolloConfig);
longPollFinished.get(500, TimeUnit.MILLISECONDS); longPollFinished.get(500, TimeUnit.MILLISECONDS);
remoteConfigRepository.stopLongPollingRefresh(); remoteConfigLongPollService.stopLongPollingRefresh();
verify(someListener, times(1)).onRepositoryChange(eq(someNamespace), captor.capture()); verify(someListener, times(1)).onRepositoryChange(eq(someNamespace), captor.capture());
assertEquals(newConfigurations, captor.getValue()); assertEquals(newConfigurations, captor.getValue());
} }
@Test
public void testAssembleLongPollRefreshUrl() throws Exception {
String someUri = "http://someServer";
String someAppId = "someAppId";
String someCluster = "someCluster+ &.-_someSign";
RemoteConfigRepository remoteConfigRepository = new RemoteConfigRepository(someNamespace);
String longPollRefreshUrl =
remoteConfigRepository
.assembleLongPollRefreshUrl(someUri, someAppId, someCluster, someNamespace, null, null);
assertTrue(longPollRefreshUrl.contains("http://someServer/notifications?"));
assertTrue(longPollRefreshUrl.contains("appId=someAppId"));
assertTrue(longPollRefreshUrl.contains("cluster=someCluster%2B+%26.-_someSign"));
assertTrue(longPollRefreshUrl.contains("namespace=" + someNamespace));
}
@Test @Test
public void testAssembleQueryConfigUrl() throws Exception { public void testAssembleQueryConfigUrl() throws Exception {
String someUri = "http://someServer"; String someUri = "http://someServer";
...@@ -200,7 +187,8 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase { ...@@ -200,7 +187,8 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
someApolloConfig); someApolloConfig);
assertTrue(queryConfigUrl assertTrue(queryConfigUrl
.contains("http://someServer/configs/someAppId/someCluster+%20&.-_someSign/" + someNamespace)); .contains(
"http://someServer/configs/someAppId/someCluster+%20&.-_someSign/" + someNamespace));
assertTrue(queryConfigUrl assertTrue(queryConfigUrl
.contains("releaseKey=20160705193346-583078ef5716c055%2B20160705193308-31c471ddf9087c3f")); .contains("releaseKey=20160705193346-583078ef5716c055%2B20160705193308-31c471ddf9087c3f"));
...@@ -265,15 +253,17 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase { ...@@ -265,15 +253,17 @@ public class RemoteConfigRepositoryTest extends ComponentTestCase {
public static class MockHttpUtil extends HttpUtil { public static class MockHttpUtil extends HttpUtil {
@Override @Override
public <T> HttpResponse<T> doGet(HttpRequest httpRequest, Class<T> responseType) { public <T> HttpResponse<T> doGet(HttpRequest httpRequest, Class<T> responseType) {
if (httpRequest.getUrl().contains("notifications?")) { return (HttpResponse<T>) someResponse;
}
@Override
public <T> HttpResponse<T> doGet(HttpRequest httpRequest, Type responseType) {
try { try {
TimeUnit.MILLISECONDS.sleep(50); TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) { } catch (InterruptedException e) {
} }
return (HttpResponse<T>) pollResponse; return (HttpResponse<T>) pollResponse;
} }
return (HttpResponse<T>) someResponse;
}
} }
} }
...@@ -3,6 +3,7 @@ package com.ctrip.framework.apollo.configservice; ...@@ -3,6 +3,7 @@ package com.ctrip.framework.apollo.configservice;
import com.ctrip.framework.apollo.biz.message.ReleaseMessageScanner; import com.ctrip.framework.apollo.biz.message.ReleaseMessageScanner;
import com.ctrip.framework.apollo.configservice.controller.ConfigFileController; import com.ctrip.framework.apollo.configservice.controller.ConfigFileController;
import com.ctrip.framework.apollo.configservice.controller.NotificationController; import com.ctrip.framework.apollo.configservice.controller.NotificationController;
import com.ctrip.framework.apollo.configservice.controller.NotificationControllerV2;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
...@@ -17,12 +18,15 @@ public class ConfigServiceAutoConfiguration { ...@@ -17,12 +18,15 @@ public class ConfigServiceAutoConfiguration {
private NotificationController notificationController; private NotificationController notificationController;
@Autowired @Autowired
private ConfigFileController configFileController; private ConfigFileController configFileController;
@Autowired
private NotificationControllerV2 notificationControllerV2;
@Bean @Bean
public ReleaseMessageScanner releaseMessageScanner() { public ReleaseMessageScanner releaseMessageScanner() {
ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner(); ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
//handle server cache first //handle server cache first
releaseMessageScanner.addMessageListener(configFileController); releaseMessageScanner.addMessageListener(configFileController);
releaseMessageScanner.addMessageListener(notificationControllerV2);
releaseMessageScanner.addMessageListener(notificationController); releaseMessageScanner.addMessageListener(notificationController);
return releaseMessageScanner; return releaseMessageScanner;
} }
......
...@@ -59,6 +59,17 @@ public class NotificationController implements ReleaseMessageListener { ...@@ -59,6 +59,17 @@ public class NotificationController implements ReleaseMessageListener {
@Autowired @Autowired
private NamespaceUtil namespaceUtil; private NamespaceUtil namespaceUtil;
/**
* For single namespace notification, reserved for older version of apollo clients
*
* @param appId the appId
* @param cluster the cluster
* @param namespace the namespace name
* @param dataCenter the datacenter
* @param notificationId the notification id for the namespace
* @param clientIp the client side ip
* @return a deferred result
*/
@RequestMapping(method = RequestMethod.GET) @RequestMapping(method = RequestMethod.GET)
public DeferredResult<ResponseEntity<ApolloConfigNotification>> pollNotification( public DeferredResult<ResponseEntity<ApolloConfigNotification>> pollNotification(
@RequestParam(value = "appId") String appId, @RequestParam(value = "appId") String appId,
......
package com.ctrip.framework.apollo.configservice.controller;
import com.google.common.base.Function;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.message.ReleaseMessageListener;
import com.ctrip.framework.apollo.biz.message.Topics;
import com.ctrip.framework.apollo.biz.service.ReleaseMessageService;
import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil;
import com.ctrip.framework.apollo.configservice.util.NamespaceUtil;
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtil;
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
import com.ctrip.framework.apollo.core.exception.BadRequestException;
import com.dianping.cat.Cat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.util.CollectionUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@RestController
@RequestMapping("/notifications/v2")
public class NotificationControllerV2 implements ReleaseMessageListener {
private static final Logger logger = LoggerFactory.getLogger(NotificationControllerV2.class);
private static final long TIMEOUT = 30 * 1000;//30 seconds
private final Multimap<String, DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>>
deferredResults = Multimaps.synchronizedSetMultimap(HashMultimap.create());
private static final ResponseEntity<List<ApolloConfigNotification>>
NOT_MODIFIED_RESPONSE_LIST = new ResponseEntity<>(HttpStatus.NOT_MODIFIED);
private static final Splitter STRING_SPLITTER =
Splitter.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).omitEmptyStrings();
private static final long NOTIFICATION_ID_PLACEHOLDER = -1;
private static final Type notificationsTypeReference =
new TypeToken<List<ApolloConfigNotification>>() {
}.getType();
@Autowired
private WatchKeysUtil watchKeysUtil;
@Autowired
private ReleaseMessageService releaseMessageService;
@Autowired
private EntityManagerUtil entityManagerUtil;
@Autowired
private NamespaceUtil namespaceUtil;
@Autowired
private Gson gson;
@RequestMapping(method = RequestMethod.GET)
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
@RequestParam(value = "appId") String appId,
@RequestParam(value = "cluster") String cluster,
@RequestParam(value = "notifications") String notificationsAsString,
@RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "ip", required = false) String clientIp) {
List<ApolloConfigNotification> notifications = null;
try {
notifications =
gson.fromJson(notificationsAsString, notificationsTypeReference);
} catch (Throwable ex) {
Cat.logError(ex);
}
if (CollectionUtils.isEmpty(notifications)) {
throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);
}
Set<String> namespaces = Sets.newHashSet();
Map<String, Long> clientSideNotifications = Maps.newHashMap();
for (ApolloConfigNotification notification : notifications) {
if (Strings.isNullOrEmpty(notification.getNamespaceName())) {
continue;
}
//strip out .properties suffix
String namespace = namespaceUtil.filterNamespaceName(notification.getNamespaceName());
namespaces.add(namespace);
clientSideNotifications.put(namespace, notification.getNotificationId());
}
Multimap<String, String> watchedKeysMap =
watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);
DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> deferredResult =
new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE_LIST);
Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());
List<ReleaseMessage> latestReleaseMessages =
releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);
/**
* Manually close the entity manager.
* Since for async request, Spring won't do so until the request is finished,
* which is unacceptable since we are doing long polling - means the db connection would be hold
* for a very long time
*/
entityManagerUtil.closeEntityManager();
List<ApolloConfigNotification> newNotifications =
getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap,
latestReleaseMessages);
if (!CollectionUtils.isEmpty(newNotifications)) {
deferredResult.setResult(new ResponseEntity<>(newNotifications, HttpStatus.OK));
} else {
//register all keys
for (String key : watchedKeys) {
this.deferredResults.put(key, deferredResult);
}
deferredResult
.onTimeout(() -> logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
deferredResult.onCompletion(() -> {
//unregister all keys
for (String key : watchedKeys) {
deferredResults.remove(key, deferredResult);
}
logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.CompletedKeys");
});
logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",
watchedKeys, appId, cluster, namespaces, dataCenter);
}
return deferredResult;
}
private List<ApolloConfigNotification> getApolloConfigNotifications(Set<String> namespaces,
Map<String, Long> clientSideNotifications,
Multimap<String, String> watchedKeysMap,
List<ReleaseMessage> latestReleaseMessages) {
List<ApolloConfigNotification> newNotifications = Lists.newArrayList();
if (!CollectionUtils.isEmpty(latestReleaseMessages)) {
Map<String, Long> latestNotifications = Maps.newHashMap();
for (ReleaseMessage releaseMessage : latestReleaseMessages) {
latestNotifications.put(releaseMessage.getMessage(), releaseMessage.getId());
}
for (String namespace : namespaces) {
long clientSideId = clientSideNotifications.get(namespace);
long latestId = NOTIFICATION_ID_PLACEHOLDER;
Collection<String> namespaceWatchedKeys = watchedKeysMap.get(namespace);
for (String namespaceWatchedKey : namespaceWatchedKeys) {
long namespaceNotificationId =
latestNotifications.getOrDefault(namespaceWatchedKey, NOTIFICATION_ID_PLACEHOLDER);
if (namespaceNotificationId > latestId) {
latestId = namespaceNotificationId;
}
}
if (latestId > clientSideId) {
newNotifications.add(new ApolloConfigNotification(namespace, latestId));
}
}
}
return newNotifications;
}
@Override
public void handleMessage(ReleaseMessage message, String channel) {
logger.info("message received - channel: {}, message: {}", channel, message);
String content = message.getMessage();
Cat.logEvent("Apollo.LongPoll.Messages", content);
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {
return;
}
String changedNamespace = retrieveNamespaceFromReleaseMessage.apply(content);
if (Strings.isNullOrEmpty(changedNamespace)) {
logger.error("message format invalid - {}", content);
return;
}
ResponseEntity<List<ApolloConfigNotification>> notification =
new ResponseEntity<>(
Lists.newArrayList(new ApolloConfigNotification(changedNamespace, message.getId())),
HttpStatus.OK);
if (!deferredResults.containsKey(content)) {
return;
}
//create a new list to avoid ConcurrentModificationException
List<DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>> results =
Lists.newArrayList(deferredResults.get(content));
logger.debug("Notify {} clients for key {}", results.size(), content);
for (DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result : results) {
result.setResult(notification);
}
logger.debug("Notification completed");
}
private static final Function<String, String> retrieveNamespaceFromReleaseMessage =
releaseMessage -> {
if (Strings.isNullOrEmpty(releaseMessage)) {
return null;
}
List<String> keys = STRING_SPLITTER.splitToList(releaseMessage);
//message should be appId+cluster+namespace
if (keys.size() != 3) {
logger.error("message format invalid - {}", releaseMessage);
return null;
}
return keys.get(2);
};
private void logWatchedKeysToCat(Set<String> watchedKeys, String eventName) {
for (String watchedKey : watchedKeys) {
Cat.logEvent(eventName, watchedKey);
}
}
}
...@@ -2,6 +2,9 @@ package com.ctrip.framework.apollo.configservice.util; ...@@ -2,6 +2,9 @@ package com.ctrip.framework.apollo.configservice.util;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import com.ctrip.framework.apollo.biz.service.AppNamespaceService; import com.ctrip.framework.apollo.biz.service.AppNamespaceService;
...@@ -11,6 +14,8 @@ import com.ctrip.framework.apollo.core.ConfigConsts; ...@@ -11,6 +14,8 @@ import com.ctrip.framework.apollo.core.ConfigConsts;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
...@@ -23,32 +28,61 @@ public class WatchKeysUtil { ...@@ -23,32 +28,61 @@ public class WatchKeysUtil {
@Autowired @Autowired
private AppNamespaceService appNamespaceService; private AppNamespaceService appNamespaceService;
/**
* Assemble watch keys for the given appId, cluster, namespace, dataCenter combination
*/
public Set<String> assembleAllWatchKeys(String appId, String clusterName, String namespace, public Set<String> assembleAllWatchKeys(String appId, String clusterName, String namespace,
String dataCenter) { String dataCenter) {
Set<String> watchedKeys = assembleWatchKeys(appId, clusterName, namespace, dataCenter); Multimap<String, String> watchedKeysMap =
assembleAllWatchKeys(appId, clusterName, Sets.newHashSet(namespace), dataCenter);
return Sets.newHashSet(watchedKeysMap.get(namespace));
}
/**
* Assemble watch keys for the given appId, cluster, namespaces, dataCenter combination
* @return a multimap with namespace as the key and watch keys as the value
*/
public Multimap<String, String> assembleAllWatchKeys(String appId, String clusterName,
Set<String> namespaces,
String dataCenter) {
Multimap<String, String> watchedKeysMap =
assembleWatchKeys(appId, clusterName, namespaces, dataCenter);
//Every app has an 'application' namespace
if (!(namespaces.size() == 1 && namespaces.contains(ConfigConsts.NAMESPACE_APPLICATION))) {
Set<String> namespacesBelongToAppId = namespacesBelongToAppId(appId, namespaces);
Set<String> publicNamespaces = Sets.difference(namespaces, namespacesBelongToAppId);
//Listen on more namespaces if it's a public namespace //Listen on more namespaces if it's a public namespace
if (!namespaceBelongsToAppId(appId, namespace)) { if (!publicNamespaces.isEmpty()) {
watchedKeys.addAll(this.findPublicConfigWatchKey(appId, clusterName, namespace, dataCenter)); watchedKeysMap
.putAll(findPublicConfigWatchKeys(appId, clusterName, publicNamespaces, dataCenter));
}
} }
return watchedKeys; return watchedKeysMap;
} }
private Set<String> findPublicConfigWatchKey(String applicationId, String clusterName, private Multimap<String, String> findPublicConfigWatchKeys(String applicationId,
String namespace, String clusterName,
Set<String> namespaces,
String dataCenter) { String dataCenter) {
AppNamespace appNamespace = appNamespaceService.findPublicNamespaceByName(namespace); Multimap<String, String> watchedKeysMap = HashMultimap.create();
List<AppNamespace> appNamespaces = appNamespaceService.findPublicNamespacesByNames(namespaces);
for (AppNamespace appNamespace : appNamespaces) {
//check whether the namespace's appId equals to current one //check whether the namespace's appId equals to current one
if (Objects.isNull(appNamespace) || Objects.equals(applicationId, appNamespace.getAppId())) { if (Objects.equals(applicationId, appNamespace.getAppId())) {
return Sets.newHashSet(); continue;
} }
String publicConfigAppId = appNamespace.getAppId(); String publicConfigAppId = appNamespace.getAppId();
return assembleWatchKeys(publicConfigAppId, clusterName, namespace, dataCenter); watchedKeysMap.putAll(appNamespace.getName(),
assembleWatchKeys(publicConfigAppId, clusterName, appNamespace.getName(), dataCenter));
}
return watchedKeysMap;
} }
private String assembleKey(String appId, String cluster, String namespace) { private String assembleKey(String appId, String cluster, String namespace) {
...@@ -57,6 +91,7 @@ public class WatchKeysUtil { ...@@ -57,6 +91,7 @@ public class WatchKeysUtil {
private Set<String> assembleWatchKeys(String appId, String clusterName, String namespace, private Set<String> assembleWatchKeys(String appId, String clusterName, String namespace,
String dataCenter) { String dataCenter) {
Set<String> watchedKeys = Sets.newHashSet(); Set<String> watchedKeys = Sets.newHashSet();
//watch specified cluster config change //watch specified cluster config change
...@@ -75,14 +110,27 @@ public class WatchKeysUtil { ...@@ -75,14 +110,27 @@ public class WatchKeysUtil {
return watchedKeys; return watchedKeys;
} }
private boolean namespaceBelongsToAppId(String appId, String namespaceName) { private Multimap<String, String> assembleWatchKeys(String appId, String clusterName,
//Every app has an 'application' namespace Set<String> namespaces,
if (Objects.equals(ConfigConsts.NAMESPACE_APPLICATION, namespaceName)) { String dataCenter) {
return true; Multimap<String, String> watchedKeysMap = HashMultimap.create();
for (String namespace : namespaces) {
watchedKeysMap
.putAll(namespace, assembleWatchKeys(appId, clusterName, namespace, dataCenter));
}
return watchedKeysMap;
} }
AppNamespace appNamespace = appNamespaceService.findOne(appId, namespaceName); private Set<String> namespacesBelongToAppId(String appId, Set<String> namespaces) {
List<AppNamespace> appNamespaces =
appNamespaceService.findByAppIdAndNamespaces(appId, namespaces);
if (appNamespaces == null || appNamespaces.isEmpty()) {
return Collections.emptySet();
}
return appNamespace != null; return FluentIterable.from(appNamespaces).transform(AppNamespace::getName).toSet();
} }
} }
...@@ -3,10 +3,13 @@ package com.ctrip.framework.apollo.configservice; ...@@ -3,10 +3,13 @@ package com.ctrip.framework.apollo.configservice;
import com.ctrip.framework.apollo.configservice.controller.ConfigControllerTest; import com.ctrip.framework.apollo.configservice.controller.ConfigControllerTest;
import com.ctrip.framework.apollo.configservice.controller.ConfigFileControllerTest; import com.ctrip.framework.apollo.configservice.controller.ConfigFileControllerTest;
import com.ctrip.framework.apollo.configservice.controller.NotificationControllerTest; import com.ctrip.framework.apollo.configservice.controller.NotificationControllerTest;
import com.ctrip.framework.apollo.configservice.controller.NotificationControllerV2Test;
import com.ctrip.framework.apollo.configservice.integration.ConfigControllerIntegrationTest; import com.ctrip.framework.apollo.configservice.integration.ConfigControllerIntegrationTest;
import com.ctrip.framework.apollo.configservice.integration.ConfigFileControllerIntegrationTest; import com.ctrip.framework.apollo.configservice.integration.ConfigFileControllerIntegrationTest;
import com.ctrip.framework.apollo.configservice.integration.NotificationControllerIntegrationTest; import com.ctrip.framework.apollo.configservice.integration.NotificationControllerIntegrationTest;
import com.ctrip.framework.apollo.configservice.integration.NotificationControllerV2IntegrationTest;
import com.ctrip.framework.apollo.configservice.util.NamespaceUtilTest; import com.ctrip.framework.apollo.configservice.util.NamespaceUtilTest;
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtilTest;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.Suite; import org.junit.runners.Suite;
...@@ -16,7 +19,9 @@ import org.junit.runners.Suite.SuiteClasses; ...@@ -16,7 +19,9 @@ import org.junit.runners.Suite.SuiteClasses;
@SuiteClasses({ConfigControllerTest.class, NotificationControllerTest.class, @SuiteClasses({ConfigControllerTest.class, NotificationControllerTest.class,
ConfigControllerIntegrationTest.class, NotificationControllerIntegrationTest.class, ConfigControllerIntegrationTest.class, NotificationControllerIntegrationTest.class,
NamespaceUtilTest.class, ConfigFileControllerTest.class, NamespaceUtilTest.class, ConfigFileControllerTest.class,
ConfigFileControllerIntegrationTest.class}) ConfigFileControllerIntegrationTest.class, WatchKeysUtilTest.class,
NotificationControllerV2Test.class, NotificationControllerV2IntegrationTest.class
})
public class AllTests { public class AllTests {
} }
package com.ctrip.framework.apollo.configservice.controller; package com.ctrip.framework.apollo.configservice.controller;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap; import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage; import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.message.Topics; import com.ctrip.framework.apollo.biz.message.Topics;
import com.ctrip.framework.apollo.biz.service.AppNamespaceService;
import com.ctrip.framework.apollo.biz.service.ReleaseMessageService; import com.ctrip.framework.apollo.biz.service.ReleaseMessageService;
import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil; import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil;
import com.ctrip.framework.apollo.common.entity.AppNamespace;
import com.ctrip.framework.apollo.configservice.util.NamespaceUtil; import com.ctrip.framework.apollo.configservice.util.NamespaceUtil;
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtil; import com.ctrip.framework.apollo.configservice.util.WatchKeysUtil;
import com.ctrip.framework.apollo.core.ConfigConsts; import com.ctrip.framework.apollo.core.ConfigConsts;
...@@ -25,12 +23,13 @@ import org.springframework.http.ResponseEntity; ...@@ -25,12 +23,13 @@ import org.springframework.http.ResponseEntity;
import org.springframework.test.util.ReflectionTestUtils; import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.DeferredResult;
import java.util.List; import java.util.Set;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyCollectionOf;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
/** /**
...@@ -48,13 +47,13 @@ public class NotificationControllerTest { ...@@ -48,13 +47,13 @@ public class NotificationControllerTest {
private long someNotificationId; private long someNotificationId;
private String someClientIp; private String someClientIp;
@Mock @Mock
private AppNamespaceService appNamespaceService;
@Mock
private ReleaseMessageService releaseMessageService; private ReleaseMessageService releaseMessageService;
@Mock @Mock
private EntityManagerUtil entityManagerUtil; private EntityManagerUtil entityManagerUtil;
@Mock @Mock
private NamespaceUtil namespaceUtil; private NamespaceUtil namespaceUtil;
@Mock
private WatchKeysUtil watchKeysUtil;
private Multimap<String, DeferredResult<ResponseEntity<ApolloConfigNotification>>> private Multimap<String, DeferredResult<ResponseEntity<ApolloConfigNotification>>>
deferredResults; deferredResults;
...@@ -65,11 +64,8 @@ public class NotificationControllerTest { ...@@ -65,11 +64,8 @@ public class NotificationControllerTest {
ReflectionTestUtils.setField(controller, "releaseMessageService", releaseMessageService); ReflectionTestUtils.setField(controller, "releaseMessageService", releaseMessageService);
ReflectionTestUtils.setField(controller, "entityManagerUtil", entityManagerUtil); ReflectionTestUtils.setField(controller, "entityManagerUtil", entityManagerUtil);
ReflectionTestUtils.setField(controller, "namespaceUtil", namespaceUtil); ReflectionTestUtils.setField(controller, "namespaceUtil", namespaceUtil);
WatchKeysUtil watchKeysUtil = new WatchKeysUtil();
ReflectionTestUtils.setField(watchKeysUtil, "appNamespaceService", appNamespaceService);
ReflectionTestUtils.setField(controller, "watchKeysUtil", watchKeysUtil); ReflectionTestUtils.setField(controller, "watchKeysUtil", watchKeysUtil);
someAppId = "someAppId"; someAppId = "someAppId";
someCluster = "someCluster"; someCluster = "someCluster";
defaultCluster = ConfigConsts.CLUSTER_NAME_DEFAULT; defaultCluster = ConfigConsts.CLUSTER_NAME_DEFAULT;
...@@ -89,21 +85,25 @@ public class NotificationControllerTest { ...@@ -89,21 +85,25 @@ public class NotificationControllerTest {
@Test @Test
public void testPollNotificationWithDefaultNamespace() throws Exception { public void testPollNotificationWithDefaultNamespace() throws Exception {
String someWatchKey = "someKey";
String anotherWatchKey = "anotherKey";
Set<String> watchKeys = Sets.newHashSet(someWatchKey, anotherWatchKey);
when(watchKeysUtil
.assembleAllWatchKeys(someAppId, someCluster, defaultNamespace,
someDataCenter)).thenReturn(
watchKeys);
DeferredResult<ResponseEntity<ApolloConfigNotification>> DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller deferredResult = controller
.pollNotification(someAppId, someCluster, defaultNamespace, someDataCenter, .pollNotification(someAppId, someCluster, defaultNamespace, someDataCenter,
someNotificationId, someClientIp); someNotificationId, someClientIp);
List<String> clusters = assertEquals(watchKeys.size(), deferredResults.size());
Lists.newArrayList(someCluster, someDataCenter, ConfigConsts.CLUSTER_NAME_DEFAULT);
assertEquals(clusters.size(), deferredResults.size());
for (String cluster : clusters) { for (String watchKey : watchKeys) {
String key = assertTrue(deferredResults.get(watchKey).contains(deferredResult));
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, cluster, defaultNamespace);
assertTrue(deferredResults.get(key).contains(deferredResult));
} }
} }
...@@ -112,260 +112,109 @@ public class NotificationControllerTest { ...@@ -112,260 +112,109 @@ public class NotificationControllerTest {
String namespace = String.format("%s.%s", defaultNamespace, "properties"); String namespace = String.format("%s.%s", defaultNamespace, "properties");
when(namespaceUtil.filterNamespaceName(namespace)).thenReturn(defaultNamespace); when(namespaceUtil.filterNamespaceName(namespace)).thenReturn(defaultNamespace);
DeferredResult<ResponseEntity<ApolloConfigNotification>> String someWatchKey = "someKey";
deferredResult = controller String anotherWatchKey = "anotherKey";
.pollNotification(someAppId, someCluster, namespace, someDataCenter,
someNotificationId, someClientIp);
List<String> clusters =
Lists.newArrayList(someCluster, someDataCenter, ConfigConsts.CLUSTER_NAME_DEFAULT);
assertEquals(clusters.size(), deferredResults.size());
for (String cluster : clusters) {
String key =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, cluster, defaultNamespace);
assertTrue(deferredResults.get(key).contains(deferredResult));
}
}
@Test Set<String> watchKeys = Sets.newHashSet(someWatchKey, anotherWatchKey);
public void testPollNotificationWithPrivateNamespaceAsFile() throws Exception {
String namespace = String.format("someNamespace.xml");
AppNamespace appNamespace = mock(AppNamespace.class);
when(namespaceUtil.filterNamespaceName(namespace)).thenReturn(namespace); when(watchKeysUtil
when(appNamespaceService.findOne(someAppId, namespace)).thenReturn(appNamespace); .assembleAllWatchKeys(someAppId, someCluster, defaultNamespace,
someDataCenter)).thenReturn(
watchKeys);
DeferredResult<ResponseEntity<ApolloConfigNotification>> DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller deferredResult = controller
.pollNotification(someAppId, someCluster, namespace, someDataCenter, .pollNotification(someAppId, someCluster, namespace, someDataCenter,
someNotificationId, someClientIp); someNotificationId, someClientIp);
List<String> clusters = assertEquals(watchKeys.size(), deferredResults.size());
Lists.newArrayList(someCluster, someDataCenter, ConfigConsts.CLUSTER_NAME_DEFAULT);
assertEquals(clusters.size(), deferredResults.size());
for (String cluster : clusters) { for (String watchKey : watchKeys) {
String key = assertTrue(deferredResults.get(watchKey).contains(deferredResult));
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, cluster, namespace);
assertTrue(deferredResults.get(key).contains(deferredResult));
} }
} }
@Test @Test
public void testPollNotificationWithDefaultNamespaceWithNotificationIdOutDated() throws Exception { public void testPollNotificationWithSomeNamespaceAsFile() throws Exception {
long notificationId = someNotificationId + 1; String namespace = String.format("someNamespace.xml");
ReleaseMessage someReleaseMessage = mock(ReleaseMessage.class);
when(someReleaseMessage.getId()).thenReturn(notificationId);
when(releaseMessageService.findLatestReleaseMessageForMessages(anyCollectionOf(String.class)))
.thenReturn(someReleaseMessage);
DeferredResult<ResponseEntity<ApolloConfigNotification>> when(namespaceUtil.filterNamespaceName(namespace)).thenReturn(namespace);
deferredResult = controller
.pollNotification(someAppId, someCluster, defaultNamespace, someDataCenter,
someNotificationId, someClientIp);
ResponseEntity<ApolloConfigNotification> result = String someWatchKey = "someKey";
(ResponseEntity<ApolloConfigNotification>) deferredResult.getResult();
assertEquals(HttpStatus.OK, result.getStatusCode()); Set<String> watchKeys = Sets.newHashSet(someWatchKey);
assertEquals(defaultNamespace, result.getBody().getNamespaceName()); when(watchKeysUtil
assertEquals(notificationId, result.getBody().getNotificationId()); .assembleAllWatchKeys(someAppId, someCluster, namespace, someDataCenter))
} .thenReturn(
watchKeys);
@Test
public void testPollNotificationWithDefaultNamespaceWithDefaultClusterWithDataCenter()
throws Exception {
DeferredResult<ResponseEntity<ApolloConfigNotification>> DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller deferredResult = controller
.pollNotification(someAppId, defaultCluster, defaultNamespace, someDataCenter, .pollNotification(someAppId, someCluster, namespace, someDataCenter,
someNotificationId, someClientIp); someNotificationId, someClientIp);
List<String> clusters = assertEquals(watchKeys.size(), deferredResults.size());
Lists.newArrayList(someDataCenter, defaultCluster);
assertEquals(clusters.size(), deferredResults.size());
for (String cluster : clusters) { for (String watchKey : watchKeys) {
String key = assertTrue(deferredResults.get(watchKey).contains(deferredResult));
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, cluster, defaultNamespace);
assertTrue(deferredResults.get(key).contains(deferredResult));
} }
} }
@Test @Test
public void testPollNotificationWithDefaultNamespaceWithDefaultClusterWithNoDataCenter() public void testPollNotificationWithDefaultNamespaceWithNotificationIdOutDated()
throws Exception { throws Exception {
DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller
.pollNotification(someAppId, defaultCluster, defaultNamespace, null, someNotificationId, someClientIp);
List<String> clusters =
Lists.newArrayList(defaultCluster);
assertEquals(clusters.size(), deferredResults.size());
for (String cluster : clusters) {
String key =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, cluster, defaultNamespace);
assertTrue(deferredResults.get(key).contains(deferredResult));
}
}
@Test
public void testPollNotificationWithPublicNamespace() throws Exception {
String somePublicAppId = "somePublicAppId";
AppNamespace somePublicAppNamespace =
assmbleAppNamespace(somePublicAppId, somePublicNamespace);
when(appNamespaceService.findPublicNamespaceByName(somePublicNamespace))
.thenReturn(somePublicAppNamespace);
DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller
.pollNotification(someAppId, someCluster, somePublicNamespace, someDataCenter,
someNotificationId, someClientIp);
List<String> clusters =
Lists.newArrayList(someCluster, someDataCenter, ConfigConsts.CLUSTER_NAME_DEFAULT);
assertEquals(clusters.size() * 2, deferredResults.size());
for (String cluster : clusters) {
String publicKey =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, cluster, somePublicNamespace);
assertTrue(deferredResults.get(publicKey).contains(deferredResult));
}
for (String cluster : clusters) {
String publicKey =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(somePublicAppId, cluster, somePublicNamespace);
assertTrue(deferredResults.get(publicKey).contains(deferredResult));
}
}
@Test
public void testPollNotificationWithPublicNamespaceAsFile() throws Exception {
String somePublicNamespaceAsFile = String.format("%s.%s", somePublicNamespace, "xml");
String somePublicAppId = "somePublicAppId";
AppNamespace somePublicAppNamespace =
assmbleAppNamespace(somePublicAppId, somePublicNamespace);
when(namespaceUtil.filterNamespaceName(somePublicNamespaceAsFile))
.thenReturn(somePublicNamespace);
when(appNamespaceService.findPublicNamespaceByName(somePublicNamespace))
.thenReturn(somePublicAppNamespace);
when(appNamespaceService.findOne(someAppId, somePublicNamespace)).thenReturn(null);
DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller
.pollNotification(someAppId, someCluster, somePublicNamespaceAsFile, someDataCenter,
someNotificationId, someClientIp);
List<String> clusters =
Lists.newArrayList(someCluster, someDataCenter, ConfigConsts.CLUSTER_NAME_DEFAULT);
assertEquals(clusters.size() * 2, deferredResults.size());
for (String cluster : clusters) {
String publicKey =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, cluster, somePublicNamespace);
assertTrue(deferredResults.get(publicKey).contains(deferredResult));
}
for (String cluster : clusters) {
String publicKey =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(somePublicAppId, cluster, somePublicNamespace);
assertTrue(deferredResults.get(publicKey).contains(deferredResult));
}
}
@Test
public void testPollNotificationWithPublicNamespaceWithNotificationIdOutDated() throws Exception {
long notificationId = someNotificationId + 1; long notificationId = someNotificationId + 1;
String releaseMessage = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, someCluster, defaultNamespace);
ReleaseMessage someReleaseMessage = mock(ReleaseMessage.class); ReleaseMessage someReleaseMessage = mock(ReleaseMessage.class);
when(someReleaseMessage.getId()).thenReturn(notificationId); String someWatchKey = "someKey";
when(releaseMessageService.findLatestReleaseMessageForMessages(anyCollectionOf(String.class)))
.thenReturn(someReleaseMessage);
String somePublicAppId = "somePublicAppId"; Set<String> watchKeys = Sets.newHashSet(someWatchKey);
AppNamespace somePublicAppNamespace = when(watchKeysUtil
assmbleAppNamespace(somePublicAppId, somePublicNamespace); .assembleAllWatchKeys(someAppId, someCluster, defaultNamespace,
someDataCenter))
.thenReturn(
watchKeys);
when(appNamespaceService.findPublicNamespaceByName(somePublicNamespace)) when(someReleaseMessage.getId()).thenReturn(notificationId);
.thenReturn(somePublicAppNamespace); when(someReleaseMessage.getMessage()).thenReturn(releaseMessage);
when(releaseMessageService.findLatestReleaseMessageForMessages(watchKeys))
.thenReturn(someReleaseMessage);
DeferredResult<ResponseEntity<ApolloConfigNotification>> DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller deferredResult = controller
.pollNotification(someAppId, someCluster, somePublicNamespace, someDataCenter, .pollNotification(someAppId, someCluster, defaultNamespace, someDataCenter,
someNotificationId, someClientIp); someNotificationId, someClientIp);
ResponseEntity<ApolloConfigNotification> result = ResponseEntity<ApolloConfigNotification> result =
(ResponseEntity<ApolloConfigNotification>) deferredResult.getResult(); (ResponseEntity<ApolloConfigNotification>) deferredResult.getResult();
assertEquals(HttpStatus.OK, result.getStatusCode()); assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(somePublicNamespace, result.getBody().getNamespaceName()); assertEquals(defaultNamespace, result.getBody().getNamespaceName());
assertEquals(notificationId, result.getBody().getNotificationId()); assertEquals(notificationId, result.getBody().getNotificationId());
} }
@Test @Test
public void testPollNotificationWithDefaultNamespaceAndHandleMessage() throws Exception { public void testPollNotificationWithDefaultNamespaceAndHandleMessage() throws Exception {
DeferredResult<ResponseEntity<ApolloConfigNotification>> String someWatchKey = "someKey";
deferredResult = controller String anotherWatchKey = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.pollNotification(someAppId, someCluster, defaultNamespace, someDataCenter,
someNotificationId, someClientIp);
String key =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, someCluster, defaultNamespace); .join(someAppId, someCluster, defaultNamespace);
long someId = 1;
ReleaseMessage someReleaseMessage = new ReleaseMessage(key);
someReleaseMessage.setId(someId);
controller.handleMessage(someReleaseMessage, Topics.APOLLO_RELEASE_TOPIC); Set<String> watchKeys = Sets.newHashSet(someWatchKey, anotherWatchKey);
ResponseEntity<ApolloConfigNotification> response = when(watchKeysUtil
(ResponseEntity<ApolloConfigNotification>) deferredResult.getResult(); .assembleAllWatchKeys(someAppId, someCluster, defaultNamespace,
ApolloConfigNotification notification = response.getBody(); someDataCenter)).thenReturn(
watchKeys);
assertEquals(HttpStatus.OK, response.getStatusCode());
assertEquals(defaultNamespace, notification.getNamespaceName());
assertEquals(someId, notification.getNotificationId());
}
@Test
public void testPollNotificationWithPublicNamespaceAndHandleMessage() throws Exception {
String somePublicAppId = "somePublicAppId";
AppNamespace somePublicAppNamespace =
assmbleAppNamespace(somePublicAppId, somePublicNamespace);
when(appNamespaceService.findPublicNamespaceByName(somePublicNamespace))
.thenReturn(somePublicAppNamespace);
DeferredResult<ResponseEntity<ApolloConfigNotification>> DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller deferredResult = controller
.pollNotification(someAppId, someCluster, somePublicNamespace, someDataCenter, .pollNotification(someAppId, someCluster, defaultNamespace, someDataCenter,
someNotificationId, someClientIp); someNotificationId, someClientIp);
String key =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(somePublicAppId, someDataCenter, somePublicNamespace);
long someId = 1; long someId = 1;
ReleaseMessage someReleaseMessage = new ReleaseMessage(key); ReleaseMessage someReleaseMessage = new ReleaseMessage(anotherWatchKey);
someReleaseMessage.setId(someId); someReleaseMessage.setId(someId);
controller.handleMessage(someReleaseMessage, Topics.APOLLO_RELEASE_TOPIC); controller.handleMessage(someReleaseMessage, Topics.APOLLO_RELEASE_TOPIC);
...@@ -375,14 +224,7 @@ public class NotificationControllerTest { ...@@ -375,14 +224,7 @@ public class NotificationControllerTest {
ApolloConfigNotification notification = response.getBody(); ApolloConfigNotification notification = response.getBody();
assertEquals(HttpStatus.OK, response.getStatusCode()); assertEquals(HttpStatus.OK, response.getStatusCode());
assertEquals(somePublicNamespace, notification.getNamespaceName()); assertEquals(defaultNamespace, notification.getNamespaceName());
assertEquals(someId, notification.getNotificationId()); assertEquals(someId, notification.getNotificationId());
} }
private AppNamespace assmbleAppNamespace(String appId, String namespace) {
AppNamespace appNamespace = new AppNamespace();
appNamespace.setAppId(appId);
appNamespace.setName(namespace);
return appNamespace;
}
} }
package com.ctrip.framework.apollo.configservice.controller;
import com.google.common.base.Joiner;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.message.Topics;
import com.ctrip.framework.apollo.biz.service.ReleaseMessageService;
import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil;
import com.ctrip.framework.apollo.configservice.util.NamespaceUtil;
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtil;
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@RunWith(MockitoJUnitRunner.class)
public class NotificationControllerV2Test {
private NotificationControllerV2 controller;
private String someAppId;
private String someCluster;
private String defaultCluster;
private String defaultNamespace;
private String somePublicNamespace;
private String someDataCenter;
private long someNotificationId;
private String someClientIp;
@Mock
private ReleaseMessageService releaseMessageService;
@Mock
private EntityManagerUtil entityManagerUtil;
@Mock
private NamespaceUtil namespaceUtil;
@Mock
private WatchKeysUtil watchKeysUtil;
private Gson gson;
private Multimap<String, DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>>
deferredResults;
@Before
public void setUp() throws Exception {
controller = new NotificationControllerV2();
gson = new Gson();
ReflectionTestUtils.setField(controller, "releaseMessageService", releaseMessageService);
ReflectionTestUtils.setField(controller, "entityManagerUtil", entityManagerUtil);
ReflectionTestUtils.setField(controller, "namespaceUtil", namespaceUtil);
ReflectionTestUtils.setField(controller, "watchKeysUtil", watchKeysUtil);
ReflectionTestUtils.setField(controller, "gson", gson);
someAppId = "someAppId";
someCluster = "someCluster";
defaultCluster = ConfigConsts.CLUSTER_NAME_DEFAULT;
defaultNamespace = ConfigConsts.NAMESPACE_APPLICATION;
somePublicNamespace = "somePublicNamespace";
someDataCenter = "someDC";
someNotificationId = 1;
someClientIp = "someClientIp";
when(namespaceUtil.filterNamespaceName(defaultNamespace)).thenReturn(defaultNamespace);
when(namespaceUtil.filterNamespaceName(somePublicNamespace)).thenReturn(somePublicNamespace);
deferredResults =
(Multimap<String, DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>>) ReflectionTestUtils
.getField(controller, "deferredResults");
}
@Test
public void testPollNotificationWithDefaultNamespace() throws Exception {
String someWatchKey = "someKey";
String anotherWatchKey = "anotherKey";
Multimap<String, String> watchKeysMap =
assembleMultiMap(defaultNamespace, Lists.newArrayList(someWatchKey, anotherWatchKey));
String notificationAsString =
transformApolloConfigNotificationsToString(defaultNamespace, someNotificationId);
when(watchKeysUtil
.assembleAllWatchKeys(someAppId, someCluster, Sets.newHashSet(defaultNamespace),
someDataCenter)).thenReturn(
watchKeysMap);
DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>
deferredResult = controller
.pollNotification(someAppId, someCluster, notificationAsString, someDataCenter,
someClientIp);
assertEquals(watchKeysMap.size(), deferredResults.size());
for (String watchKey : watchKeysMap.values()) {
assertTrue(deferredResults.get(watchKey).contains(deferredResult));
}
}
@Test
public void testPollNotificationWithDefaultNamespaceAsFile() throws Exception {
String namespace = String.format("%s.%s", defaultNamespace, "properties");
when(namespaceUtil.filterNamespaceName(namespace)).thenReturn(defaultNamespace);
String someWatchKey = "someKey";
String anotherWatchKey = "anotherKey";
Multimap<String, String> watchKeysMap =
assembleMultiMap(defaultNamespace, Lists.newArrayList(someWatchKey, anotherWatchKey));
String notificationAsString =
transformApolloConfigNotificationsToString(namespace, someNotificationId);
when(watchKeysUtil
.assembleAllWatchKeys(someAppId, someCluster, Sets.newHashSet(defaultNamespace),
someDataCenter)).thenReturn(
watchKeysMap);
DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>
deferredResult = controller
.pollNotification(someAppId, someCluster, notificationAsString, someDataCenter,
someClientIp);
assertEquals(watchKeysMap.size(), deferredResults.size());
for (String watchKey : watchKeysMap.values()) {
assertTrue(deferredResults.get(watchKey).contains(deferredResult));
}
}
@Test
public void testPollNotificationWithMultipleNamespaces() throws Exception {
String defaultNamespaceAsFile = defaultNamespace + ".properties";
String somePublicNamespaceAsFile = somePublicNamespace + ".xml";
when(namespaceUtil.filterNamespaceName(defaultNamespaceAsFile)).thenReturn(defaultNamespace);
when(namespaceUtil.filterNamespaceName(somePublicNamespaceAsFile))
.thenReturn(somePublicNamespaceAsFile);
String someWatchKey = "someKey";
String anotherWatchKey = "anotherKey";
String somePublicWatchKey = "somePublicWatchKey";
String somePublicFileWatchKey = "somePublicFileWatchKey";
Multimap<String, String> watchKeysMap =
assembleMultiMap(defaultNamespace, Lists.newArrayList(someWatchKey, anotherWatchKey));
watchKeysMap
.putAll(assembleMultiMap(somePublicNamespace, Lists.newArrayList(somePublicWatchKey)));
watchKeysMap
.putAll(assembleMultiMap(somePublicNamespaceAsFile,
Lists.newArrayList(somePublicFileWatchKey)));
String notificationAsString =
transformApolloConfigNotificationsToString(defaultNamespaceAsFile, someNotificationId,
somePublicNamespace, someNotificationId, somePublicNamespaceAsFile,
someNotificationId);
when(watchKeysUtil
.assembleAllWatchKeys(someAppId, someCluster,
Sets.newHashSet(defaultNamespace, somePublicNamespace, somePublicNamespaceAsFile),
someDataCenter)).thenReturn(
watchKeysMap);
DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>
deferredResult = controller
.pollNotification(someAppId, someCluster, notificationAsString, someDataCenter,
someClientIp);
assertEquals(watchKeysMap.size(), deferredResults.size());
for (String watchKey : watchKeysMap.values()) {
assertTrue(deferredResults.get(watchKey).contains(deferredResult));
}
verify(watchKeysUtil, times(1)).assembleAllWatchKeys(someAppId, someCluster,
Sets.newHashSet(defaultNamespace, somePublicNamespace, somePublicNamespaceAsFile),
someDataCenter);
}
@Test
public void testPollNotificationWithMultipleNamespaceWithNotificationIdOutDated()
throws Exception {
String someWatchKey = "someKey";
String anotherWatchKey = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, someCluster, somePublicNamespace);
long notificationId = someNotificationId + 1;
Multimap<String, String> watchKeysMap =
assembleMultiMap(defaultNamespace, Lists.newArrayList(someWatchKey));
watchKeysMap
.putAll(assembleMultiMap(somePublicNamespace, Lists.newArrayList(anotherWatchKey)));
when(watchKeysUtil
.assembleAllWatchKeys(someAppId, someCluster,
Sets.newHashSet(defaultNamespace, somePublicNamespace), someDataCenter)).thenReturn(
watchKeysMap);
ReleaseMessage someReleaseMessage = mock(ReleaseMessage.class);
when(someReleaseMessage.getId()).thenReturn(notificationId);
when(someReleaseMessage.getMessage()).thenReturn(anotherWatchKey);
when(releaseMessageService
.findLatestReleaseMessagesGroupByMessages(Sets.newHashSet(watchKeysMap.values())))
.thenReturn(Lists.newArrayList(someReleaseMessage));
String notificationAsString =
transformApolloConfigNotificationsToString(defaultNamespace, someNotificationId,
somePublicNamespace, someNotificationId);
DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>
deferredResult = controller
.pollNotification(someAppId, someCluster, notificationAsString, someDataCenter,
someClientIp);
ResponseEntity<List<ApolloConfigNotification>> result =
(ResponseEntity<List<ApolloConfigNotification>>) deferredResult.getResult();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(1, result.getBody().size());
assertEquals(somePublicNamespace, result.getBody().get(0).getNamespaceName());
assertEquals(notificationId, result.getBody().get(0).getNotificationId());
}
@Test
public void testPollNotificationWithMultipleNamespacesAndHandleMessage() throws Exception {
String someWatchKey = "someKey";
String anotherWatchKey = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, someCluster, somePublicNamespace);
Multimap<String, String> watchKeysMap =
assembleMultiMap(defaultNamespace, Lists.newArrayList(someWatchKey));
watchKeysMap
.putAll(assembleMultiMap(somePublicNamespace, Lists.newArrayList(anotherWatchKey)));
when(watchKeysUtil
.assembleAllWatchKeys(someAppId, someCluster,
Sets.newHashSet(defaultNamespace, somePublicNamespace), someDataCenter)).thenReturn(
watchKeysMap);
String notificationAsString =
transformApolloConfigNotificationsToString(defaultNamespace, someNotificationId,
somePublicNamespace, someNotificationId);
DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>
deferredResult = controller
.pollNotification(someAppId, someCluster, notificationAsString, someDataCenter,
someClientIp);
assertEquals(watchKeysMap.size(), deferredResults.size());
long someId = 1;
ReleaseMessage someReleaseMessage = new ReleaseMessage(anotherWatchKey);
someReleaseMessage.setId(someId);
controller.handleMessage(someReleaseMessage, Topics.APOLLO_RELEASE_TOPIC);
ResponseEntity<List<ApolloConfigNotification>> response =
(ResponseEntity<List<ApolloConfigNotification>>) deferredResult.getResult();
assertEquals(1, response.getBody().size());
ApolloConfigNotification notification = response.getBody().get(0);
assertEquals(HttpStatus.OK, response.getStatusCode());
assertEquals(somePublicNamespace, notification.getNamespaceName());
assertEquals(someId, notification.getNotificationId());
}
private String transformApolloConfigNotificationsToString(
String namespace, long notificationId) {
List<ApolloConfigNotification> notifications =
Lists.newArrayList(assembleApolloConfigNotification(namespace, notificationId));
return gson.toJson(notifications);
}
private String transformApolloConfigNotificationsToString(String namespace, long notificationId,
String anotherNamespace,
long anotherNotificationId) {
List<ApolloConfigNotification> notifications =
Lists.newArrayList(assembleApolloConfigNotification(namespace, notificationId),
assembleApolloConfigNotification(anotherNamespace, anotherNotificationId));
return gson.toJson(notifications);
}
private String transformApolloConfigNotificationsToString(String namespace, long notificationId,
String anotherNamespace,
long anotherNotificationId,
String yetAnotherNamespace,
long yetAnotherNotificationId) {
List<ApolloConfigNotification> notifications =
Lists.newArrayList(assembleApolloConfigNotification(namespace, notificationId),
assembleApolloConfigNotification(anotherNamespace, anotherNotificationId),
assembleApolloConfigNotification(yetAnotherNamespace, yetAnotherNotificationId));
return gson.toJson(notifications);
}
private ApolloConfigNotification assembleApolloConfigNotification(String namespace,
long notificationId) {
ApolloConfigNotification notification = new ApolloConfigNotification();
notification.setNamespaceName(namespace);
notification.setNotificationId(notificationId);
return notification;
}
private Multimap<String, String> assembleMultiMap(String key, Iterable<String> values) {
Multimap<String, String> multimap = HashMultimap.create();
multimap.putAll(key, values);
return multimap;
}
}
...@@ -24,6 +24,9 @@ import org.springframework.web.client.RestTemplate; ...@@ -24,6 +24,9 @@ import org.springframework.web.client.RestTemplate;
import java.util.Date; import java.util.Date;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
...@@ -83,4 +86,23 @@ public abstract class AbstractBaseIntegrationTest { ...@@ -83,4 +86,23 @@ public abstract class AbstractBaseIntegrationTest {
return release; return release;
} }
protected void periodicSendMessage(ExecutorService executorService, String message, AtomicBoolean stop) {
executorService.submit((Runnable) () -> {
//wait for the request connected to server
while (!stop.get() && !Thread.currentThread().isInterrupted()) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
}
//double check
if (stop.get()) {
break;
}
sendReleaseMessage(message);
}
});
}
} }
...@@ -2,22 +2,17 @@ package com.ctrip.framework.apollo.configservice.integration; ...@@ -2,22 +2,17 @@ package com.ctrip.framework.apollo.configservice.integration;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.repository.ReleaseMessageRepository;
import com.ctrip.framework.apollo.configservice.controller.NotificationController;
import com.ctrip.framework.apollo.core.ConfigConsts; import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification; import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.test.context.jdbc.Sql; import org.springframework.test.context.jdbc.Sql;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
...@@ -27,9 +22,6 @@ import static org.junit.Assert.assertNotEquals; ...@@ -27,9 +22,6 @@ import static org.junit.Assert.assertNotEquals;
* @author Jason Song(song_s@ctrip.com) * @author Jason Song(song_s@ctrip.com)
*/ */
public class NotificationControllerIntegrationTest extends AbstractBaseIntegrationTest { public class NotificationControllerIntegrationTest extends AbstractBaseIntegrationTest {
@Autowired
private NotificationController notificationController;
private String someAppId; private String someAppId;
private String someCluster; private String someCluster;
private String defaultNamespace; private String defaultNamespace;
...@@ -49,7 +41,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati ...@@ -49,7 +41,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD) @Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWithDefaultNamespace() throws Exception { public void testPollNotificationWithDefaultNamespace() throws Exception {
AtomicBoolean stop = new AtomicBoolean(); AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(assembleKey(someAppId, someCluster, defaultNamespace), stop); periodicSendMessage(executorService, assembleKey(someAppId, someCluster, defaultNamespace), stop);
ResponseEntity<ApolloConfigNotification> result = restTemplate.getForEntity( ResponseEntity<ApolloConfigNotification> result = restTemplate.getForEntity(
"{baseurl}/notifications?appId={appId}&cluster={clusterName}&namespace={namespace}", "{baseurl}/notifications?appId={appId}&cluster={clusterName}&namespace={namespace}",
...@@ -68,7 +60,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati ...@@ -68,7 +60,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD) @Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWithDefaultNamespaceAsFile() throws Exception { public void testPollNotificationWithDefaultNamespaceAsFile() throws Exception {
AtomicBoolean stop = new AtomicBoolean(); AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(assembleKey(someAppId, someCluster, defaultNamespace), stop); periodicSendMessage(executorService, assembleKey(someAppId, someCluster, defaultNamespace), stop);
ResponseEntity<ApolloConfigNotification> result = restTemplate.getForEntity( ResponseEntity<ApolloConfigNotification> result = restTemplate.getForEntity(
"{baseurl}/notifications?appId={appId}&cluster={clusterName}&namespace={namespace}", "{baseurl}/notifications?appId={appId}&cluster={clusterName}&namespace={namespace}",
...@@ -89,7 +81,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati ...@@ -89,7 +81,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
public void testPollNotificationWithPrivateNamespaceAsFile() throws Exception { public void testPollNotificationWithPrivateNamespaceAsFile() throws Exception {
String namespace = "someNamespace.xml"; String namespace = "someNamespace.xml";
AtomicBoolean stop = new AtomicBoolean(); AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(assembleKey(someAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, namespace), stop); periodicSendMessage(executorService, assembleKey(someAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, namespace), stop);
ResponseEntity<ApolloConfigNotification> result = restTemplate ResponseEntity<ApolloConfigNotification> result = restTemplate
.getForEntity( .getForEntity(
...@@ -144,7 +136,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati ...@@ -144,7 +136,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
String publicAppId = "somePublicAppId"; String publicAppId = "somePublicAppId";
AtomicBoolean stop = new AtomicBoolean(); AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(assembleKey(publicAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, somePublicNamespace), stop); periodicSendMessage(executorService, assembleKey(publicAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, somePublicNamespace), stop);
ResponseEntity<ApolloConfigNotification> result = restTemplate ResponseEntity<ApolloConfigNotification> result = restTemplate
.getForEntity( .getForEntity(
...@@ -168,7 +160,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati ...@@ -168,7 +160,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
String someDC = "someDC"; String someDC = "someDC";
AtomicBoolean stop = new AtomicBoolean(); AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(assembleKey(publicAppId, someDC, somePublicNamespace), stop); periodicSendMessage(executorService, assembleKey(publicAppId, someDC, somePublicNamespace), stop);
ResponseEntity<ApolloConfigNotification> result = restTemplate ResponseEntity<ApolloConfigNotification> result = restTemplate
.getForEntity( .getForEntity(
...@@ -192,7 +184,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati ...@@ -192,7 +184,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
String someDC = "someDC"; String someDC = "someDC";
AtomicBoolean stop = new AtomicBoolean(); AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(assembleKey(publicAppId, someDC, somePublicNamespace), stop); periodicSendMessage(executorService, assembleKey(publicAppId, someDC, somePublicNamespace), stop);
ResponseEntity<ApolloConfigNotification> result = restTemplate ResponseEntity<ApolloConfigNotification> result = restTemplate
.getForEntity( .getForEntity(
...@@ -228,23 +220,4 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati ...@@ -228,23 +220,4 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
private String assembleKey(String appId, String cluster, String namespace) { private String assembleKey(String appId, String cluster, String namespace) {
return Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).join(appId, cluster, namespace); return Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).join(appId, cluster, namespace);
} }
private void periodicSendMessage(String message, AtomicBoolean stop) {
executorService.submit((Runnable) () -> {
//wait for the request connected to server
while (!stop.get() && !Thread.currentThread().isInterrupted()) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
}
//double check
if (stop.get()) {
break;
}
sendReleaseMessage(message);
}
});
}
} }
package com.ctrip.framework.apollo.configservice.integration;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.test.context.jdbc.Sql;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
/**
* @author Jason Song(song_s@ctrip.com)
*/
public class NotificationControllerV2IntegrationTest extends AbstractBaseIntegrationTest {
@Autowired
private Gson gson;
private String someAppId;
private String someCluster;
private String defaultNamespace;
private String somePublicNamespace;
private ExecutorService executorService;
private ParameterizedTypeReference<List<ApolloConfigNotification>> typeReference;
@Before
public void setUp() throws Exception {
someAppId = "someAppId";
someCluster = ConfigConsts.CLUSTER_NAME_DEFAULT;
defaultNamespace = ConfigConsts.NAMESPACE_APPLICATION;
somePublicNamespace = "somePublicNamespace";
executorService = Executors.newSingleThreadExecutor();
typeReference = new ParameterizedTypeReference<List<ApolloConfigNotification>>() {
};
}
@Test(timeout = 5000L)
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWithDefaultNamespace() throws Exception {
AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(executorService, assembleKey(someAppId, someCluster, defaultNamespace),
stop);
ResponseEntity<List<ApolloConfigNotification>> result = restTemplate.exchange(
"{baseurl}/notifications/v2?appId={appId}&cluster={clusterName}&notifications={notifications}",
HttpMethod.GET, null, typeReference,
getHostUrl(), someAppId, someCluster,
transformApolloConfigNotificationsToString(defaultNamespace, -1));
stop.set(true);
List<ApolloConfigNotification> notifications = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(1, notifications.size());
assertEquals(defaultNamespace, notifications.get(0).getNamespaceName());
assertNotEquals(0, notifications.get(0).getNotificationId());
}
@Test(timeout = 5000L)
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWithDefaultNamespaceAsFile() throws Exception {
AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(executorService, assembleKey(someAppId, someCluster, defaultNamespace),
stop);
ResponseEntity<List<ApolloConfigNotification>> result = restTemplate.exchange(
"{baseurl}/notifications/v2?appId={appId}&cluster={clusterName}&notifications={notifications}",
HttpMethod.GET, null, typeReference,
getHostUrl(), someAppId, someCluster,
transformApolloConfigNotificationsToString(defaultNamespace + ".properties", -1));
stop.set(true);
List<ApolloConfigNotification> notifications = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(1, notifications.size());
assertEquals(defaultNamespace, notifications.get(0).getNamespaceName());
assertNotEquals(0, notifications.get(0).getNotificationId());
}
@Test
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWithMultipleNamespaces() throws Exception {
AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(executorService, assembleKey(someAppId, someCluster, somePublicNamespace),
stop);
ResponseEntity<List<ApolloConfigNotification>> result = restTemplate.exchange(
"{baseurl}/notifications/v2?appId={appId}&cluster={clusterName}&notifications={notifications}",
HttpMethod.GET, null, typeReference,
getHostUrl(), someAppId, someCluster,
transformApolloConfigNotificationsToString(defaultNamespace + ".properties", -1,
defaultNamespace, -1, somePublicNamespace, -1));
stop.set(true);
List<ApolloConfigNotification> notifications = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(1, notifications.size());
assertEquals(somePublicNamespace, notifications.get(0).getNamespaceName());
assertNotEquals(0, notifications.get(0).getNotificationId());
}
@Test(timeout = 5000L)
@Sql(scripts = "/integration-test/test-release.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWithPrivateNamespaceAsFile() throws Exception {
String namespace = "someNamespace.xml";
AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(executorService,
assembleKey(someAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, namespace),
stop);
ResponseEntity<List<ApolloConfigNotification>> result = restTemplate.exchange(
"{baseurl}/notifications/v2?appId={appId}&cluster={clusterName}&notifications={notifications}",
HttpMethod.GET, null, typeReference,
getHostUrl(), someAppId, someCluster,
transformApolloConfigNotificationsToString(namespace, -1));
stop.set(true);
List<ApolloConfigNotification> notifications = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(1, notifications.size());
assertEquals(namespace, notifications.get(0).getNamespaceName());
assertNotEquals(0, notifications.get(0).getNotificationId());
}
@Test(timeout = 5000L)
@Sql(scripts = "/integration-test/test-release.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/integration-test/test-release-message.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWithDefaultNamespaceWithNotificationIdOutDated()
throws Exception {
long someOutDatedNotificationId = 1;
ResponseEntity<List<ApolloConfigNotification>> result = restTemplate.exchange(
"{baseurl}/notifications/v2?appId={appId}&cluster={clusterName}&notifications={notifications}",
HttpMethod.GET, null, typeReference,
getHostUrl(), someAppId, someCluster,
transformApolloConfigNotificationsToString(defaultNamespace, someOutDatedNotificationId));
List<ApolloConfigNotification> notifications = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(1, notifications.size());
assertEquals(defaultNamespace, notifications.get(0).getNamespaceName());
assertEquals(10, notifications.get(0).getNotificationId());
}
@Test(timeout = 5000L)
@Sql(scripts = "/integration-test/test-release.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWthPublicNamespaceAndNoDataCenter() throws Exception {
String publicAppId = "somePublicAppId";
AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(executorService,
assembleKey(publicAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, somePublicNamespace),
stop);
ResponseEntity<List<ApolloConfigNotification>> result = restTemplate.exchange(
"{baseurl}/notifications/v2?appId={appId}&cluster={clusterName}&notifications={notifications}",
HttpMethod.GET, null, typeReference,
getHostUrl(), someAppId, someCluster,
transformApolloConfigNotificationsToString(somePublicNamespace, -1));
stop.set(true);
List<ApolloConfigNotification> notifications = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(1, notifications.size());
assertEquals(somePublicNamespace, notifications.get(0).getNamespaceName());
assertNotEquals(0, notifications.get(0).getNotificationId());
}
@Test(timeout = 5000L)
@Sql(scripts = "/integration-test/test-release.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWthPublicNamespaceAndDataCenter() throws Exception {
String publicAppId = "somePublicAppId";
String someDC = "someDC";
AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(executorService, assembleKey(publicAppId, someDC, somePublicNamespace),
stop);
ResponseEntity<List<ApolloConfigNotification>> result = restTemplate.exchange(
"{baseurl}/notifications/v2?appId={appId}&cluster={clusterName}&notifications={notifications}&dataCenter={dataCenter}",
HttpMethod.GET, null, typeReference,
getHostUrl(), someAppId, someCluster,
transformApolloConfigNotificationsToString(somePublicNamespace, -1), someDC);
stop.set(true);
List<ApolloConfigNotification> notifications = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(1, notifications.size());
assertEquals(somePublicNamespace, notifications.get(0).getNamespaceName());
assertNotEquals(0, notifications.get(0).getNotificationId());
}
@Test(timeout = 5000L)
@Sql(scripts = "/integration-test/test-release.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWthMultipleNamespacesAndMultipleNamespacesChanged()
throws Exception {
String publicAppId = "somePublicAppId";
String someDC = "someDC";
AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(executorService, assembleKey(publicAppId, someDC, somePublicNamespace),
stop);
ResponseEntity<List<ApolloConfigNotification>> result = restTemplate.exchange(
"{baseurl}/notifications/v2?appId={appId}&cluster={clusterName}&notifications={notifications}&dataCenter={dataCenter}",
HttpMethod.GET, null, typeReference,
getHostUrl(), someAppId, someCluster,
transformApolloConfigNotificationsToString(defaultNamespace, -1, somePublicNamespace, -1),
someDC);
stop.set(true);
List<ApolloConfigNotification> notifications = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(1, notifications.size());
assertEquals(somePublicNamespace, notifications.get(0).getNamespaceName());
assertNotEquals(0, notifications.get(0).getNotificationId());
}
@Test(timeout = 5000L)
@Sql(scripts = "/integration-test/test-release.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWthPublicNamespaceAsFile() throws Exception {
String publicAppId = "somePublicAppId";
String someDC = "someDC";
AtomicBoolean stop = new AtomicBoolean();
periodicSendMessage(executorService, assembleKey(publicAppId, someDC, somePublicNamespace),
stop);
ResponseEntity<List<ApolloConfigNotification>> result = restTemplate.exchange(
"{baseurl}/notifications/v2?appId={appId}&cluster={clusterName}&notifications={notifications}&dataCenter={dataCenter}",
HttpMethod.GET, null, typeReference,
getHostUrl(), someAppId, someCluster,
transformApolloConfigNotificationsToString(somePublicNamespace + ".properties", -1),
someDC);
stop.set(true);
List<ApolloConfigNotification> notifications = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(1, notifications.size());
assertEquals(somePublicNamespace, notifications.get(0).getNamespaceName());
assertNotEquals(0, notifications.get(0).getNotificationId());
}
@Test(timeout = 5000L)
@Sql(scripts = "/integration-test/test-release.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/integration-test/test-release-message.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWithPublicNamespaceWithNotificationIdOutDated() throws Exception {
long someOutDatedNotificationId = 1;
ResponseEntity<List<ApolloConfigNotification>> result = restTemplate.exchange(
"{baseurl}/notifications/v2?appId={appId}&cluster={clusterName}&notifications={notifications}",
HttpMethod.GET, null, typeReference,
getHostUrl(), someAppId, someCluster,
transformApolloConfigNotificationsToString(somePublicNamespace,
someOutDatedNotificationId));
List<ApolloConfigNotification> notifications = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(1, notifications.size());
assertEquals(somePublicNamespace, notifications.get(0).getNamespaceName());
assertNotEquals(0, notifications.get(0).getNotificationId());
}
@Test(timeout = 5000L)
@Sql(scripts = "/integration-test/test-release.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/integration-test/test-release-message.sql", executionPhase = Sql.ExecutionPhase.BEFORE_TEST_METHOD)
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testPollNotificationWithMultipleNamespacesAndNotificationIdsOutDated()
throws Exception {
long someOutDatedNotificationId = 1;
ResponseEntity<List<ApolloConfigNotification>> result = restTemplate.exchange(
"{baseurl}/notifications/v2?appId={appId}&cluster={clusterName}&notifications={notifications}",
HttpMethod.GET, null, typeReference,
getHostUrl(), someAppId, someCluster,
transformApolloConfigNotificationsToString(somePublicNamespace,
someOutDatedNotificationId, defaultNamespace, someOutDatedNotificationId));
List<ApolloConfigNotification> notifications = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(2, notifications.size());
Set<String> outDatedNamespaces =
Sets.newHashSet(notifications.get(0).getNamespaceName(),
notifications.get(1).getNamespaceName());
assertEquals(Sets.newHashSet(defaultNamespace, somePublicNamespace), outDatedNamespaces);
assertNotEquals(0, notifications.get(0).getNotificationId());
assertNotEquals(1, notifications.get(1).getNotificationId());
}
private String assembleKey(String appId, String cluster, String namespace) {
return Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).join(appId, cluster, namespace);
}
private String transformApolloConfigNotificationsToString(
String namespace, long notificationId) {
List<ApolloConfigNotification> notifications =
Lists.newArrayList(assembleApolloConfigNotification(namespace, notificationId));
return gson.toJson(notifications);
}
private String transformApolloConfigNotificationsToString(String namespace, long notificationId,
String anotherNamespace,
long anotherNotificationId) {
List<ApolloConfigNotification> notifications =
Lists.newArrayList(assembleApolloConfigNotification(namespace, notificationId),
assembleApolloConfigNotification(anotherNamespace, anotherNotificationId));
return gson.toJson(notifications);
}
private String transformApolloConfigNotificationsToString(String namespace, long notificationId,
String anotherNamespace,
long anotherNotificationId,
String yetAnotherNamespace,
long yetAnotherNotificationId) {
List<ApolloConfigNotification> notifications =
Lists.newArrayList(assembleApolloConfigNotification(namespace, notificationId),
assembleApolloConfigNotification(anotherNamespace, anotherNotificationId),
assembleApolloConfigNotification(yetAnotherNamespace, yetAnotherNotificationId));
return gson.toJson(notifications);
}
private ApolloConfigNotification assembleApolloConfigNotification(String namespace,
long notificationId) {
ApolloConfigNotification notification = new ApolloConfigNotification();
notification.setNamespaceName(namespace);
notification.setNotificationId(notificationId);
return notification;
}
}
package com.ctrip.framework.apollo.configservice.util;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.ctrip.framework.apollo.biz.service.AppNamespaceService;
import com.ctrip.framework.apollo.common.entity.AppNamespace;
import com.ctrip.framework.apollo.core.ConfigConsts;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.Collection;
import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@RunWith(MockitoJUnitRunner.class)
public class WatchKeysUtilTest {
@Mock
private AppNamespaceService appNamespaceService;
@Mock
private AppNamespace someAppNamespace;
@Mock
private AppNamespace anotherAppNamespace;
@Mock
private AppNamespace somePublicAppNamespace;
private WatchKeysUtil watchKeysUtil;
private String someAppId;
private String someCluster;
private String someNamespace;
private String anotherNamespace;
private String somePublicNamespace;
private String defaultCluster;
private String someDC;
private String somePublicAppId;
@Before
public void setUp() throws Exception {
watchKeysUtil = new WatchKeysUtil();
someAppId = "someId";
someCluster = "someCluster";
someNamespace = "someName";
anotherNamespace = "anotherName";
somePublicNamespace = "somePublicName";
defaultCluster = ConfigConsts.CLUSTER_NAME_DEFAULT;
someDC = "someDC";
somePublicAppId = "somePublicId";
when(someAppNamespace.getName()).thenReturn(someNamespace);
when(anotherAppNamespace.getName()).thenReturn(anotherNamespace);
when(appNamespaceService.findByAppIdAndNamespaces(someAppId, Sets.newHashSet(someNamespace)))
.thenReturn(Lists.newArrayList(someAppNamespace));
when(appNamespaceService
.findByAppIdAndNamespaces(someAppId, Sets.newHashSet(someNamespace, anotherNamespace)))
.thenReturn(Lists.newArrayList(someAppNamespace, anotherAppNamespace));
when(appNamespaceService
.findByAppIdAndNamespaces(someAppId,
Sets.newHashSet(someNamespace, anotherNamespace, somePublicNamespace)))
.thenReturn(Lists.newArrayList(someAppNamespace, anotherAppNamespace));
when(somePublicAppNamespace.getAppId()).thenReturn(somePublicAppId);
when(somePublicAppNamespace.getName()).thenReturn(somePublicNamespace);
when(appNamespaceService.findPublicNamespacesByNames(Sets.newHashSet(somePublicNamespace)))
.thenReturn(Lists.newArrayList(somePublicAppNamespace));
ReflectionTestUtils.setField(watchKeysUtil, "appNamespaceService", appNamespaceService);
}
@Test
public void testAssembleAllWatchKeysWithOneNamespaceAndDefaultCluster() throws Exception {
Set<String> watchKeys =
watchKeysUtil.assembleAllWatchKeys(someAppId, defaultCluster, someNamespace, null);
Set<String> clusters = Sets.newHashSet(defaultCluster);
assertEquals(clusters.size(), watchKeys.size());
assertWatchKeys(someAppId, clusters, someNamespace, watchKeys);
}
@Test
public void testAssembleAllWatchKeysWithOneNamespaceAndSomeDC() throws Exception {
Set<String> watchKeys =
watchKeysUtil.assembleAllWatchKeys(someAppId, someDC, someNamespace, someDC);
Set<String> clusters = Sets.newHashSet(defaultCluster, someDC);
assertEquals(clusters.size(), watchKeys.size());
assertWatchKeys(someAppId, clusters, someNamespace, watchKeys);
}
@Test
public void testAssembleAllWatchKeysWithOneNamespaceAndSomeDCAndSomeCluster() throws Exception {
Set<String> watchKeys =
watchKeysUtil.assembleAllWatchKeys(someAppId, someCluster, someNamespace, someDC);
Set<String> clusters = Sets.newHashSet(defaultCluster, someCluster, someDC);
assertEquals(clusters.size(), watchKeys.size());
assertWatchKeys(someAppId, clusters, someNamespace, watchKeys);
}
@Test
public void testAssembleAllWatchKeysWithMultipleNamespaces() throws Exception {
Multimap<String, String> watchKeysMap =
watchKeysUtil.assembleAllWatchKeys(someAppId, someCluster,
Sets.newHashSet(someNamespace, anotherNamespace), someDC);
Set<String> clusters = Sets.newHashSet(defaultCluster, someCluster, someDC);
assertEquals(clusters.size() * 2, watchKeysMap.size());
assertWatchKeys(someAppId, clusters, someNamespace, watchKeysMap.get(someNamespace));
assertWatchKeys(someAppId, clusters, anotherNamespace, watchKeysMap.get(anotherNamespace));
}
@Test
public void testAssembleAllWatchKeysWithPrivateAndPublicNamespaces() throws Exception {
Multimap<String, String> watchKeysMap =
watchKeysUtil.assembleAllWatchKeys(someAppId, someCluster,
Sets.newHashSet(someNamespace, anotherNamespace, somePublicNamespace), someDC);
Set<String> clusters = Sets.newHashSet(defaultCluster, someCluster, someDC);
assertEquals(clusters.size() * 4, watchKeysMap.size());
assertWatchKeys(someAppId, clusters, someNamespace, watchKeysMap.get(someNamespace));
assertWatchKeys(someAppId, clusters, anotherNamespace, watchKeysMap.get(anotherNamespace));
assertWatchKeys(someAppId, clusters, somePublicNamespace, watchKeysMap.get(somePublicNamespace));
assertWatchKeys(somePublicAppId, clusters, somePublicNamespace, watchKeysMap.get(somePublicNamespace));
}
private void assertWatchKeys(String appId, Set<String> clusters, String namespaceName,
Collection<String> watchedKeys) {
for (String cluster : clusters) {
String key =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(appId, cluster, namespaceName);
assertTrue(watchedKeys.contains(key));
}
}
}
import com.ctrip.framework.apollo.Config; import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigChangeListener; import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.ConfigFile;
import com.ctrip.framework.apollo.ConfigService; import com.ctrip.framework.apollo.ConfigService;
import com.ctrip.framework.apollo.core.enums.ConfigFileFormat;
import com.ctrip.framework.apollo.model.ConfigChange; import com.ctrip.framework.apollo.model.ConfigChange;
import com.ctrip.framework.apollo.model.ConfigChangeEvent; import com.ctrip.framework.apollo.model.ConfigChangeEvent;
...@@ -18,11 +16,12 @@ import java.io.InputStreamReader; ...@@ -18,11 +16,12 @@ import java.io.InputStreamReader;
*/ */
public class ApolloConfigDemo { public class ApolloConfigDemo {
private static final Logger logger = LoggerFactory.getLogger(ApolloConfigDemo.class); private static final Logger logger = LoggerFactory.getLogger(ApolloConfigDemo.class);
private String DEFAULT_VALUE = "undefined";
private Config config; private Config config;
private Config publicConfig;
public ApolloConfigDemo() { public ApolloConfigDemo() {
config = ConfigService.getAppConfig(); ConfigChangeListener changeListener = new ConfigChangeListener() {
config.addChangeListener(new ConfigChangeListener() {
@Override @Override
public void onChange(ConfigChangeEvent changeEvent) { public void onChange(ConfigChangeEvent changeEvent) {
logger.info("Changes for namespace {}", changeEvent.getNamespace()); logger.info("Changes for namespace {}", changeEvent.getNamespace());
...@@ -33,12 +32,19 @@ public class ApolloConfigDemo { ...@@ -33,12 +32,19 @@ public class ApolloConfigDemo {
change.getChangeType()); change.getChangeType());
} }
} }
}); };
config = ConfigService.getAppConfig();
config.addChangeListener(changeListener);
publicConfig = ConfigService.getConfig("FX.apollo");
publicConfig.addChangeListener(changeListener);
} }
private String getConfig(String key) { private String getConfig(String key) {
String result = config.getProperty(key, "undefined"); String result = config.getProperty(key, DEFAULT_VALUE);
logger.info(String.format("Loading key: %s with value: %s", key, result)); if (DEFAULT_VALUE.equals(result)) {
result = publicConfig.getProperty(key, DEFAULT_VALUE);
}
logger.info(String.format("Loading key : %s with value: %s", key, result));
return result; return result;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment