Commit 0be1e703 authored by Jason Song's avatar Jason Song

Use database as MQ, change separator to +.

parent 39fb6a84
package com.ctrip.apollo.adminservice;
import com.ctrip.apollo.biz.message.DummyMessageSender;
import com.ctrip.apollo.biz.message.MessageSender;
import com.ctrip.apollo.biz.message.RedisMessageSender;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@Configuration
public class AdminServiceAutoConfiguration {
@ConditionalOnProperty(value = "apollo.redis.enabled", havingValue = "true", matchIfMissing = false)
public static class AdminRedisConfiguration {
@Value("${apollo.redis.host}")
private String host;
@Value("${apollo.redis.port}")
private int port;
@Bean
public JedisConnectionFactory redisConnectionFactory() {
JedisConnectionFactory factory = new JedisConnectionFactory();
factory.setHostName(host);
factory.setPort(port);
return factory;
}
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate template = new StringRedisTemplate(factory);
return template;
}
@Bean
public MessageSender redisMessageSender(RedisTemplate<String, String> redisTemplate) {
return new RedisMessageSender(redisTemplate);
}
}
@Configuration
@ConditionalOnProperty(value = "apollo.redis.enabled", havingValue = "false", matchIfMissing = true)
public static class ConfigDefaultConfiguration {
@Bean
public MessageSender defaultMessageSender() {
return new DummyMessageSender();
}
}
}
package com.ctrip.apollo.adminservice.controller; package com.ctrip.apollo.adminservice.controller;
import java.util.List; import com.google.common.base.Joiner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.ctrip.apollo.biz.entity.Namespace; import com.ctrip.apollo.biz.entity.Namespace;
import com.ctrip.apollo.biz.entity.Release; import com.ctrip.apollo.biz.entity.Release;
...@@ -19,9 +11,20 @@ import com.ctrip.apollo.biz.service.NamespaceService; ...@@ -19,9 +11,20 @@ import com.ctrip.apollo.biz.service.NamespaceService;
import com.ctrip.apollo.biz.service.ReleaseService; import com.ctrip.apollo.biz.service.ReleaseService;
import com.ctrip.apollo.common.auth.ActiveUser; import com.ctrip.apollo.common.auth.ActiveUser;
import com.ctrip.apollo.common.utils.BeanUtils; import com.ctrip.apollo.common.utils.BeanUtils;
import com.ctrip.apollo.core.ConfigConsts;
import com.ctrip.apollo.core.dto.ReleaseDTO; import com.ctrip.apollo.core.dto.ReleaseDTO;
import com.ctrip.apollo.core.exception.NotFoundException; import com.ctrip.apollo.core.exception.NotFoundException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@RestController @RestController
public class ReleaseController { public class ReleaseController {
...@@ -37,26 +40,29 @@ public class ReleaseController { ...@@ -37,26 +40,29 @@ public class ReleaseController {
@Autowired @Autowired
private MessageSender messageSender; private MessageSender messageSender;
private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
@RequestMapping("/release/{releaseId}") @RequestMapping("/release/{releaseId}")
public ReleaseDTO get(@PathVariable("releaseId") long releaseId) { public ReleaseDTO get(@PathVariable("releaseId") long releaseId) {
Release release = releaseService.findOne(releaseId); Release release = releaseService.findOne(releaseId);
if (release == null) if (release == null) {
throw new NotFoundException(String.format("release not found for %s", releaseId)); throw new NotFoundException(String.format("release not found for %s", releaseId));
}
return BeanUtils.transfrom(ReleaseDTO.class, release); return BeanUtils.transfrom(ReleaseDTO.class, release);
} }
@RequestMapping("/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/releases") @RequestMapping("/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/releases")
public List<ReleaseDTO> find(@PathVariable("appId") String appId, public List<ReleaseDTO> find(@PathVariable("appId") String appId,
@PathVariable("clusterName") String clusterName, @PathVariable("clusterName") String clusterName,
@PathVariable("namespaceName") String namespaceName) { @PathVariable("namespaceName") String namespaceName) {
List<Release> releases = releaseService.findReleases(appId, clusterName, namespaceName); List<Release> releases = releaseService.findReleases(appId, clusterName, namespaceName);
return BeanUtils.batchTransform(ReleaseDTO.class, releases); return BeanUtils.batchTransform(ReleaseDTO.class, releases);
} }
@RequestMapping("/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/releases/latest") @RequestMapping("/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/releases/latest")
public ReleaseDTO getLatest(@PathVariable("appId") String appId, public ReleaseDTO getLatest(@PathVariable("appId") String appId,
@PathVariable("clusterName") String clusterName, @PathVariable("clusterName") String clusterName,
@PathVariable("namespaceName") String namespaceName) { @PathVariable("namespaceName") String namespaceName) {
Release release = configService.findRelease(appId, clusterName, namespaceName); Release release = configService.findRelease(appId, clusterName, namespaceName);
if (release == null) { if (release == null) {
throw new NotFoundException(String.format("latest release not found for %s %s %s", appId, throw new NotFoundException(String.format("latest release not found for %s %s %s", appId,
...@@ -68,10 +74,11 @@ public class ReleaseController { ...@@ -68,10 +74,11 @@ public class ReleaseController {
@RequestMapping(path = "/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/releases", method = RequestMethod.POST) @RequestMapping(path = "/apps/{appId}/clusters/{clusterName}/namespaces/{namespaceName}/releases", method = RequestMethod.POST)
public ReleaseDTO buildRelease(@PathVariable("appId") String appId, public ReleaseDTO buildRelease(@PathVariable("appId") String appId,
@PathVariable("clusterName") String clusterName, @PathVariable("clusterName") String clusterName,
@PathVariable("namespaceName") String namespaceName, @RequestParam("name") String name, @PathVariable("namespaceName") String namespaceName,
@RequestParam(name = "comment", required = false) String comment, @RequestParam("name") String name,
@ActiveUser UserDetails user) { @RequestParam(name = "comment", required = false) String comment,
@ActiveUser UserDetails user) {
Namespace namespace = namespaceService.findOne(appId, clusterName, namespaceName); Namespace namespace = namespaceService.findOne(appId, clusterName, namespaceName);
if (namespace == null) { if (namespace == null) {
throw new NotFoundException(String.format("Could not find namespace for %s %s %s", appId, throw new NotFoundException(String.format("Could not find namespace for %s %s %s", appId,
...@@ -84,6 +91,6 @@ public class ReleaseController { ...@@ -84,6 +91,6 @@ public class ReleaseController {
} }
private String assembleKey(String appId, String cluster, String namespace) { private String assembleKey(String appId, String cluster, String namespace) {
return String.format("%s-%s-%s", appId, cluster, namespace); return STRING_JOINER.join(appId, cluster, namespace);
} }
} }
...@@ -5,6 +5,8 @@ import org.springframework.context.annotation.ComponentScan; ...@@ -5,6 +5,8 @@ import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.ComponentScan.Filter; import org.springframework.context.annotation.ComponentScan.Filter;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.FilterType; import org.springframework.context.annotation.FilterType;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
@Configuration @Configuration
@ComponentScan(excludeFilters = {@Filter(type = FilterType.ASSIGNABLE_TYPE, value = { @ComponentScan(excludeFilters = {@Filter(type = FilterType.ASSIGNABLE_TYPE, value = {
......
package com.ctrip.apollo.adminservice.controller; package com.ctrip.apollo.adminservice.controller;
import java.util.HashMap; import com.google.common.base.Joiner;
import java.util.Map; import com.google.gson.Gson;
import com.ctrip.apollo.biz.entity.Namespace;
import com.ctrip.apollo.biz.message.MessageSender;
import com.ctrip.apollo.biz.message.Topics;
import com.ctrip.apollo.biz.repository.ReleaseRepository;
import com.ctrip.apollo.biz.service.NamespaceService;
import com.ctrip.apollo.biz.service.ReleaseService;
import com.ctrip.apollo.core.ConfigConsts;
import com.ctrip.apollo.core.dto.AppDTO;
import com.ctrip.apollo.core.dto.ClusterDTO;
import com.ctrip.apollo.core.dto.ItemDTO;
import com.ctrip.apollo.core.dto.NamespaceDTO;
import com.ctrip.apollo.core.dto.ReleaseDTO;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
...@@ -16,18 +29,8 @@ import org.springframework.test.util.ReflectionTestUtils; ...@@ -16,18 +29,8 @@ import org.springframework.test.util.ReflectionTestUtils;
import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
import com.ctrip.apollo.biz.entity.Namespace; import java.util.HashMap;
import com.ctrip.apollo.biz.message.MessageSender; import java.util.Map;
import com.ctrip.apollo.biz.message.Topics;
import com.ctrip.apollo.biz.repository.ReleaseRepository;
import com.ctrip.apollo.biz.service.NamespaceService;
import com.ctrip.apollo.biz.service.ReleaseService;
import com.ctrip.apollo.core.dto.AppDTO;
import com.ctrip.apollo.core.dto.ClusterDTO;
import com.ctrip.apollo.core.dto.ItemDTO;
import com.ctrip.apollo.core.dto.NamespaceDTO;
import com.ctrip.apollo.core.dto.ReleaseDTO;
import com.google.gson.Gson;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
...@@ -119,7 +122,8 @@ public class ReleaseControllerTest extends AbstractControllerTest { ...@@ -119,7 +122,8 @@ public class ReleaseControllerTest extends AbstractControllerTest {
.buildRelease(someAppId, someCluster, someNamespaceName, someName, someComment, someUser); .buildRelease(someAppId, someCluster, someNamespaceName, someName, someComment, someUser);
verify(someMessageSender, times(1)) verify(someMessageSender, times(1))
.sendMessage(String.format("%s-%s-%s", someAppId, someCluster, someNamespaceName), .sendMessage(Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, someCluster, someNamespaceName),
Topics.APOLLO_RELEASE_TOPIC); Topics.APOLLO_RELEASE_TOPIC);
} }
......
...@@ -15,6 +15,10 @@ public class TestWebSecurityConfig extends WebSecurityConfigurerAdapter { ...@@ -15,6 +15,10 @@ public class TestWebSecurityConfig extends WebSecurityConfigurerAdapter {
protected void configure(HttpSecurity http) throws Exception { protected void configure(HttpSecurity http) throws Exception {
http.httpBasic(); http.httpBasic();
http.csrf().disable(); http.csrf().disable();
http.authorizeRequests().antMatchers("/").permitAll().and()
.authorizeRequests().antMatchers("/console/**").permitAll();
http.headers().frameOptions().disable();
} }
@Autowired @Autowired
......
...@@ -11,24 +11,23 @@ INSERT INTO Cluster (AppId, Name) VALUES ('100003173', 'default'); ...@@ -11,24 +11,23 @@ INSERT INTO Cluster (AppId, Name) VALUES ('100003173', 'default');
INSERT INTO Cluster (AppId, Name) VALUES ('100003173', 'cluster3'); INSERT INTO Cluster (AppId, Name) VALUES ('100003173', 'cluster3');
INSERT INTO Cluster (AppId, Name) VALUES ('fxhermesproducer', 'default'); INSERT INTO Cluster (AppId, Name) VALUES ('fxhermesproducer', 'default');
INSERT INTO AppNamespace (AppId, Name) VALUES ('100003171', '100003171'); INSERT INTO AppNamespace (AppId, Name) VALUES ('100003171', 'application');
INSERT INTO AppNamespace (AppId, Name) VALUES ('100003171', 'fx.apollo.config'); INSERT INTO AppNamespace (AppId, Name) VALUES ('100003171', 'fx.apollo.config');
INSERT INTO AppNamespace (AppId, Name) VALUES ('100003172', '100003172'); INSERT INTO AppNamespace (AppId, Name) VALUES ('100003172', 'application');
INSERT INTO AppNamespace (AppId, Name) VALUES ('100003172', 'fx.apollo.admin'); INSERT INTO AppNamespace (AppId, Name) VALUES ('100003172', 'fx.apollo.admin');
INSERT INTO AppNamespace (AppId, Name) VALUES ('100003173', '100003173'); INSERT INTO AppNamespace (AppId, Name) VALUES ('100003173', 'application');
INSERT INTO AppNamespace (AppId, Name) VALUES ('100003173', 'fx.apollo.portal'); INSERT INTO AppNamespace (AppId, Name) VALUES ('100003173', 'fx.apollo.portal');
INSERT INTO AppNamespace (AppID, Name) VALUES ('fxhermesproducer', 'fx.hermes.producer'); INSERT INTO AppNamespace (AppID, Name) VALUES ('fxhermesproducer', 'fx.hermes.producer');
INSERT INTO Namespace (Id, AppId, ClusterName, NamespaceName) VALUES (1, '100003171', 'default', '100003171'); INSERT INTO Namespace (Id, AppId, ClusterName, NamespaceName) VALUES (1, '100003171', 'default', 'application');
INSERT INTO Namespace (Id, AppId, ClusterName, NamespaceName) VALUES (2, 'fxhermesproducer', 'default', 'fx.hermes.producer'); INSERT INTO Namespace (Id, AppId, ClusterName, NamespaceName) VALUES (2, 'fxhermesproducer', 'default', 'fx.hermes.producer');
INSERT INTO Namespace (Id, AppId, ClusterName, NamespaceName) VALUES (3, '100003172', 'default', '100003172'); INSERT INTO Namespace (Id, AppId, ClusterName, NamespaceName) VALUES (3, '100003172', 'default', 'application');
INSERT INTO Namespace (Id, AppId, ClusterName, NamespaceName) VALUES (4, '100003173', 'default', '100003173'); INSERT INTO Namespace (Id, AppId, ClusterName, NamespaceName) VALUES (4, '100003173', 'default', 'application');
INSERT INTO Namespace (Id, AppId, ClusterName, NamespaceName) VALUES (5, '100003171', 'default', '100003171');
INSERT INTO Item (NamespaceId, `Key`, Value, Comment) VALUES (1, 'k1', 'v1', 'comment1'); INSERT INTO Item (NamespaceId, `Key`, Value, Comment) VALUES (1, 'k1', 'v1', 'comment1');
INSERT INTO Item (NamespaceId, `Key`, Value, Comment) VALUES (1, 'k2', 'v2', 'comment2'); INSERT INTO Item (NamespaceId, `Key`, Value, Comment) VALUES (1, 'k2', 'v2', 'comment2');
INSERT INTO Item (NamespaceId, `Key`, Value, Comment) VALUES (2, 'k3', 'v3', 'comment3'); INSERT INTO Item (NamespaceId, `Key`, Value, Comment) VALUES (2, 'k3', 'v3', 'comment3');
INSERT INTO Item (NamespaceId, `Key`, Value, Comment) VALUES (5, 'k3', 'v4', 'comment4'); INSERT INTO Item (NamespaceId, `Key`, Value, Comment) VALUES (5, 'k3', 'v4', 'comment4');
INSERT INTO RELEASE (Name, Comment, AppId, ClusterName, NamespaceName, Configurations) VALUES ('REV1','First Release','100003171', 'default', '100003171', '{"k1":"v1"}'); INSERT INTO RELEASE (Name, Comment, AppId, ClusterName, NamespaceName, Configurations) VALUES ('REV1','First Release','100003171', 'default', 'application', '{"k1":"v1"}');
...@@ -22,10 +22,6 @@ ...@@ -22,10 +22,6 @@
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId> <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-redis</artifactId>
</dependency>
<dependency> <dependency>
<groupId>mysql</groupId> <groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId> <artifactId>mysql-connector-java</artifactId>
......
package com.ctrip.apollo.biz.entity;
import java.util.Date;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.PrePersist;
import javax.persistence.Table;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@Entity
@Table(name = "ReleaseMessage")
public class ReleaseMessage {
@Id
@GeneratedValue
@Column(name = "Id")
private long id;
@Column(name = "Message", nullable = false)
private String message;
@Column(name = "DataChange_LastTime")
private Date dataChangeLastModifiedTime;
@PrePersist
protected void prePersist() {
if (this.dataChangeLastModifiedTime == null) {
dataChangeLastModifiedTime = new Date();
}
}
public ReleaseMessage() {
}
public ReleaseMessage(String message) {
this.message = message;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
package com.ctrip.apollo.biz.message; package com.ctrip.apollo.biz.message;
import com.ctrip.apollo.biz.entity.ReleaseMessage;
import com.ctrip.apollo.biz.repository.ReleaseMessageRepository;
import com.dianping.cat.Cat; import com.dianping.cat.Cat;
import com.dianping.cat.message.Message; import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction; import com.dianping.cat.message.Transaction;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Objects;
/** /**
* @author Jason Song(song_s@ctrip.com) * @author Jason Song(song_s@ctrip.com)
*/ */
public class RedisMessageSender implements MessageSender { @Component
private static final Logger logger = LoggerFactory.getLogger(RedisMessageSender.class); public class DatabaseMessageSender implements MessageSender {
private RedisTemplate<String, String> redisTemplate; private static final Logger logger = LoggerFactory.getLogger(DatabaseMessageSender.class);
public RedisMessageSender( @Autowired
RedisTemplate<String, String> redisTemplate) { private ReleaseMessageRepository releaseMessageRepository;
this.redisTemplate = redisTemplate;
}
@Override @Override
public void sendMessage(String message, String channel) { public void sendMessage(String message, String channel) {
logger.info("Sending message {} to channel {}", message, channel); logger.info("Sending message {} to channel {}", message, channel);
Transaction transaction = Cat.newTransaction("Apollo.AdminService", "RedisMessageSender"); if (!Objects.equals(channel, Topics.APOLLO_RELEASE_TOPIC)) {
logger.warn("Channel {} not supported by DatabaseMessageSender!");
return;
}
Cat.logEvent("Apollo.AdminService.ReleaseMessage", message);
Transaction transaction = Cat.newTransaction("Apollo.AdminService", "sendMessage");
try { try {
redisTemplate.convertAndSend(channel, message); releaseMessageRepository.save(new ReleaseMessage(message));
transaction.setStatus(Message.SUCCESS); transaction.setStatus(Message.SUCCESS);
} catch (Throwable ex) { } catch (Throwable ex) {
logger.error("Sending message to redis failed", ex); logger.error("Sending message to database failed", ex);
transaction.setStatus(ex); transaction.setStatus(ex);
} finally { } finally {
transaction.complete(); transaction.complete();
......
package com.ctrip.apollo.biz.message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Jason Song(song_s@ctrip.com)
*/
public class DummyMessageSender implements MessageSender{
private static final Logger logger = LoggerFactory.getLogger(DummyMessageSender.class);
@Override
public void sendMessage(String message, String channel) {
logger.warn("No message sender available! message: {}, channel: {}", message, channel);
}
}
package com.ctrip.apollo.biz.message;
import com.google.common.collect.Lists;
import com.ctrip.apollo.biz.entity.ReleaseMessage;
import com.ctrip.apollo.biz.repository.ReleaseMessageRepository;
import com.ctrip.apollo.core.utils.ApolloThreadFactory;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author Jason Song(song_s@ctrip.com)
*/
public class ReleaseMessageScanner implements InitializingBean {
private static final Logger logger = LoggerFactory.getLogger(ReleaseMessageScanner.class);
private static final int DEFAULT_SCAN_INTERVAL_IN_MS = 1000;
@Autowired
private Environment env;
@Autowired
private ReleaseMessageRepository releaseMessageRepository;
private int databaseScanInterval;
private List<MessageListener> listeners;
private ScheduledExecutorService executorService;
private long maxIdScanned;
public ReleaseMessageScanner() {
listeners = Lists.newLinkedList();
executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory
.create("ReleaseMessageScanner", true));
}
@Override
public void afterPropertiesSet() throws Exception {
populateDataBaseInterval();
maxIdScanned = loadLargestMessageId();
executorService.scheduleWithFixedDelay((Runnable) () -> {
Transaction transaction = Cat.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");
try {
scanMessages();
transaction.setStatus(Message.SUCCESS);
} catch (Throwable ex) {
transaction.setStatus(ex);
logger.error("Scan and send message failed", ex);
} finally {
transaction.complete();
}
}, getDatabaseScanIntervalMs(), getDatabaseScanIntervalMs(), TimeUnit.MILLISECONDS);
}
/**
* add message listeners for release message
* @param listener
*/
public void addMessageListener(MessageListener listener) {
if (!listeners.contains(listener)) {
listeners.add(listener);
}
}
/**
* Scan messages, continue scanning until there is no more messages
*/
private void scanMessages() {
boolean hasMoreMessages = true;
while (hasMoreMessages && !Thread.currentThread().isInterrupted()) {
hasMoreMessages = scanAndSendMessages();
}
}
/**
* scan messages and send
*
* @return whether there are more messages
*/
private boolean scanAndSendMessages() {
//current batch is 500
List<ReleaseMessage> releaseMessages =
releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
if (CollectionUtils.isEmpty(releaseMessages)) {
return false;
}
fireMessageScanned(releaseMessages);
int messageScanned = releaseMessages.size();
maxIdScanned = releaseMessages.get(messageScanned - 1).getId();
return messageScanned == 500;
}
/**
* find largest message id as the current start point
* @return current largest message id
*/
private long loadLargestMessageId() {
ReleaseMessage releaseMessage = releaseMessageRepository.findTopByOrderByIdDesc();
return releaseMessage == null ? 0 : releaseMessage.getId();
}
/**
* Notify listeners with messages loaded
* @param messages
*/
private void fireMessageScanned(List<ReleaseMessage> messages) {
for (ReleaseMessage message : messages) {
for (MessageListener listener : listeners) {
try {
listener.handleMessage(message.getMessage(), Topics.APOLLO_RELEASE_TOPIC);
} catch (Throwable ex) {
Cat.logError(ex);
logger.error("Failed to invoke message listener {}", listener.getClass(), ex);
}
}
}
}
private void populateDataBaseInterval() {
databaseScanInterval = DEFAULT_SCAN_INTERVAL_IN_MS;
try {
String interval = env.getProperty("apollo.message-scan.interval");
if (!Objects.isNull(interval)) {
databaseScanInterval = Integer.parseInt(interval);
}
} catch (Throwable ex) {
Cat.logError(ex);
logger.error("Load apollo message scan interval from system property failed", ex);
}
}
private int getDatabaseScanIntervalMs() {
return databaseScanInterval;
}
}
package com.ctrip.apollo.biz.repository;
import com.ctrip.apollo.biz.entity.ReleaseMessage;
import org.springframework.data.repository.PagingAndSortingRepository;
import java.util.List;
/**
* @author Jason Song(song_s@ctrip.com)
*/
public interface ReleaseMessageRepository extends PagingAndSortingRepository<ReleaseMessage, Long> {
List<ReleaseMessage> findFirst500ByIdGreaterThanOrderByIdAsc(Long id);
ReleaseMessage findTopByOrderByIdDesc();
}
package com.ctrip.apollo.biz; package com.ctrip.apollo.biz;
import com.ctrip.apollo.biz.message.DatabaseMessageSenderTest;
import com.ctrip.apollo.biz.repository.AppRepositoryTest; import com.ctrip.apollo.biz.repository.AppRepositoryTest;
import com.ctrip.apollo.biz.service.AdminServiceTest; import com.ctrip.apollo.biz.service.AdminServiceTest;
import com.ctrip.apollo.biz.service.AdminServiceTransactionTest; import com.ctrip.apollo.biz.service.AdminServiceTransactionTest;
...@@ -16,7 +17,8 @@ import org.junit.runners.Suite.SuiteClasses; ...@@ -16,7 +17,8 @@ import org.junit.runners.Suite.SuiteClasses;
AdminServiceTest.class, AdminServiceTest.class,
ConfigServiceTest.class, ConfigServiceTest.class,
PrivilegeServiceTest.class, PrivilegeServiceTest.class,
AdminServiceTransactionTest.class}) AdminServiceTransactionTest.class,
DatabaseMessageSenderTest.class})
public class AllTests { public class AllTests {
} }
package com.ctrip.apollo.biz.message;
import com.ctrip.apollo.biz.entity.ReleaseMessage;
import com.ctrip.apollo.biz.repository.ReleaseMessageRepository;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.test.util.ReflectionTestUtils;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@RunWith(MockitoJUnitRunner.class)
public class DatabaseMessageSenderTest {
private DatabaseMessageSender messageSender;
@Mock
private ReleaseMessageRepository releaseMessageRepository;
@Before
public void setUp() throws Exception {
messageSender = new DatabaseMessageSender();
ReflectionTestUtils.setField(messageSender, "releaseMessageRepository", releaseMessageRepository);
}
@Test
public void testSendMessage() throws Exception {
String someMessage = "some-message";
ArgumentCaptor<ReleaseMessage> captor = ArgumentCaptor.forClass(ReleaseMessage.class);
messageSender.sendMessage(someMessage, Topics.APOLLO_RELEASE_TOPIC);
verify(releaseMessageRepository, times(1)).save(captor.capture());
assertEquals(someMessage, captor.getValue().getMessage());
}
@Test
public void testSendUnsupportedMessage() throws Exception {
String someMessage = "some-message";
String someUnsupportedTopic = "some-invalid-topic";
messageSender.sendMessage(someMessage, someUnsupportedTopic);
verify(releaseMessageRepository, never()).save(any(ReleaseMessage.class));
}
}
package com.ctrip.apollo.biz.message;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.data.redis.core.RedisTemplate;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@RunWith(MockitoJUnitRunner.class)
public class RedisMessageSenderTest {
@Mock
private RedisTemplate<String, String> redisTemplate;
private RedisMessageSender redisMessageSender;
@Before
public void setUp() throws Exception {
redisMessageSender = new RedisMessageSender(redisTemplate);
}
@Test
public void testSendMessage() throws Exception {
String someMessage = "someMessage";
String someChannel = "someChannel";
redisMessageSender.sendMessage(someMessage, someChannel);
verify(redisTemplate, times(1)).convertAndSend(someChannel, someMessage);
}
@Test
public void testSendMessageWithError() throws Exception {
String someMessage = "someMessage";
String someChannel = "someChannel";
doThrow(new RuntimeException()).when(redisTemplate).convertAndSend(someChannel, someMessage);
redisMessageSender.sendMessage(someMessage, someChannel);
}
}
package com.ctrip.apollo.biz.message;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
import com.ctrip.apollo.biz.entity.ReleaseMessage;
import com.ctrip.apollo.biz.repository.ReleaseMessageRepository;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.core.env.Environment;
import org.springframework.test.util.ReflectionTestUtils;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.when;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@RunWith(MockitoJUnitRunner.class)
public class ReleaseMessageScannerTest {
private ReleaseMessageScanner releaseMessageScanner;
@Mock
private ReleaseMessageRepository releaseMessageRepository;
@Mock
private Environment env;
private int databaseScanInterval;
@Before
public void setUp() throws Exception {
releaseMessageScanner = new ReleaseMessageScanner();
ReflectionTestUtils
.setField(releaseMessageScanner, "releaseMessageRepository", releaseMessageRepository);
ReflectionTestUtils.setField(releaseMessageScanner, "env", env);
databaseScanInterval = 100; //100 ms
when(env.getProperty("apollo.message-scan.interval")).thenReturn(String.valueOf(databaseScanInterval));
releaseMessageScanner.afterPropertiesSet();
}
@Test
public void testScanMessageAndNotifyMessageListener() throws Exception {
SettableFuture<String> someListenerFuture = SettableFuture.create();
MessageListener someListener = (message, channel) -> someListenerFuture.set(message);
releaseMessageScanner.addMessageListener(someListener);
String someMessage = "someMessage";
long someId = 100;
ReleaseMessage someReleaseMessage = assembleReleaseMessage(someId, someMessage);
when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(0L)).thenReturn(
Lists.newArrayList(someReleaseMessage));
String someListenerMessage =
someListenerFuture.get(5000, TimeUnit.MILLISECONDS);
assertEquals(someMessage, someListenerMessage);
SettableFuture<String> anotherListenerFuture = SettableFuture.create();
MessageListener anotherListener = (message, channel) -> anotherListenerFuture.set(message);
releaseMessageScanner.addMessageListener(anotherListener);
String anotherMessage = "anotherMessage";
long anotherId = someId + 1;
ReleaseMessage anotherReleaseMessage = assembleReleaseMessage(anotherId, anotherMessage);
when(releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(someId)).thenReturn(
Lists.newArrayList(anotherReleaseMessage));
String anotherListenerMessage =
anotherListenerFuture.get(5000, TimeUnit.MILLISECONDS);
assertEquals(anotherMessage, anotherListenerMessage);
}
private ReleaseMessage assembleReleaseMessage(long id, String message) {
ReleaseMessage releaseMessage = new ReleaseMessage();
releaseMessage.setId(id);
releaseMessage.setMessage(message);
return releaseMessage;
}
}
...@@ -25,7 +25,7 @@ public abstract class AbstractConfigRepository implements ConfigRepository { ...@@ -25,7 +25,7 @@ public abstract class AbstractConfigRepository implements ConfigRepository {
} catch (Throwable ex) { } catch (Throwable ex) {
Cat.logError(ex); Cat.logError(ex);
logger logger
.warn("Sync config failed with repository {}, reason: {}", this.getClass(), ExceptionUtil .warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
.getDetailMessage(ex)); .getDetailMessage(ex));
} }
return false; return false;
......
package com.ctrip.apollo.internals; package com.ctrip.apollo.internals;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.ctrip.apollo.core.ConfigConsts;
import com.ctrip.apollo.util.ConfigUtil; import com.ctrip.apollo.util.ConfigUtil;
import com.ctrip.apollo.util.ExceptionUtil; import com.ctrip.apollo.util.ExceptionUtil;
import com.dianping.cat.Cat; import com.dianping.cat.Cat;
...@@ -202,8 +204,10 @@ public class LocalFileConfigRepository extends AbstractConfigRepository ...@@ -202,8 +204,10 @@ public class LocalFileConfigRepository extends AbstractConfigRepository
} }
File assembleLocalCacheFile(File baseDir, String namespace) { File assembleLocalCacheFile(File baseDir, String namespace) {
String fileName = String.format("%s-%s-%s.properties", m_configUtil.getAppId(),
m_configUtil.getCluster(), namespace); String fileName =
String.format("%s.properties", Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(m_configUtil.getAppId(), m_configUtil.getCluster(), namespace));
return new File(baseDir, fileName); return new File(baseDir, fileName);
} }
} }
...@@ -7,6 +7,7 @@ import com.google.common.collect.Maps; ...@@ -7,6 +7,7 @@ import com.google.common.collect.Maps;
import com.google.common.escape.Escaper; import com.google.common.escape.Escaper;
import com.google.common.net.UrlEscapers; import com.google.common.net.UrlEscapers;
import com.ctrip.apollo.core.ConfigConsts;
import com.ctrip.apollo.core.dto.ApolloConfig; import com.ctrip.apollo.core.dto.ApolloConfig;
import com.ctrip.apollo.core.dto.ApolloConfigNotification; import com.ctrip.apollo.core.dto.ApolloConfigNotification;
import com.ctrip.apollo.core.dto.ServiceDTO; import com.ctrip.apollo.core.dto.ServiceDTO;
...@@ -43,6 +44,8 @@ import java.util.concurrent.atomic.AtomicReference; ...@@ -43,6 +44,8 @@ import java.util.concurrent.atomic.AtomicReference;
*/ */
public class RemoteConfigRepository extends AbstractConfigRepository { public class RemoteConfigRepository extends AbstractConfigRepository {
private static final Logger logger = LoggerFactory.getLogger(RemoteConfigRepository.class); private static final Logger logger = LoggerFactory.getLogger(RemoteConfigRepository.class);
private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
private static final Joiner.MapJoiner MAP_JOINER = Joiner.on("&").withKeyValueSeparator("=");
private PlexusContainer m_container; private PlexusContainer m_container;
private final ConfigServiceLocator m_serviceLocator; private final ConfigServiceLocator m_serviceLocator;
private final HttpUtil m_httpUtil; private final HttpUtil m_httpUtil;
...@@ -135,8 +138,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -135,8 +138,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
String appId = m_configUtil.getAppId(); String appId = m_configUtil.getAppId();
String cluster = m_configUtil.getCluster(); String cluster = m_configUtil.getCluster();
String dataCenter = m_configUtil.getDataCenter(); String dataCenter = m_configUtil.getDataCenter();
Cat.logEvent("Apollo.Client.ConfigInfo", Cat.logEvent("Apollo.Client.ConfigInfo", STRING_JOINER.join(appId, cluster, m_namespace));
String.format("%s-%s-%s", appId, cluster, m_namespace));
int maxRetries = 2; int maxRetries = 2;
Throwable exception = null; Throwable exception = null;
...@@ -214,7 +216,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -214,7 +216,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
String pathExpanded = String.format(path, pathParams.toArray()); String pathExpanded = String.format(path, pathParams.toArray());
if (!queryParams.isEmpty()) { if (!queryParams.isEmpty()) {
pathExpanded += "?" + Joiner.on("&").withKeyValueSeparator("=").join(queryParams); pathExpanded += "?" + MAP_JOINER.join(queryParams);
} }
if (!uri.endsWith("/")) { if (!uri.endsWith("/")) {
uri += "/"; uri += "/";
...@@ -276,7 +278,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -276,7 +278,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
transaction.addData("StatusCode", response.getStatusCode()); transaction.addData("StatusCode", response.getStatusCode());
transaction.setStatus(Message.SUCCESS); transaction.setStatus(Message.SUCCESS);
} catch (Throwable ex) { } catch (Throwable ex) {
logger.warn("Long polling failed for appId: {}, cluster: {}, namespace: {}, reason: {}", logger.warn("Long polling failed, will retry. appId: {}, cluster: {}, namespace: {}, reason: {}",
appId, cluster, m_namespace, ExceptionUtil.getDetailMessage(ex)); appId, cluster, m_namespace, ExceptionUtil.getDetailMessage(ex));
lastServiceDto = null; lastServiceDto = null;
Cat.logError(ex); Cat.logError(ex);
...@@ -284,7 +286,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -284,7 +286,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
transaction.setStatus(ex); transaction.setStatus(ex);
} }
try { try {
TimeUnit.SECONDS.sleep(10); TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
//ignore //ignore
} }
...@@ -314,7 +316,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository { ...@@ -314,7 +316,7 @@ public class RemoteConfigRepository extends AbstractConfigRepository {
queryParams.put("releaseId", escaper.escape(previousConfig.getReleaseId())); queryParams.put("releaseId", escaper.escape(previousConfig.getReleaseId()));
} }
String params = Joiner.on("&").withKeyValueSeparator("=").join(queryParams); String params = MAP_JOINER.join(queryParams);
if (!uri.endsWith("/")) { if (!uri.endsWith("/")) {
uri += "/"; uri += "/";
} }
......
package com.ctrip.apollo.integration; package com.ctrip.apollo.integration;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
...@@ -350,6 +351,7 @@ public class ConfigIntegrationTest extends BaseIntegrationTest { ...@@ -350,6 +351,7 @@ public class ConfigIntegrationTest extends BaseIntegrationTest {
} }
private String assembleLocalCacheFileName() { private String assembleLocalCacheFileName() {
return String.format("%s-%s-%s.properties", someAppId, someClusterName, defaultNamespace); return String.format("%s.properties", Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, someClusterName, defaultNamespace));
} }
} }
package com.ctrip.apollo.internals; package com.ctrip.apollo.internals;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.io.Files; import com.google.common.io.Files;
import com.ctrip.apollo.core.ConfigConsts;
import com.ctrip.apollo.util.ConfigUtil; import com.ctrip.apollo.util.ConfigUtil;
import org.junit.After; import org.junit.After;
...@@ -73,8 +75,8 @@ public class LocalFileConfigRepositoryTest extends ComponentTestCase { ...@@ -73,8 +75,8 @@ public class LocalFileConfigRepositoryTest extends ComponentTestCase {
} }
private String assembleLocalCacheFileName() { private String assembleLocalCacheFileName() {
return String.format("%s-%s-%s.properties", someAppId, return String.format("%s.properties", Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
someCluster, someNamespace); .join(someAppId, someCluster, someNamespace));
} }
...@@ -144,7 +146,7 @@ public class LocalFileConfigRepositoryTest extends ComponentTestCase { ...@@ -144,7 +146,7 @@ public class LocalFileConfigRepositoryTest extends ComponentTestCase {
assertThat( assertThat(
"LocalFileConfigRepository should persist local cache files and return that afterwards", "LocalFileConfigRepository should persist local cache files and return that afterwards",
someProperties.entrySet(), equalTo(anotherProperties.entrySet())); someProperties.entrySet(), equalTo(anotherProperties.entrySet()));
} }
......
package com.ctrip.apollo.configservice; package com.ctrip.apollo.configservice;
import com.ctrip.apollo.biz.message.Topics; import com.ctrip.apollo.biz.message.ReleaseMessageScanner;
import com.ctrip.apollo.configservice.controller.NotificationController; import com.ctrip.apollo.configservice.controller.NotificationController;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
/** /**
* @author Jason Song(song_s@ctrip.com) * @author Jason Song(song_s@ctrip.com)
*/ */
@Configuration @Configuration
public class ConfigServiceAutoConfiguration { public class ConfigServiceAutoConfiguration {
@Autowired
@ConditionalOnProperty(value = "apollo.redis.enabled", havingValue = "true", matchIfMissing = false) private NotificationController notificationController;
public static class ConfigRedisConfiguration {
@Value("${apollo.redis.host}") @Bean
private String host; public ReleaseMessageScanner releaseMessageScanner() {
@Value("${apollo.redis.port}") ReleaseMessageScanner releaseMessageScanner = new ReleaseMessageScanner();
private int port; releaseMessageScanner.addMessageListener(notificationController);
return releaseMessageScanner;
@Bean
public JedisConnectionFactory redisConnectionFactory() {
JedisConnectionFactory factory = new JedisConnectionFactory();
factory.setHostName(host);
factory.setPort(port);
return factory;
}
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
return container;
}
@Bean
public ChannelTopic apolloReleaseTopic() {
return new ChannelTopic(Topics.APOLLO_RELEASE_TOPIC);
}
@Bean
public MessageListenerAdapter apolloMessageListener(RedisMessageListenerContainer container,
NotificationController notification,
ChannelTopic topic) {
MessageListenerAdapter adapter = new MessageListenerAdapter(notification);
container.addMessageListener(adapter, topic);
return adapter;
}
} }
} }
...@@ -42,10 +42,11 @@ public class ConfigController { ...@@ -42,10 +42,11 @@ public class ConfigController {
@Autowired @Autowired
private AppNamespaceService appNamespaceService; private AppNamespaceService appNamespaceService;
private Gson gson = new Gson(); private static final Gson gson = new Gson();
private Type configurationTypeReference = private static final Type configurationTypeReference =
new TypeToken<Map<java.lang.String, java.lang.String>>() { new TypeToken<Map<java.lang.String, java.lang.String>>() {
}.getType(); }.getType();
private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
@RequestMapping(value = "/{appId}/{clusterName}", method = RequestMethod.GET) @RequestMapping(value = "/{appId}/{clusterName}", method = RequestMethod.GET)
public ApolloConfig queryConfig(@PathVariable String appId, @PathVariable String clusterName, public ApolloConfig queryConfig(@PathVariable String appId, @PathVariable String clusterName,
...@@ -89,7 +90,7 @@ public class ConfigController { ...@@ -89,7 +90,7 @@ public class ConfigController {
} }
String mergedReleaseId = FluentIterable.from(releases).transform( String mergedReleaseId = FluentIterable.from(releases).transform(
input -> String.valueOf(input.getId())).join(Joiner.on("|")); input -> String.valueOf(input.getId())).join(STRING_JOINER);
if (mergedReleaseId.equals(clientSideReleaseId)) { if (mergedReleaseId.equals(clientSideReleaseId)) {
// Client side configuration is the same with server side, return 304 // Client side configuration is the same with server side, return 304
...@@ -148,11 +149,11 @@ public class ConfigController { ...@@ -148,11 +149,11 @@ public class ConfigController {
} }
private String assembleKey(String appId, String cluster, String namespace, String datacenter) { private String assembleKey(String appId, String cluster, String namespace, String datacenter) {
String key = String.format("%s-%s-%s", appId, cluster, namespace); List<String> keyParts = Lists.newArrayList(appId, cluster, namespace);
if (!Strings.isNullOrEmpty(datacenter)) { if (!Strings.isNullOrEmpty(datacenter)) {
key += "-" + datacenter; keyParts.add(datacenter);
} }
return key; return STRING_JOINER.join(keyParts);
} }
} }
package com.ctrip.apollo.configservice.controller; package com.ctrip.apollo.configservice.controller;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import com.google.common.collect.HashMultimap; import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
...@@ -29,8 +31,6 @@ import java.util.Collection; ...@@ -29,8 +31,6 @@ import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import javax.servlet.http.HttpServletResponse;
/** /**
* @author Jason Song(song_s@ctrip.com) * @author Jason Song(song_s@ctrip.com)
*/ */
...@@ -40,8 +40,12 @@ public class NotificationController implements MessageListener { ...@@ -40,8 +40,12 @@ public class NotificationController implements MessageListener {
private static final Logger logger = LoggerFactory.getLogger(NotificationController.class); private static final Logger logger = LoggerFactory.getLogger(NotificationController.class);
private static final long TIMEOUT = 360 * 60 * 1000;//6 hours private static final long TIMEOUT = 360 * 60 * 1000;//6 hours
private final Multimap<String, DeferredResult<ResponseEntity<ApolloConfigNotification>>> private final Multimap<String, DeferredResult<ResponseEntity<ApolloConfigNotification>>>
deferredResults = deferredResults = Multimaps.synchronizedSetMultimap(HashMultimap.create());
Multimaps.synchronizedSetMultimap(HashMultimap.create()); private static final ResponseEntity<ApolloConfigNotification>
NOT_MODIFIED_RESPONSE = new ResponseEntity<>(HttpStatus.NOT_MODIFIED);
private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
private static final Splitter STRING_SPLITTER =
Splitter.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
@Autowired @Autowired
private AppNamespaceService appNamespaceService; private AppNamespaceService appNamespaceService;
...@@ -52,8 +56,7 @@ public class NotificationController implements MessageListener { ...@@ -52,8 +56,7 @@ public class NotificationController implements MessageListener {
@RequestParam(value = "cluster") String cluster, @RequestParam(value = "cluster") String cluster,
@RequestParam(value = "namespace", defaultValue = ConfigConsts.NAMESPACE_DEFAULT) String namespace, @RequestParam(value = "namespace", defaultValue = ConfigConsts.NAMESPACE_DEFAULT) String namespace,
@RequestParam(value = "dataCenter", required = false) String dataCenter, @RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "releaseId", defaultValue = "-1") String clientSideReleaseId, @RequestParam(value = "releaseId", defaultValue = "-1") String clientSideReleaseId) {
HttpServletResponse response) {
List<String> watchedKeys = Lists.newArrayList(assembleKey(appId, cluster, namespace)); List<String> watchedKeys = Lists.newArrayList(assembleKey(appId, cluster, namespace));
//Listen on more namespaces, since it's not the default namespace //Listen on more namespaces, since it's not the default namespace
...@@ -61,30 +64,32 @@ public class NotificationController implements MessageListener { ...@@ -61,30 +64,32 @@ public class NotificationController implements MessageListener {
watchedKeys.addAll(this.findPublicConfigWatchKey(appId, namespace, dataCenter)); watchedKeys.addAll(this.findPublicConfigWatchKey(appId, namespace, dataCenter));
} }
ResponseEntity<ApolloConfigNotification> body = new ResponseEntity<>(
HttpStatus.NOT_MODIFIED);
DeferredResult<ResponseEntity<ApolloConfigNotification>> deferredResult = DeferredResult<ResponseEntity<ApolloConfigNotification>> deferredResult =
new DeferredResult<>(TIMEOUT, body); new DeferredResult<>(TIMEOUT, NOT_MODIFIED_RESPONSE);
//register all keys //register all keys
for (String key : watchedKeys) { for (String key : watchedKeys) {
this.deferredResults.put(key, deferredResult); this.deferredResults.put(key, deferredResult);
} }
deferredResult.onTimeout(() -> logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
deferredResult.onCompletion(() -> { deferredResult.onCompletion(() -> {
//unregister all keys //unregister all keys
for (String key : watchedKeys) { for (String key : watchedKeys) {
deferredResults.remove(key, deferredResult); deferredResults.remove(key, deferredResult);
} }
logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.CompletedKeys");
}); });
logWatchedKeysToCat(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
logger.info("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}", logger.info("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",
watchedKeys, appId, cluster, namespace, dataCenter); watchedKeys, appId, cluster, namespace, dataCenter);
return deferredResult; return deferredResult;
} }
private String assembleKey(String appId, String cluster, String namespace) { private String assembleKey(String appId, String cluster, String namespace) {
return String.format("%s-%s-%s", appId, cluster, namespace); return STRING_JOINER.join(appId, cluster, namespace);
} }
private List<String> findPublicConfigWatchKey(String applicationId, String namespace, private List<String> findPublicConfigWatchKey(String applicationId, String namespace,
...@@ -114,20 +119,20 @@ public class NotificationController implements MessageListener { ...@@ -114,20 +119,20 @@ public class NotificationController implements MessageListener {
@Override @Override
public void handleMessage(String message, String channel) { public void handleMessage(String message, String channel) {
logger.info("message received - channel: {}, message: {}", channel, message); logger.info("message received - channel: {}, message: {}", channel, message);
Cat.logEvent("Apollo.LongPoll.Message", message); Cat.logEvent("Apollo.LongPoll.Messages", message);
if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(message)) { if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(message)) {
return; return;
} }
String[] keys = message.split("-"); List<String> keys = STRING_SPLITTER.splitToList(message);
//message should be appId-cluster-namespace //message should be appId|cluster|namespace
if (keys.length != 3) { if (keys.size() != 3) {
logger.error("message format invalid - {}", message); logger.error("message format invalid - {}", message);
return; return;
} }
ResponseEntity<ApolloConfigNotification> notification = ResponseEntity<ApolloConfigNotification> notification =
new ResponseEntity<>( new ResponseEntity<>(
new ApolloConfigNotification(keys[2]), HttpStatus.OK); new ApolloConfigNotification(keys.get(2)), HttpStatus.OK);
Collection<DeferredResult<ResponseEntity<ApolloConfigNotification>>> Collection<DeferredResult<ResponseEntity<ApolloConfigNotification>>>
results = deferredResults.get(message); results = deferredResults.get(message);
...@@ -137,5 +142,11 @@ public class NotificationController implements MessageListener { ...@@ -137,5 +142,11 @@ public class NotificationController implements MessageListener {
result.setResult(notification); result.setResult(notification);
} }
} }
private void logWatchedKeysToCat(List<String> watchedKeys, String eventName) {
for (String watchedKey : watchedKeys) {
Cat.logEvent(eventName, watchedKey);
}
}
} }
package com.ctrip.apollo; package com.ctrip.apollo;
import com.ctrip.apollo.common.controller.WebSecurityConfig;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.ComponentScan.Filter; import org.springframework.context.annotation.ComponentScan.Filter;
...@@ -8,7 +10,7 @@ import org.springframework.context.annotation.FilterType; ...@@ -8,7 +10,7 @@ import org.springframework.context.annotation.FilterType;
@Configuration @Configuration
@ComponentScan(excludeFilters = {@Filter(type = FilterType.ASSIGNABLE_TYPE, value = { @ComponentScan(excludeFilters = {@Filter(type = FilterType.ASSIGNABLE_TYPE, value = {
SampleConfigServiceApplication.class, ConfigServiceApplication.class})}) SampleConfigServiceApplication.class, ConfigServiceApplication.class, WebSecurityConfig.class})})
@EnableAutoConfiguration @EnableAutoConfiguration
public class ConfigServiceTestConfiguration { public class ConfigServiceTestConfiguration {
......
package com.ctrip.apollo.configservice.controller; package com.ctrip.apollo.configservice.controller;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.gson.Gson; import com.google.gson.Gson;
...@@ -243,7 +244,8 @@ public class ConfigControllerTest { ...@@ -243,7 +244,8 @@ public class ConfigControllerTest {
.queryConfig(someAppId, someClusterName, somePublicNamespaceName, someDataCenter, .queryConfig(someAppId, someClusterName, somePublicNamespaceName, someDataCenter,
someAppSideReleaseId, someResponse); someAppSideReleaseId, someResponse);
assertEquals(String.format("%s|%s", someAppSideReleaseId, somePublicAppSideReleaseId), assertEquals(Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppSideReleaseId, somePublicAppSideReleaseId),
result.getReleaseId()); result.getReleaseId());
assertEquals(someAppId, result.getAppId()); assertEquals(someAppId, result.getAppId());
assertEquals(someClusterName, result.getCluster()); assertEquals(someClusterName, result.getCluster());
......
package com.ctrip.apollo.configservice.controller; package com.ctrip.apollo.configservice.controller;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Multimap; import com.google.common.collect.Multimap;
...@@ -40,8 +41,6 @@ public class NotificationControllerTest { ...@@ -40,8 +41,6 @@ public class NotificationControllerTest {
private String someDataCenter; private String someDataCenter;
private String someReleaseId; private String someReleaseId;
@Mock @Mock
private HttpServletResponse response;
@Mock
private AppNamespaceService appNamespaceService; private AppNamespaceService appNamespaceService;
private Multimap<String, DeferredResult<ResponseEntity<ApolloConfigNotification>>> private Multimap<String, DeferredResult<ResponseEntity<ApolloConfigNotification>>>
deferredResults; deferredResults;
...@@ -67,10 +66,11 @@ public class NotificationControllerTest { ...@@ -67,10 +66,11 @@ public class NotificationControllerTest {
public void testPollNotificationWithDefaultNamespace() throws Exception { public void testPollNotificationWithDefaultNamespace() throws Exception {
DeferredResult<ResponseEntity<ApolloConfigNotification>> DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller deferredResult = controller
.pollNotification(someAppId, someCluster, defaultNamespace, someDataCenter, someReleaseId, .pollNotification(someAppId, someCluster, defaultNamespace, someDataCenter, someReleaseId);
response);
String key = String.format("%s-%s-%s", someAppId, someCluster, defaultNamespace); String key =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, someCluster, defaultNamespace);
assertEquals(1, deferredResults.size()); assertEquals(1, deferredResults.size());
assertTrue(deferredResults.get(key).contains(deferredResult)); assertTrue(deferredResults.get(key).contains(deferredResult));
} }
...@@ -87,18 +87,21 @@ public class NotificationControllerTest { ...@@ -87,18 +87,21 @@ public class NotificationControllerTest {
DeferredResult<ResponseEntity<ApolloConfigNotification>> DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller deferredResult = controller
.pollNotification(someAppId, someCluster, somePublicNamespace, someDataCenter, .pollNotification(someAppId, someCluster, somePublicNamespace, someDataCenter,
someReleaseId, someReleaseId);
response);
List<String> publicClusters = List<String> publicClusters =
Lists.newArrayList(someDataCenter, ConfigConsts.CLUSTER_NAME_DEFAULT); Lists.newArrayList(someDataCenter, ConfigConsts.CLUSTER_NAME_DEFAULT);
assertEquals(3, deferredResults.size()); assertEquals(3, deferredResults.size());
String key = String.format("%s-%s-%s", someAppId, someCluster, somePublicNamespace); String key =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, someCluster, somePublicNamespace);
assertTrue(deferredResults.get(key).contains(deferredResult)); assertTrue(deferredResults.get(key).contains(deferredResult));
for (String cluster : publicClusters) { for (String cluster : publicClusters) {
String publicKey = String.format("%s-%s-%s", somePublicAppId, cluster, somePublicNamespace); String publicKey =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(somePublicAppId, cluster, somePublicNamespace);
assertTrue(deferredResults.get(publicKey).contains(deferredResult)); assertTrue(deferredResults.get(publicKey).contains(deferredResult));
} }
} }
...@@ -107,10 +110,11 @@ public class NotificationControllerTest { ...@@ -107,10 +110,11 @@ public class NotificationControllerTest {
public void testPollNotificationWithDefaultNamespaceAndHandleMessage() throws Exception { public void testPollNotificationWithDefaultNamespaceAndHandleMessage() throws Exception {
DeferredResult<ResponseEntity<ApolloConfigNotification>> DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller deferredResult = controller
.pollNotification(someAppId, someCluster, defaultNamespace, someDataCenter, someReleaseId, .pollNotification(someAppId, someCluster, defaultNamespace, someDataCenter, someReleaseId);
response);
String key = String.format("%s-%s-%s", someAppId, someCluster, defaultNamespace); String key =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, someCluster, defaultNamespace);
controller.handleMessage(key, Topics.APOLLO_RELEASE_TOPIC); controller.handleMessage(key, Topics.APOLLO_RELEASE_TOPIC);
...@@ -133,10 +137,12 @@ public class NotificationControllerTest { ...@@ -133,10 +137,12 @@ public class NotificationControllerTest {
DeferredResult<ResponseEntity<ApolloConfigNotification>> DeferredResult<ResponseEntity<ApolloConfigNotification>>
deferredResult = controller deferredResult = controller
.pollNotification(someAppId, someCluster, somePublicNamespace, someDataCenter, someReleaseId, .pollNotification(someAppId, someCluster, somePublicNamespace, someDataCenter,
response); someReleaseId);
String key = String.format("%s-%s-%s", somePublicAppId, someDataCenter, somePublicNamespace); String key =
Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(somePublicAppId, someDataCenter, somePublicNamespace);
controller.handleMessage(key, Topics.APOLLO_RELEASE_TOPIC); controller.handleMessage(key, Topics.APOLLO_RELEASE_TOPIC);
......
package com.ctrip.apollo.configservice.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter;
@Configuration
@Order(99)
public class TestWebSecurityConfig extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http.httpBasic();
http.csrf().disable();
http.authorizeRequests().antMatchers("/").permitAll().and()
.authorizeRequests().antMatchers("/console/**").permitAll();
http.headers().frameOptions().disable();
}
@Autowired
public void configureGlobal(AuthenticationManagerBuilder auth) throws Exception {
auth.inMemoryAuthentication().withUser("user").password("").roles("USER");
auth.inMemoryAuthentication().withUser("apollo").password("").roles("USER", "ADMIN");
}
}
...@@ -83,7 +83,8 @@ public class ConfigControllerIntegrationTest extends AbstractBaseIntegrationTest ...@@ -83,7 +83,8 @@ public class ConfigControllerIntegrationTest extends AbstractBaseIntegrationTest
public void testQueryConfigNotModified() throws Exception { public void testQueryConfigNotModified() throws Exception {
String releaseId = String.valueOf(991); String releaseId = String.valueOf(991);
ResponseEntity<ApolloConfig> response = restTemplate ResponseEntity<ApolloConfig> response = restTemplate
.getForEntity("{baseurl}/configs/{appId}/{clusterName}/{namespace}?releaseId={releaseId}", ApolloConfig.class, .getForEntity("{baseurl}/configs/{appId}/{clusterName}/{namespace}?releaseId={releaseId}",
ApolloConfig.class,
getHostUrl(), someAppId, someCluster, someNamespace, releaseId); getHostUrl(), someAppId, someCluster, someNamespace, releaseId);
assertEquals(HttpStatus.NOT_MODIFIED, response.getStatusCode()); assertEquals(HttpStatus.NOT_MODIFIED, response.getStatusCode());
...@@ -94,7 +95,8 @@ public class ConfigControllerIntegrationTest extends AbstractBaseIntegrationTest ...@@ -94,7 +95,8 @@ public class ConfigControllerIntegrationTest extends AbstractBaseIntegrationTest
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD) @Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testQueryPublicConfigWithDataCenterFoundAndNoOverride() throws Exception { public void testQueryPublicConfigWithDataCenterFoundAndNoOverride() throws Exception {
ResponseEntity<ApolloConfig> response = restTemplate ResponseEntity<ApolloConfig> response = restTemplate
.getForEntity("{baseurl}/configs/{appId}/{clusterName}/{namespace}?dataCenter={dateCenter}", ApolloConfig.class, .getForEntity("{baseurl}/configs/{appId}/{clusterName}/{namespace}?dataCenter={dateCenter}",
ApolloConfig.class,
getHostUrl(), someAppId, someCluster, somePublicNamespace, someDC); getHostUrl(), someAppId, someCluster, somePublicNamespace, someDC);
ApolloConfig result = response.getBody(); ApolloConfig result = response.getBody();
...@@ -111,11 +113,12 @@ public class ConfigControllerIntegrationTest extends AbstractBaseIntegrationTest ...@@ -111,11 +113,12 @@ public class ConfigControllerIntegrationTest extends AbstractBaseIntegrationTest
@Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD) @Sql(scripts = "/integration-test/cleanup.sql", executionPhase = Sql.ExecutionPhase.AFTER_TEST_METHOD)
public void testQueryPublicConfigWithDataCenterFoundAndOverride() throws Exception { public void testQueryPublicConfigWithDataCenterFoundAndOverride() throws Exception {
ResponseEntity<ApolloConfig> response = restTemplate ResponseEntity<ApolloConfig> response = restTemplate
.getForEntity("{baseurl}/configs/{appId}/{clusterName}/{namespace}?dataCenter={dateCenter}", ApolloConfig.class, .getForEntity("{baseurl}/configs/{appId}/{clusterName}/{namespace}?dataCenter={dateCenter}",
ApolloConfig.class,
getHostUrl(), someAppId, someDefaultCluster, somePublicNamespace, someDC); getHostUrl(), someAppId, someDefaultCluster, somePublicNamespace, someDC);
ApolloConfig result = response.getBody(); ApolloConfig result = response.getBody();
assertEquals("994|993", result.getReleaseId()); assertEquals("994" + ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR + "993", result.getReleaseId());
assertEquals(someAppId, result.getAppId()); assertEquals(someAppId, result.getAppId());
assertEquals(someDefaultCluster, result.getCluster()); assertEquals(someDefaultCluster, result.getCluster());
assertEquals(somePublicNamespace, result.getNamespace()); assertEquals(somePublicNamespace, result.getNamespace());
...@@ -129,7 +132,8 @@ public class ConfigControllerIntegrationTest extends AbstractBaseIntegrationTest ...@@ -129,7 +132,8 @@ public class ConfigControllerIntegrationTest extends AbstractBaseIntegrationTest
public void testQueryPublicConfigWithDataCenterNotFoundAndNoOverride() throws Exception { public void testQueryPublicConfigWithDataCenterNotFoundAndNoOverride() throws Exception {
String someDCNotFound = "someDCNotFound"; String someDCNotFound = "someDCNotFound";
ResponseEntity<ApolloConfig> response = restTemplate ResponseEntity<ApolloConfig> response = restTemplate
.getForEntity("{baseurl}/configs/{appId}/{clusterName}/{namespace}?dataCenter={dateCenter}", ApolloConfig.class, .getForEntity("{baseurl}/configs/{appId}/{clusterName}/{namespace}?dataCenter={dateCenter}",
ApolloConfig.class,
getHostUrl(), someAppId, someCluster, somePublicNamespace, someDCNotFound); getHostUrl(), someAppId, someCluster, somePublicNamespace, someDCNotFound);
ApolloConfig result = response.getBody(); ApolloConfig result = response.getBody();
...@@ -147,11 +151,12 @@ public class ConfigControllerIntegrationTest extends AbstractBaseIntegrationTest ...@@ -147,11 +151,12 @@ public class ConfigControllerIntegrationTest extends AbstractBaseIntegrationTest
public void testQueryPublicConfigWithDataCenterNotFoundAndOverride() throws Exception { public void testQueryPublicConfigWithDataCenterNotFoundAndOverride() throws Exception {
String someDCNotFound = "someDCNotFound"; String someDCNotFound = "someDCNotFound";
ResponseEntity<ApolloConfig> response = restTemplate ResponseEntity<ApolloConfig> response = restTemplate
.getForEntity("{baseurl}/configs/{appId}/{clusterName}/{namespace}?dataCenter={dateCenter}", ApolloConfig.class, .getForEntity("{baseurl}/configs/{appId}/{clusterName}/{namespace}?dataCenter={dateCenter}",
ApolloConfig.class,
getHostUrl(), someAppId, someDefaultCluster, somePublicNamespace, someDCNotFound); getHostUrl(), someAppId, someDefaultCluster, somePublicNamespace, someDCNotFound);
ApolloConfig result = response.getBody(); ApolloConfig result = response.getBody();
assertEquals("994|992", result.getReleaseId()); assertEquals("994" + ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR + "992", result.getReleaseId());
assertEquals(someAppId, result.getAppId()); assertEquals(someAppId, result.getAppId());
assertEquals(someDefaultCluster, result.getCluster()); assertEquals(someDefaultCluster, result.getCluster());
assertEquals(somePublicNamespace, result.getNamespace()); assertEquals(somePublicNamespace, result.getNamespace());
......
package com.ctrip.apollo.configservice.integration; package com.ctrip.apollo.configservice.integration;
import com.ctrip.apollo.biz.message.Topics; import com.google.common.base.Joiner;
import com.ctrip.apollo.biz.entity.ReleaseMessage;
import com.ctrip.apollo.biz.repository.ReleaseMessageRepository;
import com.ctrip.apollo.configservice.controller.NotificationController; import com.ctrip.apollo.configservice.controller.NotificationController;
import com.ctrip.apollo.core.ConfigConsts; import com.ctrip.apollo.core.ConfigConsts;
import com.ctrip.apollo.core.dto.ApolloConfigNotification; import com.ctrip.apollo.core.dto.ApolloConfigNotification;
...@@ -14,7 +17,6 @@ import org.springframework.test.context.jdbc.Sql; ...@@ -14,7 +17,6 @@ import org.springframework.test.context.jdbc.Sql;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
...@@ -26,6 +28,8 @@ import static org.junit.Assert.assertEquals; ...@@ -26,6 +28,8 @@ import static org.junit.Assert.assertEquals;
public class NotificationControllerIntegrationTest extends AbstractBaseIntegrationTest { public class NotificationControllerIntegrationTest extends AbstractBaseIntegrationTest {
@Autowired @Autowired
private NotificationController notificationController; private NotificationController notificationController;
@Autowired
private ReleaseMessageRepository releaseMessageRepository;
private String someAppId; private String someAppId;
private String someCluster; private String someCluster;
private String defaultNamespace; private String defaultNamespace;
...@@ -43,20 +47,16 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati ...@@ -43,20 +47,16 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
@Test @Test
public void testPollNotificationWithDefaultNamespace() throws Exception { public void testPollNotificationWithDefaultNamespace() throws Exception {
Future<ResponseEntity<ApolloConfigNotification>> future = AtomicBoolean stop = new AtomicBoolean();
executorService.submit(() -> restTemplate perodicSendMessage(assembleKey(someAppId, someCluster, defaultNamespace), stop);
.getForEntity(
"{baseurl}/notifications?appId={appId}&cluster={clusterName}&namespace={namespace}",
ApolloConfigNotification.class,
getHostUrl(), someAppId, someCluster, defaultNamespace));
//wait for the request connected to server ResponseEntity<ApolloConfigNotification> result = restTemplate.getForEntity(
TimeUnit.MILLISECONDS.sleep(500); "{baseurl}/notifications?appId={appId}&cluster={clusterName}&namespace={namespace}",
ApolloConfigNotification.class,
getHostUrl(), someAppId, someCluster, defaultNamespace);
notificationController.handleMessage(assembleKey(someAppId, someCluster, defaultNamespace), stop.set(true);
Topics.APOLLO_RELEASE_TOPIC);
ResponseEntity<ApolloConfigNotification> result = future.get(500, TimeUnit.MILLISECONDS);
ApolloConfigNotification notification = result.getBody(); ApolloConfigNotification notification = result.getBody();
assertEquals(HttpStatus.OK, result.getStatusCode()); assertEquals(HttpStatus.OK, result.getStatusCode());
assertEquals(defaultNamespace, notification.getNamespace()); assertEquals(defaultNamespace, notification.getNamespace());
...@@ -69,19 +69,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati ...@@ -69,19 +69,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
String publicAppId = "somePublicAppId"; String publicAppId = "somePublicAppId";
AtomicBoolean stop = new AtomicBoolean(); AtomicBoolean stop = new AtomicBoolean();
executorService.submit((Runnable) () -> { perodicSendMessage(assembleKey(publicAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, somePublicNamespace), stop);
//wait for the request connected to server
while (!stop.get() && !Thread.currentThread().isInterrupted()) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
}
notificationController.handleMessage(
assembleKey(publicAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, somePublicNamespace),
Topics.APOLLO_RELEASE_TOPIC);
}
});
ResponseEntity<ApolloConfigNotification> result = restTemplate ResponseEntity<ApolloConfigNotification> result = restTemplate
.getForEntity( .getForEntity(
...@@ -104,19 +92,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati ...@@ -104,19 +92,7 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
String someDC = "someDC"; String someDC = "someDC";
AtomicBoolean stop = new AtomicBoolean(); AtomicBoolean stop = new AtomicBoolean();
executorService.submit((Runnable) () -> { perodicSendMessage(assembleKey(publicAppId, someDC, somePublicNamespace), stop);
//wait for the request connected to server
while (!stop.get() && !Thread.currentThread().isInterrupted()) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
}
notificationController.handleMessage(
assembleKey(publicAppId, someDC, somePublicNamespace),
Topics.APOLLO_RELEASE_TOPIC);
}
});
ResponseEntity<ApolloConfigNotification> result = restTemplate ResponseEntity<ApolloConfigNotification> result = restTemplate
.getForEntity( .getForEntity(
...@@ -131,8 +107,22 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati ...@@ -131,8 +107,22 @@ public class NotificationControllerIntegrationTest extends AbstractBaseIntegrati
assertEquals(somePublicNamespace, notification.getNamespace()); assertEquals(somePublicNamespace, notification.getNamespace());
} }
private String assembleKey(String appId, String cluster, String namespace) { private String assembleKey(String appId, String cluster, String namespace) {
return String.format("%s-%s-%s", appId, cluster, namespace); return Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR).join(appId, cluster, namespace);
}
private void perodicSendMessage(String message, AtomicBoolean stop) {
executorService.submit((Runnable) () -> {
//wait for the request connected to server
while (!stop.get() && !Thread.currentThread().isInterrupted()) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
}
ReleaseMessage releaseMessage = new ReleaseMessage(message);
releaseMessageRepository.save(releaseMessage);
}
});
} }
} }
...@@ -3,3 +3,6 @@ spring.jpa.hibernate.naming_strategy=org.hibernate.cfg.EJB3NamingStrategy ...@@ -3,3 +3,6 @@ spring.jpa.hibernate.naming_strategy=org.hibernate.cfg.EJB3NamingStrategy
spring.h2.console.enabled = true spring.h2.console.enabled = true
spring.h2.console.settings.web-allow-others=true spring.h2.console.settings.web-allow-others=true
spring.jpa.properties.hibernate.show_sql=true spring.jpa.properties.hibernate.show_sql=true
# for ReleaseMessageScanner test
apollo.message-scan.interval=100
...@@ -3,4 +3,5 @@ package com.ctrip.apollo.core; ...@@ -3,4 +3,5 @@ package com.ctrip.apollo.core;
public interface ConfigConsts { public interface ConfigConsts {
String NAMESPACE_DEFAULT = "application"; String NAMESPACE_DEFAULT = "application";
String CLUSTER_NAME_DEFAULT = "default"; String CLUSTER_NAME_DEFAULT = "default";
String CLUSTER_NAMESPACE_SEPARATOR = "+";
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment