消息服务使用说明 更新时间:2017-09-13 

一、消息服务简介

BMCS消息服务是力方开放平台(BMOP)为了提高应用的API调用效率而推出的一种主动推送服务(以下简称BMCS),基于BMCS,应用获取力方的订单状态不需要不停的轮询查询订单API,仅需要在收到BMCS主动推送的消息时处理数据即可,大大提高了API的调用效率并且节省了应用的API流量包。

推送的内容主要包括两大类:

1、充值订单消息,包括:手机话费、手机流量、水电煤、固话宽带、游戏直充、交通罚款、加油卡直充、各类卡密、视频卡等充值状态变更消息(充值成功或失败)。

2、票务订单消息,包括:火车票、汽车票、飞机票等票务订单状态变更消息(预定完成、出票成功、出票失败、退票等等)。

那么如何使用BMCS呢?下面将基于SDK方式接受消息、接口方式接受消息和回调地址接收消息做详细说明。

二、消息服务使用流程

1、应用订阅消息主题

BOSS用户登录力方开放平台控制台,在“应用管理 > 选择应用 > 消息服务 > 订阅消息”页面,选择需要订阅的消息主题,点击操作列下的“订阅”按钮即可。如下图:

点击“订阅”按钮后,可以看到操作列显示已订阅,点击“取消订阅”即可取消订阅,另外也可以在“我的消息”中看到已经成功订阅的消息并且也支持取消订阅。消息主题取消以后,应用将不会接受到该主题的消息推送。如下图:

点击“消息主题名称”,可以查看此消息主题的详细信息,包括返回的详细字段信息,如下图:

2、给直销商用户(授权用户)开通消息

BOSS用户登录力方开放平台控制台,在“应用管理 > 选择应用 > 授权管理”页面,选择要开通的授权用户(开通消息必须是已授权的直销商用户, 点击如何授权),点击操作列下的“开启消息”按钮即可。如下图:

开启消息之后,需要给授权用户创建“消息分组”(点击消息分组说明),一般建议消息分组为授权用户编号,后续平台将会实现点击“开通消息”即默认创建消息分组(默认为授权用户编号)。注意“消息分组”名称不能是default或者以default开头,如下图:

创建之后如下图,如果需要修改,点击操作列下“消息分组内容”即可修改:

三、接受消息实现方式

应用订阅消息以后,并且为用户开通了消息,就可以获取消息内容进行处理了,获取消息有三种方式:通过SDK建立长连接接收消息、通过主动调用API接口拉取消息、通过回调地址异步接收消息,下面将对这三种方式的使用情况详细说明:

1、通过SDK建立长连接接收消息

目前SDK长连接仅支持JAVA和.NET语言,其他语言建议采用API接口主动拉取消息或回调地址异步接收消息。通过SDK接受消息,只需要关注接受消息后的业务处理,不需要关心消息重发、确认、长连接的心跳、重连接等操作,SDK内部会自动处理好这一切。

消息服务地址
正式环境 : ws://mc.bm001.com
public interface MessageHandler {

    /**
     * 消息通道客户端收到消息后,会回调该方法处理具体的业务,处理结果可以通过以下两种方式来表述:
     * 1.抛出异常或设置status.fail()表明消息处理失败,需要消息通道服务端重发
     * 2.不抛出异常,也没有设置status信息,则表明消息处理成功,消息通道服务端不会再投递此消息
     *
     * @param message 消息内容
     * @param status 处理结果,如果调用status.fail(),通道将会择机重发消息;否则,通道认为消息处理成功
     * @throws Exception 消息处理失败,消息通道将会择机重发消息
     */
    public void onMessage(Message message, MessageStatus status) throws Exception;

}
  //创建链接客户端,使用默认分组
  QmcsClient client = new QmcsClient(appkey,appSecret,"default");
  //消息处理
  client.setMessageHandler(new MessageHandler(){
        @Override
        public void onMessage(Message message, MessageStatus status) throws Exception {
            try {
                //以下仅为示例代码,实际业务处理根据自身需要开发
                System.out.println(message.getTopic());
                System.out.println(message.getContent());
            } catch (Exception e){
                 e.printStackTrace();
                 status.fail();//消息处理失败,需要服务端重新推送
            }
        }
  });
  client.connect("ws://mc.bm001.com");//发起连接请求

