Transaccional  hacia  Inteligencia de Negocio

Introducción

Todos los desarrolladores de sistemas Transaccionales(OLTP) han tenido que realizar determinados procesos para la alimentación de las bases de datos de Inteligencia de Negocio(BI). Este tipo de procesos, pueden ser más o menos complicados dependiendo del gerente del sistema OLTP y del gerente del sistema de BI. Me explico: Normalmente, la información necesaria en ambos tipos de sistema no suele ser la misma, así que, ya sea el equipo del OLTP o ya sea el del BI, deberá analizar, seleccionar y trasvasar la información disponible desde un sistema a otro. Y dependiendo de los gerentes, esta tarea la realizará uno u otro equipo.

En el caso de que dicha tarea la deban realizar el equipo del OLTP, normalmente, se le ha de añadir un requisito adicional: la eficiencia. Ello suele ser debido a que los sistemas OLTP suelen estar muy ocupados durante el día con el trabajo online, y por las noches, con cargas de datos y realización de informes variados. Así, la ventana de tiempo dada para este tipo de procesos suele ser muy pequeña, por lo que se ha de optimizar al máximo todo el proceso de trasvase.

Aquí se va a describir una metodología de trabajo usando las librerías y procedimientos almacenados de Insert Bulk Bestia de forma que se minimice el tiempo de análisis, selección y trasmisión de la información desde un sistema a otro.

El código mostrado en este capítulo se presenta a modo de ejemplo de utilización de la librería SQLServerAsyncExecProcedures incluida en Insert Bulk Bestia. InforCustom no se hace responsable del uso de dicho código.

Planteamiento del Problema

Se desea realizar un proceso diario para enviar determinada información desde un sistema OLTP a otro sistema para BI (ambos se encuentran en hosts diferentes). Dicha información debe quedar marcada indicando para cada registro si es nuevo, se ha modificado o eliminado desde la última vez que se ejecutó el proceso.

Para ello se crearán una serie de tablas repositorio en ambos sistemas, y en BI se creará una tabla, "ACTUALIZACIONES", donde quedará registrado cada ejecución del proceso. BI chequeará dicha tabla y cuando aparezca un nuevo registro, deberá procesar la información trasvasada.

Nota: Todos los objetos creados se harán en el esquema "rca".

También se parte de la hipótesis de que en el OLTP se encuentran instaladas las librerías de Insert Bulk Bestia y en el sistema de BI no.

Repositorio en OLTP

En la base de datos del sistema OLTP se va a crear un repositorio de tablas con la información que se desea transmitir, y asociado a cada tabla existirá otra tabla donde se almacenará un par de HASHs de la información. Estos dos HASHs van a facilitar y acelerar el proceso de comparación de la información con respecto a si se hiciese con querys que comparasen campo a campo, junto a las querys necesarias para averiguar cuáles son los registros nuevos y los eliminados.

Además del repositorio, se va a incluir en el código una Secuencia utilizada para dar nombre a las sucesivas ejecuciones y una función de cálculo de HASH para aquellas versiones de SQL Server (2014 e inferiores) cuya función HASHBYTES está limitada a 4000 caracteres (NVARCHAR). Veámoslo con código (por brevedad, sólo se van a incluir un par de tablas de ejemplo):


IF EXISTS(select NULL from sys.sequences where object_id = object_id('rca.SEQ_PARALELO'))
  DROP SEQUENCE rca.SEQ_PARALELO
GO
CREATE SEQUENCE rca.SEQ_PARALELO
 AS [bigint]
 START WITH 1
 INCREMENT BY 1
 MINVALUE 1
 MAXVALUE 9223372036854775807
 CACHE 10
GO

--SQL Server 2014 y versiones anteriores:
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.fnHashBytes') IS NOT NULL
  DROP FUNCTION rca.fnHashBytes
GO
CREATE FUNCTION rca.fnHashBytes (
  @EncryptAlgorithm NVARCHAR(50),
  @DataToEncrypt nvarchar(max))
RETURNS VARBINARY(MAX) WITH SCHEMABINDING
AS BEGIN
  DECLARE @Index INT = 1, @DataToEncryptLength INT
  DECLARE @EncryptedResult varbinary(max) = NULL

  IF @DataToEncrypt IS NOT NULL
  BEGIN
    SELECT @EncryptedResult = 0x, @DataToEncryptLength = DATALENGTH(@DataToEncrypt)
    WHILE @Index <= @DataToEncryptLength
    BEGIN
      SELECT @EncryptedResult = @EncryptedResult + HASHBYTES(@EncryptAlgorithm, SUBSTRING(@DataToEncrypt, @Index, 4000)), @Index = @Index + 4000
    END
  END
  RETURN @EncryptedResult
END
GO

Como se puede observar, se utiliza el esquema "rca" para definir los objetos utilizados para el repositorio.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla1') IS NOT NULL
  DROP TABLE rca.Tabla1
