DataStage JOB 开发指导说明 - 图文

更新时间:2024-01-23 00:43:01 阅读量: 教育文库 文档下载

说明:文章内容仅供预览,部分内容可能不全。下载后的文档,内容与下面显示的完全一致。下载之前请确认下面内容是否您想要的,是否完整无缺。

1. 概述

本文档是用来对ETL 开发人员初步熟悉整个基于DataStage 平台进行ETL 开发指导手册。

整个DS 处理的过程如下:

从一张业务系统数据源表到DW模型目的表的数据流处理在开发CODING中一般分4个JOB,包含一个调度JOB。

? 第一个JOB(XXXX_DS10CD)用于队连续2天的数据源表进行比较和记录的

?

筛选(选出新增和变化的)

第二个JOB(XXXX_DS20)对第一步处理后的数据进行Trim和简单的校验格式处理确认、以及生成各个目的表所需要的KEY值(比如各IP主题区域表的?

IP_ID)

第三步JOB(XXXX_DS30)将第二步处理后的数据分别映射到各个目的表、

代码转换,最终形成按照各个目的表结构的文件。

注:以下的说明及示例中所列例子都是为本文档所需而命名,具体的各个文件和STAGE的命名规范请参考《CZB-ETL-Design-V1.1.doc》

2. DS 处理流程

2.1. 组件快速使用

2.1.1. 数据源结构导入及筛选

在进行Ds 开发中,会经常对表结构进行导入导出,已加快ETL 开发速度。下面是对元数据结构如何导入和保存进行整体说明。

2.1.1.1. 结构导入

在DataStage Designer的工作窗口左边“Repository”区域的“Table Definitions”中导入数据源的表结构。导入数据结构存放路径参考《CZB-ETL-DESIGN-V1.1.DOC》。如下图:

2.1.1.2. 数据源结构筛选

导入的数据源结构跟实际ETL所需要进行加工的结构是不同的,ETL只对源结构中有用的字段进行加工,所以真正实际的数据源文件中所包含的字段是不同于数据源表的字段。DataStage要处理这些文件就需要对导入的数据源结构字段进行筛选。在“Repository”中找到该表结构,打开该表字段属性的界面,然后根据《》接口文档对不用的字段进行删除。如下图:

2.1.2. DS函数的查看

在开发过程的映射中需要用到很多函数,比如用于查询IP主题的IP_ID函数IPLookUp,查询AR主题的AR_ID函数的ARLookUp等。所有函数查看都可以使用DS manager Client 登录服务进行查看。(注意事项:不是原作者请不要进行修改其代码 ,查看后,点击 cancel) 如下图:

2.1.3. Shared Containers的查看

在开发过程中有些STAGE可以直接使用公共的“Shared Container”,比如IP主题的TEL_ADR表,只要知道电话及其该电话在IP主题中的IP_ID就可以通过名为“TELC”的Container生成符合TEL_ADR结构的文件。

(注意事项: 不是具体的源作者不要进行任何修改。否则可能对其它的引用此job的job 带来影响。打开查看后,点击cancel ,不要点击 Ok。)

如下图:

2.2. DS10CD 2.2.1. Introduction

DS10主要完成将数据源的连续2天的文件进行比较以选出新增和UPDATE的记录到CHANGE文件以供后续步骤进行。 主要STAGE包括:

? 3个Sequential File(T-1文件和T文件以及CHANGE文件)

? 1个Hashed File(T-1文件经过CRC处理后生成) ? 2个Transformer(T-1文件进行CRC处理以及比较处理) ? 2个InterProcess(分别代表比较后ins和upd的数据流) ? 1个Link Collector(将ins和upd的数据流进行合并)。

2.2.2. 参数

DS10CD 包含如下参数:

$PATH_SRC_XXXX(XXXX表示源系统如“ACMS”,下同):数据源文件存放路径 $PATH_HASH_XXXX:Hashed File存放路径 $PATH_WRK_XXXX:中间处理文件及最终文件存放路径

以上三个参数 是由Ds 的环境变量,统一添加。要添加请提交DS Project 的管理人员,暂为 :商军雷

DATEOLD:处理日上一日日期 DATENEW:处理日当日日期

注:参数命名规范请参考《CZB-ETL-DESIGN-V1.1.DOC》

