الانتقال إلى المحتوى الرئيسي
Ingestion API هي طريقة خفيفة لفهرسة المستندات. إذا كنت بحاجة إلى مزيد من التحكم، فكر في إنشاء موصل في لوحة الإدارة أو اتباع دليل إنشاء موصل.

متى تستخدم Ingestion API؟

استخدم Ingestion API من أجل:
  • مصادر غير مدعومة: إضافة محتوى من أنظمة ليس لديها موصلات مدمجة
  • بيانات تكميلية: إضافة سياق إضافي لبيانات الموصل الموجودة (مثل ملفات README لـ GitLab)
  • تحرير المستندات: تعديل المستندات في Gorbit عندما لا يمكن للمسؤول تحديث المستند الأصلي في المصدر
  • سير العمل البرمجي: دمج فهرسة المستندات في خطوط بياناتك الحالية

الدليل

انتقل إلى قسم الكود الكامل إذا كنت لا تريد الدليل التفصيلي. في هذا المثال، سنفهرس ملفات محلية باستخدام Ingestion API.
عنوان URL الأساسي لـ Ingestion API هو https://cloud.gorbit.app/api/gorbit-api/ingestion أو نطاقك الخاص.
1

جهّز موصلك

بما أننا نريد أن تظهر هذه الملفات في صفحة الموصلات، سنحتاج أولاً إلى إنشاء موصل. لهذا المثال، سننشئ موصل ملف مع ملف نصي وهمي.إذا كان لديك بالفعل موصل تريد ربط ملفات Ingestion به، انقر على الموصل وانسخ cc_pair_id من URL.https://cloud.gorbit.app/admin/connector/308 -> cc_pair_id هو 243
2

جهّز طلبك

import requests

API_BASE_URL = "https://cloud.gorbit.app/api" # أو نطاقك الخاص
API_KEY = "YOUR_KEY_HERE"

headers = {
  "Authorization": f"Bearer {API_KEY}",
  "Content-Type": "application/json"
}
3

معالجة مستنداتك إلى كائن JSON المناسب

إذا كنت تريد أن تظهر مستنداتك في صفحة الموصلات، يجب تحديد cc_pair_id في البيانات!
راجع المفاهيم الأساسية: المستندات للحصول على تفاصيل حول كائن IngestionDocument.
JSON
{
  "document": {
    "id": "my_unique_id_1",
    "semantic_identifier": "Gorbit FAQ v1",
    "sections": [
      { "text": "What is Gorbit?\nGorbit is..." }
    ],
    "source": "file",
    "metadata": {
      "category": "faq"
    }
  },
  "cc_pair_id": 243
}
{
  "document": {
    "id": "my_unique_id_1",
    "semantic_identifier": "Gorbit FAQ - العنوان المعروض في الواجهة",
    "title": "Gorbit FAQ v1 - العنوان للبحث",
    "sections": [
      {
        "text": "ما هو Gorbit؟\nGorbit هو...",
        "link": "https://docs.gorbit.app/faq#what-is-gorbit"
      },
      {
        "text": "كيف أبدأ؟\nللبدء...",
        "link": "https://docs.gorbit.app/faq#getting-started"
      },
      {
        "image_file_id": "uuid_generated_by_gorbit_file_store",
        "text": "متقدم - يجب رفع الصورة أولاً! POST /user/file/upload",
        "link": "https://docs.gorbit.app/faq#about"
      }
    ],
    "source": "file",
    "metadata": {
      "category": "faq",
      "tags": ["frequently-asked", "help"]
    },
    "doc_updated_at": "2025-09-19T08:20:00Z",
    "chunk_count": 15,
    "primary_owners": [
      {
        "display_name": "Alex Chen",
        "first_name": "Alex",
        "middle_initial": null,
        "last_name": "Chen",
        "email": "alex@gorbit.app"
      }
    ],
    "secondary_owners": [
      {
        "display_name": "Sarah Johnson",
        "first_name": "Sarah",
        "middle_initial": "M",
        "last_name": "Johnson",
        "email": "sarah@gorbit.app"
      }
    ],
    "from_ingestion_api": true
  },
  "cc_pair_id": 243
}
import os
from pathlib import Path
import PyPDF2
import pandas as pd
from docx import Document as DocxDocument

