media: pisp-be: Split jobs creation and scheduling

Message ID 20240806143652.917735-1-jacopo.mondi@ideasonboard.com (mailing list archive)
State Superseded
Headers
Series media: pisp-be: Split jobs creation and scheduling |

Commit Message

Jacopo Mondi Aug. 6, 2024, 2:36 p.m. UTC
Currently the 'pispbe_schedule()' function does two things:

1) Tries to assemble a job by inspecting all the video node queues
   to make sure all the required buffers are available
2) Submit the job to the hardware

The pispbe_schedule() function is called at:

- video device start_streaming() time
- video device qbuf() time
- irq handler

As assembling a job requires inspecting all queues, it is a rather
time consuming operation which is better not run in IRQ context.

To avoid executing the time consuming job creation in interrupt
context, split the job creation and job scheduling in two distinct
operations. When a well-formed job is created, append it to the
newly introduced 'pispbe->job_queue' where it will be dequeued from
by the scheduling routine.

At start_streaming() and qbuf() time immediately try to schedule a job
if one has been created as the irq handler routine is only called when
a job has completed, and we can't solely rely on it for scheduling new
jobs.

Signed-off-by: Jacopo Mondi <jacopo.mondi@ideasonboard.com>
---
 .../platform/raspberrypi/pisp_be/pisp_be.c    | 98 +++++++++++++------
 1 file changed, 69 insertions(+), 29 deletions(-)

--
2.45.2
  

Comments

Laurent Pinchart Aug. 11, 2024, 7:41 p.m. UTC | #1
Hi Jacopo,

Thank you for the patch.

On Tue, Aug 06, 2024 at 04:36:51PM +0200, Jacopo Mondi wrote:
> Currently the 'pispbe_schedule()' function does two things:
> 
> 1) Tries to assemble a job by inspecting all the video node queues
>    to make sure all the required buffers are available
> 2) Submit the job to the hardware
> 
> The pispbe_schedule() function is called at:
> 
> - video device start_streaming() time
> - video device qbuf() time
> - irq handler
> 
> As assembling a job requires inspecting all queues, it is a rather
> time consuming operation which is better not run in IRQ context.
> 
> To avoid executing the time consuming job creation in interrupt
> context, split the job creation and job scheduling in two distinct
> operations. When a well-formed job is created, append it to the
> newly introduced 'pispbe->job_queue' where it will be dequeued from
> by the scheduling routine.
> 
> At start_streaming() and qbuf() time immediately try to schedule a job
> if one has been created as the irq handler routine is only called when
> a job has completed, and we can't solely rely on it for scheduling new
> jobs.
> 
> Signed-off-by: Jacopo Mondi <jacopo.mondi@ideasonboard.com>
> ---
>  .../platform/raspberrypi/pisp_be/pisp_be.c    | 98 +++++++++++++------
>  1 file changed, 69 insertions(+), 29 deletions(-)
> 
> diff --git a/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c b/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> index 65ff2382cffe..ae8b79530e72 100644
> --- a/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> +++ b/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> @@ -190,9 +190,11 @@ struct pispbe_hw_enables {
> 
>  /* Records a job configuration and memory addresses. */
>  struct pispbe_job_descriptor {
> +	struct pispbe_buffer *buf[PISPBE_NUM_NODES];

Maybe bufs instead of buf, or even buffers.

>  	dma_addr_t hw_dma_addrs[N_HW_ADDRESSES];
>  	struct pisp_be_tiles_config *config;
>  	struct pispbe_hw_enables hw_enables;
> +	struct list_head queue;

I'd put this one at the top, it feels weird to have it here in the
middle.

>  	dma_addr_t tiles;
>  };
> 
> @@ -215,8 +217,10 @@ struct pispbe_dev {
>  	unsigned int sequence;
>  	u32 streaming_map;
>  	struct pispbe_job queued_job, running_job;
> -	spinlock_t hw_lock; /* protects "hw_busy" flag and streaming_map */
> +	/* protects "hw_busy" flag, streaming_map and job queue*/

s/job queue/job_queue /

> +	spinlock_t hw_lock;
>  	bool hw_busy; /* non-zero if a job is queued or is being started */
> +	struct list_head job_queue;
>  	int irq;
>  	u32 hw_version;
>  	u8 done, started;
> @@ -443,27 +447,32 @@ static void pispbe_xlate_addrs(struct pispbe_dev *pispbe,

While at it, in pispbe_xlate_addrs() there's a comment referencing
pispbe_schedule_internal(). That function doesn't exist, could you fix
it ?

>   * For Output0, Output1, Tdn and Stitch, a buffer only needs to be
>   * available if the blocks are enabled in the config.
>   *
> - * Needs to be called with hw_lock held.
> + * If all the buffers required to form a job are available, append the
> + * job descriptor to the job queue to be later queued to the HW.
>   *
>   * Returns 0 if a job has been successfully prepared, < 0 otherwise.
>   */
> -static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> -			      struct pispbe_job_descriptor *job)
> +static int pispbe_prepare_job(struct pispbe_dev *pispbe)
>  {
>  	struct pispbe_buffer *buf[PISPBE_NUM_NODES] = {};
> +	struct pispbe_job_descriptor *job;
> +	unsigned int streaming_map;
>  	unsigned int config_index;
>  	struct pispbe_node *node;
>  	unsigned long flags;
> 
> -	lockdep_assert_held(&pispbe->hw_lock);
> -
> -	memset(job, 0, sizeof(struct pispbe_job_descriptor));
> +	spin_lock_irqsave(&pispbe->hw_lock, flags);

Is _irqsave() the right version, now that this function isn't called
from interrupt context anymore ?

> +	streaming_map = pispbe->streaming_map;
> +	spin_unlock_irqrestore(&pispbe->hw_lock, flags);

Are you taking the lock here to honour the lock documentation, or is it
actually needed ?

> 
> -	if (((BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)) &
> -		pispbe->streaming_map) !=
> -			(BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)))
> +	if (((BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)) & streaming_map) !=
> +	    (BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)))
>  		return -ENODEV;

Another way to handle this could be

	scoped_guard(spinlock_irqsave)(&pispbe->hw_lock) {
		static const mask = BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE);

		if (pispbe->streaming_map & mask != mask)
			return -ENODEV;
	}

I think I'm starting to really like scoped guards.

I however wonder if we really need to keep this check. Isn't it enough
that buffers have been queued ? I also think we need to revisit when to
actually start and stop streaming, you remove queued jobs when all video
devices have been stopped, but only prepare them when both the config
and main input buffers are available. I think it would be better to take
links into account.

