Server IP : 3.128.248.115 / Your IP : 18.221.192.248 Web Server : Apache/2.4.41 (Ubuntu) System : Linux ip-172-31-33-233 5.15.0-1037-aws #41~20.04.1-Ubuntu SMP Mon May 22 18:18:00 UTC 2023 x86_64 User : www-data ( 33) PHP Version : 7.4.28 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /bin/ |
Upload File : |
#!/usr/bin/python3
# The EC2 Spot hibernation agent. This agent does several things:
# 1. Upon startup it checks for sufficient swap space to allow hibernate and fails
#    if it's present but there's not enough of it.
# 2. If there's no swap space, it creates it and launches a background thread to
#    touch all of its blocks to make sure that EBS volumes are pre-warmed.
# 3. It updates the offset of the swap file in the kernel using SNAPSHOT_SET_SWAP_AREA ioctl.
# 4. It daemonizes and starts a polling thread to listen for instance termination notifications.
#
# This file is compatible both with Python 2 and Python 3
import argparse
import array
import atexit
import ctypes as ctypes
import fcntl
import mmap
import os
import struct
import sys
import syslog
import requests
from subprocess import check_call, check_output
from threading import Thread
from math import ceil
from time import sleep

try:
    from urllib.request import urlopen, Request
except ImportError:
    from urllib2 import urlopen, Request, HTTPError

try:
    from ConfigParser import ConfigParser, NoSectionError, NoOptionError
except ImportError:
    from configparser import ConfigParser, NoSectionError, NoOptionError

GRUB_FILE = '/boot/grub/menu.lst'
GRUB2_DIR = '/etc/default/grub.d'
SWAP_RESERVED_SIZE = 16384
log_to_syslog = True
log_to_stderr = True

IMDS_BASEURL = 'http://169.254.169.254'
IMDS_API_TOKEN_PATH = 'latest/api/token'
IMDS_SPOT_ACTION_PATH = 'latest/meta-data/hibernation/configured'
# Upper bound (seconds) for a single metadata-service HTTP call, so that an
# unresponsive endpoint can never block startup or the polling loop forever.
IMDS_TIMEOUT_SECONDS = 5


def log(message):
    """Write ``message`` to syslog and/or stderr, depending on the module flags."""
    if log_to_syslog:
        syslog.syslog(message)
    if log_to_stderr:
        sys.stderr.write("%s\n" % message)


def fallocate(fl, size):
    """
    Reserve ``size`` bytes for the open file object ``fl``.

    Uses libc's fallocate(); if that fails for any reason the file is instead
    extended by seeking to ``size - 1`` and writing a single NUL character.
    """
    try:
        _libc = ctypes.CDLL('libc.so.6')
        _fallocate = _libc.fallocate
        _fallocate.argtypes = [ctypes.c_int, ctypes.c_int, ctypes.c_ulong, ctypes.c_ulong]

        # (FD, mode, offset, len)
        res = _fallocate(fl.fileno(), 0, 0, size)
        if res != 0:
            raise Exception("Failed to perform fallocate(). Result: %d" % res)
    except Exception as e:
        log("Failed to call fallocate(), will use resize. Err: %s" % str(e))
        fl.seek(size-1)
        fl.write(chr(0))


def mlockall():
    """Best-effort mlockall(MCL_CURRENT | MCL_FUTURE); failures are only logged."""
    log("Locking all the code in memory")
    try:
        _libc = ctypes.CDLL('libc.so.6', use_errno=True)
        _mlockall = _libc.mlockall
        _mlockall.argtypes = [ctypes.c_int]
        _MCL_CURRENT = 1
        _MCL_FUTURE = 2
        if _mlockall(_MCL_CURRENT | _MCL_FUTURE) != 0:
            # Surface the libc failure instead of silently ignoring it
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))
    except Exception as e:
        log("Failed to lock hibernation agent into RAM. Error: %s" % str(e))


def get_file_block_number(filename):
    """
    Return the value the FIBMAP ioctl reports for logical block 0 of ``filename``.

    Raises an Exception if the ioctl returns a negative result.
    """
    with open(filename, 'r') as handle:
        buf = array.array('L', [0])
        # from linux/fs.h
        FIBMAP = 0x01
        result = fcntl.ioctl(handle.fileno(), FIBMAP, buf)
    if result < 0:
        raise Exception("Failed to get the file offset. Error=%d" % result)
    return buf[0]


