当前位置: 首页 > news >正文

免费网站建设哪家好成人短期电脑培训班学费

免费网站建设哪家好,成人短期电脑培训班学费,无上升级系统,uc做购物网站引用的是rabbitMQ官方示例的库:github.com/rabbitmq/amqp091-go在网络编程中我们知道tcp连接的创建、交互、销毁等相关操作的"代价"都是很高的,所以就要去实现如何复用这些连接,并要做到高效并可靠。预期效果:项目初始化…

引用的是rabbitMQ官方示例的库:github.com/rabbitmq/amqp091-go

在网络编程中我们知道tcp连接的创建、交互、销毁等相关操作的"代价"都是很高的,所以就要去实现如何复用这些连接,并要做到高效并可靠。

预期效果:

项目初始化构建时可以自定义选择生产者开启多个connection,每个connection可以启动多少个channel【都是全局复用的】,因为rabbitMQ所有的命令都是基本都是通过channel去操作完成的,所以这个channel很重要,也是我们想要复用的重点。

初始化创建完connection和channel后,当生产者需要发送一条消息的时候,我们可以通过一些策略去选择它发送到哪个connection和channel,我这里采用的就是随机选择,也可以采用哈希取模、轮询权重算法等,这个可以根据自身业务来做。

我简单画了一个效果图:

定义RabbitMQ结构体以及Config结构体

type Config struct {Host     stringPort     intUser     stringPassword string
}type RabbitMQ struct {ctx     context.Contextn       intm       *sync.MutexConn    *amqp.ConnectionChannel []*amqp.Channel
}

实例化RabbitMQ结构体

func (mq *RabbitMQ) New(config Config) (rabbitmq *RabbitMQ) {configString := fmt.Sprintf("amqp://%s:%s@%s:%d/", config.User, config.Password, config.Host, config.Port)conn, err := amqp.Dial(configString)if err != nil {log.Panicf("amqp connect error: %v \n", err)}rabbitmq = &RabbitMQ{ctx:  context.Background(),m:    &sync.Mutex{},Conn: conn,}return
}

一、创建消费者

