Python算法实现与数据处理技巧解析

内容分享8小时前发布
1 0 0

table {
border-collapse: collapse;
width: 100%;
margin-bottom: 1rem;
}
th, td {
border: 1px solid #ddd;
padding: 8px;
text-align: left;
}
th {
background-color: #f2f2f2;
}
tr:nth-child(even) {
background-color: #f9f9f9;
}
pre {
background-color: #f8f8f8;
padding: 15px;
border-radius: 4px;
overflow-x: auto;
}

1、考虑以下算法:算法要求集合� = {�0, �1, �2, …, ��} 且 �≥1,同时有一个函数 �(�) 用于对每个值进行偏移和缩放。算法步骤如下:1: �←0;2: �←0;3: 当 �≠� 时执行;4: �←� + �(��);5: �←� + 1;6: 结束循环;7: �←�/�。有三种用Python编写此算法的方式:• 作为更新有状态变量的for语句;• 作为生成器表达式;• 作为应用函数的map()操作。请用Python写出这三种版本。测试数据如下:�←{7.46, 6.77, 12.74, 7.11, 7.81, 8.84, 6.08, 5.39, 8.15, 6.42, 5.73},缩放函数为:�(�)=(� – 7.5)/2.031。计算得到的 � 值约为零。

以下是三种Python实现方式:

作为更新有状态变量的for语句:


V = [7.46, 6.77, 12.74, 7.11, 7.81, 8.84, 6.08, 5.39, 8.15, 6.42, 5.73]
def f(v): return (v - 7.5) / 2.031
s = 0
for v in V: s = s + f(v)
m = s / len(V)
print(m)

作为生成器表达式:


V = [7.46, 6.77, 12.74, 7.11, 7.81, 8.84, 6.08, 5.39, 8.15, 6.42, 5.73]
def f(v): return (v - 7.5) / 2.031
m = sum(f(v) for v in V) / len(V)
print(m)

作为应用函数的map()操作:


V = [7.46, 6.77, 12.74, 7.11, 7.81, 8.84, 6.08, 5.39, 8.15, 6.42, 5.73]
def f(v): return (v - 7.5) / 2.031
m = sum(map(f, V)) / len(V)
print(m)

2、将表达式 dict((c.name, c) for c in palette) 重写为字典推导式。

{c.name: c for c in palette}

3、给定五张牌,牌面的值为1到13(包含1和13)。这五张牌有多种分组方式。我们感兴趣的手牌类型如下:1. 五张牌完全相同。2. 五张牌中有四张相同。3. 五张牌中有三张相同。与扑克不同的是,不考虑另外两张牌是否相同。4. 有两组不同的对子。5. 两张牌相同,形成一对。6. 没有牌相同。现在需要一个函数,输入是一个包含五个值(值在1到13之间)的列表,输出应该是一个数字代码,表示所找到的六种手牌类型中的哪一种。


该函数的大致算法如下:

**算法 7** 对手牌进行分类

要求: = {0, 1, 2, 3, 4}

1: 如果  中五张牌值相同,则返回 1  
2: 否则,如果  中有四张牌值相同,则返回 2  
3: 否则,如果  中有三张牌值相同,则返回 3  
4: 否则,如果  中有两种不同的值,每种值各有两张,则返回 4  
5: 否则,如果  中有两张牌值相同,则返回 5  
6: 否则返回 6  
7: 结束条件判断  

提示:对于更通用的扑克牌手牌识别,将牌值按升序排序会有帮助。对于这个简化算法,将列表转换为 `Counter` 对象并检查各种牌面等级的频率会有所帮助。`Counter` 类在 `collections` 模块中定义,该模块还有许多其他有用的集合类。

4、在继续之前,要么完成上一个练习,要么设计一个可行的方案来解决上一个练习。然后,提供一个能妥善处理额外或不同状态消息(如新增的“降级”状态)的实现。目标是将状态消息检查隔离到一个易于替换的函数中。


