pigeon的服务注册与发现

前言

pigeon是大众点评内部一直在使用的rpc框架,同时带有服务治理的功能。
组件是使用zookeeper + netty + jetty完成。
现在也已经开源

pigeon

但是开源似乎做的不用心,可能也是看dubbo很火热,pigeon没人用
而且pigeon和点评内部的一点闭源的框架联系紧密,所以用的人并不多(反正我是一个没看到)。

使用

demo

Server端

pigeon是与spring进行了很重的耦合的,如果要使用pigeon,那么必须使用spring框架。

如果要暴露我们的服务
比如创建一个简单的UserService接口和实现。

1
2
3
4
5
6
7
8
9
10
public interface UserService {
String getNameById();
}

public class UserServiceImpl implements UserService {
@Override
public String getNameById() {
return "id + name";
}
}

由于pigeon的服务注册和发现是依赖zookeeper的,我们还需要装载一个zookeeper
在resources文件夹下创建config文件夹,在config文件夹下创建pigeon.properties文件
在其中写上zookeeper的ip和端口

1
pigeon.registry.address=localhost:2181

同时为了区分服务,需要给我们的服务起一个名字
sourcesMETA-INF文件夹的app.properties里写上

1
pigeon.registry.address=localhost:2181

如果使用typical的方法声明服务的话,我们创建一个spring的配置文件,假设就叫spring.xml

1
2
3
4
5
6
7
8
9
10
<bean id="userServiceImpl" class="cc.lovezhy.service.UserServiceImpl"/>

<bean class="com.dianping.pigeon.remoting.provider.config.spring.ServiceBean" init-method="init">
<property name="services">
<map>
<entry
key="cc.lovezhy.service.UserService" value-ref="userServiceImpl"/>
</map>
</property>
</bean>

这时候,就可以直接用Main方法启动了

1
2
3
4
5
6
7
public class Main {
public static void main(String[] args) throws Exception {
SpringContainer container = new SpringContainer("classpath*:/META-INF/spring.xml");
container.start();
System.in.read();
}
}

当然如果你想要部署到tomcat中也是可以的。

同时默认在localhost:4080/services上我们可以看到当前我们暴露出去的服务
并且可以调用。

4080/services

client端

client端只需要包含服务的接口就行了。
所以service端的代码编写都是分模块写的,一个模块专门提供DTOservice接口
打包成maven供其他人使用。
然后具体的实现我们再在另外一个模块里写。

resourcesMETA-INF的内容和服务端一样的。
就是在bean的声明的时候,我们需要这样

1
2
3
4
<bean id="userService" class="com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean"
init-method="init">
<property name="interfaceName" value="cc.lovezhy.service.UserService"/>
</bean>

然后我们就可以在Main方法中引用了

1
2
3
4
5
6
7
public class Main {
public static void main(String[] args) {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath*:spring.xml");
UserService userService = applicationContext.getBean(UserService.class);
System.out.println(userService.getNameById());
}
}

配置客户端调用模式

pigeon内部,客户端调用远程服务有4种模式

  • sync同步
  • future异步
  • callback也是异步,只是通过回调的方式来处理结果
  • oneway不需要回复

例如spring编程方式下只需要配置callType属性:

1
2
3
4
5
6
<bean id="babyAccountService" class="com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean" init-method="init">
<property name="interfaceName" value="com.dianping.babytech.casecenter.api.BabyAccountService"/>
<property name="serialize" value="hessian" />
<property name="callType" value="sync"/>
<property name="timeout" value="5000"/>
</bean>