> 
> +	job = kzalloc(sizeof(*job), GFP_KERNEL);
> +	if (!job)
> +		return -ENOMEM;
> +
>  	node = &pispbe->node[CONFIG_NODE];
>  	spin_lock_irqsave(&node->ready_lock, flags);
>  	buf[CONFIG_NODE] = list_first_entry_or_null(&node->ready_queue,
> @@ -471,13 +480,15 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
>  						    ready_list);
>  	if (buf[CONFIG_NODE]) {
>  		list_del(&buf[CONFIG_NODE]->ready_list);
> -		pispbe->queued_job.buf[CONFIG_NODE] = buf[CONFIG_NODE];
> +		job->buf[CONFIG_NODE] = buf[CONFIG_NODE];
>  	}
>  	spin_unlock_irqrestore(&node->ready_lock, flags);
> 
>  	/* Exit early if no config buffer has been queued. */
> -	if (!buf[CONFIG_NODE])
> +	if (!buf[CONFIG_NODE]) {
> +		kfree(job);
>  		return -ENODEV;
> +	}
> 
>  	config_index = buf[CONFIG_NODE]->vb.vb2_buf.index;
>  	job->config = &pispbe->config[config_index];
> @@ -498,7 +509,7 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
>  			continue;
> 
>  		buf[i] = NULL;
> -		if (!(pispbe->streaming_map & BIT(i)))
> +		if (!(streaming_map & BIT(i)))
>  			continue;
> 
>  		if ((!(rgb_en & PISP_BE_RGB_ENABLE_OUTPUT0) &&
> @@ -531,7 +542,7 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
>  						  ready_list);
>  		if (buf[i]) {
>  			list_del(&buf[i]->ready_list);
> -			pispbe->queued_job.buf[i] = buf[i];
> +			job->buf[i] = buf[i];
>  		}
>  		spin_unlock_irqrestore(&node->ready_lock, flags);
> 
> @@ -539,11 +550,13 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
>  			goto err_return_buffers;
>  	}
> 
> -	pispbe->queued_job.valid = true;
> -
>  	/* Convert buffers to DMA addresses for the hardware */
>  	pispbe_xlate_addrs(pispbe, job, buf);
> 
> +	spin_lock_irqsave(&pispbe->hw_lock, flags);
> +	list_add_tail(&job->queue, &pispbe->job_queue);
> +	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> +
>  	return 0;
> 
>  err_return_buffers:
> @@ -559,16 +572,15 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
>  		spin_unlock_irqrestore(&n->ready_lock, flags);
>  	}
> 
> -	memset(&pispbe->queued_job, 0, sizeof(pispbe->queued_job));
> +	kfree(job);
> 
>  	return -ENODEV;
>  }
> 
>  static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
>  {
> -	struct pispbe_job_descriptor job;
> +	struct pispbe_job_descriptor *job;
>  	unsigned long flags;
> -	int ret;
> 
>  	spin_lock_irqsave(&pispbe->hw_lock, flags);
> 
> @@ -578,10 +590,18 @@ static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
>  	if (pispbe->hw_busy)
>  		goto unlock_and_return;
> 
> -	ret = pispbe_prepare_job(pispbe, &job);
> -	if (ret)
> +	job = list_first_entry_or_null(&pispbe->job_queue,
> +				       struct pispbe_job_descriptor,
> +				       queue);
> +	if (!job)
>  		goto unlock_and_return;
> 
> +	list_del(&job->queue);
> +
> +	for (unsigned int i = 0; i < PISPBE_NUM_NODES; i++)
> +		pispbe->queued_job.buf[i] = job->buf[i];
> +	pispbe->queued_job.valid = true;

Could queued_job and running_job be turned into pointers ?

> +
>  	/*
>  	 * We can kick the job off without the hw_lock, as this can
>  	 * never run again until hw_busy is cleared, which will happen
> @@ -591,9 +611,9 @@ static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
>  	pispbe->hw_busy = true;
>  	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> 
> -	if (job.config->num_tiles <= 0 ||
> -	    job.config->num_tiles > PISP_BACK_END_NUM_TILES ||
> -	    !((job.hw_enables.bayer_enables | job.hw_enables.rgb_enables) &
> +	if (job->config->num_tiles <= 0 ||
> +	    job->config->num_tiles > PISP_BACK_END_NUM_TILES ||
> +	    !((job->hw_enables.bayer_enables | job->hw_enables.rgb_enables) &
>  	      PISP_BE_BAYER_ENABLE_INPUT)) {
>  		/*
>  		 * Bad job. We can't let it proceed as it could lock up
> @@ -605,11 +625,12 @@ static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
>  		 * but we seem to survive...
>  		 */
>  		dev_dbg(pispbe->dev, "Bad job: invalid number of tiles: %u\n",
> -			job.config->num_tiles);
> -		job.config->num_tiles = 0;
> +			job->config->num_tiles);
> +		job->config->num_tiles = 0;
>  	}

Does this need to be done at schedule time or can it be moved to prepare
time ?

> 
> -	pispbe_queue_job(pispbe, &job);
> +	pispbe_queue_job(pispbe, job);
> +	kfree(job);
> 
>  	return;
> 
> @@ -871,7 +892,8 @@ static void pispbe_node_buffer_queue(struct vb2_buffer *buf)
>  	 * Every time we add a buffer, check if there's now some work for the hw
>  	 * to do.
>  	 */
> -	pispbe_schedule(pispbe, false);
> +	if (!pispbe_prepare_job(pispbe))
> +		pispbe_schedule(pispbe, false);
>  }
> 
>  static int pispbe_node_start_streaming(struct vb2_queue *q, unsigned int count)
> @@ -897,7 +919,8 @@ static int pispbe_node_start_streaming(struct vb2_queue *q, unsigned int count)
>  		node->pispbe->streaming_map);
> 
>  	/* Maybe we're ready to run. */
> -	pispbe_schedule(pispbe, false);
> +	if (!pispbe_prepare_job(pispbe))
> +		pispbe_schedule(pispbe, false);
> 
>  	return 0;
> 
> @@ -949,6 +972,21 @@ static void pispbe_node_stop_streaming(struct vb2_queue *q)
> 
>  	spin_lock_irqsave(&pispbe->hw_lock, flags);
>  	pispbe->streaming_map &= ~BIT(node->id);
> +
> +	/* Release all jobs once all nodes have stopped streaming. */
> +	if (pispbe->streaming_map == 0) {
> +		struct pispbe_job_descriptor *job;
> +
> +		do {
> +			job = list_first_entry_or_null(&pispbe->job_queue,
> +						struct pispbe_job_descriptor,
> +						queue);
> +			if (job) {
> +				list_del(&job->queue);
> +				kfree(job);
> +			}
> +		} while (!list_empty(&pispbe->job_queue));

How about a list_for_each_entry_safe() ?

> +	}
>  	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> 
>  	pm_runtime_mark_last_busy(pispbe->dev);
> @@ -1691,6 +1729,8 @@ static int pispbe_probe(struct platform_device *pdev)
>  	if (!pispbe)
>  		return -ENOMEM;
> 
> +	INIT_LIST_HEAD(&pispbe->job_queue);
> +
>  	dev_set_drvdata(&pdev->dev, pispbe);
>  	pispbe->dev = &pdev->dev;
>  	platform_set_drvdata(pdev, pispbe);
  
