Go语言dolphinscheduler任务调度处理
简介
dolphinscheduler是一个可视化DAG工作流任务调度平台,在大数据领域做任务调用非常流行
提供了类似azkaban工作流调度,比azkaban更强的可视化DAG,支持大数据领域flink,spark,shell,python,java,scala,http等各种类型任务
官网传送门: https://dolphinscheduler.apache.org/zh-cn/
自动化
为什么需要自动化任务处理,当你的dolphinscheduler有几百上千个任务,管理是非常耗时的,如果每个任务都配置邮件告警,那一有问题整天都在救火
此时就需要任务结果监控和任务重跑来解决 失败任务和任务自动重跑,避免浪费过多时间在维护dolphinscheduler任务上
使用
在调用api之前需要为用户申请token,按图操作
dolphinscheduler提供类似swagge接口UI工具,访问doc地址访问
http://ip:12345/dolphinscheduler/doc.html?language=zh_CN&lang=cn
例子
该demo还是使用了http请求包(HttpRequest),json数据搜索包(go-jmespath)
任务结果检查
填坑说明
- 日期处理: 使用了%20转译空格,使用Sprintf方法拼接字符串
- 多种数据类型: 使用interface{}来支持int,string等多种数据类型
- 数据转换1: 将byte数据转成json格式,方便搜索
- 数据转换2: 将interface{}数据转成字符串切片,方便使用
该方法可以做成周期性任务运行,将失败的job查出来,后续是要告警通知,还是根据job名称查出对应id进行重跑任务
package main import ( "encoding/json" "fmt" "github.com/jmespath/go-jmespath" "github.com/kirinlabs/HttpRequest" "time" ) var ( url = "http://ip:12345/dolphinscheduler" token = "xxxxxxx" req *HttpRequest.Request ) func init() { req = HttpRequest.NewRequest().Debug(true).SetTimeout(time.Second*5). SetHeaders(map[string]string{ "token":token, }) } func main() { //testConn() jobCheck() } func jobCheck() { //获取日期 today := time.Now().Format("2006-01-02") tomorrow := time.Now().AddDate(0, 0, +1).Format("2006-01-02") //拼接日期 %20是空格的转译 fmt.Println(fmt.Sprintf("%v%v",today,"%2000:00:00")) fmt.Println(fmt.Sprintf("%v%v",tomorrow,"%2000:00:00")) //需要检查的项目名称 projects := []string{"jdOrder","jdPlay"} //需要检查的时间段 页码是int类型,日期是string类型 m := make(map[string]interface{}) m["pageNo"] = 1 m["pageSize"] = 22 m["stateType"] = "FAILURE" m["startDate"] = fmt.Sprintf("%v%v",today,"%2000:00:00") m["endDate"] = fmt.Sprintf("%v%v",tomorrow,"%2000:00:00") for _, project := range projects { resp, _ := req.Get(url+"/projects/"+project+"/task-instance/list-paging",m) if resp.StatusCode() != 200 { fmt.Println("job检查状态码不符期望: ",resp.StatusCode()) return } fmt.Println("resp",resp) //将返回数据从byte转成json格式 body, _ := resp.Body() var i interface{} var s []string _ = json.Unmarshal(body, &i) //搜索出需要的字段对应数据 processInstanceNames, _ := jmespath.Search("data.totalList[*].processInstanceName", i) //将interface转成[]string for _,v := range processInstanceNames.([]interface{}) { s = append(s,v.(string)) } //打印出结果 for _,v := range s { fmt.Println(v) } } }
测试连接
如果上小节任务跑不成功,可以先运行该方法,测试连接正确性
func testConn() { resp, _ := req.Get(url + "/projects/query-project-list") fmt.Println("resp",resp) body, _ := resp.Body() var i interface{} _ = json.Unmarshal(body, &i) fmt.Println("i",i) }
重跑任务
重跑任务其实就是再次启动任务,直接调用start_job既可
项目名称和ID需要通过该接口获取,这个是固定的
http://ip:12345/dolphinscheduler/projects/monitor/process/list-paging
调用示例: startJob("ads_jd_order",678)
func startJob(projectName string,projectId int) { m := make(map[string]interface{}) m["failureStrategy"] = "CONTINUE" m["warningGroupId"] = 0 m["warningType"] = "NONE" m["runMode"] = "RUN_MODE_SERIAL" m["processInstancePriority"] = "MEDIUM" m["workerGroup"] = "default" m["processDefinitionId"] = projectId resp, _ := req.JSON().Post(url+"projects/" + projectName+"/executors/start-process-instance",m) if resp.StatusCode() != 200 { fmt.Println("job开始状态码不符期望: ",resp.StatusCode()) return } }
小结
dolphinscheduler api调用有文档,不太复杂,但网上资料较少,需要自行摸索,以上就是Go语言dolphinscheduler任务调度处理的详细内容,更多关于Go语言dolphinscheduler任务调度的资料请关注猪先飞其它相关文章!
原文出处:https://juejin.cn/post/7035286827703468063
相关文章
- 在程序员中,尤其是go新手,经常听到的一个讨论话题是:如何处理错误,这篇文章主要给大家介绍了关于Go应用中优雅处理Error的一些相关技巧,需要的朋友可以参考下...2021-09-08
Django def clean()函数对表单中的数据进行验证操作
这篇文章主要介绍了Django def clean()函数对表单中的数据进行验证操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-07-09- 这篇文章主要介绍了golang官方嵌入文件到可执行程序,本文通过示例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2021-02-20
- 这篇文章主要介绍了go浮点数转字符串保留小数点后N位解决办法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2020-05-11
- 这篇文章主要介绍了Go语言使用读写OPC详解,图文讲解的很清晰,有感兴趣的同学可以学习下...2021-03-05
- 这篇文章主要介绍了Go项目的目录结构,对基础目录做了讲解,对项目开发中的其它目录也一并做了介绍,需要的朋友可以参考下...2020-05-01
- string与[]byte经常需要互相转化,普通转化会发生底层数据的复制,下面这篇文章主要给大家介绍了关于Go中string与[]byte高效互转的相关资料,文中通过示例代码介绍的非常详细,需要的朋友可以参考下...2021-09-20
- Go 语言提供的基础容器,免不了要查询容器中的数据,那么是如何实现遍历的呢?本文将会介绍几种常用容易的遍历及其使用。感兴趣的可以了解一下...2021-06-13
- 这篇文章主要介绍了创建第一个Go语言程序Hello,Go!本文详细的给出项目创建、代码编写的过程,同时讲解了GOPATH、Go install等内容,需要的朋友可以参考下...2020-05-01
- 这篇文章主要介绍了在Django中使用MQTT的方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2021-05-10
- 这篇文章主要介绍了go语言中的Carbon库时间处理,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下...2021-02-05
- 这篇文章主要介绍了go嵌套匿名结构体的初始化详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-12-16
解决导入django_filters不成功问题No module named 'django_filter'
这篇文章主要介绍了解决导入django_filters不成功问题No module named 'django_filter',具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-07-15详解如何使用Docker部署Django+MySQL8开发环境
这篇文章主要介绍了详解如何使用Docker部署Django+MySQL8开发环境,文中通过示例代码以及图文介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧...2020-07-19- 本文主要介绍了Django项目连接MongoDB的三种方法,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...2021-09-27
- 这篇文章主要介绍了Go语言json编码驼峰转下划线、下划线转驼峰的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧...2020-06-09
- 这篇文章主要介绍了Django 解决由save方法引发的错误,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-05-21
- 这篇文章主要介绍了django前端页面下拉选择框默认值设置方式,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-08-10
- 这篇文章主要介绍了django数据模型中null和blank的区别说明,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2020-09-03
- 这篇文章主要介绍了Go 自定义package包设置与导入操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...2021-05-06