备注: 采用Java main方法在Eclipse或idea里面运行上面的代码测试时,需要让线程保持一段时间,请在client.connect()后面加上Thread.sleep或者System.in.read让main线程暂时不结束,以便观察消息的实时接收情况,否则main线程结束后,BMCS长连接也会跟着断开。如果是在web服务器上运行上面的代码,则不用在 client.connect()后面加任何线程等待代码,也不需要在外面包一层while(true)循环,因为web服务器上的主线程只要服务器不关闭都是不会结束的,BMCS的长连接会一直保持。

  //创建链接客户端,使用默认分组
  QmcsClient client = new QmcsClient(appkey,appSecret,"default");
  //消息处理
  client.OnMessage += (s,e) =>
  {
      try
      {
            //以下仅为示例代码,实际业务处理根据自身需要开发
            Console.WriteLine(e.Message.Topic);
            Console.WriteLine(e.Message.Content);
      }
      catch (Exception exp)
      {
            Console.WriteLine(exp.StackTrace);
            e.Fail();   //消息处理失败,需要服务端重新推送
      }
  };
  client.Connect("ws://mc.bm001.com");//发起连接请求

备注: 采用C# Main方法在VS控制台工程里面运行上面的代码测试时,请在client.Connect后面加上Console.Read()或 Thread.Sleep让main线程暂时不结束,以便观察消息的实时接收情况,否则Main线程结束后,BMCS长连接也会跟着断开。如果是在IIS服务器或C#应用程序里面运行上面的代码,则不用在client.Connect后面加任何线程等待的代码,也不需要在外面包一层while(true)循环,只要保持IIS服务器或C#应用程序不关闭,BMCS的长连接会一直保持。

2、通过主动调用API接口拉取消息

由于很多语言无法保障多线程和长连接处理,或者处理起来非常不方便,比如PHP、Python等,这些语言官方暂时未提供SDK接受消息方式,可以通过API方式来消费和确认消息,下面将对如何使用API方式获取消息做详细说明:

消费消息:

调用bm.bmcs.messages.consume接口消费消息,每次最多可以获取到100条消息,消费消息接口API只能获取未消费过的消息,已经确认过的消息是无法再次获取到的。

消费消息:

调用bm.bmcs.messages.confirm接口将消息置为已消费状态,如果不确认消费,开放平台将会每隔10分钟重新推送,如果消息3天内都没有确认消费,消息将会被清除。

  OpenClient client = new DefaultOpenClient(url,appkey,appSecret);
  do {
    Integer quantity = 100;
    String groupName = "default";
    QmcsMessagesConsumeResponse rsp = null;
    do {
        QmcsMessagesConsumeRequest req = new QmcsMessagesConsumeRequest();
        req.setQuantity(quantity);
        req.setGroupName(groupName);
        rsp = client.execute(req);
        if (rsp.isSuccess() && rsp.getQmcs_messages() != null) {
            for (QmcsMessage msg : rsp.getQmcs_messages()) {
                // 处理消息,这里仅为示例代码,实际业务根据自身需要开发
                System.out.println(msg.getContent());
                System.out.println(msg.getTopic());
                // 确认消费状态
                QmcsMessagesConfirmRequest cReq = new QmcsMessagesConfirmRequest();
                cReq.setSMessageIds(msg.getId());
                QmcsMessagesConfirmResponse cRsp = client.execute(cReq);
                System.out.println(cRsp.isSuccess());
            }
        }
        System.out.println(rsp.getBody());
    } while (rsp != null && rsp.isSuccess() && rsp.getQmcs_messages() != null );
    Thread.sleep(5000L); //根据自身服务器处理能力合理设置轮训时间
  } while (true);
  IOpenClient client = new DefaultOpenClient(url,appkey,appSecret);
  do
  {
    int quantity = 100;
    string groupName = "default";
    QmcsMessagesConsumeResponse rsp = null;
    do
    {
        QmcsMessagesConsumeRequest req = new QmcsMessagesConsumeRequest();
        req.setQuantity(quantity);
        req.setGroupName(groupName);
        rsp = client.execute(req);
        if (!rsp.IsError && rsp.QmcsMessages != null)
        {
            foreach (QmcsMessage msg in rsp.QmcsMessages)
            {
                // 处理消息,这里仅为示例代码,实际业务根据自身需要开发
                Console.WriteLine(msg.Topic);
                Console.WriteLine(msg.Content);
                // 确认消费状态
                QmcsMessagesConfirmRequest cReq = new QmcsMessagesConfirmRequest();
                cReq.setSMessageIds(msg.Id);
                QmcsMessagesConfirmResponse cRsp = client.execute(cReq);
                Console.WriteLine(cRsp.IsError);
            }
        }
        Console.WriteLine(rsp.Body);
    } while (rsp != null && !rsp.IsError && rsp.QmcsMessages != null );
    Thread.sleep(new TimeSpan(0, 0, 5)); //根据自身服务器处理能力合理设置轮训时间
  } while (true);

