
Files

Files can be in any of the following formats:

  • Basic formats: JSON, Text, Delimited (CSV)
  • Big data formats: Parquet, ORC, Delta
  • Legacy formats: fixed-width formats such as EBCDIC and COBOL (Enterprise Edition only)

File Sources

We support reading and writing files on DBFS. It is very common to read files from Amazon S3 or Azure Data Lake Storage; we recommend mounting these to DBFS. See the Databricks documentation for the mounting instructions for each storage service.

Example

Here is the Amazon S3 mechanism as an example. To mount a bucket using credentials already available to the cluster (for example, an instance profile):

# python
# Mount the bucket and list its contents to confirm the mount works
aws_bucket_name = "<aws-bucket-name>"
mount_name = "<mount-name>"
dbutils.fs.mount("s3a://%s" % aws_bucket_name, "/mnt/%s" % mount_name)
display(dbutils.fs.ls("/mnt/%s" % mount_name))

Alternatively, mount the bucket with AWS keys stored in a Databricks secret scope:

# python
# Fetch the AWS keys from a secret scope so they never appear in plain text
access_key = dbutils.secrets.get(scope = "aws", key = "aws-access-key")
secret_key = dbutils.secrets.get(scope = "aws", key = "aws-secret-key")
# URL-encode any "/" in the secret key so the s3a URI stays valid
encoded_secret_key = secret_key.replace("/", "%2F")
aws_bucket_name = "<aws-bucket-name>"
mount_name = "<mount-name>"

dbutils.fs.mount("s3a://%s:%s@%s" % (access_key, encoded_secret_key, aws_bucket_name), "/mnt/%s" % mount_name)
display(dbutils.fs.ls("/mnt/%s" % mount_name))
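
Azure Data Lake Storage Gen2 follows the same pattern. Here is a minimal sketch assuming a service principal whose credentials live in a secret scope named azure; the scope name, key names, and the container, storage-account, and tenant placeholders are assumptions for the example:

# python
# Minimal sketch: mount an ADLS Gen2 container with a service principal.
# The "azure" secret scope, key names, and <...> placeholders are assumptions.
configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": dbutils.secrets.get(scope = "azure", key = "client-id"),
  "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope = "azure", key = "client-secret"),
  "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant-id>/oauth2/token"
}

dbutils.fs.mount(
  source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/",
  mount_point = "/mnt/<mount-name>",
  extra_configs = configs)
display(dbutils.fs.ls("/mnt/<mount-name>"))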


Delimited Files (CSVs)

Delimited files are often called Comma-Separated Values (CSV) files and come from various sources, including hand-written Excel exports used for test data. Here is an example file to read:

customer_id,first_name,last_name,phone,email,country_code,account_open_date,account_flags
1,Griz,Roubeix,761-351-2493,groubeix0@yellowpages.com,RU,2019-07-20,E
2,Carleen,Eles,480-128-4143,celes1@usa.gov,CN,2019-10-21,E
3,Marjy,Della,328-629-6853,mdella2@vimeo.com,NO,2019-05-04,E
4,Osborne,MacAdam,473-395-4702,omacadam3@sbwire.com,ID,2019-10-19,C
5,Roselia,Trethewey,439-680-3415,rtrethewey4@github.com,AR,2020-02-19,E
6,Cassy,Gainsford,358-877-6530,cgainsford5@vinaora.com,BH,2019-04-20,E

We note the following:

  • One row in the file represents one row of data
  • In one row, multiple values (columns) are separated by a delimiter - comma (,) in this case
  • The names of columns are in the first row
  • Data types of the various columns (the schema) can be inferred from the data itself, as the sketch below shows
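
These observations map directly onto Spark's CSV reader options. Here is a minimal sketch of reading the file in plain Spark with a header row and schema inference; the DBFS path is a placeholder:

# python
# Sketch: read the customers CSV with a header row and schema inference.
# The DBFS path below is a placeholder.
customers = (spark.read
    .format("csv")
    .option("header", True)       # first row holds the column names
    .option("sep", ",")           # comma is the field delimiter
    .option("inferSchema", True)  # infer column types (one extra pass over the data)
    .load("dbfs:/mnt/<mount-name>/CustomersDatasetInput.csv"))

customers.printSchema()
customers.show(5)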

Visual Summary

Here are the steps to follow to read the CSV file:

  • Add Source component
  • Click Create Dataset (a Dataset represents any data in Prophecy)
    • Select File and CSV: Since we want to read a CSV file, we’ll choose this option
    • Add Name: Let’s pick the name customer since this represents customer data
    • Add Path: Now, we browse to the file we want and click on it
    • Check Properties: Since we want to infer the schema from the data, the first row is a header, and the column delimiter is a comma, we’ll pick these options
    • Click Infer: Infer reads the file, infers the schema, and shows a few rows of data so you can confirm that the file was read correctly
    • Click Create: And we’re done. This Source component now reads the new customer dataset, and the created dataset can be read from any workflow in the project.

Generated Code

Once you’re happy with the source component in the visual graph, you can click the Code Editor to see the generated code and modify it if required. Note that here you’ll see the code for all Fabrics, so the same workflow reads the dataset seamlessly as you move from Dev to Integration and Production environments.

Note: We capture the schema explicitly; this helps enforce the schema across the various physical datasets. It also means that you can edit workflows without running them on a cluster, while still having the schema present.

@Visual(id = "Customers", label = "Customers", x = 59, y = 221, phase = 0)
object Customers {

  @UsesDataset(id = "25", version = 0)
  def apply(spark: SparkSession): Source = {
    import spark.implicits._

    val fabric = Config.fabricName

    val out = fabric match {
      case "default" =>
        val schemaArg = StructType(
          Array(
            StructField("customer_id",       IntegerType,   true),
            StructField("first_name",        StringType,    true),
            StructField("last_name",         StringType,    true),
            StructField("phone",             StringType,    true),
            StructField("email",             StringType,    true),
            StructField("country_code",      StringType,    true),
            StructField("account_open_date", TimestampType, true),
            StructField("account_flags",     StringType,    true)
          )
        )
        spark.read
          .format("csv")
          .option("header", true)
          .option("sep",    ",")
          .schema(schemaArg)
          .load("dbfs:/path_here/CustomersDatasetInput.csv")
          .cache()
      case _ => throw new Exception(s"The fabric '$fabric' is not handled")
    }

    out

  }

}
    

# python: Coming soon!
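
Until Python code generation is available, a hand-written PySpark equivalent of the Scala source above might look like the sketch below. This is not Prophecy-generated code; the fabric name and path simply mirror the placeholders in the Scala example.

# python
from pyspark.sql.types import (StructType, StructField, IntegerType,
                               StringType, TimestampType)

def customers(spark, fabric):
    # Explicit schema, mirroring the generated Scala above
    schema = StructType([
        StructField("customer_id",       IntegerType(),   True),
        StructField("first_name",        StringType(),    True),
        StructField("last_name",         StringType(),    True),
        StructField("phone",             StringType(),    True),
        StructField("email",             StringType(),    True),
        StructField("country_code",      StringType(),    True),
        StructField("account_open_date", TimestampType(), True),
        StructField("account_flags",     StringType(),    True),
    ])
    if fabric == "default":
        return (spark.read
                .format("csv")
                .option("header", True)
                .option("sep", ",")
                .schema(schema)
                .load("dbfs:/path_here/CustomersDatasetInput.csv")
                .cache())
    raise Exception("The fabric '%s' is not handled" % fabric)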
    

Other Files

Other file formats can be read similarly; the visual interface is intuitive, and the following section details the available properties.

Note: For JSON files, you’ll often get a nested schema, and you can use the Flatten Schema component to convert it to a flat table, as illustrated below.
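
For example, reading a nested JSON file in plain Spark produces struct columns, which is exactly what the Flatten Schema component deals with. A minimal sketch (the path and record shape are illustrative, not a fixed dataset):

# python
# Sketch: reading nested JSON yields struct columns; the path and fields are illustrative.
orders = (spark.read
    .format("json")
    .option("multiLine", True)   # each JSON object may span several lines
    .load("dbfs:/mnt/<mount-name>/orders.json"))

# If each record contains, say, a "customer" object with "id" and "name" fields,
# printSchema() shows a nested struct that Flatten Schema can turn into flat columns.
orders.printSchema()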

File Properties

Here are the properties you can use for various file formats.

CSV

  • sep (default ,): sets a single character as a separator for each field and value
  • encoding (default UTF-8): decodes the CSV files by the given encoding type
  • quote (default "): sets a single character used for escaping quoted values where the separator can be part of the value. To turn off quotations, set an empty string rather than null. This behaviour is different from com.databricks.spark.csv
  • escape (default \): sets a single character used for escaping quotes inside an already quoted value
  • charToEscapeQuoteEscaping (default escape or \0): sets a single character used for escaping the escape for the quote character. The default is the escape character when the escape and quote characters are different, \0 otherwise
  • comment (default empty string): sets a single character used for skipping lines beginning with this character. By default, it is disabled
  • header (default false): uses the first line as the names of the columns
  • enforceSchema (default true): if set to true, the specified or inferred schema is forcibly applied to data source files, and headers in CSV files are ignored. If set to false, the schema is validated against all headers in CSV files when the header option is set to true. Field names in the schema and column names in CSV headers are checked by their positions, taking spark.sql.caseSensitive into account. Though the default value is true, it is recommended to disable enforceSchema to avoid incorrect results
  • inferSchema (default false): infers the input schema automatically from the data. It requires one extra pass over the data
  • samplingRatio (default 1.0): defines the fraction of rows used for schema inference
  • ignoreLeadingWhiteSpace (default false): a flag indicating whether leading whitespace in values being read should be skipped
  • ignoreTrailingWhiteSpace (default false): a flag indicating whether trailing whitespace in values being read should be skipped
  • nullValue (default empty string): sets the string representation of a null value. Since 2.0.1, this applies to all supported types including the string type
  • emptyValue (default empty string): sets the string representation of an empty value
  • nanValue (default NaN): sets the string representation of a non-number value
  • positiveInf (default Inf): sets the string representation of a positive infinity value
  • negativeInf (default -Inf): sets the string representation of a negative infinity value
  • dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to the date type
  • timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to the timestamp type
  • maxColumns (default 20480): defines a hard limit on how many columns a record can have
  • maxCharsPerColumn (default -1): defines the maximum number of characters allowed for any given value being read. The default of -1 means unlimited length
  • mode (default PERMISSIVE): allows a mode for dealing with corrupt records during parsing. It supports the following case-insensitive modes:
    • PERMISSIVE: when it meets a corrupted record, puts the malformed string into a field configured by columnNameOfCorruptRecord and sets the other fields to null. To keep corrupt records, a user can set a string type field named columnNameOfCorruptRecord in a user-defined schema. If the schema does not have the field, corrupt records are dropped during parsing. A record with fewer or more tokens than the schema is not a corrupted record to CSV: when a record has fewer tokens than the length of the schema, the extra fields are set to null; when it has more tokens, the extra tokens are dropped.
    • DROPMALFORMED: ignores whole corrupted records.
    • FAILFAST: throws an exception when it meets corrupted records.
  • columnNameOfCorruptRecord (default spark.sql.columnNameOfCorruptRecord): allows renaming the field containing the malformed string created by PERMISSIVE mode. This overrides spark.sql.columnNameOfCorruptRecord
  • multiLine (default false): parse one record, which may span multiple lines
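
As an illustration, here is a minimal sketch that combines several of these CSV options in a plain Spark read; the path, the NULL marker, and the _corrupt_record column name are assumptions for the example, not required values:

# python
# Sketch: combining several CSV options; the path and _corrupt_record name are placeholders.
from pyspark.sql.types import StructType, StringType, DateType

schema = (StructType()
    .add("customer_id", StringType())
    .add("account_open_date", DateType())
    .add("_corrupt_record", StringType()))  # keeps malformed rows in PERMISSIVE mode

df = (spark.read
    .format("csv")
    .option("header", True)
    .option("sep", ",")
    .option("nullValue", "NULL")                         # treat the literal string NULL as null
    .option("dateFormat", "yyyy-MM-dd")                  # matches account_open_date
    .option("mode", "PERMISSIVE")                        # keep going past bad rows
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .schema(schema)
    .load("dbfs:/mnt/<mount-name>/customers.csv"))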
JSON

  • primitivesAsString (default false): infers all primitive values as a string type
  • prefersDecimal (default false): infers all floating-point values as a decimal type. If the values do not fit in decimal, then it infers them as doubles
  • allowComments (default false): ignores Java/C++ style comments in JSON records
  • allowUnquotedFieldNames (default false): allows unquoted JSON field names
  • allowSingleQuotes (default true): allows single quotes in addition to double quotes
  • allowNumericLeadingZeros (default false): allows leading zeros in numbers (e.g. 00012)
  • allowBackslashEscapingAnyCharacter (default false): allows accepting quoting of all characters using the backslash quoting mechanism
  • allowUnquotedControlChars (default false): allows JSON strings to contain unquoted control characters (ASCII characters with value less than 32, including tab and line feed characters) or not
  • mode (default PERMISSIVE): allows a mode for dealing with corrupt records during parsing:
    • PERMISSIVE: when it meets a corrupted record, puts the malformed string into a field configured by columnNameOfCorruptRecord and sets the other fields to null. To keep corrupt records, a user can set a string type field named columnNameOfCorruptRecord in a user-defined schema. If the schema does not have the field, corrupt records are dropped during parsing. When inferring a schema, it implicitly adds a columnNameOfCorruptRecord field to the output schema.
    • DROPMALFORMED: ignores whole corrupted records.
    • FAILFAST: throws an exception when it meets corrupted records.
  • columnNameOfCorruptRecord (default spark.sql.columnNameOfCorruptRecord): allows renaming the field containing the malformed string created by PERMISSIVE mode. This overrides spark.sql.columnNameOfCorruptRecord
  • dateFormat (default yyyy-MM-dd): sets the string that indicates a date format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to the date type
  • timestampFormat (default yyyy-MM-dd'T'HH:mm:ss.SSSXXX): sets the string that indicates a timestamp format. Custom date formats follow the formats at java.text.SimpleDateFormat. This applies to the timestamp type
  • multiLine (default false): parse one record, which may span multiple lines, per file
  • encoding (not set by default): allows forcibly setting one of the standard basic or extended encodings for the JSON files, for example UTF-16BE or UTF-32LE. If the encoding is not specified and multiLine is set to true, it is detected automatically
  • lineSep (default \r, \r\n and \n): defines the line separator that should be used for parsing
  • samplingRatio (default 1.0): defines the fraction of input JSON objects used for schema inference
  • dropFieldIfAllNull (default false): whether to ignore a column of all null values or an empty array/struct during schema inference
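
Similarly, here is a sketch combining a few of the JSON options above; the path is a placeholder:

# python
# Sketch: a JSON read combining a few of the options above; the path is a placeholder.
events = (spark.read
    .format("json")
    .option("multiLine", True)             # records may span multiple lines
    .option("primitivesAsString", False)   # keep inferred numeric/boolean types
    .option("allowComments", True)         # tolerate // and /* */ comments
    .option("dropFieldIfAllNull", True)    # drop columns that are entirely null
    .option("mode", "DROPMALFORMED")       # silently skip corrupt records
    .load("dbfs:/mnt/<mount-name>/events.json"))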

ORC

There are no options for ORC

PARQUET

  • mergeSchema (default: the value of spark.sql.parquet.mergeSchema): sets whether we should merge schemas collected from all Parquet part-files. This overrides spark.sql.parquet.mergeSchema
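
For example, a plain Spark read that merges schemas across part-files might look like this sketch; the path is a placeholder:

# python
# Sketch: merge schemas collected from all Parquet part-files; the path is a placeholder.
df = (spark.read
    .format("parquet")
    .option("mergeSchema", True)
    .load("dbfs:/mnt/<mount-name>/sales_parquet/"))
df.printSchema()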