2.2.3. STAGE流程 2.2.3.1. 流程方向说明

1) 前一日数据源文件到CRC处理

2) CRC处理到HASH文件

3) HASH文件和当日数据源文件到比较处理 4) 比较处理分开INSERT和UPDATE数据流 5) INSERT和UPDATE合并 6) 合并数据到第一步生成文件

示例图(本图未按照命名规范来实施,请参照具体的命名规范)

注:以上示图各个STAGE和LINK命名在实际开发中请参考《CZB-ETL-DESIGN-V1.1.DOC》

2.2.3.2. STAGE属性

a. SRC_OLD_FILE:

Stage的sheet中NLS选择“MS936”

Outputs的sheet中General界面的“File name”填入

“#$PATH_SRC_XXXX#/YYYY#DATEOLD#”,其中“XXXX”指源系统如“ACMS”,“YYYY”指该处理的源系统文件名如“IND_INFO”,“DATEOLD”指处理日前一日,必须跟参数的“处理日上一日”命名一样。

Outputs的sheet中Columns界面表示该文件的字段,可以直接导入:

b. SRC_OLD_HASH

Stage的sheet中Directory path填入“#$PATH_HASH_XXXX#”其中“XXXX”表示源系统如“ACMS”

Inputs和Outputs中的columns的字段导入参考“SRC_OLD_FILE”的Stage属性说明 c. SRC_OLD_CR

在右上部分的映射表中除了KEY外每个字段都需经过“CRC()”的处理,如下图:

d. SRC_NEW_FILE

Outputs的sheet中General界面的“File name”填入处理日当日文件,命名参考《CZB-ETL-DESIGN-V1.1.DOC》

其他部分与“SRC_OLD_FILE”的Stage属性说明一致

e. TODAY_INS/TODAY_UPD/TODAY_COLLECT

Inputs和Outputs中的columns的字段导入参考“SRC_OLD_FILE”的Stage属性说明。注:此处因为已经是经过了连续2天的数据文件的比较,所以在数据源表原有的字段最

后再增加一列“ETL_FLAG”以表示该记录是“I”还是“U”(新增和修改)

f. Update

左上部分的2个输入文件其KEY值的约束关系需填入,如下图:

将左上部分“NEW_2_COMP”表里的字段全部选中拖拽到右上部分的“COMP_2_INS”中;左上部分则将表里的除了KEY外每个字段进行比较,函数CompaCol(HASH_2_COMP.Col1,NEW_2_COMP.Col1)目的:如果记录比较发生变化则取变化值,否则就置“BDWNOCHG”。如下:

最后一个“ETL_FLAG”字段,“COMP_2_UPD”中填“U”,“COMP_2_INS”填“I” 右上部分2张表都需要填如约束条件

“COMP_2_INS”填“ISNULL(HASH_2_COMP.KeyCol)”即表示2张表比较只要旧文件中不存在而新文件中存在即认为是新增记录应该进入“COMP_2_INS”; “COMP_2_UPD”填:

Not(isnull (HASH_2_COMP.KeyCol)) and (

(NEW_2_COMP.KeyCol <> HASH_2_COMP.KeyCol) or (crc (NEW_2_COMP.Col1) <> HASH_2_COMP.Col1) or (crc (NEW_2_COMP.Col2 ) <> HASH_2_COMP.Col2 ) or

……

即表示2张比较只要同一KEY的2条记录有一个字段发生变化即认为是修改记录进入“COMP_2_UPD”

g. TODAY_CHANGE

Outputs的sheet中General界面的“File name”填入2天文件比较后生成的当日变化记录文件,命名参考《CZB-ETL-DESIGN-V1.1.DOC》

其他部分与“SRC_OLD_FILE”的Stage属性说明一致

注意:在DS10 job 中 有时一些字段的值需要在后期进行查询等操作。需要全程保留。具体的情况再具体分析。

2.3. DS20

2.3.1. Introduction

DS20主要完成将第一步DS10处理之后的数据进行简单的处理之后生成一些关键字段的ID_KEY。比如IP_ID,AR_ID等。

主要STAGE包括:

2个Sequential File(DS20的输入和输出文件)。

2个Transformer(一个输入文件的Trim和简单校验,一个生成各种ID_KEY)。 一个InterProcess(连接2个Transformer)。

2.3.2. 参数

$PATH_WRK_XXXX:中间处理文件及最终文件存放路径 DATE:处理日当日日期

注:参数命名请参考《CZB-ETL-DESIGN-V1.1.DOC》

2.3.3. STAGE流程 2.3.3.1. 流程方向说明

DS10后的处理文件“TODAY_CHANGE”到Transformer“VALIDATE”进行Trim和简单校验; VALIDATE经过InterProcess到GENERATEKEY进行ID_KEY的生成; a. 示例图

2.3.3.2. STAGE属性

a. TODAY_CHANGE

Outputs的sheet中General界面的“File name”填入第一步DS10种最后一步所生

成的文件(即与DS10的TODAY_CHANGE一致),命名参考

《CZB-ETL-DESIGN-V1.1.DOC》。

其他部分与“DS10”的“TODAY_CHANGE”Stage一致

b. VALIDATE

将左上部分“CHG_2_VALI”的字段全部选中拖拽到右上部分的VALI_2_PROC中。

除了KEY外其他字段都进行Trim处理。 电话字段需加入“ValidatePhone”,日期字段需调用函数(如AcmsToDateFormat)将该日期转成DW所需的标准格式“YYYY-MM-DD”,时间字段亦此。

c. INTERPROCESS

处理方式与DS10中的“TODAY_INS/TODAY_UPD”Stage一致

d. GENERATEKEY

先将左上部分“PROC_2_GENKEY”的全部字段拖拽到右上部分; 通过映射文档统计需要查询ID_KEY的字段。比如需要电话的LOID,工作地址的LOID,

客户的IP_ID,操作员的IP_ID等;这些新的DS 表结构需要保存,后期有可能用到。保存遵循 规范《CZB-ETL-DESIGN-V1.1.DOC》。

在右上部分的“StageVariables”表里增加变量,将统计出来的所需ID的变量放入,表左半部分的“Derivation”则根据映射文档中的函数填入,如下图:

在ST2_T中的Column Name中增加字段,将查询到的各个ID值也放入到Stage文件里,如下图:

e. CHANGE_KEY

Outputs的sheet中General界面的“File name”填入生成ID_KEY之后的文件,命名参考《CZB-ETL-DESIGN-V1.1.DOC》

其他部分与“DS10”的“TODAY_CHANGE”Stage一致。

注:这里又在原表的基础上新增了各个ID_KEY的字段。

2.4. DS30

上图是DS30的范例,名称:IND_INFO_DS30,存放路径:Jobs\\01_SRC\\0102_ACMS\\IND_INFO,命名规则和存放路径参考文档《CZB-ETL-Design-V1.1.doc》。

这个JOB的功能是代码表转换、文件拆分,最终生成BDW 准实体SSA文件。 规则是从源文件IND_INFO_SK通过Split得到目标文件IND_INFO_SPLIT。其中IND_INFO_SK是由IND_INFO_DS20产生的,Split是一个Transformer,在其中制定拆分规则,IND_INFO_SPLIT标识产生的目标文件,文件可能有多个。其拆分规则需要参照 ETL Mapping 规则文档。

2.4.1. 定义参数

如下图,进入JOB的Job Properties。

选择Parameters标签,填入需要定义的参数。

上图是我们需要的参数。工作路径($PATH_WRK_ACMS$)是环境变量,点击右下方的”Add Environment Vaiable”选择,不同的源系统需要选择不同的环境变量。W_DT表示数据日期,不带分割符。C_DT表示SSA文件内使用的数据日期,有分隔符。SRC_STM_ID表示源系统ID,JOB_SEQ_ID表示任务序号。

2.4.2. 定义入口文件

在该范例中即定义IND_INFO_SK,如下图。

在上图中查看IND_INFO_SK文件的Properties,在Outputs标签内的General中需要定义File name,命名规则参考《CZB-ETL-Design-V1.1.doc》,需要注意该文件需要和IND_INFO_DS20产生的文件一致。

还需要在Columns标签内填入入口文件的数据结构,如下图。可以将IND_INFO_DS20的生成文件的数据结构保存,便于在这里导入。这一步也可以在Transformer内做。

2.4.3. Shared Container

简单的拆分可以直接link,从Split到目标文件不需要特别处理,但是有些复杂处理需要使用Shared Containers。另外,很多针对目标表的映射已经定义了公用的Shared Containers,使用Shared Containers可以根据它的Input数据结构构造入口数据,Shared Containers可以自动完成对目标表的映射。这样可以简化拆分工作,减少错误。

上图是对Email拆分调用的Shared Container,点击Open可以查看Shared Container的内容。 查看Properties,我们需要对调用的Shared Container做一些设置,如下图:

需要在Stage的Properties中填入需要传入的参数。

在Inputs的General内点击Validate可以传入Shared Container的入口结构,可以在Columns内查看。

在Outputs的General内点击Validate可以传入Shared Container的出口结构,可以在Columns内查看。

2.4.4. 定义出口文件

如下图,由于可能由多个输出文件,需要定义每个文件的名称。

命名规则参考《CZB-ETL-Design-V1.1.doc》,实际上我们只需要修改目标表的名称,如下图。

2.4.5. Split

Split是个Transformer,需要在其中根据《CZB ETL mapping.xls》内的Mapping Detail表内定义的Mapping规则完成数据转换。

可以使用变量,如果多次使用同一个值,定义变量可以简化维护工作。如下图,进入Stage Properties

在Stage的Variables中定义需要使用的变量。

还需要在Constraints内定义过滤条件,如下图:

需要注意的是,在公用的Shared Container中一般已经加入了过滤条件,在这里就不需要添加了。

2.4.6. 描述

在Job Properties的General标签内,我们可以在Full job description中填入任务的描述,便

于以后生成工作文档。格式参考《CZB-ETL-Design-V1.1.doc》

2.4.7. 测试

编译后,可以用Run Director测试Job是否正确,查看生成的文件可以右击IND_INFO_SPLIT,View…data。

2.5. DS_XXXX_YYYY(调用命令)

其中XXXX表示源系统,YYYY表示源系统的表名

2.5.1. 参数

DATEOFDATA:处理日日期,格式“YYYYMMDD” JOB_SEQ_ID:该源系统表在DS的处理序号,具体参考《CZB-ETL-DESIGN-V1.1.DOC》

2.5.2. Job Control

查看“Properities”的“Job Contral”sheet,需要对“Job Contral”里面的控制函数进行修改。

需修改处如下面红色标记处,换为需处理的表和参数

***Define the date format var through the detailed source system. *Function GetJobLogs(JobNM,FilePath1)

Deffun GetJobLogs(A,B) Calling \

WDT=DATEOFDATA[1,4]:\注:此处的WDT的值处理日日期,格式为“YYYY-MM-DD” SRC_STM_ID=1

注:此处的SRC_STM_ID的值为各原系统ID Core 1 Acms 2 Opics 3 Auth 4

****Generate the last date

LWDT=oconv(iconv(DATEOFDATA,\A,\[9,2]

注:此处的LWDT的值为处理日上一日日期,格式为“YYYYMMDD”

*Set the Job ready to Run

FilePath=\IFJBXX_\

注:生成的日志log。用来记录DS job 运行情况。

* Setup IFJBXX_DS10CD, run it, wait for it to finish, and test for success JobName = \IFJBXX_DS10CD\ GoToMainExit = \ GoSub RecoveryCheck

If GoToMainExit = \ GoTo MainExit End

hJob1 = DSAttachJob(\IFJBXX_DS10CD\ If NOT(hJob1) Then

Call DSLogFatal(\IFJBXX_DS10CD\ Abort End

ErrCode = DSSetParam(hJob1, \OLDDATE\ ErrCode = DSSetParam(hJob1, \DATE\ ErrCode = DSRunJob(hJob1, DSJ.RUNNORMAL) ErrCode = DSWaitForJob(hJob1)

Status = DSGetJobInfo(hJob1, DSJ.JOBSTATUS)

If Status = DSJS.RUNFAILED Or Status = DSJS.CRASHED Then * Fatal Error - No Return

ErrCode=GetJobLogs(JobName,FilePath)

Call DSLogFatal(\IFJBXX_DS10CD\

End

* Setup IFJBXX_DS20 ,run it, wait for it to finish, and test for success JobName = \IFJBXX_DS20\ GoToMainExit = \ GoSub RecoveryCheck

If GoToMainExit = \ GoTo MainExit End

hJob2 = DSAttachJob(\IFJBXX_DS20\

If NOT(hJob2) Then

Call DSLogFatal(\IFJBXX_DS20\ Abort

End

ErrCode = DSSetParam(hJob2, \DATE\

ErrCode = DSRunJob(hJob2, DSJ.RUNNORMAL) ErrCode = DSWaitForJob(hJob2)

Status = DSGetJobInfo(hJob2, DSJ.JOBSTATUS)

If Status = DSJS.RUNFAILED Or Status = DSJS.CRASHED Then * Fatal Error - No Return

ErrCode=GetJobLogs(JobName,FilePath)

Call DSLogFatal(\ IFJBXX_DS20\ End

** Setup IFJBXX_DS30 ,run it, wait for it to finish, and test for success JobName = \IFJBXX_DS30\ GoToMainExit = \ GoSub RecoveryCheck

If GoToMainExit = \ GoTo MainExit End

hJob3 = DSAttachJob(\IFJBXX_DS30\ If NOT(hJob3) Then

Call DSLogFatal(\ IFJBXX_DS30\ Abort End

ErrCode = DSSetParam(hJob3, \DATE\ ErrCode = DSSetParam(hJob3, \DEALDATE\

ErrCode = DSSetParam(hJob3, \ ErrCode = DSSetParam(hJob3, \ ErrCode = DSRunJob(hJob3, DSJ.RUNNORMAL) ErrCode = DSWaitForJob(hJob3)

Status = DSGetJobInfo(hJob3, DSJ.JOBSTATUS)

If Status = DSJS.RUNFAILED Or Status = DSJS.CRASHED Then * Fatal Error - No Return

ErrCode=GetJobLogs(JobName,FilePath)

Call DSLogFatal(\IFJBXX_DS30\ End

****###############################################################################

RecoveryCheck:

JobHandle = \

JobHandle = DSAttachJob(JobName, DSJ.ERRFATAL)

If NOT(JobHandle) Then

Call DSLogFatal(\ Abort End

Else

Status = DSGetJobInfo(JobHandle, DSJ.JOBSTATUS)

JobInfoTime = DSGetJobInfo(JobHandle, DSJ.JOBSTARTTIMESTAMP) JobStatusDate = Iconv(JobInfoTime[1,10], \

If JobRunStatus = \

If Status = DSJS.RUNOK Or Status = DSJS.RUNWARN Then

If JobStatusDate < @DATE Then

Call DSLogInfo(\: \: JobName : \job ready to run.\\

ErrCode = DSDetachJob(JobHandle) Return End Else

Call DSLogWarn(\JobInfoTime, \ GoToMainExit = \ Return End

End

End

If JobRunStatus = \

If Status = DSJS.RUNFAILED Or Status = DSJS.CRASHED Or Status = DSJS.STOPPED Or Status = DSJS.RESET Then

Call DSLogInfo(\: \: JobName : \job aborted in previous run\\

ErrCode = DSDetachJob(JobHandle) GoSub ResetJob

JobRunStatus = \ Return End Else

Call DSLogInfo(\: \: JobName : \job status was not aborted or stopped. Therefore job not supposed to be run.\ GoToMainExit = \ Return End End End

******If the Job's status is not compiled or aborted then Reset it. ResetJob:

JobHandle = \

JobHandle = DSAttachJob(JobName, DSJ.ERRFATAL) If NOT(JobHandle) Then

Call DSLogFatal(\ Abort End

Else

Status = DSGetJobInfo(JobHandle, DSJ.JOBSTATUS)

If Status = DSJS.RUNFAILED Or Status = DSJS.CRASHED Or Status = DSJS.STOPPED

Then

Call DSLogInfo(\ ErrCode = DSRunJob(JobHandle, DSJ.RUNRESET) ErrCode = DSWaitForJob(JobHandle)

Status = DSGetJobInfo(JobHandle, DSJ.JOBSTATUS)

If Status = DSJS.RUNFAILED Or Status = DSJS.CRASHED Or Status = DSJS.STOPPED Then

Call DSLogFatal(\: \: JobName : \to reset\\ End Else

Call DSLogInfo(\\: JobName : \sucessfully reset.\\

ErrCode = DSDetachJob(JobHandle) Return End

End Else

ErrCode = DSDetachJob(JobHandle) Return

End End

*退出

MainExit:

本文来源:https://www.bwwdw.com/article/we7o.html

Top