FileOperation
Spark Gem
Helps perform file operations such as copy, move, and sync across different file systems.
Parameters
Parameter | Description | Required |
---|---|---|
File System | `Local` - for operations on the driver node's file system. `DBFS` - for operations on the Databricks file system. `S3` - for operations on the S3 object store. | True |
Operation | Operation to perform: `Copy`, `Move`, or `Sync` | True |
Filename Regex | Regex to filter file names, e.g. `stdlog.*.txt` (see the sketch below the table) | False |
Ignore empty files | Skip files whose size is 0 bytes | False |
Recurse | Whether to perform the operation recursively. Default is `False` | False |
Source Path | Path of the source file/directory, e.g. `/dbfs/source_file.txt`, `dbfs:/source_file.txt`, `s3://source_bucket/source_prefix/filename.txt` | True |
Destination Path | Path of the destination file/directory, e.g. `/dbfs/target_file.txt`, `dbfs:/target_file.txt`, `s3://target_bucket/target_prefix/filename.txt` | True |
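The Filename Regex and Ignore empty files parameters act as filters on which files the operation touches; the regex is matched from the start of the file name only, not the full path. A minimal sketch of that filtering logic, using illustrative file names and sizes:

```python
import re

# Illustrative values; in the gem they come from the Filename Regex and
# Ignore empty files properties.
file_regex = "stdlog.*.txt"
ignore_empty_files = True
files = {"stdlog_2024_01_01.txt": 2048, "stdlog_2024_01_02.txt": 0, "app.json": 512}

for name, size in files.items():
    if file_regex and not re.compile(file_regex).match(name):
        continue  # file name does not match the Filename Regex
    if ignore_empty_files and size == 0:
        continue  # 0-byte file skipped when Ignore empty files is enabled
    print(f"would process {name}")
# would process stdlog_2024_01_01.txt
```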
info
You can also perform operations on DBFS files through the Local file system by providing a path under `/dbfs`. This works because Databricks uses a FUSE mount to provide local access to files stored in the cloud; a FUSE mount is a secure, virtual filesystem.
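For example, the same DBFS directory can be reached through a `dbfs:/` URI with DBUtils or through the `/dbfs` FUSE path with plain Python file APIs. A small sketch, assuming an active SparkSession and an existing `dbfs:/Prophecy/example/source/` directory:

```python
def list_source(spark: SparkSession):
    import os
    from pyspark.dbutils import DBUtils
    # Same files, two access paths: the DBFS URI and the /dbfs FUSE mount
    print(DBUtils(spark).fs.ls("dbfs:/Prophecy/example/source/"))
    print(os.listdir("/dbfs/Prophecy/example/source/"))
```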
Examples
Copy Single File
- DBFS
- Local
- S3
def copy_file(spark: SparkSession):
    from pyspark.dbutils import DBUtils
    DBUtils(spark).fs.cp(
        "dbfs:/Prophecy/example/source/person.json",
        "dbfs:/Prophecy/example/target/person.json",
        recurse=False
    )
def copy_file(spark: SparkSession):
    import shutil
    shutil.copy2(
        "/dbfs/Prophecy/example/source/person.json",
        "/dbfs/Prophecy/example/target/person.json"
    )
def copy_file(spark: SparkSession):
    import re
    import boto3

    s3 = boto3.client("s3")
    # src_bucket, src_url, src_prefix, dest_bucket, dest_prefix, dest_files, mode,
    # fileRegex and ignoreEmptyFiles are defined earlier in the generated code,
    # derived from the gem's Source Path, Destination Path and other properties.
    for obj in s3.list_objects_v2(Bucket=src_bucket, Prefix=src_url.path.lstrip('/'))['Contents']:
        new_dest_prefix = re.sub(src_prefix, dest_prefix, obj['Key'], 1)
        if (
            (mode in ["copy", "move"] and not obj['Key'].endswith("/"))
            or (
                not obj['Key'].endswith("/")
                and mode == "sync"
                and new_dest_prefix not in dest_files
            )
        ):
            # Skip empty files and file names that do not match the regex filter
            if (
                (
                    bool(ignoreEmptyFiles)
                    and s3.head_object(Bucket=src_bucket, Key=obj['Key'])['ContentLength'] == 0
                )
                or (
                    bool(fileRegex)
                    and fileRegex != ""
                    and not bool(re.compile(fileRegex).match(obj['Key'].split('/')[-1]))
                )
            ):
                continue
            s3.copy(
                {'Bucket': src_bucket, 'Key': obj['Key']},
                dest_bucket,
                new_dest_prefix
            )
            if mode == "move":
                s3.delete_object(Bucket=src_bucket, Key=obj['Key'])
Copy All Files From A Directory
- DBFS
- Local
- S3
def copy_file(spark: SparkSession):
    from pyspark.dbutils import DBUtils
    DBUtils(spark).fs.cp(
        "dbfs:/Prophecy/example/source/",
        "dbfs:/Prophecy/example/target/",
        recurse=True
    )
def copy_file(spark: SparkSession):
    import shutil
    shutil.copytree(
        "/dbfs/Prophecy/example/source/",
        "/dbfs/Prophecy/example/target/",
        copy_function=shutil.copy2,
        dirs_exist_ok=True
    )
def copy_file(spark: SparkSession):
    import re
    import boto3

    s3 = boto3.client("s3")
    # src_bucket, src_url, src_prefix, dest_bucket, dest_prefix, dest_files, mode,
    # fileRegex and ignoreEmptyFiles are defined earlier in the generated code,
    # derived from the gem's Source Path, Destination Path and other properties.
    for obj in s3.list_objects_v2(Bucket=src_bucket, Prefix=src_url.path.lstrip('/'))['Contents']:
        new_dest_prefix = re.sub(src_prefix, dest_prefix, obj['Key'], 1)
        if (
            (mode in ["copy", "move"] and not obj['Key'].endswith("/"))
            or (
                not obj['Key'].endswith("/")
                and mode == "sync"
                and new_dest_prefix not in dest_files
            )
        ):
            # Skip empty files and file names that do not match the regex filter
            if (
                (
                    bool(ignoreEmptyFiles)
                    and s3.head_object(Bucket=src_bucket, Key=obj['Key'])['ContentLength'] == 0
                )
                or (
                    bool(fileRegex)
                    and fileRegex != ""
                    and not bool(re.compile(fileRegex).match(obj['Key'].split('/')[-1]))
                )
            ):
                continue
            s3.copy(
                {'Bucket': src_bucket, 'Key': obj['Key']},
                dest_bucket,
                new_dest_prefix
            )
            if mode == "move":
                s3.delete_object(Bucket=src_bucket, Key=obj['Key'])
Move Files
- DBFS
- Local
- S3
def move_file(spark: SparkSession):
    from pyspark.dbutils import DBUtils
    DBUtils(spark).fs.mv("dbfs:/Prophecy/example/source/", "dbfs:/Prophecy/example/target/", recurse=False)
def move_file(spark: SparkSession):
    import shutil
    # shutil.copy2 cannot copy a directory; copy the tree, then remove the source
    shutil.copytree("/Prophecy/example/source/", "/Prophecy/example/target/",
                    copy_function=shutil.copy2, dirs_exist_ok=True)
    shutil.rmtree("/Prophecy/example/source/")
def move_file(spark: SparkSession):
    import re
    import boto3

    s3 = boto3.client("s3")
    # src_bucket, src_url, src_prefix, dest_bucket, dest_prefix, dest_files, mode,
    # fileRegex and ignoreEmptyFiles are defined earlier in the generated code,
    # derived from the gem's Source Path, Destination Path and other properties.
    for obj in s3.list_objects_v2(Bucket=src_bucket, Prefix=src_url.path.lstrip('/'))['Contents']:
        new_dest_prefix = re.sub(src_prefix, dest_prefix, obj['Key'], 1)
        if (
            (mode in ["copy", "move"] and not obj['Key'].endswith("/"))
            or (
                not obj['Key'].endswith("/")
                and mode == "sync"
                and new_dest_prefix not in dest_files
            )
        ):
            # Skip empty files and file names that do not match the regex filter
            if (
                (
                    bool(ignoreEmptyFiles)
                    and s3.head_object(Bucket=src_bucket, Key=obj['Key'])['ContentLength'] == 0
                )
                or (
                    bool(fileRegex)
                    and fileRegex != ""
                    and not bool(re.compile(fileRegex).match(obj['Key'].split('/')[-1]))
                )
            ):
                continue
            s3.copy(
                {'Bucket': src_bucket, 'Key': obj['Key']},
                dest_bucket,
                new_dest_prefix
            )
            if mode == "move":
                s3.delete_object(Bucket=src_bucket, Key=obj['Key'])
S3 - Sync Entire Directory
- S3
def sync_file(spark: SparkSession):
    import re
    import boto3

    s3 = boto3.client("s3")
    # src_bucket, src_url, src_prefix, dest_bucket, dest_url, dest_prefix, mode,
    # fileRegex and ignoreEmptyFiles are defined earlier in the generated code,
    # derived from the gem's Source Path, Destination Path and other properties.
    dest_files = set(
        [
            f_object['Key'].lstrip('/')
            for f_object in s3.list_objects_v2(Bucket=dest_bucket, Prefix=dest_url.path.lstrip('/'))['Contents']
            if not f_object['Key'].endswith("/")
        ]
    )
    for obj in s3.list_objects_v2(Bucket=src_bucket, Prefix=src_url.path.lstrip('/'))['Contents']:
        new_dest_prefix = re.sub(src_prefix, dest_prefix, obj['Key'], 1)
        if (
            (mode in ["copy", "move"] and not obj['Key'].endswith("/"))
            or (
                not obj['Key'].endswith("/")
                and mode == "sync"
                and new_dest_prefix not in dest_files
            )
        ):
            # Skip empty files and file names that do not match the regex filter
            if (
                (
                    bool(ignoreEmptyFiles)
                    and s3.head_object(Bucket=src_bucket, Key=obj['Key'])['ContentLength'] == 0
                )
                or (
                    bool(fileRegex)
                    and fileRegex != ""
                    and not bool(re.compile(fileRegex).match(obj['Key'].split('/')[-1]))
                )
            ):
                continue
            s3.copy(
                {'Bucket': src_bucket, 'Key': obj['Key']},
                dest_bucket,
                new_dest_prefix
            )
            if mode == "move":
                s3.delete_object(Bucket=src_bucket, Key=obj['Key'])