def read_files_from_folder(folder_path, cc_pair_id=243):
  """
  قراءة ملفات PDF و TXT و DOCX و CSV و XLSX وإنشاء بيانات ingestion دنيا
  """
  documents = []
  folder = Path(folder_path)

  # امتدادات الملفات المدعومة
  supported_extensions = {'.pdf', '.txt', '.docx', '.csv', '.xlsx'}

  for file_path in folder.rglob('*'):
    if file_path.is_file() and file_path.suffix.lower() in supported_extensions:
      try:
        # استخراج النص بناءً على نوع الملف
        if file_path.suffix.lower() == '.txt':
          with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        elif file_path.suffix.lower() == '.pdf':
          with open(file_path, 'rb') as f:
            reader = PyPDF2.PdfReader(f)
            content = ""
            for page in reader.pages:
              content += page.extract_text() + "\n"

        elif file_path.suffix.lower() == '.docx':
          doc = DocxDocument(file_path)
          content = "\n".join([paragraph.text for paragraph in doc.paragraphs])

        elif file_path.suffix.lower() in ['.csv', '.xlsx']:
          df = pd.read_csv(file_path) if file_path.suffix.lower() == '.csv' else pd.read_excel(file_path)
          content = df.to_string(index=False)

        # تخطي الملفات الفارغة
        if not content.strip():
          continue

        # إنشاء بيانات دنيا صالحة
        document_payload = {
          "document": {
            "semantic_identifier": file_path.name,
            "sections": [
              {"text": content}
            ],
            "source": "file",
            "metadata": {
              "file_type": file_path.suffix.lower()
            }
          },
          "cc_pair_id": cc_pair_id
        }

        documents.append(document_payload)
        print(f"تمت المعالجة: {file_path.name}")

      except Exception as e:
        print(f"خطأ في قراءة {file_path.name}: {e}")
        continue

  return documents

# مثال على الاستخدام
LOCAL_FOLDER = "./documents"  # غيّر إلى مسار مجلدك
CC_PAIR_ID = 243  # استخدم cc_pair_id الفعلي الخاص بك

# قراءة ومعالجة الملفات
documents_to_ingest = read_files_from_folder(LOCAL_FOLDER, CC_PAIR_ID)
print(f"\nتم العثور على {len(documents_to_ingest)} مستند جاهز للفهرسة")
4

قم بإجراء الطلب إلى نقطة النهاية /ingestion

successful_ingestions = 0
failed_ingestions = 0

for i, document_data in enumerate(documents_to_ingest):
  print(f"جاري فهرسة المستند {i+1}/{len(documents_to_ingest)}: {document_data['document']['semantic_identifier']}")

  response = requests.post(
    f"{API_BASE_URL}/gorbit-api/ingestion",
    headers=headers,
    json=document_data
  )

  if response.status_code == 200:
    print(f"تمت الفهرسة بنجاح: {document_data['document']['semantic_identifier']}")
    successful_ingestions += 1
  else:
    print(f"فشلت الفهرسة {document_data['document']['semantic_identifier']}: {response.status_code}")
    print(f"   خطأ: {response.text}")
    failed_ingestions += 1

print(f"\nاكتملت الفهرسة: {successful_ingestions} ناجحة، {failed_ingestions} فاشلة")
سيعيد API استجابة نجاح إذا تم قبول المستند للمعالجة. تحدث الفهرسة الفعلية بشكل غير متزامن.

الكود الكامل

Python
import requests
import os
from pathlib import Path
import time
import PyPDF2
import pandas as pd
from docx import Document as DocxDocument

API_BASE_URL = "https://cloud.gorbit.app/api"  # أو نطاقك الخاص
API_KEY = "YOUR_KEY_HERE"
CC_PAIR_ID = 243  # استبدل بـ cc_pair_id الفعلي الخاص بك
LOCAL_FOLDER = "./documents"  # غيّر هذا إلى مسار مجلدك

headers = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

