When I joined CloudFlare about 18 months ago, we had just started to build out our new Data Platform. At that point, the log processing and analytics pipeline built in the early days of the company had reached its limits. This was due to the rapidly increasing log volume from our Edge Platform where we’ve had to deal with traffic growth in excess of 400% annually.
Our log processing pipeline started out like most everybody else’s: compressed log files shipped to a central location for aggregation by a motley collection of Perl scripts and C++ programs with a single PostgreSQL instance to store the aggregated data. Since then, CloudFlare has grown to serve millions of requests per second for millions of sites. Apart from the hundreds of terabytes of log data that has to be aggregated every day, we also face some unique challenges in providing detailed analytics for each of the millions of sites on CloudFlare.
For the next iteration of our Customer Analytics application, we wanted to get something up and running quickly, try out Kafka, write the aggregation application in Go, and see what could be done to scale out our trusty go-to database, PostgreSQL, from a single machine to a cluster of servers without requiring us to deal with sharding in the application.
As we were analyzing our scaling requirements for PostgreSQL, we came across Citus Data, one of the companies to launch out of Y Combinator in the summer of 2011. Citus Data builds a database called CitusDB that scales out PostgreSQL for real-time workloads. Because CitusDB enables both real-time data ingest and sub-second queries across billions of rows, it has become a crucial part of our analytics infrastructure.
Log Processing Pipeline for Analytics
Before jumping into the details of our database backend, let’s review the pipeline that takes a log event from CloudFlare’s Edge to our analytics database.
An HTTP access log event proceeds through the CloudFlare data pipeline as follows:
A web browser makes a request (e.g., an HTTP GET request).
An Nginx web server running Lua code handles the request and generates a binary log event in Cap’n Proto format.
A Go program akin to Heka receives the log event from Nginx over a UNIX socket, batches it with other events, compresses the batch using a fast algorithm like Snappy or LZ4, and sends it to our data center over a TLS-encrypted TCP connection.
Another Go program (the Kafka shim) receives the log event stream, decrypts it, decompresses the batches, and produces the events into a Kafka topic with partitions replicated on many servers.
Go aggregators (one process per partition) consume the topic-partitions and insert aggregates (not individual events) with 1-minute granularity into the CitusDB database. Further rollups to 1-hour and 1-day granularity occur later to reduce the amount of data to be queried and to speed up queries over intervals spanning many hours or days.
Why Go?
Previous blog posts and talks have covered various CloudFlare projects that have been built using Go. We’ve found that Go is a great language for teams to use when building the kinds of distributed systems needed at CloudFlare, and this is true regardless of an engineer’s level of experience with Go. Our Customer Analytics team is made up of engineers who have been using Go since before its 1.0 release as well as complete Go newbies. Team members who were new to Go were able to get up to speed quickly, and the code base has remained maintainable even as we’ve continued to build many more data processing and aggregation applications such as a new version of our Layer 7 DDoS attack mitigation system.
Another factor that makes Go great is the ever-expanding ecosystem of third party libraries. We used go-capnproto to generate Go code to handle binary log events in Cap’n Proto format from a common schema shared between Go, C++, and Lua projects. Go support for Kafka with Shopify’s Sarama library, for ZooKeeper with go-zookeeper, and for PostgreSQL/CitusDB through database/sql and the lib/pq driver is very good.
Why Kafka?
As we started building our new data processing applications in Go, we had some additional requirements for the pipeline:
Use a queue with persistence to allow short periods of downtime for downstream servers and/or consumer services.
Make the data available for processing in real time by scripts written by members of our Site Reliability Engineering team.
Allow future aggregators to be built in other languages like Java, C or C++.
After extensive testing, we selected Kafka as the first stage of the log processing pipeline.
Why Postgres?
As we mentioned when PostgreSQL 9.3 was released, PostgreSQL has long been an important part of our stack, and for good reason.
Foreign data wrappers and other extension mechanisms make PostgreSQL an excellent platform for storing lots of data, or as a gateway to NoSQL data stores, without having to give up the power of SQL. PostgreSQL also has great performance and documentation. Lastly, PostgreSQL has a large and active community, and we've had the privilege of meeting many of the PostgreSQL contributors at meetups held at the CloudFlare office and elsewhere, organized by the San Francisco Bay Area PostgreSQL Meetup Group.
Why CitusDB?
CloudFlare has been using PostgreSQL since day one. We trust it, and we wanted to keep using it. However, CloudFlare's data has been growing rapidly, and we were running into the limitations of a single PostgreSQL instance. Our team was tasked with scaling out our analytics database in a short time, so we started by defining the criteria that are important to us:
Performance: Our system powers the Customer Analytics dashboard, so typical queries need to return in less than a second even when dealing with data from many customer sites over long time periods.
PostgreSQL: We have extensive experience running PostgreSQL in production. We also find several extensions useful, e.g., Hstore enables us to store semi-structured data and HyperLogLog (HLL) makes unique count approximation queries fast.
Scaling: We need to dynamically scale out our cluster for performance and huge data storage. That is, if we realize that our cluster is becoming overutilized, we want to solve the problem by just adding new machines.
High availability: This cluster needs to be highly available. As such, the cluster needs to automatically recover from failures like disks dying or servers going down.
Business intelligence queries: In addition to sub-second responses for customer queries, we need to be able to perform business intelligence queries that may need to analyze billions of rows of analytics data.
At first, we evaluated what it would take to build an application that deals with sharding on top of stock PostgreSQL. We investigated using the postgres_fdw extension to provide a unified view on top of a number of independent PostgreSQL servers, but this solution did not deal well with servers going down.
Research into the major players in the PostgreSQL space indicated that CitusDB had the potential to be a great fit for us. On the performance point, they already had customers running real-time analytics with queries running in parallel across a large cluster in tens of milliseconds.
CitusDB has also maintained compatibility with PostgreSQL, not by forking the code base like other vendors, but by extending it to plan and execute distributed queries. Furthermore, CitusDB used the concept of many logical shards so that if we were to add new machines to our cluster, we could easily rebalance the shards in the cluster by calling a simple PostgreSQL user-defined function.
With CitusDB, we could replicate logical shards to independent machines in the cluster, and automatically fail over between replicas even during queries. In case of a hardware failure, we could also use the rebalance function to re-replicate shards in the cluster.
CitusDB Architecture
CitusDB follows an architecture similar to Hadoop to scale out Postgres: one primary node holds authoritative metadata about shards in the cluster and parallelizes incoming queries. The worker nodes then do all the actual work of running the queries.
In CloudFlare's case, the cluster holds about 1 million shards and each shard is replicated to multiple machines. When the application sends a query to the cluster, the primary node first prunes away unrelated shards and finds the specific shards relevant to the query. The primary node then transforms the query into many smaller queries for parallel execution and ships those smaller queries to the worker nodes.
Finally, the primary node receives intermediate results from the workers, merges them, and returns the final results to the application. This takes anywhere from 25 milliseconds to 2 seconds for queries in the CloudFlare analytics cluster, depending on whether some or all of the data is available in page cache.
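The prune, fan-out, and merge steps described above can be sketched in a few lines of Go. This is a toy in-memory model, not how CitusDB is implemented: the Row, Shard, and Query types are hypothetical, shards are assumed to cover ranges of minute buckets, and goroutines stand in for worker nodes.

```go
package main

import (
	"fmt"
	"sync"
)

// Row is a hypothetical aggregate row stored in a shard.
type Row struct {
	SiteID   int64
	Minute   int64 // minute bucket index
	Requests int64
}

// Shard is a logical shard covering minute buckets in [From, To).
type Shard struct {
	From, To int64
	Rows     []Row
}

// Query asks for the request total of one site over [From, To).
type Query struct {
	SiteID   int64
	From, To int64
}

// prune keeps only the shards whose range overlaps the query range.
func prune(shards []Shard, q Query) []Shard {
	var relevant []Shard
	for _, s := range shards {
		if s.From < q.To && q.From < s.To {
			relevant = append(relevant, s)
		}
	}
	return relevant
}

// queryShard is the smaller per-shard query a worker would run.
func queryShard(s Shard, q Query) int64 {
	var sum int64
	for _, r := range s.Rows {
		if r.SiteID == q.SiteID && r.Minute >= q.From && r.Minute < q.To {
			sum += r.Requests
		}
	}
	return sum
}

// TotalRequests plays the role of the primary node: prune shards,
// run the per-shard queries in parallel, then merge the partial sums.
func TotalRequests(shards []Shard, q Query) (total int64, shardsQueried int) {
	relevant := prune(shards, q)
	partials := make([]int64, len(relevant))
	var wg sync.WaitGroup
	for i, s := range relevant {
		wg.Add(1)
		go func(i int, s Shard) {
			defer wg.Done()
			partials[i] = queryShard(s, q) // each goroutine writes its own slot
		}(i, s)
	}
	wg.Wait()
	for _, p := range partials {
		total += p
	}
	return total, len(relevant)
}

func main() {
	shards := []Shard{
		{From: 0, To: 60, Rows: []Row{{SiteID: 7, Minute: 40, Requests: 3}}},
		{From: 60, To: 120, Rows: []Row{{SiteID: 7, Minute: 70, Requests: 4}}},
		{From: 120, To: 180, Rows: []Row{{SiteID: 7, Minute: 130, Requests: 50}}},
	}
	total, n := TotalRequests(shards, Query{SiteID: 7, From: 30, To: 100})
	fmt.Println("total:", total, "shards queried:", n)
}
```

Pruning matters most at this scale: with about a million shards, a query that touches only a handful of them avoids almost all of the work.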
From a high availability standpoint, when a worker node fails, the primary node automatically fails over to the replicas, even during a query. The primary node holds slowly changing metadata, making it a good fit for continuous backups or PostgreSQL's streaming replication feature. Citus Data is currently working on further improvements to make it easy to replicate the primary metadata to all the other nodes.
At CloudFlare, we love the CitusDB architecture because it enabled us to continue using PostgreSQL. Our analytics dashboard and BI tools connect to Citus using standard PostgreSQL connectors, and tools like pg_dump and pg_upgrade just work. Two features that stand out for us are CitusDB’s PostgreSQL extensions that power our analytics dashboards, and CitusDB’s ability to parallelize the logic in those extensions out of the box.
Postgres Extensions on CitusDB
PostgreSQL extensions are pieces of software that add functionality to the core database itself. Some examples are data types, user-defined functions, operators, aggregates, and custom index types. PostgreSQL has more than 150 publicly available official extensions. We’d like to highlight two of these extensions that might be of general interest. It’s worth noting that with CitusDB all of these extensions automatically scale to many servers without any changes.
HyperLogLog
HyperLogLog is a sophisticated algorithm developed for doing unique count approximations quickly. The good folks at Aggregate Knowledge open sourced an HLL implementation for PostgreSQL, and because CitusDB is compatible with most (if not all) Postgres extensions, we could use it with CitusDB unchanged.
HLL was important for our application because we needed to compute unique IP counts across various time intervals in real time and we didn’t want to store the unique IPs themselves. With this extension, we could, for example, count the number of unique IP addresses accessing a customer site in a minute, and still get an accurate estimate, without double-counting repeat visitors, when rolling up the per-minute data into a 1-hour aggregate.
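The reason rollups stay accurate is that two HLL sketches can be merged by taking the register-wise maximum, and the result is identical to a sketch built from the union of the underlying sets. The Go code below is a deliberately simplified HyperLogLog written for illustration; it is not the Aggregate Knowledge extension, and the register count, the use of SHA-256 as the hash, and the ip helper are all assumptions made for this example.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
	"math"
	"math/bits"
)

const (
	precision = 10             // hash bits used to pick a register
	registers = 1 << precision // 1024 registers
)

// Sketch is a minimal HyperLogLog sketch: one small counter per register.
type Sketch [registers]uint8

// Add records an item by hashing it and updating one register.
func (s *Sketch) Add(item string) {
	sum := sha256.Sum256([]byte(item))
	x := binary.BigEndian.Uint64(sum[:8])
	idx := x >> (64 - precision) // top bits choose the register
	rest := x << precision       // remaining bits determine the rank
	rank := uint8(bits.LeadingZeros64(rest)) + 1
	if rank > 64-precision+1 {
		rank = 64 - precision + 1
	}
	if rank > s[idx] {
		s[idx] = rank
	}
}

// Merge folds another sketch into s by taking the register-wise maximum.
func (s *Sketch) Merge(o *Sketch) {
	for i := range s {
		if o[i] > s[i] {
			s[i] = o[i]
		}
	}
}

// Estimate returns the approximate number of distinct items added.
func (s *Sketch) Estimate() float64 {
	sum, zeros := 0.0, 0
	for _, r := range s {
		sum += math.Pow(2, -float64(r))
		if r == 0 {
			zeros++
		}
	}
	alpha := 0.7213 / (1 + 1.079/float64(registers))
	e := alpha * registers * registers / sum
	if e <= 2.5*registers && zeros > 0 {
		// Small-range correction (linear counting).
		e = registers * math.Log(float64(registers)/float64(zeros))
	}
	return e
}

// ip builds a made-up IPv4 address string from an integer.
func ip(i int) string { return fmt.Sprintf("10.0.%d.%d", i/256, i%256) }

func main() {
	var minute1, minute2 Sketch
	for i := 0; i < 1000; i++ {
		minute1.Add(ip(i))
	}
	for i := 500; i < 1500; i++ { // half of these overlap with minute1
		minute2.Add(ip(i))
	}
	hour := minute1
	hour.Merge(&minute2)
	fmt.Printf("minute1=%.0f minute2=%.0f merged=%.0f\n",
		minute1.Estimate(), minute2.Estimate(), hour.Estimate())
}
```

Simply adding the two per-minute counts would report about 2,000 visitors, while the merged sketch estimates the 1,500 distinct addresses, and at no point are the addresses themselves stored.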
Hstore
The hstore data type stores sets of key/value pairs within a single PostgreSQL value. This can be helpful in various scenarios such as with rows with many attributes that are rarely examined, or to represent semi-structured data. We use the hstore data type to hold counters for sparse categories (e.g. country, HTTP status, data center).
With the hstore data type, we save ourselves from the burden of denormalizing our table schema into hundreds or thousands of columns. For example, we have one hstore data type that holds the number of requests coming in from different data centers per minute per CloudFlare customer. With millions of customers and hundreds of data centers, this counter data ends up being very sparse. Thanks to hstore, we can efficiently store that data, and thanks to CitusDB, we can efficiently parallelize queries of that data.
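On the application side, sparse counters like these map naturally onto a Go map that is summed key by key during rollups and rendered as an hstore literal when written. The sketch below assumes a hypothetical Counters type and made-up data center codes; it only produces the hstore text form ("key"=>"value" pairs) and leaves out the actual database insert.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Counters is a hypothetical sparse counter set, for example requests
// per data center for one customer in one minute.
type Counters map[string]int64

// Add merges other into c by summing the counters key by key.
func (c Counters) Add(other Counters) {
	for k, v := range other {
		c[k] += v
	}
}

// hstoreEscaper escapes backslashes and double quotes inside keys.
var hstoreEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)

// Hstore renders the counters in hstore text form with sorted keys,
// so the output is deterministic.
func (c Counters) Hstore() string {
	keys := make([]string, 0, len(c))
	for k := range c {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	pairs := make([]string, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, fmt.Sprintf(`"%s"=>"%d"`, hstoreEscaper.Replace(k), c[k]))
	}
	return strings.Join(pairs, ", ")
}

func main() {
	minute1 := Counters{"SJC": 10, "LHR": 3}
	minute2 := Counters{"SJC": 5, "NRT": 1}
	hour := Counters{}
	hour.Add(minute1)
	hour.Add(minute2)
	fmt.Println(hour.Hstore())
}
```

Only the data centers that actually served a request appear in the value, which is what keeps the stored rows small.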
For future applications, we are also investigating other extensions such as the Postgres columnar store extension cstore_fdw that Citus Data has open sourced. This will allow us to compress and store even more historical analytics data in a smaller footprint.
Conclusion
CitusDB has been working very well for us as the new backend for our Customer Analytics system. We have also found many uses for the analytics data in a business intelligence context. The ease with which we can run distributed queries on the data allows us to quickly answer new questions about the CloudFlare network raised by anyone in the company, from the SRE team through to Sales.
We are looking forward to features available in the recently released CitusDB 4.0, especially the performance improvements and the new shard rebalancer. We’re also excited about using the JSONB data type with CitusDB 4.0, along with all the other improvements that come standard as part of PostgreSQL 9.4.
Finally, if you’re interested in building and operating distributed services like Kafka or CitusDB and writing Go as part of a dynamic team dealing with big (nay, gargantuan) amounts of data, CloudFlare is hiring.