可以通过编写几个评估状态值的函数,如 `is_stopped()`、`is_degraded()` 和 `is_working()`,当需要更改时,编写新函数(如 `is_degraded_2()`)来替代旧函数。创建一个应用程序,不更改任何特定函数的实现,而是添加新函数,新函数将重用现有函数和新函数来完成扩展目标。

对于状态消息检查,可将其封装在函数中,方便替换。

两种可能的实现方式:

1. 一是编写四个过滤函数,应用于状态值序列并统计匹配数量,据此决定系统整体健康的三种响应;
2. 二是编写映射应用严重程度数字,向量的最大值为系统整体健康值。

需实现所有变体并比较代码的清晰度和表达性。

5、实现fib()函数的两个版本(朴素版本和使用缓存的版本),并描述缓存在计算大斐波那契数时对所需时间的影响。

可实现如下两个版本的

fib()

函数:


朴素版本


未使用缓存,每次计算斐波那契数时都需重新计算之前的数,复杂度约为O(2ⁿ)。


使用缓存的版本


借助

functools.cache


functools.lru_cache

装饰器,将之前计算的结果存储在缓存中,再次遇到相同参数时直接从缓存获取结果,避免重复计算。


fib(20)

为例,执行1000次的计时结果显示:

朴素版本耗时

3.23秒

使用缓存的版本耗时

0.0779秒

可见,缓存能显著减少计算大斐波那契数所需的时间,提升性能。

注意事项

使用缓存时需注意:

若使用

timeit

模块测试带缓存的函数,需在每次使用前清除缓存,否则后续迭代会直接从缓存获取结果,计时几乎为零。

可通过函数的

cache_clear()

方法清除缓存。

6、将

all_print()

函数重构为两个函数:一个通用的目录遍历函数,该函数对每个具有预期后缀的文本文件应用一个函数并对结果求和;另一个函数用于计算给定 Python 文件中“print”的出现次数。

以下是重构后的代码:


from collections import deque
from pathlib import Path

def count_print_in_py_file(file_path: Path) -> int:
    return file_path.read_text().count("print")

def traverse_directory(start: Path, suffix: str, func) -> int:
    count = 0
    pending: deque[Path] = deque([start])
    while pending:
        dir_path = pending.pop()
        for path in dir_path.iterdir():
            if path.is_file():
                if path.suffix == suffix:
                    count += func(path)
            elif path.is_dir():
                if not path.stem.startswith('.'):
                    pending.append(path)
    return count

可以使用以下方式调用这些函数:


start_directory = Path('.')
result = traverse_directory(start_directory, '.py', count_print_in_py_file)
print(result)

7、列出必须用类似以下示例代码实现的方法列表:def some_method(self,

args: Any,


kwargs: Any) -> None: if self.frozen: raise RuntimeError(“mapping is frozen”) else: super.some_method(

args, **kwargs)。评论拥有冻结映射的价值。将实现这个类所需的工作与有状态字典可能带来的混淆进行对比。为编写这个类或搁置该想法并寻找更好的解决方案提供成本效益分析。请记住,字典键查找非常快,依靠哈希计算而不是冗长的搜索。

需实现的方法列表

要确定需用给定示例代码包装的方法,需关注那些会改变字典内部状态的方法,比如

__setitem__


__delitem__


update


setdefault


pop


popitem

等。这些方法在字典处于“冻结”状态时不应被调用。

冻结映射的价值


避免意外修改

:在很多场景中,字典一旦创建就不需要再改变,像存储翻译映射的字典。冻结映射能防止代码中意外修改字典内容,增强代码的稳定性与可预测性。


简化状态管理

:有状态字典可能因状态改变产生复杂问题,而冻结映射只有一个不变状态,能简化状态管理。

实现类所需工作与有状态字典的混淆对比


实现类所需工作

:要实现一个冻结字典类,需要找出所有会改变字典状态的方法,并用示例代码包装这些方法,还要添加“冻结”状态管理逻辑,这需要一定的工作量且容易出错。


有状态字典的混淆

:有状态字典可能因状态改变产生复杂问题,比如在不恰当的时候修改字典,导致程序出现难以调试的错误。

成本效益分析


编写这个类的理由

