This article describes an example of creating and executing an ETL task with source data filtering.
The repository must contain two tables T_Source, T_Destination», the tables must be identical in structure and must contain the ID field. Values of this field are used to set up filtering condition. The repository must also contain an ETL task with the ETLTASKS identifier. On executing the example presented below three objects are created in the ETL task: repository source, the Filter converter and repository consumer. The required properties are defined for all objects, links are set and filtering condition is defined:
After objects are created and saved, the ETL task is executed. The similar code applied to different objects is placed into separate procedures or functions.
Add links to the Andy, Db, Drawing, Dt, Etl, Metabase system assemblies.
Sub UserProc;
Var
MB: IMetabase;
ETLTask: IEtlTask;
EtlProvider: IEtlPlainDataProvider;
MetabaseProvider: IDtMetabaseProvider;
EtlConsumer: IEtlPlainDataConsumer;
MetabaseConsumer: IDtMetabaseConsumer;
Filter: IEtlPlainDataFilter;
Link, Link1: IEtlPlainLink;
Shapes: IWxShapes;
ProvFields, FilterInFields, FilterOutFields, ConsFields: IEtlPlainFields;
Begin
//Open the ETL task
MB := MetabaseClass.Active;
ETLTask := MB.ItemById("ETLTASKS").Edit As IEtlTask;
//Create provider
EtlProvider := ETLTask.Create(EtlObjectType.PlainDataMetabaseProvider) As IEtlPlainDataProvider;
EtlProvider := EtlProvider.Edit;
EtlProvider.Id := "Metabase_Provider";
EtlProvider.Name := "Import from table";
MetabaseProvider := EtlProvider.Provider As IDtMetabaseProvider;
MetabaseProvider.Dataset := MB.ItemById("T_Source").Bind As IDatasetModel;
EtlProvider.FillDefault;
//Create visual object the source
CreateWX(ETLTask, EtlProvider, -60, 0);
//Create consumer
EtlConsumer := ETLTask.Create(EtlObjectType.PlainDataMetabaseConsumer) As IEtlPlainDataConsumer;
EtlConsumer := EtlConsumer.Edit;
EtlConsumer.Id := "Metabase_Consumer";
EtlConsumer.Name := "Export to table";
MetabaseConsumer := EtlConsumer.Consumer As IDtMetabaseConsumer;
MetabaseConsumer.Dataset := MB.ItemById("T_Destination").Bind As IDatasetModel;
//Create a visual object of consumer
CreateWX(ETLTask, EtlConsumer, 60, 0);
//Create the Filter converter
Filter := ETLTask.Create(EtlObjectType.PlainDataFilter) As IEtlPlainDataFilter;
Filter := Filter.Edit;
Filter.Id := "Filter_Transform";
Filter.Name := "Filter";
//Create a visual object of transformer
CreateWX(ETLTask, Filter, 0, 0);
//Fill in lists of fields of all objects based on source fields list
//Get fields list
ProvFields := EtlProvider.PlainOutput.Fields; //Source output
FilterInFields := Filter.PlainInput.Fields; //Transformer input
FilterOutFields := Filter.PlainOutput.Fields; //Transformer output
ConsFields := EtlConsumer.PlainInput.Fields; //Consumer input
//Fill in fields lists
FillFields(FilterInFields, ProvFields);
FillFields(FilterOutFields, FilterInFields);
FillFields(ConsFields, FilterOutFields);
//Set up filtering parameters: skip all records, for which ID<30
Filter.FilterIf.AsString := Filter.PlainInput.Id + ".ID<30";
Filter.KeepIfFulfillCondition := False;
//Save all objects
EtlProvider.Save;
Filter.Save;
EtlConsumer.Save;
//Link of transformer with all objects
//Create links
Link := CreateLink(ETLTask, EtlProvider.PlainOutput, Filter.PlainInput);
Link1 := CreateLink(ETLTask, Filter.PlainOutput, EtlConsumer.PlainInput);
//Create visual links Shapes:=ETLTask.Workspace.Shapes; CreateWXLink(ETLTask,Shapes.FindById(EtlProvider.Id),Shapes.FindById(Filter.Id),Link); CreateWXLink(ETLTask,Shapes.FindById(Filter.Id),Shapes.FindById(EtlConsumer.Id),Link1); //Save task (ETLTaskAsIMetabaseObject).Save; //Perform task ETLTask.Execute(Null); EndSubUserProc; //Create visual objects SubCreateWX(ETLTask:IEtlTask;ETLObject:IEtlObject;XPosition:Integer;YPosition:Integer); Var WxRect:IWxRectangle;
WxETLObj:IWxEtlObject; Begin WxRect:=ETLTask.Workspace.CreateRectangle; WxRect.Id:=ETLObject.Id; WxETLObj:=NewWxEtlObject.Create; WxETLObj.EtlObject:=ETLObject; WxRect.Style.TextPosition:=WxTextPosition.Bottom; WxRect.Style.PictureMarginTop:=-10; WxRect.PinPosition:=NewGxPointF.Create(XPosition,YPosition); WxRect.Extension:=WxETLObjAsIWxShapeExtension; EndSubCreateWX; //Fill lists of input/output lists SubFillFields(Fields,FieldsSource:IEtlPlainFields); Begin Fields:=Fields.Edit;
Fields.Fill(FieldsSource);
Fields.Save;
End Sub FillFields;
//Create a link between objects
Function CreateLink(ETLTask: IEtlTask; Output: IEtlPlainOutput; Input: IEtlPlainInput): IEtlPlainLink;
Var
Link: IEtlPlainLink;
Begin
Link := ETLTask.CreatePlainLink;
Link.SourceObjectOutput := Output;
Link.DestinationObjectInput := Input;
Link.FillDefault;
Return Link;
EndFunctionCreateLink; //Create visual links SubCreateWXLink(ETLTask:IEtlTask;Shape1:IWxShape;Shape2:IWxShape;Link:IEtlPlainLink); Var WLink:IWxLink; WxETLLink:IWxEtlObject; Begin //Create visual objects of the link WLink:=ETLTask.Workspace.AutoLinkShapes(Shape1,Shape2); WLink.Style.LinePenBeginWxCap:=WxLineCap.Flat; WLink.Style.LinePenEndWxCap:=WxLineCap.Arrow30DegreeFilled; WxETLLink:=NewWxEtlObject.Create; WxETLLink.EtlObject:=Link; WLink.Extension:=WxETLLinkAsIWxShapeExtension; EndSubCreateWXLink;
The specified procedure is an entry point for .NET assembly. Add links to the Andy, Db, Drawing, Dt, Etl, Metabase system assemblies.
Imports Prognoz.Platform.Interop.Andy;
Imports Prognoz.Platform.Interop.Db;
Imports Prognoz.Platform.Interop.Drawing;
Imports Prognoz.Platform.Interop.Dt;
Imports Prognoz.Platform.Interop.Etl;
Imports Prognoz.Platform.Interop.Metabase;
Public Shared Sub Main(Params: StartParams);
Var
MB: IMetabase;
ETLTask: IEtlTask;
EtlProvider: IEtlPlainDataProvider;
MetabaseProvider: IDtMetabaseProvider;
EtlConsumer: IEtlPlainDataConsumer;
MetabaseConsumer: IDtMetabaseConsumer;
Filter: IEtlPlainDataFilter;
Link, Link1: IEtlPlainLink;
Shapes: IWxShapes;
ProvFields,FilterInFields,FilterOutFields,ConsFields:IEtlPlainFields; Begin //Open the ETL task MB:=Params.Metabase; ETLTask:=MB.ItemById["ETLTASKS"].Edit()AsIEtlTask; //Create source EtlProvider:=ETLTask.Create(EtlObjectType.eotPlainDataMetabaseProvider)AsIEtlPlainDataProvider; EtlProvider:=EtlProvider.Edit(); EtlProvider.Id:="Metabase_Provider"; EtlProvider.Name:="Import from the table"; MetabaseProvider:=EtlProvider.ProviderAsIDtMetabaseProvider;
MetabaseProvider.Dataset:=MB.ItemById["T_Source"].Bind()AsIDatasetModel; EtlProvider.FillDefault(); //Create visual object of the provider CreateWX(ETLTask,EtlProvider,-60,0); //Create a consumer EtlConsumer:=ETLTask.Create(EtlObjectType.eotPlainDataMetabaseConsumer)AsIEtlPlainDataConsumer; EtlConsumer:=EtlConsumer.Edit(); EtlConsumer.Id:="Metabase_Consumer"; EtlConsumer.Name:="Export to the table"; MetabaseConsumer:=EtlConsumer.ConsumerAsIDtMetabaseConsumer; MetabaseConsumer.Dataset:=MB.ItemById["T_Destination"].Bind()AsIDatasetModel; //Create visual object of the consumer CreateWX(ETLTask,EtlConsumer,60,0); //Create the Filtering transformer Filter:=ETLTask.Create(EtlObjectType.eotPlainDataFilter)AsIEtlPlainDataFilter; Filter:=Filter.Edit(); Filter.Id:="Filter_Transform";
Filter.Name:="Filtering"; //Create visual object of transformer CreateWX(ETLTask,Filter,0,0); //Fill fields lists of all objects basing on the provider fields //Get fields list ProvFields:=EtlProvider.PlainOutput.Fields;//Provider output FilterInFields:=Filter.PlainInput.Fields;//Transformer input FilterOutFields:=Filter.PlainOutput.Fields;//Transformer output ConsFields:=EtlConsumer.PlainInput.Fields;//Consumer input //Fill lists of fields FillFields(FilterInFields,ProvFields); FillFields(FilterOutFields,FilterInFields); FillFields(ConsFields,FilterOutFields);
//Filtering parameters setting:skip all records for which ID<30 Filter.FilterIf.AsString:=Filter.PlainInput.Id+".ID<30"; Filter.KeepIfFulfillCondition:=False; //Save all objects EtlProvider.Save(); Filter.Save(); EtlConsumer.Save(); //Link of the transformer with all objects //Create links Link:=CreateLink(ETLTask,EtlProvider.PlainOutput,Filter.PlainInput); Link1:=CreateLink(ETLTask,Filter.PlainOutput,EtlConsumer.PlainInput); //Create visual links Shapes:=ETLTask.Workspace.Shapes; CreateWXLink(ETLTask,Shapes.FindById(EtlProvider.Id),Shapes.FindById(Filter.Id),Link);
CreateWXLink(ETLTask,Shapes.FindById(Filter.Id),Shapes.FindById(EtlConsumer.Id),Link1); //Savetask (ETLTaskAsIMetabaseObject).Save(); //Executetask ETLTask.Execute(Null); EndSub; //Createvisualobjects PublicSharedSubCreateWX(ETLTask:IEtlTask;ETLObject:IEtlObject;XPosition:Integer;YPosition:Integer); Var GxPointFCls:GxPointF=NewGxPointFClass(); WxRect:IWxRectangle; WxETLObj:IWxEtlObject=NewWxEtlObject(); Begin WxRect:=ETLTask.Workspace.CreateRectangle(); WxRect.Id:=ETLObject.Id; WxETLObj.EtlObject:=ETLObject;
WxRect.Style.TextPosition:=WxTextPosition.wtpBottom; WxRect.Style.PictureMarginTop:=-10; GxPointFCls.Create(XPosition,YPosition); WxRect.PinPosition:=GxPointFCls; WxRect.Extension:=WxETLObjAsIWxShapeExtension; EndSub; //Createvisuallinks PublicSharedSubCreateWXLink(ETLTask:IEtlTask;Shape1:IWxShape;Shape2:IWxShape;Link:IEtlPlainLink); Var WLink:IWxLink; WxETLLink:IWxEtlObject=NewWxEtlObject(); Begin //Createvisuallinkobjects WLink:=ETLTask.Workspace.AutoLinkShapes(Shape1,Shape2); WLink.Style.LinePenBeginWxCap:=WxLineCap.wlcFlat; WLink.Style.LinePenEndWxCap:=WxLineCap.wlcArrow30DegreeFilled;
WxETLLink.EtlObject:=Link; WLink.Extension:=WxETLLinkAsIWxShapeExtension; EndSub; //Filllistsof input/outputfields PublicSharedSubFillFields(Fields,FieldsSource:IEtlPlainFields); Begin Fields:=Fields.Edit(); Fields.Fill(FieldsSource); Fields.Save(); EndSub; //Createa linkbetweenobjects PublicSharedFunctionCreateLink(ETLTask:IEtlTask;Output:IEtlPlainOutput;Input:IEtlPlainInput):IEtlPlainLink; Var
Link: IEtlPlainLink;
Begin
Link := ETLTask.CreatePlainLink();
Link.SourceObjectOutput := Output;
Link.DestinationObjectInput := Input;
Link.FillDefault();
Return Link;
End Function;
See also:
Examples | IEtlPlainDataFilter