本网站(662p.com)打包出售,且带程序代码数据,662p.com域名,程序内核采用TP框架开发,需要联系扣扣:2360248666 /wx:lianweikj
精品域名一口价出售:1y1m.com(350元) ,6b7b.com(400元) , 5k5j.com(380元) , yayj.com(1800元), jiongzhun.com(1000元) , niuzen.com(2800元) , zennei.com(5000元)
需要联系扣扣:2360248666 /wx:lianweikj
Python实现关键路径和七格图计算详解
kenrry1992 · 218浏览 · 发布于2023-03-15 +关注

这篇文章主要为大家详细介绍了如何利用Python实现关键路径和七格图计算,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起了解一下


关键路径计算是项目管理中关于进度管理的基本计算。 但是对于绝大多数同学来说, 关键路径计算都只是对一些简单情形的计算。

今天,田老师根据以往的经验,用Python实现了在相对较复杂的情形下:

1.关键路径计算

2.七格图计算,包括

  • 最早结束时间EFT

  • 最晚开始时间LST

  • 最晚结束时间LFT

  • 总浮动时间TF

  • 自由浮动时间FF

3.紧前关系绘图法(AON)

并且, 将输出结果保存为网页。 而且在网页底部附上了完整的计算过程。

这个Python程序的实现,可以帮助项目管理者更加方便地进行关键路径计算和进度管理。同时,通过输出结果保存为网页的形式,也可以方便地与团队成员进行共享和交流。

1.主程序

主程序主要实现了一个Project类,其中包含了计算关键路径和七格图的方法。具体实现方式如下:

1.  定义了一个Activity类,包含了活动的id、名称、持续时间和紧前任务列表等属性。 

2.  定义了一个Project类,包含了活动列表、项目持续时间、日志等属性,以及计算关键路径、计算七格图、计算总浮动时间、计算自由浮动时间等方法。

3.  从JSON文件中读取活动信息,并创建Project对象并添加活动。

4.  调用Project对象的calculate方法,计算每个活动的最早开始时间、最晚开始时间等数据。

5.  调用Project对象的calculate_critical_path方法,计算关键路径。

6.  调用Project对象的calculate_project_duration方法,计算项目总工期。

7.  使用Jinja2模板引擎生成项目的活动清单,并将关键路径

import json 
from datetime import datetime 
from typing import List 
   
import graphviz 
from jinja2 import Template 
   