备注:通过API拉取消息的平均RT在网络好的情况下能达到100毫秒之内,请根据自身业务情况,以及自身服务器处理消息的能力,合理设置消息消费接口的轮训时间,如果设置轮训时间过短,可能会经常性的获取消息列表为空,产生很多无谓的请求,不仅浪费了开放平台的资源,还浪费了应用自身的API流量包。

3、通过回调地址异步接收消息

消息异步回调可能是很多开发者喜欢的一种方式,它实现简单并且高效,只需要提供一个Web服务器地址并支持Http协议即可。那如何实现消息异步回调呢?首先开放平台大多数类目的创建订单并支付API(bm.xxx.xxxx.payBill)都会有一个业务级可选的callback参数,只需要下单的时候传入接收回调消息的Web服务器地址,一旦订单结果状态(成功或失败)发生变更将会立即发起异步回调请求。

消息异步回调对接时需要注意和说明的地方:
  • callback参数必须填写全路径(包括http://)例如:http://api.bm001.com/recharge/callback。
  • 回调服务器地址必须支持POST请求,因为开放平台发起回调只支持发送POST请求,但是不排除以后开放平台会支持发起GET请求,所以回调服务器最好POST和GET请求都能支持。
  • 回调服务器在收到回调消息后需要在5秒内(开放平台等待响应超时时间为5秒)返回success字符串,如果没有返回success,开放平台后续将会每分钟发起1次回调请求,总共会发起4次。如果4次还未返回success,开放平台将会丢弃此消息,不再发送。
  • 回调的数据以键值对的方式发起,编码格式为UTF-8。
  • 为了保证回调数据在传输过程不被恶意拦截篡改,建议收到平台回调数据后验证签名,确保消息安全,点击查看数字签名机制
@RequestMapping(value = "/recharge/callback", method = {RequestMethod.POST, RequestMethod.GET})
@ResponseBody
public String postCallback(HttpServletRequest request, HttpServletResponse response) {
    // 从request获取参数
    Enumeration keys = request.getParameterNames();
    System.out.println("所有的参数:");
    while (keys.hasMoreElements()) {
        String key = (String)keys.nextElement();
        String value = request.getParameter(key);
        System.out.println(key + ":" + value);
    }
    // 验证签名,其他业务逻辑

    // 给服务器响应success字符串(收到请求后5秒内),否则将每一分钟发送回调信息一次,共发送4次。
    // 如果4次还未返回success,则服务器丢弃消息
    return "success";
}

备注:此示例使用了Spring MVC框架,如若项目中没有使用Spring MVC框架,只需要领会其示例代码中大体意图和基本原理然后可以自行去实现。

四、消息分组说明

一般在线订购应用,如果用户很多的情况,应用部署在多台服务器上面,组成了集群来接受消息,或者需要对当前用户进行消息隔离,分别处理,都可以通过设置分组的方式通过多连接来获取消息。

如果消息通过SDK主动推送方式获取,多连接有两种方式:1.创建多个用户分组,为每个分组创建一个连接通道;2.为同一个分组创建多个连接通道(同一应用使用同一分组最多可以创建5个连接通道)。

五、消息主题说明

力方开放平台已经将消息字段文档结构化,您可以通过“文档中心 > 消息文档”进入查看,点击查看消息文档, 您也可以登录力方开放平台控制台,在“应用管理 > 选择应用 > 消息服务 > 订阅消息”页面点击“消息主题名称”,可以查看此消息主题的详细信息,包括返回的详细字段信息。

六、消息体字段解析

为了方便ISV解析消息,下面对获取到的完整的消息内容示例进行数据解析,以E生活充值订单状态变更消息举例,消息topic为“bm_elife_rechargeStateChange”。

{
  "id":"MU20170417111623368",
  "topic":"bm_elife_rechargeStateChange",
  "pub_app_key":"10000000",
  "user_id":"A111100",
  "pub_time":"2017-04-17 11:16:23",
  "content":"{\"recharge_state\": \"1\", \"tid\": \"S00022112544\", \"timestamp\": \"2017-04-17 11:16:23\", \"user_id\": \"A111100\", \"sign\": \"9AEEF86791E4E754BD5A084D721E26B4AA1BE214\"}",
}
消息体字段说明
名称 示例 描述
id MU20170417111623368 消息ID
topic bm_elife_rechargeStateChange 消息类型topic
pub_app_key 10000000 消息所属appKey
user_id A111100 消息所属的用户编号
pub_time 2017-04-17 11:16:23 消息发布时间
sign 9AEEF86791E4E754BD5A084D721E26B4AA1BE214 签名
content {"recharge_state": "1", "tid": "S00022112544", "timestamp": "2017-04-17 11:16:23", "user_id": "A111100", "sign": "9AEEF86791E4E754BD5A084D721E26B4AA1BE214"} 消息业务内容,不同topic的字段不同,请参考消息文档

七、常见问题

1:什么是BMCS消息服务,为什么需要消息服务?

BMCS是力方开放平台推出的一种消息主动推送服务,之所以推出BMCS,典型的场景如下:

充值缴费订单充值,在调用支付接口以后,由于充值订单的特殊性,例如话费充值订单成功到账有一定的时差,并不是立即返回到账的,需要供货商回调力方以后订单充值状态才会变更(一般在1分钟内),在消息服务推出之前开发者只能定时轮询订单详情接口,这种方式有很大的局限性,商家不知道订单何时成功到账,只能不停轮询订单接口,并且只能单个订单处理,在订单量很大的情况下,不仅浪费了流量包,而且满足不了业务处理需要的。

通过消息服务推送,可以获取实时的订单状态变更消息,不仅可以减少减轻开放平台服务器压力,而且可以减少应用自身的API流量包消耗。如果开发者使用JAVA和C#等语言就可以使用SDK接受主动推送的消息,其他语言可以基于API定时轮询消费消息接口bm.bmcs.messages.consume,另外还提供回调地址主动推送消息,总有一款适合您,然后根据不同消息的TOPIC类型,解析不同的消息内容,更新相应的订单充值状态。

2:什么是分组,是否需要添加分组?

消息分组是进行消息隔离的手段,同一个分组内用户的消息只会通过分组获取信息,同一个分组支持多连接,随机发送到组内的某一个连接上面,例如在线订购应用,如果用户需要对不同用户的消息进行区别对待,比如,优先保证VIP缴费用户,然后在保证免费用户,就可以通过不同的分组进行消费消息,每个应用最多只能建50个自定义分组,每个分组用户数量不限,某个授权用户只能属于某一个分组,新的分组建立将会导致旧的分组失效。

应用如果存在多直销商共用一个appKey情况,建议各直销商都创建一个单独的分组,分组名称建议用各直销商A编号,以免消息被其他用户消费到。

3:什么是多连接,如何建立多连接?

多连接收消息是指同对一分组,ISV服务器与力方开放平台的消息服务器建立多个连接来收消息。多链接是对同一个分组而言,消息在下发时随机选择从分组内的多个连接中选择一个连接下发消息。多链接有的随机下发消息的功能,可以用同一分组多连接来实现集群,负载功能。

建立多链接只需要用相同的代码重新启动一个QmcsClient实例。可在同一个ISV服务器上,也可在不同的ISV服务器上,建立同一个分组的多链接。

4:在什么情况下需要建立多连接?

如果消息量很多,ISV客户端在单负载情况下处理不了堆积的消息,就可以建立多连接,一般情况下,单个分组是不需要建立多个连接的,单个连接就能把网卡泡满,消息服务的多连接,一般是应用在多个分组的情况下,或者做多负载集群部署的情况下。

5:如果用户Token过期,消息会不会推送?

消息推送是有2个条件的,1:授权用户的accessToken在有效期内,2:用户已开通该主题的消息,只有2个条件都满足的情况下消息才会推送,如果accessToken过期一个月后还没有进行刷新,那么消息开通的关系将会被删除。如果在一个月内重新授权,那么就不需要为这个用户再次开通消息服务。

6:消息服务有延时吗?

一般来说,开通消息接受功能以后1S之内便生效,使用中,如果消息产生,基本不会超过1秒就会收到消息,如果消息发送堆积或者程序处理不及时,就会有消息延迟。取消接受消息功能1秒内生效,已经产生的消息可以继续消费,新的消息将不会推送。

7:消息重新推送是什么场景?

通过SDK获取消息时,如果客户端断开了与服务端的连接,服务端的消息就会堆积,等应用重新连接以后,服务端会把已经堆积的消息按时间顺序推送给客户端,一条消息从发布之日起,服务端只会保留3天,如果客户端一直不接受消息,超过3天,服务端会自动清除,对于正常连接的客户端,如果消息处理失败,即调用status.fail(),那么服务端会隔10分钟进行第一次重新推送,如果一直处理失败,服务端会定时每隔10分钟重新推送,直到消息被清除。

通过API获取消息时,如果客户端在获取消息处理后没有调用消息确认接口进行确认,那么服务端会每隔10分钟重新推送一次消息,直至消息被确认消费或者过期被清除,另外在消息产生3天内都可以拉取到消息,超过3天消息会被清除

通过回调地址获取消息时,不会存在重新推送的场景,只有消息产生,就立马与下单传递的回调地址进行通信,如果连接超时或在5秒内没有返回success,后续将会每分钟发起1次回调请求,总共会发起4次。如果4次还未返回success,开放平台将会丢弃此消息,不再发送推送。

8:如果我下单传递了回调地址并且我也创建了SDK长连接而且我还会定时通过API获取消息,它们会如何处理消息的推送?

首先API获取消息和回调地址获取消息与SDK长连接不冲突,因为谁先得到消息,谁就推送,所有的消息都有一个状态,如果判断消息运行中,则其他消息通知不会处理此消息

回调地址获取消息和SDK长连接这两种方式会存在一个优先级的概念,当一个消息过来时,开放平台会先检查所属消息的SDK客户端与服务器端连接是否正常,如果正常,则使用SDK长连接推送,如果不正常,则获取回调地址推送,如果回调地址连续推送失败,则消息丢弃,所以一般不建议回调地址和SDK长连接并存,因为SDK长连接如果没有正常连接上,消息至少会保留3天。

9:SDK长连接如何做消息断开和心跳检测?

客户端如果需要断开与服务端的连接,则调用:QmcsClient.close(),判断心跳检测是否正常,则调用QmcsClient.isOnLine().

微信号:18851048710