Jacopo Mondi Aug. 26, 2024, 9:36 a.m. UTC | #2
Hi Laurent

On Sun, Aug 11, 2024 at 10:41:41PM GMT, Laurent Pinchart wrote:
> Hi Jacopo,
>
> Thank you for the patch.
>
> On Tue, Aug 06, 2024 at 04:36:51PM +0200, Jacopo Mondi wrote:
> > Currently the 'pispbe_schedule()' function does two things:
> >
> > 1) Tries to assemble a job by inspecting all the video node queues
> >    to make sure all the required buffers are available
> > 2) Submit the job to the hardware
> >
> > The pispbe_schedule() function is called at:
> >
> > - video device start_streaming() time
> > - video device qbuf() time
> > - irq handler
> >
> > As assembling a job requires inspecting all queues, it is a rather
> > time consuming operation which is better not run in IRQ context.
> >
> > To avoid executing the time consuming job creation in interrupt
> > context, split the job creation and job scheduling in two distinct
> > operations. When a well-formed job is created, append it to the
> > newly introduced 'pispbe->job_queue' where it will be dequeued from
> > by the scheduling routine.
> >
> > At start_streaming() and qbuf() time immediately try to schedule a job
> > if one has been created as the irq handler routine is only called when
> > a job has completed, and we can't solely rely on it for scheduling new
> > jobs.
> >
> > Signed-off-by: Jacopo Mondi <jacopo.mondi@ideasonboard.com>
> > ---
> >  .../platform/raspberrypi/pisp_be/pisp_be.c    | 98 +++++++++++++------
> >  1 file changed, 69 insertions(+), 29 deletions(-)
> >
> > diff --git a/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c b/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> > index 65ff2382cffe..ae8b79530e72 100644
> > --- a/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> > +++ b/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> > @@ -190,9 +190,11 @@ struct pispbe_hw_enables {
> >
> >  /* Records a job configuration and memory addresses. */
> >  struct pispbe_job_descriptor {
> > +	struct pispbe_buffer *buf[PISPBE_NUM_NODES];
>
> Maybe bufs instead of buf, or even buffers.
>
> >  	dma_addr_t hw_dma_addrs[N_HW_ADDRESSES];
> >  	struct pisp_be_tiles_config *config;
> >  	struct pispbe_hw_enables hw_enables;
> > +	struct list_head queue;
>
> I'd put this one at the top, it feels weird to have it here in the
> middle.
>
> >  	dma_addr_t tiles;
> >  };
> >
> > @@ -215,8 +217,10 @@ struct pispbe_dev {
> >  	unsigned int sequence;
> >  	u32 streaming_map;
> >  	struct pispbe_job queued_job, running_job;
> > -	spinlock_t hw_lock; /* protects "hw_busy" flag and streaming_map */
> > +	/* protects "hw_busy" flag, streaming_map and job queue*/
>
> s/job queue/job_queue /
>
> > +	spinlock_t hw_lock;
> >  	bool hw_busy; /* non-zero if a job is queued or is being started */
> > +	struct list_head job_queue;
> >  	int irq;
> >  	u32 hw_version;
> >  	u8 done, started;
> > @@ -443,27 +447,32 @@ static void pispbe_xlate_addrs(struct pispbe_dev *pispbe,
>
> While at it, in pispbe_xlate_addrs() there's a comment referencing
> pispbe_schedule_internal(). That function doesn't exist, could you fix
> it ?

Sure, I'll drop it

>
> >   * For Output0, Output1, Tdn and Stitch, a buffer only needs to be
> >   * available if the blocks are enabled in the config.
> >   *
> > - * Needs to be called with hw_lock held.
> > + * If all the buffers required to form a job are available, append the
> > + * job descriptor to the job queue to be later queued to the HW.
> >   *
> >   * Returns 0 if a job has been successfully prepared, < 0 otherwise.
> >   */
> > -static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> > -			      struct pispbe_job_descriptor *job)
> > +static int pispbe_prepare_job(struct pispbe_dev *pispbe)
> >  {
> >  	struct pispbe_buffer *buf[PISPBE_NUM_NODES] = {};
> > +	struct pispbe_job_descriptor *job;
> > +	unsigned int streaming_map;
> >  	unsigned int config_index;
> >  	struct pispbe_node *node;
> >  	unsigned long flags;
> >
> > -	lockdep_assert_held(&pispbe->hw_lock);
> > -
> > -	memset(job, 0, sizeof(struct pispbe_job_descriptor));
> > +	spin_lock_irqsave(&pispbe->hw_lock, flags);
>
> Is _irqsave() the right version, now that this function isn't called
> from interrupt context anymore ?
>

Yeah, I should probably drop the _irqsave() version

> > +	streaming_map = pispbe->streaming_map;
> > +	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
>
> Are you taking the lock here to honour the lock documentation, or is it
> actually needed ?
>

The lock protects pispbe->streaming_map which is accessed for writing
by start/stop_streaming which can be called on a video node
concurrently to a qbuf (which again calls pispbe_prepare_job() and
access pispbe->streaming_map) on a different video node. If I'm not
mistaken the lock is needed then

> >
> > -	if (((BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)) &
> > -		pispbe->streaming_map) !=
> > -			(BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)))
> > +	if (((BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)) & streaming_map) !=
> > +	    (BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)))
> >  		return -ENODEV;
>
> Another way to handle this could be
>
> 	scoped_guard(spinlock_irqsave)(&pispbe->hw_lock) {
> 		static const mask = BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE);
>
> 		if (pispbe->streaming_map & mask != mask)
> 			return -ENODEV;
> 	}
>
> I think I'm starting to really like scoped guards.

It's a new construct to me, I'll use it as it seems nice

>
> I however wonder if we really need to keep this check. Isn't it enough
> that buffers have been queued ? I also think we need to revisit when to
> actually start and stop streaming, you remove queued jobs when all video
> devices have been stopped, but only prepare them when both the config
> and main input buffers are available. I think it would be better to take
> links into account.
>

Probably yes, if I'm not mistaken the same has been suggested for the
C55. However it can be done on top probably

