Hacker News new | past | comments | ask | show | jobs | submit login
Queues and databases (antirez.com)
197 points by edmccard on July 14, 2014 | hide | past | favorite | 40 comments



Relevant paper: Queues are databases, Jim Gray

http://research.microsoft.com/pubs/69641/tr-95-56.pdf


Good paper.

Likewise, I would say that databases are queues.

In fact everything in computing is a queue; disk i/o, cpu tasks, people at their keyboards.


Aaaand, that's one more op system I can look forward to replacing with Redis.

Totally off-topic, but I have such a fangirl crush on Salvatore Sanfilippo... Every time he posts I get butterflies.


Are you sure that's a good thing? antirez seems to have a hard time reasoning correctly about distributed systems:

http://aphyr.com/posts/283-call-me-maybe-redis http://aphyr.com/posts/307-call-me-maybe-redis-redux


In the first post Redis + asynchronous replication + (an alpha version of) Sentinel were found to lose writes during partitions. This is an obvious result of asynchronous replication + failover, since this is, technically, an eventually consistent system where the merge function is picking a single timeline when there are divergences. Redis was improved since then by adding heuristics to practically lose less writes, but basically this was an expected result that nobody can change.

In the second blog post I proposed, just for an argument of a different discussion, a strongly consistent system where basically you have God coordinating the failover, and every write replicated N times. This is something that's not even worth to test, since it is obvious that a system like that is linearizable, however it was still tested, and since it was obviously flawless per definition, the flaw was found, very surprising, in something that violates any CP system, that is, serving reads directly without any agreement. Later Aphyr in this article (http://aphyr.com/posts/316-call-me-maybe-etcd-and-consul) pointed to reading from a possible stale leader as a flaw, while in its implementation in the previous article he setup the things in order to explicitly read from nodes directly.

In both the instances I can't see how this translates to me not understanding stuff. That said, distributed systems are all about details, and while I'll try hard to do my best, I can still end with something buggy. I try to improve my knowledge every day and I understand I'm not an expert in the field and that I need to learn, test my designs, make them publish in the hope that other people will analyze them, and so forth. However I believe I'm not the only one that needs to perform this learning process, especially if you want to point at random articles on the web as evidence of my failure, you also need to have a solid understanding of the basic principles.


swoon


He's a wonderful coder, and a great writer. The two together are sadly a rare pairing.


Here's what I'm building my new queue with:

https://github.com/jondot/sneakers

It's a ruby library that implements a Resque-like interface on top of RabbitMQ. Crucially, it doesn't have any opinion on the message contents. It doesn't even deserialize them for you. Most of what it does is manage a unicorn-like process/thread pool plus the associated worker objects, and consumes messages in a thread-safe way. That and the usual configuration boilerplate.

Also check out this branch which implements a max-retries strategy using dead letter exchanges, and is kept up to date with the upstream:

https://github.com/Yesware/sneakers

I imagine it will be merged soon.

The thread/process management is done by the serverengine gem:

https://github.com/frsyuki/serverengine


Sneakers is nice, but keep in mind that RabbitMQ has a huge problem with partitions -- which can occur even on flawless networks, but will certainly occur frequently on VPS systems like AWS and Digital Ocean. See [1] for an excellent analysis.

Unfortunately, once you have a partition there is no way to merge the isolated nodes back into the cluster; since queues "live" on the node they were initially created on, this could be extremely disruptive. The only way to recover is to discard queue data.

I love how RabbitMQ works, but it's terrible in a virtualization environment. Unfortunately there aren't any worthwhile competitors. Kafka comes closest, but it lacks support for topic fanouts, which means every producer has to know about every client, which isn't really acceptable.

[1] http://aphyr.com/posts/315-call-me-maybe-rabbitmq


Fortunately all our jobs are idempotent, and I hope to keep it that way. Postgres keeps track of what's currently "in-flight" so to speak, so my plan for failures is just to re-queue stuff.

The good thing about sneakers is we're not locked into any particular routing scheme or message structure so it'll be easy to move away from once we eventually experience problems.

So, community, am I being naive? Will the idempotency thing not save me?

(Also, lobster_johnson when you say terrible in a virtualisation environment, are you referring to AWS-ish shared VPSs or to a containers-on-colocated-boxes solution, which is what we favor?)


If you can determine what messages need to be re-queued, and easily re-queue them, then you should be fine.

(That does raise the question of what you need a queue for, if your Postgres database keeps your state. Postgres can act as a pretty efficient queue — for one app we do this extensively — especially if you use listen/notify. It won't scale to thousands of requests/sec, but it's simpler to maintain than RabbitMQ, and has the benefit of being fully ACID.)

