
Implementing the Producer/Consumer Pattern


The producer/consumer pattern comes up often in multi-threaded programming. The idea is that one or several threads produce data, while in parallel one or more other threads consume it.

How do you implement this pattern correctly in popular programming languages? The task is nontrivial, since it involves synchronization between threads and a potential race between several producers and consumers.


Note.

The producing thread (or threads) is called the “producer” or “supplier”; the consuming one(s), the “consumer”.

The nontrivial part of the problem is that both producing new data and consuming it can potentially take a long time, and we would like processing to proceed without downtime, at the maximum possible speed.

Examples:

Generating the data may be the computationally intensive task. In that case it is reasonable to have a single generating thread and several processing threads (for example, as many as there are processor cores in the system, if the bottleneck is the computation).

Or the threads may be downloading data from the network, and parsing of the loaded data starts once a download finishes. In that case it is reasonable to have one producer per site, and to limit the number of producers if the network bandwidth limit is exhausted.


This question is an adaptation of the question of the same name from Hashcode.


1, authority 100%

Implementation in C#

For modern versions of the language (starting with C# 4.0), it makes sense not to write the implementation by hand but (following @flammable's advice) to use the BlockingCollection class, which provides the necessary functionality.

For reading in the consumer threads, we simply loop over the sequence returned by GetConsumingEnumerable(). In the producer threads we use Add, and at the end we must not forget CompleteAdding so that the consumer threads can stop.

Example:

using System;
using System.Collections.Concurrent;
using System.Threading;

class Program
{
  static public void Main()
  {
    new Program().Run();
  }

  BlockingCollection<string> q = new BlockingCollection<string>();

  void Run()
  {
    var threads = new[] { new Thread(Consumer), new Thread(Consumer) };
    foreach (var t in threads)
      t.Start();
    string s;
    while ((s = Console.ReadLine()).Length != 0)
      q.Add(s);
    q.CompleteAdding(); // stop
    foreach (var t in threads)
      t.Join();
  }

  void Consumer()
  {
    foreach (var s in q.GetConsumingEnumerable())
    {
      Console.WriteLine("Processing: {0}", s);
      Thread.Sleep(2000);
      Console.WriteLine("Processed: {0}", s);
    }
  }
}

BlockingCollection<T> also allows limiting the number of elements, so an attempt to add an item to a full queue can block as well, until space is freed.

Note that GetConsumingEnumerable works correctly even when you have many consumers. This is not so obvious.


If you are working with an old version of C#, you will have to write the desired functionality by hand. You can use the built-in Monitor class (which is an analogue of mutex + condition variable from pthreads).

public class ProducerConsumer<T> where T : class
{
  object mutex = new object();
  Queue<T> queue = new Queue<T>();
  bool isDead = false;

  public void Enqueue(T task)
  {
    if (task == null)
      throw new ArgumentNullException("task");
    lock (mutex)
    {
      if (isDead)
        throw new InvalidOperationException("Queue already stopped");
      queue.Enqueue(task);
      Monitor.Pulse(mutex);
    }
  }

  public T Dequeue()
  {
    lock (mutex)
    {
      while (queue.Count == 0 && !isDead)
        Monitor.Wait(mutex);
      if (queue.Count == 0)
        return null;
      return queue.Dequeue();
    }
  }

  public void Stop()
  {
    lock (mutex)
    {
      isDead = true;
      Monitor.PulseAll(mutex);
    }
  }
}

Usage (a similar example):

class Program
{
  static public void Main()
  {
    new Program().Run();
  }

  ProducerConsumer<string> q = new ProducerConsumer<string>();

  void Run()
  {
    var threads = new[] { new Thread(Consumer), new Thread(Consumer) };
    foreach (var t in threads)
      t.Start();
    string s;
    while ((s = Console.ReadLine()).Length != 0)
      q.Enqueue(s);
    q.Stop();
    foreach (var t in threads)
      t.Join();
  }

  void Consumer()
  {
    while (true)
    {
      string s = q.Dequeue();
      if (s == null)
        break;
      Console.WriteLine("Processing: {0}", s);
      Thread.Sleep(2000);
      Console.WriteLine("Processed: {0}", s);
    }
  }
}

2, authority 84%

Implementation in C with the pthreads library

In C, in keeping with the spirit of the language, there are no built-in high-level synchronized collections. Probably the most popular and widely used library implementing multithreading is pthreads. With its help the pattern can be implemented as follows:

#include <pthread.h>

// declare the data structure for one task
struct producer_consumer_queue_item {
 struct producer_consumer_queue_item *next;
 // the actual data goes here. You can change this part,
 // using a structure more specific to your task
 void *data;
};

// declare a queue with additional structures for synchronization.
// produced but not yet consumed tasks are stored in this queue.
struct producer_consumer_queue {
 struct producer_consumer_queue_item *head, *tail;
               // head == tail == 0 if the queue is empty
 pthread_mutex_t lock; // mutex for all manipulations of the queue
 pthread_cond_t cond;  // this cond "signals" when the queue becomes non-empty
 int is_alive;         // shows whether the queue has finished its work
};

Now we need procedures for adding tasks to the queue and extracting them from it.

void
enqueue(void *data, struct producer_consumer_queue *q)
{
 // pack the task into a new structure
 struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
 p->data = data;
 p->next = 0;
 // get "exclusive" access to the task queue
 pthread_mutex_lock(&q->lock);
 // ...and add the new task there:
 if (q->tail)
  q->tail->next = p;
 else {
  q->head = p;
  // the queue was empty and now is not - wake up the consumers
  pthread_cond_broadcast(&q->cond);
 }
 q->tail = p;
 // allow access to everyone again
 pthread_mutex_unlock(&q->lock);
}

void *
dequeue(struct producer_consumer_queue *q)
{
 // get exclusive access to the queue:
 pthread_mutex_lock(&q->lock);
 while (!q->head && q->is_alive) {
  // the queue is empty, nothing to do, we wait...
  pthread_cond_wait(&q->cond, &q->lock);
  // wait releases the lock for the waiting time
 }
 // remember the current element, or 0 if the queue died
 struct producer_consumer_queue_item *p = q->head;
 if (p)
 {
  // and remove it from the queue
  q->head = q->head->next;
  if (!q->head)
   q->tail = q->head;
 }
 // return exclusive access to the other participants
 pthread_mutex_unlock(&q->lock);
 // return the data
 void *data = p ? p->data : 0; // 0 means there will be no more data
 free(p);
 return data;
}

We also need a procedure for initializing the queue:

struct producer_consumer_queue *
producer_consumer_queue_create()
{
 struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
 q->head = q->tail = 0;
 q->is_alive = 1;
 pthread_mutex_init(&q->lock, 0);
 pthread_cond_init(&q->cond, 0);
 return q;
}

And a procedure for shutting the queue down:

void
producer_consumer_queue_stop(struct producer_consumer_queue *q)
{
 // accessing the shared variables requires exclusive access
 pthread_mutex_lock(&q->lock);
 q->is_alive = 0;
 pthread_cond_broadcast(&q->cond);
 pthread_mutex_unlock(&q->lock);
}

Now we have everything we need.

How to use it? You need to:

  • start several “producer” threads and several “consumer” threads
  • come up with a data structure for the task

Example (the producer is the main thread; there are 2 consumer threads):

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

// the consumer thread
void *
consumer_thread(void *arg)
{
 struct producer_consumer_queue *q = (typeof(q))arg;
 for (;;) {
  void *data = dequeue(q);
  // this is the signal that it is all over
  if (!data)
   break; // then it is time to close the thread
  char *str = (char *)data;
  // our data processing goes here
  printf("consuming: %s\n", str);
  sleep(2);
  printf("consumed: %s\n", str);
  free(str);
 }
 return 0;
}

int
main()
{
 pthread_t consumer_threads[2];
 void *res = 0;
 char *in = NULL;
 size_t sz;
 // create the queue:
 struct producer_consumer_queue *q = producer_consumer_queue_create();
 // the "consumer" threads
 pthread_create(&consumer_threads[0], 0, consumer_thread, (void *)q);
 pthread_create(&consumer_threads[1], 0, consumer_thread, (void *)q);
 // main loop
 // read data from the keyboard:
 while (getline(&in, &sz, stdin) > 0) {
  enqueue(in, q);
  in = NULL;
 }
 producer_consumer_queue_stop(q);
 if (pthread_join(consumer_threads[0], &res) ||
   pthread_join(consumer_threads[1], &res))
  perror("join");
 return (long)res;
}

This is an implementation with an “unbounded” queue. In practice it is sometimes (or almost always?) more useful to limit the size of the queue, thereby balancing the producers' speed against the consumers' capabilities, occasionally putting the producers to sleep.

To do this, we slightly modify our producer_consumer_queue:

struct producer_consumer_queue {
 struct producer_consumer_queue_item *head, *tail;
               // head == tail == 0 if the queue is empty
 pthread_mutex_t lock; // mutex for all manipulations of the queue
 pthread_cond_t condp; // this cond "signals" when the queue becomes NOT EMPTY
 pthread_cond_t condc; // this cond "signals" when FREE SPACE appears in the queue
 int is_alive; // shows whether the queue has finished its work
 int max, cnt, // maximum size of the queue and the number of tasks in it
  pqcnt;       // number of producers waiting for space in the queue
};

We add a pthread_cond_t condc for putting producer threads to sleep and waking them up, a counter of producers waiting for space in the queue, and a couple of variables holding the maximum size of the queue and the current number of tasks in it.

Accordingly, the functions for putting a task into the queue (enqueue), taking it out of the queue (dequeue), initializing the queue (producer_consumer_queue_create) and stopping it (producer_consumer_queue_stop) change:

void
enqueue(void *data, struct producer_consumer_queue *aq)
{
 volatile struct producer_consumer_queue *q = aq;
 // pack the task into a new structure
 struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
 p->data = data;
 p->next = 0;
 // get "exclusive" access to the task queue
 pthread_mutex_lock(&aq->lock);
 // check whether it is full
 if (q->max <= q->cnt) {
  q->pqcnt++;
  asm volatile("" ::: "memory");
  // commit the change to the queue to memory
  // wait until the consumers drain it a little
  while (q->max <= q->cnt && q->is_alive)
   pthread_cond_wait(&aq->condc, &aq->lock);
  q->pqcnt--;
  asm volatile("" ::: "memory");
 }
 // ...and add the new task there:
 if (q->tail)
  q->tail->next = p;
 else {
  q->head = p;
  // the queue was empty and now is not - wake up the consumers
  pthread_cond_broadcast(&aq->condp);
 }
 q->tail = p;
 q->cnt++;
 asm volatile("" ::: "memory");
 // allow access to everyone again
 pthread_mutex_unlock(&aq->lock);
}

void *
dequeue(struct producer_consumer_queue *aq)
{
 volatile struct producer_consumer_queue *q = aq;
 // get exclusive access to the queue:
 pthread_mutex_lock(&aq->lock);
 if (q->pqcnt && q->max > q->cnt)
  // there is space in the queue and someone is sleeping - wake them up
  pthread_cond_broadcast(&aq->condc);
 while (!q->head && q->is_alive) {
  // the queue is empty, nothing to do, we wait...
  pthread_cond_wait(&aq->condp, &aq->lock);
  // wait releases the lock for the waiting time
 }
 // remember the current element, or 0 if the queue died
 struct producer_consumer_queue_item *p = q->head;
 if (p) {
  // and remove it from the queue
  q->head = q->head->next;
  if (!q->head)
   q->tail = q->head;
  q->cnt--;
  asm volatile("" ::: "memory");
  // commit the change to the queue to memory
  // wake up the suppliers waiting in their queue
  pthread_cond_broadcast(&aq->condc);
 }
 // return exclusive access to the other participants
 pthread_mutex_unlock(&aq->lock);
 // return the data
 void *data = p ? p->data : 0; // 0 means there will be no more data
 free(p);
 return data;
}

struct producer_consumer_queue *
producer_consumer_queue_create(int max)
{
 struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
 q->head = q->tail = 0;
 q->is_alive = 1;
 q->max = max;
 q->cnt = 0;
 q->pqcnt = 0;
 pthread_mutex_init(&q->lock, 0);
 pthread_cond_init(&q->condc, 0);
 pthread_cond_init(&q->condp, 0);
 return q;
}

// and the procedure for shutting the queue down:
void
producer_consumer_queue_stop(struct producer_consumer_queue *aq)
{
 volatile struct producer_consumer_queue *q = aq;
 // accessing the shared variables requires exclusive access
 pthread_mutex_lock(&aq->lock);
 q->is_alive = 0;
 asm volatile("" ::: "memory");
 pthread_cond_broadcast(&aq->condc);
 pthread_cond_broadcast(&aq->condp);
 pthread_mutex_unlock(&aq->lock);
}

A memory barrier is shown here (asm volatile("" ::: "memory");); using it forbids the compiler from reordering read/write operations to RAM across it.

This implementation does not provide any “ordering” of the producers waiting for space in the queue. That is, the producer thread that fell asleep first due to the lack of free space will not necessarily be the first to wake up.

If this behavior does not suit us, we will have to make some changes to our data structures, first of all adding a queue of waiting producers built from producer_queue_item structures (which becomes part of the producer_consumer_queue structure).

We get the following data structures:

// declare the data structure for one task
struct producer_consumer_queue_item {
 struct producer_consumer_queue_item *next;
 // the actual data goes here. You can change this part,
 // using a structure more specific to your task
 void *data;
};

// the data structure for a sleeping (waiting for free space) producer thread
struct producer_queue_item {
 struct producer_queue_item *next;
 struct producer_consumer_queue_item *item; // the data there was no room for
 pthread_cond_t cond; // this cond "signals" when space appears in the queue
#if DEBUG
 pid_t tid;    // Linux thread id for debug printing
 int signaled; // wake-up indicator for debug printing
#endif
};

// declare the data queue with additional synchronization structures.
// produced but not yet consumed tasks are stored in this queue.
struct producer_consumer_queue {
 struct producer_consumer_queue_item *head, *tail;
               // head == tail == 0 if the queue is empty
 pthread_mutex_t lock; // mutex for all manipulations of the queue
 pthread_cond_t cond;  // this cond "signals" when the queue becomes non-empty
 int is_alive; // shows whether the queue has finished its work
 int max, cnt; // maximum queue size and the number of tasks in it
 // the queue of producer threads waiting to hand over their data
 struct producer_queue_item *pqhead,
  *pqtail;
};

And the implementation of the basic functions:

void
enqueue(void *data, struct producer_consumer_queue *q)
{
 volatile struct producer_consumer_queue *vq = q;
 // pack the task into a new structure
 struct producer_consumer_queue_item *p = (typeof(p))malloc(sizeof(*p));
 p->data = data;
 p->next = 0;
 // get "exclusive" access to the task queue
 pthread_mutex_lock(&q->lock);
#if DEBUG
 printf("%ld (cnt: %d) ---> %s", (long)gettid(), vq->cnt, (char *)(p->data));
#endif
 // ...and add the new task there:
 if (vq->max <= vq->cnt || vq->pqtail) { // the producer must wait
#if DEBUG
  if (vq->cnt < vq->max) {
   puts("========================");
   print_queue(q, 0);
   puts("========================");
  }
#endif
  struct producer_queue_item *pq = (typeof(pq))malloc(sizeof(*pq));
  pthread_cond_init(&pq->cond, 0); // the cond it will sleep on
  pq->next = 0;
  pq->item = p; // save the data for the duration of the sleep
#if DEBUG
  pq->tid = gettid();
#endif
  // put ourselves into the queue of sleeping producers
  if (vq->pqtail)
   vq->pqtail->next = pq;
  else
   vq->pqhead = pq;
  vq->pqtail = pq;
  asm volatile("" ::: "memory");
  // commit the change to the queue to memory
#if DEBUG
  int at = 0; // wake-up cycle counter
#endif
  do { // sleep until free space appears in the data queue
#if DEBUG
   printf("%ld prod cond wait (cnt: %d at: %d) %s",
     (long)gettid(), vq->cnt, at++, (char *)(p->data));
   pq->signaled = 0;
#endif
   pthread_cond_wait(&pq->cond, &q->lock);
  } while (vq->max <= vq->cnt && vq->is_alive);
  // we woke up and own the queue
  /*
   A subtle moment here. The order of thread wake-ups is not defined,
   and we need to preserve the order of the data.
   Therefore we reload the local variables from the queue,
   even though it may be data put there by another thread.
  */
#if DEBUG
  if (pq != vq->pqhead) {
   printf("baaad %ld (cnt: %d at: %d) %s",
     (long)gettid(), vq->cnt, at, (char *)(p->data));
   print_queue(q, 0);
   if (vq->is_alive)
    exit(1); // really bad, this should not happen
   else
    puts("CONTINUE");
  }
#endif
  pq = vq->pqhead; // in any case take the head of the producers' queue
  if ((vq->pqhead = pq->next) == 0) // and remove it
   vq->pqtail = 0;
  asm volatile("" ::: "memory");
  p = pq->item;
  free(pq);
#if DEBUG
  printf("%ld prod enqueued after wait (cnt: %d at: %d) %s",
    (long)gettid(), vq->cnt, at, (char *)(p->data));
#endif
 }
 // here we really put the data into the queue for the consumers
 if (vq->tail)
  vq->tail->next = p;
 else {
  vq->head = p;
  // the queue was empty and now is not - wake up the consumers
  pthread_cond_broadcast(&q->cond);
 }
 vq->tail = p;
 vq->cnt++;
 asm volatile("" ::: "memory");
 // commit the queue changes to memory
 // allow access to everyone again
 pthread_mutex_unlock(&q->lock);
}

#if DEBUG
#define cond_signal_producer(q) ({ \
   if ((q)->pqhead) { \
    (q)->pqhead->signaled = 1; \
    pthread_cond_signal(&(q)->pqhead->cond); \
   } \
  })