def get_swap_space():
    """
    Return the size of the first swap area listed in /proc/swaps, in bytes.

    Returns 0 when no swap area is listed.
    """
    # Format is (tab-separated):
    # Filename Type Size Used Priority
    # / swapfile file 536870908 0 - 1
    with open('/proc/swaps') as swp:
        lines = swp.readlines()[1:]
    if not lines:
        return 0
    # The size column is multiplied by 1024 to obtain bytes
    return int(lines[0].split()[2]) * 1024


def get_partuuid(device):
    """Return the PARTUUID that ``lsblk`` reports for ``device``."""
    return check_output(
        ['lsblk', '-dno', 'PARTUUID', device]).decode('ascii').strip()


def patch_grub_config(swap_device, offset, grub_file, grub2_dir):
    """
    Point the boot loader configuration at the swap file for resume.

    If ``grub_file`` exists, every ``kernel`` line in it is rewritten with fresh
    ``resume``/``resume_offset`` parameters. If ``grub2_dir`` exists and does
    not yet contain ``99-set-swap.cfg``, that file is written and
    ``update-grub2`` is run.
    """
    log("Updating GRUB to use the device %s with offset %d for resume" % (swap_device, offset))
    if grub_file and os.path.exists(grub_file):
        lines = []
        with open(grub_file) as fl:
            for ln in fl.readlines():
                params = ln.split()
                if not params or params[0] != 'kernel':
                    lines.append(ln)
                    continue

                # Drop any stale resume settings before appending the new ones
                new_params = []
                for param in params:
                    if "resume_offset=" in param or "resume=" in param:
                        continue
                    new_params.append(param)

                if "no_console_suspend=" not in ln:
                    new_params.append("no_console_suspend=1")
                new_params.append("resume_offset=%d" % offset)
                new_params.append("resume=%s" % swap_device)

                lines.append(" ".join(new_params)+"\n")

        with open(grub_file, "w") as fl:
            fl.write("".join(lines))

    # Do GRUB2 update as well
    if grub2_dir and os.path.exists(grub2_dir):
        offset_file = os.path.join(grub2_dir, '99-set-swap.cfg')
        if swap_device.startswith("/dev"):
            swap_device = "PARTUUID=%s" % get_partuuid(swap_device)
        if not os.path.exists(offset_file):
            with open(offset_file, 'w') as fl:
                fl.write('GRUB_CMDLINE_LINUX_DEFAULT="$GRUB_CMDLINE_LINUX_DEFAULT no_console_suspend=1 '
                         'resume_offset=%d resume=%s"\n' % (offset, swap_device))
            check_call('/usr/sbin/update-grub2')

    log("GRUB configuration is updated")


def update_kernel_swap_offset(grub_update):
    """
    Tell the kernel (and optionally GRUB) where the first active swap file lives.

    Reads the first entry of /proc/swaps, looks up its device and block offset,
    patches the GRUB configuration when ``grub_update`` is true, and finally
    issues SNAPSHOT_SET_SWAP_AREA on /dev/snapshot.

    Raises an Exception when /proc/swaps lists no swap area.
    """
    with open('/proc/swaps') as swp:
        lines = swp.readlines()[1:]
    if not lines:
        raise Exception("Swap file is not found")
    filename = lines[0].split()[0]
    log("Updating the kernel offset for the swapfile: %s" % filename)

    statbuf = os.stat(filename)
    dev = statbuf.st_dev
    offset = get_file_block_number(filename)

    if grub_update:
        # Find the mount point for the swap file ('df -P /swap')
        df_out = check_output(['df', '-P', filename]).decode('ascii')
        dev_str = df_out.split("\n")[1].split()[0]
        patch_grub_config(dev_str, offset, GRUB_FILE, GRUB2_DIR)
    else:
        log("Skipping GRUB configuration update")

    log("Setting swap device to %d with offset %d" % (dev, offset))

    # Set the kernel swap offset, see https://www.kernel.org/doc/Documentation/power/userland-swsusp.txt
    # From linux/suspend_ioctls.h
    SNAPSHOT_SET_SWAP_AREA = 0x400C330D
    buf = struct.pack('LI', offset, dev)
    with open('/dev/snapshot', 'r') as snap:
        fcntl.ioctl(snap, SNAPSHOT_SET_SWAP_AREA, buf)
    log("Done updating the swap offset")