> >
> > +	job = kzalloc(sizeof(*job), GFP_KERNEL);
> > +	if (!job)
> > +		return -ENOMEM;
> > +
> >  	node = &pispbe->node[CONFIG_NODE];
> >  	spin_lock_irqsave(&node->ready_lock, flags);
> >  	buf[CONFIG_NODE] = list_first_entry_or_null(&node->ready_queue,
> > @@ -471,13 +480,15 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> >  						    ready_list);
> >  	if (buf[CONFIG_NODE]) {
> >  		list_del(&buf[CONFIG_NODE]->ready_list);
> > -		pispbe->queued_job.buf[CONFIG_NODE] = buf[CONFIG_NODE];
> > +		job->buf[CONFIG_NODE] = buf[CONFIG_NODE];
> >  	}
> >  	spin_unlock_irqrestore(&node->ready_lock, flags);
> >
> >  	/* Exit early if no config buffer has been queued. */
> > -	if (!buf[CONFIG_NODE])
> > +	if (!buf[CONFIG_NODE]) {
> > +		kfree(job);
> >  		return -ENODEV;
> > +	}
> >
> >  	config_index = buf[CONFIG_NODE]->vb.vb2_buf.index;
> >  	job->config = &pispbe->config[config_index];
> > @@ -498,7 +509,7 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> >  			continue;
> >
> >  		buf[i] = NULL;
> > -		if (!(pispbe->streaming_map & BIT(i)))
> > +		if (!(streaming_map & BIT(i)))
> >  			continue;
> >
> >  		if ((!(rgb_en & PISP_BE_RGB_ENABLE_OUTPUT0) &&
> > @@ -531,7 +542,7 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> >  						  ready_list);
> >  		if (buf[i]) {
> >  			list_del(&buf[i]->ready_list);
> > -			pispbe->queued_job.buf[i] = buf[i];
> > +			job->buf[i] = buf[i];
> >  		}
> >  		spin_unlock_irqrestore(&node->ready_lock, flags);
> >
> > @@ -539,11 +550,13 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> >  			goto err_return_buffers;
> >  	}
> >
> > -	pispbe->queued_job.valid = true;
> > -
> >  	/* Convert buffers to DMA addresses for the hardware */
> >  	pispbe_xlate_addrs(pispbe, job, buf);
> >
> > +	spin_lock_irqsave(&pispbe->hw_lock, flags);
> > +	list_add_tail(&job->queue, &pispbe->job_queue);
> > +	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> > +
> >  	return 0;
> >
> >  err_return_buffers:
> > @@ -559,16 +572,15 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> >  		spin_unlock_irqrestore(&n->ready_lock, flags);
> >  	}
> >
> > -	memset(&pispbe->queued_job, 0, sizeof(pispbe->queued_job));
> > +	kfree(job);
> >
> >  	return -ENODEV;
> >  }
> >
> >  static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
> >  {
> > -	struct pispbe_job_descriptor job;
> > +	struct pispbe_job_descriptor *job;
> >  	unsigned long flags;
> > -	int ret;
> >
> >  	spin_lock_irqsave(&pispbe->hw_lock, flags);
> >
> > @@ -578,10 +590,18 @@ static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
> >  	if (pispbe->hw_busy)
> >  		goto unlock_and_return;
> >
> > -	ret = pispbe_prepare_job(pispbe, &job);
> > -	if (ret)
> > +	job = list_first_entry_or_null(&pispbe->job_queue,
> > +				       struct pispbe_job_descriptor,
> > +				       queue);
> > +	if (!job)
> >  		goto unlock_and_return;
> >
> > +	list_del(&job->queue);
> > +
> > +	for (unsigned int i = 0; i < PISPBE_NUM_NODES; i++)
> > +		pispbe->queued_job.buf[i] = job->buf[i];
> > +	pispbe->queued_job.valid = true;
>
> Could queued_job and running_job be turned into pointers ?
>

I should probably unify struct pispbe_job and struct
pispbe_job_descriptor as the former contains a subset of the latter.
With this, I can make both queued_job and running_job pointers most
probably.

> > +
> >  	/*
> >  	 * We can kick the job off without the hw_lock, as this can
> >  	 * never run again until hw_busy is cleared, which will happen
> > @@ -591,9 +611,9 @@ static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
> >  	pispbe->hw_busy = true;
> >  	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> >
> > -	if (job.config->num_tiles <= 0 ||
> > -	    job.config->num_tiles > PISP_BACK_END_NUM_TILES ||
> > -	    !((job.hw_enables.bayer_enables | job.hw_enables.rgb_enables) &
> > +	if (job->config->num_tiles <= 0 ||
> > +	    job->config->num_tiles > PISP_BACK_END_NUM_TILES ||
> > +	    !((job->hw_enables.bayer_enables | job->hw_enables.rgb_enables) &
> >  	      PISP_BE_BAYER_ENABLE_INPUT)) {
> >  		/*
> >  		 * Bad job. We can't let it proceed as it could lock up
> > @@ -605,11 +625,12 @@ static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
> >  		 * but we seem to survive...
> >  		 */
> >  		dev_dbg(pispbe->dev, "Bad job: invalid number of tiles: %u\n",
> > -			job.config->num_tiles);
> > -		job.config->num_tiles = 0;
> > +			job->config->num_tiles);
> > +		job->config->num_tiles = 0;
> >  	}
>
> Does this need to be done at schedule time or can it be moved to prepare
> time ?

The num_tiles validation can even be moved to
pisp_be_validate_config() when a buffer is queued to the config node.

When it comes to the hw_enables mask validatation I think it already
is validated by

       	if (!(bayer_enables & PISP_BE_BAYER_ENABLE_INPUT) ==
	    !(rgb_enables & PISP_BE_RGB_ENABLE_INPUT)) {
		dev_dbg(dev, "%s: Not one input enabled\n", __func__);
		return -EIO;
	}

in pisp_be_validate_config() so it can probably be dropped from here


