Javascript required
Skip to content Skip to sidebar Skip to footer

How to Read File in Spark Scala

visibility eight,028 access_time 2 years ago languageEnglish

This article shows virtually how read CSV or TSV file as Spark DataFrame using Scala. The CSV file tin be a local file or a file in HDFS (Hadoop Distributed File Organization).

Read CSV Spark API

SparkSession.read can exist used to read CSV files.

def csv(path: String): DataFrame Loads a CSV file and returns the result every bit a DataFrame. See the documentation on the other overloaded csv() method for more than details.

This function is but available for Spark version 2.0. For Spark one.x, y'all need to user SparkContext to convert the data to RDD and so convert it to Spark DataFrame.

Read CSV file

The following lawmaking snippet reads from a local CSV file named examination.csv with the following content:

ColA,ColB 1,2 3,4 v,6 seven,eight

Lawmaking snippet

scala> spark.read.option("header","truthful").csv("file:///F:\\big-data/exam.csv").testify() +----+----+ |ColA|ColB| +----+----+ |   1|   2| |   3|   four| |   5|   6| |   vii|   8| +----+----+

Alternatively, you tin likewise use the generic load APIs with format and options specified:

scala> spark.read.format("csv").option("header","truthful").load("file:///F:\\big-data/examination.csv").testify() +----+----+ |ColA|ColB| +----+----+ |   1|   ii| |   three|   4| |   five|   6| |   7|   8| +----+----+

infoIn the path parameter, file:/// is specified to point the input is from a local source file. If you only specify path like /path/to/your/file, Spark will assume the information is bachelor in HDFS by default.

CSV format options

There are a number of CSV options tin be specified. The post-obit options are cited from Spark 3.0.1 Scala API documentation for reference:

You lot can set the following CSV-specific options to deal with CSV files:

  • sep  (default , ): sets a separator for each field and value. This separator tin be ane or more than characters.
  • encoding  (default UTF-8 ): decodes the CSV files by the given encoding type.
  • quote  (default " ): sets a unmarried character used for escaping quoted values where the separator can be part of the value. If you would like to turn off quotations, you need to set not null  but an empty string. This behaviour is unlike from com.databricks.spark.csv .
  • escape  (default \ ): sets a single character used for escaping quotes inside an already quoted value.
  • charToEscapeQuoteEscaping  (default escape  or \0 ): sets a unmarried character used for escaping the escape for the quote character. The default value is escape grapheme when escape and quote characters are dissimilar, \0  otherwise.
  • comment  (default empty string): sets a single character used for skipping lines offset with this graphic symbol. By default, information technology is disabled.
  • header  (default false ): uses the first line every bit names of columns.
  • enforceSchema  (default true ): If it is set to true , the specified or inferred schema will exist forcibly applied to datasource files, and headers in CSV files volition be ignored. If the option is set to false , the schema will exist validated against all headers in CSV files in the case when the header  option is ready to true . Field names in the schema and column names in CSV headers are checked by their positions taking into account spark.sql.caseSensitive . Though the default value is true, it is recommended to disable the enforceSchema  selection to avoid incorrect results.
  • inferSchema  (default imitation ): infers the input schema automatically from data. It requires 1 extra pass over the information.
  • samplingRatio  (default is ane.0): defines fraction of rows used for schema inferring.
  • ignoreLeadingWhiteSpace  (default simulated ): a flag indicating whether or not leading whitespaces from values existence read should be skipped.
  • ignoreTrailingWhiteSpace  (default false ): a flag indicating whether or not abaft whitespaces from values existence read should be skipped.
  • nullValue  (default empty string): sets the cord representation of a null value. Since 2.0.1, this applies to all supported types including the string blazon.
  • emptyValue  (default empty string): sets the string representation of an empty value.
  • nanValue  (default NaN ): sets the string representation of a not-number" value.
  • positiveInf  (default Inf ): sets the cord representation of a positive infinity value.
  • negativeInf  (default -Inf ): sets the string representation of a negative infinity value.
  • dateFormat  (default yyyy-MM-dd ): sets the string that indicates a date format. Custom date formats follow the formats at Datetime Patterns. This applies to date type.
  • timestampFormat  (default yyyy-MM-dd'T'HH:mm:ss[.SSS][30] ): sets the string that indicates a timestamp format. Custom date formats follow the formats at Datetime Patterns. This applies to timestamp blazon.
  • maxColumns  (default 20480 ): defines a hard limit of how many columns a tape can have.
  • maxCharsPerColumn  (default -1 ): defines the maximum number of characters allowed for any given value existence read. Past default, it is -1 meaning unlimited length
  • manner  (default PERMISSIVE ): allows a mode for dealing with corrupt records during parsing. It supports the post-obit example-insensitive modes. Note that Spark tries to parse only required columns in CSV under cavalcade pruning. Therefore, corrupt records can be dissimilar based on required prepare of fields. This behavior can exist controlled by spark.sql.csv.parser.columnPruning.enabled  (enabled by default).
  • PERMISSIVE  : when information technology meets a corrupted record, puts the malformed string into a field configured by columnNameOfCorruptRecord , and sets malformed fields to naught . To go along corrupt records, an user can set up a string type field named columnNameOfCorruptRecord  in an user-defined schema. If a schema does not have the field, it drops decadent records during parsing. A record with less/more than tokens than schema is non a corrupted record to CSV. When it meets a record having fewer tokens than the length of the schema, sets null  to actress fields. When the record has more tokens than the length of the schema, it drops extra tokens.
  • DROPMALFORMED  : ignores the whole corrupted records.
  • FAILFAST  : throws an exception when it meets corrupted records.
  • columnNameOfCorruptRecord  (default is the value specified in spark.sql.columnNameOfCorruptRecord ): allows renaming the new field having malformed string created by PERMISSIVE  mode. This overrides spark.sql.columnNameOfCorruptRecord .
  • multiLine  (default faux ): parse i tape, which may span multiple lines.
  • locale  (default is en-US ): sets a locale as language tag in IETF BCP 47 format. For instance, this is used while parsing dates and timestamps.
  • lineSep  (default covers all \r , \r\n  and \due north ): defines the line separator that should exist used for parsing. Maximum length is 1 character.
  • pathGlobFilter : an optional glob blueprint to only include files with paths matching the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter . It does not change the behavior of partition discovery.
  • recursiveFileLookup: recursively scan a directory for files. Using this selection disables partition discovery

Load TSV file

Choice sep can exist used to specify input file as TSV (tab separated values) or whatever other character delimited files. Past default, the value is , (comma).

spark.read.format("csv").option("header","true").choice("sep","\t").load("file:///F:\\large-data/test.csv").show()

Reference

Refer to Spark Scala official documentation for more than details about the APIs:

Spark 3.0.i ScalaDoc - org.apache.spark.sql.DataFrameReader

rimmeralte1945.blogspot.com

Source: https://kontext.tech/column/spark/533/scala-read-csv-file-as-spark-dataframe