#else
#define cond_signal_producer(q) ({ \
   if ((q)->pqhead) \
    pthread_cond_signal(&(q)->pqhead->cond); \
  })
#endif

void *
dequeue(struct producer_consumer_queue *q)
{
 volatile struct producer_consumer_queue *vq = q;
 // get exclusive access to the queue:
 pthread_mutex_lock(&q->lock);
 // if there are sleeping producers, wake the first one
 cond_signal_producer(vq);
 while (!vq->head && vq->is_alive) {
  // the queue is empty, nothing to do, we wait...
  pthread_cond_wait(&q->cond, &q->lock);
  // wait releases the lock for the waiting time
 }
 // remember the current element, or 0 if the queue died
 struct producer_consumer_queue_item *p = vq->head;
 if (p) {
  // and remove it from the queue
  vq->head = vq->head->next;
  if (!vq->head)
   vq->tail = vq->head;
  vq->cnt--;
  asm volatile("" ::: "memory");
  // commit the queue changes to memory
  // wake the first supplier in their queue
  cond_signal_producer(vq);
 }
 // return exclusive access to the other participants
 pthread_mutex_unlock(&q->lock);
 // return the data
 void *data = p ? p->data : 0; // 0 means there will be no more data
 // per C99 7.20.3.2/2, there is no need to check for 0 before free
 free(p);
 return data;
}

struct producer_consumer_queue *
producer_consumer_queue_create(int max)
{
 struct producer_consumer_queue *q = (typeof(q))malloc(sizeof(*q));
 q->head = q->tail = 0;
 q->pqhead = q->pqtail = 0;
 q->is_alive = 1;
 q->max = max;
 q->cnt = 0;
 pthread_mutex_init(&q->lock, 0);
 pthread_cond_init(&q->cond, 0);
 return q;
}

// and the procedure for shutting the queue down:
void
producer_consumer_queue_stop(struct producer_consumer_queue *q)
{
 volatile struct producer_consumer_queue *vq = q;
 // accessing the shared variables requires exclusive access
 pthread_mutex_lock(&q->lock);
 vq->is_alive = 0;
 pthread_cond_broadcast(&q->cond); // wake the consumers
 volatile struct producer_queue_item *pq;
 for (pq = vq->pqhead; pq; pq = pq->next) {
#if DEBUG
  pq->signaled = 1;
  asm volatile("" ::: "memory");
#endif
  // wake up every waiting producer
  pthread_cond_signal((pthread_cond_t *)&pq->cond);
 }
 pthread_mutex_unlock(&q->lock);
}

All three programs (pq1.c, pq2.c and pq3.c), together with the gettid() function, are at http://pastebin.com/E23r9DZk . For experiments, copy them to separate files and compile, for example: gcc pq3.c -pthread gettid.o


3, authority 74%

Implementation in C#, the Dataflow library

Another alternative is to use Microsoft's Dataflow library, which was created precisely for managing data flows. To use the code in the examples, you need to add the Microsoft.Tpl.Dataflow NuGet package. The BufferBlock<T> class is practically a ready-made producer/consumer, but, unlike the blocking interface of BlockingCollection<T>, it has an async interface!

For asynchronously adding a task, the supplier can use SendAsync. Asynchronous addition is needed because the queue may be bounded, in which case adding has to wait for free space! At the end of adding, you need to call Complete.

async Task ProduceSingle(ITargetBlock<string> queue, int howMuch)
{
  Random r = new Random();
  while (howMuch-- > 0)
  {
    // emulate long-running work preparing the next task;
    // the duration is chosen randomly so that tasks arrive at
    // unpredictable moments in time
    await Task.Delay(1000 * r.Next(1, 3));
    var v = string.Format("automatic {0}", r.Next(1, 10));
    await queue.SendAsync(v);
  }
  queue.Complete();
}

If you have several suppliers, you must close the queue only when all of them have finished:

async Task Produce1(ITargetBlock<string> queue, int howMuch)
{
  Random r = new Random();
  while (howMuch-- > 0)
  {
    await Task.Delay(1000 * r.Next(1, 3));
    var v = string.Format("automatic {0}", r.Next(1, 10));
    await queue.SendAsync(v);
  }
}

