from flask import Flask, request import subprocess import hashlib import hmac import os app = Flask(__name__) # Secret key used for HMAC signature verification secret_token = os.environ.get('SECRET_TOKEN') @app.route('/webhook', methods=['POST']) def webhook(): # Check if the received webhook is from Git if request.headers.get('X-GitHub-Event') == 'push': # Verify HMAC signature signature = request.headers.get('X-Hub-Signature-256') if verify_signature(request.data, signature, secret_token): return 'Invalid HMAC signature.', 400 # Pull the latest changes from Git subprocess.run(['git', 'pull']) # Restart the example_app subprocess.run(['systemctl', 'restart', 'restart_this_app.service']) return 'Success!', 200 else: return 'Invalid webhook event.', 400 # https://docs.github.com/en/enterprise-server@3.6/webhooks-and-events/webhooks/securing-your-webhooks#python-example def verify_signature(payload_body, signature_header, secret_token): """Verify that the payload was sent from GitHub by validating SHA256. Raise and return 403 if not authorized. Args: payload_body: original request body to verify (request.body()) secret_token: GitHub app webhook token (WEBHOOK_SECRET) signature_header: header received from GitHub (x-hub-signature-256) """ if not signature_header: return False hash_object = hmac.new(secret_token.encode('utf-8'), msg=payload_body, digestmod=hashlib.sha256) expected_signature = "sha256=" + hash_object.hexdigest() if not hmac.compare_digest(expected_signature, signature_header): return False return True if __name__ == '__main__': app.run(host='0.0.0.0', port=5001)