Automate your pipelines once and for all.
An upsert is a SQL statement that handles both inserts and updates on a table, often in bulk. It requires a destination table (the one you're updating) and a source table or file (the one that contains fresher records).
In this article, we'll look at how to perform an upsert onto a Snowflake table when you've got a Pandas DataFrame in Python. It all revolves around Snowflake's MERGE command, which is covered in more detail in the Snowflake documentation.
I'll first break it down step by step, and then bring it all together at the end. You can skip the steps if you just want to read the code and adapt it.
Create a JSON file
The first thing you'll want to do is save your DataFrame as a file Snowflake can stage. CSV obviously seems like a go-to solution, but JSON is a safer choice for your data types. Any number you're storing as a string (phone numbers being the main reason I abandoned CSV for this) will end up as an integer or a float, possibly written in scientific notation, which is absolutely the last thing we need in this house.
To make sure everything runs smoothly, there are three conditions your JSON file must meet:
- it must be record-oriented,
- records must be stored in lines (as opposed to a list),
- date types must be stored to the second (as opposed to milli- or nanosecond).
To meet all of those, you can use the following line of code:
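A minimal sketch of that call, assuming pandas' `DataFrame.to_json`. The helper name `save_as_json_lines` and its `path` argument are hypothetical; the three keyword arguments map one-to-one onto the three conditions listed above.

```python
import pandas as pd


def save_as_json_lines(df: pd.DataFrame, path: str) -> None:
    """Write df as a record-oriented, line-delimited JSON file."""
    df.to_json(
        path,
        orient="records",  # one JSON object per row
        lines=True,        # one object per line rather than a single list
        date_unit="s",     # timestamps written to the second
    )
```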
This gives you a beautiful, line-by-line JSON file of your table, which should look like this:
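For illustration, here is what those settings produce for a small, made-up table (the column names and values are hypothetical). The phone numbers keep their leading zero because they stay strings, and timestamps come out as epoch seconds, which is pandas' default date format for JSON.

```python
import pandas as pd

df = pd.DataFrame(
    {
        "id": [1, 2],
        "phone": ["0612345678", "0687654321"],
        "updated_at": pd.to_datetime(
            ["2021-01-01 12:00:00", "2021-01-02 08:30:00"]
        ),
    }
)

# With no path given, to_json returns the JSON text instead of writing a file.
json_lines = df.to_json(orient="records", lines=True, date_unit="s")
print(json_lines)
# {"id":1,"phone":"0612345678","updated_at":1609502400}
# {"id":2,"phone":"0687654321","updated_at":1609576200}
```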
Create a stage for JSON files on Snowflake
A stage is a temporary storage location for files containing data we need to copy into a table. Ideally, you'll want to create it in the same database and schema as the table into which you're trying to upsert your JSON file.
To create a fitting stage, you only need to set up your context correctly in a Snowflake worksheet, and then run this line:
CREATE OR REPLACE STAGE my_stage_name FILE_FORMAT=(TYPE='JSON');
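If you'd rather do this from Python than from a worksheet, the same step can be sketched with a cursor. The function name and its parameters are hypothetical; `cursor` is assumed to be any DB-API style cursor exposing `execute`, such as the one you get from a Snowflake connector connection.

```python
def create_json_stage(cursor, database: str, schema: str, stage: str) -> None:
    """Set the session context, then create a stage that expects JSON files."""
    # Context first, so the stage lands next to the destination table.
    cursor.execute(f"USE DATABASE {database}")
    cursor.execute(f"USE SCHEMA {schema}")
    cursor.execute(f"CREATE OR REPLACE STAGE {stage} FILE_FORMAT=(TYPE='JSON')")
```

Identifiers are interpolated straight into the SQL here, so only pass names you trust.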
Stage your JSON file
This requires only one command, which you can either run from a snowsql session in your terminal, or run in Python once you've set up your Snowflake connection (code given at the end).
PUT file:///path/to/json_filename @my_stage_name OVERWRITE=TRUE;
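As a rough sketch of the Python route, the PUT command can be built from a local path and sent through a cursor. `stage_file` is a hypothetical helper, `cursor` is assumed to come from an open Snowflake connection, and the path handling assumes a POSIX-style absolute path (which yields the `file:///...` form shown above).

```python
import os


def stage_file(cursor, local_path: str, stage: str) -> None:
    """Upload a local file to a named internal stage with PUT."""
    # An absolute POSIX path starts with "/", giving file:///path/to/file.
    absolute_path = os.path.abspath(local_path)
    cursor.execute(f"PUT file://{absolute_path} @{stage} OVERWRITE=TRUE")
```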
Run the MERGE
Merging a staged file into a table isn't very difficult; it's just the syntax that's a bit out there. You need to prefix all your columns with $1: when referring to a "column" in your JSON file.
You can follow this example, where column A is akin to a primary key, columns B and C contain information liable to change (like a "last updated" timestamp, which is why we update them when the merge has found a match on column A), and column D contains information worth inserting but not updating (like a date of birth).
In the automation to come, I'll refer to the likes of column A as "ID columns", and to columns like B and C as "update columns".
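A sketch of what such a MERGE could look like for columns A through D, held in a Python string so it can be passed to a cursor later. The table, stage, and file names are placeholders, and the aliases `t` (target) and `s` (source) are my own choice.

```python
# Hypothetical names: my_table, my_stage_name and json_filename are placeholders.
MERGE_EXAMPLE = """
MERGE INTO my_table t
USING (
    SELECT $1:A AS A, $1:B AS B, $1:C AS C, $1:D AS D
    FROM @my_stage_name/json_filename
) s
ON t.A = s.A
WHEN MATCHED THEN
    UPDATE SET t.B = s.B, t.C = s.C
WHEN NOT MATCHED THEN
    INSERT (A, B, C, D) VALUES (s.A, s.B, s.C, s.D)
"""

# With an open cursor, this would run as: cursor.execute(MERGE_EXAMPLE)
```

The key after `$1:` is matched case-sensitively against the JSON file, and depending on your column types you may want explicit casts such as `$1:A::STRING`.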
The code in the following function will take in:
- your DataFrame
- id_columns (list of columns that constitutes a primary key; can be a list of one if you've got an ID on hand)
- insert_columns (list of all the columns you'll want to insert for unmatched rows)
- update_columns (list of all the columns you might want to update; see example above)
- your context (database, schema, table, and stage names)
and run all the steps mentioned above minus the stage creation.
It opens a connection with Snowflake on lines 7 through 16, creates a JSON file on lines 21 through 23, stages the file on line 28, runs the merge on lines 29 through 36, removes the file from the stage on line 38, and deletes your local file, created only for the purpose of staging, on line 40. Finally, it closes your Snowflake connection properly on line 43. Lines 4, 18, and 41 provide some insight as the function runs.
Of course, you'll need the Snowflake connector for Python to set up the connection.
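For reference, here is one way such a function might be sketched. It is not the original listing, so the line numbers quoted above do not apply to it. It also differs in one respect: it takes an already-open connection (assumed to come from `snowflake.connector.connect(...)`) and leaves closing it to the caller. The function name, the `filename` default, and the aliases `t` and `s` are all hypothetical.

```python
import os


def upsert_dataframe(connection, df, database, schema, table, stage,
                     id_columns, insert_columns, update_columns,
                     filename="upsert_tmp.json"):
    """Upsert df into database.schema.table through an existing JSON stage."""
    local_path = os.path.abspath(filename)
    # Record-oriented, one object per line, timestamps to the second.
    df.to_json(local_path, orient="records", lines=True, date_unit="s")

    # Every column read from the staged file, de-duplicated in order.
    columns = list(dict.fromkeys([*id_columns, *insert_columns, *update_columns]))
    select_list = ", ".join(f"$1:{c} AS {c}" for c in columns)
    on_clause = " AND ".join(f"t.{c} = s.{c}" for c in id_columns)
    update_clause = ", ".join(f"t.{c} = s.{c}" for c in update_columns)
    insert_names = ", ".join(insert_columns)
    insert_values = ", ".join(f"s.{c}" for c in insert_columns)
    merge = (
        f"MERGE INTO {table} t "
        f"USING (SELECT {select_list} FROM @{stage}/{filename}) s "
        f"ON {on_clause} "
        f"WHEN MATCHED THEN UPDATE SET {update_clause} "
        f"WHEN NOT MATCHED THEN INSERT ({insert_names}) VALUES ({insert_values})"
    )

    cursor = connection.cursor()
    try:
        cursor.execute(f"USE DATABASE {database}")
        cursor.execute(f"USE SCHEMA {schema}")
        cursor.execute(f"PUT file://{local_path} @{stage} OVERWRITE=TRUE")
        cursor.execute(merge)
        # Clean up the staged copy once the merge has run.
        cursor.execute(f"REMOVE @{stage}/{filename}")
    finally:
        cursor.close()
        # The local file only existed to be staged.
        os.remove(local_path)
```

With the example columns from earlier, a call might look like `upsert_dataframe(conn, df, "my_db", "my_schema", "my_table", "my_stage_name", ["A"], ["A", "B", "C", "D"], ["B", "C"])`. The sketch assumes `update_columns` is non-empty and that all names are trusted, since they are interpolated directly into the SQL.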