Still, the RabbitMQ partitioning does exist and requires manual intervention whenever it happens. You can tell RabbitMQ to run in a mode where it will automatically recover from a partition by using the node which has the most connections, and discarding the data on the other conflicting nodes; but the problem in your case would be that you might not be able to tell when to re-queue your messages.


The database doesn't keep all the state, it just knows about which domain objects are currently "awaiting data," which are "processing," which are "failed" and which are "finished." Within the processing state there are several different things that go on, performed by various chained rabbit tasks. These push the data into elasticsearch, where the magic actually happens.

My plan in a complete rabbitmq queue loss scenario is to simply fire the whole process off from the beginning for every row which is in the processing or failed state. Because we have unique deterministic IDs (we just hash the content with blake2b) I'm pretty confident this will work. I should probably do better modelling of how long this would take if it goes down while fully loaded though.

I guess I thought we wanted rabbit for the powerful routing and rerouting logic, but now I think about it again, you could do that pretty elegantly in SQL. I do like postgres. What I really want to avoid at this stage is the process/thread/socket management for the actual workers, which sneakers handles sanely.


My recommendation would be to put your whole system on a test jig and bend it until it breaks.

https://github.com/aphyr/jepsen

Until then, you won't know.


Sounds like a good plan.


https://github.com/chanks/que/blob/master/README.md is a postgresql queue that had been benchmarked at 10k/a on AWS.


The statement about Kafka sounds a bit strange... Producers don't need to know about the consumers/clients. They just post messages to the brokers.


Kafka's topics are also queues, unlike RabbitMQ where a queue is bound to an exchange, and you publish by posting messages to the exchange, not the queue; the exchange determines which bound queues get the message.

This binding uses a routing key, which is a path that supports wildcards. For example, a consumer can bind its queue to an exchange "foo" (representing an app's events) using the routing key "create.photo.*". It will get "create.photo.5378" and so on. It won't get "update.photo.5378".

It's a really nice, flexible system that allows you to separate publishers from consumers without incurring much of a performance hit; if nobody is listening to a particular key, nothing happens, and if multiple groups of consumers listen to overlapping keys, they still get the messages in the correct order, because each message goes into a single queue.

Kafka's topics, being queues, doesn't allow this. Publishing is done to a topic, and consumers must also consume from that topic. One message, one topic.

There are three options:

1. Use a single topic for different (heterogenous) messages. For example, "foo" could be the topic, and each message would contain a field called "path" or similar (eg., "photo.5378"). Each consumer would filter out messages it's not interested in based on this path. This puts a potentially expensive burden on the consumer.

2. Create several topics, and let consumers listen to each one: "foo.create", perhaps (all create events about different objects), or "foo.photo" (all events about photos) or even "foo.create.photo". This adds complexity and will potentially explode the number of queues needed for many apps, object types, events, etc. Moreover, since message delivery order guarantees are per topic, messages across different types can come out of order: deletes before creates, for example.

3. Invert the relationship; a producer must know every possible consumer's range of interest, and post to each of their topics. Foo must know that app "bar" is only interested in photos, and that app "baz" wants events about all types of objects. Of course, this breaks separation of concerns (although the routing table could go into something like ZooKeeper)

Kafka could implement a similar routing system if it wanted to, of course.


Something that is often missing in discussions about queues is a discussion about consistency across queues. This really confuses me and makes me wonder if I'm missing something that everybody else understands.

Let's say I'm building a payment system that needs to report settled cash flows to a separate accounting system. In other words, when Alice clicks "Approve" on a payment that Bob has created, the payment system -- in a single database transaction -- records two accounting entries, updates account balances, finalises the payment status, creates audit entries, and must now send the details of the transaction to the company's accounting system (and presumably an external payment network, which is much the same problem, with just more components to fail along the way).

If you just put that message onto a brokered queue that feeds the accounting system, there is a risk, however small, that the queue loses the message. In some systems, losing a message may not be an issue; in our particular system, losing an accounting transaction is a serious problem so we must deal with the possibility.

I've seen 3 patterns for dealing with this in practice (I'd be very interested if there are others, I can think of a 4th but have not seen it used):

1. a reconciliation process is put in place between the two systems, whereby the payment system delivers a reconciliation file (e.g. daily) and a matching system pairs transactions with a reconciliation file produced by the accounting system; mismatches ("breaks") lead to further investigation

2. the receiver sends an ACK back to the sender; if the sender does not receive an ACK back timeously it may elect to resend, or send an alert to operators, who investigate further

