keboola / db-import-export
Package for importing files into Snowflake from multiple cloud storage services
Requires
- php: ^8.1
- ext-json: *
- ext-pdo: *
- doctrine/dbal: ^3.3
- google/cloud-bigquery: ^1.23
- google/cloud-storage: ^1.27
- keboola/csv-options: ^1
- keboola/php-csv-db-import: ^6
- keboola/php-datatypes: ^7.6
- keboola/php-file-storage-utils: ^0.2.2
- keboola/php-temp: ^2.0
- keboola/table-backend-utils: >=2.7
- microsoft/azure-storage-blob: ^1.4
- symfony/process: ^4.4|^5.0|^6.0
Requires (Dev)
- keboola/coding-standard: ^15
- php-parallel-lint/php-parallel-lint: ^1.3
- phpstan/extension-installer: ^1.1
- phpstan/phpstan: ^1.4
- phpstan/phpstan-phpunit: ^1
- phpunit/phpunit: ^9
- react/async: ^4||^3
- symfony/finder: ^5.4
This package is auto-updated.
Last update: 2025-01-22 08:49:24 UTC
README
Supported operations
- Load/Import CSV from ABS to Snowflake or Synapse
- Load/Import CSV from GCS to BigQuery
- Unload/Export table from Snowflake or Synapse to ABS
Features
Import
- Full load - destination table is truncated before load
- Incremental load - data are merged
- Primary key dedup for all engines
- Convert empty values to NULL (using convertEmptyValuesToNull option)
Export
- Full unload - destination CSV is always rewritten
Development
Docker
Prepare .env (a copy of .env.dist) and set up AWS keys that have access to the keboola-drivers bucket, which is needed to build this image. Also add this user to the ci-php-import-export-lib group, which allows you to work with the newly created bucket for tests. The user can be created in Dev - Main legacy, where the groups for keboola-drivers and ci-php-import-export-lib also exist.
If you don't have access to keboola-drivers, you have to change the Dockerfile.
- Comment out the first stage, which downloads the Teradata driver and tools, and supply your own copies downloaded from the Teradata site:
- Tools: https://downloads.teradata.com/download/tools/teradata-tools-and-utilities-linux-installation-package-0
- Driver: https://downloads.teradata.com/download/connectivity/odbc-driver/linux
- Replace the COPY --from=td commands in the Dockerfile with copies of your local Teradata packages
Then run docker compose build
The AWS credentials must also have access to the bucket specified in AWS_S3_BUCKET. This bucket has to contain testing data. Run docker compose run --rm dev composer loadS3 to load it.
Preparation
Azure
- Create a storage account. A template can be found in provisioning ABS create template
- Create a container in the storage account
Blob service -> Containers
note: for tests this step can be skipped, the container is created by the loadAbs command
- Fill env variables in .env file
ABS_ACCOUNT_NAME=storageAccount
ABS_ACCOUNT_KEY=accountKey
ABS_CONTAINER_NAME=containerName
- Upload test fixtures to ABS
docker compose run --rm dev composer loadAbs
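Before uploading fixtures it can help to confirm that the ABS variables are actually filled in. The following is a minimal sketch and not part of this repository: `missing_env_vars` is a hypothetical helper that prints the name of every requested variable that is missing or empty in an env file.

```shell
# Print the names of variables that are missing or empty in an env file.
# Hypothetical helper, not shipped with the library.
# Usage: missing_env_vars <env_file> <VAR>...
missing_env_vars() {
    env_file=$1
    shift
    for name in "$@"; do
        # A variable counts as set when a line "NAME=<non-empty value>" exists.
        if ! grep -Eq "^${name}=.+" "$env_file"; then
            printf '%s\n' "$name"
        fi
    done
}

# Example:
#   missing_env_vars .env ABS_ACCOUNT_NAME ABS_ACCOUNT_KEY ABS_CONTAINER_NAME
```

An empty output means all listed variables have a value. The same call works for the variables of the other backends described in this section.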
Google cloud storage
- Create a bucket in GCS and set the bucket name in the .env variable GCS_BUCKET_NAME
- Create a service account in IAM
- In the bucket permissions, grant the service account admin access to the bucket
- Create a new service account key
- Convert the key to a string
awk -v RS= '{$1=$1}1' <key_file>.json >> .env
(or cat file.json | jq -c | jq -R)
- Set the content on the last line of .env as the variable GCS_CREDENTIALS
- Upload test fixtures to GCS
docker compose run --rm dev composer loadGcs-bigquery
or docker compose run --rm dev composer loadGcs-snowflake
(depending on backend)
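The key conversion and the manual edit of the last .env line can be combined into one step. This is a sketch under assumptions: `append_json_key` is a hypothetical helper, it reuses the awk call from the steps above, and it writes the value unquoted, which is what the manual steps produce. Whether the value needs quoting depends on how your tooling reads .env.

```shell
# Append a JSON key file to an env file as a single-line variable.
# Hypothetical helper, not shipped with the library.
# Usage: append_json_key <key_file.json> <VAR_NAME> <env_file>
append_json_key() {
    key_file=$1
    var_name=$2
    env_file=$3
    # Paragraph mode (RS=) reads the file as one record as long as it has
    # no blank lines; $1=$1 rejoins its fields with single spaces.
    one_line=$(awk -v RS= '{$1=$1}1' "$key_file")
    printf '%s=%s\n' "$var_name" "$one_line" >> "$env_file"
}

# Example:
#   append_json_key key_file.json GCS_CREDENTIALS .env
```

Because awk collapses every run of whitespace, this is only safe for key files whose string values contain no literal runs of spaces or raw newlines.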
SNOWFLAKE
Role, user, database and warehouse are required for tests. You can create them:
CREATE ROLE "KEBOOLA_DB_IMPORT_EXPORT";
CREATE DATABASE "KEBOOLA_DB_IMPORT_EXPORT";
GRANT ALL PRIVILEGES ON DATABASE "KEBOOLA_DB_IMPORT_EXPORT" TO ROLE "KEBOOLA_DB_IMPORT_EXPORT";
GRANT USAGE ON WAREHOUSE "DEV" TO ROLE "KEBOOLA_DB_IMPORT_EXPORT";
CREATE USER "KEBOOLA_DB_IMPORT_EXPORT"
  PASSWORD = 'Password'
  DEFAULT_ROLE = "KEBOOLA_DB_IMPORT_EXPORT";
GRANT ROLE "KEBOOLA_DB_IMPORT_EXPORT" TO USER "KEBOOLA_DB_IMPORT_EXPORT";

-- For GCS create storage integration https://docs.snowflake.com/en/user-guide/data-load-gcs-config.html#creating-a-custom-iam-role
CREATE STORAGE INTEGRATION "KEBOOLA_DB_IMPORT_EXPORT"
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = GCS
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://<your gcs bucket>/');
-- set integration name to env GCS_INTEGRATION_NAME in .env file
-- get service account id `STORAGE_GCP_SERVICE_ACCOUNT`
DESC STORAGE INTEGRATION "KEBOOLA_DB_IMPORT_EXPORT";
-- continue according to the manual ^ in the Snowflake documentation: assign roles for data loading and unloading
SYNAPSE
Create a Synapse server in the Azure portal or using the CLI.
Set up env variables: SYNAPSE_UID SYNAPSE_PWD SYNAPSE_DATABASE SYNAPSE_SERVER
Run query:
CREATE MASTER KEY;
This creates the master key for PolyBase.
Managed Identity
Managed Identity is required when using ABS in a vnet. How to set up and use Managed Identity is described in the docs.
TL;DR: In the IAM settings of ABS, add the role assignment "Storage Blob Data {Reader or Contributor}" to your Synapse server principal
Exasol
You can run Exasol locally in Docker or you can use SaaS.
Exasol locally in Docker
Run Exasol on your local machine in docker (for this case .env is preconfigured)
docker compose up -d exasol
Or run an Exasol server somewhere else and set up env variables:
EXASOL_HOST=
EXASOL_USERNAME=
EXASOL_PASSWORD=
Exasol in SaaS
Login to SaaS UI (or use a local client) and create user with following grants.
CREATE USER "<nick>_ie" IDENTIFIED BY "password";
GRANT
  CREATE SESSION,
  CREATE SCHEMA,
  CREATE TABLE,
  CREATE VIEW,
  CREATE USER,
  CREATE ROLE,
  DROP USER,
  DROP ANY ROLE,
  GRANT ANY ROLE,
  ALTER ANY SCHEMA,
  ALTER USER,
  IMPORT,
  EXPORT
TO "<nick>_ie" WITH ADMIN OPTION;
Obtain the host (with port), username and password (from the previous step) and fill them in .env as described above. Make sure that your account allows network access from your IP.
Teradata
Prepare Teradata servers on AWS/Azure and set following properties. See
Create a new database for tests:
CREATE DATABASE <nick>_ie_lib_tests FROM dbc AS PERMANENT = 1e8, SPOOL = 1e8;
TERADATA_HOST=
TERADATA_USERNAME=
TERADATA_PASSWORD=
TERADATA_PORT=
TERADATA_DATABASE=
Bigquery
Install Google Cloud client (via Brew), initialize it and log in to generate default credentials.
To prepare the backend you can use Terraform template.
You must have the resourcemanager.folders.create permission for the organization.
# you can copy it to a folder somewhere and make an init
terraform init
Run terraform apply with the following variables:
- folder_id: Go to GCP Resource Manager and select your team dev folder ID (e.g. find 'KBC Team Dev' and copy ID)
- backend_prefix: your_name, all resources will be created with this prefix
- billing_account_id: Go to Billing and copy your Billing account ID
terraform apply -var folder_id=<folder_id> -var backend_prefix=<your_prefix> -var billing_account_id=<billing_account_id>
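As an alternative to repeating `-var` flags, Terraform also reads input variables from environment variables named `TF_VAR_<name>`. A minimal sketch, assuming the template declares exactly the three variables listed above; `export_tf_vars` is a hypothetical helper, not part of this repository.

```shell
# Export Terraform input variables as TF_VAR_<name> environment variables.
# Hypothetical helper; variable names are taken from the list above.
# Usage: export_tf_vars <folder_id> <backend_prefix> <billing_account_id>
export_tf_vars() {
    if [ "$#" -ne 3 ]; then
        echo "usage: export_tf_vars <folder_id> <backend_prefix> <billing_account_id>" >&2
        return 1
    fi
    export TF_VAR_folder_id="$1"
    export TF_VAR_backend_prefix="$2"
    export TF_VAR_billing_account_id="$3"
}

# Example (values are placeholders):
#   export_tf_vars "<folder_id>" "<your_prefix>" "<billing_account_id>"
#   terraform apply
```

Exported values stay in the current shell session, so later `terraform plan` or `terraform destroy` runs pick up the same inputs.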
For missing pieces see Connection repository. After terraform apply ends, go to the service project in the folder created by Terraform.
- convert the key to a string and save it to the .env file: awk -v RS= '{$1=$1}1' <key_file>.json >> .env
- set the content on the last line of .env as the variable BQ_KEY_FILE
- set the env variable BQ_BUCKET_NAME to the file_storage_bucket_id value generated by the TF template
Tests
Run tests with the following command.
note: Azure credentials must be provided and fixtures uploaded
docker compose run --rm dev composer tests
Unit and functional tests can be run separately
#unit test
docker compose run --rm dev composer tests-unit
#functional test
docker compose run --rm dev composer tests-functional
Code quality check
#phplint
docker compose run --rm dev composer phplint
#phpcs
docker compose run --rm dev composer phpcs
#phpstan
docker compose run --rm dev composer phpstan
Full CI workflow
This command runs all checks, loads fixtures and runs tests
docker compose run --rm dev composer ci
Usage
Snowflake
ABS -> Snowflake import/load
use Keboola\Db\ImportExport\Backend\Snowflake\Importer;
use Keboola\Db\ImportExport\ImportOptions;
use Keboola\Db\ImportExport\Storage;

$absSourceFile = new Storage\ABS\SourceFile(...);
$snowflakeDestinationTable = new Storage\Snowflake\Table(...);
$importOptions = new ImportOptions(...);

(new Importer($snowflakeConnection))->importTable(
    $absSourceFile,
    $snowflakeDestinationTable,
    $importOptions
);
Snowflake -> Snowflake copy
use Keboola\Db\ImportExport\Backend\Snowflake\Importer;
use Keboola\Db\ImportExport\ImportOptions;
use Keboola\Db\ImportExport\Storage;

$snowflakeSourceTable = new Storage\Snowflake\Table(...);
$snowflakeDestinationTable = new Storage\Snowflake\Table(...);
$importOptions = new ImportOptions(...);

(new Importer($snowflakeConnection))->importTable(
    $snowflakeSourceTable,
    $snowflakeDestinationTable,
    $importOptions
);
Snowflake -> ABS export/unload
use Keboola\Db\ImportExport\Backend\Snowflake\Exporter;
use Keboola\Db\ImportExport\ExportOptions;
use Keboola\Db\ImportExport\Storage;

$snowflakeSourceTable = new Storage\Snowflake\Table(...);
$absDestinationFile = new Storage\ABS\DestinationFile(...);
$exportOptions = new ExportOptions(...);

(new Exporter($snowflakeConnection))->exportTable(
    $snowflakeSourceTable,
    $absDestinationFile,
    $exportOptions
);
Synapse next (experimental)
Import to Synapse
use Keboola\TableBackendUtils\Table\SynapseTableDefinition;
use Keboola\TableBackendUtils\Table\SynapseTableQueryBuilder;
use Keboola\Db\ImportExport\Backend\Synapse\ToStage\StageTableDefinitionFactory;
use Keboola\Db\ImportExport\Storage;
use Keboola\Db\ImportExport\Backend\Synapse\ToStage\ToStageImporter;
use Keboola\Db\ImportExport\Backend\Synapse\SynapseImportOptions;
use Keboola\Db\ImportExport\Backend\Synapse\ToFinalTable\IncrementalImporter;
use Keboola\Db\ImportExport\Backend\Synapse\ToFinalTable\FullImporter;
use Keboola\Db\ImportExport\Backend\Synapse\ToFinalTable\SqlBuilder;
use Doctrine\DBAL\Connection;

$importSource = new Storage\ABS\SourceFile(...);
// or
$importSource = new Storage\Synapse\Table(...);
// or
$importSource = new Storage\Synapse\SelectSource(...);

$destinationTable = new SynapseTableDefinition(...);
$options = new SynapseImportOptions(...);
$synapseConnection = new Connection(...);

$stagingTable = StageTableDefinitionFactory::createStagingTableDefinition(
    $destinationTable,
    $importSource->getColumnsNames()
);
$qb = new SynapseTableQueryBuilder($synapseConnection);
$synapseConnection->executeStatement(
    $qb->getCreateTableCommandFromDefinition($stagingTable)
);
$toStageImporter = new ToStageImporter($synapseConnection);
$toFinalTableImporter = new IncrementalImporter($synapseConnection);
// or
$toFinalTableImporter = new FullImporter($synapseConnection);
try {
    $importState = $toStageImporter->importToStagingTable(
        $importSource,
        $stagingTable,
        $options
    );
    $result = $toFinalTableImporter->importToTable(
        $stagingTable,
        $destinationTable,
        $options,
        $importState
    );
} finally {
    $synapseConnection->executeStatement(
        (new SqlBuilder())->getDropTableIfExistsCommand(
            $stagingTable->getSchemaName(),
            $stagingTable->getTableName()
        )
    );
}
Internals/Extending
The library consists of a few simple interfaces.
Create new backend
A new backend must implement the importer and exporter interfaces:
Keboola\Db\ImportExport\Backend\ImporterInterface
Keboola\Db\ImportExport\Backend\ExporterInterface
For each backend there is a corresponding adapter which supports its own combination of SourceInterface and DestinationInterface. Custom adapters can be set with the setAdapters method.
Create new storage
Storage is currently either file storage, ABS|S3 (in future), or table storage, Snowflake|Synapse.
Storage can have a Source and a Destination, which must implement SourceInterface or DestinationInterface. These interfaces are empty and it's up to the adapter to support its own combination.
In general there is one Import/Export adapter per FileStorage <=> TableStorage combination.
An adapter must implement:
Keboola\Db\ImportExport\Backend\BackendImportAdapterInterface for import
Keboola\Db\ImportExport\Backend\BackendExportAdapterInterface for export
A backend can require its own extended AdapterInterface (Synapse and Snowflake do now).
License
MIT licensed, see LICENSE file.