Commit 469c1c18 authored by bkblack's avatar bkblack Committed by Jason Song
parent 989f37b1
...@@ -132,6 +132,34 @@ public class NotificationControllerV2 implements ReleaseMessageListener { ...@@ -132,6 +132,34 @@ public class NotificationControllerV2 implements ReleaseMessageListener {
Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values()); Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());
/**
* 1、set deferredResult before the check, for avoid more waiting
* If the check before setting deferredResult,it may receive a notification the next time
* when method handleMessage is executed between check and set deferredResult.
*/
deferredResultWrapper
.onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
deferredResultWrapper.onCompletion(() -> {
//unregister all keys
for (String key : watchedKeys) {
deferredResults.remove(key, deferredResultWrapper);
}
logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
});
//register all keys
for (String key : watchedKeys) {
this.deferredResults.put(key, deferredResultWrapper);
}
logWatchedKeys(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",
watchedKeys, appId, cluster, namespaces, dataCenter);
/**
* 2、check new release
*/
List<ReleaseMessage> latestReleaseMessages = List<ReleaseMessage> latestReleaseMessages =
releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys); releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);
...@@ -149,26 +177,6 @@ public class NotificationControllerV2 implements ReleaseMessageListener { ...@@ -149,26 +177,6 @@ public class NotificationControllerV2 implements ReleaseMessageListener {
if (!CollectionUtils.isEmpty(newNotifications)) { if (!CollectionUtils.isEmpty(newNotifications)) {
deferredResultWrapper.setResult(newNotifications); deferredResultWrapper.setResult(newNotifications);
} else {
deferredResultWrapper
.onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));
deferredResultWrapper.onCompletion(() -> {
//unregister all keys
for (String key : watchedKeys) {
deferredResults.remove(key, deferredResultWrapper);
}
logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");
});
//register all keys
for (String key : watchedKeys) {
this.deferredResults.put(key, deferredResultWrapper);
}
logWatchedKeys(watchedKeys, "Apollo.LongPoll.RegisteredKeys");
logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",
watchedKeys, appId, cluster, namespaces, dataCenter);
} }
return deferredResultWrapper.getResult(); return deferredResultWrapper.getResult();
...@@ -308,4 +316,3 @@ public class NotificationControllerV2 implements ReleaseMessageListener { ...@@ -308,4 +316,3 @@ public class NotificationControllerV2 implements ReleaseMessageListener {
} }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment