Hybrid flow shop
If you’re using this notebook in Google Colab, be sure to install PyJobShop first by executing
pip install pyjobshop in a cell.
In this notebook, we demonstrate how to model and solve a hybrid flow shop (HFS) problem using PyJobShop.
The HFS is a common manufacturing environment in practice and is widely studied by scheduling researchers. See, for example, the surveys by Ruiz and Vázquez-Rodríguez (2010) and Ribas et al. (2010) for an overview of HFS studies.
Problem description
The HFS problem is characterized as follows:
There is a set of \(n\) jobs that need to be processed in a series of stages.
The number of processing stages is at least 2.
Each stage has at least one machine in parallel, and in at least one stage there is more than one machine.
All jobs are processed in the same production order: production starts in stage 1, then stage 2, etc. until the last stage. A job can only start processing when it has finished processing in the previous stage.
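The structural conditions above can be sketched as a small check on the number of machines per stage. This is only an illustration: the function name `is_hybrid_flow_shop` is hypothetical and not part of PyJobShop.

```python
def is_hybrid_flow_shop(num_machines: list[int]) -> bool:
    """Check the structural HFS conditions given the machine count per stage."""
    # There must be at least two processing stages.
    at_least_two_stages = len(num_machines) >= 2
    # Every stage needs at least one machine.
    every_stage_has_machine = all(count >= 1 for count in num_machines)
    # At least one stage must have machines in parallel.
    some_stage_is_parallel = any(count > 1 for count in num_machines)
    return (
        at_least_two_stages
        and every_stage_has_machine
        and some_stage_is_parallel
    )


print(is_hybrid_flow_shop([3, 2, 2]))  # True: three stages, parallel machines
print(is_hybrid_flow_shop([1, 1, 1]))  # False: a regular flow shop
```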
The figure below illustrates a small HFS manufacturing environment. The first stage has three machines, while the second and third stages each have two machines. Each machine name \(M_{kl}\) identifies the stage \(k\) and the \(l\)-th machine within that stage; the figure numbers both from zero. The machines in each stage are identical, meaning that a given job has the same processing time on every machine of that stage.
Stage 0        Stage 1        Stage 2
┌─────┐        ┌─────┐        ┌─────┐
│ M00 │───┬───▶│ M10 │───┬───▶│ M20 │
└─────┘   │    └─────┘   │    └─────┘
          │              │
┌─────┐   │    ┌─────┐   │    ┌─────┐
│ M01 │───┼───▶│ M11 │───┴───▶│ M21 │
└─────┘   │    └─────┘        └─────┘
          │
┌─────┐   │
│ M02 │───┘
└─────┘
We can model an HFS problem using PyJobShop. Define a task \(T_{jk}\) for each job \(j\) and each stage \(k\). Each task represents the processing of the given job at the given stage. We also need to make sure that the previous task \(T_{j, k-1}\) is processed before \(T_{jk}\) for \(k > 1\).
Let’s now implement the above example using PyJobShop.
Data
We generate random processing times for the example that we showed above.
[1]:
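import random

num_jobs = 8
num_stages = 3
num_machines = [3, 2, 2]  # number of parallel machines in each stage

# Fix the seed so that the generated instance is reproducible.
random.seed(42)
PROCESSING_TIMES = {
    (job, stage): random.randint(5, 15)
    for job in range(num_jobs)
    for stage in range(num_stages)
}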
Model
Now that all our parameters are defined, we can start defining the model.
[2]:
from pyjobshop import Model
model = Model()
The following functions will nicely typeset the plot labels:
[3]:
def machine_name(stage: int, machine: int) -> str:
    return f"$M_{{{stage}{machine}}}$"


def task_name(job: int, stage: int) -> str:
    return f"$T_{{{job}{stage}}}$"
First define the machine environment, where we group the machines by their stage:
[4]:
machines = [
    [model.add_machine(name=machine_name(k, m)) for m in range(num)]
    for k, num in enumerate(num_machines)
]
Now, we need to define jobs and the tasks that have to be scheduled.
[5]:
jobs = [model.add_job() for _ in range(num_jobs)]
tasks = {}

# One task per (job, stage) pair.
for j, job in enumerate(jobs):
    for k in range(num_stages):
        tasks[j, k] = model.add_task(job, name=task_name(j, k))
Next, we add the processing time of each task by defining its modes. Remember that each task can be processed on every machine in its stage, so we have to define one mode per machine.
[6]:
for j in range(num_jobs):
    for k in range(num_stages):
        for machine in machines[k]:
            task = tasks[j, k]
            duration = PROCESSING_TIMES[j, k]
            model.add_mode(task, machine, duration)
The final restriction is to make sure that the previous task \(T_{j, k-1}\) is processed before \(T_{jk}\) for \(k > 1\), which can be achieved by adding precedence constraints.
[7]:
for j in range(num_jobs):
    for k in range(num_stages - 1):
        first = tasks[j, k]
        second = tasks[j, k + 1]
        model.add_end_before_start(first, second)
That’s it! We’ve completed the modeling of an HFS problem instance. Let’s now solve it and plot the Gantt chart.
[8]:
result = model.solve(display=False)
print(result)
Solution results
================
objective: 51.00
lower bound: 51.00
status: Optimal
runtime: 0.03 seconds
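As an independent sanity check on a reported makespan, simple lower bounds can be computed directly from the processing times. The sketch below is not part of PyJobShop, and the function name `makespan_lower_bound` is hypothetical. It assumes integer processing times stored in a dictionary keyed by `(job, stage)`, as in `PROCESSING_TIMES` above. It combines a job-based bound (the longest total processing time of a single job) with a stage-based bound (the workload of a stage divided over its machines, plus the shortest time any job needs before and after that stage). The result is generally weaker than the bound proven by the solver.

```python
def makespan_lower_bound(processing_times, num_jobs, num_machines):
    """Simple makespan lower bound for an HFS with identical machines."""
    num_stages = len(num_machines)
    p = processing_times

    # Job-based bound: a job visits all stages one after another.
    job_bound = max(
        sum(p[j, k] for k in range(num_stages)) for j in range(num_jobs)
    )

    stage_bounds = []
    for k, count in enumerate(num_machines):
        # Shortest time any job needs before it can reach stage k.
        head = min(
            sum(p[j, s] for s in range(k)) for j in range(num_jobs)
        )
        # Shortest time any job still needs after leaving stage k.
        tail = min(
            sum(p[j, s] for s in range(k + 1, num_stages))
            for j in range(num_jobs)
        )
        # Total workload of stage k, spread over its machines (rounded up).
        work = sum(p[j, k] for j in range(num_jobs))
        stage_bounds.append(head + -(-work // count) + tail)

    return max(job_bound, *stage_bounds)
```

Calling `makespan_lower_bound(PROCESSING_TIMES, num_jobs, num_machines)` gives a value that should never exceed the objective reported by the solver.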
Let’s now plot the solution that we’ve found.
[9]:
from pyjobshop.plot import plot_machine_gantt

data = model.data()
plot_machine_gantt(result.best, data, plot_labels=True)
Let’s visually check a few things:
Each job is scheduled in each stage exactly once.
For each job, tasks in consecutive stages are properly sequenced: the task in one stage completes before the task in the next stage begins.
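These visual checks can also be sketched programmatically. The example below works on a plain dictionary rather than on a PyJobShop solution object: the hypothetical `schedule` argument maps each `(job, stage)` pair to a `(machine, start, end)` tuple, where `machine` is the index of the machine within its stage. How to extract such tuples from `result.best` is not shown here.

```python
def check_schedule(schedule, num_jobs, num_stages):
    """Return a list of violations found in the given schedule."""
    violations = []

    # Each job must be scheduled in every stage.
    for j in range(num_jobs):
        for k in range(num_stages):
            if (j, k) not in schedule:
                violations.append(f"job {j} is missing in stage {k}")

    # A job may only start a stage after finishing the previous one.
    for j in range(num_jobs):
        for k in range(num_stages - 1):
            if (j, k) in schedule and (j, k + 1) in schedule:
                _, _, end = schedule[j, k]
                _, start, _ = schedule[j, k + 1]
                if end > start:
                    violations.append(
                        f"job {j} starts stage {k + 1} too early"
                    )

    # Tasks assigned to the same machine must not overlap in time.
    by_machine = {}
    for (j, k), (machine, start, end) in schedule.items():
        by_machine.setdefault((k, machine), []).append((start, end, j))

    for (k, machine), intervals in by_machine.items():
        intervals.sort()
        pairs = zip(intervals, intervals[1:])
        for (_, prev_end, prev_job), (next_start, _, next_job) in pairs:
            if next_start < prev_end:
                violations.append(
                    f"jobs {prev_job} and {next_job} overlap "
                    f"on machine {machine} in stage {k}"
                )

    return violations
```

An empty list means that the schedule passes all three checks.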
Great! Let’s look at a more complex version of HFS next.
Transportation restrictions
In some manufacturing environments, there are physical restrictions on how jobs can move between machines. For example, conveyor belts or material handling systems may only connect specific machines.
Let’s introduce some transportation restrictions to the machine environment that we considered earlier:
Stage 0        Stage 1        Stage 2
┌─────┐        ┌─────┐        ┌─────┐
│ M00 │───────▶│ M10 │───────▶│ M20 │
└─────┘        └─────┘        └─────┘
┌─────┐        ┌─────┐        ┌─────┐
│ M01 │───┬───▶│ M11 │───────▶│ M21 │
└─────┘   │    └─────┘        └─────┘
          │
┌─────┐   │
│ M02 │───┘
└─────┘
This is a three-stage hybrid flow shop with dedicated routing constraints. Jobs scheduled on \(M_{00}\) can only be processed on \(M_{10}\) next, and must then proceed to \(M_{20}\). Similarly, jobs on \(M_{01}\) or \(M_{02}\) can only move to \(M_{11}\), and then to \(M_{21}\).
This creates two distinct processing paths:
\(M_{00} \rightarrow M_{10} \rightarrow M_{20}\)
\(\{M_{01}, M_{02}\} \rightarrow M_{11} \rightarrow M_{21}\)
To model such dependencies, we can use the ModeDependency constraint. If a mode is selected for one task, this constraint restricts which modes can be selected for another task.
Let’s start by copying the previous HFS example, where we also store all modes:
[10]:
model = Model()

machines = [
    [model.add_machine(name=machine_name(k, m)) for m in range(num)]
    for k, num in enumerate(num_machines)
]

jobs = [model.add_job() for _ in range(num_jobs)]
tasks = {}
modes = {}

for j, job in enumerate(jobs):
    for k in range(num_stages):
        task = model.add_task(job, name=task_name(j, k))
        tasks[j, k] = task

    # Store every mode by (job, stage, machine index) for later lookup.
    for k in range(num_stages):
        for m, machine in enumerate(machines[k]):
            task = tasks[j, k]
            duration = PROCESSING_TIMES[j, k]
            mode = model.add_mode(task, machine, duration)
            modes[j, k, m] = mode

    for k in range(num_stages - 1):
        first = tasks[j, k]
        second = tasks[j, k + 1]
        model.add_end_before_start(first, second)
Now we add the transportation restrictions using mode dependencies:
[11]:
# Maps (stage, machine index) to the machines reachable in the next stage.
transportation_connections = {
    (0, 0): [(1, 0)],  # M00 -> M10
    (0, 1): [(1, 1)],  # M01 -> M11
    (0, 2): [(1, 1)],  # M02 -> M11
    (1, 0): [(2, 0)],  # M10 -> M20
    (1, 1): [(2, 1)],  # M11 -> M21
}

for job_idx in range(num_jobs):
    for stage in range(num_stages - 1):
        next_stage = stage + 1

        for from_machine_idx in range(num_machines[stage]):
            from_key = (stage, from_machine_idx)
            from_mode = modes[job_idx, stage, from_machine_idx]

            if from_key in transportation_connections:
                allowed_destinations = transportation_connections[from_key]
                allowed_modes = [
                    modes[job_idx, to_stage, to_machine_idx]
                    for to_stage, to_machine_idx in allowed_destinations
                    if to_stage == next_stage
                ]

                if allowed_modes:
                    model.add_mode_dependency(from_mode, allowed_modes)
[12]:
result = model.solve(display=False)
print(result)
Solution results
================
objective: 51.00
lower bound: 51.00
status: Optimal
runtime: 0.02 seconds
[13]:
plot_machine_gantt(result.best, model.data(), plot_labels=True)
We can verify that tasks belonging to the same job either follow path 1 (\(M_{00} \rightarrow M_{10} \rightarrow M_{20}\)) or path 2 (\(\{M_{01}, M_{02}\} \rightarrow M_{11} \rightarrow M_{21}\)), meaning that all transportation restrictions are respected.
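The same verification can be sketched in plain Python. The helper `route_is_allowed` below is hypothetical and not part of PyJobShop. It takes a route, given as the index of the machine used in each stage, and checks every step against the connection table defined earlier (repeated here so that the example is self-contained). Note one assumption that differs from the model code: a machine without an entry in the table is treated as a dead end here, whereas the model leaves such machines unrestricted. For this instance the two interpretations coincide.

```python
from itertools import product

# (stage, machine index) -> machines reachable in the next stage.
transportation_connections = {
    (0, 0): [(1, 0)],  # M00 -> M10
    (0, 1): [(1, 1)],  # M01 -> M11
    (0, 2): [(1, 1)],  # M02 -> M11
    (1, 0): [(2, 0)],  # M10 -> M20
    (1, 1): [(2, 1)],  # M11 -> M21
}


def route_is_allowed(route, connections):
    """Check a route, where `route[k]` is the machine index used in stage k."""
    for stage in range(len(route) - 1):
        origin = (stage, route[stage])
        destination = (stage + 1, route[stage + 1])
        if destination not in connections.get(origin, []):
            return False
    return True


# Enumerate all machine combinations and keep only the allowed routes.
num_machines = [3, 2, 2]
allowed_routes = [
    route
    for route in product(*(range(count) for count in num_machines))
    if route_is_allowed(route, transportation_connections)
]
print(allowed_routes)  # [(0, 0, 0), (1, 1, 1), (2, 1, 1)]
```

The three allowed routes correspond to path 1 and to the two ways of entering path 2, through \(M_{01}\) or \(M_{02}\).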
Conclusion
This notebook demonstrated how to model and solve hybrid flow shop problems using PyJobShop. We showed how to model:
Classic HFS: Define tasks, modes, and precedence constraints to create a standard hybrid flow shop
HFS with transportation restrictions: Use mode dependencies to restrict which machines can be used in sequence, modeling physical transportation restrictions in manufacturing environments