// the Console.ReadLine() function is blocking, so we run it asynchronously
// (otherwise it would block the caller);
// Console has no async interface.
Task<string> ReadConsole()
{
  // unload the blocking function onto the thread pool
  return Task.Run(() => Console.ReadLine());
}

async Task Produce2(ITargetBlock<string> queue)
{
  string s;
  while ((s = await ReadConsole()).Length != 0)
    await queue.SendAsync("manual " + s);
}

async Task ProduceAll(ITargetBlock<string> queue)
{
  var p1 = Produce1(queue, 20);
  var p2 = Produce2(queue);
  await Task.WhenAll(p1, p2);
  queue.Complete();
}

Now, the consumer. If there is only one consumer, everything is simple:

async Task ConsumeSingle(ISourceBlock<string> queue)
{
  while (await queue.OutputAvailableAsync())
    Console.WriteLine(await queue.ReceiveAsync());
}

For the case of several consumers, using ReceiveAsync is incorrect, because the task may be taken out of the queue by another consumer! There is no TryReceiveAsync function, so after asynchronously finding out that the queue is not empty, we use TryReceive:

async Task ConsumeCooperative(IReceivableSourceBlock<string> queue, int number)
{
  Random r = new Random();
  while (await queue.OutputAvailableAsync())
  {
    string v;
    // at this point the data may have already gone to another consumer
    if (!queue.TryReceive(out v))
      continue; // keep waiting
    // colored output and other goodies;
    // I am too lazy to synchronize the console output, although
    // of course it is a shared resource
    if (Console.CursorLeft != 0)
      Console.WriteLine();
    var savedColor = Console.ForegroundColor;
    Console.ForegroundColor = (ConsoleColor)(number + 8);
    Console.WriteLine(string.Format("{0}[{1}]: {2}",
        new string(' ', number * 4), number, v));
    Console.ForegroundColor = savedColor;
    // simulate long-running processing of the result by the client
    await Task.Delay(1000 * r.Next(1, 3));
  }
}

