自己创建一个网站需要多少钱,湛江模板建站系统,情侣博客网站模板下载,42区 网站开发指南原理
触发器监控工作流实例表#xff0c;当工作流实例表中的状态更新后#xff0c;针对状态为失败的任务进行企业微信告警。
发送企业微信消息函数
# 必须在pg的主机上线安装requests模块
pip install requests
# 以postgres用户登陆psql客户端到etl数据库
psql etl -U po…原理
触发器监控工作流实例表当工作流实例表中的状态更新后针对状态为失败的任务进行企业微信告警。
发送企业微信消息函数
# 必须在pg的主机上线安装requests模块
pip install requests
# 以postgres用户登陆psql客户端到etl数据库
psql etl -U postgres
# 创建插件plpython3u
create extension plpython3u;
# plpython3u为不受信语言所以只能被超级用户使用
# 在tool模式下建立发送企业微信消息函数tool.sp_send_wechat
CREATE OR REPLACE FUNCTION tool.sp_send_wechat(message json, webhook character varying DEFAULT https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key你自己的key::character varying)RETURNS textLANGUAGE plpython3uSECURITY DEFINER
AS $function$
import requests
import json/** 作者 : v-yuzhenc* 功能 : 给企业微信发送一条消息* message : 需要发送的消息json格式* webhook : 企业微信机器人的webhook* */import requests
import json# 企业微信自定义机器人的webhook地址
p_webhook webhook
# 要发送的消息内容
p_message json.loads(message)
# 发送POST请求
response requests.post(p_webhook, datajson.dumps(p_message), headers{Content-Type: application/json})# 打印响应结果
return response.text
$function$
;
--将函数直接转给tool
ALTER FUNCTION tool.sp_send_wechat(json, varchar) OWNER TO tool;
--公开函数的执行权限
GRANT ALL ON FUNCTION tool.sp_send_wechat(json, varchar) TO public;
--将函数的执行权限授权给tool用户
GRANT ALL ON FUNCTION tool.sp_send_wechat(json, varchar) TO tool;
\q企业微信告警触发器
由于企业微信markdown格式的消息艾特指定的人只能通过企业微信中的userid(即用户在企业微信中的账号)调用所以我们在海豚调度的元数据表t_ds_user中增加wechat_userid字段人工将海豚的用户对应的企业微信的userid维护上去
# 以dp用户登录etl数据库
psql etl -U dp
# 增加字段
alter table t_ds_user add wechat_userid varchar(100);
comment on column t_ds_user.wechat_userid is 对应的企业微信的userid;
# 维护wechat_userid中的数据
# 这里根据自己的企业实际情况做
update t_ds_user
set wechat_userid YuZhenChao
where user_name yuzhenchao
;
# 创建触发器函数dp.tg_ds_udef_alert_wechat
CREATE OR REPLACE FUNCTION dp.tg_ds_udef_alert_wechat()RETURNS triggerLANGUAGE plpgsql
AS $function$
/** 作者v-yuzhenc* 功能海豚调度工作流失败自动告警* */
declarei record;v_mobile varchar;v_content text;v_message varchar;
beginif new.state in (4,5,6) then for i in (select||d.wechat_userid||\r\n# [DolphinScheduler Job ]\r\n 实例 id : [||a.id::varchar||/||b.id||](https://dolphin.tclpv.com/dolphinscheduler/ui/projects/||g.code||/workflow/instances/||a.id||?code||a.process_definition_code||)\r\n 项目名称 : font color\comment\||g.name||(||g.code||)/font||\r\n 工作流名 : font color\comment\||e.name||(||a.process_definition_code||)/font||\r\n 任务名称 : font color\comment\||b.name||(||b.task_code||)/font||\r\n 任务类型 : font color\comment\||b.task_type||/font\r\n 开始时间 : font color\comment\||to_char(b.start_time,yyyy-mm-dd hh24:mi:ss)||/font\r\n 结束时间 : font color\comment\||to_char(b.end_time,yyyy-mm-dd hh24:mi:ss)||/font\r\n 任务状态 : font color\warning\执行失败/font||\r\n 所属用户 : font color\comment\||d.user_name||(||c.user_id||)/font as wechat_content,d.phonefrom t_ds_process_instance a inner join t_ds_task_instance b on (a.id b.process_instance_id)inner join t_ds_task_definition c on (b.task_code c.code and b.task_definition_version c.version)inner join t_ds_user d on (c.user_id d.id)inner join t_ds_process_definition e on (a.process_definition_code e.code and a.process_definition_version e.version)inner join t_ds_project g on (e.project_code g.code)where c.task_type SUB_PROCESSand a.state 6and b.state 6and a.id new.id) loop v_mobile : i.phone;v_content : i.wechat_content;v_message : $v_message${msgtype:markdown,markdown: {content:$v_message$||v_content||$v_message$}
}$v_message$;--告警perform tool.sp_send_wechat(v_message::json);end loop;end if;return new;
end;
$function$
;
--授权给dp
ALTER FUNCTION dp.tg_ds_udef_alert_wechat() OWNER TO dp;
GRANT ALL ON FUNCTION dp.tg_ds_udef_alert_wechat() TO dp;
# 创建时候触发器
create trigger tg_state_ds_process_instance after update on dp.t_ds_process_instance for each row execute function dp.tg_ds_udef_alert_wechat();
\q测试
新建一个工作流选择SQL组件 保存工作流 上线工作流并运行工作流 工作流运行失败 随即企业微信来了消息提醒