:如果应用程序中有大量字典在创建后不需要改变,使用冻结字典类可以避免意外修改,提高代码的稳定性和可维护性。虽然实现这个类需要一定的工作量,但从长期来看,能减少调试和维护的成本。


搁置该想法并寻找更好解决方案的理由

:如果应用程序中大部分字典都需要频繁修改,实现冻结字典类的工作量可能大于其带来的好处。此时,可以考虑使用普通字典,并在代码中小心管理状态,或者寻找其他更合适的数据结构。

8、比较 string.join() 和 reduce() 两种方法在字符串拼接上的效率。使用 timeit 模块找出哪种方法更高效,并计算出这两种方法的时间比例。同时,观察随着字符串集合大小的变化,这两种方法的性能如何改变。构建一个小模块,用大小为 100、200、300、…、900 的字符串集合来进行测试,以此观察字符串拼接工作随字符串数量的变化情况。


reduce(operator.add, list_of_strings, "") 会计算大量中间字符串对象,是相对昂贵的操作,而 `"".join(list_of_strings)` 这种方式更高效。可使用 `timeit` 模块收集数据并分析这两种方法的效率差异、时间比例以及随字符串集合大小变化的性能表现。可构建一个小模块,使用大小为 100、200、300、...、900 的字符串集合进行测试。

9、有一个包含长途旅行一系列航路点的KML文件,名为Winter 2012 – 2013.kml。有一个基础的row_iter_kml()函数,它为旅程中的每个航路点生成一系列列表[str]对象。为了使航路点有用,必须成对处理它们。toolz.itertoolz.sliding_window()函数和itertools.pairwise()函数都可以将简单序列分解为成对。还有一个distance()函数,用于计算两个航路点之间足够接近的距离,但该函数是为处理复杂的NamedTuple对象而设计的。现在要求重新设计并重新实现这个distance函数,使其能够处理用包含“latitude”和“longitude”键的字典表示的点。源数据解析的基础是row_iter_kml()函数,它依赖于底层的xml.etree模块,该函数将每个航路点转换为字符串序列。重新设计源数据解析以使用toolz包,一般处理可以使用toolz.functoolz.pipe将源字符串转换为更有用的结果字典,确保将纬度和经度值转换为带正确符号的浮点值。重新设计后,比较和对比这两种实现,判断哪种看起来更清晰简洁。使用timeit模块比较性能,看看是否有特定的性能优势。

本题要求对航路点数据处理进行一系列操作,包括:

重新设计并实现

distance

函数,使其能处理以字典形式(含

"latitude"


"longitude"

键)表示的点;

重新设计源数据解析,使用

toolz

包和

toolz.functoolz.pipe

将源字符串转换为更有用的字典,并将纬度和经度值转换为带正确符号的浮点值;

比较新旧两种实现方式,判断哪种更清晰简洁;

使用

timeit

模块比较两种实现的性能。

10、航点计算练习处理包含多个航点的文件,航点连接形成从起点到终点的行程,也可将航点视为具有纬度和经度的孤立位置样本。根据最大和最小纬度以及最大和最小经度可计算出一个简单边界,该边界实际是越靠近极点越窄的梯形。现在需要一个类似于航点计算练习中描述的解析管道,且航点不必成对组合。界定航程有以下几种方法:一是给定航程的边界,为边界梯形的四个角定义四个点,用这四个点确定行程的中点;二是给定两个纬度和经度序列,计算平均纬度和平均经度;三是给定两个纬度和经度序列,计算中位纬度和中位经度。确定边界和中心的替代方案后,使用等距矩形距离计算来确定行程中最接近中心的点。请简述整个航点处理及航程界定的流程。

航点处理相关内容


航点连接

可将航点连接成完整的行程

也可将航点单独作为孤立位置样本处理


边界计算

可根据经纬度极值计算边界

实际形成的边界为梯形


航程界定方法

需要类似解析管道的处理方式

航点无需成对存在

确定梯形四角点后找中点

计算平均经纬度

计算中位经纬度


中心点确定与匹配

在边界和中心确定后

使用等距矩形距离计算方法

寻找行程中最接近中心的点

