How to achieve 100% dynamic data pipeline (3)

xiaoxiao2021-03-06  83

Let's take a look, how to build a pipeline syntax according to the data of the intermediate layer:

1. First create a data window object: d_vdtcolumns

SQL syntax is:

SELECT vdt_columns.utid, vdt_columns.uid, vdt_columns.upkey, vdt_columns.udmid, vdt_columns.udmname, vdt_columns.unulls, vdt_columns.uwidth, vdt_columns.uscale, vdt_columns.uname, vdt_columns.udefault, vdt_columns.ucheck, vdt_columns.utname, vdt_columns .uidententity from vdt_columns where utName =: as_tname

2. Prepare for work, the following is the main battlefield, start building a data pipeline.

Considering a pipe object can transmit multiple tasks, establish an object NVO_PIPETRANSATTRIB to save the syndrome of the transmission: it contains the Instance variable:

String is_Objectname // Name String is_syntax // Piping Syntax

String is_sconnect = 'zw', is_dconnect = 'daixf' // source database connection and destination database connection string is_ptype, is_pcommit, is_ERRORS // Piping Several attribute string is_sname, is_dname // source table name, destination table String is_sqlsyntax / / Pipe SQL syntax

Establish an object to inherit from the data pipeline object.

Start constructing syntax: write a function. NVO_PIPETRANSATTRIB INV_ATTRIB []

String Ls_SYNTAX, LS_SOURESYNTAX, LS_DESTSYNTAX

int li, lj, li_ind, li_find, li_rows, li_identity string ls_tablename, ls_default, ls_defaultvalue, ls_pbdttype boolean lb_find dec ld_uwidth, ld_prec, ld_uscale string ls_types, ls_dbtype, ls_prikey, ls_name, ls_nulls, ls_msg, ls_title = 'of_constrpipesyntax ()' nvo_string lnv_string NVO_DATASTORE LDS_VDTCOLUMNS BOOLEAN LB_KEY

LDS_VDTCOLUMNS = CREATE NVO_DATASTORE LDS_VDTCOLUMNS.DATAOBJECT = 'D_VDTCOLUMNS' LDS_VDTCOLUMNS.SETTRANSOBJECT (SRCSQLCA) Li = 1

