Enriches the DataFrame by adding column(s) with content from REST API output based on the given configuration.
Each property can either be set as a static value or a value from an existing column of the input DataFrame. Please refer to the examples in the description column of each parameter for reference on how the string value should be formed.
| Parameter | Description | Required | Default |
|---|---|---|---|
| method | Method for the new Request object: | true | |
| url | URL for the REST API. | true | |
| params | Dictionary, list of tuples or bytes to send in the query string for the Request. e.g.: | false | |
| data | Dictionary to send in the body of the Request. e.g.: | false | |
| JSON | A JSON serializable Python object to send in the body of the Request. e.g.: | false | |
| headers | Dictionary of HTTP Headers to send with the Request. e.g.: | false | |
| cookies | Dictionary to send with the Request. e.g.: | false | |
| auth | Auth tuple to enable Basic/Digest/Custom HTTP Auth. e.g.: | false | |
| timeout | How many seconds to wait for the server to send data before giving up, as a float, e.g.: | false | |
| allow redirects | Enable/disable | false | true |
| proxies | Dictionary mapping protocol to the URL of the proxy. e.g.: | false | |
| verify | Either a boolean, in which case it controls whether we verify the server’s TLS certificate e.g.: | false | true |
| stream | If False, the response content will be immediately downloaded. e.g.: | false | |
| cert | If a string, path to the SSL client cert file (.pem). e.g.: | false | |
| parse content | Parse content as JSON (to make the schema available, please enable | false | false |
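
The parameter names and descriptions above closely resemble the arguments of a generic HTTP client call, such as `requests.request` in Python. As a rough illustration only, and not the Gem's actual implementation, the sketch below uses the standard library's `urllib` to show how `method`, `url`, `params`, and `headers` could combine into a single request. The endpoint `https://api.example.com/prices`, the helper `build_request`, and all values are hypothetical.

```python
from urllib.parse import urlencode
from urllib.request import Request


def build_request(method, url, params=None, headers=None):
    """Combine the basic request properties into a urllib Request (not sent)."""
    # Append query-string parameters to the URL, if any were given.
    full_url = f"{url}?{urlencode(params)}" if params else url
    return Request(full_url, method=method, headers=headers or {})


# Hypothetical values, for illustration only.
req = build_request(
    method="GET",
    url="https://api.example.com/prices",
    params={"symbol": "BTC", "currency": "USD"},
    headers={"Accept": "application/json"},
)
print(req.get_method(), req.full_url)
# Sending it with a timeout would look like:
#   urllib.request.urlopen(req, timeout=5.0)
```

Nothing is sent over the network here; the request object is only assembled so that the role of each property is visible.
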
- To store sensitive information such as API keys (headers) or auth credentials, Databricks secrets can be used, as shown in the example below.
- If the expected number of rows is very large, it's better to provide an await time in the advanced tab so you don't overwhelm the source server or exceed any request limits.
- For APIs that take a list of parameters as input, window functions like `collect_list` can be used before the RestApiEnrich Gem to reduce the number of API calls.
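
The last two tips, batching inputs and pausing between calls, can be sketched in plain Python. This is only an illustration of the idea under stated assumptions: `batch_by_key`, `call_with_pause`, and `fake_fetch` are hypothetical helpers, the rows are made up, and in a real pipeline the grouping would be done with `collect_list` in Spark and the pause with the Gem's await time setting.

```python
import time
from collections import defaultdict


def batch_by_key(rows, key, value):
    """Group row values into one list per key (similar in spirit to collect_list)."""
    batches = defaultdict(list)
    for row in rows:
        batches[row[key]].append(row[value])
    return dict(batches)


def call_with_pause(batches, fetch, pause_seconds, sleep=time.sleep):
    """Call `fetch` once per batch, pausing between consecutive calls."""
    results = {}
    for i, (group, values) in enumerate(batches.items()):
        if i > 0:
            sleep(pause_seconds)  # throttle so the server is not overwhelmed
        results[group] = fetch(group, values)
    return results


# Hypothetical input rows and a stand-in for the real API call.
rows = [
    {"currency": "USD", "coin": "BTC"},
    {"currency": "USD", "coin": "ETH"},
    {"currency": "EUR", "coin": "BTC"},
]
batches = batch_by_key(rows, key="currency", value="coin")


def fake_fetch(currency, coins):
    return f"{currency}:{','.join(coins)}"


responses = call_with_pause(batches, fake_fetch, pause_seconds=0.1)
print(len(rows), "rows ->", len(responses), "API calls")
```

Three input rows collapse into two calls here; with many rows per key the saving grows accordingly.
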
Please make sure that the cluster is connected while using the parse content option, so that the schema can be inferred from the cluster the first time.
All input parameters are expected to be in string format. Other column types such as array, JSON, or struct can be created using a combination of aggregate/window Gems along with the reformat component, and then cast to string before the column is passed in.
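
Since every property is passed as a string, a dictionary- or list-valued property such as `params` would need to be serialized first. Below is a minimal sketch in plain Python, assuming JSON is the expected string encoding (the examples in the parameter descriptions remain the authoritative reference). The `params` values are hypothetical. In Spark, the analogous step is applying `to_json` to a struct column.

```python
import json

# Hypothetical structured value for the `params` property.
params = {"symbols": ["BTC", "ETH"], "limit": 10}

# Serialize to a string so it can be passed as a string-typed input.
params_as_string = json.dumps(params)
print(params_as_string)

# Scalar values such as a timeout are simply converted to their string form.
timeout_as_string = str(2.5)
print(timeout_as_string)

# The string round-trips back to the original structure.
assert json.loads(params_as_string) == params
```
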
Let's fetch prices for a few cryptocurrencies from Coin-API.
We take cryptocurrency and currency as inputs from the DataFrame and pass url and headers as static values. Note that the URL in this example is built from a static base URL, with cryptocurrency and currency appended as inputs from the DataFrame.
We also use Databricks secrets to pass headers, since the API requires an API key.
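
The URL construction described above, a static base plus per-row values, can be sketched as follows. The base URL `https://api.example.com/v1/exchangerate` is a placeholder rather than the real Coin-API endpoint, and `build_url` is a hypothetical helper; inside a Spark pipeline the same concatenation would be expressed on columns instead.

```python
from urllib.parse import quote

BASE_URL = "https://api.example.com/v1/exchangerate"  # hypothetical base URL


def build_url(cryptocurrency, currency):
    """Append the two per-row values to the static base URL."""
    # Percent-encode each path segment so unusual symbols cannot break the URL.
    return f"{BASE_URL}/{quote(cryptocurrency, safe='')}/{quote(currency, safe='')}"


# Hypothetical (cryptocurrency, currency) pairs, one per input row.
rows = [("BTC", "USD"), ("ETH", "EUR")]
urls = [build_url(crypto, currency) for crypto, currency in rows]
for url in urls:
    print(url)
```
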
Let's take a more complex example, where method, url, headers, params, etc. are all passed as values from DataFrame columns.
def get_data_from_api(spark: SparkSession, in0: DataFrame) -> DataFrame:
    requestDF = in0.withColumn(
            to_json(struct(lit("GET").alias("method"), col("url"), lit(Config.coin_api_key).alias("headers"))),