Commit fa5624df authored by Jason Song's avatar Jason Song

Merge pull request #173 from nobodyiam/notification-concurrent-fix

Notification concurrent fix
parents 605847dd be15d8ce
package com.ctrip.apollo.biz.utils;
import org.springframework.orm.jpa.EntityManagerFactoryAccessor;
import org.springframework.orm.jpa.EntityManagerFactoryUtils;
import org.springframework.orm.jpa.EntityManagerHolder;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionSynchronizationManager;
/**
* @author Jason Song(song_s@ctrip.com)
*/
@Component
public class EntityManagerUtil extends EntityManagerFactoryAccessor {
/**
* close the entity manager.
* Use it with caution! This is only intended for use with async request, which Spring won't
* close the entity manager until the async request is finished.
*/
public void closeEntityManager() {
EntityManagerHolder emHolder = (EntityManagerHolder)
TransactionSynchronizationManager.getResource(getEntityManagerFactory());
logger.debug("Closing JPA EntityManager in EntityManagerUtil");
EntityManagerFactoryUtils.closeEntityManager(emHolder.getEntityManager());
}
}
...@@ -14,6 +14,8 @@ import org.mockito.ArgumentCaptor; ...@@ -14,6 +14,8 @@ import org.mockito.ArgumentCaptor;
import org.unidal.lookup.ComponentTestCase; import org.unidal.lookup.ComponentTestCase;
import java.io.File; import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Properties; import java.util.Properties;
import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsEqual.equalTo;
...@@ -82,12 +84,13 @@ public class LocalFileConfigRepositoryTest extends ComponentTestCase { ...@@ -82,12 +84,13 @@ public class LocalFileConfigRepositoryTest extends ComponentTestCase {
@Test @Test
public void testLoadConfigWithLocalFile() throws Exception { public void testLoadConfigWithLocalFile() throws Exception {
File file = new File(someBaseDir, assembleLocalCacheFileName());
String someKey = "someKey"; String someKey = "someKey";
String someValue = "someValue"; String someValue = "someValue\nxxx\nyyy";
Files.write(someKey + "=" + someValue, file, Charsets.UTF_8); Properties someProperties = new Properties();
someProperties.setProperty(someKey, someValue);
createLocalCachePropertyFile(someProperties);
LocalFileConfigRepository localRepo = new LocalFileConfigRepository(someBaseDir, someNamespace); LocalFileConfigRepository localRepo = new LocalFileConfigRepository(someBaseDir, someNamespace);
Properties properties = localRepo.getConfig(); Properties properties = localRepo.getConfig();
...@@ -186,4 +189,17 @@ public class LocalFileConfigRepositoryTest extends ComponentTestCase { ...@@ -186,4 +189,17 @@ public class LocalFileConfigRepositoryTest extends ComponentTestCase {
} }
} }
private File createLocalCachePropertyFile(Properties properties) throws IOException {
File file = new File(someBaseDir, assembleLocalCacheFileName());
FileOutputStream in = null;
try {
in = new FileOutputStream(file);
properties.store(in, "Persisted by LocalFileConfigRepositoryTest");
} finally {
if (in != null) {
in.close();
}
}
return file;
}
} }
...@@ -12,6 +12,7 @@ import com.ctrip.apollo.biz.entity.AppNamespace; ...@@ -12,6 +12,7 @@ import com.ctrip.apollo.biz.entity.AppNamespace;
import com.ctrip.apollo.biz.message.MessageListener; import com.ctrip.apollo.biz.message.MessageListener;
import com.ctrip.apollo.biz.message.Topics; import com.ctrip.apollo.biz.message.Topics;
import com.ctrip.apollo.biz.service.AppNamespaceService; import com.ctrip.apollo.biz.service.AppNamespaceService;
import com.ctrip.apollo.biz.utils.EntityManagerUtil;
import com.ctrip.apollo.core.ConfigConsts; import com.ctrip.apollo.core.ConfigConsts;
import com.ctrip.apollo.core.dto.ApolloConfigNotification; import com.ctrip.apollo.core.dto.ApolloConfigNotification;
import com.dianping.cat.Cat; import com.dianping.cat.Cat;
...@@ -27,7 +28,6 @@ import org.springframework.web.bind.annotation.RequestParam; ...@@ -27,7 +28,6 @@ import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.DeferredResult;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
...@@ -50,6 +50,9 @@ public class NotificationController implements MessageListener { ...@@ -50,6 +50,9 @@ public class NotificationController implements MessageListener {
@Autowired @Autowired
private AppNamespaceService appNamespaceService; private AppNamespaceService appNamespaceService;
@Autowired
private EntityManagerUtil entityManagerUtil;
@RequestMapping(method = RequestMethod.GET) @RequestMapping(method = RequestMethod.GET)
public DeferredResult<ResponseEntity<ApolloConfigNotification>> pollNotification( public DeferredResult<ResponseEntity<ApolloConfigNotification>> pollNotification(
@RequestParam(value = "appId") String appId, @RequestParam(value = "appId") String appId,
...@@ -95,6 +98,13 @@ public class NotificationController implements MessageListener { ...@@ -95,6 +98,13 @@ public class NotificationController implements MessageListener {
String dataCenter) { String dataCenter) {
List<String> publicWatchedKeys = Lists.newArrayList(); List<String> publicWatchedKeys = Lists.newArrayList();
AppNamespace appNamespace = appNamespaceService.findByNamespaceName(namespace); AppNamespace appNamespace = appNamespaceService.findByNamespaceName(namespace);
/**
* Manually close the entity manager.
* Since for async request, Spring won't do so until the request is finished,
* which is unacceptable since we are doing long polling - means the db connection would be hold
* for a very long time
*/
entityManagerUtil.closeEntityManager();
//check whether the namespace's appId equals to current one //check whether the namespace's appId equals to current one
if (Objects.isNull(appNamespace) || Objects.equals(applicationId, appNamespace.getAppId())) { if (Objects.isNull(appNamespace) || Objects.equals(applicationId, appNamespace.getAppId())) {
...@@ -133,13 +143,15 @@ public class NotificationController implements MessageListener { ...@@ -133,13 +143,15 @@ public class NotificationController implements MessageListener {
new ResponseEntity<>( new ResponseEntity<>(
new ApolloConfigNotification(keys.get(2)), HttpStatus.OK); new ApolloConfigNotification(keys.get(2)), HttpStatus.OK);
Collection<DeferredResult<ResponseEntity<ApolloConfigNotification>>> //create a new list to avoid ConcurrentModificationException
results = deferredResults.get(message); List<DeferredResult<ResponseEntity<ApolloConfigNotification>>> results =
Lists.newArrayList(deferredResults.get(message));
logger.info("Notify {} clients for key {}", results.size(), message); logger.info("Notify {} clients for key {}", results.size(), message);
for (DeferredResult<ResponseEntity<ApolloConfigNotification>> result : results) { for (DeferredResult<ResponseEntity<ApolloConfigNotification>> result : results) {
result.setResult(notification); result.setResult(notification);
} }
logger.info("Notification completed");
} }
private void logWatchedKeysToCat(List<String> watchedKeys, String eventName) { private void logWatchedKeysToCat(List<String> watchedKeys, String eventName) {
......
...@@ -7,6 +7,7 @@ import com.google.common.collect.Multimap; ...@@ -7,6 +7,7 @@ import com.google.common.collect.Multimap;
import com.ctrip.apollo.biz.entity.AppNamespace; import com.ctrip.apollo.biz.entity.AppNamespace;
import com.ctrip.apollo.biz.message.Topics; import com.ctrip.apollo.biz.message.Topics;
import com.ctrip.apollo.biz.service.AppNamespaceService; import com.ctrip.apollo.biz.service.AppNamespaceService;
import com.ctrip.apollo.biz.utils.EntityManagerUtil;
import com.ctrip.apollo.core.ConfigConsts; import com.ctrip.apollo.core.ConfigConsts;
import com.ctrip.apollo.core.dto.ApolloConfigNotification; import com.ctrip.apollo.core.dto.ApolloConfigNotification;
...@@ -15,6 +16,7 @@ import org.junit.Test; ...@@ -15,6 +16,7 @@ import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus; import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
import org.springframework.test.util.ReflectionTestUtils; import org.springframework.test.util.ReflectionTestUtils;
...@@ -39,6 +41,8 @@ public class NotificationControllerTest { ...@@ -39,6 +41,8 @@ public class NotificationControllerTest {
private String someDataCenter; private String someDataCenter;
@Mock @Mock
private AppNamespaceService appNamespaceService; private AppNamespaceService appNamespaceService;
@Mock
private EntityManagerUtil entityManagerUtil;
private Multimap<String, DeferredResult<ResponseEntity<ApolloConfigNotification>>> private Multimap<String, DeferredResult<ResponseEntity<ApolloConfigNotification>>>
deferredResults; deferredResults;
...@@ -46,6 +50,7 @@ public class NotificationControllerTest { ...@@ -46,6 +50,7 @@ public class NotificationControllerTest {
public void setUp() throws Exception { public void setUp() throws Exception {
controller = new NotificationController(); controller = new NotificationController();
ReflectionTestUtils.setField(controller, "appNamespaceService", appNamespaceService); ReflectionTestUtils.setField(controller, "appNamespaceService", appNamespaceService);
ReflectionTestUtils.setField(controller, "entityManagerUtil", entityManagerUtil);
someAppId = "someAppId"; someAppId = "someAppId";
someCluster = "someCluster"; someCluster = "someCluster";
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment