Building a Distributed Task Scheduler in Go

System design & implementation.

Introduction

In this issue, we'll discuss how to design a distributed task scheduler, the motivation behind building such a system, and then implement one in Go.

You can find the complete code that we'll be working towards on my GitHub.

The system we discuss here is inspired by Reliable Cron Across the Planet, a research article by engineers at Google, as well as the insights I picked up in Arpit Bhayani's System Design Masterclass (which I highly recommend).

Task Scheduling

Task scheduling is a common and widely needed use case, and every Unix machine has a system utility to aid with it. This utility is called cron, and it's used to run user- and system-defined tasks at scheduled times (and intervals).

crond is the single component that enables cron to do its job - a daemon that loads the list of scheduled cron jobs, sorted by their next execution time. The daemon waits until the first job is due, launches it, calculates when that job should next run, and then waits until the next scheduled execution.

Why are we complicating cron?

Reliability.

It's easy to see the reliability issues with cron - its operation is confined to a single machine responsible for running the scheduler. Consequently, if that machine is offline, the scheduler is inactive, and the execution of scheduled tasks halts.

Failure Modes

Diving deeper into the failure modes, let us see the kinds of tasks for which one might want to use a scheduler. Some of these tasks can be idempotent, which means in the event of a malfunction it is safe to execute them multiple times.

For example, repeatedly running a job to retrieve the current weather poses no significant issue. On the other hand, certain tasks, like dispatching bulk notifications or processing employee payroll, are sensitive to duplication and should ideally be executed only once.

Similarly, it’s probably okay to skip launching certain jobs in the event of failures - such as updating a weather widget on your laptop every 15 minutes, however, it’s not acceptable to skip the job to process employee payroll that is scheduled to run once every month.

We're immediately confronted with the first trade-off we need to make: should we risk skipping a job, or risk executing the same job multiple times? For many use cases, it makes more sense to favor skipping launches over risking duplicate executions. The reasoning behind this choice is straightforward: setting up monitoring to detect skipped or failed launches is typically simpler and more effective than rectifying the consequences of a job that runs multiple times. For instance, reversing the effects of disbursing employee salaries twice or sending a newsletter email multiple times can be complex, if not impossible, in some cases.

For the sake of simplicity, we'll try to strike a balance between the two options. We will launch any tasks that were scheduled to run at the current time or in the past, while also attempting to ensure that the same task is not dispatched multiple times during an execution run. Such systems can have extremely sophisticated edge cases, and we won't attempt to handle every single one.

Distributed Cron

To tackle the reliability issues with running cron on a single machine, let’s discuss how we may want to make the application distributed.

A core component of cron is the cron table - which manages the state of the system. The scheduler references this table to decide which task needs to be executed at a given moment. This state is one of the critical aspects of the system that needs to persist across system restarts and failovers. You can go one of two ways to store this state:

Store the state externally in distributed storage (e.g., HDFS)

Distributed, and therefore reliable.

Optimized for large files, whereas cron needs to store very small files, which are expensive to write and suffer high latency on such systems.

Introduces an external dependency to a system that may need to be highly available. In case the cron service is a core part of the infra, one should prioritize minimal external dependencies.

Store the state within the cron nodes itself

Need to manage state consensus.

The core system state is not stored in an external dependency.

Storage can be highly optimized for the use case.

More complicated to implement.

While option 2 may make more sense at Google’s scale and SLAs, option 1 is simpler to implement. We’ll be following an approach similar to option 1, by using Postgres to manage the state for our scheduler.

Designing a Distributed Task Scheduler

We'll start from the user's initial interaction and build out the components incrementally. Here's how the system can be structured:

For this design, let’s assume that a task that a user submits has the following information:

A string representing the command or script the worker node should execute.

The time at which the task is supposed to be executed.

Optionally, the user can set up recurring tasks, but let's not worry about that at this stage, since recurring tasks are nothing but regular tasks scheduled multiple times.

When a user submits a task to the system, we need to store it in whatever storage system our scheduler is using for state management. We’ll be using Postgres, however, the exact database you use does not matter, as long as it provides certain levels of guarantees that we expect from the system (which we will soon discuss).

Database Schema Design

Any task that the user submits should be stored inside our Tasks table. The table stores the following information:

id : uniquely identifies a scheduled task in the system.

command (string) : the command that needs to be run to execute the task; this is submitted by the user.

scheduled_at (unix timestamp) : When the command should be executed.

Further, you may be tempted to store the status of the task in this table, which would tell us whether the task is queued, in-progress, completed, or failed. However, that introduces a problem: what if the machine that is executing our task (and is supposed to inform us of its completion) crashes? The status in the table would become inconsistent.

Instead, we’ll adopt a more detailed tracking approach.

Execution Phase Timestamps

For each task, we'll store timestamps for the different execution phases: picked_at (when the task is picked up for execution), started_at (when the worker node started executing the task), completed_at (when the task is completed), and failed_at (when the task failed). This approach offers several benefits:

Status Derivation : The current status of each task can be derived from these timestamps. For example, if a task has a 'started_at' but no 'completed_at' timestamp, it's in progress.

System Health and Load Insights : By analyzing the timestamps, we can gauge system performance and load, understanding how long tasks are in the queue, execution times, and if there are any unusual delays.

Recovery and Redundancy : In case of crashes, this detailed historical record allows the system to determine what was happening at the time of failure and take appropriate recovery actions, such as rescheduling tasks or alerting administrators.
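
Putting these fields together, a minimal Postgres schema along the following lines would work. This is a sketch, not the article's exact DDL: the column types are assumptions (the article describes scheduled_at as a unix timestamp, but a TIMESTAMPTZ keeps the later queries readable).

```sql
-- Sketch of the tasks table; types and the id column are assumptions.
CREATE TABLE IF NOT EXISTS tasks (
    id           BIGSERIAL PRIMARY KEY,  -- uniquely identifies a scheduled task
    command      TEXT NOT NULL,          -- what the worker should execute
    scheduled_at TIMESTAMPTZ NOT NULL,   -- when it should run
    picked_at    TIMESTAMPTZ,            -- set when a coordinator picks it up
    started_at   TIMESTAMPTZ,            -- set when a worker starts executing
    completed_at TIMESTAMPTZ,            -- set on success
    failed_at    TIMESTAMPTZ             -- set on failure
);
```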

With this, we have built the scheduling components of our system.

Task Picking

In a distributed task scheduler, effectively managing task execution is crucial. We face a significant challenge: how do we handle potentially thousands of tasks scheduled at the same time without overwhelming the database with requests or causing execution conflicts?

Challenges and Considerations

Independent Worker Fetching Drawback : Initially, one might consider allowing workers to independently fetch tasks from the database. However, this approach is prone to causing excessive concurrent connections and overwhelming the database in a large-scale environment.

Need for a Coordinated Approach : To address the limitations of independent fetching, we introduce a coordinator. This component is responsible for picking tasks currently scheduled for execution and delegating them to workers.

Implementing Coordinator-Led Task Assignment

The coordinator will fire queries on the database every few seconds to pick the tasks currently scheduled for execution. Let’s see what such a database query may look like. The coordinator will start a transaction, and fire the following SELECT query.
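
A first pass at that query might look something like this (a sketch against the schema above, not the exact query from the repository; the 30-second window and the 10-row batch follow the description in the text):

```sql
SELECT id, command
FROM tasks
WHERE scheduled_at <= NOW() + INTERVAL '30 seconds'
  AND picked_at IS NULL
ORDER BY scheduled_at
LIMIT 10;
```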

The above SQL query fetches the tasks that are scheduled to be executed within the next 30 seconds (or were already due) and have not yet been picked up.

For this query to work, every time the coordinator picks a task for execution, it must update the picked_at field. We can fire the following UPDATE query as a part of the overall transaction for each task that we pick:
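
A sketch of such an update (the $1 placeholder is the id of the picked task):

```sql
UPDATE tasks
SET picked_at = NOW()
WHERE id = $1;
```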

Remember, the coordinator should be a distributed service, so that it can scale up based on the load on the system. This would mean if multiple coordinators fire this query on the database at the same time, they will end up picking the same tasks - causing multiple executions. To deal with this, each coordinator needs to lock the rows that it is working on.

We can make use of the FOR UPDATE clause to take an exclusive lock on the rows that each coordinator is working on. This means, that the transactions started by the other coordinators won’t be able to select these rows while the current coordinator is operating on them.

There is still one problem with this query. Say 10 different coordinators fire the SELECT at the same time: each of them will try to pick up the top 10 tasks scheduled for execution, but only one of them will be able to acquire a lock on those 10 rows. The remaining 9 coordinators will block, contending for the same 10 rows that the first coordinator is already processing.

The SKIP LOCKED clause is introduced to prevent coordinators from blocking each other over the same tasks. This allows each coordinator to bypass tasks already locked by others, ensuring smooth and efficient task distribution.
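
Putting FOR UPDATE and SKIP LOCKED together, the picking query ends up looking roughly like this (again a sketch rather than the repository's exact query):

```sql
SELECT id, command
FROM tasks
WHERE scheduled_at <= NOW() + INTERVAL '30 seconds'
  AND picked_at IS NULL
ORDER BY scheduled_at
LIMIT 10
FOR UPDATE SKIP LOCKED;
```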

Once a coordinator has selected a bunch of tasks, it can start delegating them to the workers in its worker pool.

The execution phase is straightforward. Workers remain on standby, awaiting tasks from the coordinator. Each worker maintains an internal pool of goroutines to execute the tasks it receives. Any tasks sent to a worker are added to a thread-safe channel, from which these goroutines pick them up.

The moment a task is assigned, a worker begins execution. It promptly notifies the coordinator upon starting a task, enabling it to update the task's status in the database accordingly. Likewise, upon task completion or failure, the worker communicates the outcome back to the coordinator for a further update.
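
A rough sketch of that worker-side structure is below. The type names, pool size, and reporting hooks are illustrative assumptions, not the repository's actual code:

```go
package main

import (
	"log"
	"os/exec"
)

// Task mirrors the fields a coordinator would hand to a worker.
type Task struct {
	ID      string
	Command string
}

// Worker holds a buffered channel of tasks and a fixed pool of goroutines
// that drain it.
type Worker struct {
	tasks chan Task
}

func NewWorker(poolSize int) *Worker {
	w := &Worker{tasks: make(chan Task, 100)}
	for i := 0; i < poolSize; i++ {
		go w.run()
	}
	return w
}

// Submit is called when the coordinator delegates a task to this worker.
func (w *Worker) Submit(t Task) { w.tasks <- t }

func (w *Worker) run() {
	for t := range w.tasks {
		// Here the worker would notify the coordinator that execution has
		// started, so it can set started_at.
		out, err := exec.Command("sh", "-c", t.Command).CombinedOutput()
		if err != nil {
			// ...and report failure so the coordinator can set failed_at.
			log.Printf("task %s failed: %v (output: %s)", t.ID, err, out)
			continue
		}
		// ...and report completion so the coordinator can set completed_at.
		log.Printf("task %s completed: %s", t.ID, out)
	}
}

func main() {
	w := NewWorker(4)
	w.Submit(Task{ID: "1", Command: "echo hello"})
	select {} // block forever; a real worker would wait on a shutdown signal
}
```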

With that, the overall flow of our system is in place: users submit tasks to the scheduler, the coordinator periodically picks due tasks from Postgres, and workers execute them and report the outcome back.

Implementation

It’s hard to discuss code in a newsletter, so I decided to go over the implementation on my YouTube channel. You can check it out here:

Subscribe to the YouTube Channel

In this article we discussed the design of a distributed task scheduler, offering a way to overcome the limitations of regular cron. We explored key concepts like task scheduling with detailed timestamps, and task picking with efficient locking strategies. Finally, we went over the code to understand how you can build such a system in Go.

A Reliable, simple & efficient distributed task queue in Golang

A Golang library for queueing tasks and processing them asynchronously with workers. It’s backed by Redis and designed to be scalable yet easy to start.

A high-level overview of how the library works:

  • The client puts tasks on a queue
  • The server pulls tasks off queues and starts a worker goroutine for each task
  • Tasks are processed concurrently by multiple workers

The task queues are used as a mechanism to distribute work across multiple machines. A system can consist of multiple worker servers and brokers, giving way to horizontal scaling and high availability.

Example use case with some golang workers
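
For a feel of the API, a minimal client/worker pair might look roughly like this. It is based on asynq's public examples; the Redis address, task type name, and payload are placeholders, and in practice the client and server would run as separate programs:

```go
package main

import (
	"context"
	"log"

	"github.com/hibiken/asynq"
)

const redisAddr = "127.0.0.1:6379"

func main() {
	// Client side: enqueue a task onto a Redis-backed queue.
	client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
	defer client.Close()
	task := asynq.NewTask("email:welcome", []byte(`{"user_id": 42}`))
	if _, err := client.Enqueue(task); err != nil {
		log.Fatalf("could not enqueue task: %v", err)
	}

	// Server side: pull tasks off the queue and process them with workers.
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: redisAddr},
		asynq.Config{Concurrency: 10},
	)
	mux := asynq.NewServeMux()
	mux.HandleFunc("email:welcome", func(ctx context.Context, t *asynq.Task) error {
		log.Printf("sending welcome email, payload: %s", t.Payload())
		return nil
	})
	if err := srv.Run(mux); err != nil {
		log.Fatalf("could not run server: %v", err)
	}
}
```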

Features

  • Guaranteed at least one execution of a task
  • Scheduling of tasks
  • Retries of failed tasks
  • Low latency to add a task since writes are fast in Redis
  • De-duplication of tasks using unique option
  • Allow timeout and deadline per task
  • Allow aggregating group of tasks to batch multiple successive operations
  • Flexible handler interface with support for middlewares
  • Ability to pause queue to stop processing tasks from the queue
  • Periodic Tasks
  • Support Redis Cluster for automatic sharding and high availability
  • Support Redis Sentinels for high availability
  • Integration with Prometheus to collect and visualize queue metrics
  • Web UI to inspect and remote-control queues and tasks
  • CLI to inspect and remote-control queues and tasks
  • Automatic recovery of tasks in the event of a worker crash
  • Weighted priority queues
  • Strict priority queues


taskq

Golang asynchronous task queue with Redis, SQS, and IronMQ backends

Getting started

Familiar API

Using taskq, you can turn any Go function into an asynchronous task.

Multiple backends

taskq supports Redis, Simple Queue Service (SQS), IronMQ, and in-memory backends.

OpenTelemetry

Monitor performance and errors using OpenTelemetry instrumentation.

Cherami: Uber Engineering’s Durable and Scalable Task Queue in Go

Cherami is a distributed, scalable, durable, and highly available message queue system we developed at Uber Engineering to transport asynchronous tasks. We named our task queue after a heroic carrier pigeon with the hope that this system would be just as resilient and fault-tolerant, allowing Uber’s mission-critical business logic components to depend on it for message delivery.

Cher Ami was a US Army carrier pigeon in World War I. Despite being shot in the leg, she successfully delivered a message that helped save 194 lives.

Introduction

A task queue decouples components in a distributed system and allows them to communicate in an asynchronous manner. The two communicating parties can then scale separately, with the added features of load smoothing or throttling. In complex distributed systems, a task queue is essential. Cherami fills a role equivalent to Simple Queue Service (SQS) in Uber’s infrastructure ecosystem. Building our own system achieves better integration with our existing infrastructure while addressing some unique product development needs, like support for multiple consumer groups and increased availability, especially during network partition.

Cherami’s users are defined as either producers or consumers . Producers enqueue tasks. Consumers are worker processes that asynchronously pick up and process enqueued tasks. Cherami’s delivery model is the typical Competing Consumers pattern, where consumers in the same consumer group receive disjoint sets of tasks (except in failure cases, which cause redelivery). Using this model, work fans out to many workers in parallel. The number of workers is independent of any partitioning or sharding mechanisms internal to Cherami and can scale up and down simply by adding or removing workers. If a worker fails to perform a task, another worker can redeliver and retry the task.

Cherami also supports multiple consumer groups, where each consumer group receives all tasks in the queue. Each consumer group is associated with a dead letter queue . Tasks that exceed the maximum retry count (for example, “poison pills”) land in this queue so that the consumer group can continue processing other messages. These consumer handling features both distinguish Cherami from the simple message buses that are typically used in big data ingestion and analytics (e.g. Apache Kafka ), and make Cherami advantageous in task queue use cases.

Producers enqueue tasks into queues A and B. Queue A feeds to two consumers groups that both receive all tasks, distributed across consumers within the respective group. Queue B feeds to only one consumer group.

Prior to Cherami, Uber used Celery queues backed by Redis for all task-queue use-cases. The combo of Celery and Redis helped Uber scale quickly, up to a point. The drawbacks? Celery is Python-only, while we were increasingly relying on Go and Java to build higher performance backend services. Furthermore, Redis stores are memory-backed, which isn’t as durable or scalable as we needed.

We needed a longer-term solution for Uber’s future, so we built Cherami to satisfy these requirements:

  • Durability, losslessness, and tolerance of hardware failures
  • Flexibility between availability and consistency (AP vs CP) during network partition
  • Ability to scale the throughput of each queue up and down easily
  • Complete support for the competing-consumers consumption model
  • Language agnostic

To satisfy those requirements, Cherami’s design follows these design principles:

  • We choose eventual consistency as a core principle. This allows high availability and durability, with the tradeoff that we don’t provide ordering guarantees. However, that means that we can continue accepting requests during catastrophic failures or network partitions, and further improves availability by eliminating the need for a consistent metadata storage like Zookeeper .
  • We chose not to support the partitioned consumer pattern, and we don’t expose partitions to the user. This simplifies consumer worker management, as workers don’t need to coordinate which partition to consume from. It also simplifies provisioning, since both producers and consumers can scale independently.

In the following sections, we further elaborate on key design elements of Cherami and explain how we applied the design principles and tradeoffs.

Cherami’s Key Design Elements

1. Failure Recovery and Replication

To be truly lossless and available, Cherami must tolerate hardware failures. In practice, this requires Cherami to replicate each message across different hardware so that messages can reliably be read, but Cherami must also be able to accept new messages when hardware fails either transiently or permanently.

Cherami’s fault tolerance comes from leveraging the append-only property of messaging systems and using pipelining in message transfers. Each message in a message queue is a self-contained element that, once created, is never modified. In other words, message queues are append-only. If the storage host containing the queue fails, we can pick a different storage host and continue writing to it. The enqueue operation continues to be available.

A Cherami queue consists of one or more extents , which are conceptual substreams within a queue that independently support appending messages. Extents are replicated to the storage layer by a role called input host . When an extent is created, its metadata contains an immutable host information tuple (input host and list of storage hosts). In each storage host, the replicated copy of the extent is called a replica , and a storage host can host many replicas of different extents. If a single storage host fails, we don’t lose messages because the extent is still readable from other replicas.

Producers connect to the specific input host to publish to an extent belonging to some queue. Upon receiving messages from a producer, the input host simultaneously pipelines the messages into all extent replicas through a WebSocket connection, and receives acknowledgements ( acks ) from the respective replicas in the same connection.

Pipelining means the input host does not wait for an ack before writing the next message, and that there is no message reordering or skipping between the input host and the replicas. This also applies to the acks that return from each replica; acks come in the order of the corresponding writes. The input host tracks all acks. Only when all storage hosts have acked receipt of the same message does the input host ack to the producer. This final ack implies that the message has been durably stored in all replicas.

Within each extent, messages are ordered due to the pipelining property. This ensures messages across all replicas are consistent, except for the tails where a storage host has yet to persist the messages.

Input host only receives acks for the first three messages from all storage hosts. It acks the first three messages to the producer, as those messages are guaranteed to be fully replicated.

When any replica fails, the input host cannot receive acks from that replica for any further writes. Thus, this extent is no longer appendable. If the input host fails, we would lose the inflight acks from the storage hosts. In both cases, the tails of the replicas can be inconsistent: one or more messages are not replicated in all replicas. To recover from this inconsistency, instead of trying to scan and repair the tails, which is a complicated operation, we simply declare this extent “sealed” as-is; it’s readable, but no more writes are allowed.

After sealing, Cherami creates a new extent for this queue, and a signaling channel notifies producers to reconnect and publish into the new extent. If a queue consists of only one open extent, sealing it would make the queue temporarily unavailable to publish for a short period of time before a new extent is created. To avoid publish latency spikes during failures, a queue normally sets a minimum number of extents so that publish can continue when one extent is being sealed and a new one created.

We choose to use sealing as a recovery mechanism because it is simple to implement. The tradeoff here is that duplicates can occur. The reason for the duplicates is that after a failure, the replica tails will contain messages not acked to the publisher, and it is not possible to determine which messages are unacked, if the input host has failed. Thus, in the read path, we will have to deliver everything, including these unacked messages. Publishers generally retry when failed to enqueue a message, so some of these messages may be republished in a new extent, which causes consumers to receive duplicates.

2. Scaling of Writes

Extents within Cherami are shared-nothing substreams. Cherami observes the throughput on each extent. As write load to a particular queue increases and some extents exceed their throughput limit, Cherami creates additional extents for that queue automatically. The new extents receive part of the write load, alleviating the load on existing extents.

As write load decreases, Cherami seals some of the extents without replacing them with new ones. In this way Cherami reduces some overhead (memory, network connections, and other maintenance) required to maintain an open extent.

Autoscaling up and down

3. Consumption Handling

Consumers in the same consumer group receive tasks from the same queue, but may receive from one or more extents. When a consumer receives a message and successfully processes it, the consumer replies to Cherami with an ack. If Cherami doesn’t get an ack after some configured amount of time, it redelivers the message to retry. A consumer’s ack can be delayed or missing  when a consumer crashes, when a downstream dependency is unavailable, when a single task takes too long, or when processing gets stuck because of a deadlock. A consumer can also negatively acknowledge, or nack , a message, triggering immediate redelivery. Nacks allow consumer groups to process tasks that some members are incapable of processing (e.g. because of local failures, partial/rolling upgrade of a consumer group to a new task schema).

Because different consumers can take varying amounts of time to process messages, acks arrive at Cherami in a different order than the ordering provided by the replicas. Some messaging systems store the read/unread state (also known as the visibility state) per message. However, to do that we would need to update these states on disk (with random writes) and handle the complexity of doing this for each of multiple consumer groups.

Cherami takes a different approach. In each consumer group, for each extent, we maintain an ack offset, which is a message sequence number below which all messages have been acked. We have a role called output host that consumers connect to in order to receive deliveries. The output host reads messages from storage hosts sequentially, keeping them cached in memory. It keeps track of in-flight messages (delivered to a consumer, but not yet acked) and updates the ack offset when possible. The output host also keeps track of timing and nacks so that messages can be redelivered to another consumer as necessary. In Cherami, one extent can be consumed simultaneously by multiple consumer groups, so multiple output hosts might read from the same extent.

Further, the system is configured to redeliver each message a limited number of times. If the redelivery limit is reached, the message is published to a dead letter queue and marked as acked so that the ack offset can advance. This way, no "poison pill" messages block the processing of other messages in the queue. The consumer group owner can manually examine messages in the DLQ and then handle them in one of two ways: purging them or merging them. Purging deletes the messages, and is appropriate when they are invalid or have no value (e.g. they were time-sensitive). The owner can otherwise merge them back into the consumer group, which is appropriate when the consumer software has been fixed to handle the messages that previously could not be handled, or when the transient failure condition has subsided.

Messages in Cherami are durably stored on disks. On the storage hosts, we chose RocksDB as our storage engine for performance and indexing features, and we use a separate RocksDB instance per extent with a shared LRU block cache. Messages are stored in the database with an increasing sequence number as the key, and the message itself as the value. Because the keys are always increasing, RocksDB optimizes its compaction so that we don’t suffer from write amplification. When output host reads messages from an extent, it simply seeks to the ack offset for the consumer group it’s serving, and iterates by the sequence number to read further messages.

With RocksDB, we can also easily implement timer queues, which are queues where each message is associated with a delay time. In such a case, the message is only delivered after the specified delay. For timer queues, we construct the key to contain the delivery time in high order bits, and sequence number in low order bits. Since RocksDB provides a sorted iterator, the keys are iterated in order of delivery time, while the sequence number of the lower bits ensures uniqueness of the keys:
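
A sketch of the idea (not Cherami's actual code): pack the delivery time into the high 32 bits of a 64-bit key and the sequence number into the low 32 bits, then encode the key big-endian so that RocksDB's byte-ordered iteration matches delivery-time order. The bit widths here are illustrative.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// timerKey packs the delivery time (epoch seconds) into the high 32 bits and
// the message sequence number into the low 32 bits. Encoding the result
// big-endian keeps RocksDB's lexicographic key order identical to
// (deliveryTime, seqNum) order, so a sorted iterator yields messages in
// delivery-time order while the sequence number keeps keys unique.
func timerKey(deliveryTime, seqNum uint32) []byte {
	k := uint64(deliveryTime)<<32 | uint64(seqNum)
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, k)
	return buf
}

func main() {
	fmt.Printf("%x\n", timerKey(1700000000, 42))
}
```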

System Architecture

Cherami consists of several different roles. In addition to the input, storage, and output roles we already introduced, there’s controller , and frontend . A typical Cherami deployment consists of several instances of each role:

Different roles can exist on the same physical host or even be linked into a single binary. At Uber, each role runs in an individual Docker container. Input, storage, and output form the data plane of the system. Controller and frontend handle control plane functions and metadata operations.

Controller is the grand coordinator, the intelligence that coordinates all of the other components. It primarily determines when to create and where to place (to which input and which storage hosts) an extent. It also determines which output hosts handle the consumption for a consumer group.

All data plane roles report load information to the controller via RPC calls. With this information, the controller makes placement decisions and balances load. There are several instances of the controller role, one of which is weakly elected as the leader using Uber's Ringpop library for gossip and consistent hashing. Ringpop also performs distributed health checking and membership functions.

Frontend hosts expose TChannel/Thrift APIs that perform CRUD operations on queues and consumer groups. They also expose APIs for data plane routing purposes. When a producer wants to publish messages into a queue, it invokes the routing API to discover which input hosts contain the extents of the queue. Next, the producer connects to those input hosts using WebSocket connections and publishes messages on the established streams.

Similarly, when a consumer wants to consume messages from a queue, it first invokes the routing API to discover which output hosts manage the consumption of extents of the queue. Then, the consumer connects to those output hosts using WebSocket connections and pulls messages. When new extents are created, Cherami sends a notification to the producer and consumer so that they can connect to the new extents. We developed client-side libraries to simplify these interactions.

Cassandra and Queueing

Finally, Cherami stores metadata in Cassandra, which is deployed separately. The metadata contains information about a queue, all its extents, and all the consumer group information, such as ack offsets per extent per consumer group. We chose Cassandra not only because it is a highly available data store, but also because of its tunable consistency model. This flexibility allows us to offer queues that are either partition tolerant but not order preserving (AP queues), or order preserving (CP queues) but unavailable in the minority partition during a partition event. The main difference in the handling of the two types of queues is whether extent creation requires a conditional update operation.

For AP queues, extent creation does not need Quorum-level consistency in Cassandra. When a network partition occurs, Extents can be created on both sides of the partition. Let’s call the partitions A and B. Producers in Partition A can publish into Extents in that partition, and Producers in Partition B can publish into Extents in Partition B. Therefore, writes are not blocked by network partition. For reads, Consumers in Partition A can only consume from Extents in that partition, and similar for Consumers in Partition B. However, when the network partition heals, Consumers are able to reach all Extents. The tradeoff here is that messages are eventually-consistent: it is not possible to establish a global ordering of messages because Extents can be created anytime, anywhere. In our implementation, we use Cassandra consistency level “ONE” when we write the Extent metadata.

For CP queues, Extent creation needs to be linearizable: in the case of a network partition, we must make sure that only one partition can create an Extent to succeed the previously sealed one. To ensure this, we use Cassandra’s lightweight transaction so that if at the same time more than one Extent is created for any reason, only one can be used for a CP queue.

Cherami, Summarized

Cherami is a competing-consumer messaging queue that is durable, fault-tolerant, highly available, and scalable. We achieve durability and fault tolerance by replicating messages across storage hosts, and high availability by leveraging the append-only property of messaging queues and choosing eventual consistency as our basic model. Cherami is also scalable, as the design has no single bottleneck.

Cherami was designed and built from the ground up in about six months in our Seattle engineering office. Currently, Cherami durably transports many hundreds of millions of tasks per day among Uber Engineering's many microservices, supporting use cases such as post-trip processing, fraud detection, user notification, incentive campaigns, and many others.

Cherami is completely written in Go , a language that makes building highly performant and concurrent system software a lot of fun. Additionally, Cherami uses several libraries that Uber has already open sourced: TChannel for RPC and Ringpop for health checking and group membership. Cherami depends on several third-party open source technologies: Cassandra for metadata storage, RocksDB for message storage, and many other third-party Go packages that are available on GitHub. We plan to open source Cherami in the near future.

Editor Update January 3 2017 : Cherami is now open sourced at the following links:  github.com/uber/cherami-server  &  github.com/uber/cherami-client-go .

Xu Ning is an engineering manager and co-wrote this article with Maxim Fateev, a staff software engineer . Both are based in Uber’s Seattle engineering office .

Photo Credits for Header: “ Paloma ” by Pablo Ibañez , licensed under CC-BY 2.0 . Image cropped for header dimensions and color corrected.

Photo Credits for intro pigeon image: United States Signal Corps via Smithsonian Institution , public domain.

Xu Ning

Xu Ning is a Senior Engineering Manager in Uber’s Seattle Engineering office, currently leading multiple development teams in Uber’s Michelangelo Machine Learning Platform. He previously led Uber's Cherami distributed task queue, Hadoop observability, and Data security teams.

Maxim Fateev

Maxim Fateev is a staff software engineer at Uber.

Posted by Xu Ning, Maxim Fateev


Yeqing (Marvin) Zhang

Posted on Oct 26, 2022

Golang in Action: How to implement a simple distributed system

Introduction

Nowadays, many cloud-native and distributed systems, such as Kubernetes, are written in Go. This is because Go natively supports not only concurrent programming but also static typing, which helps ensure system stability. My open-source project Crawlab, a web crawler management platform, uses a distributed architecture. This article will introduce how to design and implement a simple distributed system.

Before we start to code, we need to think about what we need to implement.

  • Master Node : A central control system, similar to a troop commander to issue orders
  • Worker Node : Executors, similar to soldiers to execute tasks

Apart from the concepts above, we would need to implement some simple functionalities.

  • Report Status : Worker nodes report current status to the master node.
  • Assign Task : The client makes API requests to the master node, which assigns tasks to worker nodes.
  • Execute Script : Worker nodes execute scripts from tasks.

The overall architectural diagram is as below.

Main Process Diagram

Node Communication

Communication between nodes is very important in distributed systems. If each node ran in isolation, there would be no need for a distributed system at all. Therefore, the communication module is an essential part of any distributed system.

gRPC Protocol

First, let's think about how to make nodes communicate with each other. The most common way is an API, which has the drawback that it requires nodes to expose their IP addresses and ports to others - very insecure, particularly on the public network. In light of that, we chose gRPC, a popular Remote Procedure Call (RPC) framework. We won't go deep into its technical details, but it is essentially a mechanism that allows RPC callers to have commands executed on remote machines.

To use gRPC, let's first create a file named go.mod, enter the content below, and then execute go mod download.

Then we create a Protocol Buffers file, node.proto, a gRPC protocol definition, and enter the content below.

Here we have created two RPC service methods: one for reporting current status using a simple RPC, and another for assigning tasks using a server-side streaming RPC. The difference is that a server-side streaming RPC allows the server (the master node in our case) to send data to the client (the worker node) through a stream, whereas a simple RPC only allows the client to call the server.

RPC Diagram

After the .proto file is created, we need to compile it into .go code files so that they can be used by the Go program. Let's execute the command below. (Note: the protoc compiler is not built-in and needs to be downloaded; please refer to https://grpc.io/docs/protoc-installation/ )

After it is executed, you can see two Go code files under the directory core , node.pb.go and node_grpc.pb.go respectively, which serve as the gRPC library.

gRPC Server

Now let's start writing server-side code.

First, create a new file core/node_service_server.go and enter the content below. It implements the two gRPC service methods created before. The channel CmdChannel, in particular, transfers commands to be executed on worker nodes.

gRPC Client

We don't have to care too much about the implementation of the gRPC client. We only need to call the methods on the gRPC client; the rest of the service requests and responses will be handled automatically by the program.

Master Node

After we implemented the node communication part, we can now write the master node, which is the core of the distributed system.

Let's create a new file node.go and enter the content below.

There are two placeholder methods Init and Start to be implemented.

In the initialization method Init, we will do two things:

  • Register gRPC services
  • Register API services

Now, we can add below code in Init method.

Here we created a gRPC server and registered the NodeServiceGrpcServer created before. We then use the API framework gin to create a simple API service that allows POST requests to /tasks to send commands to the channel CmdChannel, which passes them on to NodeServiceGrpcServer. All the pieces have been put together!
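
As a rough sketch of that API piece (the request field name and the port are assumptions, and the original post's exact code differs):

```go
package main

import (
	"net/http"

	"github.com/gin-gonic/gin"
)

// CmdChannel carries commands from the API to the gRPC service, which
// streams them to worker nodes.
var CmdChannel = make(chan string, 100)

func main() {
	app := gin.New()
	// POST /tasks accepts a command and hands it to the gRPC side via the channel.
	app.POST("/tasks", func(c *gin.Context) {
		var req struct {
			Cmd string `json:"cmd"` // field name is an assumption
		}
		if err := c.ShouldBindJSON(&req); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
			return
		}
		CmdChannel <- req.Cmd
		c.JSON(http.StatusOK, gin.H{"status": "ok"})
	})
	app.Run(":9092") // port is an assumption
}
```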

The starting method Start is quite simple: it simply starts the gRPC server and the API server.

Next, we will implement the worker node that actually performs the tasks.

Worker Node

Now, let's create a new file core/worker_node.go and enter the content below.

In the above code, we created the gRPC client and connected it to the gRPC server in Init method.

In Start method, we have done several things:

  • Report status with a Simple RPC method.
  • Call the task-assignment method to acquire a stream via the server-side streaming RPC.
  • Continuously receive data from the server (master node) via the acquired stream and execute commands.

Now we have completed all the core logic of the distributed system.

Putting them all together

Finally, we need to encapsulate these core functionalities.

Create the main entry file main.go and enter the content below.

Now the simple distributed system is all done!

Final Results

We can then test the code.

Open two command prompts. Enter go run main.go master in one prompt to start the master node, and enter go run main.go worker to start the worker node in another.

If the master node starts successfully, you should be able to see logs like the ones below.

For worker node, you can see logs like this.

After the master node and worker node have both started, we can open another command prompt and execute the command below to make an API call.

In the worker node logs, you should be able to see received command: touch /tmp/hello-distributed-system .

Then let's check if the file has been created by executing ls -l /tmp/hello-distributed-system .

The file was successfully created, which means the worker node has executed the task successfully. Hooray!

This article introduced a way to develop a simple distributed system in Go, using gRPC and Go's built-in channels.

Core libraries and techniques:

  • Protocol Buffers

The code of the whole project is on GitHub: https://github.com/tikazyq/codao-code/tree/main/2022-10/go-distributed-system

Go Client/Server for Celery Distributed Task Queue

Having been involved in several projects migrating servers from Python to Go, I have realized that Go can improve the performance of existing Python web applications. As Celery distributed tasks are often used in such web applications, this library allows you to both implement Celery workers and submit Celery tasks in Go.

You can also use this library as a pure Go distributed task queue.

Go Celery Worker in Action

Supported brokers/backends

Currently, only Redis is supported:

  • Redis (broker/backend)

Celery Configuration

Celery must be configured to use json instead of the default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass the configuration parameters below to use json.

Starting from version 4.0, Celery uses message protocol version 2 as default value. GoCelery does not yet support message protocol version 2, so you must explicitly set CELERY_TASK_PROTOCOL to 1.

GoCelery GoDoc has good examples. Also take a look at example directory for sample python code.

GoCelery Worker Example

Run Celery Worker implemented in Go
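
A minimal worker sketch using the constructors and methods listed in the package index further down. The import path assumes the upstream gocelery module, and the Redis URI and task name are placeholders:

```go
package main

import (
	"time"

	"github.com/gocelery/gocelery"
)

// add is the task implementation the worker will execute.
func add(a, b int) int { return a + b }

func main() {
	// Redis serves as both the broker and the result backend.
	broker := gocelery.NewRedisCeleryBroker("redis://localhost:6379")
	backend := gocelery.NewRedisCeleryBackend("redis://localhost:6379")

	// 5 concurrent workers pulling tasks from the default queue.
	client, err := gocelery.NewCeleryClient(broker, backend, 5)
	if err != nil {
		panic(err)
	}
	client.Register("worker.add", add)

	client.StartWorker()
	defer client.StopWorker()

	// Keep processing tasks for a while; a real worker would wait on a signal.
	time.Sleep(30 * time.Second)
}
```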

Python Client Example

Submit Task from Python Client

Python Worker Example

Run Celery Worker implemented in Python

GoCelery Client Example

Submit Task from Go Client
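
And the submitting side, again sketched from the signatures in the package index (same placeholder URI and task name as the worker example above):

```go
package main

import (
	"fmt"
	"time"

	"github.com/gocelery/gocelery"
)

func main() {
	broker := gocelery.NewRedisCeleryBroker("redis://localhost:6379")
	backend := gocelery.NewRedisCeleryBackend("redis://localhost:6379")
	client, err := gocelery.NewCeleryClient(broker, backend, 1)
	if err != nil {
		panic(err)
	}

	// Enqueue the task registered as "worker.add" with positional arguments.
	asyncResult, err := client.Delay("worker.add", 3, 4)
	if err != nil {
		panic(err)
	}

	// Block for up to 10 seconds waiting for the result from the backend.
	res, err := asyncResult.Get(10 * time.Second)
	if err != nil {
		panic(err)
	}
	fmt.Println(res) // 7
}
```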

Sample Celery Task Message

Celery Message Protocol Version 1

Please let us know if you use gocelery in your project!

Contributing

You are more than welcome to make any contributions. Please create Pull Request for any changes.

The gocelery is offered under MIT license.

Documentation ¶

Package gocelery is Celery Distributed Task Queue in Go

Celery distributed tasks are used heavily in many python web applications and this library allows you to implement celery workers in Go as well as being able to submit celery tasks in Go.

This package can also be used as pure go distributed task queue.

Supported brokers/backends

  • AMQP (broker/backend)

Celery must be configured to use json instead of default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass below configuration parameters to use json.

WIP: NOTICE: this broker is NOT tested yet

  • func GetRealValue(val *reflect.Value) interface{}
  • func NewRedisPool(uri string) *redis.Pool
  • type AsyncResult
  • func (ar *AsyncResult) AsyncGet() (interface{}, error)
  • func (ar *AsyncResult) Clear() (err error)
  • func (ar *AsyncResult) Get(timeout time.Duration) (interface{}, error)
  • func (ar *AsyncResult) Ready() (bool, error)
  • type CeleryBackend
  • type CeleryBroker
  • type CeleryClient
  • func NewCeleryClient(broker CeleryBroker, backend CeleryBackend, numWorkers int) (*CeleryClient, error)
  • func (cc *CeleryClient) ClearResult(taskID string) error
  • func (cc *CeleryClient) Delay(task string, args ...interface{}) (*AsyncResult, error)
  • func (cc *CeleryClient) DelayJSON(task string, input interface{}) (*AsyncResult, error)
  • func (cc *CeleryClient) DelayJSONTo(queue, task string, input interface{}) (*AsyncResult, error)
  • func (cc *CeleryClient) DelayKwargs(task string, args map[string]interface{}) (*AsyncResult, error)
  • func (cc *CeleryClient) DelayKwargsTo(queue, task string, args map[string]interface{}) (*AsyncResult, error)
  • func (cc *CeleryClient) DelayTo(queue, task string, args ...interface{}) (*AsyncResult, error)
  • func (cc *CeleryClient) FindResult(taskID string) *AsyncResult
  • func (cc *CeleryClient) PollResults(handler func(string, interface{}), taskIDs ...string)
  • func (cc *CeleryClient) Register(name string, task interface{})
  • func (cc *CeleryClient) StartWorker(queues ...string) error
  • func (cc *CeleryClient) StartWorkerWithContext(ctx context.Context, queues ...string) (err error)
  • func (cc *CeleryClient) StopWorker() (err error)
  • func (cc *CeleryClient) WaitForStopWorker()
  • type CeleryDeliveryInfo
  • type CeleryMessage
  • func (cm *CeleryMessage) GetTaskMessage() *TaskMessage
  • type CeleryProperties
  • type CeleryTask
  • type CeleryWorker
  • func NewCeleryWorker(broker CeleryBroker, backend CeleryBackend, numWorkers int) *CeleryWorker
  • func (w *CeleryWorker) GetNumWorkers() int
  • func (w *CeleryWorker) GetTask(name string) interface{}
  • func (w *CeleryWorker) Register(name string, task interface{})
  • func (w *CeleryWorker) RunTask(message *TaskMessage) (*ResultMessage, error)
  • func (w *CeleryWorker) SetRateLimitPeriod(rate time.Duration) *CeleryWorker
  • func (w *CeleryWorker) StartWorker(queues ...string) error
  • func (w *CeleryWorker) StartWorkerWithContext(ctx context.Context, queues ...string) (err error)
  • func (w *CeleryWorker) StopWait()
  • func (w *CeleryWorker) StopWorker() (err error)
  • func (w *CeleryWorker) StopWorkerWithContext(ctx context.Context) (err error)
  • type NSQCeleryBroker
  • func NewNSQCeleryBroker(cfg *NSQConfig, channel string, queues ...string) *NSQCeleryBroker
  • func (nb *NSQCeleryBroker) GetTaskMessage() (message *TaskMessage, error error)
  • func (nb *NSQCeleryBroker) GetTaskMessageFrom(queue string) (message *TaskMessage, err error)
  • func (nb *NSQCeleryBroker) SendCeleryMessage(message *CeleryMessage) error
  • func (nb *NSQCeleryBroker) SendCeleryMessageTo(queue string, message *CeleryMessage) error
  • type NSQConfig
  • func (c *NSQConfig) ToNSQConfig() *nsq.Config
  • type NSQHandler
  • func NewNSQHandler(h func(message *nsq.Message) error) *NSQHandler
  • func (h *NSQHandler) HandleMessage(message *nsq.Message) error
  • type QueueIterator
  • func NewQueueIterator(queues ...string) *QueueIterator
  • func (qi *QueueIterator) DefaultQueueName() string
  • func (cb *QueueIterator) Length() int
  • func (qi *QueueIterator) ListQueues() []string
  • func (qi *QueueIterator) NextQueueName() string
  • type RedisCeleryBackend
  • func NewRedisCeleryBackend(uri string) *RedisCeleryBackend
  • func (cb *RedisCeleryBackend) ClearResult(taskID string) error
  • func (cb *RedisCeleryBackend) GetResult(taskID string) (*ResultMessage, error)
  • func (cb *RedisCeleryBackend) SetResult(taskID string, result *ResultMessage) error
  • type RedisCeleryBroker
  • func NewRedisCeleryBroker(uri string, queues ...string) *RedisCeleryBroker
  • func (cb *RedisCeleryBroker) GetCeleryMessage() (msg *CeleryMessage, err error)
  • func (cb *RedisCeleryBroker) GetCeleryMessageFrom(queue string) (*CeleryMessage, error)
  • func (cb *RedisCeleryBroker) GetTaskMessage() (*TaskMessage, error)
  • func (cb *RedisCeleryBroker) GetTaskMessageFrom(queue string) (*TaskMessage, error)
  • func (cb *RedisCeleryBroker) SendCeleryMessage(message *CeleryMessage) error
  • func (cb *RedisCeleryBroker) SendCeleryMessageTo(queue string, message *CeleryMessage) error
  • type ResultMessage
  • type TaskMessage
  • func DecodeTaskMessage(encodedBody string) (*TaskMessage, error)
  • func (tm *TaskMessage) Encode() (string, error)
  • Package (Client)
  • Package (ClientWithNamedArguments)
  • Package (Worker)
  • Package (WorkerWithContext)
  • Package (WorkerWithNamedArguments)

Constants ¶

Variables ¶

This section is empty.

Functions ¶

func GetRealValue ¶

GetRealValue returns real value of reflect.Value Required for JSON Marshalling

func NewRedisPool ¶

NewRedisPool creates pool of redis connections from given connection string

type AsyncResult ¶

AsyncResult represents pending result

func (*AsyncResult) AsyncGet ¶

AsyncGet gets actual result from backend and returns nil if not available

func (*AsyncResult) Clear ¶

func (*AsyncResult) Get ¶

Get gets actual result from backend It blocks for period of time set by timeout and returns error if unavailable

func (*AsyncResult) Ready ¶

Ready checks if actual result is ready

type CeleryBackend ¶

CeleryBackend is interface for celery backend database

type CeleryBroker ¶

CeleryBroker is interface for celery broker database

type CeleryClient ¶

CeleryClient provides API for sending celery tasks

func NewCeleryClient ¶

NewCeleryClient creates new celery client

func (*CeleryClient) ClearResult ¶

func (*CeleryClient) Delay ¶

Delay gets asynchronous result

func (*CeleryClient) DelayJSON ¶

Marshal args as json

func (*CeleryClient) DelayJSONTo ¶

func (*CeleryClient) DelayKwargs ¶

DelayKwargs gets asynchronous results with argument map

func (*CeleryClient) DelayKwargsTo ¶

func (*CeleryClient) DelayTo ¶

func (*CeleryClient) FindResult ¶

func (*CeleryClient) PollResults ¶

func (*CeleryClient) Register ¶

Register task

func (*CeleryClient) StartWorker ¶

StartWorker starts celery workers

func (*CeleryClient) StartWorkerWithContext ¶

StartWorkerWithContext starts celery workers with given parent context

func (*CeleryClient) StopWorker ¶

StopWorker stops celery workers

func (*CeleryClient) WaitForStopWorker ¶

WaitForStopWorker waits for celery workers to terminate

type CeleryDeliveryInfo ¶

CeleryDeliveryInfo represents deliveryinfo json

type CeleryMessage ¶

CeleryMessage is actual message to be sent to Redis

func (*CeleryMessage) GetTaskMessage ¶

GetTaskMessage retrieve and decode task messages from broker

type CeleryProperties ¶

CeleryProperties represents properties json

type CeleryTask ¶

CeleryTask is an interface that represents actual task Passing CeleryTask interface instead of function pointer avoids reflection and may have performance gain. ResultMessage must be obtained using GetResultMessage()

type CeleryWorker ¶

CeleryWorker represents distributed task worker

func NewCeleryWorker ¶

NewCeleryWorker returns new celery worker

func (*CeleryWorker) GetNumWorkers ¶

GetNumWorkers returns number of currently running workers

func (*CeleryWorker) GetTask ¶

GetTask retrieves registered task

func (*CeleryWorker) Register ¶

Register registers tasks (functions)

func (*CeleryWorker) RunTask ¶

RunTask runs celery task

func (*CeleryWorker) SetRateLimitPeriod ¶

func (*CeleryWorker) StartWorker ¶

func (*CeleryWorker) StartWorkerWithContext ¶

StartWorkerWithContext starts celery worker(s) with given parent context

func (*CeleryWorker) StopWait ¶

StopWait waits for celery workers to terminate

func (*CeleryWorker) StopWorker ¶

func (*CeleryWorker) StopWorkerWithContext ¶

type NSQCeleryBroker ¶

func NewNSQCeleryBroker ¶

TODO: test NewNSQCeleryBroker creates new NSQCeleryBroker based on given config NOTE: DON'T USE IT NOW, IT IS WORKING IN PROGRESS!!!

func (*NSQCeleryBroker) GetTaskMessage ¶

func (*NSQCeleryBroker) GetTaskMessageFrom ¶

TODO: NOT TESTED YET TODO: optimize: cache consumer

func (*NSQCeleryBroker) SendCeleryMessage ¶

func (*NSQCeleryBroker) SendCeleryMessageTo ¶

type NSQConfig ¶

func (*NSQConfig) ToNSQConfig ¶

type NSQHandler ¶

func NewNSQHandler ¶

func (*NSQHandler) HandleMessage ¶

type QueueIterator ¶

func NewQueueIterator ¶

func (*QueueIterator) DefaultQueueName ¶

func (*QueueIterator) Length ¶

func (*QueueIterator) ListQueues ¶

func (*QueueIterator) NextQueueName ¶

type RedisCeleryBackend ¶

RedisCeleryBackend is celery backend for redis

func NewRedisCeleryBackend ¶

NewRedisCeleryBackend creates new RedisCeleryBackend

func (*RedisCeleryBackend) ClearResult ¶

func (*RedisCeleryBackend) GetResult ¶

GetResult queries redis backend to get asynchronous result

func (*RedisCeleryBackend) SetResult ¶

SetResult pushes result back into redis backend

type RedisCeleryBroker ¶

RedisCeleryBroker is celery broker for redis

func NewRedisCeleryBroker ¶

NewRedisCeleryBroker creates new RedisCeleryBroker based on given uri

func (*RedisCeleryBroker) GetCeleryMessage ¶

GetCeleryMessage retrieves celery message from redis queue

func (*RedisCeleryBroker) GetCeleryMessageFrom ¶

func (*RedisCeleryBroker) GetTaskMessage ¶

GetTaskMessage retrieves task message from redis queue

func (*RedisCeleryBroker) GetTaskMessageFrom ¶

func (*RedisCeleryBroker) SendCeleryMessage ¶

SendCeleryMessage sends CeleryMessage to redis queue

func (*RedisCeleryBroker) SendCeleryMessageTo ¶

type ResultMessage ¶

ResultMessage is return message received from broker

type TaskMessage ¶

TaskMessage is celery-compatible message

func DecodeTaskMessage ¶

DecodeTaskMessage decodes base64 encrypted body and return TaskMessage object

func (*TaskMessage) Encode ¶

Encode returns base64 json encoded string

Source Files ¶

  • gocelery.go
  • nsq_broker.go
  • queue_iter.go
  • redis_backend.go
  • redis_broker.go

Directories ¶

Keyboard shortcuts.

IMAGES

  1. A Reliable, simple & efficient distributed task queue in Golang

    go distributed task queue

  2. FOQS: Scaling a distributed priority queue

    go distributed task queue

  3. Cherami: Uber Engineering’s durable and scalable task queue in #golang

    go distributed task queue

  4. Daskqueue: Dask-based distributed task queue

    go distributed task queue

  5. Best of 2019: Implementing Message Queue in Kubernetes

    go distributed task queue

  6. Chained Task in the Distributed Task Queue Architecture

    go distributed task queue

VIDEO

  1. Celery Distributed Task Queue Display with Flower UI

  2. Task Queue Basics

  3. Distributed Job Scheduler

  4. 16. System Design

  5. What is Message Queueing? Message Queue explained

  6. LeetCode

COMMENTS

  1. Simple, reliable & efficient distributed task queue in Go

    Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started. Highlevel overview of how Asynq works: Client puts tasks on a queue. Server pulls tasks off queues and starts a worker goroutine for each task.

  2. Asynq: simple, reliable & efficient distributed task queue for your

    Introduction Hi, DEV friends! 😉 It's time to share a great find that you must try in your next project. I'm talking about simple, reliable and efficient distributed task queue written on Go and called Asynq.. I already have experience using Asynq in production on one of my work projects (microservice for sending scheduled messages to subscribers of Telegram bot).

  3. asynq package

    Simple, reliable & efficient distributed task queue in Go. Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started. High-level overview of how Asynq works: Client puts tasks on a queue

  4. Implementing a distributed task queue in Go

    In this tutorial, we will discuss the implementation of a distributed task queue in Go, a popular programming language for developing concurrent applications. A distributed task queue is a system that distributes tasks to multiple worker processes, which execute the tasks concurrently. This approach can significantly improve the performance and ...

  5. Celery

    Celery - Distributed Task Queue. Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system. It's a task queue with focus on real-time processing, while also supporting task scheduling. Celery has a large and diverse ...

  6. Tutorial: Asynq. Simple, reliable & efficient distributed task queue

    📖 Tutorial: Asynq. Simple, reliable & efficient distributed task queue for your next Go project. - koddr/tutorial-go-asynq

  7. Building a Distributed Task Scheduler in Go

    Implementing Coordinator-Led Task Assignment. The coordinator will fire queries on the database every few seconds to pick the tasks currently scheduled for execution. Let's see what such a database query may look like. The coordinator will start a transaction, and fire the following SELECT query. (An illustrative sketch of such a query appears after this list.)

  8. taskq package

    Uptrace is an open source and blazingly fast distributed tracing tool powered by OpenTelemetry and ClickHouse. Give it a star as well! Features: Redis, SQS, IronMQ, and in-memory backends. ... Package taskq implements a task/job queue with Redis, SQS, IronMQ, and in-memory backends. ...

  9. A Reliable, simple & efficient distributed task queue in Golang

    The server pulls tasks off queues and starts a golang worker goroutine for each task. Tasks are processed concurrently by multiple workers. The task queues are used as a mechanism to distribute work across multiple machines. A system can consist of multiple worker servers and brokers, giving way to horizontal scaling and high availability.

  10. Simple, reliable & efficient distributed task queue in Go

    Status: The library is currently undergoing heavy development with frequent, breaking API changes. ☝️ Important Note: The current major version is zero (v0.x.x) to accommodate rapid development and fast iteration while getting early feedback from users (feedback on APIs is appreciated!). The public API could change without a major version update before the v1.0.0 release.

  11. gocelery package

    Overview. Package gocelery is Celery Distributed Task Queue in Go. Celery distributed tasks are used heavily in many Python web applications, and this library allows you to implement Celery workers in Go as well as submit Celery tasks from Go. This package can also be used as a pure Go distributed task queue. Supported brokers/backends.

  12. For those running Go in production at scale, what do you use ...

    Temporal is too smart for a distributed task queue. As the name already implies, it's about (time-spanning) workflows. If you need a high throughput of small tasks that need to run ASAP, a message broker is IMO the better choice (as others already suggested, NATS JetStream is nice with its exactly-once delivery mechanism).

  13. Golang Task Queue

    Golang asynchronous task queue with Redis, SQS, and IronMQ backends. Getting started. Familiar API. Using taskq, you can turn any Go function into an asynchronous task. Multiple backends. taskq supports Redis, Simple Queue Service (SQS), IronMQ, and in-memory backends. OpenTelemetry.

  14. 7 Actively Maintained Golang Libraries For Managing Work Queues

    Machinery is an asynchronous task queue/job queue based on distributed message passing. Tasqueue. Tasqueue is a simple, lightweight distributed job/worker implementation in Go. The above libraries/packages have long-term support and a decent community base. Thanks for reading this article so far.

  15. Cherami: Uber Engineering's Durable and Scalable Task Queue in Go

    Cherami is a distributed, scalable, durable, and highly available message queue system we developed at Uber Engineering to transport asynchronous tasks. We named our task queue after a heroic carrier pigeon with the hope that this system would be just as resilient and fault-tolerant, allowing Uber's mission-critical business logic components to depend on it for message delivery.

  16. GitHub

    Celery must be configured to use JSON instead of the default pickle encoding. This is because Go currently has no stable support for decoding pickle objects. Pass the configuration parameters below to use JSON. Starting from version 4.0, Celery uses message protocol version 2 as the default. GoCelery does not yet support message protocol version 2 ...

  17. queue package

    Queue. Queue is a Golang library for spawning and managing a Goroutine pool, allowing you to create multiple workers according to the machine's limited number of CPUs.

  18. go

    From "The Go programming Language" by Donovan and Kernighan (pag 233): Novices are sometimes tempted to use buffered channels within a single goroutine as a queue, lured by their pleasingly simple syntax, but this is a mistake .Channels are deeply connected to goroutine scheduling, and without another goroutine receiving from the channel, a sender—and perhaps the whole program—risks ...

  19. What is the best way to route tasks in machinery (go)?

    The correct way to route tasks is to set RoutingKey in the task's signature to the desired queue's name and use taskserver.NewCustomQueueWorker to get a queue-specific worker object instead of taskserver.NewWorker. Sending a task to a specific queue: task := tasks.Signature{ Name: "<TASKNAME>",

  20. Golang in Action: How to implement a simple distributed system

    We can then test the code. Open two command prompts. Enter go run main.go master in one prompt to start the master node, and enter go run main.go worker to start the worker node in another. If the master code starts successfully, you should be able to see the logs below.
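
As referenced in comment 1, here is a minimal sketch of Asynq's client/server split: the client enqueues a task, and the server pulls it off the queue and runs a handler in a worker goroutine. This assumes a recent Asynq release where payloads are raw bytes; the Redis address, the "email:send" task type and the JSON payload are illustrative, and the client and server would normally run as separate processes:

    package main

    import (
        "context"
        "log"

        "github.com/hibiken/asynq"
    )

    func main() {
        redisOpt := asynq.RedisClientOpt{Addr: "localhost:6379"}

        // Client side: put a task on a queue.
        client := asynq.NewClient(redisOpt)
        defer client.Close()
        task := asynq.NewTask("email:send", []byte(`{"to":"user@example.com"}`))
        if _, err := client.Enqueue(task); err != nil {
            log.Fatalf("enqueue failed: %v", err)
        }

        // Server side: pull tasks off queues and run one worker goroutine per task,
        // up to the configured concurrency.
        srv := asynq.NewServer(redisOpt, asynq.Config{Concurrency: 10})
        mux := asynq.NewServeMux()
        mux.HandleFunc("email:send", func(ctx context.Context, t *asynq.Task) error {
            log.Printf("processing %s with payload %s", t.Type(), t.Payload())
            return nil
        })
        if err := srv.Run(mux); err != nil {
            log.Fatal(err)
        }
    }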
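Comment 7 excerpts the coordinator discussion from this newsletter but cuts off before showing the query itself. Purely as an illustration of the idea (not the article's actual schema or SQL), a Postgres query that picks due tasks inside a transaction might use FOR UPDATE SKIP LOCKED, which lets several coordinator replicas poll the same table without handing out the same row twice. The tasks table and the scheduled_at / picked_at columns are assumed names, and lib/pq is just one driver choice:

    package coordinator

    import (
        "context"
        "database/sql"

        _ "github.com/lib/pq" // hypothetical Postgres driver choice
    )

    // pickDueTasks sketches one coordinator poll: inside a transaction, lock rows
    // that are due and not yet picked, mark them as picked, and commit, so
    // concurrent coordinators skip rows another instance has already locked.
    func pickDueTasks(ctx context.Context, db *sql.DB) ([]string, error) {
        tx, err := db.BeginTx(ctx, nil)
        if err != nil {
            return nil, err
        }
        defer tx.Rollback() // ignored once Commit succeeds

        rows, err := tx.QueryContext(ctx, `
            SELECT id
            FROM tasks
            WHERE scheduled_at <= NOW()
              AND picked_at IS NULL
            FOR UPDATE SKIP LOCKED`)
        if err != nil {
            return nil, err
        }

        var due []string
        for rows.Next() {
            var id string
            if err := rows.Scan(&id); err != nil {
                rows.Close()
                return nil, err
            }
            due = append(due, id)
        }
        rows.Close()
        if err := rows.Err(); err != nil {
            return nil, err
        }

        // Mark the rows so the next poll skips them; dispatching the picked
        // tasks to workers would happen after this, outside the sketch.
        for _, id := range due {
            if _, err := tx.ExecContext(ctx,
                `UPDATE tasks SET picked_at = NOW() WHERE id = $1`, id); err != nil {
                return nil, err
            }
        }
        return due, tx.Commit()
    }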