3. distributed transactions: I'm not going to discuss these because they reduce to a single system in analysis of a simple system, and a nightmare in a complex system

(Also you can implement #1 and #2 simultaneously to cover each other; often the #1 process conceptually does more work since it ensures double-entry accounting principles are followed in e.g. financial systems)

#1 is nice and simple, but the downside is it happens as a batch process typically at end of day; in a payments system this can be a serious problem because e.g. in an inter-bank transfer application it impairs effective cash management on nostro accounts at other banks by attracting finance charges on negative balances overnight. You can speed it up by reducing the batch size and increasing the frequency, and when you go far enough you end up with #2.

#2 requires the sender to maintain state about what was put onto the queue and whether it was acknowledged or not. The interesting thing about this is that this state is in effect a copy of the queue's state. Even better, the state here is consistent with the rest of the state in your database. You don't need any ACK protocol or recon so long as you access the database-backed queue transactionally.

So: why introduce a 3rd component, a standalone (typically brokered) queue, between process 1 and 2, when you already have everything you need in the shared database? In my mind it only makes sense when there are sound reasons, e.g. for performance reasons, or there is no shared database because the applications belong to different business units, or to decouple applications that are in the same business unit but conceptually independent. Note that the principle idea in OP's article -- creating a time/processing barrier -- is support by database-backed queues.

While the email delivery queue example in OP's post is a fair example of something you want to decouple from the rest of the application because it's a very generic service, what if you're extending your payment system to produce PDF receipts that are specialised to this system and incorporate data from the system database? This is a high-CPU requirement that you definitely don't need or want on your critical path but would you really deploy a queue component and a separate database to backup, or waste time decoupling schemas for such an intrinsically tightly coupled feature?

What surprises me is how often on sites like HN and StackOverflow the default answer to somebody who wants to know how to offload computation is "use a [brokered] queue". I emphasize "brokered" because the database is very good at implementing queues, but I rarely see a discussion about the reliability of brokered queues and the implications for consistency vs. just using a database-backed queue in your application DB -- often you can't just put things on a queue without making the rest of your application aware of this fact, yet the solution is frequently presented without this caveat, or even an acknowledgement of the alternative. What am I missing?


This paper describes ideas around #2: http://www.ics.uci.edu/~cs223/papers/cidr07p15.pdf

But yes, your database stores the status of a message. At this point you could drop every single message on your queue and have enough information to resend them. Each message could have its own resend (SLA) semantics.

Amazon Simple Workflows is an implementation of this pattern: http://aws.amazon.com/swf/. I've never used SWF but the docs are great food for thought.

[edit]

Also might be of interest to watch Rich Hickey's 'Language of the System'. (https://www.youtube.com/watch?v=ROor6_NGIWU&feature=kp -- there are a few versions of this talk, not sure if this is the exact viewing I saw). The talk is not really about queues, but he tries to break things up a bit. You need a data store, you need a queue, etc. As soon as a queue tries to have durable messages it is becoming a database and has all of the problems a database has to deal with. Instead you could keep data storage being solved by the data storage provider and let queues focus on passing messages.

This raises the issue of how to deal with dropped messages, but that can be solved without durable queues (like that paper describes / SWF / etc).


Thanks for those links. There seem to be some good concepts in there for formulating a solution to my present personal challenge: integrating multiple disparate sources of real-time events, some transient and being delivered with low latency, and some persistent but retrieved with high latency (up to 30min!), which needs to be analyzed (and potentially replayed and reanalyzed), producing a best-effort real-time feature stream while populating (and repopulating) a reliable data warehouse. It's taking me a long time to break the problem down to the right level of components.

> As soon as a queue tries to have durable messages it is becoming a database and has all of the problems a database has to deal with. Instead you could keep data storage being solved by the data storage provider and let queues focus on passing messages.

Yes; mind you, that doesn't exclude the queue from having a persistent backing store (to reduce the instances where your application has to be involved in replay), it just means applications shouldn't use queues as the golden source of events.


Are you talking about a "distributed commit" ? In general, the best way to architect such a system is to model a single "Approve" operation as an FSM of multiple steps. The queue acts as the state-transition table... however all the databases in question (the accounting DB, payment status, etc.) all have the current status of the operation ...

Additionally, you have a garbage-collector "sweeper" that periodically checks the database to make sure that the FSM is not violating any time guarantees. For example, if the entire FSM for the "Approve" action should not take any more than an hour (in the most pathological case), the sweeper reverts back all operations that violate this. this takes care of the case that your queue fails.


Yes I suppose you could call it a distributed commit of a distributed transaction (hence formalised distributed transactions being one solution). And yes, the design you describe is what I have followed in my applications.

What I find odd is that I don't see this pattern being talked about, despite the importance in certain domains... if one goes through a couple of articles on highscalability, it never seems to be discussed. Queues are treated as reliable golden sources. Perhaps it's because a lot of the high volume stuff that's discussed out in the open doesn't need to be 100% reliable (e.g. missing a few Facebook Like click events -- which can be reconciled later anyway) or has a natural replay source (e.g. processing application logs). Software dealing with more business critical scenarios (e.g. financial applications) aren't generally discussed out in the open in detail.


yes - I know what you mean.

Part of the reason is that there has been an entire body of effort into making queues scale and be available. Interestingly, just today I discovered a new queue that I did not know of before, written in Go [1] that keeps everything in memory and tries to fail over.

I think by the time you need a distributed commit, you have the staffing required to build a queue that scales and does not go down. So there is less focus on semantics of distributed commit (which needs a lot of complexity on the application side and is hard to reason about), than to build a scalable queue.

[1] https://github.com/bitly/nsq


If you have this type of situation, Java EE was built for it. Distributed transactions across applications including messaging (JMS) is part of the spec. This is really the (possibly only) domain where you should consider using Java EE, it's what it excels at.


I'm not sure I understand the issue here: If you have an at-least-once queue, it might still be reasonable for the application to directly write to a transaction log somewhere that can be reconciled for auditing, but that audit should always pass (even under fault conditions): there is either a design flaw in the system or malicious insider tampering if the queue does not deliver the message. Is it necessary to audit for that sort of thing on a sub-day level?

It doesn't seem like an appropriate place to put a wait-and-retry loop -- it absolutely breaks the entire point of queueing if the sender does not consider the task completed and flush all of its state once the message has been successfully enqueued.


http://queue.acm.org/detail.cfm?id=2187821

Section: "Zero or more times ... gauranteed"

"When considering the behavior of the underlying message transport, it is best to remember what is promised. Each message is guaranteed to be delivered zero or more times! That is a guarantee you can count on. There is a lovely probability spike showing that most messages are delivered one time.

If you don’t assume that the underlying transport may drop or repeat messages, then you will have latent bugs in your application. More interesting is the question of how much help the plumbing layered on top of the transport can give you. If the communicating applications run on top of plumbing that shares common abstractions for messaging, some help may exist. In most environments, the app must cope with this issue by itself."

At least once, at most once, etc are just probability distributions. All you can count on is zero or more times.


> there is either a design flaw in the system or malicious insider tampering if the queue does not deliver the message

Aside from the partitioning/client issues that hueyp points to, queue managers with transient storage can crash or lose power. Queues managers with persistent storage can suffer corruption or data loss. In this way the 'at least once' "guarantee" will not be met, on the queue side.

I have noticed a model forming these days where you have lots and lots of queue managers that mirror each other and provide a sort of defense-in-depth against failure. It's an interesting approach, and very convenient, but infeasible on a low budget. I think you also need to be very careful with such an architecture because it seems like it could be easy to accidentally break it.


At-least-once[1] queues are sound under bounded partitioning and can withstand any specified amount of data loss[2]. Uncorrected data corruption errors cause whole-system failure in non-distributed systems as well as distributed ones and the solutions are the same in both.

You're not just throwing a bunch of queues at the problem and hoping they stick. Your queue based system is in one of two states: it has a specified amount of fault tolerance based on its design, or it is defective (see the 'call me maybe' posts.) If you have a defective distributed system adding more nodes makes it worse, so it is vitally important to know which one you have and design accordingly.

[1] Calling them zero or more times queues is profoundly bullshit in that it conflates faults (multiple delivery) and failures (non-delivery), which is reliability 101. Failures are expected to propagate; faults must not, and a design which attempts to work around failure instead of just faults is almost certainly a sign of deep confusion.

[2] This doesn't violate CAP because a client must write to enough nodes that it can be certain that it is not part of a doomed partition (potential loss of availability), or pretend the write succeeded without that guarantee (potential loss of consistency).


If I've understood you correctly, you're saying that when designing a distributed system, (1) non-defective database(s) and queue(s) have some degree of fault tolerance, which we can assume for argument's sake is the same (whatever we can do to make our database reliable we can do to make our queue reliable), and (2) whichever fails - database or queue - the resolution is the same in both?

If this understanding is correct, the point of contention is #2.

    Scenario        |  Untracked  |  Tracked  |
    Queue data loss |    (A)      |     (B)   |
    Database loss   |    (C)      |     (C)   |
(A) we can't replay events from database to queue

(B) we can replay events from database;

(C) we have to restore from backup

Obviously the two scenarios are indeed the same if we lose the database (C). But when it comes to losing the queue, if we're tracking what we've put on the queue and whether we've received an acknowledgement or not, we can easily replay events to that queue (B). If we didn't track what we put on the queue, how do you recover in (A)?

As I see it, the database represents a consistent view of the application state and data -- e.g. "payment 123 was settled". If you lose your database, you have to restore, then move upstream and replay events there (if possible). Downstream systems handle duplicates in replay scenarios by discarding them. This is a nice straightforward way to recover. If you lose your queue, it's not an issue, because the queue is just a channel and you replay events over it. What's important is your higher-level protocol across the channel.

In my mind the pattern is: S -> channel -> R. Channel can be a brokered queue, a brokerless standalone queue, async IPC, a thread delivering messages ala ZMQ, a file, a web API, raw UDP, whatever, but either way its the same pattern -- S tracks its sending state, it doesn't trust the channel because if things break you don't have a consistent view of your downstream obligations that you can recover from. The channel's job is to deal with delivery, but it can't address higher-level concerns across the channel.

We put messages onto queues as part of a higher level abstraction -- we use TCP/IP, for instance, as part of an HTTP conversation: if a web browser sends a GET and doesn't get a response, it doesn't just sit waiting forever hoping the TCP/IP stack will figure things out... It recovers and maybe even retries but warns you about the problem so that you notice you typed the wrong IP address in and fix it. TCP/IP is a nice unreliable example, but whatever your channel is, it's going to be unreliable, so in our business applications we need a business-case specific protocol to record our business transactional state as well to handle the failures. If you offload that obligation to the channel, you're stuffed because the channel can't understand your business case and consequently how will you recover?

Unless you mean you can design systems with a degree of fault tolerance so great that its not worth considering failures? That can't be right, what am I missing?


I have no doubt that whenever Salvatore releases a distributed, Redis queue feature it will be amazing. But if you're on AWS, SQS is a solid at-least-once delivery queue service that is hard to beat. (at least when you don't want to manage your own redis machines)


