"""
Databricks supports data ingestion of local files via a cloud staging location.
Ingestion commands work on DBR >13.2, and you must include a staging_allowed_local_path kwarg when
calling sql.connect().
Use databricks-sql-connector to PUT files into the staging location where Databricks can access them:

    PUT '/path/to/local/data.csv' INTO
    '/Volumes/test_catalog/test_schema/test_volume/examples/sales.csv' [OVERWRITE]

You can delete a staged file with a REMOVE command:

    REMOVE '/Volumes/test_catalog/test_schema/test_volume/examples/sales.csv'
Ingestion queries are passed to cursor.execute() like any other query. For PUT commands, a local file
will be read. For security, this local file must be contained within, or descended from, a
staging_allowed_local_path of the connection.
After the file is written to the staging location, you can issue a COPY INTO command to load it into a
Delta table:

    COPY INTO my_table
    FROM '/path/to/files'
    FILEFORMAT = <format>
    FORMAT_OPTIONS ('inferSchema' = 'true')
    COPY_OPTIONS ('mergeSchema' = 'true');
To run this script:
1. Set the environment variables for secrets:
    export DATABRICKS_HOST=********.databricks.com
    export DATABRICKS_HTTP_PATH=/sql/1.0/endpoints/****************
    export DATABRICKS_TOKEN=dapi********************************
2. Set the CATALOG, SCHEMA, VOLUME and TABLE constants to your catalog, schema, volume and table.
3. Set the FILEPATH constant to the path of a file that will be uploaded (this example assumes it's a
   CSV file).
4. Run this file.
"""
import os

CATALOG = "test_catalog"
SCHEMA = "test_schema"
TABLE = "test_table"
VOLUME = "test_volume"
FILEPATH = "example.csv"
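
# The docstring above shows the PUT, COPY INTO and REMOVE statements only as templates. The helpers
# below are a minimal sketch of how those statements could be assembled from constants such as
# CATALOG, SCHEMA, VOLUME and TABLE. All function names here are hypothetical and are not part of
# databricks-sql-connector. The values are interpolated as-is, so this assumes they contain no
# single quotes.
def volume_file_path(catalog, schema, volume, filename):
    """Build the /Volumes/... path of a file inside a volume."""
    return f"/Volumes/{catalog}/{schema}/{volume}/{filename}"


def build_put_query(local_path, volume_path, overwrite=True):
    """Build a PUT statement that uploads local_path to volume_path."""
    query = f"PUT '{local_path}' INTO '{volume_path}'"
    if overwrite:
        query += " OVERWRITE"
    return query


def build_copy_into_query(table, source_path, file_format="CSV"):
    """Build a COPY INTO statement that loads the staged file into a Delta table."""
    return (
        f"COPY INTO {table} "
        f"FROM '{source_path}' "
        f"FILEFORMAT = {file_format} "
        "FORMAT_OPTIONS ('inferSchema' = 'true') "
        "COPY_OPTIONS ('mergeSchema' = 'true')"
    )


def build_remove_query(volume_path):
    """Build a REMOVE statement that deletes the staged file."""
    return f"REMOVE '{volume_path}'"


# Each returned string would be passed to cursor.execute() like any other query.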
# FILEPATH can be relative to the current directory.
# Resolve it into an absolute path
_complete_path = os.path.realpath(FILEPATH)
if not os.path.exists(_complete_path):
    # It's easiest to save a file in the same directory as this script, but any path to a file
    # will work.
    raise Exception(
        "You need to set FILEPATH in this script to a file that actually exists."
    )
# Set staging_allowed_local_path equal to the directory that contains FILEPATH
staging_allowed_local_path = os.path.split(_complete_path)[0]
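
# The connector only reads local files that are contained within, or descended from, a
# staging_allowed_local_path. The helper below is a minimal sketch of what such a containment
# check could look like, which can be handy for validating a path before a PUT is sent. It is an
# illustration only, not the connector's actual implementation, and is_within_allowed_path is a
# hypothetical name.
from pathlib import Path


def is_within_allowed_path(local_file, allowed_dir):
    """Return True if local_file is allowed_dir itself or is located somewhere beneath it."""
    candidate = Path(local_file).resolve()
    allowed = Path(allowed_dir).resolve()
    # Comparing resolved path components avoids false matches such as /tmp/database vs /tmp/data.
    return candidate == allowed or allowed in candidate.parents


# Example usage: is_within_allowed_path(_complete_path, staging_allowed_local_path)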