class SwapInitializer(object):
    """Creates, optionally pre-touches, and activates the swap file."""

    def __init__(self, filename, swap_size, touch_swap, mkswap, swapon):
        self.filename = filename
        self.swap_size = swap_size
        # Set to True from another thread to make init_swap() stop touching blocks
        self.need_to_hurry = False
        self.mkswap = mkswap
        self.swapon = swapon
        self.touch_swap = touch_swap

    def do_allocate(self):
        """Create (or truncate) the swap file, reserve its space and chmod it to 0600."""
        log("Allocating %d bytes in %s" % (self.swap_size, self.filename))
        with open(self.filename, 'w+') as fl:
            fallocate(fl, self.swap_size)
        os.chmod(self.filename, 0o600)

    def init_swap(self):
        """
            Initialize the swap using direct IO to avoid polluting the page cache
        """
        try:
            cur_swap_size = os.stat(self.filename).st_size
            if cur_swap_size >= self.swap_size:
                log("Swap file size (%d bytes) is already large enough" % cur_swap_size)
                return
        except OSError:
            # The file does not exist (or can't be stat'ed) - allocate it below
            pass
        self.do_allocate()

        if not self.touch_swap:
            log("Swap pre-heating is skipped, the swap blocks won't be touched during "
                "initialization to ensure they are ready")
            return

        written = 0
        log("Opening %s for direct IO" % self.filename)
        # os.open() raises OSError on failure, so no return-code check is needed
        fd = os.open(self.filename, os.O_RDWR | os.O_DIRECT | os.O_SYNC | os.O_DSYNC)

        filler_block = None
        try:
            # Create a filler block that is correctly aligned for direct IO
            filler_block = mmap.mmap(-1, 1024 * 1024)
            # We're using 'b' to avoid optimizations that might happen for zero-filled pages
            filler_block.write(b'b' * 1024 * 1024)

            log("Touching all blocks in %s" % self.filename)

            while written < self.swap_size and not self.need_to_hurry:
                res = os.write(fd, filler_block)
                if res <= 0:
                    raise Exception("Failed to touch a block. Err: os.write() returned %d" % res)
                written += res
        finally:
            os.close(fd)
            if filler_block is not None:
                filler_block.close()
        log("Swap file %s is ready" % self.filename)

    def turn_on_swap(self):
        """Run the configured mkswap and swapon commands; failures are only logged."""
        # Do mkswap
        try:
            # NOTE: the command templates come from the agent's own configuration
            # and are executed through the shell.
            mkswap = self.mkswap.format(swapfile=self.filename)
            log("Running: %s" % mkswap)
            check_call(mkswap, shell=True)
            swapon = self.swapon.format(swapfile=self.filename)
            log("Running: %s" % swapon)
            check_call(swapon, shell=True)
        except Exception as e:
            log("Failed to initialize swap, reason: %s" % str(e))


class BackgroundInitializerRunner(object):
    """Runs the swap initialization on a daemon thread and reports its outcome."""

    def __init__(self, swapper, update_grub):
        self.swapper = swapper
        self.thread = None
        # Exception captured on the worker thread, re-raised to the caller
        self.error = None
        self.update_grub = update_grub

    def start_init(self):
        """Start the background initialization thread."""
        self.thread = Thread(target=self.do_async_init, name="SwapInitializer")
        self.thread.daemon = True
        self.thread.start()

    def check_finished(self):
        """
        Return False while the worker is still running, True once it is done.

        Re-raises the exception captured on the worker thread, if any.
        """
        if self.thread is not None:
            self.thread.join(timeout=0)
            # is_alive() works on Python 2.6+ and 3.x (isAlive() was removed in 3.9)
            if self.thread.is_alive():
                return False
            self.thread = None

        log("Background swap initialization thread is complete.")
        if self.error is not None:
            raise self.error
        return True

    def force_completion(self):
        """
        Ask the worker to stop touching blocks and wait for it to finish.

        Re-raises the exception captured on the worker thread, if any.
        """
        log("We're out of time, stopping the background swap initialization.")
        self.swapper.need_to_hurry = True
        if self.thread is not None:
            self.thread.join()
        log("Background swap initialization thread has stopped.")
        self.thread = None
        if self.error is not None:
            raise self.error

    def do_async_init(self):
        """Thread body: create the swap, enable it and update the kernel offset."""
        try:
            self.swapper.init_swap()
            self.swapper.turn_on_swap()
            update_kernel_swap_offset(self.update_grub)
        except Exception as ex:
            log("Failed to initialize swap, reason: %s" % str(ex))
            self.error = ex