GO
CREATE TABLE rca.Tabla1(
    Campo1 varchar(16) NOT NULL,
    Campo2 smallint NOT NULL,
    Campo3 varchar(55) NOT NULL,

    ACT_CD_TIPO smallint NOT NULL,

  CONSTRAINT Tabla1_PK PRIMARY KEY CLUSTERED
    (
     Campo1 ASC,
     Campo2 ASC,
     ACT_CD_TIPO ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla1_HASH') IS NOT NULL
  DROP TABLE rca.Tabla1_HASH
GO
CREATE TABLE rca.Tabla1_HASH (
    Campo1 varchar(16) NOT NULL,
    Campo2 smallint NOT NULL,

    HASH_OLD varbinary(max) NULL,
    HASH_NEW varbinary(max) NULL,

 CONSTRAINT Tabla1_HASH_PK PRIMARY KEY CLUSTERED
    (
     Campo1 ASC,
     Campo2 ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla2') IS NOT NULL
  DROP TABLE rca.Tabla2
GO
CREATE TABLE rca.Tabla2(
    Campo1 varchar(16) NOT NULL,
    Campo2 smallint NOT NULL,
    Campo3 varchar(55) NOT NULL,

    ACT_CD_TIPO smallint NOT NULL,

  CONSTRAINT Tabla2_PK PRIMARY KEY CLUSTERED
    (
     Campo1 ASC,
     ACT_CD_TIPO ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla2_HASH') IS NOT NULL
  DROP TABLE rca.Tabla2_HASH
GO
CREATE TABLE rca.Tabla2_HASH (
    Campo1 varchar(16) NOT NULL,

    HASH_OLD varbinary(max) NULL,
    HASH_NEW varbinary(max) NULL,

 CONSTRAINT Tabla2_HASH_PK PRIMARY KEY CLUSTERED
    (
     Campo1 ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

Se va a suponer que tanto "Tabla1" como "Tabla2" sólo contienen los campos que se desean trasvasar. La información va a provenir de las tablas "Tabla1Org" y "Tabla2Org" respectivamente, las cuales contendrán mucha más información, pero que no es necesaria para BI. "Tabla1" tiene una clave primaria que se compone de dos campos y "Tabla2" de un solo campo.

Ambas tablas contienen el campo ACT_CD_TIPO que forma parte de la clave principal. Durante el procesado de la información, los registros se van a duplicar, y este campo se utilizará para distinguir los registros nuevos de los antiguos.

El campo HASH_OLD contendrá el HASH del registro de la última ejecución del proceso, mientras que el campo y HASH_NEW contendrá el HASH del registro de la ejecución actual. Si HASH_OLD es NULL, querrá decir que el registro es nuevo, si HAS_NEW es NULL querrá decir que el registro se ha eliminado de la tabla original. Y si ambos campos no son NULL y no coinciden querrá decir que el registro ha sido modificado.

Procedimientos Almacenados en OLTP

En el OLTP se van a crear una serie de procedimientos almacenados para la actualización de las tablas del repositorio que se han creado.

Se quiere destacar que en las tablas del repositorio no se ha creado ninguna Foreing Key. Sin embargo, si las hubiese, antes de la actualización de las tablas habría que eliminarlas. La actualización de las tablas del repositorio se va a realizar en paralelo, por lo que no habrá ningún orden preestablecido. Así, en primer lugar, se van a crear dos procedimientos almacenados para eliminar y recrear dichas Foreing Keys. A continuación se presenta una aproximación a dichos procedimientos almacenados:

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.EliminaFkLocales') IS NOT NULL
  DROP PROCEDURE rca.EliminaFkLocales
GO
CREATE PROCEDURE rca.EliminaFkLocales
  @NombreParalelo NVARCHAR(256),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON

  --Variables para LOG
  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY

    --Eliminar todas las FKs de las tablas del repositorio para su carga
    IF EXISTS(SELECT NULL FROM sys.foreign_keys WHERE name = 'Tabla1_Tabla2_FK')
      ALTER TABLE rca.Tabla1 DROP CONSTRAINT Tabla1_Tabla2_FK

    EXECUTE ibb.Log_Errores_Inserta N'Elimnadas Foreing Keys Locales.',
      0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento

  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR eliminando las Foreing Keys Locales.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.CreaFkLocales') IS NOT NULL
  DROP PROCEDURE rca.CreaFkLocales
GO
CREATE PROCEDURE rca.CreaFkLocales
  @NombreParalelo NVARCHAR(256),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON

  --Variables para LOG
  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY

    --Crear todas las FKs necesarias
    --En nuestro ejemplo, no hay FKs entre tablas del repositorio
    --IF NOT EXISTS(SELECT NULL FROM sys.foreign_keys WHERE name = 'Tabla1_Tabla2_FK')
    --  ALTER TABLE rca.Tabla1 ADD CONSTRAINT Tabla1_Tabla2_FK FOREIGN KEY(Campo1)
    --    REFERENCES rca.Tabla2 (Campo1)

    EXECUTE ibb.Log_Errores_Inserta N'Creadas Foreing Keys Locales.',
      0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento

  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR creando las Foreing Keys Locales.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

A continuación, vendrían los procedimientos almacenados encargados de actualizar las tablas del repositorio. Los que se muestran aquí, se encargan de actualizar el contenido de las tablas "rca.Tabla1", "rca.Tabla1_HASH", "rca.Tabla2" y "rca.Tabla2_HASH".

Estos procedimientos almacenados se ejecutarán en paralelo. Por ello, son importantes las siguientes instrucciones:

  • Se desea que, si se produce un error en ellos, no continúe la ejecución (SET XACT_ABORT ON).
  • El objetivo de la ejecución en paralelo es la de a aprovechar al máximo todas las CPUs disponibles. Por ello, no se desea que las Querys se ejecuten en paralelo (OPTION(MAXDOP 1)). MERGE suele ser una instrucción candidata a ser ejecutada en paralelo, por lo que es muy importante indicarle que no lo haga.

Algoritmo:

  • Actualizar el campo ACT_CD_TIPO a 2 para todos los registros en la tabla del repositorio (Tabla1).
  • Insertar todos los registros procedentes de la tabla original del OLTP (Tabla1Org) en la tabla del repositorio (Tabla1). El campo ACT_CD_TIPO se pone a 0 para indicar que son los registros insertados.
    En esta Query es donde se selecciona y transforma adecuadamente la información procedente del OLTP para cubrir las necesidades de BI.
  • Eliminar los registros viejos:
    • Tabla1 contiene los registros de la anterior ejecución marcados con ACT_CD_TIPO a 2 y TODOS los registros del OLTP marcados con ACT_CD_TIPO a 0.
    • El DELETE eliminará los registros que ya existían en Tabla1 y que siguen existiendo en el OLTP. Para ello se usa la clave de Tabla1.
    • De esta forma, los registros con ACT_CD_TIPO = 2 serán los registros que han desaparecido del OLTP, y los registros con ACT_CD_TIPO = 0 contendrán la información existente en el OLTP.
  • Actualización de Tabla1_HASH:
    • Se calcula el HASH de cada registro y se compara con el HASH contenido en el campo HASH_NEW.
    • Si el registro existe en ambas tablas (Tabla1 y Tabla1_HASH), pero el HASH calculado no coincide con HASH_NEW, se actualiza el campo HASH_NEW con el HASH calculado. Esos serán los registros cuya información ha sido modificada en el OLTP. Para estos registros, HASH_OLD y HASH_NEW no coincidirán. Se verá por qué más adelante.
    • Si el registro no existe en Tabla1_HASH, se inserta en dicha tabla. Serán los registros nuevos. Estos registros tendrán el campo HAS_OLD a NULL.
    • Si el registro no existe en Tabla1, en Tabla1_HASH se actualiza el campo HASH_NEW a NULL. Estos registros serán los eliminados y el campo HASH_OLD no será NULL.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.RellenaTabla1') IS NOT NULL
  DROP PROCEDURE rca.RellenaTabla1
GO
CREATE PROCEDURE rca.RellenaTabla1
  @NombreParalelo NVARCHAR(256),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--Este procedimiento almacenado se ejecutará en paralelo, por lo que se desea que si se produce un error no coninue con la ejecución.
SET XACT_ABORT ON

  --Variables para LOG
  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY
    --Marcar todos los registros como 2
    UPDATE rca.Tabla1
      SET ACT_CD_TIPO = 2
      OPTION(MAXDOP 1) --Como se ejecutará en paralelo, no hemos de bloquear las CPUs

    --Insertar Todos los registros procedentes de la Tabla Original marcados como 0
    INSERT INTO rca.Tabla1
      SELECT Tb1Org.Campo1, Tb1Org.Campo2, Tb1Org.Campo3, 0 AS ACT_CD_TIPO
        FROM dbo.Tabla1Org Tb1Org
      OPTION(MAXDOP 1) --Como se ejecutará en paralelo, no hemos de bloquear las CPUs

    --Eliminar los registros viejos:
    --  Si registro1 existe en Tabla1 y TablaOrg1, después del INSERT anterior Tabla1
    --  contiene: registro1-2 (del UPDATE) y registro1-0 (del INSERT). Con este DELETE
    --  se eliminará registro1-2, quedando el nuevo registro.
    DELETE Tb1Del
      FROM rca.Tabla1 Tb1Del
        INNER JOIN rca.Tabla1 Tb1Ins
          ON Tb1Del.Campo1 = Tb1Ins.Campo1 AND Tb1Del.Campo2 = Tb1Ins.Campo2
      WHERE Tb1Del.ACT_CD_TIPO = 2 AND Tb1Ins.ACT_CD_TIPO = 0
      OPTION(MAXDOP 1) --Como se ejecutará en paralelo, no hemos de bloquear las CPUs

    --Actualización de Tabla1_HASH
    --  Se calcula el HASH de cada registro y se compara con el campo HASH_NEW
    --    Si el registro existe en ambas tablas, pero el HASH no coincide, se actualiza el campo HASH_NEW
    --    Si el registro no existe Tabla1_HASH, se inserta
    --    Si el registro no existe Tabla1, se anula el campo HAS_NEW
    MERGE rca.Tabla1_HASH TablaHash USING
          (SELECT MBT.Campo1, MBT.Campo2,
                  --SQL Server 2014 y versiones anteriores:
                  --  Aquí se está utilizando la función de SQL Server HASHBYTES, porque se supone que
                  --  la cadena de caracteres que se va a formar no superará los 4000 caracteres. Si se
                  --  superase dicha longitud se debería usar la función rca.fnHashBytes
                    HASHBYTES('SHA2_512', (select MBT.* from (values(null))foo(bar) for xml auto)) as HASH_NEW
            FROM rca.Tabla1 AS MBT
            WHERE MBT.ACT_CD_TIPO = 0
          ) AS Hases_Nuevos
            ON TablaHash.Campo1 = Hases_Nuevos.Campo1 AND TablaHash.Campo2 = Hases_Nuevos.Campo2
      WHEN MATCHED AND TablaHash.HASH_NEW <> Hases_Nuevos.HASH_NEW THEN --Modificados
        UPDATE SET TablaHash.HASH_NEW = Hases_Nuevos.HASH_NEW
      WHEN NOT MATCHED BY TARGET THEN --Nuevos
        INSERT(Campo1, Campo2, HASH_NEW)
        VALUES(Hases_Nuevos.Campo1, Hases_Nuevos.Campo2, Hases_Nuevos.HASH_NEW)
      WHEN NOT MATCHED BY SOURCE THEN --Borrados
        UPDATE SET HASH_NEW = NULL
      OPTION(MAXDOP 1); --Como se ejecutará en paralelo, no hemos de bloquear las CPUs
      --MERGE es una instrucción candidata a ser paralelizada por SQL Server. Como ya se hacen muchas cosas
      --en paralelo, se le indica que no la paralelice para no bloquear las CPUs

    --Puede ser conveniente actualizar los índices y las estadísticas por si hay muchas modificaciones
    ALTER INDEX ALL ON rca.Tabla1_HASH REBUILD WITH (MAXDOP = 1, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
    UPDATE STATISTICS rca.Tabla1_HASH WITH COLUMNS
    ALTER INDEX ALL ON rca.Tabla1 REBUILD WITH (MAXDOP = 1, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
    UPDATE STATISTICS rca.Tabla1 WITH COLUMNS

    EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado Tabla1 y Tabla1_HASH.',
      0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento

  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando Tabla1 y Tabla1_HASH.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.RellenaTabla2') IS NOT NULL
  DROP PROCEDURE rca.RellenaTabla2
GO
CREATE PROCEDURE rca.RellenaTabla2
  @NombreParalelo NVARCHAR(256),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--Este procedimiento almacenado se ejecutará en paralelo, por lo que se desea que si se produce un error no coninue con la ejecución.
SET XACT_ABORT ON

  --Variables para LOG
  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY
    --Marcar todos los registros como 2
    UPDATE rca.Tabla2
      SET ACT_CD_TIPO = 2
      OPTION(MAXDOP 1) --Como se ejecutará en paralelo, no hemos de bloquear las CPUs

    --Insertar Todos los registros procedentes de la Tabla Original marcados como 0
    INSERT INTO rca.Tabla2
      SELECT Tb2Org.Campo1, Tb2Org.Campo2, Tb2Org.Campo3, 0 AS ACT_CD_TIPO
        FROM dbo.Tabla2Org Tb2Org
      OPTION(MAXDOP 1) --Como se ejecutará en paralelo, no hemos de bloquear las CPUs

    --Eliminar los registros viejos:
    --  Si registro1 existe en Tabla2 y Tabla2Org, después del INSERT anterior Tabla1
    --  contiene: registro1-2 (del UPDATE) y registro1-0 (del INSERT). Con este DELETE
    --  se eliminará registro1-2, quedando el nuevo registro.
    DELETE Tb2Del
      FROM rca.Tabla2 Tb2Del
        INNER JOIN rca.Tabla2 Tb2Ins
          ON Tb2Del.Campo1 = Tb2Ins.Campo1
      WHERE Tb2Del.ACT_CD_TIPO = 2 AND Tb2Ins.ACT_CD_TIPO = 0
      OPTION(MAXDOP 1) --Como se ejecutará en paralelo, no hemos de bloquear las CPUs

    --Actualización de Tabla2_HASH
    --  Se calcula el HASH de cada registro y se compara con el campo HASH_NEW
    --    Si el registro existe en ambas tablas, pero el HASH no coincide, se actualiza el campo HASH_NEW
    --    Si el registro no existe Tabla1_HASH, se inserta
    --    Si el registro no existe Tabla1, se anula el campo HAS_NEW
    MERGE rca.Tabla2_HASH TablaHash USING
          (SELECT MBT.Campo1,
                  --SQL Server 2014 y versiones anteriores:
                  --  Aquí se está utilizando la función de SQL Server HASHBYTES, porque se supone que
                  --  la cadena de caracteres que se va a formar no superará los 4000 caracteres. Si se
                  --  superase dicha longitud se debería usar la función rca.fnHashBytes
                    HASHBYTES('SHA2_512', (select MBT.* from (values(null))foo(bar) for xml auto)) as HASH_NEW
            FROM rca.Tabla2 AS MBT
            WHERE MBT.ACT_CD_TIPO = 0
          ) AS Hases_Nuevos
            ON TablaHash.Campo1 = Hases_Nuevos.Campo1
      WHEN MATCHED AND TablaHash.HASH_NEW <> Hases_Nuevos.HASH_NEW THEN --Modificados
        UPDATE SET TablaHash.HASH_NEW = Hases_Nuevos.HASH_NEW
      WHEN NOT MATCHED BY TARGET THEN --Nuevos
        INSERT(Campo1, HASH_NEW)
        VALUES(Hases_Nuevos.Campo1, Hases_Nuevos.HASH_NEW)
      WHEN NOT MATCHED BY SOURCE THEN --Borrados
        UPDATE SET HASH_NEW = NULL
      OPTION(MAXDOP 1); --Como se ejecutará en paralelo, no hemos de bloquear las CPUs
      --MERGE es una instrucción candidata a ser paralelizada por SQL Server. Como ya se hacen muchas cosas
      --en paralelo, se le indica que no la paralelice para no bloquear las CPUs

    --Puede ser conveniente actualizar los índices y las estadísticas por si hay muchas modificaciones
    ALTER INDEX ALL ON rca.Tabla2_HASH REBUILD WITH (MAXDOP = 1, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
    UPDATE STATISTICS rca.Tabla2_HASH WITH COLUMNS
    ALTER INDEX ALL ON rca.Tabla2 REBUILD WITH (MAXDOP = 1, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
    UPDATE STATISTICS rca.Tabla2 WITH COLUMNS

    EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado Tabla2 y Tabla2_HASH.',
      0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento

  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando Tabla2 y Tabla2_HASH.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

Repositorio en BI

Al igual que se han creado una serie de tablas en el OLTP, se crearán sus réplicas en BI de acuerdo con los requisitos planteados al inicio.

A continuación, se muestra las réplicas de "Tabla1" y "Tabla2". Se les ha añadido un campo nuevo, ACT_CD_ACTUALIZACION, que se corresponde con la clave de la tabla rca.ACTUALIZACIONES. Ambas tablas contienen el campo ACT_CD_TIPO que indicará lo que ha sucedido con el registro desde la última vez que se ejecutó el proceso. Contendrá los valores: 0-Insert, 1-Update, 2-Delete.

La tabla rca.ACTUALIZACIONES va a contener un registro por cada vez que se haya intentado realizar el proceso de trasvase de datos. Si el proceso se ha realizado con éxito, el campo ACT_FH_FIN tendrá un valor, y si ha habido errores, este campo contendrá un NULL.

Al igual que en el OLTP, no se han definido Foreing Keys entre las tablas del repositorio, sin embargo, si se han definido las FKs que relacionan dichas tablas con la tabla rca.ACTUALIZACIONES.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla1') IS NOT NULL
  DROP TABLE rca.Tabla1
GO
CREATE TABLE rca.Tabla1(
    Campo1 varchar(16) NOT NULL,
    Campo2 smallint NOT NULL,
    Campo3 varchar(55) NOT NULL,

    ACT_CD_TIPO smallint NOT NULL,
    ACT_CD_ACTUALIZACION bigint NOT NULL,

   CONSTRAINT Tabla1_PK PRIMARY KEY CLUSTERED
    (
     Campo1 ASC,
     Campo2 ASC,
     ACT_CD_ACTUALIZACION ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Tabla2') IS NOT NULL
  DROP TABLE rca.Tabla2
GO
CREATE TABLE rca.Tabla2(
    Campo1 varchar(16) NOT NULL,
    Campo2 smallint NOT NULL,
    Campo3 varchar(55) NOT NULL,

    ACT_CD_TIPO smallint NOT NULL,
    ACT_CD_ACTUALIZACION bigint NOT NULL,

   CONSTRAINT Tabla2_PK PRIMARY KEY CLUSTERED
    (
     Campo1 ASC,
     ACT_CD_ACTUALIZACION ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.ACTUALIZACIONES') IS NOT NULL
  DROP TABLE rca.ACTUALIZACIONES
GO
CREATE TABLE rca.ACTUALIZACIONES(
  ACT_CD_ACTUALIZACION bigint NOT NULL,
  ACT_FH_INICIO datetime2(0) NOT NULL,
  ACT_FH_FIN datetime2(0) NULL,
  ACT_DURACION nvarchar(50) NULL,

  CONSTRAINT ACTUALIZACIONES_PK PRIMARY KEY CLUSTERED
    (
        ACT_CD_ACTUALIZACION ASC
    )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

CREATE NONCLUSTERED INDEX RCA_ACTUALIZACIONES_FIN_NULO_IDX ON rca.ACTUALIZACIONES
(
  ACT_FH_FIN ASC
)
INCLUDE(ACT_CD_ACTUALIZACION)
WITH (PAD_INDEX = ON, STATISTICS_NORECOMPUTE = OFF, SORT_IN_TEMPDB = OFF, DROP_EXISTING = OFF, ONLINE = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, FILLFACTOR = 100) ON [PRIMARY]
GO


ALTER TABLE rca.Tabla1 ADD CONSTRAINT ACTUALIZACIONES_Tabla1_FK FOREIGN KEY(ACT_CD_ACTUALIZACION)
  REFERENCES rca.ACTUALIZACIONES (ACT_CD_ACTUALIZACION) ON DELETE CASCADE
GO
ALTER TABLE rca.Tabla2 ADD CONSTRAINT ACTUALIZACIONES_Tabla2_FK FOREIGN KEY(ACT_CD_ACTUALIZACION)
  REFERENCES rca.ACTUALIZACIONES (ACT_CD_ACTUALIZACION) ON DELETE CASCADE
GO

Procedimientos Almacenados en BI

También va a ser necesario la creación de determinados procedimientos almacenados del lado de BI:

  • Limpieza de las tablas del repositorio y de la tabla rca.ACTUALIZACIONES de posibles ejecuciones anteriores que no finalizaron correctamente: (ACT_FH_FIN IS NULL, o bien, ACT_CD_ACTUALIZACION no existe en rca.ACTUALIZACIONES). También inserta en rca.ACTUALIZACIONES un nuevo registro para la ejecución en curso.
  • También se crea un procedimiento almacenado para actualizar la fecha de finalización del proceso, indicando, además, que el proceso se ha ejecutado correctamente.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.LimpiaTablasRemotas') IS NOT NULL
  DROP PROCEDURE rca.LimpiaTablasRemotas
GO
CREATE PROCEDURE rca.LimpiaTablasRemotas
  @fechaInicioProceso NVARCHAR(50),
  @IdParalelo BIGINT,
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON

  SET @error = 0
  BEGIN TRY

    --Eliminar de Tabla1 y Tabla2 aquellos registros que se hayan podido insertar de un proceso anterior que haya
    --podido fallar (ACT_FH_FIN IS NULL o bien ACT_CD_ACTUALIZACION no existe en rca.ACTUALIZACIONES)
    DELETE rca.Tabla1
      WHERE ACT_CD_ACTUALIZACION IN (SELECT ACT_CD_ACTUALIZACION FROM rca.ACTUALIZACIONES WHERE ACT_FH_FIN IS NULL)
        OR ACT_CD_ACTUALIZACION NOT IN (SELECT ACT_CD_ACTUALIZACION FROM rca.ACTUALIZACIONES)

    DELETE rca.Tabla2
      WHERE ACT_CD_ACTUALIZACION IN (SELECT ACT_CD_ACTUALIZACION FROM rca.ACTUALIZACIONES WHERE ACT_FH_FIN IS NULL)
        OR ACT_CD_ACTUALIZACION NOT IN (SELECT ACT_CD_ACTUALIZACION FROM rca.ACTUALIZACIONES)

    --Hay un DELETE CASCADE, pero por tema de prestaciones se realizan las querys anteriores.
    DELETE rca.ACTUALIZACIONES
      WHERE ACT_FH_FIN IS NULL

    --Se crea el nuevo registro para la ejecución en curso
    INSERT INTO rca.ACTUALIZACIONES (ACT_CD_ACTUALIZACION, ACT_FH_INICIO)
      SELECT @IdParalelo, TRY_CONVERT(datetime2(0), @fechaInicioProceso, 121)

    --Por prestaciones, puede ser conveniente la actualización de los índices y estadísticas.
    ALTER INDEX ALL ON rca.ACTUALIZACIONES REBUILD WITH (MAXDOP = 0, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
    UPDATE STATISTICS rca.ACTUALIZACIONES WITH COLUMNS
  END TRY
  BEGIN CATCH
    --Sería conveniente crear toda la estructura de ibb.Log_Errores en el sistema de BI y dejar las trazas correspondientes
    SET @error = 1
  END CATCH
END
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.UpdateFinActualizacion') IS NOT NULL
  DROP PROCEDURE rca.UpdateFinActualizacion
GO
CREATE PROCEDURE rca.UpdateFinActualizacion
  @IdParalelo bigint,
  @Duracion nvarchar(50),
  @fFin nvarchar(50),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON

  SET @error = 0
  BEGIN TRY
    UPDATE rca.ACTUALIZACIONES
        SET ACT_FH_FIN = TRY_CONVERT(datetime2(0), @fFin, 121), ACT_DURACION = @Duracion
      WHERE ACT_CD_ACTUALIZACION = @IdParalelo
  END TRY
  BEGIN CATCH
    --Sería conveniente crear toda la estructura de ibb.Log_Errores en el sistema de BI y dejar las trazas correspondientes
    SET @error = 1
  END CATCH
END
GO

En segundo lugar, se crean los procedimientos almacenados para eliminar y crear los Foreing Keys en el servidor remoto. Aunque no se definan FKs entre las tablas del repositorio, si se han definido FKs entre dichas tablas y la tabla rca.ACTUALIZACIONES.

También se ha creado un procedimiento almacenado para la regeneración de los índices y estadísticas de las tablas de BI.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.EliminaFkRemotas') IS NOT NULL
  DROP PROCEDURE rca.EliminaFkRemotas
GO
CREATE PROCEDURE rca.EliminaFkRemotas
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON

  SET @error = 0
  BEGIN TRY
    --Eliminar todas las FKs de las tablas del repositorio para su carga
    IF EXISTS(SELECT NULL FROM sys.foreign_keys WHERE name = 'Tabla1_Tabla2_FK')
      ALTER TABLE rca.Tabla1 DROP CONSTRAINT Tabla1_Tabla2_FK

    --Eliminar todas las FKs contra la tabla de rca.ACTUALIZACIONES
    IF EXISTS(SELECT NULL FROM sys.foreign_keys WHERE name = 'ACTUALIZACIONES_Tabla1_FK')
      ALTER TABLE rca.Tabla1 DROP CONSTRAINT ACTUALIZACIONES_Tabla1_FK

    IF EXISTS(SELECT NULL FROM sys.foreign_keys WHERE name = 'ACTUALIZACIONES_Tabla2_FK')
      ALTER TABLE rca.Tabla2 DROP CONSTRAINT ACTUALIZACIONES_Tabla2_FK
  END TRY
  BEGIN CATCH
    --Sería conveniente crear toda la estructura de ibb.Log_Errores en el sistema de BI y dejar las trazas correspondientes
    SET @error = 1
  END CATCH
END
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.CreaFkRemotas') IS NOT NULL
  DROP PROCEDURE rca.CreaFkRemotas
GO
CREATE PROCEDURE rca.CreaFkRemotas
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON

  SET @error = 0
  BEGIN TRY

    --Crear todas las FKs necesarias
    --En el ejemplo no hay ninguna FKs entre las tablas del repositorio
    --IF NOT EXISTS(SELECT NULL FROM sys.foreign_keys WHERE name = 'Tabla1_Tabla2_FK')
    --ALTER TABLE rca.Tabla1 ADD CONSTRAINT Tabla1_Tabla2_FK FOREIGN KEY(Campo1)
    --  REFERENCES rca.Tabla2 (Campo1)

    --Crear todas las FKs contra la tabla de rca.ACTUALIZACIONES
    IF NOT EXISTS(SELECT NULL FROM sys.foreign_keys WHERE name = 'ACTUALIZACIONES_Tabla1_FK')
      ALTER TABLE rca.Tabla1 ADD CONSTRAINT ACTUALIZACIONES_Tabla1_FK FOREIGN KEY(ACT_CD_ACTUALIZACION)
      REFERENCES rca.ACTUALIZACIONES (ACT_CD_ACTUALIZACION) ON DELETE CASCADE

    IF NOT EXISTS(SELECT NULL FROM sys.foreign_keys WHERE name = 'ACTUALIZACIONES_Tabla2_FK')
      ALTER TABLE rca.Tabla2 ADD CONSTRAINT ACTUALIZACIONES_Tabla2_FK FOREIGN KEY(ACT_CD_ACTUALIZACION)
      REFERENCES rca.ACTUALIZACIONES (ACT_CD_ACTUALIZACION) ON DELETE CASCADE
  END TRY
  BEGIN CATCH
    --Sería conveniente crear toda la estructura de ibb.Log_Errores en el sistema de BI y dejar las trazas correspondientes
    SET @error = 1
  END CATCH
END
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.RegeneraIndicesRemotos') IS NOT NULL
  DROP PROCEDURE rca.RegeneraIndicesRemotos
GO
CREATE PROCEDURE rca.RegeneraIndicesRemotos
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON

  SET @error = 0
  BEGIN TRY
    --Si hay muchas actualizaciones en el repositorio, puede ser conveniente la actualización
    --de los índices y las estadísticas
    ALTER INDEX ALL ON rca.Tabla1 REBUILD WITH (MAXDOP = 0, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
    UPDATE STATISTICS rca.Tabla1 WITH COLUMNS

    ALTER INDEX ALL ON rca.Tabla2 REBUILD WITH (MAXDOP = 0, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
    UPDATE STATISTICS rca.Tabla2 WITH COLUMNS

    ALTER INDEX ALL ON rca.ACTUALIZACIONES REBUILD WITH (MAXDOP = 0, SORT_IN_TEMPDB = ON, ONLINE = OFF, STATISTICS_NORECOMPUTE = OFF)
    UPDATE STATISTICS rca.ACTUALIZACIONES WITH COLUMNS
  END TRY
  BEGIN CATCH
    --Sería conveniente crear toda la estructura de ibb.Log_Errores en el sistema de BI y dejar las trazas correspondientes
    SET @error = 1
  END CATCH
END
GO

Procedimientos Almacenados para el Trasvase de Información

En el OLTP, se crean los procedimientos almacenados para el trasvase de información. Se supone que se está utilizando un Servidor Vinculado llamado RCA, y una base de datos de destino también llamada RCA. Las tablas de destino, como ya se ha comentado, se encuentran en el esquema rca.

Evidentemente, el uso de un Servidor Vinculado no es lo más apropiado para la realización de este tipo de cosas. Sin embargo, si éste es uno de los requisitos, una de las formas de acelerar el proceso es el uso de las librerías 'SQLServerAsyncExecProcedures' de forma que se optimicen al máximo los recursos disponibles, y sobre todo, la transmisión de datos a través de la red, ya que éste suele ser el cuello de botella.

Se puede observar que con el algoritmo utilizado hasta ahora no es necesario que la información viaje en los dos sentidos, es decir, en el OLTP se conoce la información que se ha de enviar a BI, y no es necesario que la información de BI deba viajar al OLTP para realizar comparaciones. Evidentemente, esto genera una gran eficiencia, ya que el cuello de botella suele estar en la red.

Así, a continuación, se muestran los procedimientos almacenados que trasvasan los datos para Tabla1 y Tabla2:

  • Si hay transacciones, éstas deberán promocionar a transacciones distribuidas: SET REMOTE_PROC_TRANSACTIONS ON.
  • Se desea que, si se produce un error en ellos, no continúe la ejecución (SET XACT_ABORT ON).
  • En BI, se han de insertar los registros nuevos (HASH_OLD IS NULL), los modificados y los eliminados (HASH_NEW != HASH_OLD).
  • El campo ACT_CD_TIPO contendrá los valores: 0-Insert(HASH_OLD IS NULL), 1-Update(HASH_NEW != HASH_OLD), 2-Delete(HASH_NEW IS NULL).
  • El campo ACT_CD_ACTUALIZACION contendrá el valor apropiado como ya se verá mas adelante.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Traspasa_Tabla1') IS NOT NULL
  DROP PROCEDURE rca.Traspasa_Tabla1
GO
CREATE PROCEDURE rca.Traspasa_Tabla1
  @idParalelo BIGINT,
  @NombreParalelo NVARCHAR(256),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--Si hay transacciones, éstas deben poder promocionar
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON

  --Variables para LOG
  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY
    --Insert de los Nuevos (HASH_OLD Es nulo) y Modificados (HASH_NEW != HASH_OLD)
    INSERT INTO RCA.RCA.rca.Tabla1
      SELECT tb1.Campo1, tb1.Campo2, tb1.Campo3,
             IIF(ExpH.HASH_OLD IS NULL, 0, IIF(ExpH.HASH_NEW IS NULL, 2, 1)), @idParalelo
        FROM rca.Tabla1 tb1
          INNER JOIN rca.Tabla1_HASH ExpH
            ON ExpH.Campo1 = tb1.Campo1 AND ExpH.Campo2 = tb1.Campo2
        WHERE ExpH.HASH_OLD IS NULL OR ExpH.HASH_NEW != ExpH.HASH_OLD

    EXECUTE ibb.Log_Errores_Inserta N'Se ha trasvasado Tabla1.',
      0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR trasvasando Tabla1.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Traspasa_Tabla2') IS NOT NULL
  DROP PROCEDURE rca.Traspasa_Tabla2
GO
CREATE PROCEDURE rca.Traspasa_Tabla2
  @idParalelo BIGINT,
  @NombreParalelo NVARCHAR(256),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--Si hay transacciones, éstas deben poder promocionar
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON

  --Variables para LOG
  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY
    --Insert de los Nuevos (HASH_OLD Es nulo) y Modificados (HASH_NEW != HASH_OLD)
    INSERT INTO RCA.RCA.rca.Tabla2
      SELECT tb1.Campo1, tb1.Campo2, tb1.Campo3,
             IIF(ExpH.HASH_OLD IS NULL, 0, IIF(ExpH.HASH_NEW IS NULL, 2, 1)), @idParalelo
        FROM rca.Tabla2 tb1
          INNER JOIN rca.Tabla2_HASH ExpH
            ON ExpH.Campo1 = tb1.Campo1
        WHERE ExpH.HASH_OLD IS NULL OR ExpH.HASH_NEW != ExpH.HASH_OLD

    EXECUTE ibb.Log_Errores_Inserta N'Se ha trasvasado Tabla2.',
      0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR trasvasando Tabla2.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

Procedimientos Almacenados para el COMMIT

Hasta ahora, todos los procedimientos almacenados no necesitan transacciones, ya que, en caso de error el algoritmo se recupera de errores. Por ejemplo, si falla en la actualización de las tablas del OLTP o la generación de los HASH_NEWs, en la próxima ejecución del proceso se recuperará de estos errores. Si falla en la transmisión de las tablas desde el OLTP a BI, el procedimiento almacenado 'RCA.RCA.rca.LimpiaTablasRemotas' recuperará las tablas remotas.

Sin embargo, los procedimientos almacenados que aquí se describen son que los realmente hacen el COMMIT del proceso, es decir, son procedimientos almacenados que necesitan hacerse dentro de una transacción, ya que en caso de fallo, el proceso no podrá recuperarse. Las transacciones pueden ser de cualquier nivel de aislamiento, ya que cada procedimiento almacenado no necesita "ver" los datos del resto de procedimientos almacenados.

Los procedimientos almacenados 'rca.ActualizaHash_Tabla1' y 'rca.ActualizaHash_Tabla2' preparan las tablas Tabla1_HASH y Tabla2_HASH para la próxima ejecución del proceso:

  • Eliminan de Tabla1_HASH los registros que han desaparecido en el OLTP (HASH_NEW IS NULL).
  • Eliminan de Tabla1 aquellos registros que no existen en Tabla1_HASH.
  • Actualiza en Tabla1_HASH el campo HASH_OLD con el contenido de HASH_NEW.
Si fallan estos procedimientos, no se podrá distinguir qué registro ha sido procesado correctamente del que no lo ha sido. Las transacciones que puedan definirse para estos procedimientos almacenados van a ser locales a la instancia donde se encuentra el OLTP.

En este paso crítico también se ha de llamar al procedimiento remoto 'RCA.RCA.rca.UpdateFinActualizacion', que actualiza el campo 'ACT_FH_FIN' indicando que el proceso se ha ejecutado correctamente. Si falla este método, el proceso tampoco se podrá recuperar en la próxima ejecución. En este otro caso, la transacción que se asocie a este procedimiento almacenado va a ser distribuida, por lo que se tendrá que habilitar MS DTC en ambos servidores.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.ActualizaHash_Tabla1') IS NOT NULL
  DROP PROCEDURE rca.ActualizaHash_Tabla1
GO
CREATE PROCEDURE rca.ActualizaHash_Tabla1
  @NombreParalelo NVARCHAR(256),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON

  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY

    --Eliminar de Tabla1_HASH aquellos registros cuyo HAS_NEW es nulo (eliminados)
    DELETE rca.Tabla1_HASH
      WHERE HASH_NEW IS NULL

    --Eliminar de Tabla1 aquellos registros que no existan en Tabla1_HASH
    DELETE Tbl
      FROM rca.Tabla1 Tbl
        LEFT JOIN rca.Tabla1_HASH TblHs
          ON Tbl.Campo1 = TblHs.Campo1 AND Tbl.Campo2 = TblHs.Campo2
      WHERE TblHs.Campo1 IS NULL AND TblHs.Campo2 IS NULL

    --Actualizar los HASH_OLD con los HAS_NEW
    UPDATE rca.Tabla1_HASH
      SET HASH_OLD = HASH_NEW

    EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado Tabla1_HASH.',
      0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando Tabla1_HASH.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.ActualizaHash_Tabla2') IS NOT NULL
  DROP PROCEDURE rca.ActualizaHash_Tabla2
GO
CREATE PROCEDURE rca.ActualizaHash_Tabla2
  @NombreParalelo NVARCHAR(256),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET XACT_ABORT ON

  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY

    --Eliminar de Tabla1_HASH aquellos registros cuyo HAS_NEW es nulo (eliminados)
    DELETE rca.Tabla2_HASH
      WHERE HASH_NEW IS NULL

    --Eliminar de Tabla1 aquellos registros que no existan en Tabla1_HASH
    DELETE Tbl
      FROM rca.Tabla2 Tbl
        LEFT JOIN rca.Tabla2_HASH TblHs
          ON Tbl.Campo1 = TblHs.Campo1
      WHERE TblHs.Campo1 IS NULL

    --Actualizar los HASH_OLD con los HAS_NEW
    UPDATE rca.Tabla2_HASH
      SET HASH_OLD = HASH_NEW

    EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado Tabla2_HASH.',
      0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando Tabla2_HASH.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

Para este punto crítico existen dos posibilidades:

  • La primera es la de ejecutar en paralelo cada uno de estos procedimientos almacenados con transacciones independientes. Si todos los procedimientos se ejecutan sin errores se hará un commit de todas las transacciones. Si alguno falla, se hará un rollback. Conviene destacar que la transacción necesaria para ejecutar 'RCA.RCA.rca.UpdateFinActualizacion' debe ser una transacción distribuida, por lo que se necesita tener habilitado MS DTC en ambos servidores. La experiencia dice que esta posibilidad tiene una alta probabilidad de que las transacciones fallen, no los procedimientos almacenados, sino las propias transacciones.
  • La segunda es crear un nuevo procedimiento almacenado que llame a todos estos procedimientos almacenados de actualización de HASHs, junto con el procedimiento almacenado 'RCA.RCA.rca.UpdateFinActualizacion' y ponerlo en una transacción. La transacción, evidentemente, va a promocionar a una transacción distribuida, por lo que se necesita tener habilitado MS DTC en ambos servidores.

Como aquí se está presentando un ejemplo demostrativo, se mostrará el código para ambas soluciones. En función de las pruebas de ensayo y error, ya se decidirá por una u otra.

Si se opta por la segunda posibilidad, habrá que crear el siguiente procedimiento almacenado:

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.Actualiza_Hash_Todas_Tablas_Secuencial') IS NOT NULL
  DROP PROCEDURE rca.Actualiza_Hash_Todas_Tablas_Secuencial
GO
CREATE PROCEDURE rca.Actualiza_Hash_Todas_Tablas_Secuencial
  @NombreParalelo NVARCHAR(256),
  @IdParalelo bigint,
  @inicioProceso DATETIME2(7),
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON

  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)
  DECLARE @Duracion nvarchar(50), @fFin DATETIME2(7)

  SET @error = 0
  BEGIN TRY

    EXEC rca.ActualizaHash_Tabla1 @NombreParalelo, @error out
    IF @error = 0
      EXEC rca.ActualizaHash_Tabla2 @NombreParalelo, @error out
    --Y así con el resto de tablas

    --Por último, actualizar la fecha de finalización del proceso
    IF @error = 0
    BEGIN
      SELECT @fFin = SYSDATETIME(), @Duracion = ibb.TimeSpanToNow(@inicioProceso)
      EXEC RCA.RCA.rca.UpdateFinActualizacion @IdParalelo, @Duracion, @fFin, @error out
    END

    IF @error = 0
      EXECUTE ibb.Log_Errores_Inserta N'Se ha actualizado el HASH de todas las Tablas.',
        0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
    ELSE
      EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando el HASH de todas las Tablas.',
        10, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR actualizando el HASH de todas las Tablas.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

Creación del Paralelo y Ejecución

Por último, se detalla el procedimiento almacenado que crea el 'Paralelo' y lo ejecuta.

  • Si hay transacciones, éstas deberán promocionar a transacciones distribuidas: SET REMOTE_PROC_TRANSACTIONS ON.
  • Se desea que, si se produce un error en ellos, no continúe la ejecución (SET XACT_ABORT ON).
  • Si hay transacciones abiertas en la llamada a este procedimiento remoto, se aborta la ejecución.
  • Con el fin de identificar cada ejecución por un nombre distinto, se va a crear dicho nombre a partir de una secuencia. Este valor, también será utilizado para identificar una nueva ejecución del proceso en BI (ACT_CD_ACTUALIZACION).
  • El Paralelo se va a componer de 3 Lotes de Trabajos:

    • 1º Actualización de las tablas del repositorio del OLTP. Sin transacciones.
      Trabajos iniciales ejecutados secuencialmente:
      • Crear el nuevo registro en rca.ACTUALIZACIONES y eliminar datos de ejecuciones pasadas fallidas.
      • Si hay FKs en el repositorio del OLTP: Eliminarlas.
      • Eliminar las FKs del repositorio de BI.
      Trabajos a realizar en paralelo:
      • Actualizar las tablas del repositorio del OLTP en paralelo.
        Nota: en los procedimientos almacenados va incluido la regeneración de índices y estadísticas, si no, habría que hacer un procedimiento almacenado para ello y ejecutarlo como trabajos Finales.
      Trabajos finales a ejecutar secuencialmente:
      • Si hay FKs en el repositorio del OLTP: Crearlas

    • 2º Trasvase de Información desde el repositorio del OLTP al repositorio de BI. Sin transacciones.
      Trabajos a realizar en paralelo:
      • Traspasar la información desde el repositorio del OLTP a BI en paralelo.
      Trabajos finales a ejecutar secuencialmente:
      • Crear las FKs en BI.
      • Regenerar los índices en BI.

    • 3º Actualización de HAHS_OLD por HASH_NEW y poner indicador de fin del trabajo. Con transacciones.

      PRIMERA OPCIÓN: Ejecutar en paralelo y con transacciones la actualización de los HASH y la Fecha de Finalización

      Trabajos a realizar en paralelo:
      • Actualizar los HASHs de las tablas en el repositorio del OLTP.
      • Poner la fecha de finalización en el repositorio de BI.
      Opciones de ejecución para el paralelo:
      • Se han creado tres lotes de trabajos, y después de experimentar varias veces se obtienen los grados de paralelismo óptimos. En el código se indica que para el primer lote el grado de paralelismo es de 6(6 procesos simultáneos), para el segundo 20(transmisión de la información) y para el tercero 15(si se opta por la PRIMERA OPCION)
      • Se indica que las transacciones a crear serán de tipo 'ReadCommitted' y que el tiempo máximo que durará abierta una transacción será de 5 minutos. Este valor deberá ser tal que sea el tiempo total de duración del 3º lote, ya que es el único que tiene transacciones.
      Ejecutar el paralelo

      SEGUNDA OPCIÓN: Ejecutar el procedimiento almacenado "rca.Actualiza_Hash_Todas_Tablas_Secuencial" dentro de una transacción

      Opciones de ejecución para el paralelo:
      • Se han creado dos lotes de trabajos, y después de experimentar varias veces se obtienen los grados de paralelismo óptimos. En el código se indica que para el primer lote el grado de paralelismo es de 6(6 procesos simultáneos), y para el segundo 20(transmisión de la información). No hay tercer lote.
      • Se indica que las transacciones a crear serán de tipo 'ReadCommitted' y que el tiempo máximo que durará abierta una transacción será de 5 minutos. Sin embargo, esta parametrización no tiene efecto, ya que el tercer lote de trabajos no se ha creado y el resto de lotes no tienen transacciones.
      Ejecutar el paralelo
      Si el paralelo se ha ejecutado correctamente:
      • Abrir una transacción
      • Ejecutar el procedimiento almacenado creado expresamente para la SEGUNDA OPCIÓN.
      • Si se ha ejecutado correctamente, finalizar la transacción, en caso contrario, deshacerla.
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
IF OBJECT_ID ('rca.RellenaTablasParalelo') IS NOT NULL
  DROP PROCEDURE rca.RellenaTablasParalelo
GO
CREATE PROCEDURE rca.RellenaTablasParalelo
  @error INT OUTPUT
AS BEGIN SET NOCOUNT ON
--SET IMPLICIT_TRANSACTIONS OFF
SET REMOTE_PROC_TRANSACTIONS ON
SET XACT_ABORT ON

  DECLARE @NumTranAntes INT = @@TRANCOUNT
  DECLARE @idParalelo bigint, @NombreParalelo varchar(256) = NULL, @Parametros NVARCHAR(MAX) = NULL, @Trabajo NVARCHAR(MAX) = NULL
  DECLARE @Duracion nvarchar(50), @fFin DATETIME2(7)

  --Variables para LOG
  DECLARE @inicio DATETIME2(7) = SYSDATETIME(), @nameDB SYSNAME = DB_NAME(), @esquema SYSNAME = OBJECT_SCHEMA_NAME(@@PROCID), @procedimiento SYSNAME = OBJECT_NAME(@@PROCID)

  SET @error = 0
  BEGIN TRY

    --Si hay transacciones abiertas, el sistema no funcionará
    IF @NumTranAntes > 0
    BEGIN
      SET @error = 1
      EXECUTE ibb.Log_Errores_Inserta N'El trasvase de tablas no puede funcionar con una transacción abierta.',
        0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
      RETURN
    END

    --Con el fin de identificar cada ejecución por un nombre distinto, se va a crear dicho nombre a partir de una secuencia:
    SELECT @idParalelo = NEXT VALUE FOR rca.SEQ_PARALELO, @NombreParalelo = CONCAT('ParaleloRepositorio', @idParalelo)

    --El Paralelo se va a componer de 3 Lotes de Trabajos:
    --  1º Actualización de las tablas del repositorio local. Sin transacciones
    --  2º Trasvase de Información desde el repositorio local al remoto. Sin transacciones
    --  3º Actualización de HAHS_OLD por HASH_NEW y poner indicador de fin del trabajo. Con transacciones

    --Lote de Trabajos 1º:
    --  1º Crear el nuevo registro en rca.ACTUALIZACIONES y eliminar datos de ejecuciones pasadas fallidas
    --  2º Si hay FKs locales en el respositorio: Eliminarlas.
    --  3º Eliminar las FKs remotas.
    SET @Trabajo = CONCAT(N'EXEC SGR.rca.LimpiaTablasRemotas N''', TRY_CONVERT(nvarchar, @inicio, 121), N''', ', @idParalelo, N', @error out')
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1, N'Iniciales', N'Limpia Tablas Remotas', 0, @Trabajo
    SELECT @Parametros = CONCAT(N'N''', @NombreParalelo, N''', @error out'), @Trabajo = CONCAT(N'EXEC rca.EliminaFkLocales ', @Parametros)
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1, N'Iniciales', N'Elimina FKs Locales', 0, @Trabajo
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1, N'Iniciales', N'Elimina FKs Remotas', 0, N'EXEC SGR.rca.EliminaFkRemotas @error out'

    --  4º Actualizar las tablas del repositorio local en paralelo.
    --    Nota: en los procedimientos almacenados va incluido la regeneración de índices y estadísticas, si no, habría
    --          que hacer un procedimiento almacenado para ello y ejecutarlo como trabajos Finales
    SET @Trabajo = CONCAT(N'EXEC rca.RellenaTabla1 ', @Parametros)
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1, N'Paralelo', N'Carga Tabla1', 0, @Trabajo
    SET @Trabajo = CONCAT(N'EXEC rca.RellenaTabla2 ', @Parametros)
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1, N'Paralelo', N'Carga Tabla2', 0, @Trabajo

    --  5º Si hay FKs locales en el repositorio: Crearlas
    SET @Trabajo = CONCAT(N'EXEC rca.CreaFkLocales ', @Parametros)
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 1, N'Finales', N'Crea FKs Locales', 0, @Trabajo

    --Lote de Trabajos 2º:
    --  1º Traspasar la información desde el local al remoto en paralelo
    --  2º Crear las FKs remotas
    --  3º Regenerar los índices remotos
    SELECT @Parametros = CONCAT(@idParalelo, N', N''', @NombreParalelo, N''', @error out'), @Trabajo = CONCAT(N'EXEC rca.Traspasa_Tabla1 ', @Parametros)
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 2, N'Paralelo', N'Traspasa Tabla1', 0, @Trabajo
    SELECT @Trabajo = CONCAT(N'EXEC rca.Traspasa_Tabla2 ', @Parametros)
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 2, N'Paralelo', N'Traspasa Tabla2', 0, @Trabajo

    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 2, N'Finales', N'Crea FKs Remotas', 0, N'EXEC SGR.rca.CreaFkRemotas @error out'
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 2, N'Finales', N'Regenera Índices Remotos', 0, N'EXEC SGR.rca.RegeneraIndicesRemotos @error out'

    --PRIMERA OPCIÓN: Ejecutar en paralelo y con transacciones la actualización de los HASH y la Fecha de Finalización
    --Lote de trabajos 3º, con transacciones:
    --  1º Actualizar los HASHs de las tablas
    --  2º Poner la fecha de finalización en el remoto
    SELECT @Parametros = CONCAT(N'''', @NombreParalelo, N''', @error out'), @Trabajo = CONCAT(N'EXEC rca.ActualizaHash_Tabla1 ', @Parametros)
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 3, N'Paralelo', N'ActualizaHash Tabla1', 1, @Trabajo
    SELECT @Trabajo = CONCAT(N'EXEC rca.ActualizaHash_Tabla2 ', @Parametros)
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 3, N'Paralelo', N'ActualizaHash Tabla2', 1, @Trabajo
    SELECT @Duracion = ibb.TimeSpanToNow(@inicio), @Trabajo = CONCAT(N'EXEC SGR.rca.UpdateFinActualizacion ', @idParalelo,
      N', ''', @Duracion, N''', ''', TRY_CONVERT(nvarchar, SYSDATETIME(), 121), N''', @error out')
    EXEC ibbclr.Parallel_AddJob @NombreParalelo, 3, N'Paralelo', N'Actualiza Fecha Fin', 1, @Trabajo

    --Opciones para la ejecución de los Trabajos
    --  Se han creado tres lotes de trabajos, y después de experimentar varias veces comprobamos que los siguientes grados
    --  de paralelismo son los óptimos
    EXEC ibbclr.Parallel_SetOption_MaxTaskBatch @NombreParalelo, 1, 6
    EXEC ibbclr.Parallel_SetOption_MaxTaskBatch @NombreParalelo, 2, 20
    EXEC ibbclr.Parallel_SetOption_MaxTaskBatch @NombreParalelo, 3, 15
    --  Se indica que las transacciones a crear serán de tipo 'ReadCommitted' y que el tiempo máximo que durará abierta una
    --  Transacción será de 5 minutos. Este valor deberá ser tal que sea el tiempo total de duración del 3º lote, ya que es el
    --  único que tiene transacciones
    EXEC ibbclr.Parallel_SetOptions @NombreParalelo, N'ReadCommitted', 5

    --Ejecución del Paralelo
    EXEC @error = ibbclr.Parallel_Execute @NombreParalelo

    ----SEGUNDA OPCION: Ejecutar el procedimiento almacenado rca.Actualiza_Hash_Todas_Tablas_Secuencial dentro de una transacción
    --BEGIN TRAN
    --EXEC rca.Actualiza_Hash_Todas_Tablas_Secuencial @NombreParalelo, @idParalelo, @inicio, @error out
    --IF @error = 0
    --  COMMIT
    --ELSE
    --  ROLLBACK

    IF @error = 0
      EXECUTE ibb.Log_Errores_Inserta N'Se ha ejecutado el Paralelo.',
        0, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
    ELSE
      EXECUTE ibb.Log_Errores_Inserta N'ERROR ejecutando el Paralelo.',
        10, @NombreParalelo, @inicio, @nameDB, @esquema, @procedimiento
  END TRY
  BEGIN CATCH
    DECLARE @ERR_SQL_NUM_ERROR INT = ERROR_NUMBER(), @ERR_SQL_NUM_LINEA INT = ERROR_LINE(), @ERR_SQL_MENSAJE NVARCHAR(MAX) = ERROR_MESSAGE(), @ERR_SQL_PROCEDIMIENTO SYSNAME = ERROR_PROCEDURE(), @ERR_SQL_SEVERIDAD INT = ERROR_SEVERITY(), @ERR_SQL_ESTADO INT = ERROR_STATE()
    EXECUTE ibb.Log_Errores_Inserta N'ERROR ejecutando el Paralelo.',
      10, @NombreParalelo, NULL, @nameDB, @esquema, @procedimiento, @ERR_SQL_NUM_ERROR, @ERR_SQL_NUM_LINEA, @ERR_SQL_MENSAJE, @ERR_SQL_PROCEDIMIENTO, @ERR_SQL_SEVERIDAD, @ERR_SQL_ESTADO
    SET @error = @ERR_SQL_NUM_ERROR
  END CATCH
END
GO

Una vez programado todo, la ejecución es bastante sencilla:

DECLARE @error int
EXEC rca.RellenaTablasParalelo @error out

En la tabla 'ibb.Log_Errores' aparecerán los mensajes de LOG junto con los mensajes de ERROR si los hubiese. Los mensajes son bastante largos, y contienen saltos de línea, por lo que se recomienda activar la opción de conservar CR/LF de SSMS, para poder copiarlos y pegarlos donde se puedan leer completos.

No entra dentro de los objetivos de este capítulo, pero al algoritmo que aquí se describe se le puede añadir funcnionalidad para recuperación ante desastres, es decir, recuparación de las tablas del repositorio del OLTP desde las tablas del repositorio de BI, y viceversa.