I've been really interested in this architecture since Jay Kreps' blog post on it. One part that I'm less clear on is how this fits in with request-response style communication between, say, a Web browser and a Web server.
In a simple Web-app-writes-to-DB scenario, it's easy to read my writes, but with an async log-processing system, how am I supposed to organize my code so I can read my writes and respond with useful information?
Maybe the solution is to eschew request-response entirely and have all requests return 200, then poll or use two-way communication?
Alternatively, I could have my log-appending operation return a value indicating the position in the totally-ordered log, which I could pass to the query interfaces as a way of indicating "don't return until you've processed at least to here." Does anyone do that?
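Roughly what I mean, as a sketch using the kafka-python client (the topic name and the `read_side_position` hook are made up, purely to illustrate the idea):

```python
import time
from kafka import KafkaProducer  # assumption: the kafka-python client

producer = KafkaProducer(bootstrap_servers="localhost:9092")

def append_write(payload: bytes):
    """Append to the log and return the (partition, offset) it landed at."""
    metadata = producer.send("writes", value=payload).get(timeout=10)
    return metadata.partition, metadata.offset

def wait_until_processed(read_side_position, partition, offset, timeout=5.0):
    """Block until the read side has processed at least up to `offset`.
    `read_side_position(partition)` is a hypothetical callable the query
    service would expose -- "don't return until you've processed to here"."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if read_side_position(partition) >= offset:
            return True
        time.sleep(0.05)
    return False
```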
Am I totally off base here? I'd love to hear from anyone who is using these kinds of systems today.
Here is how I think about this: there are three high-level paradigms you see for processing:
1. Request/response (e.g. most UI actions, REST, etc)
2. Stream (e.g. subscribing to a Kafka topic)
3. Batch/periodic (e.g. Hadoop, DWH)
You actually need all of these at least somewhere in a company, and they each have their place.
The dividing line for request/response is that someone is waiting on the other end, and if the request fails you can just show them an error and continue on your way. So when a web service gets overwhelmed it usually just times out requests.
Consider an e-commerce site as an example:
1. Request/response
Displaying a product
Making a sale
2. Stream or batch
Restocking
Logistics and shipping
Price adjustments
Analytics
Product catalog import
Search index updates
Etc
The latter category is asynchronous, so it can be done in a batch fashion (once an hour or day) if latency is not a concern, or in a streaming fashion if it needs to be faster.
To connect a request/response system to a log-oriented system such as the one presented here, one option is to use the request/reply server as a proxy for the log-oriented one:
- producing the requests into the log-oriented system,
- consuming the responses at some endpoint/topic of the dataflow,
- linking responses to requests using some id propagated along the whole dataflow,
- dealing with a cache of pending requests, asynchronous responses, and timeouts.
In practice, to keep latency and the number of pending connections manageable, we can't always wait for the very end of the request-processing dataflow; but we can at least choose an intermediate log/topic where enough progress has been made to build a response, or reply with 202 Accepted. A rough sketch of such a proxy follows.
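This is only a sketch under assumptions of mine -- the kafka-python client, a made-up "requests"/"responses" topic pair, and one proxy process; real code would need sturdier error handling and cache eviction:

```python
import threading, uuid, json
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers="localhost:9092",
                         value_serializer=lambda v: json.dumps(v).encode())
pending = {}           # correlation id -> (event, slot for the reply)
pending_lock = threading.Lock()

def handle_http_request(body):
    """Called by the web framework: produce into the dataflow, then wait
    briefly for the matching reply; otherwise answer 202 Accepted."""
    corr_id = str(uuid.uuid4())
    done, slot = threading.Event(), {}
    with pending_lock:
        pending[corr_id] = (done, slot)
    producer.send("requests", {"correlation_id": corr_id, "body": body})
    if done.wait(timeout=2.0):                     # reply arrived in time
        return 200, slot["reply"]
    with pending_lock:
        pending.pop(corr_id, None)                 # timed out; drop it
    return 202, {"status": "accepted", "correlation_id": corr_id}

def response_consumer_loop():
    """Background thread: match replies (by the propagated id) back to
    the pending HTTP requests."""
    consumer = KafkaConsumer("responses", bootstrap_servers="localhost:9092",
                             value_deserializer=lambda v: json.loads(v.decode()))
    for msg in consumer:
        with pending_lock:
            entry = pending.pop(msg.value.get("correlation_id"), None)
        if entry:
            done, slot = entry
            slot["reply"] = msg.value
            done.set()

threading.Thread(target=response_consumer_loop, daemon=True).start()
```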
Typically, you cache the last write somewhere close so that you can manually splice it into the corresponding page render. Then you engineer the rest of the system such that the log processing system is rarely/never falling behind.
Afterwards, either poll or communicate over sockets back to the client side. If there are workers involved on the server, add a socket id to the message envelope so the response can be matched on return.
I've really learned a lot from these Confluent posts about building log-based architectures, but I feel like they're rehashing the same high-level architectural ideas again and again.
I'm already sold on this idea, and would really love to see more posts that get into the nitty-gritty details of how to integrate Kafka, how to migrate an existing infrastructure, case studies, sample code, etc. It all seems very handwavy otherwise.
Has anyone thought of how the distributed commit log can be extended to client-side FRP?
Elm has this notion of signal – http://elm-lang.org/learn/What-is-FRP.elm – which is really a stream of changing values that are used to construct a varying model and its rendered view (virtual DOM and all that).
I am wondering if we can merge these two notions – signals and commit logs. Consequently, this would replace the traditional "request-response" model in REST APIs with nothing but signals, thus leveraging the simplicity of FRP for the whole application. Client-side Elm code does a "send" on the signal that is connected to the server-side commit log, and also reads from another signal connected to another log that receives new data (added from various places).
Martin's talks and blog posts are always awesome. I'm really excited to see how this plays out for real applications.
The one thing I'm always somewhat confused by though is how a "totally ordered log" intersects with the reality of a partitioned log. The simplicity of a log seems to break down a bit when you partition.
For instance, imagine I want to implement multi-key transactions on top of a distributed datastore. With a totally ordered log this is easy. But with a partitioned log, it becomes much harder.
Alternatively, imagine I want to implement a collaborative editing app like Google Docs or something like Slack. A natural design would be to have millions of independent logs. I can then replay logs to get current state and watch logs to keep it updated. But as far as I'm aware, partitioned logs like Kafka do not actually support millions of topics. So there's no way to replay a log for something like a channel or document.
One thing to realize is that a partitioned log is a generalization of an unpartitioned log (i.e. if you set # partitions = 1 in a partitioned log you have an unpartitioned log).
In Kafka the purpose of partitions is to provide computational parallelism, not to model entities in the world. So if you have 100m users you would map that into a number of partitions based on your computational parallelism (maybe 10-100 machines/processes/threads). In other words you would have a single topic partitioned by user id, not a topic per user.
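For example, a sketch with the kafka-python client (topic name and ids made up): keying each message by user id means the default partitioner routes all of one user's events to the same partition, so per-user order is preserved without a topic per user.

```python
import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092",
                         value_serializer=lambda v: json.dumps(v).encode())

def record_user_event(user_id: str, event: dict):
    # The default partitioner hashes the key, so all of one user's events
    # land on the same partition of a single "user-events" topic.
    producer.send("user-events", key=user_id.encode(), value=event)

record_user_event("user-42", {"type": "page_view", "path": "/products/7"})
```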
If you have a centralized relational database, that maps reasonably well to a single-partition log (both in terms of scalability and guarantees).
For distributed databases you generally don't have a total order over all operations. What you usually have is (at best) a per partition ordering, which maps well to a partitioned log as well.
For applications that record events (logging or whatever) it is natural to think of each application thread or process as a kind of actor with a total order.
Yeah, this is a good way to look at it. But it's also my point. Partitions are about parallelism and don't always fit the data model or domain. While you can reduce the partitions to 1, this limits parallelism. It's not always easy to design a partitioning scheme that preserves the replication semantics you want. And these semantics vary from having something totally ordered, to having something that you can replay on a very fine-grained level (to replicate to the client, for instance). Most discussions of the advantages of logs really emphasize how amazing a totally ordered log is for replication, but that's not actually what production deployments look like, so you still need to think carefully about what happens when writes are being applied to your datastores without a clear order.
What Kafka gets right is that it explicitly addresses the fact that if you add parallelism, order goes out of the window (modulo partition key).
This is something that tends to surprise developers early on (myself included, years ago). But plenty of people still use queue solutions like RabbitMQ without thinking it all the way through.
Unfortunately, partitioning introduces a design step that makes it a little harder to make processing generic. With RabbitMQ you just post to an exchange and let queues (i.e., consumers) filter on the routing keys; if no queues have been bound, for example, messages don't go anywhere. If you want, or don't want, parallelism, you just run either multiple consumers or just one. With Kafka, you need to decide beforehand, and design the "topology" of your log carefully, not just for the producer, but for each consumer. When producers and consumers are different apps, this starts smelling like a violation of the principle of "separation of concerns".
I rather wish Kafka had a better routing mechanism, actually. I don't see any reason why it couldn't have routing keys, just like RabbitMQ.
You could always map the document id or channel id to the same partition; then that particular document or channel's log would be ordered. Seems like it would work.
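For illustration (kafka-python, with a made-up topic and no attempt to target the specific partition), replaying one document's history could look roughly like this:

```python
import json
from kafka import KafkaConsumer

def replay_document(doc_id: str):
    """Rebuild one document's state by replaying its events from the log.
    (Reads the whole topic for simplicity; with the partitioner's hash you
    could read only the single partition doc_id maps to.)"""
    consumer = KafkaConsumer("doc-events", bootstrap_servers="localhost:9092",
                             auto_offset_reset="earliest",
                             consumer_timeout_ms=5000,   # stop at end of log
                             value_deserializer=lambda v: json.loads(v.decode()))
    edits = []
    for msg in consumer:
        if msg.key == doc_id.encode():
            edits.append(msg.value)     # apply each edit in log order
    return edits
```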
> if you want to build a new derived datastore, you can just start a new consumer at the beginning of the log, and churn through the history of the log, applying all the writes to your datastore.
For high-throughput environments with lots of appends to the log, how do you get around the ever-increasing size of your log file? I know the traditional answer is to take a periodic snapshot and compact the previous data, but is that built in to tools like Kafka?
There's a log-compaction cleanup policy, yes. Never used it myself, but if I'm not mistaken it works like this: for each message you send to Kafka, you set a key with it. When Kafka does log compaction, it keeps only the last value for each key.
The other cleanup policy is to just have a retention time. After X minutes/days/weeks segments of the log are simply deleted.
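Purely for illustration, here is how you might configure both policies with kafka-python's admin client (topic names are made up; cleanup.policy and retention.ms are the actual Kafka topic-level configs):

```python
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([
    # Compacted: Kafka eventually keeps only the latest value per key.
    NewTopic(name="user-profiles", num_partitions=4, replication_factor=1,
             topic_configs={"cleanup.policy": "compact"}),
    # Time-based retention: log segments older than 7 days get deleted.
    NewTopic(name="click-events", num_partitions=4, replication_factor=1,
             topic_configs={"retention.ms": str(7 * 24 * 60 * 60 * 1000)}),
])
```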
That sounds great if your messages in the logs are the complete state for that key, but I'm not seeing how to use that compaction system if the messages are change events.
Is there a system designed for snapshotting the aggregate and logging the delta?
It's easy to store messages in HDFS or S3 for long-term storage. It's also easy to replay messages from those mediums, if you need to re-ingest data later on.
One idea is to shard the logs. By analogy with git: any given repo has a log of its commits, but you can have as many repos as you like.
It does limit throughput for any given shard, though, and then you're left with a distributed transaction problem to solve when you need to commit changes to objects in different repos.
The proposed architecture really works well for me. I've used it for a couple of projects now.
To throw around some terms for those interested in reading up/background:
- The separation of writes (through the log) from reads (through any of the consumers) is sometimes called CQRS (Command Query Responsibility Segregation)
- Having a centralized log as the defining store for updates/change events is sometimes called event sourcing
- As mentioned in the article: Elasticsearch as a consumer of the log, which only gets updates through the log, is an example of an eager read derivation.
> Having a centralized log as the defining store for updates/change events is sometimes called event sourcing
To clarify slightly: Event Sourcing isn't just emitting events for others; it means that your system is re-reading its own log to derive current state from past events.
For performance reasons you try to avoid this with stuff like periodic "snapshots", but the capability has to be there.
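A tiny sketch of what I mean, in plain Python with made-up event types: current state is just a fold over the event log, optionally primed from a snapshot.

```python
def apply(state: dict, event: dict) -> dict:
    """Pure transition function: how one past event changes current state."""
    if event["type"] == "deposited":
        return {**state, "balance": state.get("balance", 0) + event["amount"]}
    if event["type"] == "withdrawn":
        return {**state, "balance": state.get("balance", 0) - event["amount"]}
    return state

def rebuild(events, snapshot=None):
    """Derive current state by re-reading the log (from a snapshot, if any)."""
    state = dict(snapshot) if snapshot else {}
    for event in events:
        state = apply(state, event)
    return state

# Re-reading our own log:
log = [{"type": "deposited", "amount": 100}, {"type": "withdrawn", "amount": 30}]
assert rebuild(log) == {"balance": 70}
```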
This is exactly what I thought of as soon as I read it.
"Hey, this is just CQRS by a different name!"
It works well, but it is very hard to get large teams of people to all do things this way. All it takes is for one guy to do a direct write, and his reviewer to miss it, and boom, inconsistency.
That talk was great. We introduced Kafka at my work probably 3-4 months ago, at first only to track events from our webservices, but eventually it became the backbone of communication between our services.
The Java library for the consumer part, still based on the Scala code, is not that great though. They're rewriting a Java-only library, which is much nicer to use, but I'm not sure when it'll be stable.
I have actually found myself thinking a lot about logs lately and how I can end up using them for a lot of problems; they're sometimes very simple to implement. But I always wondered if I was actually using the right tool for the job...
I had no idea they had such far-reaching implications and so many jobs where this is the right tool.
Firstly, we are not doing centralized blockchains; we're doing permissioned blockchains, which work most efficiently when actors within the system have been identified. These blockchains are simply one element of the Eris Blockchain Application platform we are developing.
Comparing a smart contract enabled blockchain to a distributed messaging broker is a difficult proposition as they are very much apples and oranges in their capabilities, usage, and goals.
That said, let me give it a go.
## Similarities
* Kafka and eris:db both work on a distributed model
* Kafka and eris:db both are inherently scalable and persist their data
* Kafka and eris:db both include a distributed messaging layer optimized for their system which is not http
## Differences
* eris:db persists using merkle-ized encoding schemes, with all data digitally signed via a node using an ECC-PKI and verified by a logic gateway (commonly known as a smart contract) prior to being persisted -- eris:db further builds logs into blocks and chains those together using traditional blockchain techniques; kafka uses a relatively sophisticated, but not inherently as verifiable, "traditional" logging mechanism
* eris:db, as with any blockchain, includes a [fork choice rule](https://eng.erisindustries.com/blockchain/2015/04/30/on-bloc...) for resolving differences among the nodes on the network as to prior history, as well as adding a layer of byzantine fault tolerance; from my (admittedly cursory) overview of kafka, while it is distributed it is not byzantine fault tolerant
* kafka is fast and meant as a near real time message broker; eris:db is not built for speed and sacrifices some availability for consistency
## Overall
Message brokers are great complements to blockchains actually as they can provide an additional speed layer for messages which blockchains are not particularly good at.
The Kafka ecosystem at LinkedIn processes nearly a trillion messages per day, which amounts to receiving over 10 million messages per second at the busiest times of day. The Bitcoin blockchain, by comparison, can handle 7 transactions per second.