class ItnPoller(object):
    """Polls the monitored URL once a second and hibernates when asked to."""

    def __init__(self, url, hibernate_cmd, initializer):
        self.url = url
        self.hibernate_cmd = hibernate_cmd
        self.initializer = initializer

    def poll_loop(self):
        """Run the polling loop forever."""
        log("Starting the hibernation polling loop")
        while True:
            self.run_loop_iteration()
            sleep(1)

    def run_loop_iteration(self):
        """Do one poll; on a hibernate notification stop the initializer and hibernate."""
        if self.initializer and self.initializer.check_finished():
            self.initializer = None
        if self.poll_for_termination():
            if self.initializer:
                self.initializer.force_completion()
                self.initializer = None
            self.do_hibernate()

    def poll_for_termination(self):
        """
        Return True when the monitored URL's body contains ``hibernate``.

        Any failure (no token, HTTP error, timeout, ...) is reported as False.
        """
        token_response = None
        action_response = None
        try:
            token_request = Request('{}/{}'.format(IMDS_BASEURL, IMDS_API_TOKEN_PATH))
            token_request.add_header('X-aws-ec2-metadata-token-ttl-seconds', '21600')
            token_request.get_method = lambda: "PUT"
            token_response = urlopen(token_request, timeout=IMDS_TIMEOUT_SECONDS)
            token = token_response.read()

            action_request = Request(self.url)
            action_request.add_header('X-aws-ec2-metadata-token', token)
            action_response = urlopen(action_request, timeout=IMDS_TIMEOUT_SECONDS)
            res = action_response.read()
            return b"hibernate" in res
        except Exception:
            # Deliberately best-effort: any failure simply means "no notification".
            # SystemExit/KeyboardInterrupt are intentionally not swallowed.
            return False
        finally:
            if token_response:
                token_response.close()
            if action_response:
                action_response.close()

    def do_hibernate(self):
        """Run the hibernate command through the shell; failures are only logged."""
        log("Attempting to hibernate")
        try:
            check_call(self.hibernate_cmd, shell=True)
        except Exception as e:
            log("Failed to hibernate, reason: %s" % str(e))
        # We're not guaranteed to be stopped immediately after the hibernate
        # command fires. So wait a little bit to avoid triggering ourselves twice
        sleep(2)