Task ConsumeAll(IReceivableSourceBlock<string> queue)
{
  var c1 = ConsumeCooperative(queue, 1);
  var c2 = ConsumeCooperative(queue, 2);
  return Task.WhenAll(c1, c2);
}

Starting everything up:

class Program
{
  static void Main(string[] args)
  {
    new Program().RunAll().Wait();
  }

  async Task RunAll()
  {
    BufferBlock<string> queue = new BufferBlock<string>();
    var p = ProduceAll(queue);
    var c = ConsumeAll(queue);
    await Task.WhenAll(p, c, queue.Completion);
  }

  // ...the other methods
}

In his article Async Producer/Consumer Queue using Dataflow, Stephen Cleary offers another approach, more in the spirit of the Dataflow library. It is built on the symmetry between source blocks (ISourceBlock<T>), target blocks (ITargetBlock<T>) and transformer blocks (IPropagatorBlock<TInput, TOutput>). In keeping with this ideology, we use a target block, ActionBlock<T>, for the consumer:

Task Consume2(ISourceBlock<string> queue, int number)
{
  var consumerOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };
  var consumer = new ActionBlock<string>(v => ConsumeImpl2(v, number), consumerOptions);
  var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
  queue.LinkTo(consumer, linkOptions);
  return consumer.Completion;
}