from activity import Activity 
   
   
class Project: 
    def __init__(self): 
        self.activities: List[Activity] = [] 
        self.duration = 0 
        self.logger = [] 
   
    def log(self, log: str) -> None: 
        self.logger.append(log) 
   
    def add_activity(self, activity: Activity) -> None: 
        """ 
        添加一个活动到项目中 
        :param 
        activity: 待添加的活动 
        """        # 将活动添加到项目中 
        self.activities.append(activity) 
   
    def calculate(self) -> None: 
        """ 计算整个项目的关键信息 
        :return: None 
        """        self.calculate_successor() 
        self._calculate_forward_pass()  # 计算正推法 
        self._calculate_backward_pass()  # 计算倒推法 
        self._calculate_total_floats()  # 计算总浮动时间 
        self._calculate_free_floats()  # 计算自由浮动时间 
   
    def calculate_successor(self) -> None: 
        self.log("开始计算紧后活动") 
        for act in self.activities: 
            for pred in act.predecessors: 
                for act_inner in self.activities: 
                    if act_inner.id == pred: 
                        act_inner.successors.append(act.id) 
   
    def _calculate_forward_pass(self) -> None: 
        self.log("## 开始正推法计算") 
        # 进入 while 循环,只有当所有活动的最早开始时间和最早完成时间都已经计算出来时,
        才会退出循环 
        while not self._is_forward_pass_calculated(): 
            # 遍历每个活动 
            for activity in self.activities: 
                # 如果活动的最早开始时间已经被计算过,则跳过 
                if activity.est is not None: 
                    continue 
                # 如果活动没有前置活动, 则从1开始计算最早开始时间和最早结束时间 
                if not activity.predecessors: 
                    activity.est = 1 
                    activity.eft = activity.est + activity.duration - 1 
                    self.log( 
                        f"活动 {activity.name} 没有紧前活动,设定最早开始时间为1, 
                        并根据工期计算最早结束时间为{activity.eft}") 
                else: 
                    # 计算当前活动的所有前置活动的最早完成时间 
                    predecessors_eft = [act.eft for act in self.activities if 
                                        act.id in activity.predecessors and act.eft is not None] 
                    # 如果当前活动的所有前置活动的最早完成时间都已经计算出来,
                    则计算当前活动的最早开始时间和最早完成时间 
                    if len(predecessors_eft) == len(activity.predecessors): 
                        activity.est = max(predecessors_eft) + 1 
                        activity.eft = activity.est + activity.duration - 1 
                        self.log( 
                            f"活动 {activity.name} 紧前活动已完成正推法计算, 
                            开始日期按最早开始时间里面最大的," + 
                            f"设定为{activity.est}并根据工期计算最早结束时间为{activity.eft}") 
   
                        # 更新项目总持续时间为最大最早完成时间 
        self.duration = max([act.eft for act in self.activities]) 
   
    def _calculate_backward_pass(self) -> None: 
        """ 计算倒推法 
        :return: None 
        """ 
        self.log("## 开始倒推法计算")  # 输出提示信息 
        # 进入 while 循环,只有当所有活动的最晚开始时间和最晚完成时间都已经计算出来时,
        才会退出循环 
        while not self._is_backward_pass_calculated(): 
            # 遍历每个活动 
            for act in reversed(self.activities): 
                # 如果活动的最晚开始时间已经被计算过,则跳过 
                if act.lft is not None: 
                    continue 
                # 如果活动没有后继活动, 则从总持续时间开始计算最晚开始时间和最晚结束时间 
                if not act.successors: 
                    act.lft = self.duration 
                    act.lst = act.lft - act.duration + 1 
                    self.log(f"活动 {act.name} 没有紧后活动,按照正推工期设定最晚结束时间为
                    {act.lft}," + 
                             f"并根据工期计算最晚开始时间为{act.lst}") 
                else: 
                    # 计算当前活动的所有后继活动的最晚开始时间 
                    successors_lst = self._calculate_lst(act) 
                    # 如果当前活动的所有后继活动的最晚开始时间都已经计算出来,
                    则计算当前活动的最晚开始时间和最晚完成时间 
                    if len(successors_lst) == len(act.successors): 
                        act.lft = min(successors_lst) - 1 
                        act.lst = act.lft - act.duration + 1 
                        self.log(f"活动 {act.name} 紧后活动计算完成,
                        按照倒推工期设定最晚结束时间为{act.lft}," + 
                                 f"并根据工期计算最晚开始时间为{act.lst}") 
        # 更新项目总持续时间为最大最晚完成时间 
        self.duration = max([act.lft for act in self.activities]) 
   
    def _calculate_lst(self, activity: Activity) -> List[int]: 
        """计算某一活动的所有最晚开始时间 
        :param activity: 活动对象 
        :return: 最晚开始时间列表 
        """        rst = []  # 初始化结果列表 
        for act in activity.successors:  # 遍历该活动的后继活动 
            for act2 in self.activities:  # 遍历所有活动 
                if act2.id == act and act2.lst is not None: 
                 # 如果找到了该后继活动且其最晚开始时间不为空 
                    rst.append(act2.lst)  # 将最晚开始时间加入结果列表 
        return rst  # 返回结果列表 
   
    def _is_forward_pass_calculated(self) -> bool: 
        """ 判断整个项目正推法计算已经完成 
        :return: 若已计算正向传递则返回True,否则返回False 
        """ 
        for act in self.activities:  # 遍历所有活动 
            if act.est is None or act.eft is None:  # 如果该活动的最早开始时间或最早完成时间为空 
                return False  # 则返回False,表示还未计算正向传递 
        return True  # 如果所有活动的最早开始时间和最早完成时间都已计算,
        则返回True,表示已计算正向传递 
   
    def _is_backward_pass_calculated(self) -> bool: 
        """ 判断整个项目倒推法计算已经完成 
        :return: 若已计算倒推法则返回True,否则返回False 
        """        for act in self.activities:  # 遍历所有活动 
            if act.lst is None or act.lft is None:  # 如果该活动的最晚开始时间或最晚完成时间为空 
                return False  # 则返回False,表示还未计算倒推法 
        return True  # 如果所有活动的最晚开始时间和最晚完成时间都已计算,
        则返回True,表示已计算倒推法 
   
    def _calculate_total_floats(self) -> None: 
        """ 计算所有活动的总浮动时间 
        :return: None 
         """ 
        self.log(f"## 开始计算项目所有活动的总浮动时间") 
        for act in self.activities:  # 遍历所有活动 
            if act.est is not None and act.lst is not None:  
            # 如果该活动的最早开始时间和最晚开始时间都已计算 
                act.tf = act.lst - act.est  # 则计算该活动的总浮动时间 
                self.log(f"计算{act.name}的总浮动时间" + f"最晚开始时间
                {act.lst} - 最早开始时间{act.est} = {act.tf}", ) 
            else:  # 如果该活动的最早开始时间或最晚开始时间为空 
                act.tf = None  # 则将该活动的总浮动时间设为None 
   
    def _calculate_free_floats(self) -> None: 
        """ 计算所有活动的自由浮动时间 
        :return: None 
        """        self.log(f"## 开始计算项目所有活动的自由浮动时间")  # 输出提示信息 
        for act in self.activities:  # 遍历所有活动 
            if act.tf == 0:  # 如果该活动的总浮动时间为0 
                self.log(f"计算{act.name}的自由浮动时间" + f"因为{act.name}
                的总浮动时间为0,自由浮动时间为0")  # 输出提示信息 
                act.ff = 0  # 则将该活动的自由浮动时间设为0 
            elif act.tf > 0:  # 如果该活动的总浮动时间大于0 
                self.log(f"计算{act.name}的自由浮动时间")  # 输出提示信息 
                self.log(f"- {act.name}的总浮动时间{act.tf} > 0,")  # 输出提示信息 
                tmp = []  # 初始化临时列表 
                for act2 in self.activities:  # 遍历所有活动 
                    if act2.id in act.successors:  # 如果该活动是该活动的紧后活动 
                        self.log(f"- {act.name}的紧后活动{act2.name}的自由浮动动时间为
                        {act2.tf}")  # 输出提示信息 
                        tmp.append(act2.tf)  # 将该紧后活动的自由浮动时间加入临时列表 
                if len(tmp) != 0:  # 如果临时列表不为空 
                    act.ff = act.tf - max(tmp)  # 则计算该活动的自由浮动时间 
                    if act.ff < 0: 
                        act.ff = 0 
                    self.log(f"- 用活动自己的总浮动{act.tf}减去多个紧后活动总浮动的最大值
                    {max(tmp)} = {act.ff}") 
                else:  # 如果临时列表为空 
                    act.ff = act.tf  # 则将该活动的自由浮动时间设为总浮动时间 
   
    def calculate_critical_path(self) -> List[Activity]: 
        """ 计算整个项目的关键路径 
        :return: 整个项目的关键路径 
        """        ctc_path = []  # 初始化关键路径列表 
        for act in self.activities:  # 遍历所有活动 
            if act.tf == 0:  # 如果该活动的总浮动时间为0 
                ctc_path.append(act)  # 则将该活动加入关键路径列表 
        return ctc_path  # 返回关键路径列表 
   
    def calculate_project_duration(self) -> int: 
        """ 计算整个项目的持续时间 
        :return: 整个项目的持续时间 
        """        return max(activity.eft for activity in self.activities)  
        # 返回所有活动的最早完成时间中的最大值,即整个项目的持续时间 
   
   
# 从JSON文件中读取活动信息 
with open('activities.json', 'r', encoding='utf-8') as f: 
    activities_data = json.load(f) 
   
# 创建Project对象并添加活动 
project = Project() 
for activity_data in activities_data: 
    activity = Activity( 
        activity_data['id'], 
        activity_data['name'], 
        activity_data['duration'], 
        activity_data['predecessors'] 
    )    project.add_activity(activity) 
   
# 计算每个活动的最早开始时间、最晚开始时间等数据 
project.calculate() 
   
# 计算关键路径和项目总工期 
critical_path = project.calculate_critical_path() 
project_duration = project.calculate_project_duration() 
   
# 生成项目的活动清单 
with open('template.html', 'r', encoding='utf-8') as f: 
    template = Template(f.read()) 
html = template.render( 
    activities=project.activities, 
    critical_path=critical_path, 
    project_duration=project_duration, 
    log=project.logger 
) 
   
# 生成项目进度网络图 
aon_graph = graphviz.Digraph(format='png', graph_attr={'rankdir': 'LR'}) 
for activity in project.activities: 
    aon_graph.node(str(activity.id), activity.name) 
    for predecessor in activity.predecessors: 
        aon_graph.edge(str(predecessor), str(activity.id)) 
   
timestamp = datetime.now().strftime('%Y%m%d%H%M%S') 
   
aon_filename = f"aon_{timestamp}" 
   
aon_graph.render(aon_filename) 
   
# 将项目进度网络图插入到HTML文件中 
aon_image = f'<img src="{aon_filename}.png" alt="Precedence Diagramming Method: AON">' 
html = html.replace('<p>Precedence Diagramming Method: AON: <br/>[image]</p>', 
                    '<p>紧前关系绘图法: AON: <br/>' + aon_image + '</p>') 
   
filename = datetime.now().strftime('%Y%m%d%H%M%S') + '.html' 
with open(filename, 'w', encoding='utf-8') as f: 
    f.write(html)

2.活动类

程序名:activity.py

class Activity: 
    """ 
    活动类,用于表示项目中的一个活动。 
       Attributes:        id (int): 活动的唯一标识符。 
        name (str): 活动的名称。 
        duration (int): 活动的持续时间。 
        predecessors (List[int]): 活动的前置活动列表,存储前置活动的id。 
        est (int): 活动的最早开始时间。 
        lst (int): 活动的最晚开始时间。 
        eft (int): 活动的最早完成时间。 
        lft (int): 活动的最晚完成时间。 
        tf (int): 活动的总浮动时间。 
        ff (int): 活动的自由浮动时间。 
        successors (List[int]): 活动的后继活动列表,存储后继活动的Activity对象。 
    """ 
    def __init__(self, id: int, name: str, duration: int, predecessors: List[int]): 
        """ 
        初始化活动对象。 
           Args:            id (int): 活动的唯一标识符。 
            name (str): 活动的名称。 
            duration (int): 活动的持续时间。 
            predecessors (List[int]): 活动的前置活动列表,存储前置活动的id。 
        """        self.id = id 
        self.name = name 
        self.duration = duration 
        self.predecessors = predecessors 
        self.est = None 
        self.lst = None 
        self.eft = None 
        self.lft = None 
        self.tf = None 
        self.ff = None 
        self.successors = [] 
       def __str__(self): 
        return f"id: {self.id}, name: {self.name}, est: {self.est}, lst: {self.lst}, 
        eft: {self.eft}, lft: {self.lft}," 
        + f"successors: {self.successors}"

3.任务列表JSON文件

文件名:activities.json

[ 
  {    "id": 1, 
    "name": "A", 
    "duration": 2, 
    "predecessors": [] 
  }, 
  { 
    "id": 9, 
    "name": "A2", 
    "duration": 3, 
    "predecessors": [] 
  }, 
    { 
    "id": 10, 
    "name": "A3", 
    "duration": 2, 
    "predecessors": [] 
  }, 
  { 
    "id": 2, 
    "name": "B", 
    "duration": 3, 
    "predecessors": [ 
      1, 
      9 
    ] 
  }, 
  { 
    "id": 3, 
    "name": "C", 
    "duration": 4, 
    "predecessors": [ 
      1 
    ] 
  }, 
  { 
    "id": 4, 
    "name": "D", 
    "duration": 2, 
    "predecessors": [ 
      2,10 
    ] 
  }, 
  { 
    "id": 5, 
    "name": "E", 
    "duration": 3, 
    "predecessors": [ 
      2 
    ] 
  }, 
  { 
    "id": 6, 
    "name": "F", 
    "duration": 2, 
    "predecessors": [ 
      3 
    ] 
  }, 
  { 
    "id": 7, 
    "name": "G", 
    "duration": 3, 
    "predecessors": [ 
      4, 
      5 
    ] 
  }, 
  { 
    "id": 8, 
    "name": "H", 
    "duration": 2, 
    "predecessors": [ 
      6, 
      7 
    ] 
  }, 
  { 
    "id": 11, 
    "name": "H2", 
    "duration": 4, 
    "predecessors": [ 
      6, 
      7 
    ] 
  } 
]

4.输出模板文件

<!DOCTYPE html> 
<html> 
<head> 
    <meta charset="UTF-8"> 
    <title>PMP关键路径计算</title> 
    <style>        table { 
            border-collapse: collapse; 
            width: 100%; 
        } 
           th, td { 
            border: 1px solid black; 
            padding: 8px; 
            text-align: center; 
        } 
           th { 
            background-color: #4CAF50; 
            color: white; 
        } 
           .critical { 
            background-color: #ffcccc; 
        } 
    </style> 
</head> 
<body> 
<h1>活动清单</h1> 
<table> 
    <tr> 
        <th>ID</th> 
        <th>活动名</th> 
        <th>持续时间</th> 
        <th>紧前活动</th> 
        <th>紧后活动</th> 
        <th>最早开始时间EST</th> 
        <th>最早结束时间EFT</th> 
        <th>最晚开始时间LST</th> 
        <th>最晚结束时间LFT</th> 
        <th>总浮动时间TF</th> 
        <th>自由浮动时间FF</th> 
    </tr> 
    {% for activity in activities %} 
    <tr {% if activity in critical_path %}class="critical" {% endif %}> 
        <td>{{ activity.id }}</td> 
        <td>{{ activity.name }}</td> 
        <td>{{ activity.duration }}</td> 
        <td>            {% for predecessor in activity.predecessors %} 
            {% for act in activities %} 
            {% if act.id == predecessor %} 
            {{ act.name }} 
            {% endif %} 
            {% endfor %} 
            {% if not loop.last %}, {% endif %} 
            {% endfor %} 
        </td> 
        <td>            {% for successor in activity.successors %} 
            {% for act in activities %} 
               {% if act.id == successor %} 
            {{ act.name }} 
            {% endif %} 
            {% endfor %} 
            {% if not loop.last %}, {% endif %} 
            {% endfor %} 
        </td> 
        <td>{{ activity.est }}</td> 
        <td>{{ activity.eft }}</td> 
        <td>{{ activity.lst }}</td> 
        <td>{{ activity.lft }}</td> 
        <td>{{ activity.tf }}</td> 
        <td>{{ activity.ff }}</td> 
    </tr> 
    {% endfor %} 
</table> 
<p>关键路径是: {% for activity in critical_path %}{{ activity.name }}
{% if not loop.last %} -> {% endif %}{% endfor 
    %}</p> 
<p>项目总工期: {{ project_duration }}</p> 
<p>Precedence Diagramming Method: AON: <br/>[image]</p> 
<p> 
<table> 
    <tr> 
        <th>执行过程</th> 
    </tr> 
    {% for i in log %} 
    <tr> 
        <td style="text-align: left">{{i}}</td> 
    </tr> 
    {% endfor %} 
</table> 
</p> 
</body> 
</html>


相关推荐

PHP实现部分字符隐藏

沙雕mars · 1325浏览 · 2019-04-28 09:47:56
Java中ArrayList和LinkedList区别

kenrry1992 · 908浏览 · 2019-05-08 21:14:54
Tomcat 下载及安装配置

manongba · 970浏览 · 2019-05-13 21:03:56
JAVA变量介绍

manongba · 962浏览 · 2019-05-13 21:05:52
什么是SpringBoot

iamitnan · 1086浏览 · 2019-05-14 22:20:36
加载中

0评论

评论
分类专栏
小鸟云服务器
扫码进入手机网页