def daemonize(pidfile):
    """
        Convert the process into a daemon, doing the usual Unix magic
    """
    try:
        pid = os.fork()
        if pid > 0:
            # Exit from first parent
            sys.exit(0)
    except OSError as e:
        log("Fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
        sys.exit(1)

    # Decouple from parent environment
    os.chdir("/")
    os.setsid()
    os.umask(0)

    # Second fork
    try:
        pid = os.fork()
        if pid > 0:
            # Exit from second parent
            sys.exit(0)
    except OSError as e:
        log("Fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
        sys.exit(1)

    # Write the PID file
    pid = str(os.getpid())
    with open(pidfile, "w+") as fl:
        fl.write("%s\n" % pid)
    atexit.register(lambda: os.unlink(pidfile))

    # Redirect standard file descriptors to null to avoid blocking
    nul = open('/dev/null', 'a+')
    os.dup2(nul.fileno(), sys.stdin.fileno())
    os.dup2(nul.fileno(), sys.stdout.fileno())
    os.dup2(nul.fileno(), sys.stderr.fileno())


def detect_hibernate_cmd():
    """Return the systemd hibernate command if /run/systemd/system exists, else pm-hibernate."""
    if os.path.exists("/run/systemd/system"):
        return "/bin/systemctl hibernate"
    else:
        return "/usr/sbin/pm-hibernate"


class Config(object):
    """
    Effective agent settings.

    Each setting is resolved in this order: command-line argument, then
    configuration file value, then built-in default.
    """

    def __init__(self, config, args):
        def get(section, name):
            # A missing section or option means "not configured"
            try:
                return config.get(section, name)
            except (NoSectionError, NoOptionError):
                return None

        def get_int(section, name):
            v = get(section, name)
            if v is None:
                return None
            return int(v)

        self.lock_in_ram = self.merge(
            self.to_bool(get('core', 'lock-in-ram')),
            self.to_bool(args.lock_in_ram), True)
        self.log_to_syslog = self.merge(
            self.to_bool(get('core', 'log-to-syslog')),
            self.to_bool(args.log_to_syslog), True)
        self.log_to_stderr = self.merge(
            self.to_bool(get('core', 'log-to-stderr')),
            self.to_bool(args.log_to_stderr), True)
        self.touch_swap = self.merge(
            self.to_bool(get('core', 'touch-swap')),
            self.to_bool(args.touch_swap), True)
        self.grub_update = self.merge(
            self.to_bool(get('core', 'grub-update')),
            self.to_bool(args.grub_update), True)
        self.ephemeral_check = self.merge(
            self.to_bool(get('core', 'check-ephemeral-volumes')),
            self.to_bool(args.check_ephemeral_volumes), True)
        self.freeze_timeout_curve = self.merge(
            get('core', 'freeze-timeout-curve'), args.freeze_timeout_curve,
            '0-8:20,8-16:40,16-64:60,64-128:150,128-256:200,256-:400')

        self.swap_percentage = self.merge(
            get_int('swap', 'percentage-of-ram'), args.swap_ram_percentage, 100)
        self.swap_mb = self.merge(
            get_int('swap', 'target-size-mb'), args.swap_target_size_mb, 4000)
        self.mkswap = self.merge(get('swap', 'mkswap'), args.mkswap, '/sbin/mkswap {swapfile}')
        self.swapon = self.merge(get('swap', 'swapon'), args.swapon, '/sbin/swapon {swapfile}')
        self.swapfile = self.merge(get('swap', 'swapfile'), args.swapfile, '/swap')

        self.hibernate = self.merge(
            get('pm-utils', 'hibernate-command'), args.hibernate, detect_hibernate_cmd())
        self.url = self.merge(
            get('notification', 'monitored-url'), args.monitored_url,
            'http://169.254.169.254/latest/meta-data/spot/instance-action')

    def merge(self, cf_value, arg_value, def_val):
        """Return the first of ``arg_value``, ``cf_value``, ``def_val`` that is not None."""
        if arg_value is not None:
            return arg_value
        if cf_value is not None:
            return cf_value
        return def_val

    def to_bool(self, bool_str):
        """Parse the string and return the boolean value encoded or raise an exception"""
        if bool_str is None:
            return None
        lowered = bool_str.lower()
        if lowered in ['true', 't', '1']:
            return True
        elif lowered in ['false', 'f', '0']:
            return False
        # if here we couldn't parse it
        raise ValueError("%s is not recognized as a boolean value" % bool_str)

    def __str__(self):
        return str(self.__dict__)


def get_pm_freeze_timeout(freeze_timeout_curve, ram_bytes):
    """
    Look up the pm_freeze_timeout (seconds) for a machine with ``ram_bytes`` of RAM.

    ``freeze_timeout_curve`` is a comma-separated list of ``<min>-[<max>]:<timeout>``
    segments with sizes in GB; ``min`` is inclusive and ``max`` exclusive.
    Returns None for an empty curve, a curve that can't be parsed, or when no
    segment matches.
    """
    if not freeze_timeout_curve:
        return None
    ram_gb = ceil(ram_bytes / (1024.0*1024.0*1024.0))
    try:
        for curve_part in freeze_timeout_curve.split(","):
            ram_sizes, timeout = curve_part.split(":")
            sizes_parts = ram_sizes.split("-")
            if len(sizes_parts) == 2 and sizes_parts[1] and sizes_parts[0]:
                ram_min = int(sizes_parts[0])
                ram_max = int(sizes_parts[1])
            elif len(sizes_parts) == 1 and sizes_parts[0] or \
                    len(sizes_parts) == 2 and not sizes_parts[1]:
                # Open-ended segment: "<min>" or "<min>-"
                ram_min = int(sizes_parts[0])
                ram_max = None
            else:
                raise Exception("can't parse %s, expected <int>-[<int>]" % ram_sizes)

            if (ram_min <= ram_gb and ram_max is None) or (ram_min <= ram_gb < ram_max):
                return int(timeout)
    except Exception as ex:
        log("Failed to parse the freeze timeout curve, error: %s. "
            "The pm_freeze_timeout will not be adjusted." % str(ex))
        return None
    log("Failed to find a fitting PM freeze timeout curve segment "
        "for %d GB of RAM. The pm_freeze_timeout will not be adjusted." % ram_gb)
    return None


def adjust_pm_timeout(timeout):
    """
    Raise /sys/power/pm_freeze_timeout to ``timeout`` seconds if it is currently lower.

    Exits the process with status 1 if the value can't be read or written.
    """
    try:
        with open('/sys/power/pm_freeze_timeout') as fl:
            # The file holds milliseconds; floor division keeps Python 2 and 3 consistent
            cur_timeout = int(fl.read()) // 1000
        if cur_timeout >= timeout:
            log("Info current pm_freeze_timeout (%d seconds) is greater than or equal "
                "to the requested (%d seconds) timeout, doing nothing" % (cur_timeout, timeout))
        else:
            with open('/sys/power/pm_freeze_timeout', 'w') as fl:
                fl.write("%d" % (timeout*1000))
            log("Adjusted pm_freeze_timeout to %d from %d" % (timeout, cur_timeout))
    except Exception as e:
        log("Failed to adjust pm_freeze_timeout to %d. Error: %s" % (timeout, str(e)))
        sys.exit(1)


def get_imds_token(seconds=21600):
    """
    Get a token to access instance metadata.

    Returns the token text, or None when the request fails or the endpoint
    answers with a non-OK status.
    """
    log("Requesting new IMDSv2 token.")
    request_header = {'X-aws-ec2-metadata-token-ttl-seconds': '{}'.format(seconds)}
    token_url = '{}/{}'.format(IMDS_BASEURL, IMDS_API_TOKEN_PATH)
    try:
        response = requests.put(token_url, headers=request_header,
                                timeout=IMDS_TIMEOUT_SECONDS)
    except requests.RequestException as e:
        # Treat an unreachable endpoint the same way as a refused token
        log("Failed to request an IMDSv2 token. Error: %s" % str(e))
        return None
    response.close()
    if response.status_code != requests.codes.ok:
        return None

    return response.text


def hibernation_enabled():
    """Returns a boolean indicating whether hibernation-option.configured is enabled or not."""
    imds_token = get_imds_token()
    if imds_token is None:
        log("IMDS_V2 http endpoint is disabled")
        # IMDS http endpoint is disabled
        return False

    request_header = {'X-aws-ec2-metadata-token': imds_token}
    try:
        response = requests.get("{}/{}".format(IMDS_BASEURL, IMDS_SPOT_ACTION_PATH),
                                headers=request_header, timeout=IMDS_TIMEOUT_SECONDS)
    except requests.RequestException as e:
        log("Failed to query the hibernation configured flag. Error: %s" % str(e))
        return False
    response.close()
    if response.status_code != requests.codes.ok or response.text.strip().lower() == "false":
        return False

    log("Hibernation Configured Flag found")
    return True


def main():
    """Parse the configuration, validate or prepare swap, then run the polling loop."""
    global log_to_syslog, log_to_stderr

    # Parse arguments
    parser = argparse.ArgumentParser(description="An EC2 agent that watches for instance stop "
                                                 "notifications and initiates hibernation")
    parser.add_argument('-c', '--config', help='Configuration file to use', type=str)
    parser.add_argument('-i', '--pidfile', help='The file to write PID to', type=str,
                        default='/run/hibagent')
    parser.add_argument('-f', '--foreground', help="Run in foreground, don't daemonize",
                        action="store_true")

    parser.add_argument("-l", "--lock-in-ram", help='Lock the code in RAM', type=str)
    parser.add_argument("-syslog", "--log-to-syslog", help='Log to syslog', type=str)
    parser.add_argument("-stderr", "--log-to-stderr", help='Log to stderr', type=str)
    parser.add_argument("-touch", "--touch-swap", help='Do swap initialization', type=str)
    parser.add_argument("-grub", "--grub-update",
                        help='Update GRUB config with resume offset', type=str)
    parser.add_argument("-e", "--check-ephemeral-volumes",
                        help='Check if ephemeral volumes are mounted', type=str)
    parser.add_argument("-u", "--freeze-timeout-curve",
                        help='The pm_freeze_timeout curve (by RAM size)', type=str)

    parser.add_argument("-p", "--swap-ram-percentage",
                        help='The target swap size as a percentage of RAM', type=int)
    parser.add_argument("-s", "--swap-target-size-mb",
                        help='The target swap size in megabytes', type=int)
    parser.add_argument("-w", "--swapfile", help="Swap file name", type=str)
    parser.add_argument('--mkswap', help='The command line utility to set up swap', type=str)
    parser.add_argument('--swapon', help='The command line utility to turn on swap', type=str)
    parser.add_argument('--hibernate',
                        help='The command line utility to initiate hibernation', type=str)
    parser.add_argument('--monitored-url',
                        help='The URL to monitor for notifications', type=str)

    args = parser.parse_args()

    config_file = ConfigParser()
    if args.config:
        config_file.read(args.config)

    config = Config(config_file, args)
    log_to_stderr = config.log_to_stderr
    log_to_syslog = config.log_to_syslog

    log("Effective config: %s" % config)

    # Let's first check if we need to kill the Spot Hibernate Agent
    if hibernation_enabled():
        log("Spot Instance Launch has enabled Hibernation Configured Flag. hibagent exiting!!")
        sys.exit(0)

    # The target is the larger of the fixed size and the percentage-of-RAM size
    target_swap_size = config.swap_mb * 1024 * 1024
    ram_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
    swap_percentage_size = ram_bytes * config.swap_percentage // 100
    if swap_percentage_size > target_swap_size:
        target_swap_size = int(swap_percentage_size)
    log("Will check if swap is at least: %d megabytes" % (target_swap_size // (1024*1024)))

    timeout = get_pm_freeze_timeout(config.freeze_timeout_curve, ram_bytes)
    if timeout:
        adjust_pm_timeout(timeout)

    # Validate the swap configuration
    cur_swap = get_swap_space()
    bi = None
    if cur_swap >= target_swap_size - SWAP_RESERVED_SIZE:
        log("There's sufficient swap available (have %d, need %d)" %
            (cur_swap, target_swap_size))
        update_kernel_swap_offset(config.grub_update)
    elif cur_swap > 0:
        log("There's not enough swap space available (have %d, need %d), exiting" %
            (cur_swap, target_swap_size))
        sys.exit(1)
    else:
        log("No swap is present, will create and initialize it")
        # We need to create swap, but first validate that we have enough free space
        # dirname() is '' for a bare file name; fall back to the current directory
        swap_dev = os.path.dirname(config.swapfile) or '.'
        st = os.statvfs(swap_dev)
        free_bytes = st.f_bavail * st.f_frsize
        free_space_needed = target_swap_size + 10 * 1024 * 1024
        if free_space_needed >= free_bytes:
            log("There's not enough space (%d present, %d needed) on the swap device: %s" % (
                free_bytes, free_space_needed, swap_dev))
            sys.exit(1)
        log("There's enough space (%d present, %d needed) on the swap device: %s" % (
            free_bytes, free_space_needed, swap_dev))

        sw = SwapInitializer(config.swapfile, target_swap_size, config.touch_swap,
                             config.mkswap, config.swapon)
        bi = BackgroundInitializerRunner(sw, config.grub_update)

    # Daemonize now! The parent process will not return from this method
    if not args.foreground:
        log("Initial checks are finished, daemonizing and writing PID into %s" % args.pidfile)
        daemonize(args.pidfile)
    else:
        log("Initial checks are finished, will run in foreground now")

    poller = ItnPoller(config.url, config.hibernate, bi)
    if config.lock_in_ram:
        mlockall()

    # This loop will now be running inside the child
    if bi:
        bi.start_init()
    poller.poll_loop()


if __name__ == '__main__':
    main()