Skip to content

Come posso evitare il "maxing out" della CPU: Metodo sincrono che chiama più worker in modo asincrono e throttling usando SemaphoreSlim?

Ciao utente del nostro sito web, abbiamo trovato la risposta a quello che stavi cercando, scorri e la troverai qui sotto.

Soluzione:

Non hai spiegato come vuoi limitare le chiamate simultanee. Si vogliono 30 task worker concomitanti in esecuzione, o si vogliono 30 chiamate WCF, ognuna delle quali ha tutti i propri task worker in esecuzione in modo concomitante, o si vuole che le chiamate WCF concomitanti abbiano ciascuna il proprio limite di task worker concomitanti? Dato che hai detto che ogni chiamata WCF ha solo 4 task worker e guardando il tuo codice di esempio, presumo che tu voglia un limite globale di 30 task worker simultanei.

In primo luogo, come suggerito da @mjwills, è necessario utilizzare SemaphoreSlim per limitare le chiamate a workerService.DoWorkAsync(). Il codice attualmente li avvia tutti e cerca solo di limitare il numero di chiamate da terminare. Suppongo che questo sia il motivo per cui la CPU è al massimo. Il numero di task worker avviati rimane illimitato. Si noti tuttavia che è necessario attendere il task worker mentre si tiene il semaforo, altrimenti si limiterà la velocità di creazione dei task, non il numero di task in esecuzione simultanea.

In secondo luogo, si sta creando un nuovo SemaphoreSlim per ogni richiesta WCF. Da qui la domanda del primo paragrafo. L'unico modo in cui questo strozza qualcosa è se si hanno più servizi worker rispetto al conteggio iniziale, che nel vostro esempio è 30, ma avete detto che ci sono solo 4 worker. Per avere un limite "globale", è necessario utilizzare un singleton SemaphoreSlim.

In terzo luogo, non si chiama mai .Release() su SemaphoreSlim, quindi se lo si rende un singleton, il codice si bloccherà dopo aver avviato 30 lavoratori dall'inizio del processo. Assicurarsi di farlo in un blocco try-finally, in modo che se il worker si blocca, venga comunque rilasciato.

Ecco un esempio di codice scritto frettolosamente:

public async Task ProcessAllPendingWork()
{
    var workerTasks = new List>();
    foreach(var workerService in _workerServices)
    {
        var workerTask = RunWorker(workerService);
        workerTasks.Add(workerTask);
    }

    await Task.WhenAll(workerTasks);
}

private async Task RunWorker(Func workerService)
{
    // use singleton semaphore.
    await _semaphore.WaitAsync();
    try
    {
        return await workerService.DoWorkAsync();
    }
    catch (System.Exception)
    {
        //assume error is a predefined logging service
        Log.Error(ex);
        return false; // ??
    }
    finally
    {
        _semaphore.Release();
    }
}

L'astrazione Task fornita da TPL (Task parallel library) è un'astrazione di Thread; I task vengono messi in lista in un pool di thread e poi eseguiti quando un esecutore può gestire la richiesta.

In altre parole, a seconda di alcuni fattori (il traffico, il rapporto tra CPU e IO e il modello di distribuzione), il tentativo di eseguire un task gestito nella funzione worker potrebbe non portare alcun beneficio (o in alcuni casi essere più lento).

Detto questo, vi suggerisco di usare Task.WaitAll (disponibile da .NET 4.0) che usa astrazioni di livello molto alto per gestire la concorrenza; in particolare questo pezzo di codice potrebbe essere utile per voi:

  • crea lavoratori e aspetta tutti
  • richiede 10 secondi per l'esecuzione (il Worker più lungo)
  • cattura e dà la possibilità di gestire le eccezioni
  • [last but not least] è un'API declerativa che focalizza l'attenzione su cosa fare e non su come fare.
public class Q57572902
{
    public void ProcessAllPendingWork()
    {
        var workers = new Action[] {Worker1, Worker2, Worker3};

        try
        {
            Task.WaitAll(workers.Select(Task.Factory.StartNew).ToArray());
            // ok
        }
        catch (AggregateException exceptions)
        {
            foreach (var ex in exceptions.InnerExceptions)
            {
                Log.Error(ex);
            }
            // ko
        }
    }

    public void Worker1() => Thread.Sleep(FromSeconds(5)); // do something

    public void Worker2() => Thread.Sleep(FromSeconds(10)); // do something

    public void Worker3() => throw new NotImplementedException("error to manage"); // something wrong

}

Ho visto dai commenti che si richiede un massimo di 3 worker in esecuzione nello stesso tempoe; in questo caso si può semplicemente copiare-incollare un file LimitedConcurrencyLevelTaskScheduler dalla documentazione di TaskScheduler.

Dopodiché bisogna creare un'istanza di sigleton TaskScheduler con il suo onw TaskFactory in questo modo:

public static class WorkerScheduler
{
    public static readonly TaskFactory Factory;

    static WorkerScheduler()
    {
        var scheduler = new LimitedConcurrencyLevelTaskScheduler(3);
        Factory = new TaskFactory(scheduler);
    }
}

Precedente ProcessAllPendingWork() il codice rimane lo stesso, tranne che per

...workers.Select(Task.Factory.StartNew)...

