Background
Clobotics is a company that connects the physical and digital realms. Clobotics operates in two industries: wind power and retail. Though different, both industries benefit from Clobotics’ groundbreaking technology through the use of computer vision, artificial intelligence/machine learning, and data analytics that bring the physical world into the digital.
These technologies are applied through multiple forms, including autonomous drones, mobile apps, and IoT devices. For retail, Clobotics enables an end-to-end solution known as Smart Retail that provides Retail Insights as a Service (RIaaS). Through this, retail customers are able to access fast and accurate real-time inventory data. The power of RIaaS lets retail customers know whether a product is available at a grocery store ahead of time.
Problem
Clobotics uses high-resolution photography and image processing to generate retail inventory data. Cameras are attached to shelves and coolers or are part of a hand-held device. Raw images are then uploaded to the cloud.
The uploaded images are cropped into segments, and each segment is analyzed independently. Images are taken only when activity is detected (the opening and closing of a cooler door), so the number of images processed at any given time is highly variable. Clobotics therefore needs a solution that can scale to peak volume.
Initial Solution
As a fast-growing startup, Clobotics initially emphasized tools that allowed for rapid iteration and production; scalability is less of a concern when exploring product-market fit. As such, Python, Pandas, and PIL were used to crop images. Pandas held metadata such as file paths, Python accessed the latest image via an API, and PIL loaded the images, cropped them, and saved the output back to cloud storage for further processing. An example of what this data looks like can be found below.
To parallelize locally, tools like pandarallel and the native Python multiprocessing pool were used. However, these tools are limited to a single machine. To support the highly variable number of incoming images, Spark was explored for its autoscaling capabilities. On a cluster, processing is distributed across machines, making the architecture a better fit for bursty workloads.
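To make the single-machine limitation concrete, here is a minimal sketch of local parallelism with the standard-library multiprocessing pool. The rows, the fixed image size, and the `crop_box` and `crop_boxes_parallel` helpers are hypothetical stand-ins for the real download-and-crop step; only the column names `xmin`, `xmax`, `ymin`, and `ymax` come from the workflow shown later in this post.

```python
from multiprocessing import Pool
from typing import Any, Dict, List, Tuple

# Hypothetical image size; the real pipeline reads it from each image.
WIDTH, HEIGHT = 1000, 500


def crop_box(row: Dict[str, Any]) -> Tuple[int, int, int, int]:
    """Convert fractional bounds into a pixel box (left, upper, right, lower)."""
    return (
        round(float(row["xmin"]) * WIDTH),
        round(float(row["ymin"]) * HEIGHT),
        round(float(row["xmax"]) * WIDTH),
        round(float(row["ymax"]) * HEIGHT),
    )


def crop_boxes_parallel(
    rows: List[Dict[str, Any]], processes: int = 2
) -> List[Tuple[int, int, int, int]]:
    # The pool spreads rows across worker processes on this machine only.
    with Pool(processes=processes) as pool:
        return pool.map(crop_box, rows)


# Hypothetical rows for illustration.
ROWS = [
    {"xmin": 0.0, "xmax": 0.5, "ymin": 0.0, "ymax": 0.5},
    {"xmin": 0.5, "xmax": 1.0, "ymin": 0.5, "ymax": 1.0},
]

if __name__ == "__main__":
    print(crop_boxes_parallel(ROWS))
```

Raising `processes` helps only until the local cores are saturated, which is the ceiling a cluster removes.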
The remaining problem was how to migrate the existing code to Spark.
Bringing Code to Spark with Fugue
Making code run on top of Spark requires significant rewrites and a lot of additional boilerplate code. Thus, the Clobotics team looked for solutions that would simplify porting code over. Koalas provides a Pandas API on top of Spark, but Clobotics' problems were not confined to Pandas semantics, especially the API requests and image processing portions.
The team eventually settled on Fugue, an abstraction layer that allows users to port Python, Pandas, and SQL code to distributed computing frameworks (Spark, Dask, or Ray). Fugue reduces migration effort and is minimally invasive, often just needing a few additional lines of code to port over. Using Fugue allowed Clobotics to re-use most of the same code to scale to Spark.
Below is a highly simplified example of the workflow Clobotics uses. The DataFrame above is converted to a list of dictionaries (with some helper code), and each row is processed independently. For each row, the API is hit to retrieve the image, and then it is processed and uploaded to cloud storage. In reality, there is some more logic to prevent the re-downloading of images along with some metadata handling. The code has been trimmed for clarity.
```python
import logging
from io import BytesIO
from typing import Any, Dict, Iterable, List

import requests
from PIL import Image

logger = logging.getLogger(__name__)

def transform_img(df: List[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    for row in df:
        try:
            response = requests.get(row["ImgUrl"], timeout=5)
            img = Image.open(BytesIO(response.content))
            # Bounds are stored as fractions of the image size
            xmin = float(row["xmin"]) * img.width
            xmax = float(row["xmax"]) * img.width
            ymin = float(row["ymin"]) * img.height
            ymax = float(row["ymax"]) * img.height
            img = img.crop((xmin, ymin, xmax, ymax))

            # logic to save image to cloud storage

            yield row
        except Exception as e:
            # Log the failure and still emit the row so no record is dropped
            logger.error(e)
            yield row

results = transform_img(df.to_dict("records"))
```
This code runs on Pandas and Python. In order to bring it to Spark, all we have to do is add a couple of lines of code to use Fugue.
```python
from fugue import transform
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
results = transform(df,
                    transform_img,
                    schema="*",
                    engine=spark)
```
Here we used the Fugue `transform()` function to port `transform_img()` to Spark. We passed in `spark` as the engine to indicate that the DataFrame should be converted and the operation should run on top of Spark. If no engine is passed, the operation will run on Pandas. This allows for quick iteration before bringing the work to Spark, which can be slow to spin up. Notice that Fugue can also parse the type annotations of the Python function and apply the necessary conversions to bring it to Spark.
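To give a feel for what annotation-driven conversion means, the toy sketch below inspects a function's type hints and reshapes the input to match. It is not Fugue's implementation: `run_with_hints`, `to_records`, the two sample functions, and the column-dictionary input format are all hypothetical, and only two input shapes are handled.

```python
from typing import Any, Callable, Dict, Iterable, List, get_type_hints

Columns = Dict[str, List[Any]]  # column name -> list of values


def to_records(columns: Columns) -> List[Dict[str, Any]]:
    """Turn column-oriented data into a list of row dictionaries."""
    names = list(columns)
    return [dict(zip(names, values)) for values in zip(*columns.values())]


def run_with_hints(func: Callable, columns: Columns) -> List[Any]:
    """Choose the input format from the annotation on func's first parameter."""
    hints = get_type_hints(func)
    hints.pop("return", None)
    first_hint = next(iter(hints.values()))
    if first_hint == List[Dict[str, Any]]:
        data = to_records(columns)  # the function asked for rows
    else:
        data = columns  # the function asked for columns as-is
    return list(func(data))


def by_rows(df: List[Dict[str, Any]]) -> Iterable[Dict[str, Any]]:
    for row in df:
        yield {**row, "width": row["xmax"] - row["xmin"]}


def by_columns(df: Dict[str, List[Any]]) -> Iterable[float]:
    for low, high in zip(df["xmin"], df["xmax"]):
        yield high - low


data = {"xmin": [0.0, 0.25], "xmax": [0.5, 1.0]}
rows_out = run_with_hints(by_rows, data)
cols_out = run_with_hints(by_columns, data)
```

The same input feeds both functions; only the annotation decides which shape each one receives, so the function body stays free of any framework-specific types.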
FugueSQL for End-to-end Workflows
Fugue also offers a SQL interface. The above example is heavily simplified, but the real scenario often involves joins with metadata, which are easier to express in SQL than in Python. FugueSQL is a modified SQL interface that allows users to invoke Python functions.
For example, in order to quickly test our Python code, we could use the DuckDB engine, as seen below. The `TRANSFORM` keyword will invoke the Python function.
Similar to the Fugue `transform()`, all we need to do to bring this to Spark for the full data is to use `%%fsql spark` at the top instead. The query will then run on top of Spark SQL and PySpark.
Conclusion
For a fast-growing startup like Clobotics, an abstraction layer provides the flexibility to scale existing code with minimal rewrites. Having a codebase that is compatible with both Pandas and Spark brings these key benefits:
- The overhead of maintaining the codebase is much smaller because there is no dependency on Spark, which adds a lot of boilerplate code.
- As Clobotics continues to refine its algorithms, it is much faster to iterate locally before bringing the code to Spark. Fugue allows changing engines with a one-line code change.
- An end-to-end SQL interface lets developers choose the best tool for the job. For workflows that require a lot of joins, the entire pipeline can be written in FugueSQL.
Resources
For more resources, check the following links: