博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
golang 实现延迟消息原理与方法
阅读量:6608 次
发布时间:2019-06-24

本文共 3214 字,大约阅读时间需要 10 分钟。

实现延迟消息具体思路我是看的下面这篇文章

https://mp.weixin.qq.com/s/eDMV25YqCPYjxQG-dvqSqQ

实现延迟消息最主要的两个结构:

环形队列:通过golang中的数组实现,分成3600个slot。

任务集合:通过map[key]*Task,每个slot一个map,map的值就是我们要执行的任务。

原理图如下:

实现代码如下:

package main;import (	"time"	"errors"	"fmt")//延迟消息type DelayMessage struct {	//当前下标	curIndex int;	//环形槽	slots [3600]map[string]*Task;	//关闭	closed chan bool;	//任务关闭	taskClose chan bool;	//时间关闭	timeClose chan bool;	//启动时间	startTime time.Time;}//执行的任务函数type TaskFunc func(args ...interface{});//任务type Task struct {	//循环次数	cycleNum int;	//执行的函数	exec   TaskFunc;	params []interface{};}//创建一个延迟消息func NewDelayMessage() *DelayMessage {	dm := &DelayMessage{		curIndex:  0,		closed:    make(chan bool),		taskClose: make(chan bool),		timeClose: make(chan bool),		startTime: time.Now(),	};	for i := 0; i < 3600; i++ {		dm.slots[i] = make(map[string]*Task);	}	return dm;}//启动延迟消息func (dm *DelayMessage) Start() {	go dm.taskLoop();	go dm.timeLoop();	select {	case <-dm.closed:		{			dm.taskClose <- true;			dm.timeClose <- true;			break;		}	};}//关闭延迟消息func (dm *DelayMessage) Close() {	dm.closed <- true;}//处理每1秒的任务func (dm *DelayMessage) taskLoop() {	defer func() {		fmt.Println("taskLoop exit");	}();	for {		select {		case <-dm.taskClose:			{				return;			}		default:			{				//取出当前的槽的任务				tasks := dm.slots[dm.curIndex];				if len(tasks) > 0 {					//遍历任务,判断任务循环次数等于0,则运行任务					//否则任务循环次数减1					for k, v := range tasks {						if v.cycleNum == 0 {							go v.exec(v.params...);							//删除运行过的任务							delete(tasks, k);						} else {							v.cycleNum--;						}					}				}			}		}	}}//处理每1秒移动下标func (dm *DelayMessage) timeLoop() {	defer func() {		fmt.Println("timeLoop exit");	}();	tick := time.NewTicker(time.Second);	for {		select {		case <-dm.timeClose:			{				return;			}		case <-tick.C:			{				fmt.Println(time.Now().Format("2006-01-02 15:04:05"));				//判断当前下标,如果等于3599则重置为0,否则加1				if dm.curIndex == 3599 {					dm.curIndex = 0;				} else {					dm.curIndex++;				}			}		}	}}//添加任务func (dm *DelayMessage) AddTask(t time.Time, key string, exec TaskFunc, params []interface{}) error {	if dm.startTime.After(t) {		return errors.New("时间错误");	}	//当前时间与指定时间相差秒数	subSecond := t.Unix() - dm.startTime.Unix();	//计算循环次数	cycleNum := int(subSecond / 3600);	//计算任务所在的slots的下标	ix := subSecond % 3600;	//把任务加入tasks中	tasks := dm.slots[ix];	if _, ok := tasks[key]; ok {		return errors.New("该slots中已存在key为" + key + "的任务");	}	tasks[key] = &Task{		cycleNum: cycleNum,		exec:     exec,		params:   params,	};	return nil;}func main() {	//创建延迟消息	dm := NewDelayMessage();	//添加任务	dm.AddTask(time.Now().Add(time.Second*10), "test1", func(args ...interface{}) {		fmt.Println(args...);	}, []interface{}{1, 2, 3});	dm.AddTask(time.Now().Add(time.Second*10), "test2", func(args ...interface{}) {		fmt.Println(args...);	}, []interface{}{4, 5, 6});	dm.AddTask(time.Now().Add(time.Second*20), "test3", func(args ...interface{}) {		fmt.Println(args...);	}, []interface{}{"hello", "world", "test"});	dm.AddTask(time.Now().Add(time.Second*30), "test4", func(args ...interface{}) {		sum := 0;		for arg := range args {			sum += arg;		}		fmt.Println("sum : ", sum);	}, []interface{}{1, 2, 3});	//40秒后关闭	time.AfterFunc(time.Second*40, func() {		dm.Close();	});	dm.Start();}

测试结果如下:

转载地址:http://ziiso.baihongyu.com/

你可能感兴趣的文章
Github-Client(ANDROID)开源之旅(二) ------ 浅析ActionBarSherkLock
查看>>
no identities are available for signing
查看>>
javascript 和 jquery插件开发
查看>>
Linux Shell文件差集
查看>>
eclipse中如何去除警告:Class is a raw type. References to generic type Class<T> should be parameterized...
查看>>
Gradle脚本基础全攻略
查看>>
Django模版中的过滤器详细解析 Django filter大全
查看>>
Linux中使用pwconv实现passwd中密码到shadow
查看>>
MongoDB C++ gridfs worked example
查看>>
Visual Studio 2017各版本安装包离线下载
查看>>
C#线程安全的那些事
查看>>
【论文笔记】Social Role-Aware Emotion Contagion in Image Social Networks
查看>>
rpm安装PostgreSQL
查看>>
k sum(lintcode)
查看>>
28. extjs中Ext.BLANK_IMAGE_URL的作用
查看>>
Hibernate注解配置N:N关联
查看>>
Android 控件属性
查看>>
【244】◀▶IEW-Unit09
查看>>
处理有外键约束的数据
查看>>
par函数的xaxt函数-控制x轴刻度的显示
查看>>