官方文档解释的很清楚了所有的选项。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<bean id="echoService" 
class="com.dianping.pigeon.remoting.invoker.config.spring.ReferenceBean"
init-method="init">
<!-- 服务全局唯一的标识url,默认是服务接口类名,必须设置 -->
<property name="url" value="http://service.dianping.com/demoService/echoService_1.0.0" />
<!-- 接口名称,必须设置 -->
<property name="interfaceName" value="com.dianping.pigeon.demo.EchoService" />
<!-- 超时时间,毫秒,默认5000,建议自己设置 -->
<property name="timeout" value="2000" />
<!-- 序列化,hessian/fst/protostuff,默认hessian,可不设置-->
<property name="serialize" value="hessian" />
<!-- 调用方式,sync/future/callback/oneway,默认sync,可不设置 -->
<property name="callType" value="sync" />
<!-- 失败策略,快速失败failfast/失败转移failover/失败忽略failsafe/并发取最快返回forking,默认failfast,可不设置 -->
<property name="cluster" value="failfast" />
<!-- 是否超时重试,默认false,可不设置 -->
<property name="timeoutRetry" value="false" />
<!-- 重试次数,默认1,可不设置 -->
<property name="retries" value="1" />
</bean>

ReferenceBean的获取,init方法

这个我也画了一个简单的图,不过省略了很多细节部分,在InvokerBootStrap部分和Serializer.proxyRequest部分省略了很多。

下面具体看代码。

在我们声明bean的时候还带有一个init-method参数
init-method="init"
意思是在这个bean创建的时候还会调用一下ReferenceBeaninit方法。

init方法中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void init() throws Exception {
this.objType = ClassUtils.loadClass(this.classLoader, this.interfaceName.trim());
//这里直接就创建了,其实如果配置简单的话,很多的参数都是空的。
InvokerConfig<?> invokerConfig = new InvokerConfig(this.objType, this.url, this.timeout, this.callType,
this.serialize, this.callback, this.group, this.writeBufferLimit, this.loadBalance, this.cluster,
this.retries, this.timeoutRetry, this.vip, this.version, this.protocol);
invokerConfig.setClassLoader(classLoader);
invokerConfig.setSecret(secret);
invokerConfig.setRegionPolicy(regionPolicy);

if (!CollectionUtils.isEmpty(methods)) {
Map<String, InvokerMethodConfig> methodMap = new HashMap<String, InvokerMethodConfig>();
invokerConfig.setMethods(methodMap);
for (InvokerMethodConfig method : methods) {
methodMap.put(method.getName(), method);
}
}

//降级配置检查
checkMock();
invokerConfig.setMock(mock);
checkRemoteAppkey();
invokerConfig.setRemoteAppKey(remoteAppKey);
//这里就得到了service的代理实例。
this.obj = ServiceFactory.getService(invokerConfig);
configLoadBalance(invokerConfig);
}
1
2
3
InvokerConfig<?> invokerConfig = new InvokerConfig(this.objType, this.url, this.timeout, this.callType,
this.serialize, this.callback, this.suffix, this.writeBufferLimit, this.loadBalance, this.cluster,
this.retries, this.timeoutRetry, this.vip, this.version, this.protocol);

InvokerConfig参数

  • Class<T> serviceInterface 接口的Class类
  • String url 服务全局唯一的标识url,感觉有了serviceInterface就够了
  • String version 版本?
  • byte callMethod 就是call的方式,sync还是future之类,但是是byte类型的,1代表sync等
  • String callType callType就是callMethod的String
  • byte serialize 序列化方式,默认是hessian
  • int timeout 超时时间
  • InvocationCallback callback 设置了callback模式才有
  • String suffix
  • String loadbalance 负载均衡的策略设置
  • String routePolicy 路由的规则
  • RoutePolicy routePolicyObj
  • boolean timeoutRetry
  • String cluster
  • int retries 失败了重试的次数
  • String vip
  • int maxRequests
  • String protocol
  • Map<String, InvokerMethodConfig> methods
  • ClassLoader classLoader
  • String secret
  • String remoteAppKey
  • Object mock

上面还提到一个服务降级的问题,这个我们之后再说。

从上面看到,得到的service是从ServiceFactory这个类直接得到的。

