Python é amado por sua flexibilidade, tipagem dinâmica e vasta gama de bibliotecas. No entanto, essa liberdade pode se tornar um pesadelo para engenheiros e arquitetos de segurança, especialmente em sistemas de alta carga com requisitos de segurança rigorosos. Este artigo discute como interceptar a execução do código Python, proibir chamadas perigosas e construir um sistema de contratos sem modificar o código-fonte.
A Dinâmica Ruim
Vamos começar com alguns exemplos que demonstram os problemas da execução dinâmica:
-
Execução de código arbitrário:
pythonuser_input = "import os; os.system('rm -rf /')" eval(user_input)Como podemos ver,
eval()executa o conteúdo deuser_inputsem verificação. Nunca faça isso! -
Substituição de métodos em tempo de execução:
pythonimport requests requests.get = lambda *args, **kwargs: print("Interceptado!")Aqui, substituímos o método padrão
requests.getpor nossa própria função lambda. Isso afeta todas as importações derequestsem nosso aplicativo e em quaisquer bibliotecas de terceiros que o utilizem. Outras partes do programa, esperando uma resposta real do servidor, de repente começarão a receberNone(porqueprintretornaNone). -
Acesso não autorizado ao sistema de arquivos:
pythondef read_sensitive_data(): with open('/etc/passwd', 'r') as f: return f.read()Em scripts domésticos, isso pode ser feito, mas em produção, definitivamente não deve ser.
O que podemos oferecer para combater isso? A primeira coisa que vem à mente é a boa e velha análise estática. No entanto, essa ferramenta não protege muito contra esses erros. O fato é que ela verifica os tipos, mas não controla o que acontecerá durante a execução. E para resolver esse problema, precisamos de mecanismos mais profundos.
Por exemplo, vamos considerar algumas ferramentas padrão e suas desvantagens. Assim, mypy verifica os tipos de argumentos e valores de retorno. Ao mesmo tempo, o utilitário não verifica a segurança das chamadas e os possíveis efeitos colaterais. A ferramenta pylint verifica o estilo do código e possíveis bugs. Por sua vez, o scanner bandit procura por vulnerabilidades conhecidas (segredos hardcoded, eval).
Quero meu próprio analisador
Mas o que fazer se não tivermos as capacidades oferecidas pelas ferramentas prontas? Nesse caso, podemos desenvolver nosso próprio analisador, por exemplo, com base no módulo ast.
Este módulo permite analisar a estrutura do código sem executá-lo. Aqui está um pequeno exemplo:
pythonimport ast import sys class SecurityAnalyzer(ast.NodeVisitor): """Analisador de segurança que proíbe padrões perigosos""" def __init__(self): self.violations = [] self.forbidden_functions = {'eval', 'exec', 'compile', '__import__'} self.forbidden_attrs = {'os.system', 'subprocess.Popen', 'os.remove'} def visit_Call(self, node): # Proibição da chamada eval/exec if isinstance(node.func, ast.Name): if node.func.id in self.forbidden_functions: self.violations.append({ 'line': node.lineno, 'message': f"Chamada {node.func.id}() proibida", 'severity': 'CRITICAL' }) # Proibição de métodos perigosos elif isinstance(node.func, ast.Attribute): attr_path = self._get_attr_path(node.func) if attr_path in self.forbidden_attrs: self.violations.append({ 'line': node.lineno, 'message': f"Chamada {attr_path} proibida", 'severity': 'HIGH' }) self.generic_visit(node) def visit_Import(self, node): # Verificação de importações for alias in node.names: if alias.name in ['os', 'subprocess', 'socket', 'ctypes']: self.violations.append({ 'line': node.lineno, 'message': f"Importação perigosa: {alias.name}", 'severity': 'MEDIUM' }) def visit_ImportFrom(self, node): self.visit_Import(node) def _get_attr_path(self, attr_node): """Reconstrução recursiva do nome completo do atributo (a.b.c)""" if isinstance(attr_node, ast.Attribute): return f"{self._get_attr_path(attr_node.value)}.{attr_node.attr}" elif isinstance(attr_node, ast.Name): return attr_node.id return "" def analyze_file(self, filename): with open(filename, 'r') as f: tree = ast.parse(f.read(), filename=filename) self.visit(tree) return self.violations # Uso if __name__ == "__main__": analyzer = SecurityAnalyzer() violations = analyzer.analyze_file("target_script.py") for v in violations: print(f"[{v['severity']}] Linha {v['line']}: {v['message']}") if violations: sys.exit(1) # Bloqueia a execução
Vamos ver o que esse código faz. O arquivo target_script.py passado para análise é lido e convertido em uma AST - uma estrutura em árvore, onde cada nó representa um elemento de código (função, chamada, importação, etc.). Em seguida, a árvore é percorrida usando o analisador SecurityAnalyzer, que chama os métodos apropriados (visit_Call, visit_Import, etc.) para cada nó.
Dependendo de quais inconsistências são encontradas, os níveis de criticidade são atribuídos a elas. Assim, funções proibidas, como eval, exec, etc., receberão CRITICAL, métodos perigosos (os.system, etc.) - HIGH, e importações perigosas (os, subprocess, etc.) - MEDIUM. Todos os problemas encontrados são salvos em self.violations e impressos no formato [NÍVEL] Linha X: Mensagem. Se houver violações, o script é encerrado com o código 1 (erro).
Assim, para o trecho de código ruim fornecido como exemplo:
pythonimport os eval("print('Hello')") os.system("rm -rf /")
Obteremos o seguinte relatório:
[MEDIUM] Linha 1: Importação perigosa: os
[CRITICAL] Linha 2: Chamada eval() proibida
[HIGH] Linha 3: Chamada os.system proibida
Assim, podemos realizar a análise do código-fonte por conta própria. No entanto, apenas a análise estática não é suficiente, pois o código pode ser gerado dinamicamente, carregado de plugins, etc. Aqui, os interceptadores de execução vêm em nosso auxílio. Assim, a função settrace permite definir um manipulador que é chamado em cada evento de execução.
pythonimport sys import dis from typing import Optional, Any, Dict, Set from functools import wraps class ExecutionMonitor: """ Monitoramento da execução do código com proteção contra operações perigosas. Versão corrigida com o funcionamento correto de todos os componentes. """ def __init__(self): self.calls_count: int = 0 self.max_depth: int = 1000 self.is_tracing: bool = False self.forbidden_functions: Set[str] = {'eval', 'exec', '__import__', 'compile'} self.forbidden_modules: Set[str] = {'os', 'subprocess', 'socket', 'sys', 'shutil'} def trace_calls(self, frame, event: str, arg: Any) -> Optional[callable]: """ Manipulador de rastreamento. Retorna a si mesmo para continuar o monitoramento. """ if event == 'call': # Proteção contra loop recursivo do rastreador if self.is_tracing: return self.trace_calls self.is_tracing = True try: # Verificação da profundidade da recursão self.calls_count += 1 if self.calls_count > self.max_depth: raise RecursionError( f"Profundidade máxima de chamadas excedida ({self.max_depth})" ) # Verificamos o nome da função chamada func_name = frame.f_code.co_name # Verificação de funções perigosas if func_name in self.forbidden_functions: raise RuntimeError( f"Chamada de função perigosa proibida: {func_name}()" ) # Verificação de módulos perigosos (se uma função for importada deles) if func_name == '__import__': # No contexto da importação, você pode verificar os argumentos local_vars = frame.f_locals if 'name' in local_vars and local_vars['name'] in self.forbidden_modules: raise RuntimeError( f"Importação do módulo proibida: {local_vars['name']}" ) # Log da chamada (opcional) # print(f"[CALL] {func_name} em {frame.f_code.co_filename}:{frame.f_lineno}") finally: self.is_tracing = False elif event == 'line': # Log de linhas (pode ser desativado para desempenho) # print(f"[LINE] {frame.f_lineno} em {frame.f_code.co_filename}") pass elif event == 'return': self.is_tracing = True try: self.calls_count -= 1 finally: self.is_tracing = False elif event == 'exception': # Tratamento de exceções exc_type, exc_value, exc_traceback = arg print(f"[EXCEPTION] {exc_type.__name__}: {exc_value}") return self.trace_calls def analyze_bytecode(self, code_obj) -> Dict[str, list]: """ Análise do bytecode para a presença de instruções perigosas (sem bloqueio ativo). Útil para análise estática antes da execução. """ warnings = [] try: instructions = dis.get_instructions(code_obj) for instr in instructions: # Verificação de instruções perigosas if instr.opname == 'IMPORT_NAME': if instr.argrepr and any( mod in instr.argrepr.lower() for mod in ['os', 'subprocess', 'socket'] ): warnings.append({ 'type': 'dangerous_import', 'instruction': instr.opname, 'details': f'Importação: {instr.argrepr}', 'line': instr.starts_line }) elif instr.opname in ('CALL_FUNCTION', 'CALL_FUNCTION_KW'): # No bytecode, é difícil determinar o nome da função chamada # Você pode analisar as instruções LOAD_GLOBAL/LOAD_ATTR anteriores pass elif instr.opname == 'LOAD_GLOBAL': if instr.argrepr in self.forbidden_functions: warnings.append({ 'type': 'dangerous_function', 'instruction': instr.opname, 'details': f'Uso: {instr.argrepr}()', 'line': instr.starts_line }) except Exception as e: warnings.append({ 'type': 'analysis_error', 'details': str(e) }) return {'warnings': warnings, 'is_safe': len(warnings) == 0} @staticmethod def safe_executor(code: str, globals_dict: Optional[dict] = None): """ Execução segura do código em um ambiente isolado. """ # Criamos um namespace limpo safe_globals = { '__builtins__': { 'print': print, 'len': len, 'range': range, 'str': str, 'int': int, 'float': float, 'bool': bool, 'list': list, 'dict': dict, 'tuple': tuple, 'set': set, # Excluímos eval, exec, compile, import } } if globals_dict: safe_globals.update(globals_dict) # Criamos um monitor e ativamos o rastreamento monitor = ExecutionMonitor() old_trace = sys.gettrace() sys.settrace(monitor.trace_calls) try: # Executamos o código exec(code, safe_globals) except (RuntimeError, RecursionError) as e: print(f"[SEGURANÇA] Bloqueado: {e}") except Exception as e: print(f"[ERRO] Erro de execução: {e}") finally: # Restauramos o rastreador antigo sys.settrace(old_trace) class SandboxedFunction: """ Decorador para execução segura de funções com monitoramento. """ def __init__(self, max_depth: int = 500): self.max_depth = max_depth self.monitor = ExecutionMonitor() self.monitor.max_depth = max_depth def __call__(self, func): @wraps(func) def wrapper(*args, **kwargs): old_trace = sys.gettrace() sys.settrace(self.monitor.trace_calls) try: return func(*args, **kwargs) finally: sys.settrace(old_trace) return wrapper # Exemplos de uso if __name__ == "__main__": print("=== Exemplo 1: Execução segura ===") safe_code = """ print("Este é um código seguro") x = 10 + 20 print(f"Resultado: {x}") """ ExecutionMonitor.safe_executor(safe_code) print("\n=== Exemplo 2: Bloqueando eval ===") dangerous_code = """ print("Tentando usar eval...") eval("print('Hack!')") """ ExecutionMonitor.safe_executor(dangerous_code) print("\n=== Exemplo 3: Análise estática do bytecode ===") test_code = """ import os import json def bad_function(): eval("print('test')") return os.system('ls') """ monitor = ExecutionMonitor() # Compilamos o código em um objeto de código code_obj = compile(test_code, '<string>', 'exec') analysis = monitor.analyze_bytecode(code_obj) print(f"Código seguro: {analysis['is_safe']}") for warning in analysis['warnings']: print(f" Aviso: {warning['details']}") print("\n=== Exemplo 4: Decorador para uma função separada ===") @SandboxedFunction(max_depth=100) def my_secure_function(): print("Esta função está protegida") # A próxima linha causará uma exceção # eval("print('hack')") my_secure_function()
Este código implementa um sistema para monitorar e restringir a execução do código Python para criar um ambiente isolado. Seu objetivo principal é restringir a execução de código potencialmente perigoso, proibindo funções perigosas (eval, exec, compile, import), módulos (os, subprocess, socket, sys, shutil) e recursão muito profunda (proteção contra estouro de pilha).
Monitoramento de chamadas do sistema
O audit hook é o mecanismo mais poderoso para sistemas de produção. Ele intercepta todas as operações perigosas no nível do interpretador, como importação de módulos, compilação dinâmica de código, interação com o sistema de arquivos, solicitações de rede e outras ações potencialmente significativas. Ele também permite coletar informações sobre ações internas ou de outra forma indetectáveis do Python ou bibliotecas escritas em Python.
Aqui está um exemplo de como usar esse mecanismo:
pythonimport sys import os class AuditLogger: """Log de todas as chamadas do sistema""" def __init__(self, blocked_operations=None): self.blocked = blocked_operations or [ 'os.system', 'subprocess.Popen', 'open', 'exec', 'import' ] self.audit_log = [] def audit_hook(self, event, args): """Manipulador de eventos de auditoria""" # Logamos o evento log_entry = { 'event': event, 'args': str(args)[:100], # Limitamos o comprimento 'timestamp': __import__('time').time() } self.audit_log.append(log_entry) # Verificamos se precisamos bloquear for blocked in self.blocked: if blocked in event: print(f" BLOQUEIO: {event} com argumentos {args}") raise PermissionError(f"Operação {event} proibida pela política de segurança") print(f" {event}: {args}") def install(self): sys.addaudithook(self.audit_hook) def get_report(self): return self.audit_log # Instalação do audit-hook audit = AuditLogger(blocked_operations=['open', 'os.system']) audit.install() # Exemplo 1: Tentativa de abrir um arquivo try: f = open('/etc/passwd', 'r') except PermissionError as e: print(f"Bloqueado: {e}") # Exemplo 2: Tentativa de executar um comando do sistema try: os.system('whoami') except PermissionError as e: print(f"Bloqueado: {e}") # Exemplo 3: Operação segura x = 1 + 2 print(f"Resultado: {x}") # Saída do relatório print("\n=== RELATÓRIO DE AUDITORIA ===") for entry in audit.get_report(): print(f"{entry['event']}: {entry['args'][:50]}")
O resultado deste exemplo será o seguinte:
Para o exemplo 1:
BLOQUEIO: open com argumentos ('/etc/passwd', 'r')
Bloqueado: Operação open proibida pela política de segurança
Para o exemplo 2:
BLOQUEIO: os.system com argumentos ('whoami',)
Bloqueado: Operação os.system proibida pela política de segurança
=== RELATÓRIO DE AUDITORIA ===
open: ('/etc/passwd', 'r')
os.system: ('whoami',)
builtins.id: (1,)
Modificação de bytecode
E, finalmente, o vôo mais alto - alterar o código antes de sua execução.
pythonimport dis import types def inspect_bytecode(func): """Descompilamos a função em bytecode""" print(f"=== BYTECODE {func.__name__} ===") dis.dis(func) # Obtemos o bytecode bruto code = func.__code__ print(f"\nInstruções: {code.co_code}") print(f"Constantes: {code.co_consts}") print(f"Nomes de variáveis: {code.co_names}") print(f"Variáveis locais: {code.co_varnames}") def example(a, b): x = a + b return x * 2 inspect_bytecode(example)
Este código descompila uma função Python em bytecode e mostra sua estrutura interna. Aqui, primeiro importamos os módulos: dis (converte bytecode em uma forma legível por humanos) e types (para trabalhar com tipos). Em seguida, a função inspect_bytecode(func) realiza a inspeção do bytecode. Como resultado, obtemos o bytecode em um formato compreensível: números de instruções, as próprias instruções e seus argumentos.
=== BYTECODE example ===
2 0 LOAD_FAST 0 (a)
2 LOAD_FAST 1 (b)
4 BINARY_ADD
6 STORE_FAST 2 (x)
3 8 LOAD_FAST 2 (x)
10 LOAD_CONST 1 (2)
12 BINARY_MULTIPLY
14 RETURN_VALUE
Instruções: b'\x97\x00\x97\x01\x17\x00}\x02\x97\x02d\x01\x14\x00S\x00'
Constantes: (None, 2)
Nomes de variáveis: ()
Variáveis locais: ('a', 'b', 'x')
No final, vamos considerar um analisador de segurança estática do código Python, que examina o bytecode das funções para detectar padrões potencialmente perigosos. Detectaremos automaticamente código suspeito analisando o bytecode sem executá-lo, o que nos permite avaliar os riscos antes de iniciar o programa.
pythonimport dis import types from typing import List, Dict, Any class SecurityBytecodeInspector: """Analisa o bytecode para a presença de padrões perigosos""" DANGEROUS_PATTERNS = { 'IMPORT_NAME': 'importação de módulos (possivelmente perigosos)', 'LOAD_GLOBAL': 'uso de funções globais', 'CALL_FUNCTION': 'chamada de funções', 'LOAD_ATTR': 'acesso a atributos (possivelmente, chamada de métodos)', } HIGHLY_SUSPICIOUS = { 'eval': 'execução dinâmica de código', 'exec': 'execução dinâmica de código', '__import__': 'importação dinâmica', 'open': 'leitura/gravação de arquivos', 'compile': 'compilação de código', 'globals': 'acesso a variáveis globais', 'locals': 'acesso a variáveis locais', '__builtins__': 'acesso a funções embutidas', } def __init__(self): self.findings = [] def analyze_function(self, func) -> Dict[str, Any]: """Análise de segurança da função""" self.findings = [] print(f"\n ANÁLISE DE SEGURANÇA: {func.__name__}") print("=" * 50) # Obtemos o bytecode code = func.__code__ # Analisamos as constantes self._check_constants(code.co_consts) # Analisamos os nomes self._check_names(code.co_names) # Análise detalhada das instruções self._analyze_instructions(func) return { 'function': func.__name__, 'is_safe': len(self.findings) == 0, 'findings': self.findings, 'constants': code.co_consts, 'global_names': code.co_names, 'local_names': code.co_varnames, } def _check_constants(self, constants): """Verificação de constantes para strings perigosas""" for const in constants: if isinstance(const, str): # Verificação de strings suspeitas suspicious = ['__import__', 'eval', 'exec', 'system', 'subprocess'] for sus in suspicious: if sus in const: self.findings.append({ 'severity': 'HIGH', 'type': 'SUSPICIOUS_STRING', 'detail': f'String suspeita encontrada: "{const}"', 'suggestion': 'Possivelmente, o código está tentando executar operações dinâmicas', }) def _check_names(self, names): """Verificação de nomes usados""" for name in names: if name in self.HIGHLY_SUSPICIOUS: self.findings.append({ 'severity': 'CRITICAL', 'type': 'DANGEROUS_FUNCTION', 'detail': f'Função perigosa detectada: {name}()', 'suggestion': self.HIGHLY_SUSPICIOUS[name], }) def _analyze_instructions(self, func): """Análise detalhada das instruções de bytecode""" print("\n📋 BUSCA POR INSTRUÇÕES PERIGOSAS:") for instr in dis.get_instructions(func): # Verificamos as instruções de importação if instr.opname == 'IMPORT_NAME': self.findings.append({ 'severity': 'MEDIUM', 'type': 'IMPORT', 'detail': f'Importação do módulo: {instr.argrepr}', 'line': instr.starts_line, }) print(f" Importação encontrada: {instr.argrepr} (linha {instr.starts_line})") # Verificamos o carregamento de variáveis globais elif instr.opname == 'LOAD_GLOBAL' and instr.argrepr in self.HIGHLY_SUSPICIOUS: self.findings.append({ 'severity': 'HIGH', 'type': 'SUSPICIOUS_GLOBAL', 'detail': f'Carregamento de função global perigosa: {instr.argrepr}', 'line': instr.starts_line, }) print(f" CRÍTICO: {instr.argrepr}() (linha {instr.starts_line})") # Verificação de acesso a atributos (possivelmente, métodos de objetos) elif instr.opname == 'LOAD_ATTR': self.findings.append({ 'severity': 'LOW', 'type': 'ATTRIBUTE_ACCESS', 'detail': f'Acesso ao atributo: {instr.argrepr}', 'line': instr.starts_line, }) def generate_report(self) -> str: """Geração de um relatório de segurança""" if not self.findings: return " O código é seguro para execução" report = [" VULNERABILIDADES DETECTADAS:\n"] critical = [f for f in self.findings if f['severity'] == 'CRITICAL'] high = [f for f in self.findings if f['severity'] == 'HIGH'] medium = [f for f in self.findings if f['severity'] == 'MEDIUM'] if critical: report.append(f" CRÍTICO ({len(critical)}):") for finding in critical: report.append(f" - {finding['detail']}") report.append(f" → {finding['suggestion']}") if high: report.append(f"\n ALTO RISCO ({len(high)}):") for finding in high: report.append(f" - {finding['detail']}") if medium: report.append(f"\n RISCO MÉDIO ({len(medium)}):") for finding in medium: report.append(f" - {finding['detail']}") return "\n".join(report) # ===== EXEMPLOS DE USO ===== def safe_calculation(x,





