搜索
查看: 10879|回复: 4

如何非阻塞的方式接受消息

[复制链接]

26

主题

104

帖子

644

积分

荣誉会员

Rank: 8Rank: 8

积分
644
发表于 2012-2-17 23:38:55 | 显示全部楼层 |阅读模式
问题背景:以namwonsta版主天气发布-订阅的例子,当发布端未启动或者没有发布消息的时候,订阅端会一直停在接受消息语句上(默认阻塞方式获取消息)。这个时候如何退出呢?

我试过的办法:
1.设置非阻塞方式读取消息
var msg = subscriber.recv(,,1/*_ZMQ_NOBLOCK*/)
运行结果:
---------------------------
aardio:运行时错误
---------------------------
Resource temporarily unavailable
---------------------------
确定   
---------------------------

2.设置接受超时参数
var ret = subscriber.setsockopt(0x11/*_ZMQ_LINGER*/,{int value=60};
运行结果: ret 返回true 说明,参数设置成功了。 (时间的单位不知道是秒还是毫秒)
但是我等了很久 还是没有超时返回

3.强制结束线程
结果:弹出 该内存不能读的错误

4.进程创建context 上下文设备,句柄传递给子线程读取消息。需要退出时候,进程执行 context.term()关闭
(我在namwonsta版主上传的资料找到相关说明
* 相反地,2.0中可以在尚有套接字打开的情况下调用zmq_term() )

运行结果:线程报错 error:Context was terminated    然后再弹出 该内存不能读的错误.

很可能我上面的办法用错了,所以得不到正确结果, 但我查了很多资料,自己还是不能解决 所以求教各位,真心感谢!

附namwonsta版主订阅者代码:

/*
气象信息客户端
连接SUB套接字至tcp://*:5556端点
收集指定邮编的气象信息,并计算平均温度

作者:namwonsta ( http://bbs.aardio.com )
*/


import zeromq;

io.open()
start =
function(filter,tcp){
   
var context = zeromq.context()
   
   
//用于和服务端通信的套接字
    io.print("Collecting updates from weather server...")
   
var subscriber = context.zmq_socket_sub()
    subscriber.connect( tcp :
"tcp://localhost:5556" )
   
   
//设置订阅信息,默认为纽约,邮编10001
    filter := {byte value[] = "10001 "}
    subscriber.setsockopt(6
/*_ZMQ_SUBSCRIBE*/,filter)
   
   
//处理100条更新信息
    var updateNum = 100;
   
var totalTemp = 0;
   
for(i=1;updateNum){
        
//var msg = subscriber.recv()
        var msg = subscriber.recv(,,1/*_ZMQ_NOBLOCK*/)
        
var zipcode, temperature, relhumidity = string.match(msg,"(\d*) ([\d-]*) (\d*)")
        io.print(i,temperature)
        totalTemp += temperature;
    }
    io.print(string.format(
"Average temperature for zipcode '%s' was %dF, total = %d",
        filter.value, (totalTemp / updateNum), totalTemp))
   
    subscriber.close();
    context.term();
}

start()



/**
气象信息更新服务
绑定PUB套接字至tcp://*:5556端点
发布随机气象信息

作者:namwonsta ( http://bbs.aardio.com )
**/


import zeromq;

//准备上下文和PUB套接字
var context = zeromq.context()
var publisher = context.zmq_socket_pub()
publisher.bind(
"tcp://*:5556")
//publisher.bind("ipc://weather.ipc")

//初始化随机数生成器
math.randomize(time.tick())
while(1){
   
//获取数据,忽悠下老板
    var zipcode = math.random(10000, 11000)
   
var temperature = math.random(-80, 135)
   
var relhumidity = math.random(10, 60)
   
   
//向所有订阅者发送消息
    publisher.sendMsg( string.format("%05d %d %d", zipcode, temperature, relhumidity) )
}

publisher.close()
context.term()




回复

使用道具 举报

26

主题

104

帖子

644

积分

荣誉会员

Rank: 8Rank: 8

积分
644
 楼主| 发表于 2012-2-18 00:44:11 | 显示全部楼层

非阻塞参数_ZMQ_NOBLOCK 正确使用方法 我找到了 还是namwonsta版主的例子(msreader.aau) var msg = zeromq.m

非阻塞参数_ZMQ_NOBLOCK 正确使用方法 我找到了 还是namwonsta版主的例子(msreader.aau)
var msg = zeromq.message()
var ret = subscriber.recvMsg(msg,1/*_ZMQ_NOBLOCK*/);

附完整例子天气订阅者 非阻塞接收数据

import zeromq;

io.open()
start = function(filter,tcp){
        var context = zeromq.context()
       
        //用于和服务端通信的套接字
        io.print("Collecting updates from weather server...")
        var subscriber = context.zmq_socket_sub()
        subscriber.connect( tcp : "tcp://localhost:5556" )
       
        //设置订阅信息,默认为纽约,邮编10001
        filter := {byte value[] = "10001 "}
        subscriber.setsockopt(6/*_ZMQ_SUBSCRIBE*/,filter)
//        var ret = subscriber.setsockopt(0x11/*_ZMQ_LINGER*/,{int value = 0})
        io.print(ret);
       
        //处理100条更新信息
        var updateNum = 100;
        var totalTemp = 0;
        for(i=1;updateNum){
                var msg = zeromq.message()
                var ret = subscriber.recvMsg(msg,1/*_ZMQ_NOBLOCK*/);
                if(ret){
                        var str = ..raw.tostring( msg.getData(),1,msg.getSize())
                        msg.close();
                        var zipcode, temperature, relhumidity = string.match(str,"(\d*) ([\d-]*) (\d*)")
                        io.print(i,temperature)
                        totalTemp += temperature;
                }
                sleep(5)//加点延迟
        }
        io.print(string.format("Average temperature for zipcode '%s' was %dF, total = %d",
            filter.value, (totalTemp / updateNum), totalTemp))
       
        subscriber.close();
        context.term();
}

start()


感谢namwonsta版
回复

使用道具 举报

42

主题

128

帖子

927

积分

荣誉会员

Rank: 8Rank: 8

积分
927
发表于 2016-9-13 08:39:59 | 显示全部楼层

[i=s] 本帖最后由 cad 于 2016-9-13 08:42 编辑 [/i] 服务端一开起CPU就暴涨了,如果在循环中加了sleep则收不到消息

本帖最后由 cad 于 2016-9-13 08:42 编辑

服务端一开起CPU就暴涨了,如果在循环中加了sleep则收不到消息


  1. import zeromq;

  2. //准备上下文和PUB套接字
  3. var context = zeromq.context()
  4. var publisher = context.zmq_socket_pub()
  5. publisher.bind("tcp://*:5556")
  6. //publisher.bind("ipc://weather.ipc")

  7. //初始化随机数生成器
  8. math.randomize(time.tick())
  9. while(1){
  10.     //获取数据,忽悠下老板
  11.     var zipcode = math.random(10000, 11000)
  12.     var temperature = math.random(-80, 135)
  13.     var relhumidity = math.random(10, 60)
  14.    
  15.     //向所有订阅者发送消息
  16.     publisher.sendMsg( string.format("%05d %d %d", zipcode, temperature, relhumidity) )
  17.     sleep(1) //这里加了收不到消息,不加CPU暴涨
  18. }

  19. publisher.close()
  20. context.term()
复制代码
回复

使用道具 举报

42

主题

128

帖子

927

积分

荣誉会员

Rank: 8Rank: 8

积分
927
发表于 2016-9-13 09:43:42 | 显示全部楼层

do{ start() }while(true) 客户端最后改进成这样就可以了

do{
   start()
}while(true)

客户端最后改进成这样就可以了
回复

使用道具 举报

42

主题

128

帖子

927

积分

荣誉会员

Rank: 8Rank: 8

积分
927
发表于 2016-9-13 22:32:41 | 显示全部楼层

[i=s] 本帖最后由 cad 于 2016-9-13 22:37 编辑 [/i] 精简了一下,给有需要的朋友 服务端 [code][lang=a

本帖最后由 cad 于 2016-9-13 22:37 编辑

精简了一下,给有需要的朋友

服务端


  1. io.open()

  2. import zeromq;

  3. var context = zeromq.context()
  4. var publisher = context.zmq_socket_pub()
  5. publisher.bind("tcp://*:5556")

  6. while(1){  
  7.     // 222 是消息类型, 777 是消息内容
  8.    publisher.sendMsg( "222" ++ "777" )
  9.    sleep(1)
  10. }

  11. publisher.close()
  12. context.term()

复制代码



客户端阻塞


  1. import zeromq;

  2. io.open()

  3. var context = zeromq.context()
  4. var subscriber = context.zmq_socket_sub()
  5. subscriber.connect( tcp : "tcp://localhost:5556" )

  6. // 222 是客户端需要的消息类型
  7. subscriber.setsockopt(6/*_ZMQ_SUBSCRIBE*/, {byte msgType[] = "222"} )               

  8. do{
  9.         var str = subscriber.recv()       
  10.         io.print("客户端收到:", str)
  11.         sleep(1000)
  12. }while(true)

  13. subscriber.close();
  14. context.term();
复制代码



客户端非阻塞


  1. import zeromq;

  2. io.open()

  3. var context = zeromq.context()
  4. var subscriber = context.zmq_socket_sub()
  5. subscriber.connect( tcp : "tcp://localhost:5556" )

  6. // 222 是客户端需要的消息类型
  7. subscriber.setsockopt(6/*_ZMQ_SUBSCRIBE*/, {byte msgType[] = "222"} )               

  8. do{
  9.          var msg = zeromq.message()
  10.      var ret = subscriber.recvMsg(msg,1/*_ZMQ_NOBLOCK*/);
  11.      if( ret ){
  12.              io.print("客户端收到:",  msg.getString() )
  13.      }       
  14.         sleep(1000)
  15. }while(true)

  16. subscriber.close();
  17. context.term();
复制代码
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 注册会员

本版积分规则

手机版|未经许可严禁引用或转载本站文章|站长邮箱|aardio.com|aardio官方社区 ( 皖ICP备09012014号 )

GMT+8, 2018-12-16 15:43 , Processed in 0.062500 second(s), 22 queries .

Powered by Discuz! X3.4

© 2001-2017 Comsenz Inc.

快速回复 返回顶部 返回列表