getService的时候直接get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//ServiceFactory这个类,主要就是管理service的加载,发布和获取的。
//它的很多方法,从名字就可以看出来,发布service,取消service,获得service,给service设置权重之类。
public class ServiceFactory {
private static ServiceProxy serviceProxy = ServiceProxyLoader.getServiceProxy();
private static PublishPolicy publishPolicy = PublishPolicyLoader.getPublishPolicy();
static {
try {
//这个类,从名字上就可以看到和Provider相关的,进行了一些东西的初始化
ProviderBootStrap.init();
} catch (Throwable t) {
logger.error("error while initializing service factory:", t);
System.exit(1);
}
}
public static <T> T getService(InvokerConfig<T> invokerConfig) throws RpcException {
return serviceProxy.getProxy(invokerConfig);
}
}

默认的话是建一个DefaultServiceProxy
它的调用是调用了父类的

1
2
3
4
5
6
public final class DefaultServiceProxy extends AbstractServiceProxy {
@Override
public <T> T getProxy(InvokerConfig<T> invokerConfig) {
return super.getProxy(invokerConfig);
}
}

AbstractServiceProxy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
//这个其实是个缓存的map,看下面的get过程中有个加锁的过程
protected final static Map<InvokerConfig<?>, Object> services = new ConcurrentHashMap<InvokerConfig<?>, Object>();
@Override
public <T> T getProxy(InvokerConfig<T> invokerConfig) {
//...省略好多好多
Object service = null;
service = services.get(invokerConfig);
if (service == null) {
//这个锁,好像用的guava中一个东西,提供和String.intern相同的作用但是不会占用老年代空间?
//有时间再研究研究
synchronized (interner.intern(invokerConfig)) {
service = services.get(invokerConfig);
if (service == null) {
try {
//参见另外一篇文章
InvokerBootStrap.startup();

//下面这句就调用AbstractSerializer中的proxyRequest动态生成一个代理类
service = SerializerFactory.getSerializer(invokerConfig.getSerialize()).proxyRequest(invokerConfig);
if (StringUtils.isNotBlank(invokerConfig.getLoadbalance())) {
LoadBalanceManager.register(invokerConfig.getUrl(), invokerConfig.getSuffix(), invokerConfig.getLoadbalance());
}
} catch (Throwable t) {
throw new RpcException("error while trying to get service:" + invokerConfig, t);
}

try {
//配置地域策略,默认的是autoSwitch
routePolicyManager.register(invokerConfig.getUrl(), invokerConfig.getSuffix(),
invokerConfig.getRoutePolicy());
} catch (Throwable t) {
throw new RouteException("error while setup region route policy: " + invokerConfig, t);
}

// watch service config
try {
serviceConfigManager.register(invokerConfig.getUrl());
} catch (Throwable t) {
throw new ConfigException("error while trying to watch service config: " + invokerConfig, t);
}

try {
ClientManager.getInstance().registerClients(invokerConfig);
} catch (Throwable t) {
logger.warn("error while trying to setup service client:" + invokerConfig, t);
}
//加到Map缓存里
services.put(invokerConfig, service);
}
}
}
return (T) service;
}

InvokerBootStrap.startup()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//有很多的init方法,说明都是和invoke相关的一些配件。

//这是处理调用超时问题的,把一个InvocationTimeoutListener跑在一个线程中
ServiceInvocationRepository.getInstance().init();

//初始化那个proxy的handle,下面会提到
InvokerProcessHandlerFactory.init();

//序列化工厂的初始化,默认支持很多序列化方式
SerializerFactory.init();

//负载均衡调度的初始化,默认支持四种
LoadBalanceManager.init();

//Region策略,就是分地域的策略,如果是北京上海都有服务,那么调用哪一边的问题。
RegionPolicyManager.INSTANCE.init();
Monitor monitor = MonitorLoader.getMonitor();
if (monitor != null) {
monitor.init();
}
isStartup = true;

得到服务的唯一标志,如果不指定url的话,默认就是接口的全称。

1
2
3
4
5
ServiceFactory.getServiceUrl(invokerConfig);
public static <T> String getServiceUrl(InvokerConfig<T> invokerConfig) {
String url = invokerConfig.getServiceInterface().getCanonicalName();
return url;
}