def read_files_from_folder(folder_path, cc_pair_id=243):
    """
    قراءة ملفات PDF و TXT و DOCX و CSV و XLSX وإنشاء بيانات ingestion دنيا
    """
    documents = []
    folder = Path(folder_path)

    if not folder.exists():
        print(f"المجلد غير موجود: {folder_path}")
        return documents

    supported_extensions = {'.pdf', '.txt', '.docx', '.csv', '.xlsx'}

    print(f"جاري قراءة الملفات من: {folder.absolute()}")

    for file_path in folder.rglob('*'):
        if file_path.is_file() and file_path.suffix.lower() in supported_extensions:
            try:
                # استخراج النص بناءً على نوع الملف
                if file_path.suffix.lower() == '.txt':
                    with open(file_path, 'r', encoding='utf-8') as f:
                        content = f.read()

                elif file_path.suffix.lower() == '.pdf':
                    with open(file_path, 'rb') as f:
                        reader = PyPDF2.PdfReader(f)
                        content = ""
                        for page in reader.pages:
                            content += page.extract_text() + "\n"

                elif file_path.suffix.lower() == '.docx':
                    doc = DocxDocument(file_path)
                    content = "\n".join([paragraph.text for paragraph in doc.paragraphs])

                elif file_path.suffix.lower() in ['.csv', '.xlsx']:
                    df = pd.read_csv(file_path) if file_path.suffix.lower() == '.csv' else pd.read_excel(file_path)
                    content = df.to_string(index=False)

                # تخطي الملفات الفارغة
                if not content.strip():
                    print(f"تخطي الملف الفارغ: {file_path.name}")
                    continue

                # إنشاء بيانات دنيا صالحة
                document_payload = {
                    "document": {
                        "semantic_identifier": file_path.name,
                        "sections": [
                            {"text": content}
                        ],
                        "source": "file",
                        "metadata": {
                            "file_type": file_path.suffix.lower()
                        }
                    },
                    "cc_pair_id": cc_pair_id
                }

                documents.append(document_payload)
                print(f"تمت المعالجة: {file_path.name}")

            except Exception as e:
                print(f"خطأ في قراءة {file_path.name}: {e}")
                continue
    return documents

# قراءة جميع المستندات من المجلد
print("جاري بدء عملية فهرسة الملفات...")
documents_to_ingest = read_files_from_folder(LOCAL_FOLDER, CC_PAIR_ID)

if not documents_to_ingest:
    print("لم يتم العثور على مستندات للفهرسة. تحقق من مسار المجلد وأنواع الملفات.")
    exit(1)

print(f"تم العثور على {len(documents_to_ingest)} مستند للفهرسة")

# إجراء طلبات الفهرسة لجميع المستندات
successful_ingestions = 0
failed_ingestions = 0

print("\nجاري بدء الفهرسة...")
for i, document_data in enumerate(documents_to_ingest):
    print(f"[{i+1}/{len(documents_to_ingest)}] جاري فهرسة: {document_data['document']['semantic_identifier']}")

    response = requests.post(
        f"{API_BASE_URL}/gorbit-api/ingestion",
        headers=headers,
        json=document_data
    )

    if response.status_code == 200:
        result = response.json()
        print(f"نجاح: {document_data['document']['semantic_identifier']}")
        if result.get('already_existed'):
            print("   تم تحديث المستند (كان موجوداً بالفعل)")
        else:
            print("   تم إنشاء مستند جديد")
        successful_ingestions += 1
    else:
        print(f"فشل: {document_data['document']['semantic_identifier']}")
        print(f"   الحالة: {response.status_code}")
        print(f"   الخطأ: {response.text}")
        failed_ingestions += 1

print(f"\nملخص الفهرسة:")
print(f"   ناجح: {successful_ingestions}")
print(f"   فاشل: {failed_ingestions}")
print(f"   الإجمالي: {len(documents_to_ingest)}")

مثال إضافي

هذا النص البرمجي يزحف إلى مجلد محدد للبحث عن ملفات JSON ويرسل المحتويات إلى Ingestion API.
Python
#!/usr/bin/env python3
"""
نص برمجي للفهرسة
"""

import argparse
import json
import logging
import sys
from pathlib import Path
from typing import Any, List, Optional

import requests
from requests.exceptions import RequestException
from tqdm import tqdm  # type: ignore

def setup_logging() -> None:
    """تكوين التسجيل للتطبيق."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        handlers=[logging.StreamHandler()],
    )

def parse_arguments() -> argparse.Namespace:
    """تحليل معاملات سطر الأوامر."""
    parser = argparse.ArgumentParser(
        description="الزحف إلى مجلد للبحث عن ملفات JSON وإرسالها إلى نقطة نهاية API."
    )
    parser.add_argument(
        "-d", "--directory",
        required=True,
        type=str,
        help="المجلد للزحف للبحث عن ملفات JSON"
    )
    parser.add_argument(
        "-t", "--tracking-file",
        required=True,
        type=str,
        help="ملف لتتبع معرفات المستندات المعالجة (واحد في كل سطر)"
    )
    parser.add_argument(
        "-u", "--url",
        type=str,
        help="عنوان URL لنقطة نهاية API لإرسال بيانات JSON إليه",
        default="http://localhost:8080/api"
    )
    parser.add_argument(
        "--headers",
        type=str,
        help="سلسلة JSON للعناوين لتضمينها في الطلب"
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=30,
        help="مهلة الطلب بالثواني (الافتراضي: 30)"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="لا ترسل الطلبات فعلياً، فقط اطبع ما سيتم إرساله"
    )
    parser.add_argument(
        "--cc-pair-id",
        type=int,
        help="معرف زوج بيانات اعتماد الموصل لربط المستندات به"
    )

    return parser.parse_args()

def find_json_files(directory: str) -> List[Path]:
    """
    العثور على جميع ملفات JSON في المجلد المحدد والمجلدات الفرعية له.

    المعاملات:
        directory: المجلد للبحث فيه

    العائدات:
        قائمة بكائنات Path لجميع ملفات JSON التي تم العثور عليها
    """
    directory_path = Path(directory)
    if not directory_path.exists() or not directory_path.is_dir():
        raise ValueError(f"المسار المحدد '{directory}' ليس مجلداً صالحاً")

    json_files = list(directory_path.glob("**/*.json"))
    return json_files

def load_processed_ids(tracking_file: str) -> set:
    """
    تحميل معرفات المستندات المعالجة بالفعل من ملف التتبع.

    المعاملات:
        tracking_file: مسار ملف التتبع

    العائدات:
        مجموعة من معرفات المستندات التي تمت معالجتها بالفعل
    """
    tracking_path = Path(tracking_file)

    if not tracking_path.exists():
        print(f"ملف التتبع '{tracking_file}' غير موجود. جاري إنشاء ملف جديد.")
        tracking_path.parent.mkdir(parents=True, exist_ok=True)
        tracking_path.touch()
        return set()

    try:
        with open(tracking_path, "r", encoding="utf-8") as f:
            processed_ids = {line.strip() for line in f if line.strip()}
        return processed_ids
    except Exception as e:
        logging.getLogger(__name__).warning(f"خطأ في قراءة ملف التتبع: {e}. البدء بمجموعة فارغة.")
        return set()

def save_processed_id(tracking_file: str, document_id: str) -> None:
    """
    حفظ معرف مستند معالج في ملف التتبع.

    المعاملات:
        tracking_file: مسار ملف التتبع
        document_id: معرف المستند للحفظ
    """
    try:
        with open(tracking_file, "a", encoding="utf-8") as f:
            f.write(f"{document_id}\n")
    except Exception as e:
        logging.getLogger(__name__).error(f"خطأ في حفظ معرف المستند في ملف التتبع: {e}")

def transform_json_to_document_format(file_path: Path, cc_pair_id: int | None = None) -> dict:
    """
    تحويل ملف JSON إلى تنسيق المستند المطلوب من قبل API.

    المعاملات:
        file_path: مسار ملف JSON
        cc_pair_id: معرف زوج بيانات اعتماد الموصل اختياري

    العائدات:
        قاموس بالتنسيق المطلوب للمستند
    """
    logger = logging.getLogger(__name__)

    try:
        with open(file_path, "r", encoding="utf-8") as f:
            data = json.load(f)

        # إنشاء هيكل المستند
        document: dict[str, Any] = {
            "document": {
                "id": data["id"],
                "sections": [
                    {
                        "text": data["page_content"],
                        "link": data["source"]
                    }
                ],
                "semantic_identifier": data["title"],
                "metadata": {
                    "tags": data["metadata"]["tags"]
                },
                "source": "web"
            }
        }

        # إضافة cc_pair_id إذا تم توفيره
        if cc_pair_id is not None:
            document["cc_pair_id"] = cc_pair_id
            logger.debug(f"تمت إضافة cc_pair_id {cc_pair_id} للمستند من {file_path}")

        return document

    except Exception as e:
        logger.error(f"خطأ في تحويل {file_path}: {str(e)}")
        return {}

def send_json_to_api(
    file_path: Path,
    api_url: str,
    tracking_file: str,
    document_id: str,
    headers: Optional[dict] = None,
    timeout: int = 30,
    dry_run: bool = False,
    cc_pair_id: Optional[int] = None
) -> bool:
    """
    قراءة ملف JSON، وتحويله إلى التنسيق المطلوب، وإرساله إلى API.

    المعاملات:
        file_path: مسار ملف JSON
        api_url: عنوان URL لنقطة نهاية API
        tracking_file: مسار ملف التتبع
        document_id: معرف المستند للتتبع
        headers: عناوين اختيارية لتضمينها في الطلب
        timeout: مهلة الطلب بالثواني
        dry_run: إذا كان True، لا ترسل الطلب فعلياً
        cc_pair_id: معرف زوج بيانات اعتماد الموصل اختياري

    العائدات:
        True إذا نجح، False خلاف ذلك
    """
    logger = logging.getLogger(__name__)

    try:
        # تحويل JSON إلى التنسيق المطلوب
        transformed_data = transform_json_to_document_format(file_path, cc_pair_id)

        if not transformed_data:
            logger.error(f"فشل تحويل البيانات من {file_path}")
            return False

        if dry_run:
            logger.info(f"DRY RUN: سيتم إرسال البيانات المحولة من {file_path} إلى {api_url}")
            save_processed_id(tracking_file, document_id)
            return True

        response = requests.post(
            api_url,
            json=transformed_data,
            headers=headers or {},
            timeout=timeout
        )

        response.raise_for_status()
        logger.info(f"تم إرسال البيانات المحولة من {file_path} إلى API بنجاح")

        # حفظ معرف المستند في ملف التتبع عند المعالجة الناجحة
        save_processed_id(tracking_file, document_id)
        return True

    except json.JSONDecodeError:
        logger.error(f"فشل تحليل JSON من {file_path}")
    except RequestException as e:
        logger.error(f"فشل طلب API لـ {file_path}: {str(e)}")
    except Exception as e:
        logger.error(f"خطأ غير متوقع في معالجة {file_path}: {str(e)}")

    return False

def main() -> int:
    """الوظيفة الرئيسية لتشغيل النص البرمجي."""
    setup_logging()
    logger = logging.getLogger(__name__)

    try:
        args = parse_arguments()

        # تحليل العناوين إذا تم توفيرها
        headers = None
        if args.headers:
            try:
                headers = json.loads(args.headers)
            except json.JSONDecodeError:
                logger.error("فشل تحليل سلسلة JSON للعناوين")
                return 1

        # تسجيل cc_pair_id إذا تم توفيره
        if args.cc_pair_id is not None:
            logger.info(f"استخدام معرف زوج بيانات اعتماد الموصل: {args.cc_pair_id}")

        # تحميل معرفات المستندات المعالجة بالفعل
        processed_ids = load_processed_ids(args.tracking_file)
        logger.info(f"تم تحميل {len(processed_ids)} معرف مستند معالج بالفعل")

        # العثور على جميع ملفات JSON
        logger.info(f"البحث عن ملفات JSON في {args.directory}")
        json_files = find_json_files(args.directory)
        logger.info(f"تم العثور على {len(json_files)} ملف JSON")

        if not json_files:
            logger.warning("لم يتم العثور على ملفات JSON. الخروج.")
            return 0

        # تصفية الملفات المعالجة بالفعل
        unprocessed_files = []
        for file_path in json_files:
            try:
                # استخراج معرف المستند من ملف JSON
                with open(file_path, "r", encoding="utf-8") as f:
                    data = json.load(f)
                document_id = data.get("id")

                if not document_id:
                    logger.warning(f"لم يتم العثور على حقل 'id' في {file_path}، تخطي")
                    continue

                if document_id not in processed_ids:
                    unprocessed_files.append((file_path, document_id))
                else:
                    logger.debug(f"تخطي المستند المعالج بالفعل: {document_id}")
            except Exception as e:
                logger.warning(f"خطأ في قراءة معرف المستند من {file_path}: {e}، تخطي")
                continue

        logger.info(f"تم العثور على {len(unprocessed_files)} ملف غير معالج من أصل {len(json_files)} ملف إجمالي")

        if not unprocessed_files:
            logger.info("تمت معالجة جميع الملفات بالفعل. الخروج.")
            return 0

        # معالجة كل ملف JSON غير معالج
        success_count = 0
        failure_count = 0

        for file_path, document_id in tqdm(unprocessed_files, desc="معالجة الملفات"):
            success = send_json_to_api(
                file_path=file_path,
                api_url=args.url,
                tracking_file=args.tracking_file,
                document_id=document_id,
                headers=headers,
                timeout=args.timeout,
                dry_run=args.dry_run,
                cc_pair_id=args.cc_pair_id
            )

            if success:
                success_count += 1
            else:
                failure_count += 1

        # الإبلاغ عن النتائج
        logger.info(f"اكتملت المعالجة. نجاح: {success_count}، فشل: {failure_count}")

        return 0 if failure_count == 0 else 1

    except KeyboardInterrupt:
        logger.info("تمت مقاطعة العملية من قبل المستخدم")
        return 130
    except Exception as e:
        logger.exception(f"استثناء غير معالج: {str(e)}")
        return 1

if __name__ == "__main__":
    main()