Algorithmia Blog

Getting Started with Algorithmia in Spark

Apache Spark is one of the most useful tools for large-scale data processing. It supports data ingestion, aggregation, analysis, and more on massive amounts of data, and has been widely adopted by data engineers and other professionals.

With Spark Streaming and Spark SQL you can perform ETL operations in real time on data coming from a variety of sources, such as Kafka or Flume. And if you want to do some basic machine learning, you can do that with SparkML, a library that brings core statistical models like KMeans and decision trees to users through a high-level API.

But what if you want to analyze thousands of tweets in real time, yet you don’t have a trained model to discover the sentiment of those tweets? Or maybe you want to classify documents on the fly, remove profanity from text, or remove nudity from images?

Algorithmia’s over 4,000 pre-trained models and functions cover all of the above use cases and perfectly complement Spark’s core functionality. These pre-trained models integrate easily into Spark via a REST API endpoint. And just like Spark, Algorithmia has Python, R, Java, and Scala clients, so you can stay in the language you’re familiar with while building robust machine and deep learning pipelines that scale with your data.

Today, we’ll show you how easy it is to integrate a deep fashion pre-trained model into Spark by simply calling the Algorithmia API from your Spark cluster in a few lines of Scala code.

The deep fashion model hosted on Algorithmia is based on the faster-rcnn project, which was inspired by the paper Faster R-CNN: Towards Real-Time Object Detection with Region Proposal Networks. This algorithm takes an image as input and returns the image with bounding boxes drawn around the articles of clothing found, along with the clothing article labels and the confidence score associated with each article. For more information on how we built this model, check out our blog post Classify Clothing and Fashion in Images.
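To get a feel for the model before wiring it into Spark, here’s a minimal standalone sketch using the Algorithmia Scala client. The API key and image URL are placeholders; we’ll set both up for real in a moment:

import com.algorithmia._

object DeepFashionDemo {
  def main(args: Array[String]): Unit = {
    // Placeholder: use your own key from My Profile > Credentials
    val client = Algorithmia.client("YOUR_API_KEY")
    val algo = client.algo("algo://algorithmiahq/DeepFashion/0.1.1")
    // The algorithm takes an image URL and returns JSON describing the
    // detected articles, bounding boxes, and confidence scores
    val response = algo.pipe("https://example.com/outfit.jpg")
    println(response)
  }
}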

Alright, let’s get started writing some code!

First you’ll need a free Algorithmia account, which includes 5,000 free credits a month.

Sign up here, and then grab your API key under My Profile > Credentials because we’ll need it in a bit.

Next, clone the repo from GitHub and go to the build.sbt file, which will look like this:

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "2.1.1",
    "org.apache.spark" %% "spark-streaming" % "2.1.1",
    "org.apache.spark" %% "spark-sql" % "2.1.1",
    "org.twitter4j" % "twitter4j-core" % "4.0.6",
    "org.apache.bahir" %% "spark-streaming-twitter" % "2.1.0",
    "com.algorithmia" %% "algorithmia-scala" % "0.9.2",
    "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.6"
)
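One note on Scala versions: Spark 2.1.1 artifacts are published for Scala 2.11, so if your build.sbt doesn’t already pin it, a setting like the following (a reasonable choice, not taken from the repo) keeps the %% dependency resolution consistent:

scalaVersion := "2.11.11"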

If you use a build environment other than sbt, check out the Algorithmia Scala Client, which is published to Maven Central; additional reference documentation can be found in the Algorithmia Scala Client API Docs and the Algorithmia API docs.

Now, back to our example. Under the “src/main/scala” folder you’ll see two files: Auth.scala and CollectTweets.scala.

Let’s first look at Auth.scala:

import play.api.libs.json.Json
import scala.io.Source

// Twitter and Algorithmia auth
case class Auth(
  consumerKey: String,
  consumerSecret: String,
  accessToken: String,
  accessTokenSecret: String,
  algorithmiaApiKey: String
)

object Auth {
  private val authFile = "auth.json"
  private implicit val authReads = Json.reads[Auth]

  def load: Auth = {
    Json.fromJson[Auth](Json.parse(Source.fromFile(authFile).mkString)).get
  }
}

In it, you’ll see the load function:

def load: Auth = {
  Json.fromJson[Auth](Json.parse(Source.fromFile(authFile).mkString)).get
}

This function gets our various API keys from our JSON file which we’ll need for both the twitter4j authentication and Algorithmia’s API.

Go ahead and check out the JSON file “twitter_deepfashion/auth.json” and set your API keys for these variables:

consumerKey: String,
consumerSecret: String,
accessToken: String,
accessTokenSecret: String,
algorithmiaApiKey: String
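The file is a flat JSON object whose field names must match the Auth case class exactly; here’s a sketch with placeholder values:

{
  "consumerKey": "YOUR_TWITTER_CONSUMER_KEY",
  "consumerSecret": "YOUR_TWITTER_CONSUMER_SECRET",
  "accessToken": "YOUR_TWITTER_ACCESS_TOKEN",
  "accessTokenSecret": "YOUR_TWITTER_ACCESS_TOKEN_SECRET",
  "algorithmiaApiKey": "YOUR_ALGORITHMIA_API_KEY"
}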

If you haven’t worked with the Twitter API before, sign up here and grab your auth credentials. If you didn’t get your Algorithmia API key already, grab it from your profile under the Credentials tab.

With that done, we can go to “twitter_deepfashion/src/main/scala/CollectTweets.scala”, where we’ll write the bulk of our code.

First, you’ll notice that below your imports you’ll need to create a couple of case classes. The first is Article, which holds the label of each article of clothing we’ll retrieve from an image. The second is Result, which lets us read the results from Algorithmia as a list of articles.

import com.algorithmia._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import play.api.libs.json.Json

case class Article(article_name: String)
case class Result(articles: List[Article])

Next let’s look at the object CollectTweets:

object CollectTweets {
  private implicit val articleReads = Json.reads[Article]
  private implicit val resultReads = Json.reads[Result]

  def main(args: Array[String]) {
    // Load auth from json file
    val auth = Auth.load

    println("Initializing Streaming Spark Context...")

    val SPARK_HOSTNAME = "local[*]"
    val conf = new SparkConf().setAppName("CollectTweets").setMaster(SPARK_HOSTNAME)
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(1))
    ssc.checkpoint("/tmp/spark-test")
    val sparky = SparkSession.builder.config(sc.getConf).getOrCreate()

    System.setProperty("twitter4j.oauth.consumerKey", auth.consumerKey)
    System.setProperty("twitter4j.oauth.consumerSecret", auth.consumerSecret)
    System.setProperty("twitter4j.oauth.accessToken", auth.accessToken)
    System.setProperty("twitter4j.oauth.accessTokenSecret", auth.accessTokenSecret)

  }

}

First, we created a couple of implicit Reads values that tell play-json how to deserialize the algorithm’s JSON responses into our Article and Result case classes.
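To see what those Reads buy us, here’s a quick standalone check (a sketch, run from inside the object so the implicit Reads are in scope; the sample JSON just mirrors the shape of the algorithm’s response):

val sample = """{"articles": [{"article_name": "sweater"}, {"article_name": "skirt"}]}"""
// The implicit Reads above turn raw JSON into our typed case classes
val parsed = Json.fromJson[Result](Json.parse(sample)).get
println(parsed.articles.map(_.article_name)) // List(sweater, skirt)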

Then in our main function, we created a Spark Streaming context and added our Twitter auth credentials. Note the ssc.checkpoint call: updateStateByKey, which we’ll use later to keep running totals, requires a checkpoint directory. Remember, if you didn’t add your Twitter auth strings to “twitter_deepfashion/auth.json”, do that now.

Next, add the following within your main function, directly below the System.setProperty calls:

val tweetFilters = Seq("fashionblogger", "OOTD")
TwitterUtils.createStream(ssc, None, tweetFilters)
  .flatMap(_.getMediaEntities)        // Get tweet media entities
  .map(_.getMediaURL)                 // Get image urls
  // Parallelize tweets into partitions, and get image tags using Algorithmia
  .mapPartitions { partition =>
    // Create client per-partition, since it may be distributed across cluster
    val client = Algorithmia.client(auth.algorithmiaApiKey)
    val algo = client.algo("algo://algorithmiahq/DeepFashion/0.1.1")
    // Send urls to Algorithmia for tagging:
    partition.map(url => algo.pipe(url).as[Result])
  }

In the code above, we create a filter to get only tweets that mention either “fashionblogger” or “OOTD”, the acronym for “outfit of the day”.

Then we use the Spark Streaming Twitter library to pass in our tweet filters. Notice we only keep the media entities, so we can extract the image URLs from the Twitter API.

Next we use .mapPartitions to parallelize tweets into partitions. Within each partition we create an Algorithmia client, passing in our auth.algorithmiaApiKey. Then we use the .algo() method to specify the algorithm we want to call, which is DeepFashion.

Notice that we add the version number to the algorithm path to ensure we always get the expected behavior. If you want more information about how to call algorithms, check out the API docs.
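For reference, the algorithm path supports a few levels of pinning; roughly, per the Algorithmia API docs:

client.algo("algo://algorithmiahq/DeepFashion")       // latest published version
client.algo("algo://algorithmiahq/DeepFashion/0.1")   // latest 0.1.x release
client.algo("algo://algorithmiahq/DeepFashion/0.1.1") // exact version, as used above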

Next, we pass each image URL that we got from Twitter to the algorithm, then use the .as[Result] method to deserialize the response into our Result case class, which holds the list of detected articles.
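One caveat worth noting: individual calls can fail (a deleted image, a timeout, a rate limit), and an uncaught exception inside .mapPartitions will fail the Spark task. If you want to be defensive, one option (not in the repo) is to wrap each call in scala.util.Try and keep only the successes:

import scala.util.Try

// Inside .mapPartitions, after creating `client` and `algo` as above:
partition.flatMap { url =>
  // Skip tweets whose images couldn't be processed instead of failing the batch
  Try(algo.pipe(url).as[Result]).toOption
}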

Now, go ahead and place the following chained methods right under the closing curly brace of the previous code snippet:

  .flatMap(result => result.articles) // Get articles from tweets
  .map(art => (art.article_name, 1))  // Create counting pairs
  .reduceByKey(_ + _)                 // Sum within partitions
  .updateStateByKey(sumState)         // Accumulate running totals across batches
  .foreachRDD { rdd =>
    val tagCounts = rdd
      .collect()
      .toList
      .sortBy(-_._2)
    if(tagCounts.nonEmpty) {
      println("Got tag counts: " + tagCounts.mkString(","))
      val topArticles = tagCounts.take(6)
      println("Top Articles: " + topArticles.mkString(","))
    }
  }

ssc.start()
ssc.awaitTermination()

}

Finally, in the code above, we do some basic summation of all articles of clothing of the same type within each partition. Then we collect all the articles from the RDDs and sort them by their counts in a list. If the resulting tag counts aren’t empty, we print them out along with the top six articles.
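If the pair-and-reduce pattern is new to you, here’s the same counting logic on a plain Scala list, outside Spark, purely as an illustration:

val articles = List("sweater", "skirt", "sweater")
val counts = articles
  .map(a => (a, 1))            // Create counting pairs
  .groupBy(_._1)               // Group by article name
  .mapValues(_.map(_._2).sum)  // Sum each group, like reduceByKey
println(counts.toList.sortBy(-_._2)) // List((sweater,2), (skirt,1))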

But don’t run your code yet. As you may have noticed, the code above passes in an argument, sumState, which is a function we haven’t created yet.

Below the last closing curly brace in the code above, go ahead and create the sumState function:

// This accumulates totals from lists of ints, for updateStateByKey
private def sumState: (Seq[Int],Option[Int]) => Option[Int] = {
  case (newCounts, sumSoFar) => sumSoFar.orElse(Some(0)).map(_ + newCounts.sum)
}
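To make the semantics concrete: updateStateByKey calls this function with the new counts for a key from the current batch and the running total so far. A quick check (from within the object, since sumState is private):

sumState(Seq(2, 3), Some(5)) // Some(10): 5 so far, plus 2 + 3 from this batch
sumState(Seq(4), None)       // Some(4): no prior state, so we start from zero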

This function is where we gather the counts for each item and sum them up across batches. Now that we’ve added this final piece, you can run your code!

You’ll get something a bit different, but here’s an example of the output generated for an image pulled from #OOTD #fashionblogger on Twitter:

{
  "articles": [
    {
      "article_name": "sweater",
      "bounding_box": {
        "x0": 1106.947021484375,
        "x1": 1636.649658203125,
        "y0": 283.66015625,
        "y1": 1202.80908203125
      },
      "confidence": 0.9918832778930664
    },
    {
      "article_name": "skirt",
      "bounding_box": {
        "x0": 320.62493896484375,
        "x1": 985.2966918945312,
        "y0": 943.1708984375,
        "y1": 1248.072265625
      },
      "confidence": 0.9788508415222168
    },
    {
      "article_name": "heels pumps or wedges",
      "bounding_box": {
        "x0": 1069.8662109375,
        "x1": 1367.072265625,
        "y0": 1956.006103515625,
        "y1": 2047
      },
      "confidence": 0.6501792669296265
    }
  ],
  "output": "data://.algo/temp/86e3885f-eb37-44d9-8c4f-c3b496497477.png"
}

Awesome! The algorithm found all the articles of clothing correctly and was able to do it on the fly in real-time from Spark Streaming.

If you want to see the complete code, check out the GitHub repo, or if you want more ideas about what you could build once you have your clothing articles, check out this post about the same algorithm.

Thanks for checking out our Algorithmia + Spark project, and let us know what you’re building at info@algorithmia.com or @Algorithmia on Twitter.