FileOperation

Spark Gem

Helps perform file operations such as copy, move, and sync on different file systems.

Parameters

| Parameter | Description | Required |
|---|---|---|
| File System | Local - for operations on the driver node file system; DBFS - for operations on the Databricks file system; S3 - for operations on the S3 object store | True |
| Operation | Operation to perform: Copy, Move, or Sync | True |
| Filename Regex | Regex to filter file names, e.g. stdlog.*.txt | False |
| Ignore empty files | Ignore a file if its size is 0 bytes | False |
| Recurse | Boolean for performing the operation recursively; default is False | False |
| Source Path | Path of the source file/directory, e.g. /dbfs/source_file.txt, dbfs:/source_file.txt, s3://source_bucket/source_prefix/filename.txt | True |
| Destination Path | Path of the destination file/directory, e.g. /dbfs/target_file.txt, dbfs:/target_file.txt, s3://target_bucket/target_prefix/filename.txt | True |
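
A minimal sketch (not the Gem's generated code) of how the Filename Regex and Ignore empty files parameters could be applied when copying files on DBFS. The copy_filtered helper, the directory paths, and the default regex are illustrative assumptions; dbutils.fs.ls and dbutils.fs.cp are the underlying Databricks utilities.

import re
from pyspark.dbutils import DBUtils

def copy_filtered(spark, source_dir, target_dir,
                  file_regex="stdlog.*.txt", ignore_empty=True):
    dbutils = DBUtils(spark)
    pattern = re.compile(file_regex)
    for f in dbutils.fs.ls(source_dir):
        if f.name.endswith("/"):           # sub-directory entry; skip (Recurse = False)
            continue
        if ignore_empty and f.size == 0:   # Ignore empty files (size is 0 bytes)
            continue
        if file_regex and not pattern.match(f.name):  # Filename Regex filter
            continue
        dbutils.fs.cp(f.path, target_dir.rstrip("/") + "/" + f.name, recurse=False)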
info

You can also perform operations on DBFS files through the Local file system by providing a path under /dbfs.
This works because Databricks uses a FUSE mount, a secure virtual file system, to give local access to the files stored in the cloud.
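
For instance, a file copy done through the FUSE mount needs no Spark or dbutils at all; plain Python file APIs work on /dbfs paths from the driver node. The paths below are illustrative.

import shutil

# Copy a DBFS file using the local /dbfs FUSE path on the driver node
shutil.copy(
    "/dbfs/Prophecy/example/source/person.json",
    "/dbfs/Prophecy/example/target/person.json",
)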

Examples


Copy Single File

def copy_file(spark: SparkSession):
    from pyspark.dbutils import DBUtils
    DBUtils(spark).fs.cp(
        "dbfs:/Prophecy/example/source/person.json",
        "dbfs:/Prophecy/example/target/person.json",
        recurse=False
    )

Copy All Files From A Directory

def copy_file(spark: SparkSession):
    from pyspark.dbutils import DBUtils
    DBUtils(spark).fs.cp(
        "dbfs:/Prophecy/example/source/",
        "dbfs:/Prophecy/example/target/",
        recurse=True
    )

Move Files

def move_file(spark: SparkSession):
    from pyspark.dbutils import DBUtils
    DBUtils(spark).fs.mv(
        "dbfs:/Prophecy/example/source/",
        "dbfs:/Prophecy/example/target/",
        recurse=False
    )


S3 - Sync Entire Directory

def sync_file(spark: SparkSession):
    import re
    import boto3

    # src_bucket / src_prefix / src_url, dest_bucket / dest_prefix / dest_url,
    # mode, fileRegex, ignoreEmptyFiles and the s3 client are assumed to be in
    # scope (populated from the Gem's parameters in the generated code).

    # Collect the keys already present at the destination (used by sync mode)
    dest_files = set(
        [
            f_object['Key'].lstrip('/')
            for f_object in boto3.client("s3").list_objects_v2(
                Bucket=dest_bucket, Prefix=dest_url.path.lstrip('/')
            )['Contents']
            if not f_object['Key'].endswith("/")
        ]
    )

    for obj in boto3.client("s3").list_objects_v2(
        Bucket=src_bucket, Prefix=src_url.path.lstrip('/')
    )['Contents']:
        new_dest_prefix = re.sub(src_prefix, dest_prefix, obj['Key'], 1)

        # Copy/move every file; in sync mode, only copy files missing at the destination
        if (
            (mode in ["copy", "move"] and not obj['Key'].endswith("/"))
            or (
                not obj['Key'].endswith("/")
                and mode == "sync"
                and new_dest_prefix not in dest_files
            )
        ):
            # Apply the "Ignore empty files" and "Filename Regex" filters
            if (
                (
                    bool(ignoreEmptyFiles)
                    and s3.head_object(Bucket=src_bucket, Key=obj['Key'])['ContentLength'] == 0
                )
                or (
                    bool(fileRegex)
                    and fileRegex != ""
                    and not bool(re.compile(fileRegex).match(obj['Key'].split('/')[-1]))
                )
            ):
                continue

            s3.copy(
                {'Bucket': src_bucket, 'Key': obj['Key']},
                dest_bucket,
                new_dest_prefix
            )

            # A move is a copy followed by deleting the source object
            if mode == "move":
                s3.delete_object(Bucket=src_bucket, Key=obj['Key'])
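
The snippet above assumes the bucket, prefix, and option variables are already defined, presumably from the Gem's parameters. A minimal sketch of how they could be derived, with illustrative values:

import boto3
from urllib.parse import urlparse

src_url = urlparse("s3://source_bucket/source_prefix/")
dest_url = urlparse("s3://target_bucket/target_prefix/")

src_bucket, src_prefix = src_url.netloc, src_url.path.lstrip("/")
dest_bucket, dest_prefix = dest_url.netloc, dest_url.path.lstrip("/")

mode = "sync"              # Operation: "copy", "move" or "sync"
fileRegex = ""             # Filename Regex (empty string disables the filter)
ignoreEmptyFiles = False   # Ignore empty files

s3 = boto3.client("s3")    # client used for head_object / copy / delete_object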