>
> >
> > -	pispbe_queue_job(pispbe, &job);
> > +	pispbe_queue_job(pispbe, job);
> > +	kfree(job);
> >
> >  	return;
> >
> > @@ -871,7 +892,8 @@ static void pispbe_node_buffer_queue(struct vb2_buffer *buf)
> >  	 * Every time we add a buffer, check if there's now some work for the hw
> >  	 * to do.
> >  	 */
> > -	pispbe_schedule(pispbe, false);
> > +	if (!pispbe_prepare_job(pispbe))
> > +		pispbe_schedule(pispbe, false);
> >  }
> >
> >  static int pispbe_node_start_streaming(struct vb2_queue *q, unsigned int count)
> > @@ -897,7 +919,8 @@ static int pispbe_node_start_streaming(struct vb2_queue *q, unsigned int count)
> >  		node->pispbe->streaming_map);
> >
> >  	/* Maybe we're ready to run. */
> > -	pispbe_schedule(pispbe, false);
> > +	if (!pispbe_prepare_job(pispbe))
> > +		pispbe_schedule(pispbe, false);
> >
> >  	return 0;
> >
> > @@ -949,6 +972,21 @@ static void pispbe_node_stop_streaming(struct vb2_queue *q)
> >
> >  	spin_lock_irqsave(&pispbe->hw_lock, flags);
> >  	pispbe->streaming_map &= ~BIT(node->id);
> > +
> > +	/* Release all jobs once all nodes have stopped streaming. */
> > +	if (pispbe->streaming_map == 0) {
> > +		struct pispbe_job_descriptor *job;
> > +
> > +		do {
> > +			job = list_first_entry_or_null(&pispbe->job_queue,
> > +						struct pispbe_job_descriptor,
> > +						queue);
> > +			if (job) {
> > +				list_del(&job->queue);
> > +				kfree(job);
> > +			}
> > +		} while (!list_empty(&pispbe->job_queue));
>
> How about a list_for_each_entry_safe() ?

probably better!

>
> > +	}
> >  	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> >
> >  	pm_runtime_mark_last_busy(pispbe->dev);
> > @@ -1691,6 +1729,8 @@ static int pispbe_probe(struct platform_device *pdev)
> >  	if (!pispbe)
> >  		return -ENOMEM;
> >
> > +	INIT_LIST_HEAD(&pispbe->job_queue);
> > +
> >  	dev_set_drvdata(&pdev->dev, pispbe);
> >  	pispbe->dev = &pdev->dev;
> >  	platform_set_drvdata(pdev, pispbe);
>
> --
> Regards,
>
> Laurent Pinchart
  
Laurent Pinchart Aug. 26, 2024, 9:52 a.m. UTC | #3
On Mon, Aug 26, 2024 at 11:36:07AM +0200, Jacopo Mondi wrote:
> On Sun, Aug 11, 2024 at 10:41:41PM GMT, Laurent Pinchart wrote:
> > On Tue, Aug 06, 2024 at 04:36:51PM +0200, Jacopo Mondi wrote:
> > > Currently the 'pispbe_schedule()' function does two things:
> > >
> > > 1) Tries to assemble a job by inspecting all the video node queues
> > >    to make sure all the required buffers are available
> > > 2) Submit the job to the hardware
> > >
> > > The pispbe_schedule() function is called at:
> > >
> > > - video device start_streaming() time
> > > - video device qbuf() time
> > > - irq handler
> > >
> > > As assembling a job requires inspecting all queues, it is a rather
> > > time consuming operation which is better not run in IRQ context.
> > >
> > > To avoid executing the time consuming job creation in interrupt
> > > context, split the job creation and job scheduling in two distinct
> > > operations. When a well-formed job is created, append it to the
> > > newly introduced 'pispbe->job_queue' where it will be dequeued from
> > > by the scheduling routine.
> > >
> > > At start_streaming() and qbuf() time immediately try to schedule a job
> > > if one has been created as the irq handler routine is only called when
> > > a job has completed, and we can't solely rely on it for scheduling new
> > > jobs.
> > >
> > > Signed-off-by: Jacopo Mondi <jacopo.mondi@ideasonboard.com>
> > > ---
> > >  .../platform/raspberrypi/pisp_be/pisp_be.c    | 98 +++++++++++++------
> > >  1 file changed, 69 insertions(+), 29 deletions(-)
> > >
> > > diff --git a/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c b/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> > > index 65ff2382cffe..ae8b79530e72 100644
> > > --- a/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> > > +++ b/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
> > > @@ -190,9 +190,11 @@ struct pispbe_hw_enables {
> > >
> > >  /* Records a job configuration and memory addresses. */
> > >  struct pispbe_job_descriptor {
> > > +	struct pispbe_buffer *buf[PISPBE_NUM_NODES];
> >
> > Maybe bufs instead of buf, or even buffers.
> >
> > >  	dma_addr_t hw_dma_addrs[N_HW_ADDRESSES];
> > >  	struct pisp_be_tiles_config *config;
> > >  	struct pispbe_hw_enables hw_enables;
> > > +	struct list_head queue;
> >
> > I'd put this one at the top, it feels weird to have it here in the
> > middle.
> >
> > >  	dma_addr_t tiles;
> > >  };
> > >
> > > @@ -215,8 +217,10 @@ struct pispbe_dev {
> > >  	unsigned int sequence;
> > >  	u32 streaming_map;
> > >  	struct pispbe_job queued_job, running_job;
> > > -	spinlock_t hw_lock; /* protects "hw_busy" flag and streaming_map */
> > > +	/* protects "hw_busy" flag, streaming_map and job queue*/
> >
> > s/job queue/job_queue /
> >
> > > +	spinlock_t hw_lock;
> > >  	bool hw_busy; /* non-zero if a job is queued or is being started */
> > > +	struct list_head job_queue;
> > >  	int irq;
> > >  	u32 hw_version;
> > >  	u8 done, started;
> > > @@ -443,27 +447,32 @@ static void pispbe_xlate_addrs(struct pispbe_dev *pispbe,
> >
> > While at it, in pispbe_xlate_addrs() there's a comment referencing
> > pispbe_schedule_internal(). That function doesn't exist, could you fix
> > it ?
> 
> Sure, I'll drop it
> 
> >
> > >   * For Output0, Output1, Tdn and Stitch, a buffer only needs to be
> > >   * available if the blocks are enabled in the config.
> > >   *
> > > - * Needs to be called with hw_lock held.
> > > + * If all the buffers required to form a job are available, append the
> > > + * job descriptor to the job queue to be later queued to the HW.
> > >   *
> > >   * Returns 0 if a job has been successfully prepared, < 0 otherwise.
> > >   */
> > > -static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> > > -			      struct pispbe_job_descriptor *job)
> > > +static int pispbe_prepare_job(struct pispbe_dev *pispbe)
> > >  {
> > >  	struct pispbe_buffer *buf[PISPBE_NUM_NODES] = {};
> > > +	struct pispbe_job_descriptor *job;
> > > +	unsigned int streaming_map;
> > >  	unsigned int config_index;
> > >  	struct pispbe_node *node;
> > >  	unsigned long flags;
> > >
> > > -	lockdep_assert_held(&pispbe->hw_lock);
> > > -
> > > -	memset(job, 0, sizeof(struct pispbe_job_descriptor));
> > > +	spin_lock_irqsave(&pispbe->hw_lock, flags);
> >
> > Is _irqsave() the right version, now that this function isn't called
> > from interrupt context anymore ?
> >
> 
> Yeah, I should probably drop the _irqsave() version
> 
> > > +	streaming_map = pispbe->streaming_map;
> > > +	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> >
> > Are you taking the lock here to honour the lock documentation, or is it
> > actually needed ?
> 
> The lock protects pispbe->streaming_map which is accessed for writing
> by start/stop_streaming which can be called on a video node
> concurrently to a qbuf (which again calls pispbe_prepare_job() and
> access pispbe->streaming_map) on a different video node. If I'm not
> mistaken the lock is needed then

Right.

There will be a race then, as pispbe->streaming_map could change right
after you drop the lock. Hence the comment below regarding whether we
really need to keep this check.

> > >
> > > -	if (((BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)) &
> > > -		pispbe->streaming_map) !=
> > > -			(BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)))
> > > +	if (((BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)) & streaming_map) !=
> > > +	    (BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)))
> > >  		return -ENODEV;
> >
> > Another way to handle this could be
> >
> > 	scoped_guard(spinlock_irqsave)(&pispbe->hw_lock) {
> > 		static const mask = BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE);
> >
> > 		if (pispbe->streaming_map & mask != mask)
> > 			return -ENODEV;
> > 	}
> >
> > I think I'm starting to really like scoped guards.
> 
> It's a new construct to me, I'll use it as it seems nice
> 
> > I however wonder if we really need to keep this check. Isn't it enough
> > that buffers have been queued ? I also think we need to revisit when to
> > actually start and stop streaming, you remove queued jobs when all video
> > devices have been stopped, but only prepare them when both the config
> > and main input buffers are available. I think it would be better to take
> > links into account.
> 
> Probably yes, if I'm not mistaken the same has been suggested for the
> C55. However it can be done on top probably

