Robust Azure ETLs with Python

Microsoft Azure cloud computing platform faces criticism due to its Python API being little customisable and poorly documented. Still it is a popular choice for many companies, thus data scientists need to squeeze maximum of features and performance out of it rather than complain. Below I am sharing thoughts on creating robust Extract-Transform-Load processes in this setup.

Extract from database: consumer-producer!

The key trick is to enable the consumer-producer pattern: you want to process your data in batches, for the sake of robustness and efficiency. The popular pandas library dedicated for tabular data is not enough for this task, as it hinders useful features. Gain more control using dedicated database drivers, e.g. psycogp driver for the popular Postgres database.

import psycopg2

dsn = f'user={db_user} password={db_password} dbname={db_name} host={db_host}'
conn = psycopg2.connect(dsn)

query = """
    SELECT *
    FROM sales
    WHERE date > '2021'
    """


def process_rows(row_group):
    '''do your stuff, e.g. filter and append to a file'''
    pass


n_prefetch = 10000


# mind the server-side cursor! 
with conn.cursor(name='server_side_cursor') as cur:
    cur.itersize = n_prefetch
    cur.execute(query)
    while True:
        row_group = cur.fetchmany(n_prefetch)
        if len(row_group) > 0:
            process_rows(row_group)
        else:
            break

Cast datatypes

Cast datatypes early and explicitly having these point in mind

Best to do this casting upstream, adapting at the database driver level, like in this example:

import psycopg2

# cast some data types upon receiving from database
datetype_casted = psycopg2.extensions.new_type(
    psycopg2.extensions.DATE.values, "date", psycopg2.DATETIME
)
psycopg2.extensions.register_type(datetype_casted)
decimal_casted = psycopg2.extensions.new_type(
    psycopg2.extensions.DECIMAL.values, "decimal", psycopg2.extensions.FLOAT
)
psycopg2.extensions.register_type(decimal_casted)

Use Parquet to store tabular data

The Apache Parquet is invaluable to efficient work with large data in tabular format. There are two major drivers for Pyhon: pyarrow and fastparquet. The first one can be integrated with Azure data flows (e.g. you can stream and filter data), although it has been limited in supporting more sophisticated data such as timedelta. Remember that writing Parquet incurs memory overhead so better to do this in batches. The relevant code may look as below:

def sql_to_parquet(
    conn, query, column_names, target_dir, n_prefetch=1000000, **parquet_kwargs
):
    """Writes the result of a SQL query to a Parquet file (in chunks).

    Args:
        conn: Psycopg connection object (must be open)
        query: SQL query of "select" type
        column_names: column names given to the resulting SQL table
        target_dir: local directory where Parquet is written to; must exist, data is overwritten
        n_prefetch: chunk of SQL data processed (read from SQL and dumped to Parquet) at a time. Defaults to 1000000.
    """
    with conn.cursor(name="server_side_cursor") as cur:
        # start query
        cur.itersize = n_prefetch
        cur.execute(query)
        # set up consumer
        chunk = 0
        # consume until stream is empty
        while True:
            # get and process one batch
            row_group = cur.fetchmany(n_prefetch)
            chunk += 1
            if len(row_group) > 0:
                out = pd.DataFrame(data=row_group, columns=column_names)
                fname = os.path.join(target_dir, f"part_{chunk:04d}.parquet")
                out.to_parquet(fname, engine="pyarrow", **parquet_kwargs)
            else:
                break


def df_to_parquet(df, target_dir, chunk_size=100000, **parquet_kwargs):
    """Writes pandas DataFrame to parquet format with pyarrow.

    Args:
        df: pandas DataFrame
        target_dir: local directory where parquet files are written to
        chunk_size: number of rows stored in one chunk of parquet file. Defaults to 100000.
    """
    for i in range(0, len(df), chunk_size):
        slc = df.iloc[i : i + chunk_size]
        chunk = int(i / chunk_size)
        fname = os.path.join(target_dir, f"part_{chunk:04d}.parquet")
        slc.to_parquet(fname, engine="pyarrow", **parquet_kwargs)

Leverage Azure API properly!

The Azure API is both cryptic in documentation and under-explained in data science blogs. Below I am sharing a comprehensive receipt on how to upload and register Parquet data with minimum effort. While uploading makes the data persist, registering enables further Azure features (such as pipe-lining). Best to dump your Parquet file to a directory (large Parquet files should be chunked) with tempfile then use the Azure Dataset class to both upload to the storage (use universally unique identifier for reproducibility/avoiding path collisions) and register to the workspace (use Tabular subclass). Use classes Workspace and Datastore to facilitate interaction with Azure.

from azureml.core import Workspace, Dataset
import tempfile
from uuid import uuid4
import psycopg2

ws = Workspace.from_config()
dstore = ws.datastores.get("my_datastore")

with tempfile.TemporaryDirectory() as tempdir:
    sql_to_parquet(conn, query, query_cols, tempdir)
    target = (dstore, f"raw/{tab}/{str(uuid4())}")
    Dataset.File.upload_directory(tempdir, target)
    ds = Dataset.Tabular.from_parquet_files(target)
    ds.register(
        ws,
        name=f"dataset_{tab}",
        description="created from Sales table",
        tags={"JIRA": "ticket-01"},
        create_new_version=True,
    )

Dodaj komentarz

Twój adres e-mail nie zostanie opublikowany.