第五章:Spring Cloud总线

Spring cloud bus通过轻量消息代理连接各个分布的节点。这会用在广播状态的变化(例如配置变化)或者其他的消息指令。Spring bus的一个核心思想是通过分布式的启动器对spring boot应用进行扩展,也可以用来建立一个多个应用之间的通信频道。目前唯一实现的方式是用AMQP消息代理作为通道,同样特性的设置(有些取决于通道的设置)在更多通道的文档中。

注意:spring cloud在 非严格的Apache 2.0版权说明下发布的,如果你发现了什么错误,可以共享到文档中,在github上能找到项目的源码和问题记录。

快速启动

Spring cloud bus检测到项目在classpath下添加了spring boot自动配置就可以工作了。你需要做的是把spring-cloud-starter-bus-amqp或者spring-cloud-starter-bus-kafka添加到你的依赖管理中,Spring cloud会处理接下来的部分。确认消息代理(RabbitMQ或者Kafka)可用,并且配置好了:在本地运行的话不需要做其他的事,但想远程使用Spring cloud的话,可以用spring cloud连接器或者Spring Boot约定定义消息代理的细节,例如Rabbit:

application.yml

spring:
  rabbitmq:
    host: mybroker.com
    port: 5672
    username: user
    password: secret

总线目前支持发送消息到所有监听的节点,或者某个服务的所有节点(Eureka定义的)。未来会添加更多的选择约束(例如服务于数据中心y的x个节点,等等...)。还有在/bus/*命名空间下的许多http端。目前有两种实现。第一个是/bus/env,以key/value键值对的方式发送每个节点的spring环境变量的更新。另一个是/bus/refresh,会强制每个spring分布节点的配置,就像对每个应用调用/refresh路径一样。

注意:总线尝试覆盖Rabbit和Kafka,因为这些是最常用的2中实现,但是spring cloud流非常方便,并且可以和spring-cloud-bus一起绑定使用。

为实例确定地址

http端接收destination参数,例如/bus/refresh?destination=customers:9000,destination参数是ApplicationContext 的id,如果这个id被总线上的某个实例拥有的话,这个实例负责处理他的消息,而其他的实例则会忽视它。Spring boot默认通过spring.application.name和active profiles和server.port的结合一起在ContextIdApplicationContextInitializer中设置id。

为一个服务的所有实例定位

总线对一个事件的处理发出两次事件,一次是通过ApplicationEvent,另一次通过queue队列。它会请求的应用的id检查和当前应用的id做比较。如果有多个实例有同样的应用id,事件就会停止处理。本地机器上运行的话,每个服务会运行在不同的端口上并且应用id包含了端口号信息。Cloud Foundry提供了去重的索引。为了确保每一个应用的id都是唯一的话,为每个服务的id设置一个spring.application.index唯一值。例如,在spring lattice上,application.properties配置文件里设置 spring.application.index=${INSTANCE_INDEX}(或者在配置服务里面是bootstrap.properties)。

自定义消息代理

Spring cloud 总线用Spring Cloud Stream 广播并获取消息,只需要在classpath下添加自己需要的绑定实现。spring cloud总线为AMQP(RabbitMQ)以及Kafka(spring-cloud-starter-bus-[amqp,kafka]))提供了非常简单的开始项目。更通俗的说,spring cloud stream依赖于spring boot的自动配置约定去配置中间件,因此amqp的地址可以通过spring.rabbitmq.*配置属性配置。spring cloud总线提供了大量spring.cloud.bus.*配置属性(例如spring.cloud.bus.destination是使用其他中间件的主题名称)。一般地,初始值足够了。 想要对spring cloud stream进行更多的配置,可以查阅Spring Cloud Stream的文档。

总线事件跟踪日志

总线事件(RemoteApplicationEvent的子类)的日志可以通过设置spring.cloud.bus.trace.enabled=true开启。这样操作了的话,TraceRepository(如果存在的话)会显示所有发出的事件,以及每个服务实例的所有被通知的情况。例如(在/trace路径下):

{
  "timestamp": "2015-11-26T10:24:44.411+0000",
  "info": {
    "signal": "spring.cloud.bus.ack",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "stores:8081",
    "destination": "*:**"
  }
  },
  {
  "timestamp": "2015-11-26T10:24:41.864+0000",
  "info": {
    "signal": "spring.cloud.bus.sent",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "customers:9000",
    "destination": "*:**"
  }
  },
  {
  "timestamp": "2015-11-26T10:24:41.862+0000",
  "info": {
    "signal": "spring.cloud.bus.ack",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "customers:9000",
    "destination": "*:**"
  }
}

这个日志显示了customers:9000发出了RefreshRemoteApplicationEvent事件,广播给所有的服务,被customers:9000和stores:8081接受到了。想要对接受到的消息自定义自己的处理方式的话,可以添加@EventListener注解的AckRemoteApplicationEvent和SentApplicationEvent类型到你自己的应用中。或者到TraceRepository类中,直接处理数据。

注意:任何总线的应用都可以追踪消息通知的日志,但有时候对中心化服务做复杂的数据信息查询会很有用。或者直接推送到特定的日志跟踪服务上。

广播自己的事件

总线会成承载任意RemoteApplicationEvent类型的事件,默认是通过JSON协议,反序列化需要事先知道用的什么类型。需要在org.springframework.cloud.bus.event子包里面注册新的类型。你可以使用@JsonTypeName注解自己的类,或者使用默认策略,简单的使用类名。注意,服务消费者和提供者都要能对这个类能访问到。

results matching ""

    No results matching ""