11、编写一个

bd_datetime()

函数,尝试多种日期格式转换,寻找能产生有效日期的格式。若年份缺失,可使用

datetime.replace()

方法以当前年份构建最终日期结果。基本实现完成后,创建包含有效和无效日期的适当测试用例。确保设计足够灵活,以便在添加另一种格式时不会太困难。

以下是实现

bd_datetime()

函数及测试用例的 Python 代码:


import datetime

# 定义可能的日期格式
DATE_FORMATS = ["%Y-%b-%d", "%b-%d", "%Y-%m-%d"]

def bd_datetime(date_str):
    current_year = datetime.datetime.now().year
    for fmt in DATE_FORMATS:
        try:
            dt = datetime.datetime.strptime(date_str, fmt)
            if "%Y" not in fmt:
                dt = dt.replace(year=current_year)
            return dt
        except ValueError:
            continue
    return None

# 测试用例
valid_dates = ["2024-Jan-01", "Jan-01", "2024-01-01"]
invalid_dates = ["2024/01/01", "abc"]

# 测试有效日期
for date in valid_dates:
    result = bd_datetime(date)
    if result:
        print(f"{date} 转换成功: {result}")
    else:
        print(f"{date} 转换失败")

# 测试无效日期
for date in invalid_dates:
    result = bd_datetime(date)
    if result:
        print(f"{date} 转换成功: {result}")
    else:
        print(f"{date} 转换失败")

此代码定义了

bd_datetime()

函数,它会尝试多种日期格式转换,若年份缺失则使用当前年份。同时提供了包含有效和无效日期的测试用例,且设计灵活,添加新格式只需在

DATE_FORMATS

列表中添加即可。

12、重写 clean_list() 函数以消除递归。clean_list() 函数定义为 def clean_list(text: str, *, to_replace: tuple[str, …]) -> str:,它原本有尾调用递归,规则如下:当 to_replace 参数值为空时,无需替换,文本参数的值将保持不变返回;否则,遍历 to_replace 中的每个字符,将文本参数中该字符的所有出现移除。现在要求重写该函数,消除递归。


def clean_list(text: str, *, to_replace: tuple[str, ...]) -> str:
    for char in to_replace:
        text = text.replace(char, "")
    return text

13、编写一个装饰器,用于处理可接收 float 类型参数并返回 float 类型结果的函数,使其能够优雅地处理 None 值。如果这个无值容忍装饰器名为 @none_tolerant,以下是一个测试用例:@none_tolerant def x2(x: float) -> float: return 2 * x def test_x2() -> None: assert x2(42.) == 84.0 assert x2(None) == None assert list(map(x2, [1, 2, None, 3])) == [2, 4, None, 6]

以下是实现

@none_tolerant

装饰器的代码:


from collections.abc import Callable
from functools import wraps

def none_tolerant(function: Callable[[float], float]) -> Callable[[float | None], float | None]:
    @wraps(function)
    def null_wrapper(value: float | None) -> float | None:
        return None if value is None else function(value)
    return null_wrapper

@none_tolerant
def x2(x: float) -> float:
    return 2 * x

def test_x2() -> None:
    assert x2(42.) == 84.0
    assert x2(None) == None
    assert list(map(x2, [1, 2, None, 3])) == [2, 4, None, 6]
    print('All test cases passed!')

test_x2()

上述代码定义了

none_tolerant

装饰器,它接受一个函数作为参数,并返回一个新的函数

null_wrapper


null_wrapper

函数会检查传入的值是否为

None

,如果是则返回

None

,否则调用原函数。

14、首先,定义一些函数来复制目录。需要定义单独的函数来完成以下状态更改:创建一个新的空目录;将文件从源目录的某个位置复制到目标目录。利用 pathlib.Path.glob() 方法提供的目录内容视图,编写一个整体函数来调用上述两个函数,以创建子目录并将文件复制到其中。其次,定义一个装饰器,在进行试运行时阻止操作,并将其应用于目录创建函数和文件复制函数。第三,创建一个合适的单元测试,以确认试运行模式只是走过场,不会改变底层文件系统。使用 pytest.tmp_path 夹具提供一个临时工作目录,避免在调试时不断删除和重新创建输出目录。

