Getting towards the end of our CRM Data Lake integration, we are down to the nitty gritty. The nitty gritty is actually quite a bit of grit when you are moving SaaS style servers with hundreds of client databases, all similar but hiding differences one wouldn't expect.
Imagine the sand in your car's floor mats after a day at the beach. That is the grit: user-defined fields, user-defined tables, other customizations or repurposing of fields--you name it, I've seen it. The most recent of these differences centered around the use, non-use, or sporadic use of columns of the type 'Date.' Date fields are just that, a date with no time component, such as 2022-07-14. In our system we allow clients to create templates and add their own custom fields (UDFs), of which one type is a date. We also offer timestamp, boolean, varchar, and a host of other MySQL data types for use in these custom templates. This is what allows our SaaS model to succeed when each business has different needs.
If you recall from previous posts and their associated diagrams, we store the data in our S3-based lake in Apache Parquet format. This format is readily consumed by a few later steps in the data movement, as well as by Machine Learning systems, so it was a wise choice for us all around. If you've ever explored Parquet files, you might know that they store dates and timestamps as integers: dates are stored as the number of days since the Unix epoch, 1970-01-01, and timestamps as a count of milliseconds, microseconds, or nanoseconds (depending on the writer) since 1970-01-01 00:00:00 UTC. This normally does not pose a problem for Spark, as it will interpret the data on ingest and perform the conversion for you. However, if your column is sporadically used, you might run into a sampling issue.
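To make the day-count storage concrete, here is a minimal sketch in plain Python (no Spark needed) of how such an integer maps to a calendar date and back. The helper names `days_to_date` and `date_to_days` are made up for this illustration and are not part of our job scripts.

```python
from datetime import date, timedelta

UNIX_EPOCH = date(1970, 1, 1)

def days_to_date(days):
    """Convert a count of days since 1970-01-01 into a calendar date."""
    return UNIX_EPOCH + timedelta(days=days)

def date_to_days(d):
    """Convert a calendar date into a count of days since 1970-01-01."""
    return (d - UNIX_EPOCH).days

print(days_to_date(19187))              # 2022-07-14
print(date_to_days(date(2022, 7, 14)))  # 19187
```

So when a date column is misread as an integer, a value such as 19187 is what shows up in place of 2022-07-14.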
Within AWS Glue you often create Dynamic Frames from the Glue catalog or from other options; in this case we are sourcing from S3. The code will look something like this:
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type='s3',
    connection_options={'path': path, 'recurse': True},
    format='parquet',
    transformation_ctx='dynamic_frame')
This particular way of bringing data in does not allow for a sampling ratio, with which you could have the entire dataset scanned before the data type is determined (in theory; this could get quite hairy if the data is large, so be cautious). Spark is generally pretty good about inferring a data type so you don't have to include a schema, which was optimal in our case because we are processing hundreds of tables with the same script(s). We also didn't want the liability of keeping a mapping document up to date--you know how that goes when one team doesn't inform the other of changes: Boom, broken data movement. We also need to account for those UDFs I mentioned above, which the clients can change as they see fit or need.
For AWS and Dynamic Frames, the sampling ratio option is only available when reading in data from an RDD, which also holds true for Spark's createDataFrame function. This proved to be problematic for us, as this sparsely populated date field was being read in as an Integer, which you cannot cast to Date as is. You could bring in the entire data frame, create a view, then use SQL to manipulate the rows, but this is a costly action if your data is of any size. A better option, and something native to Spark, is to rewrite the column with withColumn, converting every value to the desired type in the process. It is very handy and surprisingly fast even on larger datasets.
As I mentioned before, we are dealing with hundreds of tables, all with different schemas. What this meant was that we needed, for each table, to go out and get the schema from the source, read in the data frame, then map each column to what the source says it should be. This allowed us to correct the date field's issue, mapping the column from an int to a date, during which we convert the integer number of days from the unix date to the proper date for the data. Confusing? Don't worry, it took us a minute to sort it out too. Here is the basic idea:
import awswrangler as wr
from pyspark.sql import functions as F

# kmsKey, queryFolder, and logger are assumed to be defined earlier in the job script

# Define the function to get the desired/known ddl
def get_ddl(database, tbl):
    athena_table_name = 'mysql_{}'.format(tbl)
    # COLUMN_NAME is selected twice, so DATA_TYPE lands at index 2 of each row
    stmt_ddl = "select COLUMN_NAME, COLUMN_NAME, CASE WHEN DATA_TYPE = 'integer' THEN 'INT' WHEN DATA_TYPE = 'text' THEN 'STRING' WHEN DATA_TYPE = 'varchar' THEN 'STRING' ELSE DATA_TYPE END AS DATA_TYPE from INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}'".format(database, athena_table_name)
    dfs = wr.athena.read_sql_query(stmt_ddl, database=database, ctas_approach=False, s3_additional_kwargs={'ServerSideEncryption': 'aws:kms', 'SSEKMSKeyId': kmsKey}, s3_output=queryFolder)
    # One plain tuple per column: (column_name, column_name, data_type)
    ddl = list(dfs.itertuples(index=False, name=None))
    return ddl

# Define your parsing function
def data_parsing(df, ddl):
    for obj in ddl:
        colname = obj[0]
        coltype = obj[2]
        try:
            # Look up the type Spark inferred for this column (case-insensitive match)
            df_type = [dtype for name, dtype in df.dtypes if colname.lower() == name.lower()][0]
        except IndexError as c:
            logger.info('The {} column is not recognized in the dataframe, skipping! {}'.format(str(obj), str(c)))
            continue
        logger.info('Column info: {}, {}, {}'.format(colname, coltype, df_type))
        if df_type.lower() == 'int' and coltype.lower() == 'date':
            logger.info('Parquet Date was misrepresented as Integer. Correcting {} from {} to {}'.format(colname, df_type.lower(), coltype.lower()))
            # Days since epoch -> seconds since epoch -> date string -> date
            df = df.withColumn(colname, F.from_unixtime(F.col(colname) * 86400, "yyyy-MM-dd").cast("date"))
    return df
# Create your dynamic frame
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type='s3',
    connection_options={'path': path, 'recurse': True},
    format='parquet',
    transformation_ctx='df_inc')
# Convert it to a Spark data frame
df_prep = dyf.toDF()
# Utilize the functions to parse the data to the correct format
ddl = get_ddl(database, tbl)
df_final = data_parsing(df_prep, ddl)
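Because the comparison step only looks at column names and type strings, it can be tried out without a cluster. The sketch below is a hypothetical helper, `find_int_date_mismatches`, which is not part of our scripts. It takes a list shaped like `df.dtypes` and a list shaped like the rows `get_ddl` is meant to return, and reports which columns would be corrected. The sample column names are invented.

```python
def find_int_date_mismatches(df_dtypes, ddl):
    """Return names of columns Spark read as int but the source declares as date.

    df_dtypes: list of (name, spark_type) pairs, the shape of DataFrame.dtypes.
    ddl: list of (column_name, column_name, data_type) rows.
    """
    # Case-insensitive lookup of the type Spark inferred for each column
    inferred = {name.lower(): dtype.lower() for name, dtype in df_dtypes}
    mismatches = []
    for row in ddl:
        colname, coltype = row[0], row[2]
        df_type = inferred.get(colname.lower())
        if df_type is None:
            # Column exists at the source but not in the data frame; skip it
            continue
        if df_type == 'int' and coltype.lower() == 'date':
            mismatches.append(colname)
    return mismatches

# Invented sample: udf_renewal_date was read as int, created_on came through correctly
sample_dtypes = [('id', 'int'), ('udf_renewal_date', 'int'), ('created_on', 'date')]
sample_ddl = [
    ('id', 'id', 'INT'),
    ('udf_renewal_date', 'udf_renewal_date', 'date'),
    ('created_on', 'created_on', 'date'),
    ('not_in_frame', 'not_in_frame', 'date'),
]
print(find_int_date_mismatches(sample_dtypes, sample_ddl))  # ['udf_renewal_date']
```

Keeping the decision separate from the Spark call like this is only one way to structure it, but it makes the matching rules easy to unit test.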
Please note, we are using the open source package 'awswrangler' (imported as wr) to access Athena, as the Curated realm of our Data Lake is not part of the Glue catalog at this time; we are currently storing only the Raw realm tables within the AWS Glue Data Catalog.
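One detail worth checking in the conversion itself: Spark's from_unixtime renders its result in the session time zone, and multiplying a day count by 86400 lands exactly on midnight UTC, which is still the previous calendar day anywhere west of UTC. The plain-Python sketch below reproduces that arithmetic. The function name is hypothetical and the fixed UTC-8 offset is just a stand-in for a non-UTC session; a session running in UTC would not show the shift.

```python
from datetime import date, datetime, timedelta, timezone

SECONDS_PER_DAY = 86400

def days_to_date_via_seconds(days, tz):
    """Mimic the days * 86400 approach: build a timestamp, then read the date in tz."""
    return datetime.fromtimestamp(days * SECONDS_PER_DAY, tz=tz).date()

utc = timezone.utc
utc_minus_8 = timezone(timedelta(hours=-8))  # stand-in for a non-UTC session time zone

print(days_to_date_via_seconds(19187, utc))          # 2022-07-14
print(days_to_date_via_seconds(19187, utc_minus_8))  # 2022-07-13
print(date(1970, 1, 1) + timedelta(days=19187))      # 2022-07-14, no time zone involved
```

If that shift is a concern in your environment, adding the day count directly to the epoch date avoids the time zone altogether; Spark's date_add function is one way to do that.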
The use of withColumn allows us to convert the data on the fly, and the mapping ensures the data frame uses the correct type for the column. For whatever reason, this issue did not occur with timestamps or other types where the column was infrequently populated and more often than not NULL. My best guess is that it has something to do with the date integer being stored as a number of days rather than a finer-grained time value, or perhaps with the sample taken from the data to infer the type. Interestingly, if we left the column entirely NULL in our testing, i.e. no use at all, it would still be inferred as an Integer. However, if the column was 99% populated, or used at some other high rate, the correct data type, Date, was inferred.
I am honestly still a little baffled by this one and I couldn't really find any examples where other folks had run into the same issue. We posed the question to AWS support as well as another vendor linked to this process before we totally understood the source of the issue--neither could come up with a solution but both could replicate the issue. I would love to hear from anyone who has dealt with something similar to know if there are other data types where infrequent or non-use results in incorrect type inference--if you have seen this, please reach out! I would love to swap testing stories.
Until next time, toodles from California. More on that later!