Fine with me.

> > >
> > > +	job = kzalloc(sizeof(*job), GFP_KERNEL);
> > > +	if (!job)
> > > +		return -ENOMEM;
> > > +
> > >  	node = &pispbe->node[CONFIG_NODE];
> > >  	spin_lock_irqsave(&node->ready_lock, flags);
> > >  	buf[CONFIG_NODE] = list_first_entry_or_null(&node->ready_queue,
> > > @@ -471,13 +480,15 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> > >  						    ready_list);
> > >  	if (buf[CONFIG_NODE]) {
> > >  		list_del(&buf[CONFIG_NODE]->ready_list);
> > > -		pispbe->queued_job.buf[CONFIG_NODE] = buf[CONFIG_NODE];
> > > +		job->buf[CONFIG_NODE] = buf[CONFIG_NODE];
> > >  	}
> > >  	spin_unlock_irqrestore(&node->ready_lock, flags);
> > >
> > >  	/* Exit early if no config buffer has been queued. */
> > > -	if (!buf[CONFIG_NODE])
> > > +	if (!buf[CONFIG_NODE]) {
> > > +		kfree(job);
> > >  		return -ENODEV;
> > > +	}
> > >
> > >  	config_index = buf[CONFIG_NODE]->vb.vb2_buf.index;
> > >  	job->config = &pispbe->config[config_index];
> > > @@ -498,7 +509,7 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> > >  			continue;
> > >
> > >  		buf[i] = NULL;
> > > -		if (!(pispbe->streaming_map & BIT(i)))
> > > +		if (!(streaming_map & BIT(i)))
> > >  			continue;
> > >
> > >  		if ((!(rgb_en & PISP_BE_RGB_ENABLE_OUTPUT0) &&
> > > @@ -531,7 +542,7 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> > >  						  ready_list);
> > >  		if (buf[i]) {
> > >  			list_del(&buf[i]->ready_list);
> > > -			pispbe->queued_job.buf[i] = buf[i];
> > > +			job->buf[i] = buf[i];
> > >  		}
> > >  		spin_unlock_irqrestore(&node->ready_lock, flags);
> > >
> > > @@ -539,11 +550,13 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> > >  			goto err_return_buffers;
> > >  	}
> > >
> > > -	pispbe->queued_job.valid = true;
> > > -
> > >  	/* Convert buffers to DMA addresses for the hardware */
> > >  	pispbe_xlate_addrs(pispbe, job, buf);
> > >
> > > +	spin_lock_irqsave(&pispbe->hw_lock, flags);
> > > +	list_add_tail(&job->queue, &pispbe->job_queue);
> > > +	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> > > +
> > >  	return 0;
> > >
> > >  err_return_buffers:
> > > @@ -559,16 +572,15 @@ static int pispbe_prepare_job(struct pispbe_dev *pispbe,
> > >  		spin_unlock_irqrestore(&n->ready_lock, flags);
> > >  	}
> > >
> > > -	memset(&pispbe->queued_job, 0, sizeof(pispbe->queued_job));
> > > +	kfree(job);
> > >
> > >  	return -ENODEV;
> > >  }
> > >
> > >  static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
> > >  {
> > > -	struct pispbe_job_descriptor job;
> > > +	struct pispbe_job_descriptor *job;
> > >  	unsigned long flags;
> > > -	int ret;
> > >
> > >  	spin_lock_irqsave(&pispbe->hw_lock, flags);
> > >
> > > @@ -578,10 +590,18 @@ static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
> > >  	if (pispbe->hw_busy)
> > >  		goto unlock_and_return;
> > >
> > > -	ret = pispbe_prepare_job(pispbe, &job);
> > > -	if (ret)
> > > +	job = list_first_entry_or_null(&pispbe->job_queue,
> > > +				       struct pispbe_job_descriptor,
> > > +				       queue);
> > > +	if (!job)
> > >  		goto unlock_and_return;
> > >
> > > +	list_del(&job->queue);
> > > +
> > > +	for (unsigned int i = 0; i < PISPBE_NUM_NODES; i++)
> > > +		pispbe->queued_job.buf[i] = job->buf[i];
> > > +	pispbe->queued_job.valid = true;
> >
> > Could queued_job and running_job be turned into pointers ?
> 
> I should probably unify struct pispbe_job and struct
> pispbe_job_descriptor as the former contains a subset of the latter.
> With this, I can make both queued_job and running_job pointers most
> probably.
> 
> > > +
> > >  	/*
> > >  	 * We can kick the job off without the hw_lock, as this can
> > >  	 * never run again until hw_busy is cleared, which will happen
> > > @@ -591,9 +611,9 @@ static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
> > >  	pispbe->hw_busy = true;
> > >  	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> > >
> > > -	if (job.config->num_tiles <= 0 ||
> > > -	    job.config->num_tiles > PISP_BACK_END_NUM_TILES ||
> > > -	    !((job.hw_enables.bayer_enables | job.hw_enables.rgb_enables) &
> > > +	if (job->config->num_tiles <= 0 ||
> > > +	    job->config->num_tiles > PISP_BACK_END_NUM_TILES ||
> > > +	    !((job->hw_enables.bayer_enables | job->hw_enables.rgb_enables) &
> > >  	      PISP_BE_BAYER_ENABLE_INPUT)) {
> > >  		/*
> > >  		 * Bad job. We can't let it proceed as it could lock up
> > > @@ -605,11 +625,12 @@ static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
> > >  		 * but we seem to survive...
> > >  		 */
> > >  		dev_dbg(pispbe->dev, "Bad job: invalid number of tiles: %u\n",
> > > -			job.config->num_tiles);
> > > -		job.config->num_tiles = 0;
> > > +			job->config->num_tiles);
> > > +		job->config->num_tiles = 0;
> > >  	}
> >
> > Does this need to be done at schedule time or can it be moved to prepare
> > time ?
> 
> The num_tiles validation can even be moved to
> pisp_be_validate_config() when a buffer is queued to the config node.
> 
> When it comes to the hw_enables mask validatation I think it already
> is validated by
> 
>        	if (!(bayer_enables & PISP_BE_BAYER_ENABLE_INPUT) ==
> 	    !(rgb_enables & PISP_BE_RGB_ENABLE_INPUT)) {
> 		dev_dbg(dev, "%s: Not one input enabled\n", __func__);
> 		return -EIO;
> 	}
> 
> in pisp_be_validate_config() so it can probably be dropped from here
> 
> > >
> > > -	pispbe_queue_job(pispbe, &job);
> > > +	pispbe_queue_job(pispbe, job);
> > > +	kfree(job);
> > >
> > >  	return;
> > >
> > > @@ -871,7 +892,8 @@ static void pispbe_node_buffer_queue(struct vb2_buffer *buf)
> > >  	 * Every time we add a buffer, check if there's now some work for the hw
> > >  	 * to do.
> > >  	 */
> > > -	pispbe_schedule(pispbe, false);
> > > +	if (!pispbe_prepare_job(pispbe))
> > > +		pispbe_schedule(pispbe, false);
> > >  }
> > >
> > >  static int pispbe_node_start_streaming(struct vb2_queue *q, unsigned int count)
> > > @@ -897,7 +919,8 @@ static int pispbe_node_start_streaming(struct vb2_queue *q, unsigned int count)
> > >  		node->pispbe->streaming_map);
> > >
> > >  	/* Maybe we're ready to run. */
> > > -	pispbe_schedule(pispbe, false);
> > > +	if (!pispbe_prepare_job(pispbe))
> > > +		pispbe_schedule(pispbe, false);
> > >
> > >  	return 0;
> > >
> > > @@ -949,6 +972,21 @@ static void pispbe_node_stop_streaming(struct vb2_queue *q)
> > >
> > >  	spin_lock_irqsave(&pispbe->hw_lock, flags);
> > >  	pispbe->streaming_map &= ~BIT(node->id);
> > > +
> > > +	/* Release all jobs once all nodes have stopped streaming. */
> > > +	if (pispbe->streaming_map == 0) {
> > > +		struct pispbe_job_descriptor *job;
> > > +
> > > +		do {
> > > +			job = list_first_entry_or_null(&pispbe->job_queue,
> > > +						struct pispbe_job_descriptor,
> > > +						queue);
> > > +			if (job) {
> > > +				list_del(&job->queue);
> > > +				kfree(job);
> > > +			}
> > > +		} while (!list_empty(&pispbe->job_queue));
> >
> > How about a list_for_each_entry_safe() ?
> 
> probably better!
> 
> >
> > > +	}
> > >  	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
> > >
> > >  	pm_runtime_mark_last_busy(pispbe->dev);
> > > @@ -1691,6 +1729,8 @@ static int pispbe_probe(struct platform_device *pdev)
> > >  	if (!pispbe)
> > >  		return -ENOMEM;
> > >
> > > +	INIT_LIST_HEAD(&pispbe->job_queue);
> > > +
> > >  	dev_set_drvdata(&pdev->dev, pispbe);
> > >  	pispbe->dev = &pdev->dev;
> > >  	platform_set_drvdata(pdev, pispbe);
  