What is your fallback though if amazon loses your confidence? AWS infrastructures seem very nice, but aren't you hitching yourself to their wagon in a way that doesnt give you an easy or cheap exit strategy if things go south?

I keep hoping one day that they will open source the software that runs everything so that worst case scenario you arent SOL if you want to migrate away. Some might say that would lose customers for them, but it would probably gain people like me.


Restrict yourself to a set of simple, generic queue API, .e.g. put, getNext, getWithID. Abstract those APIs and implement with them with SQS first. It's fairly easy to switch to a another implementation (Kafka, activemq, etc.) later. Even in production, you can easily switch off the producers, drain the queue, move to the new queue, then restart.


If you have a lot of tooling targeting AWS, but want some reassurance for the tools (i.e. not speaking to the data, merely the APIs) then Eucalyptus (https://www.eucalyptus.com/) which is Open Source (https://github.com/eucalyptus) under the GPLv3 might be a way out of that lock-in.

I haven't yet taken the time to provision a local Eucalyptus cluster to test the actual versus the marketing, but it certainly seems like they have a lot of momentum which I wouldn't expect if it didn't do as advertised.


If amazon loses my confidence I think I'd start shopping at Walmart before I switch infrastructure platforms... I see AWS as a century old business, just in the same way PG&E and utility companies of the 20th century have developed a stronghold in energy distribution.

But if I had to host one myself, http://nsq.io/ is the way to go.


SQS is certainly simple but I've had race condition issues with it in the past.


Are you deleting messages after you receive them? I haven't run into any race conditions but I suppose if I did, I'd use redis's HyperLogLog or something similar to see if I've already processed the message.


HyperLogLog is for cardinality estimation, not set membership. A bloom filter would be more appropriate I believe.


This looks like something that is very useful for low latency applications which don't do a lot of processing. Have you looked at Hazelcast(http://hazelcast.org/).


see http://aphyr.com/posts/283-call-me-maybe-redis for a discussion of distributed behaviour of redis.


Redis + asynchronous replication + failover is a given distributed system (eventual consistency + merge function being picking a single timeline, so it has very limited write safety during certain partitions). When you use N Redis instances as a building block for a distributed system, you don't incur in the limitations of Redis asynchronous replication + failover model. As written in the article the queue system I'm designing uses N independent Redis instances.




Join us for AI Startup School this June 16-17 in San Francisco!

Guidelines | FAQ | Lists | API | Security | Legal | Apply to YC | Contact

Search: