How to Automatically Execute Python Script When Files Change in a Directory (Linux/Mac)


When working with automated data pipelines, a common requirement is triggering an action when new files arrive in a specific directory. In your case, daily CSV updates land in /tmp/data_upload, old files get replaced by new ones, and test.py needs to run after each update.

While a cron job can poll for file changes periodically, polling is inefficient for real-time monitoring. Linux's inotify subsystem instead delivers kernel-level file change notifications (macOS offers the analogous FSEvents API), making it ideal for this scenario. Here's why it's superior to polling:

  • Immediate triggering (no polling delays)
  • Low system resource usage
  • Granular event types (modify, create, delete)

The easiest cross-platform solution uses Python's watchdog library. First install it:

pip install watchdog
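
To see which events fire for your upload pattern before writing a handler, watchdog's built-in LoggingEventHandler can print every event to the console. A quick sanity-check sketch:

import logging
import time
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')

# Log every create/modify/move/delete event in the upload directory.
observer = Observer()
observer.schedule(LoggingEventHandler(), "/tmp/data_upload", recursive=False)
observer.start()
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()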

Then create file_monitor.py:

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import subprocess

class CSVHandler(FileSystemEventHandler):
    def on_modified(self, event):
        # Ignore directory events and non-CSV files.
        if event.is_directory or not event.src_path.endswith('.csv'):
            return
        print(f"Detected change in: {event.src_path}")
        subprocess.run(['python3', '/path/to/test.py'])

    # A replaced upload arrives as a 'created' event, not a 'modified' one.
    on_created = on_modified

if __name__ == "__main__":
    path = "/tmp/data_upload"
    event_handler = CSVHandler()
    observer = Observer()
    observer.schedule(event_handler, path, recursive=False)
    observer.start()
    
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
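
If you'd rather not hand-roll the extension check, watchdog also ships a PatternMatchingEventHandler that filters events for you. A minimal sketch (the subclass name is illustrative); schedule it exactly like CSVHandler above:

import subprocess
from watchdog.events import PatternMatchingEventHandler

class CSVPatternHandler(PatternMatchingEventHandler):
    def __init__(self):
        # Only *.csv paths reach the callbacks; directory events are dropped.
        super().__init__(patterns=['*.csv'], ignore_directories=True)

    def on_modified(self, event):
        subprocess.run(['python3', '/path/to/test.py'])

    # A replaced upload surfaces as a 'created' event rather than 'modified'.
    on_created = on_modified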

For Linux systems without Python, use the inotify-tools package (inotify is Linux-only; the install command shown is for Debian/Ubuntu):

sudo apt-get install inotify-tools

Then create a monitoring script:

#!/bin/bash
# Watch the directory itself: a watch placed on an individual file is lost
# when that file is deleted and replaced. close_write fires only after a
# writer closes the file, and moved_to catches uploads done via rename.
while inotifywait -e close_write -e moved_to /tmp/data_upload; do
    python3 /path/to/test.py
done
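
To react only to CSV files, inotifywait can run in monitor mode and print just the filename for each event, which the loop then filters. A sketch using documented inotifywait flags:

#!/bin/bash
# -m keeps inotifywait running; --format '%f' prints only the filename.
inotifywait -m -e close_write -e moved_to --format '%f' /tmp/data_upload |
while read -r file; do
    [[ "$file" == *.csv ]] && python3 /path/to/test.py
done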

For production systems, consider these enhancements:

  • Debouncing: Add a delay to handle rapid successive writes
  • File Locking: Check that writes are complete before processing (see the size-stability sketch after this list)
  • Logging: Implement proper logging for debugging
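
For the write-completeness check, one simple heuristic is to wait until the file's size stops changing before processing it. A minimal sketch (the helper name and polling interval are illustrative, not from the original setup):

import os
import time

def wait_until_stable(path, checks=3, interval=1.0):
    # Return once the size has stayed constant for `checks` consecutive
    # polls -- a heuristic signal that the writer has finished.
    last_size, stable = -1, 0
    while stable < checks:
        size = os.path.getsize(path)
        if size == last_size:
            stable += 1
        else:
            last_size, stable = size, 0
        time.sleep(interval)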

Here's an improved handler with debouncing: each new event resets a timer, so the script runs only after writes have stopped:

import subprocess
from threading import Timer
from watchdog.events import FileSystemEventHandler

DEBOUNCE_SEC = 5  # run only after 5 quiet seconds

def run_script():
    subprocess.run(['python3', '/path/to/test.py'])

class DebouncedCSVHandler(FileSystemEventHandler):
    def __init__(self):
        self.timer = None

    def on_modified(self, event):
        if event.is_directory or not event.src_path.endswith('.csv'):
            return
        # Restart the countdown on every event: the script fires only once
        # the directory has been quiet for DEBOUNCE_SEC seconds.
        if self.timer:
            self.timer.cancel()
        self.timer = Timer(DEBOUNCE_SEC, run_script)
        self.timer.start()

    on_created = on_modified
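
Swap DebouncedCSVHandler in for CSVHandler in file_monitor.py's main block; the script then runs once per quiet period instead of once per write event.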

For permanent monitoring, create a systemd service:

[Unit]
Description=CSV File Monitor Service

[Service]
ExecStart=/usr/bin/python3 /path/to/file_monitor.py
Restart=always
User=your_user

[Install]
WantedBy=multi-user.target

Save it as /etc/systemd/system/csv-monitor.service, then run:

sudo systemctl daemon-reload
sudo systemctl enable csv-monitor
sudo systemctl start csv-monitor
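
Once the service is running, follow its output (including the handler's print statements) with journalctl:

sudo journalctl -u csv-monitor -f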
