使用azkaban 建立一个简单的任务调度系统
azkaban 可以作为hadoop 任务的调度工具,也可以作为shell任务和java任务的调度工具。安装过程有点繁琐,见安装文档。
Job Type
见链接
任务DAG
通过任务之间的依赖关系(dependencies)构建DAG
下面是一个 upload.job 的例子, 依赖 report-en.job
type=java#指定类的全路径job.class=com.example.demo.task.BbUploadJob#指定执行jar包的路径classpath=lib/*#依赖任务dependencies=report-en#jvm 参数Xmx=512M# 自定义参数batch.timestamp=${azkaban.flow.start.timestamp}
schedule
azkaban 目前包含Quartz,支持 Cron 表达式
java job
JavaJob 目前需要配置Hadoop,但是可以通过修改源代码来去掉 Hadoop 的依赖关系,参考安装过程。
Java Job template
- run 相当于 main 方法,任务代码写在这里
- cancel 在 run 方法出现 Exception 之后调用,任务失败后处理
- getJobGeneratedProperties 是输出的参数,用于给下一个任务传递参数
import azkaban.utils.Props;import org.slf4j.Logger;import org.slf4j.LoggerFactory;public class JavaJob1 { private static final Logger logger = LoggerFactory.getLogger(JavaJob1.class); private Props props; public JavaJob1(String name, Props props) { this.props = props; } public void run() { String timestamp = props.getString("azkaban.flow.start.timestamp"); logger.info("timestamp value is ==> " + timestamp); } public void cancel(){ } public Props getJobGeneratedProperties(){ Props props = new Props(); return props; }}
java job 中使用 Spring
在JavaJob 中可以使用Spring,和普通的Java代码一模一样
this.classPathXmlApplicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");this.reportService = (ReportService) this.classPathXmlApplicationContext.getBean("reportService");
其他问题
邮件通知
azkaban 原生就支持通知功能,在安装的时候配置smtp服务器,在job文件中里配置failure.emails, success.emails, notify.emails 来通知任务执行情况(多个邮箱地址用逗号分隔)
任务之间参数传递
azkaban支持任务之间传递参数,A任务可以向依赖A的任务B传递参数。实际上是通过读写临时文件来实现这个功能。
System.getenv("JOB_OUTPUT_PROP_FILE") // 任务输出的参数文件System.getenv("JOB_PROP_FILE") // 任务初始化的参数文件
B任务初始化的参数文件中会包含A任务输出的参数,这是azkaban帮我们做的。
JavaJob 更加方便,只要在Job中增加一个getJobGeneratedProperties方法,返回Props对象,然后B任务可以直接在初始化的Props中读取到这个参数。
public Props getJobGeneratedProperties(){ Props props = new Props(); props.put("demo.test.arg1", "Hello World!"); return props;}