service代理对象

1
2
3
4
5
6
7
8
9
10
//proxyRequest得到一个proxy的对象
public abstract class AbstractSerializer implements Serializer {
@Override
public Object proxyRequest(InvokerConfig<?> invokerConfig) throws SerializationException {
//第一个参数是ClassLoader,第二个参数是interface的数组,第三个参数是Proxy类
return Proxy.newProxyInstance(ClassUtils.getCurrentClassLoader(invokerConfig.getClassLoader()),
new Class[] { invokerConfig.getServiceInterface() }, new ServiceInvocationProxy(invokerConfig,
InvokerProcessHandlerFactory.selectInvocationHandler(invokerConfig)));
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//这里的handler是连接远程调用的桥梁
public class ServiceInvocationProxy implements InvocationHandler {

public ServiceInvocationProxy(InvokerConfig<?> invokerConfig, ServiceInvocationHandler handler) {
this.invokerConfig = invokerConfig;
this.handler = handler;
}

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(handler, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return handler.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return handler.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return handler.equals(args[0]);
}
return extractResult(handler.handle(new DefaultInvokerContext(invokerConfig, methodName, parameterTypes, args)),
method.getReturnType());
}
}

当我们调用其他的方法时,其实是在

1
handler.handle(new DefaultInvokerContext(invokerConfig, methodName, parameterTypes, args)

这里面进行处理的。

这里的handle是

1
private ServiceInvocationHandler handler;

InvokerProcessHandlerFactory中得到一个实例
//这里应该是最重要的部分了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public final class InvokerProcessHandlerFactory {

private static List<InvocationInvokeFilter> bizProcessFilters = new LinkedList<InvocationInvokeFilter>();
private static ServiceInvocationHandler bizInvocationHandler = null;
private static volatile boolean isInitialized = false;
//这里进行了一个责任链的处理,类似于netty的那种
//在调用前进行日志,权限之类的分析。
//最后的RemoteCallInvokeFilter才是真正调用我们想要的方法
//初始化在之前就进行了,见上
public static void init() {
if (!isInitialized) {
if (Constants.MONITOR_ENABLE) {
registerBizProcessFilter(new RemoteCallMonitorInvokeFilter());
}
//trace 监控信息
registerBizProcessFilter(new TraceFilter());
//服务降级
registerBizProcessFilter(new DegradationFilter());

registerBizProcessFilter(new ClusterInvokeFilter());
//网关,统计流量啥的
registerBizProcessFilter(new GatewayInvokeFilter());
registerBizProcessFilter(new ContextPrepareInvokeFilter());
//安全验证
registerBizProcessFilter(new SecurityFilter());
//通过Netty调用
registerBizProcessFilter(new RemoteCallInvokeFilter());
bizInvocationHandler = createInvocationHandler(bizProcessFilters);
isInitialized = true;
}
}

public static ServiceInvocationHandler selectInvocationHandler(InvokerConfig<?> invokerConfig) {
return bizInvocationHandler;
}

@SuppressWarnings({ "rawtypes" })
private static <V extends ServiceInvocationFilter> ServiceInvocationHandler createInvocationHandler( List<V> internalFilters) {
ServiceInvocationHandler last = null;
List<V> filterList = new ArrayList<V>();
filterList.addAll(internalFilters);
//创建一个调用链
for (int i = filterList.size() - 1; i >= 0; i--) {
final V filter = filterList.get(i);
final ServiceInvocationHandler next = last;
last = new ServiceInvocationHandler() {
@SuppressWarnings("unchecked")
@Override
public InvocationResponse handle(InvocationContext invocationContext) throws Throwable {
InvocationResponse resp = filter.invoke(next, invocationContext);
return resp;
}
};
}
return last;
}

public static void registerBizProcessFilter(InvocationInvokeFilter filter) {
bizProcessFilters.add(filter);
}
}

RemoteCallInvokeFilter中invoke方法中,
调用的是InvokerUtils的方法

1
response = InvokerUtils.sendRequest(client, invocationContext.getRequest(), future);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static InvocationResponse sendRequest(Client client, InvocationRequest request, Callback callback) {
if (request.getCallType() == Constants.CALLTYPE_REPLY) {
RemoteInvocationBean invocationBean = new RemoteInvocationBean();
invocationBean.request = request;
invocationBean.callback = callback;
callback.setRequest(request);
callback.setClient(client);
invocationRepository.put(request.getSequence(), invocationBean);
}
InvocationResponse response = null;
try {
//发送的请求在这儿发送的
response = client.write(request);
} catch (NetworkException e) {
invocationRepository.remove(request.getSequence());
logger.warn("network exception ocurred:" + request, e);
throw e;
} finally {
if (response != null) {
invocationRepository.remove(request.getSequence());
}
}
return response;
}

client的实现有两种,一个是Tcp的还有一个是Http的,pigeon两种都支持。
好像一般的调用是调用的tcp的方案,然后在4080/services查看和调用的是http的协议。

Client实现

Netty的实现中的doWrite方法,其实就是调用了channelwrite0方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public InvocationResponse doWrite(InvocationRequest request) throws NetworkException {
NettyChannel channel = null;
try {
channel = channelPool.selectChannel();
ChannelFuture future = channel.write0(request);
afterWrite(request, channel);
if (request.getMessageType() == Constants.MESSAGE_TYPE_SERVICE
|| request.getMessageType() == Constants.MESSAGE_TYPE_HEART) {
future.addListener(new MessageWriteListener(request, channel));
}
} catch (Exception e) {
throw new NetworkException("[doRequest] remote call failed:" + request, e);
}
return null;
}

至此,代理bean的创建和invoke的流程大概就理清楚了。

但是其实它是怎么和zookeeper沟通拿到service的呢。

我猜想肯定是从那条责任链的某个地方中取得的,于是进行了一番苦苦查找。

ClusterFactoryselect中,默认是返回FailfastCluster,在它的invoke方法中

1
Client remoteClient = clientManager.getClient(invokerConfig, request, null);

有这句话。

这个Client默认是NettyClient

如果我们在仔细看一下ClientManager的构造函数

1
2
3
4
5
6
7
8
9
10
private ClientManager() {
this.providerAvailableListener = new ProviderAvailableListener();
this.clusterListener = new DefaultClusterListener(providerAvailableListener);
this.clusterListenerManager.addListener(this.clusterListener);
providerAvailableThreadPool.execute(this.providerAvailableListener);
RegistryEventListener.addListener(providerChangeListener);
RegistryEventListener.addListener(registryConnectionListener);
RegistryEventListener.addListener(groupChangeListener);
registerThreadPool.getExecutor().allowCoreThreadTimeOut(true);
}

这里其实启动了一个ProviderAvailableListener

这个类和RegisterManager关系密切,而RegisterManager则是掌管和zookeeper沟通的,由此不难看出,这里的service就是从这儿得到的。

服务端集群策略

ClusterInvokeFilter中进行的配置

  • failfast - 调用服务的一个节点失败后抛出异常返回,可以同时配置重试timeoutRetry和retries属性
  • failover - 调用服务的一个节点失败后会尝试调用另外的一个节点,可以同时配置重试 timeoutRetry和retries属性
  • failsafe - 调用服务的一个节点失败后不会抛出异常,返回null,后续版本会考虑按配置默认值返回
  • forking - 同时调用服务的所有可用节点,返回调用最快的节点结果数据。
    可以通过配置forkingSize,指定最多调用的节点数(pigeon2.10.3及以上版本通过xml配置forkingSize,其余版本可以通过lion配置{appkey}.pigeon.invoker.forking.size)
  • hedged - 发出第一个请求后,如果hedgedDelay时间内没有返回,会向其他节点发送第二个请求,返回最先返回的结果数据

ServiceBean的注册


(画了我好久的图)
具体的流程和ReferenceBean的获取其实差不了太多。