Tidbits

Tidbit: a small piece of interesting information, or a small dish of pleasant-tasting food. (Cambridge dictionary)

Array in udf

How to apply an udf on a column which contains an array type: use Seq[String]

Dataframe looks like this:

    root
     |-- id: long (nullable = false)
     |-- items: array (nullable = true)
     |    |-- element: string (containsNull = true)

udf:

    val countItems = udf((history: Seq[String]) => history.size)
    df.withColumn("items_count", countItems($"history"))  

AlmostEqual for scala testing

Before:

    assert(point.getX === 1.0)

// result:
//[info] - test *** FAILED ***
//[info]   0.9999999999999998 did not equal 1.0 (Test.scala:47)

During:

    // Import 
    import org.scalactic.TolerantNumerics

// In test class
val epsilon = 1e-4f
implicit val doubleEq = TolerantNumerics.tolerantDoubleEquality(epsilon)

After: All tests passed.

Pandas save as Decimal

Decimal has the most precision when it comes to floating points. But it's not a native type in pandas. This is how you cast values into decimal.

import decimal
df['dx'] = df['dx'].astype(str).map(decimal.Decimal)
df['dy'] = df['dy'].astype(str).map(decimal.Decimal)

Change the author of the last commit

git commit --amend --author="username <email@gmail.com>"

Roll back to some previous commit

Here n is the commit number starting as 0 from top. So running this command with HEAD~0 should have no effect.

git reset HEAD~n --hard

Spark compiled or provided?

It's provided. Both sbt and Maven have assembly plugins. When creating assembly jars, list Spark and Hadoop as provided dependencies; these need not be bundled since they are provided by the cluster manager at runtime. Once you have an assembled jar you can call the bin/spark-submit script as shown here while passing your jar.

Write a single csv file with Spark

    df
      .coalesce(1)
      .write.format("com.databricks.spark.csv")
      .option("header", "true")
      .save("single_csv_output.csv")

Should I use df.sparkSession?

Using traits as interfaces?

As scala-lang docs says, this is a legit approach.

Download images as renamed listed in csv

Let's say we have csv file which contains list of links and file names. What is the easiest way to download these images and save them as file names given in csv other column? Node.js ? Python script? No, the shortest solution is using unix tools and pipe operator.

Assumint we have image links in the first col and file names in the second col, we can solve the problem like this:

cat source.csv | awk -F"," '{print "-O " $2 " " $1}' | xargs wget

Let's break this apart and see how it works.

  1. cat source.csv: reads the csv file line by line
  2. awk -F"," '{print "-O " $2 " " $1}': first we need to make awk understand the colmunar format is csv, so the divider is comma. Do this by providing -F",". And then, we create the required arguments for wget. To be able to download and save an image renamed, we do wget -O foo.png some-site.com/cat.png. If you run the command until this point, it will print -O foo.png some-site.com/cat.png exactly what we need to pass to wget.
  3. xargs wget: using xargs, now we pass the arguments we built in the previous step to wget.

Unix pipes are magic.

Git undo options

Combine columns into array in a Spark DataFrame

We can do it with array function.

    import org.apache.spark.sql.functions._
    val result = inputSmall.withColumn("combined", array($"transformedCol1", $"transformedCol2"))
    result.show()

    +-------+---------------+-------+---------------+-----------+
    |column1|transformedCol1|column2|transformedCol2|   combined|
    +-------+---------------+-------+---------------+-----------+
    |      A|            0.3|      B|           0.25|[0.3, 0.25]|
    |      A|            0.3|      g|            0.4| [0.3, 0.4]|
    |      d|            0.0|      f|            0.1| [0.0, 0.1]|
    |      d|            0.0|      d|            0.7| [0.0, 0.7]|
    |      A|            0.3|      d|            0.7| [0.3, 0.7]|
    |      d|            0.0|      g|            0.4| [0.0, 0.4]|
    |      c|            0.2|      B|           0.25|[0.2, 0.25]|
    +-------+---------------+-------+---------------+-----------+

We can also create columns list dynamically, querying all columns. For instance, I want to combine columns starting with m_ only:

    val filteredCols: Array[Column] = df.columns.filter(_.startsWith("m_")).map(col(_))
    val result = df.withColumn("combined", array(filteredCols:_*))

Create multiple columns dynamically

I want to create multiple columns on a dataframe depending on an array maybe:

    val result = frames.select($"*" +: timeWindows.map(t => 
      when($"timestamp" >= c.getAs[Long]("start") && $"timestamp" <= c.getAs[Long]("end"), lit(c.getAs[String]("windowId")))
        .otherwise(lit(0)).alias(s"m_${c.getAs[String]("windowId")}")
    ): _*)

Create a UUID column based on a unique key

Let's say we have a unique key field in a Spark dataframe, but this does not look as good as UUID. We can create a column with UUIDs based on this unique key.

    import java.util.UUID

    val uuidFromKey = udf((key: String) => UUID.nameUUIDFromBytes(key.getBytes()).toString())

    val framesWithUUIDs = frames
        .withColumn("unique_uuid", uuidFromKey($"unique_key"))
2020 (c) generated by simplest blog engine [get it here]