This architecture is how the big players do it at scale (i.e. Datadog, New Relic: the second data passes their edge it lands in a Kafka cluster). Also, OTel components lack rate limiting (1), meaning it's very easy to overload your backend storage (S3).
Grafana has some posts on how they softened the S3 blow with memcached (2, 3).
Grafana Labs employee here. On the linked articles: I'm not aware of any caching being used in the write-to-S3 part of the pipeline, other than some time-based/volume-based buffering at the ingester microservices before the chunks of data are written to object storage.
The linked Loki caching docs/articles are for optimising the read access patterns of S3/object storage, not for writes.
I've heard several times that Kafka was put in front of Elasticsearch clusters to handle traffic bursts. You can also use Redpanda, Pulsar, NATS, and other distributed queues.
One thing that is also very interesting with Kafka is that you can achieve exactly-once semantics without too much effort: keep track of the partition positions in your own database and carefully acknowledge them only when you are sure the data is safely stored in your DB. That's what we did with our engine Quickwit; so far it's the most efficient way to index data in it.
One obvious drawback with Kafka is that it's one more piece to maintain... and it's not a small one.
> exactly-once semantics without too much effort: keep track of the partition positions in your own database and carefully acknowledge them only when you are sure the data is safely stored in your DB
That's not really "exactly once". What happens when your system dies after it made sure the data is safely stored in the db and before ack-ing?
Depending on how you use the database, it is. If you write the data as well as the offset to the DB in the same transaction, you can then seek to the offset stored in the DB after an application restart and continue from there.
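A minimal sketch of that pattern, assuming the Python confluent-kafka client and SQLite; the broker address, topic, group ID, and table names are invented, and this is not Quickwit's actual code:

```python
# Hedged sketch: store the payload and the partition offset in one DB
# transaction, and on (re)start seek each assigned partition to the stored
# offset + 1. Broker, topic, group and table names are invented.
import sqlite3

from confluent_kafka import Consumer, OFFSET_BEGINNING

db = sqlite3.connect("sink.db")
with db:
    db.execute("CREATE TABLE IF NOT EXISTS events (payload BLOB)")
    db.execute("CREATE TABLE IF NOT EXISTS offsets "
               "(topic TEXT, part INTEGER, off INTEGER, PRIMARY KEY (topic, part))")

def on_assign(consumer, partitions):
    # Resume from the offsets we persisted ourselves, not Kafka's committed ones.
    for tp in partitions:
        row = db.execute("SELECT off FROM offsets WHERE topic = ? AND part = ?",
                         (tp.topic, tp.partition)).fetchone()
        tp.offset = row[0] + 1 if row else OFFSET_BEGINNING
    consumer.assign(partitions)

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "indexer",
    "enable.auto.commit": False,  # the Kafka-side commit becomes optional bookkeeping
})
consumer.subscribe(["events"], on_assign=on_assign)

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    # One transaction: the payload and its offset are stored together or not at all.
    with db:
        db.execute("INSERT INTO events (payload) VALUES (?)", (msg.value(),))
        db.execute("INSERT OR REPLACE INTO offsets (topic, part, off) VALUES (?, ?, ?)",
                   (msg.topic(), msg.partition(), msg.offset()))
```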
You should drop the "(...) and carefully acknowledge them only when you are sure the data is safely stored in your DB (...)" part then, because it means it's not necessary; you don't rely on it.
At-least-once semantics + local deduplication gives exactly-once semantics.
In this case you're optimising local deduplication with a strictly monotonic index.
One downside is that you leak internals of the other system (partitions).
The other is that it implies serialised processing: you can't process anything in parallel, as you have a single index threshold that defines what has and what hasn't been processed yet.
I'm not the one who wrote the original comment, so I can't modify it. But one should still commit offsets because it is the happy path: DB transaction successful? Commit the offset. If the latter fails due to e.g. an application crash, and at startup you seek to the partition offset stored in the DB + 1, you get exactly-once semantics. There are some more details: e.g. you'd have to do the same during a consumer group rebalance, and topic configuration also plays a role, for example whether the topic is compacted, whether you write tombstones, and what its retention policy is.
edit: You added some more points to your comment after I posted this one, so I'll try to cover those as well:
> One downside is that you leak internals of the other system (partitions).
Yeah, sure.
> The other is that it implies serialised processing: you can't process anything in parallel, as you have a single index threshold that defines what has and what hasn't been processed yet.
It doesn't imply serialised processing. It depends on the use case: if every record in a topic has to be processed serially, you can't parallelize, full stop; the number of partitions equals 1. But if each record can be processed individually, you get parallelism equal to the number of partitions the topic is configured with.
You also achieve parallelism in the same way if only some records in a topic need to be processed serially: use the same key for the records that need serial processing and they will end up in the same partition. For example, recording the coordinates of planes: each plane can be processed in parallel, but an individual plane's coordinates need to be processed serially, so just use the plane's unique identifier as the key and the coordinates for the same plane will be appended to the log of the same partition.
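A minimal sketch of that keying pattern, assuming the Python confluent-kafka client; the broker address, topic name, and message shape are invented for illustration:

```python
# Hedged sketch: key each record by the plane's identifier so all records for
# one plane land in the same partition and keep their relative order.
# Broker address, topic name and message shape are invented for this example.
import json

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

def record_position(plane_id: str, lat: float, lon: float) -> None:
    # The default partitioner hashes the key, so every message with the same
    # plane_id is appended to the same partition; different planes can still
    # be consumed in parallel from different partitions.
    producer.produce(
        "plane-positions",
        key=plane_id,
        value=json.dumps({"plane_id": plane_id, "lat": lat, "lon": lon}),
    )

record_position("N12345", 48.8566, 2.3522)
record_position("N12345", 48.8570, 2.3530)   # same key -> same partition, ordered
record_position("N67890", 51.5072, -0.1276)  # different key, possibly another partition
producer.flush()
```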
Presumably every exactly-once processing scenario needs you to squeeze things through a serial pipe at some point, or else you could have two messages with the same ID come in and be processed in parallel?
Yes, but the scope of blocking/serialisation can be narrow or wide, i.e. it can be per message ID (highly parallel, more state to persist, one entry per ID) or one for all messages of a certain type/partition (not parallel, less state required, a single last index for all messages of that kind).
> The other is that it implies serialised processing: you can't process anything in parallel, as you have a single index threshold that defines what has and what hasn't been processed yet.
Fortunately, Kafka is partitioned: you can work in parallel across partitions.
Also, you can pipeline your processing.
If you are running your data through operations (A, B, C), then (C on batch N) can run at the same time as (B on batch N+1) and (A on batch N+2).
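A rough sketch of that pipelining, with three stages connected by queues (the stage bodies here are just placeholders, not anything from the thread):

```python
# Hedged sketch: three stages (A, B, C) connected by bounded queues, so C can
# work on batch N while B works on batch N+1 and A on batch N+2.
# The stage bodies are placeholders, not real indexing operations.
import queue
import threading

def stage(fn, inbox, outbox):
    while True:
        batch = inbox.get()
        if batch is None:            # poison pill: shut down and propagate
            if outbox is not None:
                outbox.put(None)
            break
        result = fn(batch)
        if outbox is not None:
            outbox.put(result)

source, a_to_b, b_to_c = queue.Queue(4), queue.Queue(4), queue.Queue(4)
threads = [
    threading.Thread(target=stage, args=(lambda b: [x * 2 for x in b], source, a_to_b)),  # A
    threading.Thread(target=stage, args=(lambda b: [x + 1 for x in b], a_to_b, b_to_c)),  # B
    threading.Thread(target=stage, args=(lambda b: print("stored", b), b_to_c, None)),    # C
]
for t in threads:
    t.start()

for n in range(3):
    source.put([n, n + 10])          # batches flow through A -> B -> C concurrently
source.put(None)
for t in threads:
    t.join()
```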
Good point. First, you're right: we do ack on Kafka, but it's not necessary.
Second, this is not what I wanted to stress... and I should not have used the verb "acknowledge". What we do is upload the data to S3, then commit the partitions + positions in what we call the metastore. I can't edit my comment, unfortunately.
> One downside is that you leak internals of the other system (partitions).
True, but we generalized the concept of partitions to other data sources; it's pretty convenient to use it for distributing indexing tasks.
There has to be a retry system somewhere, otherwise you'd end up with a zero-or-more delivery system if the app crashes after picking up from the queue but before processing or ack-ing.
You have to do a bit more than that if you want exactly-once end-to-end (i.e. if Kafka itself can contain duplicates). One of my former colleagues did a good write-up on how Husky does it: https://www.datadoghq.com/blog/engineering/husky-deep-dive/
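For the general flavour only (the linked write-up describes what Husky actually does): deduplicating at write time by a stable event ID is one common way to turn at-least-once delivery into exactly-once storage. Table and column names below are invented:

```python
# Hedged sketch, not Husky's actual design: give every event a stable ID and
# make the store ignore repeats, so duplicate deliveries are harmless.
import sqlite3

db = sqlite3.connect("events.db")
with db:
    db.execute("CREATE TABLE IF NOT EXISTS events (event_id TEXT PRIMARY KEY, payload TEXT)")

def store_event(event_id: str, payload: str) -> bool:
    # INSERT OR IGNORE turns a redelivery of the same event_id into a no-op,
    # so at-least-once delivery upstream still yields exactly-once storage.
    with db:
        cur = db.execute("INSERT OR IGNORE INTO events (event_id, payload) VALUES (?, ?)",
                         (event_id, payload))
    return cur.rowcount == 1   # True only the first time this event is seen

store_event("evt-42", '{"msg": "hello"}')
store_event("evt-42", '{"msg": "hello"}')  # duplicate: ignored
```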
If you have distributed concurrent data streams that exhibit coherent temporal events, then at some point you pretty much have to implement a queuing balancer.
One simply trades latency for capacity and eventual coherent data locality.
It's almost an arbitrary detail whether you use Kafka, RabbitMQ, or Erlang channels. If you can add smart client application-layer predictive load balancing, then it is possible to cut burst traffic loads by an order of magnitude or two. Cost-optimized dynamic host scaling is not always a solution that solves every problem.
A similar idea [^1] has cropped up in the serverless OpenTelemetry world: collate OpenTelemetry spans in a Kinesis stream before forwarding them to a third-party service for analysis, obviating the need for a separate collector, reducing forwarding latency, and removing the cold-start overhead of the AWS Distro for OpenTelemetry Lambda layer.
Seems like overkill, no? OTel collectors are fairly cheap; why add expensive Kafka into the mix? If you need to buffer, why not just dump to S3 or a similar data store as a temporary storage area?
> If you need to buffer, why not just dump to S3 or a similar data store as a temporary storage area?
At that point it's very easy to sleepwalk into implementing your own database on top of S3, which is very hard to get good semantics out of: e.g. it offers essentially no ordering guarantees, and forget about atomicity. For telemetry you might well be OK with fuzzy data, but if you want exact traces every time, then Kafka could make sense.
Yeah, and to use S3 efficiently you also need to batch your messages into large blobs of at least tens of MB, which further complicates matters, especially if you don't want to lose those message buffers.
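A rough sketch of that kind of batching, assuming boto3; the bucket/prefix names and the 32 MB threshold are invented, and a real version would also flush on a timer and persist the buffer so a crash doesn't lose it:

```python
# Hedged sketch: buffer messages in memory and only PUT to S3 once the buffer
# reaches tens of MB, so each object is one large blob instead of many tiny ones.
# Bucket and prefix are invented placeholders.
import time
import uuid

import boto3

FLUSH_BYTES = 32 * 1024 * 1024   # aim for ~32 MB objects

class S3Batcher:
    def __init__(self, bucket: str, prefix: str):
        self.s3 = boto3.client("s3")
        self.bucket, self.prefix = bucket, prefix
        self.buffer = bytearray()

    def append(self, message: bytes) -> None:
        self.buffer += message + b"\n"
        if len(self.buffer) >= FLUSH_BYTES:
            self.flush()

    def flush(self) -> None:
        if not self.buffer:
            return
        key = f"{self.prefix}/{int(time.time())}-{uuid.uuid4()}.ndjson"
        self.s3.put_object(Bucket=self.bucket, Key=key, Body=bytes(self.buffer))
        self.buffer.clear()

batcher = S3Batcher("my-telemetry-bucket", "raw/otel")
batcher.append(b'{"span": "..."}')
# ... many more appends ...
batcher.flush()   # final flush on shutdown
```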
If your OTel collector is being overwhelmed, you have a lot of backlogged data that can't be ingested, so you dead-letter-queue it to S3 to free up buffers.
The approach here is to only send data to S3 as a last resort.
If you're ok with losing some data when your collectors are overwhelmed, surely you'd just drop overflowing data in that case? Why go to all the effort of building a fallback ingestion path if it's not going to be reliable?
It's very hard to see S3 working as a buffer. Every datastore can cover almost all storage use cases (buffer/queue/DB) when the scale is low, but dedicated buffers/queues/DBs were designed to work at scale.
I expect it would be far cheaper to scale up Tempo/Loki than it would be to even run an idle Kafka cluster. This feels like spending thousands of dollars to save tens of dollars.
When handling surges on the order of 10x, it's much more difficult to scale the different components of Loki than to write the data to Kafka/Redpanda first and consume it at a consistent rate.
Are there any client-side dynamic samplers that can target a maximum event rate? Burstiness with OTel has been a thorn in everything that uses it, in my experience, and it's frustrating.
It'd be nice to have something simpler as an OTel processor. OTel could just dump events to local disk as sequential writes, then read them back, load permitting.
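Roughly the shape of that idea, sketched outside any real collector API; the spool directory, file layout, and function names are all made up:

```python
# Hedged sketch of "spill to local disk, drain when load permits"; this is not
# an actual OTel processor, and the spool layout is invented.
import os
import pickle

SPOOL = "/var/tmp/otel-spool"
os.makedirs(SPOOL, exist_ok=True)

def spill(batch, seq: int) -> None:
    # Sequential writes: one file per batch, named by an increasing sequence number.
    with open(os.path.join(SPOOL, f"{seq:012d}.bin"), "wb") as f:
        pickle.dump(batch, f)

def drain(export) -> None:
    # Replay spooled batches in order once the downstream can keep up, then delete them.
    for name in sorted(os.listdir(SPOOL)):
        path = os.path.join(SPOOL, name)
        with open(path, "rb") as f:
            export(pickle.load(f))
        os.remove(path)

spill([{"span": "a"}], seq=1)
spill([{"span": "b"}], seq=2)
drain(lambda batch: print("exported", batch))
```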
I'm curious how long things stay in Kafka on average and in the worst case. If it's more than a few minutes, I imagine it lowers the quality of tail-based sampling.
1. https://github.com/open-telemetry/opentelemetry-collector-co...
2. https://grafana.com/docs/loki/latest/operations/caching/
3. https://grafana.com/blog/2023/08/23/how-we-scaled-grafana-cl...
I know the post is about telemetry data and my comments on Grafana are about logs, but the architecture bits still apply.