// ConsumeWithWork rabbitmq消费消息[work模式 channelNums可以设置当前连接开启多少个channel]
func (mq *RabbitMQ) ConsumeWithWork(queueName string, channelNums int) {for i := 0; i < channelNums; i++ {go func(i int) {ch, err := mq.Conn.Channel()if err != nil {log.Panicf("amqp open a channel error: %v \n", err)}q, err := ch.QueueDeclare(queueName, // nametrue,      // durablefalse,     // delete when unusedfalse,     // exclusivefalse,     // no-waitnil,       // arguments)if err != nil {log.Panicf("amqp declare a queue error: %v \n", err)}err = ch.Qos(1,     // prefetch count0,     // prefetch sizefalse, // global)if err != nil {log.Panicf("amqp set QoS error: %v \n", err)}msg, err := ch.Consume(q.Name, // queue"",     // consumerfalse,  // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)if err != nil {log.Panicf("amqp register a consumer error: %v \n", err)}log.Printf(" [work-%d] Waiting for messages. To exit press CTRL+C", i)for d := range msg {time.Sleep(2 * time.Second)fmt.Printf("[work-%d] Received a message: %s \n", i, d.Body)err = d.Ack(false)if err != nil {log.Printf("work_one Ack Err: %v", err)}}}(i)}var forever chan struct{}<-forever
}

二、创建生产者组

// NewPlusherGroups 创建生产者组
func NewPlusherGroups(config Config, connNums, channelNums int) (plusherGroups map[int]*RabbitMQ) {plusherGroups = make(map[int]*RabbitMQ, connNums)for i := 0; i < connNums; i++ {var rabbitmq *RabbitMQrabbitmq = rabbitmq.New(config)rabbitmq.n = ifor cN := 0; cN < channelNums; cN++ {ch, err := rabbitmq.Conn.Channel()if err != nil {log.Panicf("amqp open a channel error: %v \n", err)}rabbitmq.Channel = append(rabbitmq.Channel, ch)}plusherGroups[i] = rabbitmq}return
}

三、将消息随机分发给不同的connection、channel

// SendMessageWithWork 生产者发送消息[work模式+(many conn and many channel)]
func SendMessageWithWork(plusherGroups map[int]*RabbitMQ, queueName, body string) bool {if plusherGroups == nil {log.Panicln("SendMessageWithWork plusherGroups params is nil!")}rand.Seed(time.Now().UnixNano())//获取连接个数connNums := len(plusherGroups)//随机分配一个连接对象randConnIndex := rand.Intn(connNums)//选择随机分配的连接对象conn := plusherGroups[randConnIndex]//既然采用了发布者复用conn、channel的形式那么一定要加锁处理//这里为每个对象的操作进行加锁(非线程安全,不加锁会报错的)//至于在存在并发竞争的情况下会存在一定性能损耗,但是我们配置好适量的conn和channel这个基本可以忽略conn.m.Lock()defer conn.m.Unlock()//获取当前对象的channel个数channelNums := len(conn.Channel)//随机分配一个channel对象randChannelIndex := rand.Intn(channelNums)//选择随机分配的channelch := conn.Channel[randChannelIndex]q, err := ch.QueueDeclare(queueName, // nametrue,      // durablefalse,     // delete when unusedfalse,     // exclusivefalse,     // no-waitnil,       // arguments)if err != nil {log.Panicf("amqp declare a queue error: %v \n", err)}body = fmt.Sprintf("conn[%d] channel[%d] send message : %s", randConnIndex, randChannelIndex, body)err = ch.PublishWithContext(conn.ctx,"",     // exchangeq.Name, // routing keyfalse,  // mandatoryfalse,amqp.Publishing{DeliveryMode: amqp.Persistent,ContentType:  "text/plain",Body:         []byte(body),})if err != nil {log.Panicf("amqp publish a message error: %v \n", err)}return true
}

四、main函数调用消费者

package mainimport (rabbitmq "go-test/rabbitmq/package"
)func main()  {queueName := "task_queue"config := rabbitmq.Config{Host: "192.168.6.103",Port: 5672,User: "root",Password: "root",}var mq *rabbitmq.RabbitMQmq = mq.New(config)//开启N个消费者mq.ConsumeWithWork(queueName, 3)
}

五、main函数调用生产者组发送消息

package mainimport ("fmt""github.com/gin-gonic/gin"rabbitmq "go-test/rabbitmq/package""net/http""time"
)func main()  {var messageNo intqueueName := "task_queue"config := rabbitmq.Config{Host: "192.168.6.103",Port: 5672,User: "root",Password: "root",}//conn连接数connNums := 2//channel连接数channelNums := 3//启动N个不同conn的连接,并且每个连接对应的channel为N个的rabbitmq实例plusherGroup := rabbitmq.NewPlusherGroups(config, connNums, channelNums)e := gin.Default()e.GET("/", func(c *gin.Context) {body := fmt.Sprintf("这是第%d条消息...", messageNo)if rabbitmq.SendMessageWithWork(plusherGroup, queueName, body) == true {messageNo++c.JSON(200, gin.H{"code": 200,"msg": "success",})} else {c.JSON(200, gin.H{"code": 500,"msg": "error",})}})server := &http.Server{Addr:         ":18776",Handler:      e,ReadTimeout:  time.Minute,WriteTimeout: time.Minute,}if err := server.ListenAndServe(); err != nil {panic(any("HttpServer启动失败"))}
}

执行流程:

  1. 启动消费者进程

可以看到我们用3个协程开启了3个work,也就是对应了3个channel

  1. 启动生产者组进程

这里用的gin框架,正常启动

我们可以看到rabbitMQ的控制台中,一共3个连接,1个是消费者进程,另外2个是生产者组进程,这2个正好和我们上面配置的connNums参数匹配

我们可以看到rabbitMQ的控制台中,一共9个channel,3个是消费者进程,另外6个是生产者组进程,这6个正好和我们上面配置的channelNums参数匹配

  1. 调用发送消息

ab.exe -n 1000 -c 1000 http://127.0.0.1:18776/

我们来看消费者日志打印情况,标红的可以证明我们在发送消息时让生产者根据我们的随机分配策略选择connection和channel

http://www.shuangfujiaoyu.com/news/53392.html

相关文章:

  • 做网站需要服务器北京关键词优化服务
  • 用那个程序做网站收录好百度在线搜索
  • wordpress导出淘宝优化大师手机版下载
  • 专业做邯郸网站优化学seo哪个培训好
  • 全网营销的渠道陕西整站关键词自然排名优化
  • 唐河县住房和城乡建设局网站自媒体平台有哪些
  • 住房和城乡建设查询平台seo用什么论坛引流
  • 学做淘宝店的网站吗网站制作策划书
  • 做网站 模板外贸推广平台哪个好
  • http做轮播图网站seo网站推广培训
  • 石家庄专业网站建设世界球队最新排名榜
  • 张店网站建设价seo优化服务公司
  • 网站建设发好处漯河网站推广公司
  • 怎样做微信网站郑州做网站最好的公司
  • 老网站绑定新网站如何做?深圳app推广平台
  • 天猫秘券网站怎么做比百度好用的搜索软件
  • 做网站学哪个语言最好seo网站优化服务
  • 网站升级改版需要多久推广公众号的9种方法
  • 网站服务器租用时间网络推广100种方法
  • 如何建设黄色网站韶关新闻最新今日头条
  • 阿里妈妈新建网站怎么做太原seo代理商
  • 淘宝上开个网站建设包头整站优化
  • 掌握cms建设网站实训报告东莞seo建站哪家好
  • 阜宁做网站价格西安专业做网站公司
  • 升级wordpress 另一更新正在进行seo的方法有哪些
  • 开发app下载营销排名seo
  • 网站后台栏目根据什么做的seo培训机构
  • 深圳网站建设设计科技有限公司网站运营一个月多少钱
  • 网站开发的验收标准seowhy培训
  • 大连手机自适应网站制作价格怎么才能在百度上做引流呢