void ConsumeImpl2(string v, int number)
{
  Console.WriteLine(string.Format("[{0}]: {1}", number, v));
  Thread.Sleep(1500);
}

Task ConsumeAll2(ISourceBlock<string> queue)
{
  var c1 = Consume2(queue, 1);
  var c2 = Consume2(queue, 2);
  return Task.WhenAll(c1, c2);
}

Why is BoundedCapacity = 1 needed? The thing is that by default ActionBlock<T> has an “unbounded” capacity and would therefore pull all the data out of the queue at once. So even if we had imposed a limit on the size of the queue

queue = new BufferBlock<string>(new DataflowBlockOptions { BoundedCapacity = 20 });

the data would still pile up in the ActionBlocks. For the BufferBlock to play the role of the storage and the ActionBlock that of the consumer, the latter's capacity must be limited. Note also that limiting the capacity of an ActionBlock allows the Dataflow library to balance the load by sending data to a free block.

Note that in this case the context in which the ActionBlock executes (thread pool? a dedicated thread?) is controlled not through the standard async/await mechanism, but by the TaskScheduler in the settings.

The approach of wiring up execution blocks can quickly become too heavyweight, so I would advise using it only for fairly complex tasks.


4, authority 21%

Implementation in C#, Channels

As @aepot suggested in the comments, starting with .NET Core 3.0 (and .NET 5) yet another specialized tool appeared: Channel. It evidently arose under the influence of the Go language and describes, in fact, the same producer/consumer pattern, but implemented more efficiently (thanks to a smaller feature set).

The Channel<T> class is, in essence, a ready-made implementation which out of the box supports an asynchronous interface, cancellation via CancellationToken, multiple producers/consumers, and batch reads/writes.

The interface is very similar to that of the Dataflow library, and translating the code is not much trouble in simple cases.

The producer has a WriteAsync function for writing one element; if we want to write several elements in a row at once, it may be better to use WaitToWriteAsync followed by synchronous TryWrite in a loop. When done adding elements, you need to call Complete to inform the consumers that there is nothing more to wait for.

The consumer has a ReadAsync function for reading one element, and a ReadAllAsync function which, similarly to GetConsumingEnumerable from BlockingCollection<T>, yields the sequence of readable elements (but asynchronously), distributing them across all consumers. If your elements can arrive in large batches, you may want the WaitToReadAsync + loop-with-TryRead combination, so as not to call an asynchronous function when new tasks are likely already present in the channel.

So, the producer code.

The simple version:

async Task ProduceSimple(ChannelWriter<string> writer, int howMuch)
{
  Random r = new();
  int n = 0;
  while (howMuch-- > 0)
  {
    // emulate long-running work preparing the next task;
    // the duration is chosen randomly so that tasks arrive at
    // unpredictable moments in time
    await Task.Delay(1000 * r.Next(1, 3));
    await writer.WriteAsync($"simple {n++}");
  }
}

The variant with batched task submission:

async Task ProduceBatched(ChannelWriter<string> writer, int howMuch)
{
  Random r = new();
  int n = 0;
  while (howMuch > 0)
  {
    await Task.Delay(1000 * r.Next(1, 3));
    // in reality tasks usually arrive in batches, so we send several at once
    await writer.WaitToWriteAsync(); // not needed if the queue size is unbounded
    // random batch size; you will most likely not need this logic, since
    // your data will arrive in groups naturally
    var batchSize = Math.Min(r.Next(1, 5), howMuch);
    while (batchSize-- > 0)
    {
      // prepare the task
      var v = $"batched {n}";
      if (writer.TryWrite(v))
      {
        n++;
        --howMuch;
        if (howMuch <= 0)
          break;
      }
      // a prepared but unsent task would have to be saved
      // for the next iteration
    }
  }
}

Now the consumer code. The basic variant is very simple:

async Task ConsumeSimple(ChannelReader<string> reader, int number)
{
  Random r = new();
  await foreach (var v in reader.ReadAllAsync())
  {
    Output(number, v);
    // simulate long-running processing of the results by the client
    await Task.Delay(1000 * r.Next(0, 3));
  }
}

If processing a single task is fast, it makes sense to use the variant in which we synchronously drain all the data that has accumulated:

async Task ConsumeBatches(ChannelReader<string> reader, int number)
{
  Random r = new();
  while (await reader.WaitToReadAsync())
  {
    // pull out everything that is in the channel, synchronously;
    // at this point the data may have already gone to another consumer,
    // so the loop may turn out to be empty
    while (reader.TryRead(out var v))
      Output(number, v);
    // simulate long-running processing of the results by the client
    await Task.Delay(1000 * r.Next(1, 3));
  }
}

All that is left is to add the calls and run:

async Task ProduceAll(ChannelWriter<string> writer)
{
  var p1 = ProduceSimple(writer, 20);
  var p2 = ProduceBatched(writer, 25);
  await Task.WhenAll(p1, p2);
  writer.Complete(); // do not forget to complete on the producer side!
}

Task ConsumeAll(ChannelReader<string> reader)
{
  var c1 = ConsumeSimple(reader, 1);
  var c2 = ConsumeBatches(reader, 2);
  return Task.WhenAll(c1, c2);
}

async Task RunAll()
{
  Channel<string> channel = Channel.CreateUnbounded<string>();
  var p = ProduceAll(channel.Writer);
  var c = ConsumeAll(channel.Reader);
  await Task.WhenAll(p, c);
}

The auxiliary output function:

void Output(int readerNumber, string v)
{
  // colored output and other goodies;
  // I am too lazy to synchronize the console output, although
  // of course it is a shared resource
  if (Console.CursorLeft != 0)
    Console.WriteLine();
  var savedColor = Console.ForegroundColor;
  Console.ForegroundColor = (ConsoleColor)(readerNumber + 8);
  var indent = new string(' ', readerNumber * 4);
  Console.WriteLine($"{indent}[reader #{readerNumber}]: {v}");
  Console.ForegroundColor = savedColor;
}

And everything must be packed into the Program class:

class Program
{
  static async Task Main(string[] args) => await new Program().RunAll();
  // ...everything else
}

So if Channels are available to you, there is no need to reinvent the wheel: it makes sense to use this library.


* Whether it is more efficient to process one element at a time or in “batch” mode cannot be said in advance. Test both options under load specific to your application. With the “one at a time” option you may get extra asynchronous overhead (however optimized ReadAsync is, it still has to “spin” its state machine). With “batch” processing you can run into the problem where many waiting consumers (or producers) wake up simultaneously and only one of them gets the resource. Profile!

Thanks to @Pavel Mayorov for a useful discussion and examples from personal experience.