文件系统操作编程任务

这是一个关于文件系统操作的编程任务,步骤如下:

定义复制目录所需的函数,包括创建新空目录和复制文件的函数,可利用

pathlib.Path.glob()

方法辅助实现整体复制功能。

定义装饰器,在“dry-run”(试运行)模式下阻止实际操作,并将该装饰器应用于目录创建和文件复制函数。

编写单元测试,使用

pytest.tmp_path

夹具提供临时工作目录,确保“dry-run”模式不改变底层文件系统。

15、在处理大量数据时,当前设计是将每个文件分配给一个单独的工作进程。为确定这是否是合适的粒度级别,可考虑实现以下替代设计并比较吞吐量: • 创建两个工作进程池:一个池读取文件并以1024行为一块返回行数据。第二个工作进程池包含

analysis()

函数的主要部分。这个第二个池中的工作进程会解析块中的每一行以创建一个

Access

对象,创建一个

AccessDetails

对象,应用过滤器并总结结果。这导致了两级映射,将工作从解析工作进程传递给分析工作进程。 • 将10MB的日志文件分解成更小的尺寸。编写一个应用程序来读取一个日志文件并写入新文件,每个新文件限制为4096个单独的日志条目。将分析应用程序应用于这个更大的小文件集合,而不是原始大日志文件的较小集合。 • 分解

analysis()

函数以使用三个单独的工作进程池。一个池解析文件并返回

Access

对象块。另一个池将

Access

对象转换为

AccessDetails

对象。第三个工作进程池应用过滤器并总结

AccessDetails

对象。请总结使用不同处理管道分析大量数据的结果。

数据分析处理管道设计与优化

在使用不同处理管道分析大量数据时,可以考虑多种替代设计来确定合适的粒度级别。以下是一些可行的方案:

创建两个工作进程池:

一个用于读取文件并返回行数据

另一个用于进行数据分析

将大日志文件分解成小文件后再进行分析

分解

analysis()

函数,使用三个工作进程池分别进行:

解析

转换

总结

优化注意事项

在设计处理管道时,应注意以下优化策略:


关注映射

:合理分配 I/O 操作到更多的核心上


减少数据传递

:尽量减少进程间传递的数据量


基准测试

:通过实验确定资源的最优分配和设计

初步测试结果

不同方法的处理时长如下所示:

concurrent.futures/threadpool

: 106.58 秒

concurrent.futures/processpool

: 40.81 秒

multiprocessing/imap_unordered

: 27.26 秒

multiprocessing/map_async

: 27.45 秒

16、在示例中,要求对响应进行扩展,使结果数据除了可以序列化为 CSV 和 JSON 之外,还能序列化为 XML。请说明实现 XML 序列化的方法,以及添加 XML 序列化格式时需要额外做什么?

有两种实现方式:

下载安装能序列化

Series


Pair

对象的库

编写可处理

list[dict[str, Any]]

对象的函数

同时,添加 XML 序列化格式时,需添加测试用例来确保响应格式和内容符合预期。

17、在示例中扩展响应,要求除了将结果数据序列化为 CSV 和 JSON 格式外,还要将其序列化为 HTML 格式。已知 HTML 序列化比 XML 序列化更复杂,原因是数据的 HTML 呈现存在较多开销。通常不呈现 Pair 对象,而是要包含一个完整的 HTML 表格结构,该结构与 CSV 的行和列相对应。此外,添加 HTML 序列化格式时需要做什么?

在相关示例里,要把响应拓展为除了将结果数据按 CSV 和 JSON 格式序列化外,也按 HTML 格式序列化。由于数据的 HTML 展示有较多开销,HTML 序列化比 XML 序列化更复杂。通常不呈现

Pair

对象,而是包含完整的、与 CSV 行列对应的 HTML 表格结构。并且添加 HTML 序列化格式时,需添加测试用例来确保响应格式和内容符合预期。

© 版权声明

相关文章

暂无评论

none
暂无评论...