of_input (inv_attrib [li]) li_find = pos (inv_attrib [li] .is_sqlsyntax, '*', 1) if li_find> 0 then lds_vdtcolumns.retrieve (as_tablename) of_filterimg (lds_vdtcolumns) li_rows = lds_vdtcolumns.rowcount () for lj = 1 to li_rows ls_name = lds_vdtcolumns.getitemstring (lj, 'uname') ls_types = lds_vdtcolumns.getitemstring (lj, 'udmname') li_identity = lds_vdtcolumns.getitemnumber (lj, 'uidentity') ls_types = of_getpipedbtype (is_s_dbtype, ls_types) ls_pbdttype = of_getpbdttype ( is_s_dbtype, ls_types) choose case ls_types case 'char', 'varchar', 'nchar', 'nvarchar', 'long varchar' if ls_types = 'long varchar' then ls_types = 'varchar' ld_uwidth = lds_vdtcolumns.getitemnumber (lj, ' uwidth ') ls_dbtype = ls_types ' ( ' string (int (ld_uwidth)) ') 'case' decimal ',' numeric 'ld_uwidth = lds_vdtcolumns.getitemnumber (lj,' uwidth ') ld_uscale = lds_vdtcolumns.getitemnumber (lj,' USCALE ') IF li_identity = 1 THEN LS_DBTYPE =' Identity ' ' (' string (int (ld_uwidth) ', ' string (int (ld_uscale)) ') 'Else Ls_dbty pe = ls_types '(' string (int (ld_uwidth)) ',' string (int (ld_uscale)) ')' end if case else ls_dbtype = ls_types end choose ls_prikey = lds_vdtcolumns.getitemstring (lj, 'upkey' IF ls_prikey = 'y' Then LB_Key = true ls_prikey = 'key = yes,' else ls_prikey = '' end if ls_nulls = lds_vdtcolumns.getItemString (lj, 'unulls') if ls_nulls =

'Y' then ls_nulls = 'yes' else ls_nulls = 'no' end if ls_default = isnull (lds_vdtcolumns.getitemstring (lj, 'udefault'), '') ls_sourcesyntax = "COLUMN (type =" ls_pbdttype ", name = ~ "" ls_name "~", dbtype = ~ "" ls_dbtype "~", " ls_prikey " nulls_allowed = " ls_nulls ") ~ r ~ n "if ls_default = '' then if li_identity = 1 then ls_destsyntax =" COLUMN (Type = " LS_PBDTTYPE ", Name = ~ " LS_NAME " ~ ", DBTYPE = ~" " LS_DBTYPE " ~ "," LS_PRIKEY "NULLS_ALLOWED =" LS_NULLS ", Initial_Value = ~" Exclude ~ ") ~ r ~ n "else ls_destsyntax =" COLUMN (type = " ls_pbdttype ", name = ~ "" ls_name "~", dbtype = ~ "" ls_dbtype "~", " ls_prikey " nulls_allowed = " ls_nulls ") ~ r ~ n "end if else if li_identity = 1 THEN LS_DESTSYNTAX =" Column (type = " ls_pbdttype ", name = ~ " ls_name " ~ ", dbtype = ~" " ls_dbtype " ~ "," LS_PRIKEY "nulls_allowed =" ls_nulls ", default_value = ~" " ls_default " ~ ", initial_value = ~" exclude ~ ") ~ r ~ n" else ls_destsyntax = "COLUMN (type =" ls_pbdttype ", name = ~" " LS_NAME "~", dbtype = ~ "" ls_dbtype "~", " ls_prikey "

nulls_allowed = " ls_nulls ", default_value = ~ "" ls_default "~") ~ r ~ n "end if end if next else return '' end if ls_sourcesyntax = ')' ls_destsyntax = ')' // generate PIPELINE // example: // PIPELINE (source_connect = csfdata, destination_connect = csfdata, type = replace, commit = 100, errors = 100, keyname = "Bar_x") if lb_key then ls_syntax = 'PIPELINE (source_connect =' inv_attrib [li] .is_sconnect ', destination_connect =' inv_attrib [li] .is_dconnect ', type =' inv_attrib [li] .is_ptype ', commit =' inv_attrib [li] .is_pcommit ', errors =' inv_attrib [li] .is_errors ', keyname = " ' as_tablename ' _ x") ~ r ~ n 'else ls_syntax =' PIPELINE (source_connect = ' inv_attrib [li] .is_sconnect ', destination_connect = ' inv_attrib [li] .is_dconnect ', type = ' inv_attrib [li] .is_ptype ', commit =' inv_attrib [li] .is_pcommit ', errors =' INV_ATTRIB [li] .Ind iver ') ~ r ~ n' end if

// generate source // example: // source (name = "bar", column (type = char, name = "customcode", dbtype = "char (8)", key = yes, nulls_allowed = no) LS_SYNTAX = ' Source (Name = " INV_ATTRIB [li] .is_sname '",'

LS_SYNTAX = Ls_SourceSyntax

// generate retrieve // ​​example: // Retrieve (statement = "Select Bar.customcode, Bar. Barcode, bar.Itemcode, bar.metering, bar.Packsize, bar.length, bar.width, bar.high, bar. Vol, Bar.Weight, Bar.NewPackFlagFROM Bar ") ls_syntax = 'RETRIEVE (statement ="' inv_attrib [li] .is_sqlsyntax ' ")' // generate DESTINATION // example: // DESTINATION (name =" Bar_copy ", // column (type = char, name = "customcode", dbtype = "char (8)", key = yes, nulls_allowed = no, initial_value = "Spaces") LS_SYNTAX = 'Destination (Name = "' inv_attrib [li ] .is_dname '",' Ls_SYNTAX = LS_DESTSYNTAX

Return LS_SYNTAX The return value of this function is the system of building completion.

Among them: the function of initialization: OF_INPUT (INV_ATTRIB [Li])

It is initialized, INV_ATTRIB function, initialized data is mainly the conditions that users need to enter, such as Type, Commit, Errors, SELECT statement of the pipe.

It is to be explained, and a function of several special cases is processed.

OF_FILTERIMG (LDS_VDTCOLUMNS):

Filter the image column in the table because the pipe does not support image data transmission.

OF_GETPIPEDBTYPE (IS_S_DBTYPE, LS_TYPES):

The type of data column in the pipe is obtained according to the type in the table, because they don't always correspond to one.

This can be implemented by an Extend DataWindowObject and contains initial data.

OF_GETPBDTTTYPE (IS_S_DBTYPE, LS_TYPES): Depending on the type of the column in the table, you get the type of column in the pipe, because they don't always correspond to one. This can be implemented by an Extend DataWindowObject and contains initial data.

The pipeline syntax is completed, and the pipeline is executed:

this.syntax = get the syntax

li_RC = this.Start (SrcSqlca, DestSqlca, idw_Errors) If li_RC <> 1 Then if not ib_silence then msg (ls_title, "object transmission failed:" string (li_rc)) of_addtransmsg ( 'an object <' is_currentobj '> transmission failure : ' String (li_rc)) Return Li_RC Rollback; Else Commit; END IF

转载请注明原文地址:https://www.9cbs.com/read-94916.html

New Post(0)