Patch

diff --git a/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c b/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
index 65ff2382cffe..ae8b79530e72 100644
--- a/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
+++ b/drivers/media/platform/raspberrypi/pisp_be/pisp_be.c
@@ -190,9 +190,11 @@  struct pispbe_hw_enables {

 /* Records a job configuration and memory addresses. */
 struct pispbe_job_descriptor {
+	struct pispbe_buffer *buf[PISPBE_NUM_NODES];
 	dma_addr_t hw_dma_addrs[N_HW_ADDRESSES];
 	struct pisp_be_tiles_config *config;
 	struct pispbe_hw_enables hw_enables;
+	struct list_head queue;
 	dma_addr_t tiles;
 };

@@ -215,8 +217,10 @@  struct pispbe_dev {
 	unsigned int sequence;
 	u32 streaming_map;
 	struct pispbe_job queued_job, running_job;
-	spinlock_t hw_lock; /* protects "hw_busy" flag and streaming_map */
+	/* protects "hw_busy" flag, streaming_map and job queue*/
+	spinlock_t hw_lock;
 	bool hw_busy; /* non-zero if a job is queued or is being started */
+	struct list_head job_queue;
 	int irq;
 	u32 hw_version;
 	u8 done, started;
@@ -443,27 +447,32 @@  static void pispbe_xlate_addrs(struct pispbe_dev *pispbe,
  * For Output0, Output1, Tdn and Stitch, a buffer only needs to be
  * available if the blocks are enabled in the config.
  *
- * Needs to be called with hw_lock held.
+ * If all the buffers required to form a job are available, append the
+ * job descriptor to the job queue to be later queued to the HW.
  *
  * Returns 0 if a job has been successfully prepared, < 0 otherwise.
  */
-static int pispbe_prepare_job(struct pispbe_dev *pispbe,
-			      struct pispbe_job_descriptor *job)
+static int pispbe_prepare_job(struct pispbe_dev *pispbe)
 {
 	struct pispbe_buffer *buf[PISPBE_NUM_NODES] = {};
+	struct pispbe_job_descriptor *job;
+	unsigned int streaming_map;
 	unsigned int config_index;
 	struct pispbe_node *node;
 	unsigned long flags;

-	lockdep_assert_held(&pispbe->hw_lock);
-
-	memset(job, 0, sizeof(struct pispbe_job_descriptor));
+	spin_lock_irqsave(&pispbe->hw_lock, flags);
+	streaming_map = pispbe->streaming_map;
+	spin_unlock_irqrestore(&pispbe->hw_lock, flags);

-	if (((BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)) &
-		pispbe->streaming_map) !=
-			(BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)))
+	if (((BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)) & streaming_map) !=
+	    (BIT(CONFIG_NODE) | BIT(MAIN_INPUT_NODE)))
 		return -ENODEV;

+	job = kzalloc(sizeof(*job), GFP_KERNEL);
+	if (!job)
+		return -ENOMEM;
+
 	node = &pispbe->node[CONFIG_NODE];
 	spin_lock_irqsave(&node->ready_lock, flags);
 	buf[CONFIG_NODE] = list_first_entry_or_null(&node->ready_queue,
@@ -471,13 +480,15 @@  static int pispbe_prepare_job(struct pispbe_dev *pispbe,
 						    ready_list);
 	if (buf[CONFIG_NODE]) {
 		list_del(&buf[CONFIG_NODE]->ready_list);
-		pispbe->queued_job.buf[CONFIG_NODE] = buf[CONFIG_NODE];
+		job->buf[CONFIG_NODE] = buf[CONFIG_NODE];
 	}
 	spin_unlock_irqrestore(&node->ready_lock, flags);

 	/* Exit early if no config buffer has been queued. */
-	if (!buf[CONFIG_NODE])
+	if (!buf[CONFIG_NODE]) {
+		kfree(job);
 		return -ENODEV;
+	}

 	config_index = buf[CONFIG_NODE]->vb.vb2_buf.index;
 	job->config = &pispbe->config[config_index];
@@ -498,7 +509,7 @@  static int pispbe_prepare_job(struct pispbe_dev *pispbe,
 			continue;

 		buf[i] = NULL;
-		if (!(pispbe->streaming_map & BIT(i)))
+		if (!(streaming_map & BIT(i)))
 			continue;

 		if ((!(rgb_en & PISP_BE_RGB_ENABLE_OUTPUT0) &&
@@ -531,7 +542,7 @@  static int pispbe_prepare_job(struct pispbe_dev *pispbe,
 						  ready_list);
 		if (buf[i]) {
 			list_del(&buf[i]->ready_list);
-			pispbe->queued_job.buf[i] = buf[i];
+			job->buf[i] = buf[i];
 		}
 		spin_unlock_irqrestore(&node->ready_lock, flags);

@@ -539,11 +550,13 @@  static int pispbe_prepare_job(struct pispbe_dev *pispbe,
 			goto err_return_buffers;
 	}

-	pispbe->queued_job.valid = true;
-
 	/* Convert buffers to DMA addresses for the hardware */
 	pispbe_xlate_addrs(pispbe, job, buf);

+	spin_lock_irqsave(&pispbe->hw_lock, flags);
+	list_add_tail(&job->queue, &pispbe->job_queue);
+	spin_unlock_irqrestore(&pispbe->hw_lock, flags);
+
 	return 0;

 err_return_buffers:
@@ -559,16 +572,15 @@  static int pispbe_prepare_job(struct pispbe_dev *pispbe,
 		spin_unlock_irqrestore(&n->ready_lock, flags);
 	}

-	memset(&pispbe->queued_job, 0, sizeof(pispbe->queued_job));
+	kfree(job);

 	return -ENODEV;
 }

 static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
 {
-	struct pispbe_job_descriptor job;
+	struct pispbe_job_descriptor *job;
 	unsigned long flags;
-	int ret;

 	spin_lock_irqsave(&pispbe->hw_lock, flags);

@@ -578,10 +590,18 @@  static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
 	if (pispbe->hw_busy)
 		goto unlock_and_return;

-	ret = pispbe_prepare_job(pispbe, &job);
-	if (ret)
+	job = list_first_entry_or_null(&pispbe->job_queue,
+				       struct pispbe_job_descriptor,
+				       queue);
+	if (!job)
 		goto unlock_and_return;

+	list_del(&job->queue);
+
+	for (unsigned int i = 0; i < PISPBE_NUM_NODES; i++)
+		pispbe->queued_job.buf[i] = job->buf[i];
+	pispbe->queued_job.valid = true;
+
 	/*
 	 * We can kick the job off without the hw_lock, as this can
 	 * never run again until hw_busy is cleared, which will happen
@@ -591,9 +611,9 @@  static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
 	pispbe->hw_busy = true;
 	spin_unlock_irqrestore(&pispbe->hw_lock, flags);

-	if (job.config->num_tiles <= 0 ||
-	    job.config->num_tiles > PISP_BACK_END_NUM_TILES ||
-	    !((job.hw_enables.bayer_enables | job.hw_enables.rgb_enables) &
+	if (job->config->num_tiles <= 0 ||
+	    job->config->num_tiles > PISP_BACK_END_NUM_TILES ||
+	    !((job->hw_enables.bayer_enables | job->hw_enables.rgb_enables) &
 	      PISP_BE_BAYER_ENABLE_INPUT)) {
 		/*
 		 * Bad job. We can't let it proceed as it could lock up
@@ -605,11 +625,12 @@  static void pispbe_schedule(struct pispbe_dev *pispbe, bool clear_hw_busy)
 		 * but we seem to survive...
 		 */
 		dev_dbg(pispbe->dev, "Bad job: invalid number of tiles: %u\n",
-			job.config->num_tiles);
-		job.config->num_tiles = 0;
+			job->config->num_tiles);
+		job->config->num_tiles = 0;
 	}

-	pispbe_queue_job(pispbe, &job);
+	pispbe_queue_job(pispbe, job);
+	kfree(job);

 	return;

@@ -871,7 +892,8 @@  static void pispbe_node_buffer_queue(struct vb2_buffer *buf)
 	 * Every time we add a buffer, check if there's now some work for the hw
 	 * to do.
 	 */
-	pispbe_schedule(pispbe, false);
+	if (!pispbe_prepare_job(pispbe))
+		pispbe_schedule(pispbe, false);
 }

 static int pispbe_node_start_streaming(struct vb2_queue *q, unsigned int count)
@@ -897,7 +919,8 @@  static int pispbe_node_start_streaming(struct vb2_queue *q, unsigned int count)
 		node->pispbe->streaming_map);

 	/* Maybe we're ready to run. */
-	pispbe_schedule(pispbe, false);
+	if (!pispbe_prepare_job(pispbe))
+		pispbe_schedule(pispbe, false);

 	return 0;

@@ -949,6 +972,21 @@  static void pispbe_node_stop_streaming(struct vb2_queue *q)

 	spin_lock_irqsave(&pispbe->hw_lock, flags);
 	pispbe->streaming_map &= ~BIT(node->id);
+
+	/* Release all jobs once all nodes have stopped streaming. */
+	if (pispbe->streaming_map == 0) {
+		struct pispbe_job_descriptor *job;
+
+		do {
+			job = list_first_entry_or_null(&pispbe->job_queue,
+						struct pispbe_job_descriptor,
+						queue);
+			if (job) {
+				list_del(&job->queue);
+				kfree(job);
+			}
+		} while (!list_empty(&pispbe->job_queue));
+	}
 	spin_unlock_irqrestore(&pispbe->hw_lock, flags);

 	pm_runtime_mark_last_busy(pispbe->dev);
@@ -1691,6 +1729,8 @@  static int pispbe_probe(struct platform_device *pdev)
 	if (!pispbe)
 		return -ENOMEM;

+	INIT_LIST_HEAD(&pispbe->job_queue);
+
 	dev_set_drvdata(&pdev->dev, pispbe);
 	pispbe->dev = &pdev->dev;
 	platform_set_drvdata(pdev, pispbe);