Transactional to Business Intelligence

Introduction

Every developer of transactional (OLTP) systems has, at some point, had to build processes that feed Business Intelligence (BI) databases. How complicated such a process is depends largely on who manages the OLTP system and who manages the business analysis system. The information needed by each type of system is usually not the same, so either the OLTP team or the BI team must analyze, select and transfer the available information from one system to the other. Which team carries out this task depends on those managers.

When the task falls to the OLTP team, there is usually an additional requirement: efficiency. OLTP systems tend to be very busy during the day with online work, and at night with data loads and assorted reporting. The time window available for this type of process is therefore usually very small, so the entire transfer process must be optimized as much as possible.

Here we describe a working methodology based on the Insert Bulk Bestia libraries and stored procedures, designed to minimize the time spent on the analysis, selection and transmission of information from one system to the other.

The code shown in this chapter is presented as an example of the use of the SQLServerAsyncExecProcedures library included in Insert Bulk Bestia. InforCustom is not responsible for the use of such code.

Problem Statement

The goal is a daily process that sends certain information from an OLTP system to a separate BI system (the two are on different hosts). Each transferred record must be marked to indicate whether it is new, has been modified, or has been deleted since the last time the process was executed.

To do this, a series of repository tables will be created in both systems, and in BI a table will be created, "ACTUALIZACIONES", where each execution of the process will be recorded. BI will check this table and when a new record appears, it must process the transferred information.

Note: All objects will be created in the "rca" schema.

It is also assumed that the Insert Bulk Bestia libraries are installed on the OLTP system and not on the BI system.

Repository in OLTP

In the database of the OLTP system, a repository of tables will be created with the information to be transmitted. Associated with each of these tables there will be another table that stores a pair of hashes of the information. Comparing these two hashes is simpler and faster than running queries that compare the records field by field, plus the additional queries needed to find out which records are new and which have been deleted.

In addition to the repository, the code includes a sequence used to name the successive executions, and a hash calculation function for those versions of SQL Server (2014 and earlier) whose HASHBYTES function is limited to 8000 bytes of input (4000 NVARCHAR characters). Let's take a look at the code (for brevity, only a couple of example tables are included):


IF EXISTS(select NULL from sys.sequences where object_id = object_id('rca.SEQ_PARALELO'))
  DROP SEQUENCE rca.SEQ_PARALELO
GO
CREATE SEQUENCE rca.SEQ_PARALELO
 AS [bigint]
 START WITH 1
 INCREMENT BY 1
 MINVALUE 1
 MAXVALUE 9223372036854775807
 CACHE 10
GO

--SQL Server 2014 and earlier versions:
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.fnHashBytes') IS NOT NULL
  DROP FUNCTION rca.fnHashBytes
GO
CREATE FUNCTION rca.fnHashBytes (
  @EncryptAlgorithm NVARCHAR(50),
  @DataToEncrypt nvarchar(max))
RETURNS VARBINARY(MAX) WITH SCHEMABINDING
AS BEGIN
  DECLARE @Index INT = 1, @DataToEncryptLength INT
  DECLARE @EncryptedResult varbinary(max) = NULL

  IF @DataToEncrypt IS NOT NULL
  BEGIN
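    --DATALENGTH returns bytes; NVARCHAR uses 2 bytes per character, while SUBSTRING counts characters
    SELECT @EncryptedResult = 0x, @DataToEncryptLength = DATALENGTH(@DataToEncrypt) / 2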
    WHILE @Index <= @DataToEncryptLength
    BEGIN
      SELECT @EncryptedResult = @EncryptedResult + HASHBYTES(@EncryptAlgorithm, SUBSTRING(@DataToEncrypt, @Index, 4000)), @Index = @Index + 4000
    END
  END
  RETURN @EncryptedResult
END
GO
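
As a quick check of the function above, the following sketch hashes a value longer than 4000 characters. The variable name @TextoLargo and the test value are hypothetical and only serve as illustration; the function is assumed to have been created exactly as listed.

```sql
-- Hypothetical test value: 9000 characters, above the 4000-character
-- limit of a single HASHBYTES call on SQL Server 2014 and earlier.
DECLARE @TextoLargo NVARCHAR(MAX) = REPLICATE(CAST(N'a' AS NVARCHAR(MAX)), 9000);

-- The result is the concatenation of one SHA2_512 digest (64 bytes)
-- per iteration of the loop inside rca.fnHashBytes.
SELECT rca.fnHashBytes(N'SHA2_512', @TextoLargo) AS HashCompleto;

-- A NULL input yields a NULL result, because the loop is skipped.
SELECT rca.fnHashBytes(N'SHA2_512', NULL) AS HashDeNull;
```

Because the result is a concatenation of digests rather than a single digest, it is only meaningful when compared with other values produced by the same function.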

As can be seen, the "rca" schema is used to define the objects used for the repository.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla1') IS NOT NULL
  DROP TABLE rca.Tabla1
GO
CREATE TABLE rca.Tabla1(
    Campo1 varchar(16) NOT NULL,
    Campo2 smallint NOT NULL,
    Campo3 varchar(55) NOT NULL,

    ACT_CD_TIPO smallint NOT NULL,

  CONSTRAINT Tabla1_PK PRIMARY KEY CLUSTERED
    (
     Campo1 ASC,
     Campo2 ASC,
     ACT_CD_TIPO ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla1_HASH') IS NOT NULL
  DROP TABLE rca.Tabla1_HASH
GO
CREATE TABLE rca.Tabla1_HASH (
    Campo1 varchar(16) NOT NULL,
    Campo2 smallint NOT NULL,

    HASH_OLD varbinary(max) NULL,
    HASH_NEW varbinary(max) NULL,

 CONSTRAINT Tabla1_HASH_PK PRIMARY KEY CLUSTERED
    (
     Campo1 ASC,
     Campo2 ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla2') IS NOT NULL
  DROP TABLE rca.Tabla2
GO
CREATE TABLE rca.Tabla2(
    Campo1 varchar(16) NOT NULL,
    Campo2 smallint NOT NULL,
    Campo3 varchar(55) NOT NULL,

    ACT_CD_TIPO smallint NOT NULL,

  CONSTRAINT Tabla2_PK PRIMARY KEY CLUSTERED
    (
     Campo1 ASC,
     ACT_CD_TIPO ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla2_HASH') IS NOT NULL
  DROP TABLE rca.Tabla2_HASH
GO
CREATE TABLE rca.Tabla2_HASH (
    Campo1 varchar(16) NOT NULL,

    HASH_OLD varbinary(max) NULL,
    HASH_NEW varbinary(max) NULL,

 CONSTRAINT Tabla2_HASH_PK PRIMARY KEY CLUSTERED
    (
     Campo1 ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

It is assumed that both "Tabla1" and "Tabla2" contain only the fields to be transferred. The information comes from the tables "Tabla1Org" and "Tabla2Org" respectively, which contain much more information that BI does not need. The business key of "Tabla1" is made up of two fields (Campo1 and Campo2), while that of "Tabla2" is a single field (Campo1).

Both tables contain the ACT_CD_TIPO field, which is part of the primary key. During information processing, records will be duplicated, and this field will be used to distinguish new records from old ones.

The HASH_OLD field contains the hash of the record as of the last execution of the process, while the HASH_NEW field contains the hash of the record in the current execution. If HASH_OLD is NULL, the record is new; if HASH_NEW is NULL, the record has been removed from the original table; and if neither field is NULL and the two do not match, the record has been modified.
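
The three cases can be expressed as a single query over the hash table. This is only a sketch of the comparison rule, not necessarily the query used by the transfer process; the column alias TipoCambio is hypothetical, and the codes follow the 0-Insert, 1-Update, 2-Delete convention that the BI tables use for ACT_CD_TIPO.

```sql
-- Sketch: classify each key of rca.Tabla1_HASH by comparing its two hashes.
SELECT Cambios.Campo1, Cambios.Campo2, Cambios.TipoCambio
  FROM (SELECT H.Campo1, H.Campo2,
               CASE
                 WHEN H.HASH_OLD IS NULL AND H.HASH_NEW IS NOT NULL THEN 0 -- new record
                 WHEN H.HASH_OLD IS NOT NULL AND H.HASH_NEW IS NULL THEN 2 -- deleted record
                 WHEN H.HASH_OLD <> H.HASH_NEW THEN 1                      -- modified record
               END AS TipoCambio
          FROM rca.Tabla1_HASH AS H
       ) AS Cambios
  -- Unchanged records (equal hashes) produce NULL in the CASE and are filtered out
  WHERE Cambios.TipoCambio IS NOT NULL
  OPTION (MAXDOP 1);
```

Only the hash table is read, so no field-by-field comparison against the original OLTP table is needed to find out what has changed.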

Stored Procedures in OLTP

In the OLTP, a series of stored procedures will be created to update the repository tables that have been created.

Note that no foreign keys have been created in the repository tables. If there were any, they would have to be dropped before updating the tables, because the repository tables are updated in parallel and therefore in no pre-established order. For that reason, two stored procedures are created first, one to drop these foreign keys and one to recreate them. Here is an outline of these stored procedures:

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.EliminaFkLocales') IS NOT NULL
  DROP PROCEDURE rca.EliminaFkLocales
GO
CREATE PROCEDURE rca.EliminaFkLocales
  @NombreParalelo NVARCHAR(256),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON

  --Variables for logging
  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY

    --Drop all FKs of the repository tables before loading them
    IF EXISTS(SELECT NULL FROM sys.foreign_keys WHERE name = 'Tabla1_Tabla2_FK')
      ALTER TABLE rca.Tabla1 DROP CONSTRAINT Tabla1_Tabla2_FK

    EXECUTE ibb.Log_Errores_Inserta N'Elimnadas Foreing Keys Locales.',
      0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento

  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR eliminando las Foreing Keys Locales.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.CreaFkLocales') IS NOT NULL
  DROP PROCEDURE rca.CreaFkLocales
GO
CREATE PROCEDURE rca.CreaFkLocales
  @NombreParalelo NVARCHAR(256),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON

  --Variables for logging
  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY

    --Create all the necessary FKs
    --In our example there are no FKs between repository tables
    --IF NOT EXISTS(SELECT NULL FROM sys.foreign_keys WHERE name = 'Tabla1_Tabla2_FK')
    --  ALTER TABLE rca.Tabla1 ADD CONSTRAINT Tabla1_Tabla2_FK FOREIGN KEY(Campo1)
    --    REFERENCES rca.Tabla2 (Campo1)

    EXECUTE ibb.Log_Errores_Inserta N'Creadas Foreing Keys Locales.',
      0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento

  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR creando las Foreing Keys Locales.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

This is followed by the stored procedures responsible for updating the repository tables. The ones shown here are in charge of updating the contents of the tables "rca.Tabla1", "rca.Tabla1_HASH", "rca.Tabla2" and "rca.Tabla2_HASH". The code comments are descriptive enough.

These stored procedures will run in parallel, which makes the following settings important:

  • If an error occurs in one of them, it must not continue executing (SET XACT_ABORT ON).
  • The goal of parallel execution is to make the most of all available CPUs. Since the procedures already run side by side, the individual queries inside them should not themselves be parallelized (OPTION (MAXDOP 1)). MERGE is a statement that is often parallelized, so it is especially important to tell it not to.

Algorithm:

  • Update the ACT_CD_TIPO field to 2 for all records in the repository table (Tabla1).
  • Insert all the records from the original OLTP table (Tabla1Org) into the repository table (Tabla1). The ACT_CD_TIPO field is set to 0 to indicate that these are the inserted records.
    This query is where the information from the OLTP system is selected and transformed as needed to meet BI requirements.
  • Delete old records:
    • Tabla1 contains the records of the previous execution marked with ACT_CD_TIPO = 2 and ALL the OLTP records marked with ACT_CD_TIPO = 0.
    • The DELETE removes the records that already existed in Tabla1 and that still exist in the OLTP system. The key of Tabla1 is used for this.
    • As a result, the records with ACT_CD_TIPO = 2 are those that have disappeared from the OLTP system, and the records with ACT_CD_TIPO = 0 contain the information that currently exists in the OLTP system.
  • Update Tabla1_HASH:
    • The hash of each record is calculated and compared with the hash stored in the HASH_NEW field.
    • If the record exists in both tables (Tabla1 and Tabla1_HASH) but the calculated hash does not match HASH_NEW, the HASH_NEW field is updated with the calculated hash. These are the records whose information has been modified in the OLTP system. For these records, HASH_OLD and HASH_NEW will not match. We will see why later.
    • If the record does not exist in Tabla1_HASH, it is inserted into that table. These are the new records, and their HASH_OLD field is NULL.
    • If the record does not exist in Tabla1, the HASH_NEW field in Tabla1_HASH is set to NULL. These are the deleted records, and their HASH_OLD field is not NULL.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.RellenaTabla1') IS NOT NULL
  DROP PROCEDURE rca.RellenaTabla1
GO
CREATE PROCEDURE rca.RellenaTabla1
  @NombreParalelo NVARCHAR(256),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--This stored procedure runs in parallel, so if an error occurs it must not continue executing.
SET XACT_ABORT ON

  --Variables for logging
  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY
    --Mark all records as 2
    UPDATE rca.Tabla1
      SET ACT_CD_TIPO = 2
      OPTION(MAXDOP 1) --This procedure runs in parallel with others, so it must not monopolize the CPUs

    --Insert all records from the original table, marked as 0
    INSERT INTO rca.Tabla1
      SELECT Tb1Org.Campo1, Tb1Org.Campo2, Tb1Org.Campo3, 0 AS ACT_CD_TIPO
        FROM dbo.Tabla1Org Tb1Org
      OPTION(MAXDOP 1) --This procedure runs in parallel with others, so it must not monopolize the CPUs

    --Delete the old records:
    --  If record1 exists in both Tabla1 and Tabla1Org, after the previous INSERT Tabla1
    --  contains record1-2 (from the UPDATE) and record1-0 (from the INSERT). This DELETE
    --  removes record1-2, leaving the new record.
    DELETE Tb1Del
      FROM rca.Tabla1 Tb1Del
        INNER JOIN rca.Tabla1 Tb1Ins
          ON Tb1Del.Campo1 = Tb1Ins.Campo1 AND Tb1Del.Campo2 = Tb1Ins.Campo2
      WHERE Tb1Del.ACT_CD_TIPO = 2 AND Tb1Ins.ACT_CD_TIPO = 0
      OPTION(MAXDOP 1) --This procedure runs in parallel with others, so it must not monopolize the CPUs

    --Update Tabla1_HASH
    --  The hash of each record is calculated and compared with the HASH_NEW field
    --    If the record exists in both tables but the hash does not match, HASH_NEW is updated
    --    If the record does not exist in Tabla1_HASH, it is inserted
    --    If the record does not exist in Tabla1, HASH_NEW is set to NULL
    MERGE rca.Tabla1_HASH TablaHash USING
          (SELECT MBT.Campo1, MBT.Campo2,
                  --SQL Server 2014 and earlier versions:
                  --  The built-in HASHBYTES function is used here because the string being built
                  --  is assumed not to exceed 4000 characters. If it could exceed that length,
                  --  the rca.fnHashBytes function should be used instead
                    HASHBYTES('SHA2_512', (select MBT.* from (values(null))foo(bar) for xml auto)) as HASH_NEW
            FROM rca.Tabla1 AS MBT
            WHERE MBT.ACT_CD_TIPO = 0
          ) AS Hases_Nuevos
            ON TablaHash.Campo1 = Hases_Nuevos.Campo1 AND TablaHash.Campo2 = Hases_Nuevos.Campo2
      WHEN MATCHED AND TablaHash.HASH_NEW <> Hases_Nuevos.HASH_NEW THEN --Modified
        UPDATE SET TablaHash.HASH_NEW = Hases_Nuevos.HASH_NEW
      WHEN NOT MATCHED BY TARGET THEN --New
        INSERT(Campo1, Campo2, HASH_NEW)
        VALUES(Hases_Nuevos.Campo1, Hases_Nuevos.Campo2, Hases_Nuevos.HASH_NEW)
      WHEN NOT MATCHED BY SOURCE THEN --Deleted
        UPDATE SET HASH_NEW = NULL
      OPTION(MAXDOP 1); --This procedure runs in parallel with others, so it must not monopolize the CPUs
      --MERGE is a statement that SQL Server is likely to parallelize. Since a lot of work already
      --runs in parallel, it is told not to parallelize it so as not to monopolize the CPUs

    --It may be worth rebuilding the indexes and statistics in case there are many modifications
    ALTER INDEX ALL ON rca.Tabla1_HASH REBUILD WITH (MAXDOP = 1, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
    UPDATE STATISTICS rca.Tabla1_HASH WITH COLUMNS
    ALTER INDEX ALL ON rca.Tabla1 REBUILD WITH (MAXDOP = 1, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
    UPDATE STATISTICS rca.Tabla1 WITH COLUMNS

    EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado Tabla1 y Tabla1_HASH.',
      0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento

  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando Tabla1 y Tabla1_HASH.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.RellenaTabla2') IS NOT NULL
  DROP PROCEDURE rca.RellenaTabla2
GO
CREATE PROCEDURE rca.RellenaTabla2
  @NombreParalelo NVARCHAR(256),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--This stored procedure runs in parallel, so if an error occurs it must not continue executing.
SET XACT_ABORT ON

  --Variables for logging
  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY
    --Mark all records as 2
    UPDATE rca.Tabla2
      SET ACT_CD_TIPO = 2
      OPTION(MAXDOP 1) --This procedure runs in parallel with others, so it must not monopolize the CPUs

    --Insert all records from the original table, marked as 0
    INSERT INTO rca.Tabla2
      SELECT Tb2Org.Campo1, Tb2Org.Campo2, Tb2Org.Campo3, 0 AS ACT_CD_TIPO
        FROM dbo.Tabla2Org Tb2Org
      OPTION(MAXDOP 1) --This procedure runs in parallel with others, so it must not monopolize the CPUs

    --Delete the old records:
    --  If record1 exists in both Tabla2 and Tabla2Org, after the previous INSERT Tabla2
    --  contains record1-2 (from the UPDATE) and record1-0 (from the INSERT). This DELETE
    --  removes record1-2, leaving the new record.
    DELETE Tb2Del
      FROM rca.Tabla2 Tb2Del
        INNER JOIN rca.Tabla2 Tb2Ins
          ON Tb2Del.Campo1 = Tb2Ins.Campo1
      WHERE Tb2Del.ACT_CD_TIPO = 2 AND Tb2Ins.ACT_CD_TIPO = 0
      OPTION(MAXDOP 1) --This procedure runs in parallel with others, so it must not monopolize the CPUs

    --Update Tabla2_HASH
    --  The hash of each record is calculated and compared with the HASH_NEW field
    --    If the record exists in both tables but the hash does not match, HASH_NEW is updated
    --    If the record does not exist in Tabla2_HASH, it is inserted
    --    If the record does not exist in Tabla2, HASH_NEW is set to NULL
    MERGE rca.Tabla2_HASH TablaHash USING
          (SELECT MBT.Campo1,
                  --SQL Server 2014 and earlier versions:
                  --  The built-in HASHBYTES function is used here because the string being built
                  --  is assumed not to exceed 4000 characters. If it could exceed that length,
                  --  the rca.fnHashBytes function should be used instead
                    HASHBYTES('SHA2_512', (select MBT.* from (values(null))foo(bar) for xml auto)) as HASH_NEW
            FROM rca.Tabla2 AS MBT
            WHERE MBT.ACT_CD_TIPO = 0
          ) AS Hases_Nuevos
            ON TablaHash.Campo1 = Hases_Nuevos.Campo1
      WHEN MATCHED AND TablaHash.HASH_NEW <> Hases_Nuevos.HASH_NEW THEN --Modified
        UPDATE SET TablaHash.HASH_NEW = Hases_Nuevos.HASH_NEW
      WHEN NOT MATCHED BY TARGET THEN --New
        INSERT(Campo1, HASH_NEW)
        VALUES(Hases_Nuevos.Campo1, Hases_Nuevos.HASH_NEW)
      WHEN NOT MATCHED BY SOURCE THEN --Deleted
        UPDATE SET HASH_NEW = NULL
      OPTION(MAXDOP 1); --This procedure runs in parallel with others, so it must not monopolize the CPUs
      --MERGE is a statement that SQL Server is likely to parallelize. Since a lot of work already
      --runs in parallel, it is told not to parallelize it so as not to monopolize the CPUs

    --It may be worth rebuilding the indexes and statistics in case there are many modifications
    ALTER INDEX ALL ON rca.Tabla2_HASH REBUILD WITH (MAXDOP = 1, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
    UPDATE STATISTICS rca.Tabla2_HASH WITH COLUMNS
    ALTER INDEX ALL ON rca.Tabla2 REBUILD WITH (MAXDOP = 1, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
    UPDATE STATISTICS rca.Tabla2 WITH COLUMNS

    EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado Tabla2 y Tabla2_HASH.',
      0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento

  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando Tabla2 y Tabla2_HASH.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

Repository in BI

Just as a series of tables has been created in the OLTP system, their replicas will be created in BI according to the requirements outlined at the beginning.

The replicas of "Tabla1" and "Tabla2" are shown below. A new field has been added, ACT_CD_ACTUALIZACION, which corresponds to the key of the rca.ACTUALIZACIONES table. Both tables contain the ACT_CD_TIPO field, which indicates what has happened to the record since the last time the process was run. It takes the values 0-Insert, 1-Update, 2-Delete.

The rca.ACTUALIZACIONES table will contain one record for each attempt to run the data transfer process. If the process was successful, the ACT_FH_FIN field will have a value; if there were errors, this field will be NULL.

As in OLTP, no foreign keys have been defined between the repository tables. However, the foreign keys that relate these tables to the rca.ACTUALIZACIONES table have been defined.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla1') IS NOT NULL
  DROP TABLE rca.Tabla1
GO
CREATE TABLE rca.Tabla1(
    Campo1 varchar(16) NOT NULL,
    Campo2 smallint NOT NULL,
    Campo3 varchar(55) NOT NULL,

    ACT_CD_TIPO smallint NOT NULL,
    ACT_CD_ACTUALIZACION bigint NOT NULL,

   CONSTRAINT Tabla1_PK PRIMARY KEY CLUSTERED
    (
     Campo1 ASC,
     Campo2 ASC,
     ACT_CD_ACTUALIZACION ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla2') IS NOT NULL
  DROP TABLE rca.Tabla2
GO
CREATE TABLE rca.Tabla2(
    Campo1 varchar(16) NOT NULL,
    Campo2 smallint NOT NULL,
    Campo3 varchar(55) NOT NULL,

    ACT_CD_TIPO smallint NOT NULL,
    ACT_CD_ACTUALIZACION bigint NOT NULL,

   CONSTRAINT Tabla2_PK PRIMARY KEY CLUSTERED
    (
     Campo1 ASC,
     ACT_CD_ACTUALIZACION ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.ACTUALIZACIONES') IS NOT NULL
  DROP TABLE rca.ACTUALIZACIONES
GO
CREATE TABLE rca.ACTUALIZACIONES(
  ACT_CD_ACTUALIZACION bigint NOT NULL,
  ACT_FH_INICIO datetime2(0) NOT NULL,
  ACT_FH_FIN datetime2(0) NULL,
  ACT_DURACION nvarchar(50) NULL,

  CONSTRAINT ACTUALIZACIONES_PK PRIMARY KEY CLUSTERED
    (
        ACT_CD_ACTUALIZACION ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

CREATE NONCLUSTERED INDEX RCA_ACTUALIZACIONES_FIN_NULO_IDX ON rca.ACTUALIZACIONES
(
  ACT_FH_FIN ASC
)
INCLUDE(ACT_CD_ACTUALIZACION)
WITH (PAD_INDEX = ON, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, FILLFACTOR = 100) ON [PRIMARY]
GO


ALTER TABLE rca.Tabla1 ADD CONSTRAINT ACTUALIZACIONES_Tabla1_FK FOREIGN KEY(ACT_CD_ACTUALIZACION)
  REFERENCES rca.ACTUALIZACIONES (ACT_CD_ACTUALIZACION) ON DELETE CASCADE
GO
ALTER TABLE rca.Tabla2 ADD CONSTRAINT ACTUALIZACIONES_Tabla2_FK FOREIGN KEY(ACT_CD_ACTUALIZACION)
  REFERENCES rca.ACTUALIZACIONES (ACT_CD_ACTUALIZACION) ON DELETE CASCADE
GO
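
With these tables in place, BI can locate the rows delivered by a finished execution. The following is a minimal sketch of such a check, not part of the process itself; the variable @UltimaActualizacion and the alias TipoCambio are hypothetical, and it assumes that a higher ACT_CD_ACTUALIZACION always means a more recent execution, which holds if the identifiers come from the ascending sequence rca.SEQ_PARALELO.

```sql
-- Sketch: identifier of the most recent execution that finished correctly.
DECLARE @UltimaActualizacion BIGINT =
  (SELECT MAX(ACT_CD_ACTUALIZACION)
     FROM rca.ACTUALIZACIONES
     WHERE ACT_FH_FIN IS NOT NULL); -- NULL in ACT_FH_FIN means the run failed or is still in progress

-- Rows of Tabla1 delivered by that execution, with ACT_CD_TIPO decoded.
SELECT T.Campo1, T.Campo2, T.Campo3,
       CASE T.ACT_CD_TIPO
         WHEN 0 THEN 'Insert'
         WHEN 1 THEN 'Update'
         WHEN 2 THEN 'Delete'
       END AS TipoCambio
  FROM rca.Tabla1 AS T
  WHERE T.ACT_CD_ACTUALIZACION = @UltimaActualizacion;
```

The nonclustered index on ACT_FH_FIN defined above covers the first query, since it includes ACT_CD_ACTUALIZACION.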

Stored Procedures in BI

It will also be necessary to create certain stored procedures on the BI side:

  • Cleaning from the repository tables and from the rca.ACTUALIZACIONES table any leftovers of previous executions that did not finish correctly (ACT_FH_FIN IS NULL, or ACT_CD_ACTUALIZACION does not exist in rca.ACTUALIZACIONES). This procedure also inserts a new record for the current execution in rca.ACTUALIZACIONES.
  • Updating the end date of the process, which also indicates that the process has run successfully.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.LimpiaTablasRemotas') IS NOT NULL
  DROP PROCEDURE rca.LimpiaTablasRemotas
GO
CREATE PROCEDURE rca.LimpiaTablasRemotas
  @fechaInicioProceso NVARCHAR(50),
  @IdParalelo BIGINT,
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON

  SET @error = 0
  BEGIN TRY

    --Delete from Tabla1 and Tabla2 any records left behind by a previous process that may have
    --failed (ACT_FH_FIN IS NULL, or ACT_CD_ACTUALIZACION does not exist in rca.ACTUALIZACIONES)
    DELETE rca.Tabla1
      WHERE ACT_CD_ACTUALIZACION IN (SELECT ACT_CD_ACTUALIZACION FROM rca.ACTUALIZACIONES WHERE ACT_FH_FIN IS NULL)
        OR ACT_CD_ACTUALIZACION NOT IN (SELECT ACT_CD_ACTUALIZACION FROM rca.ACTUALIZACIONES)

    DELETE rca.Tabla2
      WHERE ACT_CD_ACTUALIZACION IN (SELECT ACT_CD_ACTUALIZACION FROM rca.ACTUALIZACIONES WHERE ACT_FH_FIN IS NULL)
        OR ACT_CD_ACTUALIZACION NOT IN (SELECT ACT_CD_ACTUALIZACION FROM rca.ACTUALIZACIONES)

    --There is an ON DELETE CASCADE, but the previous queries are run explicitly for performance reasons.
    DELETE rca.ACTUALIZACIONES
      WHERE ACT_FH_FIN IS NULL

    --Create the new record for the execution in progress
    INSERT INTO rca.ACTUALIZACIONES (ACT_CD_ACTUALIZACION, ACT_FH_INICIO)
      SELECT @IdParalelo, TRY_CONVERT(datetime2(0), @fechaInicioProceso, 121)

    --For performance, it may be worth rebuilding the indexes and statistics.
    ALTER INDEX ALL ON rca.ACTUALIZACIONES REBUILD WITH (MAXDOP = 0, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
    UPDATE STATISTICS rca.ACTUALIZACIONES WITH COLUMNS
  END TRY
  BEGIN CATCH
    --It would be advisable to create the whole ibb.Log_Errores structure in the BI system and log the corresponding traces
    SET @error = 1
  END CATCH
END
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.UpdateFinActualizacion') IS NOT NULL
  DROP PROCEDURE rca.UpdateFinActualizacion
GO
CREATE PROCEDURE rca.UpdateFinActualizacion
  @IdParalelo bigint,
  @Duracion nvarchar(50),
  @fFin nvarchar(50),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON

  SET @error = 0
  BEGIN TRY
    UPDATE rca.ACTUALIZACIONES
        SET ACT_FH_FIN = TRY_CONVERT(datetime2(0), @fFin, 121), ACT_DURACION = @Duracion
      WHERE ACT_CD_ACTUALIZACION = @IdParalelo
  END TRY
  BEGIN CATCH
    --It would be advisable to create the whole ibb.Log_Errores structure in the BI system and log the corresponding traces
    SET @error = 1
  END CATCH
END
GO

Next, the stored procedures that drop and create the foreign keys are created on the remote server. Although no foreign keys are defined between the repository tables, foreign keys have been defined between these tables and the rca.ACTUALIZACIONES table.

A stored procedure has also been created for the regeneration of the indexes and statistics of the BI tables.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.EliminaFkRemotas') IS NOT NULL
  DROP PROCEDURE rca.EliminaFkRemotas
GO
CREATE PROCEDURE rca.EliminaFkRemotas
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON

  SET @error = 0
  BEGIN TRY
    --Drop all FKs of the repository tables before loading them
    IF EXISTS(SELECT NULL FROM sys.foreign_keys WHERE name = 'Tabla1_Tabla2_FK')
      ALTER TABLE rca.Tabla1 DROP CONSTRAINT Tabla1_Tabla2_FK

    --Drop all FKs that reference the rca.ACTUALIZACIONES table
    IF EXISTS(SELECT NULL FROM sys.foreign_keys WHERE name = 'ACTUALIZACIONES_Tabla1_FK')
      ALTER TABLE rca.Tabla1 DROP CONSTRAINT ACTUALIZACIONES_Tabla1_FK

    IF EXISTS(SELECT NULL FROM sys.foreign_keys WHERE name = 'ACTUALIZACIONES_Tabla2_FK')
      ALTER TABLE rca.Tabla2 DROP CONSTRAINT ACTUALIZACIONES_Tabla2_FK
  END TRY
  BEGIN CATCH
    --It would be advisable to create the whole ibb.Log_Errores structure in the BI system and log the corresponding traces
    SET @error = 1
  END CATCH
END
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.CreaFkRemotas') IS NOT NULL
  DROP PROCEDURE rca.CreaFkRemotas
GO
CREATE PROCEDURE rca.CreaFkRemotas
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON

  SET @error = 0
  BEGIN TRY

    --Create all the necessary FKs
    --In the example there are no FKs between the repository tables
    --IF NOT EXISTS(SELECT NULL FROM sys.foreign_keys WHERE name = 'Tabla1_Tabla2_FK')
    --ALTER TABLE rca.Tabla1 ADD CONSTRAINT Tabla1_Tabla2_FK FOREIGN KEY(Campo1)
    --  REFERENCES rca.Tabla2 (Campo1)

    --Create all FKs that reference the rca.ACTUALIZACIONES table
    IF NOT EXISTS(SELECT NULL FROM sys.foreign_keys WHERE name = 'ACTUALIZACIONES_Tabla1_FK')
      ALTER TABLE rca.Tabla1 ADD CONSTRAINT ACTUALIZACIONES_Tabla1_FK FOREIGN KEY(ACT_CD_ACTUALIZACION)
      REFERENCES rca.ACTUALIZACIONES (ACT_CD_ACTUALIZACION) ON DELETE CASCADE

    IF NOT EXISTS(SELECT NULL FROM sys.foreign_keys WHERE name = 'ACTUALIZACIONES_Tabla2_FK')
      ALTER TABLE rca.Tabla2 ADD CONSTRAINT ACTUALIZACIONES_Tabla2_FK FOREIGN KEY(ACT_CD_ACTUALIZACION)
      REFERENCES rca.ACTUALIZACIONES (ACT_CD_ACTUALIZACION) ON DELETE CASCADE
  END TRY
  BEGIN CATCH
    --It would be advisable to create the whole ibb.Log_Errores structure in the BI system and log the corresponding traces
    SET @error = 1
  END CATCH
END
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.RegeneraIndicesRemotos') IS NOT NULL
  DROP PROCEDURE rca.RegeneraIndicesRemotos
GO
CREATE PROCEDURE rca.RegeneraIndicesRemotos
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON

  SET @error = 0
  BEGIN TRY
    --If the repository receives many updates, it can be worth rebuilding
    --the indexes and refreshing the statistics
    ALTER INDEX ALL ON rca.Tabla1 REBUILD WITH (MAXDOP = 0, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
    UPDATE STATISTICS rca.Tabla1 WITH COLUMNS

    ALTER INDEX ALL ON rca.Tabla2 REBUILD WITH (MAXDOP = 0, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
    UPDATE STATISTICS rca.Tabla2 WITH COLUMNS

    ALTER INDEX ALL ON rca.ACTUALIZACIONES REBUILD WITH (MAXDOP = 0, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
    UPDATE STATISTICS rca.ACTUALIZACIONES WITH COLUMNS
  END TRY
  BEGIN CATCH
    --It would be advisable to create the whole ibb.Log_Errores structure in the BI system and log the corresponding traces
    SET @error = 1
  END CATCH
END
GO

Stored Procedures for Transfer of Information

The stored procedures that transfer the information are created in the OLTP system. The code assumes a Linked Server called RCA and a destination database also called RCA. The target tables, as already mentioned, are in the rca schema.
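
Calling remote stored procedures through a Linked Server requires the RPC Out option, and promotion to distributed transactions is controlled by a server option as well. The following is a minimal sketch of how these options could be enabled and checked. It assumes that the Linked Server RCA already exists and that you have permission to alter it; it is not part of the original scripts.

```sql
-- Assumption: a linked server named RCA has already been created
-- (for example with sp_addlinkedserver) and points to the BI instance.

-- Allow remote procedure calls from this instance to RCA
EXEC sp_serveroption @server = N'RCA', @optname = N'rpc out', @optvalue = N'true';

-- Let a local transaction promote to a distributed one when a remote procedure is called
EXEC sp_serveroption @server = N'RCA', @optname = N'remote proc transaction promotion', @optvalue = N'true';

-- Check the resulting settings
SELECT name, is_rpc_out_enabled, is_remote_proc_transaction_promotion_enabled
  FROM sys.servers
  WHERE name = N'RCA';
```

Both flags should show 1 before the transfer procedures are tested. MS DTC itself is configured outside SQL Server, on each host.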

A Linked Server is not the most efficient way to do this kind of work. However, if it is one of the requirements, one way to speed up the process is to use the 'SQLServerAsyncExecProcedures' library so that the available resources are used as fully as possible, above all the network, which is often the bottleneck.

Note that, with the algorithm used so far, the information only needs to travel in one direction: the OLTP system already knows which information must be sent to BI, so no BI data has to travel back to the OLTP system for comparison. This matters for efficiency, because the bottleneck is usually the network.

The stored procedures that transfer the data of Tabla1 and Tabla2 follow. They have these characteristics:

  • If there are transactions, they must be able to promote to distributed transactions (SET REMOTE_PROC_TRANSACTIONS ON).
  • If an error occurs in them, execution must not continue (SET XACT_ABORT ON).
  • New records (HASH_OLD IS NULL), modified records (HASH_NEW != HASH_OLD) and deleted records (HASH_NEW IS NULL) must be inserted in BI.
  • The ACT_CD_TIPO field will contain the values 0-Insert (HASH_OLD IS NULL), 1-Update (HASH_NEW != HASH_OLD), 2-Delete (HASH_NEW IS NULL).
  • The ACT_CD_ACTUALIZACION field will contain the appropriate value, as will be seen later.
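
The classification rule in the list above can be tried in isolation. The following is a minimal sketch, not part of the original procedures: it uses a table variable as a hypothetical stand-in for rca.Tabla1_HASH, with assumed column types, and returns the ACT_CD_TIPO value for each record that would be sent. With ANSI_NULLS ON, comparing a NULL HASH_NEW with HASH_OLD does not evaluate to true, so the sketch tests deleted records with an explicit IS NULL.

```sql
-- Hypothetical sample data: a table variable standing in for rca.Tabla1_HASH.
-- The VARBINARY(32) type of the hash columns is an assumption.
DECLARE @Tabla1_HASH TABLE (
  Campo1   INT NOT NULL PRIMARY KEY,
  HASH_OLD VARBINARY(32) NULL,
  HASH_NEW VARBINARY(32) NULL
);

INSERT INTO @Tabla1_HASH (Campo1, HASH_OLD, HASH_NEW) VALUES
  (1, NULL, 0x0A),   -- new record
  (2, 0x0B, 0x0C),   -- modified record
  (3, 0x0D, NULL),   -- deleted record
  (4, 0x0E, 0x0E);   -- unchanged record, must not be sent

-- 0-Insert, 1-Update, 2-Delete, following the rule described in the text
SELECT Campo1,
       IIF(HASH_OLD IS NULL, 0, IIF(HASH_NEW IS NULL, 2, 1)) AS ACT_CD_TIPO
  FROM @Tabla1_HASH
  WHERE HASH_OLD IS NULL
     OR HASH_NEW IS NULL
     OR HASH_NEW != HASH_OLD
  ORDER BY Campo1;
```

Under these assumptions the query returns records 1, 2 and 3 with types 0, 1 and 2, and leaves out record 4.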

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Traspasa_Tabla1') IS NOT NULL
  DROP PROCEDURE rca.Traspasa_Tabla1
GO
CREATE PROCEDURE rca.Traspasa_Tabla1
  @idParalelo BIGINT,
  @NombreParalelo NVARCHAR(256),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--If there are transactions, they must be able to promote
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON

  --Variables for the LOG
  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY
    --Insert the new (HASH_OLD is null), modified (HASH_NEW != HASH_OLD) and deleted (HASH_NEW is null) records
    --A comparison with NULL is never true, so deleted records need their own IS NULL test
    INSERT INTO RCA.RCA.rca.Tabla1
      SELECT tb1.Campo1, tb1.Campo2, tb1.Campo3,
             IIF(ExpH.HASH_OLD IS NULL, 0, IIF(ExpH.HASH_NEW IS NULL, 2, 1)), @idParalelo
        FROM rca.Tabla1 tb1
          INNER JOIN rca.Tabla1_HASH ExpH
            ON ExpH.Campo1 = tb1.Campo1 AND ExpH.Campo2 = tb1.Campo2
        WHERE ExpH.HASH_OLD IS NULL OR ExpH.HASH_NEW IS NULL OR ExpH.HASH_NEW != ExpH.HASH_OLD

    EXECUTE ibb.Log_Errores_Inserta N'Se ha trasvasado Tabla1.',
      0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR trasvasando Tabla1.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Traspasa_Tabla2') IS NOT NULL
  DROP PROCEDURE rca.Traspasa_Tabla2
GO
CREATE PROCEDURE rca.Traspasa_Tabla2
  @idParalelo BIGINT,
  @NombreParalelo NVARCHAR(256),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--If there are transactions, they must be able to promote
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON

  --Variables for the LOG
  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY
    --Insert the new (HASH_OLD is null), modified (HASH_NEW != HASH_OLD) and deleted (HASH_NEW is null) records
    --A comparison with NULL is never true, so deleted records need their own IS NULL test
    INSERT INTO RCA.RCA.rca.Tabla2
      SELECT tb1.Campo1, tb1.Campo2, tb1.Campo3,
             IIF(ExpH.HASH_OLD IS NULL, 0, IIF(ExpH.HASH_NEW IS NULL, 2, 1)), @idParalelo
        FROM rca.Tabla2 tb1
          INNER JOIN rca.Tabla2_HASH ExpH
            ON ExpH.Campo1 = tb1.Campo1
        WHERE ExpH.HASH_OLD IS NULL OR ExpH.HASH_NEW IS NULL OR ExpH.HASH_NEW != ExpH.HASH_OLD

    EXECUTE ibb.Log_Errores_Inserta N'Se ha trasvasado Tabla2.',
      0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR trasvasando Tabla2.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

Stored Procedures for COMMIT

None of the stored procedures described so far needs a transaction, because the algorithm recovers from their errors. For example, if updating the OLTP repository tables or generating the HASH_NEW values fails, the next execution of the process will recover from the error. If the transfer of the tables from OLTP to BI fails, the stored procedure 'RCA.RCA.rca.LimpiaTablasRemotas' will clean up the remote tables on the next run.

The stored procedures described here, however, are the ones that actually COMMIT the process: they must run inside a transaction because, if they fail, the process cannot recover. The transactions can use any isolation level, since no stored procedure needs to "see" the data of the others.

The stored procedures 'rca.ActualizaHash_Tabla1' and 'rca.ActualizaHash_Tabla2' prepare the Tabla1_HASH and Tabla2_HASH tables for the next execution of the process. For Tabla1 (and likewise for Tabla2):

  • They remove from Tabla1_HASH the records that have disappeared from the OLTP (HASH_NEW IS NULL).
  • They remove from Tabla1 the records that no longer exist in Tabla1_HASH.
  • They update the HASH_OLD field in Tabla1_HASH with the content of HASH_NEW.

If these procedures fail, it will not be possible to tell which records have been processed correctly and which have not. Any transactions defined for these stored procedures are local to the instance that hosts the OLTP.

This critical step must also call the remote procedure 'RCA.RCA.rca.UpdateFinActualizacion', which updates the 'ACT_FH_FIN' field to indicate that the process has run correctly. If this procedure fails, the process will not be recoverable on the next run either. In this case the transaction associated with the stored procedure is distributed, so MS DTC must be enabled on both servers.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.ActualizaHash_Tabla1') IS NOT NULL
  DROP PROCEDURE rca.ActualizaHash_Tabla1
GO
CREATE PROCEDURE rca.ActualizaHash_Tabla1
  @NombreParalelo NVARCHAR(256),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON

  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY

    --Delete from Tabla1_HASH the records whose HASH_NEW is null (deleted records)
    DELETE rca.Tabla1_HASH
      WHERE HASH_NEW IS NULL

    --Delete from Tabla1 the records that do not exist in Tabla1_HASH
    DELETE Tbl
      FROM rca.Tabla1 Tbl
        LEFT JOIN rca.Tabla1_HASH TblHs
          ON Tbl.Campo1 = TblHs.Campo1 AND Tbl.Campo2 = TblHs.Campo2
      WHERE TblHs.Campo1 IS NULL AND TblHs.Campo2 IS NULL

    --Update HASH_OLD with HASH_NEW
    UPDATE rca.Tabla1_HASH
      SET HASH_OLD = HASH_NEW

    EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado Tabla1_HASH.',
      0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando Tabla1_HASH.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.ActualizaHash_Tabla2') IS NOT NULL
  DROP PROCEDURE rca.ActualizaHash_Tabla2
GO
CREATE PROCEDURE rca.ActualizaHash_Tabla2
  @NombreParalelo NVARCHAR(256),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON

  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY

    --Delete from Tabla2_HASH the records whose HASH_NEW is null (deleted records)
    DELETE rca.Tabla2_HASH
      WHERE HASH_NEW IS NULL

    --Delete from Tabla2 the records that do not exist in Tabla2_HASH
    DELETE Tbl
      FROM rca.Tabla2 Tbl
        LEFT JOIN rca.Tabla2_HASH TblHs
          ON Tbl.Campo1 = TblHs.Campo1
      WHERE TblHs.Campo1 IS NULL

    --Update HASH_OLD with HASH_NEW
    UPDATE rca.Tabla2_HASH
      SET HASH_OLD = HASH_NEW

    EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado Tabla2_HASH.',
      0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando Tabla2_HASH.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

For this critical point there are two possibilities:

  • The first is to execute each of these stored procedures in parallel, each with its own transaction. If all procedures run without errors, all transactions are committed; if any of them fails, a rollback is performed. Note that the transaction needed to execute 'RCA.RCA.rca.UpdateFinActualizacion' must be a distributed transaction, so MS DTC must be enabled on both servers. In our experience, this option has a high probability of failure, not in the stored procedures but in the transactions themselves.
  • The second is to create a new stored procedure that calls all of these HASH update stored procedures, together with the stored procedure 'RCA.RCA.rca.UpdateFinActualizacion', and to run it inside a transaction. The transaction will promote to a distributed transaction, so MS DTC must be enabled on both servers.

Since this is a demonstration example, the code for both solutions is shown. Testing in your own environment will determine which of the two to use.

If the second possibility is chosen, the following stored procedure will have to be created:

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Actualiza_Hash_Todas_Tablas_Secuencial') IS NOT NULL
  DROP PROCEDURE rca.Actualiza_Hash_Todas_Tablas_Secuencial
GO
CREATE PROCEDURE rca.Actualiza_Hash_Todas_Tablas_Secuencial
  @NombreParalelo NVARCHAR(256),
  @IdParalelo bigint,
  @inicioProceso DATETIME2(7),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON

  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
  DECLARE @Duracion nvarchar(50), @fFin DATETIME2(7)

  SET @error = 0
  BEGIN TRY

    EXEC rca.ActualizaHash_Tabla1 @NombreParalelo, @error out
    IF @error = 0
      EXEC rca.ActualizaHash_Tabla2 @NombreParalelo, @error out
    --And so on for the rest of the tables

    --Finally, update the end date of the process
    IF @error = 0
    BEGIN
      SELECT @fFin = SYSDATETIME(), @Duracion = ibb.TimeSpanToNow(@inicioProceso)
      EXEC RCA.RCA.rca.UpdateFinActualizacion @IdParalelo, @Duracion, @fFin, @error out
    END

    IF @error = 0
      EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado el HASH de todas las Tablas.',
        0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
    ELSE
      EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando el HASH de todas las Tablas.',
        10, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando el HASH de todas las Tablas.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO
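
When a procedure like the one above is called inside an explicit transaction with XACT_ABORT ON, a run-time error usually leaves the transaction uncommittable, so the caller can only roll it back. The following is a minimal, self-contained sketch of that behaviour and of a caller that checks XACT_STATE() before deciding what to do. The temporary table #Demo_HASH is hypothetical and only forces an error; it does not belong to the repository.

```sql
SET XACT_ABORT ON;

-- Hypothetical stand-in for a hash table; it only serves to provoke an error.
CREATE TABLE #Demo_HASH (Campo1 INT NOT NULL PRIMARY KEY);

DECLARE @error INT = 0;

BEGIN TRY
  BEGIN TRAN;
  INSERT INTO #Demo_HASH (Campo1) VALUES (1);
  INSERT INTO #Demo_HASH (Campo1) VALUES (1);  -- duplicate key: forces an error
  COMMIT;
END TRY
BEGIN CATCH
  SET @error = ERROR_NUMBER();
  -- XACT_STATE() = -1: the transaction is uncommittable and can only be rolled back
  -- XACT_STATE() =  1: the transaction is still committable
  -- XACT_STATE() =  0: there is no open transaction
  IF XACT_STATE() <> 0
    ROLLBACK;
END CATCH

-- @error is non-zero and the first insert has been undone together with the second
SELECT @error AS error, (SELECT COUNT(*) FROM #Demo_HASH) AS remaining_rows;

DROP TABLE #Demo_HASH;
```

The same check could guard the COMMIT or ROLLBACK issued around the sequential procedure, so that a COMMIT is never attempted on a transaction that is no longer committable.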

Parallel Creation and Execution

Finally, here is the stored procedure that creates the 'Parallel' and executes it.

  • If there are transactions, they must be able to promote to distributed transactions (SET REMOTE_PROC_TRANSACTIONS ON).
  • If an error occurs, execution must not continue (SET XACT_ABORT ON).
  • If there is an open transaction when this procedure is called, the execution is aborted.
  • To identify each execution by a different name, the name is built from a sequence. The same value is also used to identify a new execution of the process in BI (ACT_CD_ACTUALIZACION).
  • The Parallel is made up of 3 batches of jobs:

    • 1st: Update of the OLTP repository tables. No transactions.
      Initial jobs, executed sequentially:
      • Create the new record in rca.ACTUALIZACIONES and delete the data of past failed runs.
      • If there are FKs in the OLTP repository, drop them.
      • Drop the FKs of the BI repository.
      Jobs executed in parallel:
      • Update the tables of the OLTP repository in parallel.
        Note: the stored procedures include the regeneration of indexes and statistics; otherwise, a separate stored procedure would be needed for that, executed as a Final job.
      Final jobs, executed sequentially:
      • If there are FKs in the OLTP repository, create them.

    • 2nd: Transfer of information from the OLTP repository to the BI repository. No transactions.
      Jobs executed in parallel:
      • Transfer the information from the OLTP repository to BI in parallel.
      Final jobs, executed sequentially:
      • Create the FKs in BI.
      • Regenerate the indexes in BI.

    • 3rd: Update HASH_OLD with HASH_NEW and set the end-of-process indicator. With transactions.

      FIRST OPTION: Execute the update of the HASHs and of the end date in parallel and with transactions

      Jobs executed in parallel:
      • Update the HASHs of the tables in the OLTP repository.
      • Set the end date in the BI repository.
      Execution options for the Parallel:
      • Three batches of jobs have been created; the degrees of parallelism were chosen after several rounds of experimentation. In the code, the first batch has a degree of parallelism of 6 (6 simultaneous processes), the second 20 (transfer of information) and the third 15 (if the FIRST OPTION is chosen).
      • The transactions are created with the 'ReadCommitted' isolation level and may stay open for at most 5 minutes. This value should cover the total duration of the 3rd batch, since it is the only one that uses transactions.
      Run the Parallel.

      SECOND OPTION: Execute the stored procedure "rca.Actualiza_Hash_Todas_Tablas_Secuencial" within a transaction

      Execution options for the Parallel:
      • Two batches of jobs have been created; the degrees of parallelism were chosen after several rounds of experimentation. In the code, the first batch has a degree of parallelism of 6 (6 simultaneous processes) and the second 20 (transfer of information). There is no third batch.
      • The transactions are created with the 'ReadCommitted' isolation level and may stay open for at most 5 minutes. However, this setting has no effect, since the third batch of jobs is not created and the other batches use no transactions.
      Run the Parallel.
      If the Parallel was successful:
      • Open a transaction.
      • Execute the stored procedure created expressly for the SECOND OPTION.
      • If it has executed successfully, commit the transaction; otherwise, roll it back.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.RellenaTablasParalelo') IS NOT NULL
  DROP PROCEDURE rca.RellenaTablasParalelo
GO
CREATE PROCEDURE rca.RellenaTablasParalelo
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--SET IMPLICIT_TRANSACTIONS OFF
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON

  DECLARE @NumTranAntes INT = @@TRANCOUNT
  DECLARE @idParalelo bigint, @NombreParalelo varchar(256) = NULL, @Parametros NVARCHAR(MAX) = NULL, @Trabajo NVARCHAR(MAX) = NULL
  DECLARE @Duracion nvarchar(50), @fFin DATETIME2(7)

  --Variables para LOG
  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY

    --If there are open transactions, the process will not work
    IF @NumTranAntes > 0
    BEGIN
      SET @error = 1
      EXECUTE ibb.Log_Errores_Inserta N'El trasvase de tablas no puede funcionar con una transacción abierta.',
        0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
      RETURN
    END

    --To identify each execution by a different name, the name is built from a sequence:
    SELECT @idParalelo = NEXT VALUE FOR rca.SEQ_PARALELO, @NombreParalelo = CONCAT('ParaleloRepositorio', @idParalelo)

    --The Parallel is made up of 3 batches of jobs:
    --  1. Update of the local repository tables. No transactions
    --  2. Transfer of information from the local repository to the remote one. No transactions
    --  3. Update of HASH_OLD with HASH_NEW and setting of the end-of-job indicator. With transactions

    --Job batch 1:
    --  1. Create the new record in rca.ACTUALIZACIONES and delete the data of past failed runs
    --  2. If there are local FKs in the repository: drop them.
    --  3. Drop the remote FKs.
    SET @Trabajo = CONCAT(N'EXEC RCA.RCA.rca.LimpiaTablasRemotas N''', TRY_CONVERT(nvarchar, @inicio, 121), N''', ', @idParalelo, N', @error out')
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1, N'Iniciales', N'Limpia Tablas Remotas', 0, @Trabajo
    SELECT @Parametros = CONCAT(N'N''', @NombreParalelo, N''', @error out'), @Trabajo = CONCAT(N'EXEC rca.EliminaFkLocales ', @Parametros)
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1, N'Iniciales', N'Elimina FKs Locales', 0, @Trabajo
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1, N'Iniciales', N'Elimina FKs Remotas', 0, N'EXEC RCA.RCA.rca.EliminaFkRemotas @error out'

    --  4. Update the local repository tables in parallel.
    --    Note: the stored procedures include the regeneration of indexes and statistics; otherwise, a
    --          separate stored procedure would be needed for that, executed as a Final job
    SET @Trabajo = CONCAT(N'EXEC rca.RellenaTabla1 ', @Parametros)
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1, N'Paralelo', N'Carga Tabla1', 0, @Trabajo
    SET @Trabajo = CONCAT(N'EXEC rca.RellenaTabla2 ', @Parametros)
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1, N'Paralelo', N'Carga Tabla2', 0, @Trabajo

    --  5. If there are local FKs in the repository: create them
    SET @Trabajo = CONCAT(N'EXEC rca.CreaFkLocales ', @Parametros)
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1, N'Finales', N'Crea FKs Locales', 0, @Trabajo

    --Job batch 2:
    --  1. Transfer the information from local to remote in parallel
    --  2. Create the remote FKs
    --  3. Rebuild the remote indexes
    SELECT @Parametros = CONCAT(@idParalelo, N', N''', @NombreParalelo, N''', @error out'), @Trabajo = CONCAT(N'EXEC rca.Traspasa_Tabla1 ', @Parametros)
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 2, N'Paralelo', N'Traspasa Tabla1', 0, @Trabajo
    SELECT @Trabajo = CONCAT(N'EXEC rca.Traspasa_Tabla2 ', @Parametros)
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 2, N'Paralelo', N'Traspasa Tabla2', 0, @Trabajo

    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 2, N'Finales', N'Crea FKs Remotas', 0, N'EXEC RCA.RCA.rca.CreaFkRemotas @error out'
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 2, N'Finales', N'Regenera Índices Remotos', 0, N'EXEC RCA.RCA.rca.RegeneraIndicesRemotos @error out'

    --FIRST OPTION: run the update of the HASHs and of the end date in parallel and with transactions
    --Job batch 3, with transactions:
    --  1. Update the HASHs of the tables
    --  2. Set the end date on the remote side
    SELECT @Parametros = CONCAT(N'''', @NombreParalelo, N''', @error out'), @Trabajo = CONCAT(N'EXEC rca.ActualizaHash_Tabla1 ', @Parametros)
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 3, N'Paralelo', N'ActualizaHash Tabla1', 1, @Trabajo
    SELECT @Trabajo = CONCAT(N'EXEC rca.ActualizaHash_Tabla2 ', @Parametros)
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 3, N'Paralelo', N'ActualizaHash Tabla2', 1, @Trabajo
    SELECT @Duracion = ibb.TimeSpanToNow(@inicio), @Trabajo = CONCAT(N'EXEC RCA.RCA.rca.UpdateFinActualizacion ', @idParalelo,
      N', ''', @Duracion, N''', ''', TRY_CONVERT(nvarchar, SYSDATETIME(), 121), N''', @error out')
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 3, N'Paralelo', N'Actualiza Fecha Fin', 1, @Trabajo

    --Options for the execution of the jobs
    --  Three batches of jobs have been created; after several rounds of experimentation, the following
    --  degrees of parallelism proved to be the best
    EXEC ibbclr.Parallel_SetOption_MaxTaskBatch @NombreParalelo, 1, 6
    EXEC ibbclr.Parallel_SetOption_MaxTaskBatch @NombreParalelo, 2, 20
    EXEC ibbclr.Parallel_SetOption_MaxTaskBatch @NombreParalelo, 3, 15
    --  The transactions are created with the 'ReadCommitted' isolation level and may stay open for at most
    --  5 minutes. This value should cover the total duration of the 3rd batch, since it is the only one
    --  that uses transactions
    EXEC ibbclr.Parallel_SetOptions @NombreParalelo, N'ReadCommitted', 5

    --Execution of the Parallel
    EXEC @error = ibbclr.Parallel_Execute @NombreParalelo

    ----SECOND OPTION: run the stored procedure rca.Actualiza_Hash_Todas_Tablas_Secuencial inside a transaction
    --BEGIN TRAN
    --EXEC rca.Actualiza_Hash_Todas_Tablas_Secuencial @NombreParalelo, @idParalelo, @inicio, @error out
    --IF @error = 0
    --  COMMIT
    --ELSE
    --  ROLLBACK

    IF @error = 0
      EXECUTE ibb.Log_Errores_Inserta N'Se ha ejecutado el Paralelo.',
        0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
    ELSE
      EXECUTE ibb.Log_Errores_Inserta N'ERROR ejecutando el Paralelo.',
        10, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR ejecutando el Paralelo.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO
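
The job texts registered with ibbclr.Parallel_AddJob in the procedure above are plain strings built with CONCAT, so a misplaced quote only shows up when the job runs. As a debugging aid, the following minimal sketch composes one of those strings with a made-up sequence value (42 is an assumption for illustration) and prints it instead of registering it.

```sql
-- Hypothetical values: in the real procedure @idParalelo comes from rca.SEQ_PARALELO.
DECLARE @idParalelo BIGINT = 42;
DECLARE @NombreParalelo NVARCHAR(256) = CONCAT(N'ParaleloRepositorio', @idParalelo);

-- Same concatenation pattern used for the transfer jobs of the second batch
DECLARE @Parametros NVARCHAR(MAX) = CONCAT(@idParalelo, N', N''', @NombreParalelo, N''', @error out');
DECLARE @Trabajo NVARCHAR(MAX) = CONCAT(N'EXEC rca.Traspasa_Tabla1 ', @Parametros);

-- Prints: EXEC rca.Traspasa_Tabla1 42, N'ParaleloRepositorio42', @error out
PRINT @Trabajo;
```

Printing the text first makes it easy to confirm that every string literal inside the job is properly quoted before the Parallel is executed.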

Once everything is programmed, the execution is quite simple:

DECLARE @error int
EXEC rca.RellenaTablasParalelo @error out

The LOG messages appear in the 'ibb.Log_Errores' table, together with the ERROR messages, if any. The messages are quite long and contain line breaks, so it is advisable to enable the SSMS option that retains CR/LF on copy or save, so that they can be copied and pasted somewhere they can be read in full.

Although it is outside the scope of this chapter, the algorithm described here can be extended with disaster recovery functionality, that is, rebuilding the OLTP repository tables from the BI repository tables, and vice versa.