che diventa

...workers.Select(WorkerScheduler.Factory.StartNew)...

perché si deve usare il codice TaskFactory associato al vostro WorkerScheduler.

Se il worker deve restituire alcuni dati in risposta, gli errori e i dati devono essere gestiti in modo diverso, come segue:

public void ProcessAllPendingWork()
{
    var workers = new Func[] {Worker1, Worker2, Worker3};
    var tasks = workers.Select(WorkerScheduler.Factory.StartNew).ToArray();

    bool[] results = null;

    Task
        .WhenAll(tasks)
        .ContinueWith(x =>
        {
            if (x.Status == TaskStatus.Faulted)
            {
                foreach (var exception in x.Exception.InnerExceptions)
                    Log(exception);

                return;
            }

            results = x.Result; // save data in outer scope
        })
        .Wait();

    // continue execution
    // results is now filled: if results is null, some errors occured
}

A meno che non mi sfugga qualcosa, il codice di esempio esegue TUTTI i worker in parallelo. Quando si chiama 'workerService.DoWorkAsync()' il worker inizia il suo lavoro. 'RunWorkerTasks' attende solo il completamento del task del worker. 'DoWorkAsync()' dà il via all'operazione asincrona, mentre 'await' mette in pausa il metodo chiamante dall'esecuzione fino al completamento dell'attività attesa.

L'elevato utilizzo della CPU è probabilmente dovuto all'attività dei servizi worker e non al modo in cui vengono chiamati. Per verificarlo, provare a sostituire workerService.DoWorkAsync() con Thread.Sleep(..) o Task.Delay(..). Se l'utilizzo della CPU diminuisce, la colpa è dei worker. (A seconda di cosa fa workerService) potrebbe essere normale o addirittura previsto che il consumo di CPU aumenti una volta eseguiti in parallelo.

Veniamo alla domanda su come limitare l'esecuzione parallela. Si noti che il seguente esempio non utilizza esattamente 3 thread, ma al massimo 3 thread.

    Parallel.ForEach(
        _workerServices,
        new ParallelOptions { MaxDegreeOfParallelism = 3 },
        workerService => workerService.DoWorkAsync()
            .ContinueWith(res => 
            {
                // Handle your result or possible exceptions by consulting res.
            })
            .Wait());

Poiché si è detto che in precedenza il codice veniva eseguito in modo sequenziale, presumo che anche i worker abbiano un equivalente non asincrono. Probabilmente è più facile usarli. Chiamare un metodo async in modo sincrono è per lo più una seccatura. Ho persino avuto scenari di deadlock semplicemente chiamando DoWorkAsync().Wait(). Si è discusso molto su Come posso eseguire un task asincrono in modo sincrono? In sostanza, cerco di evitarlo. Se non è possibile, cerco di usare il metodo ContinueWith che aumenta la complessità, oppure AsyncHelper della discussione SO precedente.

    var results = new ConcurrentDictionary();
    Parallel.ForEach(
        _workerServices,
        new ParallelOptions { MaxDegreeOfParallelism = 3 },
        workerService => 
            {
                // Handle possible exceptions via try-catch.
                results.TryAdd(workerService, workerService.DoWork());
            });
    // evaluate results

Parallel.ForEach sfrutta un ThreadPool o un TaskPool. In altre parole, distribuisce ogni esecuzione del parametro dato Action body su un thread dedicato. È possibile verificarlo facilmente con il codice seguente. Se Parallel.ForEach distribuisce già il lavoro su diversi thread, si può semplicemente eseguire l'operazione 'costosa' in modo sincrono. Qualsiasi operazione asincrona sarebbe inutile o addirittura avrebbe un impatto negativo sulle prestazioni di runtime.

    Parallel.ForEach(
        Enumerable.Range(1, 4),
        m => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));

Questo è il progetto demo che ho usato per i test e che non si basa sul workerService.

    private static bool DoWork()
    {
        Thread.Sleep(5000);
        Console.WriteLine($"done by {Thread.CurrentThread.ManagedThreadId}.");
        return DateTime.Now.Millisecond % 2 == 0;
    }

    private static Task DoWorkAsync() => Task.Run(DoWork);

    private static void Main(string[] args)
    {
        var sw = new Stopwatch();
        sw.Start();

        // define a thread-safe dict to store the results of the async operation
        var results = new ConcurrentDictionary();

        Parallel.ForEach(
            Enumerable.Range(1, 4), // this replaces the list of workers
            new ParallelOptions { MaxDegreeOfParallelism = 3 },
            // m => results.TryAdd(m, DoWork()), // this is the alternative synchronous call
            m => DoWorkAsync().ContinueWith(res => results.TryAdd(m, res.Result)).Wait());

        sw.Stop();

        // print results
        foreach (var item in results)
        {
            Console.WriteLine($"{item.Key}={item.Value}");
        }

        Console.WriteLine(sw.Elapsed.ToString());
        Console.ReadLine();
    }

Se hai dubbi e modi per aumentare il nostro articolo puoi scrivere un rapporto e lo analizzeremo con desiderio.



Utilizzate il nostro motore di ricerca

Ricerca
Generic filters

Lascia un commento

Il tuo indirizzo email non sarà pubblicato.